LangGraph implements a streaming system to surface real-time updates. Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.
LangGraph graphs expose the stream (sync) and astream (async) methods to yield streamed outputs as iterators. Pass one or more stream modes to control what data you receive.
```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"Node {node_name} updated: {state}")
    elif chunk["type"] == "custom":
        print(f"Status: {chunk['data']['status']}")
```
Output
```
Status: thinking of a joke...
Node generate_joke updated: {'joke': 'Why did the ice cream go to school? To get a sundae education!'}
```
Full example
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer

class State(TypedDict):
    topic: str
    joke: str

def generate_joke(state: State):
    writer = get_stream_writer()
    writer({"status": "thinking of a joke..."})
    return {"joke": f"Why did the {state['topic']} go to school? To get a sundae education!"}

graph = (
    StateGraph(State)
    .add_node(generate_joke)
    .add_edge(START, "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"Node {node_name} updated: {state}")
    elif chunk["type"] == "custom":
        print(f"Status: {chunk['data']['status']}")
```
Output
```
Status: thinking of a joke...
Node generate_joke updated: {'joke': 'Why did the ice cream go to school? To get a sundae education!'}
```
Requires LangGraph >= 1.1. All examples on this page use version="v2".
Pass version="v2" to stream() or astream() to get a unified output format. Every chunk is a StreamPart dict with a consistent shape — regardless of stream mode, number of modes, or subgraph settings:
```python
{
    "type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
    "ns": (),     # namespace tuple, populated for subgraph events
    "data": ...,  # the actual payload (type varies by stream mode)
}
```
Each stream mode has a corresponding TypedDict: ValuesStreamPart, UpdatesStreamPart, MessagesStreamPart, CustomStreamPart, CheckpointStreamPart, TasksStreamPart, and DebugStreamPart. You can import these types from langgraph.types. The union type StreamPart is a discriminated union on part["type"], enabling full type narrowing in editors and type checkers.
With v1 (the default), the output format changes based on your streaming options: a single mode returns raw data, multiple modes return (mode, data) tuples, and subgraphs return (namespace, data) tuples. With v2, the format is always the same:
```python
for chunk in graph.stream(inputs, stream_mode="updates", version="v2"):
    print(chunk["type"])  # "updates"
    print(chunk["ns"])    # ()
    print(chunk["data"])  # {"node_name": {"key": "value"}}
```
The v2 format also enables type narrowing, which means you can filter chunks by chunk["type"] and get the correct payload type. Each branch narrows part["data"] to the specific type for that mode:
```python
for part in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["values", "updates", "messages", "custom"],
    version="v2",
):
    if part["type"] == "values":
        # ValuesStreamPart — full state snapshot after each step
        print(f"State: topic={part['data']['topic']}")
    elif part["type"] == "updates":
        # UpdatesStreamPart — only the changed keys from each node
        for node_name, state in part["data"].items():
            print(f"Node `{node_name}` updated: {state}")
    elif part["type"] == "messages":
        # MessagesStreamPart — (message_chunk, metadata) from LLM calls
        msg, metadata = part["data"]
        print(msg.content, end="", flush=True)
    elif part["type"] == "custom":
        # CustomStreamPart — arbitrary data from get_stream_writer()
        print(f"Progress: {part['data']['progress']}%")
```
Use the stream modes updates and values to stream the state of the graph as it executes.
updates streams the updates to the state after each step of the graph.
values streams the full value of the state after each step of the graph.
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
```
updates
values
Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"Node `{node_name}` updated: {state}")
```
Output
```
Node `refine_topic` updated: {'topic': 'ice cream and cats'}
Node `generate_joke` updated: {'joke': 'This is a joke about ice cream and cats'}
```
Use this to stream the full state of the graph after each step.
```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="values",
    version="v2",
):
    if chunk["type"] == "values":
        # Use .get() — early snapshots may not contain the "joke" key yet
        print(f"topic: {chunk['data']['topic']}, joke: {chunk['data'].get('joke', '')}")
```
Output
```
topic: ice cream, joke:
topic: ice cream and cats, joke:
topic: ice cream and cats, joke: This is a joke about ice cream and cats
```
Use the messages streaming mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.
The streamed output from messages mode is a tuple (message_chunk, metadata) where:
message_chunk: the token or message segment from the LLM.
metadata: a dictionary containing details about the graph node and LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using custom mode instead. See use with any LLM for details.
Manual config required for async in Python < 3.11
When using Python < 3.11 with async code, you must explicitly pass RunnableConfig to ainvoke() to enable proper streaming. See Async with Python < 3.11 for details or upgrade to Python 3.11+.
```python
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    # Note that message events are emitted even when the LLM is run using .invoke rather than .stream
    model_response = model.invoke(
        [{"role": "user", "content": f"Generate a joke about {state.topic}"}]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The "messages" stream mode streams LLM tokens with metadata
# Use version="v2" for a unified StreamPart format
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        message_chunk, metadata = chunk["data"]
        if message_chunk.content:
            print(message_chunk.content, end="|", flush=True)
```
You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.
```python
from langchain.chat_models import init_chat_model

# model_1 is tagged with "joke"
model_1 = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# model_2 is tagged with "poem"
model_2 = init_chat_model(model="gpt-4.1-mini", tags=["poem"])

graph = ...  # define a graph that uses these LLMs

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for chunk in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # Filter the streamed tokens by the tags field in the metadata to only include
        # the tokens from the LLM invocation with the "joke" tag
        if metadata["tags"] == ["joke"]:
            print(msg.content, end="|", flush=True)
```
Extended example: filtering by tags
```python
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# The joke_model is tagged with "joke"
joke_model = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# The poem_model is tagged with "poem"
poem_model = init_chat_model(model="gpt-4.1-mini", tags=["poem"])

class State(TypedDict):
    topic: str
    joke: str
    poem: str

async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: passing the config through explicitly is required for Python < 3.11,
    # since context var support wasn't added before then:
    # https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    # This ensures the context vars are propagated correctly when using async code.
    # Please see the async section for more details.
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for chunk in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        if metadata["tags"] == ["joke"]:
            print(msg.content, end="|", flush=True)
```
To stream tokens only from specific nodes, use stream_mode="messages" and filter the outputs by the langgraph_node field in the streamed metadata:
```python
# The "messages" stream mode streams LLM tokens with metadata
# Use version="v2" for a unified StreamPart format
for chunk in graph.stream(
    inputs,
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # Filter the streamed tokens by the langgraph_node field in the metadata
        # to only include the tokens from the specified node
        if msg.content and metadata["langgraph_node"] == "some_node_name":
            ...
```
Extended example: streaming LLM tokens from specific nodes
```python
from typing import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph

model = ChatOpenAI(model="gpt-4.1-mini")

class State(TypedDict):
    topic: str
    joke: str
    poem: str

def write_joke(state: State):
    topic = state["topic"]
    joke_response = model.invoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}]
    )
    return {"joke": joke_response.content}

def write_poem(state: State):
    topic = state["topic"]
    poem_response = model.invoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}]
    )
    return {"poem": poem_response.content}

graph = (
    StateGraph(State)
    .add_node(write_joke)
    .add_node(write_poem)
    # write both the joke and the poem concurrently
    .add_edge(START, "write_joke")
    .add_edge(START, "write_poem")
    .compile()
)

# The "messages" stream mode streams LLM tokens with metadata
# Use version="v2" for a unified StreamPart format
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # Filter the streamed tokens by the langgraph_node field in the metadata
        # to only include the tokens from the write_poem node
        if msg.content and metadata["langgraph_node"] == "write_poem":
            print(msg.content, end="|", flush=True)
```
To send custom user-defined data from inside a LangGraph node or tool, follow these steps:
Use get_stream_writer to access the stream writer and emit custom data.
Set stream_mode="custom" when calling .stream() or .astream() to get the custom data in the stream. You can combine multiple modes (e.g., ["updates", "custom"]), but "custom" must be one of them.
No get_stream_writer in async for Python < 3.11
In async code running on Python < 3.11, get_stream_writer will not work.
Instead, add a writer parameter to your node or tool and pass it manually.
See Async with Python < 3.11 for usage examples.
node
tool
```python
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # Get the stream writer to send custom data
    writer = get_stream_writer()
    # Emit a custom key-value pair (e.g., progress update)
    writer({"custom_key": "Generating custom data inside node"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom", version="v2"):
    if chunk["type"] == "custom":
        print(f"Custom event: {chunk['data']['custom_key']}")
```
```python
from langchain.tools import tool
from langgraph.config import get_stream_writer

@tool
def query_database(query: str) -> str:
    """Query the database."""
    # Access the stream writer to send custom data
    writer = get_stream_writer()
    # Emit a custom key-value pair (e.g., progress update)
    writer({"data": "Retrieved 0/100 records", "type": "progress"})
    # perform query
    # Emit another custom key-value pair
    writer({"data": "Retrieved 100/100 records", "type": "progress"})
    return "some-answer"

graph = ...  # define a graph that uses this tool

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom", version="v2"):
    if chunk["type"] == "custom":
        print(f"{chunk['data']['type']}: {chunk['data']['data']}")
```
To include outputs from subgraphs in the streamed outputs, set subgraphs=True in the .stream() method of the parent graph. This streams outputs from both the parent graph and any subgraphs.
With v1, the outputs are streamed as tuples (namespace, data), where namespace is a tuple with the path to the node where a subgraph is invoked, e.g. ("parent_node:<task_id>", "child_node:<task_id>").
v2 (LangGraph >= 1.1)
v1 (default)
With version="v2", subgraph events use the same StreamPart format. The ns field identifies the source:
```python
for chunk in graph.stream(
    {"foo": "foo"},
    subgraphs=True,
    stream_mode="updates",
    version="v2",
):
    print(chunk["type"])  # "updates"
    print(chunk["ns"])    # () for root, ("node_name:<task_id>",) for subgraph
    print(chunk["data"])  # {"node_name": {"key": "value"}}
```
```python
for chunk in graph.stream(
    {"foo": "foo"},
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True,
    stream_mode="updates",
):
    print(chunk)
```
Extended example: streaming from subgraphs
```python
from typing import TypedDict

from langgraph.graph import START, StateGraph

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        if chunk["ns"]:
            print(f"Subgraph {chunk['ns']}: {chunk['data']}")
        else:
            print(f"Root: {chunk['data']}")
```
Use the checkpoints streaming mode to receive checkpoint events as the graph executes. Each checkpoint event has the same format as the output of get_state(). Requires a checkpointer.
Use the tasks streaming mode to receive task start and finish events as the graph executes. Task events include information about which node is running, its results, and any errors. Requires a checkpointer.
Use the debug streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
    version="v2",
):
    if chunk["type"] == "debug":
        print(chunk["data"])
```
The debug mode combines checkpoints and tasks events with additional metadata. Use checkpoints or tasks directly if you only need a subset of the debug information.
You can pass a list as the stream_mode parameter to stream multiple modes at once.
With version="v2", every chunk is a StreamPart dict. Use chunk["type"] to distinguish between modes:
```python
for chunk in graph.stream(inputs, stream_mode=["updates", "custom"], version="v2"):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"Node `{node_name}` updated: {state}")
    elif chunk["type"] == "custom":
        print(f"Custom event: {chunk['data']}")
```
You can use stream_mode="custom" to stream data from any LLM API—even if that API does not implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
```python
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    topic: str
    result: str

def call_arbitrary_model(state: State):
    """Example node that calls an arbitrary model and streams the output"""
    # Get the stream writer to send custom data
    writer = get_stream_writer()
    # Assume you have a streaming client that yields chunks
    # Generate LLM tokens using your custom streaming client
    for chunk in your_custom_streaming_client(state["topic"]):
        # Use the writer to send custom data to the stream
        writer({"custom_llm_chunk": chunk})
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    .add_edge(START, "call_arbitrary_model")
    # Add other nodes and edges as needed
    .compile()
)

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        # The chunk data will contain the custom data streamed from the LLM
        print(chunk["data"])
```
Extended example: streaming arbitrary chat model
```python
import json
import operator
from typing import TypedDict

from typing_extensions import Annotated
from openai import AsyncOpenAI

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"

async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role
        if delta.content:
            yield {"role": role, "content": delta.content}

# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)
    return response

class State(TypedDict):
    messages: Annotated[list[dict], operator.add]

# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}

graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
```
Let’s invoke the graph with an AIMessage that includes a tool call:
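A hand-written sketch of such an input, using the OpenAI-style tool-call fields that call_tool above reads (the id and arguments values are illustrative, not from a real model response):

```python
import json

# Hypothetical assistant message carrying a tool call for get_items
inputs = {
    "messages": [
        {
            "role": "assistant",
            "content": None,
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "get_items",
                        # arguments are JSON-encoded, as in the OpenAI API
                        "arguments": json.dumps({"place": "bedroom"}),
                    },
                }
            ],
        }
    ]
}

# call_tool() reads these fields, so verify the shape up front
tool_call = inputs["messages"][-1]["tool_calls"][-1]
assert tool_call["function"]["name"] == "get_items"
print(json.loads(tool_call["function"]["arguments"]))
```

You would then stream it with `async for chunk in graph.astream(inputs, stream_mode="custom", version="v2")` to see the tokens emitted by the writer inside the tool.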
If your application mixes models that support streaming with those that do not, you may need to explicitly disable streaming for models that do not support it. Set streaming=False when initializing the model.
init_chat_model
Chat model interface
```python
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-6",
    # Set streaming=False to disable streaming for the chat model
    streaming=False,
)
```
```python
from langchain_openai import ChatOpenAI

# Set streaming=False to disable streaming for the chat model
model = ChatOpenAI(model="o1-preview", streaming=False)
```
Not all chat model integrations support the streaming parameter. If your model doesn’t support it, use disable_streaming=True instead. This parameter is available on all chat models via the base class.
When you pass version="v2" to invoke() or ainvoke(), it returns a GraphOutput object with .value and .interrupts attributes:
```python
from langgraph.types import GraphOutput

result = graph.invoke(inputs, version="v2")
assert isinstance(result, GraphOutput)

result.value       # your output — dict, Pydantic model, or dataclass
result.interrupts  # tuple[Interrupt, ...], empty if none occurred
```
With any stream mode other than the default "values", invoke(..., stream_mode="updates", version="v2") returns list[StreamPart] instead of list[tuple].
Dict-style access on GraphOutput (result["key"], "key" in result, result["__interrupt__"]) still works for backwards compatibility but is deprecated and will be removed in a future version. Migrate to result.value and result.interrupts.
This separates state from interrupt metadata. With v1, interrupts are embedded in the returned dict under the __interrupt__ key.
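Schematically, the two shapes compare as follows. This is a plain-Python sketch: the dataclass below is an illustrative stand-in, not the real langgraph.types.GraphOutput, and the interrupt payloads are placeholder strings:

```python
from dataclasses import dataclass, field

# v1 (default): interrupt metadata is mixed into the returned state dict
v1_result = {"topic": "ice cream", "__interrupt__": ("<Interrupt ...>",)}

# v2: a GraphOutput-like object keeps state and interrupts separate
@dataclass
class GraphOutputSketch:  # stand-in for langgraph.types.GraphOutput
    value: dict
    interrupts: tuple = field(default_factory=tuple)

v2_result = GraphOutputSketch(
    value={"topic": "ice cream"},
    interrupts=("<Interrupt ...>",),
)

assert "__interrupt__" in v1_result            # v1 mixes them together
assert "__interrupt__" not in v2_result.value  # v2 keeps state clean
print(v2_result.value, v2_result.interrupts)
```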
In Python versions < 3.11, asyncio tasks do not support the context parameter.
This limits LangGraph's ability to automatically propagate context and affects LangGraph's streaming mechanisms in two key ways:
You must explicitly pass RunnableConfig into async LLM calls (e.g., ainvoke()), as callbacks are not automatically propagated.
You cannot use get_stream_writer in async nodes or tools—you must pass a writer argument directly.
Extended example: async LLM call with manual config
```python
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

model = init_chat_model(model="gpt-4.1-mini")

class State(TypedDict):
    topic: str
    joke: str

# Accept config as an argument in the async node function
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # Pass config to model.ainvoke() to ensure proper context propagation
    joke_response = await model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# Set stream_mode="messages" to stream LLM tokens
async for chunk in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        message_chunk, metadata = chunk["data"]
        if message_chunk.content:
            print(message_chunk.content, end="|", flush=True)
```
Extended example: async custom streaming with stream writer
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START
from langgraph.types import StreamWriter

class State(TypedDict):
    topic: str
    joke: str

# Add writer as an argument in the function signature of the async node or tool
# LangGraph will automatically pass the stream writer to the function
async def generate_joke(state: State, writer: StreamWriter):
    writer({"custom_key": "Streaming custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(generate_joke)
    .add_edge(START, "generate_joke")
    .compile()
)

# Set stream_mode="custom" to receive the custom data in the stream
async for chunk in graph.astream(
    {"topic": "ice cream"},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"])
```