发布于 2026-06-23
RAG 中,单靠向量或稀疏检索等粗筛手段,难以应对生产级高精度要求,且多路混合检索的分数存在打分尺度不统一等局限。为此,RAG 通常引入重排序精排模型作为过滤屏障。本文从介绍重排模型的基本概念与基础使用,到多源数据合并、动态截断算法以及字符预算控制等核心流水线的设计与实现。
在知识库或商品库检索中,系统需要平衡速度与精度:
qwen3-rerank)。从应用开发的视角,两者的核心指标和应用特征对比如下:
| 评估维度 | 向量检索 (Embedding) | 重排模型 (Reranker) |
|---|---|---|
| 工作方式 | 双方各自离线生成向量,在线检索时只进行向量相似度计算 | 将查询与文档拼接输入,由模型直接实时评估两者的相关性评分 |
| 计算速度 | 极快(毫秒级),支持大规模高并发检索 | 较慢(几十毫秒),计算耗时随候选文档数量线性增加 |
| 检索精度 | 一般,容易受常见词重叠影响而泛化,可能丢失具体细节 | 较高,能识别逻辑转折、关键型号、否定修饰等语义关系 |
| 分值可比性 | 跨查询可比性低,绝对相似度易随模型和语料分布漂移,难设固定阈值 | 跨查询可比性高,分数通常直接反映语义关联的置信度 |
| 适用范围 | 粗筛召回:从海量文档中快速定位一个候选子集 | 精排过滤:对初筛子集进行二次打分,以便进行硬卡点或截断 |
不同于 Embedding 模型接收单段文本并返回高维向量,Reranker 模型:
(Query, Document) 的二元组对列表。在 Python 中调用重排模型(例如阿里云的 qwen3-rerank)时,如果直接使用同步调用,在并发 RAG 应用中会阻塞 asyncio 的事件循环。因此,我们可以使用 asyncio.to_thread 将其封装为异步客户端类。
下面是基于 DashScope SDK 的通用重排序客户端异步封装逻辑:
import asyncio
from http import HTTPStatus
from typing import List
import dashscope
class DashScopeRerankerClient:
"""
通用 DashScope 重排序模型异步客户端包装器
"""
def __init__(self, api_key: str, model: str = "qwen3-rerank"):
self.api_key = api_key
self.model = model
async def rerank_documents(self, query: str, documents: List[str]) -> List[float]:
"""
异步调用 DashScope Rerank 模型评估文档相关性。
"""
if not documents:
return []
# 使用 asyncio.to_thread 将同步 SDK 调用包装进线程池异步执行
response = await asyncio.to_thread(
dashscope.TextReRank.call,
model=self.model,
query=query,
documents=documents,
api_key=self.api_key
)
if response.status_code != HTTPStatus.OK:
raise RuntimeError(f"Rerank API 调用失败: {response.message}")
# 获取打分并按输入 documents 的原始索引顺序对齐
scores = [0.0] * len(documents)
for result in response.output.results:
idx = result.index
if 0 <= idx < len(documents):
scores[idx] = float(result.relevance_score)
return scores
常见的 RAG 系统在重排序后,如果只采用固定的 Top-K 截断,可能会面临以下局限:
因此,为了解决这些痛点,在 Reranker 排序后,需要设计绝对阈值卡点与动态截断算法。
为了在实际工程中解耦并保持模块设计的通用性,我们可以将“多源数据合并”、“动态截断”和“大模型上下文预算控制”整合设计为一个通用的重排处理流程。
在实际检索链路中,经常需要把来自本地知识库和外部网络等多条渠道的数据合并到一起进行统一重排。
以下为通用的多源数据规范化合并函数:
from typing import List, Dict, Any
def merge_and_normalize_docs(local_docs: List[Dict[str, Any]], web_docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
合并本地和网页等多源检索结果,统一映射为包含标题、内容、来源等通用字段的字典列表
"""
normalized_docs = []
# 1. 抽取并规范化本地文档
for doc in local_docs:
normalized_docs.append({
"title": doc.get("title") or doc.get("item_name") or "",
"content": doc.get("content") or "",
"source": "local",
"url": None
})
# 2. 抽取并规范化外部网页文档
for doc in web_docs:
normalized_docs.append({
"title": doc.get("title") or "",
"content": doc.get("snippet") or doc.get("content") or "",
"source": "web",
"url": doc.get("url")
})
return normalized_docs
截断的核心逻辑是:如果对相关性分数降序排列,在某一处出现了分数跌幅明显加大,那么该位置之后的所有文档大概率是低相关性的噪声。
为了防止过度截断并控制上下文长度,算法设定了四个控制参数:
RERANK_MAX_TOPK = 8:最多保留 8 篇文档,控制大模型输入长度。RERANK_MIN_TOPK = 3:在发生分数下跌时,也保留至少 3 篇文档,避免因过度拦截而丢失必要信息。RERANK_GAP_ABS = 0.35(绝对差门限):若相邻两个文档的分数差值达到 0.35 或以上,触发截断。RERANK_GAP_RATIO = 0.25(相对比例门限):若相邻两个文档的分数降幅达到 25% 或以上,触发截断,主要用于防止在低分段发生大幅度相对降幅。当相邻两个文档的分数满足“绝对差值达到 0.35”或“相对降幅达到 25%”的任一条件时,系统即判定发生了分值骤降,并在该位置进行截断,舍弃后续的所有候选文档。
重排阶段的截断阈值并没有一成不变的行业标准,通常取决于所使用的具体重排序模型与业务场景对精准/召回的权衡。以下是几项实践建议:
[0, 1] 之间,否则固定的阈值判定将失效。[0, 1] 下):5 到 8 之间。由于大模型存在“Lost in the Middle”(易忽略上下文中间部分)的缺陷,且长文本会显著增加延时与 Token 成本,塞入过多的文档反而会降低生成质量。2 到 3 之间。这属于保守策略,即使后方的分数降幅较大,也保留最相关的 2~3 篇,以防下游大模型缺少基本上下文。0.3 到 0.4 之间。在归一化的分值体系中,0.3 的差值代表了较显著的语义相关度下降;若设为 0.5 则可能偏大,导致过滤不够敏感。0.20 到 0.30 之间。这主要用于拦截在低分段发生的较大幅度相对降幅(例如从 0.45 跌到 0.32,绝对差仅 0.13,但相对降幅已达 28.8%),能够过滤不相关的长尾文档。下面是执行断崖式截断的通用纯函数逻辑:
def truncate_by_score_cliff(
ranked_docs: List[Dict[str, Any]],
min_top_k: int = 3,
max_top_k: int = 8,
gap_abs: float = 0.35,
gap_ratio: float = 0.25
) -> List[Dict[str, Any]]:
"""
根据相邻文档重排分数的降幅动态截断低相关性文档
:param ranked_docs: 已按分数降序排列且带有 'score' 键的文档列表
:return: 截断后的文档列表
"""
if not ranked_docs:
return []
n = len(ranked_docs)
upper_bound = min(max_top_k, n)
lower_bound = min(min_top_k, upper_bound)
cutoff_index = upper_bound
# 扫描从下限到上限之间的可能断崖位置
for i in range(lower_bound - 1, upper_bound - 1):
s_curr = ranked_docs[i].get("score", 0.0)
s_next = ranked_docs[i + 1].get("score", 0.0)
delta_abs = s_curr - s_next
delta_ratio = delta_abs / s_curr if s_curr > 0 else 0.0
# 双重阈值门控判断
if delta_abs >= gap_abs or delta_ratio >= gap_ratio:
cutoff_index = i + 1
break
return ranked_docs[:cutoff_index]
假设重排模型打分排序后得出以下五个文档的相关性得分:
案例 A(无显著降幅)
[0.92, 0.88, 0.85, 0.81, 0.78]案例 B(高分段分值骤降)
[0.95, 0.92, 0.88, 0.45, 0.20]cutoff_index = 3,仅保留前 3 个文档 [0.95, 0.92, 0.88],后续文档被过滤。案例 C(低分段绝对差值小但相对降幅大)
[0.85, 0.70, 0.40, 0.28, 0.26]cutoff_index = 3,仅保留前 3 个,过滤掉低相关性的长尾文档。尽管精排和断崖式截断在“篇数”维度(Top-K)上对文档进行了过滤,但在下游大模型的答案生成节点中,仍面临文本长度溢出的风险(例如某些网页内容过于冗长,或多轮历史对话累加导致超出大模型的上下文窗口上限)。
为了在字符长度层面提供安全保障,可以在生成阶段引入基于字符预算的累加截断机制:
MAX_CONTEXT_CHARS = 12000。from typing import Tuple
def format_docs_with_char_budget(documents: List[Dict[str, Any]], char_budget: int) -> Tuple[str, int]:
"""
格式化重排序后的文档,在不超过字符预算的前提下拼装为大模型上下文
:param documents: 经过重排和断崖截断后的文档列表
:param char_budget: 字符预算上限
:return: (格式化后的文本, 剩余可用字符数)
"""
formatted_lines = []
used_chars = 0
for idx, doc in enumerate(documents, start=1):
content = doc.get("content") or ""
meta_tags = [f"[{idx}]"]
for field, template in [
("source", "[source={}]"),
("url", "[url={}]"),
("title", "[title={}]"),
]:
field_value = str(doc.get(field)).strip() if doc.get(field) is not None else ""
if field_value and field_value != "None":
meta_tags.append(template.format(field_value))
relevance_score = doc.get("score")
if relevance_score is not None:
meta_tags.append(f"[score={float(relevance_score):.4f}]")
doc_entry = " ".join(meta_tags) + "\n" + content
# 如果加入此切片会导致超出预算,则舍弃此切片及后续切片
if used_chars + len(doc_entry) > char_budget:
break
formatted_lines.append(doc_entry)
used_chars += len(doc_entry) + 2 # 加上换行符开销
formatted_context = "\n\n".join(formatted_lines) if formatted_lines else "无参考文档。"
return formatted_context, char_budget - used_chars
为展示以上模块如何在实际的问答流水线中组合工作,我们使用了一个通用的旅游检索与重排作为演示用例:
import asyncio
async def main():
# 模拟从不同源初筛出来的文档
local_knowledge = [
{
"title": "北京旅行避坑指南",
"content": "故宫门票需要提前7天在线实名预约,周一闭馆。建议从午门进,神武门出。"
},
{
"title": "闲聊杂记",
"content": "今天中午和朋友去胡同里吃了炸酱面,老北京味道确实很地道。"
}
]
web_search_results = [
{
"title": "2026年北京故宫游览全攻略",
"url": "https://example.com/gugong-travel",
"snippet": "游览故宫必须提前预约门票,旺季门票紧张,务必提前关注放票时间。刷身份证入园。"
},
{
"title": "数码新品发布资讯",
"url": "https://example.com/tech-news",
"snippet": "某知名科技厂商发布全新主动降噪蓝牙耳机,音质表现优异。"
}
]
# 1. 多源文档合并与格式标准化
raw_docs = merge_and_normalize_docs(local_knowledge, web_search_results)
# 2. 定义 Query 并模拟重排模型打分 (实际应用中应调用 DashScopeRerankerClient)
query = "如何预约故宫门票以及游览路线推荐?"
# 模拟 Reranker 模型对四篇文档的打分结果:故宫相关得分高,无关内容得分低
mock_scores = [0.912, 0.183, 0.885, 0.114]
# 将打分回填并按分数降序排序
ranked_docs = [{**doc, "score": score} for doc, score in zip(raw_docs, mock_scores)]
ranked_docs.sort(key=lambda x: x["score"], reverse=True)
print("=== 重排与排序结果 ===")
for idx, d in enumerate(ranked_docs):
print(f"Top {idx+1}: [{d['source']}] {d['title']} (得分: {d['score']:.4f})")
# 3. 动态断崖式截断
# 设定:最少保留2条,最多保留8条,发生 0.35 分数差即截断
cutoff_docs = truncate_by_score_cliff(
ranked_docs,
min_top_k=2,
max_top_k=8,
gap_abs=0.35
)
print("\n=== 断崖式截断后保留结果 ===")
for idx, d in enumerate(cutoff_docs):
print(f"保留项 {idx+1}: [{d['source']}] {d['title']} (得分: {d['score']:.4f})")
# 4. 生成阶段的字符预算控制 (假定总字符预算限制为 500)
context_text, remaining_budget = format_docs_with_char_budget(cutoff_docs, char_budget=500)
print("\n=== 送入大语言模型的最终上下文 ===")
print(context_text)
print(f"\n剩余字符预算: {remaining_budget}")
if __name__ == "__main__":
asyncio.run(main())
通过这种规范化的接口设计,重排序模块降低了与特定检索框架的耦合度,具有较好的可扩展性,便于在不同 RAG 系统中复用。
重排序模型(Reranker)通过 Cross-Encoder 架构,为 RAG 检索链路提供了相关性二次过滤手段。在检索系统构建中,为避免固定 Top-K 截断带来的局限性,可以通过以下方式进行优化:
这些方法的结合使用,有助于在保证服务可用性的前提下,过滤不相关的长尾文档,为下游大模型提供高相关性的上下文。