From c258fbc3de7f02f354136730fce3e867f2500737 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 24 Jun 2026 08:49:30 +0200 Subject: [PATCH] feat(control-pipeline): RecitalIngester for EU act recitals (Parser 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add services/recital_ingester.py — parses EU act recitals (Erwägungsgründe) from the eur-lex/CELLAR preamble via the id="rct_N" markers (the table layout that defeats a naive article parser) and tags them as a SEPARATE interpretative source: source_class=recital, authority_weight=60, use_for_primary=false, so they rank below binding articles and surface only as interpretation context. Reuses the Parser-1 download + helpers. Add scripts/ingest_recitals.py (skip-by-existing, no auto re-ingest) + tests/fixture. Tested: 4 unit tests over a synthetic rct_N fixture, ruff + mypy clean, real CELLAR parse of DORA verified end-to-end (106 recitals, interpretative metadata). Co-Authored-By: Claude Opus 4.7 --- control-pipeline/scripts/ingest_recitals.py | 117 ++++++++++++++++++ control-pipeline/services/recital_ingester.py | 115 +++++++++++++++++ .../fixtures/sample_eurlex_recitals.html | 19 +++ .../tests/test_recital_ingester.py | 56 +++++++++ 4 files changed, 307 insertions(+) create mode 100644 control-pipeline/scripts/ingest_recitals.py create mode 100644 control-pipeline/services/recital_ingester.py create mode 100644 control-pipeline/tests/fixtures/sample_eurlex_recitals.html create mode 100644 control-pipeline/tests/test_recital_ingester.py diff --git a/control-pipeline/scripts/ingest_recitals.py b/control-pipeline/scripts/ingest_recitals.py new file mode 100644 index 0000000..8025c87 --- /dev/null +++ b/control-pipeline/scripts/ingest_recitals.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +"""Ingest EU act recitals via the RecitalIngester engine (Parser 2). + +Downloads each act's XHTML (CELLAR), parses the recitals (Erwägungsgründe), +self-tests, and uploads them as a SEPARATE interpretative source +(source_class=recital, use_for_primary=false). Acts whose recitals already +exist are SKIPPED — no automatic re-ingest. + +Usage (Mac Mini, with the RAG service reachable): + python3 control-pipeline/scripts/ingest_recitals.py --dry-run + python3 control-pipeline/scripts/ingest_recitals.py +""" + +import argparse +import logging +import os +import sys +import time +from typing import TypedDict + +import httpx + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from services.legal_act_ingester import RegSpec, download_act, upload_unit # noqa: E402 +from services.recital_ingester import build_upload_units, parse_recitals, self_test # noqa: E402 + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("ingest-recitals") + +RAG_URL = os.getenv("RAG_URL", "https://localhost:8097") +QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") +RUN_TAG = "2026-06-eu-v1" + +# The MVP acts whose recitals add interpretation context. Articles for these are +# ingested separately (Parser 1); this only adds the recitals. +SPECS = [ + RegSpec(reg="CRA", celex="32024R2847", name_de="Cyber Resilience Act", version_date="2024-10-23"), + RegSpec(reg="AI Act", celex="32024R1689", name_de="Verordnung über Künstliche Intelligenz (AI Act)", version_date="2024-06-13"), + RegSpec(reg="DORA", celex="32022R2554", name_de="Digital Operational Resilience Act (DORA)", version_date="2022-12-14"), + RegSpec(reg="MaschinenVO", celex="32023R1230", name_de="Maschinenverordnung (EU) 2023/1230", version_date="2023-06-14"), + RegSpec(reg="NIS2", celex="32022L2555", name_de="NIS-2-Richtlinie", version_date="2022-12-14", legal_basis_rank="eu_directive"), + RegSpec(reg="DSGVO", celex="32016R0679", name_de="Datenschutz-Grundverordnung (DSGVO)", + collection="bp_compliance_datenschutz", version_date="2016-04-27"), +] + + +class IngestResult(TypedDict): + reg: str + status: str + chunks: int + + +def count_existing_recitals(spec: RegSpec) -> int: + with httpx.Client(timeout=60.0, verify=False) as client: + resp = client.post( + f"{QDRANT_URL}/collections/{spec.collection}/points/count", + json={"filter": {"must": [ + {"key": "celex", "match": {"value": spec.celex}}, + {"key": "chunk_scope", "match": {"value": "recital"}}, + ]}, "exact": True}, + ) + resp.raise_for_status() + return int(resp.json()["result"]["count"]) + + +def ingest_one(spec: RegSpec, dry_run: bool) -> IngestResult: + if (existing := count_existing_recitals(spec)) > 0: + logger.info(" recitals already present: %d — SKIPPING (no re-ingest)", existing) + return {"reg": spec.reg, "status": "exists", "chunks": existing} + + try: + html = download_act(spec.celex) + except Exception as exc: # noqa: BLE001 — log + continue with the next act + logger.error(" download FAILED: %s", exc) + return {"reg": spec.reg, "status": "download_failed", "chunks": 0} + + recitals = parse_recitals(html, spec.reg) + passed, problems = self_test(recitals) + logger.info(" parsed: %d recitals", len(recitals)) + if not passed: + logger.error(" GATE FAIL — %s", "; ".join(problems)) + return {"reg": spec.reg, "status": "gate_failed", "chunks": 0} + + units = build_upload_units(recitals, spec, RUN_TAG) + if dry_run: + logger.info(" DRY RUN — would upload %d recital units", len(units)) + return {"reg": spec.reg, "status": "dry_run", "chunks": len(units)} + + chunks = 0 + with httpx.Client(timeout=600.0, verify=False) as client: + for unit in units: + chunks += upload_unit(client, RAG_URL, unit) + logger.info(" uploaded: %d units, %d chunks", len(units), chunks) + return {"reg": spec.reg, "status": "ok", "chunks": chunks} + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + logger.info("RecitalIngester — %d acts | dry_run=%s", len(SPECS), args.dry_run) + results: list[IngestResult] = [] + for i, spec in enumerate(SPECS, 1): + logger.info("\n[%d/%d] %s (CELEX %s)", i, len(SPECS), spec.name_de, spec.celex) + results.append(ingest_one(spec, args.dry_run)) + if i < len(SPECS): + time.sleep(1) + + for r in results: + logger.info(" %-14s %-15s chunks=%s", r["reg"], r["status"].upper(), r["chunks"]) + logger.info("Total: %d", sum(r["chunks"] for r in results if r["status"] == "ok")) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/services/recital_ingester.py b/control-pipeline/services/recital_ingester.py new file mode 100644 index 0000000..8899094 --- /dev/null +++ b/control-pipeline/services/recital_ingester.py @@ -0,0 +1,115 @@ +"""RecitalIngester (Parser 2): ingests EU act recitals (Erwägungsgründe) as a +SEPARATE, interpretative source — never a primary obligation source. + +In eur-lex / CELLAR XHTML each recital sits in a preamble block +
with the marker "(N)" and the text in +adjacent table cells, which is why a naive article parser finds none. This +parser keys on the id="rct_N" markers and joins the recital's prose. + +Recitals are tagged source_class=recital / authority_weight=60 / +use_for_primary=false, so they rank below binding articles and surface only as +interpretation context (and trip the human-review flag if they ever top +results). Reuses the eur-lex download + helpers from legal_act_ingester +(Parser 1). +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Any + +from services.legal_act_ingester import RegSpec, UploadUnit, clean, refs_out + +RECITAL_WEIGHT = 60 + +_RCT_DIV_RE = re.compile(r'id="rct_(\d+)"') +_OJ_P_RE = re.compile(r']*class="oj-normal"[^>]*>(.*?)

