Full-stack application for document-to-knowledge-graph pipeline: - Backend: FastAPI + LangGraph ReAct agent + DeepSeek + MinerU parsing - Frontend: React 19 + Vite + D3.js + shadcn/ui - Pipeline: MinerU parsing → LangExtract entity extraction → KG building Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
108 lines
3.0 KiB
Python
108 lines
3.0 KiB
Python
"""
|
|
Text Assembler — MinerU content_list.json → per-page plain text.
|
|
Independent implementation for the GraphRAG Studio backend.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import json
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class BlockSpan:
|
|
block_index: int
|
|
block_type: str
|
|
page_idx: int
|
|
char_start: int
|
|
char_end: int
|
|
bbox: list
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PageText:
|
|
page_idx: int
|
|
text: str
|
|
block_spans: list[BlockSpan]
|
|
|
|
|
|
def html_table_to_text(table_body: str) -> str:
|
|
soup = BeautifulSoup(table_body, "html.parser")
|
|
rows = []
|
|
for tr in soup.find_all("tr"):
|
|
cells = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
|
|
rows.append(" | ".join(cells))
|
|
return "\n".join(rows)
|
|
|
|
|
|
def load_content_list(path: Path) -> list[dict]:
|
|
if path.is_dir():
|
|
matches = list(path.glob("*_content_list.json"))
|
|
if not matches:
|
|
matches = list(path.glob("*content_list.json"))
|
|
if not matches:
|
|
raise FileNotFoundError(f"No content_list.json found in {path}")
|
|
path = matches[0]
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def assemble_pages(content_list: list[dict]) -> list[PageText]:
|
|
pages: dict[int, list[tuple[int, dict]]] = defaultdict(list)
|
|
for i, block in enumerate(content_list):
|
|
page_idx = block.get("page_idx", 0)
|
|
pages[page_idx].append((i, block))
|
|
|
|
result = []
|
|
for page_idx in sorted(pages.keys()):
|
|
blocks = pages[page_idx]
|
|
buffer = []
|
|
spans = []
|
|
cursor = 0
|
|
|
|
for block_index, block in blocks:
|
|
block_type = block.get("type", "unknown")
|
|
bbox = block.get("bbox", [0, 0, 0, 0])
|
|
|
|
if block_type == "text":
|
|
block_text = block.get("text", "").rstrip()
|
|
elif block_type == "table":
|
|
table_body = block.get("table_body", "")
|
|
block_text = html_table_to_text(table_body) if table_body else ""
|
|
else:
|
|
continue
|
|
|
|
if not block_text:
|
|
continue
|
|
|
|
char_start = cursor
|
|
buffer.append(block_text)
|
|
cursor += len(block_text)
|
|
char_end = cursor
|
|
|
|
spans.append(BlockSpan(
|
|
block_index=block_index,
|
|
block_type=block_type,
|
|
page_idx=page_idx,
|
|
char_start=char_start,
|
|
char_end=char_end,
|
|
bbox=bbox,
|
|
))
|
|
buffer.append("\n")
|
|
cursor += 1
|
|
|
|
text = "".join(buffer).rstrip("\n")
|
|
result.append(PageText(page_idx=page_idx, text=text, block_spans=spans))
|
|
|
|
return result
|
|
|
|
|
|
def count_blocks_by_type(content_list: list[dict]) -> dict[str, int]:
|
|
counts: dict[str, int] = defaultdict(int)
|
|
for block in content_list:
|
|
counts[block.get("type", "unknown")] += 1
|
|
return dict(counts)
|