2026-05-30 | LangGraph 实战:多Agent协作——Supervisor模式与任务动态分配

多个专业 Agent(研究员、程序员、审核员)如何通过 LangGraph 协作?Supervisor 如何动态分配任务?本文从实际问题出发,讲清楚 Supervisor 模式的原理、代码实现、适用场景与局限性。


一、实际问题:为什么需要多 Agent 协作?

想象你要做一个代码审查场景:

1
用户:帮我把这个 Python 函数重构一下,要求加类型注解和文档注释

这个任务需要拆分成多个步骤,每个步骤可能需要不同专业能力的 Agent:

  1. 代码分析 Agent:理解现有函数逻辑,识别输入输出类型
  2. 重构执行 Agent:生成带类型注解的新代码
  3. 文档生成 Agent:为函数生成 docstring
  4. 审核 Agent:检查生成的代码是否符合规范

如果把这四个步骤写在一个 Agent 里,prompt 会变得很长,而且不同专业能力的指令相互干扰,导致输出不稳定。

解决方案:Supervisor 模式——一个中央调度 Agent 负责分解任务、分发给专业 Agent、收集结果、决定下一步。


二、Supervisor 模式的原理

2.1 架构设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
     用户输入
|
+-------------+
| Supervisor |
| (LLM 调度) |
+-----+-------+
|
+-----+-----+-----+-----+
| | | | |
分析 重构 文档 审核
Agent Agent Agent Agent
| | | |
+-----+-----+-----+-----+
|
各 Agent 完成后汇报给 Supervisor
|
Supervisor 判断:
任务完成 or 继续分发

2.2 Supervisor 的决策循环

Supervisor 本质上是一个带条件的循环节点。每次收到子 Agent 的结果后,做三个判断:

  1. 任务是否完成? -> 结束,返回最终结果
  2. 哪个 Agent 适合处理下一步? -> 分发新任务
  3. 任务失败怎么办? -> 换其他 Agent 重试 or 降级处理

三、代码实现:从 0 到 1 搭建 Supervisor Agent

3.1 定义各专业 Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
 1  from typing import TypedDict, Literal      # TypedDict: 定义状态类型字典;Literal: 限制路由返回值
2 from langgraph.graph import StateGraph, START, END # StateGraph: 状态图;START/END: 入口/出口节点
3 from langgraph.types import Command # Command: 原子操作,同时更新状态 + 跳转节点
4 from langgraph.checkpoint.memory import MemorySaver # 内存型检查点,重启丢失(仅测试用)
5 from langchain_openai import ChatOpenAI # OpenAI LLM 接口
6
7 # -------------------- LLM 实例 --------------------
8 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) # temperature=0: 输出稳定,方便调试
9
10 # ================================================
11 # 代码分析 Agent:理解函数逻辑,提取输入输出类型
12 # ================================================
13 def analyzer_node(state: dict) -> dict:
14 # 从状态中取出待分析的代码
15 code = state["code"]
16
17 # 组装 prompt:要求 LLM 输出结构化分析结果
18 prompt = (
19 "你是一个代码分析师。分析以下Python函数:\n"
20 + code + "\n\n"
21 + "输出一份结构化分析,包含:input_types(输入类型), "
22 + "output_type(输出类型), logic_summary(逻辑摘要)"
23 )
24
25 # 调用 LLM 获取分析结果
26 analysis = llm.invoke(prompt)
27
28 # 返回更新后的状态:analysis=分析结果,next_action="refactor" 告诉 Supervisor 下一步走哪个节点
29 return {"analysis": analysis.content, "next_action": "refactor"}
30
31
32 # ================================================
33 # 重构 Agent:根据分析结果,为代码加上类型注解
34 # ================================================
35 def refactor_node(state: dict) -> dict:
36 code = state["code"] # 原始代码(不变,用于对比)
37 analysis = state.get("analysis", "") # 如果没有分析结果则为空字符串(容错)
38
39 prompt = (
40 "你是代码重构专家。根据以下分析结果,给原始函数加上类型注解和 docstring。\n"
41 + "原始代码:" + code + "\n分析结果:" + analysis
42 )
43
44 new_code = llm.invoke(prompt)
45
46 # 返回重构后的代码,下一步流转到文档生成节点
47 return {"refactored_code": new_code.content, "next_action": "document"}
48
49 # ================================================
50 # 文档 Agent:为代码补充完整文档注释
51 # ================================================
52 def documenter_node(state: dict) -> dict:
53 # 优先使用已重构的代码,如果没有则用原始代码
54 code = state.get("refactored_code", state["code"])
55
56 prompt = (
57 "你是技术文档专家。为以下代码补充完整的 docstring(Google风格):\n"
58 + code + "\n\n输出:带完整 docstring 的代码"
59 )
60
61 documented = llm.invoke(prompt)
62
63 # 返回带文档的代码,下一步流转到审核节点
64 return {"documented_code": documented.content, "next_action": "review"}
65
66 # ================================================
67 # 审核 Agent:检查代码质量,通过则结束,拒则重新重构
68 # ================================================
69 def reviewer_node(state: dict) -> dict:
70 code = state.get("documented_code", state["code"])
71
72 prompt = (
73 "你是代码审核专家。检查以下代码是否符合PEP8规范、类型注解是否正确、docstring是否完整。\n"
74 + "代码:" + code + "\n\n"
75 + "输出格式:\n status: APPROVED(通过)/ REJECTED(拒绝)\n"
76 + " issues: [如果拒绝,列出具体问题]"
77 )
78
79 review = llm.invoke(prompt)
80
81 # 如果 LLM 判断为 APPROVED,返回最终代码,next_action="done" 触发结束
82 if "APPROVED" in review.content:
83 return {"final_code": code, "status": "approved", "next_action": "done"}
84
85 # 如果为 REJECTED,更新状态为 rejected,Supervisor 会根据这个状态决定重新走 refactor
86 else:
87 return {
88 "status": "rejected",
89 "review_feedback": review.content, # 将审核拒绝原因写入状态,供 Supervisor 参考
90 "next_action": "refactor" # 下一步重新走 refactor 节点
91 }

