```typescript
import { v7 as uuid7 } from "uuid";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// Task that checks if a number is even
const isEven = task("isEven", async (number: number) => {
  return number % 2 === 0;
});

// Task that formats a message
const formatMessage = task("formatMessage", async (isEven: boolean) => {
  return isEven ? "The number is even." : "The number is odd.";
});

// Create a checkpointer for persistence
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { number: number }) => {
    // Simple workflow to classify a number
    const even = await isEven(inputs.number);
    return await formatMessage(even);
  }
);

// Run the workflow with a unique thread ID
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke({ number: 7 }, config);
console.log(result);
```
Extended example: Compose an essay with an LLM
This example demonstrates how to use the task and entrypoint functions. Because a checkpointer is provided, the workflow results are persisted in the checkpointer.
```typescript
import { v7 as uuid7 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// Task: generate essay using an LLM
const composeEssay = task("composeEssay", async (topic: string) => {
  // Generate an essay about the given topic
  const response = await model.invoke([
    { role: "system", content: "You are a helpful assistant that writes essays." },
    { role: "user", content: `Write an essay about ${topic}.` },
  ]);
  return response.content as string;
});

// Create a checkpointer for persistence
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topic: string) => {
    // Simple workflow that generates an essay with an LLM
    return await composeEssay(topic);
  }
);

// Execute the workflow
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke("the history of flight", config);
console.log(result);
```
Tasks can be executed in parallel by invoking them concurrently and awaiting the results. This is useful for improving performance in I/O-bound tasks (e.g., calling LLM APIs).

This example demonstrates how to run multiple LLM calls in parallel using task. Each call generates a paragraph on a different topic, and the results are joined into a single text output.
```typescript
import { v7 as uuid7 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// Initialize the LLM model
const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// Task that generates a paragraph about a given topic
const generateParagraph = task("generateParagraph", async (topic: string) => {
  const response = await model.invoke([
    { role: "system", content: "You are a helpful assistant that writes educational paragraphs." },
    { role: "user", content: `Write a paragraph about ${topic}.` },
  ]);
  return response.content as string;
});

// Create a checkpointer for persistence
const checkpointer = new MemorySaver();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (topics: string[]) => {
    // Generate multiple paragraphs in parallel and combine them
    const paragraphs = await Promise.all(
      topics.map((topic) => generateParagraph(topic))
    );
    return paragraphs.join("\n\n");
  }
);

// Run the workflow
const config = { configurable: { thread_id: uuid7() } };
const result = await workflow.invoke(
  ["quantum computing", "climate change", "history of aviation"],
  config
);
console.log(result);
```
This example uses LangGraph’s concurrency model to improve execution time, especially when tasks involve I/O like LLM completions.
The Functional API uses the same streaming mechanism as the Graph API; see the streaming guide section for more details. Below is an example of using the streaming API to stream both updates and custom data.
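A minimal sketch, assuming the entrypoint function receives a LangGraphRunnableConfig whose optional writer emits to the custom stream (the task and values below are illustrative):

```typescript
import {
  MemorySaver,
  entrypoint,
  task,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const checkpointer = new MemorySaver();

// Illustrative task: doubles a number after a short delay
const double = task("double", async (x: number) => {
  await new Promise((resolve) => setTimeout(resolve, 100));
  return x * 2;
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: { value: number }, config: LangGraphRunnableConfig) => {
    // Emit custom data alongside the regular "updates" stream
    config.writer?.("Started processing");
    const result = await double(inputs.value);
    config.writer?.("Finished processing");
    return result;
  }
);

const config = { configurable: { thread_id: "1" } };

// Request both stream modes; each chunk is tagged with its mode
for await (const chunk of await main.stream(
  { value: 21 },
  { ...config, streamMode: ["updates", "custom"] }
)) {
  console.log(chunk);
}
```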
Tasks can be configured with a retry policy, which is applied when the task throws. In the example below, getInfo fails on its first attempt and succeeds on the retry:

```typescript
import {
  MemorySaver,
  entrypoint,
  task,
  RetryPolicy,
} from "@langchain/langgraph";

// This variable is just used for demonstration purposes to simulate a network failure.
// It's not something you will have in your actual code.
let attempts = 0;

// Configure the RetryPolicy to retry on any Error.
// The default RetryPolicy is optimized for retrying specific network errors.
const retryPolicy: RetryPolicy = { retryOn: (error) => error instanceof Error };

const getInfo = task(
  {
    name: "getInfo",
    retry: retryPolicy,
  },
  () => {
    attempts += 1;
    if (attempts < 2) {
      throw new Error("Failure");
    }
    return "OK";
  }
);

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    return await getInfo();
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await main.invoke({ any_input: "foobar" }, config);
```
A checkpointer also helps with recovering from failures: the results of tasks that completed before the error are saved, so they are not re-run when the workflow resumes. In this example, slowTask completes and is checkpointed before getInfo throws on the first attempt:

```typescript
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// This variable is just used for demonstration purposes to simulate a network failure.
// It's not something you will have in your actual code.
let attempts = 0;

const getInfo = task("getInfo", async () => {
  /**
   * Simulates a task that fails once before succeeding.
   * Throws an exception on the first attempt, then returns "OK" on subsequent tries.
   */
  attempts += 1;
  if (attempts < 2) {
    throw new Error("Failure"); // Simulate a failure on the first attempt
  }
  return "OK";
});

// Initialize an in-memory checkpointer for persistence
const checkpointer = new MemorySaver();

const slowTask = task("slowTask", async () => {
  /**
   * Simulates a slow-running task by introducing a 1-second delay.
   */
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return "Ran slow task.";
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    /**
     * Main workflow function that runs the slowTask and getInfo tasks sequentially.
     *
     * Parameters:
     * - inputs: Record<string, any> containing workflow input values.
     *
     * The workflow first executes `slowTask` and then attempts to execute `getInfo`,
     * which will fail on the first invocation.
     */
    const slowTaskResult = await slowTask(); // Blocking call to slowTask
    await getInfo(); // Exception will be raised here on the first attempt
    return slowTaskResult;
  }
);

// Workflow execution configuration with a unique thread identifier
const config = {
  configurable: {
    thread_id: "1", // Unique identifier to track workflow execution
  },
};

// This invocation will take ~1 second due to the slowTask execution
try {
  // First invocation will raise an exception due to the `getInfo` task failing
  await main.invoke({ any_input: "foobar" }, config);
} catch (err) {
  // Handle the failure gracefully
}
```
When we resume execution, we won’t need to re-run the slowTask as its result is already saved in the checkpoint.
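To resume, we invoke the workflow again on the same thread. A minimal sketch, assuming the convention from the LangGraph persistence docs that passing null as the input resumes from the saved checkpoint:

```typescript
// Resume on the same thread: slowTask's checkpointed result is reused,
// so only getInfo runs again (and succeeds on the second attempt).
const result = await main.invoke(null, config);
console.log(result); // "Ran slow task."
```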
interrupt() is called inside a task, enabling a human to review and edit the output of the previous task. The results of prior tasks (in this case step_1) are persisted, so they are not run again following the interrupt. Let's send in a query string:
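A sketch of starting the run, where graph is assumed to be the entrypoint composed from the step_1 and human_feedback tasks defined earlier in this guide, and the query string is illustrative:

```typescript
// `graph` and its tasks (step_1, human_feedback) are assumed to be defined
// earlier in this guide; the query string below is illustrative.
const config = { configurable: { thread_id: "1" } };

for await (const event of await graph.stream("foo", config)) {
  console.log(event);
  console.log("\n");
}
```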
Note that we’ve paused with an interrupt after step_1. The interrupt provides instructions to resume the run. To resume, we issue a Command containing the data expected by the human_feedback task.
```typescript
// Continue execution
for await (const event of await graph.stream(
  new Command({ resume: "baz" }),
  config
)) {
  console.log(event);
  console.log("\n");
}
```
After resuming, the run proceeds through the remaining step and terminates as expected.
To review tool calls before execution, we add a reviewToolCall function that calls interrupt. When this function is called, execution will pause until we issue a command to resume it. Given a tool call, our function will interrupt for human review. At that point we can either:
- Accept the tool call
- Revise the tool call and continue
- Generate a custom tool message (e.g., instructing the model to re-format its tool call)
```typescript
import { interrupt } from "@langchain/langgraph";
import { ToolCall } from "@langchain/core/messages/tool";
import { ToolMessage } from "@langchain/core/messages";

function reviewToolCall(toolCall: ToolCall): ToolCall | ToolMessage {
  // Review a tool call, returning a validated version
  const humanReview = interrupt({
    question: "Is this correct?",
    tool_call: toolCall,
  });

  const reviewAction = humanReview.action;
  const reviewData = humanReview.data;

  if (reviewAction === "continue") {
    return toolCall;
  } else if (reviewAction === "update") {
    const updatedToolCall = { ...toolCall, args: reviewData };
    return updatedToolCall;
  } else if (reviewAction === "feedback") {
    return new ToolMessage({
      content: reviewData,
      name: toolCall.name,
      tool_call_id: toolCall.id,
    });
  }

  throw new Error(`Unknown review action: ${reviewAction}`);
}
```
We can now update our entrypoint to review the generated tool calls. If a tool call is accepted or revised, we execute in the same way as before. Otherwise, we just append the ToolMessage supplied by the human. The results of prior tasks—in this case the initial model call—are persisted, so that they are not run again following the interrupt.
```typescript
import {
  MemorySaver,
  entrypoint,
  interrupt,
  Command,
  addMessages,
  getPreviousState,
} from "@langchain/langgraph";
import { ToolCall } from "@langchain/core/messages/tool";
import { ToolMessage, AIMessage, BaseMessage } from "@langchain/core/messages";

const checkpointer = new MemorySaver();

// callModel and callTool are the model-calling and tool-executing tasks
// defined earlier in this guide.
const agent = entrypoint(
  { checkpointer, name: "agent" },
  async (messages: BaseMessage[]) => {
    // Merge the new messages with the history saved on the previous run, if any
    const previous = getPreviousState<BaseMessage[]>();
    if (previous !== undefined) {
      messages = addMessages(previous, messages);
    }

    let modelResponse = await callModel(messages);
    while (true) {
      if (!modelResponse.tool_calls?.length) {
        break;
      }

      // Review tool calls
      const toolResults: ToolMessage[] = [];
      const toolCalls: ToolCall[] = [];
      for (let i = 0; i < modelResponse.tool_calls.length; i++) {
        const review = reviewToolCall(modelResponse.tool_calls[i]);
        if (review instanceof ToolMessage) {
          toolResults.push(review);
        } else {
          // is a validated tool call
          toolCalls.push(review);
          if (review !== modelResponse.tool_calls[i]) {
            modelResponse.tool_calls[i] = review; // update message
          }
        }
      }

      // Execute remaining tool calls
      const remainingToolResults = await Promise.all(
        toolCalls.map((toolCall) => callTool(toolCall))
      );

      // Append to message list
      messages = addMessages(messages, [
        modelResponse,
        ...toolResults,
        ...remainingToolResults,
      ]);

      // Call model again
      modelResponse = await callModel(messages);
    }

    // Generate final response
    messages = addMessages(messages, modelResponse);
    return entrypoint.final({ value: modelResponse, save: messages });
  }
);
```
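We can inspect the saved state for a given thread at any time by calling getState on the workflow: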
```typescript
const config = {
  configurable: {
    thread_id: "1",
    // optionally provide an ID for a specific checkpoint,
    // otherwise the latest checkpoint is shown
    // checkpoint_id: "1f029ca3-1f5b-6704-8004-820c16b69a5a"
  },
};

await graph.getState(config);
```
An example of a simple chatbot using the functional API and the MemorySaver checkpointer. The bot is able to remember the previous conversation and continue from where it left off.
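A minimal sketch of such a chatbot, assuming the getPreviousState and entrypoint.final APIs used above (the model name is an assumption):

```typescript
import { v7 as uuid7 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import {
  entrypoint,
  task,
  addMessages,
  getPreviousState,
  MemorySaver,
} from "@langchain/langgraph";
import { BaseMessage, BaseMessageLike } from "@langchain/core/messages";

const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });

// Task that calls the model with the full conversation history
const callModel = task("callModel", async (messages: BaseMessage[]) => {
  return await model.invoke(messages);
});

const checkpointer = new MemorySaver();

const chatbot = entrypoint(
  { checkpointer, name: "chatbot" },
  async (inputs: BaseMessageLike[]) => {
    // Restore the conversation saved by the previous turn, if any
    const previous = getPreviousState<BaseMessage[]>() ?? [];
    const messages = addMessages(previous, inputs);
    const response = await callModel(messages);
    // Return the reply and save the updated history for the next turn
    return entrypoint.final({
      value: response,
      save: addMessages(messages, response),
    });
  }
);

const config = { configurable: { thread_id: uuid7() } };

const first = await chatbot.invoke(
  [{ role: "user", content: "Hi, my name is Bob." }],
  config
);
console.log(first.content);

// Same thread: the saved history lets the bot recall the name
const second = await chatbot.invoke(
  [{ role: "user", content: "What is my name?" }],
  config
);
console.log(second.content);
```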
Long-term memory allows storing information across different thread IDs. This could be useful for learning information about a given user in one conversation and using it in another.
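A minimal sketch of the idea, using LangGraph's InMemoryStore directly from the workflow (the user ID, namespace, and stored fields are illustrative assumptions):

```typescript
import { entrypoint, MemorySaver, InMemoryStore } from "@langchain/langgraph";

const checkpointer = new MemorySaver();
// The store lives outside any single thread, so values persist across thread IDs
const store = new InMemoryStore();

const workflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (input: { userId: string; note?: string }) => {
    const namespace = ["memories", input.userId];
    if (input.note) {
      // Save a long-term memory for this user
      await store.put(namespace, "note", { note: input.note });
    }
    // Look up what we know about the user, regardless of thread
    const memory = await store.get(namespace, "note");
    return memory?.value.note ?? "no memory yet";
  }
);

// Write a memory on one thread...
await workflow.invoke(
  { userId: "user-123", note: "Bob prefers short answers" },
  { configurable: { thread_id: "thread-a" } }
);

// ...and recall it from a different thread
const recalled = await workflow.invoke(
  { userId: "user-123" },
  { configurable: { thread_id: "thread-b" } }
);
console.log(recalled); // "Bob prefers short answers"
```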