#!/usr/bin/env python3
"""
MinerU MVP — Document Parsing Pipeline (Cloud API)

Usage:
    python pipeline.py <pdf_path>

Flow:
    1. POST /file-urls/batch → get presigned upload URL + batch_id
    2. PUT <pdf> to presigned URL (without a Content-Type header)
    3. Poll GET /extract-results/batch/{batch_id}
    4. Download & extract ZIP → output/{pdf_stem}/
    5. Print summary to stdout

The backend (indexing_service.py) calls this via subprocess and reads
output/{pdf_stem}/*_content_list.json for downstream KG construction.
"""

from __future__ import annotations

import argparse
import io
import json
import os
import sys
import time
import zipfile
from collections import Counter
from pathlib import Path
from typing import Any

import requests
from dotenv import load_dotenv

# ── Config ──────────────────────────────────────────────────────────────────────

# Load the .env that sits next to this script. override=True means its values
# take precedence over variables already present in the process environment.
load_dotenv(Path(__file__).parent / ".env", override=True)

API_BASE = "https://mineru.net/api/v4"
# Stripped so that a stray trailing newline/space (easy to pick up when pasting
# the token) neither produces an invalid Authorization header nor lets a
# whitespace-only value slip past the emptiness check below.
TOKEN = os.getenv("MINERU_API_TOKEN", "").strip()
POLL_INTERVAL = 5  # seconds between status checks
# max total wait time (seconds) — matches backend timeout.
# NOTE(review): this budget only bounds the polling phase; upload and download
# time come on top, so a backend subprocess timeout of the same length may fire
# first — confirm the intended relationship.
MAX_WAIT = 600

# Fail fast at import time: no API call can succeed without a token.
if not TOKEN:
    print("ERROR: MINERU_API_TOKEN not set in mineru_mvp/.env", file=sys.stderr)
    sys.exit(1)

HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
}

# Parsed results are extracted into output/<pdf_stem>/ beside this script.
OUTPUT_ROOT = Path(__file__).parent / "output"


# ── Pipeline Steps ──────────────────────────────────────────────────────────────

# Default (connect, read) timeout in seconds for MinerU API calls. Without a
# timeout, requests waits indefinitely on a stalled connection, which would
# silently bypass the MAX_WAIT budget enforced by the polling loop.
_API_TIMEOUT = (10, 60)


def _request(method: str, url: str, **kwargs: Any) -> dict:
    """Send a request to the MinerU API and return its decoded JSON envelope.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        url: Absolute endpoint URL.
        **kwargs: Forwarded to ``requests.request``. A default ``timeout`` is
            applied unless the caller passes one explicitly.

    Returns:
        The parsed JSON body, guaranteed to be a dict whose ``code`` is 0.

    Raises:
        requests.HTTPError: On a 4xx/5xx HTTP status.
        RuntimeError: If the body is not a JSON object, or its ``code`` != 0.
    """
    kwargs.setdefault("timeout", _API_TIMEOUT)
    resp = requests.request(method, url, **kwargs)
    resp.raise_for_status()

    try:
        body = resp.json()
    except ValueError as exc:  # requests' JSONDecodeError subclasses ValueError
        raise RuntimeError(
            f"API returned non-JSON response (HTTP {resp.status_code}): {resp.text[:300]}"
        ) from exc
    if not isinstance(body, dict):
        raise RuntimeError(f"API returned unexpected JSON payload: {str(body)[:300]}")

    # MinerU wraps every response in {"code": ..., "msg": ..., "data": ...};
    # a non-zero code signals an application-level error even on HTTP 200.
    if body.get("code") != 0:
        raise RuntimeError(f"API error code={body.get('code')} msg={body.get('msg')}")
    return body


