2026-05-31 | LangGraph Command API 深度用法:状态更新与动态路由

当一个 Agent 的执行结果需要改变另一个 Agent 的目标时怎么办?当用户想在中间暂停审批时怎么办?Command API 给了你”原子性地同时更新状态 + 跳转节点”的能力。本文从实际场景出发,讲清楚 Command 的原理、逐行代码、以及它与普通 return 的本质区别。


一、实际问题:为什么普通 return 不够用?

场景 A:状态需要跨节点累积

假设你有一个多轮对话场景,用户说”帮我查一下苹果股价”,然后说”再查一下谷歌”,然后说”帮我算平均”。

如果用普通 return {"stock_price": "150"} 的方式,每一轮都会覆盖之前的状态,结果第三次就只剩下最后一个价格,前面的上下文全丢了。

解决方案:用 Command 让 update 增量合并,而不是覆盖。

场景 B:需要根据结果动态改变目标

假设审核 Agent 发现内容违规,返回 status="rejected",Supervisor 收到这个状态后需要动态决定:不是去下一个节点,而是回到上一步重新来

普通 return dict 只能决定”我执行完了”,不能决定”接下来谁跑”。Command 可以同时更新状态 AND 指定下一步去哪。

场景 C:HITL(Human-in-the-Loop)

高风险操作需要人工审批后才能继续。普通 return 无法让流程”暂停”,必须用 interrupt() 让图停在某个节点,等用户审批后才恢复。


二、Command API 详解

2.1 Command 的本质

1
2
3
4
5
6
7
8
9
10
 1  from langgraph.types import Command
2
3 # Command 的两个核心参数:
4 # update : 字典,要合并到当前状态(增量更新,不是覆盖)
5 # goto : 字符串或 END,下一个要执行的节点
6 def some_node(state: dict) -> Command:
7 return Command(
8 update={"result": "new_value"}, # 增量更新状态
9 goto="next_node" # 跳转到下一个节点
10 )

2.2 为什么叫”原子操作”

普通 return:

  • 第 1 步:更新状态
  • 第 2 步:跳转节点
  • 两步之间可能有状态不一致(竞态条件)

Command:

  • 一步完成:状态更新 + 跳转确定
  • 不存在中间状态不一致的问题

2.3 update 是增量合并,不是覆盖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 1  # 假设当前状态是:
2 # state = {"count": 0, "history": ["a", "b"]}
3
4 def increment_node(state: dict) -> Command:
5 # 普通 return 会覆盖整个状态:
6 # return {"count": 1} # state.history 丢了!
7
8 # Command 的 update 是增量合并:
9 return Command(
10 update={
11 "count": state["count"] + 1, # 只更新 count 字段
12 "history": state["history"] + ["inc"] # 追加 history
13 },
14 goto="next_node"
15 )
16 # 结果:state = {"count": 1, "history": ["a", "b", "inc"]}

三、逐行代码:3 个实战场景

3.1 场景 A:多轮对话状态累积(incremental update)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
  1  from typing import TypedDict, List
