Alpha Notice: These docs cover the v1-alpha release. Content is incomplete and subject to change.For the latest stable version, see the current LangGraph Python or LangGraph JavaScript docs.
This guide demonstrates the basics of LangGraph’s Graph API. It walks through state, as well as composing common graph structures such as sequences, branches, and loops. It also covers LangGraph’s control features, including the Send API for map-reduce workflows and the Command API for combining state updates with “hops” across nodes.
Set up LangSmith for better debugging
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started in the docs.
State in LangGraph can be defined using Zod schemas. Below we will use Zod. See this section for detail on using alternative approaches.By default, graphs will have the same input and output schema, and the state determines that schema. See this section for how to define distinct input and output schemas.Let’s consider a simple example using messages. This represents a versatile formulation of state for many LLM applications. See our concepts page for more detail.
Copy
import { BaseMessage } from "@langchain/core/messages";import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z.array(z.custom<BaseMessage>()).register(registry, MessagesZodMeta), extraField: z.number(),});
This state tracks a list of message objects, as well as an extra integer field.
Let’s build an example graph with a single node. Our node is just a TypeScript function that reads our graph’s state and makes updates to it. The first argument to this function will always be the state:
This node simply appends a message to our message list, and populates an extra field.
Nodes should return updates to the state directly, instead of mutating the state.
Let’s next define a simple graph containing this node. We use StateGraph to define a graph that operates on this state. We then use addNode populate our graph.
Copy
import { StateGraph } from "@langchain/langgraph";const graph = new StateGraph(State) .addNode("node", node) .addEdge("__start__", "node") .compile();
LangGraph provides built-in utilities for visualizing your graph. Let’s inspect our graph. See this section for detail on visualization.
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
In this case, our graph just executes a single node. Let’s proceed with a simple invocation:
Copy
import { HumanMessage } from "@langchain/core/messages";const result = await graph.invoke({ messages: [new HumanMessage("Hi")], extraField: 0 });console.log(result);
Each key in the state can have its own independent reducer function, which controls how updates from nodes are applied. If no reducer function is explicitly specified then it is assumed that all updates to the key should override it.For Zod state schemas, we can define reducers by using the special .langgraph.reducer() method on the schema field.In the earlier example, our node updated the "messages" key in the state by appending a message to it. Below, we add a reducer to this key, such that updates are automatically appended:
This is a versatile representation of state for applications involving chat models. LangGraph includes this pre-built MessagesZodMeta for convenience, so that we can have:
Copy
import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta), extraField: z.number(),});
By default, StateGraph operates with a single schema, and all nodes are expected to communicate using that schema. However, it’s also possible to define distinct input and output schemas for a graph.When distinct schemas are specified, an internal schema will still be used for communication between nodes. The input schema ensures that the provided input matches the expected structure, while the output schema filters the internal data to return only the relevant information according to the defined output schema.Below, we’ll see how to define distinct input and output schema.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// Define the schema for the inputconst InputState = z.object({ question: z.string(),});// Define the schema for the outputconst OutputState = z.object({ answer: z.string(),});// Define the overall schema, combining both input and outputconst OverallState = InputState.merge(OutputState);// Build the graph with input and output schemas specifiedconst graph = new StateGraph({ input: InputState, output: OutputState, state: OverallState,}) .addNode("answerNode", (state) => { // Example answer and an extra key return { answer: "bye", question: state.question }; }) .addEdge(START, "answerNode") .addEdge("answerNode", END) .compile();// Invoke the graph with an input and print the resultconsole.log(await graph.invoke({ question: "hi" }));
Copy
{ answer: 'bye' }
Notice that the output of invoke only includes the output schema.
In some cases, you may want nodes to exchange information that is crucial for intermediate logic but doesn’t need to be part of the main schema of the graph. This private data is not relevant to the overall input/output of the graph and should only be shared between certain nodes.Below, we’ll create an example sequential graph consisting of three nodes (node_1, node_2 and node_3), where private data is passed between the first two steps (node_1 and node_2), while the third step (node_3) only has access to the public overall state.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// The overall state of the graph (this is the public state shared across nodes)const OverallState = z.object({ a: z.string(),});// Output from node1 contains private data that is not part of the overall stateconst Node1Output = z.object({ privateData: z.string(),});// The private data is only shared between node1 and node2const node1 = (state: z.infer<typeof OverallState>): z.infer<typeof Node1Output> => { const output = { privateData: "set by node1" }; console.log(`Entered node 'node1':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 2 input only requests the private data available after node1const Node2Input = z.object({ privateData: z.string(),});const node2 = (state: z.infer<typeof Node2Input>): z.infer<typeof OverallState> => { const output = { a: "set by node2" }; console.log(`Entered node 'node2':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 3 only has access to the overall state (no access to private data from node1)const node3 = (state: z.infer<typeof OverallState>): z.infer<typeof OverallState> => { const output = { a: "set by node3" }; console.log(`Entered node 'node3':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Connect nodes in a sequence// node2 accepts private data from node1, whereas// node3 does not see the private data.const graph = new StateGraph({ state: OverallState, nodes: { node1: { action: node1, output: Node1Output }, node2: { action: node2, input: Node2Input }, node3: { action: node3 }, }}) .addEdge(START, "node1") .addEdge("node1", "node2") .addEdge("node2", "node3") .addEdge("node3", END) .compile();// Invoke the graph with the initial stateconst response = await graph.invoke({ a: "set at start" });console.log(`\nOutput of graph invocation: ${JSON.stringify(response)}`);
Copy
Entered node 'node1': ut: {"a":"set at start"}. urned: {"privateData":"set by node1"}Entered node 'node2': ut: {"privateData":"set by node1"}. urned: {"a":"set by node2"}Entered node 'node3': ut: {"a":"set by node2"}. urned: {"a":"set by node3"}Output of graph invocation: {"a":"set by node3"}
Sometimes you want to be able to configure your graph when calling it. For example, you might want to be able to specify what LLM or system prompt to use at runtime, without polluting the graph state with these parameters.To add runtime configuration:
Specify a schema for your configuration
Add the configuration to the function signature for nodes or conditional edges
Pass the configuration into the graph.
See below for a simple example:
Copy
import { StateGraph, END, START } from "@langchain/langgraph";import * as z from "zod";// 1. Specify config schemaconst ContextSchema = z.object({ myRuntimeValue: z.string(),});// 2. Define a graph that accesses the config in a nodeconst StateSchema = z.object({ myStateValue: z.number(),});const graph = new StateGraph(StateSchema, ContextSchema) .addNode("node", (state, runtime) => { if (runtime?.context?.myRuntimeValue === "a") { return { myStateValue: 1 }; } else if (runtime?.context?.myRuntimeValue === "b") { return { myStateValue: 2 }; } else { throw new Error("Unknown values."); } }) .addEdge(START, "node") .addEdge("node", END) .compile();// 3. Pass in configuration at runtime:console.log(await graph.invoke({}, { context: { myRuntimeValue: "a" } }));console.log(await graph.invoke({}, { context: { myRuntimeValue: "b" } }));
Copy
{ myStateValue: 1 }{ myStateValue: 2 }
Extended example: specifying LLM at runtime
Below we demonstrate a practical example in which we configure what LLM to use at runtime. We will use both OpenAI and Anthropic models.
Copy
import { ChatOpenAI } from "@langchain/openai";import { ChatAnthropic } from "@langchain/anthropic";import { BaseMessage } from "@langchain/core/messages";import { MessagesZodMeta, StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import { RunnableConfig } from "@langchain/core/runnables";import * as z from "zod";const ConfigSchema = z.object({ modelProvider: z.string().default("anthropic"),});const MessagesZodState = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta),});const MODELS = { anthropic: new ChatAnthropic({ model: "claude-3-5-haiku-latest" }), openai: new ChatOpenAI({ model: "gpt-4o-mini" }),};const graph = new StateGraph(MessagesZodState, ConfigSchema) .addNode("model", async (state, config) => { const modelProvider = config?.configurable?.modelProvider || "anthropic"; const model = MODELS[modelProvider as keyof typeof MODELS]; const response = await model.invoke(state.messages); return { messages: [response] }; }) .addEdge(START, "model") .addEdge("model", END) .compile();// Usageconst inputMessage = { role: "user", content: "hi" };// With no configuration, uses default (Anthropic)const response1 = await graph.invoke({ messages: [inputMessage] });// Or, can set OpenAIconst response2 = await graph.invoke( { messages: [inputMessage] }, { configurable: { modelProvider: "openai" } },);console.log(response1.messages.at(-1)?.response_metadata?.model);console.log(response2.messages.at(-1)?.response_metadata?.model);
Copy
claude-3-5-haiku-20241022gpt-4o-mini-2024-07-18
Extended example: specifying model and system message at runtime
Below we demonstrate a practical example in which we configure two parameters: the LLM and system message to use at runtime.
Copy
import { ChatOpenAI } from "@langchain/openai";import { ChatAnthropic } from "@langchain/anthropic";import { SystemMessage, BaseMessage } from "@langchain/core/messages";import { MessagesZodMeta, StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const ConfigSchema = z.object({ modelProvider: z.string().default("anthropic"), systemMessage: z.string().optional(),});const MessagesZodState = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta),});const MODELS = { anthropic: new ChatAnthropic({ model: "claude-3-5-haiku-latest" }), openai: new ChatOpenAI({ model: "gpt-4o-mini" }),};const graph = new StateGraph(MessagesZodState, ConfigSchema) .addNode("model", async (state, config) => { const modelProvider = config?.configurable?.modelProvider || "anthropic"; const systemMessage = config?.configurable?.systemMessage; const model = MODELS[modelProvider as keyof typeof MODELS]; let messages = state.messages; if (systemMessage) { messages = [new SystemMessage(systemMessage), ...messages]; } const response = await model.invoke(messages); return { messages: [response] }; }) .addEdge(START, "model") .addEdge("model", END) .compile();// Usageconst inputMessage = { role: "user", content: "hi" };const response = await graph.invoke( { messages: [inputMessage] }, { configurable: { modelProvider: "openai", systemMessage: "Respond in Italian." } });for (const message of response.messages) { console.log(`${message.getType()}: ${message.content}`);}
There are many use cases where you may wish for your node to have a custom retry policy, for example if you are calling an API, querying a database, or calling an LLM, etc. LangGraph lets you add retry policies to nodes.To configure a retry policy, pass the retryPolicy parameter to the addNode. The retryPolicy parameter takes in a RetryPolicy object. Below we instantiate a RetryPolicy object with the default parameters and associate it with a node:
Copy
import { RetryPolicy } from "@langchain/langgraph";const graph = new StateGraph(State) .addNode("nodeName", nodeFunction, { retryPolicy: {} }) .compile();
By default, the retry policy retries on any exception except for the following:
TypeError
SyntaxError
ReferenceError
Extended example: customizing retry policies
Consider an example in which we are reading from a SQL database. Below we pass two different retry policies to nodes:
Copy
import Database from "better-sqlite3";import { ChatAnthropic } from "@langchain/anthropic";import { StateGraph, START, END, MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import { AIMessage, BaseMessage } from "@langchain/core/messages";import * as z from "zod";const MessagesZodState = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta),});// Create an in-memory databaseconst db: typeof Database.prototype = new Database(":memory:");const model = new ChatAnthropic({ model: "claude-3-5-sonnet-20240620" });const callModel = async (state: z.infer<typeof MessagesZodState>) => { const response = await model.invoke(state.messages); return { messages: [response] };};const queryDatabase = async (state: z.infer<typeof MessagesZodState>) => { const queryResult: string = JSON.stringify( db.prepare("SELECT * FROM Artist LIMIT 10;").all(), ); return { messages: [new AIMessage({ content: "queryResult" })] };};const workflow = new StateGraph(MessagesZodState) // Define the two nodes we will cycle between .addNode("call_model", callModel, { retryPolicy: { maxAttempts: 5 } }) .addNode("query_database", queryDatabase, { retryPolicy: { retryOn: (e: any): boolean => { if (e instanceof Database.SqliteError) { // Retry on "SQLITE_BUSY" error return e.code === "SQLITE_BUSY"; } return false; // Don't retry on other errors }, }, }) .addEdge(START, "call_model") .addEdge("call_model", "query_database") .addEdge("query_database", END);const graph = workflow.compile();
Why split application steps into a sequence with LangGraph?
LangGraph makes it easy to add an underlying persistence layer to your application.
This allows state to be checkpointed in between the execution of nodes, so your LangGraph nodes govern:
How we can “rewind” and branch-off executions using LangGraph’s time travel features
They also determine how execution steps are streamed, and how your application is visualized
and debugged using LangGraph Studio.Let’s demonstrate an end-to-end example. We will create a sequence of three steps:
Populate a value in a key of the state
Update the same value
Populate a different value
Let’s first define our state. This governs the schema of the graph, and can also specify how to apply updates. See this section for more detail.In our case, we will just keep track of two values:
Copy
import * as z from "zod";const State = z.object({ value1: z.string(), value2: z.number(),});
Our nodes are just TypeScript functions that read our graph’s state and make updates to it. The first argument to this function will always be the state:
Note that when issuing updates to the state, each node can just specify the value of the key it wishes to update.By default, this will overwrite the value of the corresponding key. You can also use reducers to control how updates are processed— for example, you can append successive updates to a key instead. See this section for more detail.
Finally, we define the graph. We use StateGraph to define a graph that operates on this state.We will then use addNode and addEdge to populate our graph and define its control flow.
Specifying custom names
You can specify custom names for nodes using .addNode:
Copy
const graph = new StateGraph(State).addNode("myNode", step1).compile();
Note that:
.addEdge takes the names of nodes, which for functions defaults to node.name.
We must specify the entry point of the graph. For this we add an edge with the START node.
The graph halts when there are no more nodes to execute.
We next compile our graph. This provides a few basic checks on the structure of the graph (e.g., identifying orphaned nodes). If we were adding persistence to our application via a checkpointer, it would also be passed in here.LangGraph provides built-in utilities for visualizing your graph. Let’s inspect our sequence. See this guide for detail on visualization.
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
Let’s proceed with a simple invocation:
Copy
const result = await graph.invoke({ value1: "c" });console.log(result);
Copy
{ value1: 'a b', value2: 10 }
Note that:
We kicked off invocation by providing a value for a single state key. We must always provide a value for at least one key.
The value we passed in was overwritten by the first node.
Parallel execution of nodes is essential to speed up overall graph operation. LangGraph offers native support for parallel execution of nodes, which can significantly enhance the performance of graph-based workflows. This parallelization is achieved through fan-out and fan-in mechanisms, utilizing both standard edges and conditional_edges. Below are some examples showing how to add create branching dataflows that work for you.
In this example, we fan out from Node A to B and C and then fan in to D. With our state, we specify the reducer add operation. This will combine or accumulate values for the specific key in the State, rather than simply overwriting the existing value. For lists, this means concatenating the new list with the existing list. See the above section on state reducers for more detail on updating state with reducers.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // The reducer makes this append-only aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Adding "A" to ${state.aggregate}`); return { aggregate: ["A"] };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Adding "B" to ${state.aggregate}`); return { aggregate: ["B"] };};const nodeC = (state: z.infer<typeof State>) => { console.log(`Adding "C" to ${state.aggregate}`); return { aggregate: ["C"] };};const nodeD = (state: z.infer<typeof State>) => { console.log(`Adding "D" to ${state.aggregate}`); return { aggregate: ["D"] };};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addNode("c", nodeC) .addNode("d", nodeD) .addEdge(START, "a") .addEdge("a", "b") .addEdge("a", "c") .addEdge("b", "d") .addEdge("c", "d") .addEdge("d", END) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
With the reducer, you can see that the values added in each node are accumulated.
Copy
const result = await graph.invoke({ aggregate: [],});console.log(result);
Copy
Adding "A" to []Adding "B" to ['A']Adding "C" to ['A']Adding "D" to ['A', 'B', 'C']{ aggregate: ['A', 'B', 'C', 'D'] }
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
Exception handling?
LangGraph executes nodes within supersteps, meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors).Importantly, when using a checkpointer, results from successful nodes within a superstep are saved, and don’t repeat when resumed.If you have error-prone (perhaps want to handle flakey API calls), LangGraph provides two ways to address this:
You can write regular python code within your node to catch and handle exceptions.
You can set a retry_policy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn’t worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
Set max concurrency
You can control the maximum number of concurrent tasks by setting max_concurrency in the configuration when invoking the graph.
If your fan-out should vary at runtime based on the state, you can use addConditionalEdges to select one or more paths using the graph state. See example below, where node a generates a state update that determines the following node.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }), // Add a key to the state. We will set this key to determine // how we branch. which: z.string().register(registry, { reducer: { fn: (x, y) => y ?? x, }, }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Adding "A" to ${state.aggregate}`); return { aggregate: ["A"], which: "c" };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Adding "B" to ${state.aggregate}`); return { aggregate: ["B"] };};const nodeC = (state: z.infer<typeof State>) => { console.log(`Adding "C" to ${state.aggregate}`); return { aggregate: ["C"] };};const conditionalEdge = (state: z.infer<typeof State>): "b" | "c" => { // Fill in arbitrary logic here that uses the state // to determine the next node return state.which as "b" | "c";};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addNode("c", nodeC) .addEdge(START, "a") .addEdge("b", END) .addEdge("c", END) .addConditionalEdges("a", conditionalEdge) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
Copy
const result = await graph.invoke({ aggregate: [] });console.log(result);
Copy
Adding "A" to []Adding "C" to ['A']{ aggregate: ['A', 'C'], which: 'c' }
Your conditional edges can route to multiple destination nodes. For example:
LangGraph supports map-reduce and other advanced branching patterns using the Send API. Here is an example of how to use it:
Copy
import { StateGraph, START, END, Send } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const OverallState = z.object({ topic: z.string(), subjects: z.array(z.string()), jokes: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, }), bestSelectedJoke: z.string(),});const generateTopics = (state: z.infer<typeof OverallState>) => { return { subjects: ["lions", "elephants", "penguins"] };};const generateJoke = (state: { subject: string }) => { const jokeMap: Record<string, string> = { lions: "Why don't lions like fast food? Because they can't catch it!", elephants: "Why don't elephants use computers? They're afraid of the mouse!", penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." }; return { jokes: [jokeMap[state.subject]] };};const continueToJokes = (state: z.infer<typeof OverallState>) => { return state.subjects.map((subject) => new Send("generateJoke", { subject }));};const bestJoke = (state: z.infer<typeof OverallState>) => { return { bestSelectedJoke: "penguins" };};const graph = new StateGraph(OverallState) .addNode("generateTopics", generateTopics) .addNode("generateJoke", generateJoke) .addNode("bestJoke", bestJoke) .addEdge(START, "generateTopics") .addConditionalEdges("generateTopics", continueToJokes) .addEdge("generateJoke", "bestJoke") .addEdge("bestJoke", END) .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
Copy
// Call the graph: here we call it to generate a list of jokesfor await (const step of await graph.stream({ topic: "animals" })) { console.log(step);}
Copy
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }{ bestJoke: { bestSelectedJoke: 'penguins' } }
When creating a graph with a loop, we require a mechanism for terminating execution. This is most commonly done by adding a conditional edge that routes to the END node once we reach some termination condition.You can also set the graph recursion limit when invoking or streaming the graph. The recursion limit sets the number of supersteps that the graph is allowed to execute before it raises an error. Read more about the concept of recursion limits here.Let’s consider a simple graph with a loop to better understand how these mechanisms work.
To return the last value of your state instead of receiving a recursion limit error, see the next section.
When creating a loop, you can include a conditional edge that specifies a termination condition:
Let’s define a graph with a simple loop. Note that we use a conditional edge to implement a termination condition.
Copy
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // The reducer makes this append-only aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Node A sees ${state.aggregate}`); return { aggregate: ["A"] };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Node B sees ${state.aggregate}`); return { aggregate: ["B"] };};// Define edgesconst route = (state: z.infer<typeof State>): "b" | typeof END => { if (state.aggregate.length < 7) { return "b"; } else { return END; }};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addEdge(START, "a") .addConditionalEdges("a", route) .addEdge("b", "a") .compile();
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
This architecture is similar to a ReAct agent in which node "a" is a tool-calling model, and node "b" represents the tools.In our route conditional edge, we specify that we should end after the "aggregate" list in the state passes a threshold length.Invoking the graph, we see that we alternate between nodes "a" and "b" before terminating once we reach the termination condition.
Copy
const result = await graph.invoke({ aggregate: [] });console.log(result);
Copy
Node A sees []Node B sees ['A']Node A sees ['A', 'B']Node B sees ['A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B']Node B sees ['A', 'B', 'A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B', 'A', 'B']{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }
In some applications, we may not have a guarantee that we will reach a given termination condition. In these cases, we can set the graph’s recursion limit. This will raise a GraphRecursionError after a given number of supersteps. We can then catch and handle this exception:
Combine control flow and state updates with Command
It can be useful to combine control flow (edges) and state updates (nodes). For example, you might want to BOTH perform state updates AND decide which node to go to next in the SAME node. LangGraph provides a way to do so by returning a Command object from node functions:
Copy
import { Command } from "@langchain/langgraph";const myNode = (state: State): Command => { return new Command({ // state update update: { foo: "bar" }, // control flow goto: "myOtherNode" });};
We show an end-to-end example below. Let’s create a simple graph with 3 nodes: A, B and C. We will first execute node A, and then decide whether to go to Node B or Node C next based on the output of node A.
Copy
import { StateGraph, START, Command } from "@langchain/langgraph";import * as z from "zod";// Define graph stateconst State = z.object({ foo: z.string(),});// Define the nodesconst nodeA = (state: z.infer<typeof State>): Command => { console.log("Called A"); const value = Math.random() > 0.5 ? "b" : "c"; // this is a replacement for a conditional edge function const goto = value === "b" ? "nodeB" : "nodeC"; // note how Command allows you to BOTH update the graph state AND route to the next node return new Command({ // this is the state update update: { foo: value }, // this is a replacement for an edge goto, });};const nodeB = (state: z.infer<typeof State>) => { console.log("Called B"); return { foo: state.foo + "b" };};const nodeC = (state: z.infer<typeof State>) => { console.log("Called C"); return { foo: state.foo + "c" };};
We can now create the StateGraph with the above nodes. Notice that the graph doesn’t have conditional edges for routing! This is because control flow is defined with Command inside nodeA.
You might have noticed that we used ends to specify which nodes nodeA can navigate to. This is necessary for the graph rendering and tells LangGraph that nodeA can navigate to nodeB and nodeC.
Copy
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
If we run the graph multiple times, we’d see it take different paths (A -> B or A -> C) based on the random choice in node A.
Copy
const result = await graph.invoke({ foo: "" });console.log(result);
If you are using subgraphs, you might want to navigate from a node within a subgraph to a different subgraph (i.e. a different node in the parent graph). To do so, you can specify graph=Command.PARENT in Command:
Copy
const myNode = (state: State): Command => { return new Command({ update: { foo: "bar" }, goto: "otherSubgraph", // where `otherSubgraph` is a node in the parent graph graph: Command.PARENT });};
Let’s demonstrate this using the above example. We’ll do so by changing nodeA in the above example into a single-node graph that we’ll add as a subgraph to our parent graph.
State updates with Command.PARENT
When you send updates from a subgraph node to a parent graph node for a key that’s shared by both parent and subgraph state schemas, you must define a reducer for the key you’re updating in the parent graph state. See the example below.
Copy
import { StateGraph, START, Command } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // NOTE: we define a reducer here foo: z.string().register(registry, { reducer: { fn: (x, y) => x + y, }, }),});const nodeA = (state: z.infer<typeof State>) => { console.log("Called A"); const value = Math.random() > 0.5 ? "nodeB" : "nodeC"; // note how Command allows you to BOTH update the graph state AND route to the next node return new Command({ update: { foo: "a" }, goto: value, // this tells LangGraph to navigate to nodeB or nodeC in the parent graph // NOTE: this will navigate to the closest parent graph relative to the subgraph graph: Command.PARENT, });};const subgraph = new StateGraph(State) .addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] }) .addEdge(START, "nodeA") .compile();const nodeB = (state: z.infer<typeof State>) => { console.log("Called B"); // NOTE: since we've defined a reducer, we don't need to manually append // new characters to existing 'foo' value. instead, reducer will append these // automatically return { foo: "b" };};const nodeC = (state: z.infer<typeof State>) => { console.log("Called C"); return { foo: "c" };};const graph = new StateGraph(State) .addNode("subgraph", subgraph, { ends: ["nodeB", "nodeC"] }) .addNode("nodeB", nodeB) .addNode("nodeC", nodeC) .addEdge(START, "subgraph") .compile();
Copy
const result = await graph.invoke({ foo: "" });console.log(result);
A common use case is updating graph state from inside a tool. For example, in a customer support application you might want to look up customer information based on their account number or ID in the beginning of the conversation. To update the graph state from the tool, you can return Command(update={"my_custom_key": "foo", "messages": [...]}) from the tool:
Copy
import { tool } from "@langchain/core/tools";import { Command } from "@langchain/langgraph";import * as z from "zod";const lookupUserInfo = tool( async (input, config) => { const userId = config.configurable?.userId; const userInfo = getUserInfo(userId); return new Command({ update: { // update the state keys userInfo: userInfo, // update the message history messages: [{ role: "tool", content: "Successfully looked up user information", tool_call_id: config.toolCall.id }] } }); }, { name: "lookupUserInfo", description: "Use this to look up user information to better assist them with their questions.", schema: z.object({}), });
You MUST include messages (or any state key used for the message history) in Command.update when returning Command from a tool and the list of messages in messages MUST contain a ToolMessage. This is necessary for the resulting message history to be valid (LLM providers require AI messages with tool calls to be followed by the tool result messages).
If you are using tools that update state via Command, we recommend using prebuilt ToolNode which automatically handles tools returning Command objects and propagates them to the graph state. If you’re writing a custom node that calls tools, you would need to manually propagate Command objects returned by the tools as the update from the node.
Here we demonstrate how to visualize the graphs you create.You can visualize any arbitrary Graph, including StateGraph.Let’s create a simple example graph to demonstrate visualization.
Copy
import { StateGraph, START, END, MessagesZodMeta } from "@langchain/langgraph";import { BaseMessage } from "@langchain/core/messages";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta), value: z.number().register(registry, { reducer: { fn: (x, y) => x + y, }, }),});const app = new StateGraph(State) .addNode("node1", (state) => { return { value: state.value + 1 }; }) .addNode("node2", (state) => { return { value: state.value * 2 }; }) .addEdge(START, "node1") .addConditionalEdges("node1", (state) => { if (state.value < 10) { return "node2"; } return END; }) .addEdge("node2", "node1") .compile();