def step1_get_upload_url(
    filename: str,
    *,
    language: str = "en",
    enable_formula: bool = True,
    enable_table: bool = True,
) -> tuple[str, str]:
    """Create a single-file batch and obtain its presigned upload URL.

    Calls POST /file-urls/batch.

    Args:
        filename: Name the file is registered under (e.g. ``"paper.pdf"``).
        language: Document language hint sent to the parser.
        enable_formula: Request formula recognition.
        enable_table: Request table recognition.

    Returns:
        ``(batch_id, upload_url)``.

    Raises:
        RuntimeError: If the API returns no upload URL, or (via ``_request``)
            on an API-level error.
    """
    print(f"[1/5] Requesting presigned upload URL for: {filename}")

    body = _request(
        "POST",
        f"{API_BASE}/file-urls/batch",
        headers=HEADERS,
        json={
            "files": [{"name": filename}],
            "enable_formula": enable_formula,
            "enable_table": enable_table,
            "language": language,
        },
    )

    data = body["data"]
    batch_id: str = data["batch_id"]
    # One file was submitted, so exactly one URL is expected; fail with a
    # clear message instead of a bare IndexError/KeyError if none came back.
    file_urls = data.get("file_urls") or []
    if not file_urls:
        raise RuntimeError(f"API returned no upload URL for batch {batch_id}")
    upload_url: str = file_urls[0]
    print(f" batch_id: {batch_id}")
    return batch_id, upload_url


def step2_upload(
    upload_url: str,
    pdf_path: Path,
    *,
    timeout: float | tuple[float, float] = (10, 300),
) -> None:
    """PUT the file to its presigned URL — MUST NOT include Content-Type header.

    Args:
        upload_url: Presigned URL returned by ``step1_get_upload_url``.
        pdf_path: Local file to upload; streamed from disk, not read into memory.
        timeout: ``requests`` timeout, either seconds or a (connect, read)
            tuple. It prevents a stalled upload from hanging the pipeline.

    Raises:
        RuntimeError: If the storage endpoint returns a non-2xx status.
    """
    print(f"[2/5] Uploading file ({pdf_path.stat().st_size / 1024:.0f} KB)...")

    with open(pdf_path, "rb") as f:
        # No headers are passed: requests sets no Content-Type for a file-object
        # body, which is what the presigned URL requires.
        resp = requests.put(upload_url, data=f, timeout=timeout)
    if not resp.ok:
        raise RuntimeError(f"Upload failed: HTTP {resp.status_code} — {resp.text[:300]}")
    print(" Upload complete.")


def step3_poll(batch_id: str) -> dict:
    """Poll the batch until its file finishes parsing.

    Issues GET /extract-results/batch/{batch_id} every POLL_INTERVAL seconds.
    Each state transition is printed. Polling stops when the file reports
    ``done`` or ``failed``, or when MAX_WAIT seconds have elapsed.

    Args:
        batch_id: Batch identifier returned by ``step1_get_upload_url``.

    Returns:
        ``{"zip_url": str, "pages": ..., "total": ...}``. ``pages`` and
        ``total`` come from ``extract_progress`` and default to ``"?"``.

    Raises:
        TimeoutError: If parsing has not finished within MAX_WAIT seconds.
        RuntimeError: If parsing failed, a finished result carries no ZIP URL,
            or ``_request`` reports an API error.
    """
    print(f"[3/5] Waiting for parsing to complete (polling every {POLL_INTERVAL}s)...")

    # monotonic() is immune to wall-clock jumps (NTP sync, manual changes) that
    # could otherwise shorten or stretch the timeout.
    started = time.monotonic()
    last_state = ""

    while True:
        if time.monotonic() - started > MAX_WAIT:
            raise TimeoutError(f"Parsing timed out after {MAX_WAIT}s")

        body = _request(
            "GET",
            f"{API_BASE}/extract-results/batch/{batch_id}",
            headers=HEADERS,
        )

        # The batch holds a single file, so the first result is ours. An empty
        # or missing list is treated as "not ready yet" and polling continues,
        # instead of crashing with IndexError/KeyError. NOTE(review): presumably
        # this only happens before the upload is registered — confirm with API docs.
        results = (body.get("data") or {}).get("extract_result") or []
        if not results:
            time.sleep(POLL_INTERVAL)
            continue
        item = results[0]
        state: str = item["state"]

        if state != last_state:
            print(f" state: {state}")
            last_state = state

        if state == "done":
            zip_url = item.get("full_zip_url")
            if not zip_url:
                raise RuntimeError("Parsing reported done but returned no full_zip_url")
            # `or {}` also covers an explicit null in the payload.
            progress = item.get("extract_progress") or {}
            pages = progress.get("extracted_pages", "?")
            total = progress.get("total_pages", "?")
            print(f" Parsing done — {pages}/{total} pages extracted.")
            return {"zip_url": zip_url, "pages": pages, "total": total}

        if state == "failed":
            err = item.get("err_msg") or "unknown error"
            raise RuntimeError(f"Parsing failed: {err}")

        time.sleep(POLL_INTERVAL)


