LangGraph 1.0 实战:3 个高频业务场景与解决方案

LangGraph 是当前 AI Agent 开发最主流的框架(2026年 LangChain 1.0 已完全基于 LangGraph 底层驱动)。本文聚焦 3 个真实的 Agent 开发业务场景,讲清楚每个场景的问题本质、为什么这样解决、逐行代码实现,以及背后的设计思想


场景一:Agent 执行中突然挂掉,重启后状态全丢了——Checkpoint 持久化

1.1 问题描述

你写了一个多步骤 Agent,用户问了一个复杂问题,Agent 开始执行:查天气 → 查交通 → 规划路线 → 发邮件。执行到第三步时,服务重启了。用户重新发同样的问题,Agent 从头开始,用户之前看到的结果全丢了。

这是生产级 Agent 最常见的问题之一:Agent 耗时长、中间状态需要跨进程恢复。

1.2 错误做法(开发阶段容易踩的坑)

1
2
3
4
5
6
# ❌ 错误:内存型 Checkpointer,服务重启就丢失
from langgraph.checkpoint.memory import MemorySaver

graph = builder.compile(checkpointer=MemorySaver())
# 问题:服务重启后,所有对话历史全部丢失
# 测试环境看不出来,生产环境用户会抓狂

1.3 正确做法:数据库持久化 Checkpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ✅ 正确:生产环境必须用数据库 Checkpointer
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import AsyncConnectionPool

# 使用连接池,不要每次创建新连接(否则性能会崩溃)
pool = AsyncConnectionPool(
"postgresql://user:pass@localhost/agent_db",
min_size=10, # 池最小连接数
max_size=100, # 池最大连接数
max_idle=300.0, # 连接最大空闲时间(秒)
max_lifetime=3600.0, # 连接最大生命周期(秒)
)

checkpointer = PostgresSaver(pool)

# 编译图时指定 checkpointer
graph = builder.compile(checkpointer=checkpointer)

# 调用时传入 thread_id,同一个 thread_id 重启后自动恢复状态
config = {"configurable": {"thread_id": f"user_{user_id}_session_{session_id}"}}
for event in graph.stream({"messages": [("user", user_input)]}, config=config):
yield event

为什么需要 thread_id?

1
2
3
4
5
6
thread_id = "user_123_session_456"
├── 作用:唯一标识一个会话的执行线程
├── 重启后:用同样的 thread_id 调用 graph.invoke()
│ → LangGraph 自动从最后一个 checkpoint 恢复状态
│ → 用户看到的是中断前的结果,不是从头开始
└── 格式建议:user_{user_id}_session_{session_id}_timestamp

1.4 原理:Checkpoint 的工作机制

LangGraph 在每个节点执行完毕后,会自动调用 checkpointer 将当前状态写入持久化存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
节点 step_1 执行完毕

checkpointer 保存快照(values: {messages, current_node, plan}, next: ["step_2"], config: {thread_id})

节点 step_2 执行完毕

checkpointer 再次保存快照(覆盖上一次)

服务崩溃,重启

用户再次调用 graph.invoke(thread_id="user_123...")

LangGraph 从 checkpointer 读取最后一个快照,恢复状态,继续执行 step_2

一个 checkpointer 能做的事

  • get_state(config):读取当前状态
  • get_state_history(config):读取历史快照列表(可用于”回放”整个执行过程)
  • update_state(config, new_state):手动修改状态后恢复执行(用于人工修正)
  • put(state):手动写入快照(用于调试)

1.5 思想总结

为什么 Agent 需要持久化 Checkpoint?因为 Agent 的执行时间是未知的,可能 5 秒,可能 30 分钟,中间任何一步服务崩溃都需要能从断点恢复。Checkpoint 是生产级 Agent 的基础设施,不是可选项。

选型建议

  • 测试/单机:MemorySaver(内存,重启丢失)
  • 生产环境:PostgresSaver(PostgreSQL,推荐)/ RedisSaver(Redis)/ SqliteSaver(轻量级单机)

场景二:高风险操作必须人工审批后才能执行——interrupt() 与 Human-in-the-Loop

2.1 问题描述

你的 Agent 有删除文件、删除数据库表、转账等高风险操作。不能让它全自动执行,必须在执行前暂停,等人工确认后才继续。

典型场景

  • 删库操作:Agent 判断要清理历史数据,需要人工点”确认删除”
  • 外部通知:Agent 准备给客户发邮件,人工审核邮件内容后才发
  • 资金操作:Agent 判断要执行转账,必须人工授权

2.2 核心 API:interrupt()

interrupt() 是 LangGraph 1.0 提供的标准中断 API。在节点内部调用 interrupt() 时,图执行会暂停,等待外部用 Command(resume=...) 恢复。

2.3 完整实现:删除文件前的人工审批流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import AsyncConnectionPool

