BigQuery Callback Handler

Google BigQuery is a serverless and cost-effective enterprise data warehouse that works across clouds and scales with your data.
The BigQueryCallbackHandler allows you to log events from LangChain to Google BigQuery. This is useful for monitoring, auditing, and analyzing the performance of your LLM applications.

!!! example "Preview release"
    This feature is in Preview. APIs and functionality are subject to change.

!!! warning "BigQuery Storage Write API"
    This feature uses the BigQuery Storage Write API, which is a paid service. For information on costs, see the BigQuery documentation.

Use cases

  • Agent workflow debugging and analysis: Capture a wide range of LangChain/LangGraph lifecycle events (LLM calls, tool usage) and agent-yielded events (user input, model responses) into a well-defined schema.
  • High-volume analysis and debugging: Logging operations are performed asynchronously using the Storage Write API to allow high throughput and low latency.
  • Multimodal analysis: Log and analyze text, images, and other modalities. Large files are offloaded to GCS, making them accessible to BigQuery ML via Object Tables.
  • Distributed tracing: Uses LangChain's native tracing identifiers (run_id, parent_run_id) to visualize agent execution flows.
The agent event data recorded varies based on the LangChain/LangGraph event type. For more information, see Event types and payloads.

Installation

You need to install langchain-google-community with the bigquery extra dependencies. For this example, you will also need langchain-google-genai and langgraph.
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph

Prerequisites

  1. Google Cloud Project with the BigQuery API enabled.
  2. BigQuery Dataset: Create a dataset to store logging tables before using the callback handler (a minimal creation sketch follows this list). The callback handler automatically creates the necessary events table within the dataset if the table does not exist.
  3. Google Cloud Storage Bucket (Optional): If you plan to log multimodal content (images, audio, etc.), creating a GCS bucket is recommended for offloading large files.
  4. Authentication:
    • Local: Run gcloud auth application-default login.
    • Cloud: Ensure your service account has the required permissions.
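A minimal sketch of creating the dataset from prerequisite 2 with the google-cloud-bigquery client; the project ID, dataset ID, and location below are placeholders:
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")
dataset = bigquery.Dataset("your-project-id.your_dataset_id")
dataset.location = "US"  # pick the location that matches your other resources

# exists_ok=True makes this a no-op if the dataset already exists.
client.create_dataset(dataset, exists_ok=True)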

IAM Permissions

For the callback handler to work properly, the principal (e.g., service account, user account) under which the application is running needs these Google Cloud roles:
  • roles/bigquery.jobUser at Project Level to run BigQuery queries.
  • roles/bigquery.dataEditor at Table Level to write log/event data.
  • If using GCS offloading: roles/storage.objectCreator and roles/storage.objectViewer on the target bucket.

Use with LangChain Agent

To use the AsyncBigQueryCallbackHandler, you need to instantiate it with your Google Cloud project ID, dataset ID, and table ID. If you want to log session_id, user_id, and agent fields to BigQuery, you must pass them via the metadata dictionary in the config object when invoking the chain or agent.
import asyncio
import os
from datetime import datetime

from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import AsyncBigQueryCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import create_react_agent

# 1. Define some tools for the agent
@tool
def get_current_time():
    """Returns the current local time."""
    return datetime.now().strftime("%H:%M:%S")

@tool
def get_weather(city: str):
    """Returns the current weather for a specific city."""
    return f"The weather in {city} is sunny and 25°C."

async def run_example_async_agent(bq_project_id: str):
    """Runs an asynchronous Agent with BigQuery logging."""

    # Setup BigQuery logging details
    dataset_id = "your_dataset_id"
    table_id = "your_table_id"

    print("--- Starting Async Agent Example ---")
    print(f"Logging to: {bq_project_id}.{dataset_id}.{table_id}")

    # Initialize the async callback handler
    bigquery_handler = AsyncBigQueryCallbackHandler(
        project_id=bq_project_id, dataset_id=dataset_id, table_id=table_id
    )

    try:
        # Setup LLM and Tools
        llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro")
        tools = [get_current_time, get_weather]

        # Create the Agent (LangGraph ReAct implementation)
        agent_executor = create_react_agent(llm, tools)

        query = "What time is it now, and what is the weather like in New York?"
        print(f"User Query: {query}")

        # Run the agent asynchronously
        # We use astream to process chunks, but you can also use ainvoke
        async for chunk in agent_executor.astream(
            {"messages": [("user", query)]},
            config={
                "callbacks": [bigquery_handler],  # Pass the handler here
                "metadata": {
                    "session_id": "agent-session-001",
                    "user_id": "user-123",
                    "agent": "weather-agent",
                },
            },
        ):
            # Print agent thoughts/actions as they happen
            if "agent" in chunk:
                print(f"🤖 Agent: {chunk['agent']['messages'][0].content}")
            elif "tools" in chunk:
                print(f"🛠️ Tool Output: {chunk['tools']['messages'][0].content}")

        print("✅ Agent finished. Logs are being written in the background...")
    finally:
        # Ensure resources are cleaned up
        await bigquery_handler.close()

if __name__ == "__main__":
    # Ensure GOOGLE_API_KEY is set in your environment
    if "GOOGLE_API_KEY" not in os.environ:
        raise ValueError("Please set the GOOGLE_API_KEY environment variable.")

    project_id = "your-project-id"
    asyncio.run(run_example_async_agent(project_id))
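If you do not need streaming output, the agent can also be run with ainvoke using the same config; a minimal sketch that reuses the handler, metadata, and query defined in the example above:
# Inside run_example_async_agent, in place of the astream loop above.
result = await agent_executor.ainvoke(
    {"messages": [("user", query)]},
    config={
        "callbacks": [bigquery_handler],
        "metadata": {
            "session_id": "agent-session-001",
            "user_id": "user-123",
            "agent": "weather-agent",
        },
    },
)
print(f"🤖 Final answer: {result['messages'][-1].content}")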

Configuration options

You can customize the callback handler using BigQueryLoggerConfig.
enabled (bool, default: True)
  To disable the handler from logging data to the BigQuery table, set this parameter to False.
clustering_fields (List[str], default: ["event_type", "agent", "user_id"])
  The fields used to cluster the BigQuery table when it is automatically created.
gcs_bucket_name (str, default: None)
  The name of the GCS bucket to offload large content (images, blobs, large text) to. If not provided, large content may be truncated or replaced with placeholders.
connection_id (str, default: None)
  The BigQuery connection ID (e.g., us.my-connection) to use as the authorizer for ObjectRef columns. Required for using ObjectRef with BigQuery ML.
max_content_length (int, default: 512000)
  The maximum length in characters (500 KB by default) of text content to store inline in BigQuery before offloading to GCS (if configured) or truncating.
batch_size (int, default: 1)
  The number of events to batch before writing to BigQuery.
batch_flush_interval (float, default: 1.0)
  The maximum time (in seconds) to wait before flushing a partial batch.
shutdown_timeout (float, default: 10.0)
  Seconds to wait for logs to flush during shutdown.
event_allowlist (List[str], default: None)
  A list of event types to log. If None, all events are logged except those in event_denylist.
event_denylist (List[str], default: None)
  A list of event types to skip logging.
log_multi_modal_content (bool, default: True)
  Whether to log detailed content parts (including GCS references).
table_id (str, default: "agent_events_v2")
  The default table ID to use if not explicitly provided to the callback handler constructor.
retry_config (RetryConfig, default: RetryConfig())
  Configuration for retry logic (max retries, delay, multiplier) when writing to BigQuery fails.
queue_max_size (int, default: 10000)
  The maximum number of events to hold in the internal buffer queue before dropping new events.
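As one example, a minimal configuration sketch that offloads large or multimodal content to GCS instead of truncating it; the bucket and connection names below are placeholders for resources in your project:
from langchain_google_community.callbacks.bigquery_callback import BigQueryLoggerConfig

offload_config = BigQueryLoggerConfig(
    gcs_bucket_name="your-agent-logs-bucket",  # hypothetical bucket; large content is written here instead of being truncated
    connection_id="us.my-connection",          # authorizer for ObjectRef columns (needed for BigQuery ML)
    batch_size=50,                             # batch events to reduce Storage Write API calls
    batch_flush_interval=2.0,                  # flush a partial batch after at most two seconds
)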
The following code sample shows how to define a configuration for the BigQuery callback handler, including a custom content formatter:
import json
import re
from typing import Any

from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig
)

def redact_dollar_amounts(event_content: Any) -> str:
    """
    Custom formatter to redact dollar amounts (e.g., $600, $12.50)
    and ensure JSON output if the input is a dict.
    """
    text_content = ""
    # If the content is a dictionary (e.g., a list of messages), convert it to a JSON string first.
    if isinstance(event_content, dict):
        text_content = json.dumps(event_content)
    else:
        text_content = str(event_content)

    # Regex to find dollar amounts: $ followed by digits, optionally with commas or decimals.
    # Examples: $600, $1,200.50, $0.99
    redacted_content = re.sub(r'\$\d+(?:,\d{3})*(?:\.\d+)?', 'xxx', text_content)

    return redacted_content

# 1. Configure BigQueryLoggerConfig
config = BigQueryLoggerConfig(
    enabled=True,
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"], # Only log these specific events
    shutdown_timeout=10.0,   # Wait up to 10s for logs to flush on exit
    max_content_length=500,  # Truncate content to 500 characters
    content_formatter=redact_dollar_amounts, # Set the custom formatting function
)

# 2. Initialize the Callback Handler
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="your_dataset",
    table_id="your_table",
    config=config
)
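The synchronous handler is attached the same way as the async one: pass it in the callbacks list of the config when invoking a runnable. A minimal usage sketch with a chat model (the model name is only an example):
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro")
# LLM_REQUEST and LLM_RESPONSE events for this call are logged through the handler;
# the content formatter above should redact the dollar amount in the logged request.
response = llm.invoke(
    "How much is a $450 round-trip ticket to Paris after a 10% discount?",
    config={
        "callbacks": [handler],
        "metadata": {"session_id": "demo-session", "user_id": "user-123"},
    },
)
print(response.content)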

Schema and production setup

The callback handler automatically creates the table if it does not exist. However, for production, we recommend creating the table manually using the following DDL, which uses the JSON type for flexibility and repeated RECORDs for multimodal content. Recommended DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
  timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC timestamp when the event occurred."),
  event_type STRING OPTIONS(description="The category of the event."),
  agent STRING OPTIONS(description="The name of the agent."),
  session_id STRING OPTIONS(description="A unique identifier for the conversation session."),
  invocation_id STRING OPTIONS(description="A unique identifier for a single turn."),
  user_id STRING OPTIONS(description="The identifier of the end-user."),
  trace_id STRING OPTIONS(description="OpenTelemetry trace ID."),
  span_id STRING OPTIONS(description="OpenTelemetry span ID."),
  parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID."),
  content JSON OPTIONS(description="The primary payload of the event."),
  content_parts ARRAY<STRUCT<
    mime_type STRING,
    uri STRING,
    object_ref STRUCT<
      uri STRING,
      version STRING,
      authorizer STRING,
      details JSON
    >,
    text STRING,
    part_index INT64,
    part_attributes STRING,
    storage_mode STRING
  >> OPTIONS(description="For multi-modal events, contains a list of content parts."),
  attributes JSON OPTIONS(description="Arbitrary key-value pairs."),
  latency_ms JSON OPTIONS(description="Latency measurements."),
  status STRING OPTIONS(description="The outcome of the event."),
  error_message STRING OPTIONS(description="Detailed error message."),
  is_truncated BOOLEAN OPTIONS(description="Flag indicating if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
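If you keep the DDL in a file, one way to run it is through the google-cloud-bigquery client; a minimal sketch in which the file name and project ID are placeholders:
from google.cloud import bigquery

# Hypothetical file containing the CREATE TABLE statement above.
with open("agent_events_schema.sql") as f:
    ddl = f.read()

client = bigquery.Client(project="your-gcp-project-id")
client.query(ddl).result()  # runs the DDL as a query job and waits for it to finish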

Event types and payloads

The content column contains a JSON object specific to the event_type. The content_parts column provides a structured view of the content, especially useful for images or offloaded data.

!!! note "Content Truncation"
    • Variable content fields are truncated to max_content_length (configured in BigQueryLoggerConfig, default 500 KB).
    • If gcs_bucket_name is configured, large content is offloaded to GCS instead of being truncated, and a reference is stored in content_parts.object_ref.

LLM interactions

These events track the raw requests sent to and responses received from the LLM.
LLM_REQUEST
  Content (JSON) structure: {"messages": [{"content": "…"}]} or {"prompts": ["…"]}
  Attributes (JSON): {"tags": ["tag1"], "model": "gemini-1.5-pro"}
  Example content (simplified): {"messages": [{"content": "What is the weather?"}]}

LLM_RESPONSE
  Content (JSON) structure: "The weather is sunny." (stored as a JSON string)
  Attributes (JSON): {"usage": {"total_tokens": 20}}
  Example content (simplified): "The weather is sunny."

LLM_ERROR
  Content (JSON) structure: null
  Attributes (JSON): {}
  Example content (simplified): null (see the error_message column)

Tool usage

These events track the execution of tools by the agent.
TOOL_STARTING
  Content (JSON) structure: "city='Paris'"
  Attributes (JSON): {}

TOOL_COMPLETED
  Content (JSON) structure: "25°C, Sunny"
  Attributes (JSON): {}

TOOL_ERROR
  Content (JSON) structure: "Error: Connection timeout"
  Attributes (JSON): {}

Chain Execution

These events track the start and end of high-level chains/graphs.
CHAIN_START
  Content (JSON) structure: {"messages": […]}

CHAIN_END
  Content (JSON) structure: {"output": "…"}

CHAIN_ERROR
  Content (JSON) structure: null (see the error_message column)

Retriever usage

These events track the execution of retrievers.
RETRIEVER_START
  Content (JSON) structure: "What is the capital of France?"

RETRIEVER_END
  Content (JSON) structure: [{"page_content": "Paris is the capital…", "metadata": {"source": "wiki"}}]

RETRIEVER_ERROR
  Content (JSON) structure: null (see the error_message column)

Agent Actions

These events track specific actions taken by the agent.
AGENT_ACTION
  Content (JSON) structure: {"tool": "Calculator", "input": "2 + 2"}

AGENT_FINISH
  Content (JSON) structure: {"output": "The answer is 4"}

Other Events

TEXT
  Content (JSON) structure: "Some logging text…"

Advanced analysis queries

Once your agent is running and logging events, you can run powerful analytical queries against the agent_events_v2 table.

1. Reconstruct a Trace (Conversation Turn)

Use the trace_id to group all events (Chain, LLM, Tool) belonging to a single execution flow.
SELECT
  timestamp,
  event_type,
  span_id,
  parent_span_id,
  -- Extract summary or specific content based on event type
  COALESCE(
    JSON_VALUE(content, '$.messages[0].content'),
    JSON_VALUE(content, '$.summary'),
    JSON_VALUE(content)
  ) AS summary,
  JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
    -- Replace with a specific trace_id from your logs
  trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
  timestamp ASC;

2. Analyze LLM Latency & Token Usage

Calculate the average latency and total token usage for your LLM calls.
SELECT
  JSON_VALUE(attributes, '$.model') AS model,
  COUNT(*) AS total_calls,
  AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
  SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
  event_type = 'LLM_RESPONSE'
GROUP BY
  1;
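These queries can also be run programmatically. A minimal sketch using the google-cloud-bigquery client to execute a trimmed version of the latency query above (project and dataset names are placeholders):
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")
sql = """
SELECT
  JSON_VALUE(attributes, '$.model') AS model,
  COUNT(*) AS total_calls,
  AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE event_type = 'LLM_RESPONSE'
GROUP BY 1
"""
for row in client.query(sql).result():  # result() waits for the job and yields rows
    print(row.model, row.total_calls, row.avg_latency_ms)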

3. Analyze Multimodal Content with BigQuery Remote Model (Gemini)

If you are offloading images to GCS, you can use BigQuery ML to analyze them directly.
SELECT
  logs.session_id,
  -- Get a signed URL for the image (optional, for viewing)
  STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
  -- Analyze the image using a remote model (e.g., gemini-1.5-pro)
  AI.GENERATE(
    ('Describe this image briefly. What company logo?', parts.object_ref)
  ) AS generated_result
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2` logs,
  UNNEST(logs.content_parts) AS parts
WHERE
  parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;

4. Querying Offloaded Content (Get Signed URLs)

SELECT
  timestamp,
  event_type,
  part.mime_type,
  part.storage_mode,
  part.object_ref.uri AS gcs_uri,
  -- Generate a signed URL to read the content directly (requires connection_id configuration)
  STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
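If you need the raw offloaded bytes back (for example, to re-display an image), the gcs_uri values returned above can be read with the google-cloud-storage client; a minimal sketch in which the URI is a placeholder:
from google.cloud import storage

storage_client = storage.Client(project="your-gcp-project-id")
# Hypothetical URI; substitute a gcs_uri value returned by the query above.
blob = storage.Blob.from_string("gs://your-agent-logs-bucket/path/to/offloaded-object", client=storage_client)
data = blob.download_as_bytes()  # the content that was offloaded instead of being stored inline
print(f"Downloaded {len(data)} bytes")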
