checkpoint
of the graph state at every super-step. Those checkpoints are saved to a thread
, which can be accessed after graph execution. Because threads
allow access to graph’s state after execution, several powerful capabilities including human-in-the-loop, memory, time travel, and fault-tolerance are all possible. Below, we’ll discuss each of these concepts in more detail.
thread_id
as part of the configurable
portion of the config:
StateSnapshot
object with the following key properties:
config
: Config associated with this checkpoint.metadata
: Metadata associated with this checkpoint.values
: Values of the state channels at this point in time.next
A tuple of the node names to execute next in the graph.tasks
: A tuple of PregelTask
objects that contain information about next tasks to be executed. If the step was previously attempted, it will include error information. If a graph was interrupted dynamically from within a node, tasks will contain additional data associated with interrupts.START
as the next node to be executed{'foo': '', 'bar': []}
and node_a
as the next node to be executednode_a
{'foo': 'a', 'bar': ['a']}
and node_b
as the next node to be executednode_b
{'foo': 'b', 'bar': ['a', 'b']}
and no next nodes to be executedbar
channel values contain outputs from both nodes as we have a reducer for bar
channel.
graph.get_state(config)
. This will return a StateSnapshot
object that corresponds to the latest checkpoint associated with the thread ID provided in the config or a checkpoint associated with a checkpoint ID for the thread, if provided.
get_state
will look like this:
graph.get_state_history(config)
. This will return a list of StateSnapshot
objects associated with the thread ID provided in the config. Importantly, the checkpoints will be ordered chronologically with the most recent checkpoint / StateSnapshot
being the first in the list.
get_state_history
will look like this:
invoke
a graph with a thread_id
and a checkpoint_id
, then we will re-play the previously executed steps before a checkpoint that corresponds to the checkpoint_id
, and only execute the steps after the checkpoint.
thread_id
is the ID of a thread.checkpoint_id
is an identifier that refers to a specific checkpoint within a thread.configurable
portion of the config:
checkpoint_id
. All of the steps after checkpoint_id
will be executed (i.e., a new fork), even if they have been executed previously. See this how to guide on time-travel to learn more about replaying.
checkpoints
, we can also edit the graph state. We do this using graph.update_state()
. This method accepts three different arguments:
config
thread_id
specifying which thread to update. When only the thread_id
is passed, we update (or fork) the current state. Optionally, if we include checkpoint_id
field, then we fork that selected checkpoint.
values
update_state
does NOT automatically overwrite the channel values for every channel, but only for the channels without reducers. Let’s walk through an example.
Let’s assume you have defined the state of your graph with the following schema (see full example above):
foo
key (channel) is completely changed (because there is no reducer specified for that channel, so update_state
overwrites it). However, there is a reducer specified for the bar
key, and so it appends "b"
to the state of bar
.
as_node
update_state
is as_node
. If you provided it, the update will be applied as if it came from node as_node
. If as_node
is not provided, it will be set to the last node that updated the state, if not ambiguous. The reason this matters is that the next steps to execute depend on the last node to have given an update, so this can be used to control which node executes next. See this how to guide on time-travel to learn more about forking state.
Store
interface. As an illustration, we can define an InMemoryStore
to store information about a user across threads. We simply compile our graph with a checkpointer, as before, and with our new in_memory_store
variable.
tuple
, which in this specific example will be (<user_id>, "memories")
. The namespace can be any length and represent anything, does not have to be user specific.
store.put
method to save memories to our namespace in the store. When we do this, we specify the namespace, as defined above, and a key-value pair for the memory: the key is simply a unique identifier for the memory (memory_id
) and the value (a dictionary) is the memory itself.
store.search
method, which will return all memories for a given user as a list. The most recent memory is the last in the list.
Item
) with certain attributes. We can access it as a dictionary by converting via .dict
as above.
The attributes it has are:
value
: The value (itself a dictionary) of this memorykey
: A unique key for this memory in this namespacenamespace
: A list of strings, the namespace of this memory typecreated_at
: Timestamp for when this memory was createdupdated_at
: Timestamp for when this memory was updatedfields
parameter or by specifying the index
parameter when storing memories:
in_memory_store
in LangGraph. The in_memory_store
works hand-in-hand with the checkpointer: the checkpointer saves state to threads, as discussed above, and the in_memory_store
allows us to store arbitrary information for access across threads. We compile the graph with both the checkpointer and the in_memory_store
as follows.
thread_id
, as before, and also with a user_id
, which we’ll use to namespace our memories to this particular user as we showed above.
in_memory_store
and the user_id
in any node by passing store: BaseStore
and config: RunnableConfig
as node arguments. Here’s how we might use semantic search in a node to find relevant memories:
store.search
method to get memories. Recall the memories are returned as a list of objects that can be converted to a dictionary.
user_id
is the same.
langgraph.json
file. For example:
langgraph-checkpoint
: The base interface for checkpointer savers (BaseCheckpointSaver) and serialization/deserialization interface (SerializerProtocol). Includes in-memory checkpointer implementation (InMemorySaver) for experimentation. LangGraph comes with langgraph-checkpoint
included.langgraph-checkpoint-sqlite
: An implementation of LangGraph checkpointer that uses SQLite database (SqliteSaver / AsyncSqliteSaver). Ideal for experimentation and local workflows. Needs to be installed separately.langgraph-checkpoint-postgres
: An advanced checkpointer that uses Postgres database (PostgresSaver / AsyncPostgresSaver), used in LangGraph Platform. Ideal for using in production. Needs to be installed separately..put
- Store a checkpoint with its configuration and metadata..put_writes
- Store intermediate writes linked to a checkpoint (i.e. pending writes)..get_tuple
- Fetch a checkpoint tuple using for a given configuration (thread_id
and checkpoint_id
). This is used to populate StateSnapshot
in graph.get_state()
..list
- List checkpoints that match a given configuration and filter criteria. This is used to populate state history in graph.get_state_history()
.ainvoke
, .astream
, .abatch
), asynchronous versions of the above methods will be used (.aput
, .aput_writes
, .aget_tuple
, .alist
).
InMemorySaver
, or async versions of Sqlite/Postgres checkpointers — AsyncSqliteSaver
/ AsyncPostgresSaver
checkpointers.langgraph_checkpoint
defines protocol for implementing serializers provides a default implementation (JsonPlusSerializer) that handles a wide variety of types, including LangChain and LangGraph primitives, datetimes, enums and more.
pickle
JsonPlusSerializer
, uses ormsgpack and JSON under the hood, which is not suitable for all types of objects.
If you want to fallback to pickle for objects not currently supported by our msgpack encoder (such as Pandas dataframes),
you can use the pickle_fallback
argument of the JsonPlusSerializer
:
EncryptedSerializer
to the serde
argument of any BaseCheckpointSaver
implementation. The easiest way to create an encrypted serializer is via from_pycryptodome_aes
, which reads the AES key from the LANGGRAPH_AES_KEY
environment variable (or accepts a key
argument):
LANGGRAPH_AES_KEY
is present, so you only need to provide the environment variable. Other encryption schemes can be used by implementing CipherProtocol
and supplying it to EncryptedSerializer
.