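Chapter 7: Backpressure - Matching Fast Producers to Slow Consumers
7.1 What Is Backpressure?
Backpressure is the mechanism for handling a consumer that processes data more slowly than the producer emits it.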
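```text
Producer (LLM streaming tokens):  ████ ████ ████ ████ ████  → 20 characters per second
Consumer (frontend rendering):    ████ ████                 → only 5 characters per second

Problem: the backlog keeps growing until memory blows up 💥
```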
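To make the growth concrete, here is a small sketch in plain Java (no Reactor involved) that computes the backlog for the two rates in the diagram. The class and method names are illustrative only, and the model assumes constant rates and an unbounded queue.

```java
/** Back-of-the-envelope backlog model; names are illustrative, not part of Reactor. */
class BacklogGrowth {

    /** Items still waiting after the given time, assuming constant rates and an unbounded queue. */
    static long backlogAfter(long producedPerSecond, long consumedPerSecond, long seconds) {
        long surplusPerSecond = Math.max(0, producedPerSecond - consumedPerSecond);
        return surplusPerSecond * seconds;
    }

    public static void main(String[] args) {
        // Rates from the diagram: 20 produced per second, 5 rendered per second.
        for (long seconds : new long[] {1, 10, 60}) {
            System.out.println(seconds + " s -> backlog " + backlogAfter(20, 5, seconds));
        }
        // prints backlogs of 15, 150 and 900
    }
}
```

The backlog grows linearly and never shrinks while the producer keeps running, which is why an unbounded buffer alone is not a complete answer.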
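7.1.1 A Real-World Scenario
```java
// LLMClient, streamChat and renderToScreen are this chapter's example helpers.
LLMClient client = new LLMClient();
Flux<String> tokenStream = client.streamChat("Write a long article");

// The consumer needs about 200 ms per token, far slower than the model produces them.
tokenStream.subscribe(token -> {
    renderToScreen(token);
    try {
        Thread.sleep(200); // simulate slow rendering
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Thread.sleep throws a checked exception
    }
});
```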
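7.2 Backpressure Strategies
Reactor provides four backpressure strategies:
| Strategy | Behavior | When to use |
|---|---|---|
| `onBackpressureBuffer()` | Buffers items in memory | Added latency is acceptable and memory is sufficient |
| `onBackpressureDrop()` | Drops items that arrive while the consumer has no outstanding demand | Data loss is acceptable |
| `onBackpressureLatest()` | Keeps only the most recent item | Only the latest state matters |
| `onBackpressureError()` | Signals an error | No backlog is allowed |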
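The differences between the four rows are easier to see in a toy model. The sketch below is plain Java and is not Reactor's implementation: it assumes a tick-based world in which the producer emits a fixed number of items per tick and the consumer accepts a fixed number per tick. `BackpressureModel`, `Strategy` and `run` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified tick-based model of the four strategies; not Reactor's implementation. */
class BackpressureModel {

    enum Strategy { BUFFER, DROP, LATEST, ERROR }

    /**
     * Each tick the consumer can accept consumedPerTick items and the producer
     * emits producedPerTick items. Returns the items the consumer received.
     */
    static List<Integer> run(Strategy strategy, int ticks, int producedPerTick, int consumedPerTick) {
        Deque<Integer> pending = new ArrayDeque<>(); // items held back for later delivery
        List<Integer> delivered = new ArrayList<>();
        int next = 0;
        for (int tick = 0; tick < ticks; tick++) {
            int demand = consumedPerTick;
            // Held-back items are delivered first, oldest first.
            while (demand > 0 && !pending.isEmpty()) {
                delivered.add(pending.poll());
                demand--;
            }
            for (int k = 0; k < producedPerTick; k++) {
                int item = next++;
                if (demand > 0) { // consumer is ready: pass the item straight through
                    delivered.add(item);
                    demand--;
                    continue;
                }
                switch (strategy) { // consumer is busy: apply the strategy
                    case BUFFER: pending.add(item); break;
                    case DROP:   break;
                    case LATEST: pending.clear(); pending.add(item); break;
                    case ERROR:  throw new IllegalStateException("overflow at item " + item);
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 4 items produced per tick, 1 consumed per tick, 3 ticks.
        System.out.println("BUFFER " + run(Strategy.BUFFER, 3, 4, 1)); // BUFFER [0, 1, 2]
        System.out.println("DROP   " + run(Strategy.DROP, 3, 4, 1));   // DROP   [0, 4, 8]
        System.out.println("LATEST " + run(Strategy.LATEST, 3, 4, 1)); // LATEST [0, 3, 7]
        try {
            run(Strategy.ERROR, 3, 4, 1);
        } catch (IllegalStateException e) {
            System.out.println("ERROR  " + e.getMessage());            // ERROR  overflow at item 1
        }
    }
}
```

Buffer loses nothing but falls further behind on every tick, Drop and Latest both skip items but differ in which one the consumer sees next, and Error refuses to continue as soon as an item arrives without demand.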
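7.2.1 onBackpressureBuffer - Buffer
```java
// Needs java.util.function.Consumer and reactor.core.publisher.BufferOverflowStrategy.
// Slow consumer: about 100 ms per item, while interval() emits every 10 ms.
Consumer<Long> slowConsumer = i -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

// Unbounded buffer: nothing is lost, but memory use can grow without limit.
Flux.interval(Duration.ofMillis(10))
    .onBackpressureBuffer()
    .publishOn(Schedulers.boundedElastic()) // consume on another thread so demand is limited
    .subscribe(slowConsumer);

// Bounded buffer: at most 1000 pending items; overflow ends the stream with an error.
Flux.interval(Duration.ofMillis(10))
    .onBackpressureBuffer(1000)
    .publishOn(Schedulers.boundedElastic())
    .subscribe(slowConsumer);

// Bounded buffer with an explicit overflow strategy (ERROR, DROP_LATEST or DROP_OLDEST):
.onBackpressureBuffer(1000, BufferOverflowStrategy.ERROR)
```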
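7.2.2 onBackpressureDrop - Drop
```java
Flux.interval(Duration.ofMillis(10))
    // Items that arrive while the consumer is busy are discarded; the callback sees each one.
    .onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
    .publishOn(Schedulers.boundedElastic(), 1) // prefetch 1: demand arrives one item at a time
    .subscribe(i -> {
        try {
            Thread.sleep(100); // simulate a slow consumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
```
Use case: real-time market data where only the latest price matters. Because the goal here is the newest value, the snippet uses onBackpressureLatest (section 7.2.3), which holds the most recent tick until the UI is ready; onBackpressureDrop would discard that tick instead.
```java
stockClient.priceStream("AAPL")
    .onBackpressureLatest()
    .subscribe(price -> updateUI(price));
```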
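7.2.3 onBackpressureLatest - Keep Only the Latest
```java
Flux.interval(Duration.ofMillis(10))
    // While the consumer is busy, each new item replaces the one being held.
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic(), 1) // prefetch 1: demand arrives one item at a time
    .subscribe(i -> {
        try {
            Thread.sleep(100); // simulate a slow consumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
```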
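7.2.4 onBackpressureError - Signal an Error
```java
Flux.interval(Duration.ofMillis(10))
    // Fails with an overflow error as soon as an item arrives without downstream demand.
    .onBackpressureError()
    .publishOn(Schedulers.boundedElastic(), 1) // prefetch 1: demand arrives one item at a time
    .subscribe(
        i -> {
            try {
                Thread.sleep(100); // simulate a slow consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        },
        e -> System.err.println("Overflow: " + e));
```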
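7.3 Backpressure in Sinks
7.3.1 Backpressure with a Unicast Sink
```java
// Unicast sink: one subscriber only; items are buffered until it requests them.
Sinks.Many<String> sink = Sinks.many()
    .unicast()
    .onBackpressureBuffer();

// tryEmitNext never throws; it reports the outcome as an EmitResult.
Sinks.EmitResult result = sink.tryEmitNext("data");

switch (result) {
    case OK:
        break; // accepted
    case FAIL_OVERFLOW:
        break; // buffer is full: drop, retry later, or slow the producer down
    case FAIL_NON_SERIALIZED:
        break; // another thread was emitting at the same time: retry
    case FAIL_TERMINATED:
        break; // the sink has already completed or failed
    default:
        break; // FAIL_CANCELLED, FAIL_ZERO_SUBSCRIBER
}
```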
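7.3.2 Setting the Buffer Size
```java
// The unicast builder takes a Queue rather than an int, so the capacity comes from the queue.
// Queues is reactor.util.concurrent.Queues; Reactor may round the requested capacity up.

// Small buffer: emissions start returning FAIL_OVERFLOW quickly.
Sinks.Many<String> sink1 = Sinks.many()
    .unicast()
    .onBackpressureBuffer(Queues.<String>get(10).get());

// Large buffer: absorbs longer bursts at the cost of memory.
Sinks.Many<String> sink2 = Sinks.many()
    .unicast()
    .onBackpressureBuffer(Queues.<String>get(10000).get());
```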
7.4 request - Controlling Demand Manually
Besides reacting to backpressure passively, a subscriber can actively control how many items it requests.
7.4.1 Requesting on Demand
```java
// Subscriber and Subscription come from org.reactivestreams.
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .subscribe(new Subscriber<Integer>() {
        private Subscription subscription;
        private int count = 0;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            subscription.request(2); // ask for the first two items only
        }

        @Override
        public void onNext(Integer i) {
            count++;
            System.out.println("Processing: " + i);
            if (count % 2 == 0) {
                subscription.request(2); // batch finished: ask for two more
            }
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Error: " + t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    });
```
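7.4.2 How the Project Uses It
```java
chatClient.prompt()
    .messages(messages)
    .stream()
    .chatResponse()
    .subscribe(new Subscriber<ChatResponse>() {
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            subscription.request(1); // start with a single chunk
        }

        @Override
        public void onNext(ChatResponse chunk) {
            processChunk(chunk);
            subscription.request(1); // request the next chunk only after this one is handled
        }

        @Override
        public void onError(Throwable t) {
            System.err.println("Stream failed: " + t.getMessage());
        }

        @Override
        public void onComplete() {
            // stream finished, nothing more to request
        }
    });
```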
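7.4.3 limitRequest - Capping the Total Demand
```java
// Never request more than 100 items from upstream in total; completes after the 100th.
// Newer Reactor releases deprecate limitRequest(n) in favor of take(n).
Flux.interval(Duration.ofMillis(10))
    .limitRequest(100)
    .subscribe(
        i -> System.out.println(i),
        e -> System.out.println("Error: " + e),
        () -> System.out.println("Done")
    );
```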
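7.4.4 concatMap - Sequential Processing with Backpressure
```java
Flux.range(1, 1000)
    // concatMap runs one inner publisher at a time, in order,
    // and pulls more source items only as earlier ones finish.
    .concatMap(i -> Mono.fromCallable(() -> {
        Thread.sleep(10); // simulate slow work; a Callable may throw checked exceptions
        return i * 10;
    }))
    .subscribe(System.out::println);
```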
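7.5 Scenarios and How to Choose
7.5.1 Scenario 1: LLM Streaming Output
```java
// Every chunk matters, so buffer (up to 1000) instead of dropping,
// and render on a thread where blocking is allowed.
chatClient.stream(prompt)
    .onBackpressureBuffer(1000)
    .publishOn(Schedulers.boundedElastic())
    .subscribe(chunk -> {
        renderToScreen(chunk);
    });
```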
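Whether a limit of 1000 is enough depends on the length of the reply and on the two rates. The sketch below is a rough sizing aid in plain Java, not a Reactor API: `BufferSizing` and `peakBacklog` are hypothetical names, the rates are the ones from section 7.1, and the 2,000-token reply is an assumed figure chosen for illustration.

```java
/** Rough buffer-sizing helper; names and the constant-rate assumption are illustrative. */
class BufferSizing {

    /**
     * Peak number of buffered items for a finite stream of totalItems produced at
     * producedPerSecond while the consumer drains consumedPerSecond.
     * The backlog peaks at the moment the producer finishes.
     */
    static long peakBacklog(long totalItems, long producedPerSecond, long consumedPerSecond) {
        if (consumedPerSecond >= producedPerSecond) {
            return 0; // the consumer keeps up, nothing accumulates
        }
        // Items consumed while the producer is still running.
        long consumedMeanwhile = totalItems * consumedPerSecond / producedPerSecond;
        return totalItems - consumedMeanwhile;
    }

    public static void main(String[] args) {
        // Assumed workload: a 2,000-token reply at 20 tokens/s, rendered at 5 tokens/s.
        long peak = peakBacklog(2000, 20, 5);
        System.out.println("peak backlog = " + peak);          // peak backlog = 1500
        System.out.println("fits in 1000? " + (peak <= 1000)); // fits in 1000? false
    }
}
```

Under these assumptions the bounded buffer would overflow before the reply ends, so the limit, the consumer speed, or the overflow strategy would need adjusting.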
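7.5.2 Scenario 2: Real-Time Data
```java
// Stale prices are worthless: keep only the newest one while the UI is busy.
stockService.priceStream("AAPL")
    .onBackpressureLatest()
    .subscribe(price -> updateUI(price));
```
7.5.3 Scenario 3: Log Collection
```java
// Losing some log lines is acceptable, but count how many were dropped.
logger.logStream()
    .onBackpressureDrop(dropped -> metrics.increment("droppedLogs"))
    .subscribe(log -> writeToFile(log));
```
7.5.4 Scenario 4: Strict Ordering
```java
// Orders must be processed one by one and none may be lost:
// fail fast on overflow instead of silently buffering or dropping.
orderProcessor.orderStream()
    .onBackpressureError()
    .concatMap(order -> processOrder(order))
    .subscribe();
```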
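7.6 Monitoring and Tuning Backpressure
7.6.1 Metrics
```java
// Assumes Reactor 3.5+ with the reactor-core-micrometer module on the classpath.
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;

MeterRegistry registry = new SimpleMeterRegistry();

Flux.interval(Duration.ofMillis(10))
    .name("llm.stream")                 // name used for the sequence's meters
    .tag("type", "tokens")              // extra tag attached to those meters
    .tap(Micrometer.metrics(registry))  // record metrics for the sequence above
    .onBackpressureBuffer(1000)
    .subscribe();
```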
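7.6.2 Tuning Options
```java
// Option 1: enlarge the buffer to absorb longer bursts
.onBackpressureBuffer(5000)

// Option 2: keep only the latest item when stale data has no value
.onBackpressureLatest()

// Option 3: speed up the consumer by processing in parallel
.parallel()
.runOn(Schedulers.parallel())

// Option 4: ask the producer to slow down when the buffer overflows
// (this two-argument form still ends the stream with an overflow error after the callback)
.onBackpressureBuffer(100, overflow -> {
    producer.reduceRate();
})
```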
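7.7 Chapter Summary
- Backpressure: the mechanism for handling a consumer that is slower than its producer
- Buffer: holds items in memory (unbounded unless a maximum size is given; 256 is Reactor's default small-buffer size, not a cap)
- Drop: discards items that arrive while the consumer is busy
- Latest: keeps only the most recent item
- Error: signals an error
- request(): lets the subscriber control demand actively
- Choosing a strategy: pick the backpressure strategy that fits the business scenario
7.8 Exercises
- Design a scenario: the LLM returns 50 tokens per second and the frontend can handle only 10 per second. Which backpressure strategy should you choose?
- Use onBackpressureLatest to implement a component that shows only the latest stock price.
- Analyze how Sinks.Many handles backpressure in the project.
Next chapter: we will look at reactive programming in practice with Spring, including WebFlux and Spring AI.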