发布于 2026-05-17
在构建大模型 AI 对话系统时,如果让用户一直等待模型生成完毕才展示内容(通常需要几秒到十几秒),体验会很差。因此,“打字机效果”的流式输出(Streaming)已经成为 AI 产品的标配。
同时,在引入了 Agent 和 Tool Calling 之后,我们不仅需要输出大模型生成的 Token,还需要在特定时刻输出工具执行后产生的结构化信息(如引用文档的链接)。本文将详解一套基于 Server-Sent Events (SSE) 的前后端流式全链路方案。
在使用 LangGraph 搭建的 Agent 中,对话过程包含模型思考、工具调用、模型回复等多个步骤。利用 astream 配合 stream_mode="messages",我们可以细粒度地截获图在运行时的各种消息事件。
双流输出设计:
我们将产生的四种关键信息包装为统一的 JSON 格式:token (文本内容)、citation (来源引用)、error (异常情况) 和 done (结束标志)。
from langchain_core.messages import AIMessageChunk, ToolMessage
import json
import asyncio
async def stream_chat(message: str, session_id: str):
config = {"configurable": {"thread_id": session_id}}
try:
# 监听图流转的每一个 message 事件
async for msg, metadata in agent.astream(
{"messages": [HumanMessage(content=message)]},
config=config,
stream_mode="messages",
):
# 1. 拦截模型生成的 Token
if isinstance(msg, AIMessageChunk) and msg.content:
yield {"type": "token", "content": msg.content}
# 2. 拦截工具执行完毕后返回的 Artifact(我们在定义 tool 时预留的结构化数据)
elif isinstance(msg, ToolMessage) and msg.artifact:
# 可以在 Tool 执行完毕的瞬间将引用推送给前端,让用户在等待最终回答时先看到文献来源
yield {"type": "citation", "content": msg.artifact}
# 3. 输出结束标志
yield {"type": "done", "content": "[DONE]"}
except asyncio.CancelledError:
# 正常的中断信号,向外抛出交由底层框架接管
raise
except Exception as e:
# 兜底:处理生成过程中的异常情况
yield {"type": "error", "content": f"生成失败: {str(e)}"}
接下来,在 FastAPI 中,我们将这个生成器包装成标准的 SSE 数据帧:data: <json>\n\n。
from fastapi.responses import StreamingResponse
def _sse_encode(data: dict) -> str:
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
@router.post("/chat")
async def chat_endpoint(req: ChatRequest):
async def event_stream():
async for item in stream_chat(req.message, req.session_id):
yield _sse_encode(item)
return StreamingResponse(event_stream(), media_type="text/event-stream")
在前端,为了灵活性,我们通常不使用原生的 EventSource (因为它仅支持 GET 请求),而是使用 fetch 结合 ReadableStream 来处理。
关键点:
\n\n,我们需要维护一个 buffer 来进行可靠的行解析split("\n") 再取 data: 行async function sendChatRequest(text, sessionId) {
const res = await fetch("/api/v1/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message: text, session_id: sessionId, stream: true }),
});
// 严谨度提升:拦截非 200 的异常响应,避免流解析器报错
if (!res.ok) {
throw new Error(`HTTP Error: ${res.status}`);
}
const reader = res.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
let aiText = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
// 追加新数据到 buffer
buffer += decoder.decode(value, { stream: true });
// 依次回车换行符 '\n\n' 解析完整的 SSE 帧
while (true) {
const idx = buffer.indexOf("\n\n");
if (idx === -1) break;
const frame = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
for (const line of frame.split("\n")) {
if (!line.startsWith("data: ")) continue;
const payload = JSON.parse(line.slice(6));
// 根据协议分发事件
if (payload.type === "token") {
aiText += payload.content;
renderMarkdown(aiText); // 实时渲染打字机效果
} else if (payload.type === "citation") {
renderCitations(payload.content); // 在气泡下方渲染参考链接
} else if (payload.type === "error") {
console.error("Agent Error:", payload.content);
renderError(payload.content); // 渲染流式返回的报错提示
}
}
}
}
}
前端和 node.js 可以通过 AbortController 取消掉发出的 fetch 请求。
const abortController = new AbortController();
fetch(url, {
...,
signal: abortController.signal,
})
// 需要中断请求时调用
abortController.abort();
而在服务端方面,由于项目底层依赖的是 FastAPI (Starlette / Uvicorn) 和 LangGraph,当客户端(前端或调用方)强行中断连接或刷新页面时,服务端会自动进行以下处理:
yield 或 async for 处)抛出一个 asyncio.CancelledError。CancelledError ,进行日志记录后,重新 raise 抛出。finally 块会被成功触发,打印出原有的 chat_stream_finished 耗时日志,防止线程泄漏。CancelledError 会向下传递给 LangGraph 的 agent.astream(...),LangChain 底层的网络请求(比如请求通义千问大模型接口的 HTTP 请求)也会随之中断,自动释放资源,不会导致大模型继续在后台空跑产生不必要的 Token 消耗。CancelledError 和 Exception 都继承自 BaseException ,业务代码中 except Exception as e: 不会捕获到 asyncio.CancelledError。 CancelledError 不是一个通常意义上的代码“错误(Error)”,而是一种“系统控制流信号(Signal)”,应该被允许畅通无阻地向外层抛出,直到被底层异步框架(如 uvicorn/asyncio)接管并安全销毁任务。基于赖 FastAPI 的异步流控制和 asyncio 的任务取消机制机制,服务端不需要做显式的 request.is_disconnected() 轮询检查,不需要在业务逻辑中插入多余的检查点,就能实现“连接断开即停止计算”的效果。
如果在生产环境中 FastAPI 前方架设了 Nginx,你可能会发现前端并没有体验到“流式”效果,而是等待了许久后突然收到了一整段文字。
这是因为 Nginx 默认启用了代理缓冲(Proxy Buffering),它会尝试把后端的响应攒满一定大小后再一次性发给客户端。对于 SSE 来说,这是致命的。
我们需要针对流式路由单独关闭缓冲机制:
# 通用 API 路由配置
location /api/ {
proxy_pass http://backend_upstream;
# ... 常规代理头配置
}
# 针对流式聊天接口的专属配置
location = /api/v1/chat {
proxy_pass http://backend_upstream;
proxy_http_version 1.1;
proxy_set_header Connection "";
# 关键配置:关闭缓冲!
proxy_buffering off;
proxy_cache off;
# 禁用 gzip(防止压缩引擎导致的数据滞留)
gzip off;
# 延长超时时间,避免大模型长时间生成时 Nginx 掐断连接
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
send_timeout 3600s;
}
统一的 JSON Payload 结构、健壮的前端缓冲解析机制,以及关键的反向代理配置,共同构成了一个可用性极高的流式对话全链路方案。这使得我们的 AI 系统不仅能够“边想边说”,还能在对话途中动态插入多媒体卡片或结构化引用,极大地丰富了产品的交互表现力。