2 from langgraph.graph import StateGraph, START, END
3 from langgraph.types import Command
4 from langgraph.checkpoint.memory import MemorySaver
5 from langchain_openai import ChatOpenAI
6
7 # -------------------- 状态定义 --------------------
8 class AgentState(TypedDict):
9 messages: List[str] # 所有对话历史(会累积)
10 current_intent: str # 当前用户意图
11 context: dict # 跨轮次积累的上下文
12
13 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
14
15 # ================================================
16 # 查询节点:向 LLM 提问,累积查询结果到 context
17 # ================================================
18 def query_node(state: AgentState) -> Command:
19 user_message = state["messages"][-1] # 取最新一条用户消息
20 intent = state["current_intent"] # 当前意图
21
22 prompt = (
23 "你是一个信息查询助手。用户的问题是:\n"
24 + user_message + "\n\n"
25 + "请给出简洁的答案(50字以内)。"
26 )
27 result = llm.invoke(prompt)
28
29 # -------------------- 核心:增量更新 context --------------------
30 # update 中直接引用 state 的旧值,自动完成合并
31 # 这里把新的查询结果追加到 context["queries"] 列表
32 return Command(
33 update={
34 "context": {
35 **state.get("context", {}), # 保留旧 context
36 "last_query": user_message, # 追加本次查询
37 "last_result": result.content, # 记录本次结果
38 "query_count": state.get("context", {}).get("query_count", 0) + 1 # 计数 +1
39 }
40 },
41 goto="router_node" # 跳转到路由节点判断下一步
42 )
43
44 # ================================================
45 # 路由节点:根据意图决定下一步
46 # ================================================
47 def router_node(state: AgentState) -> Command:
48 intent = state.get("current_intent", "")
49
50 # -------------------- 意图关键词识别 --------------------
51 if any(kw in intent for kw in ["平均", "对比", "统计"]):
52 goto_target = "analyze_node"
53 elif any(kw in intent for kw in ["结束", "done", "完成"]):
54 goto_target = END
55 else:
56 goto_target = "query_node" # 继续查询
57
58 return Command(update={}, goto=goto_target) # update 为空:只跳转,不改状态
59
60 # ================================================
61 # 分析节点:对累积的查询结果做综合分析
62 # ================================================
63 def analyze_node(state: AgentState) -> Command:
64 context = state.get("context", {})
65 query_count = context.get("query_count", 0)
66
67 prompt = (
68 "你是一个数据分析助手。用户共查询了 " + str(query_count) + " 次。\n"
69 + "查询记录:" + str(context) + "\n\n"
70 + "请给出一个综合分析结论。"
71 )
72 analysis = llm.invoke(prompt)
73
74 return Command(
75 update={"final_analysis": analysis.content},
76 goto=END
77 )
78
79 # -------------------- 构建图 --------------------
80 builder = StateGraph(AgentState)
81 builder.add_node("query_node", query_node)
82 builder.add_node("router_node", router_node)
83 builder.add_node("analyze_node", analyze_node)
84 builder.add_edge(START, "query_node")
85 builder.add_edge("analyze_node", END)
86
87 checkpointer = MemorySaver()
88 graph = builder.compile(checkpointer=checkpointer)
89
90 # -------------------- 演示:多轮查询 --------------------
91 config = {"configurable": {"thread_id": "multi-turn-001"}}
92 initial_state = {
93 "messages": ["帮我查一下苹果股价"],
94 "current_intent": "查询",
95 "context": {}, # 初始空的 context,Command 会增量合并
96 }
97
98 # 第一轮
99 for event in graph.stream(initial_state, config=config):
100 pass
101
102 # 第二轮(从检查点恢复,加上新消息)
103 checkpoint = checkpointer.get(config["configurable"])
104 resumed_state = checkpoint["configurable"]
105 resumed_state["messages"].append("再查一下谷歌股价")
106 resumed_state["current_intent"] = "查询"
107
108 for event in graph.stream(resumed_state, config=config):
109 pass
110
111 # 第三轮(用户要求计算平均)
112 resumed_state["messages"].append("帮我算一下平均")
113 resumed_state["current_intent"] = "平均"
114 for event in graph.stream(resumed_state, config=config):
115 print(event)

执行流程

1
2
3
4
5
6
7
8
9
10
11
初始状态:messages=["苹果股价"],context={}

query_node:查询苹果 → update={last_query:"苹果", last_result:"$180", query_count:1}

router_node:intent="查询" → goto=query_node(继续查询)

query_node:查询谷歌 → update={...last_query:"谷歌", last_result:"$140", query_count:2}

router_node:intent="平均" → goto=analyze_node(进入分析)

analyze_node:综合分析两次查询结果 → final_analysis="两只股票平均..."

3.2 场景 B:动态路由(根据结果改变目标)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  1  # -------------------- 审核节点:发现问题返回 rejected --------------------
2 def review_node(state: dict) -> Command:
3 content = state.get("content", "")
4
5 prompt = "检查以下内容是否合规(涉及敏感词返回 REJECTED,否则 APPROVED):\n" + content
6 result = llm.invoke(prompt)
7
8 # -------------------- 动态路由:根据 LLM 结果决定下一步 --------------------
9 if "REJECTED" in result.content:
10 return Command(
11 update={
12 "review_status": "rejected", # 更新审核状态
13 "issues": result.content # 将拒绝原因写入状态
14 },
15 goto="edit_node" # 动态跳转到编辑节点(不是下一个固定节点)
16 )
17 else:
18 return Command(
19 update={"review_status": "approved"},
20 goto=END # 通过就结束
21 )
22
23 # -------------------- 编辑节点:根据审核意见修改内容 --------------------
24 def edit_node(state: dict) -> Command:
25 issues = state.get("issues", "")
26 content = state.get("content", "")
27
28 prompt = (
29 "你是内容编辑专家。根据以下审核意见修改内容:\n"
30 + "审核意见:" + issues + "\n"
31 + "原始内容:" + content + "\n\n"
32 + "输出一版修改后的内容。"
33 )
34 new_content = llm.invoke(prompt)
35
36 return Command(
37 update={"content": new_content.content}, # 更新内容
38 goto="review_node" # 重新进入审核(循环)
39 )
40

