第一章:初识响应式编程
1.1 什么是响应式编程?
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。
传统命令式 vs 响应式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public class SyncExample { public String chat(String question) { String result = llmClient.call(question); return result; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public class ReactiveExample { public Flux<String> chat(String question) { return Flux.create(sink -> { llmClient.callAsync(question, result -> { sink.next(result); sink.complete(); }); }); } }
|
对比总结
| 特性 | 传统同步 | 响应式 |
|---|
| 线程模型 | 一请求一线程 | 线程复用 |
| 阻塞等待 | 阻塞 | 非阻塞 |
| 资源消耗 | 高 | 低 |
| 并发能力 | 有限 | 强 |
| 适用场景 | CPU 密集型 | I/O 密集型 |
1.2 为什么需要响应式编程?
1.2.1 现代应用的挑战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ┌─────────────────────────────────────────────────────────────────┐ │ 高并发互联网应用 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ 用户 A │ │ 用户 B │ │ 用户 C │ │ 用户 D │ ... │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ │ │ │ └───────────┴───────────┴───────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────┐ │ │ │ 并发请求数可能 │ │ │ │ 上万甚至更多 │ │ │ └─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ 这些请求大部分时间在等待: │ │ │ │ • 数据库查询 │ │ │ │ • 外部 API 调用 │ │ │ │ • 文件读写 │ │ │ │ • LLM 生成回答 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘
|
1.2.2 I/O 操作的等待时间
| 操作 | 等待时间 |
|---|
| 内存访问 | 纳秒级 |
| SSD 读取 | 微秒级 |
| 网络请求(本地) | 毫秒级 |
| 网络请求(远程) | 10-100 毫秒 |
| 数据库查询 | 1-100 毫秒 |
| LLM API 调用 | 1-30 秒 💀 |
关键洞察:在等待 I/O 的过程中,线程完全空闲!
1.2.3 响应式编程的价值
1 2 3 4 5 6 7 8 9 10
| 传统方式:10000 并发 = 10000 线程 = 大量内存 + 频繁上下文切换 ┌───┬───┬───┬───┬───┐ │T1 │T2 │T3 │T4 │T5 │ ... 9995 more threads └───┴───┴───┴───┴───┘
响应式:10000 并发 = 少量线程(复用) ┌─────────────────────┐ │ 少量工作线程 │ │ (复用处理请求) │ └─────────────────────┘
|
1.3 响应式编程解决的问题
问题 1:线程资源浪费
1 2 3 4 5
| public List<Order> getOrders() { return jdbcTemplate.query("SELECT * FROM orders", ...); }
|
1 2 3 4 5
| public Flux<Order> getOrders() { return reactiveTemplate.query("SELECT * FROM orders", ...); }
|
问题 2:级联等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public OrderDetail getOrderDetail(Long orderId) { Order order = orderDao.findById(orderId); User user = userDao.findById(order.getUserId()); List<Item> items = itemDao.findByOrderId(orderId); return new OrderDetail(order, user, items); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public Mono<OrderDetail> getOrderDetail(Long orderId) { Mono<Order> orderMono = orderDao.findById(orderId); Mono<User> userMono = userMono.flatMap(u -> userDao.findById(u.getId())); Mono<List<Item>> itemsMono = itemDao.findByOrderId(orderId); return Mono.zip(orderMono, userMono, itemsMono) .map(tuple -> new OrderDetail( tuple.getT1(), tuple.getT2(), tuple.getT3() )); }
|
问题 3:流式数据处理
1 2 3 4 5
| public List<String> readLargeFile(String path) { return Files.readAllLines(Paths.get(path)); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public Flux<String> readLargeFile(String path) { return Flux.create(sink -> { try (BufferedReader reader = new BufferedReader( new FileReader(path))) { String line; while ((line = reader.readLine()) != null) { sink.next(line); } sink.complete(); } catch (IOException e) { sink.error(e); } }); }
|
1.4 Spring 生态中的响应式编程
1.4.1 Spring WebFlux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RestController public class SyncController { @GetMapping("/users") public List<User> getUsers() { return userService.findAll(); } }
@RestController public class ReactiveController { @GetMapping("/users") public Flux<User> getUsers() { return userService.findAll(); } }
|
1.4.2 Spring Data 响应式
1 2 3 4 5 6 7 8 9
| public interface UserRepository extends JpaRepository<User, Long> { List<User> findByName(String name); }
public interface UserRepository extends ReactiveCrudRepository<User, Long> { Flux<User> findByName(String name); }
|
1.4.3 Spring AI 中的响应式
1 2 3 4 5 6 7 8 9 10
| @RestController public class ChatController { @GetMapping(value = "/chat", produces = "text/event-stream") public Flux<String> chat(@RequestParam String query) { return chatModel.stream(prompt); } }
|
1.5 Reactor 简介
Reactor 是 Spring 5 引入的响应式编程库,也是 Spring WebFlux 的底层实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ┌─────────────────────────────────────────────────────────────┐ │ Reactor 在 Spring 生态中的位置 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Spring Application │ │ │ ├─────────────────────────────────────────────────────┤ │ │ │ Spring WebFlux │ Spring Data │ Spring AI │ │ │ │ │ │ R2DBC │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ ▼ │ ▼ │ │ │ │ ┌─────────┐ │ ┌─────────┐ │ ┌─────────┐ │ │ │ │ │Reactor │◄──────┼──│Reactor │◄───┼──│ Reactor │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─────────┘ │ └─────────┘ │ └─────────┘ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ Netty / Servlet 容器 │ │ └──────────────┘ │ │ │ └────────────────────────────────���────────────────────────────┘
|
核心类型
| 类型 | 含义 | 元素数量 |
|---|
Flux<T> | 异步序列 | 0 到 N |
Mono<T> | 异步单值 | 0 或 1 |
1.6 本章小结
- 响应式编程 = 非阻塞 + 异步 + 数据流
- 核心优势:线程复用、高并发、低资源消耗
- 适用场景:I/O 密集型(数据库、API、LLM 调用)
- 不适用:CPU 密集型计算
- Reactor 是 Spring 生态的响应式基础库
- Flux = 0-N 元素流,Mono = 0-1 单值
1.7 思考题
- 为什么说”LLM API 调用”特别适合响应式编程?
- 如果你的应用是 CPU 密集型的计算,响应式编程还有优势吗?
- 响应式编程能否完全替代传统的同步代码?
下一章:我们将深入学习 Flux 和 Mono 的创建方式与基本操作。