第六章:错误处理与资源清理
6.1 错误处理的重要性
在响应式编程中,错误处理比传统编程更重要:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 传统同步: try { result = callApi(); // 抛出异常 } catch (Exception e) { handleError(e); // 捕获处理 }
响应式: callApi() .subscribe( // 异步回调,try-catch 不起作用! result -> process(result), error -> handleError(error) // 必须在 subscribe 中处理 );
|
关键点:响应式流中的异常不会自动抛出,必须通过操作符处理。
6.2 错误处理操作符
6.2.1 onErrorReturn - 错误返回默认值
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.just(1, 2, 3) .map(i -> { if (i == 2) throw new RuntimeException("错误"); return i * 10; }) .onErrorReturn(0) .subscribe( System.out::println, e -> System.out.println("错误被捕获: " + e.getMessage()) );
|
使用场景:API 调用失败时返回默认值
6.2.2 onErrorResume - 错误切换到另一个流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Flux.just(1, 2, 3) .map(i -> { if (i == 2) throw new RuntimeException("错误"); return i * 10; }) .onErrorResume(e -> { System.out.println("错误: " + e.getMessage()); return Flux.just(100, 200); }) .subscribe(System.out::println);
|
使用场景:主 API 失败时尝试备用 API
1 2 3 4 5 6 7 8 9 10
| webClient.get().uri(primaryApiUrl) .retrieve() .bodyToMono(String.class) .onErrorResume(e -> webClient.get().uri(backupApiUrl) .retrieve() .bodyToMono(String.class) ) .subscribe(result -> System.out.println("最终结果: " + result));
|
6.2.3 onErrorContinue - 忽略错误继续
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.just(1, 2, 3) .map(i -> { if (i == 2) throw new RuntimeException("忽略这个"); return i * 10; }) .onErrorContinue((e, obj) -> { System.out.println("跳过错误: " + obj + ", 原因: " + e.getMessage()); }) .subscribe(System.out::println);
|
使用场景:处理批量数据,某条失败不影响其他
6.2.4 retry - 重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Flux.just(1, 2, 3) .flatMap(i -> Mono.fromCallable(() -> { if (i == 2) throw new RuntimeException("临时错误"); return i * 10; })) .retry(3) .subscribe( System.out::println, e -> System.out.println("最终失败: " + e.getMessage()) );
|
使用场景:网络不稳定时自动重试
1 2 3 4 5 6 7
| webClient.get().uri(url) .retrieve() .bodyToMono(String.class) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .filter(e -> e instanceof WebClientResponseException)) .subscribe();
|
6.2.5 onErrorMap - 错误转换
1 2 3 4 5 6 7 8 9 10 11 12
| webClient.get().uri(url) .retrieve() .bodyToMono(String.class) .onErrorMap( e -> e instanceof WebClientResponseException, e -> new BusinessException("API 调用失败: " + e.getMessage()) ) .subscribe( System.out::println, e -> System.out.println("业务错误: " + e.getClass().getSimpleName()) );
|
6.2.6 timeout - 超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Mono<String> result = webClient.get() .uri(url) .retrieve() .bodyToMono(String.class) .timeout(Duration.ofSeconds(5)) .onErrorResume(TimeoutException.class, e -> Mono.just("请求超时,使用缓存数据"));
Mono<String> robust = webClient.get() .uri(url) .retrieve() .bodyToMono(String.class) .timeout(Duration.ofSeconds(3)) .retry(2) .onErrorReturn("降级数据");
|
6.3 doFinally 资源清理
doFinally 是最重要的清理方法,无论成功、错误还是取消,都会执行。
6.3.1 基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.just("A", "B", "C") .doFinally(signal -> { System.out.println("最终清理: " + signal); }) .subscribe();
|
6.3.2 SignalType 类型
1 2 3 4 5 6 7 8 9 10 11 12 13
| .doFinally(signal -> { switch (signal) { case OnComplete: System.out.println("正常完成"); break; case OnError: System.out.println("错误终止"); break; case Cancel: System.out.println("被取消"); break; } })
|
6.4 取消订阅 Disposable
6.4.1 什么是 Disposable?
Disposable 是一个可以取消的订阅对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Disposable disposable = Flux.interval(Duration.ofSeconds(1)) .subscribe(i -> System.out.println("Tick: " + i));
Thread.sleep(5000); disposable.dispose();
System.out.println("已取消");
|
6.4.2 项目中的任务中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public class AgentTaskManager { private final Map<String, TaskInfo> taskMap = new ConcurrentHashMap<>(); public static class TaskInfo { private final Sinks.Many<String> sink; private Disposable disposable; public void setDisposable(Disposable disposable) { this.disposable = disposable; } } public boolean stopTask(String conversationId) { TaskInfo taskInfo = taskMap.get(conversationId); if (taskInfo.disposable != null) { taskInfo.disposable.dispose(); } taskInfo.sink.tryEmitNext("⏹ 用户已停止生成"); taskInfo.sink.tryEmitComplete(); taskMap.remove(conversationId); return true; } }
|
6.4.3 WebSearchReactAgent 中的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
Disposable disposable = chatClient.prompt() .messages(messages) .stream() .chatResponse() .doOnNext(chunk -> processChunk(chunk, sink, state)) .doOnComplete(...) .subscribe();
if (conversationId != null && taskManager != null) { taskManager.setDisposable(conversationId, disposable); }
|
6.5 doOnCancel 取消时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Flux.interval(Duration.ofSeconds(1)) .doOnCancel(() -> System.out.println("流被取消")) .doOnComplete(() -> System.out.println("流完成")) .take(5) .subscribe();
Thread.sleep(3000); System.out.println("主线程结束");
|
6.6 完���错误处理模式
项目中常用的完整模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| chatClient.prompt() .messages(messages) .stream() .chatResponse() .doOnNext(chunk -> processChunk(chunk, sink, state)) .doOnComplete(() -> { sink.tryEmitNext(createReferenceResponse(referenceJson)); sink.tryEmitComplete(); }) .doOnError(err -> { if (!hasSentFinalResult.get()) { hasSentFinalResult.set(true); sink.tryEmitError(err); } }) .doFinally(signal -> { log.info("流结束,信号: {}", signal); saveSessionResult(...); if (taskManager != null) { taskManager.removeTask(conversationId); } }) .subscribe();
|
6.7 常见错误处理模式
6.7.1 模式1:静默失败,返回空
1 2 3 4 5 6 7
| service.call() .onErrorResume(e -> { log.warn("调用失败", e); return Flux.empty(); }) .subscribe();
|
6.7.2 模式2:降级处理
1 2 3 4 5
| service.getData() .onErrorResume(e -> cache.get(key)) .onErrorResume(e -> Mono.just(defaultData)) .subscribe();
|
6.7.3 模式3:重试 + 降级
1 2 3 4 5 6 7
| service.getData() .retry(3) .onErrorResume(e -> { log.error("重试后仍失败", e); return Mono.just(fallbackData); }) .subscribe();
|
6.8 本章小结
- 错误处理是必须的:响应式异常不会自动抛出
- onErrorReturn:返回默认值
- onErrorResume:切换到备用流
- retry:重试机制
- doFinally:无论成功/错误/取消都执行
- Disposable:取消正在进行的订阅
- doOnCancel:取消时的处理
6.9 练习题
- 编写代码:调用 API,失败后重试 3 次,每次延迟递增
- 使用 doFinally 确保流结束后释放数据库连接
- 模拟用户点击”取消”按钮中断 LLM 调用的完整流程
下一章:我们将学习背压处理,这是应对高并发的重要机制。