TURION .AI

LangGraph State, Checkpointing, and Resumable Agents

TURION.AI · 5 min read

Build a production-grade LangGraph agent with TypedDict state, SQLite checkpointing, and human-in-the-loop interrupts. Complete runnable code.

Most LangGraph tutorials show you a toy graph: a node calls an LLM, the LLM calls a tool, done. Real production agents need three things those tutorials skip:

  • Typed state so a type checker flags misspelled or missing fields before a node silently drops them.
  • Durable checkpointing so a crash mid-way doesn’t destroy a 47-step conversation.
  • Resumable execution so you can interrupt, inspect state, time-travel, and continue.

We’re going to build a research agent that plans, executes tool calls, and pauses for human review after each one, with full SQLite checkpointing, typed state, and the ability to rewind and edit past decisions. The code runs end-to-end on your machine.

Prerequisites

  • Python 3.10 or higher
  • An OpenAI API key (or any LangChain-compatible provider)
  • ~10 minutes of your time
mkdir langgraph-persistent-agent && cd langgraph-persistent-agent
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install langgraph langchain-openai langgraph-checkpoint-sqlite

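Make your key available as the OPENAI_API_KEY environment variable. The code in this post does not read a .env file on its own, so if you keep the key in a .env file like the one below, either export it in your shell or load it yourself (for example with python-dotenv):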

OPENAI_API_KEY=sk-your-key-here
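
If you would rather not add a dependency, a minimal sketch of a loader using only the standard library could look like this. load_env_file is a hypothetical helper written for this post, not part of LangGraph or LangChain, and it assumes simple KEY=VALUE lines with no multi-line values or variable expansion.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> dict:
    """Copy simple KEY=VALUE lines into os.environ; existing variables win."""
    loaded = {}
    env_path = Path(path)
    if not env_path.exists():
        return loaded
    for raw_line in env_path.read_text().splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and anything that is not KEY=VALUE
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip("'\"")  # drop optional surrounding quotes
        os.environ.setdefault(key, value)
        loaded[key] = value
    return loaded
```

Call load_env_file() once at the top of your script, before ChatOpenAI is constructed, so the key is in the environment by the time the client looks for it.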

Defining Typed State

LangGraph graphs operate on a shared state object. Defining it as a TypedDict (LangGraph also accepts a dataclass or a Pydantic model as the state schema) documents exactly which fields each node can read and write, and lets a type checker catch mistakes before they hit production.

We’ll define a research agent state that tracks messages, a plan, the current step index, tool results, and optional metadata:

import uuid
import json
import datetime as dt
from typing import Annotated, TypedDict, List

from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.messages import AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
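from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt


class ResearchState(TypedDict):
    """Typed state for our research agent."""
    # add_messages is the reducer: updates are merged into the list by message ID
    messages: Annotated[List[BaseMessage], add_messages]
    plan: List[str]
    current_step: int
    tool_results: List[str]
    metadata: dict

The Annotated type on messages attaches a reducer, add_messages, to the field: messages returned by a node are appended to the list (or replace an existing message with the same ID) rather than overwriting it. The metadata has to be a callable for this to work; a plain string label such as "conversation_history" is ignored and leaves the field on last-write-wins. The remaining fields use the default behavior (last write wins), which is fine for plan, current_step, tool_results, and metadata.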
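
To make the reducer idea concrete, here is a simplified, standard-library-only illustration of how a framework can read Annotated metadata and decide between merging and overwriting. It is a toy model under stated assumptions, not LangGraph's actual implementation: apply_update, DemoState, and LabelledState are hypothetical names, and operator.add stands in for a real message reducer.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    messages: Annotated[list, operator.add]  # callable metadata: used as reducer
    plan: list                               # no metadata: last write wins


class LabelledState(TypedDict):
    messages: Annotated[list, "conversation_history"]  # a string is not callable


def apply_update(schema, state: dict, update: dict) -> dict:
    """Merge an update into state, using a reducer where the schema declares one."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = next((m for m in metadata if callable(m)), None)
        if reducer is not None and key in state:
            merged[key] = reducer(state[key], value)  # combine old and new
        else:
            merged[key] = value                       # overwrite
    return merged


reduced = apply_update(
    DemoState,
    {"messages": ["hi"], "plan": ["a"]},
    {"messages": ["there"], "plan": ["b"]},
)
labelled = apply_update(LabelledState, {"messages": ["hi"]}, {"messages": ["there"]})

print(reduced)
print(labelled)
```

With the callable reducer the two message lists are concatenated, while the string-labelled field simply takes the newest value. That difference is easy to miss, because both annotations look equally valid to a type checker.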

The Nodes

Our research agent has three nodes:

  1. planner_node — generates a research plan from the user query.
  2. tool_caller_node — executes the current plan step, interrupting for approval.
  3. summarizer_node — compiles all results into a final answer.
# Simulated research tool — replace with real web-search tools in production
class SearchTool:
    """A simulated search tool for demonstration."""

    name = "web_search"
    description = "Search the web for information about a topic."

    def search(self, query: str) -> str:
        # In production, use Tavily, DuckDuckGo, or a real search API
        return (
            f"Search results for '{query}': "
            f"The most recent findings suggest that {query} "
            f"is an active area with developments in automation, "
            f"machine learning integration, and open-source tooling. "
            f"See https://example.com/{query.replace(' ', '-')} for details."
        )


search_tool = SearchTool()
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def planner_node(state: ResearchState) -> dict:
    """Generate a research plan based on the user's query."""
    print("\n[PLANNER] Generating research plan...")
    plan_prompt = (
        "You are a research assistant. Given a user query, "
        "create a numbered research plan with 2-4 steps. "
        "Respond ONLY with a JSON array of step strings. "
        'Example: ["Step 1: ...", "Step 2: ..."]\n\n'
        f"Query: {state['messages'][-1].content}"
    )

    response = model.invoke([HumanMessage(content=plan_prompt)])
    plan_text = response.content.strip()

    # Parse the JSON array from the response
    try:
        if plan_text.startswith("```"):
            # Strip markdown code fences if present
            plan_text = plan_text.split("```")[1]
            if plan_text.startswith("json"):
                plan_text = plan_text[4:]
        plan = json.loads(plan_text)
    except (json.JSONDecodeError, IndexError):
        # Fallback: create a simple plan
        plan = [
            f"Search for key information about {state['messages'][-1].content}",
            "Compile findings into a summary"
        ]

    return {
        "plan": plan,
        "current_step": 0,
        "metadata": {
            "plan_created_at": dt.datetime.now().isoformat(),
            **state.get("metadata", {})
        }
    }


def tool_caller_node(state: ResearchState) -> Command:
    """Run the current plan step, then pause for human review of the result."""
    step = state["current_step"]
    plan = state["plan"]

    if step >= len(plan):
        # No more plan steps — go to summarizer
        return Command(goto="summarizer_node")

    step_description = plan[step]
    print(f"\n[TOOL_CALLER] Step {step + 1}/{len(plan)}: {step_description}")

    # Build the tool call
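    # Use the original user query (messages[0]); after the first step the
    # last message is a ToolMessage, not the query
    search_query = state["messages"][0].content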
    tool_result = search_tool.search(f"{step_description} about {search_query}")

    # Build the proposed action description for human review
    interrupt_question = (
        f"Step {step + 1}/{len(plan)} complete.\n"
        f"Plan: {step_description}\n"
        f"Tool: web_search\n"
        f"Query: {step_description} about {search_query}\n"
        f"Result: {tool_result[:200]}...\n\n"
        f"Do you approve this result? Reply 'yes' to accept, "
        f"or provide a correction."
    )

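    # PAUSE HERE: interrupts execution and waits for Command(resume=...).
    # On resume LangGraph re-runs this node from the top, so the search above
    # executes again; keep anything before interrupt() idempotent.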
    resume_value = interrupt(interrupt_question)

    # When we resume, resume_value contains the human's response
    if isinstance(resume_value, str) and resume_value.lower().strip() != "yes":
        # Human provided a correction — incorporate it
        tool_result += f"\n\n[Human correction]: {resume_value}"

    return Command(
        update={
            "tool_results": state["tool_results"] + [tool_result],
            "current_step": step + 1,
            "messages": state["messages"] + [
                AIMessage(content=f"Completed step {step + 1}: {step_description}"),
                ToolMessage(content=tool_result, tool_call_id=str(uuid.uuid4())),
            ],
        },
        goto="tool_caller_node"  # Loop back for the next step
    )


def summarizer_node(state: ResearchState) -> dict:
    """Compile all tool results into a final answer."""
    print("\n[SUMMARIZER] Compiling final research summary...")

    results_text = "\n\n".join(
        f"Step {i + 1}: {r}" for i, r in enumerate(state["tool_results"])
    )

    summary_prompt = (
        "You are a research assistant. Based on the following research results, "
        "write a comprehensive summary answering the user's original question.\n\n"
        f"Original query: {state['messages'][0].content}\n\n"
        f"Results:\n{results_text}\n\n"
        "Write a well-structured summary with key findings."
    )

    response = model.invoke([HumanMessage(content=summary_prompt)])

    return {
        "messages": state["messages"] + [AIMessage(content=response.content)],
    }
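
The fence-stripping logic in planner_node is the most fragile part of the nodes above, so it can help to pull it into a function you can test without calling a model. The following is a minimal sketch under that assumption; parse_plan is a hypothetical helper, and it is stricter than the inline version because it also rejects valid JSON that is not a non-empty list of strings.

```python
import json

FENCE = "`" * 3  # a markdown code fence, built this way to keep the source readable


def parse_plan(text: str, fallback: list[str]) -> list[str]:
    """Extract a JSON array of step strings from a model reply, else return fallback."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        # Keep only what sits between the first pair of fences
        parts = cleaned.split(FENCE)
        cleaned = parts[1] if len(parts) > 1 else ""
        # Drop an optional language tag such as "json"
        cleaned = cleaned.strip().removeprefix("json").strip()
    try:
        plan = json.loads(cleaned)
    except json.JSONDecodeError:
        return fallback
    # Reject JSON that parses but is not a non-empty list of strings
    if not isinstance(plan, list) or not plan:
        return fallback
    if not all(isinstance(step, str) for step in plan):
        return fallback
    return plan


fenced_reply = FENCE + 'json\n["Step 1: search", "Step 2: compare"]\n' + FENCE
print(parse_plan(fenced_reply, ["fallback step"]))
print(parse_plan("Sorry, I cannot help with that.", ["fallback step"]))
```

Inside planner_node you would call it as parse_plan(response.content, default_plan), where default_plan is whatever fallback list you want to use.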

Building the Graph

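With nodes defined, assembling the StateGraph is straightforward. The tool_caller_node returns Command(goto=...) to decide where execution goes next, so we don’t need a separate conditional edge for it. The LangGraph docs recommend annotating such a node's return type as Command[Literal["tool_caller_node", "summarizer_node"]] so the possible targets are declared and show up when the graph is rendered; the plain Command annotation used here leaves that information out.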

def build_research_graph(checkpointer=None):
    """Construct the research agent graph and return it compiled (runnable)."""
    graph = StateGraph(ResearchState)

    # Add nodes
    graph.add_node("planner_node", planner_node)
    graph.add_node("tool_caller_node", tool_caller_node)
    graph.add_node("summarizer_node", summarizer_node)

    # Define entry point and edges
    graph.set_entry_point("planner_node")
    graph.add_edge("planner_node", "tool_caller_node")
    # tool_caller_node uses Command(goto=...) to self-loop or go to summarizer
    graph.add_edge("summarizer_node", END)

    return graph.compile(checkpointer=checkpointer)
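
If the self-loop is hard to picture, the routing can be modeled in a few lines of plain Python. This is a toy driver for illustration only, not how LangGraph schedules nodes: Step, NODES, and run are hypothetical names, and the node bodies are stubs.

```python
from dataclasses import dataclass, field

END = "__end__"  # sentinel meaning "stop", named after LangGraph's END


@dataclass
class Step:
    """Toy stand-in for a node result: a state update plus the next node."""
    update: dict = field(default_factory=dict)
    goto: str = END


def tool_caller(state: dict) -> Step:
    i = state["current_step"]
    if i >= len(state["plan"]):
        return Step(goto="summarizer")  # plan exhausted: move on
    result = f"result for {state['plan'][i]}"
    return Step(
        update={
            "current_step": i + 1,
            "tool_results": state["tool_results"] + [result],
        },
        goto="tool_caller",  # self-loop for the next plan step
    )


def summarizer(state: dict) -> Step:
    return Step(update={"summary": "; ".join(state["tool_results"])})


NODES = {"tool_caller": tool_caller, "summarizer": summarizer}


def run(state: dict, start: str) -> tuple[dict, list[str]]:
    """Follow each node's goto until END, merging updates as we go."""
    visited, current = [], start
    while current != END:
        visited.append(current)
        step = NODES[current](state)
        state = {**state, **step.update}
        current = step.goto
    return state, visited


final_state, visited = run(
    {"plan": ["a", "b"], "current_step": 0, "tool_results": []}, "tool_caller"
)
print(visited)
```

For a two-step plan the tool caller is visited three times: once per step, plus a final visit that notices the plan is exhausted and hands off to the summarizer.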

Running with an In-Memory Checkpointer

Let’s start with the simplest setup: MemorySaver. This stores checkpoints in RAM — fine for prototyping, disastrous for production.

# Run the agent
memory_checkpointer = MemorySaver()
app = build_research_graph(checkpointer=memory_checkpointer)

# Every execution gets a unique thread ID for isolation
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# Seed the conversation
user_query = "What are the main trade-offs between vLLM and SGLang for LLM inference?"
inputs = {
    "messages": [HumanMessage(content=user_query)],
    "plan": [],
    "current_step": 0,
    "tool_results": [],
    "metadata": {"query": user_query}
}

print(f"=== Starting research thread {thread_id[:8]} ===")

# First invocation: runs planner → tool_caller_node → INTERRUPTS
for event in app.stream(inputs, config, stream_mode="updates"):
    for node_name, update in event.items():
        print(f"\n>>> Completed: {node_name}")

# Check what interrupts we have
state = app.get_state(config)
pending = [i for task in state.tasks for i in task.interrupts]
if pending:
    print("\n" + "=" * 50)
    # .value is the payload passed to interrupt() inside the node
    print(f"INTERRUPT: {pending[0].value}")
    print("=" * 50)

When you run this, you’ll see the graph execute the planner, run the first tool step, and interrupt — surfacing the action for human review.

Resume with Command

After the interrupt, we resume by invoking the graph with a Command(resume=...) value:

# Resume with approval
approval_response = "yes"
print(f"\n[RESUME] Approving with: {approval_response}")

for event in app.stream(
    Command(resume=approval_response),
    config,
    stream_mode="updates"
):
    for node_name, update in event.items():
        print(f"\n>>> Completed: {node_name}")

# One resume advances a single step; the graph interrupts again on the next one.
# The summary only exists once the graph has run to END (state.next is empty).
final_state = app.get_state(config)
if not final_state.next:
    messages = final_state.values.get("messages", [])
    print(f"\n{'=' * 50}")
    # The summarizer's answer is the last message in the list
    print(f"FINAL RESPONSE:\n{messages[-1].content}")
    print(f"{'=' * 50}")
else:
    print(f"\nPaused again before: {final_state.next}")

The flow is:

  1. Planner generates the research plan
  2. Tool caller executes step 1 and interrupts
  3. We resume with Command(resume="yes")
  4. Tool caller loops through remaining steps, interrupting each one
  5. Once all steps are complete, the summarizer compiles the final answer
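
One detail of this flow is easy to miss: when a paused node is resumed, it runs again from its first line, and interrupt() returns the resume value instead of pausing. The toy model below imitates that replay behavior with an exception, purely as an illustration. It is not LangGraph's implementation, and Pause, make_interrupt, and run are hypothetical names.

```python
class Pause(Exception):
    """Raised by the toy interrupt() when no resume value is available yet."""

    def __init__(self, question: str):
        super().__init__(question)
        self.question = question


def make_interrupt(resume_values: list):
    """Build a toy interrupt() that replays stored answers, then pauses."""
    answers = iter(resume_values)

    def interrupt(question: str):
        try:
            return next(answers)
        except StopIteration:
            raise Pause(question) from None

    return interrupt


search_calls = []  # records every time the side effect runs


def tool_caller(interrupt) -> str:
    search_calls.append("web_search")  # side effect BEFORE the pause
    answer = interrupt("Approve this result?")
    return f"human said: {answer}"


def run(node, resume_values: list):
    """Run the node from the top; return its result or the Pause it raised."""
    try:
        return node(make_interrupt(resume_values))
    except Pause as pause:
        return pause


first = run(tool_caller, [])        # no answer yet: the node pauses
second = run(tool_caller, ["yes"])  # resume: the node starts over from the top

print(type(first).__name__, "|", second, "|", len(search_calls))
```

The search is recorded twice even though the human answered once. In the real agent that means any tool call placed before interrupt() should be cheap and idempotent, or moved to after the approval.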

Interactive Run Loop

In production, you’ll want a proper loop that keeps resuming until the graph reaches END. Here’s the pattern:

def run_agent():
    app = build_research_graph(checkpointer=MemorySaver())
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    inputs = {
        "messages": [HumanMessage(
            content="What are the trade-offs between vLLM and SGLang?"
        )],
        "plan": [],
        "current_step": 0,
        "tool_results": [],
        "metadata": {},
    }

    while True:
        # Stream until interrupt or END
        for event in app.stream(inputs, config, stream_mode="values"):
            pass
        inputs = None  # Don't resend original inputs on resume

        # Check for interrupts
        current_state = app.get_state(config)
        pending = [i for t in current_state.tasks for i in t.interrupts]
        if pending:
            # Interrupt.value holds the payload passed to interrupt()
            print(f"\n{pending[0].value}")
            response = input("Your response: ")
            inputs = Command(resume=response)
        else:
            break

    # Print final answer: the summarizer appends it as the last message
    final_state = app.get_state(config)
    final_message = final_state.values["messages"][-1]
    print(f"\n{'=' * 50}")
    print(final_message.content)
    print(f"{'=' * 50}")

Switching to SQLite Checkpointing

The MemorySaver vanishes when the process exits. For any real workload, you need durable persistence. The langgraph-checkpoint-sqlite package provides a SQLite checkpointer that writes every checkpoint to a file: zero infrastructure, and a good fit for single-node deployments.

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# check_same_thread=False lets the checkpointer use the connection
# from threads other than the one that opened it
conn = sqlite3.connect("agent_checkpoints.db", check_same_thread=False)
sql_checkpointer = SqliteSaver(conn)
app = build_research_graph(checkpointer=sql_checkpointer)

# Run exactly as before; checkpoints now survive process restarts
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

inputs = {
    "messages": [HumanMessage(
        content="Compare RAG vs fine-tuning for domain adaptation"
    )],
    "plan": [],
    "current_step": 0,
    "tool_results": [],
    "metadata": {}
}

for event in app.stream(inputs, config, stream_mode="updates"):
    for node_name, update in event.items():
        print(f"Completed: {node_name}")

conn.close()  # close the database when you are done with the graph

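The key difference: after your Python process exits, agent_checkpoints.db on disk contains every checkpoint. Restart the process, reuse the same thread_id, and the agent resumes where it left off. The snippet above generates a random thread_id on every run, so store it somewhere (a file, a database row, a URL parameter) if you want to come back to the thread. On restart, pass Command(resume=...) or None as the input instead of the original inputs.

For interactive resume with SQLite, reuse the run_agent loop from above, but pass a SqliteSaver checkpointer to build_research_graph instead of MemorySaver.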
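
Resuming after a restart only works if the new process knows which thread to reopen. One simple way to arrange that, sketched here with the standard library only, is to keep the thread ID in a small file next to the database. load_or_create_thread_id and the thread_id.txt filename are assumptions made for this example, not part of LangGraph.

```python
import uuid
from pathlib import Path


def load_or_create_thread_id(path: str = "thread_id.txt") -> tuple[str, bool]:
    """Return (thread_id, is_new); reuse the stored ID when the file has one."""
    id_file = Path(path)
    if id_file.exists():
        stored = id_file.read_text().strip()
        if stored:
            return stored, False
    # No usable ID on disk: start a fresh thread and remember it
    thread_id = str(uuid.uuid4())
    id_file.write_text(thread_id)
    return thread_id, True
```

When is_new is True, stream the initial inputs as usual. When it is False, build the config from the stored ID and resume with Command(resume=...) if the thread is waiting on an interrupt, or with None to continue from the last checkpoint.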

Time-Travel: Rewinding and Editing State

One of LangGraph’s most powerful features is time-travel — the ability to rewind to any checkpoint, modify state, and rerun from that point. This is invaluable for debugging agent behavior and correcting bad decisions without restarting from scratch.

def time_travel_example():
    """Demonstrate time-travel: rewind, edit, and rerun."""
    memory = MemorySaver()
    app = build_research_graph(checkpointer=memory)

    thread_id = "tt-demo-thread-001"
    config = {"configurable": {"thread_id": thread_id}}

    # Run the graph to the first interrupt
    inputs = {
        "messages": [HumanMessage(
            content="Research quantum computing advances in 2026"
        )],
        "plan": [],
        "current_step": 0,
        "tool_results": [],
        "metadata": {}
    }

    for event in app.stream(inputs, config, stream_mode="updates"):
        pass

    # Walk through all checkpoint snapshots for this thread
    history = list(app.get_state_history(config))
    print(f"\nFound {len(history)} checkpoint(s) for this thread")

    for snapshot in reversed(history):
        step = snapshot.values.get("current_step", "N/A")
        checkpoint_id = snapshot.config["configurable"]["checkpoint_id"]
        print(f"  Checkpoint: {checkpoint_id} step={step}")

    # Find the checkpoint right after the planner ran
    # (has a plan but no tool results yet); history is ordered newest first
    target = next(
        s for s in app.get_state_history(config)
        if s.values.get("plan") and not s.values.get("tool_results")
    )

    # Inject an additional plan step
    existing_plan = target.values["plan"]
    modified_plan = existing_plan + ["Additional: Check arXiv for recent papers"]

    print(f"\nOriginal plan: {existing_plan}")
    print(f"Modified plan: {modified_plan}")

    # Fork: update_state writes a new checkpoint that branches off
    # target.config and returns the config pointing at that new checkpoint
    fork_config = app.update_state(
        target.config,
        {"plan": modified_plan},
        as_node="planner_node",  # treat the edit as planner output
    )

    # Re-run from the fork point (None means "continue from the checkpoint")
    for event in app.stream(None, fork_config, stream_mode="updates"):
        for node_name, update in event.items():
            print(f"  Fork node: {node_name}")


time_travel_example()

In this example we:

  1. Run the graph until it interrupts.
  2. Walk through get_state_history to find the checkpoint we want.
  3. Call update_state on that checkpoint's config to write a forked checkpoint with the extra plan step.
  4. Re-run from the config that update_state returns.

The original checkpoints are untouched: the fork is a new branch in the same thread's history that starts from the checkpoint we selected. This is how you debug agent behavior without destroying the original execution.

Internal State Structure

For debugging, here’s what a checkpoint snapshot looks like after the planner runs:

state = app.get_state(config)
print(state.values)
# {
#   "messages": [
#     HumanMessage(content="Research X")  # the planner writes plan, not messages
#   ],
#   "plan": ["Search...", "Analyze..."],
#   "current_step": 0,
#   "tool_results": [],
#   "metadata": {"plan_created_at": "2026-05-12T09:00:00"}
# }

Each checkpoint captures the full state at that moment, not just the delta. This is what makes time-travel possible — you’re restoring from a complete snapshot, then replaying from there.
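
The snapshot-plus-parent idea can be shown without LangGraph at all. Below is a deliberately small model of a checkpoint store that keeps full copies of state and lets you fork from any of them. SnapshotStore and its methods are hypothetical names for this sketch, and real checkpointers store considerably more (channel versions, pending writes, metadata).

```python
import copy
import itertools


class SnapshotStore:
    """Toy checkpoint store: every snapshot is a full copy with a parent link."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.snapshots = {}  # snapshot id -> (parent id, state copy)

    def save(self, state: dict, parent_id=None) -> int:
        snap_id = next(self._ids)
        self.snapshots[snap_id] = (parent_id, copy.deepcopy(state))
        return snap_id

    def load(self, snap_id: int) -> dict:
        return copy.deepcopy(self.snapshots[snap_id][1])

    def fork(self, snap_id: int, changes: dict) -> int:
        """Create a new snapshot from an old one with some fields replaced."""
        state = self.load(snap_id)
        state.update(changes)
        return self.save(state, parent_id=snap_id)

    def lineage(self, snap_id) -> list:
        """Walk parent links from a snapshot back to the root."""
        chain = []
        while snap_id is not None:
            chain.append(snap_id)
            snap_id = self.snapshots[snap_id][0]
        return chain


store = SnapshotStore()
after_plan = store.save({"plan": ["search"], "current_step": 0})
after_step = store.save({"plan": ["search"], "current_step": 1}, parent_id=after_plan)
forked = store.fork(after_plan, {"plan": ["search", "check arXiv"]})

print(store.lineage(after_step), store.lineage(forked))
```

Both branches share the same parent, and loading the parent still returns the original plan. Because each snapshot is complete, nothing has to be recomputed to restore it.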

When to Use Which Checkpointer

Not every project needs SQLite. Here’s our recommendation based on deployment scale:

  • Prototyping / colab notebooks: MemorySaver — zero setup, loses state on exit.
  • Single-process deployments (sidecar, single container): SqliteSaver — file-based, no external dependencies, ACID-compliant. Install with pip install langgraph-checkpoint-sqlite.
  • Multi-process / scaled deployments: PostgresSaver (from langgraph-checkpoint-postgres) — shared state across replicas, required when multiple workers handle the same thread.
  • Managed LangGraph platform: The LangGraph server API handles checkpointing for you with built-in thread and state management (LangGraph persistent memory docs).

Complete Runnable File

All six pieces fit together into a single file. Save this as agent.py:

# agent.py — complete production-ready LangGraph agent
import uuid
import json
import datetime as dt
from typing import Annotated, TypedDict, List

from langchain_core.messages import (
    BaseMessage, HumanMessage, AIMessage, ToolMessage
)
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt


class ResearchState(TypedDict):
    """Typed state for the research agent."""
    # add_messages is the reducer that merges new messages into the list
    messages: Annotated[List[BaseMessage], add_messages]
    plan: List[str]
    current_step: int
    tool_results: List[str]
    metadata: dict


# -- Simulated search tool ------------------------------------------
class SearchTool:
    """Replace with Tavily, DuckDuckGo, or a real search API."""

    name = "web_search"
    description = "Search the web for information."

    def search(self, query: str) -> str:
        return (
            f"Results for '{query}': active developments in {query}."
        )


search_tool = SearchTool()
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# -- Nodes ----------------------------------------------------------
def planner_node(state: ResearchState) -> dict:
    prompt = (
        "You are a research assistant. Create a research plan "
        "as a JSON array. Query: " + state["messages"][-1].content
    )
    response = model.invoke([HumanMessage(content=prompt)])
    try:
        text = response.content.strip()
        if text.startswith("```"):
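            # removeprefix drops the exact "json" tag; lstrip("json") would
            # strip any leading j/s/o/n characters instead
            text = text.split("```")[1].strip().removeprefix("json").strip()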
        plan = json.loads(text)
    except (json.JSONDecodeError, IndexError):
        plan = ["Search for relevant information", "Compile findings"]
    return {
        "plan": plan,
        "current_step": 0,
        "metadata": {"plan_at": dt.datetime.now().isoformat()},
    }


def tool_caller_node(state: ResearchState) -> Command:
    step = state["current_step"]
    plan = state["plan"]
    if step >= len(plan):
        return Command(goto="summarizer_node")

    # messages[0] is the user's query; messages[-1] is a ToolMessage after step 1
    query = state["messages"][0].content
    result = search_tool.search(f"{plan[step]} about {query}")

    question = (
        f"Step {step + 1}: {plan[step]}\nResult: {result[:200]}\n\n"
        f"Approve? Reply 'yes' or provide a correction."
    )
    resume_value = interrupt(question)

    if isinstance(resume_value, str) and resume_value.strip().lower() != "yes":
        result += f"\n[Human correction: {resume_value}]"

    return Command(
        update={
            "tool_results": state["tool_results"] + [result],
            "current_step": step + 1,
            "messages": state["messages"] + [
                AIMessage(content=f"Done step {step + 1}"),
                ToolMessage(content=result, tool_call_id=str(uuid.uuid4())),
            ],
        },
        goto="tool_caller_node",
    )


def summarizer_node(state: ResearchState) -> dict:
    results = "\n\n".join(
        f"Step {i + 1}: {r}" for i, r in enumerate(state["tool_results"])
    )
    prompt = (
        f"Original query: {state['messages'][0].content}\n\n"
        f"Results: {results}\n\n"
        "Write a summary."
    )
    response = model.invoke([HumanMessage(content=prompt)])
    return {
        "messages": state["messages"] + [AIMessage(content=response.content)]
    }


# -- Graph ----------------------------------------------------------
def build_app():
    graph = StateGraph(ResearchState)
    graph.add_node("planner_node", planner_node)
    graph.add_node("tool_caller_node", tool_caller_node)
    graph.add_node("summarizer_node", summarizer_node)
    graph.set_entry_point("planner_node")
    graph.add_edge("planner_node", "tool_caller_node")
    graph.add_edge("summarizer_node", END)
    return graph.compile(checkpointer=MemorySaver())


# -- Run loop -------------------------------------------------------
def run_agent():
    app = build_app()
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    inputs = {
        "messages": [HumanMessage(
            content="What are the trade-offs between vLLM and SGLang?"
        )],
        "plan": [],
        "current_step": 0,
        "tool_results": [],
        "metadata": {},
    }

    while True:
        # Stream until interrupt or END
        for event in app.stream(inputs, config, stream_mode="values"):
            pass
        inputs = None  # Don't resend original inputs on resume

        # Check for interrupts
        current_state = app.get_state(config)
        pending = [i for t in current_state.tasks for i in t.interrupts]
        if pending:
            # Interrupt.value holds the payload passed to interrupt()
            print(f"\n{pending[0].value}")
            response = input("Your response: ")
            inputs = Command(resume=response)
        else:
            break

    # Print final answer: the summarizer appends it as the last message
    final_state = app.get_state(config)
    final_message = final_state.values["messages"][-1]
    print(f"\n{'=' * 50}")
    print(final_message.content)
    print(f"{'=' * 50}")


if __name__ == "__main__":
    run_agent()

Run it with python agent.py. The agent plans, executes each step, pauses for your approval, and delivers a researched summary. To upgrade to SQLite persistence, add import sqlite3 and from langgraph.checkpoint.sqlite import SqliteSaver at the top of the file, then swap MemorySaver() for SqliteSaver(sqlite3.connect("checkpoints.db", check_same_thread=False)) in the build_app function.

The same graph compiles with any LangGraph checkpointer — MemorySaver, SqliteSaver, PostgresSaver, or the cloud platform’s built-in persistence. See our post on the agent durability gap for why checkpointing is table stakes for production agents.

Next Steps

Once your graph is checkpointed and resumable, the next questions are:

  • Evaluation — How do you know your agent gives good answers? Read our agent evaluation guide for testing frameworks and regression testing patterns.
  • Observability — Trace every node, every interrupt, every resume. Tracing LLM apps with OpenTelemetry covers the full pipeline.
  • Deployment — Getting this behind a production API means handling concurrent threads, connection pooling for the checkpointer, and graceful process shutdowns. Our deployment guide walks through the full stack.

The code above is your foundation. Build on it.
