diff --git a/control-pipeline/scripts/ingest_eu_regulations.py b/control-pipeline/scripts/ingest_eu_regulations.py index c0f1cb2..f9ffafc 100644 --- a/control-pipeline/scripts/ingest_eu_regulations.py +++ b/control-pipeline/scripts/ingest_eu_regulations.py @@ -1,200 +1,139 @@ #!/usr/bin/env python3 -"""Ingest missing EU regulations from EUR-Lex (HTML). +"""Ingest EU legal acts from eur-lex/CELLAR via the LegalActIngester engine. -Downloads German HTML from EUR-Lex via CELEX number, uploads with legal chunking. +For each act this downloads the German XHTML (CELLAR, eur-lex fallback), parses +it into articles + annexes with full authority metadata + citation edges +(services/legal_act_ingester.py), self-tests the parse, and uploads per unit. +Acts whose CELEX already exists are SKIPPED — there is no automatic re-ingest. -Usage (on Mac Mini): +Usage (Mac Mini, with the RAG service reachable): python3 control-pipeline/scripts/ingest_eu_regulations.py --dry-run python3 control-pipeline/scripts/ingest_eu_regulations.py """ import argparse -import json import logging +import os +import sys import time +from typing import TypedDict import httpx +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from services.legal_act_ingester import ( # noqa: E402 + RegSpec, + build_upload_units, + download_act, + parse_html, + self_test, + upload_unit, +) + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger("ingest-eu") -RAG_URL = "https://localhost:8097" -QDRANT_URL = "http://localhost:6333" +RAG_URL = os.getenv("RAG_URL", "https://localhost:8097") +QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") COLLECTION = "bp_compliance_ce" +RUN_TAG = "2026-06-eu-v1" -EURLEX_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}" -# ---- EU Regulations to ingest ---- -REGULATIONS = [ - { - "celex": "32022L2464", - "regulation_id": "csrd_2022", - "name": "Corporate Sustainability 
Reporting Directive (CSRD)", - "short": "CSRD", - "category": "sustainability", - }, - { - "celex": "32024L1760", - "regulation_id": "csddd_2024", - "name": "Corporate Sustainability Due Diligence Directive (CSDDD)", - "short": "CSDDD", - "category": "sustainability", - }, - { - "celex": "32020R0852", - "regulation_id": "eu_taxonomy_2020", - "name": "EU-Taxonomie-Verordnung", - "short": "EU Taxonomy", - "category": "sustainability", - }, - { - "celex": "32024R1183", - "regulation_id": "eidas_2_0_2024", - "name": "eIDAS 2.0 Verordnung (EU Digital Identity)", - "short": "eIDAS 2.0", - "category": "digital_identity", - }, - { - "celex": "32023L0970", - "regulation_id": "pay_transparency_2023", - "name": "Entgelttransparenz-Richtlinie", - "short": "Pay Transparency", - "category": "employment", - }, - { - "celex": "32022R2065", - "regulation_id": "dsa_2022_updated", - "name": "Digital Services Act (DSA) — aktualisiert", - "short": "DSA", - "category": "digital_services", - "skip_if_exists": "dsa_2022", # already exists under different ID - }, +class IngestResult(TypedDict): + reg: str + status: str + chunks: int + + +def _rank(celex: str) -> str: + """eu_directive for L-acts, eu_regulation otherwise (CELEX descriptor letter).""" + return "eu_directive" if len(celex) > 5 and celex[5] == "L" else "eu_regulation" + + +def _spec(celex: str, name_de: str, short: str, version_date: str = "") -> RegSpec: + return RegSpec( + reg=short, celex=celex, name_de=name_de, collection=COLLECTION, + version_date=version_date, legal_basis_rank=_rank(celex), + ) + + +# Acts this script ingests. The proven MVP acts (CRA / AI Act / DORA / NIS2 / +# MaschinenVO / DSGVO) are already in the corpus and get re-ingested via a +# separate, controlled step — not here. 
+SPECS = [ + _spec("32022L2464", "Corporate Sustainability Reporting Directive (CSRD)", "CSRD"), + _spec("32024L1760", "Corporate Sustainability Due Diligence Directive (CSDDD)", "CSDDD"), + _spec("32020R0852", "EU-Taxonomie-Verordnung", "EU Taxonomy"), + _spec("32024R1183", "eIDAS 2.0 Verordnung (EU Digital Identity)", "eIDAS 2.0"), + _spec("32023L0970", "Entgelttransparenz-Richtlinie", "Pay Transparency"), + _spec("32022R2065", "Digital Services Act (DSA)", "DSA"), ] -def download_eurlex(celex: str) -> str: - """Download EU regulation HTML from EUR-Lex.""" - url = EURLEX_URL.format(celex=celex) - with httpx.Client(timeout=30.0, follow_redirects=True) as c: - resp = c.get(url) - resp.raise_for_status() - return resp.text - - -def upload_html(html: str, filename: str, reg: dict, dry_run: bool = False): - """Upload HTML to RAG service.""" - if dry_run: - logger.info(" DRY RUN — would upload %d chars", len(html)) - return {"chunks_count": 0} - - meta = { - "regulation_id": reg["regulation_id"], - "regulation_name_de": reg["name"], - "regulation_short": reg["short"], - "celex": reg["celex"], - "category": reg["category"], - "source": "EUR-Lex", - "license": "EU_law", - "jurisdiction": "EU", - "source_type": "law", - } - form_data = { - "collection": COLLECTION, - "data_type": "compliance", - "bundesland": "bund", - "use_case": "compliance", - "year": "2026", - "chunk_strategy": "legal", - "chunk_size": "1500", - "chunk_overlap": "100", - "metadata_json": json.dumps(meta, ensure_ascii=False), - } - with httpx.Client(timeout=600.0, verify=False) as c: - resp = c.post( - f"{RAG_URL}/api/v1/documents/upload", - files={"file": (filename, html.encode("utf-8"), "text/html")}, - data=form_data, - ) - resp.raise_for_status() - return resp.json() - - -def count_existing(regulation_id: str) -> int: - with httpx.Client(timeout=60.0) as c: - resp = c.post( +def count_existing(celex: str) -> int: + """Chunks already present for this CELEX (old or new tagging) — the skip guard.""" + 
with httpx.Client(timeout=60.0, verify=False) as client: + resp = client.post( f"{QDRANT_URL}/collections/{COLLECTION}/points/count", - json={"filter": {"must": [ - {"key": "regulation_id", "match": {"value": regulation_id}} - ]}, "exact": True}, + json={"filter": {"must": [{"key": "celex", "match": {"value": celex}}]}, "exact": True}, ) resp.raise_for_status() - return resp.json()["result"]["count"] + return int(resp.json()["result"]["count"]) -def main(): +def ingest_one(spec: RegSpec, dry_run: bool) -> IngestResult: + if (existing := count_existing(spec.celex)) > 0: + logger.info(" already present: %d chunks — SKIPPING (no re-ingest)", existing) + return {"reg": spec.reg, "status": "exists", "chunks": existing} + + try: + html = download_act(spec.celex) + except Exception as exc: # noqa: BLE001 — log + continue with the next act + logger.error(" download FAILED: %s", exc) + return {"reg": spec.reg, "status": "download_failed", "chunks": 0} + + act = parse_html(html, spec.reg) + passed, problems = self_test(act) + logger.info(" parsed: %d articles, %d annexes", len(act.articles), len(act.annexes)) + if not passed: + logger.error(" GATE FAIL — %s", "; ".join(problems)) + return {"reg": spec.reg, "status": "gate_failed", "chunks": 0} + + units = build_upload_units(act, spec, RUN_TAG) + if dry_run: + logger.info(" DRY RUN — would upload %d units", len(units)) + return {"reg": spec.reg, "status": "dry_run", "chunks": len(units)} + + chunks = 0 + with httpx.Client(timeout=600.0, verify=False) as client: + for unit in units: + chunks += upload_unit(client, RAG_URL, unit) + logger.info(" uploaded: %d units, %d chunks", len(units), chunks) + return {"reg": spec.reg, "status": "ok", "chunks": chunks} + + +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() logger.info("=" * 60) - logger.info("Ingest EU Regulations from EUR-Lex") - logger.info(" Regulations: %d", len(REGULATIONS)) - 
logger.info(" Dry run: %s", args.dry_run) + logger.info("LegalActIngester — %d acts | dry_run=%s", len(SPECS), args.dry_run) logger.info("=" * 60) - results = [] - for i, reg in enumerate(REGULATIONS, 1): - logger.info("\n[%d/%d] %s (CELEX: %s)", i, len(REGULATIONS), reg["name"], reg["celex"]) + results: list[IngestResult] = [] + for i, spec in enumerate(SPECS, 1): + logger.info("\n[%d/%d] %s (CELEX %s)", i, len(SPECS), spec.name_de, spec.celex) + results.append(ingest_one(spec, args.dry_run)) + if i < len(SPECS): + time.sleep(1) - # Skip if variant already exists - skip_id = reg.get("skip_if_exists") - if skip_id: - existing = count_existing(skip_id) - if existing > 0: - logger.info(" Already exists as '%s' (%d chunks) — SKIPPING", skip_id, existing) - results.append({"reg": reg["short"], "status": "exists", "chunks": existing}) - continue - - # Check if this exact ID exists - existing = count_existing(reg["regulation_id"]) - if existing > 0: - logger.info(" Already exists: %d chunks — SKIPPING", existing) - results.append({"reg": reg["short"], "status": "exists", "chunks": existing}) - continue - - # Download from EUR-Lex - logger.info(" Downloading from EUR-Lex...") - try: - html = download_eurlex(reg["celex"]) - logger.info(" Downloaded: %d chars", len(html)) - except Exception as e: - logger.error(" Download FAILED: %s", e) - results.append({"reg": reg["short"], "status": "download_failed", "chunks": 0}) - continue - - # Upload - filename = f"{reg['regulation_id']}.html" - try: - result = upload_html(html, filename, reg, args.dry_run) - chunks = result.get("chunks_count", 0) - logger.info(" Uploaded: %d chunks", chunks) - results.append({"reg": reg["short"], "status": "ok", "chunks": chunks}) - except Exception as e: - logger.error(" Upload FAILED: %s", e) - results.append({"reg": reg["short"], "status": "error", "chunks": 0}) - - if i < len(REGULATIONS): - time.sleep(2) - - # Summary logger.info("\n" + "=" * 60) - logger.info("RESULTS") - logger.info("=" * 
60) for r in results: - logger.info(" %-20s %s chunks=%d", r["reg"], r["status"].upper(), r["chunks"]) - - total_new = sum(r["chunks"] for r in results if r["status"] == "ok") - logger.info("\nTotal new chunks: %d", total_new) + logger.info(" %-18s %-15s chunks=%s", r["reg"], r["status"].upper(), r["chunks"]) + total = sum(r["chunks"] for r in results if r["status"] == "ok") + logger.info("\nTotal new chunks: %d", total) if __name__ == "__main__": diff --git a/control-pipeline/services/legal_act_ingester.py b/control-pipeline/services/legal_act_ingester.py new file mode 100644 index 0000000..06dcd93 --- /dev/null +++ b/control-pipeline/services/legal_act_ingester.py @@ -0,0 +1,332 @@ +"""Production LegalActIngester for EU eur-lex acts (Parser 1 of the corpus stack). + +Downloads the German XHTML of an EU act (CELLAR machine endpoint, with the +eur-lex web UI as fallback), parses it into ARTICLES + ANNEXES with full +authority metadata and forward citation edges (references_out), and self-tests +the parse before any upload. The eur-lex / CELLAR XHTML uses uniform CSS classes +(oj-ti-art / oj-sti-art / oj-normal / oj-ti-grseq-1 / oj-doc-ti) across every +act, so one parser covers DSGVO / CRA / AI Act / DORA / NIS2 / MaschinenVO / ... + +Recitals are intentionally NOT handled here — they are a separate, lower-weight +source (RecitalIngester, Parser 2). Scope of this module: binding articles + the +annexes that carry the actual obligations. +""" + +from __future__ import annotations + +import html as html_lib +import json +import logging +import re +from dataclasses import dataclass, field +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + +CHUNK_SIZE = "2500" +CHUNK_OVERLAP = "200" + +# CELLAR is the canonical machine endpoint and returns the full XHTML even for +# acts the eur-lex web UI blocks with an empty HTTP 202 (e.g. DORA). Try it +# first, fall back to the web UI for anything CELLAR cannot serve. 
+CELLAR_URL = "http://publications.europa.eu/resource/celex/{celex}" +EURLEX_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}" +_USER_AGENT = "Mozilla/5.0 (compatible; BreakPilot-LegalActIngester/1.0)" + +_P_RE = re.compile( + r'
<p[^>]*class="oj-(ti-art|sti-art|normal|expanded|ti-grseq-1|doc-ti)"[^>]*>(.*?)</p>
', + re.S, +) +_TAG_RE = re.compile(r"<[^>]+>") +_ART_RE = re.compile(r"Artikel\s+(\d+[a-z]?)") +_ANNEX_RE = re.compile(r"ANHANG\s+([IVXLC]+)\b") +_PARA_RE = re.compile(r"§\s*(\d+[a-z]?)") +_ANNEX_REF_RE = re.compile(r"Anh[ae]ng\s+([IVXLC]+)\b") +_EMPTY_ANNEX_CHARS = 15 # below this an annex is a table/product list → skip on upload + + +@dataclass +class RegSpec: + """The minimum an act needs to be ingested + cited.""" + + reg: str # short citation handle, e.g. "CRA" + celex: str # e.g. "32024R2847" + name_de: str + collection: str = "bp_compliance_ce" + version_date: str = "" # ISO date, e.g. "2024-10-23" + legal_basis_rank: str = "eu_regulation" # or "eu_directive" + + +@dataclass +class Article: + num: str + title: str = "" + body: list[str] = field(default_factory=list) + chapter: str = "" + + +@dataclass +class Annex: + num: str + title: str = "" + body: list[str] = field(default_factory=list) + + +@dataclass +class ParsedAct: + reg: str + articles: list[Article] + annexes: list[Annex] + + +@dataclass +class UploadUnit: + filename: str + text: str + meta: dict[str, Any] + document_version: str + collection: str + + +def clean(fragment: str) -> str: + """Strip tags, unescape entities and collapse whitespace.""" + return " ".join(html_lib.unescape(_TAG_RE.sub("", fragment)).split()) + + +def download_act(celex: str, *, client: httpx.Client | None = None) -> str: + """Fetch an act's German XHTML — CELLAR first, eur-lex fallback. + + Raises RuntimeError if neither source yields a usable document (status 200 + containing article markers). + """ + own_client = client is None + http = client or httpx.Client(timeout=60.0, follow_redirects=True) + try: + attempts: tuple[tuple[str, dict[str, str]], ...] 
= ( + ( + CELLAR_URL.format(celex=celex), + {"Accept-Language": "deu", "Accept": "application/xhtml+xml, text/html;q=0.9"}, + ), + (EURLEX_URL.format(celex=celex), {}), + ) + for url, extra in attempts: + try: + resp = http.get(url, headers={"User-Agent": _USER_AGENT, **extra}) + except httpx.HTTPError as exc: + logger.warning("legal-act fetch error for %s: %s", url, exc) + continue + if resp.status_code == 200 and "oj-ti-art" in resp.text: + logger.info("downloaded CELEX %s from %s (%d chars)", celex, url, len(resp.text)) + return resp.text + logger.warning( + "no usable doc for CELEX %s from %s (status=%s, len=%d)", + celex, url, resp.status_code, len(resp.text), + ) + raise RuntimeError(f"no usable XHTML for CELEX {celex} (CELLAR + eur-lex failed)") + finally: + if own_client: + http.close() + + +def refs_out(reg: str, text: str) -> list[str]: + """Forward citation edges found in `text`: Art→Art, Art→§ (BDSG), Art→Annex.""" + out = {f"Art. {m} {reg}" for m in _ART_RE.findall(text)} + out |= {f"§ {m} BDSG" for m in _PARA_RE.findall(text)} + out |= {f"{reg} Anhang {m}" for m in _ANNEX_REF_RE.findall(text)} + return sorted(out) + + +def parse_html(raw: str, reg: str) -> ParsedAct: + """Parse eur-lex/CELLAR XHTML into articles + annexes (no recitals). + + Text before the first article (recitals/preamble) is ignored on purpose — + that is RecitalIngester's job (Parser 2). 
+ """ + articles: list[Article] = [] + annexes: list[Annex] = [] + cur: Article | None = None + ann: Annex | None = None + chapter = "" + + for cls, inner in _P_RE.findall(raw): + txt = clean(inner) + if not txt: + continue + + annex_match = _ANNEX_RE.match(txt) if cls == "doc-ti" else None + if annex_match: + if cur is not None: + articles.append(cur) + cur = None + if ann is not None: + annexes.append(ann) + ann = Annex(num=annex_match.group(1)) + continue + + if ann is not None: # annex mode + if cls in ("doc-ti", "ti-grseq-1") and not ann.title: + ann.title = txt + elif cls in ("normal", "expanded", "ti-grseq-1"): + ann.body.append(txt) + continue + + if cls == "doc-ti": # document title / preamble headings + continue + if cls == "ti-grseq-1": # chapter / section heading + chapter = txt + continue + if cls == "ti-art": + art_match = _ART_RE.match(txt) + if art_match: + if cur is not None: + articles.append(cur) + cur = Article(num=art_match.group(1), chapter=chapter) + continue + if cls == "sti-art" and cur is not None: + cur.title = txt + continue + if cls in ("normal", "expanded") and cur is not None: + cur.body.append(txt) + + if cur is not None: + articles.append(cur) + if ann is not None: + annexes.append(ann) + return ParsedAct(reg=reg, articles=articles, annexes=annexes) + + +def self_test(act: ParsedAct) -> tuple[bool, list[str]]: + """Gate the parse before upload. Empty annexes (tables) do NOT fail — they + are skipped on upload. Returns (passed, problems).""" + problems: list[str] = [] + if not act.articles: + problems.append("0 articles parsed") + nums = [a.num for a in act.articles] + if len(nums) != len(set(nums)): + problems.append("duplicate article numbers") + short = [a.num for a in act.articles if len(" ".join(a.body)) < 15] + if short: + problems.append(f"{len(short)} empty articles (e.g. 
{short[:3]})") + return (not problems, problems) + + +def _base_meta(spec: RegSpec) -> dict[str, Any]: + return { + "regulation_code": spec.reg, + "regulation_short": spec.reg, + "regulation_name_de": spec.name_de, + "citation_style": "article", + "document_type": "legal_act", + "source_class": "binding_law", + "bindingness": "binding", + "authority_level": 95, + "authority_weight": 100, + "source_type": "law", + "issuer": "European Union", + "jurisdiction": "EU", + "legal_basis_rank": spec.legal_basis_rank, + "version_date": spec.version_date, + "source": "eur-lex.europa.eu", + "license": "public_eu", + "category": "recht", + "celex": spec.celex, + "use_for_primary": True, + } + + +def _article_meta(spec: RegSpec, art: Article) -> dict[str, Any]: + cu = f"Art. {art.num} {spec.reg}" + meta = _base_meta(spec) + meta.update({ + "citation_unit": cu, + "article_label": cu, + "parent_citation_unit": cu, + "is_citable": True, + "article": art.num, + "context_hierarchy": [art.chapter] if art.chapter else [], + "display_context": (art.chapter + " > " if art.chapter else "") + cu, + "chunk_scope": "section", + "article_title": art.title, + "article_type": "obligation", + "references_out": refs_out(spec.reg, " ".join(art.body)), + "norm_id": f"EU-{spec.reg.replace(' ', '')}-Art{art.num}", + }) + return meta + + +def _annex_meta(spec: RegSpec, annex: Annex) -> dict[str, Any]: + cu = f"{spec.reg} Anhang {annex.num}" + meta = _base_meta(spec) + meta.update({ + "citation_unit": cu, + "article_label": cu, + "parent_citation_unit": cu, + "is_citable": True, + "article": f"Anhang-{annex.num}", # distinct → avoids point-ID collisions + "context_hierarchy": [f"Anhang {annex.num}"], + "display_context": cu, + "chunk_scope": "annex", + "article_title": annex.title, + "article_type": "requirement", + "references_out": refs_out(spec.reg, " ".join(annex.body)), + "norm_id": f"EU-{spec.reg.replace(' ', '')}-Anhang{annex.num}", + }) + return meta + + +def build_upload_units(act: ParsedAct, 
spec: RegSpec, run_tag: str) -> list[UploadUnit]: + """One UploadUnit per article/annex. Articles share a document_version; each + annex gets its own (the RAG service derives `article` from text and would + otherwise collide annexes on chunk_index). Empty annexes are skipped. + """ + slug = spec.reg.lower().replace(" ", "") + base_version = f"{run_tag}-{slug}" + units: list[UploadUnit] = [] + for art in act.articles: + text = f"Art. {art.num} {spec.reg} {art.title}\n\n" + "\n\n".join(art.body) + units.append(UploadUnit( + filename=f"{slug}_art{art.num}.txt", + text=text, + meta=_article_meta(spec, art), + document_version=base_version, + collection=spec.collection, + )) + for annex in act.annexes: + if len(" ".join(annex.body)) < _EMPTY_ANNEX_CHARS: + continue # table / correspondence list — no usable prose + text = f"{spec.reg} Anhang {annex.num} {annex.title}\n\n" + "\n\n".join(annex.body) + units.append(UploadUnit( + filename=f"{slug}_anhang{annex.num}.txt", + text=text, + meta=_annex_meta(spec, annex), + document_version=f"{base_version}-anhang{annex.num}", + collection=spec.collection, + )) + return units + + +def upload_unit(client: httpx.Client, rag_url: str, unit: UploadUnit) -> int: + """Upload one unit to the RAG service. 
Returns the chunk count (0 on non-200).""" + data = { + "collection": unit.collection, + "data_type": "compliance", + "bundesland": "eu", + "use_case": "legal_reference", + "year": (unit.meta.get("version_date") or "")[:4] or "2026", + "chunk_strategy": "legal", + "chunk_size": CHUNK_SIZE, + "chunk_overlap": CHUNK_OVERLAP, + "metadata_json": json.dumps(unit.meta, ensure_ascii=False), + "document_version": unit.document_version, + } + resp = client.post( + f"{rag_url}/api/v1/documents/upload", + files={"file": (unit.filename, unit.text.encode("utf-8"), "text/plain")}, + data=data, + ) + if resp.status_code != 200: + logger.error("upload %s failed: %s %s", unit.filename, resp.status_code, resp.text[:200]) + return 0 + return int(resp.json().get("chunks_count", 0)) diff --git a/control-pipeline/tests/fixtures/sample_eurlex_act.html b/control-pipeline/tests/fixtures/sample_eurlex_act.html new file mode 100644 index 0000000..7e38a7e --- /dev/null +++ b/control-pipeline/tests/fixtures/sample_eurlex_act.html @@ -0,0 +1,17 @@ + + +VERORDNUNG (EU) 2099/1 DES TESTGEBERS
+(1) Dieser Erwaegungsgrund steht vor den Artikeln und darf NICHT als Artikel geparst werden.
+<p class="oj-ti-grseq-1">KAPITEL I</p>
+<p class="oj-ti-art">Artikel 1</p>
+<p class="oj-sti-art">Gegenstand</p>
+Diese Verordnung legt Anforderungen fest; Einzelheiten regeln Artikel 2 und Anhang I.
+<p class="oj-ti-art">Artikel 2</p>
+<p class="oj-sti-art">Begriffsbestimmungen</p>
+Im Sinne dieser Verordnung bezeichnet der Ausdruck Produkt eine Sache mit digitalen Elementen.
+<p class="oj-doc-ti">ANHANG I</p>
+GRUNDLEGENDE ANFORDERUNGEN
+Die Produkte muessen die grundlegenden Anforderungen gemaess Artikel 1 dauerhaft erfuellen.
+<p class="oj-doc-ti">ANHANG II</p>
+x
+ diff --git a/control-pipeline/tests/test_legal_act_ingester.py b/control-pipeline/tests/test_legal_act_ingester.py new file mode 100644 index 0000000..929986c --- /dev/null +++ b/control-pipeline/tests/test_legal_act_ingester.py @@ -0,0 +1,108 @@ +"""Unit tests for the LegalActIngester engine (Parser 1). + +Pure parser + metadata tests against a synthetic eur-lex fixture — no network, +no RAG service. Covers: article/annex parsing, recital exclusion, references_out, +the self-test gate, full authority metadata and empty-annex skipping. +""" + +import os + +from services.legal_act_ingester import ( + RegSpec, + build_upload_units, + parse_html, + refs_out, + self_test, +) + +FIXTURE = os.path.join(os.path.dirname(__file__), "fixtures", "sample_eurlex_act.html") +SPEC = RegSpec(reg="TEST", celex="32099R0001", name_de="Testverordnung", + version_date="2099-01-01", legal_basis_rank="eu_regulation") + + +def _raw() -> str: + with open(FIXTURE, encoding="utf-8") as fh: + return fh.read() + + +def test_parse_articles_and_annexes(): + act = parse_html(_raw(), "TEST") + assert [a.num for a in act.articles] == ["1", "2"] + assert [a.num for a in act.annexes] == ["I", "II"] + art1 = act.articles[0] + assert art1.title == "Gegenstand" + assert art1.chapter == "KAPITEL I" + assert "grundlegenden Anforderungen" in act.annexes[0].body[0] + + +def test_recital_before_articles_is_ignored(): + # The "(1) Dieser Erwaegungsgrund …" paragraph precedes Article 1 and must + # not leak in as an article (recitals are Parser 2's job). + act = parse_html(_raw(), "TEST") + bodies = " ".join(b for a in act.articles for b in a.body) + assert "Erwaegungsgrund" not in bodies + + +def test_refs_out_extracts_article_and_annex_edges(): + act = parse_html(_raw(), "TEST") + art1_refs = refs_out("TEST", " ".join(act.articles[0].body)) + assert "Art. 2 TEST" in art1_refs + assert "TEST Anhang I" in art1_refs + # The annex points back to Article 1 (bidirectional graph is built later). 
+ annex_refs = refs_out("TEST", " ".join(act.annexes[0].body)) + assert "Art. 1 TEST" in annex_refs + + +def test_self_test_passes_clean_act(): + passed, problems = self_test(parse_html(_raw(), "TEST")) + assert passed, problems + + +def test_self_test_flags_empty_and_duplicate(): + from services.legal_act_ingester import Article, ParsedAct + + dup = ParsedAct(reg="X", articles=[Article("1", body=["enough text here ok"]), + Article("1", body=["also enough text"])], annexes=[]) + passed, problems = self_test(dup) + assert not passed and any("duplicate" in p for p in problems) + + empty = ParsedAct(reg="X", articles=[Article("1", body=["x"])], annexes=[]) + passed2, problems2 = self_test(empty) + assert not passed2 and any("empty" in p for p in problems2) + + +def test_build_upload_units_skips_empty_annex_and_tags_authority(): + units = build_upload_units(parse_html(_raw(), "TEST"), SPEC, "2099-test") + # 2 articles + Annex I (Annex II body "x" is skipped) = 3 units + assert len(units) == 3 + by_cu = {u.meta["citation_unit"]: u for u in units} + assert set(by_cu) == {"Art. 1 TEST", "Art. 2 TEST", "TEST Anhang I"} + + art = by_cu["Art. 
1 TEST"] + assert art.meta["chunk_scope"] == "section" + assert art.meta["source_class"] == "binding_law" + assert art.meta["authority_weight"] == 100 + assert art.meta["jurisdiction"] == "EU" + assert art.meta["use_for_primary"] is True + assert art.document_version == "2099-test-test" + + annex = by_cu["TEST Anhang I"] + assert annex.meta["chunk_scope"] == "annex" + assert annex.meta["article"] == "Anhang-I" + # per-annex document_version prevents point-ID collisions across annexes + assert annex.document_version == "2099-test-test-anhangI" + + +def test_build_upload_units_distinct_annex_versions(): + from services.legal_act_ingester import Annex, Article, ParsedAct + + act = ParsedAct( + reg="TEST", + articles=[Article("1", body=["body text long enough"])], + annexes=[Annex("I", body=["annex one body long enough"]), + Annex("II", body=["annex two body long enough"])], + ) + units = build_upload_units(act, SPEC, "run9") + versions = [u.document_version for u in units if u.meta["chunk_scope"] == "annex"] + assert versions == ["run9-test-anhangI", "run9-test-anhangII"] + assert len(set(versions)) == 2