diff --git a/control-pipeline/scripts/replace_eu_pdfs_with_html.py b/control-pipeline/scripts/replace_eu_pdfs_with_html.py
new file mode 100644
index 0000000..b37776c
--- /dev/null
+++ b/control-pipeline/scripts/replace_eu_pdfs_with_html.py
@@ -0,0 +1,213 @@
+#!/usr/bin/env python3
+"""
+Replace EU regulation PDFs with clean HTML from EUR-Lex.
+
+Downloads HTML versions of EU regulations (using CELEX numbers),
+deletes old PDF chunks from Qdrant, uploads HTML via RAG service.
+
+Usage:
+    python3 scripts/replace_eu_pdfs_with_html.py --dry-run
+    python3 scripts/replace_eu_pdfs_with_html.py
+    python3 scripts/replace_eu_pdfs_with_html.py --celex 32016R0679   # single doc
+"""
+
+import argparse
+import json
+import logging
+import time
+
+import httpx
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+logger = logging.getLogger("eurlex-replace")
+
+DEFAULT_RAG_URL = "https://macmini:8097"
+DEFAULT_QDRANT_URL = "http://macmini:6333"
+
+EURLEX_HTML_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}"
+
+# EU regulations with CELEX numbers and their current collection + metadata
+EU_REGULATIONS = [
+    {"celex": "32024R1689", "reg_id": "ai_act_2024", "name": "AI Act", "coll": "bp_compliance_ce"},
+    {"celex": "32024R2847", "reg_id": "cra_2024", "name": "Cyber Resilience Act", "coll": "bp_compliance_ce"},
+    {"celex": "32022L2555", "reg_id": "nis2_2022", "name": "NIS2-Richtlinie", "coll": "bp_compliance_ce"},
+    {"celex": "32016R0679", "reg_id": "dsgvo_2016", "name": "DSGVO", "coll": "bp_compliance_ce"},
+    {"celex": "32024R1624", "reg_id": "amlr_2024", "name": "Anti-Geldwaesche-VO", "coll": "bp_compliance_ce"},
+    {"celex": "32017R0745", "reg_id": "eu_mdr_2017", "name": "Medical Device Regulation", "coll": "bp_compliance_ce"},
+    {"celex": "32022R2065", "reg_id": "dsa_2022", "name": "Digital Services Act", "coll": "bp_compliance_ce"},
+    {"celex": "32022R1925", "reg_id": "dma_2022", "name": "Digital Markets Act", "coll": "bp_compliance_ce"},
+    {"celex": "32022R2554", "reg_id": "dora_2022", "name": "DORA", "coll": "bp_compliance_ce"},
+    {"celex": "32022R0868", "reg_id": "dga_2022", "name": "Data Governance Act", "coll": "bp_compliance_ce"},
+    {"celex": "32023R2854", "reg_id": "dataact_2023", "name": "Data Act", "coll": "bp_compliance_ce"},
+    {"celex": "32023R0988", "reg_id": "gpsr_2023", "name": "General Product Safety Regulation", "coll": "bp_compliance_ce"},
+    {"celex": "32023R1230", "reg_id": "machinery_2023", "name": "Maschinenverordnung", "coll": "bp_compliance_ce"},
+    {"celex": "32023R1803", "reg_id": "ifrs_2023", "name": "IFRS Regulation", "coll": "bp_compliance_ce"},
+    {"celex": "32023D1795", "reg_id": "dpf_2023", "name": "Data Privacy Framework", "coll": "bp_compliance_ce"},
+    {"celex": "32019L2161", "reg_id": "omnibus_2019", "name": "Omnibus-Richtlinie", "coll": "bp_compliance_ce"},
+    {"celex": "32019L0790", "reg_id": "dsm_2019", "name": "DSM-Richtlinie", "coll": "bp_compliance_ce"},
+    {"celex": "32019L0770", "reg_id": "digital_content_2019", "name": "Digital Content Directive", "coll": "bp_compliance_ce"},
+    {"celex": "32002L0058", "reg_id": "eprivacy_2002", "name": "ePrivacy-Richtlinie", "coll": "bp_compliance_ce"},
+    {"celex": "32000L0031", "reg_id": "ecommerce_2000", "name": "E-Commerce-Richtlinie", "coll": "bp_compliance_ce"},
+]
+
+
+def download_eurlex_html(celex: str) -> bytes:
+    """Download HTML from EUR-Lex for a given CELEX number."""
+    url = EURLEX_HTML_URL.format(celex=celex)
+    with httpx.Client(timeout=60.0, follow_redirects=True) as c:
+        r = c.get(url)
+        r.raise_for_status()
+        return r.content
+
+
+def delete_old_chunks(qdrant_url: str, collection: str, reg_id: str):
+    """Delete all chunks whose regulation_id matches exactly."""
+    with httpx.Client(timeout=30.0) as c:
+        r = c.post(f"{qdrant_url}/collections/{collection}/points/delete", json={
+            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
+        })
+        if r.status_code != 200:
+            logger.warning("  Delete failed (HTTP %d) for %s", r.status_code, reg_id)
+
+
+def count_old_chunks(qdrant_url: str, collection: str, reg_id: str) -> int:
+    """Count existing chunks for a regulation_id (exact match)."""
+    with httpx.Client(timeout=30.0) as c:
+        r = c.post(f"{qdrant_url}/collections/{collection}/points/count", json={
+            "exact": True,
+            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
+        })
+        if r.status_code == 200:
+            return r.json()["result"]["count"]
+        return 0
+
+
+def upload_html(rag_url: str, html_bytes: bytes, reg: dict) -> dict:
+    """Upload HTML to the RAG service."""
+    filename = f"{reg['reg_id']}.html"
+    metadata = json.dumps({
+        "regulation_id": reg["reg_id"],
+        "regulation_name_de": reg["name"],
+        "celex": reg["celex"],
+        "source": "EUR-Lex",
+        "license": "EU_law",
+        "source_type": "law",
+        "category": "eu_regulation",
+    }, ensure_ascii=False)
+
+    # Long timeout for large documents; verify=False because the internal RAG host
+    # is assumed to use a self-signed certificate.
+    with httpx.Client(timeout=3600.0, verify=False) as c:
+        r = c.post(f"{rag_url}/api/v1/documents/upload",
+                   files={"file": (filename, html_bytes, "text/html")},
+                   data={
+                       "collection": reg["coll"],
+                       "data_type": "compliance",
+                       "bundesland": "eu",
+                       "use_case": "regulation",
+                       "year": reg["celex"][1:5],  # CELEX = sector digit + year, e.g. 32016R0679 -> "2016"
+                       "chunk_strategy": "recursive",
+                       "chunk_size": "1500",
+                       "chunk_overlap": "100",
+                       "metadata_json": metadata,
+                   })
+        r.raise_for_status()
+        return r.json()
+
+
+def check_section_rate(qdrant_url: str, collection: str, reg_id: str) -> tuple:
+    """Check the section rate for a regulation by sampling up to 100 chunks.
+    Returns (total, with_section)."""
+    total = 0
+    with_section = 0
+    with httpx.Client(timeout=30.0) as c:
+        r = c.post(f"{qdrant_url}/collections/{collection}/points/scroll", json={
+            "limit": 100, "with_payload": True, "with_vector": False,
+            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
+        })
+        if r.status_code == 200:
+            pts = r.json()["result"]["points"]
+            total = len(pts)
+            with_section = sum(1 for p in pts if p["payload"].get("section"))
+    return total, with_section
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Replace EU PDFs with EUR-Lex HTML")
+    parser.add_argument("--rag-url", default=DEFAULT_RAG_URL)
+    parser.add_argument("--qdrant-url", default=DEFAULT_QDRANT_URL)
+    parser.add_argument("--dry-run", action="store_true")
+    parser.add_argument("--celex", default=None, help="Process only this CELEX number")
+    args = parser.parse_args()
+
+    regs = EU_REGULATIONS
+    if args.celex:
+        regs = [r for r in regs if r["celex"] == args.celex]
+        if not regs:
+            print(f"CELEX {args.celex} not found in list")
+            return
+
+    results = []
+
+    for reg in regs:
+        logger.info("[%s] %s (%s)", reg["celex"], reg["name"], reg["reg_id"])
+
+        # Download HTML
+        try:
+            html_bytes = download_eurlex_html(reg["celex"])
+            logger.info("  Downloaded: %d bytes", len(html_bytes))
+        except Exception as e:
+            logger.error("  Download FAILED: %s", e)
+            results.append({"reg": reg, "status": "download_failed", "error": str(e)})
+            continue
+
+        if args.dry_run:
+            results.append({"reg": reg, "status": "dry_run", "html_size": len(html_bytes)})
+            continue
+
+        # Delete old chunks
+        old_count = count_old_chunks(args.qdrant_url, reg["coll"], reg["reg_id"])
+        delete_old_chunks(args.qdrant_url, reg["coll"], reg["reg_id"])
+        logger.info("  Deleted %d old chunks", old_count)
+
+        # Upload HTML
+        try:
+            result = upload_html(args.rag_url, html_bytes, reg)
+            new_chunks = result.get("chunks_count", 0)
+            logger.info("  Uploaded: %d new chunks", new_chunks)
+        except Exception as e:
+            logger.error("  Upload FAILED: %s", e)
+            results.append({"reg": reg, "status": "upload_failed", "error": str(e)})
+            time.sleep(2)
+            continue
+
+        # Check quality
+        time.sleep(2)
+        total, with_sec = check_section_rate(args.qdrant_url, reg["coll"], reg["reg_id"])
+        pct = with_sec * 100 // max(total, 1)
+        logger.info("  Section rate: %d/%d = %d%%", with_sec, total, pct)
+
+        results.append({
+            "reg": reg, "status": "ok",
+            "old_chunks": old_count, "new_chunks": new_chunks,
+            "section_rate": pct,
+        })
+        time.sleep(2)
+
+    # Report
+    print("\n" + "=" * 90)
+    print("EUR-LEX REPLACEMENT REPORT")
+    print("=" * 90)
+    print(f"{'CELEX':<15} {'Name':<30} {'Status':<10} {'Old':>5} {'New':>5} {'Sect%':>6}")
+    print("-" * 90)
+    for r in results:
+        reg = r["reg"]
+        status = r["status"]
+        old = r.get("old_chunks", "")
+        new = r.get("new_chunks", r.get("html_size", ""))
+        sect = f"{r.get('section_rate', '')}%" if "section_rate" in r else ""
+        print(f"{reg['celex']:<15} {reg['name'][:30]:<30} {status:<10} {str(old):>5} {str(new):>5} {sect:>6}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/embedding-service/main.py b/embedding-service/main.py
index 5124881..03f3ca6 100644
--- a/embedding-service/main.py
+++ b/embedding-service/main.py
@@ -281,7 +281,7 @@ ENGLISH_ABBREVIATIONS = {
 # Combined abbreviations for both languages
 ALL_ABBREVIATIONS = GERMAN_ABBREVIATIONS | ENGLISH_ABBREVIATIONS
 
-# Regex pattern for legal section headers (§, Art., Article, Section, etc.)
+# Regex pattern for legal/standard section headers
 _LEGAL_SECTION_RE = re.compile(
     r'^(?:'
     r'§\s*\d+'                          # § 25, § 5a
@@ -296,6 +296,12 @@ _LEGAL_SECTION_RE = re.compile(
     r'|Part\s+[IVXLC\d]+'               # Part III
     r'|Recital\s+\d+'                   # Recital 42
     r'|Erwaegungsgrund\s+\d+'           # Erwaegungsgrund 26
+    # NIST/ENISA/standard numbering
+    r'|\d+\.\d+(?:\.\d+)*\s+[A-ZÄÖÜ]'   # 1.1 Title, 2.3.1 Subtitle
+    r'|[A-Z]{2,4}[-\.]\d+(?:\.\d+)*\b'  # AC-1, AU-2, PO.1, PW.1.1
+    r'|Table\s+\d+'                     # Table 1, Table A-1
+    r'|Figure\s+\d+'                    # Figure 1
+    r'|Appendix\s+[A-Z\d]'              # Appendix A, Appendix 1
     r')',
     re.IGNORECASE | re.MULTILINE
 )