06 - 深度研究智能体(PlanExecuteAgent)

一、定位

PlanExecuteAgent 是平台最复杂的智能体,实现了真正的自主研究能力。它通过 Plan → Execute → Critique 循环完成深度研究任务,能够自主:

  • 澄清用户需求
  • 生成研究主题
  • 规划任务并并发执行
  • 自我批判与反思
  • 综合生成报告

二、核心创新点

能力实现方式
智能并发控制Semaphore(3) 控制同时进行的工具调用数
断点续传OverAllState 维护完整状态机
自我批判每轮执行后评估信息充分性,驱动多轮迭代
上下文压缩contextCharLimit=50000 触发自动截断
工具重试maxToolRetries=2 应对瞬时失败

三、四阶段执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
            用户查询

┌──────────────────────────────┐
│ Phase 1: 需求澄清 │
│ - 自动判断信息是否充足 │
│ - 不足则停止(转到 FAILED) │
│ - 标记【需要补充信息】 │
└──────────────┬───────────────┘

┌──────────────────────────────┐
│ Phase 2: 研究主题生成 │
│ - 基于澄清结果生成精准主题 │
│ - 存储到 state.refinedTopic │
└──────────────┬───────────────┘

┌──────────────────────────────┐
│ Phase 3: 计划-执行-批判循环 │
│ ├─ Plan: 生成任务列表 │
│ ├─ Execute: 并发执行工具 │
│ └─ Critique: 评估充分性 │
│ 最多 maxRounds 轮 │
└──────────────┬───────────────┘

┌──────────────────────────────┐
│ Phase 4: 综合报告生成 │
│ - 汇总所有工具结果 │
│ - LLM 整理为结构化报告 │
└──────────────────────────────┘

四、核心字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class PlanExecuteAgent extends BaseAgent {

private ChatClient chatClient;
private final List<ToolCallback> tools;

// 配置参数
private final int maxRounds; // Plan-Execute 总轮数(默认 3)
private final int contextCharLimit; // 上下文压缩阈值(默认 50000)
private final int maxToolRetries; // 工具重试次数(默认 2)
private final Semaphore toolSemaphore; // 工具并发信号量(默认 3)

// 资源管理
private Disposable.Composite compositeDisposable; // 组合 Disposable 用于取消

// 状态收集
private List<SearchResult> allReferences; // 所有搜索结果
private static final ObjectMapper MAPPER = new ObjectMapper();
}

五、OverAllState 状态管理

entity/OverAllState.java 是 Plan-Execute 循环的全局状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
public class OverAllState {
private final String conversationId;
private final String question;
private final List<Message> messages = new ArrayList<>();
private int round = 0; // 当前轮次
private String refinedResearchTopic; // 精炼的研究主题

public int currentChars() { ... } // 当前消息总字符数
public void clearMessages() { ... } // 清空消息(压缩)
public String renderFullContext() { ... } // 渲染完整上下文
public String extractToolResults() { ... } // 提取工具结果
public String getLastCritique() { ... } // 获取最近一次批判
}

特殊标记约定

  • 【需要补充信息】 - Phase 1 标识
  • 【Critique Feedback】 - Phase 3 标识
  • 【Completed Task Result】 - Phase 3 工具结果标识
  • 【开始生成PPT】 / 【暂停生成PPT】 - PPT 状态机标识

六、关键阶段详解

6.1 Phase 1: 需求澄清

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void clarifyRequirementPhase(OverAllState state, Sinks.Many<String> sink,
AtomicBoolean finished, StringBuilder thinkingBuffer, Runnable onComplete) {
emit(sink, finished, "\n🔍 正在分析您的需求...\n", "thinking", thinkingBuffer);

List<Message> messages = new ArrayList<>();
messages.add(new SystemMessage(
PlanExecutePrompts.getCurrentTime() + "\n\n"
+ PlanExecutePrompts.REQUIREMENT_CLARIFICATION));
messages.addAll(state.getMessages());

chatClient.prompt().messages(messages).stream().content()
.doOnNext(chunk -> { /* ThinkTagParser 解析 */ })
.doOnComplete(() -> handleClarificationComplete(...))
.doOnError(err -> handleError(...))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}

完成处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void handleClarificationComplete(StringBuilder responseBuffer, ...) {
String response = responseBuffer.toString();
boolean needsMoreInfo = response.contains("【需要补充信息】");

if (needsMoreInfo) {
// 暂停:保存并停止
String pauseMessage = "⏸【暂停深入研究】" + response.replace("【需要补充信息】", "").trim();
sink.tryEmitNext(createTextResponse(pauseMessage));
complete(sink, finished);
} else {
// 继续:进入 Phase 2
onComplete.run();
}
}

6.2 Phase 2: 研究主题生成

1
2
3
4
5
6
7
8
9
10
private void generateResearchTopicPhase(OverAllState state, Sinks.Many<String> sink, ...) {
emit(sink, finished, "📝 正在生成研究主题...\n", "thinking", thinkingBuffer);

messages.add(new SystemMessage(PlanExecutePrompts.getCurrentTime()
+ "\n\n" + PlanExecutePrompts.RESEARCH_TOPIC_GENERATION));
messages.add(new UserMessage("<original_question>" + state.getQuestion() + "</original_question>"));

// 流式生成
// ... 完成后调用 state.setRefinedResearchTopic(topic)
}

6.3 Phase 3: Plan-Execute-Critique 循环

executeLoopPhase 是核心:

