15 — SSE 流式通信

本章讲解 AI 「打字机效果」是怎么实现的。
这是项目中最技术性的部分。


1. 什么是 SSE?

Server-Sent Events = 服务器单向推送数据给浏览器的协议。

对比其他技术

方式方向用途
HTTP 请求客户端 → 服务器普通 API 调用
SSE (Server-Sent Events)服务器 → 客户端(单向)AI 回复、实时通知
WebSocket双向聊天(双向)、实时游戏
长轮询客户端反复请求旧方案,已不推荐

SSE 特点

  • 基于 HTTP(简单)
  • 服务器主动推送
  • 数据格式简单(data: ...\n\n
  • 自动重连
  • 浏览器原生 API:EventSource(但本项目用 fetch + ReadableStream,更灵活)

2. AI 流式回复的工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
用户点发送

前端发 GET 请求到 /api/chat/stream

服务器开始返回数据(不是一次性,是持续的)

data: {"type":"thinking","content":"让我想想"}
↓ 5ms 后
data: {"type":"thinking","content":"..."}
↓ 10ms 后
data: {"type":"text","content":"Spring"}
↓ 5ms 后
data: {"type":"text","content":" AI 2.0"}
↓ ...
data: {"type":"complete"}
data: [DONE]

服务器关闭连接

前端解析完所有事件,UI 显示完整回复

关键点

  • 数据是流式的(一次拿一点)
  • 前端实时显示(打字机效果)
  • 不是等所有数据到齐再渲染

3. 浏览器原生 API:EventSource(不推荐用)

1
2
3
4
5
6
// 简单但不够灵活
const source = new EventSource('/api/chat/stream');
source.onmessage = (e) => {
const data = JSON.parse(e.data);
console.log(data);
};

缺点

  • 不能自定义请求方法(只能 GET)
  • 不能自定义请求头
  • 取消比较麻烦
  • 错误处理不够灵活

4. 项目用的方式:fetch + ReadableStream

优势

  • 完全控制请求(方法、头、body)
  • 灵活取消(AbortController)
  • 可以发 POST 请求

4.1 核心代码(useSseStream.ts 简化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// hooks/useSseStream.ts
export async function parseSseStream({ url, signal, onEvent, onDone, onError }) {
try {
// 1. 发起 fetch 请求
const response = await fetch(url, { signal });
// ↑ signal 用于取消请求(AbortController)

// 2. 拿到响应体的「流式读取器」
const reader = response.body!.getReader();
// ↑ getReader() 返回 ReadableStreamDefaultReader
// ↑ ! 断言 body 一定存在

// 3. 文本解码器(处理 UTF-8 编码)
const decoder = new TextDecoder();
let buffer = '';

// 4. 循环读取数据块
while (true) {
const { done, value } = await reader.read();
// ↑ done: 流是否结束
// ↑ value: Uint8Array 数据块

if (done) break; // 流结束,退出循环

// 5. 解码为字符串
buffer += decoder.decode(value, { stream: true });
// ↑ { stream: true } 表示可能还有后续数据

// 6. 按行分割(每行是一个事件)
const lines = buffer.split('\n');
buffer = lines.pop() || '';
// ↑ 最后一行可能不完整,保留到下次

// 7. 处理每一行
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6); // 去掉 "data: " 前缀

if (data === '[DONE]') {
onDone();
return;
}

try {
const event = JSON.parse(data);
onEvent(event); // 回调:处理事件
} catch (e) {
console.warn('解析事件失败:', data);
}
}
}
}

onDone();
} catch (err) {
if ((err as Error).name !== 'AbortError') {
onError(err);
}
// AbortError 是用户主动取消,不算错误
}
}

4.2 关键概念解释

ReadableStream

一种「流式数据源」,可以一边产生数据一边消费

1
2
数据源(网络)→ ReadableStream → 消费者(你的代码)
↑ 可以暂停、读取、取消

类比:水龙头(数据源)→ 水管(ReadableStream)→ 你接水(消费者)

getReader()

拿到流的「读取器」,用来读数据:

