feat(control-pipeline): production LegalActIngester for EU acts (Parser 1)
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 28s
CI / test-python-voice (push) Successful in 32s
CI / test-bqas (push) Successful in 30s

Add services/legal_act_ingester.py — the EU eur-lex LegalActIngester engine:
CELLAR download (with eur-lex fallback, bypassing the HTTP 202 web block on
large acts like DORA), parse into articles + annexes with full authority
metadata + forward citation edges (references_out), and a self-test gate before
upload. Refactor scripts/ingest_eu_regulations.py to use it: parse-based,
per-unit upload with a skip-by-CELEX guard (no automatic re-ingest). Recitals
are intentionally left to a separate ingester (Parser 2).

Tested: parser / metadata / self-test / refs_out over a synthetic eur-lex
fixture (7 tests), ruff + mypy clean, real CELLAR fetch of DORA verified
end-to-end (64 articles, full authority metadata).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-24 08:17:56 +02:00
parent f398088fbb
commit 569f64a400
4 changed files with 556 additions and 160 deletions
+99 -160
View File
@@ -1,200 +1,139 @@
#!/usr/bin/env python3
"""Ingest missing EU regulations from EUR-Lex (HTML).
"""Ingest EU legal acts from eur-lex/CELLAR via the LegalActIngester engine.
Downloads German HTML from EUR-Lex via CELEX number, uploads with legal chunking.
For each act this downloads the German XHTML (CELLAR, eur-lex fallback), parses
it into articles + annexes with full authority metadata + citation edges
(services/legal_act_ingester.py), self-tests the parse, and uploads per unit.
Acts whose CELEX already exists are SKIPPED — there is no automatic re-ingest.
Usage (on Mac Mini):
Usage (Mac Mini, with the RAG service reachable):
python3 control-pipeline/scripts/ingest_eu_regulations.py --dry-run
python3 control-pipeline/scripts/ingest_eu_regulations.py
"""
import argparse
import json
import logging
import os
import sys
import time
from typing import TypedDict
import httpx
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from services.legal_act_ingester import ( # noqa: E402
RegSpec,
build_upload_units,
download_act,
parse_html,
self_test,
upload_unit,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("ingest-eu")
RAG_URL = "https://localhost:8097"
QDRANT_URL = "http://localhost:6333"
RAG_URL = os.getenv("RAG_URL", "https://localhost:8097")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
COLLECTION = "bp_compliance_ce"
RUN_TAG = "2026-06-eu-v1"
EURLEX_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}"
# ---- EU Regulations to ingest ----
REGULATIONS = [
{
"celex": "32022L2464",
"regulation_id": "csrd_2022",
"name": "Corporate Sustainability Reporting Directive (CSRD)",
"short": "CSRD",
"category": "sustainability",
},
{
"celex": "32024L1760",
"regulation_id": "csddd_2024",
"name": "Corporate Sustainability Due Diligence Directive (CSDDD)",
"short": "CSDDD",
"category": "sustainability",
},
{
"celex": "32020R0852",
"regulation_id": "eu_taxonomy_2020",
"name": "EU-Taxonomie-Verordnung",
"short": "EU Taxonomy",
"category": "sustainability",
},
{
"celex": "32024R1183",
"regulation_id": "eidas_2_0_2024",
"name": "eIDAS 2.0 Verordnung (EU Digital Identity)",
"short": "eIDAS 2.0",
"category": "digital_identity",
},
{
"celex": "32023L0970",
"regulation_id": "pay_transparency_2023",
"name": "Entgelttransparenz-Richtlinie",
"short": "Pay Transparency",
"category": "employment",
},
{
"celex": "32022R2065",
"regulation_id": "dsa_2022_updated",
"name": "Digital Services Act (DSA) — aktualisiert",
"short": "DSA",
"category": "digital_services",
"skip_if_exists": "dsa_2022", # already exists under different ID
},
class IngestResult(TypedDict):
reg: str
status: str
chunks: int
def _rank(celex: str) -> str:
"""eu_directive for L-acts, eu_regulation otherwise (CELEX descriptor letter)."""
return "eu_directive" if len(celex) > 5 and celex[5] == "L" else "eu_regulation"
def _spec(celex: str, name_de: str, short: str, version_date: str = "") -> RegSpec:
return RegSpec(
reg=short, celex=celex, name_de=name_de, collection=COLLECTION,
version_date=version_date, legal_basis_rank=_rank(celex),
)
# Acts this script ingests. The proven MVP acts (CRA / AI Act / DORA / NIS2 /
# MaschinenVO / DSGVO) are already in the corpus and get re-ingested via a
# separate, controlled step — not here.
SPECS = [
_spec("32022L2464", "Corporate Sustainability Reporting Directive (CSRD)", "CSRD"),
_spec("32024L1760", "Corporate Sustainability Due Diligence Directive (CSDDD)", "CSDDD"),
_spec("32020R0852", "EU-Taxonomie-Verordnung", "EU Taxonomy"),
_spec("32024R1183", "eIDAS 2.0 Verordnung (EU Digital Identity)", "eIDAS 2.0"),
_spec("32023L0970", "Entgelttransparenz-Richtlinie", "Pay Transparency"),
_spec("32022R2065", "Digital Services Act (DSA)", "DSA"),
]
def download_eurlex(celex: str) -> str:
"""Download EU regulation HTML from EUR-Lex."""
url = EURLEX_URL.format(celex=celex)
with httpx.Client(timeout=30.0, follow_redirects=True) as c:
resp = c.get(url)
resp.raise_for_status()
return resp.text
def upload_html(html: str, filename: str, reg: dict, dry_run: bool = False):
"""Upload HTML to RAG service."""
if dry_run:
logger.info(" DRY RUN — would upload %d chars", len(html))
return {"chunks_count": 0}
meta = {
"regulation_id": reg["regulation_id"],
"regulation_name_de": reg["name"],
"regulation_short": reg["short"],
"celex": reg["celex"],
"category": reg["category"],
"source": "EUR-Lex",
"license": "EU_law",
"jurisdiction": "EU",
"source_type": "law",
}
form_data = {
"collection": COLLECTION,
"data_type": "compliance",
"bundesland": "bund",
"use_case": "compliance",
"year": "2026",
"chunk_strategy": "legal",
"chunk_size": "1500",
"chunk_overlap": "100",
"metadata_json": json.dumps(meta, ensure_ascii=False),
}
with httpx.Client(timeout=600.0, verify=False) as c:
resp = c.post(
f"{RAG_URL}/api/v1/documents/upload",
files={"file": (filename, html.encode("utf-8"), "text/html")},
data=form_data,
)
resp.raise_for_status()
return resp.json()
def count_existing(regulation_id: str) -> int:
with httpx.Client(timeout=60.0) as c:
resp = c.post(
def count_existing(celex: str) -> int:
"""Chunks already present for this CELEX (old or new tagging) — the skip guard."""
with httpx.Client(timeout=60.0, verify=False) as client:
resp = client.post(
f"{QDRANT_URL}/collections/{COLLECTION}/points/count",
json={"filter": {"must": [
{"key": "regulation_id", "match": {"value": regulation_id}}
]}, "exact": True},
json={"filter": {"must": [{"key": "celex", "match": {"value": celex}}]}, "exact": True},
)
resp.raise_for_status()
return resp.json()["result"]["count"]
return int(resp.json()["result"]["count"])
def main():
def ingest_one(spec: RegSpec, dry_run: bool) -> IngestResult:
if (existing := count_existing(spec.celex)) > 0:
logger.info(" already present: %d chunks — SKIPPING (no re-ingest)", existing)
return {"reg": spec.reg, "status": "exists", "chunks": existing}
try:
html = download_act(spec.celex)
except Exception as exc: # noqa: BLE001 — log + continue with the next act
logger.error(" download FAILED: %s", exc)
return {"reg": spec.reg, "status": "download_failed", "chunks": 0}
act = parse_html(html, spec.reg)
passed, problems = self_test(act)
logger.info(" parsed: %d articles, %d annexes", len(act.articles), len(act.annexes))
if not passed:
logger.error(" GATE FAIL — %s", "; ".join(problems))
return {"reg": spec.reg, "status": "gate_failed", "chunks": 0}
units = build_upload_units(act, spec, RUN_TAG)
if dry_run:
logger.info(" DRY RUN — would upload %d units", len(units))
return {"reg": spec.reg, "status": "dry_run", "chunks": len(units)}
chunks = 0
with httpx.Client(timeout=600.0, verify=False) as client:
for unit in units:
chunks += upload_unit(client, RAG_URL, unit)
logger.info(" uploaded: %d units, %d chunks", len(units), chunks)
return {"reg": spec.reg, "status": "ok", "chunks": chunks}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
logger.info("=" * 60)
logger.info("Ingest EU Regulations from EUR-Lex")
logger.info(" Regulations: %d", len(REGULATIONS))
logger.info(" Dry run: %s", args.dry_run)
logger.info("LegalActIngester — %d acts | dry_run=%s", len(SPECS), args.dry_run)
logger.info("=" * 60)
results = []
for i, reg in enumerate(REGULATIONS, 1):
logger.info("\n[%d/%d] %s (CELEX: %s)", i, len(REGULATIONS), reg["name"], reg["celex"])
results: list[IngestResult] = []
for i, spec in enumerate(SPECS, 1):
logger.info("\n[%d/%d] %s (CELEX %s)", i, len(SPECS), spec.name_de, spec.celex)
results.append(ingest_one(spec, args.dry_run))
if i < len(SPECS):
time.sleep(1)
# Skip if variant already exists
skip_id = reg.get("skip_if_exists")
if skip_id:
existing = count_existing(skip_id)
if existing > 0:
logger.info(" Already exists as '%s' (%d chunks) — SKIPPING", skip_id, existing)
results.append({"reg": reg["short"], "status": "exists", "chunks": existing})
continue
# Check if this exact ID exists
existing = count_existing(reg["regulation_id"])
if existing > 0:
logger.info(" Already exists: %d chunks — SKIPPING", existing)
results.append({"reg": reg["short"], "status": "exists", "chunks": existing})
continue
# Download from EUR-Lex
logger.info(" Downloading from EUR-Lex...")
try:
html = download_eurlex(reg["celex"])
logger.info(" Downloaded: %d chars", len(html))
except Exception as e:
logger.error(" Download FAILED: %s", e)
results.append({"reg": reg["short"], "status": "download_failed", "chunks": 0})
continue
# Upload
filename = f"{reg['regulation_id']}.html"
try:
result = upload_html(html, filename, reg, args.dry_run)
chunks = result.get("chunks_count", 0)
logger.info(" Uploaded: %d chunks", chunks)
results.append({"reg": reg["short"], "status": "ok", "chunks": chunks})
except Exception as e:
logger.error(" Upload FAILED: %s", e)
results.append({"reg": reg["short"], "status": "error", "chunks": 0})
if i < len(REGULATIONS):
time.sleep(2)
# Summary
logger.info("\n" + "=" * 60)
logger.info("RESULTS")
logger.info("=" * 60)
for r in results:
logger.info(" %-20s %s chunks=%d", r["reg"], r["status"].upper(), r["chunks"])
total_new = sum(r["chunks"] for r in results if r["status"] == "ok")
logger.info("\nTotal new chunks: %d", total_new)
logger.info(" %-18s %-15s chunks=%s", r["reg"], r["status"].upper(), r["chunks"])
total = sum(r["chunks"] for r in results if r["status"] == "ok")
logger.info("\nTotal new chunks: %d", total)
if __name__ == "__main__":
@@ -0,0 +1,332 @@
"""Production LegalActIngester for EU eur-lex acts (Parser 1 of the corpus stack).
Downloads the German XHTML of an EU act (CELLAR machine endpoint, with the
eur-lex web UI as fallback), parses it into ARTICLES + ANNEXES with full
authority metadata and forward citation edges (references_out), and self-tests
the parse before any upload. The eur-lex / CELLAR XHTML uses uniform CSS classes
(oj-ti-art / oj-sti-art / oj-normal / oj-ti-grseq-1 / oj-doc-ti) across every
act, so one parser covers DSGVO / CRA / AI Act / DORA / NIS2 / MaschinenVO / ...
Recitals are intentionally NOT handled here — they are a separate, lower-weight
source (RecitalIngester, Parser 2). Scope of this module: binding articles + the
annexes that carry the actual obligations.
"""
from __future__ import annotations
import html as html_lib
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Any
import httpx
logger = logging.getLogger(__name__)
CHUNK_SIZE = "2500"
CHUNK_OVERLAP = "200"
# CELLAR is the canonical machine endpoint and returns the full XHTML even for
# acts the eur-lex web UI blocks with an empty HTTP 202 (e.g. DORA). Try it
# first, fall back to the web UI for anything CELLAR cannot serve.
CELLAR_URL = "http://publications.europa.eu/resource/celex/{celex}"
EURLEX_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}"
_USER_AGENT = "Mozilla/5.0 (compatible; BreakPilot-LegalActIngester/1.0)"
_P_RE = re.compile(
r'<p[^>]*class="oj-(ti-art|sti-art|normal|expanded|ti-grseq-1|doc-ti)"[^>]*>(.*?)</p>',
re.S,
)
_TAG_RE = re.compile(r"<[^>]+>")
_ART_RE = re.compile(r"Artikel\s+(\d+[a-z]?)")
_ANNEX_RE = re.compile(r"ANHANG\s+([IVXLC]+)\b")
_PARA_RE = re.compile(r"§\s*(\d+[a-z]?)")
_ANNEX_REF_RE = re.compile(r"Anh[ae]ng\s+([IVXLC]+)\b")
_EMPTY_ANNEX_CHARS = 15 # below this an annex is a table/product list → skip on upload
@dataclass
class RegSpec:
"""The minimum an act needs to be ingested + cited."""
reg: str # short citation handle, e.g. "CRA"
celex: str # e.g. "32024R2847"
name_de: str
collection: str = "bp_compliance_ce"
version_date: str = "" # ISO date, e.g. "2024-10-23"
legal_basis_rank: str = "eu_regulation" # or "eu_directive"
@dataclass
class Article:
num: str
title: str = ""
body: list[str] = field(default_factory=list)
chapter: str = ""
@dataclass
class Annex:
num: str
title: str = ""
body: list[str] = field(default_factory=list)
@dataclass
class ParsedAct:
reg: str
articles: list[Article]
annexes: list[Annex]
@dataclass
class UploadUnit:
filename: str
text: str
meta: dict[str, Any]
document_version: str
collection: str
def clean(fragment: str) -> str:
"""Strip tags, unescape entities and collapse whitespace."""
return " ".join(html_lib.unescape(_TAG_RE.sub("", fragment)).split())
def download_act(celex: str, *, client: httpx.Client | None = None) -> str:
"""Fetch an act's German XHTML — CELLAR first, eur-lex fallback.
Raises RuntimeError if neither source yields a usable document (status 200
containing article markers).
"""
own_client = client is None
http = client or httpx.Client(timeout=60.0, follow_redirects=True)
try:
attempts: tuple[tuple[str, dict[str, str]], ...] = (
(
CELLAR_URL.format(celex=celex),
{"Accept-Language": "deu", "Accept": "application/xhtml+xml, text/html;q=0.9"},
),
(EURLEX_URL.format(celex=celex), {}),
)
for url, extra in attempts:
try:
resp = http.get(url, headers={"User-Agent": _USER_AGENT, **extra})
except httpx.HTTPError as exc:
logger.warning("legal-act fetch error for %s: %s", url, exc)
continue
if resp.status_code == 200 and "oj-ti-art" in resp.text:
logger.info("downloaded CELEX %s from %s (%d chars)", celex, url, len(resp.text))
return resp.text
logger.warning(
"no usable doc for CELEX %s from %s (status=%s, len=%d)",
celex, url, resp.status_code, len(resp.text),
)
raise RuntimeError(f"no usable XHTML for CELEX {celex} (CELLAR + eur-lex failed)")
finally:
if own_client:
http.close()
def refs_out(reg: str, text: str) -> list[str]:
"""Forward citation edges found in `text`: Art→Art, Art→§ (BDSG), Art→Annex."""
out = {f"Art. {m} {reg}" for m in _ART_RE.findall(text)}
out |= {f"§ {m} BDSG" for m in _PARA_RE.findall(text)}
out |= {f"{reg} Anhang {m}" for m in _ANNEX_REF_RE.findall(text)}
return sorted(out)
def parse_html(raw: str, reg: str) -> ParsedAct:
"""Parse eur-lex/CELLAR XHTML into articles + annexes (no recitals).
Text before the first article (recitals/preamble) is ignored on purpose —
that is RecitalIngester's job (Parser 2).
"""
articles: list[Article] = []
annexes: list[Annex] = []
cur: Article | None = None
ann: Annex | None = None
chapter = ""
for cls, inner in _P_RE.findall(raw):
txt = clean(inner)
if not txt:
continue
annex_match = _ANNEX_RE.match(txt) if cls == "doc-ti" else None
if annex_match:
if cur is not None:
articles.append(cur)
cur = None
if ann is not None:
annexes.append(ann)
ann = Annex(num=annex_match.group(1))
continue
if ann is not None: # annex mode
if cls in ("doc-ti", "ti-grseq-1") and not ann.title:
ann.title = txt
elif cls in ("normal", "expanded", "ti-grseq-1"):
ann.body.append(txt)
continue
if cls == "doc-ti": # document title / preamble headings
continue
if cls == "ti-grseq-1": # chapter / section heading
chapter = txt
continue
if cls == "ti-art":
art_match = _ART_RE.match(txt)
if art_match:
if cur is not None:
articles.append(cur)
cur = Article(num=art_match.group(1), chapter=chapter)
continue
if cls == "sti-art" and cur is not None:
cur.title = txt
continue
if cls in ("normal", "expanded") and cur is not None:
cur.body.append(txt)
if cur is not None:
articles.append(cur)
if ann is not None:
annexes.append(ann)
return ParsedAct(reg=reg, articles=articles, annexes=annexes)
def self_test(act: ParsedAct) -> tuple[bool, list[str]]:
"""Gate the parse before upload. Empty annexes (tables) do NOT fail — they
are skipped on upload. Returns (passed, problems)."""
problems: list[str] = []
if not act.articles:
problems.append("0 articles parsed")
nums = [a.num for a in act.articles]
if len(nums) != len(set(nums)):
problems.append("duplicate article numbers")
short = [a.num for a in act.articles if len(" ".join(a.body)) < 15]
if short:
problems.append(f"{len(short)} empty articles (e.g. {short[:3]})")
return (not problems, problems)
def _base_meta(spec: RegSpec) -> dict[str, Any]:
return {
"regulation_code": spec.reg,
"regulation_short": spec.reg,
"regulation_name_de": spec.name_de,
"citation_style": "article",
"document_type": "legal_act",
"source_class": "binding_law",
"bindingness": "binding",
"authority_level": 95,
"authority_weight": 100,
"source_type": "law",
"issuer": "European Union",
"jurisdiction": "EU",
"legal_basis_rank": spec.legal_basis_rank,
"version_date": spec.version_date,
"source": "eur-lex.europa.eu",
"license": "public_eu",
"category": "recht",
"celex": spec.celex,
"use_for_primary": True,
}
def _article_meta(spec: RegSpec, art: Article) -> dict[str, Any]:
cu = f"Art. {art.num} {spec.reg}"
meta = _base_meta(spec)
meta.update({
"citation_unit": cu,
"article_label": cu,
"parent_citation_unit": cu,
"is_citable": True,
"article": art.num,
"context_hierarchy": [art.chapter] if art.chapter else [],
"display_context": (art.chapter + " > " if art.chapter else "") + cu,
"chunk_scope": "section",
"article_title": art.title,
"article_type": "obligation",
"references_out": refs_out(spec.reg, " ".join(art.body)),
"norm_id": f"EU-{spec.reg.replace(' ', '')}-Art{art.num}",
})
return meta
def _annex_meta(spec: RegSpec, annex: Annex) -> dict[str, Any]:
cu = f"{spec.reg} Anhang {annex.num}"
meta = _base_meta(spec)
meta.update({
"citation_unit": cu,
"article_label": cu,
"parent_citation_unit": cu,
"is_citable": True,
"article": f"Anhang-{annex.num}", # distinct → avoids point-ID collisions
"context_hierarchy": [f"Anhang {annex.num}"],
"display_context": cu,
"chunk_scope": "annex",
"article_title": annex.title,
"article_type": "requirement",
"references_out": refs_out(spec.reg, " ".join(annex.body)),
"norm_id": f"EU-{spec.reg.replace(' ', '')}-Anhang{annex.num}",
})
return meta
def build_upload_units(act: ParsedAct, spec: RegSpec, run_tag: str) -> list[UploadUnit]:
"""One UploadUnit per article/annex. Articles share a document_version; each
annex gets its own (the RAG service derives `article` from text and would
otherwise collide annexes on chunk_index). Empty annexes are skipped.
"""
slug = spec.reg.lower().replace(" ", "")
base_version = f"{run_tag}-{slug}"
units: list[UploadUnit] = []
for art in act.articles:
text = f"Art. {art.num} {spec.reg} {art.title}\n\n" + "\n\n".join(art.body)
units.append(UploadUnit(
filename=f"{slug}_art{art.num}.txt",
text=text,
meta=_article_meta(spec, art),
document_version=base_version,
collection=spec.collection,
))
for annex in act.annexes:
if len(" ".join(annex.body)) < _EMPTY_ANNEX_CHARS:
continue # table / correspondence list — no usable prose
text = f"{spec.reg} Anhang {annex.num} {annex.title}\n\n" + "\n\n".join(annex.body)
units.append(UploadUnit(
filename=f"{slug}_anhang{annex.num}.txt",
text=text,
meta=_annex_meta(spec, annex),
document_version=f"{base_version}-anhang{annex.num}",
collection=spec.collection,
))
return units
def upload_unit(client: httpx.Client, rag_url: str, unit: UploadUnit) -> int:
"""Upload one unit to the RAG service. Returns the chunk count (0 on non-200)."""
data = {
"collection": unit.collection,
"data_type": "compliance",
"bundesland": "eu",
"use_case": "legal_reference",
"year": (unit.meta.get("version_date") or "")[:4] or "2026",
"chunk_strategy": "legal",
"chunk_size": CHUNK_SIZE,
"chunk_overlap": CHUNK_OVERLAP,
"metadata_json": json.dumps(unit.meta, ensure_ascii=False),
"document_version": unit.document_version,
}
resp = client.post(
f"{rag_url}/api/v1/documents/upload",
files={"file": (unit.filename, unit.text.encode("utf-8"), "text/plain")},
data=data,
)
if resp.status_code != 200:
logger.error("upload %s failed: %s %s", unit.filename, resp.status_code, resp.text[:200])
return 0
return int(resp.json().get("chunks_count", 0))
+17
View File
@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html><body>
<p class="oj-doc-ti">VERORDNUNG (EU) 2099/1 DES TESTGEBERS</p>
<p class="oj-normal">(1) Dieser Erwaegungsgrund steht vor den Artikeln und darf NICHT als Artikel geparst werden.</p>
<p class="oj-ti-grseq-1">KAPITEL I</p>
<p class="oj-ti-art">Artikel 1</p>
<p class="oj-sti-art">Gegenstand</p>
<p class="oj-normal">Diese Verordnung legt Anforderungen fest; Einzelheiten regeln Artikel 2 und Anhang I.</p>
<p class="oj-ti-art">Artikel 2</p>
<p class="oj-sti-art">Begriffsbestimmungen</p>
<p class="oj-normal">Im Sinne dieser Verordnung bezeichnet der Ausdruck Produkt eine Sache mit digitalen Elementen.</p>
<p class="oj-doc-ti">ANHANG I</p>
<p class="oj-ti-grseq-1">GRUNDLEGENDE ANFORDERUNGEN</p>
<p class="oj-normal">Die Produkte muessen die grundlegenden Anforderungen gemaess Artikel 1 dauerhaft erfuellen.</p>
<p class="oj-doc-ti">ANHANG II</p>
<p class="oj-normal">x</p>
</body></html>
@@ -0,0 +1,108 @@
"""Unit tests for the LegalActIngester engine (Parser 1).
Pure parser + metadata tests against a synthetic eur-lex fixture — no network,
no RAG service. Covers: article/annex parsing, recital exclusion, references_out,
the self-test gate, full authority metadata and empty-annex skipping.
"""
import os
from services.legal_act_ingester import (
RegSpec,
build_upload_units,
parse_html,
refs_out,
self_test,
)
FIXTURE = os.path.join(os.path.dirname(__file__), "fixtures", "sample_eurlex_act.html")
SPEC = RegSpec(reg="TEST", celex="32099R0001", name_de="Testverordnung",
version_date="2099-01-01", legal_basis_rank="eu_regulation")
def _raw() -> str:
with open(FIXTURE, encoding="utf-8") as fh:
return fh.read()
def test_parse_articles_and_annexes():
act = parse_html(_raw(), "TEST")
assert [a.num for a in act.articles] == ["1", "2"]
assert [a.num for a in act.annexes] == ["I", "II"]
art1 = act.articles[0]
assert art1.title == "Gegenstand"
assert art1.chapter == "KAPITEL I"
assert "grundlegenden Anforderungen" in act.annexes[0].body[0]
def test_recital_before_articles_is_ignored():
# The "(1) Dieser Erwaegungsgrund …" paragraph precedes Article 1 and must
# not leak in as an article (recitals are Parser 2's job).
act = parse_html(_raw(), "TEST")
bodies = " ".join(b for a in act.articles for b in a.body)
assert "Erwaegungsgrund" not in bodies
def test_refs_out_extracts_article_and_annex_edges():
act = parse_html(_raw(), "TEST")
art1_refs = refs_out("TEST", " ".join(act.articles[0].body))
assert "Art. 2 TEST" in art1_refs
assert "TEST Anhang I" in art1_refs
# The annex points back to Article 1 (bidirectional graph is built later).
annex_refs = refs_out("TEST", " ".join(act.annexes[0].body))
assert "Art. 1 TEST" in annex_refs
def test_self_test_passes_clean_act():
passed, problems = self_test(parse_html(_raw(), "TEST"))
assert passed, problems
def test_self_test_flags_empty_and_duplicate():
from services.legal_act_ingester import Article, ParsedAct
dup = ParsedAct(reg="X", articles=[Article("1", body=["enough text here ok"]),
Article("1", body=["also enough text"])], annexes=[])
passed, problems = self_test(dup)
assert not passed and any("duplicate" in p for p in problems)
empty = ParsedAct(reg="X", articles=[Article("1", body=["x"])], annexes=[])
passed2, problems2 = self_test(empty)
assert not passed2 and any("empty" in p for p in problems2)
def test_build_upload_units_skips_empty_annex_and_tags_authority():
units = build_upload_units(parse_html(_raw(), "TEST"), SPEC, "2099-test")
# 2 articles + Annex I (Annex II body "x" is skipped) = 3 units
assert len(units) == 3
by_cu = {u.meta["citation_unit"]: u for u in units}
assert set(by_cu) == {"Art. 1 TEST", "Art. 2 TEST", "TEST Anhang I"}
art = by_cu["Art. 1 TEST"]
assert art.meta["chunk_scope"] == "section"
assert art.meta["source_class"] == "binding_law"
assert art.meta["authority_weight"] == 100
assert art.meta["jurisdiction"] == "EU"
assert art.meta["use_for_primary"] is True
assert art.document_version == "2099-test-test"
annex = by_cu["TEST Anhang I"]
assert annex.meta["chunk_scope"] == "annex"
assert annex.meta["article"] == "Anhang-I"
# per-annex document_version prevents point-ID collisions across annexes
assert annex.document_version == "2099-test-test-anhangI"
def test_build_upload_units_distinct_annex_versions():
from services.legal_act_ingester import Annex, Article, ParsedAct
act = ParsedAct(
reg="TEST",
articles=[Article("1", body=["body text long enough"])],
annexes=[Annex("I", body=["annex one body long enough"]),
Annex("II", body=["annex two body long enough"])],
)
units = build_upload_units(act, SPEC, "run9")
versions = [u.document_version for u in units if u.meta["chunk_scope"] == "annex"]
assert versions == ["run9-test-anhangI", "run9-test-anhangII"]
assert len(set(versions)) == 2