第二章:Flux 与 Mono 基础 2.1 什么是 Flux 和 Mono? Reactor 中有两个核心类型:
类型 元素数量 读作 典型场景 Flux<T>0 到 N 个 “Flux” 列表、流式数据、SSE Mono<T>0 或 1 个 “Mono” HTTP 响应、数据库单条记录
1 2 3 4 5 6 7 Flux<String> flux = Flux.just("A" , "B" , "C" ); Mono<String> mono = Mono.just("Hello" );
2.2 创建 Flux 的多种方式 2.2.1 从固定值创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Flux<String> f1 = Flux.just("A" , "B" , "C" ); String[] arr = {"A" , "B" , "C" }; Flux<String> f2 = Flux.fromArray(arr); List<String> list = Arrays.asList("A" , "B" , "C" ); Flux<String> f3 = Flux.fromIterable(list); Stream<String> stream = Stream.of("A" , "B" , "C" ); Flux<String> f4 = Flux.fromStream(stream);
2.2.2 从 0 个元素创建 1 2 3 4 5 Flux<String> empty = Flux.empty(); Flux<String> error = Flux.error(new RuntimeException ("出错了" ));
2.2.3 动态生成 1 2 3 4 5 6 7 8 9 10 11 12 13 Flux<String> generated = Flux.generate( () -> 0 , (state, sink) -> { if (state >= 3 ) { sink.complete(); } else { sink.next("Value-" + state); } return state + 1 ; } );
Flux.generate vs Flux.create 区别 :
特性 Flux.generateFlux.create线程模型 单线程(顺序调用) 多线程(可并发推送) 状态管理 内置状态机 需自行管理 背压处理 内置 request(n) 需手动处理 典型场景 同步数据源 异步/多线程数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Flux<String> created = Flux.create(sink -> { sink.next("Hello" ); sink.next("World" ); sink.complete(); }); Flux<Long> timer = Flux.interval(Duration.ofSeconds(1 )); Flux<Long> deferred = Flux.defer(() -> { System.out.println("创建 Flux" ); return Flux.just(System.currentTimeMillis()); });
2.2.4 热流与冷流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Flux<Integer> cold = Flux.range(1 , 5 ); cold.subscribe(i -> System.out.println("订阅者1: " + i)); cold.subscribe(i -> System.out.println("订阅者2: " + i)); Sinks.Many<Integer> hotSink = Sinks.many().multicast().onBackpressureBuffer(); Flux<Integer> hot = hotSink.asFlux(); hot.subscribe(i -> System.out.println("订阅者1: " + i)); hotSink.tryEmitNext(1 ); hotSink.tryEmitNext(2 ); hot.subscribe(i -> System.out.println("订阅者2: " + i)); hotSink.tryEmitNext(3 );
2.2.4 从其他类型转换 1 2 3 4 5 6 7 8 9 10 Mono<String> mono = Mono.just("Hello" ); Flux<String> fromMono = mono.flux(); Flux<String> flux = Flux.just("A" , "B" , "C" ); Mono<String> first = flux.next(); Mono<List<String>> list = flux.collectList();
2.3 创建 Mono 的多种方式 2.3.1 从值创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Mono<String> m1 = Mono.just("Hello" ); Mono<String> m2 = Mono.fromCallable(() -> { return computeExpensiveValue(); }); Mono<String> m3 = Mono.fromSupplier(() -> "World" ); Mono<Void> m4 = Mono.fromRunnable(() -> { System.out.println("执行一些操作" ); }); Mono<Long> m5 = Mono.delay(Duration.ofSeconds(2 ));
2.3.2 创建空或错误的 Mono 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Mono<String> empty = Mono.empty(); Mono<String> error = Mono.error(new RuntimeException ("错误" )); Mono<String> result = Mono.empty() .switchIfEmpty(Mono.just("默认值" )) .subscribe(); Mono<Long> deferred = Mono.defer(() -> { System.out.println("创建 Mono" ); return Mono.just(System.currentTimeMillis()); }); Mono<String> name = Mono.just("Alice" ); Mono<Integer> age = Mono.just(25 ); Mono.zip(name, age, (n, a) -> n + " is " + a + " years old" ) .subscribe(System.out::println);
2.4 订阅:让流开始流动 创建 Flux/Mono 后,数据不会自动流动,需要订阅 才会执行。
2.4.1 最简单的订阅 1 2 Flux.just("A" , "B" , "C" ) .subscribe();
2.4.2 带回调的订阅 1 2 3 4 5 6 7 8 9 10 11 12 Flux.just("A" , "B" , "C" ) .subscribe( element -> System.out.println("收到: " + element), error -> System.out.println("错误: " + error), () -> System.out.println("完成" ) );
2.4.3 订阅流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ┌─────────────────────────────────────────────────────────────┐ │ 订阅流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 创建 Flux/Mono │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ subscribe() 订阅 │ │ │ └──────────────────────┬──────────────────────────────┘ │ │ │ │ │ ┌────────────────┼────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ onNext() │ │ onError()│ │ onComplete│ │ │ │ 每个元素 │ │ 发生错误 │ │ 完成 │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
2.4.4 不同订阅方式对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 flux.subscribe( element -> process(element), error -> handleError(error), () -> cleanup() ); flux.subscribe(element -> System.out.println(element)); flux.subscribe(new Consumer <String>() { @Override public void accept (String s) { System.out.println(s); } }); Disposable disposable = flux.subscribe();disposable.dispose();
2.5 错误处理基础 2.5.1 onErrorReturn - 错误时返回默认值 1 2 3 4 5 6 7 8 9 10 11 12 Flux.just(1 , 2 , 3 ) .map(i -> { if (i == 2 ) throw new RuntimeException ("错误" ); return i * 10 ; }) .onErrorReturn(0 ) .subscribe( System.out::println, System.out::println );
2.5.2 onErrorResume - 错误时切换到另一个流 1 2 3 4 5 6 7 8 9 Flux.just(1 , 2 , 3 ) .map(i -> { if (i == 2 ) throw new RuntimeException ("错误" ); return i * 10 ; }) .onErrorResume(e -> Flux.just(100 , 200 , 300 )) .subscribe(System.out::println);
2.6 第一个完整示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;public class FirstReactorExample { public static void main (String[] args) { System.out.println("=== 示例1: Flux ===" ); Flux.just("Apple" , "Banana" , "Orange" ) .subscribe( fruit -> System.out.println("水果: " + fruit), error -> System.out.println("错误: " + error), () -> System.out.println("完成!" ) ); System.out.println("\n=== 示例2: Mono ===" ); Mono.just("Hello Reactor" ) .subscribe( msg -> System.out.println("消息: " + msg), error -> System.out.println("错误: " + error), () -> System.out.println("完成!" ) ); System.out.println("\n=== 示例3: 错误处理 ===" ); Flux.error(new RuntimeException ("出错了" )) .onErrorReturn("默认值" ) .subscribe( System.out::println, System.out::println ); System.out.println("\n=== 示例4: map 转换 ===" ); Flux.just(1 , 2 , 3 ) .map(i -> i * 10 ) .map(i -> "数字: " + i) .subscribe(System.out::println); } }
运行结果 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 === 示例1: Flux === 水果: Apple 水果: Banana 水果: Orange 完成! === 示例2: Mono === 消息: Hello Reactor 完成! === 示例3: 错误处理 === 默认值 === 示例4: map 转换 === 数字: 10 数字: 20 数字: 30
2.7 常用操作符速查 操作符 作用 示例 map一对一转换 Flux.just(1,2,3).map(i->i*10)flatMap一对多/异步转换 Flux.just(1,2).flatMap(i->Mono.just(i*10))filter过滤 Flux.just(1,2,3).filter(i->i>1)take取前 N 个 Flux.just(1,2,3,4,5).take(3)distinct去重 Flux.just(1,2,1,3).distinct()merge合并 Flux.merge(Flux.just(1,2), Flux.just(3,4))zip按索引组合 Flux.zip(Flux.just("A","B"), Flux.just(1,2))onErrorReturn错误默认值 flux.onErrorReturn("default")onErrorResume错误切换流 flux.onErrorResume(e->Flux.empty())
2.8 本章小结 Flux = 0-N 元素流,Mono = 0-1 单值创建方式:just(), fromIterable(), empty(), error(), generate(), interval() 订阅才执行:subscribe(onNext, onError, onComplete) 错误处理:onErrorReturn(), onErrorResume() 常用操作符:map, flatMap, filter, take, merge, zip 2.9 练习题 创建一个发出数字 1-10 的 Flux,过滤出偶数并乘以 2 创建一个 Mono,延迟 3 秒后发出 “Done” 模拟一个错误流,体验 onErrorReturn 和 onErrorResume 的区别 下一章 :我们将学习 Flux 的订阅流程和生命周期,这是理解响应式编程的关键。