03 - 智能体框架(BaseAgent 抽象基类)

一、定位

agent/BaseAgent.java 是所有智能体的抽象基类,定义了统一的:

  • 通用字段(ChatModel、ChatMemory、SessionService、TaskManager)
  • 通用方法(持久化记忆加载、推荐问题生成、任务管理)
  • 统一的 SSE 输出格式

子类只需实现 execute() 一个方法即可。

二、类图与继承关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
                 ┌────────────────────────────┐
│ BaseAgent (抽象) │
│ - chatModel │
│ - name │
│ - chatMemory │
│ - sessionService │
│ - taskManager │
│ - agentType │
│ - enableRecommendations │
│ - startTime, usedTools... │
└─────────────┬──────────────┘
│ extends
┌─────────────┬───────────┼───────────┬─────────────┐
↓ ↓ ↓ ↓ ↓
┌────────┐ ┌────────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│WebSearch│ │FileReact │ │PlanExec │ │PPTBuilder│ │SkillsReac│
│React │ │Agent │ │uteAgent │ │Agent │ │tAgent │
└────────┘ └────────────┘ └─────────┘ └──────────┘ └──────────┘

三、字段定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public abstract class BaseAgent {

protected final ChatModel chatModel; // Spring AI ChatModel
protected final String name; // Agent 名称
protected ChatMemory chatMemory; // 会话记忆
protected AiSessionService sessionService; // 会话持久化服务
protected AgentTaskManager taskManager; // 任务管理器(停止/TTL)
protected String agentType; // 智能体类型标识

protected boolean enableRecommendations = true; // 是否启用推荐问题

// 计时相关
protected long startTime; // 开始时间(毫秒)
protected long firstResponseTime; // 首字响应时间
protected Set<String> usedTools; // 使用的工具集合
protected Long currentSessionId; // 当前会话数据库 ID
protected String currentConversationId; // 当前会话业务 ID
protected String currentQuestion; // 当前问题
protected String currentRecommendations; // 当前推荐问题
}

四、抽象方法

1
2
3
4
5
6
7
/**
* 子类必须实现的执行方法
* @param conversationId 会话ID
* @param question 用户问题
* @return 流式输出(Flux<String>)
*/
public abstract Flux<String> execute(String conversationId, String question);

五、核心方法

5.1 持久化记忆加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ChatMemory createPersistentChatMemory(String sessionId, int maxMessages) {
// 1. 查询数据库中的对话历史
List<AiSession> history = sessionService.findRecentBySessionId(sessionId, maxMessages);

// 2. 创建 MessageWindowChatMemory
ChatMemory chatMemory = MessageWindowChatMemory.builder()
.maxMessages(maxMessages)
.build();

// 3. 将历史记录按时间顺序添加到 ChatMemory
if (history != null && !history.isEmpty()) {
for (int i = history.size() - 1; i >= 0; i--) {
AiSession record = history.get(i);
if (record.getQuestion() != null) {
chatMemory.add(sessionId, new UserMessage(record.getQuestion()));
}
if (record.getAnswer() != null) {
chatMemory.add(sessionId, new AssistantMessage(record.getAnswer()));
}
}
}
return chatMemory;
}

调用流程(以 AgentController 为例):

  1. 调用 agent.createPersistentChatMemory(conversationId, 30) → 返回装载了历史的 ChatMemory
  2. 调用 agent.setChatMemory(memory) 注入 Agent
  3. 调用 agent.stream(conversationId, query) 开始执行

5.2 历史记忆辅助

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 加载历史记忆并添加到消息列表
*/
protected void loadChatHistory(String conversationId, List<Message> messages,
boolean skipSystem, boolean addLabel) {
if (conversationId != null && chatMemory != null) {
List<Message> history = chatMemory.get(conversationId);
if (history != null && !history.isEmpty()) {
if (addLabel) {
messages.add(new UserMessage("对话历史:"));
}
for (Message msg : history) {
if (skipSystem && msg instanceof SystemMessage) {
continue;
}
messages.add(msg);
}
}
}
}

5.3 SSE 响应工厂

BaseAgent 提供了 5 个工厂方法,对应 5 类事件:

工厂方法JSON 结构用途
createTextResponse(content){"type":"text","content":"..."}最终回答
createThinkingResponse(content){"type":"thinking","content":"..."}思考过程
createReferenceResponse(content){"type":"reference","content":"...","count":N}参考链接
createErrorResponse(content){"type":"error","content":"..."}错误信息
createRecommendResponse(content){"type":"recommend","content":"...","count":N}推荐问题

