黑白梦黑白梦

toggle navtoggle nav
  • 文章
  • 专栏
  • 文章
  • 专栏

对话系统”流式输出“的前后端完整实现方案

发布于 2026-05-17, 更新于 2026-06-20

在构建大模型系统时,流式输出已成为 AI 产品的标配。本文将详解一套基于 Server-Sent Events (SSE) 的全链路方案,涵盖基础的单接口双流同步与流中断机制,延伸至双通道异步队列解耦与长连接生命周期清理,以及 Nginx 反向代理部署和防缓冲优化。

后端:将 LangGraph 事件转化为数据流

在使用 LangGraph 搭建的 Agent 中,对话过程包含模型思考、工具调用、模型回复等多个步骤。利用 astream 配合 stream_mode="messages",我们可以细粒度地截获图在运行时的各种消息事件。

双流输出设计:

  • 给用户呈现的消息,主要是 AI 处理后返回的消息。Tool 结果一般是 AI 读取的,不需要直接体现给用户。但如果工具返回的 ToolMessage 中携带了自定义的 artifact 字段,则可将其作为结构化引用透传给前端,用于用户侧的“引用来源”显示。
  • 在部分 LangGraph Agent 配置下,ToolMessage 会出现在 stream_mode="messages" 事件流中;若未出现,则需要结合 stream_mode="updates" 或自定义事件机制传递 Artifact。通过对这两种消息格式的合理侦听/捕获,系统不仅能向用户流式输出文字内容,还能将工具运行后产生的结构化引用(Artifact)抛出给前端,做到了答复内容与参考文献卡片的同步展示。

我们将产生的四种关键信息包装为统一的 JSON 格式:token (文本内容)、citation (来源引用)、error (异常情况) 和 done (结束标志)。

from langchain_core.messages import AIMessageChunk, ToolMessage, HumanMessage
import json
import asyncio

async def stream_chat(message: str, session_id: str):
    config = {"configurable": {"thread_id": session_id}}

    try:
        # 监听图流转的每一个 message 事件
        async for msg, metadata in agent.astream(
            {"messages": [HumanMessage(content=message)]},
            config=config,
            stream_mode="messages",
        ):
            # 1. 拦截模型生成的 Token
            if isinstance(msg, AIMessageChunk) and msg.content:
                yield {"type": "token", "content": msg.content}
            
            # 2. 拦截工具执行完毕后返回的 Artifact(我们在定义 tool 时预留的结构化数据)
            elif isinstance(msg, ToolMessage) and msg.artifact:
                # 可以在 Tool 执行完毕的瞬间将引用推送给前端,让用户在等待最终回答时先看到文献来源
                yield {"type": "citation", "content": msg.artifact}
                
        # 3. 输出结束标志
        yield {"type": "done", "content": "[DONE]"}
        
    except asyncio.CancelledError:
        # 正常的中断信号,向外抛出交由底层框架接管
        raise
    except Exception as e:
        # 兜底:处理生成过程中的异常情况
        yield {"type": "error", "content": f"生成失败: {str(e)}"}

接下来,在 FastAPI 中,我们将这个生成器包装成标准的 SSE 数据帧:data: <json>\n\n。

from fastapi.responses import StreamingResponse

def _sse_encode(data: dict) -> str:
    return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

@router.post("/chat")
async def chat_endpoint(req: ChatRequest):
    async def event_stream():
        async for item in stream_chat(req.message, req.session_id):
            yield _sse_encode(item)

    return StreamingResponse(event_stream(), media_type="text/event-stream")

前端:利用 Fetch API 与 ReadableStream 消费 SSE

在前端,为了灵活性,我们通常不使用原生的 EventSource (因为它仅支持 GET 请求,且无法自定义请求体,难以直接承载聊天输入内容),而是使用 fetch 结合 ReadableStream 来处理。

关键点:

  • 由于网络传输,SSE 事件帧可能被拆包或粘包,且 SSE 标准及部分代理服务器可能使用 Windows 风格的换行符(\r\n\r\n)。因此,我们需要在前端维护一个 buffer,兼容处理换行符并进行可靠的帧解析。
  • 每个帧可能包含多行,所以先 split("\n") 再取 data: 行
  • token 到来时更新 Markdown 渲染,实现打字机效果
  • citation 事件可以在 token 流中途到达,因此需要“边渲染边补全引用”
