LangGraph 1.0 实战:3 个高频业务场景与解决方案 LangGraph 是当前 AI Agent 开发最主流的框架(2026年 LangChain 1.0 已完全基于 LangGraph 底层驱动)。本文聚焦 3 个真实的 Agent 开发业务场景,讲清楚每个场景的问题本质、为什么这样解决、逐行代码实现 ,以及背后的设计思想 。
场景一:Agent 执行中突然挂掉,重启后状态全丢了——Checkpoint 持久化 1.1 问题描述 你写了一个多步骤 Agent,用户问了一个复杂问题,Agent 开始执行:查天气 → 查交通 → 规划路线 → 发邮件。执行到第三步时,服务重启了。用户重新发同样的问题,Agent 从头开始,用户之前看到的结果全丢了。
这是生产级 Agent 最常见的问题之一 :Agent 耗时长、中间状态需要跨进程恢复。
1.2 错误做法(开发阶段容易踩的坑) 1 2 3 4 5 6 from langgraph.checkpoint.memory import MemorySavergraph = builder.compile (checkpointer=MemorySaver())
1.3 正确做法:数据库持久化 Checkpoint 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langgraph.checkpoint.postgres import PostgresSaverfrom psycopg_pool import AsyncConnectionPoolpool = AsyncConnectionPool( "postgresql://user:pass@localhost/agent_db" , min_size=10 , max_size=100 , max_idle=300.0 , max_lifetime=3600.0 , ) checkpointer = PostgresSaver(pool) graph = builder.compile (checkpointer=checkpointer) config = {"configurable" : {"thread_id" : f"user_{user_id} _session_{session_id} " }} for event in graph.stream({"messages" : [("user" , user_input)]}, config=config): yield event
为什么需要 thread_id?
1 2 3 4 5 6 thread_id = "user_123_session_456" ├── 作用:唯一标识一个会话的执行线程 ├── 重启后:用同样的 thread_id 调用 graph.invoke() │ → LangGraph 自动从最后一个 checkpoint 恢复状态 │ → 用户看到的是中断前的结果,不是从头开始 └── 格式建议:user_{user_id}_session_{session_id}_timestamp
1.4 原理:Checkpoint 的工作机制 LangGraph 在每个节点执行完毕后,会自动调用 checkpointer 将当前状态写入持久化存储:
1 2 3 4 5 6 7 8 9 10 11 12 13 节点 step_1 执行完毕 ↓ checkpointer 保存快照(values: {messages, current_node, plan}, next: ["step_2"], config: {thread_id}) ↓ 节点 step_2 执行完毕 ↓ checkpointer 再次保存快照(覆盖上一次) ↓ 服务崩溃,重启 ↓ 用户再次调用 graph.invoke(thread_id="user_123...") ↓ LangGraph 从 checkpointer 读取最后一个快照,恢复状态,继续执行 step_2
一个 checkpointer 能做的事 :
get_state(config):读取当前状态get_state_history(config):读取历史快照列表(可用于”回放”整个执行过程)update_state(config, new_state):手动修改状态后恢复执行(用于人工修正)put(state):手动写入快照(用于调试)1.5 思想总结 为什么 Agent 需要持久化 Checkpoint?因为 Agent 的执行时间是未知的,可能 5 秒,可能 30 分钟,中间任何一步服务崩溃都需要能从断点恢复。Checkpoint 是生产级 Agent 的基础设施,不是可选项。
选型建议 :
测试/单机:MemorySaver(内存,重启丢失) 生产环境:PostgresSaver(PostgreSQL,推荐)/ RedisSaver(Redis)/ SqliteSaver(轻量级单机) 场景二:高风险操作必须人工审批后才能执行——interrupt() 与 Human-in-the-Loop 2.1 问题描述 你的 Agent 有删除文件、删除数据库表、转账等高风险操作。不能让它全自动执行,必须在执行前暂停,等人工确认后才继续。
典型场景 :
删库操作:Agent 判断要清理历史数据,需要人工点”确认删除” 外部通知:Agent 准备给客户发邮件,人工审核邮件内容后才发 资金操作:Agent 判断要执行转账,必须人工授权 2.2 核心 API:interrupt() interrupt() 是 LangGraph 1.0 提供的标准中断 API。在节点内部调用 interrupt() 时,图执行会暂停,等待外部用 Command(resume=...) 恢复。
2.3 完整实现:删除文件前的人工审批流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 from typing import TypedDict, Literal from langgraph.graph import StateGraph, START, ENDfrom langgraph.types import interrupt, Commandfrom langgraph.checkpoint.postgres import PostgresSaverfrom psycopg_pool import AsyncConnectionPoolclass AgentState (TypedDict ): messages: list pending_action: dict | None approved: bool def execute_delete (state: AgentState ) -> AgentState: action = state["pending_action" ] file_path = action["file_path" ] os.remove(file_path) return { "messages" : state["messages" ] + [f"已删除文件: {file_path} " ], "pending_action" : None , "approved" : False , } def prepare_delete (state: AgentState ) -> AgentState: action = state["pending_action" ] file_path = action["file_path" ] human_value = interrupt({ "title" : "高风险操作需要审批" , "description" : f"Agent 准备执行删除文件操作" , "file_path" : file_path, "action" : "delete_file" , }) if not human_value.get("approved" ): return { "messages" : state["messages" ] + ["操作已取消(人工否决)" ], "pending_action" : None , "approved" : False , } return state from langchain_openai import ChatOpenAIllm = ChatOpenAI(model="gpt-4o-mini" ) def llm_router (state: AgentState ) -> AgentState: last_msg = state["messages" ][-1 ]["content" ] response = llm.invoke(f"用户说:{last_msg} ,判断是否需要执行删除文件操作。" "如果需要,返回文件路径;如果不需要,返回 NO_ACTION。" ) if "NO_ACTION" in response.content: return {"messages" : state["messages" ] + [("assistant" , "好的,我知道了。" )]} file_path = response.content.strip() return { "pending_action" : {"action" : "delete_file" , "file_path" : file_path}, } def should_approve (state: AgentState ) -> Literal ["prepare_delete" , "llm_router" ]: if state.get("pending_action" ): return "prepare_delete" return "llm_router" builder = StateGraph(AgentState) builder.add_node("llm_router" , llm_router) builder.add_node("prepare_delete" , prepare_delete) builder.add_node("execute_delete" , execute_delete) builder.add_edge(START, "llm_router" ) builder.add_edge("execute_delete" , END) builder.add_conditional_edges("llm_router" , should_approve) builder.add_conditional_edges("prepare_delete" , lambda s: "execute_delete" if s.get("pending_action" ) else END) pool = AsyncConnectionPool("postgresql://user:pass@localhost/agent_db" ) graph = builder.compile (checkpointer=PostgresSaver(pool)) import uuiddef run_agent (user_message: str , user_id: str ): thread_config = { "configurable" : { "thread_id" : f"user_{user_id} _agent_{uuid.uuid4().hex [:8 ]} " , "pool" : pool, } } for event in graph.stream( {"messages" : [("user" , user_message)], "pending_action" : None , "approved" : False }, config=thread_config ): if event.get("pending_action" ): print (f"⏸️ Agent 已暂停,等待审批..." ) return event return event def approve_action (thread_id: str , approval: dict ): """ 人工审批后调用此方法恢复执行: - approval = {"approved": True, "note": "确认可以删除"} """ graph.invoke( Command(resume=approval), config={"configurable" : {"thread_id" : thread_id, "pool" : pool}} )
流程图 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 用户: "帮我清理 /tmp/old_logs.txt" ↓ llm_router(LLM 判断) ↓ pending_action = {file_path: "/tmp/old_logs.txt"} prepare_delete(触发 interrupt 暂停) ↓ ⏸️ 人工审批界面展示:文件路径 + 操作类型 ↓ 人工点击"确认"或"取消" ↓ Command(resume={approved: True}) 恢复执行 ↓ execute_delete → 执行真正的删除 ↓ END
2.4 思想总结 为什么 interrupt() 比直接 return 后判断更合理?因为 interrupt() 把状态快照持久化到 checkpointer,即使服务重启也能从断点恢复。如果用简单的 if 判断后 return,中断期间服务重启,用户就无法恢复了。interrupt() 是 LangGraph 1.0 设计 Human-in-the-Loop 的标准方式,替代了旧版 NodeInterrupt。
场景三:Agent 内部多个子任务需要并行执行——Send API 3.1 问题描述 你的 Agent 要完成一个调研任务:
同时查多个信息源(新闻搜索 + 学术搜索 + 社交媒体) 每个信息源独立查询,互不依赖 全部查完后,再把结果汇总给 LLM 生成报告 如果用普通边,三个信息源必须串行执行 ,速度慢;用 Send API,三个信息源并行执行 ,速度提升 3 倍。
3.2 核心 API:Send Send 是 LangGraph 1.0 提供的并行任务分发 API。在条件边或节点中返回 Send("节点名", state),会在同一个时间步内并行触发多个节点执行。
3.3 完整实现:调研 Agent 并行查询 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 from typing import TypedDict, Annotatedfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Sendimport operatorclass ResearchState (TypedDict ): query: str results: Annotated[list , operator.add] final_report: str def search_news (state: ResearchState ) -> ResearchState: """搜索新闻""" query = state["query" ] news_results = news_tool.search(query) return {"results" : [f"[新闻] {news_results} " ]} def search_academic (state: ResearchState ) -> ResearchState: """搜索学术论文""" query = state["query" ] academic_results = academic_tool.search(query) return {"results" : [f"[学术] {academic_results} " ]} def search_social (state: ResearchState ) -> ResearchState: """搜索社交媒体""" query = state["query" ] social_results = social_tool.search(query) return {"results" : [f"[社交] {social_results} " ]} def write_report (state: ResearchState ) -> ResearchState: """汇总所有结果,生成报告""" all_results = "\n\n" .join(state["results" ]) report = llm.invoke( f"根据以下调研结果,写一份完整报告:\n\n{all_results} " ) return {"final_report" : report} def distribute_search (state: ResearchState ) -> list [Send]: """ 返回 Send 列表:同一时间步并行触发三个搜索节点 每个 Send 携带不同的输入状态(这里 query 相同) """ query = state["query" ] return [ Send("search_news" , {"query" : query, "results" : []}), Send("search_academic" , {"query" : query, "results" : []}), Send("search_social" , {"query" : query, "results" : []}), ] builder = StateGraph(ResearchState) builder.add_node("search_news" , search_news) builder.add_node("search_academic" , search_academic) builder.add_node("search_social" , search_social) builder.add_node("write_report" , write_report) builder.add_edge(START, "distribute_search" ) builder.add_node("distribute_search" , distribute_search) builder.add_conditional_edges("distribute_search" , distribute_search) for node in ["search_news" , "search_academic" , "search_social" ]: builder.add_edge(node, "write_report" ) builder.add_edge("write_report" , END) graph = builder.compile () result = graph.invoke({"query" : "LangGraph 1.0 最新特性" , "results" : []}) print (result["final_report" ])
执行时序对比 :
1 2 3 4 5 6 7 8 9 普通边(串行): distribute_search → search_news → search_academic → search_social → write_report 总耗时 = t_news + t_academic + t_social ≈ 3T Send API(并行): distribute_search → search_news ─┐ ├→ search_academic ─┼→ write_report └→ search_social ─┘ 总耗时 = max(t_news, t_academic, t_social) ≈ T
3.4 思想总结 为什么用 Send 而不是并行调用三个函数?Send 的核心优势是状态共享 + 结果聚合。三个节点执行时共享同一个 results 列表(通过 Annotated[list, operator.add] 累加器),全部完成后才进入 write_report。直接并行调用函数也可以做到,但状态需要手动管理,Send 让你用图的思维表达并行,状态管理全部由 LangGraph 统一处理。
三个场景的共同主题 场景 解决的问题 核心 API 为什么重要 Checkpoint 持久化 Agent 执行中断/重启状态丢失 PostgresSaver + thread_id 生产级 Agent 的基础设施 Human-in-the-Loop 高风险操作需要人工审批 interrupt() + Command(resume=…) 安全合规,线上必需 并行子任务 多个独立任务串行太慢 Send 性能优化,真实业务提速
这三个场景覆盖了 AI Agent 开发 80% 的核心需求 :状态持久化、人机协作、并行执行。
明天预告 场景四:多 Agent 协作(Supervisor 模式)
多个专业 Agent(研究员、程序员、审核员)如何通过 LangGraph 协作?Supervisor 如何动态分配任务?遇到任务失败如何让 Supervisor 自动重试或换人?
参考来源 :LangGraph 1.0 官方文档(2026-05),LangGraph v1.0 完全指南 GitHub(2026-05),LangGraph 控制流原语解析 CSDN(2026-05),LangGraph 持久化完全指南 CSDN(2026-04)