This guide provides a quick overview for getting started with the Drasi tool. For a detailed listing of all Drasi features, parameters, and configurations, see the Drasi documentation and the langchain_drasi repository.

Overview

Drasi is a change detection platform that makes it easy and efficient to detect and react to changes in databases. The LangChain-Drasi integration connects those changes to agent workflows, so agents can discover continuous queries, subscribe to them, and react to their updates in real time. Updates streamed from Drasi continuous queries can trigger agent state transitions, modify memory, or dynamically control workflow execution, turning static agents into ambient, long-lived, responsive systems.
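To make the idea of result changes concrete, the sketch below compares two snapshots of a query's result set and classifies the differences as added, updated, or deleted, the same three categories the notification handlers later in this guide react to. It is an illustration only and does not describe how Drasi evaluates continuous queries internally; the function name `diff_results`, the row shape, and the `id`/`temp` fields are assumptions made for the example.

```python
from typing import Any

Row = dict[str, Any]


def diff_results(before: dict[str, Row], after: dict[str, Row]) -> dict[str, list]:
    """Classify differences between two result-set snapshots keyed by row id."""
    added = [after[k] for k in after if k not in before]
    deleted = [before[k] for k in before if k not in after]
    updated = [
        {"before": before[k], "after": after[k]}
        for k in after
        if k in before and before[k] != after[k]
    ]
    return {"added": added, "updated": updated, "deleted": deleted}


# Hypothetical snapshots of a "hot-freezers" style query
before = {"f1": {"id": "f1", "temp": -18}, "f2": {"id": "f2", "temp": -20}}
after = {"f1": {"id": "f1", "temp": -5}, "f3": {"id": "f3", "temp": -19}}

changes = diff_results(before, after)
print(changes)
```

An agent that subscribes to a query receives notifications of this kind as they happen, rather than polling and diffing snapshots itself.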

Details

Class | Package | Serializable | JS support | Downloads | Version
DrasiTool | langchain-drasi | | | PyPI - Downloads | PyPI - Version

Features

  • Query Discovery - Automatically identify available Drasi queries
  • Real-time Subscriptions - Monitor continuous query updates
  • Notification Handlers - Six built-in handlers for different use cases
    • Console
    • Logging
    • Memory
    • Buffer
    • LangChain Memory
    • LangGraph Memory
  • Custom Handlers - Extend base handler for domain-specific logic

Setup

To access the Drasi tool, you’ll need to have Drasi and the Drasi MCP server running.

Prerequisites

Credentials (Optional)

If your Drasi MCP server requires authentication, you can configure headers with Bearer tokens or other authentication methods:
Configure authentication
from langchain_drasi import MCPConnectionConfig

config = MCPConnectionConfig(
    server_url="http://localhost:8083",
    headers={"Authorization": "Bearer your-token"},
    timeout=30.0
)
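Hard-coding a token in source is easy to leak, so one option is to read it from the environment and build the headers dictionary from that. The sketch below is a minimal version under stated assumptions: the environment variable name `DRASI_MCP_TOKEN` and the helper `build_auth_headers` are hypothetical and not part of langchain-drasi.

```python
import os


def build_auth_headers(env_var: str = "DRASI_MCP_TOKEN") -> dict[str, str]:
    """Return a Bearer Authorization header if the token is set, else no headers."""
    # DRASI_MCP_TOKEN is an assumed variable name; use whatever your deployment defines
    token = os.environ.get(env_var, "").strip()
    return {"Authorization": f"Bearer {token}"} if token else {}


headers = build_auth_headers()
print(sorted(headers))  # header names only, so the token is never printed
```

The resulting dictionary can then be passed as the `headers` argument of `MCPConnectionConfig` shown above.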

Installation

The Drasi tool lives in the langchain-drasi package:
pip install -U langchain-drasi

Instantiation

Now we can create an instance of the Drasi tool. Configure the MCP connection and, optionally, add notification handlers to process real-time updates:
Initialize tool instance
from langchain_drasi import create_drasi_tool, MCPConnectionConfig, ConsoleHandler

# Configure connection to Drasi MCP server
config = MCPConnectionConfig(
    server_url="http://localhost:8083",
    timeout=30.0
)

# Create a notification handler
handler = ConsoleHandler()

# Create the tool
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

Invocation

Directly

Below is a simple example of calling the tool directly.
Call tool
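# These calls are coroutines: run them inside an async function
# (or in a notebook that supports top-level await)

# Discover available queries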
queries = await tool.discover_queries()
# Returns: [QueryInfo, QueryInfo, ...]

# Subscribe to a specific query
await tool.subscribe("hot-freezers")
# Notifications routed to registered handlers

# Read current results from a query
result = await tool.read_query("active-orders")
# Returns: QueryResult with current data
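Because the calls above use `await`, a plain Python script needs an event loop to run them. The sketch below shows one way to do that with `asyncio.run`. So that it runs without a Drasi MCP server, it uses a hypothetical stand-in class, `StubDrasiTool`, whose method names mirror the ones above; its return values are simplified assumptions, not the real `QueryInfo` or `QueryResult` types.

```python
import asyncio


class StubDrasiTool:
    """Hypothetical stand-in so the sketch runs without a Drasi MCP server."""

    def __init__(self) -> None:
        self.subscriptions: list[str] = []

    async def discover_queries(self) -> list[str]:
        # The real tool returns QueryInfo objects; plain names are an assumption
        return ["hot-freezers", "active-orders"]

    async def subscribe(self, query_name: str) -> None:
        self.subscriptions.append(query_name)

    async def read_query(self, query_name: str) -> dict:
        # The real tool returns a QueryResult; this dict shape is an assumption
        return {"query": query_name, "results": []}


async def main(tool) -> dict:
    """Run the discover, subscribe, and read steps in order."""
    queries = await tool.discover_queries()
    print("available:", queries)
    await tool.subscribe("hot-freezers")
    return await tool.read_query("active-orders")


stub = StubDrasiTool()
result = asyncio.run(main(stub))
print(result)
```

With a real tool instance, pass the object returned by `create_drasi_tool` to `main` instead of the stub.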

As a ToolCall

We can also invoke the tool with a model-generated ToolCall, in which case a ToolMessage will be returned.
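For reference, a LangChain tool call is a dictionary with `name`, `args`, `id`, and `type` keys. The sketch below only builds such a dictionary. The helper `make_tool_call`, the tool name, and the argument names are placeholders, since this guide does not list the Drasi tool's argument schema; inspect `tool.name` and `tool.args` on your instance for the real values.

```python
def make_tool_call(name: str, args: dict, call_id: str) -> dict:
    """Build a dictionary in the shape LangChain uses for model-generated tool calls."""
    return {"name": name, "args": args, "id": call_id, "type": "tool_call"}


tool_call = make_tool_call(
    name="drasi",                   # placeholder: use tool.name
    args={"operation": "discover"}, # placeholder: see tool.args for the real schema
    call_id="call_1",
)
print(tool_call)

# With a real tool instance, invoking the tool with this dictionary
# returns a ToolMessage:
#     tool_message = tool.invoke(tool_call)
```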

Within an agent

We can use the Drasi tool in a LangGraph agent to create reactive, event-driven workflows. For this we will need a model with tool-calling capabilities.
Agent with tool
from langchain_anthropic import ChatAnthropic
from langchain.agents import create_agent

# Initialize the model
model = ChatAnthropic(model="claude-sonnet-4-5-20250929")

# Create agent with Drasi tool
agent = create_agent(model, [tool])

# Run the agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What queries are available?"}]}
)

print(result["messages"][-1].content)

result = agent.invoke(
    {"messages": [{"role": "user", "content": "Subscribe to the customer-orders query"}]}
)

print(result["messages"][-1].content)

Notification handlers

A key feature of the integration is its set of built-in notification handlers, which process real-time changes to query results. You can use these handlers to take specific actions when the data changes.

Built-in handlers

ConsoleHandler - Outputs formatted notifications to stdout:
from langchain_drasi import ConsoleHandler

handler = ConsoleHandler()

LoggingHandler - Logs notifications using Python’s logging framework:
from langchain_drasi import LoggingHandler
import logging

handler = LoggingHandler(
    logger_name="drasi.notifications",
    log_level=logging.INFO
)

MemoryHandler - Stores notifications in memory with optional filtering:
from langchain_drasi import MemoryHandler

handler = MemoryHandler(max_size=100)

# Retrieve notifications
all_notifs = handler.get_all()
freezer_notifs = handler.get_by_query("hot-freezers")
added_events = handler.get_by_type("added")

BufferHandler - FIFO queue for sequential processing. This is useful for buffering incoming change notifications while your workflow is busy with something else; a loop in the workflow can then consume the notifications from the buffer when it is ready:
from langchain_drasi import BufferHandler

handler = BufferHandler(max_size=100)
# Later, consume notifications
notification = handler.consume()  # Remove and return next notification
notification = handler.peek()     # View next notification without removing
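
The consume loop described for the buffer can be sketched as follows. To stay runnable without Drasi, the example uses a hypothetical stand-in, `StubBuffer`, built on `collections.deque`. Two behaviors are assumptions rather than documented facts about `BufferHandler`: that `consume()` returns `None` when the buffer is empty, and that the oldest item is dropped when the buffer is full. The `push` method and the `drain` helper are also invented for the example.

```python
from collections import deque
from typing import Callable, Optional


class StubBuffer:
    """Hypothetical stand-in for BufferHandler: a bounded FIFO queue."""

    def __init__(self, max_size: int = 100) -> None:
        # Assumption: when full, the oldest notification is discarded
        self._queue: deque = deque(maxlen=max_size)

    def push(self, notification: dict) -> None:
        """Hypothetical helper that simulates an incoming notification."""
        self._queue.append(notification)

    def consume(self) -> Optional[dict]:
        # Assumption: an empty buffer yields None
        return self._queue.popleft() if self._queue else None


def drain(buffer, handle: Callable[[dict], None]) -> int:
    """Consume notifications in arrival order until the buffer is empty."""
    count = 0
    while (notification := buffer.consume()) is not None:
        handle(notification)
        count += 1
    return count


buffer = StubBuffer(max_size=100)
buffer.push({"query": "hot-freezers", "type": "added"})
buffer.push({"query": "hot-freezers", "type": "updated"})

seen: list = []
processed = drain(buffer, seen.append)
print(processed, [n["type"] for n in seen])
```

In a workflow, a function like `drain` would typically run at a point where the agent is ready for new input, for example between steps.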

LangGraphMemoryHandler - Injects updates directly into LangGraph checkpoints:
from langchain_drasi import LangGraphMemoryHandler
from langgraph.checkpoint.memory import MemorySaver

checkpoint_manager = MemorySaver()
handler = LangGraphMemoryHandler(
    checkpointer=checkpoint_manager,
    thread_id="your-thread-id"
)

Custom handlers

You can create custom handlers by extending BaseDrasiNotificationHandler:
from langchain_drasi import BaseDrasiNotificationHandler, create_drasi_tool

class CustomHandler(BaseDrasiNotificationHandler):
    def on_result_added(self, query_name: str, added_data: dict):
        # Handle new results
        print(f"New result in {query_name}: {added_data}")

    def on_result_updated(self, query_name: str, updated_data: dict):
        # Handle updated results
        print(f"Updated result in {query_name}: {updated_data}")

    def on_result_deleted(self, query_name: str, deleted_data: dict):
        # Handle deleted results
        print(f"Deleted result in {query_name}: {deleted_data}")

# Register the handler; `config` is the MCPConnectionConfig created under Instantiation
handler = CustomHandler()
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)
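
As an example of domain-specific logic, a handler can keep a local mirror of each query's current results by applying added, updated, and deleted notifications in order. The sketch below runs without Drasi by subclassing a hypothetical stand-in, `StubBaseHandler`, that exposes the same three hooks as above. It assumes each payload is a dictionary with an `id` field and that an update carries the new row; neither assumption is confirmed by this guide.

```python
class StubBaseHandler:
    """Hypothetical stand-in for BaseDrasiNotificationHandler (no-op hooks)."""

    def on_result_added(self, query_name: str, added_data: dict) -> None: ...
    def on_result_updated(self, query_name: str, updated_data: dict) -> None: ...
    def on_result_deleted(self, query_name: str, deleted_data: dict) -> None: ...


class ResultSetTracker(StubBaseHandler):
    """Mirror each query's current results, keyed by an assumed row id field."""

    def __init__(self, key: str = "id") -> None:
        self.key = key  # assumption: every payload carries this field
        self.results: dict = {}

    def on_result_added(self, query_name: str, added_data: dict) -> None:
        self.results.setdefault(query_name, {})[added_data[self.key]] = added_data

    def on_result_updated(self, query_name: str, updated_data: dict) -> None:
        # Assumption: updated_data is the new version of the row
        self.results.setdefault(query_name, {})[updated_data[self.key]] = updated_data

    def on_result_deleted(self, query_name: str, deleted_data: dict) -> None:
        self.results.get(query_name, {}).pop(deleted_data[self.key], None)


tracker = ResultSetTracker()
tracker.on_result_added("hot-freezers", {"id": "f1", "temp": -5})
tracker.on_result_added("hot-freezers", {"id": "f2", "temp": -3})
tracker.on_result_updated("hot-freezers", {"id": "f1", "temp": -2})
tracker.on_result_deleted("hot-freezers", {"id": "f2", "temp": -3})
print(tracker.results)
```

To use the same idea with the real integration, derive the class from `BaseDrasiNotificationHandler` instead of the stub and adapt the payload handling to the data your queries actually return.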

Examples

  • Interactive Chat: A chat application that uses Drasi for real-time memory updates.
  • Terminator Game: A game that leverages Drasi for dynamic NPC behavior.

Use cases

Drasi is particularly useful for building ambient agents that need to react to real-time data changes. Some example use cases include:
  • AI Co-pilots - Assistants that monitor and respond to system events
  • AI game players - NPCs that adapt to in-game events
  • IoT Monitoring - Agents that process sensor data streams
  • Customer Support - Bots that react to ticket updates or customer actions
  • DevOps Assistants - Tools that monitor infrastructure changes
  • Collaborative Editing - Systems that respond to document or code changes

API reference

For detailed documentation of all Drasi features and configurations, head to the API reference.