一、为什么需要流式输出
大语言模型生成一段 1000 token 的回答,完整跑完可能需要 5 到 10 秒。如果等全部生成完毕再返回,用户盯着空白屏幕干等——这个体验在 GPT-3 时代是常态。ChatGPT 之所以让人觉得”快”,不是因为推理速度更快,而是它用了流式输出:第一个 token 在 200ms 内就出现在屏幕上,后续 token 逐个追加,用户看到的不再是空白等待,而是像打字一样的渐进呈现。
流式输出的价值不只是 UX 层面。服务端不需要把整个响应缓存在内存里再一次性发送,每生成一个片段就立刻 flush 出去,内存占用从 O(n) 降到 O(1)。用户也可以提前终止——如果前几个 token 已经明显跑偏,直接停止生成,省下的计算资源不用白白浪费。
LLM 之外,流式输出同样有用:实时通知推送、股票行情、日志尾随、在线协作的实时编辑同步。这些场景的共同特征是——数据由服务端持续产生,客户端需要尽快看到增量更新,而不是等一轮完整的请求-响应周期。
二、现实类比
新闻滚动条(ticker)和晚间新闻的区别。滚动条来了头条就播,一条接一条,你随时能看到最新消息;晚间新闻要等编辑把所有素材剪辑完才能播出,中间什么都不看得到。流式输出就是新闻滚动条——信息增量到达,你可以逐条处理;传统请求-响应是晚间新闻——等全部准备好才一次性端上来。
还有一层区别:滚动条上的新闻你看了就看了,想跳过也行,不存在”来不及看”的问题;晚间新闻如果你没在电视机前,整段内容就错过了。流式输出天然支持客户端按自己的节奏消费,而批量响应必须一次性接收完整载荷。
三、核心思想
SSE(Server-Sent Events)是一种基于 HTTP 的单向推送协议。客户端发起一个普通 HTTP 请求,服务端不关闭连接,持续发送格式化的事件流。浏览器内置的 EventSource API 自动处理解析和重连。
SSE 协议的文本格式极简:
id: 1\nevent: message\ndata: 一行数据\ndata: 第二行数据\nretry: 3000\n\n每个事件以空行(\n\n)结尾。data 字段可以出现多行,会被自动拼接。id 用于断线重连时告诉服务端上次收到哪条。event 区分事件类型,客户端可以按类型注册监听器。retry 指定重连间隔(毫秒)。
| 字段 | 作用 | 示例 |
|---|---|---|
data | 事件数据,可多行 | data: hello |
id | 事件 ID,用于断线重连 | id: 42 |
event | 事件类型,默认 message | event: update |
retry | 重连间隔(毫秒) | retry: 5000 |
浏览器 EventSource 的自动重连机制是一个被低估的特性。连接断开后,它会按 retry 指定的间隔自动重试,并在请求头中携带 Last-Event-ID,服务端据此从断点续传。这比 WebSocket 断线后需要自己实现重连逻辑要省心得多。
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 发送单个事件 | O(1) | 追加写入响应流 |
| 客户端接收事件 | O(1) | 逐行解析,无需缓冲整个流 |
| 断线重连 | O(1) | 基于 Last-Event-ID 续传 |
3.1 SSE vs WebSocket vs Long Polling
三种实时通信方式的对比:
| 维度 | SSE | WebSocket | Long Polling |
|---|---|---|---|
| 通信方向 | 单向(服务端→客户端) | 双向 | 单向(模拟) |
| 协议 | HTTP/1.1+ | WS 协议(需 upgrade 握手) | HTTP/1.1+ |
| 自动重连 | 浏览器内置 | 需自行实现 | 天然重试 |
| 数据格式 | 纯文本 | 文本 + 二进制 | 任意 |
| 浏览器支持 | EventSource API | WebSocket API | 普通请求 |
| 连接开销 | 低——复用 HTTP 基础设施 | 中——需要 upgrade | 高——每次都新建请求 |
| 代理/CDN 兼容 | 好——标准 HTTP | 差——部分代理不支持 upgrade | 好——标准 HTTP |
选型建议:如果你只需要服务端往客户端推数据,SSE 是最简单的选择。需要双向通信(聊天室、多人协作编辑)用 WebSocket。Long Polling 已经是过渡方案,只在 SSE 和 WebSocket 都不可用时考虑。
3.2 背压处理
流式输出有一个容易被忽略的问题:如果客户端消费速度跟不上服务端生产速度怎么办?这就是背压(Backpressure)。
在 Go 中,http.ResponseWriter 的 Write 调用在底层缓冲区满时会阻塞,相当于自带背压——服务端写不出去就等着,不会无限膨胀内存。但要注意 http.Flusher 的使用频率:每次 Write 后立刻 Flush 可以保证实时性,但如果客户端处理慢,频繁 flush 可能导致大量小包发送,增加网络开销。实际中通常攒几个事件再 flush 一次。
浏览器端的 EventSource 没有提供背压机制——所有收到的事件都在内存中排队,等待 onmessage 回调处理。如果回调处理慢,事件会堆积。对于高吞吐场景(日志流、行情推送),EventSource 不如 fetch + ReadableStream 灵活,后者可以通过 reader.read() 按需拉取,天然支持背压。
四、变体与对比
| 方案 | 通信方向 | 典型场景 | 优势 | 劣势 |
|---|---|---|---|---|
| SSE | 服务端→客户端 | LLM 流式输出、通知推送、仪表盘 | 协议简单、自动重连、HTTP 原生 | 单向、仅文本、浏览器 6 连接限制(HTTP/1.1) |
| WebSocket | 双向 | 聊天、协作编辑、游戏 | 双向、低延迟、支持二进制 | 需 upgrade、无自动重连、代理兼容性差 |
| Long Polling | 服务端→客户端(模拟) | 兼容性要求高的旧系统 | 最广泛的兼容性 | 延迟高、开销大、非真正流式 |
| Chunked Transfer | 服务端→客户端 | 大文件下载、渐进式渲染 | HTTP/1.1 原生支持 | 无事件结构、无 ID/重连机制 |
Chunked Transfer Encoding 和 SSE 的区别值得多说一句:Chunked 是 HTTP 层面的传输编码,解决的是”响应体长度未知时分块发送”的问题,客户端拿到的是一个连续的字节流,没有事件边界。SSE 是应用层协议,在字节流上定义了事件格式(data:\n\n),有事件 ID、事件类型、重连语义。Chunked 是管道,SSE 是管道上跑的消息协议。
HTTP/1.1 下浏览器对同一域名的 SSE 连接数有限制(通常 6 个),这是一个常被忽略的约束。如果你的页面同时打开多个 SSE 连接,可能阻塞其他 HTTP 请求。HTTP/2 没有这个限制,多路复用让 SSE 连接共享同一个 TCP 连接。
五、多语言实现
5.1 Go 实现
package main
import ( "fmt" "log" "net/http" "time")
// sseHandler 处理 SSE 连接func sseHandler(w http.ResponseWriter, r *http.Request) { // 设置 SSE 必需的响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive")
// 获取 Flusher,用于立即将数据推送到客户端 flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "不支持 Streaming", http.StatusInternalServerError) return }
// 模拟 LLM 逐 token 生成 tokens := []string{"你好", ",", "我是", "一个", "流式", "输出", "的", "示例", "。"} for i, token := range tokens { // 检查客户端是否已断开 select { case <-r.Context().Done(): log.Println("客户端断开连接,停止发送") return default: }
// 写入 SSE 格式的事件 fmt.Fprintf(w, "id: %d\n", i+1) fmt.Fprintf(w, "data: %s\n\n", token) flusher.Flush()
// 模拟生成延迟 time.Sleep(200 * time.Millisecond) }
// 发送结束标记,客户端据此判断流结束 fmt.Fprintf(w, "event: done\ndata: [DONE]\n\n") flusher.Flush()}
func main() { http.HandleFunc("/stream", sseHandler) log.Println("SSE 服务启动在 :8080") log.Fatal(http.ListenAndServe(":8080", nil))}Go 实现的关键点:r.Context().Done() 监听客户端断开——当用户关闭页面或取消请求时,context 会被取消,服务端可以及时停止生成,避免浪费计算资源。http.Flusher 确保每次写入后立刻推送到客户端,而不是等内核缓冲区满。如果服务端是一个 LLM 推理服务,context 取消后应该同时终止推理过程,这通常需要把 context 传递到推理引擎内部。
5.2 TypeScript 实现
服务端(Node.js):
// Node.js SSE 服务端import { createServer } from "http";
createServer((req, res) => { if (req.url === "/stream") { // 设置 SSE 响应头 res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", });
const tokens = ["你好", ",", "我是", "流式", "输出", "示例", "。"]; let index = 0;
const interval = setInterval(() => { // 检查是否还有 token 要发送 if (index < tokens.length) { res.write(`id: ${index + 1}\ndata: ${tokens[index]}\n\n`); index++; } else { // 发送结束事件 res.write("event: done\ndata: [DONE]\n\n"); clearInterval(interval); res.end(); } }, 200);
// 客户端断开时清理 req.on("close", () => { clearInterval(interval); }); }}).listen(8080, () => console.log("SSE 服务启动在 :8080"));客户端(浏览器 EventSource):
// 使用 EventSource 消费 SSE 流function consumeSSE(url: string): void { const source = new EventSource(url);
source.onmessage = (event: MessageEvent) => { // 每收到一个事件,追加到页面 console.log(`收到事件 #${event.lastEventId}: ${event.data}`); };
// 监听自定义事件类型 source.addEventListener("done", (event: MessageEvent) => { console.log("流结束:", event.data); source.close(); });
source.onerror = () => { // EventSource 自动重连,这里只需要记录日志 console.log("连接中断,自动重连中..."); };}
consumeSSE("http://localhost:8080/stream");客户端(fetch + ReadableStream,支持背压):
// 使用 fetch + ReadableStream 消费 SSE,支持背压async function consumeWithBackpressure(url: string): Promise<void> { const response = await fetch(url); const reader = response.body!.getReader(); const decoder = new TextDecoder(); let buffer = "";
while (true) { // read() 返回后才继续拉取,天然背压 const { done, value } = await reader.read(); if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按 SSE 事件边界(双换行)拆分 const parts = buffer.split("\n\n"); buffer = parts.pop()!; // 最后一段可能不完整,保留到下次
for (const part of parts) { const lines = part.split("\n"); let data = ""; for (const line of lines) { if (line.startsWith("data: ")) { data += line.slice(6); } } if (data) { console.log("收到:", data); // 这里可以做耗时处理,处理完才继续 read } } }}
consumeWithBackpressure("http://localhost:8080/stream");TypeScript 客户端有两种消费方式:EventSource 简单开箱即用,自动重连,但只支持 GET 请求,无法自定义请求头;fetch + ReadableStream 更灵活,支持 POST、自定义 Header、背压控制,但需要手动解析 SSE 格式和处理重连。OpenAI 的 ChatGPT API 就是用 fetch + ReadableStream 的方式消费流——因为需要 POST 请求体和 Authorization 头,EventSource 做不到。
六、生产验证
| 项目 | 源码位置 | 用途 |
|---|---|---|
| OpenAI ChatGPT | platform.openai.com/docs | 流式 Chat Completions API,SSE 格式输出 |
| Vercel AI SDK | github.com/vercel/ai | 统一的流式 AI 调用抽象,支持多种模型提供商 |
| Anthropic Claude | docs.anthropic.com | 流式 Messages API,SSE 格式输出 |
- OpenAI ChatGPT — 流式 API 返回的每个事件格式为
data: {"choices":[{"delta":{"content":"..."}}]}\n\n,最后以data: [DONE]\n\n结束。客户端解析delta.content逐字追加。OpenAI 的实现有一个细节值得注意:流式和非流式使用同一个 endpoint,区别仅在于请求体中的stream: true参数,服务端据此切换响应模式。 - Vercel AI SDK — 提供了
streamText和StreamingTextResponse等高层抽象,把不同模型提供商的流式输出统一为标准 SSE 格式。前端用useChathook 一行代码接入,内部自动处理 SSE 解析、错误重试和 UI 更新。对于需要快速接入 LLM 流式输出的项目,这是最省力的方案。 - Anthropic Claude — 流式输出的事件类型更丰富:
message_start、content_block_start、content_block_delta、content_block_stop、message_delta、message_stop,通过event字段区分。这种设计让客户端可以精确追踪生成进度,比如区分”正在思考”和”正在输出文本”。
七、小结
何时使用:
- LLM 流式输出——降低首字延迟,改善用户体验
- 实时通知推送——订单状态变更、系统告警等服务端主动推送场景
- 实时仪表盘——在线人数、交易量、监控指标等持续更新的数据
- 日志流——尾随日志文件,增量推送到前端
何时不用:
- 需要双向通信——聊天室、多人协作编辑,用 WebSocket
- 需要传输二进制数据——图片、音视频流,用 WebSocket
- 高频小更新且协议开销敏感——SSE 的
data: ...\n\n格式有文本开销,纯二进制场景 WebSocket 更紧凑 - 只需要一次性获取完整数据——普通 HTTP 请求即可,流式增加了复杂度没有收益
八、参考资料
- MDN: EventSource - 浏览器端 SSE API 完整文档
- HTML5 Server-Sent Events Specification - W3C SSE 协议规范
- OpenAI Streaming API - ChatGPT 流式输出接口文档
- Vercel AI SDK - Streaming - AI SDK 的流式数据传输指南
- Anthropic Messages API - Streaming - Claude 流式输出接口文档
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