# ========== 1. 定义状态 ==========
class AgentState(TypedDict):
messages: list
pending_action: dict | None # 待审批操作
approved: bool # 审批结果

# ========== 2. 节点:执行文件删除(高风险) ==========
def execute_delete(state: AgentState) -> AgentState:
action = state["pending_action"]
file_path = action["file_path"]
# 实际删除操作
os.remove(file_path)
return {
"messages": state["messages"] + [f"已删除文件: {file_path}"],
"pending_action": None,
"approved": False,
}

# ========== 3. 节点:准备删除(触发人工审批) ==========
def prepare_delete(state: AgentState) -> AgentState:
action = state["pending_action"]
file_path = action["file_path"]

# ⚠️ interrupt() 暂停图执行,等待人工审批
# 这里传入要展示给人工的内容(文件路径、操作类型)
human_value = interrupt({
"title": "高风险操作需要审批",
"description": f"Agent 准备执行删除文件操作",
"file_path": file_path,
"action": "delete_file",
# human_value 可以是任何可 JSON 序列化的内容
})

# ⚠️ 执行流恢复后,interrupt() 会返回 human 提供的值
# human 可以提供: {"approved": True, "note": "已确认清理"}
if not human_value.get("approved"):
return {
"messages": state["messages"] + ["操作已取消(人工否决)"],
"pending_action": None,
"approved": False,
}

# 审批通过,继续执行
return state

# ========== 4. 节点:LLM 判断是否需要删除 ==========
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")

def llm_router(state: AgentState) -> AgentState:
last_msg = state["messages"][-1]["content"]
response = llm.invoke(f"用户说:{last_msg},判断是否需要执行删除文件操作。"
"如果需要,返回文件路径;如果不需要,返回 NO_ACTION。")
if "NO_ACTION" in response.content:
return {"messages": state["messages"] + [("assistant", "好的,我知道了。")]}
# 假设 LLM 返回了文件路径
file_path = response.content.strip()
return {
"pending_action": {"action": "delete_file", "file_path": file_path},
}

# ========== 5. 条件边:根据 pending_action 决定路由 ==========
def should_approve(state: AgentState) -> Literal["prepare_delete", "llm_router"]:
if state.get("pending_action"):
return "prepare_delete"
return "llm_router"

# ========== 6. 构建图 ==========
builder = StateGraph(AgentState)
builder.add_node("llm_router", llm_router)
builder.add_node("prepare_delete", prepare_delete)
builder.add_node("execute_delete", execute_delete)

builder.add_edge(START, "llm_router")
builder.add_edge("execute_delete", END)
builder.add_conditional_edges("llm_router", should_approve)
builder.add_conditional_edges("prepare_delete", lambda s: "execute_delete" if s.get("pending_action") else END)

pool = AsyncConnectionPool("postgresql://user:pass@localhost/agent_db")
graph = builder.compile(checkpointer=PostgresSaver(pool))

# ========== 7. 对外暴露的 Agent 接口 ==========
import uuid

def run_agent(user_message: str, user_id: str):
thread_config = {
"configurable": {
"thread_id": f"user_{user_id}_agent_{uuid.uuid4().hex[:8]}",
"pool": pool,
}
}

# 第一次调用:Agent 开始执行,遇到 interrupt() 后暂停
for event in graph.stream(
{"messages": [("user", user_message)], "pending_action": None, "approved": False},
config=thread_config
):
# event 是流式事件,keys 包括 pending_action 等状态
if event.get("pending_action"):
print(f"⏸️ Agent 已暂停,等待审批...")
return event # 返回给调用方展示审批 UI

return event

def approve_action(thread_id: str, approval: dict):
"""
人工审批后调用此方法恢复执行:
- approval = {"approved": True, "note": "确认可以删除"}
"""
graph.invoke(
Command(resume=approval),
config={"configurable": {"thread_id": thread_id, "pool": pool}}
)

流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
用户: "帮我清理 /tmp/old_logs.txt"

llm_router(LLM 判断)
↓ pending_action = {file_path: "/tmp/old_logs.txt"}
prepare_delete(触发 interrupt 暂停)
↓ ⏸️ 人工审批界面展示:文件路径 + 操作类型

人工点击"确认"或"取消"

Command(resume={approved: True}) 恢复执行

execute_delete → 执行真正的删除

END

2.4 思想总结

为什么 interrupt() 比直接 return 后判断更合理?因为 interrupt() 把状态快照持久化到 checkpointer,即使服务重启也能从断点恢复。如果用简单的 if 判断后 return,中断期间服务重启,用户就无法恢复了。interrupt() 是 LangGraph 1.0 设计 Human-in-the-Loop 的标准方式,替代了旧版 NodeInterrupt。


场景三:Agent 内部多个子任务需要并行执行——Send API

3.1 问题描述

你的 Agent 要完成一个调研任务:

  • 同时查多个信息源(新闻搜索 + 学术搜索 + 社交媒体)
  • 每个信息源独立查询,互不依赖
  • 全部查完后,再把结果汇总给 LLM 生成报告

