04 - WebSearch 智能体
一、定位
WebSearchReactAgent 是基于 ReAct 模式 的联网搜索智能体,集成 Tavily 搜索引擎作为 MCP 工具,负责处理需要实时信息查询的场景。
二、核心字段
1 2 3 4 5 6 7 8 9 10 11
| public class WebSearchReactAgent extends BaseAgent {
private ChatClient chatClient; private final List<ToolCallback> tools; private final String systemPrompt; @Setter private int maxRounds; private final List<Advisor> advisors; private final int maxReflectionRounds;
private static final ObjectMapper MAPPER = new ObjectMapper(); }
|
三、执行流程(ReAct 循环)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| ┌────────────────────────────┐ │ 用户问题 (question) │ └─────────────┬──────────────┘ ↓ ┌────────────────────────────────┐ │ 1. checkRunningTask() │ 防止重复执行 │ 2. initTimers() │ 开始计时 │ 3. registerTask() │ 注册到任务管理器 │ 4. saveQuestion() │ 写入 ai_session └────────────────┬───────────────┘ ↓ ┌────────────────────────────────┐ │ Schedule Round 1 │ ← chatClient.stream() └────────────────┬───────────────┘ ↓ ┌───────────────────────────┐ │ processChunk(chunk) │ │ - 工具调用? → mergeToolCall│ │ - 文本? → ThinkTagParser│ ┌──→ │ - emit 思考/正文 │ │ └─────────────┬─────────────┘ │ ↓ doOnComplete │ ┌─────────────────────────────┐ │ │ finishRound() │ │ │ - mode == TEXT ? 结束 │ │ │ - mode == TOOL_CALL ? │ │ │ - 已达 maxRounds? 强制结束│ │ │ - 否则: 执行工具 │ │ └──────────┬────────────────┘ │ ↓ │ ┌─────────────────────────────┐ │ │ executeToolCalls() │ │ │ - 并行执行所有 tool_calls │ │ │ - Tavily 搜索结果解析 │ │ │ - 追加 ToolResponseMessage│ │ └──────────┬─────────────────┘ │ ↓ └────────────────┘ (继续下一轮) ↓ ┌────────────────────────────────┐ │ forceFinalStream() 强制收尾 │ (达到 maxRounds 时) └────────────────────────────────┘ ↓ ┌────────────────────────────────┐ │ doFinally() │ │ - 输出 reference / recommend │ │ - saveSessionResult() │ │ - taskManager.stopTask() │ └────────────────────────────────┘
|
四、关键方法解析
4.1 ChatClient 初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void initChatClient() { ToolCallingChatOptions toolOptions = ToolCallingChatOptions.builder() .toolCallbacks(tools) .internalToolExecutionEnabled(false) .build();
ChatClient.Builder builder = ChatClient.builder(chatModel); if (!CollectionUtils.isEmpty(advisors)) { builder.defaultAdvisors(advisors); } this.chatClient = builder.defaultOptions(toolOptions) .defaultToolCallbacks(tools) .build(); }
|
为什么关闭内部工具执行?因为:
- 需要自定义执行逻辑(解析 Tavily 搜索结果、发送 thinking 消息)
- 需要在工具执行前后插入业务处理
- 需要把工具结果按
toolCallId 重组
4.2 Chunk 处理(processChunk)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| private void processChunk(ChatResponse chunk, Sinks.Many<String> sink, RoundState state) { Generation gen = chunk.getResult(); String text = gen.getOutput().getText(); String reasoningContent = (String) gen.getOutput().getMetadata().get("reasoningContent"); List<AssistantMessage.ToolCall> tc = gen.getOutput().getToolCalls();
if (tc != null && !tc.isEmpty()) { state.mode = RoundMode.TOOL_CALL; for (AssistantMessage.ToolCall incoming : tc) { mergeToolCall(state, incoming); } return; }
if (text != null) { ThinkTagParser.ParseResult result = ThinkTagParser.parse(text, state.inThink); state.inThink = result.inThink(); for (Segment seg : result.segments()) { if (seg.thinking()) { sink.tryEmitNext(createThinkingResponse(seg.content())); } else { sink.tryEmitNext(createTextResponse(seg.content())); state.textBuffer.append(seg.content()); } } } }
|
大模型返回的 tool_call 经常是分片的(参数很长时),需要按 id 合并:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private void mergeToolCall(RoundState state, AssistantMessage.ToolCall incoming) { for (int i = 0; i < state.toolCalls.size(); i++) { AssistantMessage.ToolCall existing = state.toolCalls.get(i); if (existing.id().equals(incoming.id())) { String mergedArgs = Objects.toString(existing.arguments(), "") + Objects.toString(incoming.arguments(), ""); state.toolCalls.set(i, new AssistantMessage.ToolCall( existing.id(), "function", existing.name(), mergedArgs)); return; } } state.toolCalls.add(incoming); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| private void executeToolCalls(...) { AtomicInteger completedCount = new AtomicInteger(0); int totalToolCalls = toolCalls.size(); Map<String, ToolResponseMessage.ToolResponse> responseMap = new ConcurrentHashMap<>();
for (AssistantMessage.ToolCall tc : toolCalls) { Schedulers.boundedElastic().schedule(() -> { if (hasSentFinalResult.get()) { completeToolCall(...); return; }
String toolName = tc.name(); String argsJson = tc.arguments(); ToolCallback callback = findTool(toolName);
if (callback == null) { responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse( tc.id(), toolName, "{ \"error\": \"工具未找到\" }")); completeToolCall(...); return; }
if (toolName.contains("search")) { JSONObject args = JSON.parseObject(argsJson); String query = (String) args.get("query"); String queryThink = "🔍 正在搜索信息: " + query + "\n"; sink.tryEmitNext(createThinkingResponse(queryThink)); }
try { Object result = callback.call(argsJson); String resultStr = result.toString();
recordUsedTool(toolName);
if (toolName.contains("tavily")) { parseSearchResult(resultStr, agentState); }
responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse( tc.id(), toolName, resultStr)); } catch (Exception ex) { responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse( tc.id(), toolName, "{ \"error\": \"工具执行失败\" }")); } finally { completeToolCall(completedCount, totalToolCalls, responseMap, ...); } }); } }
|
关键点:
- 使用
Schedulers.boundedElastic() 在弹性线程池并行执行 - 使用
ConcurrentHashMap<toolCallId, response> 收集结果 completeToolCall 用 CAS 模式保证只触发一次回调- 工具名前缀
search / tavily 用于特殊处理(发送 thinking / 解析搜索结果)
4.5 Tavily 搜索结果解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private void parseSearchResult(String resultJson, AgentState state) { JsonNode root = MAPPER.readTree(resultJson);
if (!root.isArray() || root.isEmpty()) return; JsonNode first = root.get(0); JsonNode textNode = first.get("text");
JsonNode textJson = textNode.isTextual() ? MAPPER.readTree(textNode.asText()) : textNode;
JsonNode results = textJson.get("results"); for (JsonNode item : results) { String url = getSafe(item, "url"); String title = getSafe(item, "title"); String content = getSafe(item, "content"); if (url != null && !url.isBlank()) { state.searchResults.add(new SearchResult(url, title, content)); } } }
|
Tavily MCP 返回结构(嵌套):
1 2 3 4 5
| [ { "text": "{\"results\":[{\"url\":\"...\",\"title\":\"...\",\"content\":\"...\"}]}" } ]
|
解析后保存到 AgentState.searchResults(List<SearchResult>),最终在流结束时通过 createReferenceResponse 输出。
4.6 强制结束(forceFinalStream)
当达到 maxRounds 时,不再允许 AI 调用工具,改为直接生成最终答案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void forceFinalStream(...) { List<Message> newMessages = new ArrayList<>(); newMessages.add(new SystemMessage(ReactAgentPrompts.getWebSearchPrompt())); for (Message msg : messages) { if (!(msg instanceof SystemMessage)) newMessages.add(msg); } newMessages.add(new UserMessage(""" 你已达到最大推理轮次限制。 请基于当前已有的上下文信息, 直接给出最终答案。 禁止再调用任何工具。 """)); chatClient.prompt().messages(newMessages).stream().content()... }
|
五、Tavily MCP 集成
AgentController.afterPropertiesSet() 启动时初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private void initWebSearchToolCallbacks() throws Exception { String authorizationHeader = "Bearer " + tavilyApiKey;
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() .header("Authorization", authorizationHeader);
HttpClientStreamableHttpTransport tavTransport = HttpClientStreamableHttpTransport .builder(tavilyMcpUrl) .requestBuilder(requestBuilder) .build();
McpSyncClient tavilyMcp = McpClient.sync(tavTransport) .requestTimeout(Duration.ofSeconds(300)) .build(); tavilyMcp.initialize();
SyncMcpToolCallbackProvider provider = SyncMcpToolCallbackProvider.builder() .mcpClients(List.of(tavilyMcp)) .build();
webSearchToolCallbacks = provider.getToolCallbacks(); }
|
配置项(application.yml):
1 2 3
| tavily: api-key: tvly-dev-xxxxxx mcp-url: https://mcp.tavily.com/mcp/
|
六、Builder 模式
1 2 3 4 5 6 7 8
| WebSearchReactAgent agent = WebSearchReactAgent.builder() .name("web react") .chatModel(chatModel) .tools(webSearchToolCallbacks) .sessionService(sessionService) .taskManager(taskManager) .maxRounds(5) .build();
|
七、典型请求/响应示例
请求:
1
| GET /agent/chat/stream?query=今天的天气怎么样&conversationId=user-123
|
SSE 响应(简化):
1 2 3 4 5 6 7 8 9 10 11 12 13
| data: {"type":"thinking","content":"🔍 正在搜索信息: 今天的天气\n"}
data: {"type":"thinking","content":"🔍 正在搜索信息: 上海天气\n"}
data: {"type":"text","content":"根据最新"}
data: {"type":"text","content":"的搜索结果,"}
data: {"type":"text","content":"今天上海晴..."}
data: {"type":"reference","content":"[{\"url\":\"https://...\",\"title\":\"天气预报\"}]","count":1}
data: {"type":"recommend","content":"[\"明天天气如何\",\"这周末适合出游吗\"]","count":2}
|
八、错误处理
- 工具未找到:返回
{"error": "工具未找到"} 加入 responseMap - 工具执行失败:捕获异常后继续流程
- 流式输出错误:
doOnError 触发 sink.tryEmitError(err) - 用户主动停止:
taskManager.stopTask() 中断 Disposable
九、性能指标
每次执行都会记录:
firstResponseTime:首字响应时间(ms)totalResponseTime:总响应时间(ms)usedTools:使用的工具列表references:参考链接 JSONrecommend:推荐问题 JSON
这些数据最终会保存到 ai_session 表中。