1
2
const reader = stream.getReader();
const { done, value } = await reader.read();

TextDecoder

把字节(Uint8Array)转成字符串:

1
2
const decoder = new TextDecoder();
const text = decoder.decode(uint8Array); // '你好'

AbortController + signal

用来取消请求

1
2
3
4
5
6
7
const controller = new AbortController();

// 5 秒后取消
setTimeout(() => controller.abort(), 5000);

fetch(url, { signal: controller.signal });
// ↑ 传入 signal,请求被取消时会抛 AbortError

项目里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// chatStore.ts - sendMessage
abortController = new AbortController(); // 创建
await parseSseStream({
url,
signal: abortController.signal, // 传给 fetch
...
});

// 用户点「停止」时
async stopMessage() {
abortController?.abort(); // 取消!
await fetch(getStopUrl(chat.id)); // 同时通知后端
set({ isSending: false });
}

5. 后端数据格式约定

后端必须按这个格式返回:

1
2
3
4
5
6
7
8
9
10
data: {"type":"thinking","content":"让我想想..."}\n
\n
data: {"type":"text","content":"Spring"}\n
\n
data: {"type":"text","content":" AI 2.0"}\n
\n
data: {"type":"complete"}\n
\n
data: [DONE]\n
\n

字段说明

  • 每行 data: 开头
  • 一个事件用 \n\n 分隔
  • JSON 内容是事件数据
  • [DONE] 表示流结束

前端代码按这个约定解析


6. 事件类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/constants/streamTypes.ts
export const STREAM_TYPES = {
TEXT: 'text',
THINKING: 'thinking',
TOOL_START: 'tool_start',
TOOL_END: 'tool_end',
REFERENCE: 'reference',
RECOMMEND: 'recommend',
ERROR: 'error',
COMPLETE: 'complete',
DONE: '[DONE]',
} as const;

export type StreamEvent =
| { type: 'text'; content: string }
| { type: 'thinking'; content: string }
| { type: 'tool_start'; toolName: string; toolCallId: string }
| { type: 'tool_end'; toolName: string; toolCallId: string }
| { type: 'reference'; content: string }
| { type: 'recommend'; content: string }
| { type: 'error'; content: string; detail?: string; code?: string }
| { type: 'complete' }
| { type: 'done' };

7. applyEvent — 事件处理函数

位置chatStore.ts 内部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
function applyEvent(aiMsg: ChatMessage, event: StreamEvent) {
switch (event.type) {
case 'text': {
currentStreamContent += event.content;
aiMsg.content = currentStreamContent;
break;
}
case 'thinking': {
// 思考文本追加到 timeline
aiMsg.timeline = aiMsg.timeline ?? [];
const last = aiMsg.timeline[aiMsg.timeline.length - 1];
if (last && last.type === 'thinking') {
last.content = (last.content ?? '') + event.content; // 合并相邻
} else {
aiMsg.timeline.push({ type: 'thinking', content: event.content });
}
currentThinkingText += event.content;
break;
}
case 'tool_start': {
// 工具开始
aiMsg.timeline.push({
type: 'tool',
toolName: event.toolName,
toolCallId: event.toolCallId,
status: 'running',
});
break;
}
case 'tool_end': {
// 工具结束:找到对应的 running 工具标记为 completed
const idx = aiMsg.timeline.findIndex(
(t) => t.type === 'tool' && t.toolCallId === event.toolCallId && t.status === 'running'
);
if (idx >= 0) {
aiMsg.timeline[idx] = { ...aiMsg.timeline[idx], status: 'completed' };
} else {
aiMsg.timeline.push({
type: 'tool',
toolName: event.toolName,
toolCallId: event.toolCallId,
status: 'completed',
});
}
break;
}
case 'reference': {
const refs = processReferences(event.content); // 解析参考 JSON
aiMsg.reference = refs;
if (refs.length > 0) aiMsg.showReference = true;
break;
}
case 'recommend': {
aiMsg.recommend = processRecommendations(event.content);
break;
}
case 'error': {
// 错误:追加到 timeline,自动展开
aiMsg.timeline.push({
type: 'error',
message: event.content,
detail: event.detail,
code: event.code,
});
aiMsg.showTimeline = true;
break;
}
case 'complete': {
// 标记完成
break;
}
}
}

