04 - WebSearch 智能体

一、定位

WebSearchReactAgent 是基于 ReAct 模式 的联网搜索智能体,集成 Tavily 搜索引擎作为 MCP 工具,负责处理需要实时信息查询的场景。

二、核心字段

1
2
3
4
5
6
7
8
9
10
11
public class WebSearchReactAgent extends BaseAgent {

private ChatClient chatClient; // Spring AI ChatClient
private final List<ToolCallback> tools; // 工具回调(Tavily MCP)
private final String systemPrompt; // 自定义系统提示词
@Setter private int maxRounds; // 最大工具调用轮次(默认 5)
private final List<Advisor> advisors; // ChatMemory advisors
private final int maxReflectionRounds; // 最大反思轮次(预留)

private static final ObjectMapper MAPPER = new ObjectMapper();
}

三、执行流程(ReAct 循环)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
    ┌────────────────────────────┐
│ 用户问题 (question) │
└─────────────┬──────────────┘

┌────────────────────────────────┐
│ 1. checkRunningTask() │ 防止重复执行
│ 2. initTimers() │ 开始计时
│ 3. registerTask() │ 注册到任务管理器
│ 4. saveQuestion() │ 写入 ai_session
└────────────────┬───────────────┘

┌────────────────────────────────┐
│ Schedule Round 1 │ ← chatClient.stream()
└────────────────┬───────────────┘

┌───────────────────────────┐
│ processChunk(chunk) │
│ - 工具调用? → mergeToolCall│
│ - 文本? → ThinkTagParser│
┌──→ │ - emit 思考/正文 │
│ └─────────────┬─────────────┘
│ ↓ doOnComplete
│ ┌─────────────────────────────┐
│ │ finishRound() │
│ │ - mode == TEXT ? 结束 │
│ │ - mode == TOOL_CALL ? │
│ │ - 已达 maxRounds? 强制结束│
│ │ - 否则: 执行工具 │
│ └──────────┬────────────────┘
│ ↓
│ ┌─────────────────────────────┐
│ │ executeToolCalls() │
│ │ - 并行执行所有 tool_calls │
│ │ - Tavily 搜索结果解析 │
│ │ - 追加 ToolResponseMessage│
│ └──────────┬─────────────────┘
│ ↓
└────────────────┘ (继续下一轮)

┌────────────────────────────────┐
│ forceFinalStream() 强制收尾 │ (达到 maxRounds 时)
└────────────────────────────────┘

┌────────────────────────────────┐
│ doFinally() │
│ - 输出 reference / recommend │
│ - saveSessionResult() │
│ - taskManager.stopTask() │
└────────────────────────────────┘

四、关键方法解析

4.1 ChatClient 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void initChatClient() {
// 关键:关闭 Spring AI 内部工具执行,由 Agent 自己控制
ToolCallingChatOptions toolOptions = ToolCallingChatOptions.builder()
.toolCallbacks(tools)
.internalToolExecutionEnabled(false)
.build();

ChatClient.Builder builder = ChatClient.builder(chatModel);
if (!CollectionUtils.isEmpty(advisors)) {
builder.defaultAdvisors(advisors);
}
this.chatClient = builder.defaultOptions(toolOptions)
.defaultToolCallbacks(tools)
.build();
}

为什么关闭内部工具执行?因为:

  1. 需要自定义执行逻辑(解析 Tavily 搜索结果、发送 thinking 消息)
  2. 需要在工具执行前后插入业务处理
  3. 需要把工具结果按 toolCallId 重组

