Pregel implements LangGraph’s runtime, managing the execution of LangGraph applications. Compiling a StateGraph or creating an @entrypoint produces a Pregel instance that can be invoked with input. This guide explains the runtime at a high level and provides instructions for directly implementing applications with Pregel.
Note: The Pregel runtime is named after Google’s Pregel algorithm, which describes an efficient method for large-scale parallel computation using graphs.

Overview

In LangGraph, Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel algorithm and its Bulk Synchronous Parallel model. Each step consists of three phases:
  • Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
  • Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
  • Update: Update the channels with the values written by the actors in this step.
These steps repeat until no actors are selected for execution or a maximum number of steps is reached.
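The plan, execute, and update phases can be sketched in plain Python. This is a toy model, not LangGraph's implementation: `run_supersteps`, the `Actor` tuple, and the last-write-wins update rule are all hypothetical, and the actors run one after another here rather than in parallel.

```python
from typing import Callable

# Hypothetical toy type, not a LangGraph class:
# (subscribed channel names, function from read values to writes).
Actor = tuple[list[str], Callable[[dict], dict]]


def run_supersteps(
    actors: dict[str, Actor],
    channels: dict[str, object],
    inputs: dict[str, object],
    max_steps: int = 25,
) -> dict[str, object]:
    """Run plan/execute/update steps until no actor is selected."""
    channels = dict(channels)
    channels.update(inputs)
    updated = set(inputs)  # channels written in the previous step

    for _ in range(max_steps):
        # Plan: select actors subscribed to any channel updated last step.
        selected = [
            name for name, (subs, _) in actors.items() if updated & set(subs)
        ]
        if not selected:
            return channels

        # Execute: every actor reads the same snapshot, so writes made
        # during this step stay invisible until the update phase.
        snapshot = dict(channels)
        writes: dict[str, object] = {}
        for name in selected:
            subs, fn = actors[name]
            writes.update(fn({key: snapshot[key] for key in subs}))

        # Update: apply all writes at once (last write wins in this toy).
        channels.update(writes)
        updated = set(writes)

    raise RuntimeError("maximum number of steps reached")


actors = {
    "double": (["a"], lambda values: {"b": values["a"] + values["a"]}),
    "shout": (["b"], lambda values: {"c": values["b"].upper()}),
}
result = run_supersteps(actors, {"a": None, "b": None, "c": None}, {"a": "foo"})
print(result)  # {'a': 'foo', 'b': 'foofoo', 'c': 'FOOFOO'}
```

The run takes two steps: `double` fires because the input channel `a` was written, then `shout` fires because `b` was updated in the previous step. On the third pass nothing subscribes to `c`, so the loop stops.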

Actors

An actor is a PregelNode: it subscribes to channels, reads data from them, and writes data to them, playing the role of an actor in the Pregel algorithm. PregelNodes implement LangChain’s Runnable interface.

Channels

Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function, which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one actor to another, or to send data from an actor to itself in a future step.
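To make the value type, update type, and update function concrete, here is a minimal sketch of a channel as a stored value plus a function applied to each step's writes. `ToyChannel` is a hypothetical class for illustration only; it does not mirror the validation or checkpointing behavior of LangGraph's channel classes.

```python
from typing import Callable, Generic, Sequence, TypeVar

Value = TypeVar("Value")
Update = TypeVar("Update")


class ToyChannel(Generic[Value, Update]):
    """Hypothetical channel: a stored value plus an update function."""

    def __init__(
        self,
        initial: Value,
        update_fn: Callable[[Value, Sequence[Update]], Value],
    ) -> None:
        self.value = initial
        self._update_fn = update_fn

    def update(self, updates: Sequence[Update]) -> None:
        # The update function receives every write from one step at once.
        if updates:
            self.value = self._update_fn(self.value, updates)


# Keeps only the most recent write (value type and update type are the same).
last = ToyChannel(None, lambda current, updates: updates[-1])
# Collects every write (value type: list[str], update type: str).
collected = ToyChannel([], lambda current, updates: current + list(updates))

for step_writes in (["a"], ["b", "c"]):
    last.update(step_writes)
    collected.update(step_writes)

print(last.value)       # c
print(collected.value)  # ['a', 'b', 'c']
```

The two channels receive identical writes and differ only in their update function, which is the distinction between the channel types described in the following sections.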

LastValue

LastValue is the default channel type. It stores the last value written to it, overwriting any previous value. Use it for input and output values, or for passing data from one step to the next.
from langgraph.channels import LastValue

channel: LastValue[int] = LastValue(int)

Topic

Topic is a configurable PubSub channel useful for sending multiple values between actors or accumulating output across steps. It can be configured to deduplicate values or to accumulate all values written during a run.
from langgraph.channels import Topic

# Accumulate all values written across steps
channel: Topic[str] = Topic(str, accumulate=True)

BinaryOperatorAggregate

BinaryOperatorAggregate stores a persistent value that is updated by applying a binary operator to the current value and each new update. Use it to compute running aggregates across steps.
import operator
from langgraph.channels import BinaryOperatorAggregate

# Running total: each write adds to the current value
total = BinaryOperatorAggregate(int, operator.add)
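The "apply a binary operator to the current value and each new update" rule amounts to a left fold. The sketch below models that rule with `functools.reduce` in plain Python; `apply_updates` is a hypothetical helper, not part of LangGraph, and it ignores how the real channel initializes its value.

```python
import operator
from functools import reduce


def apply_updates(current, updates, op):
    """Fold each update into the current value, left to right."""
    return reduce(op, updates, current)


total = 0
# Three steps: two writes, no writes, then one write.
for step_writes in ([1, 2], [], [10]):
    total = apply_updates(total, step_writes, operator.add)

print(total)  # 13
```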

DeltaChannel (beta)

Note: DeltaChannel requires langgraph>=1.2 and is currently in beta. The API may change in future releases.
DeltaChannel stores only the incremental delta at each step rather than the full accumulated value. This is most useful for channels that are written frequently and accumulate large values over time—for example, a conversation message list in a long-running thread. Without delta storage, the full list is re-serialized into every checkpoint; with DeltaChannel, only the new messages written at each step are stored.
Consider DeltaChannel when a channel is both written to frequently and grows large over time. A good signal: if you notice checkpoint sizes growing linearly with thread length for a particular channel, DeltaChannel is likely a good fit.
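A rough back-of-the-envelope model shows why the difference matters. The sketch below counts list items written to checkpoints over a thread, assuming a fixed number of new items per step and one checkpoint per step. The function `stored_items` and these assumptions are illustrative only; real checkpoint sizes also depend on serialization, metadata, and any snapshots.

```python
def stored_items(num_steps: int, items_per_step: int) -> tuple[int, int]:
    """Return (full_storage, delta_storage) item counts over a thread."""
    full = 0
    delta = 0
    length = 0
    for _ in range(num_steps):
        length += items_per_step
        full += length            # whole list re-serialized at this step
        delta += items_per_step   # only the new items stored at this step
    return full, delta


full, delta = stored_items(num_steps=100, items_per_step=2)
print(full, delta)  # 10100 200
```

Under these assumptions, total storage grows quadratically with thread length when the full list is stored and linearly when only deltas are stored.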
Use DeltaChannel in an Annotated type annotation the same way you would use a plain reducer:
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.channels import DeltaChannel


def my_reducer(state: list[str], writes: Sequence[list[str]]) -> list[str]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


class State(TypedDict):
    messages: Annotated[list[str], DeltaChannel(my_reducer)]

Bulk reducer requirement

The reducer passed to DeltaChannel is a bulk reducer: it receives the current state and a sequence of all writes from the current step in a single call—not pairwise like a standard reducer. This differs from the per-key reducers used with Annotated in a StateGraph, where the reducer is called once per update.
The bulk reducer must be associative (batching-invariant):
reducer(reducer(state, [xs]), [ys]) == reducer(state, [xs, ys])
If your reducer is not associative, the reconstructed state may differ depending on how LangGraph batches writes across steps, producing inconsistent behavior.
Here are bulk reducers for the two most common cases:
from typing import Any, Sequence


# List: append all writes in order
def list_reducer(state: list[Any], writes: Sequence[list[Any]]) -> list[Any]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


# Dict: merge all writes, last write wins on key conflicts
def dict_reducer(
    state: dict[str, Any], writes: Sequence[dict[str, Any]]
) -> dict[str, Any]:
    result = dict(state)
    for write in writes:
        result.update(write)
    return result
Both are associative: applying batches one at a time produces the same result as applying them together.
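One way to gain confidence in a reducer is to test the property directly on sample data. The helper below, `is_batching_invariant`, is a hypothetical test utility (not a LangGraph API) that compares a single bulk call against every two-way split of the same writes. `dedupe_per_batch` is a deliberately broken counter-example.

```python
from typing import Any, Callable, Sequence

Reducer = Callable[[Any, Sequence[Any]], Any]


def is_batching_invariant(
    reducer: Reducer, state: Any, writes: Sequence[Any]
) -> bool:
    """Compare one bulk call against every non-empty two-way split."""
    expected = reducer(state, writes)
    for i in range(1, len(writes)):
        first, second = writes[:i], writes[i:]
        if reducer(reducer(state, first), second) != expected:
            return False
    return True


def list_reducer(state: list, writes: Sequence[list]) -> list:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


def dedupe_per_batch(state: list, writes: Sequence[list]) -> list:
    # Counter-example: duplicates are dropped only within one batch,
    # so the result depends on how the writes are grouped.
    seen: list = []
    for write in writes:
        for item in write:
            if item not in seen:
                seen.append(item)
    return list(state) + seen


writes = [["a"], ["a"], ["b"]]
print(is_batching_invariant(list_reducer, [], writes))      # True
print(is_batching_invariant(dedupe_per_batch, [], writes))  # False
```

A passing check on a few samples is evidence that the property holds, not a proof of it. A failing check is a definite sign that the reducer is unsafe to use with DeltaChannel.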

Use snapshot_frequency for bounded read latency

Without snapshots, reading a DeltaChannel value requires replaying the full write history, which is O(N) for a thread with N steps. Setting snapshot_frequency=K writes a full snapshot every K Pregel steps, bounding read depth to at most K steps:
class State(TypedDict):
    messages: Annotated[
        list[str],
        DeltaChannel(my_reducer, snapshot_frequency=5),
    ]
Higher values of snapshot_frequency reduce storage overhead but increase read latency. Lower values bound latency more tightly at the cost of larger checkpoints. None (the default) skips snapshots entirely—appropriate when reads are rare or threads are short.
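The latency side of this trade-off can be illustrated with a small model of replay depth. It assumes snapshots land on steps that are exact multiples of K, which is a simplification made for illustration; `replay_depth` is a hypothetical function and the exact placement of snapshots in LangGraph may differ.

```python
from typing import Optional


def replay_depth(step: int, snapshot_frequency: Optional[int]) -> int:
    """Number of delta steps replayed to read the value at `step`."""
    if snapshot_frequency is None:
        return step  # no snapshots: replay from the start of the thread
    # Assumption: a full snapshot exists at every multiple of K.
    return step % snapshot_frequency


worst_without = max(replay_depth(step, None) for step in range(1, 101))
worst_with_k5 = max(replay_depth(step, 5) for step in range(1, 101))
print(worst_without, worst_with_k5)  # 100 4
```

In this model the worst-case read on a 100-step thread replays 100 deltas without snapshots and fewer than 5 with snapshot_frequency=5.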

Examples

While most users will interact with Pregel through the StateGraph API or the @entrypoint decorator, it is possible to interact with Pregel directly. The example below uses a single node to give you a sense of the Pregel API.
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
# Output: {'b': 'foofoo'}

High-level API

LangGraph provides two high-level APIs for creating a Pregel application: the StateGraph (Graph API) and the Functional API.
The StateGraph (Graph API) is a higher-level abstraction that simplifies the creation of Pregel applications. It allows you to define a graph of nodes and edges. When you compile the graph, the StateGraph API automatically creates the Pregel application for you.
from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
The compiled Pregel instance holds a mapping of nodes and a mapping of channels. You can inspect both by printing them.
print(graph.nodes)
You will see something like this:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)
You should see something like this:
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}