3.2 Supervisor 决策节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 1  def supervisor_node(state: dict) -> Command:
2 # -------------------- 从状态中读取关键字段 --------------------
3 # current_action: 上一个 Agent 认为下一步应该执行什么
4 current_action = state.get("next_action", "analyzer") # 默认从分析开始
5
6 # status: 整体任务状态(in_progress / rejected / approved 等)
7 status = state.get("status", "in_progress")
8
9 # loop_count: 当前已循环次数,防止无限循环
10 loop_count = state.get("loop_count", 0)
11
12 # -------------------- 保护机制:最大循环次数 --------------------
13 # 如果循环超过 5 次,强制结束任务(防止某些边缘情况死循环)
14 if loop_count >= 5:
15 return Command(
16 update={"status": "max_loops_reached"}, # 更新状态记录
17 goto=END # 跳转 END,直接结束整张图
18 )
19
20 # -------------------- 审核拒绝恢复逻辑 --------------------
21 # 当 reviewer 返回 REJECTED 时,status="rejected"
22 # Supervisor 收到这个状态后,让 refactor_node 重新处理,而不是从头开始
23 if status == "rejected":
24 return Command(
25 update={
26 "status": "in_progress", # 重置状态为进行中
27 "loop_count": loop_count + 1 # 循环计数 +1
28 },
29 goto="refactor" # 跳过 analyzer,直接回到重构节点
30 )
31
32 # -------------------- 动态路由映射表 --------------------
33 # next_action 由各 Agent 填充,Supervisor 根据这个值决定下一个节点
34 # 注意:"done" 映射到 END,表示任务完成
35 routing = {
36 "analyzer": "analyzer", # 分析节点完成后去分析
37 "refactor": "refactor", # 重构节点完成后去重构
38 "document": "documenter", # 文档节点完成后去文档
39 "review": "reviewer", # 审核节点完成后去审核
40 "done": END, # done 表示流程结束
41 }
42
43 # -------------------- 核心决策逻辑 --------------------
44 # 从 routing 表中查下一个节点(如果不在表里,默认 END)
45 next_node = routing.get(current_action, END)
46
47 # 返回 Command:更新状态(累加 loop_count)+ 跳转目标节点
48 return Command(update={"loop_count": loop_count + 1}, goto=next_node)

3.3 构建图并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
 1  # -------------------- 构建状态图 --------------------
