2026-05-31 | LangGraph Command API 深度用法:状态更新与动态路由
当一个 Agent 的执行结果需要改变另一个 Agent 的目标时怎么办?当用户想在中间暂停审批时怎么办?Command API 给了你”原子性地同时更新状态 + 跳转节点”的能力。本文从实际场景出发,讲清楚 Command 的原理、逐行代码、以及它与普通 return 的本质区别。
一、实际问题:为什么普通 return 不够用?
场景 A:状态需要跨节点累积
假设你有一个多轮对话场景,用户说”帮我查一下苹果股价”,然后说”再查一下谷歌”,然后说”帮我算平均”。
如果用普通 return {"stock_price": "150"} 的方式,每一轮都会覆盖之前的状态,结果第三次就只剩下最后一个价格,前面的上下文全丢了。
解决方案:用 Command 让 update 增量合并,而不是覆盖。
场景 B:需要根据结果动态改变目标
假设审核 Agent 发现内容违规,返回 status="rejected",Supervisor 收到这个状态后需要动态决定:不是去下一个节点,而是回到上一步重新来。
普通 return dict 只能决定”我执行完了”,不能决定”接下来谁跑”。Command 可以同时更新状态 AND 指定下一步去哪。
场景 C:HITL(Human-in-the-Loop)
高风险操作需要人工审批后才能继续。普通 return 无法让流程”暂停”,必须用 interrupt() 让图停在某个节点,等用户审批后才恢复。
二、Command API 详解
2.1 Command 的本质
1 2 3 4 5 6 7 8 9 10
| 1 from langgraph.types import Command 2 3 4 5 6 def some_node(state: dict) -> Command: 7 return Command( 8 update={"result": "new_value"}, 9 goto="next_node" 10 )
|
2.2 为什么叫”原子操作”
普通 return:
- 第 1 步:更新状态
- 第 2 步:跳转节点
- 两步之间可能有状态不一致(竞态条件)
Command:
- 一步完成:状态更新 + 跳转确定
- 不存在中间状态不一致的问题
2.3 update 是增量合并,不是覆盖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| 1 2 3 4 def increment_node(state: dict) -> Command: 5 6 7 8 9 return Command( 10 update={ 11 "count": state["count"] + 1, 12 "history": state["history"] + ["inc"] 13 }, 14 goto="next_node" 15 ) 16
|
三、逐行代码:3 个实战场景
3.1 场景 A:多轮对话状态累积(incremental update)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| 1 from typing import TypedDict, List 2 from langgraph.graph import StateGraph, START, END 3 from langgraph.types import Command 4 from langgraph.checkpoint.memory import MemorySaver 5 from langchain_openai import ChatOpenAI 6 7 8 class AgentState(TypedDict): 9 messages: List[str] 10 current_intent: str 11 context: dict 12 13 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) 14 15 16 17 18 def query_node(state: AgentState) -> Command: 19 user_message = state["messages"][-1] 20 intent = state["current_intent"] 21 22 prompt = ( 23 "你是一个信息查询助手。用户的问题是:\n" 24 + user_message + "\n\n" 25 + "请给出简洁的答案(50字以内)。" 26 ) 27 result = llm.invoke(prompt) 28 29 30 31 32 return Command( 33 update={ 34 "context": { 35 **state.get("context", {}), 36 "last_query": user_message, 37 "last_result": result.content, 38 "query_count": state.get("context", {}).get("query_count", 0) + 1 39 } 40 }, 41 goto="router_node" 42 ) 43 44 45 46 47 def router_node(state: AgentState) -> Command: 48 intent = state.get("current_intent", "") 49 50 51 if any(kw in intent for kw in ["平均", "对比", "统计"]): 52 goto_target = "analyze_node" 53 elif any(kw in intent for kw in ["结束", "done", "完成"]): 54 goto_target = END 55 else: 56 goto_target = "query_node" 57 58 return Command(update={}, goto=goto_target) 59 60 61 62 63 def analyze_node(state: AgentState) -> Command: 64 context = state.get("context", {}) 65 query_count = context.get("query_count", 0) 66 67 prompt = ( 68 "你是一个数据分析助手。用户共查询了 " + str(query_count) + " 次。\n" 69 + "查询记录:" + str(context) + "\n\n" 70 + "请给出一个综合分析结论。" 71 ) 72 analysis = llm.invoke(prompt) 73 74 return Command( 75 update={"final_analysis": analysis.content}, 76 goto=END 77 ) 78 79 80 builder = StateGraph(AgentState) 81 builder.add_node("query_node", query_node) 82 builder.add_node("router_node", router_node) 83 builder.add_node("analyze_node", analyze_node) 84 builder.add_edge(START, "query_node") 85 builder.add_edge("analyze_node", END) 86 87 checkpointer = MemorySaver() 88 graph = builder.compile(checkpointer=checkpointer) 89 90 91 config = {"configurable": {"thread_id": "multi-turn-001"}} 92 initial_state = { 93 "messages": ["帮我查一下苹果股价"], 94 "current_intent": "查询", 95 "context": {}, 96 } 97 98 99 for event in graph.stream(initial_state, config=config): 100 pass 101 102 103 checkpoint = checkpointer.get(config["configurable"]) 104 resumed_state = checkpoint["configurable"] 105 resumed_state["messages"].append("再查一下谷歌股价") 106 resumed_state["current_intent"] = "查询" 107 108 for event in graph.stream(resumed_state, config=config): 109 pass 110 111 112 resumed_state["messages"].append("帮我算一下平均") 113 resumed_state["current_intent"] = "平均" 114 for event in graph.stream(resumed_state, config=config): 115 print(event)
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11
| 初始状态:messages=["苹果股价"],context={} ↓ query_node:查询苹果 → update={last_query:"苹果", last_result:"$180", query_count:1} ↓ router_node:intent="查询" → goto=query_node(继续查询) ↓ query_node:查询谷歌 → update={...last_query:"谷歌", last_result:"$140", query_count:2} ↓ router_node:intent="平均" → goto=analyze_node(进入分析) ↓ analyze_node:综合分析两次查询结果 → final_analysis="两只股票平均..."
|
3.2 场景 B:动态路由(根据结果改变目标)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| 1 2 def review_node(state: dict) -> Command: 3 content = state.get("content", "") 4 5 prompt = "检查以下内容是否合规(涉及敏感词返回 REJECTED,否则 APPROVED):\n" + content 6 result = llm.invoke(prompt) 7 8 9 if "REJECTED" in result.content: 10 return Command( 11 update={ 12 "review_status": "rejected", 13 "issues": result.content 14 }, 15 goto="edit_node" 16 ) 17 else: 18 return Command( 19 update={"review_status": "approved"}, 20 goto=END 21 ) 22 23 24 def edit_node(state: dict) -> Command: 25 issues = state.get("issues", "") 26 content = state.get("content", "") 27 28 prompt = ( 29 "你是内容编辑专家。根据以下审核意见修改内容:\n" 30 + "审核意见:" + issues + "\n" 31 + "原始内容:" + content + "\n\n" 32 + "输出一版修改后的内容。" 33 ) 34 new_content = llm.invoke(prompt) 35 36 return Command( 37 update={"content": new_content.content}, 38 goto="review_node" 39 ) 40
|
执行流程:
1 2 3 4 5 6 7 8 9 10
| 用户输入内容 ↓ review_node:检查内容 ├─ REJECTED → goto=edit_node(动态跳转) │ ↓ │ edit_node:修改内容 │ ↓ │ review_node:重新审核(循环) │ ↓ └─ APPROVED → goto=END(结束)
|
3.3 场景 C:HITL 人工审批后恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| 1 2 def dangerous_operation_node(state: dict) -> Command: 3 action = state.get("action", "") 4 5 6 7 8 return Command( 9 update={ 10 "pending_action": action, 11 "status": "awaiting_approval" 12 }, 13 goto=END 14 ) 15 16 17 def after_approval_node(state: dict) -> dict: 18 19 approved = state.get("approval_result", False) 20 21 if not approved: 22 return {"status": "rejected", "final_result": "操作被用户拒绝"} 23 24 25 return { 26 "status": "executed", 27 "final_result": f"操作已执行: {state.get('pending_action', '')}" 28 } 29 30 31 32 user_command = Command( 33 resume={"approval_result": True}, 34 goto="after_approval_node" 35 ) 36 37 38 config = {"configurable": {"thread_id": "pending-approval-001"}} 39 for event in graph.invoke(user_command, config=config): 40 print(event) 41
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11
| 用户提交高风险操作 ↓ dangerous_operation_node:执行 interrupt(),状态保存到 thread_id="pending-approval-001" ↓ (图执行暂停,thread_id 保存了当前状态) ↓ 用户审批 → 点击"同意"或"拒绝" ↓ 外部调用 graph.invoke(Command(resume={approval_result: True}, goto="after_approval_node")) ↓ after_approval_node:根据审批结果继续执行
|
四、Command vs 普通 return:对比总结
| 特性 | 普通 return dict | Command |
|---|
| 更新状态 | ✅ | ✅(增量合并) |
| 跳转节点 | ❌(写死在 add_edge) | ✅(动态决定) |
| 原子性 | ❌(两步独立) | ✅(一步完成) |
| 跨图跳转 | ❌ | ✅(通过 goto=”subgraph_name”) |
| HITL 恢复 | ❌ | ✅(resume 参数) |
| 适用场景 | 固定线性流程 | 需要动态决策的业务 |
五、思想总结
| 设计点 | 说明 |
|---|
| 为什么 update 是增量合并 | 保证状态在多轮执行中持续累积,不会被覆盖丢失 |
| 为什么需要原子操作 | 状态更新和路由决策在一步内完成,防止竞态条件 |
| 什么时候用普通 return | 节点只做计算、不影响流程走向时,用 return 更简单 |
| 什么时候用 Command | 需要动态路由、状态累积、HITL 暂停恢复时必须用 Command |
| interrupt() 的本质 | 保存当前状态快照到检查点,图执行暂停,等外部触发恢复 |
六、明天预告
Store 长期记忆:跨会话用户画像持久化,语义检索长期记忆。
参考来源:LangGraph 1.0 官方文档(2026-05),LangGraph Command API 实战(2026-05)