总览:Reactor 响应式编程教程目录
Reactor 响应式编程教程 - 目录
本教程共 8 章,采用循序渐进的方式,从基础概念到项目实战。
学习路径
1 | 第一章 → 第二章 → 第三章 → 第四章 → 第五章 → 第六章 → 第七章 → 第八章 |
章节简介
📚 第一章:初识响应式编程
- 为什么需要响应式编程
- 传统同步 vs 响应式
- Reactor 简介
- 适合场景分析
目标:理解”为什么”,建立背景认知
📚 第二章:Flux 与 Mono 基础
- Flux 和 Mono 的区别
- 创建 Flux/Mono 的多种方式
- 订阅机制
- 常用操作符(map、flatMap、filter、merge、zip)
目标:掌握”是什么”,学会创建和订阅流
📚 第三章:订阅流程与生命周期
- subscribe 完整流程
- 生命周期钩子(doOnSubscribe、doOnNext、doOnComplete、doFinally)
- 操作符详解(map、flatMap、filter、take、distinct)
- 组合操作符(merge、zip、concat)
目标:理解流的执行过程,学会使用操作符
📚 第四章:线程调度 Schedulers
- 为什么需要线程调度
- Schedulers 类型(immediate、single、parallel、boundedElastic)
- subscribeOn vs publishOn 区别
- 项目中的实际使用
目标:掌握线程控制,理解非阻塞原理
📚 第五章:Sinks.Many - 手动控制数据流
- Sinks 是什么
- Sinks 类型(unicast、multicast、replay)
- 核心方法(tryEmitNext、tryEmitComplete、tryEmitError)
- 项目实战:模拟 LLM 流式响应
目标:学会手动控制数据流,实现 SSE
📚 第六章:错误处理与资源清理
- 为什么错误处理更重要
- 错误处理操作符(onErrorReturn、onErrorResume、retry)
- doFinally 资源清理
- Disposable 取消订阅
- 项目实战:任务中断流程
目标:掌握错误处理和资源管理
📚 第七章:背压处理
- 什么是背压
- 背压策略(buffer、drop、latest、error)
- Sinks 中的背压处理
- request 手动控制请求量
目标:理解快慢匹配机制
📚 第八章:Spring 集成与项目实战
- Spring WebFlux 基础
- 项目实战:Dodo-Agent 完整流程分析
- SSE 流式响应实现
- 任务管理(中断与取消)
- 错误处理最佳实践
目标:将所有知识应用到实际项目
快速查阅
需要某个具体功能时
| 功能 | 章节 |
|---|---|
| 创建 Flux/Mono | 第二章 |
| 热流与冷流 | 第二章 |
| 转换数据(map/flatMap) | 第三章 |
| 顺序控制(concatMap/switchMap) | 第三章 |
| 资源管理(using/defer) | 第三章 |
| 控制线程 | 第四章 |
| 调试定位(checkpoint) | 第四章 |
| 手动推送数据 | 第五章 |
| 单值 Sinks(Sinks.One) | 第五章 |
| 处理错误 | 第六章 |
| 错误转换(onErrorMap) | 第六章 |
| 超时处理(timeout) | 第六章 |
| 处理积压 | 第七章 |
| 限制请求量(limitRequest) | 第七章 |
| 实际项目应用 | 第八章 |
| WebClient HTTP 客户端 | 第八章 |
| 响应式测试(StepVerifier) | 第八章 |
常见错误解决
| 问题 | 解决方案 |
|---|---|
| 线程阻塞 | 使用 boundedElastic 调度器 |
| 内存爆炸 | 使用背压策略或 limitRequest |
| 错误未处理 | 使用 onErrorReturn/onErrorResume |
| 资源未释放 | 使用 doFinally/using |
| 异常堆栈难定位 | 使用 checkpoint() |
| 调试响应式链 | 使用 StepVerifier |
Reactor 操作符速查表
创建操作符
| 操作符 | 说明 | 示例 |
|---|---|---|
Flux.just() | 从值创建 | Flux.just("A", "B") |
Flux.range() | 数字范围 | Flux.range(1, 5) |
Flux.interval() | 定时生成 | Flux.interval(Duration.ofSeconds(1)) |
Flux.defer() | 延迟创建 | Flux.defer(() -> Flux.just(now())) |
Mono.just() | 单值 | Mono.just("Hello") |
Mono.delay() | 延迟 | Mono.delay(Duration.ofSeconds(1)) |
Mono.fromCallable() | 惰性执行 | Mono.fromCallable(() -> db.query()) |
转换操作符
| 操作符 | 说明 | 示例 |
|---|---|---|
map | 一对一转换 | flux.map(i -> i * 10) |
flatMap | 一对多/异步(并行) | flux.flatMap(i -> service.call(i)) |
concatMap | 一对多/异步(顺序) | flux.concatMap(i -> service.call(i)) |
switchMap | 只保留最新 | flux.switchMap(i -> search(i)) |
filter | 过滤 | flux.filter(i -> i > 3) |
distinct | 去重 | flux.distinct() |
take | 取前 N 个 | flux.take(3) |
组合操作符
| 操作符 | 说明 | 示例 |
|---|---|---|
merge | 并行合并 | Flux.merge(f1, f2) |
concat | 顺序拼接 | Flux.concat(f1, f2) |
zip | 按索引组合 | Flux.zip(f1, f2, combiner) |
Mono.zip | 组合多个 Mono | Mono.zip(m1, m2, m3) |
错误处理操作符
| 操作符 | 说明 | 示例 |
|---|---|---|
onErrorReturn | 错误返回默认值 | flux.onErrorReturn("default") |
onErrorResume | 错误切换流 | flux.onErrorResume(e -> fallback) |
onErrorMap | 错误转换 | flux.onErrorMap(e -> new BizException(e)) |
timeout | 超时处理 | mono.timeout(Duration.ofSeconds(5)) |
retry | 重试 | flux.retry(3) |
retryWhen | 智能重试 | flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) |
调度器选择
| 场景 | 调度器 | 说明 |
|---|---|---|
| I/O 阻塞操作 | boundedElastic | 数据库、网络、文件 |
| CPU 密集计算 | parallel | 计算、加密、压缩 |
| 顺序执行 | single | 单线程任务 |
| 不切换线程 | immediate | 当前线程执行 |
Sinks 类型
| 类型 | 消费者 | 行为 | 场景 |
|---|---|---|---|
unicast | 1 个 | 缓冲背压 | SSE 流式响应 |
multicast | 多个 | 实时推送 | 广播消息 |
replay | 多个 | 重放历史 | 事件回放 |
Sinks.One | 1 个 | 单值 | Mono 场景 |
推荐学习方式
方式一:系统学习(推荐)
按顺序学习 1-8 章,每章配合代码练习
方式二:快速入门
- 第一章(理解概念)
- 第二章(掌握基础)
- 第五章(Sinks 核心)
- 第八章(项目实战)
方式三:问题驱动
遇到具体问题时,查阅相关章节
相关资源
练习答案
详见各章节练习题解答(待补充)
Reactor 面试高频问题
Flux 和 Mono 的区别是什么?
- Flux = 0-N 元素流,Mono = 0-1 单值
subscribeOn 和 publishOn 的区别?
- subscribeOn 影响数据源(只生效一次),publishOn 影响下游(可多次调用)
flatMap 和 concatMap 的区别?
- flatMap 并行执行不保证顺序,concatMap 顺序执行保证顺序
什么是背压?如何处理?
- 生产者速度 > 消费者速度时的机制。策略:buffer/drop/latest/error
为什么响应式编程能提高并发?
- 非阻塞 + 线程复用,少量线程处理大量并发请求
Sinks.Many 的作用是什么?
- 手动推送数据到 Flux,实现 SSE 流式响应
如何测试响应式代码?
- 使用 StepVerifier 验证 Flux/Mono 的输出
持续更新中… 🚀
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 Henry's Blog!