2 builder = StateGraph(dict) # dict: 状态类型(用普通字典,不用 TypedDict)
3
4 # -------------------- 添加所有节点 --------------------
5 builder.add_node("supervisor", supervisor_node) # 调度节点(LLM 决策中心)
6 builder.add_node("analyzer", analyzer_node) # 代码分析节点
7 builder.add_node("refactor", refactor_node) # 重构节点
8 builder.add_node("documenter", documenter_node) # 文档生成节点
9 builder.add_node("reviewer", reviewer_node) # 审核节点
10
11 # -------------------- 设置入口边 --------------------
12 # 用户的输入首先进入 supervisor,由它决定第一个要执行的节点
13 builder.add_edge(START, "supervisor")
14
15 # -------------------- 各 Agent 完成后回到 Supervisor --------------------
16 # 这是一个关键设计:每个 Agent 执行完毕后,都回到 supervisor_node 重新做决策
17 # 这样 Supervisor 可以根据当前状态动态决定下一步,而不是写死流程
18 for node in ["analyzer", "refactor", "documenter", "reviewer"]:
19 builder.add_edge(node, "supervisor")
20
21 # -------------------- 编译图 --------------------
22 checkpointer = MemorySaver() # 内存检查点(测试用);生产用 PostgresSaver
23 graph = builder.compile(checkpointer=checkpointer) # 编译后得到可执行的 graph 对象
24
25 # ================================================
26 # 执行示例:重构一个计算距离的函数
27 # ================================================
28 initial_state = {
29 "code": '''
30 def calculate_distance(lat1, lon1, lat2, lon2):
31 import math
32 R = 6371
33 dlat = math.radians(lat2 - lat1)
34 dlon = math.radians(lon2 - lon1)
35 a = (math.sin(dlat/2)**2
36 + math.cos(math.radians(lat1))
37 * math.cos(math.radians(lat2))
38 * math.sin(dlon/2)**2)
39 return 2 * R * math.asin(math.sqrt(a))
40 ''',
41 "next_action": "analyzer", # 第一个节点:代码分析
42 "status": "in_progress", # 任务状态:进行中
43 "loop_count": 0, # 初始循环次数:0
44 }
45
46 # thread_id 是检查点的唯一标识,同一个 thread_id 重启后可恢复状态
47 config = {"configurable": {"thread_id": "refactor-session-001"}}
48
49 # -------------------- 流式执行(streaming) --------------------
50 # graph.stream() 是流式调用,每次返回一个 {节点名: 输出状态} 的事件
51 # 我们遍历这些事件,直到 final_code 出现在某个节点的输出中,表示流程结束
52 for event in graph.stream(initial_state, config=config):
53 node_name = list(event.keys())[0] # 取出事件中的节点名
54 print(f"执行节点: {node_name}") # 打印当前执行到的节点(调试用)
55
56 # 如果 final_code 出现,说明 reviewer 认为代码已通过审核,流程结束
57 if "final_code" in event[node_name]:
58 print("审核通过,任务完成")
59 break
60

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
用户输入 -> supervisor

analyzer(分析代码)

supervisor(决策:下一步 refactor)

refactor(加类型注解)

supervisor(决策:下一步 document)

documenter(补充 docstring)

supervisor(决策:下一步 review)

reviewer
├─ APPROVED → supervisor → done(结束)
└─ REJECTED → supervisor → refactor(重新重构,循环保护生效)

四、思想总结

设计点说明
为什么要 LLM 当 Supervisor任务分配需要根据中间结果做判断,固定规则处理不了复杂场景
为什么每个 Agent 独立返回 next_actionSupervisor 需要知道每个 Agent 完成后应该往哪走
为什么要用 Command 而不是 return dictCommand 可以同时更新状态 AND 决定下一步,原子操作
为什么每个 Agent 完成后都回到 supervisorSupervisor 是唯一的决策入口,所有节点执行完都要经过它重新决策
循环次数保护的必要性防止审核被无限拒绝导致死循环,5 次后强制结束
为什么用 stream() 而不是 invoke()stream() 流式输出,可以看到每个节点的中间结果,便于调试

五、明天预告

Command API 深度用法:Command 跨图调用、子图返回、HITL 恢复等高级场景。


参考来源:LangGraph 1.0 官方文档(2026-05),LangGraph 多Agent协作实战(2026-05)