', re.S) +_RCT_NUM_RE = re.compile(r"^\(\d+\)$") +_MIN_RECITAL_CHARS = 20 + + +@dataclass +class Recital: + num: str + text: str + + +def parse_recitals(raw: str, reg: str) -> list[Recital]: + """Extract recitals from the preamble via the id="rct_N" markers. `reg` is + accepted for symmetry with the other parsers (recitals carry no reg in-text).""" + _ = reg + end = raw.find('class="oj-ti-art"') + if end < 0: + end = len(raw) + markers = [(m.group(1), m.start()) for m in _RCT_DIV_RE.finditer(raw, 0, end)] + recitals: list[Recital] = [] + for i, (num, start) in enumerate(markers): + stop = markers[i + 1][1] if i + 1 < len(markers) else end + parts = [clean(inner) for inner in _OJ_P_RE.findall(raw[start:stop])] + body = " ".join(p for p in parts if p and not _RCT_NUM_RE.match(p)) + if len(body) >= _MIN_RECITAL_CHARS: + recitals.append(Recital(num=num, text=body)) + return recitals + + +def self_test(recitals: list[Recital]) -> tuple[bool, list[str]]: + """Gate before upload. Every EU act has recitals → 0 is a parse failure.""" + problems: list[str] = [] + if not recitals: + problems.append("0 recitals parsed") + nums = [r.num for r in recitals] + if len(nums) != len(set(nums)): + problems.append("duplicate recital numbers") + return (not problems, problems) + + +def _recital_meta(spec: RegSpec, rc: Recital) -> dict[str, Any]: + cu = f"{spec.reg} Erwägungsgrund {rc.num}" + return { + "regulation_code": spec.reg, + "regulation_short": spec.reg, + "regulation_name_de": spec.name_de, + "citation_style": "recital", + "document_type": "legal_act", + "source_class": "recital", + "bindingness": "interpretative", + "authority_level": 60, + "authority_weight": RECITAL_WEIGHT, + "source_type": "law", + "issuer": "European Union", + "jurisdiction": "EU", + "legal_basis_rank": spec.legal_basis_rank, + "version_date": spec.version_date, + "source": "eur-lex.europa.eu", + "license": "public_eu", + "category": "recht", + "celex": spec.celex, + "use_for_primary": False, # interpretative — never a primary obligation source + "is_recital": True, + "citation_unit": cu, + "article_label": cu, + "article": f"Erwaegungsgrund-{rc.num}", # distinct → avoids point-ID collisions + "chunk_scope": "recital", + "article_type": "recital", + "references_out": refs_out(spec.reg, rc.text), + "norm_id": f"EU-{spec.reg.replace(' ', '')}-Rec{rc.num}", + } + + +def build_upload_units(recitals: list[Recital], spec: RegSpec, run_tag: str) -> list[UploadUnit]: + """One UploadUnit per recital, each with its own document_version (the RAG + service derives `article` from text and would otherwise collide recitals).""" + slug = spec.reg.lower().replace(" ", "") + base = f"{run_tag}-{slug}" + units: list[UploadUnit] = [] + for rc in recitals: + text = f"{spec.reg} Erwägungsgrund {rc.num}\n\n{rc.text}" + units.append(UploadUnit( + filename=f"{slug}_rec{rc.num}.txt", + text=text, + meta=_recital_meta(spec, rc), + document_version=f"{base}-rec{rc.num}", + collection=spec.collection, + )) + return units diff --git a/control-pipeline/tests/fixtures/sample_eurlex_recitals.html b/control-pipeline/tests/fixtures/sample_eurlex_recitals.html new file mode 100644 index 0000000..bf9e002 --- /dev/null +++ b/control-pipeline/tests/fixtures/sample_eurlex_recitals.html @@ -0,0 +1,19 @@ + + +

