
Event Streaming is the recommended in-process streaming model for most LangGraph application code. It returns a run stream object that can be consumed in multiple ways at the same time.
Check out the streaming cookbook for runnable examples and links to detailed reference documentation.
const run = await graph.streamEvents(
  { messages: [{ role: "user", content: "What is 42 * 17?" }] },
  { version: "v3" }
);

for await (const message of run.messages) {
  for await (const token of message.text) {
    process.stdout.write(token);
  }
}

const finalState = await run.output;
The version: "v3" option is temporarily required to opt in to this stream behavior. It will become the default stream version in the next major LangGraph release.

What Event Streaming provides

The run stream exposes typed projections over one underlying event flow:
Projection        Use
run               Iterate every protocol event.
run.messages      Stream chat model messages and token deltas.
run.values        Iterate state snapshots and await the final value.
run.output        Await the final output.
run.subgraphs     Discover and observe nested graph executions.
run.interrupts    Inspect human-in-the-loop interrupt payloads.
run.interrupted   Check whether the run paused for human input.
run.extensions    Consume custom stream transformer projections.
Multiple consumers can read these projections concurrently. Reading run.messages does not consume events needed by run.values, run.subgraphs, or run.output.
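The fan-out pattern can be sketched in isolation. The `run` object below is a hand-rolled stand-in with two independent projections (an async iterable of tokens and a promise for the final output), not the real LangGraph run stream; it only illustrates how two consumers drain their projections concurrently.

```typescript
// Stand-in for a run stream: a token projection plus an output promise.
// This mock exists purely to demonstrate concurrent consumption.
async function* fakeTokens(): AsyncGenerator<string> {
  yield "7";
  yield "1";
  yield "4";
}

const run = {
  messages: fakeTokens(),
  output: Promise.resolve({ answer: 714 }),
};

async function consume(): Promise<string> {
  let streamed = "";
  // Drain the token projection and await the final output concurrently;
  // neither consumer blocks or starves the other.
  const [, final] = await Promise.all([
    (async () => {
      for await (const token of run.messages) streamed += token;
    })(),
    run.output,
  ]);
  return `${streamed} -> ${final.answer}`;
}
```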

Stream protocol events

Use the run object itself when you want the raw protocol event stream:
const run = await graph.streamEvents(input, { version: "v3" });

for await (const event of run) {
  const namespace = event.params.namespace;
  console.log(namespace, event.method, event.params.data);
}
Each protocol event has a channel-like method, a monotonic seq number, and params containing namespace, timestamp, optional node, and channel-specific data.
{
  "type": "event",
  "seq": 42,
  "method": "messages",
  "params": {
    "namespace": [],
    "timestamp": 1770000000000,
    "node": "agent",
    "data": {
      "event": "content-block-delta",
      "content_block": {
        "type": "text",
        "text": "hello"
      }
    }
  }
}
Core channels include:
Channel        Purpose
values         Full graph state snapshots.
updates        Per-node state deltas.
messages       Content-block-centric chat model output.
tools          Tool call start, streamed output, finish, and error events.
lifecycle      Run, subgraph, and subagent status changes.
checkpoints    Lightweight checkpoint envelopes for branching and time travel.
input          Human-in-the-loop input requests and responses.
tasks          Pregel task creation and result events.
custom         User-defined payloads from graph code.
custom:<name>  Application-defined stream transformer output.
The messages channel models output as content blocks. This makes token streaming, reasoning blocks, tool-call blocks, and multimodal content explicit without requiring provider-specific formats in application code.

Add custom transformers

Stream transformers are the projection layer in Event Streaming. They observe protocol events, keep their own state, and expose derived views of a run such as progress events, artifacts, token totals, tool activity, or third-party protocol messages. A transformer creates a projection in init(), observes each event in process(), and finalizes or fails the projection when the run completes.
interface StreamTransformer<TProjection = unknown> {
  init(): TProjection;
  process(event: ProtocolEvent): boolean;
  finalize?(): void | PromiseLike<void>;
  fail?(err: unknown): void;
}
Pass transformers when you start the event stream:
const run = await graph.streamEvents(input, {
  version: "v3",
  transformers: [toolActivityTransformer()],
});

for await (const activity of run.extensions.toolActivity) {
  console.log(activity);
}

Use StreamChannel

StreamChannel is the projection primitive for custom streaming data. It gives in-process consumers an iterable stream, and it can also forward pushed values to remote SDK clients when the channel has a protocol name.
Need                                                    Use
Data should stay in process only                        StreamChannel() or new StreamChannel<T>()
Data should be available in process and over the wire   StreamChannel(name) or new StreamChannel<T>(name)
Named channel payloads must be serializable because they are also emitted as custom:<name> protocol events. Keep promises, async iterables, class instances, and other in-process handles local.
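The push/iterate contract can be sketched with a small in-process channel. `MiniChannel` below is an illustration of the concept only, not the real `StreamChannel` implementation: producers push values, a consumer iterates them with `for await`, and closing the channel ends iteration.

```typescript
// Illustrative in-process channel: push() delivers to a waiting
// consumer or buffers, close() ends iteration.
class MiniChannel<T> implements AsyncIterable<T> {
  private buffer: T[] = [];
  private waiters: ((r: IteratorResult<T>) => void)[] = [];
  private closed = false;

  push(value: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter({ value, done: false });
    else this.buffer.push(value);
  }

  close(): void {
    this.closed = true;
    // Release any consumers still waiting for a value.
    for (const waiter of this.waiters.splice(0)) {
      waiter({ value: undefined as any, done: true });
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        if (this.buffer.length > 0) {
          return Promise.resolve({ value: this.buffer.shift()!, done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined as any, done: true });
        }
        // Nothing buffered yet: park until push() or close().
        return new Promise((resolve) => this.waiters.push(resolve));
      },
    };
  }
}
```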
import { StreamChannel } from "@langchain/langgraph";

const toolActivityTransformer = () => {
  const activity = new StreamChannel<{
    name: string;
    status: "started" | "finished" | "error";
  }>("toolActivity");

  return {
    init: () => ({ toolActivity: activity }),
    process(event) {
      if (event.method === "tools") {
        const data = event.params.data as { tool_name?: string; event?: string };
        if (data.tool_name && data.event) {
          activity.push({
            name: data.tool_name,
            status: data.event === "tool-error" ? "error" : "started",
          });
        }
      }
      return true;
    },
  };
};