Stream real-time updates from deep agent runs and subagent execution
Deep agents build on LangGraph’s streaming infrastructure with first-class support for subagent streams. When a deep agent delegates work to subagents, you can stream updates from each subagent independently—tracking progress, LLM tokens, and tool calls in real time. This page covers what’s possible with deep agent streaming:
Deep agents use LangGraph’s subgraph streaming to surface events from subagent execution. To receive subagent events, pass subgraphs=True when streaming.
Copy
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt="You are a helpful research assistant",
    subagents=[
        {
            "name": "researcher",
            "description": "Researches a topic in depth",
            "system_prompt": "You are a thorough researcher.",
        },
    ],
)

# subgraphs=True surfaces events from subagent (subgraph) execution;
# version="v2" selects the unified StreamPart dict format.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        if chunk["ns"]:
            # Subagent event - namespace identifies the source
            print(f"[subagent: {chunk['ns']}]")
        else:
            # Main agent event
            print("[main agent]")
        print(chunk["data"])
When subgraphs is enabled, each streaming event includes a namespace that identifies which agent produced it. The namespace is a path of node names and task IDs that represents the agent hierarchy.
Namespace
Source
() (empty)
Main agent
("tools:abc123",)
A subagent spawned by the main agent’s task tool call abc123
("tools:abc123", "model_request:def456")
The model request node inside a subagent
Use namespaces to route events to the correct UI component:
Copy
# Route streamed events to the correct UI component by inspecting the
# namespace: an empty namespace is the main agent; a "tools:<id>" segment
# identifies a subagent spawned by that task tool call.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Plan my vacation"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # Check if this event came from a subagent
        is_subagent = any(
            segment.startswith("tools:") for segment in chunk["ns"]
        )
        if is_subagent:
            # Extract the tool call ID from the namespace
            tool_call_id = next(
                s.split(":")[1] for s in chunk["ns"] if s.startswith("tools:")
            )
            print(f"Subagent {tool_call_id}: {chunk['data']}")
        else:
            print(f"Main agent: {chunk['data']}")
Use stream_mode="updates" to track subagent progress as each step completes. This is useful for showing which subagents are active and what work they’ve completed.
Copy
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt=(
        "You are a project coordinator. Always delegate research tasks "
        "to your researcher subagent using the task tool. Keep your final response to one sentence."
    ),
    subagents=[
        {
            "name": "researcher",
            "description": "Researches topics thoroughly",
            "system_prompt": (
                "You are a thorough researcher. Research the given topic "
                "and provide a concise summary in 2-3 sentences."
            ),
        },
    ],
)

# stream_mode="updates" emits one event per completed step, which is
# enough to show which agents are active and what they finished.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Write a short summary about AI safety"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # Main agent updates (empty namespace)
        if not chunk["ns"]:
            for node_name, data in chunk["data"].items():
                if node_name == "tools":
                    # Subagent results returned to main agent
                    for msg in data.get("messages", []):
                        if msg.type == "tool":
                            print(f"\nSubagent complete: {msg.name}")
                            print(f" Result: {str(msg.content)[:200]}...")
                else:
                    print(f"[main agent] step: {node_name}")
        # Subagent updates (non-empty namespace)
        else:
            for node_name, data in chunk["data"].items():
                print(f" [{chunk['ns'][0]}] step: {node_name}")
Use stream_mode="messages" to stream individual tokens from both the main agent and subagents. Each message event includes metadata that identifies the source agent.
Copy
# Tracks which agent produced the last printed token so we only emit a
# header when the source changes.
current_source = ""

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # Check if this event came from a subagent (namespace contains "tools:")
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        if is_subagent:
            # Token from a subagent
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            if subagent_ns != current_source:
                print(f"\n\n--- [subagent: {subagent_ns}] ---")
                current_source = subagent_ns
            if token.content:
                print(token.content, end="", flush=True)
        else:
            # Token from the main agent
            if current_source != "main":
                print("\n\n--- [main agent] ---")
                current_source = "main"
            if token.content:
                print(token.content, end="", flush=True)

print()
When subagents use tools, you can stream tool call events to display what each subagent is doing. Tool call chunks appear in the messages stream mode.
Copy
# Stream tool call events from both the main agent and subagents.
# Tool call chunks arrive incrementally in the "messages" stream mode.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research recent quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # Identify source: "main" or the subagent namespace segment
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        source = (
            next((s for s in chunk["ns"] if s.startswith("tools:")), "main")
            if is_subagent
            else "main"
        )
        # Tool call chunks (streaming tool invocations)
        if token.tool_call_chunks:
            for tc in token.tool_call_chunks:
                if tc.get("name"):
                    print(f"\n[{source}] Tool call: {tc['name']}")
                # Args stream in chunks - write them incrementally
                if tc.get("args"):
                    print(tc["args"], end="", flush=True)
        # Tool results
        if token.type == "tool":
            print(f"\n[{source}] Tool result [{token.name}]: {str(token.content)[:150]}")
        # Regular AI content (skip tool call messages)
        if token.type == "ai" and token.content and not token.tool_call_chunks:
            print(token.content, end="", flush=True)

print()
Use get_stream_writer (from langgraph.config) inside your subagent tools to emit custom progress events:
Copy
import time

from langchain.tools import tool
from langgraph.config import get_stream_writer

from deepagents import create_deep_agent


