Skip to main content
Agent2Agent (A2A) is Google’s protocol for enabling communication between conversational AI agents. LangSmith implements A2A support, allowing your agents to communicate with other A2A-compatible agents through a standardized protocol. The A2A endpoint is available in Agent Server at /a2a/{assistant_id}.

Supported methods

Agent Server supports the following A2A RPC methods:
  • message/send: Send a message to an assistant and receive a complete response
  • message/stream: Send a message and stream responses in real-time using Server-Sent Events (SSE)
  • tasks/get: Retrieve the status and results of a previously created task

Agent card discovery

Each assistant automatically exposes an A2A Agent Card that describes its capabilities and provides the information needed for other agents to connect. You can retrieve the agent card for any assistant using:
GET /.well-known/agent-card.json?assistant_id={assistant_id}
The agent card includes the assistant’s name, description, available skills, supported input/output modes, and the A2A endpoint URL for communication.

Requirements

To use A2A, ensure you have the following dependencies installed:
  • langgraph-api >= 0.4.21
Install with:
pip install "langgraph-api>=0.4.21"

Usage overview

To enable A2A:
  • Upgrade to use langgraph-api>=0.4.21.
  • Deploy your agent with message-based state structure.
  • Connect with other A2A-compatible agents using the endpoint.

Creating an A2A-compatible agent

This example creates an A2A-compatible agent that processes incoming messages using OpenAI’s API and maintains conversational state. The agent defines a message-based state structure and handles the A2A protocol’s message format. To be compatible with the A2A “text” parts, the agent must have a messages key in state. The A2A protocol uses two identifiers to maintain conversational continuity:
  • contextId: Groups messages into a conversation thread (like a session ID)
  • taskId: Identifies each individual request within that conversation
