03 - 智能体框架(BaseAgent 抽象基类) 一、定位 agent/BaseAgent.java 是所有智能体的抽象基类,定义了统一的:
通用字段(ChatModel、ChatMemory、SessionService、TaskManager) 通用方法(持久化记忆加载、推荐问题生成、任务管理) 统一的 SSE 输出格式 子类只需实现 execute() 一个方法即可。
二、类图与继承关系 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ┌────────────────────────────┐ │ BaseAgent (抽象) │ │ - chatModel │ │ - name │ │ - chatMemory │ │ - sessionService │ │ - taskManager │ │ - agentType │ │ - enableRecommendations │ │ - startTime, usedTools... │ └─────────────┬──────────────┘ │ extends ┌─────────────┬───────────┼───────────┬─────────────┐ ↓ ↓ ↓ ↓ ↓ ┌────────┐ ┌────────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐ │WebSearch│ │FileReact │ │PlanExec │ │PPTBuilder│ │SkillsReac│ │React │ │Agent │ │uteAgent │ │Agent │ │tAgent │ └────────┘ └────────────┘ └─────────┘ └──────────┘ └──────────┘
三、字段定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public abstract class BaseAgent { protected final ChatModel chatModel; protected final String name; protected ChatMemory chatMemory; protected AiSessionService sessionService; protected AgentTaskManager taskManager; protected String agentType; protected boolean enableRecommendations = true ; protected long startTime; protected long firstResponseTime; protected Set<String> usedTools; protected Long currentSessionId; protected String currentConversationId; protected String currentQuestion; protected String currentRecommendations; }
四、抽象方法 1 2 3 4 5 6 7 public abstract Flux<String> execute (String conversationId, String question) ;
五、核心方法 5.1 持久化记忆加载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public ChatMemory createPersistentChatMemory (String sessionId, int maxMessages) { List<AiSession> history = sessionService.findRecentBySessionId(sessionId, maxMessages); ChatMemory chatMemory = MessageWindowChatMemory.builder() .maxMessages(maxMessages) .build(); if (history != null && !history.isEmpty()) { for (int i = history.size() - 1 ; i >= 0 ; i--) { AiSession record = history.get(i); if (record.getQuestion() != null ) { chatMemory.add(sessionId, new UserMessage (record.getQuestion())); } if (record.getAnswer() != null ) { chatMemory.add(sessionId, new AssistantMessage (record.getAnswer())); } } } return chatMemory; }
调用流程 (以 AgentController 为例):
调用 agent.createPersistentChatMemory(conversationId, 30) → 返回装载了历史的 ChatMemory 调用 agent.setChatMemory(memory) 注入 Agent 调用 agent.stream(conversationId, query) 开始执行 5.2 历史记忆辅助 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected void loadChatHistory (String conversationId, List<Message> messages, boolean skipSystem, boolean addLabel) { if (conversationId != null && chatMemory != null ) { List<Message> history = chatMemory.get(conversationId); if (history != null && !history.isEmpty()) { if (addLabel) { messages.add(new UserMessage ("对话历史:" )); } for (Message msg : history) { if (skipSystem && msg instanceof SystemMessage) { continue ; } messages.add(msg); } } } }
5.3 SSE 响应工厂 BaseAgent 提供了 5 个工厂方法,对应 5 类事件:
工厂方法 JSON 结构 用途 createTextResponse(content){"type":"text","content":"..."}最终回答 createThinkingResponse(content){"type":"thinking","content":"..."}思考过程 createReferenceResponse(content){"type":"reference","content":"...","count":N}参考链接 createErrorResponse(content){"type":"error","content":"..."}错误信息 createRecommendResponse(content){"type":"recommend","content":"...","count":N}推荐问题
底层均委托给 common/AgentResponse.java(参见 [01 架构概览 - SSE 协议](./01-architecture-overview.md#四统一流 式响应协议sse))。
5.4 任务管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected Flux<String> checkRunningTask (String conversationId) { if (conversationId != null && taskManager != null && taskManager.hasRunningTask(conversationId)) { return Flux.error(new IllegalStateException ("该会话正在执行中,请稍后再试" )); } return null ; } protected AgentTaskManager.TaskInfo registerTask (String conversationId, Sinks.Many<String> sink) { if (conversationId != null && taskManager != null ) { return taskManager.registerTask(conversationId, sink, agentType); } return null ; }
5.5 性能指标收集 1 2 3 4 5 protected void initTimers () { ... }protected void recordFirstResponse () { ... } protected long getTotalResponseTime () { ... } protected void recordUsedTool (String toolName) { ... } protected String getUsedToolsString () { ... }
这些指标最终会保存到 ai_session 表的 first_response_time / total_response_time / tools 字段。
5.6 推荐问题生成 1 2 3 4 5 6 7 8 9 10 protected String generateRecommendations (String conversationId, String currentQuestion, String currentAnswer) { if (!enableRecommendations) return null ; }
注意 :推荐问题生成是阻塞式 调用,会增加额外的 LLM 调用开销,因此默认 enableRecommendations=true 但子类可按需关闭。
5.7 会话结果保存 1 2 3 4 5 6 protected boolean updateAnswer (UpdateAnswerRequest request) { if (sessionService != null ) { return sessionService.updateAnswer(request); } return false ; }
六、典型执行流程 以 WebSearchReactAgent 为例,整体执行链路如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 Client ──HTTP──→ AgentController.webSearchStream() │ ▼ initWebSearchAgent() ─── builder 模式构造 │ ▼ createPersistentChatMemory() ←─ sessionService.findRecentBySessionId │ ▼ streamInternal(conversationId, query) │ ├── checkRunningTask(conversationId) ──→ taskManager.hasRunningTask ├── registerTask(conversationId, sink) ──→ taskManager.registerTask ├── initTimers() ├── 构造 messages(System + History + Question) ├── sessionService.saveQuestion() ──→ ai_session 插入 question │ ▼ ┌─────── ReAct 循环 ───────┐ │ 1. chatClient.stream() │ │ 2. processChunk() │ ──→ createThinkingResponse / createTextResponse │ 3. finishRound() │ │ 4. executeToolCalls() │ ──→ Tavily MCP 搜索 │ 5. scheduleRound() │ └───────────────────────────┘ │ ▼ doFinally(): saveSessionResult() ──→ sessionService.updateAnswer │ ▼ taskManager.stopTask(conversationId) │ ▼ Client ←─SSE chunks─
七、扩展指南 7.1 新增一个智能体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Slf4j public class MyAgent extends BaseAgent { public MyAgent (ChatModel chatModel, ...) { super ("MyAgent" , chatModel, "my-type" ); } @Override public Flux<String> execute (String conversationId, String question) { } }
7.2 复用 SSE 响应 子类应统一使用 createTextResponse() / createThinkingResponse() 等工厂方法,避免直接拼接 JSON。
7.3 复用任务管理 每个 execute() 方法都应:
在开始前调用 checkRunningTask() 检查冲突 在开始时调用 registerTask() 注册任务 在结束时通过 taskManager.stopTask() 清理 八、注意事项 usedTools 初始化 :子类构造函数需 this.usedTools = new HashSet<>();首字响应计时 :recordFirstResponse() 应在 doOnNext 中首次调用思考/正文分离 :使用 ThinkTagParser.parse() 解析 <think/> 标签持久化调用时机 :先 saveQuestion(保存 question)→ 流完成后 updateAnswer(保存 answer)