DAS EUROPÄISCHE PARLAMENT — in Erwägung nachstehender Gründe:

+
+ + + +

(1)

Dieser erste Erwaegungsgrund erklaert den Hintergrund der Verordnung ausfuehrlich und verweist auf Artikel 5.

+
+
+ + + +

(2)

Der zweite Erwaegungsgrund ergaenzt den ersten und nennt weitere Ziele der Regelung im Detail.

+
+

Artikel 1

+

Gegenstand

+

Der eigentliche Artikeltext, der KEIN Erwaegungsgrund ist und nicht als solcher geparst werden darf.

+ diff --git a/control-pipeline/tests/test_recital_ingester.py b/control-pipeline/tests/test_recital_ingester.py new file mode 100644 index 0000000..80ca98b --- /dev/null +++ b/control-pipeline/tests/test_recital_ingester.py @@ -0,0 +1,56 @@ +"""Unit tests for the RecitalIngester engine (Parser 2). + +Pure parser + metadata tests against a synthetic eur-lex recital fixture (the +id="rct_N" preamble-table structure). Covers: recital extraction, exclusion of +article text, the self-test gate, and the interpretative (non-primary) metadata. +""" + +import os + +from services.legal_act_ingester import RegSpec +from services.recital_ingester import build_upload_units, parse_recitals, self_test + +FIXTURE = os.path.join(os.path.dirname(__file__), "fixtures", "sample_eurlex_recitals.html") +SPEC = RegSpec(reg="TEST", celex="32099R0001", name_de="Testverordnung", version_date="2099-01-01") + + +def _raw() -> str: + with open(FIXTURE, encoding="utf-8") as fh: + return fh.read() + + +def test_parse_recitals_from_rct_markers(): + recs = parse_recitals(_raw(), "TEST") + assert [r.num for r in recs] == ["1", "2"] + assert "Hintergrund" in recs[0].text + + +def test_article_text_is_not_captured_as_recital(): + joined = " ".join(r.text for r in parse_recitals(_raw(), "TEST")) + assert "Artikeltext" not in joined # the article body must stay out of recitals + assert "(1)" not in joined and "(2)" not in joined # the "(N)" markers are stripped + + +def test_self_test_passes_and_flags_empty(): + ok, _ = self_test(parse_recitals(_raw(), "TEST")) + assert ok + bad, problems = self_test([]) + assert not bad and "0 recitals" in problems[0] + + +def test_recital_units_are_interpretative_not_primary(): + units = build_upload_units(parse_recitals(_raw(), "TEST"), SPEC, "run") + assert len(units) == 2 + meta = units[0].meta + assert meta["source_class"] == "recital" + assert meta["authority_weight"] == 60 + assert meta["use_for_primary"] is False + assert meta["is_recital"] is True + assert meta["chunk_scope"] == "recital" + assert meta["citation_unit"] == "TEST Erwägungsgrund 1" + assert meta["article"] == "Erwaegungsgrund-1" + # per-recital document_version prevents point-ID collisions + assert units[0].document_version == "run-test-rec1" + assert units[1].document_version == "run-test-rec2" + # recital 1 cites Artikel 5 → forward edge for the citation graph + assert "Art. 5 TEST" in meta["references_out"]