4.2 Chunk 处理(processChunk)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void processChunk(ChatResponse chunk, Sinks.Many<String> sink, RoundState state) {
Generation gen = chunk.getResult();
String text = gen.getOutput().getText();
String reasoningContent = (String) gen.getOutput().getMetadata().get("reasoningContent");
List<AssistantMessage.ToolCall> tc = gen.getOutput().getToolCalls();

// 1. 工具调用模式
if (tc != null && !tc.isEmpty()) {
state.mode = RoundMode.TOOL_CALL;
for (AssistantMessage.ToolCall incoming : tc) {
mergeToolCall(state, incoming); // 合并同一 tool_call 的多次返回
}
return;
}

// 2. 文本模式:使用 ThinkTagParser 拆出思考/正文
if (text != null) {
ThinkTagParser.ParseResult result = ThinkTagParser.parse(text, state.inThink);
state.inThink = result.inThink();
for (Segment seg : result.segments()) {
if (seg.thinking()) {
sink.tryEmitNext(createThinkingResponse(seg.content()));
} else {
sink.tryEmitNext(createTextResponse(seg.content()));
state.textBuffer.append(seg.content());
}
}
}
}

4.3 工具调用合并(mergeToolCall)

大模型返回的 tool_call 经常是分片的(参数很长时),需要按 id 合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void mergeToolCall(RoundState state, AssistantMessage.ToolCall incoming) {
for (int i = 0; i < state.toolCalls.size(); i++) {
AssistantMessage.ToolCall existing = state.toolCalls.get(i);
if (existing.id().equals(incoming.id())) {
String mergedArgs = Objects.toString(existing.arguments(), "")
+ Objects.toString(incoming.arguments(), "");
state.toolCalls.set(i, new AssistantMessage.ToolCall(
existing.id(), "function", existing.name(), mergedArgs));
return;
}
}
state.toolCalls.add(incoming);
}

4.4 工具并行执行(executeToolCalls)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void executeToolCalls(...) {
AtomicInteger completedCount = new AtomicInteger(0);
int totalToolCalls = toolCalls.size();
Map<String, ToolResponseMessage.ToolResponse> responseMap = new ConcurrentHashMap<>();

for (AssistantMessage.ToolCall tc : toolCalls) {
Schedulers.boundedElastic().schedule(() -> {
if (hasSentFinalResult.get()) {
completeToolCall(...);
return;
}

String toolName = tc.name();
String argsJson = tc.arguments();
ToolCallback callback = findTool(toolName);

if (callback == null) {
// 工具未找到
responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse(
tc.id(), toolName, "{ \"error\": \"工具未找到\" }"));
completeToolCall(...);
return;
}

// 工具名前缀匹配:发送 thinking 消息
if (toolName.contains("search")) {
JSONObject args = JSON.parseObject(argsJson);
String query = (String) args.get("query");
String queryThink = "🔍 正在搜索信息: " + query + "\n";
sink.tryEmitNext(createThinkingResponse(queryThink));
}

try {
Object result = callback.call(argsJson);
String resultStr = result.toString();

// 记录使用的工具
recordUsedTool(toolName);

// 解析 Tavily 搜索结果
if (toolName.contains("tavily")) {
parseSearchResult(resultStr, agentState);
}

responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse(
tc.id(), toolName, resultStr));
} catch (Exception ex) {
responseMap.put(tc.id(), new ToolResponseMessage.ToolResponse(
tc.id(), toolName, "{ \"error\": \"工具执行失败\" }"));
} finally {
completeToolCall(completedCount, totalToolCalls, responseMap, ...);
}
});
}
}

关键点

  • 使用 Schedulers.boundedElastic() 在弹性线程池并行执行
  • 使用 ConcurrentHashMap<toolCallId, response> 收集结果
  • completeToolCall 用 CAS 模式保证只触发一次回调
  • 工具名前缀 search / tavily 用于特殊处理(发送 thinking / 解析搜索结果)

4.5 Tavily 搜索结果解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void parseSearchResult(String resultJson, AgentState state) {
JsonNode root = MAPPER.readTree(resultJson);

if (!root.isArray() || root.isEmpty()) return;
JsonNode first = root.get(0);
JsonNode textNode = first.get("text");

// text 字段可能是 JSON 字符串,需要再次解析
JsonNode textJson = textNode.isTextual()
? MAPPER.readTree(textNode.asText())
: textNode;

JsonNode results = textJson.get("results");
for (JsonNode item : results) {
String url = getSafe(item, "url");
String title = getSafe(item, "title");
String content = getSafe(item, "content");
if (url != null && !url.isBlank()) {
state.searchResults.add(new SearchResult(url, title, content));
}
}
}