执行流程

1
2
3
4
5
6
7
8
9
10
用户输入内容

review_node:检查内容
├─ REJECTED → goto=edit_node(动态跳转)
│ ↓
│ edit_node:修改内容
│ ↓
│ review_node:重新审核(循环)
│ ↓
└─ APPROVED → goto=END(结束)

3.3 场景 C:HITL 人工审批后恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
  1  # -------------------- 高风险节点:执行前先中断,等用户审批 --------------------
2 def dangerous_operation_node(state: dict) -> Command:
3 action = state.get("action", "")
4
5 # -------------------- interrupt() 让图在这里暂停 --------------------
6 # 状态会保存到检查点,thread_id 可以随时恢复
7 # 用户在外部审批后调用 graph.invoke(Command(resume=...)) 恢复
8 return Command(
9 update={
10 "pending_action": action, # 记录待审批的操作
11 "status": "awaiting_approval" # 状态标记为"待审批"
12 },
13 goto=END # 暂停在这里(END 或其他节点都行)
14 )
15
16 # -------------------- 恢复后继续执行的节点 --------------------
17 def after_approval_node(state: dict) -> dict:
18 # 此时 state 中包含用户审批结果(通过/拒绝)
19 approved = state.get("approval_result", False)
20
21 if not approved:
22 return {"status": "rejected", "final_result": "操作被用户拒绝"}
23
24 # 用户批准,执行实际操作
25 return {
26 "status": "executed",
27 "final_result": f"操作已执行: {state.get('pending_action', '')}"
28 }
29
30 # -------------------- 在图外部恢复执行 --------------------
31 # 用户在审批页面点击"同意"后:
32 user_command = Command(
33 resume={"approval_result": True}, # 传入审批结果
34 goto="after_approval_node" # 恢复到 after_approval_node
35 )
36
37 # 通过 thread_id 恢复对应会话的执行状态
38 config = {"configurable": {"thread_id": "pending-approval-001"}}
39 for event in graph.invoke(user_command, config=config):
40 print(event)
41

执行流程

1
2
3
4
5
6
7
8
9
10
11
用户提交高风险操作

dangerous_operation_node:执行 interrupt(),状态保存到 thread_id="pending-approval-001"

(图执行暂停,thread_id 保存了当前状态)

用户审批 → 点击"同意"或"拒绝"

外部调用 graph.invoke(Command(resume={approval_result: True}, goto="after_approval_node"))

after_approval_node:根据审批结果继续执行

四、Command vs 普通 return:对比总结

特性普通 return dictCommand
更新状态✅(增量合并)
跳转节点❌(写死在 add_edge)✅(动态决定)
原子性❌(两步独立)✅(一步完成)
跨图跳转✅(通过 goto=”subgraph_name”)
HITL 恢复✅(resume 参数)
适用场景固定线性流程需要动态决策的业务

五、思想总结

设计点说明
为什么 update 是增量合并保证状态在多轮执行中持续累积,不会被覆盖丢失
为什么需要原子操作状态更新和路由决策在一步内完成,防止竞态条件
什么时候用普通 return节点只做计算、不影响流程走向时,用 return 更简单
什么时候用 Command需要动态路由、状态累积、HITL 暂停恢复时必须用 Command
interrupt() 的本质保存当前状态快照到检查点,图执行暂停,等外部触发恢复

六、明天预告

Store 长期记忆:跨会话用户画像持久化,语义检索长期记忆。


参考来源:LangGraph 1.0 官方文档(2026-05),LangGraph Command API 实战(2026-05)