Files
breakpilot-core/control-pipeline/scripts/ingest_recitals.py
T
Benjamin Admin c258fbc3de
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 30s
CI / test-python-voice (push) Successful in 38s
CI / test-bqas (push) Successful in 40s
feat(control-pipeline): RecitalIngester for EU act recitals (Parser 2)
Add services/recital_ingester.py — parses EU act recitals (Erwägungsgründe)
from the eur-lex/CELLAR preamble via the id="rct_N" markers (the table layout
that defeats a naive article parser) and tags them as a SEPARATE interpretative
source: source_class=recital, authority_weight=60, use_for_primary=false, so
they rank below binding articles and surface only as interpretation context.
Reuses the Parser-1 download + helpers. Add scripts/ingest_recitals.py
(skip-by-existing, no auto re-ingest) + tests/fixture.

Tested: 4 unit tests over a synthetic rct_N fixture, ruff + mypy clean, real
CELLAR parse of DORA verified end-to-end (106 recitals, interpretative metadata).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-24 08:49:30 +02:00

118 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""Ingest EU act recitals via the RecitalIngester engine (Parser 2).
Downloads each act's XHTML (CELLAR), parses the recitals (Erwägungsgründe),
self-tests, and uploads them as a SEPARATE interpretative source
(source_class=recital, use_for_primary=false). Acts whose recitals already
exist are SKIPPED — no automatic re-ingest.
Usage (Mac Mini, with the RAG service reachable):
python3 control-pipeline/scripts/ingest_recitals.py --dry-run
python3 control-pipeline/scripts/ingest_recitals.py
"""
import argparse
import logging
import os
import sys
import time
from typing import TypedDict
import httpx
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from services.legal_act_ingester import RegSpec, download_act, upload_unit # noqa: E402
from services.recital_ingester import build_upload_units, parse_recitals, self_test # noqa: E402
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("ingest-recitals")
RAG_URL = os.getenv("RAG_URL", "https://localhost:8097")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
RUN_TAG = "2026-06-eu-v1"
# The MVP acts whose recitals add interpretation context. Articles for these are
# ingested separately (Parser 1); this only adds the recitals.
SPECS = [
RegSpec(reg="CRA", celex="32024R2847", name_de="Cyber Resilience Act", version_date="2024-10-23"),
RegSpec(reg="AI Act", celex="32024R1689", name_de="Verordnung über Künstliche Intelligenz (AI Act)", version_date="2024-06-13"),
RegSpec(reg="DORA", celex="32022R2554", name_de="Digital Operational Resilience Act (DORA)", version_date="2022-12-14"),
RegSpec(reg="MaschinenVO", celex="32023R1230", name_de="Maschinenverordnung (EU) 2023/1230", version_date="2023-06-14"),
RegSpec(reg="NIS2", celex="32022L2555", name_de="NIS-2-Richtlinie", version_date="2022-12-14", legal_basis_rank="eu_directive"),
RegSpec(reg="DSGVO", celex="32016R0679", name_de="Datenschutz-Grundverordnung (DSGVO)",
collection="bp_compliance_datenschutz", version_date="2016-04-27"),
]
class IngestResult(TypedDict):
reg: str
status: str
chunks: int
def count_existing_recitals(spec: RegSpec) -> int:
with httpx.Client(timeout=60.0, verify=False) as client:
resp = client.post(
f"{QDRANT_URL}/collections/{spec.collection}/points/count",
json={"filter": {"must": [
{"key": "celex", "match": {"value": spec.celex}},
{"key": "chunk_scope", "match": {"value": "recital"}},
]}, "exact": True},
)
resp.raise_for_status()
return int(resp.json()["result"]["count"])
def ingest_one(spec: RegSpec, dry_run: bool) -> IngestResult:
if (existing := count_existing_recitals(spec)) > 0:
logger.info(" recitals already present: %d — SKIPPING (no re-ingest)", existing)
return {"reg": spec.reg, "status": "exists", "chunks": existing}
try:
html = download_act(spec.celex)
except Exception as exc: # noqa: BLE001 — log + continue with the next act
logger.error(" download FAILED: %s", exc)
return {"reg": spec.reg, "status": "download_failed", "chunks": 0}
recitals = parse_recitals(html, spec.reg)
passed, problems = self_test(recitals)
logger.info(" parsed: %d recitals", len(recitals))
if not passed:
logger.error(" GATE FAIL — %s", "; ".join(problems))
return {"reg": spec.reg, "status": "gate_failed", "chunks": 0}
units = build_upload_units(recitals, spec, RUN_TAG)
if dry_run:
logger.info(" DRY RUN — would upload %d recital units", len(units))
return {"reg": spec.reg, "status": "dry_run", "chunks": len(units)}
chunks = 0
with httpx.Client(timeout=600.0, verify=False) as client:
for unit in units:
chunks += upload_unit(client, RAG_URL, unit)
logger.info(" uploaded: %d units, %d chunks", len(units), chunks)
return {"reg": spec.reg, "status": "ok", "chunks": chunks}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
logger.info("RecitalIngester — %d acts | dry_run=%s", len(SPECS), args.dry_run)
results: list[IngestResult] = []
for i, spec in enumerate(SPECS, 1):
logger.info("\n[%d/%d] %s (CELEX %s)", i, len(SPECS), spec.name_de, spec.celex)
results.append(ingest_one(spec, args.dry_run))
if i < len(SPECS):
time.sleep(1)
for r in results:
logger.info(" %-14s %-15s chunks=%s", r["reg"], r["status"].upper(), r["chunks"])
logger.info("Total: %d", sum(r["chunks"] for r in results if r["status"] == "ok"))
if __name__ == "__main__":
main()