BigQuery Callback Handler
Google BigQuery is a serverless and cost-effective enterprise data warehouse that works across clouds and scales with your data.
The BigQueryCallbackHandler allows you to log events from LangChain and LangGraph to Google BigQuery. This is useful for monitoring, auditing, and analyzing the performance of your LLM applications.
Key features:
- LangGraph support: Automatic detection of LangGraph nodes with NODE_STARTING, NODE_COMPLETED, and GRAPH_START/END events
- Latency tracking: Built-in latency measurement for all LLM and tool calls
- Event filtering: Configurable allowlist/denylist to control which events are logged
- Graph context manager: Explicit graph execution boundaries with accurate timing
- Real-time dashboard: FastAPI-based monitoring webapp with live event streaming
Preview release: The BigQuery Callback Handler is in Preview. APIs and functionality are subject to change. For more information, see the launch stage descriptions.
BigQuery Storage Write API: This feature uses the BigQuery Storage Write API, which is a paid service. For information on costs, see the BigQuery documentation.
Installation
Install langchain-google-community with the bigquery extra. The example below also requires langchain, langchain-google-genai, and langgraph.
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph
Prerequisites
- Google Cloud Project with the BigQuery API enabled.
- BigQuery Dataset: Create a dataset to store logging tables before using the callback handler. The callback handler automatically creates the necessary events table within the dataset if the table does not exist.
- Google Cloud Storage Bucket (Optional): If you plan to log multimodal content (images, audio, etc.), creating a GCS bucket is recommended for offloading large files.
- Authentication:
  - Local: Run gcloud auth application-default login.
  - Cloud: Ensure your service account has the required permissions.
IAM Permissions
For the callback handler to work properly, the principal (e.g., service account, user account) under which the application is running needs these Google Cloud roles:
- roles/bigquery.jobUser at the project level, to run BigQuery queries.
- roles/bigquery.dataEditor at the table level, to write log and event data.
- If using GCS offloading: roles/storage.objectCreator and roles/storage.objectViewer on the target bucket.
Use with LangGraph agent
To use the BigQueryCallbackHandler with a LangGraph agent, instantiate it with your Google Cloud project ID, dataset ID, and table ID. Use the graph_context() method to track graph execution boundaries and enable GRAPH_START/GRAPH_END events with latency measurements.
Pass session_id, user_id, and agent via the metadata dictionary in the config object when invoking the agent.
import os
from datetime import datetime

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)
from langchain_google_genai import ChatGoogleGenerativeAI


# 1. Define tools for the agent
@tool
def get_current_time() -> str:
    """Returns the current local time."""
    now = datetime.now()
    return f"Current time: {now.strftime('%I:%M:%S %p')} on {now.strftime('%B %d, %Y')}"


@tool
def get_weather(city: str) -> str:
    """Returns the current weather for a specific city."""
    # Simulated weather data (use real API in production)
    weather_data = {
        "new york": {"temp": 22, "condition": "Clear"},
        "tokyo": {"temp": 24, "condition": "Sunny"},
        "london": {"temp": 14, "condition": "Overcast"},
    }
    city_lower = city.lower()
    if city_lower in weather_data:
        data = weather_data[city_lower]
        return f"Weather in {city.title()}: {data['temp']}°C, {data['condition']}"
    return f"Weather data for '{city}' not available."


@tool
def convert_currency(amount: float, from_currency: str, to_currency: str) -> str:
    """Convert an amount between currencies."""
    # Simulated rates: value of one unit of each currency in USD
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067}
    from_curr, to_curr = from_currency.upper(), to_currency.upper()
    if from_curr not in rates or to_curr not in rates:
        return "Unknown currency"
    result = amount * rates[from_curr] / rates[to_curr]
    return f"{amount} {from_curr} = {result:.2f} {to_curr}"


def run_agent_example(project_id: str):
    """Run a LangGraph agent with BigQuery logging."""
    dataset_id = "agent_analytics"
    table_id = "agent_events_v2"

    # 2. Configure the callback handler
    config = BigQueryLoggerConfig(
        batch_size=1,
        batch_flush_interval=0.5,
    )
    handler = BigQueryCallbackHandler(
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        config=config,
        graph_name="travel_assistant",  # Enable LangGraph tracking
    )

    # 3. Create the LLM and agent
    # Use project parameter for Vertex AI, or api_key for Gemini Developer API
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        project=project_id,  # For Vertex AI
    )
    tools = [get_current_time, get_weather, convert_currency]
    agent = create_agent(llm, tools)

    # 4. Run with graph_context for GRAPH_START/GRAPH_END events
    query = "What time is it? What's the weather in Tokyo? How much is 100 USD in EUR?"
    run_metadata = {
        "session_id": "session-001",
        "user_id": "user-123",
        "agent": "travel_assistant",
    }
    with handler.graph_context("travel_assistant", metadata=run_metadata):
        result = agent.invoke(
            {"messages": [HumanMessage(content=query)]},
            config={
                "callbacks": [handler],
                "metadata": run_metadata,
            },
        )
    print(f"Response: {result['messages'][-1].content}")

    # 5. Clean up
    handler.shutdown()


if __name__ == "__main__":
    project_id = os.environ.get("GCP_PROJECT_ID", "your-project-id")
    run_agent_example(project_id)
Configuration options
You can customize the callback handler using BigQueryLoggerConfig.
- enabled: Set to False to stop the handler from logging data to the BigQuery table.
- clustering_fields (List[str], default: ['event_type', 'agent', 'user_id']): The fields used to cluster the BigQuery table when it is automatically created.
- gcs_bucket_name: The name of the GCS bucket to offload large content (images, blobs, large text) to. If not provided, large content may be truncated or replaced with placeholders.
- connection_id: The BigQuery connection ID (e.g., us.my-connection) to use as the authorizer for ObjectRef columns. Required for using ObjectRef with BigQuery ML.
- max_content_length (default: 500 KB): The maximum length (in characters) of text content to store inline in BigQuery before offloading to GCS (if configured) or truncating.
- batch_size: The number of events to batch before writing to BigQuery.
- batch_flush_interval: The maximum time (in seconds) to wait before flushing a partial batch.
- shutdown_timeout: Seconds to wait for logs to flush during shutdown.
- event_allowlist: A list of event types to log. If None, all events are logged except those in event_denylist.
- event_denylist: A list of event types to skip logging.
- Whether to log detailed content parts (including GCS references).
- table_id (str, default: "agent_events_v2"): The default table ID to use if not explicitly provided to the callback handler constructor.
- retry_config (RetryConfig, default: RetryConfig()): Configuration for retry logic (max retries, delay, multiplier) when writing to BigQuery fails.
- The maximum number of events to hold in the internal buffer queue before dropping new events.
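The retry settings (max retries, delay, multiplier) suggest an exponential backoff schedule. The sketch below is only an illustration of that idea: the class `BackoffSketch` and its field names `max_retries`, `initial_delay`, and `multiplier` are hypothetical and are not taken from the real RetryConfig.

```python
from dataclasses import dataclass


@dataclass
class BackoffSketch:
    """Hypothetical stand-in for a retry configuration (not the real RetryConfig)."""

    max_retries: int = 3
    initial_delay: float = 1.0  # seconds before the first retry
    multiplier: float = 2.0  # growth factor applied after each failed attempt

    def delays(self) -> list[float]:
        """Return the wait time, in seconds, before each retry attempt."""
        return [self.initial_delay * self.multiplier**i for i in range(self.max_retries)]


schedule = BackoffSketch(max_retries=4, initial_delay=0.5, multiplier=2.0).delays()
print(schedule)  # [0.5, 1.0, 2.0, 4.0]
```

With a multiplier above 1, each failed write waits longer than the previous one, which gives a temporarily overloaded backend time to recover.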
The following code sample shows how to define a configuration for the BigQuery callback handler with event filtering:
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# 1. Configure BigQueryLoggerConfig
config = BigQueryLoggerConfig(
    enabled=True,
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"],  # Only log these specific events
    shutdown_timeout=10.0,  # Wait up to 10s for logs to flush on exit
    max_content_length=500,  # Truncate content to 500 characters
)

# 2. Initialize the Callback Handler
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="your_dataset",
    table_id="your_table",
    config=config,
)
Schema and production setup
The callback handler automatically creates the table if it does not exist. For production, however, we recommend creating the table manually with the following DDL, which uses the JSON type for flexibility and REPEATED RECORDs for multimodal content.
Recommended DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC timestamp when the event occurred."),
event_type STRING OPTIONS(description="The category of the event."),
agent STRING OPTIONS(description="The name of the agent."),
session_id STRING OPTIONS(description="A unique identifier for the conversation session."),
invocation_id STRING OPTIONS(description="A unique identifier for a single turn."),
user_id STRING OPTIONS(description="The identifier of the end-user."),
trace_id STRING OPTIONS(description="OpenTelemetry trace ID."),
span_id STRING OPTIONS(description="OpenTelemetry span ID."),
parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID."),
content JSON OPTIONS(description="The primary payload of the event."),
content_parts ARRAY<STRUCT<
mime_type STRING,
uri STRING,
object_ref STRUCT<
uri STRING,
version STRING,
authorizer STRING,
details JSON
>,
text STRING,
part_index INT64,
part_attributes STRING,
storage_mode STRING
>> OPTIONS(description="For multi-modal events, contains a list of content parts."),
attributes JSON OPTIONS(description="Arbitrary key-value pairs."),
latency_ms JSON OPTIONS(description="Latency measurements."),
status STRING OPTIONS(description="The outcome of the event."),
error_message STRING OPTIONS(description="Detailed error message."),
is_truncated BOOLEAN OPTIONS(description="Flag indicating if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
Event types and payloads
The content column contains a JSON object specific to the event_type.
The content_parts column provides a structured view of the content, especially useful for images or offloaded data.
Content Truncation
- Variable content fields are truncated to max_content_length (configured in BigQueryLoggerConfig, default 500 KB).
- If gcs_bucket_name is configured, large content is offloaded to GCS instead of being truncated, and a reference is stored in content_parts.object_ref.
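The two rules above amount to a small decision procedure. The following is a minimal sketch of that logic, not the handler's actual code: the function `store_text`, the returned dictionary shape, the "INLINE" storage mode, and the placeholder object name are all assumptions made for illustration (only "GCS_REFERENCE" appears elsewhere in this page).

```python
from typing import Optional


def store_text(text: str, max_content_length: int, gcs_bucket_name: Optional[str] = None) -> dict:
    """Decide how a text payload would be stored under the rules described above."""
    if len(text) <= max_content_length:
        # Short enough: keep the text inline, untouched.
        return {"text": text, "is_truncated": False, "storage_mode": "INLINE"}
    if gcs_bucket_name:
        # Too long and a bucket is configured: store a reference instead of the text.
        uri = f"gs://{gcs_bucket_name}/<object-name>"  # placeholder object name
        return {"text": None, "is_truncated": False, "storage_mode": "GCS_REFERENCE", "uri": uri}
    # Too long and no bucket: keep only the first max_content_length characters.
    return {"text": text[:max_content_length], "is_truncated": True, "storage_mode": "INLINE"}


print(store_text("x" * 20, 10))
print(store_text("x" * 20, 10, gcs_bucket_name="my-bucket"))
```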
LLM interactions
These events track the raw requests sent to and responses received from the LLM.
| Event Type | Content (JSON) Structure | Attributes (JSON) | Example Content (Simplified) |
|---|---|---|---|
| LLM_REQUEST | Chat model: {"messages": [{"content": "..."}]}; legacy model: {"prompts": ["..."]} | {"tags": ["tag1"], "model": "gemini-2.5-flash"} | {"messages": [{"content": "What is the weather?"}]} |
| LLM_RESPONSE | "The weather is sunny." (stored as a JSON string) | {"usage": {"total_tokens": 20}} | "The weather is sunny." |
| LLM_ERROR | null | {} | null |
Tool usage
These events track the execution of tools by the agent.
| Event Type | Content (JSON) Structure |
|---|---|
| TOOL_STARTING | Input String: "city='Paris'" |
| TOOL_COMPLETED | Output String: "25°C, Sunny" |
| TOOL_ERROR | "Error: Connection timeout" |
Chain Execution
These events track the start and end of high-level chains/graphs.
| Event Type | Content (JSON) Structure |
|---|---|
| CHAIN_START | {"messages": [...]} |
| CHAIN_END | {"output": "..."} |
| CHAIN_ERROR | null (See error_message column) |
Retriever usage
These events track the execution of retrievers.
| Event Type | Content (JSON) Structure |
|---|---|
| RETRIEVER_START | Query String: "What is the capital of France?" |
| RETRIEVER_END | Documents List: [{"page_content": "Paris is the capital...", "metadata": {"source": "wiki"}}] |
| RETRIEVER_ERROR | null (See error_message column) |
Agent Actions
These events track specific actions taken by the agent.
| Event Type | Content (JSON) Structure |
|---|---|
| AGENT_ACTION | {"tool": "Calculator", "input": "2 + 2"} |
| AGENT_FINISH | {"output": "The answer is 4"} |
Other Events
| Event Type | Content (JSON) Structure |
|---|---|
| TEXT | Arbitrary Text: "Some logging text..." |
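Because the content payload differs by event type, client code that reads rows back usually needs a small normalizer. The sketch below is one possible approach based only on the payload shapes listed in the tables above; the function name `summarize` and the fallback message are hypothetical, and real payloads may carry additional fields.

```python
import json
from typing import Any


def summarize(content_json: str) -> str:
    """Return a short text summary of a `content` value serialized as JSON."""
    content: Any = json.loads(content_json)
    if content is None:
        # Error events store null in content; details are in error_message.
        return "(no content; see error_message)"
    if isinstance(content, str):
        # LLM responses, tool input/output, retriever queries, plain text.
        return content
    if isinstance(content, dict):
        if content.get("messages"):
            first = content["messages"][0]
            return str(first.get("content", "")) if isinstance(first, dict) else str(first)
        if content.get("prompts"):
            return str(content["prompts"][0])
        for key in ("output", "tool"):
            if key in content:
                return str(content[key])
    # Anything else (for example a retriever document list) is returned as JSON text.
    return json.dumps(content)


print(summarize('{"messages": [{"content": "What is the weather?"}]}'))
print(summarize('"The weather is sunny."'))
```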
Advanced analysis queries
Once your agent is running and logging events, you can run powerful analyses on the agent_events_v2 table.
1. Reconstruct a Trace (Conversation Turn)
Use the trace_id to group all events (Chain, LLM, Tool) belonging to a single execution flow.
SELECT
timestamp,
event_type,
span_id,
parent_span_id,
-- Extract summary or specific content based on event type
COALESCE(
JSON_VALUE(content, '$.messages[0].content'),
JSON_VALUE(content, '$.summary'),
JSON_VALUE(content)
) AS summary,
JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
-- Replace with a specific trace_id from your logs
trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
timestamp ASC;
2. Analyze LLM Latency & Token Usage
Calculate the average latency and total token usage for your LLM calls.
SELECT
JSON_VALUE(attributes, '$.model') AS model,
COUNT(*) AS total_calls,
AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
event_type = 'LLM_RESPONSE'
GROUP BY
1;
3. Analyze Multimodal Content with BigQuery Remote Model (Gemini)
If you are offloading images to GCS, you can use BigQuery ML to analyze them directly.
SELECT
logs.session_id,
-- Get a signed URL for the image (optional, for viewing)
STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
-- Analyze the image using a remote model (e.g., gemini-2.5-flash)
AI.GENERATE(
('Describe this image briefly. What company logo?', parts.object_ref)
) AS generated_result
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2` logs,
UNNEST(logs.content_parts) AS parts
WHERE
parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
4. Analyze Span Hierarchy & Duration
Visualize the execution flow and performance of your agent’s operations (LLM calls, Tool usage) using span IDs.
SELECT
span_id,
parent_span_id,
event_type,
timestamp,
-- Extract duration from latency_ms for completed operations
CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
-- Identify the specific tool or operation
COALESCE(
JSON_VALUE(content, '$.tool'),
'LLM_CALL'
) as operation
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE trace_id = 'your-trace-id'
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;
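Once rows like these are fetched into Python, the span_id and parent_span_id columns can be turned into an indented tree. The sketch below is an illustration under stated assumptions: the sample rows are invented, each span is assumed to appear in at most one row, and parent spans are assumed to be present in the result (which may require widening the event_type filter in the query above).

```python
from collections import defaultdict


def build_span_tree(rows):
    """Return indented text lines showing each span under its parent."""
    span_ids = {row["span_id"] for row in rows}
    children = defaultdict(list)
    for row in rows:
        parent = row["parent_span_id"]
        # Spans whose parent is not in the result set are treated as roots.
        children[parent if parent in span_ids else None].append(row)

    lines = []

    def walk(parent_id, depth):
        for row in children.get(parent_id, []):
            lines.append(f"{'  ' * depth}{row['event_type']} ({row['duration_ms']} ms)")
            walk(row["span_id"], depth + 1)

    walk(None, 0)
    return lines


# Invented sample rows, ordered by timestamp as in the query above.
sample_rows = [
    {"span_id": "a", "parent_span_id": None, "event_type": "NODE_COMPLETED", "duration_ms": 900},
    {"span_id": "b", "parent_span_id": "a", "event_type": "LLM_RESPONSE", "duration_ms": 400},
    {"span_id": "c", "parent_span_id": "a", "event_type": "TOOL_COMPLETED", "duration_ms": 30},
]
print("\n".join(build_span_tree(sample_rows)))
```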
5. Querying Offloaded Content (Get Signed URLs)
SELECT
timestamp,
event_type,
part.mime_type,
part.storage_mode,
part.object_ref.uri AS gcs_uri,
-- Generate a signed URL to read the content directly (requires connection_id configuration)
STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
6. Advanced SQL Scenarios
These advanced patterns demonstrate how to sessionize data, analyze tool usage, and perform root cause analysis using BigQuery ML.
-- 1. Sessionize Conversation History (Create View)
-- Consolidates all events into a single row per session with a formatted history.
CREATE OR REPLACE VIEW `your-project.your-dataset.agent_sessions` AS
SELECT
session_id,
user_id,
MIN(timestamp) AS session_start,
MAX(timestamp) AS session_end,
ARRAY_AGG(
STRUCT(timestamp, event_type, TO_JSON_STRING(content) as content, error_message)
ORDER BY timestamp ASC
) AS events,
STRING_AGG(
CASE
WHEN event_type = 'USER_MESSAGE_RECEIVED' THEN CONCAT('User: ', JSON_VALUE(content, '$.input'))
WHEN event_type = 'LLM_RESPONSE' THEN CONCAT('Agent: ', JSON_VALUE(content, '$.text'))
WHEN event_type = 'TOOL_STARTING' THEN CONCAT('SYS: Calling ', JSON_VALUE(content, '$.tool_name'))
WHEN event_type = 'TOOL_COMPLETED' THEN CONCAT('SYS: Result from ', JSON_VALUE(content, '$.tool_name'))
WHEN event_type = 'TOOL_ERROR' THEN CONCAT('SYS: ERROR in ', JSON_VALUE(content, '$.tool_name'))
ELSE NULL
END,
'\n' ORDER BY timestamp ASC
) AS full_conversation
FROM
`your-project.your-dataset.agent_events_v2`
GROUP BY
session_id, user_id;
-- 2. Tool Usage Analysis
-- Extract tool names and count execution status
SELECT
JSON_VALUE(content, '$.tool_name') AS tool_name,
event_type,
COUNT(*) as count
FROM `your-project.your-dataset.agent_events_v2`
WHERE event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR')
GROUP BY 1, 2
ORDER BY tool_name, event_type;
-- 3. Granular Cost & Token Estimation
-- Estimate tokens based on content character length (approx 4 chars/token)
SELECT
session_id,
COUNT(*) as interaction_count,
SUM(LENGTH(TO_JSON_STRING(content))) / 4 AS estimated_tokens,
-- Example cost: $0.0001 per 1k tokens
ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / 4) / 1000 * 0.0001, 6) AS estimated_cost_usd
FROM `your-project.your-dataset.agent_events_v2`
GROUP BY session_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;
-- 4. AI-Powered Root Cause Analysis (Requires BigQuery ML)
-- Use Gemini to analyze failed sessions
SELECT
session_id,
AI.GENERATE(
('Analyze this conversation and explain the failure root cause. Log: ', full_conversation),
connection_id => 'your-project.us.bqml_connection',
endpoint => 'gemini-2.5-flash'
).result AS root_cause_explanation
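FROM `your-project.your-dataset.agent_sessions`
-- error_message is nested inside the events array of the view, not a top-level column
WHERE EXISTS (SELECT 1 FROM UNNEST(events) AS e WHERE e.error_message IS NOT NULL)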
LIMIT 5;
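The arithmetic in the third query (cost and token estimation) is easy to check by hand. The sketch below reproduces it in Python with the same illustrative constants, about 4 characters per token and $0.0001 per 1,000 tokens; the function name `estimate_cost` is hypothetical, and the result is a rough estimate rather than a billing figure.

```python
def estimate_cost(content_chars: int, chars_per_token: int = 4, usd_per_1k_tokens: float = 0.0001):
    """Mirror the SQL estimate: characters -> tokens -> USD."""
    estimated_tokens = content_chars / chars_per_token
    estimated_cost_usd = round(estimated_tokens / 1000 * usd_per_1k_tokens, 6)
    return estimated_tokens, estimated_cost_usd


# A session whose serialized content totals 200,000 characters.
tokens, cost = estimate_cost(200_000)
print(tokens, cost)  # 50000.0 0.005
```

For exact figures, prefer the usage.total_tokens value recorded in the attributes column of LLM_RESPONSE events.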
Conversational Analytics in BigQuery
You can also use BigQuery Conversational Analytics to analyze your agent logs using natural language.
Just ask questions like:
- “Show me the error rate over time”
- “What are the most common tool calls?”
- “Identify sessions with high token usage”
Looker Studio Dashboard
You can visualize your agent’s performance using our pre-built Looker Studio Dashboard template.
To connect this dashboard to your own BigQuery table, use the following link format, replacing the placeholders with your specific project, dataset, and table IDs:
https://lookerstudio.google.com/reporting/create?c.reportId=f1c5b513-3095-44f8-90a2-54953d41b125&ds.ds3.connector=bigQuery&ds.ds3.type=TABLE&ds.ds3.projectId=<your-project-id>&ds.ds3.datasetId=<your-dataset-id>&ds.ds3.tableId=<your-table-id>
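If you generate this link for several tables, a small helper avoids hand-editing the query string. The sketch below assembles the same parameters shown in the link format above using the standard library; the function name `looker_studio_url` is hypothetical.

```python
from urllib.parse import urlencode

# Report template ID taken from the link format above.
REPORT_ID = "f1c5b513-3095-44f8-90a2-54953d41b125"


def looker_studio_url(project_id: str, dataset_id: str, table_id: str) -> str:
    """Build the dashboard creation link for a specific BigQuery table."""
    params = {
        "c.reportId": REPORT_ID,
        "ds.ds3.connector": "bigQuery",
        "ds.ds3.type": "TABLE",
        "ds.ds3.projectId": project_id,
        "ds.ds3.datasetId": dataset_id,
        "ds.ds3.tableId": table_id,
    }
    return "https://lookerstudio.google.com/reporting/create?" + urlencode(params)


print(looker_studio_url("my-project", "agent_analytics", "agent_events_v2"))
```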
LangGraph integration
The BigQueryCallbackHandler provides enhanced support for LangGraph agents with automatic node detection, graph-level tracking, and latency measurements.
LangGraph event types
In addition to standard LangChain events, the callback handler automatically detects and logs LangGraph-specific events:
| Event Type | Description |
|---|---|
| NODE_STARTING | Emitted when a LangGraph node begins execution |
| NODE_COMPLETED | Emitted when a LangGraph node completes successfully |
| NODE_ERROR | Emitted when a LangGraph node fails |
| GRAPH_START | Emitted when a graph execution begins (via context manager) |
| GRAPH_END | Emitted when a graph execution completes |
| GRAPH_ERROR | Emitted when a graph execution fails |
Graph context manager
Use the graph_context() method to explicitly mark graph execution boundaries. This enables GRAPH_START and GRAPH_END events with accurate latency measurements:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# Initialize handler with graph name
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="agent_analytics",
    table_id="agent_events_v2",
    graph_name="my_agent",
)

# Create your agent (llm and tools are defined as in the earlier example)
agent = create_agent(llm, tools)

# Use the graph context manager for proper GRAPH_START/GRAPH_END events
run_metadata = {
    "session_id": "session-123",
    "user_id": "user-456",
    "agent": "my_agent",
}
with handler.graph_context("my_agent", metadata=run_metadata):
    result = agent.invoke(
        {"messages": [HumanMessage(content="What is the weather in Tokyo?")]},
        config={
            "callbacks": [handler],
            "metadata": run_metadata,
        },
    )
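To make the boundary events more concrete, here is a conceptual sketch of how a context manager can emit start, end, and error events with a latency measurement. It is not the library's implementation: the name `graph_context_sketch`, the in-memory `events` list, and the event dictionaries are assumptions chosen to mirror the column names used in this page.

```python
import time
from contextlib import contextmanager

events = []  # stand-in for rows written to the BigQuery table


@contextmanager
def graph_context_sketch(graph_name, metadata=None):
    """Hypothetical illustration of graph boundary events with timing."""
    start = time.perf_counter()
    events.append({"event_type": "GRAPH_START", "agent": graph_name, **(metadata or {})})
    try:
        yield
    except Exception as exc:
        # The wrapped block failed: record the error, then re-raise it.
        events.append({"event_type": "GRAPH_ERROR", "agent": graph_name, "error_message": str(exc)})
        raise
    else:
        # The wrapped block succeeded: record total wall-clock time in milliseconds.
        total_ms = (time.perf_counter() - start) * 1000
        events.append({"event_type": "GRAPH_END", "agent": graph_name, "latency_ms": {"total_ms": total_ms}})


with graph_context_sketch("my_agent", {"session_id": "session-123"}):
    time.sleep(0.01)  # stands in for agent.invoke(...)

print([event["event_type"] for event in events])
```

Measuring from inside the context manager captures the whole invocation, including time spent between individual node, LLM, and tool events.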
Latency tracking
The callback handler automatically tracks latency for all operations and stores measurements in the latency_ms JSON column:
-- Query latency by event type
SELECT
event_type,
agent,
COUNT(*) as count,
ROUND(AVG(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64)), 2) as avg_latency_ms,
ROUND(APPROX_QUANTILES(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64), 100)[OFFSET(95)], 2) as p95_latency_ms
FROM `your-project.your-dataset.agent_events_v2`
WHERE DATE(timestamp) = CURRENT_DATE()
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED', 'GRAPH_END')
GROUP BY event_type, agent
ORDER BY avg_latency_ms DESC;
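In the query above, APPROX_QUANTILES(x, 100) returns 101 approximate boundaries (minimum, 99 percentiles, maximum), so OFFSET(95) selects roughly the 95th percentile. The sketch below shows an exact nearest-rank version of the same idea for a small list of latencies; the function `percentile` and the sample values are illustrative only, and BigQuery's approximate result can differ slightly.

```python
def percentile(values, pct: int):
    """Nearest-rank percentile of a non-empty list, for an integer pct in 0..100."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    # Integer arithmetic avoids floating-point rounding when picking the index.
    index = (pct * (len(ordered) - 1) + 50) // 100
    return ordered[index]


latencies_ms = list(range(1, 102))  # invented sample: 1, 2, ..., 101
print(percentile(latencies_ms, 95))  # 96
print(percentile(latencies_ms, 50))  # 51
```

A p95 value is less sensitive to a handful of extreme outliers than the maximum, while still exposing slow tail behavior that the average hides.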
Event filtering
Use event_allowlist and event_denylist to control which events are logged:
from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)

# Production config: Only log important events
config = BigQueryLoggerConfig(
    event_allowlist=[
        "LLM_RESPONSE",
        "LLM_ERROR",
        "TOOL_COMPLETED",
        "TOOL_ERROR",
        "GRAPH_END",
        "GRAPH_ERROR",
    ],
)
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="agent_analytics",
    config=config,
)
Or exclude noisy events:
# Exclude chain events but log everything else
config = BigQueryLoggerConfig(
    event_denylist=["CHAIN_START", "CHAIN_END"],
)
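The filtering semantics can be summarized as a small predicate. The sketch below follows the documented rule that a None allowlist logs everything except denylisted events; how the handler behaves when both lists are set is not stated here, so the choice that the denylist takes precedence is an assumption, and `should_log` is a hypothetical name rather than part of the library.

```python
from typing import Optional, Sequence


def should_log(
    event_type: str,
    allowlist: Optional[Sequence[str]] = None,
    denylist: Optional[Sequence[str]] = None,
) -> bool:
    """Return True if an event of this type would be logged."""
    if denylist and event_type in denylist:
        return False  # assumption: the denylist wins over the allowlist
    if allowlist is None:
        return True  # no allowlist: everything not denied is logged
    return event_type in allowlist


print(should_log("CHAIN_START", denylist=["CHAIN_START", "CHAIN_END"]))  # False
print(should_log("LLM_RESPONSE", allowlist=["LLM_RESPONSE", "LLM_ERROR"]))  # True
```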
Examples and resources
Example code
The following examples demonstrate various features of the BigQuery callback handler:
| Example | Description |
|---|---|
| Basic example | Basic callback usage with LLM calls |
| LangGraph agent | Complete ReAct agent with 6 realistic tools |
| Async example | Async handler with concurrent queries |
| Event filtering | Allowlist/denylist configurations |
| Sample data generator | Generate sample data across multiple agent types |
Analytics notebook
The LangGraph Agent Analytics notebook provides comprehensive BigQuery analytics queries for:
- Real-time event monitoring
- Tool usage analytics
- Latency analysis and trends
- Error debugging
- User engagement metrics
- Time-series visualization
Real-time monitoring dashboard
A FastAPI-based monitoring dashboard is available for real-time agent monitoring:
Features:
- Live event stream via Server-Sent Events (SSE)
- Interactive charts for event distribution and latency trends
- Session tracing with detailed timeline view
- 20+ REST API endpoints for analytics queries
- Auto-refresh every 5 seconds
# Run the dashboard
cd libs/community/examples/bigquery_callback/webapp
pip install -r requirements.txt
uvicorn main:app --port 8001
# Open http://localhost:8001
Additional resources