#!/usr/bin/env python3
"""Ingest CRA-relevant ENISA documents into the RAG (collection `bp_compliance_ce`).

Source files live under `legal-sources/enisa/` in this repo. The script
extracts PDF text with pdfplumber (HTML for the SRP FAQ), normalizes it, and
uploads via the RAG service with `chunk_strategy='legal'` so that section
metadata is attached to every chunk.

Each document carries a `requirement_strength` field so downstream consumers
can distinguish normative material from guidance and consultation drafts:

  - mandatory           — binding (none in this batch; CRA itself is the law)
  - guidance            — official ENISA / EUCC guidance, citable
  - consultation_draft  — public-consultation drafts (use with caveat)
  - evidentiary         — used by the ENISA Threat Landscape 2025 entry
                          (presumably supporting evidence rather than
                          guidance — confirm with downstream consumers)

Usage (run on Mac Mini after copying the legal-sources/enisa/ folder, or via
SSH with the repo mounted):

    python3 control-pipeline/scripts/ingest_enisa_cra.py --dry-run
    python3 control-pipeline/scripts/ingest_enisa_cra.py

Exit codes: 0 on success, 1 if at least one document could not be extracted,
2 on configuration/usage errors.
"""

import argparse
import json
import re
import sys
import time
import unicodedata
from html.parser import HTMLParser
from pathlib import Path

import httpx
import pdfplumber

RAG_URL = "https://localhost:8097"
QDRANT_URL = "http://localhost:6333"
UPLOAD_TIMEOUT = 1800.0  # seconds; passed as the httpx timeout for uploads
COLLECTION = "bp_compliance_ce"

# This file lives two directories below the repo root
# (control-pipeline/scripts/ per the usage notes in the module docstring).
REPO_ROOT = Path(__file__).resolve().parents[2]
SOURCE_DIR = REPO_ROOT / "legal-sources" / "enisa"