Tavily MCP 返回结构(嵌套):

1
2
3
4
5
[
{
"text": "{\"results\":[{\"url\":\"...\",\"title\":\"...\",\"content\":\"...\"}]}"
}
]

解析后保存到 AgentState.searchResultsList<SearchResult>),最终在流结束时通过 createReferenceResponse 输出。

4.6 强制结束(forceFinalStream)

当达到 maxRounds 时,不再允许 AI 调用工具,改为直接生成最终答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void forceFinalStream(...) {
// 1. 重建消息列表
List<Message> newMessages = new ArrayList<>();
newMessages.add(new SystemMessage(ReactAgentPrompts.getWebSearchPrompt()));
for (Message msg : messages) {
if (!(msg instanceof SystemMessage)) newMessages.add(msg);
}
// 2. 追加限制提示
newMessages.add(new UserMessage("""
你已达到最大推理轮次限制。
请基于当前已有的上下文信息,
直接给出最终答案。
禁止再调用任何工具。
"""));
// 3. 重新调用 LLM(流式)
chatClient.prompt().messages(newMessages).stream().content()...
}

五、Tavily MCP 集成

AgentController.afterPropertiesSet() 启动时初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void initWebSearchToolCallbacks() throws Exception {
String authorizationHeader = "Bearer " + tavilyApiKey;

HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.header("Authorization", authorizationHeader);

HttpClientStreamableHttpTransport tavTransport = HttpClientStreamableHttpTransport
.builder(tavilyMcpUrl)
.requestBuilder(requestBuilder)
.build();

McpSyncClient tavilyMcp = McpClient.sync(tavTransport)
.requestTimeout(Duration.ofSeconds(300))
.build();
tavilyMcp.initialize();

SyncMcpToolCallbackProvider provider = SyncMcpToolCallbackProvider.builder()
.mcpClients(List.of(tavilyMcp))
.build();

webSearchToolCallbacks = provider.getToolCallbacks();
}

配置项application.yml):

1
2
3
tavily:
api-key: tvly-dev-xxxxxx
mcp-url: https://mcp.tavily.com/mcp/

六、Builder 模式

1
2
3
4
5
6
7
8
WebSearchReactAgent agent = WebSearchReactAgent.builder()
.name("web react")
.chatModel(chatModel)
.tools(webSearchToolCallbacks)
.sessionService(sessionService)
.taskManager(taskManager)
.maxRounds(5)
.build();

七、典型请求/响应示例

请求

1
GET /agent/chat/stream?query=今天的天气怎么样&conversationId=user-123

SSE 响应(简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
data: {"type":"thinking","content":"🔍 正在搜索信息: 今天的天气\n"}

data: {"type":"thinking","content":"🔍 正在搜索信息: 上海天气\n"}

data: {"type":"text","content":"根据最新"}

data: {"type":"text","content":"的搜索结果,"}

data: {"type":"text","content":"今天上海晴..."}

data: {"type":"reference","content":"[{\"url\":\"https://...\",\"title\":\"天气预报\"}]","count":1}

data: {"type":"recommend","content":"[\"明天天气如何\",\"这周末适合出游吗\"]","count":2}

八、错误处理

  • 工具未找到:返回 {"error": "工具未找到"} 加入 responseMap
  • 工具执行失败:捕获异常后继续流程
  • 流式输出错误doOnError 触发 sink.tryEmitError(err)
  • 用户主动停止taskManager.stopTask() 中断 Disposable

九、性能指标

每次执行都会记录:

  • firstResponseTime:首字响应时间(ms)
  • totalResponseTime:总响应时间(ms)
  • usedTools:使用的工具列表
  • references:参考链接 JSON
  • recommend:推荐问题 JSON

这些数据最终会保存到 ai_session 表中。