第五章:Sinks.Many - 手动控制数据流
5.1 什么是 Sinks?
前面我们学习的 Flux/Mono 是声明式的:定义数据流和处理逻辑,由 Reactor 决定何时发出元素。
但在真实项目中,我们经常需要:
- 手动推送数据(比如 LLM 逐字返回时)
- 动态控制流的开始和结束
- 推送方和订阅方是分离的
这就需要 Sinks!
Sinks vs Flux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Flux.just("A", "B", "C") .subscribe();
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux().subscribe();
sink.tryEmitNext("A"); sink.tryEmitNext("B"); sink.tryEmitNext("C"); sink.tryEmitComplete();
|
5.2 Sinks 类型
Reactor 提供 4 种 Sinks 类型:
| 类型 | 消费者数量 | 行为 |
|---|
unicast | 1 个 | 单消费者,缓冲背压 |
multicast | 多个 | 多消费者,实时推送 |
replay | 多个 | 多消费者,重放所有历史 |
publish | 多个 | 多消费者,只接收新数据 |
5.2.1 Unicast - 单消费者(最常用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Sinks.Many<String> sink = Sinks.many() .unicast() .onBackpressureBuffer();
sink.asFlux().subscribe(s -> System.out.println("收到: " + s));
sink.tryEmitNext("第一块"); sink.tryEmitNext("第二块"); sink.tryEmitNext("第三块"); sink.tryEmitComplete();
|
项目中典型用法:SSE 流式响应(每个会话一个消费者)
5.2.2 Multicast - 多消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Sinks.Many<String> sink = Sinks.many() .multicast() .onBackpressureBuffer();
sink.asFlux().subscribe(s -> System.out.println("订阅者1: " + s)); sink.asFlux().subscribe(s -> System.out.println("订阅者2: " + s));
sink.tryEmitNext("广播消息");
|
5.2.3 Replay - 重放历史
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Sinks.Many<String> sink = Sinks.many() .replay() .all();
sink.tryEmitNext("A"); sink.tryEmitNext("B"); sink.tryEmitNext("C");
sink.asFlux().subscribe(s -> System.out.println("新订阅者: " + s));
|
5.3 Sinks 的核心方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("data");
sink.tryEmitError(new RuntimeException("错误"));
sink.tryEmitComplete();
if (result == Sinks.EmitResult.OK) { } else if (result == Sinks.EmitResult.FAIL_OVERFLOW) { } else if (result == Sinks.EmitResult.FAIL_TERMINATED) { }
|
5.3.1 tryEmitNext vs emitNext
1 2 3 4 5
| Sinks.EmitResult result = sink.tryEmitNext("data");
sink.emitNext("data", FAIL_FAST);
|
5.3.2 Sinks.One - 单值 Sinks
1 2 3 4 5 6 7 8 9 10 11
| Sinks.One<String> oneSink = Sinks.one();
oneSink.asMono().subscribe(System.out::println);
oneSink.tryEmitValue("Hello");
oneSink.tryEmitError(new RuntimeException("错误"));
|
5.3.3 线程安全
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();
Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<String> safe = Sinks.many() .unicast() .onBackpressureBuffer();
IntStream.range(0, 10).parallel().forEach(i -> { Sinks.EmitResult result = safe.tryEmitNext("item-" + i); if (result != Sinks.EmitResult.OK) { } });
|
5.4 实战:模拟 LLM 流式响应
这是 Dodo-Agent 项目的核心模式!
5.4.1 简单模拟
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class LlmStreamingExample { public static void main(String[] args) throws InterruptedException { Sinks.Many<String> sink = Sinks.many() .unicast() .onBackpressureBuffer(); sink.asFlux() .subscribe(chunk -> System.out.println("前端收到: " + chunk)); new Thread(() -> { String response = "Hello, 我是 AI 助手,很高兴为您服务!"; try { for (int i = 0; i < response.length(); i++) { String chunk = response.substring(i, i + 1); sink.tryEmitNext(chunk); Thread.sleep(50); } sink.tryEmitComplete(); } catch (Exception e) { sink.tryEmitError(e); } }).start(); Thread.sleep(3000); } }
|
运行结果:
1 2 3 4 5 6
| 前端收到: H 前端收到: e 前端收到: l 前端收到: l 前端收到: o ...
|
5.4.2 结合项目代码理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
public Flux<String> stream(String conversationId, String question) { Sinks.Many<String> sink = Sinks.many() .unicast() .onBackpressureBuffer(); registerTask(conversationId, sink); chatClient.prompt() .messages(messages) .stream() .chatResponse() .doOnNext(chunk -> { String text = chunk.getResult().getOutput().getText(); if (text != null) { sink.tryEmitNext(createTextResponse(text)); } }) .doOnComplete(() -> { sink.tryEmitNext(createReferenceResponse(referenceJson)); sink.tryEmitComplete(); }) .doOnError(err -> { sink.tryEmitError(err); }) .subscribe(); return sink.asFlux(); }
|
5.5 Sinks 完整生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux() .doOnSubscribe(s -> System.out.println("1. 订阅")) .doOnRequest(n -> System.out.println("2. 请求: " + n)) .doOnNext(s -> System.out.println("3. 收到: " + s)) .doOnComplete(() -> System.out.println("4. 完成")) .doOnError(e -> System.out.println("4. 错误")) .doFinally(s -> System.out.println("5. 最终: " + s)) .subscribe();
System.out.println("推送 A"); sink.tryEmitNext("A"); System.out.println("推送 B"); sink.tryEmitNext("B"); System.out.println("完成"); sink.tryEmitComplete();
|
输出:
1 2 3 4 5 6 7 8 9
| 1. 订阅 2. 请求: 9223372036854775807 推送 A 3. 收到: A 推送 B 3. 收到: B 完成 4. 完成 5. 最终: onComplete
|
5.6 项目中的 Sinks 使用场景
5.6.1 场景1:WebSearchReactAgent
1 2 3 4 5 6 7 8
| private final Map<String, TaskInfo> taskMap = new ConcurrentHashMap<>();
public TaskInfo registerTask(String conversationId, Sinks.Many<String> sink, String agentType) { taskMap.put(conversationId, new TaskInfo(sink, agentType)); return taskInfo; }
|
5.6.2 场景2:停止任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public boolean stopTask(String conversationId) { TaskInfo taskInfo = taskMap.get(conversationId); Disposable disposable = taskInfo.getDisposable(); if (disposable != null) { disposable.dispose(); } Sinks.Many<String> sink = taskInfo.getSink(); sink.tryEmitNext(createStopMessage()); sink.tryEmitComplete(); taskMap.remove(conversationId); return true; }
|
5.6.3 场景3:多种消息类型
1 2 3 4 5
| sink.tryEmitNext(createThinkingResponse("正在思考...")); sink.tryEmitNext(createTextResponse("这是回答")); sink.tryEmitNext(createReferenceResponse("[{\"title\":\"来源\"}]")); sink.tryEmitNext(createRecommendResponse("[\"推荐问题\"]"));
|
5.7 背压处理
当消费者处理速度慢于生产者推送速度时,需要背压处理。
5.7.1 onBackpressureBuffer
1 2 3 4 5 6 7 8 9
| Sinks.Many<String> sink = Sinks.many() .unicast() .onBackpressureBuffer();
Sinks.Many<String> sink2 = Sinks.many() .unicast() .onBackpressureBuffer(1000);
|
5.7.2 背压溢出处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Sinks.EmitResult result = sink.tryEmitNext("data");
switch (result) { case OK: break; case FAIL_OVERFLOW: break; case FAIL_NON_SERIALIZED: break; case FAIL_TERMINATED: break; }
|
5.8 本章小结
- Sinks.Many 允许手动推送数据到 Flux
- Unicast 最常用(单消费者,如 SSE)
- 核心方法:
tryEmitNext(), tryEmitComplete(), tryEmitError() - 项目中:Sinks.Many + Flux 实现 LLM 流式响应
- 背压处理:使用
onBackpressureBuffer() 缓冲
5.9 练习题
- 使用 Sinks.Many 实现一个简单的聊天消息推送功能
- 如果有多个消费者分别以不同速度处理数据,会发生什么?
- 如何实现”新订阅者只能收到新数据,不收历史数据”?
下一章:我们将学习错误处理和资源清理,这是生产环境必需的知识。