async function sendChatRequest(text, sessionId) {
  const res = await fetch("/api/v1/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message: text, session_id: sessionId, stream: true }),
  });

  // 严谨度提升:拦截非 200 的异常响应,避免流解析器报错
  if (!res.ok) {
    throw new Error(`HTTP Error: ${res.status}`);
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder("utf-8");
  let buffer = "";
  let aiText = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    
    // 追加新数据到 buffer
    buffer += decoder.decode(value, { stream: true });

    // 兼容处理 Windows 风格的换行符 \r\n,统一替换为 \n 方便解析
    buffer = buffer.replace(/\r\n/g, "\n");

    // 依次回车换行符 '\n\n' 解析完整的 SSE 帧
    while (true) {
      const idx = buffer.indexOf("\n\n");
      if (idx === -1) break;
      
      const frame = buffer.slice(0, idx);
      buffer = buffer.slice(idx + 2);

      for (const line of frame.split("\n")) {
        if (!line.startsWith("data: ")) continue;
        const payload = JSON.parse(line.slice(6));

        // 根据协议分发事件
        if (payload.type === "token") {
          aiText += payload.content;
          renderMarkdown(aiText); // 实时渲染打字机效果
        } else if (payload.type === "citation") {
          renderCitations(payload.content); // 在气泡下方渲染参考链接
        } else if (payload.type === "error") {
          console.error("Agent Error:", payload.content);
          renderError(payload.content); // 渲染流式返回的报错提示
        }
      }
    }
  }
}

流式输出的中断机制说明

前端和 node.js 可以通过 AbortController 取消掉发出的 fetch 请求。

const abortController = new AbortController();

fetch(url, {
  ...,
  signal: abortController.signal,
})

// 需要中断请求时调用
abortController.abort();

而在服务端方面,由于项目底层依赖的是 FastAPI (Starlette / Uvicorn) 和 LangGraph,当客户端(前端或调用方)强行中断连接或刷新页面时,服务端会自动进行以下处理:

  • 框架自动捕获断连:Uvicorn 会检测到底层 TCP 连接被关闭,并在 Python 的异步上下文中(即 yield 或 async for 处)抛出一个 asyncio.CancelledError。
  • 可捕获该 CancelledError ,进行日志记录后,重新 raise 抛出。
  • 优雅地终止协程:抛出异常后, finally 块会被成功触发,打印出原有的 chat_stream_finished 耗时日志,防止线程泄漏。
  • LangGraph 的自动中断:CancelledError 会向下传递给 LangGraph 的 agent.astream(...)。若底层模型 SDK 正确支持 asyncio 取消,则底层的网络请求(比如请求大模型接口的 HTTP 请求)通常会被终止并释放资源,不会导致大模型继续在后台空跑产生不必要的 Token 消耗。
  • 存储状态 (Checkpointer):中断发生时,尚未成功提交到状态图中的状态更新通常不会被持久化,但此前已经完成并 checkpoint 的状态仍可能保留,这通常符合预期,下一次会话恢复时仍能读取到最后一个完整的 Checkpoint。
  • CancelledError 与 Exception 均直接(或间接)继承自 BaseException,但 CancelledError 不属于 Exception 体系,因此业务代码中的 except Exception as e: 不会捕获到 asyncio.CancelledError。CancelledError 不是一个通常意义上的代码“错误(Error)”,而是一种“系统控制流信号(Signal)”,应该被允许畅通无阻地向外层抛出,直到被底层异步框架(如 uvicorn/asyncio)接管并安全销毁任务。

基于 FastAPI 的异步流控制和 asyncio 的任务取消机制,对于频繁产生 SSE 数据的场景,通常无需额外轮询 request.is_disconnected()(因为在 StreamingResponse 消费生成器时,每次向客户端发送数据都会进入一次异步发送流程,这构成了天然的取消感知点);但若存在长时间无数据发送的耗时任务,主动断连检测或使用 asyncio.TaskGroup 仍可能有价值。

SSE 进阶:自定义命名事件、通道解耦与连接回收

在处理更复杂的 AI 业务场景时(例如需要流式反馈多步骤检索进度,或解决原生 EventSource 不支持 POST 请求且无法自定义请求体的限制),可以采用以下进阶技术点。

自定义命名事件协议 (Multi-Event Stream)

除默认的 data: 外,SSE 支持声明 event: 自定义事件类型。前端能直接利用浏览器原生的 EventSource.addEventListener 绑定事件处理器进行消费,无需手动拆包解析:

协议格式:

event: progress
data: {"step": "retrieval", "status": "active"}

event: delta
data: "文本片段"

前端消费:

const eventSource = new EventSource(`/api/v1/stream/${sessionId}`);
eventSource.addEventListener('progress', (e) => updateProgressBar(JSON.parse(e.data)));
eventSource.addEventListener('delta', (e) => appendToken(JSON.parse(e.data)));

双通道异步队列解耦架构

针对耗时较长、包含复杂串/并行计算的工作流,可将触发计算与数据推送拆分为双通道:

  • 触发通道(POST 请求):前端发送请求启动计算任务,服务端为该会话初始化一个 asyncio.Queue,并将任务丢入后台异步执行,接口立即返回会话 ID 以结束 POST 请求。
  • 分发通道(GET 请求):前端通过原生 EventSource 发起 GET 长连接,服务端从该会话的队列中持续读取消息并推送。
  • 优点:计算与推送协程彻底解耦,计算开销不阻塞连接,且规避了 POST 请求做流式返回的浏览器兼容性问题。

