第八章:Spring 集成与项目实战
8.1 Spring WebFlux 基础
Spring WebFlux 是 Spring 的响应式 Web 框架,完全支持 Reactor。
8.1.1 WebFlux vs MVC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@RestController public class SyncController { @GetMapping("/user/{id}") public User getUser(@PathVariable Long id) { return userService.findById(id); } @GetMapping("/users") public List<User> getUsers() { return userService.findAll(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@RestController public class ReactiveController { @GetMapping("/user/{id}") public Mono<User> getUser(@PathVariable Long id) { return userService.findById(id); } @GetMapping("/users") public Flux<User> getUsers() { return userService.findAll(); } }
|
8.1.2 SSE 流式响应
Server-Sent Events (SSE) 实现实时推送:
1 2 3 4 5 6 7 8 9
| @RestController public class ChatController { @GetMapping(value = "/chat", produces = "text/event-stream;charset=UTF-8") public Flux<String> chat(@RequestParam String query) { return agent.stream(query); } }
|
前端接收:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| const source = new EventSource('/chat?query=你好');
source.onmessage = (event) => { const data = JSON.parse(event.data); if (data.type === 'text') { appendToChat(data.content); } else if (data.type === 'thinking') { showThinkingIndicator(data.content); } else if (data.type === 'error') { showError(data.content); } };
source.onerror = () => { console.log('连接断开'); source.close(); };
|
8.2 Dodo-Agent 项目实战
8.2.1 AgentController 完整解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
@GetMapping(value = "/chat/stream", produces = "text/event-stream;charset=UTF-8") @Operation(summary = "智能问答", description = "接收用户查询并返回流式响应") public Flux<String> webSearchStream( @RequestParam String query, @RequestParam String conversationId) { log.info("收到请求: query={}, conversationId={}", query, conversationId); if (query == null || query.trim().isEmpty()) { return Flux.error(new IllegalArgumentException("查询参数不能为空")); } try { WebSearchReactAgent agent = initWebSearchAgent(); ChatMemory memory = agent.createPersistentChatMemory(conversationId, 30); agent.setChatMemory(memory); return agent.stream(conversationId, query); } catch (Exception e) { log.error("处理请求时发生错误: ", e); return Flux.error(e); } }
|
8.2.2 WebSearchReactAgent 核心流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
public Flux<String> stream(String conversationId, String question) { Flux<String> checkResult = checkRunningTask(conversationId); if (checkResult != null) { return checkResult; } initTimers(); clearUsedTools(); Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); registerTask(conversationId, sink); List<Message> messages = Collections.synchronizedList(new ArrayList<>()); messages.add(new SystemMessage(prompt)); loadChatHistory(conversationId, messages); messages.add(new UserMessage(question)); if (sessionService != null) { AiSession session = sessionService.saveQuestion(...); currentSessionId = session.getId(); } scheduleRound(messages, sink, ...); return sink.asFlux() .doOnNext(chunk -> { recordFirstResponse(); processResponse(chunk); }) .doOnCancel(() -> { taskManager.stopTask(conversationId); }) .doFinally(signal -> { saveSessionResult(...); taskManager.removeTask(conversationId); }); }
|
8.2.3 工具调用流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
private void executeToolCalls(Sinks.Many<String> sink, List<ToolCall> toolCalls, List<Message> messages, ...) { AtomicInteger completedCount = new AtomicInteger(0); int total = toolCalls.size(); for (ToolCall tc : toolCalls) { Schedulers.boundedElastic().schedule(() -> { String thinking = "🔍 正在搜索: " + query; sink.tryEmitNext(createThinkingResponse(thinking)); try { Object result = callback.call(argsJson); ToolResponse tr = new ToolResponse(tc.id(), tc.name(), result.toString()); messages.add(ToolResponseMessage.builder() .responses(List.of(tr)) .build()); recordUsedTool(tc.name()); } catch (Exception e) { addErrorToolResponse(messages, tc, e.getMessage()); } finally { if (completedCount.incrementAndGet() >= total) { scheduleRound(...); } } }); } }
|
8.3 Sinks.Many + Flux 实现 SSE
8.3.1 完整模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public Flux<String> streamResponse(String question) { Sinks.Many<String> sink = Sinks.many() .unicast() .onBackpressureBuffer(1000); CompletableFuture.runAsync(() -> { try { String response = llmClient.chat(question); for (int i = 0; i < response.length(); i++) { String chunk = response.substring(i, i + 1); sink.tryEmitNext(createTextResponse(chunk)); Thread.sleep(30); } sink.tryEmitComplete(); } catch (Exception e) { sink.tryEmitError(e); } }); return sink.asFlux() .doOnNext(chunk -> { }) .doFinally(signal -> { }); }
|
8.3.2 多种消息类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| private String createTextResponse(String content) { return JSON.toJSONString( Map.of("type", "text", "content", content) ); }
private String createThinkingResponse(String content) { return JSON.toJSONString( Map.of("type", "thinking", "content", content) ); }
private String createReferenceResponse(String content) { List<Map> refs = JSON.parseArray(content, Map.class); return JSON.toJSONString( Map.of("type", "reference", "content", content, "count", refs.size()) ); }
private String createRecommendResponse(String content) { return JSON.toJSONString( Map.of("type", "recommend", "content", content) ); }
private String createErrorResponse(String content) { return JSON.toJSONString( Map.of("type", "error", "content", content) ); }
|
8.4 任务管理:中断与取消
8.4.1 AgentTaskManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
public class AgentTaskManager { private final Map<String, TaskInfo> taskMap = new ConcurrentHashMap<>(); public TaskInfo registerTask(String conversationId, Sinks.Many<String> sink, String agentType) { if (taskMap.containsKey(conversationId)) { log.warn("会话已有任务在执行: {}", conversationId); return null; } TaskInfo info = new TaskInfo(sink, agentType); taskMap.put(conversationId, info); log.info("注册任务: conversationId={}, agentType={}", conversationId, agentType); return info; } public boolean stopTask(String conversationId) { TaskInfo info = taskMap.get(conversationId); if (info == null) { log.warn("没有正在执行的任务: {}", conversationId); return false; } try { Disposable disposable = info.getDisposable(); if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); log.info("已中断 LLM 调用: {}", conversationId); } String stopMsg = createStopMessage(); info.getSink().tryEmitNext(stopMsg); info.getSink().tryEmitComplete(); taskMap.remove(conversationId); log.info("任务已停止: {}", conversationId); return true; } catch (Exception e) { log.error("停止任务失败: {}", conversationId, e); return false; } } }
|
8.4.2 Controller 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@GetMapping("/stop") @Operation(summary = "停止Agent执行") public Map<String, Object> stopAgent(@RequestParam String conversationId) { log.info("收到停止请求: conversationId={}", conversationId); boolean success = taskManager.stopTask(conversationId); Map<String, Object> result = new HashMap<>(); if (success) { result.put("success", true); result.put("message", "已停止执行"); } else { result.put("success", false); result.put("message", "没有找到正在执行的任务"); } return result; }
|
8.5 错误处理的最佳实践
8.5.1 Controller 层错误处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @GetMapping(value = "/chat/stream", produces = "text/event-stream;charset=UTF-8") public Flux<String> chatStream(String query, String conversationId) { if (query == null || query.trim().isEmpty()) { return Flux.error(new IllegalArgumentException("查询参数不能为空")); } try { return agent.stream(conversationId, query); } catch (Exception e) { log.error("处理请求异常", e); return Flux.error(new RuntimeException("服务处理异常: " + e.getMessage())); } }
|
8.5.2 Agent 层错误处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| chatClient.prompt() .messages(messages) .stream() .chatResponse() .doOnNext(chunk -> processChunk(chunk, sink, state)) .doOnComplete(() -> { sink.tryEmitNext(referenceJson); sink.tryEmitComplete(); }) .doOnError(err -> { if (!hasSentFinalResult.get()) { hasSentFinalResult.set(true); sink.tryEmitError(err); } }) .doFinally(signal -> { saveSessionResult(...); taskManager.removeTask(conversationId); }) .subscribe();
|
8.6 WebClient - 响应式 HTTP 客户端
WebClient 是 Spring WebFlux 提供的非阻塞 HTTP 客户端。
8.6.1 基础用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| WebClient client = WebClient.create("https://api.example.com");
Mono<String> result = client.get() .uri("/users/{id}", userId) .retrieve() .bodyToMono(String.class);
Mono<User> created = client.post() .uri("/users") .contentType(MediaType.APPLICATION_JSON) .bodyValue(newUser) .retrieve() .bodyToMono(User.class);
Flux<String> stream = client.get() .uri("/chat/stream") .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class);
|
8.6.2 错误处理
1 2 3 4 5 6 7 8 9 10 11
| client.get() .uri("/api/data") .retrieve() .onStatus(HttpStatusCode::is4xxClientError, response -> Mono.error(new ClientException("客户端错误"))) .onStatus(HttpStatusCode::is5xxServerError, response -> Mono.error(new ServerException("服务器错误"))) .bodyToMono(String.class) .retry(3) .timeout(Duration.ofSeconds(5)) .onErrorReturn("降级数据");
|
8.7 StepVerifier - 响应式测试利器
8.7.1 基础用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| StepVerifier.create(Flux.just("A", "B", "C")) .expectNext("A") .expectNext("B") .expectNext("C") .verifyComplete();
StepVerifier.create(Mono.just("Hello")) .expectNext("Hello") .verifyComplete();
StepVerifier.create(Flux.error(new RuntimeException("错误"))) .expectError(RuntimeException.class) .verify();
|
8.7.2 高级用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| StepVerifier.create(Flux.interval(Duration.ofSeconds(1)).take(3)) .expectNext(0L) .expectNext(1L) .expectNext(2L) .verifyComplete();
StepVerifier.create(Mono.delay(Duration.ofSeconds(10))) .expectNext(0L) .verifyTimeout(Duration.ofSeconds(5));
StepVerifier.create(Flux.range(1, 5)) .expectNextCount(5) .verifyComplete();
|
8.8 性能优化技巧
8.6.1 减少内存占用
1 2 3 4 5 6 7 8
| flux.collectList().block();
flux .filter(...) .map(...) .subscribe();
|
8.6.2 并行处理
1 2 3 4 5 6 7
| Flux.just(1, 2, 3, 4) .parallel() .runOn(Schedulers.parallel()) .map(i -> computeHeavy(i)) .sequential() .subscribe();
|
8.6.3 合理使用缓存
1 2 3 4 5 6 7 8 9 10 11
| public Mono<User> getUser(Long id) { return CacheService.get("user:" + id) .switchIfEmpty( userRepository.findById(id) .flatMap(user -> CacheService.set("user:" + id, user) .then(Mono.just(user)) ) ); }
|
8.9 本章小结
- Spring WebFlux 是响应式 Web 框架
- Sinks.Many + Flux 是 SSE 流式响应的核心模式
- AgentTaskManager 管理任务的中断和取消
- 多种消息类型(text/thinking/reference/recommend)
- 错误处理链:doOnError + doFinally
- WebClient:响应式 HTTP 客户端,替代 RestTemplate
- StepVerifier:响应式测试利器
- 性能优化:流式处理、并行化、合理缓存
8.10 练习题
- 分析 AgentController 中
/chat/stream 接口的完整请求流程 - 模拟实现一个带停止功能的流式问答
- 如果 LLM 返回出错,前端如何接收错误消息?
附录:Reactor 快速入门总结
核心类型
| 类型 | 元素数 | 场景 |
|---|
Flux<T> | 0-N | 列表、流式数据 |
Mono<T> | 0-1 | 单值、HTTP 响应 |
关键概念
- 订阅才执行:创建 Flux/Mono 后,需要 subscribe() 才开始流动
- 非阻塞:线程不被阻塞,通过回调处理结果
- 操作符:map/flatMap/filter 是最常用的转换操作符
- Sinks.Many:手动推送数据,实现 SSE
- Schedulers:boundedElastic 适合 I/O 操作
- 错误处理:onErrorReturn/onErrorResume 必须有兜底
项目模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public Flux<String> stream(String question) { Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); executeAsync(question, result -> { for (String chunk : result) { sink.tryEmitNext(chunk); } sink.tryEmitComplete(); }); return sink.asFlux() .doFinally(cleanup); }
|
学习路径建议:
- 第一遍:通读所有章节,理解概念
- 第二遍:结合项目代码,理解实际应用
- 第三遍:动手实践,写一个简单的流式响应示例
祝学习愉快! 🚀