发布于 2026-05-16
在构建大模型应用时,RAG 检索增强生成是引入私有知识的标配方案。本次实战以黑白梦博客的数据为基础,从数据处理、向量库同步、多视角检索等维度,探索如何从零构建一个 RAG 检索系统。
RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合了信息检索技术和大语言模型 (LLM) 的架构。它的核心思想是:在让大模型回答问题之前,先从外部知识库中检索出与问题相关的上下文片段,然后将这些片段连同用户的问题一起"喂"给大模型,让大模型"开卷考试",基于检索到的事实来生成答案。
大模型虽然聪明,但有几个问题:
RAG 完美解决了这些问题。它的工作流程通常分为两大阶段:
本项目使用了以下技术栈来完成完整的 RAG 流程:
qwen3.5-flashtext-embedding-v4下面是本项目 RAG 系统的整体数据流向,分为离线数据入库和在线检索生成两条主线:
┌─────────────────────── 离线:数据入库 (Ingestion) ───────────────────────┐
│ │
│ MySQL 数据清洗 两阶段切分 层级前缀注入 Embedding │
│ (博客源) ──→ (HTML/零宽/ ──→ (标题切分 + ──→ (h1/h2/h3 ──→ (text- │
│ Front Matter) 递归细切) 拼接) embedding │
│ -v4) │
└──────────────────────────────────────────────────────┬──────────────────┘
│
▼
┌───────────┐
│ Milvus │
│ Lite │
│ 向量数据库 │
└─────┬─────┘
│
┌──────────────────── 在线:检索与生成 (Retrieval & Generation) ──────────┐
│ │ │
│ 用户提问 ──→ Agent ──→ 多查询生成 ──→ 并发向量检索 ◄─┘ │
│ 判断意图 (LLM 扩展 (abatch) │
│ 3 个变体) │ │
│ ▼ │
│ 流式返回 ◄── LLM 生成回答 ◄── 去重 & 格式化 │
│ (SSE/引用) (qwen3.5-flash) (上下文 + 引用) │
└─────────────────────────────────────────────────────────────────────────┘
数据质量直接决定了 RAG 系统的效果上限。如果把整篇文章直接丢给大模型,不仅会消耗大量 Token,还容易导致“大海捞针”问题(Lost in the Middle)。因此,我们需要先将博客数据从数据库中提取出来,再切分成语义相对完整的片段(Chunks)。
黑白梦博客的原始数据存储在 MySQL 数据库中。为了方便在 Python 中进行数据处理,我们使用了 SQLAlchemy 作为 ORM 框架。这不仅能让我们以面向对象的方式操作数据库,还能通过模型映射轻松获取博客内容的元数据(如标题、更新时间等)。
from sqlalchemy import Column, Integer, String, Text, DateTime
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Diary(Base):
__tablename__ = "diary"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
update_time = Column(DateTime, nullable=False)
status = Column(Integer, default=1)
通过定义数据模型,我们可以非常方便地筛选出已发布的文章,并利用这些结构化字段为后续的切分和增量同步提供支持。
以黑白梦博客这种技术博客类结构化文档为例,最自然的语义边界是标题 (Headers)。我们通常采用“两阶段切分”策略:
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
def split_markdown_content(content: str):
# 第一阶段:按 Markdown 标题切分
headers_to_split_on = [("#", "h1"), ("##", "h2"), ("###", "h3"), ("####", "h4")]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
header_splits = markdown_splitter.split_text(content)
# 第二阶段:长文本递归细切
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
return text_splitter.split_documents(header_splits)
切分后的 Document 对象结构如下(JSON 示意):
[
{
"page_content": "Python 是一门动态语言...",
"metadata": {
"h1": "学习笔记",
"h2": "Python 基础"
}
}
]
这种结构的好处是,即使是正文片段,也携带了“上下文信息”(它属于哪个大标题下的哪个小节),这对后续的检索准确度提升非常有帮助。
虽然切分后的 Chunk 在 metadata 中保留了标题层级,但当 Chunk 被独立送去 Embedding 时,模型只能看到 page_content——也就是正文片段本身。为了让每个片段在向量空间中也能体现其所属章节的语义信息,我们在数据入库前,将层级标题作为前缀重新拼接到文本中:
# 在同步处理 (Ingestion) 时,将层级信息注回文本
for chunk in chunks:
prefix = ""
if "h1" in chunk.metadata: prefix += f"# {chunk.metadata['h1']}\n"
if "h2" in chunk.metadata: prefix += f"## {chunk.metadata['h2']}\n"
if "h3" in chunk.metadata: prefix += f"### {chunk.metadata['h3']}\n"
final_text = prefix + chunk.page_content
# 最终的 final_text 交给模型 Embedding
这样做的效果是:即使一个片段的正文只是"使用 pip install 安装依赖",经过前缀注入后变成了"# 部署指南\n## 环境准备\n使用 pip install 安装依赖",嵌入模型能更准确地理解这段文字的上下文语境,从而在检索时显著提升匹配精度。
高质量的 RAG 离不开高质量的数据。在向量化之前,进行轻量级的数据清洗可以显著提升检索质量,减少无意义的嵌入成本:
Front Matter(YAML 元数据块)。import re
import html
import unicodedata
def clean_text(text: str) -> str:
"""文本清洗主入口:过滤零宽字符、控制字符与残留标签"""
if not text:
return ""
# Unicode 归一化
t = unicodedata.normalize("NFKC", text)
# 反转义 HTML 实体 (如 < 变为 <)
t = html.unescape(t)
# 移除零宽字符和控制字符
for z in ["\u200b", "\u200c", "\u200d", "\ufeff"]:
t = t.replace(z, "")
t = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", "", t)
# 移除残留 HTML 标签
t = re.sub(r"<[^>]+>", " ", t)
# 压缩连续换行和空格
t = re.sub(r"\n{3,}", "\n\n", t)
t = re.sub(r"[ \t]{2,}", " ", t)
return t.strip()
def should_keep(text: str, min_chars: int = 20) -> bool:
"""判断片段是否值得索引:过滤毫无意义的极短片段"""
if not text or len(text.strip()) < min_chars:
return False
# 必须包含字母、数字或中文字符
return re.search(r"[A-Za-z0-9\u4e00-\u9fff]", text) is not None
知识库是不断更新的,如果每次都清空重建索引,效率极低且会浪费大量 API 额度。我们需要一套基于状态的增量同步机制。
在这里,我们选择了轻量级的 Milvus Lite 作为向量数据库,它作为 Python 库运行,非常适合中小型项目快速启动。
在存入数据库前,我们需要将文本片段通过 Embedding 模型转换为机器可理解的向量。在项目中,我们使用了 DashScope (阿里千问) 的 text-embedding-v4 文本向量模型,并通过 OpenAI 兼容接口进行调用。
这里有几个关键的工程细节:
text-embedding-v4 的默认输出维度为 1024。这个数值必须与 Milvus Collection Schema 中的 dim 参数严格一致,否则插入数据时会报错。DIMENSION = 1024 # 匹配 text-embedding-v4
schema = client.create_schema(
auto_id=True,
enable_dynamic_field=True,
description="Blog content chunks for RAG"
)
# langchain-milvus 默认使用 "pk" 作为主键,"vector" 作为向量字段,"text" 作为文本字段
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="doc_id", datatype=DataType.INT64) # 关联 MySQL ID
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=DIMENSION)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
批处理限制: 虽然 LangChain 提供了方便的 OpenAIEmbeddings 封装,但像 DashScope 这样的 API 对 Batch Size 有严格限制(例如不超过 10)。我们需要对底层方法的调用参数进行限制,以避免由于单次提交片段过多而导致的 400 错误。 https://help.aliyun.com/zh/model-studio/text-embedding-batch-api
禁用上下文长度检查: LangChain 的 OpenAIEmbeddings 默认会使用 tiktoken 对文本进行分词以检查是否超出上下文长度,但第三方兼容 API(如 DashScope)的 tokenizer 与 OpenAI 并不一致。设置 check_embedding_ctx_length=False 可以跳过这一检查,避免潜在的分词不匹配问题。
from langchain_openai import OpenAIEmbeddings
class DashScopeTextEmbeddings(OpenAIEmbeddings):
def __init__(self, model="text-embedding-v4", **kwargs):
super().__init__(
model=model,
openai_api_key=settings.DASHSCOPE_API_KEY,
openai_api_base=settings.DASHSCOPE_BASE_URL,
check_embedding_ctx_length=False, # 禁用 tiktoken 上下文长度检查
**kwargs
)
def embed_documents(self, texts: list[str], chunk_size: int = 10) -> list[list[float]]:
"""
重写 embed_documents 以强制指定单批次提交的数量 (chunk_size=10),
适应 DashScope API 的并发限制。
"""
return super().embed_documents(texts, chunk_size=chunk_size)
有了可用的 Embedding 封装后,我们可以利用 LangChain 官方的 Milvus 集成,快速建立指定维度向量和包含关联数据的 Collection,还可以自定义索引来提升检索性能:
from langchain_milvus import Milvus
def get_vector_store():
return Milvus(
embedding_function=DashScopeTextEmbeddings(),
collection_name="blog_chunks",
connection_args={"uri": "./data/milvus_demo.db"}, # 存放为本地 SQLite-like 的文件
auto_id=True,
drop_old=False,
# Milvus 官方库支持自定义索引参数提升召回速度
index_params={
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 128}
}
)
有了 Embedding 模型和 Milvus 向量库后,我们就可以通过 LangChain 的标准化 API 进行数据的存取操作。以下是一个最基础的读写示例,帮助理解 RAG 底层的工作方式:
from langchain_core.documents import Document
# 获取向量库实例
store = get_vector_store()
# 1. 写入数据 (add_documents)
# LangChain 会自动调用 Embedding 模型将文本转为向量,再存入 Milvus
docs = [
Document(page_content="RAG 是检索增强生成", metadata={"doc_id": 1, "title": "RAG 简介"}),
Document(page_content="LangChain 提供了极简的接口", metadata={"doc_id": 2, "title": "开发指南"})
]
store.add_documents(docs)
# 2. 检索数据 (as_retriever)
# 获取一个基础检索器,配置为返回相关度最高的 2 条结果
retriever = store.as_retriever(search_kwargs={"k": 2})
# 传入问题,检索相关文档
results = retriever.invoke("RAG 的全称是什么?")
print(results[0].page_content) # 输出: RAG 是检索增强生成
理解了这个基础流程后,在生产环境中我们会在存入和检索的各个环节进行更精细的控制(如前面的数据清洗、切分策略,以及后面的多查询检索等),以保证回答质量和系统性能。
增量同步的核心在于比对“源数据库活跃文档”与“本地已索引文档”的差异 (Diff)。我们可以定义以下三种状态:
update_time 晚于上次同步时间。操作是先删除旧向量,再插入新向量。以下为核心 Diff 算法的简化示意:
from datetime import datetime
from typing import Tuple, List, Dict
async def calculate_diff(active_docs: list, state: Dict) -> Tuple[List[int], List[int], List[int]]:
"""
计算数据库与本地索引的状态差异
返回: (新增的 IDs, 修改的 IDs, 删除的 IDs)
"""
last_sync_time_str = state.get("last_sync_time", "1970-01-01 00:00:00")
last_sync_time = datetime.strptime(last_sync_time_str, "%Y-%m-%d %H:%M:%S")
indexed_ids = set(state.get("indexed_doc_ids", []))
db_doc_map = {doc.id: doc.update_time for doc in active_docs}
db_ids = set(db_doc_map.keys())
# 新增: 在数据库中但不在已索引列表中
added_ids = list(db_ids - indexed_ids)
# 修改: 都在,但数据库的更新时间晚于上次同步时间
modified_ids = [
doc_id for doc_id in (db_ids & indexed_ids)
if db_doc_map[doc_id] > last_sync_time
]
# 删除: 在已索引列表中但不在数据库活跃列表中
deleted_ids = list(indexed_ids - db_ids)
return added_ids, modified_ids, deleted_ids
结合 Milvus 的 expr="doc_id in [...]" 高效删除语法,这套机制能确保向量库与关系型数据库实时保持一致。
单一的提问往往难以直接匹配到向量空间中最相关的文档。为了克服单一向量空间的局限性,我们可以引入多查询(Multi-Query)策略:利用 LLM 将原始问题扩展为多个视角的查询词,然后并发检索。
async def search_relevant_docs(query: str, k: int = 5):
# 1. 生成多个查询变体
prompt_text = f"请根据用户的请求生成3个不同视角的搜索查询...原始问题: {query}"
try:
# 使用 config={"callbacks": []} 阻止继承流式回调,以免中间生成的查询被截获输出给前端
response = await llm.ainvoke(prompt_text, config={"callbacks": []})
content = response.content if hasattr(response, 'content') else str(response)
queries = [q.strip() for q in content.split("\n") if q.strip()]
if not queries: queries = [query]
except Exception:
queries = [query] # 容错:生成失败则降级使用原问题
# 2. 并发批量检索
try:
documents_list = await base_retriever.abatch(queries)
except Exception:
# 容错:并发报错则降级为串行
documents_list = [await base_retriever.ainvoke(q) for q in queries]
# 3. 展平并去重
flattened = [doc for sublist in documents_list for doc in sublist]
unique_docs = []
seen_contents = set()
for doc in flattened:
# 按 page_content 去重:保留同一篇文章的不同高相关度 chunk,只去掉完全重复的内容
content = doc.page_content
if content not in seen_contents:
seen_contents.add(content)
unique_docs.append(doc)
return unique_docs
检索回来的 Document 列表是结构化的 Python 对象。为了让 LLM 能够理解这些内容,并让前端能够展示原文链接,我们需要将其格式化。在格式化时,我们会提取文档的元数据,主动拼接博客的实际 URL。
def format_docs(docs: list[Document]) -> str:
"""
将检索到的文档片段格式化为 LLM 可理解的上下文字符串。
"""
formatted_docs = []
for i, doc in enumerate(docs):
title = doc.metadata.get('title', '未知标题')
doc_id = doc.metadata.get('doc_id')
link = f"https://heibaimeng.com/post/{doc_id}" if doc_id else "链接不可用"
content = doc.page_content
formatted_doc = f"[{i+1}] 标题: {title}\n链接: {link}\n内容: {content}"
formatted_docs.append(formatted_doc)
return "\n---\n".join(formatted_docs)
这样处理后,LLM 不仅能读到文本,还能在其生成的回答中带上 [1]、[2] 等引用标记,从而让最终的回答具备极强的可追溯性和可信度。
到目前为止,我们完成了 RAG 中 R (Retrieval) 的全部链路:数据清洗、切分、入库、检索、格式化。但 RAG 的另一半——G (Generation)——同样关键:如何将检索到的上下文交给大模型,并约束它基于事实回答?
系统提示词 (System Prompt) 是约束大模型行为的核心手段。在本项目中,我们为博客助手设计了一套严格的提示词规则:
def get_system_prompt():
return (
f"你是一个专门为个人技术博客【{settings.BLOG_NAME}】设计的对话式学习助手。\n\n"
"### 核心职责:\n"
"1. **知识问答**:基于博客内容回答技术问题、提供学习建议或解读文章。\n"
"2. **信息检索**:使用工具搜索文章、获取统计数据或查看文章全文。\n"
"3. **身份约束**:只能回答与本博客及其相关技术领域有关的问题。\n\n"
"### 工具使用规则:\n"
"- **必须调用工具**:在回答关于博客文章内容的问题前,必须先调用 search_blog。严禁凭空想象博客内容。\n"
"- **诚实原则**:如果工具没有返回相关信息,明确说明没有找到,不要编造。\n"
"- **链接引用**:引用文章时必须附带链接。"
)
这套设计的核心思想是强制工具调用:LLM 不允许直接回答关于博客内容的问题,必须先通过工具检索到事实后再作答。这从根本上遏制了大模型"编造博客内容"的幻觉风险。
在传统的 RAG 实现中,检索和生成通常是一条固定的链:用户问题 → 检索 → 拼装 Prompt → 生成。但在本项目中,我们利用 LangChain Agent 的工具调用机制,让 LLM 自主决定何时以及如何检索,实现了更灵活的 RAG 闭环。
核心在于 search_blog 工具的 response_format="content_and_artifact" 设计:
from langchain_core.tools import tool
@tool(response_format="content_and_artifact")
async def search_blog(query: str) -> tuple[str, list[dict]]:
"""在知识库中搜索博客文章、技术笔记和实现细节。"""
# 调用前面实现的多查询检索
docs = await search_relevant_docs(query)
if not docs:
return "未找到相关文档。", []
# 返回元组:(给 LLM 看的格式化文本, 给前端展示的结构化引用)
content = format_docs(docs)
citations = [
{
"title": doc.metadata.get('title', ''),
"url": f"https://heibaimeng.com/post/{doc.metadata.get('doc_id')}",
"snippet": doc.page_content
}
for doc in docs
]
return content, citations
这个设计有两层巧思:
content(第一个返回值):格式化后的文档文本,会自动注入到 LLM 的上下文中,LLM 基于这些内容生成回答。artifact(第二个返回值):结构化的引用数据,不会进入 LLM 的上下文,而是通过流式输出直接传递给前端,用于渲染引用卡片。这样一来,Agent 的工作流程就形成了完整闭环:
search_blog 工具 → 触发多查询检索 → 获取格式化上下文artifact) 通过独立通道传递给前端from langchain import agents
# 将工具注册到 Agent 中,LLM 会根据系统提示词自主决定何时调用
agent = agents.create_agent(
llm,
tools=[search_blog, get_blog_stats, get_article_content],
system_prompt=get_system_prompt(),
)
与固定链路的 RAG 不同,Agent 模式下 LLM 拥有自主判断的能力:对于需要检索的问题它会主动调用工具,对于简单的打招呼则直接回复,不会进行不必要的检索消耗。
在基础 RAG 跑通后,为了进一步提升系统在复杂场景下的准确率,以下几个方向值得重点探索:
目前本项目使用的是基于向量的稠密检索 (Dense Retrieval),强在语义匹配,但在应对专有名词、特定版本号、代码片段等精确匹配时效果较差。稀疏检索 (Sparse Retrieval,如 BM25、Elasticsearch) 则擅长精确关键词匹配。
将向量检索和 BM25 结合,通过 LangChain 的 EnsembleRetriever,两路并发检索,然后通过 Reciprocal Rank Fusion (RRF) 算法对两路结果进行加权重新打分合并,能让 RAG 兼具"懂意思"和"找得准"的能力。
向量数据库使用余弦相似度粗筛出来的相关性(Recall 阶段),可能在细粒度语义层面的排序并不完美。
在检索出 Top K(比如 20 个)片段后,再使用专门的 Reranker 模型(如 BGE-Reranker 或 Cohere Rerank)对这些片段和问题的相关性进行二次精确打分排序,最后只将最相关的 Top N(比如 5 个)喂给大模型。这极大缓解了大模型在处理超长上下文时的**"迷失在中间 (Lost in the Middle)"**问题。
在多轮对话中,用户的提问往往带有指代("它"、"这个 API")或者省略。直接拿这种提问去向量库检索,效果极差。
在进入检索之前,加一步 LLM 节点,根据之前的对话历史,将用户的当前问题重写为一个独立、完整、不带歧义的查询句子,然后再交给向量库。这样可以确保每次检索都是基于一个语义完整的查询进行的,显著提升多轮对话场景下的检索准确率。
为了找得准,我们希望分块尽量小;但为了给大模型提供足够的上下文,我们又希望分块尽量大——这是个矛盾。
解决方案是:在构建索引时,把文章切得很细(如单句话),建立向量索引;但在数据库里,把这句细粒度的话与其周围的完整段落或父章节绑定。检索时依靠细粒度的句子精准命中,但在返给大模型时,自动替换为它周围更大范围的父文档文本,从而兼顾了"高精度检索"与"丰富上下文"。
本文以黑白梦博客的真实数据为基础,完整走过了 RAG 系统从零到一的构建过程。回顾全文,我们覆盖了以下核心模块:
content_and_artifact),实现了检索→注入→生成的完整闭环,从机制上遏制了幻觉风险。这篇文章更多是一次技术探索的阶段性总结。RAG 是一个持续迭代的过程,从切分粒度的微调,到检索策略的升级,再到评估体系的建立,每一环都值得反复打磨。把这些实践经验沉淀下来,也是为日后构建更成熟的 RAG 系统积累一份可回溯的参考。