06 - 深度研究智能体(PlanExecuteAgent)
一、定位
PlanExecuteAgent 是平台最复杂的智能体,实现了真正的自主研究能力。它通过 Plan → Execute → Critique 循环完成深度研究任务,能够自主:
- 澄清用户需求
- 生成研究主题
- 规划任务并并发执行
- 自我批判与反思
- 综合生成报告
二、核心创新点
| 能力 | 实现方式 |
|---|
| 智能并发控制 | Semaphore(3) 控制同时进行的工具调用数 |
| 断点续传 | OverAllState 维护完整状态机 |
| 自我批判 | 每轮执行后评估信息充分性,驱动多轮迭代 |
| 上下文压缩 | contextCharLimit=50000 触发自动截断 |
| 工具重试 | maxToolRetries=2 应对瞬时失败 |
三、四阶段执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| 用户查询 ↓ ┌──────────────────────────────┐ │ Phase 1: 需求澄清 │ │ - 自动判断信息是否充足 │ │ - 不足则停止(转到 FAILED) │ │ - 标记【需要补充信息】 │ └──────────────┬───────────────┘ ↓ ┌──────────────────────────────┐ │ Phase 2: 研究主题生成 │ │ - 基于澄清结果生成精准主题 │ │ - 存储到 state.refinedTopic │ └──────────────┬───────────────┘ ↓ ┌──────────────────────────────┐ │ Phase 3: 计划-执行-批判循环 │ │ ├─ Plan: 生成任务列表 │ │ ├─ Execute: 并发执行工具 │ │ └─ Critique: 评估充分性 │ │ 最多 maxRounds 轮 │ └──────────────┬───────────────┘ ↓ ┌──────────────────────────────┐ │ Phase 4: 综合报告生成 │ │ - 汇总所有工具结果 │ │ - LLM 整理为结构化报告 │ └──────────────────────────────┘
|
四、核心字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Slf4j public class PlanExecuteAgent extends BaseAgent {
private ChatClient chatClient; private final List<ToolCallback> tools;
private final int maxRounds; private final int contextCharLimit; private final int maxToolRetries; private final Semaphore toolSemaphore;
private Disposable.Composite compositeDisposable;
private List<SearchResult> allReferences; private static final ObjectMapper MAPPER = new ObjectMapper(); }
|
五、OverAllState 状态管理
entity/OverAllState.java 是 Plan-Execute 循环的全局状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Data public class OverAllState { private final String conversationId; private final String question; private final List<Message> messages = new ArrayList<>(); private int round = 0; private String refinedResearchTopic;
public int currentChars() { ... } public void clearMessages() { ... } public String renderFullContext() { ... } public String extractToolResults() { ... } public String getLastCritique() { ... } }
|
特殊标记约定:
【需要补充信息】 - Phase 1 标识【Critique Feedback】 - Phase 3 标识【Completed Task Result】 - Phase 3 工具结果标识【开始生成PPT】 / 【暂停生成PPT】 - PPT 状态机标识
六、关键阶段详解
6.1 Phase 1: 需求澄清
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void clarifyRequirementPhase(OverAllState state, Sinks.Many<String> sink, AtomicBoolean finished, StringBuilder thinkingBuffer, Runnable onComplete) { emit(sink, finished, "\n🔍 正在分析您的需求...\n", "thinking", thinkingBuffer);
List<Message> messages = new ArrayList<>(); messages.add(new SystemMessage( PlanExecutePrompts.getCurrentTime() + "\n\n" + PlanExecutePrompts.REQUIREMENT_CLARIFICATION)); messages.addAll(state.getMessages());
chatClient.prompt().messages(messages).stream().content() .doOnNext(chunk -> { }) .doOnComplete(() -> handleClarificationComplete(...)) .doOnError(err -> handleError(...)) .subscribeOn(Schedulers.boundedElastic()) .subscribe(); }
|
完成处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private void handleClarificationComplete(StringBuilder responseBuffer, ...) { String response = responseBuffer.toString(); boolean needsMoreInfo = response.contains("【需要补充信息】");
if (needsMoreInfo) { String pauseMessage = "⏸【暂停深入研究】" + response.replace("【需要补充信息】", "").trim(); sink.tryEmitNext(createTextResponse(pauseMessage)); complete(sink, finished); } else { onComplete.run(); } }
|
6.2 Phase 2: 研究主题生成
1 2 3 4 5 6 7 8 9 10
| private void generateResearchTopicPhase(OverAllState state, Sinks.Many<String> sink, ...) { emit(sink, finished, "📝 正在生成研究主题...\n", "thinking", thinkingBuffer);
messages.add(new SystemMessage(PlanExecutePrompts.getCurrentTime() + "\n\n" + PlanExecutePrompts.RESEARCH_TOPIC_GENERATION)); messages.add(new UserMessage("<original_question>" + state.getQuestion() + "</original_question>"));
}
|
6.3 Phase 3: Plan-Execute-Critique 循环
executeLoopPhase 是核心:
1 2 3 4 5 6 7 8 9 10 11 12
| private void executeLoopPhase(OverAllState state, Sinks.Many<String> sink, AtomicBoolean finished, StringBuilder finalAnswerBuffer, StringBuilder thinkingBuffer) { Mono<Void> executionMono = executeLoop(state, sink, finished, finalAnswerBuffer, thinkingBuffer);
Disposable executionDisposable = executionMono .subscribeOn(Schedulers.boundedElastic()) .subscribe(unused -> {}, e -> handleExecutionError(e, sink, finished));
compositeDisposable.add(executionDisposable); }
|
executeLoop 内部循环:
1 2 3 4 5 6
| for round in 1..maxRounds: 1. generatePlan(state) # Plan: LLM 生成任务列表 2. compressContext(state) # 可选:上下文压缩 3. executePlan(state) # Execute: 并发执行任务 4. critique(state) # Critique: 评估信息充分性 5. if 充分: break
|
6.3.1 信号量并发控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private final Semaphore toolSemaphore = new Semaphore(3);
private void executePlanWithSemaphore(List<PlanTask> tasks) { for (PlanTask task : tasks) { Schedulers.boundedElastic().schedule(() -> { try { toolSemaphore.acquire(); try { executeSingleTask(task); } finally { toolSemaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } }
|
6.3.2 工具重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private Object executeToolWithRetry(ToolCallback callback, String args, int retries) { Exception lastException = null; for (int i = 0; i < retries; i++) { try { return callback.call(args); } catch (Exception e) { lastException = e; log.warn("工具调用失败,重试 {}/{}: {}", i + 1, retries, e.getMessage()); try { Thread.sleep(1000 * (i + 1)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return null; } } } throw new RuntimeException("工具调用失败,已重试 " + retries + " 次", lastException); }
|
6.4 上下文压缩
PlanExecuteAgent 在每轮执行前检查并压缩:
1 2 3 4 5 6 7 8 9 10 11
| private void compressContext(OverAllState state) { int currentChars = state.currentChars(); if (currentChars > contextCharLimit) { log.info("上下文超限 ({} > {}), 触发压缩", currentChars, contextCharLimit); state.clearMessages(); state.add(); } }
|
七、Builder 配置
1 2 3 4 5 6 7 8 9
| PlanExecuteAgent agent = PlanExecuteAgent.builder() .chatModel(chatModel) .tools(webSearchToolCallbacks) .sessionService(sessionService) .taskManager(taskManager) .maxRounds(3) .contextCharLimit(50000) .maxToolRetries(2) .build();
|
八、典型请求/响应
请求:
1
| GET /agent/deep/stream?query=分析2024年中国新能源汽车市场发展趋势&conversationId=research-001
|
SSE 响应(简化):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| data: {"type":"thinking","content":"\n🔍 正在分析您的需求...\n"}
data: {"type":"text","content":"好的"}
data: {"type":"text","content":",我需要研究2024年..."}
data: {"type":"thinking","content":"\n✅ 需求分析完成\n"}
data: {"type":"thinking","content":"\n✅ 信息充足,准备生成研究主题\n"}
data: {"type":"thinking","content":"📝 正在生成研究主题...\n"}
data: {"type":"thinking","content":"\n✅ 研究主题已生成\n\n"}
data: {"type":"thinking","content":"\n🔄 第 1 轮计划生成...\n"}
data: {"type":"thinking","content":"🔍 正在搜索信息: 2024年新能源汽车销量\n"}
data: {"type":"thinking","content":"🔍 正在搜索信息: 比亚迪市场份额\n"}
data: {"type":"thinking","content":"🔍 正在搜索信息: 新能源汽车政策\n"}
data: {"type":"thinking","content":"\n🔍 正在评估信息充分性...\n"}
data: {"type":"thinking","content":"\n✅ 信息充分,开始生成最终报告\n"}
data: {"type":"text","content":"# 2024年中国新能源汽车市场分析报告\n\n## 1. 市场规模..."}
data: {"type":"text","content":"## 2. 主要参与者..."}
data: {"type":"reference","content":"[{...}]","count":5}
|
九、错误处理与中断
- Phase 1 信息不足:自动停止,输出”⏸【暂停深入研究】”
- 工具调用失败:自动重试
maxToolRetries 次,最终失败抛出异常 - 用户停止:
taskManager.stopTask() 触发 compositeDisposable.dispose() 中断所有子任务 - 上下文超限:自动压缩,保留关键信息
十、与 WebSearch Agent 的对比
| 维度 | WebSearch | PlanExecute |
|---|
| 循环模式 | 简单 ReAct | Plan-Execute-Critique |
| 轮数 | 5 轮 | 3 轮(更深入) |
| 并发 | 工具并行 | 工具并行 + 信号量(3) |
| 工具重试 | 无 | 2 次 |
| 上下文压缩 | 无 | 50000 字符阈值 |
| 需求澄清 | 无 | 自动判断 |
| 反思能力 | 无 | 每轮批判 |
| 状态管理 | 简单 | OverAllState 持久化 |
十一、关键提示词
| 提示词常量 | 用途 |
|---|
PlanExecutePrompts.REQUIREMENT_CLARIFICATION | 需求澄清阶段 |
PlanExecutePrompts.RESEARCH_TOPIC_GENERATION | 研究主题生成 |
PlanExecutePrompts.PLAN_GENERATION | 计划生成 |
PlanExecutePrompts.CRITIQUE | 批判反思 |
PlanExecutePrompts.SYNTHESIS | 综合报告 |
PlanExecutePrompts.getCurrentTime() | 注入当前时间 |
十二、扩展方向
- 计划依赖关系:当前为简单列表,可扩展为 DAG 支持任务依赖
- 持久化进度:将
OverAllState 持久化到 MySQL 实现真正的断点续传 - 并行报告生成:每个 Plan Task 独立生成片段,最后合并
- 自适应轮数:根据问题复杂度动态调整
maxRounds