第三章:订阅流程与生命周期
3.1 订阅到底做了什么?
当你调用 subscribe() 时,Reactor 内部会发生什么?
1 2 3 4 5 6
| Flux.just("A", "B", "C") .subscribe( element -> System.out.println("收到: " + element), error -> System.out.println("错误: " + error), () -> System.out.println("完成") );
|
3.1.1 订阅流程图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ┌─────────────────────────────────────────────────────────────────┐ │ subscribe() 完整流程 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 1. subscribe() 被调用 │ │ │ │ │ ▼ │ │ 2. 创建 Subscription(订阅关系) │ │ │ │ │ ▼ │ │ 3. request(n) - 请求数据(默认无限) │ │ │ │ │ ▼ │ │ 4. onNext() - 逐个收到元素 │ │ │ │ │ ├── 元素 A → 处理 → 继续请求 │ │ ├── 元素 B → 处理 → 继续请求 │ │ └── 元素 C → 处理 → 完成 or 继续 │ │ │ │ │ ▼ │ │ 5. onComplete() / onError() - 流结束 │ │ │ └─────────────────────────────────────────────────────────────────┘
|
3.1.2 代码层面理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public interface Subscriber<T> { void onSubscribe(Subscription s); void onNext(T t); void onError(Throwable t); void onComplete(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| Flux.just("A", "B", "C") .subscribe(new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription s) { System.out.println("订阅成功!"); this.subscription = s; s.request(1); } @Override public void onNext(String s) { System.out.println("收到: " + s); subscription.request(1); } @Override public void onError(Throwable t) { System.out.println("错误: " + t.getMessage()); } @Override public void onComplete() { System.out.println("完成!"); } });
|
3.2 生命周期钩子
Reactor 提供了多个 doOn* 方法,可以在流的各个阶段插入自定义逻辑。
3.2.1 完整生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Flux.just("A", "B", "C") .doOnSubscribe(sub -> System.out.println("1. 订阅开始")) .doOnRequest(n -> System.out.println("2. 请求元素: " + n)) .doOnNext(s -> System.out.println("3. 收到元素: " + s)) .doOnComplete(() -> System.out.println("4. 完成")) .doOnError(e -> System.out.println("4. 错误: " + e.getMessage())) .doFinally(signal -> System.out.println("5. 最终清理: " + signal)) .subscribe();
|
3.2.2 生命周期顺序图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| 元素 "A" 的完整生命周期: ┌─────────────────┐ │ doOnSubscribe │ ← 订阅时触发一次 └────────┬────────┘ │ ▼ ┌─────────────────┐ │ doOnRequest │ ← 每次 request 时触发 └────────┬────────┘ │ ▼ ┌─────────────────┐ │ doOnNext │ ← 收到每个元素时触发 └────────┬────────┘ │ ▼ ┌─────────────────┐ │ doOnComplete │ ← 正常完成时触发 └────────┬────────┘ │ ▼ ┌─────────────────┐ │ doFinally │ ← 无论成功/错误/取消都触发 └─────────────────┘
|
3.2.3 常用生命周期钩子
| 钩子方法 | 触发时机 | 典型用途 |
|---|
doOnSubscribe() | 订阅时 | 初始化、资源准备 |
doOnRequest() | 请求元���时 | 限流、监控 |
doOnNext() | 每个元素 | 日志、统计、收集 |
doOnComplete() | 正常完成 | 成功处理、资源释放 |
doOnError() | 错误时 | 错误日志、监控 |
doOnCancel() | 取消订阅 | 清理资源 |
doFinally() | 最终清理 | 无论如何都清理 |
3.3 操作符详解
3.3.1 map - 一对一转换
1 2 3 4 5 6
| Flux.just(1, 2, 3) .map(i -> i * 10) .subscribe(System.out::println);
|
3.3.2 flatMap - 一对多 / 异步转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
Flux.just(1, 2, 3) .map(i -> Flux.just(i * 10)) .subscribe(flux -> flux.subscribe(System.out::println));
Flux.just(1, 2, 3) .flatMap(i -> Flux.just(i * 10)) .subscribe(System.out::println);
public Mono<User> getUserWithOrders(Long userId) { Mono<User> userMono = userRepository.findById(userId); return userMono.flatMap(user -> orderRepository.findByUserId(user.getId()) .collectList() .map(orders -> { user.setOrders(orders); return user; }) ); }
|
3.3.3 filter - 过滤
1 2 3 4 5
| Flux.just(1, 2, 3, 4, 5) .filter(i -> i > 3) .subscribe(System.out::println);
|
3.3.4 take - 取前 N 个
1 2 3 4 5 6 7 8 9 10 11
| Flux.just(1, 2, 3, 4, 5) .take(3) .subscribe(System.out::println);
.take(3) .takeLast(3) .takeUntil(i -> i >= 3) .takeWhile(i -> i < 3)
|
3.3.5 distinct - 去重
1 2 3 4 5
| Flux.just(1, 2, 1, 3, 2, 4) .distinct() .subscribe(System.out::println);
|
3.4 组合操作符
3.4.1 merge - 合并(交错)
1 2 3 4 5 6 7 8 9 10
| Flux.merge( Flux.just(1, 2), Flux.just(3, 4) ).subscribe(System.out::println);
.mergeWith(otherFlux) .mergeSequential(f1, f2, f3)
|
3.4.2 zip - 按索引组合
1 2 3 4 5 6 7
| Flux.zip( Flux.just("A", "B", "C"), Flux.just(1, 2, 3), (letter, number) -> letter + number ).subscribe(System.out::println);
|
3.4.3 concat - 按顺序拼接
1 2 3 4 5 6 7 8 9 10
| Flux.concat( Flux.just(1, 2), Flux.just(3, 4) ).subscribe(System.out::println);
|
3.4.4 flatMap vs concatMap vs switchMap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Flux.just(1, 2, 3) .flatMap(i -> Mono.just(i * 10).delayElement(Duration.ofMillis(100))) .subscribe(System.out::println);
Flux.just(1, 2, 3) .concatMap(i -> Mono.just(i * 10).delayElement(Duration.ofMillis(100))) .subscribe(System.out::println);
Flux.just(1, 2, 3) .switchMap(i -> Mono.just(i * 10).delayElement(Duration.ofMillis(100))) .subscribe(System.out::println);
|
选择指南:
| 操作符 | 并行 | 顺序 | 取消旧的 | 适用场景 |
|---|
flatMap | ✅ | ❌ | ❌ | 独立异步任务 |
concatMap | ✅ | ✅ | ❌ | 需要保序的异步任务 |
switchMap | ✅ | ✅ | ✅ | 搜索/轮询(只关心最新) |
3.5 资源管理:using 和 defer
3.5.1 Flux.using - 自动资源管理
1 2 3 4 5 6 7 8 9
| Flux.using( () -> new BufferedReader(new FileReader("data.txt")), reader -> Flux.fromStream(reader.lines()), reader -> { try { reader.close(); } catch (IOException e) { } } ).subscribe(System.out::println);
|
3.5.2 doOnDispose - 订阅取消时清理
1 2 3 4 5 6
| Flux.interval(Duration.ofSeconds(1)) .doOnDispose(() -> System.out.println("资源已清理")) .take(5) .subscribe();
|
3.6 实战:用户查询流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public Mono<UserOrderVO> getUserWithOrders(Long userId) { return userRepository.findById(userId) .flatMap(user -> Mono.zip( orderRepository.findRecentByUserId(userId, 5).collectList(), addressRepository.findDefaultByUserId(userId) ).map(tuple -> { UserOrderVO vo = new UserOrderVO(); vo.setUser(user); vo.setOrders(tuple.getT1()); vo.setAddress(tuple.getT2()); return vo; }) ) .doOnSuccess(vo -> log.info("查询成功: {}", vo.getUser().getName())) .doOnError(e -> log.error("查询失败", e)) .onErrorResume(e -> Mono.empty()); }
|
3.7 本章小结
- 订阅流程:subscribe → request → onNext → onComplete/onError
- 生命周期钩子:doOnSubscribe/doOnNext/doOnComplete/doFinally
- 转换操作符:map(同步)、flatMap(异步)
- 过滤操作符:filter、take、distinct
- 组合操作符:merge(交错)、zip(按索引)、concat(顺序)
- 重要区别:flatMap(并行无序)vs concatMap(顺序执行)vs switchMap(只保留最新)
- 资源管理:Flux.using()、doOnDispose() 确保资源释放
3.8 练习题
- 使用 doOnNext 统计 Flux 中偶数的数量
- 使用 flatMap 将 1-5 每个数字转换为包含其平方的 Flux
- 比较 merge 和 concat 的输出顺序差异
- 使用 switchMap 实现搜索防抖:只处理最后一次输入,取消之前的请求
下一章:我们将学习线程调度 Schedulers,这是实现非阻塞的关键。