第四章:线程调度 Schedulers
4.1 为什么需要线程调度?
响应式编程的核心是非阻塞,但这不意味着不需要线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ┌─────────────────────────────────────────────────────────────────┐ │ 响应式编程的线程模型 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 传统同步: 响应式: │ │ │ │ 线程 A: 线程 A: │ │ request() ──等待──► DB request() ──立即返回──► │ │ ◄──结果── (不等待) │ │ process() (等回调) │ │ │ │ 线程 B: 线程 A: ← 继续处理其他请求 │ │ request() ──等待──► API 回调 ──► 处理结果 │ │ ◄──结果── │ │ │ │ 线程 C: 线程 B: (回调执行) │ │ request() ──等待──► ... 处理耗时操作 │ │ │ └─────────────────────────────────────────────────────────────────┘
|
关键点:
- 发出请求的线程不等结果,立即返回
- 结果通过回调在另一个线程执行
- 同一线程可以处理多个请求
4.2 Schedulers 类型详解
Reactor 提供了多种调度器:
| 调度器 | 创建方式 | 线程池特点 | 适用场景 |
|---|
immediate | Schedulers.immediate() | 当前线程 | 不切换线程 |
single | Schedulers.single() | 单线程池 | 顺序执行 |
parallel | Schedulers.parallel() | CPU 核数 | CPU 密集型 |
boundedElastic | Schedulers.boundedElastic() | 弹性线程池(默认 10*CPU) | I/O 阻塞操作 |
elastic | Schedulers.elastic() | 弹性线程池(已废弃) | 不推荐 |
1 2 3 4 5 6
| Flux.just(1, 2, 3) .subscribeOn(Schedulers.immediate()) .subscribe(i -> System.out.println("线程: " + Thread.currentThread().getName()));
|
4.2.2 single - 单线程
1 2 3 4 5 6 7
| Flux.just(1, 2, 3) .subscribeOn(Schedulers.single()) .subscribe(i -> System.out.println("线程: " + Thread.currentThread().getName()));
|
4.2.3 parallel - 并行(CPU 核数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Flux.just(1, 2, 3, 4) .parallel(4) .runOn(Schedulers.parallel()) .map(i -> { System.out.println("处理: " + i + ", 线程: " + Thread.currentThread().getName()); return i * 10; }) .sequential() .subscribe();
|
4.2.4 boundedElastic - 弹性线程池(最重要!)
1 2 3 4 5 6 7 8 9 10 11 12
| Flux.just(1, 2, 3) .flatMap(i -> Mono.fromCallable(() -> { Thread.sleep(100); return i * 10; })) .subscribeOn(Schedulers.boundedElastic()) .subscribe(i -> System.out.println("结果: " + i));
|
4.3 subscribeOn vs publishOn
这是两个最容易混淆的概念。
4.3.1 subscribeOn - 影响源
subscribeOn 影响数据源的执行线程,只生效一次。
1 2 3 4 5 6 7 8
| Flux.just(1, 2, 3) .subscribeOn(Schedulers.single()) .subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.parallel()) .subscribe(i -> System.out.println("线程: " + Thread.currentThread().getName()));
|
场景:数据源是阻塞的(如数据库查询)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Flux.create(sink -> { List<User> users = jdbcTemplate.query(...); users.forEach(sink::next); }) .map(User::getName) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
Flux.create(sink -> { List<User> users = jdbcTemplate.query(...); users.forEach(sink::next); }) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
|
4.3.2 publishOn - 影响下游
publishOn 影响当前操作符之后的所有操作,可以多次调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| Flux.just(1, 2, 3) .map(i -> { System.out.println("① map: " + Thread.currentThread().getName()); return i * 10; }) .publishOn(Schedulers.single()) .map(i -> { System.out.println("② map: " + Thread.currentThread().getName()); return i + 1; }) .publishOn(Schedulers.boundedElastic()) .map(i -> { System.out.println("③ map: " + Thread.currentThread().getName()); return i * 2; }) .subscribe();
|
4.3.3 对比总结
| 特性 | subscribeOn | publishOn |
|---|
| 作用位置 | 影响数据源(上游) | 影响当前操作之后(下游) |
| 生效次数 | 首次有效 | 可多次调用 |
| 典型场景 | 阻塞的数据源 | 分阶段处理 |
| 代码位置 | 任意(会向上传播) | 影响之后的操作 |
4.3.4 组合使用
1 2 3 4 5 6 7 8
| Flux.just(1, 2, 3) .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel()) .map(i -> computeExpensive(i)) .publishOn(Schedulers.boundedElastic()) .doOnNext(result -> writeToDb(result)) .subscribe();
|
4.4 项目中的实际使用
4.4.1 WebSearchReactAgent 中的线程调度
1 2 3 4 5 6 7 8 9 10
|
Disposable disposable = chatClient.prompt() .messages(messages) .stream() .chatResponse() .publishOn(Schedulers.boundedElastic()) .doOnNext(chunk -> processChunk(chunk, sink, state)) .subscribe();
|
为什么用 boundedElastic?
- LLM API 调用是网络 I/O 操作
- 需要等待响应,线程会阻塞
- 使用弹性线程池,避免阻塞主线程
4.4.2 工具执行的线程调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
Schedulers.boundedElastic().schedule(() -> { if (hasSentFinalResult.get()) { return; } String toolName = tc.name(); try { Object result = callback.call(argsJson); messages.add(ToolResponseMessage.builder() .responses(List.of(tr)) .build()); } catch (Exception ex) { } });
|
4.5 常见错误与最佳实践
4.5.1 常见错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Flux.just(1, 2, 3) .subscribeOn(Schedulers.boundedElastic()) .map(i -> blockingCall(i)) .subscribe();
Flux.just(1, 2, 3) .map(i -> { Thread.sleep(1000); return i * 10; }) .subscribe();
Flux.just(1, 2, 3) .flatMap(i -> Mono.fromCallable(() -> { Thread.sleep(1000); return i * 10; })) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
|
4.5.2 最佳实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Mono.fromCallable(() -> blockingCall()) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
Flux.just(1, 2, 3, 4) .parallel() .runOn(Schedulers.parallel()) .map(i -> cpuHeavyComputation(i)) .sequential() .subscribe();
Mono.zip( Mono.fromCallable(() -> callServiceA()).subscribeOn(Schedulers.boundedElastic()), Mono.fromCallable(() -> callServiceB()).subscribeOn(Schedulers.boundedElastic()), Mono.fromCallable(() -> callServiceC()).subscribeOn(Schedulers.boundedElastic()) ).subscribe();
|
4.6 自定义调度器
1 2 3 4 5 6 7 8 9 10 11
| ExecutorService executor = Executors.newFixedThreadPool(10); Scheduler customScheduler = Schedulers.fromExecutorService(executor);
Flux.just(1, 2, 3) .publishOn(customScheduler) .subscribe();
customScheduler.dispose();
|
4.7 block() 方法 - 阻塞等待结果
在某些场景下(如测试、启动时初始化),需要阻塞等待响应式流的结果:
1 2 3 4 5 6 7 8 9 10 11
| Mono<String> mono = Mono.just("Hello"); String result = mono.block(); System.out.println(result);
Flux<Integer> flux = Flux.range(1, 10); Integer first = flux.blockFirst();
Integer last = flux.blockLast();
|
注意事项:
block() 会阻塞线程,在响应式链中使用可能导致死锁- 在 WebFlux 中不要在请求线程上调用
block() - 仅在测试或启动初始化时使用
4.8 checkpoint() - 调试利器
响应式链中的异常堆栈通常难以定位,checkpoint() 可以记录调用链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .checkpoint("Division in map") .subscribe( System.out::println, e -> e.printStackTrace() );
|
1 2 3 4 5
| Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .checkpoint(true) .subscribe();
|
4.9 本章小结
- 为什么需要调度器:非阻塞 ≠ 不用线程,而是线程复用
- boundedElastic:最适合 I/O 阻塞操作(数据库、网络、LLM 调用)
- parallel:适合 CPU 密集型计算
- subscribeOn:影响数据源,只生效一次
- publishOn:影响下游操作,可多次调用
- block():阻塞等待结果,仅在测试/初始化时使用
- checkpoint():调试利器,帮助定位响应式链中的异常
- 最佳实践:阻塞操作 → boundedElastic,CPU 计算 → parallel
4.10 练习题
- 分析以下代码的线程执行路径:
1 2 3 4 5 6
| Flux.just(1, 2, 3) .map(i -> { System.out.println("① " + Thread.currentThread().getName()); return i; }) .subscribeOn(Schedulers.single()) .publishOn(Schedulers.boundedElastic()) .map(i -> { System.out.println("② " + Thread.currentThread().getName()); return i; }) .subscribe();
|
- 如果 LLM 调用需要 10 秒,boundedElastic 线程池默认最多有多少个并发?为什么?
下一章:我们将学习 Sinks.Many,这是项目中实现 SSE 流式响应的核心。