# One entry per source document:
#   regulation_id    - id used for --only and for the Qdrant count filter
#   filename         - source file inside SOURCE_DIR (.pdf or .html)
#   upload_filename  - name under which the extracted text is uploaded
#   extra_metadata   - serialized to JSON and sent along with the upload
DOCS = [
    {
        "regulation_id": "enisa_cra_requirements_standards_mapping",
        "filename": "enisa_cra_requirements_standards_mapping.pdf",
        "upload_filename": "enisa_cra_requirements_standards_mapping.txt",
        "extra_metadata": {
            "regulation_id": "enisa_cra_requirements_standards_mapping",
            "regulation_short": "ENISA CRA Standards Mapping",
            "guideline_name": "Cyber Resilience Act Requirements Standards Mapping",
            "doc_type": "standards_mapping",
            "requirement_strength": "guidance",
            "publication_year": "2024",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_cra_implementation_via_eucc",
        "filename": "enisa_cra_implementation_via_eucc.pdf",
        "upload_filename": "enisa_cra_implementation_via_eucc.txt",
        "extra_metadata": {
            "regulation_id": "enisa_cra_implementation_via_eucc",
            "regulation_short": "ENISA CRA via EUCC",
            "guideline_name": "CRA Implementation via EUCC and its Applicable Technical Elements",
            "doc_type": "certification_guidance",
            "requirement_strength": "guidance",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_cra_implementation_via_eucc_annex",
        "filename": "enisa_cra_implementation_via_eucc_annex.pdf",
        "upload_filename": "enisa_cra_implementation_via_eucc_annex.txt",
        "extra_metadata": {
            "regulation_id": "enisa_cra_implementation_via_eucc_annex",
            "regulation_short": "ENISA CRA via EUCC (Annex)",
            "guideline_name": "Annex — CRA Implementation via EUCC",
            "doc_type": "certification_guidance_annex",
            "requirement_strength": "guidance",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_eucc_vulnerability_management_disclosure",
        "filename": "enisa_eucc_vulnerability_management_disclosure.pdf",
        "upload_filename": "enisa_eucc_vulnerability_management_disclosure.txt",
        "extra_metadata": {
            "regulation_id": "enisa_eucc_vulnerability_management_disclosure",
            "regulation_short": "EUCC Vuln Management & Disclosure",
            "guideline_name": "EUCC Guidelines — Vulnerability Management and Disclosure v1.1",
            "doc_type": "vulnerability_guidance",
            "requirement_strength": "guidance",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_eccg_opinion_vulnerability_management",
        "filename": "enisa_eccg_opinion_vulnerability_management.pdf",
        "upload_filename": "enisa_eccg_opinion_vulnerability_management.txt",
        "extra_metadata": {
            "regulation_id": "enisa_eccg_opinion_vulnerability_management",
            "regulation_short": "ECCG Opinion Vuln Management",
            "guideline_name": "Final ECCG Opinion — Guidance on Vulnerability Management",
            "doc_type": "eccg_opinion",
            "requirement_strength": "guidance",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_nis2_technical_implementation_guidance",
        "filename": "enisa_nis2_technical_implementation_guidance.pdf",
        "upload_filename": "enisa_nis2_technical_implementation_guidance.txt",
        "extra_metadata": {
            "regulation_id": "enisa_nis2_technical_implementation_guidance",
            "regulation_short": "ENISA NIS2 TIG v1.0",
            "guideline_name": "ENISA Technical Implementation Guidance on Cybersecurity Risk Management Measures v1.0",
            "doc_type": "technical_guidance",
            "requirement_strength": "guidance",
            "publication_year": "2025",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_nis2_security_measures_consultation",
        "filename": "enisa_nis2_security_measures_implementation_guidance_consultation.pdf",
        "upload_filename": "enisa_nis2_security_measures_consultation.txt",
        "extra_metadata": {
            "regulation_id": "enisa_nis2_security_measures_consultation",
            "regulation_short": "ENISA NIS2 Security Measures (Draft)",
            "guideline_name": "Implementation Guidance on Security Measures — Public Consultation Draft",
            "doc_type": "consultation_draft",
            "requirement_strength": "consultation_draft",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_cra_single_reporting_platform_faq",
        "filename": "enisa_cra_single_reporting_platform_faq.html",
        "upload_filename": "enisa_cra_single_reporting_platform_faq.txt",
        "extra_metadata": {
            "regulation_id": "enisa_cra_single_reporting_platform_faq",
            "regulation_short": "ENISA SRP FAQ",
            "guideline_name": "CRA Single Reporting Platform (SRP) FAQ",
            "doc_type": "faq",
            "requirement_strength": "guidance",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_eucc_evaluation_methodology_product_series",
        "filename": "enisa_eucc_evaluation_methodology_product_series.pdf",
        "upload_filename": "enisa_eucc_evaluation_methodology_product_series.txt",
        "extra_metadata": {
            "regulation_id": "enisa_eucc_evaluation_methodology_product_series",
            "regulation_short": "EUCC Eval Methodology Product Series",
            "guideline_name": "EUCC Guidelines — Evaluation Methodology for Product Series v1.0",
            "doc_type": "evaluation_methodology",
            "requirement_strength": "guidance",
            "publication_year": "2025",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_threat_landscape_2025",
        "filename": "enisa_threat_landscape_2025.pdf",
        "upload_filename": "enisa_threat_landscape_2025.txt",
        "extra_metadata": {
            "regulation_id": "enisa_threat_landscape_2025",
            "regulation_short": "ENISA Threat Landscape 2025",
            "guideline_name": "ENISA Threat Landscape 2025 v1.2",
            "doc_type": "threat_landscape",
            "requirement_strength": "evidentiary",
            "publication_year": "2025",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
    {
        "regulation_id": "enisa_cvd_policies_eu_2022",
        "filename": "enisa_cvd_policies_eu_2022.pdf",
        "upload_filename": "enisa_cvd_policies_eu_2022.txt",
        "extra_metadata": {
            "regulation_id": "enisa_cvd_policies_eu_2022",
            "regulation_short": "ENISA CVD Policies EU 2022",
            "guideline_name": "Coordinated Vulnerability Disclosure Policies in the EU (2022)",
            "doc_type": "policy_study",
            "requirement_strength": "guidance",
            "publication_year": "2022",
            "license": "reuse_with_attribution",
            "source": "enisa.europa.eu",
            "attribution": "ENISA, CC BY 4.0",
        },
    },
]


def normalize_text(text: str) -> str:
    """Clean up extracted text before it is uploaded for chunking.

    Steps, in order:
      1. Unicode NFKC normalization.
      2. Remove soft hyphens (U+00AD) and zero-width spaces (U+200B).
      3. Re-join numbers split around a dot ("3 . 2 . 1" -> "3.2.1").
      4. Re-join short upper-case codes split around a hyphen ("AB - 12" -> "AB-12").
      5. Tighten parenthesized numbers ("( 3 )" -> "(3)").
      6. Collapse runs of two or more non-newline whitespace characters to one space.

    Args:
        text: Raw text as produced by the PDF or HTML extractor.

    Returns:
        The normalized text. Newlines are preserved by step 6.
    """
    text = unicodedata.normalize("NFKC", text)
    # Written as escapes on purpose: both characters are invisible in most
    # editors, so literal versions are easy to lose or corrupt.
    text = text.replace("\u00ad", "").replace("\u200b", "")

    # A single re.sub pass consumes the digits it matched, so "1 . 2 . 3" only
    # becomes "1.2 . 3" on the first pass; repeat until nothing changes.
    prev = None
    while prev != text:
        prev = text
        text = re.sub(r"(\d+)\s+\.\s+(\d+)", r"\1.\2", text)

    text = re.sub(r"\b([A-Z]{2,4})\s+-\s+(\d+)\b", r"\1-\2", text)
    text = re.sub(r"\(\s+(\d+)\s+\)", r"(\1)", text)
    text = re.sub(r"[^\S\n]{2,}", " ", text)
    return text


class _HTMLToText(HTMLParser):
    """Minimal HTML-to-text converter built on the stdlib ``HTMLParser``.

    Text inside any ``SKIP`` element is dropped; opening and closing ``BLOCK``
    elements each emit a newline so block-level structure survives as line
    breaks.
    """

    # Elements whose text content is discarded.
    SKIP = {"script", "style", "nav", "header", "footer", "noscript"}
    # Elements that produce a line break on open and on close.
    BLOCK = {"p", "div", "li", "br", "h1", "h2", "h3", "h4", "h5", "h6", "tr", "section"}

    def __init__(self) -> None:
        super().__init__()
        self._buf: list[str] = []
        # Nesting depth of currently open SKIP elements; text is kept only at 0.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        """Track entry into skipped elements and emit a newline for block tags."""
        if tag in self.SKIP:
            self._skip_depth += 1
        if tag in self.BLOCK:
            self._buf.append("\n")

    def handle_endtag(self, tag):
        """Track exit from skipped elements and emit a newline for block tags."""
        # The depth guard keeps a stray closing tag from driving the counter
        # negative, which would otherwise suppress all following text.
        if tag in self.SKIP and self._skip_depth > 0:
            self._skip_depth -= 1
        if tag in self.BLOCK:
            self._buf.append("\n")

    def handle_data(self, data):
        """Collect text that is not inside a skipped element."""
        if self._skip_depth == 0:
            self._buf.append(data)

    def text(self) -> str:
        """Return the collected text, stripped, with 3+ newlines reduced to 2."""
        raw = "".join(self._buf)
        raw = re.sub(r"\n{3,}", "\n\n", raw)
        return raw.strip()


def extract_pdf(path: Path) -> str:
    """Extract and normalize the text of every page of a PDF.

    Pages for which pdfplumber returns no text are skipped; the remaining page
    texts are joined with a blank line. Progress is printed every 50 pages.

    Args:
        path: Path to the PDF file.

    Returns:
        The normalized document text.
    """
    print(f" Extracting PDF: {path.name}")
    parts: list[str] = []
    with pdfplumber.open(path) as pdf:
        total_pages = len(pdf.pages)
        for page_no, page in enumerate(pdf.pages, 1):
            page_text = page.extract_text(x_tolerance=3, y_tolerance=4)
            if page_text:
                parts.append(page_text)
            if page_no % 50 == 0:
                print(f" {page_no}/{total_pages} pages...")
    return normalize_text("\n\n".join(parts))


def extract_html(path: Path) -> str:
    """Extract and normalize the visible text of an HTML file.

    The file is read as UTF-8; undecodable bytes are replaced rather than
    raising.

    Args:
        path: Path to the HTML file.

    Returns:
        The normalized document text.
    """
    print(f" Extracting HTML: {path.name}")
    html = path.read_text(encoding="utf-8", errors="replace")
    parser = _HTMLToText()
    parser.feed(html)
    # feed() may hold back trailing data (e.g. a possibly incomplete character
    # reference at the very end); close() forces it to be processed.
    parser.close()
    return normalize_text(parser.text())


def get_text(doc: dict) -> str:
    """Locate a document's source file and return its normalized text.

    Args:
        doc: One entry of ``DOCS``; only ``doc["filename"]`` is read.

    Returns:
        The extracted, normalized text.

    Raises:
        FileNotFoundError: The source file is missing from ``SOURCE_DIR``.
        ValueError: The file extension is not .pdf, .html or .htm.
    """
    path = SOURCE_DIR / doc["filename"]
    if not path.exists():
        raise FileNotFoundError(path)

    suffix = path.suffix.lower()
    if suffix == ".pdf":
        text = extract_pdf(path)
    elif suffix in {".html", ".htm"}:
        text = extract_html(path)
    else:
        raise ValueError(f"Unsupported file type: {path.suffix}")

    print(f" Extracted {len(text):,} chars")
    return text


def upload_text_legal(text: str, filename: str, extra_metadata: dict) -> dict:
    """Upload text to the RAG service using the 'legal' chunk strategy.

    Args:
        text: Document text; sent UTF-8 encoded as a ``text/plain`` file part.
        filename: File name reported to the service for the uploaded part.
        extra_metadata: Serialized to JSON and sent as ``metadata_json``.

    Returns:
        The decoded JSON body of the service response.

    Raises:
        httpx.HTTPStatusError: The service answered with a 4xx/5xx status.
        httpx.HTTPError: Any other transport-level failure raised by httpx.
    """
    form_data = {
        "collection": COLLECTION,
        "data_type": "compliance",
        "bundesland": "bund",
        "use_case": "compliance",
        "year": "2026",
        "chunk_strategy": "legal",
        "chunk_size": "1500",
        "chunk_overlap": "100",
        "metadata_json": json.dumps(extra_metadata, ensure_ascii=False),
    }
    # NOTE(review): TLS certificate verification is disabled. Presumably the
    # local RAG service uses a self-signed certificate — confirm, and do not
    # point RAG_URL at a non-local host with this setting.
    with httpx.Client(timeout=UPLOAD_TIMEOUT, verify=False) as client:
        resp = client.post(
            f"{RAG_URL}/api/v1/documents/upload",
            files={"file": (filename, text.encode("utf-8"), "text/plain")},
            data=form_data,
        )
        resp.raise_for_status()
        return resp.json()


def count_chunks(regulation_id: str) -> int:
    """Return the exact number of points in Qdrant for one ``regulation_id``.

    Args:
        regulation_id: Value matched against the ``regulation_id`` payload key.

    Returns:
        The count reported by Qdrant for ``COLLECTION``.

    Raises:
        httpx.HTTPStatusError: Qdrant answered with a 4xx/5xx status.
        httpx.HTTPError: Any other transport-level failure raised by httpx.
    """
    with httpx.Client(timeout=30) as client:
        resp = client.post(
            f"{QDRANT_URL}/collections/{COLLECTION}/points/count",
            json={
                "filter": {
                    "must": [
                        {"key": "regulation_id", "match": {"value": regulation_id}}
                    ]
                },
                "exact": True,
            },
        )
        resp.raise_for_status()
        return resp.json()["result"]["count"]


def main() -> int:
    """Run the ingestion and return the process exit code.

    For every selected document: look up existing chunks (skipped in dry-run
    mode), extract the text, and upload it unless chunks already exist. A
    document whose text cannot be extracted is reported and the run continues
    with the next one. Errors from Qdrant or the RAG service are not caught
    here and abort the run.

    Returns:
        0 if every selected document was processed, 1 if at least one document
        could not be extracted, 2 if the source directory is missing or an
        unknown id was passed via ``--only``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true",
                        help="Extract text and report sizes, but do not upload.")
    parser.add_argument("--only", action="append", default=[],
                        help="Limit run to one or more regulation_ids.")
    args = parser.parse_args()

    if not SOURCE_DIR.exists():
        print(f"ERROR: source dir not found: {SOURCE_DIR}")
        return 2

    docs = DOCS
    if args.only:
        wanted = set(args.only)
        docs = [d for d in DOCS if d["regulation_id"] in wanted]
        missing = wanted - {d["regulation_id"] for d in docs}
        if missing:
            print(f"ERROR: unknown regulation_id(s): {sorted(missing)}")
            return 2

    print("=" * 70)
    print(f"ENISA CRA ingestion → collection={COLLECTION}")
    print(f"Source dir: {SOURCE_DIR}")
    print(f"Documents: {len(docs)} Dry run: {args.dry_run}")
    print("=" * 70)

    results = []
    failed: list[str] = []  # regulation_ids whose text extraction raised
    for i, doc in enumerate(docs, 1):
        reg_id = doc["regulation_id"]
        strength = doc["extra_metadata"]["requirement_strength"]
        print(f"\n[{i}/{len(docs)}] {reg_id}")

        # Dry runs must not depend on Qdrant being reachable.
        existing = count_chunks(reg_id) if not args.dry_run else "?"
        print(f" Existing chunks in Qdrant: {existing}")

        try:
            text = get_text(doc)
        except Exception as e:
            # Per-document boundary: report, remember the failure for the exit
            # code, and carry on with the remaining documents.
            print(f" ERROR extracting text: {e}")
            failed.append(reg_id)
            results.append({"id": reg_id, "chars": 0, "new": 0,
                            "strength": strength})
            continue

        if args.dry_run:
            results.append({"id": reg_id, "chars": len(text), "new": "?",
                            "strength": strength})
            continue

        # Past the dry-run guard, `existing` is always the int from Qdrant.
        if existing > 0:
            print(f" SKIP — {existing} chunks already present. "
                  f"Use Qdrant delete-by-filter before re-ingesting.")
            results.append({"id": reg_id, "chars": len(text), "new": 0,
                            "strength": strength})
            continue

        print(" Uploading with chunk_strategy='legal'...")
        result = upload_text_legal(
            text, doc["upload_filename"], doc["extra_metadata"]
        )
        new_chunks = result.get("chunks_count", 0)
        new_doc_id = result.get("document_id", "")
        print(f" -> {new_chunks} chunks (doc_id={new_doc_id})")
        results.append({"id": reg_id, "chars": len(text), "new": new_chunks,
                        "strength": strength})

        # Short pause between uploads; not needed after the last document.
        if i < len(docs):
            time.sleep(2)

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    for r in results:
        print(f" {r['id']:<55} chars={r['chars']:<9} new={r['new']:<5} "
              f"strength={r['strength']}")

    # Dry-run rows carry "?" instead of a count and are left out of the total.
    total_new = sum(r["new"] for r in results if isinstance(r["new"], int))
    print(f"\nTotal new chunks: {total_new}")

    if failed:
        print(f"Failed documents: {len(failed)} ({', '.join(failed)})")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())