黑白梦黑白梦

toggle navtoggle nav
  • 文章
  • 专栏
  • 文章
  • 专栏

对话系统”流式输出“的前后端完整实现方案

发布于 2026-05-17

在构建大模型 AI 对话系统时,如果让用户一直等待模型生成完毕才展示内容(通常需要几秒到十几秒),体验会很差。因此,“打字机效果”的流式输出(Streaming)已经成为 AI 产品的标配。

同时,在引入了 Agent 和 Tool Calling 之后,我们不仅需要输出大模型生成的 Token,还需要在特定时刻输出工具执行后产生的结构化信息(如引用文档的链接)。本文将详解一套基于 Server-Sent Events (SSE) 的前后端流式全链路方案。

后端:将 LangGraph 事件转化为数据流

在使用 LangGraph 搭建的 Agent 中,对话过程包含模型思考、工具调用、模型回复等多个步骤。利用 astream 配合 stream_mode="messages",我们可以细粒度地截获图在运行时的各种消息事件。

双流输出设计:

  • 给用户呈现的消息,主要是 AI 处理后返回的消息。Tool 结果一般是 AI 读取的,不需要体现给用户。但 search_blog 这个 RAG 工具返回的 Artifact ,可用于用户侧的“引用来源”显示,需要特殊处理,额外返回给前端。
  • 通过对底层消息格式(AIMessageChunk 和 ToolMessage)的分别侦听,系统不仅能向用户流式输出文字内容,还能将工具运行后产生的结构化引用(Artifact)抛出给前端,做到了答复内容与参考文献卡片的同步展示。

我们将产生的四种关键信息包装为统一的 JSON 格式:token (文本内容)、citation (来源引用)、error (异常情况) 和 done (结束标志)。

from langchain_core.messages import AIMessageChunk, ToolMessage
import json
import asyncio

async def stream_chat(message: str, session_id: str):
    config = {"configurable": {"thread_id": session_id}}

    try:
        # 监听图流转的每一个 message 事件
        async for msg, metadata in agent.astream(
            {"messages": [HumanMessage(content=message)]},
            config=config,
            stream_mode="messages",
        ):
            # 1. 拦截模型生成的 Token
            if isinstance(msg, AIMessageChunk) and msg.content:
                yield {"type": "token", "content": msg.content}
            
            # 2. 拦截工具执行完毕后返回的 Artifact(我们在定义 tool 时预留的结构化数据)
            elif isinstance(msg, ToolMessage) and msg.artifact:
                # 可以在 Tool 执行完毕的瞬间将引用推送给前端,让用户在等待最终回答时先看到文献来源
                yield {"type": "citation", "content": msg.artifact}
                
        # 3. 输出结束标志
        yield {"type": "done", "content": "[DONE]"}
        
    except asyncio.CancelledError:
        # 正常的中断信号,向外抛出交由底层框架接管
        raise
    except Exception as e:
        # 兜底:处理生成过程中的异常情况
        yield {"type": "error", "content": f"生成失败: {str(e)}"}

接下来,在 FastAPI 中,我们将这个生成器包装成标准的 SSE 数据帧:data: <json>\n\n。

from fastapi.responses import StreamingResponse

def _sse_encode(data: dict) -> str:
    return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

@router.post("/chat")
async def chat_endpoint(req: ChatRequest):
    async def event_stream():
        async for item in stream_chat(req.message, req.session_id):
            yield _sse_encode(item)

    return StreamingResponse(event_stream(), media_type="text/event-stream")

前端:利用 Fetch API 与 ReadableStream 消费 SSE

在前端,为了灵活性,我们通常不使用原生的 EventSource (因为它仅支持 GET 请求),而是使用 fetch 结合 ReadableStream 来处理。

关键点:

  • 由于网络传输,SSE 事件帧可能被拆包或粘包,SSE 帧以空行分隔(\n\n,我们需要维护一个 buffer 来进行可靠的行解析
  • 每个帧可能包含多行,所以先 split("\n") 再取 data: 行
  • token 到来时更新 Markdown 渲染,实现打字机效果
  • citation 事件可以在 token 流中途到达,因此需要“边渲染边补全引用”
async function sendChatRequest(text, sessionId) {
  const res = await fetch("/api/v1/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message: text, session_id: sessionId, stream: true }),
  });

  // 严谨度提升:拦截非 200 的异常响应,避免流解析器报错
  if (!res.ok) {
    throw new Error(`HTTP Error: ${res.status}`);
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder("utf-8");
  let buffer = "";
  let aiText = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    
    // 追加新数据到 buffer
    buffer += decoder.decode(value, { stream: true });

    // 依次回车换行符 '\n\n' 解析完整的 SSE 帧
    while (true) {
      const idx = buffer.indexOf("\n\n");
      if (idx === -1) break;
      
      const frame = buffer.slice(0, idx);
      buffer = buffer.slice(idx + 2);

      for (const line of frame.split("\n")) {
        if (!line.startsWith("data: ")) continue;
        const payload = JSON.parse(line.slice(6));

        // 根据协议分发事件
        if (payload.type === "token") {
          aiText += payload.content;
          renderMarkdown(aiText); // 实时渲染打字机效果
        } else if (payload.type === "citation") {
          renderCitations(payload.content); // 在气泡下方渲染参考链接
        } else if (payload.type === "error") {
          console.error("Agent Error:", payload.content);
          renderError(payload.content); // 渲染流式返回的报错提示
        }
      }
    }
  }
}