On the first message, omit contextId and taskId - the agent will generate and return them. For all subsequent messages in the conversation, include the contextId and taskId from the prior response to maintain thread continuity. LangSmith Tracing: The Langsmith Deployment A2A endpoint automatically converts the A2A contextId to thread_id for LangSmith tracing, grouping all messages in the conversation under a single thread. For example:
"""LangGraph A2A conversational agent.

Supports the A2A protocol with messages input for conversational interactions.
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Dict, List, TypedDict

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from openai import AsyncOpenAI


class Context(TypedDict):
    """Context parameters for the agent."""
    my_configurable_param: str


@dataclass
class State:
    """Input state for the agent.

    Defines the initial structure for A2A conversational messages.
    """
    messages: List[Dict[str, Any]]


async def call_model(state: State, runtime: Runtime[Context]) -> Dict[str, Any]:
    """Process conversational messages and returns output using OpenAI."""
    # Initialize OpenAI client
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # Process the incoming messages
    latest_message = state.messages[-1] if state.messages else {}
    user_content = latest_message.get("content", "No message content")

    # Create messages for OpenAI API
    openai_messages = [
        {
            "role": "system",
            "content": "You are a helpful conversational agent. Keep responses brief and engaging."
        },
        {
            "role": "user",
            "content": user_content
        }
    ]

    try:
        # Make OpenAI API call
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            max_tokens=100,
            temperature=0.7
        )

        ai_response = response.choices[0].message.content

    except Exception as e:
        ai_response = f"I received your message but had trouble processing it. Error: {str(e)[:50]}..."

    # Create a response message
    response_message = {
        "role": "assistant",
        "content": ai_response
    }

    return {
        "messages": state.messages + [response_message]
    }


# Define the graph
graph = (
    StateGraph(State, context_schema=Context)
    .add_node(call_model)
    .add_edge("__start__", "call_model")
    .compile()
)

Agent-to-agent communication

Once your agents are running locally via langgraph dev or deployed to production, you can facilitate communication between them using the A2A protocol. This example demonstrates how two agents can communicate by sending JSON-RPC messages to each other’s A2A endpoints. The script simulates a multi-turn conversation where each agent processes the other’s response and continues the dialogue.
#!/usr/bin/env python3
"""Agent-to-Agent conversation simulation using the LangGraph A2A endpoint."""

import asyncio
import aiohttp
import os
import uuid


def extract_text(result: dict) -> str:
    """Best-effort extraction of response text from an A2A result."""
    for art in result.get("result", {}).get("artifacts", []) or []:
        for part in art.get("parts", []) or []:
            if part.get("kind") == "text" and part.get("text"):
                return part["text"]

    msg = (result.get("result", {}).get("status", {}) or {}).get("message", {}) or {}
    for part in msg.get("parts", []) or []:
        if part.get("kind") == "text" and part.get("text"):
            return part["text"]

    return "(no text found)"


async def send_message(session, port, assistant_id, text, context_id=None, task_id=None):
    """Send an A2A message. Returns (response_text, returned_context_id, returned_task_id)."""
    url = f"http://127.0.0.1:{port}/a2a/{assistant_id}"

    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }

    # A2A multi-turn continuity: reuse contextId and taskId across turns/agents
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
    }

    headers = {"Accept": "application/json"}
    async with session.post(url, json=payload, headers=headers) as response:
        result = await response.json()

    returned_context_id = result.get("result", {}).get("contextId") or context_id
    returned_task_id = result.get("result", {}).get("id")
    return extract_text(result), returned_context_id, returned_task_id


async def simulate_conversation():
    """Simulate a conversation between two agents."""

    #Assistant IDs
    agent_a_id = os.getenv("AGENT_A_ID")
    agent_b_id = os.getenv("AGENT_B_ID")

    if not agent_a_id or not agent_b_id:
        print("Set AGENT_A_ID and AGENT_B_ID environment variables")
        return

    message = "Hello! Let's have a conversation."
    context_id = None
    task_id = None

    async with aiohttp.ClientSession() as session:
        for i in range(3):
            print(f"--- Round {i + 1} ---")

            message, context_id, task_id = await send_message(
                session, 2024, agent_a_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔵 Agent A: {message}")

            message, context_id, task_id = await send_message(
                session, 2025, agent_b_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔴 Agent B: {message}\n")


if __name__ == "__main__":
    asyncio.run(simulate_conversation())
For complete working examples, see:

Distributed tracing

When multiple agents communicate over A2A, LangSmith can group all their traces into a single thread, which gives you a unified view of the entire multi-agent conversation.

How contextId maps to thread_id

The Agent Server A2A endpoint automatically converts the A2A contextId to thread_id for LangSmith tracing. This means every message in a conversation, across all participating agents, is grouped under the same thread in LangSmith without any extra configuration on your part. The flow works as follows:
  1. On the first message, the client omits contextId. The server generates one and returns it in the response.
  2. The client passes the contextId in all subsequent messages to maintain conversation continuity.
  3. Agent Server maps the contextId to thread_id in LangSmith metadata, so all turns appear in the same thread.

Tracing across multiple agents

When agents from different frameworks communicate over A2A, you can unify their traces in LangSmith by sharing the same thread_id across all agents. Use the contextId returned by the first agent as the thread_id for all subsequent requests. The following code snippet demonstrates the key concepts. For a complete runnable implementation with two agents, refer to the Google ADK + LangChain example.
import asyncio
import aiohttp
import uuid


async def send_message(session, url, text, context_id=None, task_id=None, thread_id=None):
    """Send an A2A message and return (response_text, context_id, task_id)."""

    # --- 1. Build the message ---
    # On follow-up turns, include contextId and taskId inside the message object
    # so the server associates them with the ongoing conversation.
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    # --- 2. Set thread_id in metadata ---
    # thread_id goes at the top level of the JSON-RPC payload, not inside params.
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
        "metadata": {"thread_id": thread_id},
    }

    async with session.post(url, json=payload, headers={"Accept": "application/json"}) as response:
        if response.status != 200:
            raise RuntimeError(f"HTTP {response.status}: {await response.text()}")
        result = await response.json()

    if "error" in result:
        raise RuntimeError(result["error"].get("message", "Unknown error"))

    result_obj = result.get("result", {})
    returned_context_id = result_obj.get("contextId") or context_id
    returned_task_id = result_obj.get("id")
    text_out = next(
        (
            part.get("text", "")
            for art in result_obj.get("artifacts", []) or []
            for part in art.get("parts", []) or []
            if part.get("kind") == "text"
        ),
        "(no text)",
    )
    return text_out, returned_context_id, returned_task_id


async def run_conversation(agent_a_url, agent_b_url):
    # --- 3. Share thread_id across agents ---
    # Generate a shared thread_id upfront. Once the server returns a contextId,
    # use that instead — this keeps the A2A context and LangSmith thread in sync.
    thread_id = str(uuid.uuid4())
    context_id = None
    task_id = None
    message = "Hello! Let's collaborate."

    async with aiohttp.ClientSession() as session:
        for _ in range(3):
            message, context_id, task_id = await send_message(
                session, agent_a_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )

            # Passing the same thread_id to every agent groups all traces in LangSmith
            message, context_id, task_id = await send_message(
                session, agent_b_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )


asyncio.run(run_conversation(
    "http://localhost:2024/a2a/<agent_a_assistant_id>",
    "http://localhost:2025/a2a/<agent_b_assistant_id>",
))
1. Build the message: Include contextId and taskId inside the message object on follow-up turns so the server can associate them with the ongoing conversation. Omit them on the first message, because the server generates a contextId and returns it in the response. 2. Set thread_id in metadata: Pass thread_id in the top-level metadata field of the JSON-RPC payload, not inside params. 3. Share thread_id across agents: Generate a random thread_id before the first message. Once the server returns a contextId, use it as the thread_id for all subsequent requests, which keeps the A2A conversation context and the LangSmith thread in sync. Pass the same thread_id to every agent so all traces are grouped into one thread.

Receive thread_id in non-LangGraph agents

The previous section covers the client side—propagating thread_id when sending messages. If one of your agents is not built on LangGraph, it also needs to extract and attach the thread_id on the receiving end so its traces land in the same LangSmith thread. Use langsmith.integrations.otel.configure() to set up automatic tracing, and extract the thread_id from incoming A2A request metadata to group traces in the same thread.
from fastapi import FastAPI, Request
from langsmith.integrations.otel import configure as configure_otel
from opentelemetry import trace
import json

# --- 1. Configure OTel ---
# Set up automatic tracing to LangSmith for your non-LangGraph agent.
configure_otel(project_name="my-a2a-project")
tracer = trace.get_tracer(__name__)

app = FastAPI()

@app.middleware("http")
async def set_thread_id_middleware(request: Request, call_next):
    thread_id = None
    if request.method == "POST":
        body_bytes = await request.body()
        if body_bytes:
            # --- 2. Extract thread_id from incoming A2A metadata ---
            try:
                body = json.loads(body_bytes)
                thread_id = body.get("metadata", {}).get("thread_id")
            except Exception:
                pass
            # Re-inject the body so downstream handlers can still read it
            async def receive():
                return {"type": "http.request", "body": body_bytes}
            request._receive = receive

    # --- 3. Attach thread_id to the trace ---
    # langsmith.metadata.thread_id groups this trace with others in the same thread.
    with tracer.start_as_current_span("agent") as span:
        if thread_id:
            span.set_attribute("langsmith.metadata.thread_id", thread_id)
        return await call_next(request)
Register your agent routes on app after this middleware.
Set LANGSMITH_API_KEY and optionally LANGSMITH_PROJECT in your environment to enable tracing. All agents in the conversation should use the same project so their traces are visible together.

View traces in LangSmith

After running a multi-agent conversation, open the LangSmith UI and navigate to Threads. All turns from all participating agents will appear under a single thread, identified by the shared thread_id.

Disable A2A

To disable the A2A endpoint, set disable_a2a to true in your langgraph.json configuration file:
{
  "$schema": "https://langgra.ph/schema.json",
  "http": {
    "disable_a2a": true
  }
}