发布于 2026-05-17, 更新于 2026-06-20
在构建大模型系统时,流式输出已成为 AI 产品的标配。本文将详解一套基于 Server-Sent Events (SSE) 的全链路方案,涵盖基础的单接口双流同步与流中断机制,延伸至双通道异步队列解耦与长连接生命周期清理,以及 Nginx 反向代理部署和防缓冲优化。
在使用 LangGraph 搭建的 Agent 中,对话过程包含模型思考、工具调用、模型回复等多个步骤。利用 astream 配合 stream_mode="messages",我们可以细粒度地截获图在运行时的各种消息事件。
双流输出设计:
artifact 字段,则可将其作为结构化引用透传给前端,用于用户侧的“引用来源”显示。stream_mode="messages" 事件流中;若未出现,则需要结合 stream_mode="updates" 或自定义事件机制传递 Artifact。通过对这两种消息格式的合理侦听/捕获,系统不仅能向用户流式输出文字内容,还能将工具运行后产生的结构化引用(Artifact)抛出给前端,做到了答复内容与参考文献卡片的同步展示。我们将产生的四种关键信息包装为统一的 JSON 格式:token (文本内容)、citation (来源引用)、error (异常情况) 和 done (结束标志)。
from langchain_core.messages import AIMessageChunk, ToolMessage, HumanMessage
import json
import asyncio
async def stream_chat(message: str, session_id: str):
config = {"configurable": {"thread_id": session_id}}
try:
# 监听图流转的每一个 message 事件
async for msg, metadata in agent.astream(
{"messages": [HumanMessage(content=message)]},
config=config,
stream_mode="messages",
):
# 1. 拦截模型生成的 Token
if isinstance(msg, AIMessageChunk) and msg.content:
yield {"type": "token", "content": msg.content}
# 2. 拦截工具执行完毕后返回的 Artifact(我们在定义 tool 时预留的结构化数据)
elif isinstance(msg, ToolMessage) and msg.artifact:
# 可以在 Tool 执行完毕的瞬间将引用推送给前端,让用户在等待最终回答时先看到文献来源
yield {"type": "citation", "content": msg.artifact}
# 3. 输出结束标志
yield {"type": "done", "content": "[DONE]"}
except asyncio.CancelledError:
# 正常的中断信号,向外抛出交由底层框架接管
raise
except Exception as e:
# 兜底:处理生成过程中的异常情况
yield {"type": "error", "content": f"生成失败: {str(e)}"}
接下来,在 FastAPI 中,我们将这个生成器包装成标准的 SSE 数据帧:data: <json>\n\n。
from fastapi.responses import StreamingResponse
def _sse_encode(data: dict) -> str:
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
@router.post("/chat")
async def chat_endpoint(req: ChatRequest):
async def event_stream():
async for item in stream_chat(req.message, req.session_id):
yield _sse_encode(item)
return StreamingResponse(event_stream(), media_type="text/event-stream")
在前端,为了灵活性,我们通常不使用原生的 EventSource (因为它仅支持 GET 请求,且无法自定义请求体,难以直接承载聊天输入内容),而是使用 fetch 结合 ReadableStream 来处理。
关键点:
\r\n\r\n)。因此,我们需要在前端维护一个 buffer,兼容处理换行符并进行可靠的帧解析。split("\n") 再取 data: 行async function sendChatRequest(text, sessionId) {
const res = await fetch("/api/v1/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message: text, session_id: sessionId, stream: true }),
});
// 严谨度提升:拦截非 200 的异常响应,避免流解析器报错
if (!res.ok) {
throw new Error(`HTTP Error: ${res.status}`);
}
const reader = res.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
let aiText = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
// 追加新数据到 buffer
buffer += decoder.decode(value, { stream: true });
// 兼容处理 Windows 风格的换行符 \r\n,统一替换为 \n 方便解析
buffer = buffer.replace(/\r\n/g, "\n");
// 依次回车换行符 '\n\n' 解析完整的 SSE 帧
while (true) {
const idx = buffer.indexOf("\n\n");
if (idx === -1) break;
const frame = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
for (const line of frame.split("\n")) {
if (!line.startsWith("data: ")) continue;
const payload = JSON.parse(line.slice(6));
// 根据协议分发事件
if (payload.type === "token") {
aiText += payload.content;
renderMarkdown(aiText); // 实时渲染打字机效果
} else if (payload.type === "citation") {
renderCitations(payload.content); // 在气泡下方渲染参考链接
} else if (payload.type === "error") {
console.error("Agent Error:", payload.content);
renderError(payload.content); // 渲染流式返回的报错提示
}
}
}
}
}
前端和 node.js 可以通过 AbortController 取消掉发出的 fetch 请求。
const abortController = new AbortController();
fetch(url, {
...,
signal: abortController.signal,
})
// 需要中断请求时调用
abortController.abort();
而在服务端方面,由于项目底层依赖的是 FastAPI (Starlette / Uvicorn) 和 LangGraph,当客户端(前端或调用方)强行中断连接或刷新页面时,服务端会自动进行以下处理:
yield 或 async for 处)抛出一个 asyncio.CancelledError。CancelledError ,进行日志记录后,重新 raise 抛出。finally 块会被成功触发,打印出原有的 chat_stream_finished 耗时日志,防止线程泄漏。CancelledError 会向下传递给 LangGraph 的 agent.astream(...)。若底层模型 SDK 正确支持 asyncio 取消,则底层的网络请求(比如请求大模型接口的 HTTP 请求)通常会被终止并释放资源,不会导致大模型继续在后台空跑产生不必要的 Token 消耗。CancelledError 与 Exception 均直接(或间接)继承自 BaseException,但 CancelledError 不属于 Exception 体系,因此业务代码中的 except Exception as e: 不会捕获到 asyncio.CancelledError。CancelledError 不是一个通常意义上的代码“错误(Error)”,而是一种“系统控制流信号(Signal)”,应该被允许畅通无阻地向外层抛出,直到被底层异步框架(如 uvicorn/asyncio)接管并安全销毁任务。基于 FastAPI 的异步流控制和 asyncio 的任务取消机制,对于频繁产生 SSE 数据的场景,通常无需额外轮询 request.is_disconnected()(因为在 StreamingResponse 消费生成器时,每次向客户端发送数据都会进入一次异步发送流程,这构成了天然的取消感知点);但若存在长时间无数据发送的耗时任务,主动断连检测或使用 asyncio.TaskGroup 仍可能有价值。
在处理更复杂的 AI 业务场景时(例如需要流式反馈多步骤检索进度,或解决原生 EventSource 不支持 POST 请求且无法自定义请求体的限制),可以采用以下进阶技术点。
除默认的 data: 外,SSE 支持声明 event: 自定义事件类型。前端能直接利用浏览器原生的 EventSource.addEventListener 绑定事件处理器进行消费,无需手动拆包解析:
协议格式:
event: progress
data: {"step": "retrieval", "status": "active"}
event: delta
data: "文本片段"
前端消费:
const eventSource = new EventSource(`/api/v1/stream/${sessionId}`);
eventSource.addEventListener('progress', (e) => updateProgressBar(JSON.parse(e.data)));
eventSource.addEventListener('delta', (e) => appendToken(JSON.parse(e.data)));
针对耗时较长、包含复杂串/并行计算的工作流,可将触发计算与数据推送拆分为双通道:
asyncio.Queue,并将任务丢入后台异步执行,接口立即返回会话 ID 以结束 POST 请求。EventSource 发起 GET 长连接,服务端从该会话的队列中持续读取消息并推送。在解耦架构下,如果客户端主动断开长连接(如刷新页面),服务端需及时释放内存中对应的会话队列:
await request.is_disconnected() 实时判断客户端 TCP 连接状态。finally 块中执行清理逻辑,将对应的 asyncio.Queue 移出全局存储字典,避免内存泄露。在复杂的生产环境中,连接往往会经过反向代理(Nginx/Kong)、负载均衡器(ALB/ELB)或 CDN(Cloudflare)。这些网关设备通常设置了连接空闲超时(通常为 30~120 秒)。如果大模型在进行长时间的深度思考,或复杂工具调用导致长达几十秒没有新的 SSE 数据产出,连接极易被网关强制掐断。
为解决这一问题,建议在无数据发送时定期(例如每 15~30 秒)向客户端发送一个 SSE 注释帧(以冒号 : 开头)作为心跳包:
# 每隔 15 秒发送一个心跳注释帧,前端 EventSource 会自动忽略以冒号开头的行,但能保持连接活跃
yield ": ping\n\n"
如果在生产环境中 FastAPI 前方架设了 Nginx,你可能会发现前端并没有体验到“流式”效果,而是等待了许久后突然收到了一整段文字。
这是因为 Nginx 默认启用了代理缓冲(Proxy Buffering),它会尝试把后端的响应攒满一定大小后再一次性发给客户端。对于 SSE 来说,这是致命的。针对此问题,有以下两种解决方案:
我们需要针对流式路由单独关闭缓冲机制。需要注意的是,如果使用了进阶的双通道解耦架构,Nginx 不仅需要对 /api/v1/chat 接口关闭缓冲,也需要对流分发接口 /api/v1/stream/ 关闭缓冲:
# 通用 API 路由配置
location /api/ {
proxy_pass http://backend_upstream;
# ... 常规代理头配置
}
# 针对流式接口(对话及分发通道)的专属配置(使用正则匹配)
location ~ ^/api/v1/(chat|stream/) {
proxy_pass http://backend_upstream;
proxy_http_version 1.1;
proxy_set_header Connection "";
# 关键配置:关闭缓冲!
proxy_buffering off;
proxy_cache off;
# 禁用 gzip(防止压缩引擎导致的数据滞留)
gzip off;
# 延长超时时间,避免大模型长时间生成时 Nginx 掐断连接
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
send_timeout 3600s;
}
在部署 SSE 服务时,若不想或无法修改 Nginx 配置文件,通常可以通过在后端响应头中加入 X-Accel-Buffering: no 来关闭 Nginx 的代理缓冲(需注意:如果 Nginx 配置了 proxy_ignore_headers 忽略该响应头,则此设置会失效):
# FastAPI 示例
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 通知 Nginx 关闭此响应的代理缓冲
}
)
统一的 JSON Payload 结构、健壮的前端缓冲解析机制,以及灵活的反向代理防缓冲部署方案(涵盖 Nginx 配置与后端代码级处理),共同构成了一个可用性极高的流式对话全链路方案。这使得我们的 AI 系统不仅能够“边想边说”,还能在对话途中动态插入多媒体卡片或结构化引用,极大地丰富了产品的交互表现力。
而在面对多步骤、长耗时的复杂 Agent 场景时,通过进一步引入双通道异步队列解耦、自定义命名事件等,则能以更低的前端开销和更健壮的架构设计,实现高度工程化的流式步骤追踪与多模态数据分发。