The LangGraph SDK lets you stream outputs from the LangGraph API server. The LangGraph SDK and LangGraph Server are part of LangGraph Platform.

Basic usage

Basic usage example:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# example input for your graph; adjust to match your graph's state schema
inputs = {"topic": "ice cream"}

# create a streaming run
# highlight-next-line
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)

Supported stream modes

| Mode | Description | LangGraph Library Method |
|------|-------------|--------------------------|
| values | Stream the full graph state after each super-step. | .stream() / .astream() with stream_mode="values" |
| updates | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. | .stream() / .astream() with stream_mode="updates" |
| messages-tuple | Streams LLM tokens and metadata for the graph node where the LLM is invoked (useful for chat apps). | .stream() / .astream() with stream_mode="messages" |
| debug | Streams as much information as possible throughout the execution of the graph. | .stream() / .astream() with stream_mode="debug" |
| custom | Streams custom data from inside your graph. | .stream() / .astream() with stream_mode="custom" |
| events | Stream all events (including the state of the graph); mainly useful when migrating large LCEL apps. | .astream_events() |

Stream multiple modes

You can pass a list as the stream_mode parameter to stream multiple modes at once. The streamed outputs will be tuples of (mode, chunk) where mode is the name of the stream mode and chunk is the data streamed by that mode.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)
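
When streaming through the SDK, each chunk identifies the mode that produced it via its event attribute, so you can dispatch on it. A minimal sketch of handling the two modes separately (the event names here are assumed to match the mode names):
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    if chunk.event == "updates":
        # state updates keyed by the node that produced them
        print("update:", chunk.data)
    elif chunk.event == "custom":
        # user-defined payloads emitted from inside the graph
        print("custom:", chunk.data)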

Stream graph state

Use the stream modes updates and values to stream the state of the graph as it executes.
  • updates streams the updates to the state after each step of the graph.
  • values streams the full value of the state after each step of the graph.
Stateful runs: The examples below assume that you want to persist the outputs of a streaming run in the checkpointer DB and have created a thread. To create a thread:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
If you don’t need to persist the outputs of a run, you can pass None instead of thread_id when streaming.

Stream Mode: updates

Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    # highlight-next-line
    stream_mode="updates"
):
    print(chunk.data)
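
Each updates chunk maps the name of the node that just ran to the update it returned, e.g. {"refine_topic": {"topic": "ice cream sundae"}} (the node name and values here are illustrative).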

Stream Mode: values

Use this to stream the full state of the graph after each step.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    # highlight-next-line
    stream_mode="values"
):
    print(chunk.data)

Subgraphs

To include outputs from subgraphs in the streamed outputs, set stream_subgraphs=True when calling client.runs.stream() for the parent graph. This will stream outputs from both the parent graph and any subgraphs.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    # highlight-next-line
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. Set stream_subgraphs=True to stream outputs from subgraphs.

Debugging

Use the debug streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    # highlight-next-line
    stream_mode="debug"
):
    print(chunk.data)

LLM tokens

Use the messages-tuple streaming mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks. The streamed output from messages-tuple mode is a tuple (message_chunk, metadata) where:
  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and LLM invocation.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    # highlight-next-line
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. The “messages-tuple” stream mode yields tuples (message_chunk, metadata), where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called, among other details.

Filter LLM tokens
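
The metadata dictionary streamed alongside each message chunk identifies the graph node where the LLM was invoked, so you can filter tokens to specific nodes. A minimal sketch, assuming the metadata carries a langgraph_node key and that your graph has a node named "generate" (the node name is illustrative):
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data
    # keep only tokens produced by the hypothetical "generate" node
    if metadata.get("langgraph_node") == "generate" and message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)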

Stream custom data

To stream custom user-defined data emitted from inside your graph:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    # highlight-next-line
    stream_mode="custom"
):
    print(chunk.data)
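
On the graph side, one way to put data onto the custom stream is langgraph's get_stream_writer; a minimal sketch (the node name and payload are illustrative):
from langgraph.config import get_stream_writer

def my_node(state):
    # get a handle to the current run's "custom" stream
    writer = get_stream_writer()
    # emit any JSON-serializable payload; it arrives as chunk.data above
    writer({"progress": "retrieved 10 documents"})
    return {"query": state["query"]}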

Stream events

To stream all events, including the state of the graph:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    # highlight-next-line
    stream_mode="events"
):
    print(chunk.data)

Stateless runs

If you don’t want to persist the outputs of a streaming run in the checkpointer DB, you can create a stateless run without creating a thread:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
inputs = {"topic": "ice cream"}  # example input; adjust to your graph's schema

async for chunk in client.runs.stream(
    # highlight-next-line
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. We are passing None instead of a thread_id UUID.

Join and stream

LangGraph Platform allows you to join an active background run and stream outputs from it. To do so, you can use LangGraph SDK’s client.runs.join_stream method:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# highlight-next-line
async for chunk in client.runs.join_stream(
    thread_id,
    # highlight-next-line
    run_id,  # (1)!
):
    print(chunk)
  1. This is the run_id of an existing run you want to join.
Outputs not buffered: When you use .join_stream, output is not buffered, so any output produced before joining will not be received.
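
In practice, the run_id comes from starting a background run. A minimal sketch, assuming the run is created with client.runs.create and that thread_id and inputs are defined as in the earlier examples:
# start a background run; this returns immediately with run metadata
run = await client.runs.create(
    thread_id,
    assistant_id,
    input=inputs,
)

# ... later, attach to the run and stream output produced from this point onward
async for chunk in client.runs.join_stream(
    thread_id,
    run["run_id"],
):
    print(chunk)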

API Reference

For API usage and implementation, refer to the API reference.