Overview

This guide demonstrates how to set up a simple conversational agent using LangGraph in Python, connected to the Thesys C1 API endpoint. We’ll build a basic graph that manages message history, calls a dummy weather tool, and streams responses back. All messages are stored in memory using LangGraph’s MemorySaver. The complete code for this guide can be found in the Thesys examples repository.

Backend Setup

1

Add Dependencies

Ensure LangGraph and its dependencies are in your requirements.txt. You’ll need langchain-openai to interact with the Thesys C1 API endpoint.
requirements.txt
langgraph
langchain-openai
langchain-core
# Add other necessary dependencies like fastapi, uvicorn, python-dotenv, sse-starlette
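Since graph.py (shown in the next step) loads its settings with python-dotenv, you can keep credentials in a local .env file instead of exporting them in your shell. A minimal sketch with placeholder values (the variable names match what the code below reads via os.getenv):
.env
THESYS_API_KEY=sk-th-...
# Optional: override the default Thesys C1 endpoint
THESYS_API_BASE_URL=https://api.thesys.dev/v1/embed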
2

Define the Graph (`graph.py`)

Create graph.py to define the agent’s structure and state management.

Agent State (AgentState)

The AgentState includes messages annotated with add_messages for automatic history updates, and a response_id string. This response_id is crucial: it is passed from the @thesysai/genui-sdk frontend with each user message and needs to be assigned to the corresponding AI response message for the UI to track it correctly.
graph.py (AgentState Definition)
class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    response_id: str
Model and Agent Node (call_model)

Next, initialize the ChatOpenAI model, pointing it at your Thesys C1 API endpoint and binding any necessary tools. The call_model function invokes the model and, importantly, checks whether the response is the final AIMessage that will be sent to the user. If it is, it sets the message’s id to the response_id stored in the state.
graph.py (Model Init & call_model)
model = ChatOpenAI(
    model="c1/anthropic/claude-sonnet-4/v-20250617",
    temperature=0,
    base_url=os.getenv("THESYS_API_BASE_URL", "https://api.thesys.dev/v1/embed"),
    api_key=os.getenv("THESYS_API_KEY"),
).bind_tools(runnable_tools)

async def call_model(state: AgentState):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    # Assign the response_id from state if it's a direct AIMessage response
    if isinstance(response, AIMessage) and not response.tool_calls:
        response.id = state["response_id"]
    return {"messages": [response]}
Full Graph Implementation

The rest of the graph.py file involves setting up the ToolNode, defining the should_continue logic for routing between the agent and tools, building the StateGraph, adding nodes and edges, and finally compiling the graph with a MemorySaver checkpointer.
graph.py
import os
from typing import TypedDict, Annotated, List
from langchain_core.messages import AnyMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from dotenv import load_dotenv
# Assume tools.py defines runnable_tools
from tools import runnable_tools

load_dotenv()

# 1. Define the State
class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    response_id: str

# 2. Initialize Model and Tools
model = ChatOpenAI(
    model="c1/anthropic/claude-sonnet-4/v-20250617",
    temperature=0,
    # Ensure this points to your Thesys C1 API endpoint
    base_url=os.getenv("THESYS_API_BASE_URL", "https://api.thesys.dev/v1/embed"),
    api_key=os.getenv("THESYS_API_KEY"),
).bind_tools(runnable_tools)

tool_node = ToolNode(runnable_tools)

# 3. Define Nodes
async def call_model(state: AgentState):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    # Assign the response_id from state if it's a direct AIMessage response
    if isinstance(response, AIMessage) and not response.tool_calls:
        response.id = state["response_id"]
    return {"messages": [response]}

# 4. Define Conditional Logic
def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    return END

# 5. Build the Graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", END: END},
)
workflow.add_edge("tools", "agent")

# 6. Compile with Persistence
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
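The graph above imports runnable_tools from tools.py, which this guide does not list in full. A minimal sketch of the dummy weather tool (the tool name, arguments, and canned response here are illustrative, not the exact code from the examples repository):
tools.py
from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    """Return the (dummy) current weather for a city."""
    return f"It is always sunny and 22°C in {city}."

# graph.py imports this list and binds it to the model / ToolNode
runnable_tools = [get_weather]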
3

Implement Thread Service (`thread_service.py`)

Create thread_service.py to handle thread metadata and message retrieval/updates for the UI.

Thread Metadata Storage

This service uses an in-memory dictionary (_thread_metadata_store) to store thread metadata. The ThreadMetadata model holds the title and creation timestamp, while ThreadInfo is used for sending thread details to the client.
thread_service.py (Metadata Models)
class ThreadMetadata(BaseModel):
    title: str
    createdAt: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

class ThreadInfo(BaseModel):
    threadId: str
    title: str
    createdAt: datetime

_thread_metadata_store: Dict[str, ThreadMetadata] = {}
Message Formatting and Updates

The service interacts with the compiled LangGraph app to fetch and format messages for the UI (get_formatted_ui_messages) using app.aget_state, and to update messages within the graph’s state (update_message) using app.aupdate_state.
thread_service.py (Message Interaction)
class UIMessage(TypedDict): # Simplified representation for UI
    id: str
    role: Literal["user", "assistant", "system", "tool"]
    content: Optional[str]

async def get_formatted_ui_messages(thread_id: str) -> List[UIMessage]:
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await app.aget_state(config)
    # ... formatting logic ...
    return formatted_messages

async def update_message(thread_id: str, message: UIMessage) -> None:
    config = {"configurable": {"thread_id": thread_id}}
    # ... get snapshot, find message by id, update list ...
    await app.aupdate_state(config, {"messages": updated_raw_messages})
Full Service Implementation
thread_service.py
import uuid
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional, Sequence, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from pydantic import BaseModel, Field

from graph import app


class UIMessage(TypedDict):
    id: str
    role: Literal["user", "assistant", "system", "tool"]
    content: Optional[str]

# Metadata for each thread (stored in memory)
class ThreadMetadata(BaseModel):
    title: str
    # default_factory must be a callable, so wrap datetime.now in a lambda
    createdAt: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

# Information about each thread to be sent to the client
class ThreadInfo(BaseModel):
    threadId: str
    title: str
    createdAt: datetime

# Stores metadata {thread_id: ThreadMetadata}
_thread_metadata_store: Dict[str, ThreadMetadata] = {}

def create_thread(title: str) -> ThreadInfo:
    """Creates a new thread with a unique ID and initial metadata."""
    thread_id = str(uuid.uuid4())
    metadata = ThreadMetadata(title=title)
    _thread_metadata_store[thread_id] = metadata
    print(f"In-memory thread created: {thread_id}, Title: {title}")
    return ThreadInfo(
        threadId=thread_id,
        title=metadata.title,
        createdAt=metadata.createdAt
    )

def get_thread_list() -> List[ThreadInfo]:
    """Retrieves a list of all threads, sorted by creation date descending."""
    threads = [
        ThreadInfo(threadId=tid, title=meta.title, createdAt=meta.createdAt)
        for tid, meta in _thread_metadata_store.items()
    ]
    threads.sort(key=lambda t: t.createdAt, reverse=True)
    print(f"Fetched in-memory thread list: {len(threads)} threads")
    return threads

def delete_thread(thread_id: str) -> bool:
    """Deletes a thread's metadata. Returns True if deleted, False otherwise."""
    if thread_id in _thread_metadata_store:
        del _thread_metadata_store[thread_id]
        print(f"In-memory thread metadata deleted: {thread_id}")
        return True
    else:
        print(f"Attempted to delete non-existent in-memory thread: {thread_id}")
        return False

def update_thread(thread_id: str, title: str) -> Optional[ThreadInfo]:
    """Updates the title of a thread. Returns updated ThreadInfo or None if not found."""
    metadata = _thread_metadata_store.get(thread_id)
    if metadata:
        metadata.title = title
        _thread_metadata_store[thread_id] = metadata # Update the store
        print(f"In-memory thread updated: {thread_id}, New Title: {title}")
        return ThreadInfo(
            threadId=thread_id,
            title=metadata.title,
            createdAt=metadata.createdAt
        )
    else:
        print(f"Attempted to update non-existent in-memory thread: {thread_id}")
        return None

def _format_message_content(content: Any) -> Optional[str]:
    """Safely converts message content to a string."""
    if isinstance(content, str):
        return content
    elif isinstance(content, (list, dict)):
        return json.dumps(content)
    return str(content) if content is not None else None

async def get_formatted_ui_messages(thread_id: str) -> List[UIMessage]:
    """Retrieves messages from LangGraph state and formats them for the UI."""
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await app.aget_state(config)
    raw_messages: Sequence[BaseMessage] = snapshot.values.get("messages", []) if snapshot else []


    formatted_messages: List[UIMessage] = []
    for i, msg in enumerate(raw_messages):
        if isinstance(msg, HumanMessage) or (isinstance(msg, AIMessage) and not msg.tool_calls):
            formatted_messages.append(UIMessage(
                id=msg.id,
                role="user" if isinstance(msg, HumanMessage) else "assistant",
                content=_format_message_content(msg.content),
            ))
    return formatted_messages

async def update_message(thread_id: str, message: UIMessage) -> None:
    """Updates a message in the LangGraph state."""
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await app.aget_state(config)
    raw_messages: Sequence[BaseMessage] = snapshot.values.get("messages", []) if snapshot else []
    updated_raw_messages = list(raw_messages) # Create mutable copy
    for i, msg in enumerate(updated_raw_messages):
        if msg.id == message["id"]:
            print(f"Updating message in state: ID {message['id']}")
            # Reconstruct the message object based on role
            if message["role"] == "user":
                 updated_raw_messages[i] = HumanMessage(content=message["content"], id=message["id"])
            elif message["role"] == "assistant":
                 updated_raw_messages[i] = AIMessage(content=message["content"], id=message["id"])
            # Add handling for other roles if necessary
            break # Assuming IDs are unique per thread
    await app.aupdate_state(config, {"messages": updated_raw_messages})
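As a quick sanity check of the in-memory metadata store, the helpers above can be exercised directly. This snippet is illustrative only and assumes your Thesys environment variables are set, since importing thread_service also imports the compiled graph:
import thread_service

# Create a thread, rename it, then list all threads (newest first)
info = thread_service.create_thread("Weather questions")
print(info.threadId, info.title)

thread_service.update_thread(info.threadId, "Paris weather")
print([t.title for t in thread_service.get_thread_list()])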

4

Define API Endpoints (`main.py`)

Create main.py to expose the LangGraph agent and thread management via a FastAPI application.

Core Chat Streaming Endpoint (/chat)

The primary endpoint is /chat. It receives the user’s prompt, the threadId, and the responseId (generated by GenUI). It uses an async generator, stream_langgraph_events, to interact with the compiled LangGraph app. This function constructs the input message, includes the response_id in the graph input, streams events using app.astream_events with the correct thread_id config, and yields only the content chunks from on_chat_model_stream events. The /chat endpoint returns a StreamingResponse.
main.py (Core Chat Logic)
async def stream_langgraph_events(thread_id: str, prompt: Prompt, responseId: str) -> AsyncIterable[str]:
    config = {"configurable": {"thread_id": thread_id}}
    input_message = HumanMessage(content=prompt['content'], id=prompt['id'])
    graph_input = {"messages": [input_message], "response_id": responseId}

    async for event in app.astream_events(graph_input, config=config, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield content # Stream content chunks

@fastapi_app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    return StreamingResponse(
        stream_langgraph_events(request.threadId, request.prompt, request.responseId),
        media_type="text/event-stream",
    )
Thread Management Endpoints

Additional endpoints (/threads, /threads/{thread_id}, /threads/{thread_id}/messages, /threads/{thread_id}/message) are defined to handle thread metadata (create, list, update, delete) and message operations (get history, update message content). These endpoints primarily call the corresponding functions implemented in thread_service.py.

Full API Implementation
main.py
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from langchain_core.messages import HumanMessage
from typing import AsyncIterable, List, Literal, TypedDict
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware # Added for frontend interaction

from graph import app
import thread_service
from thread_service import ThreadInfo, UIMessage

class Prompt(TypedDict):
    role: Literal["user"]
    content: str
    id: str

class ChatRequest(BaseModel):
    prompt: Prompt
    threadId: str
    responseId: str

class CreateThreadRequest(BaseModel):
    name: str

class UpdateThreadRequest(BaseModel):
    name: str

# --- FastAPI App Instance --- #
fastapi_app = FastAPI(title="LangGraph Chat API", docs_url="/docs")

# --- CORS Middleware (Allow frontend requests) --- #
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Or specify frontend origin e.g., "http://localhost:3000"
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Core Chat Streaming Logic --- #
async def stream_langgraph_events(thread_id: str, prompt: Prompt, responseId: str) -> AsyncIterable[str]:
    """Streams LangGraph events, yielding final content chunks."""
    config = {"configurable": {"thread_id": thread_id}}
    input_message = HumanMessage(content=prompt['content'], id=prompt['id'])
    graph_input = {"messages": [input_message], "response_id": responseId}

    async for event in app.astream_events(graph_input, config=config, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield content


@fastapi_app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    """Handles the chat request using LangGraph stream."""
    return StreamingResponse(
        stream_langgraph_events(request.threadId, request.prompt, request.responseId),
        media_type="text/event-stream",
    )

# --- Thread Management Endpoints --- #

@fastapi_app.get("/threads", response_model=List[ThreadInfo])
def get_threads():
    """Returns a list of all threads (metadata only)."""
    return thread_service.get_thread_list()

@fastapi_app.post("/threads", response_model=ThreadInfo)
def create_thread_endpoint(request: CreateThreadRequest):
    """Creates a new thread metadata entry."""
    # Note: The name from the request should be used as the title
    return thread_service.create_thread(title=request.name)

@fastapi_app.get("/threads/{thread_id}/messages", response_model=List[UIMessage])
async def get_messages_endpoint(thread_id: str):
    """Returns formatted messages for a specific thread."""
    messages = await thread_service.get_formatted_ui_messages(thread_id)
    # Check if thread exists *at all* before returning empty list or messages
    if thread_id not in thread_service._thread_metadata_store:
         raise HTTPException(status_code=404, detail="Thread not found")
    return messages

@fastapi_app.delete("/threads/{thread_id}", status_code=204)
def delete_thread_endpoint(thread_id: str):
    """Deletes a thread's metadata."""
    deleted = thread_service.delete_thread(thread_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Thread not found")
    # No content returned on success (status 204)
    return None

@fastapi_app.put("/threads/{thread_id}", response_model=ThreadInfo)
def update_thread_endpoint(thread_id: str, request: UpdateThreadRequest):
    """Updates a thread's metadata (name)."""
    updated_thread = thread_service.update_thread(thread_id, title=request.name)
    if updated_thread is None:
        raise HTTPException(status_code=404, detail="Thread not found")
    return updated_thread

@fastapi_app.put("/threads/{thread_id}/message")
async def update_message_endpoint(thread_id: str, message: UIMessage = Body(...)):
    """Updates a specific message (e.g., feedback)."""
    # Check if thread exists before attempting update
    if thread_id not in thread_service._thread_metadata_store:
         raise HTTPException(status_code=404, detail="Thread not found")
    await thread_service.update_message(thread_id, message)
    return {"status": "Message update acknowledged"} # Return JSON confirmation

# --- Server Startup --- #
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8000)
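Once the server is running, you can smoke-test the streaming /chat endpoint directly from Python. The sketch below is not part of the guide’s code; it assumes the backend is on http://localhost:8000 and that httpx is installed (pip install httpx):
test_chat.py
import uuid
import httpx

with httpx.Client(base_url="http://localhost:8000", timeout=60) as client:
    # Create a thread first so its metadata exists in the in-memory store
    thread = client.post("/threads", json={"name": "Smoke test"}).json()

    payload = {
        "prompt": {"role": "user", "content": "What's the weather in Paris?", "id": str(uuid.uuid4())},
        "threadId": thread["threadId"],
        "responseId": str(uuid.uuid4()),
    }

    # Stream the assistant's reply and print chunks as they arrive
    with client.stream("POST", "/chat", json=payload) as response:
        for chunk in response.iter_text():
            print(chunk, end="", flush=True)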

Frontend Setup

To connect a React frontend, use the @thesysai/genui-sdk components and hooks.

Integrate with Generative UI SDK (App.tsx)

Use the useThreadListManager and useThreadManager hooks from the SDK to manage threads and messages. The example below shows how to initialize these hooks and pass them to the <C1Chat /> component. It also includes a useEffect to handle loading a specific thread based on a URL query parameter (threadId).
App.tsx
"use client";

import "@crayonai/react-ui/styles/index.css";
import {
  C1Chat,
  useThreadListManager,
  useThreadManager,
} from "@thesysai/genui-sdk";
import { useEffect } from "react";
// Import your API client functions and types
import {
    Thread,
    Message,
    UserMessage,
    fetchThreadList, // Calls GET /threads
    deleteThread,    // Calls DELETE /threads/{id}
    updateThreadAPI, // Calls PUT /threads/{id}
    createThreadAPI, // Calls POST /threads
    loadThread,      // Calls GET /threads/{id}/messages
    updateMessage    // Calls PUT /threads/{id}/message
} from "./client"; // Assume this file implements the fetch calls

export default function App() {
  const threadListManager = useThreadListManager({
    // Provide functions that call your backend API
    fetchThreadList,
    deleteThread,
    updateThread: async (updated: Thread): Promise<Thread> => {
      const result = await updateThreadAPI(updated.threadId, updated.title);
      // Map the backend response back to the SDK's Thread type
      if (result) {
        return { ...updated, title: result.title, createdAt: result.createdAt };
      }
      return updated;
    },
    createThread: (firstMessage: UserMessage) => {
      // Use the message content as the initial thread name/title
      return createThreadAPI(firstMessage.message ?? "New Chat");
    },
    // Handlers for URL synchronization
    onSwitchToNew: () => {
      const currentPath = window.location.pathname;
      window.history.replaceState(null, '', currentPath);
    },
    onSelectThread: (threadId: string) => {
      const currentUrl = new URL(window.location.href);
      currentUrl.searchParams.set("threadId", threadId);
      window.history.replaceState(null, '', currentUrl.toString());
    },
  });

  const threadManager = useThreadManager({
    threadListManager,
    loadThread, // Function to load messages for the selected thread
    onUpdateMessage: ({ message }: { message: Message }) => {
      // Call API to update message (e.g., feedback)
      if (threadListManager.selectedThreadId) {
         updateMessage(threadListManager.selectedThreadId, message);
      }
    },
    // Note: The core chat streaming is handled internally by C1Chat
    // when it receives the threadManager prop.
  });

  // Effect to load thread from URL on initial mount
  useEffect(() => {
    const searchParams = new URLSearchParams(window.location.search);
    const threadIdFromUrl = searchParams.get("threadId");

    if (threadIdFromUrl && threadListManager.selectedThreadId !== threadIdFromUrl) {
      threadListManager.selectThread(threadIdFromUrl);
    }
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []); // Runs once on mount

  // --- Render Component ---
  return (
    <C1Chat
      threadManager={threadManager}
      threadListManager={threadListManager}
    />
  );
}

Running the Agent

Follow these steps to set up and run both the Python backend and the React frontend.

1. Backend Setup (Python/FastAPI)
  • Set API Key: Export your Thesys API key as an environment variable. Create a new key on the Thesys Console if you haven’t already.
    export THESYS_API_KEY=sk-th-...
    # Replace sk-th-... with your actual API key
    
  • Install Dependencies: Navigate to your backend project directory in the terminal.
    pip install -r requirements.txt
    
  • Run Server: Start the FastAPI server.
    uvicorn main:fastapi_app --reload
    
    The backend API will be available at http://localhost:8000.
2. Frontend Setup (React)
  • Install Dependencies: Navigate to your frontend project directory (where App.tsx and package.json are) in a separate terminal.
    npm install
    
  • Run Development Server: Start the React development server.
    npm run dev
    
    Open http://localhost:3000 (or the port specified by your setup) in your browser to view the chat interface.
3. Test it Out!
  Open the chat UI, start a new thread, and ask about the weather in any city to exercise the dummy weather tool and watch the response stream in.