底层均委托给 common/AgentResponse.java(参见 [01 架构概览 - SSE 协议](./01-architecture-overview.md#四统一流 式响应协议sse))。

5.4 任务管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 检查是否有任务在执行
*/
protected Flux<String> checkRunningTask(String conversationId) {
if (conversationId != null && taskManager != null
&& taskManager.hasRunningTask(conversationId)) {
return Flux.error(new IllegalStateException("该会话正在执行中,请稍后再试"));
}
return null;
}

/**
* 注册任务到管理器
*/
protected AgentTaskManager.TaskInfo registerTask(String conversationId, Sinks.Many<String> sink) {
if (conversationId != null && taskManager != null) {
return taskManager.registerTask(conversationId, sink, agentType);
}
return null;
}

5.5 性能指标收集

1
2
3
4
5
protected void initTimers() { ... }
protected void recordFirstResponse() { ... } // 记录首字响应时间
protected long getTotalResponseTime() { ... } // 获取总响应时长
protected void recordUsedTool(String toolName) { ... } // 记录使用工具
protected String getUsedToolsString() { ... } // 拼接工具列表

这些指标最终会保存到 ai_session 表的 first_response_time / total_response_time / tools 字段。

5.6 推荐问题生成

1
2
3
4
5
6
7
8
9
10
protected String generateRecommendations(String conversationId,
String currentQuestion,
String currentAnswer) {
if (!enableRecommendations) return null;

// 1. 构造 messages:系统提示词 + 历史记忆 + 当前会话
// 2. 使用 BeanOutputConverter<List<String>> 进行结构化输出
// 3. 调用 ChatClient 一次性生成 3 个推荐问题
// 4. 返回 JSON 数组字符串
}

注意:推荐问题生成是阻塞式调用,会增加额外的 LLM 调用开销,因此默认 enableRecommendations=true 但子类可按需关闭。

5.7 会话结果保存

1
2
3
4
5
6
protected boolean updateAnswer(UpdateAnswerRequest request) {
if (sessionService != null) {
return sessionService.updateAnswer(request);
}
return false;
}

六、典型执行流程

WebSearchReactAgent 为例,整体执行链路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Client ──HTTP──→ AgentController.webSearchStream()


initWebSearchAgent() ─── builder 模式构造


createPersistentChatMemory() ←─ sessionService.findRecentBySessionId


streamInternal(conversationId, query)

├── checkRunningTask(conversationId) ──→ taskManager.hasRunningTask
├── registerTask(conversationId, sink) ──→ taskManager.registerTask
├── initTimers()
├── 构造 messages(System + History + Question)
├── sessionService.saveQuestion() ──→ ai_session 插入 question


┌─────── ReAct 循环 ───────┐
│ 1. chatClient.stream() │
│ 2. processChunk() │ ──→ createThinkingResponse / createTextResponse
│ 3. finishRound() │
│ 4. executeToolCalls() │ ──→ Tavily MCP 搜索
│ 5. scheduleRound() │
└───────────────────────────┘


doFinally(): saveSessionResult() ──→ sessionService.updateAnswer


taskManager.stopTask(conversationId)


Client ←─SSE chunks─

七、扩展指南

7.1 新增一个智能体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class MyAgent extends BaseAgent {

public MyAgent(ChatModel chatModel, ...) {
super("MyAgent", chatModel, "my-type");
// 注入自己的依赖
}

@Override
public Flux<String> execute(String conversationId, String question) {
// 1. 复用 checkRunningTask / registerTask
// 2. 复用 initTimers / loadChatHistory
// 3. 实现自己的 ReAct 循环
// 4. 在 doFinally 中调用 saveSessionResult
}
}

7.2 复用 SSE 响应

子类应统一使用 createTextResponse() / createThinkingResponse() 等工厂方法,避免直接拼接 JSON。

7.3 复用任务管理

每个 execute() 方法都应:

  1. 在开始前调用 checkRunningTask() 检查冲突
  2. 在开始时调用 registerTask() 注册任务
  3. 在结束时通过 taskManager.stopTask() 清理

八、注意事项

  • usedTools 初始化:子类构造函数需 this.usedTools = new HashSet<>();
  • 首字响应计时recordFirstResponse() 应在 doOnNext 中首次调用
  • 思考/正文分离:使用 ThinkTagParser.parse() 解析 <think/> 标签
  • 持久化调用时机:先 saveQuestion(保存 question)→ 流完成后 updateAnswer(保存 answer)