feat(control-pipeline): RecitalIngester for EU act recitals (Parser 2)
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 30s
CI / test-python-voice (push) Successful in 38s
CI / test-bqas (push) Successful in 40s

Add services/recital_ingester.py — parses EU act recitals (Erwägungsgründe)
from the eur-lex/CELLAR preamble via the id="rct_N" markers (the table layout
that defeats a naive article parser) and tags them as a SEPARATE interpretative
source: source_class=recital, authority_weight=60, use_for_primary=false, so
they rank below binding articles and surface only as interpretation context.
Reuses the Parser-1 download + helpers. Add scripts/ingest_recitals.py
(skip-by-existing, no auto re-ingest) + tests/fixture.

Tested: 4 unit tests over a synthetic rct_N fixture, ruff + mypy clean, real
CELLAR parse of DORA verified end-to-end (106 recitals, interpretative metadata).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-24 08:49:30 +02:00
parent 569f64a400
commit c258fbc3de
4 changed files with 307 additions and 0 deletions
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""Ingest EU act recitals via the RecitalIngester engine (Parser 2).
Downloads each act's XHTML (CELLAR), parses the recitals (Erwägungsgründe),
self-tests, and uploads them as a SEPARATE interpretative source
(source_class=recital, use_for_primary=false). Acts whose recitals already
exist are SKIPPED — no automatic re-ingest.
Usage (Mac Mini, with the RAG service reachable):
python3 control-pipeline/scripts/ingest_recitals.py --dry-run
python3 control-pipeline/scripts/ingest_recitals.py
"""
import argparse
import logging
import os
import sys
import time
from typing import TypedDict
import httpx
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from services.legal_act_ingester import RegSpec, download_act, upload_unit # noqa: E402
from services.recital_ingester import build_upload_units, parse_recitals, self_test # noqa: E402
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("ingest-recitals")
RAG_URL = os.getenv("RAG_URL", "https://localhost:8097")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
RUN_TAG = "2026-06-eu-v1"
# The MVP acts whose recitals add interpretation context. Articles for these are
# ingested separately (Parser 1); this only adds the recitals.
SPECS = [
RegSpec(reg="CRA", celex="32024R2847", name_de="Cyber Resilience Act", version_date="2024-10-23"),
RegSpec(reg="AI Act", celex="32024R1689", name_de="Verordnung über Künstliche Intelligenz (AI Act)", version_date="2024-06-13"),
RegSpec(reg="DORA", celex="32022R2554", name_de="Digital Operational Resilience Act (DORA)", version_date="2022-12-14"),
RegSpec(reg="MaschinenVO", celex="32023R1230", name_de="Maschinenverordnung (EU) 2023/1230", version_date="2023-06-14"),
RegSpec(reg="NIS2", celex="32022L2555", name_de="NIS-2-Richtlinie", version_date="2022-12-14", legal_basis_rank="eu_directive"),
RegSpec(reg="DSGVO", celex="32016R0679", name_de="Datenschutz-Grundverordnung (DSGVO)",
collection="bp_compliance_datenschutz", version_date="2016-04-27"),
]
class IngestResult(TypedDict):
reg: str
status: str
chunks: int
def count_existing_recitals(spec: RegSpec) -> int:
with httpx.Client(timeout=60.0, verify=False) as client:
resp = client.post(
f"{QDRANT_URL}/collections/{spec.collection}/points/count",
json={"filter": {"must": [
{"key": "celex", "match": {"value": spec.celex}},
{"key": "chunk_scope", "match": {"value": "recital"}},
]}, "exact": True},
)
resp.raise_for_status()
return int(resp.json()["result"]["count"])
def ingest_one(spec: RegSpec, dry_run: bool) -> IngestResult:
if (existing := count_existing_recitals(spec)) > 0:
logger.info(" recitals already present: %d — SKIPPING (no re-ingest)", existing)
return {"reg": spec.reg, "status": "exists", "chunks": existing}
try:
html = download_act(spec.celex)
except Exception as exc: # noqa: BLE001 — log + continue with the next act
logger.error(" download FAILED: %s", exc)
return {"reg": spec.reg, "status": "download_failed", "chunks": 0}
recitals = parse_recitals(html, spec.reg)
passed, problems = self_test(recitals)
logger.info(" parsed: %d recitals", len(recitals))
if not passed:
logger.error(" GATE FAIL — %s", "; ".join(problems))
return {"reg": spec.reg, "status": "gate_failed", "chunks": 0}
units = build_upload_units(recitals, spec, RUN_TAG)
if dry_run:
logger.info(" DRY RUN — would upload %d recital units", len(units))
return {"reg": spec.reg, "status": "dry_run", "chunks": len(units)}
chunks = 0
with httpx.Client(timeout=600.0, verify=False) as client:
for unit in units:
chunks += upload_unit(client, RAG_URL, unit)
logger.info(" uploaded: %d units, %d chunks", len(units), chunks)
return {"reg": spec.reg, "status": "ok", "chunks": chunks}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
logger.info("RecitalIngester — %d acts | dry_run=%s", len(SPECS), args.dry_run)
results: list[IngestResult] = []
for i, spec in enumerate(SPECS, 1):
logger.info("\n[%d/%d] %s (CELEX %s)", i, len(SPECS), spec.name_de, spec.celex)
results.append(ingest_one(spec, args.dry_run))
if i < len(SPECS):
time.sleep(1)
for r in results:
logger.info(" %-14s %-15s chunks=%s", r["reg"], r["status"].upper(), r["chunks"])
logger.info("Total: %d", sum(r["chunks"] for r in results if r["status"] == "ok"))
if __name__ == "__main__":
main()
@@ -0,0 +1,115 @@
"""RecitalIngester (Parser 2): ingests EU act recitals (Erwägungsgründe) as a
SEPARATE, interpretative source — never a primary obligation source.
In eur-lex / CELLAR XHTML each recital sits in a preamble block
<div class="eli-subdivision" id="rct_N"> with the marker "(N)" and the text in
adjacent table cells, which is why a naive article parser finds none. This
parser keys on the id="rct_N" markers and joins the recital's prose.
Recitals are tagged source_class=recital / authority_weight=60 /
use_for_primary=false, so they rank below binding articles and surface only as
interpretation context (and trip the human-review flag if they ever top
results). Reuses the eur-lex download + helpers from legal_act_ingester
(Parser 1).
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any
from services.legal_act_ingester import RegSpec, UploadUnit, clean, refs_out
RECITAL_WEIGHT = 60
_RCT_DIV_RE = re.compile(r'id="rct_(\d+)"')
_OJ_P_RE = re.compile(r'<p[^>]*class="oj-normal"[^>]*>(.*?)</p>', re.S)
_RCT_NUM_RE = re.compile(r"^\(\d+\)$")
_MIN_RECITAL_CHARS = 20
@dataclass
class Recital:
num: str
text: str
def parse_recitals(raw: str, reg: str) -> list[Recital]:
"""Extract recitals from the preamble via the id="rct_N" markers. `reg` is
accepted for symmetry with the other parsers (recitals carry no reg in-text)."""
_ = reg
end = raw.find('class="oj-ti-art"')
if end < 0:
end = len(raw)
markers = [(m.group(1), m.start()) for m in _RCT_DIV_RE.finditer(raw, 0, end)]
recitals: list[Recital] = []
for i, (num, start) in enumerate(markers):
stop = markers[i + 1][1] if i + 1 < len(markers) else end
parts = [clean(inner) for inner in _OJ_P_RE.findall(raw[start:stop])]
body = " ".join(p for p in parts if p and not _RCT_NUM_RE.match(p))
if len(body) >= _MIN_RECITAL_CHARS:
recitals.append(Recital(num=num, text=body))
return recitals
def self_test(recitals: list[Recital]) -> tuple[bool, list[str]]:
"""Gate before upload. Every EU act has recitals → 0 is a parse failure."""
problems: list[str] = []
if not recitals:
problems.append("0 recitals parsed")
nums = [r.num for r in recitals]
if len(nums) != len(set(nums)):
problems.append("duplicate recital numbers")
return (not problems, problems)
def _recital_meta(spec: RegSpec, rc: Recital) -> dict[str, Any]:
cu = f"{spec.reg} Erwägungsgrund {rc.num}"
return {
"regulation_code": spec.reg,
"regulation_short": spec.reg,
"regulation_name_de": spec.name_de,
"citation_style": "recital",
"document_type": "legal_act",
"source_class": "recital",
"bindingness": "interpretative",
"authority_level": 60,
"authority_weight": RECITAL_WEIGHT,
"source_type": "law",
"issuer": "European Union",
"jurisdiction": "EU",
"legal_basis_rank": spec.legal_basis_rank,
"version_date": spec.version_date,
"source": "eur-lex.europa.eu",
"license": "public_eu",
"category": "recht",
"celex": spec.celex,
"use_for_primary": False, # interpretative — never a primary obligation source
"is_recital": True,
"citation_unit": cu,
"article_label": cu,
"article": f"Erwaegungsgrund-{rc.num}", # distinct → avoids point-ID collisions
"chunk_scope": "recital",
"article_type": "recital",
"references_out": refs_out(spec.reg, rc.text),
"norm_id": f"EU-{spec.reg.replace(' ', '')}-Rec{rc.num}",
}
def build_upload_units(recitals: list[Recital], spec: RegSpec, run_tag: str) -> list[UploadUnit]:
"""One UploadUnit per recital, each with its own document_version (the RAG
service derives `article` from text and would otherwise collide recitals)."""
slug = spec.reg.lower().replace(" ", "")
base = f"{run_tag}-{slug}"
units: list[UploadUnit] = []
for rc in recitals:
text = f"{spec.reg} Erwägungsgrund {rc.num}\n\n{rc.text}"
units.append(UploadUnit(
filename=f"{slug}_rec{rc.num}.txt",
text=text,
meta=_recital_meta(spec, rc),
document_version=f"{base}-rec{rc.num}",
collection=spec.collection,
))
return units
@@ -0,0 +1,19 @@
<!DOCTYPE html>
<html><body>
<p class="oj-normal">DAS EUROPÄISCHE PARLAMENT — in Erwägung nachstehender Gründe:</p>
<div class="eli-subdivision" id="rct_1">
<table><tbody><tr>
<td><p class="oj-normal">(1)</p></td>
<td><p class="oj-normal">Dieser erste Erwaegungsgrund erklaert den Hintergrund der Verordnung ausfuehrlich und verweist auf Artikel 5.</p></td>
</tr></tbody></table>
</div>
<div class="eli-subdivision" id="rct_2">
<table><tbody><tr>
<td><p class="oj-normal">(2)</p></td>
<td><p class="oj-normal">Der zweite Erwaegungsgrund ergaenzt den ersten und nennt weitere Ziele der Regelung im Detail.</p></td>
</tr></tbody></table>
</div>
<p class="oj-ti-art">Artikel 1</p>
<p class="oj-sti-art">Gegenstand</p>
<p class="oj-normal">Der eigentliche Artikeltext, der KEIN Erwaegungsgrund ist und nicht als solcher geparst werden darf.</p>
</body></html>
@@ -0,0 +1,56 @@
"""Unit tests for the RecitalIngester engine (Parser 2).
Pure parser + metadata tests against a synthetic eur-lex recital fixture (the
id="rct_N" preamble-table structure). Covers: recital extraction, exclusion of
article text, the self-test gate, and the interpretative (non-primary) metadata.
"""
import os
from services.legal_act_ingester import RegSpec
from services.recital_ingester import build_upload_units, parse_recitals, self_test
FIXTURE = os.path.join(os.path.dirname(__file__), "fixtures", "sample_eurlex_recitals.html")
SPEC = RegSpec(reg="TEST", celex="32099R0001", name_de="Testverordnung", version_date="2099-01-01")
def _raw() -> str:
with open(FIXTURE, encoding="utf-8") as fh:
return fh.read()
def test_parse_recitals_from_rct_markers():
recs = parse_recitals(_raw(), "TEST")
assert [r.num for r in recs] == ["1", "2"]
assert "Hintergrund" in recs[0].text
def test_article_text_is_not_captured_as_recital():
joined = " ".join(r.text for r in parse_recitals(_raw(), "TEST"))
assert "Artikeltext" not in joined # the article body must stay out of recitals
assert "(1)" not in joined and "(2)" not in joined # the "(N)" markers are stripped
def test_self_test_passes_and_flags_empty():
ok, _ = self_test(parse_recitals(_raw(), "TEST"))
assert ok
bad, problems = self_test([])
assert not bad and "0 recitals" in problems[0]
def test_recital_units_are_interpretative_not_primary():
units = build_upload_units(parse_recitals(_raw(), "TEST"), SPEC, "run")
assert len(units) == 2
meta = units[0].meta
assert meta["source_class"] == "recital"
assert meta["authority_weight"] == 60
assert meta["use_for_primary"] is False
assert meta["is_recital"] is True
assert meta["chunk_scope"] == "recital"
assert meta["citation_unit"] == "TEST Erwägungsgrund 1"
assert meta["article"] == "Erwaegungsgrund-1"
# per-recital document_version prevents point-ID collisions
assert units[0].document_version == "run-test-rec1"
assert units[1].document_version == "run-test-rec2"
# recital 1 cites Artikel 5 → forward edge for the citation graph
assert "Art. 5 TEST" in meta["references_out"]