Reactor 响应式编程教程 - 目录

本教程共 8 章,采用循序渐进的方式,从基础概念到项目实战。


学习路径

1
2
3
4
5
第一章 →    第二章 →    第三章 →    第四章 →  第五章 → 第六章 → 第七章 → 第八章
│ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼
入门 基础 订阅流程 线程调度 Sinks 错误处理 项目实战
(Why?) (What?) (How?) (Where?) (Control) (Safety) (Real)

章节简介

📚 第一章:初识响应式编程

  • 为什么需要响应式编程
  • 传统同步 vs 响应式
  • Reactor 简介
  • 适合场景分析

目标:理解”为什么”,建立背景认知


📚 第二章:Flux 与 Mono 基础

  • Flux 和 Mono 的区别
  • 创建 Flux/Mono 的多种方式
  • 订阅机制
  • 常用操作符(map、flatMap、filter、merge、zip)

目标:掌握”是什么”,学会创建和订阅流


📚 第三章:订阅流程与生命周期

  • subscribe 完整流程
  • 生命周期钩子(doOnSubscribe、doOnNext、doOnComplete、doFinally)
  • 操作符详解(map、flatMap、filter、take、distinct)
  • 组合操作符(merge、zip、concat)

目标:理解流的执行过程,学会使用操作符


📚 第四章:线程调度 Schedulers

  • 为什么需要线程调度
  • Schedulers 类型(immediate、single、parallel、boundedElastic)
  • subscribeOn vs publishOn 区别
  • 项目中的实际使用

目标:掌握线程控制,理解非阻塞原理


📚 第五章:Sinks.Many - 手动控制数据流

  • Sinks 是什么
  • Sinks 类型(unicast、multicast、replay)
  • 核心方法(tryEmitNext、tryEmitComplete、tryEmitError)
  • 项目实战:模拟 LLM 流式响应

目标:学会手动控制数据流,实现 SSE


📚 第六章:错误处理与资源清理

  • 为什么错误处理更重要
  • 错误处理操作符(onErrorReturn、onErrorResume、retry)
  • doFinally 资源清理
  • Disposable 取消订阅
  • 项目实战:任务中断流程

目标:掌握错误处理和资源管理


📚 第七章:背压处理

  • 什么是背压
  • 背压策略(buffer、drop、latest、error)
  • Sinks 中的背压处理
  • request 手动控制请求量

目标:理解快慢匹配机制


📚 第八章:Spring 集成与项目实战

  • Spring WebFlux 基础
  • 项目实战:Dodo-Agent 完整流程分析
  • SSE 流式响应实现
  • 任务管理(中断与取消)
  • 错误处理最佳实践

目标:将所有知识应用到实际项目


快速查阅

需要某个具体功能时

功能章节
创建 Flux/Mono第二章
热流与冷流第二章
转换数据(map/flatMap)第三章
顺序控制(concatMap/switchMap)第三章
资源管理(using/defer)第三章
控制线程第四章
调试定位(checkpoint)第四章
手动推送数据第五章
单值 Sinks(Sinks.One)第五章
处理错误第六章
错误转换(onErrorMap)第六章
超时处理(timeout)第六章
处理积压第七章
限制请求量(limitRequest)第七章
实际项目应用第八章
WebClient HTTP 客户端第八章
响应式测试(StepVerifier)第八章

常见错误解决

问题解决方案
线程阻塞使用 boundedElastic 调度器
内存爆炸使用背压策略或 limitRequest
错误未处理使用 onErrorReturn/onErrorResume
资源未释放使用 doFinally/using
异常堆栈难定位使用 checkpoint()
调试响应式链使用 StepVerifier

Reactor 操作符速查表

创建操作符

操作符说明示例
Flux.just()从值创建Flux.just("A", "B")
Flux.range()数字范围Flux.range(1, 5)
Flux.interval()定时生成Flux.interval(Duration.ofSeconds(1))
Flux.defer()延迟创建Flux.defer(() -> Flux.just(now()))
Mono.just()单值Mono.just("Hello")
Mono.delay()延迟Mono.delay(Duration.ofSeconds(1))
Mono.fromCallable()惰性执行Mono.fromCallable(() -> db.query())

转换操作符

操作符说明示例
map一对一转换flux.map(i -> i * 10)
flatMap一对多/异步(并行)flux.flatMap(i -> service.call(i))
concatMap一对多/异步(顺序)flux.concatMap(i -> service.call(i))
switchMap只保留最新flux.switchMap(i -> search(i))
filter过滤flux.filter(i -> i > 3)
distinct去重flux.distinct()
take取前 N 个flux.take(3)

组合操作符

操作符说明示例
merge并行合并Flux.merge(f1, f2)
concat顺序拼接Flux.concat(f1, f2)
zip按索引组合Flux.zip(f1, f2, combiner)
Mono.zip组合多个 MonoMono.zip(m1, m2, m3)

错误处理操作符

操作符说明示例
onErrorReturn错误返回默认值flux.onErrorReturn("default")
onErrorResume错误切换流flux.onErrorResume(e -> fallback)
onErrorMap错误转换flux.onErrorMap(e -> new BizException(e))
timeout超时处理mono.timeout(Duration.ofSeconds(5))
retry重试flux.retry(3)
retryWhen智能重试flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))

调度器选择

场景调度器说明
I/O 阻塞操作boundedElastic数据库、网络、文件
CPU 密集计算parallel计算、加密、压缩
顺序执行single单线程任务
不切换线程immediate当前线程执行

Sinks 类型

类型消费者行为场景
unicast1 个缓冲背压SSE 流式响应
multicast多个实时推送广播消息
replay多个重放历史事件回放
Sinks.One1 个单值Mono 场景

推荐学习方式

方式一:系统学习(推荐)

按顺序学习 1-8 章,每章配合代码练习

方式二:快速入门

  1. 第一章(理解概念)
  2. 第二章(掌握基础)
  3. 第五章(Sinks 核心)
  4. 第八章(项目实战)

方式三:问题驱动

遇到具体问题时,查阅相关章节


相关资源


练习答案

详见各章节练习题解答(待补充)


Reactor 面试高频问题

  1. Flux 和 Mono 的区别是什么?

    • Flux = 0-N 元素流,Mono = 0-1 单值
  2. subscribeOn 和 publishOn 的区别?

    • subscribeOn 影响数据源(只生效一次),publishOn 影响下游(可多次调用)
  3. flatMap 和 concatMap 的区别?

    • flatMap 并行执行不保证顺序,concatMap 顺序执行保证顺序
  4. 什么是背压?如何处理?

    • 生产者速度 > 消费者速度时的机制。策略:buffer/drop/latest/error
  5. 为什么响应式编程能提高并发?

    • 非阻塞 + 线程复用,少量线程处理大量并发请求
  6. Sinks.Many 的作用是什么?

    • 手动推送数据到 Flux,实现 SSE 流式响应
  7. 如何测试响应式代码?

    • 使用 StepVerifier 验证 Flux/Mono 的输出

持续更新中… 🚀