如果用普通边,三个信息源必须串行执行,速度慢;用 Send API,三个信息源并行执行,速度提升 3 倍。

3.2 核心 API:Send

Send 是 LangGraph 1.0 提供的并行任务分发 API。在条件边或节点中返回 Send("节点名", state),会在同一个时间步内并行触发多个节点执行。

3.3 完整实现:调研 Agent 并行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
import operator

# ========== 1. 状态定义 ==========
class ResearchState(TypedDict):
query: str
results: Annotated[list, operator.add] # 累加模式,多个结果追加
final_report: str

# ========== 2. 各信息源节点 ==========
def search_news(state: ResearchState) -> ResearchState:
"""搜索新闻"""
query = state["query"]
# 调用新闻搜索工具(伪代码)
news_results = news_tool.search(query)
return {"results": [f"[新闻] {news_results}"]}

def search_academic(state: ResearchState) -> ResearchState:
"""搜索学术论文"""
query = state["query"]
academic_results = academic_tool.search(query)
return {"results": [f"[学术] {academic_results}"]}

def search_social(state: ResearchState) -> ResearchState:
"""搜索社交媒体"""
query = state["query"]
social_results = social_tool.search(query)
return {"results": [f"[社交] {social_results}"]}

def write_report(state: ResearchState) -> ResearchState:
"""汇总所有结果,生成报告"""
all_results = "\n\n".join(state["results"])
report = llm.invoke(
f"根据以下调研结果,写一份完整报告:\n\n{all_results}"
)
return {"final_report": report}

# ========== 3. 调度节点:并行分发三个子任务 ==========
def distribute_search(state: ResearchState) -> list[Send]:
"""
返回 Send 列表:同一时间步并行触发三个搜索节点
每个 Send 携带不同的输入状态(这里 query 相同)
"""
query = state["query"]
return [
Send("search_news", {"query": query, "results": []}),
Send("search_academic", {"query": query, "results": []}),
Send("search_social", {"query": query, "results": []}),
]

# ========== 4. 构建图 ==========
builder = StateGraph(ResearchState)
builder.add_node("search_news", search_news)
builder.add_node("search_academic", search_academic)
builder.add_node("search_social", search_social)
builder.add_node("write_report", write_report)

builder.add_edge(START, "distribute_search")

# distribute_search 返回 [Send(...), Send(...), Send(...)]
# LangGraph 会在这一个时间步内并行执行三个搜索节点
builder.add_node("distribute_search", distribute_search)

# 条件边:distribute_search → 并行执行三个搜索节点
builder.add_conditional_edges("distribute_search", distribute_search)

# 三个搜索节点都执行完毕后,流向 write_report
for node in ["search_news", "search_academic", "search_social"]:
builder.add_edge(node, "write_report")

builder.add_edge("write_report", END)

graph = builder.compile()

# ========== 5. 执行 ==========
result = graph.invoke({"query": "LangGraph 1.0 最新特性", "results": []})
print(result["final_report"])

执行时序对比

1
2
3
4
5
6
7
8
9
普通边(串行):
distribute_search → search_news → search_academic → search_social → write_report
总耗时 = t_news + t_academic + t_social ≈ 3T

Send API(并行):
distribute_search → search_news ─┐
├→ search_academic ─┼→ write_report
└→ search_social ─┘
总耗时 = max(t_news, t_academic, t_social) ≈ T

3.4 思想总结

为什么用 Send 而不是并行调用三个函数?Send 的核心优势是状态共享 + 结果聚合。三个节点执行时共享同一个 results 列表(通过 Annotated[list, operator.add] 累加器),全部完成后才进入 write_report。直接并行调用函数也可以做到,但状态需要手动管理,Send 让你用图的思维表达并行,状态管理全部由 LangGraph 统一处理。


三个场景的共同主题

场景解决的问题核心 API为什么重要
Checkpoint 持久化Agent 执行中断/重启状态丢失PostgresSaver + thread_id生产级 Agent 的基础设施
Human-in-the-Loop高风险操作需要人工审批interrupt() + Command(resume=…)安全合规,线上必需
并行子任务多个独立任务串行太慢Send性能优化,真实业务提速

这三个场景覆盖了 AI Agent 开发 80% 的核心需求:状态持久化、人机协作、并行执行。


明天预告

场景四:多 Agent 协作(Supervisor 模式)

多个专业 Agent(研究员、程序员、审核员)如何通过 LangGraph 协作?Supervisor 如何动态分配任务?遇到任务失败如何让 Supervisor 自动重试或换人?


参考来源:LangGraph 1.0 官方文档(2026-05),LangGraph v1.0 完全指南 GitHub(2026-05),LangGraph 控制流原语解析 CSDN(2026-05),LangGraph 持久化完全指南 CSDN(2026-04)