流式输出的中断机制说明

前端和 node.js 可以通过 AbortController 取消掉发出的 fetch 请求。

const abortController = new AbortController();

fetch(url, {
  ...,
  signal: abortController.signal,
})

// 需要中断请求时调用
abortController.abort();

而在服务端方面,由于项目底层依赖的是 FastAPI (Starlette / Uvicorn) 和 LangGraph,当客户端(前端或调用方)强行中断连接或刷新页面时,服务端会自动进行以下处理:

  • 框架自动捕获断连:Uvicorn 会检测到底层 TCP 连接被关闭,并在 Python 的异步上下文中(即 yield 或 async for 处)抛出一个 asyncio.CancelledError。
  • 可捕获该 CancelledError ,进行日志记录后,重新 raise 抛出。
  • 优雅地终止协程:抛出异常后, finally 块会被成功触发,打印出原有的 chat_stream_finished 耗时日志,防止线程泄漏。
  • LangGraph 的自动中断:CancelledError 会向下传递给 LangGraph 的 agent.astream(...),LangChain 底层的网络请求(比如请求通义千问大模型接口的 HTTP 请求)也会随之中断,自动释放资源,不会导致大模型继续在后台空跑产生不必要的 Token 消耗。
  • 存储状态 (Checkpointer):由于中断发生在中途,LangGraph 这一步的状态(当前尚未完全回答完毕的 AI 消息)不会写入,这通常符合预期,下次进来的依然是中断前的 Checkpoint。
  • CancelledError 和 Exception 都继承自 BaseException ,业务代码中 except Exception as e: 不会捕获到 asyncio.CancelledError。 CancelledError 不是一个通常意义上的代码“错误(Error)”,而是一种“系统控制流信号(Signal)”,应该被允许畅通无阻地向外层抛出,直到被底层异步框架(如 uvicorn/asyncio)接管并安全销毁任务。

基于赖 FastAPI 的异步流控制和 asyncio 的任务取消机制机制,服务端不需要做显式的 request.is_disconnected() 轮询检查,不需要在业务逻辑中插入多余的检查点,就能实现“连接断开即停止计算”的效果。

接口部署:Nginx SSE 代理配置

如果在生产环境中 FastAPI 前方架设了 Nginx,你可能会发现前端并没有体验到“流式”效果,而是等待了许久后突然收到了一整段文字。

这是因为 Nginx 默认启用了代理缓冲(Proxy Buffering),它会尝试把后端的响应攒满一定大小后再一次性发给客户端。对于 SSE 来说,这是致命的。

我们需要针对流式路由单独关闭缓冲机制:

# 通用 API 路由配置
location /api/ {
    proxy_pass http://backend_upstream;
    # ... 常规代理头配置
}

# 针对流式聊天接口的专属配置
location = /api/v1/chat {
    proxy_pass http://backend_upstream;
    proxy_http_version 1.1;
    proxy_set_header Connection "";

    # 关键配置:关闭缓冲!
    proxy_buffering off;
    proxy_cache off;
    
    # 禁用 gzip(防止压缩引擎导致的数据滞留)
    gzip off;
    
    # 延长超时时间,避免大模型长时间生成时 Nginx 掐断连接
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
    send_timeout 3600s;
}

总结

统一的 JSON Payload 结构、健壮的前端缓冲解析机制,以及关键的反向代理配置,共同构成了一个可用性极高的流式对话全链路方案。这使得我们的 AI 系统不仅能够“边想边说”,还能在对话途中动态插入多媒体卡片或结构化引用,极大地丰富了产品的交互表现力。

目录
后端:将 LangGraph 事件转化为数据流前端:利用 Fetch API 与 ReadableStream 消费 SSE流式输出的中断机制说明接口部署:Nginx SSE 代理配置总结

©2015-2026 黑白梦 粤ICP备15018165号

联系: heibaimeng@foxmail.com