@tool
def analyze_data(topic: str) -> str:
    """Run a data analysis on a given topic.

    This tool performs the actual analysis and emits progress updates.
    You MUST call this tool for any analysis request.
    """
    # get_stream_writer() returns a callable that pushes events onto the
    # "custom" stream mode channel for this run.
    writer = get_stream_writer()
    writer({"status": "starting", "topic": topic, "progress": 0})
    time.sleep(0.5)
    writer({"status": "analyzing", "progress": 50})
    time.sleep(0.5)
    writer({"status": "complete", "progress": 100})
    return (
        f'Analysis of "{topic}": Customer sentiment is 85% positive, '
        "driven by product quality and support response times."
    )


agent = create_deep_agent(
    system_prompt=(
        "You are a coordinator. For any analysis request, you MUST delegate "
        "to the analyst subagent using the task tool. Never try to answer directly. "
        "After receiving the result, summarize it in one sentence."
    ),
    subagents=[
        {
            "name": "analyst",
            "description": "Performs data analysis with real-time progress tracking",
            "system_prompt": (
                "You are a data analyst. You MUST call the analyze_data tool "
                "for every analysis request. Do not use any other tools. "
                "After the analysis completes, report the result."
            ),
            "tools": [analyze_data],
        },
    ],
)

# stream_mode="custom" delivers only the events emitted via the writer.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze customer satisfaction trends"}]},
    stream_mode="custom",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "custom":
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        if is_subagent:
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            print(f"[{subagent_ns}]", chunk["data"])
        else:
            print("[main]", chunk["data"])
Combine multiple stream modes to get a complete picture of agent execution:
Copy
# Skip internal middleware steps - only show meaningful node names
INTERESTING_NODES = {"model_request", "tools"}

last_source = ""
mid_line = False  # True when we've written tokens without a trailing newline

# Passing a list of stream modes yields interleaved events; chunk["type"]
# tells you which mode produced each one.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the impact of remote work on team productivity"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
    source = "subagent" if is_subagent else "main"

    if chunk["type"] == "updates":
        for node_name in chunk["data"]:
            if node_name not in INTERESTING_NODES:
                continue
            if mid_line:
                print()
                mid_line = False
            print(f"[{source}] step: {node_name}")
    elif chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if token.content:
            # Print a header when the source changes
            if source != last_source:
                if mid_line:
                    print()
                    mid_line = False
                print(f"\n[{source}] ", end="")
                last_source = source
            print(token.content, end="", flush=True)
            mid_line = True
    elif chunk["type"] == "custom":
        if mid_line:
            print()
            mid_line = False
        print(f"[{source}] custom event:", chunk["data"])

print()
# Track each subagent's lifecycle (pending -> running -> complete) from the
# "updates" stream, keyed by the task tool_call_id that spawned it.
active_subagents = {}

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research the latest AI safety developments"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, data in chunk["data"].items():
            # ─── Phase 1: Detect subagent starting ────────────────────────
            # When the main agent's model_request contains task tool calls,
            # a subagent has been spawned.
            if not chunk["ns"] and node_name == "model_request":
                for msg in data.get("messages", []):
                    for tc in getattr(msg, "tool_calls", []):
                        if tc["name"] == "task":
                            active_subagents[tc["id"]] = {
                                "type": tc["args"].get("subagent_type"),
                                "description": tc["args"].get("description", "")[:80],
                                "status": "pending",
                            }
                            print(
                                f'[lifecycle] PENDING → subagent "{tc["args"].get("subagent_type")}" '
                                f'({tc["id"]})'
                            )

            # ─── Phase 2: Detect subagent running ─────────────────────────
            # When we receive events from a tools:UUID namespace, that
            # subagent is actively executing.
            if chunk["ns"] and chunk["ns"][0].startswith("tools:"):
                pregel_id = chunk["ns"][0].split(":")[1]
                # Check if any pending subagent needs to be marked running.
                # Note: the pregel task ID differs from the tool_call_id,
                # so we mark any pending subagent as running on first subagent event.
                for sub_id, sub in active_subagents.items():
                    if sub["status"] == "pending":
                        sub["status"] = "running"
                        print(
                            f'[lifecycle] RUNNING → subagent "{sub["type"]}" '
                            f"(pregel: {pregel_id})"
                        )
                        break

            # ─── Phase 3: Detect subagent completing ──────────────────────
            # When the main agent's tools node returns a tool message,
            # the subagent has completed and returned its result.
            if not chunk["ns"] and node_name == "tools":
                for msg in data.get("messages", []):
                    if msg.type == "tool":
                        sub = active_subagents.get(msg.tool_call_id)
                        if sub:
                            sub["status"] = "complete"
                            print(
                                f'[lifecycle] COMPLETE → subagent "{sub["type"]}" '
                                f"({msg.tool_call_id})"
                            )
                            print(f" Result preview: {str(msg.content)[:120]}...")

# Print final state
print("\n--- Final subagent states ---")
for sub_id, sub in active_subagents.items():
    print(f" {sub['type']}: {sub['status']}")
All examples on this page use the v2 streaming format (version="v2"), which is the recommended approach. Every chunk is a StreamPart dict with type, ns, and data keys — the same shape regardless of stream mode, number of modes, or subgraph settings.The v2 format eliminates nested tuple unpacking, making it straightforward to handle subgraph streaming in deep agents. Compare the two formats:
Copy
# Unified format — no nested tuple unpacking
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    print(chunk["type"])  # "updates", "messages", or "custom"
    print(chunk["ns"])  # () for main agent, ("tools:<id>",) for subagent
    print(chunk["data"])  # payload
See the LangGraph streaming docs for more details on the v2 format, including type narrowing and Pydantic/dataclass coercion.