2026-05-30 | LangGraph 实战:多Agent协作——Supervisor模式与任务动态分配
多个专业 Agent(研究员、程序员、审核员)如何通过 LangGraph 协作?Supervisor 如何动态分配任务?本文从实际问题出发,讲清楚 Supervisor 模式的原理、代码实现、适用场景与局限性。
一、实际问题:为什么需要多 Agent 协作?
想象你要做一个代码审查场景:
1
| 用户:帮我把这个 Python 函数重构一下,要求加类型注解和文档注释
|
这个任务需要拆分成多个步骤,每个步骤可能需要不同专业能力的 Agent:
- 代码分析 Agent:理解现有函数逻辑,识别输入输出类型
- 重构执行 Agent:生成带类型注解的新代码
- 文档生成 Agent:为函数生成 docstring
- 审核 Agent:检查生成的代码是否符合规范
如果把这四个步骤写在一个 Agent 里,prompt 会变得很长,而且不同专业能力的指令相互干扰,导致输出不稳定。
解决方案:Supervisor 模式——一个中央调度 Agent 负责分解任务、分发给专业 Agent、收集结果、决定下一步。
二、Supervisor 模式的原理
2.1 架构设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 用户输入 | +-------------+ | Supervisor | | (LLM 调度) | +-----+-------+ | +-----+-----+-----+-----+ | | | | | 分析 重构 文档 审核 Agent Agent Agent Agent | | | | +-----+-----+-----+-----+ | 各 Agent 完成后汇报给 Supervisor | Supervisor 判断: 任务完成 or 继续分发
|
2.2 Supervisor 的决策循环
Supervisor 本质上是一个带条件的循环节点。每次收到子 Agent 的结果后,做三个判断:
- 任务是否完成? -> 结束,返回最终结果
- 哪个 Agent 适合处理下一步? -> 分发新任务
- 任务失败怎么办? -> 换其他 Agent 重试 or 降级处理
三、代码实现:从 0 到 1 搭建 Supervisor Agent
3.1 定义各专业 Agent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| 1 from typing import TypedDict, Literal 2 from langgraph.graph import StateGraph, START, END 3 from langgraph.types import Command 4 from langgraph.checkpoint.memory import MemorySaver 5 from langchain_openai import ChatOpenAI 6 7 8 llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) 9 10 11 12 13 def analyzer_node(state: dict) -> dict: 14 15 code = state["code"] 16 17 18 prompt = ( 19 "你是一个代码分析师。分析以下Python函数:\n" 20 + code + "\n\n" 21 + "输出一份结构化分析,包含:input_types(输入类型), " 22 + "output_type(输出类型), logic_summary(逻辑摘要)" 23 ) 24 25 26 analysis = llm.invoke(prompt) 27 28 29 return {"analysis": analysis.content, "next_action": "refactor"} 30 31 32 33 34 35 def refactor_node(state: dict) -> dict: 36 code = state["code"] 37 analysis = state.get("analysis", "") 38 39 prompt = ( 40 "你是代码重构专家。根据以下分析结果,给原始函数加上类型注解和 docstring。\n" 41 + "原始代码:" + code + "\n分析结果:" + analysis 42 ) 43 44 new_code = llm.invoke(prompt) 45 46 47 return {"refactored_code": new_code.content, "next_action": "document"} 48 49 50 51 52 def documenter_node(state: dict) -> dict: 53 54 code = state.get("refactored_code", state["code"]) 55 56 prompt = ( 57 "你是技术文档专家。为以下代码补充完整的 docstring(Google风格):\n" 58 + code + "\n\n输出:带完整 docstring 的代码" 59 ) 60 61 documented = llm.invoke(prompt) 62 63 64 return {"documented_code": documented.content, "next_action": "review"} 65 66 67 68 69 def reviewer_node(state: dict) -> dict: 70 code = state.get("documented_code", state["code"]) 71 72 prompt = ( 73 "你是代码审核专家。检查以下代码是否符合PEP8规范、类型注解是否正确、docstring是否完整。\n" 74 + "代码:" + code + "\n\n" 75 + "输出格式:\n status: APPROVED(通过)/ REJECTED(拒绝)\n" 76 + " issues: [如果拒绝,列出具体问题]" 77 ) 78 79 review = llm.invoke(prompt) 80 81 82 if "APPROVED" in review.content: 83 return {"final_code": code, "status": "approved", "next_action": "done"} 84 85 86 else: 87 return { 88 "status": "rejected", 89 "review_feedback": review.content, 90 "next_action": "refactor" 91 }
|
3.2 Supervisor 决策节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| 1 def supervisor_node(state: dict) -> Command: 2 3 4 current_action = state.get("next_action", "analyzer") 5 6 7 status = state.get("status", "in_progress") 8 9 10 loop_count = state.get("loop_count", 0) 11 12 13 14 if loop_count >= 5: 15 return Command( 16 update={"status": "max_loops_reached"}, 17 goto=END 18 ) 19 20 21 22 23 if status == "rejected": 24 return Command( 25 update={ 26 "status": "in_progress", 27 "loop_count": loop_count + 1 28 }, 29 goto="refactor" 30 ) 31 32 33 34 35 routing = { 36 "analyzer": "analyzer", 37 "refactor": "refactor", 38 "document": "documenter", 39 "review": "reviewer", 40 "done": END, 41 } 42 43 44 45 next_node = routing.get(current_action, END) 46 47 48 return Command(update={"loop_count": loop_count + 1}, goto=next_node)
|
3.3 构建图并执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| 1 2 builder = StateGraph(dict) 3 4 5 builder.add_node("supervisor", supervisor_node) 6 builder.add_node("analyzer", analyzer_node) 7 builder.add_node("refactor", refactor_node) 8 builder.add_node("documenter", documenter_node) 9 builder.add_node("reviewer", reviewer_node) 10 11 12 13 builder.add_edge(START, "supervisor") 14 15 16 17 18 for node in ["analyzer", "refactor", "documenter", "reviewer"]: 19 builder.add_edge(node, "supervisor") 20 21 22 checkpointer = MemorySaver() 23 graph = builder.compile(checkpointer=checkpointer) 24 25 26 27 28 initial_state = { 29 "code": ''' 30 def calculate_distance(lat1, lon1, lat2, lon2): 31 import math 32 R = 6371 33 dlat = math.radians(lat2 - lat1) 34 dlon = math.radians(lon2 - lon1) 35 a = (math.sin(dlat/2)**2 36 + math.cos(math.radians(lat1)) 37 * math.cos(math.radians(lat2)) 38 * math.sin(dlon/2)**2) 39 return 2 * R * math.asin(math.sqrt(a)) 40 ''', 41 "next_action": "analyzer", 42 "status": "in_progress", 43 "loop_count": 0, 44 } 45 46 47 config = {"configurable": {"thread_id": "refactor-session-001"}} 48 49 50 51 52 for event in graph.stream(initial_state, config=config): 53 node_name = list(event.keys())[0] 54 print(f"执行节点: {node_name}") 55 56 57 if "final_code" in event[node_name]: 58 print("审核通过,任务完成") 59 break 60
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 用户输入 -> supervisor ↓ analyzer(分析代码) ↓ supervisor(决策:下一步 refactor) ↓ refactor(加类型注解) ↓ supervisor(决策:下一步 document) ↓ documenter(补充 docstring) ↓ supervisor(决策:下一步 review) ↓ reviewer ├─ APPROVED → supervisor → done(结束) └─ REJECTED → supervisor → refactor(重新重构,循环保护生效)
|
四、思想总结
| 设计点 | 说明 |
|---|
| 为什么要 LLM 当 Supervisor | 任务分配需要根据中间结果做判断,固定规则处理不了复杂场景 |
| 为什么每个 Agent 独立返回 next_action | Supervisor 需要知道每个 Agent 完成后应该往哪走 |
| 为什么要用 Command 而不是 return dict | Command 可以同时更新状态 AND 决定下一步,原子操作 |
| 为什么每个 Agent 完成后都回到 supervisor | Supervisor 是唯一的决策入口,所有节点执行完都要经过它重新决策 |
| 循环次数保护的必要性 | 防止审核被无限拒绝导致死循环,5 次后强制结束 |
| 为什么用 stream() 而不是 invoke() | stream() 流式输出,可以看到每个节点的中间结果,便于调试 |
五、明天预告
Command API 深度用法:Command 跨图调用、子图返回、HITL 恢复等高级场景。
参考来源:LangGraph 1.0 官方文档(2026-05),LangGraph 多Agent协作实战(2026-05)