This guide demonstrates how to set up a simple conversational agent using LangGraph in Python, connected to the Thesys C1 API endpoint. We'll build a basic graph that manages message history, calls a dummy weather tool, and streams responses back.
All messages are stored in memory using LangGraph's `MemorySaver` checkpointer.
The complete code for this guide can be found in the Thesys examples repository.
1. Install Dependencies
Ensure LangGraph and its dependencies are in your `requirements.txt`. You'll need `langchain-openai` to interact with the Thesys C1 API endpoint.
requirements.txt
```
langgraph
langchain-openai
langchain-core
# Add other necessary dependencies like fastapi, uvicorn, python-dotenv, sse-starlette
```
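The code in this guide reads `THESYS_API_KEY` (and optionally `THESYS_API_BASE_URL`) from the environment via `python-dotenv`. A minimal `.env` sketch, with a placeholder you'd replace with your own key:

```
# .env — placeholder values only; substitute your real Thesys API key
THESYS_API_KEY=<your-thesys-api-key>
THESYS_API_BASE_URL=https://api.thesys.dev/v1/embed
```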
2. Define the Graph (`graph.py`)
Create `graph.py` to define the agent's structure and state management.
Agent State (`AgentState`)
The `AgentState` includes `messages`, annotated with `add_messages` for automatic history updates, and a `response_id` string. This `response_id` is crucial: it is passed from the `@thesysai/genui-sdk` frontend with each user message and must be assigned to the corresponding AI response message so the UI can track it correctly.
graph.py (AgentState Definition)
```python
class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    response_id: str
```
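To see why assigning the `response_id` to a message's `id` works for later updates, note that the `add_messages` reducer appends new messages but replaces any existing message with a matching `id`. A minimal standalone sketch of that behavior:

```python
# Illustrative only: add_messages appends by default; a matching id replaces.
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

history = [HumanMessage(content="Hi", id="msg-1")]
merged = add_messages(history, [AIMessage(content="Hello!", id="resp-1")])
print([m.id for m in merged])  # ['msg-1', 'resp-1'] — new id, so appended

replaced = add_messages(merged, [AIMessage(content="Hello again!", id="resp-1")])
print(replaced[-1].content)  # 'Hello again!' — same id, so the message is replaced
```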
Model and Agent Node (`call_model`)
Next, initialize the `ChatOpenAI` model, pointing it at your Thesys C1 API endpoint and binding any necessary tools. The `call_model` function invokes the model and, importantly, checks whether the response is the final `AIMessage` to be sent to the user (i.e., it contains no tool calls). If it is, it assigns the `response_id` from the state as the message's `id`.
graph.py (Model Init & call_model)
```python
model = ChatOpenAI(
    model="c1-nightly",
    temperature=0,
    base_url=os.getenv("THESYS_API_BASE_URL", "https://api.thesys.dev/v1/embed"),
    api_key=os.getenv("THESYS_API_KEY"),
).bind_tools(runnable_tools)


async def call_model(state: AgentState):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    # Assign the response_id from state if it's a direct AIMessage response
    if isinstance(response, AIMessage) and not response.tool_calls:
        response.id = state["response_id"]
    return {"messages": [response]}
```
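The `runnable_tools` list is imported from a separate `tools.py` (the real implementation lives in the examples repository). A minimal sketch, assuming a single dummy weather tool as described in the introduction; the tool name and return value here are illustrative:

```python
# tools.py — illustrative sketch; tool name and return value are assumptions.
from langchain_core.tools import tool


@tool
def get_weather(city: str) -> str:
    """Returns a dummy weather report for the given city."""
    return f"It's always sunny in {city} today, around 22°C."


# Imported by graph.py and bound to the model / ToolNode
runnable_tools = [get_weather]
```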
Full Graph Implementation
The rest of the `graph.py` file involves setting up the `ToolNode`, defining the `should_continue` logic for routing between the agent and tools, building the `StateGraph`, adding nodes and edges, and finally compiling the graph with a `MemorySaver` checkpointer.
graph.py
```python
import os
from typing import TypedDict, Annotated, List

from langchain_core.messages import AnyMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from dotenv import load_dotenv

# Assume tools.py defines runnable_tools
from tools import runnable_tools

load_dotenv()


# 1. Define the State
class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    response_id: str


# 2. Initialize Model and Tools
model = ChatOpenAI(
    model="c1-nightly",
    temperature=0,
    # Ensure this points to your Thesys C1 API endpoint
    base_url=os.getenv("THESYS_API_BASE_URL", "https://api.thesys.dev/v1/embed"),
    api_key=os.getenv("THESYS_API_KEY"),
).bind_tools(runnable_tools)

tool_node = ToolNode(runnable_tools)


# 3. Define Nodes
async def call_model(state: AgentState):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    # Assign the response_id from state if it's a direct AIMessage response
    if isinstance(response, AIMessage) and not response.tool_calls:
        response.id = state["response_id"]
    return {"messages": [response]}


# 4. Define Conditional Logic
def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    return END


# 5. Build the Graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", END: END},
)
workflow.add_edge("tools", "agent")

# 6. Compile with Persistence
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
```
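Before wiring the graph into an API, you can sanity-check it from a script. A quick illustrative run, assuming your environment variables are set and `tools.py` exists (the thread and message ids are made up):

```python
# Illustrative local test for graph.py — not part of the guide's files.
import asyncio

from langchain_core.messages import HumanMessage
from graph import app


async def main():
    config = {"configurable": {"thread_id": "demo-thread"}}
    result = await app.ainvoke(
        {
            "messages": [HumanMessage(content="What's the weather in Paris?", id="msg-1")],
            "response_id": "resp-1",  # normally supplied by the GenUI frontend
        },
        config=config,
    )
    # The final AIMessage carries the response_id we passed in
    print(result["messages"][-1].id, result["messages"][-1].content)


asyncio.run(main())
```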
3. Implement Thread Service (`thread_service.py`)
Create `thread_service.py` to handle thread metadata and message retrieval/updates for the UI.
Thread Metadata Storage
This service uses an in-memory dictionary (`_thread_metadata_store`) to store thread metadata. The `ThreadMetadata` model holds the title and creation timestamp, while `ThreadInfo` is used for sending thread details to the client.
The service interacts with the compiled LangGraph `app` to fetch and format messages for the UI (`get_formatted_ui_messages`) using `app.aget_state`, and to update messages within the graph's state (`update_message`) using `app.aupdate_state`.
thread_service.py
```python
import uuid
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional, Sequence, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from pydantic import BaseModel, Field

from graph import app


class UIMessage(TypedDict):
    id: str
    role: Literal["user", "assistant", "system", "tool"]
    content: Optional[str]


# Metadata for each thread (stored in memory)
class ThreadMetadata(BaseModel):
    title: str
    # default_factory must be a callable, so wrap datetime.now in a lambda
    createdAt: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


# Information about each thread to be sent to the client
class ThreadInfo(BaseModel):
    threadId: str
    title: str
    createdAt: datetime


# Stores metadata {thread_id: ThreadMetadata}
_thread_metadata_store: Dict[str, ThreadMetadata] = {}


def create_thread(title: str) -> ThreadInfo:
    """Creates a new thread with a unique ID and initial metadata."""
    thread_id = str(uuid.uuid4())
    metadata = ThreadMetadata(title=title)
    _thread_metadata_store[thread_id] = metadata
    print(f"In-memory thread created: {thread_id}, Title: {title}")
    return ThreadInfo(
        threadId=thread_id, title=metadata.title, createdAt=metadata.createdAt
    )


def get_thread_list() -> List[ThreadInfo]:
    """Retrieves a list of all threads, sorted by creation date descending."""
    threads = [
        ThreadInfo(threadId=tid, title=meta.title, createdAt=meta.createdAt)
        for tid, meta in _thread_metadata_store.items()
    ]
    threads.sort(key=lambda t: t.createdAt, reverse=True)
    print(f"Fetched in-memory thread list: {len(threads)} threads")
    return threads


def delete_thread(thread_id: str) -> bool:
    """Deletes a thread's metadata. Returns True if deleted, False otherwise."""
    if thread_id in _thread_metadata_store:
        del _thread_metadata_store[thread_id]
        print(f"In-memory thread metadata deleted: {thread_id}")
        return True
    else:
        print(f"Attempted to delete non-existent in-memory thread: {thread_id}")
        return False


def update_thread(thread_id: str, title: str) -> Optional[ThreadInfo]:
    """Updates the title of a thread. Returns updated ThreadInfo or None if not found."""
    metadata = _thread_metadata_store.get(thread_id)
    if metadata:
        metadata.title = title
        _thread_metadata_store[thread_id] = metadata  # Update the store
        print(f"In-memory thread updated: {thread_id}, New Title: {title}")
        return ThreadInfo(
            threadId=thread_id, title=metadata.title, createdAt=metadata.createdAt
        )
    else:
        print(f"Attempted to update non-existent in-memory thread: {thread_id}")
        return None


def _format_message_content(content: Any) -> Optional[str]:
    """Safely converts message content to a string."""
    if isinstance(content, str):
        return content
    elif isinstance(content, (list, dict)):
        return json.dumps(content)
    return str(content) if content is not None else None


async def get_formatted_ui_messages(thread_id: str) -> List[UIMessage]:
    """Retrieves messages from LangGraph state and formats them for the UI."""
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await app.aget_state(config)
    raw_messages: Sequence[BaseMessage] = (
        snapshot.values.get("messages", []) if snapshot else []
    )
    formatted_messages: List[UIMessage] = []
    for msg in raw_messages:
        # Only surface user messages and final (non-tool-calling) AI messages
        if isinstance(msg, HumanMessage) or (
            isinstance(msg, AIMessage) and not msg.tool_calls
        ):
            formatted_messages.append(
                UIMessage(
                    id=msg.id,
                    role="user" if isinstance(msg, HumanMessage) else "assistant",
                    content=_format_message_content(msg.content),
                )
            )
    return formatted_messages


async def update_message(thread_id: str, message: UIMessage) -> None:
    """Updates a message in the LangGraph state."""
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await app.aget_state(config)
    raw_messages: Sequence[BaseMessage] = (
        snapshot.values.get("messages", []) if snapshot else []
    )
    updated_raw_messages = list(raw_messages)  # Create mutable copy
    for i, msg in enumerate(updated_raw_messages):
        if msg.id == message["id"]:
            print(f"Updating message in state: ID {message['id']}")
            # Reconstruct the message object based on role
            if message["role"] == "user":
                updated_raw_messages[i] = HumanMessage(
                    content=message["content"], id=message["id"]
                )
            elif message["role"] == "assistant":
                updated_raw_messages[i] = AIMessage(
                    content=message["content"], id=message["id"]
                )
            # Add handling for other roles if necessary
            break  # Assuming IDs are unique per thread
    await app.aupdate_state(config, {"messages": updated_raw_messages})
```
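As a quick illustration of how these pieces fit together (hypothetical title; assumes the graph has already processed at least one turn for the thread):

```python
# Illustrative usage of thread_service — not part of the guide's files.
import asyncio

import thread_service


async def main():
    info = thread_service.create_thread(title="Weather chat")
    # ... run a chat turn against the graph with thread_id=info.threadId ...
    for msg in await thread_service.get_formatted_ui_messages(info.threadId):
        print(msg["role"], "->", msg["content"])


asyncio.run(main())
```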
4. Define API Endpoints (`main.py`)
Create `main.py` to expose the LangGraph agent and thread management via a FastAPI application.
Core Chat Streaming Endpoint (`/chat`)
The primary endpoint is `/chat`. It receives the user's prompt, the `threadId`, and the `responseId` (generated by GenUI). It uses an async generator, `stream_langgraph_events`, to interact with the compiled LangGraph app: this function constructs the input message, includes the `response_id` in the graph input, streams events using `app.astream_events` with the correct `thread_id` config, and yields only the content chunks from `on_chat_model_stream` events. The `/chat` endpoint wraps this generator in a `StreamingResponse`.
Additional endpoints (`/threads`, `/threads/{thread_id}`, `/threads/{thread_id}/messages`, `/threads/{thread_id}/message`) handle thread metadata (create, list, update, delete) and message operations (get history, update message content). These endpoints primarily delegate to the corresponding functions in `thread_service.py`.
Full API Implementation
main.py
```python
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from langchain_core.messages import HumanMessage
from typing import AsyncIterable, List, Literal, TypedDict
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware  # Added for frontend interaction

from graph import app
import thread_service
from thread_service import ThreadInfo, UIMessage


class Prompt(TypedDict):
    role: Literal["user"]
    content: str
    id: str


class ChatRequest(BaseModel):
    prompt: Prompt
    threadId: str
    responseId: str


class CreateThreadRequest(BaseModel):
    name: str


class UpdateThreadRequest(BaseModel):
    name: str


# --- FastAPI App Instance --- #
fastapi_app = FastAPI(title="LangGraph Chat API", docs_url="/docs")

# --- CORS Middleware (Allow frontend requests) --- #
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Or specify frontend origin e.g., "http://localhost:3000"
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# --- Core Chat Streaming Logic --- #
async def stream_langgraph_events(
    thread_id: str, prompt: Prompt, responseId: str
) -> AsyncIterable[str]:
    """Streams LangGraph events, yielding final content chunks."""
    config = {"configurable": {"thread_id": thread_id}}
    input_message = HumanMessage(content=prompt["content"], id=prompt["id"])
    graph_input = {"messages": [input_message], "response_id": responseId}
    async for event in app.astream_events(graph_input, config=config, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield content


@fastapi_app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    """Handles the chat request using LangGraph stream."""
    return StreamingResponse(
        stream_langgraph_events(request.threadId, request.prompt, request.responseId),
        media_type="text/event-stream",
    )


# --- Thread Management Endpoints --- #
@fastapi_app.get("/threads", response_model=List[ThreadInfo])
def get_threads():
    """Returns a list of all threads (metadata only)."""
    return thread_service.get_thread_list()


@fastapi_app.post("/threads", response_model=ThreadInfo)
def create_thread_endpoint(request: CreateThreadRequest):
    """Creates a new thread metadata entry."""
    # Note: The name from the request is used as the title
    return thread_service.create_thread(title=request.name)


@fastapi_app.get("/threads/{thread_id}/messages", response_model=List[UIMessage])
async def get_messages_endpoint(thread_id: str):
    """Returns formatted messages for a specific thread."""
    # Check if the thread exists at all before fetching messages
    if thread_id not in thread_service._thread_metadata_store:
        raise HTTPException(status_code=404, detail="Thread not found")
    return await thread_service.get_formatted_ui_messages(thread_id)


@fastapi_app.delete("/threads/{thread_id}", status_code=204)
def delete_thread_endpoint(thread_id: str):
    """Deletes a thread's metadata."""
    deleted = thread_service.delete_thread(thread_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Thread not found")
    # No content returned on success (status 204)
    return None


@fastapi_app.put("/threads/{thread_id}", response_model=ThreadInfo)
def update_thread_endpoint(thread_id: str, request: UpdateThreadRequest):
    """Updates a thread's metadata (name)."""
    updated_thread = thread_service.update_thread(thread_id, title=request.name)
    if updated_thread is None:
        raise HTTPException(status_code=404, detail="Thread not found")
    return updated_thread


@fastapi_app.put("/threads/{thread_id}/message")
async def update_message_endpoint(thread_id: str, message: UIMessage = Body(...)):
    """Updates a specific message (e.g., feedback)."""
    # Check if thread exists before attempting update
    if thread_id not in thread_service._thread_metadata_store:
        raise HTTPException(status_code=404, detail="Thread not found")
    await thread_service.update_message(thread_id, message)
    return {"status": "Message update acknowledged"}  # Return JSON confirmation


# --- Server Startup --- #
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(fastapi_app, host="0.0.0.0", port=8000)
```
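To exercise the `/chat` endpoint without the frontend, you can stream the response with a plain HTTP client. A minimal sketch using `requests`, assuming the server is running locally on port 8000 (the thread and message ids are made up):

```python
# Illustrative streaming client for the /chat endpoint.
import requests

payload = {
    "prompt": {"role": "user", "content": "What's the weather in Paris?", "id": "msg-1"},
    "threadId": "demo-thread",
    "responseId": "resp-1",
}

with requests.post("http://localhost:8000/chat", json=payload, stream=True) as resp:
    resp.raise_for_status()
    # Print content chunks as they arrive
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)
```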
5. Connect the Frontend (`App.tsx`)
Use the `useThreadListManager` and `useThreadManager` hooks from the SDK to manage threads and messages.
The example below shows how to initialize these hooks and pass them to the `<C1Chat />` component. It also includes a `useEffect` that loads a specific thread based on a URL query parameter (`threadId`).
App.tsx
"use client";import "@crayonai/react-ui/styles/index.css";import { C1Chat, useThreadListManager, useThreadManager,} from "@thesysai/genui-sdk";import { useEffect } from "react";// Import your API client functions and typesimport { Thread, Message, UserMessage, fetchThreadList, // Calls GET /threads deleteThread, // Calls DELETE /threads/{id} updateThreadAPI, // Calls PUT /threads/{id} createThreadAPI, // Calls POST /threads loadThread, // Calls GET /threads/{id}/messages updateMessage // Calls PUT /threads/{id}/message} from "./client"; // Assume this file implements the fetch callsexport default function App() { const threadListManager = useThreadListManager({ // Provide functions that call your backend API fetchThreadList, deleteThread, updateThread: async (updated: Thread): Promise<Thread> => { const result = await updateThreadAPI(updated.threadId, updated.title); // Map backend response back to SDK Thread type if (result) { return { ...updated, title: result.title, createdAt: result.createdAt }; } return updated; }, createThread: (firstMessage: UserMessage) => { // Use the message content as the initial thread name/title return createThreadAPI(firstMessage.message ?? "New Chat"); }, // Handlers for URL synchronization onSwitchToNew: () => { const currentPath = window.location.pathname; window.history.replaceState(null, '', currentPath); }, onSelectThread: (threadId: string) => { const currentUrl = new URL(window.location.href); currentUrl.searchParams.set("threadId", threadId); window.history.replaceState(null, '', currentUrl.toString()); }, }); const threadManager = useThreadManager({ threadListManager, loadThread, // Function to load messages for the selected thread onUpdateMessage: ({ message }: { message: Message }) => { // Call API to update message (e.g., feedback) if (threadListManager.selectedThreadId) { updateMessage(threadListManager.selectedThreadId, message); } }, // Note: The core chat streaming is handled internally by C1Chat // when it receives the threadManager prop. }); // Effect to load thread from URL on initial mount useEffect(() => { const searchParams = new URLSearchParams(window.location.search); const threadIdFromUrl = searchParams.get("threadId"); if (threadIdFromUrl && threadListManager.selectedThreadId !== threadIdFromUrl) { threadListManager.selectThread(threadIdFromUrl); } // eslint-disable-next-line react-hooks/exhaustive-deps }, []); // Runs once on mount // --- Render Component --- return ( <C1Chat threadManager={threadManager} threadListManager={threadListManager} /> );}