def step4_download_and_extract(
    zip_url: str,
    output_dir: Path,
    *,
    timeout: float | tuple[float, float] = (10, 300),
) -> list[str]:
    """Download the result ZIP and extract it into *output_dir*.

    Args:
        zip_url: ``full_zip_url`` reported by ``step3_poll``.
        output_dir: Destination directory; created (with parents) if missing.
        timeout: ``requests`` timeout, either seconds or a (connect, read)
            tuple, for the download.

    Returns:
        Sorted names of the non-hidden top-level entries (files and
        directories) in *output_dir* after extraction.

    Raises:
        requests.HTTPError: If the download returns a 4xx/5xx status.
        zipfile.BadZipFile: If the payload is not a valid ZIP archive.
    """
    print("[4/5] Downloading & extracting results...")

    resp = requests.get(zip_url, timeout=timeout)
    resp.raise_for_status()

    # NOTE(review): files left by an earlier run for the same PDF stem are not
    # removed, so they remain alongside the new results.
    output_dir.mkdir(parents=True, exist_ok=True)

    # zipfile strips absolute paths and ".." components from member names, so
    # extraction stays inside output_dir.
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        zf.extractall(output_dir)

    names = sorted(
        entry.name for entry in output_dir.iterdir() if not entry.name.startswith(".")
    )
    for name in names:
        path = output_dir / name
        if path.is_file():  # only files get a size line; directories are just returned
            print(f" {name} ({path.stat().st_size / 1024:.0f} KB)")

    return names


def step5_summary(output_dir: Path) -> None:
    """Print block-type and page statistics for the parsed content list.

    Looks for ``*_content_list.json`` in *output_dir*. If several files match,
    the lexicographically first is used. If none exist, a warning goes to
    stderr and the function returns without printing statistics.

    Args:
        output_dir: Directory the result ZIP was extracted into.
    """
    print("[5/5] Summary")

    # Sorted so the chosen file is deterministic: glob() order depends on the
    # filesystem.
    matches = sorted(output_dir.glob("*_content_list.json"))
    if not matches:
        print(" WARNING: No content_list.json found!", file=sys.stderr)
        return

    content_list_path = matches[0]
    with open(content_list_path, "r", encoding="utf-8") as f:
        content_list = json.load(f)

    # Blocks without a "type" are counted as "unknown". Blocks without
    # "page_idx" are attributed to page 0.
    type_counts = Counter(block.get("type", "unknown") for block in content_list)
    pages = {block.get("page_idx", 0) for block in content_list}

    print(f" File: {content_list_path.name}")
    print(f" Blocks: {len(content_list)}")
    print(f" Pages: {len(pages)}")  # distinct pages that contain at least one block
    for block_type, count in sorted(type_counts.items()):
        print(f" {block_type}: {count}")


# ── Main ────────────────────────────────────────────────────────────────────────


def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: run the five pipeline steps for one PDF.

    Args:
        argv: Arguments to parse, excluding the program name. Defaults to
            ``sys.argv[1:]``; passing a list allows programmatic use.

    Exits with status 1 (message on stderr) if *pdf_path* is not an existing
    regular file or if any pipeline step raises.
    """
    parser = argparse.ArgumentParser(description="MinerU document parsing pipeline")
    parser.add_argument("pdf_path", help="Path to the PDF file to parse")
    args = parser.parse_args(argv)

    pdf_path = Path(args.pdf_path).resolve()
    # is_file() rather than exists(): a directory would otherwise pass this
    # check and only fail in step 2, after an API batch was already created.
    if not pdf_path.is_file():
        print(f"ERROR: File not found: {pdf_path}", file=sys.stderr)
        sys.exit(1)

    output_dir = OUTPUT_ROOT / pdf_path.stem

    try:
        batch_id, upload_url = step1_get_upload_url(pdf_path.name)
        step2_upload(upload_url, pdf_path)
        result = step3_poll(batch_id)
        step4_download_and_extract(result["zip_url"], output_dir)
        step5_summary(output_dir)
        print(f"\nDone. Output: {output_dir}")
    except Exception as exc:
        # Top-level boundary: the backend runs this as a subprocess, so report
        # the error on stderr and signal failure through the exit status.
        print(f"\nERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()