服务端异步检测断连与资源回收

在解耦架构下,如果客户端主动断开长连接(如刷新页面),服务端需及时释放内存中对应的会话队列:

  • 检测断连:在生成器循环中,利用异步 request 对象的 await request.is_disconnected() 实时判断客户端 TCP 连接状态。
  • 资源清理:一旦断连或任务被取消,在 finally 块中执行清理逻辑,将对应的 asyncio.Queue 移出全局存储字典,避免内存泄露。

接口保活机制(SSE 心跳)

在复杂的生产环境中,连接往往会经过反向代理(Nginx/Kong)、负载均衡器(ALB/ELB)或 CDN(Cloudflare)。这些网关设备通常设置了连接空闲超时(通常为 30~120 秒)。如果大模型在进行长时间的深度思考,或复杂工具调用导致长达几十秒没有新的 SSE 数据产出,连接极易被网关强制掐断。

为解决这一问题,建议在无数据发送时定期(例如每 15~30 秒)向客户端发送一个 SSE 注释帧(以冒号 : 开头)作为心跳包:

# 每隔 15 秒发送一个心跳注释帧,前端 EventSource 会自动忽略以冒号开头的行,但能保持连接活跃
yield ": ping\n\n"

接口部署:Nginx 代理配置与防缓冲方案

如果在生产环境中 FastAPI 前方架设了 Nginx,你可能会发现前端并没有体验到“流式”效果,而是等待了许久后突然收到了一整段文字。

这是因为 Nginx 默认启用了代理缓冲(Proxy Buffering),它会尝试把后端的响应攒满一定大小后再一次性发给客户端。对于 SSE 来说,这是致命的。针对此问题,有以下两种解决方案:

方案一:Nginx 配置文件级代理配置

我们需要针对流式路由单独关闭缓冲机制。需要注意的是,如果使用了进阶的双通道解耦架构,Nginx 不仅需要对 /api/v1/chat 接口关闭缓冲,也需要对流分发接口 /api/v1/stream/ 关闭缓冲:

# 通用 API 路由配置
location /api/ {
    proxy_pass http://backend_upstream;
    # ... 常规代理头配置
}

# 针对流式接口(对话及分发通道)的专属配置(使用正则匹配)
location ~ ^/api/v1/(chat|stream/) {
    proxy_pass http://backend_upstream;
    proxy_http_version 1.1;
    proxy_set_header Connection "";

    # 关键配置:关闭缓冲!
    proxy_buffering off;
    proxy_cache off;
    
    # 禁用 gzip(防止压缩引擎导致的数据滞留)
    gzip off;
    
    # 延长超时时间,避免大模型长时间生成时 Nginx 掐断连接
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
    send_timeout 3600s;
}

方案二:后端代码级免配置防缓冲

在部署 SSE 服务时,若不想或无法修改 Nginx 配置文件,通常可以通过在后端响应头中加入 X-Accel-Buffering: no 来关闭 Nginx 的代理缓冲(需注意:如果 Nginx 配置了 proxy_ignore_headers 忽略该响应头,则此设置会失效):

# FastAPI 示例
return StreamingResponse(
    event_generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no"  # 通知 Nginx 关闭此响应的代理缓冲
    }
)

总结

统一的 JSON Payload 结构、健壮的前端缓冲解析机制,以及灵活的反向代理防缓冲部署方案(涵盖 Nginx 配置与后端代码级处理),共同构成了一个可用性极高的流式对话全链路方案。这使得我们的 AI 系统不仅能够“边想边说”,还能在对话途中动态插入多媒体卡片或结构化引用,极大地丰富了产品的交互表现力。

而在面对多步骤、长耗时的复杂 Agent 场景时,通过进一步引入双通道异步队列解耦、自定义命名事件等,则能以更低的前端开销和更健壮的架构设计,实现高度工程化的流式步骤追踪与多模态数据分发。

目录
后端:将 LangGraph 事件转化为数据流前端:利用 Fetch API 与 ReadableStream 消费 SSE流式输出的中断机制说明SSE 进阶:自定义命名事件、通道解耦与连接回收自定义命名事件协议 (Multi-Event Stream)双通道异步队列解耦架构服务端异步检测断连与资源回收接口保活机制(SSE 心跳)接口部署:Nginx 代理配置与防缓冲方案方案一:Nginx 配置文件级代理配置方案二:后端代码级免配置防缓冲总结

©2015-2026 黑白梦 粤ICP备15018165号

联系: heibaimeng@foxmail.com