feat(embedding): add NIST/ENISA/standard section numbering to chunker

Extends _LEGAL_SECTION_RE to detect:
- Numbered sections: 1.1 Title, 2.3.1 Subtitle
- Control family IDs: AC-1, AU-2, PO.1, PW.1.1
- Table/Figure/Appendix references

Also adds an EUR-Lex HTML replacement script. 58 embedding-service tests
passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
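The regex itself is not part of this diff; a minimal sketch of the pattern
classes the message describes (the actual _LEGAL_SECTION_RE in the chunker may
differ in anchoring and grouping) might look like:

    import re

    _LEGAL_SECTION_RE = re.compile(
        r"^(?:"
        r"\d+(?:\.\d+)*\s+\S"                 # numbered sections: 1.1 Title, 2.3.1 Subtitle
        r"|[A-Z]{2}-\d+\b"                    # control family IDs: AC-1, AU-2
        r"|[A-Z]{2}(?:\.\d+)+\b"              # SSDF-style practice IDs: PO.1, PW.1.1
        r"|(?:Table|Figure|Appendix)\s+\S+"   # Table/Figure/Appendix references
        r")",
        re.MULTILINE,
    )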
scripts/replace_eu_pdfs_with_html.py
@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Replace EU regulation PDFs with clean HTML from EUR-Lex.

Downloads HTML versions of EU regulations (using CELEX numbers),
deletes old PDF chunks from Qdrant, uploads HTML via the RAG service.

Usage:
    python3 scripts/replace_eu_pdfs_with_html.py --dry-run
    python3 scripts/replace_eu_pdfs_with_html.py
    python3 scripts/replace_eu_pdfs_with_html.py --celex 32016R0679  # single doc
"""

import argparse
import json
import logging
import time

import httpx

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("eurlex-replace")

DEFAULT_RAG_URL = "https://macmini:8097"
DEFAULT_QDRANT_URL = "http://macmini:6333"

# e.g. 32016R0679 -> https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:32016R0679
EURLEX_HTML_URL = "https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{celex}"

# EU regulations with CELEX numbers and their current collection + metadata.
# CELEX format: sector digit + 4-digit year + document type letter + number,
# e.g. 32016R0679 = sector 3 (legal acts), year 2016, R (regulation), No. 679.
EU_REGULATIONS = [
    {"celex": "32024R1689", "reg_id": "ai_act_2024", "name": "AI Act", "coll": "bp_compliance_ce"},
    {"celex": "32024R2847", "reg_id": "cra_2024", "name": "Cyber Resilience Act", "coll": "bp_compliance_ce"},
    {"celex": "32022L2555", "reg_id": "nis2_2022", "name": "NIS2-Richtlinie", "coll": "bp_compliance_ce"},
    {"celex": "32016R0679", "reg_id": "dsgvo_2016", "name": "DSGVO", "coll": "bp_compliance_ce"},
    {"celex": "32024R1624", "reg_id": "amlr_2024", "name": "Anti-Geldwaesche-VO", "coll": "bp_compliance_ce"},
    {"celex": "32017R0745", "reg_id": "eu_mdr_2017", "name": "Medical Device Regulation", "coll": "bp_compliance_ce"},
    {"celex": "32022R2065", "reg_id": "dsa_2022", "name": "Digital Services Act", "coll": "bp_compliance_ce"},
    {"celex": "32022R1925", "reg_id": "dma_2022", "name": "Digital Markets Act", "coll": "bp_compliance_ce"},
    {"celex": "32022R2554", "reg_id": "dora_2022", "name": "DORA", "coll": "bp_compliance_ce"},
    {"celex": "32022R0868", "reg_id": "dga_2022", "name": "Data Governance Act", "coll": "bp_compliance_ce"},
    {"celex": "32023R2854", "reg_id": "dataact_2023", "name": "Data Act", "coll": "bp_compliance_ce"},
    {"celex": "32023R0988", "reg_id": "gpsr_2023", "name": "General Product Safety Regulation", "coll": "bp_compliance_ce"},
    {"celex": "32023R1230", "reg_id": "machinery_2023", "name": "Maschinenverordnung", "coll": "bp_compliance_ce"},
    {"celex": "32023R1803", "reg_id": "ifrs_2023", "name": "IFRS Regulation", "coll": "bp_compliance_ce"},
    {"celex": "32023D1795", "reg_id": "dpf_2023", "name": "Data Privacy Framework", "coll": "bp_compliance_ce"},
    {"celex": "32019L2161", "reg_id": "omnibus_2019", "name": "Omnibus-Richtlinie", "coll": "bp_compliance_ce"},
    {"celex": "32019L0790", "reg_id": "dsm_2019", "name": "DSM-Richtlinie", "coll": "bp_compliance_ce"},
    {"celex": "32019L0770", "reg_id": "digital_content_2019", "name": "Digital Content Directive", "coll": "bp_compliance_ce"},
    {"celex": "32002L0058", "reg_id": "eprivacy_2002", "name": "ePrivacy-Richtlinie", "coll": "bp_compliance_ce"},
    {"celex": "32000L0031", "reg_id": "ecommerce_2000", "name": "E-Commerce-Richtlinie", "coll": "bp_compliance_ce"},
]


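# Illustrative sketch (hypothetical; not called anywhere in this script): a
# sanity check for new EU_REGULATIONS entries, since the year is later sliced
# out of the CELEX number. Sector-3 legal acts look like 3YYYY[RLD]NNNN.
import re

_CELEX_RE = re.compile(r"^3\d{4}[RLD]\d{4}$")


def validate_regulations(regs: list) -> None:
    """Fail fast if a CELEX number would break the year slice celex[1:5]."""
    for reg in regs:
        if not _CELEX_RE.fullmatch(reg["celex"]):
            raise ValueError(f"Malformed CELEX number: {reg['celex']!r}")

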
def download_eurlex_html(celex: str) -> bytes:
    """Download HTML from EUR-Lex for a given CELEX number."""
    url = EURLEX_HTML_URL.format(celex=celex)
    with httpx.Client(timeout=60.0, follow_redirects=True) as c:
        r = c.get(url)
        r.raise_for_status()
        return r.content


def delete_old_chunks(qdrant_url: str, collection: str, reg_id: str) -> None:
    """Delete chunks whose regulation_id payload field matches reg_id."""
    with httpx.Client(timeout=30.0) as c:
        r = c.post(f"{qdrant_url}/collections/{collection}/points/delete", json={
            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
        })
        if r.status_code != 200:
            # Non-fatal: the fresh upload proceeds either way, but a failed
            # delete would leave stale PDF chunks behind.
            logger.warning("Delete returned HTTP %d for %s", r.status_code, reg_id)


def count_old_chunks(qdrant_url: str, collection: str, reg_id: str) -> int:
    """Count existing chunks whose regulation_id payload field matches reg_id."""
    with httpx.Client(timeout=30.0) as c:
        r = c.post(f"{qdrant_url}/collections/{collection}/points/count", json={
            "exact": True,
            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
        })
        if r.status_code == 200:
            return r.json()["result"]["count"]
    return 0


def upload_html(rag_url: str, html_bytes: bytes, reg: dict) -> dict:
    """Upload HTML to the RAG service."""
    filename = f"{reg['reg_id']}.html"
    metadata = json.dumps({
        "regulation_id": reg["reg_id"],
        "regulation_name_de": reg["name"],
        "celex": reg["celex"],
        "source": "EUR-Lex",
        "license": "EU_law",
        "source_type": "law",
        "category": "eu_regulation",
    }, ensure_ascii=False)

    # Long timeout: chunking and embedding a full regulation can take a while.
    # verify=False skips TLS verification for the RAG endpoint.
    with httpx.Client(timeout=3600.0, verify=False) as c:
        r = c.post(f"{rag_url}/api/v1/documents/upload",
            files={"file": (filename, html_bytes, "text/html")},
            data={
                "collection": reg["coll"],
                "data_type": "compliance",
                "bundesland": "eu",
                "use_case": "regulation",
                "year": reg["celex"][1:5],  # chars 1-4 of the CELEX number are the year
                "chunk_strategy": "recursive",
                "chunk_size": "1500",
                "chunk_overlap": "100",
                "metadata_json": metadata,
            },
        )
        r.raise_for_status()
        return r.json()


def check_section_rate(qdrant_url: str, collection: str, reg_id: str) -> tuple:
    """Check section rate for a regulation. Returns (total, with_section).

    Note: samples at most the first 100 points, which is enough for a rate
    estimate but not an exact count on large regulations.
    """
    total = 0
    with_section = 0
    with httpx.Client(timeout=30.0) as c:
        r = c.post(f"{qdrant_url}/collections/{collection}/points/scroll", json={
            "limit": 100, "with_payload": True, "with_vector": False,
            "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]}
        })
        if r.status_code == 200:
            pts = r.json()["result"]["points"]
            total = len(pts)
            with_section = sum(1 for p in pts if p["payload"].get("section"))
    return total, with_section


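# Illustrative variant (hypothetical; not called from main()): exact counts
# over all points by paginating Qdrant's scroll API via next_page_offset.
def check_section_rate_exact(qdrant_url: str, collection: str, reg_id: str) -> tuple:
    """Like check_section_rate, but scrolls through every matching point."""
    total = 0
    with_section = 0
    offset = None
    with httpx.Client(timeout=30.0) as c:
        while True:
            body = {
                "limit": 100, "with_payload": True, "with_vector": False,
                "filter": {"must": [{"key": "regulation_id", "match": {"value": reg_id}}]},
            }
            if offset is not None:
                body["offset"] = offset
            r = c.post(f"{qdrant_url}/collections/{collection}/points/scroll", json=body)
            r.raise_for_status()
            result = r.json()["result"]
            pts = result["points"]
            total += len(pts)
            with_section += sum(1 for p in pts if p["payload"].get("section"))
            offset = result.get("next_page_offset")
            if offset is None:  # null offset means the scroll is exhausted
                break
    return total, with_section

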
def main():
    parser = argparse.ArgumentParser(description="Replace EU PDFs with EUR-Lex HTML")
    parser.add_argument("--rag-url", default=DEFAULT_RAG_URL)
    parser.add_argument("--qdrant-url", default=DEFAULT_QDRANT_URL)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--celex", default=None, help="Process only this CELEX number")
    args = parser.parse_args()

    regs = EU_REGULATIONS
    if args.celex:
        regs = [r for r in regs if r["celex"] == args.celex]
        if not regs:
            print(f"CELEX {args.celex} not found in list")
            return

    results = []

    for reg in regs:
        logger.info("[%s] %s (%s)", reg["celex"], reg["name"], reg["reg_id"])

        # Download HTML
        try:
            html_bytes = download_eurlex_html(reg["celex"])
            logger.info(" Downloaded: %d bytes", len(html_bytes))
        except Exception as e:
            logger.error(" Download FAILED: %s", e)
            results.append({"reg": reg, "status": "download_failed", "error": str(e)})
            continue

        if args.dry_run:
            results.append({"reg": reg, "status": "dry_run", "html_size": len(html_bytes)})
            continue

        # Delete old chunks (count first, so the report can show what was replaced)
        old_count = count_old_chunks(args.qdrant_url, reg["coll"], reg["reg_id"])
        delete_old_chunks(args.qdrant_url, reg["coll"], reg["reg_id"])
        logger.info(" Deleted %d old chunks", old_count)

        # Upload HTML
        try:
            result = upload_html(args.rag_url, html_bytes, reg)
            new_chunks = result.get("chunks_count", 0)
            logger.info(" Uploaded: %d new chunks", new_chunks)
        except Exception as e:
            logger.error(" Upload FAILED: %s", e)
            results.append({"reg": reg, "status": "upload_failed", "error": str(e)})
            time.sleep(2)
            continue

        # Check quality (brief pause so the new points are indexed)
        time.sleep(2)
        total, with_sec = check_section_rate(args.qdrant_url, reg["coll"], reg["reg_id"])
        pct = with_sec * 100 // max(total, 1)
        logger.info(" Section rate: %d/%d = %d%%", with_sec, total, pct)

        results.append({
            "reg": reg, "status": "ok",
            "old_chunks": old_count, "new_chunks": new_chunks,
            "section_rate": pct,
        })
        time.sleep(2)

    # Report
    print("\n" + "=" * 90)
    print("EUR-LEX REPLACEMENT REPORT")
    print("=" * 90)
    print(f"{'CELEX':<15} {'Name':<30} {'Status':<10} {'Old':>5} {'New':>5} {'Sect%':>6}")
    print("-" * 90)
    for r in results:
        reg = r["reg"]
        status = r["status"]
        old = r.get("old_chunks", "")
        # Dry-run rows show the HTML size in bytes in the 'New' column,
        # not a chunk count.
        new = r.get("new_chunks", r.get("html_size", ""))
        sect = f"{r.get('section_rate', '')}%" if "section_rate" in r else ""
        print(f"{reg['celex']:<15} {reg['name'][:30]:<30} {status:<10} {str(old):>5} {str(new):>5} {sect:>6}")


if __name__ == "__main__":
    main()