15 — SSE 流式通信
本章讲解 AI 「打字机效果」是怎么实现的。
这是项目中最技术性的部分。
1. 什么是 SSE?
Server-Sent Events = 服务器单向推送数据给浏览器的协议。
对比其他技术:
| 方式 | 方向 | 用途 |
|---|
| HTTP 请求 | 客户端 → 服务器 | 普通 API 调用 |
| SSE (Server-Sent Events) | 服务器 → 客户端(单向) | AI 回复、实时通知 |
| WebSocket | 双向 | 聊天(双向)、实时游戏 |
| 长轮询 | 客户端反复请求 | 旧方案,已不推荐 |
SSE 特点:
- 基于 HTTP(简单)
- 服务器主动推送
- 数据格式简单(
data: ...\n\n) - 自动重连
- 浏览器原生 API:
EventSource(但本项目用 fetch + ReadableStream,更灵活)
2. AI 流式回复的工作流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 用户点发送 ↓ 前端发 GET 请求到 /api/chat/stream ↓ 服务器开始返回数据(不是一次性,是持续的) ↓ data: {"type":"thinking","content":"让我想想"} ↓ 5ms 后 data: {"type":"thinking","content":"..."} ↓ 10ms 后 data: {"type":"text","content":"Spring"} ↓ 5ms 后 data: {"type":"text","content":" AI 2.0"} ↓ ... data: {"type":"complete"} data: [DONE] ↓ 服务器关闭连接 ↓ 前端解析完所有事件,UI 显示完整回复
|
关键点:
- 数据是流式的(一次拿一点)
- 前端实时显示(打字机效果)
- 不是等所有数据到齐再渲染
3. 浏览器原生 API:EventSource(不推荐用)
1 2 3 4 5 6
| const source = new EventSource('/api/chat/stream'); source.onmessage = (e) => { const data = JSON.parse(e.data); console.log(data); };
|
缺点:
- 不能自定义请求方法(只能 GET)
- 不能自定义请求头
- 取消比较麻烦
- 错误处理不够灵活
4. 项目用的方式:fetch + ReadableStream
优势:
- 完全控制请求(方法、头、body)
- 灵活取消(AbortController)
- 可以发 POST 请求
4.1 核心代码(useSseStream.ts 简化)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| export async function parseSseStream({ url, signal, onEvent, onDone, onError }) { try { const response = await fetch(url, { signal });
const reader = response.body!.getReader();
const decoder = new TextDecoder(); let buffer = '';
while (true) { const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n'); buffer = lines.pop() || '';
for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6);
if (data === '[DONE]') { onDone(); return; }
try { const event = JSON.parse(data); onEvent(event); } catch (e) { console.warn('解析事件失败:', data); } } } }
onDone(); } catch (err) { if ((err as Error).name !== 'AbortError') { onError(err); } } }
|
4.2 关键概念解释
ReadableStream
一种「流式数据源」,可以一边产生数据一边消费:
1 2
| 数据源(网络)→ ReadableStream → 消费者(你的代码) ↑ 可以暂停、读取、取消
|
类比:水龙头(数据源)→ 水管(ReadableStream)→ 你接水(消费者)
getReader()
拿到流的「读取器」,用来读数据:
1 2
| const reader = stream.getReader(); const { done, value } = await reader.read();
|
TextDecoder
把字节(Uint8Array)转成字符串:
1 2
| const decoder = new TextDecoder(); const text = decoder.decode(uint8Array);
|
AbortController + signal
用来取消请求:
1 2 3 4 5 6 7
| const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
fetch(url, { signal: controller.signal });
|
项目里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| abortController = new AbortController(); await parseSseStream({ url, signal: abortController.signal, ... });
async stopMessage() { abortController?.abort(); await fetch(getStopUrl(chat.id)); set({ isSending: false }); }
|
5. 后端数据格式约定
后端必须按这个格式返回:
1 2 3 4 5 6 7 8 9 10
| data: {"type":"thinking","content":"让我想想..."}\n \n data: {"type":"text","content":"Spring"}\n \n data: {"type":"text","content":" AI 2.0"}\n \n data: {"type":"complete"}\n \n data: [DONE]\n \n
|
字段说明:
- 每行
data: 开头 - 一个事件用
\n\n 分隔 - JSON 内容是事件数据
[DONE] 表示流结束
前端代码按这个约定解析。
6. 事件类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| export const STREAM_TYPES = { TEXT: 'text', THINKING: 'thinking', TOOL_START: 'tool_start', TOOL_END: 'tool_end', REFERENCE: 'reference', RECOMMEND: 'recommend', ERROR: 'error', COMPLETE: 'complete', DONE: '[DONE]', } as const;
export type StreamEvent = | { type: 'text'; content: string } | { type: 'thinking'; content: string } | { type: 'tool_start'; toolName: string; toolCallId: string } | { type: 'tool_end'; toolName: string; toolCallId: string } | { type: 'reference'; content: string } | { type: 'recommend'; content: string } | { type: 'error'; content: string; detail?: string; code?: string } | { type: 'complete' } | { type: 'done' };
|
7. applyEvent — 事件处理函数
位置:chatStore.ts 内部
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| function applyEvent(aiMsg: ChatMessage, event: StreamEvent) { switch (event.type) { case 'text': { currentStreamContent += event.content; aiMsg.content = currentStreamContent; break; } case 'thinking': { aiMsg.timeline = aiMsg.timeline ?? []; const last = aiMsg.timeline[aiMsg.timeline.length - 1]; if (last && last.type === 'thinking') { last.content = (last.content ?? '') + event.content; } else { aiMsg.timeline.push({ type: 'thinking', content: event.content }); } currentThinkingText += event.content; break; } case 'tool_start': { aiMsg.timeline.push({ type: 'tool', toolName: event.toolName, toolCallId: event.toolCallId, status: 'running', }); break; } case 'tool_end': { const idx = aiMsg.timeline.findIndex( (t) => t.type === 'tool' && t.toolCallId === event.toolCallId && t.status === 'running' ); if (idx >= 0) { aiMsg.timeline[idx] = { ...aiMsg.timeline[idx], status: 'completed' }; } else { aiMsg.timeline.push({ type: 'tool', toolName: event.toolName, toolCallId: event.toolCallId, status: 'completed', }); } break; } case 'reference': { const refs = processReferences(event.content); aiMsg.reference = refs; if (refs.length > 0) aiMsg.showReference = true; break; } case 'recommend': { aiMsg.recommend = processRecommendations(event.content); break; } case 'error': { aiMsg.timeline.push({ type: 'error', message: event.content, detail: event.detail, code: event.code, }); aiMsg.showTimeline = true; break; } case 'complete': { break; } } }
|
关键模式:
- 用闭包变量
currentStreamContent 累积文本 - 每次
set 触发 React 更新 - 累加效果:UI 看起来像打字机
8. 时序图:完整的一次 AI 回复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| 时间线 前端 后端 ──────────────────────────────────────────────── t0 userMsg 推入 chat.messages aiMsg 占位推入 set({ isSending: true }) ↓ fetch(url) ─────────────────→ 处理请求 ↓ t1 ←─── data: {"type":"thinking"...} applyEvent(thinking) set({ chatList: [...] }) React 重渲染:UI 显示思考内容 ↓ t2 ←─── data: {"type":"tool_start",...} applyEvent(tool_start) set({ chatList: [...] }) React 重渲染:UI 显示工具调用中 ↓ t3 ←─── data: {"type":"tool_end",...} applyEvent(tool_end) set({ chatList: [...] }) React 重渲染:UI 显示工具完成 ↓ t4 ←─── data: {"type":"text","content":"Spring"} applyEvent(text) currentStreamContent = "Spring" set({ chatList: [...] }) React 重渲染:UI 显示 "Spring" ↓ t5 ←─── data: {"type":"text","content":" AI"} currentStreamContent = "Spring AI" set → UI 更新为 "Spring AI" ↓ (继续接收 text 事件...) ↓ tN ←─── data: {"type":"recommend",...} applyEvent(recommend) set → UI 显示推荐问题 ↓ tN+1 ←─── data: {"type":"complete"} applyEvent(complete) ↓ tN+2 ←─── data: [DONE] onDone() set({ isSending: false }) React 重渲染:UI 隐藏 loading,显示完整回复
|
9. 怎么调试 SSE
- 打开 DevTools → Network 标签
- 过滤
/stream - 发送消息
- 点击
chat/stream 请求 - 切换到 Response 标签
- 实时看到推送的数据(不会一次性显示)
9.2 console.log 调试
在 parseSseStream 中加日志:
1 2 3 4 5 6 7
| for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); console.log('[SSE]', data); } }
|
9.3 用 curl 模拟
1 2
| curl -N http://localhost:8080/api/chat/stream?agentType=chat&conversationId=test&message=hello
|
10. 后端需要做什么
如果你要写一个测试后端(参考),需要:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| from flask import Flask, Response, stream_with_context import json import time
app = Flask(__name__)
@app.route('/api/chat/stream') def stream(): def generate(): yield 'data: ' + json.dumps({"type": "thinking", "content": "让我想想..."}) + '\n\n' time.sleep(0.5) for word in "你好世界".split(''): yield 'data: ' + json.dumps({"type": "text", "content": word}) + '\n\n' time.sleep(0.1) yield 'data: ' + json.dumps({"type": "complete"}) + '\n\n' yield 'data: [DONE]\n\n' return Response( stream_with_context(generate()), content_type='text/event-stream', )
|
关键点:
Content-Type: text/event-stream- 每行
data: ...\n\n - 后端要保持连接(不要关闭)
11. 一段话总结
SSE = 服务器单向推送数据。
本项目用 fetch + ReadableStream(不是 EventSource)— 更灵活。
parseSseStream 函数:发起请求 → 循环读流 → 按行解析 JSON → 回调 onEvent。
applyEvent 函数:根据事件类型更新消息。
打字机效果:每收到一个 text 事件就 setState 触发重渲染。
取消:AbortController.abort() 中断 fetch。
接下来
到这里,所有项目用到的技术都讲完了。
接下来是项目代码导览:16-项目组件详解.md — 把所有组件串起来。
然后是 17-API与后端交互.md 和 18-开发者指南.md。