This page covers streaming concerns specific to Deep Agents, most importantly streaming from delegated subagents via stream.subagents. For general agent streaming (stream.messages, stream.values, tool calls, custom updates), see LangChain Event Streaming.

Stream subagents

Deep Agents add a subagent projection on top of LangGraph streaming. Use stream.subagents when you want one stream handle per delegated task call. The projection is lightweight: it discovers subagent tasks first, and message, tool-call, and value streams are opened only when you access them on a subagent handle.
stream = agent.stream_events({
    "messages": [{"role": "user", "content": "Write me a haiku about the sea"}],
}, version="v3")

for subagent in stream.subagents:
    print(subagent.name, subagent.path, subagent.status)

    for message in subagent.messages:
        print(message.text)

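The lazy behavior described above can be modeled with a small stand-in. This is a sketch of the access pattern only, not the Deep Agents implementation: LazySubagentHandle, its opened list, and the _open helper are hypothetical names introduced for illustration.

```python
from functools import cached_property


class LazySubagentHandle:
    """Hypothetical stand-in for a subagent handle with lazily opened streams."""

    def __init__(self, name):
        self.name = name
        self.opened = []  # records which projections were actually opened

    def _open(self, projection):
        # A real handle would subscribe to a stream here; the stand-in only logs it.
        self.opened.append(projection)
        return iter(())

    @cached_property
    def messages(self):
        return self._open("messages")

    @cached_property
    def tool_calls(self):
        return self._open("tool_calls")


handle = LazySubagentHandle("researcher")
print(handle.name, handle.opened)  # discovering the handle opens nothing
list(handle.messages)              # first access opens the messages projection
print(handle.opened)
```

In this model, reading only name leaves every projection closed, which mirrors why listing subagents stays cheap when you never touch their messages or tool calls.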
Subagent stream fields

Each subagent stream exposes the same kinds of projections as the parent run: .messages, .tool_calls, .values, .subagents, and .output. Python uses snake_case projection names such as tool_calls. For the general parent-run streaming model, see LangChain Event Streaming.
Field        Description
name         Subagent name.
messages     Messages emitted by the subagent.
subagents    Nested subagent invocations.
output       Final subagent state, or completion signal for the delegated task.
path         Namespace path for the subagent stream.
status       Lifecycle status such as started, completed, failed, or interrupted.
tool_calls   Tool calls scoped to the subagent.

Track subagent lifecycle

Use stream.subagents when you only need to show which subagents started and finished. Iterating stream.subagents alone does not subscribe to message or value streams; those open only when you access the corresponding projection on an individual subagent.
stream = agent.stream_events(input, version="v3")

running = 0
completed = 0
failed = 0

for subagent in stream.subagents:
    running += 1
    print(f"{subagent.name}: started")

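    try:
        # This example treats an exception raised while reading
        # subagent.output as a failed delegated task.
        _ = subagent.output
    except Exception:
        failed += 1
        print(f"{subagent.name}: failed")
    else:
        completed += 1
        print(f"{subagent.name}: completed")
    finally:
        # The subagent is no longer running in either case.
        running -= 1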

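If you keep the handles around, the status field can also feed a summary after the run. The following is a minimal sketch under stated assumptions: it assumes status holds one of the strings listed in the fields table, and it uses SimpleNamespace objects as hypothetical stand-ins for real subagent handles. tally_statuses is an illustrative helper, not part of Deep Agents.

```python
from collections import Counter
from types import SimpleNamespace


def tally_statuses(subagents):
    """Count subagents by their status attribute."""
    return Counter(subagent.status for subagent in subagents)


# Hypothetical stand-ins; real handles would come from stream.subagents.
fake_subagents = [
    SimpleNamespace(name="researcher", status="completed"),
    SimpleNamespace(name="writer", status="completed"),
    SimpleNamespace(name="critic", status="failed"),
]

counts = tally_statuses(fake_subagents)
print(dict(counts))
```

A Counter returns 0 for statuses that never occurred, so a dashboard can read counts["interrupted"] without a key check.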
Stream messages

Deep Agents can emit messages from the coordinator agent and from delegated subagents. Use stream.messages for top-level messages and subagent.messages for each delegated subagent.
stream = agent.stream_events(input, version="v3")

for message in stream.messages:
    print("[coordinator]", message.text)

for subagent in stream.subagents:
    for message in subagent.messages:
        print(f"[{subagent.name}]", message.text)

Stream tool calls

Deep Agents expose tool calls at each level of the agent tree. Use the top-level stream.tool_calls for coordinator tools and each subagent.tool_calls for delegated work.
stream = agent.stream_events(input, version="v3")

for call in stream.tool_calls:
    print("[coordinator tool]", call.tool_name, call.input)
    print(call.completed, call.error)

for subagent in stream.subagents:
    for call in subagent.tool_calls:
        print(f"[{subagent.name} tool]", call.tool_name, call.input)
        for delta in call.output_deltas:
            print(delta, end="", flush=True)

        if call.completed and call.error is None:
            print(call.output)
        elif call.error is not None:
            print(call.error)

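The completed, error, and output checks above can be folded into one helper for display code. This is a sketch that relies only on the attribute names used in the snippet above; describe_tool_call is a hypothetical helper, and the SimpleNamespace objects stand in for real tool-call handles.

```python
from types import SimpleNamespace


def describe_tool_call(call):
    """Return a one-line summary based on completed, error, and output."""
    if call.error is not None:
        return f"{call.tool_name} failed: {call.error}"
    if call.completed:
        return f"{call.tool_name} returned: {call.output}"
    return f"{call.tool_name} still running"


# Hypothetical stand-ins for tool-call handles.
ok = SimpleNamespace(tool_name="search", completed=True, error=None, output="3 results")
bad = SimpleNamespace(tool_name="fetch", completed=True, error="timeout", output=None)
pending = SimpleNamespace(tool_name="summarize", completed=False, error=None, output=None)

for call in (ok, bad, pending):
    print(describe_tool_call(call))
```

Checking error before completed means a call that finished with an error is never reported as a successful result.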
Stream nested work

You can recurse into a subagent stream to observe nested subagents, messages, and tool calls.
stream = agent.stream_events(input, version="v3")

for subagent in stream.subagents:
    print(f"subagent {subagent.name}: {subagent.status}")

    for tool_call in subagent.tool_calls:
        print(f"{tool_call.tool_name}({tool_call.input})")
        for delta in tool_call.output_deltas:
            print(delta, end="", flush=True)

    for nested in subagent.subagents:
        print(f"nested subagent {nested.name}: {nested.status}")

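When the delegation tree can be more than two levels deep, the same recursion can be written once. This is a sketch over hypothetical stand-in objects that only carry name, status, and subagents; walk_subagents is an illustrative helper, and with live streams the nested iteration may wait for each subagent in turn.

```python
from types import SimpleNamespace


def walk_subagents(subagents, depth=0):
    """Yield (depth, name, status) for every subagent, depth-first."""
    for subagent in subagents:
        yield depth, subagent.name, subagent.status
        yield from walk_subagents(subagent.subagents, depth + 1)


# Hypothetical stand-ins for a small delegation tree.
leaf = SimpleNamespace(name="fact-checker", status="completed", subagents=[])
root = SimpleNamespace(name="researcher", status="completed", subagents=[leaf])
tree = [root, SimpleNamespace(name="writer", status="started", subagents=[])]

rows = list(walk_subagents(tree))
for depth, name, status in rows:
    print("  " * depth + f"{name}: {status}")
```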
Consume concurrently

Coordinator and subagent output often interleave. Consume projections concurrently when you need live UI updates. Use stream.interleave(...) when you want one sync loop over multiple projections:
stream = agent.stream_events(input, version="v3")

for name, item in stream.interleave("messages", "subagents"):
    if name == "messages":
        print("[coordinator]", item.text)
    else:
        for message in item.messages:
            print(f"[{item.name}]", message.text)

When you need exact arrival order across the coordinator and all subagents, iterate raw protocol events and use namespace to identify the source:
stream = agent.stream_events(input, version="v3")

for event in stream:
    if event.get("method") != "messages":
        continue

    payload = event["params"]["data"]
    if payload.get("event") != "content-block-delta":
        continue

    block = payload.get("delta") or {}
    if block.get("type") == "text":
        source = "subagent" if event["params"]["namespace"] else "coordinator"
        print(f"[{source}] {block['text']}", end="", flush=True)

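The same filtering can be factored into a function and exercised without a running agent. This is a sketch under stated assumptions: the event shape follows the loop above, namespace is assumed to be a (possibly empty) list of strings, and the sample events, the "task:1" value, and the helpers text_deltas and make_event are hypothetical.

```python
def text_deltas(events):
    """Yield (namespace, text) for each text delta among protocol events."""
    for event in events:
        if event.get("method") != "messages":
            continue
        params = event.get("params", {})
        payload = params.get("data", {})
        if payload.get("event") != "content-block-delta":
            continue
        block = payload.get("delta") or {}
        if block.get("type") == "text":
            # An empty namespace is treated as the coordinator.
            yield tuple(params.get("namespace") or ()), block.get("text", "")


def make_event(namespace, text):
    """Build a hypothetical text-delta event in the shape used above."""
    return {
        "method": "messages",
        "params": {
            "namespace": namespace,
            "data": {
                "event": "content-block-delta",
                "delta": {"type": "text", "text": text},
            },
        },
    }


sample_events = [
    make_event([], "Planning. "),
    make_event(["task:1"], "Waves "),
    {"method": "values", "params": {"namespace": [], "data": {}}},  # skipped
    make_event(["task:1"], "fold."),
]

by_source = {}
for namespace, text in text_deltas(sample_events):
    by_source.setdefault(namespace, []).append(text)
by_source = {key: "".join(parts) for key, parts in by_source.items()}
print(by_source)
```

Keying by the full namespace rather than a coordinator/subagent flag keeps text from different subagents separate while still preserving arrival order within each source.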
Subagents versus subgraphs

stream.subgraphs shows graph execution structure. stream.subagents shows product-level Deep Agents task delegations. Use stream.subagents for user-facing UI because it hides internal graph nodes and exposes the subagent concept directly.