1
2
3
4
5
6
7
8
9
10
11
12
private void executeLoopPhase(OverAllState state, Sinks.Many<String> sink,
AtomicBoolean finished, StringBuilder finalAnswerBuffer,
StringBuilder thinkingBuffer) {
Mono<Void> executionMono = executeLoop(state, sink, finished, finalAnswerBuffer, thinkingBuffer);

Disposable executionDisposable = executionMono
.subscribeOn(Schedulers.boundedElastic())
.subscribe(unused -> {},
e -> handleExecutionError(e, sink, finished));

compositeDisposable.add(executionDisposable);
}

executeLoop 内部循环:

1
2
3
4
5
6
for round in 1..maxRounds:
1. generatePlan(state) # Plan: LLM 生成任务列表
2. compressContext(state) # 可选:上下文压缩
3. executePlan(state) # Execute: 并发执行任务
4. critique(state) # Critique: 评估信息充分性
5. if 充分: break

6.3.1 信号量并发控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 同时最多 3 个工具调用
private final Semaphore toolSemaphore = new Semaphore(3);

private void executePlanWithSemaphore(List<PlanTask> tasks) {
for (PlanTask task : tasks) {
Schedulers.boundedElastic().schedule(() -> {
try {
toolSemaphore.acquire();
try {
executeSingleTask(task);
} finally {
toolSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}

6.3.2 工具重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Object executeToolWithRetry(ToolCallback callback, String args, int retries) {
Exception lastException = null;
for (int i = 0; i < retries; i++) {
try {
return callback.call(args);
} catch (Exception e) {
lastException = e;
log.warn("工具调用失败,重试 {}/{}: {}", i + 1, retries, e.getMessage());
try { Thread.sleep(1000 * (i + 1)); } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return null;
}
}
}
throw new RuntimeException("工具调用失败,已重试 " + retries + " 次", lastException);
}

6.4 上下文压缩

PlanExecuteAgent 在每轮执行前检查并压缩:

1
2
3
4
5
6
7
8
9
10
11
private void compressContext(OverAllState state) {
int currentChars = state.currentChars();
if (currentChars > contextCharLimit) {
log.info("上下文超限 ({} > {}), 触发压缩", currentChars, contextCharLimit);
// 1. 保留 SystemMessage 和最新一轮
// 2. 移除中间轮次的详细信息
// 3. 保留工具结果摘要
state.clearMessages();
state.add(/* 重新构造的精简 messages */);
}
}

七、Builder 配置

1
2
3
4
5
6
7
8
9
PlanExecuteAgent agent = PlanExecuteAgent.builder()
.chatModel(chatModel)
.tools(webSearchToolCallbacks) // 工具列表
.sessionService(sessionService)
.taskManager(taskManager)
.maxRounds(3) // Plan-Execute 循环轮数
.contextCharLimit(50000) // 上下文压缩阈值
.maxToolRetries(2) // 工具重试次数
.build();

八、典型请求/响应

请求

1
GET /agent/deep/stream?query=分析2024年中国新能源汽车市场发展趋势&conversationId=research-001

SSE 响应(简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
data: {"type":"thinking","content":"\n🔍 正在分析您的需求...\n"}

data: {"type":"text","content":"好的"}

data: {"type":"text","content":",我需要研究2024年..."}

data: {"type":"thinking","content":"\n✅ 需求分析完成\n"}

data: {"type":"thinking","content":"\n✅ 信息充足,准备生成研究主题\n"}

data: {"type":"thinking","content":"📝 正在生成研究主题...\n"}

data: {"type":"thinking","content":"\n✅ 研究主题已生成\n\n"}

data: {"type":"thinking","content":"\n🔄 第 1 轮计划生成...\n"}

data: {"type":"thinking","content":"🔍 正在搜索信息: 2024年新能源汽车销量\n"}

data: {"type":"thinking","content":"🔍 正在搜索信息: 比亚迪市场份额\n"}

data: {"type":"thinking","content":"🔍 正在搜索信息: 新能源汽车政策\n"}

data: {"type":"thinking","content":"\n🔍 正在评估信息充分性...\n"}

data: {"type":"thinking","content":"\n✅ 信息充分,开始生成最终报告\n"}

data: {"type":"text","content":"# 2024年中国新能源汽车市场分析报告\n\n## 1. 市场规模..."}

data: {"type":"text","content":"## 2. 主要参与者..."}

data: {"type":"reference","content":"[{...}]","count":5}

九、错误处理与中断

  • Phase 1 信息不足:自动停止,输出”⏸【暂停深入研究】”
  • 工具调用失败:自动重试 maxToolRetries 次,最终失败抛出异常
  • 用户停止taskManager.stopTask() 触发 compositeDisposable.dispose() 中断所有子任务
  • 上下文超限:自动压缩,保留关键信息

十、与 WebSearch Agent 的对比

维度WebSearchPlanExecute
循环模式简单 ReActPlan-Execute-Critique
轮数5 轮3 轮(更深入)
并发工具并行工具并行 + 信号量(3)
工具重试2 次
上下文压缩50000 字符阈值
需求澄清自动判断
反思能力每轮批判
状态管理简单OverAllState 持久化

十一、关键提示词

提示词常量用途
PlanExecutePrompts.REQUIREMENT_CLARIFICATION需求澄清阶段
PlanExecutePrompts.RESEARCH_TOPIC_GENERATION研究主题生成
PlanExecutePrompts.PLAN_GENERATION计划生成
PlanExecutePrompts.CRITIQUE批判反思
PlanExecutePrompts.SYNTHESIS综合报告
PlanExecutePrompts.getCurrentTime()注入当前时间

十二、扩展方向

  • 计划依赖关系:当前为简单列表,可扩展为 DAG 支持任务依赖
  • 持久化进度:将 OverAllState 持久化到 MySQL 实现真正的断点续传
  • 并行报告生成:每个 Plan Task 独立生成片段,最后合并
  • 自适应轮数:根据问题复杂度动态调整 maxRounds