关键模式

  • 用闭包变量 currentStreamContent 累积文本
  • 每次 set 触发 React 更新
  • 累加效果:UI 看起来像打字机

8. 时序图:完整的一次 AI 回复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
时间线   前端                           后端
────────────────────────────────────────────────
t0 userMsg 推入 chat.messages
aiMsg 占位推入
set({ isSending: true })

fetch(url) ─────────────────→ 处理请求

t1 ←─── data: {"type":"thinking"...}
applyEvent(thinking)
set({ chatList: [...] })
React 重渲染:UI 显示思考内容

t2 ←─── data: {"type":"tool_start",...}
applyEvent(tool_start)
set({ chatList: [...] })
React 重渲染:UI 显示工具调用中

t3 ←─── data: {"type":"tool_end",...}
applyEvent(tool_end)
set({ chatList: [...] })
React 重渲染:UI 显示工具完成

t4 ←─── data: {"type":"text","content":"Spring"}
applyEvent(text)
currentStreamContent = "Spring"
set({ chatList: [...] })
React 重渲染:UI 显示 "Spring"

t5 ←─── data: {"type":"text","content":" AI"}
currentStreamContent = "Spring AI"
set → UI 更新为 "Spring AI"

(继续接收 text 事件...)

tN ←─── data: {"type":"recommend",...}
applyEvent(recommend)
set → UI 显示推荐问题

tN+1 ←─── data: {"type":"complete"}
applyEvent(complete)

tN+2 ←─── data: [DONE]
onDone()
set({ isSending: false })
React 重渲染:UI 隐藏 loading,显示完整回复

9. 怎么调试 SSE

9.1 浏览器 DevTools

  1. 打开 DevTools → Network 标签
  2. 过滤 /stream
  3. 发送消息
  4. 点击 chat/stream 请求
  5. 切换到 Response 标签
  6. 实时看到推送的数据(不会一次性显示)

9.2 console.log 调试

parseSseStream 中加日志:

1
2
3
4
5
6
7
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
console.log('[SSE]', data); // 看每条数据
// ...
}
}

9.3 用 curl 模拟

1
2
curl -N http://localhost:8080/api/chat/stream?agentType=chat&conversationId=test&message=hello
# ↑ -N 不缓冲,实时显示

10. 后端需要做什么

如果你要写一个测试后端(参考),需要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Python (Flask) 示例
from flask import Flask, Response, stream_with_context
import json
import time

app = Flask(__name__)

@app.route('/api/chat/stream')
def stream():
def generate():
# 模拟 AI 思考
yield 'data: ' + json.dumps({"type": "thinking", "content": "让我想想..."}) + '\n\n'
time.sleep(0.5)

# 模拟逐字返回
for word in "你好世界".split(''):
yield 'data: ' + json.dumps({"type": "text", "content": word}) + '\n\n'
time.sleep(0.1)

yield 'data: ' + json.dumps({"type": "complete"}) + '\n\n'
yield 'data: [DONE]\n\n'

return Response(
stream_with_context(generate()),
content_type='text/event-stream', # 重要!SSE 必须这个 Content-Type
)

关键点

  • Content-Type: text/event-stream
  • 每行 data: ...\n\n
  • 后端要保持连接(不要关闭)

11. 一段话总结

SSE = 服务器单向推送数据。
本项目用 fetch + ReadableStream(不是 EventSource)— 更灵活。
parseSseStream 函数:发起请求 → 循环读流 → 按行解析 JSON → 回调 onEvent。
applyEvent 函数:根据事件类型更新消息。
打字机效果:每收到一个 text 事件就 setState 触发重渲染。
取消:AbortController.abort() 中断 fetch。


接下来

到这里,所有项目用到的技术都讲完了。
接下来是项目代码导览:16-项目组件详解.md — 把所有组件串起来。
然后是 17-API与后端交互.md18-开发者指南.md