发布于 2026-06-20
在 RAG 系统中,多模态结构化提取能力是重要基石,我们可以使用以 MinerU 这类智能多模态解析方案,在复杂文档的处理中还原表格与物理图像的真实轮廓,极大增强知识切片的高可信度。
对于同一份 PDF 文档,我们使用了 PyMuPDF (Fitz) 和 MinerU 两种方案,效果对比很明显。
PyMuPDF 这类传统的 PDF 提取工具,主要依赖于“字符物理坐标拼接”。它们在提取文本时存在以下硬伤:
不同于传统的流式提取,MinerU 结合了传统 PDF 解析技术与深度学习视觉模型(版面分析、OCR、公式与表格识别模型),提供了高度自动化的文档提取链路,解析效果得到了根本性的提升。
MinerU 核心主要由两大模块组成:
在 PDF 结构化提取的流程中,其核心能力主要体现在以下几个维度:
images/ 子目录,并在 Markdown 中留下清晰的  引用占位,保持图文上下文的一致性。colspan 与 rowspan)直接转换为标准的 HTML <table> 代码,使大模型能够无障碍理解表格的内在关联。$g/m^2$ 等数学公式)。MinerU 官网 (mineru.net) 目前向开发者提供了两种主要的在线解析 API:
对于高频调用、超长文档或敏感合规要求高的企业级场景,官方推荐进行私有化部署:
在生产环境中,集成 MinerU API 需要处理大文件上传、非阻塞状态轮询、防网络抖动重试及异常降级自愈等工程化挑战。
为了便于理解,在展示完整代码前,我们将客户端的核心方法 parse_pdf_to_markdown 梳理为以下 5 步执行逻辑:
POST /api/v4/file-urls/batch): 首先向 MinerU 平台提交待解析文件名,向接口申请一个专用于上传的预签名 URL 以及用于跟踪任务进度的 batch_id。PUT): 将本地 PDF 文件以二进制读取流(rb 模式)的形式,直接 PUT 到上一步获得的预签名 URL 中。为了避免存储服务器鉴权失败,该请求中必须忽略任何多余的 Content-Type 请求头。GET /api/v4/extract-results/batch/{batch_id}): 启动循环对 batch_id 状态进行监听。每次循环采用带随机抖动的指数退避(Exponential Backoff with Jitter)算法计算等待时长,防止密集并发轮询被云端 IP 频控(Rate Limit)阻断。GET stream): 当轮询状态显示为 done 时,将获得结果压缩包的公网下载链接。客户端启动流式传输(stream=True)分块下载该 ZIP 提取文件,节约内存占用。full.md 重命名为以原文件命名的本地 Markdown 文件,以供下游模块读取。以下是完整的 Python 异步集成示例代码:
import os
import asyncio
import zipfile
import shutil
import requests
import random
from typing import Dict, Any
class MinerUClient:
"""通用的高可靠 MinerU 官方 v4 API 异步客户端封装"""
def __init__(self, api_url: str, api_token: str, max_poll_time: int = 900):
self.api_url = api_url.rstrip("/")
self.api_token = api_token
self.max_poll_time = max_poll_time
self.headers = {"Authorization": f"Bearer {api_token}"}
async def parse_pdf_to_markdown(self, pdf_path: str, output_dir: str) -> str:
"""
端到端解析 PDF 并在指定目录中生成结构化 Markdown 及图片包
:param pdf_path: 待解析的本地 PDF 物理路径
:param output_dir: 结果输出目录
:return: 生成的 Markdown 绝对路径
"""
if not os.path.exists(pdf_path):
raise FileNotFoundError(f"未找到目标 PDF 文件: {pdf_path}")
filename = os.path.basename(pdf_path)
zip_temp_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}_temp.zip")
os.makedirs(output_dir, exist_ok=True)
try:
# 1. 申请签名上传 URL
submit_url = f"{self.api_url}/api/v4/file-urls/batch"
payload = {
"files": [
{
"name": filename
}
],
"model_version": "vlm"
}
# 使用 asyncio.to_thread 防止同步 requests 库阻塞异步事件循环
resp_data = await asyncio.to_thread(
lambda: requests.post(submit_url, json=payload, headers=self.headers, timeout=15).json()
)
data = resp_data.get("data", {})
batch_id = data.get("batch_id")
upload_url = data.get("file_urls", [None])[0]
if not batch_id or not upload_url:
raise RuntimeError(f"获取 MinerU 上传签名 URL 失败: {resp_data}")
# 2. 上传 PDF 文件流 (注意: 签名上传 PUT 必须忽略额外的 Header)
def do_upload():
with open(pdf_path, "rb") as f:
resp = requests.put(upload_url, data=f, timeout=60)
resp.raise_for_status()
await asyncio.to_thread(do_upload)
# 3. 异步非阻塞状态轮询
poll_url = f"{self.api_url}/api/v4/extract-results/batch/{batch_id}"
success = False
full_zip_url = None
elapsed_time = 0
attempt = 0
base_interval = 2.0
max_interval = 30.0
while elapsed_time < self.max_poll_time:
# 引入指数退避 (Exponential Backoff),加随机抖动以避开并发踩踏 (Jitter, 0 到 1 秒随机偏置)
sleep_time = min(max_interval, base_interval * (2 ** attempt)) + random.random()
try:
status_resp = await asyncio.to_thread(
lambda: requests.get(poll_url, headers=self.headers, timeout=10).json()
)
status_data = status_resp.get("data", {})
extract_results = status_data.get("extract_result", [])
if extract_results:
file_res = extract_results[0]
state = file_res.get("state")
if state == "done":
full_zip_url = file_res.get("full_zip_url")
success = True
break
elif state in ["failed", "error"]:
err_msg = file_res.get("err_msg", "")
raise RuntimeError(f"MinerU 解析任务失败。原因: {err_msg}")
except Exception as e:
if "MinerU 解析任务失败" in str(e):
raise e
# 允许网络抖动时继续轮询
await asyncio.sleep(sleep_time)
elapsed_time += sleep_time
attempt += 1
continue
await asyncio.sleep(sleep_time)
elapsed_time += sleep_time
attempt += 1
if not success or not full_zip_url:
raise TimeoutError("轮询超时,未能成功获取 MinerU 提取包下载地址。")
# 4. 下载提取 ZIP 包并流式写入本地
def download_zip():
resp = requests.get(full_zip_url, stream=True, timeout=120)
resp.raise_for_status()
with open(zip_temp_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
await asyncio.to_thread(download_zip)
# 5. 解压并重命名结构化 Markdown
def extract_and_rename():
# 清理输出目录中的历史杂质,为新解析内容腾空
for item in os.listdir(output_dir):
item_path = os.path.join(output_dir, item)
if item_path != zip_temp_path:
if os.path.isdir(item_path):
shutil.rmtree(item_path)
else:
os.remove(item_path)
with zipfile.ZipFile(zip_temp_path, "r") as z:
z.extractall(output_dir)
await asyncio.to_thread(extract_and_rename)
finally:
# 安全释放临时压缩包
if os.path.exists(zip_temp_path):
os.remove(zip_temp_path)
# 定位解压出的 Markdown 文件并统一重命名为以 PDF 文件名命名的 md
target_md_name = f"{os.path.splitext(filename)[0]}.md"
target_md_path = os.path.join(output_dir, target_md_name)
full_md_path = os.path.join(output_dir, "full.md")
if os.path.exists(full_md_path):
os.rename(full_md_path, target_md_path)
else:
# 降级方案:寻找目录下的任意 markdown
md_files = [f for f in os.listdir(output_dir) if f.endswith(".md")]
if md_files:
os.rename(os.path.join(output_dir, md_files[0]), target_md_path)
else:
raise FileNotFoundError("解压包内未发现任何 Markdown 解析结果。")
return target_md_path