"""
PDF-based QA: match ALL controls' source_original_text against original PDFs.

Determine the exact article/section/paragraph for each control.

Handles: EU regulations (Artikel), German laws (§), NIST sections,
OWASP categories, recitals (Erwägungsgründe, preamble) and annexes (Anhänge).
"""
import os
import re
import json
import unicodedata
import urllib.parse
from collections import Counter
from pathlib import Path

# The database driver is only needed by main(); guarding the import (like the
# PyMuPDF import below) keeps the pure text helpers importable without it.
try:
    import psycopg2
except ImportError:
    psycopg2 = None

try:
    import fitz  # PyMuPDF
    HAS_FITZ = True
except ImportError:
    HAS_FITZ = False

PDF_DIR = Path(os.path.expanduser("~/rag-ingestion/pdfs"))
TEXT_DIR = Path(os.path.expanduser("~/rag-ingestion/texts"))

# ── Source name → file path mapping ──────────────────────────────────
# A value of None means no local file is available; such sources are skipped.
SOURCE_FILE_MAP = {
    # EU Regulations (PDFs)
    "KI-Verordnung (EU) 2024/1689": "ai_act_2024_1689.pdf",
    "Maschinenverordnung (EU) 2023/1230": "machinery_regulation_2023_1230.pdf",
    "Cyber Resilience Act (CRA)": "cra_2024_2847.pdf",
    "EU Blue Guide 2022": "blue_guide_2022.pdf",
    "Markets in Crypto-Assets (MiCA)": "mica_2023_1114.pdf",
    "DSGVO (EU) 2016/679": "dsgvo_2016_679.pdf",
    "Batterieverordnung (EU) 2023/1542": "battery_2023_1542.pdf",
    "NIS2-Richtlinie (EU) 2022/2555": "nis2_2022_2555.pdf",
    "AML-Verordnung": "amlr_2024_1624.pdf",
    "Data Governance Act (DGA)": "dga_2022_868.pdf",
    "Data Act": "dataact_2023_2854.pdf",
    "GPSR (EU) 2023/988": "gpsr_2023_988.pdf",
    "IFRS-Übernahmeverordnung": "ifrs_regulation_2023_1803_de.pdf",
    # NIST (PDFs)
    "NIST SP 800-53 Rev. 5": "nist_sp_800_53_r5.pdf",
    "NIST SP 800-207 (Zero Trust)": "nist_sp_800_207.pdf",
    "NIST SP 800-63-3": "nist_sp_800_63_3.pdf",
    "NIST AI Risk Management Framework": "nist_ai_rmf.pdf",
    "NIST SP 800-218 (SSDF)": "nist_sp_800_218_ssdf.pdf",
    "NIST Cybersecurity Framework 2.0": "nist_csf_2_0.pdf",
    # OWASP (PDFs)
    "OWASP Top 10 (2021)": "owasp_top10_2021.pdf",
    "OWASP ASVS 4.0": "owasp_asvs_4_0.pdf",
    "OWASP SAMM 2.0": "owasp_samm_2_0.pdf",
    "OWASP API Security Top 10 (2023)": "owasp_api_top10_2023.pdf",
    "OWASP MASVS 2.0": "owasp_masvs_2_0.pdf",
    # ENISA (PDFs)
    "ENISA ICS/SCADA Dependencies": "enisa_ics_scada.pdf",
    "ENISA Supply Chain Good Practices": "enisa_supply_chain_security.pdf",
    "ENISA Threat Landscape Supply Chain": "enisa_supply_chain_security.pdf",
    "ENISA Cybersecurity State 2024": None,
    "CISA Secure by Design": "enisa_secure_by_design.pdf",
    # German laws (PDFs or TXT)
    "Bundesdatenschutzgesetz (BDSG)": "bdsg.pdf",
    "Gewerbeordnung (GewO)": "gewo.pdf",
    "Handelsgesetzbuch (HGB)": "hgb.pdf",
    "Abgabenordnung (AO)": "ao.pdf",
    # Austrian DSG
    "Österreichisches Datenschutzgesetz (DSG)": None,  # ris HTML
    # EDPB Guidelines (PDFs)
    "EDPB Leitlinien 01/2022 (BCR)": "edpb_bcr_01_2022.pdf",
    "EDPB Leitlinien 05/2020 - Einwilligung": "edpb_consent_05_2020.pdf",
    "EDPB Leitlinien 08/2020 (Social Media)": "edpb_social_media_08_2020.pdf",
    "EDPB Leitlinien 01/2019 (Zertifizierung)": "edpb_certification_01_2019.pdf",
    "EDPB Leitlinien 07/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",
    "EDPB Leitlinien 09/2022 (Data Breach)": "edpb_breach_09_2022.pdf",
    "EDPB Leitlinien - Berechtigtes Interesse (Art. 6(1)(f))": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 01/2024 (Berechtigtes Interesse)": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 04/2019 (Data Protection by Design)": "edpb_dpbd_04_2019.pdf",
    "EDPB Leitlinien 01/2020 (Vernetzte Fahrzeuge)": "edpb_connected_vehicles_01_2020.pdf",
    "EDPB Leitlinien 01/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",
    # WP (Working Party) Guidelines
    "WP244 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP251 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP260 Leitlinien (Transparenz)": "edpb_wp260_transparency.pdf",
    # OECD
    "OECD KI-Empfehlung": "oecd_ai_principles.pdf",
}

# ── Document type classification ─────────────────────────────────────
# Keywords are matched case-insensitively as substrings of the source name;
# the first document type (in dict order) with a matching keyword wins.
DOC_TYPE_MAP = {
    # EU regulations: "Artikel N"
    "eu_regulation": [
        "KI-Verordnung", "Maschinenverordnung", "Cyber Resilience",
        "Blue Guide", "MiCA", "DSGVO", "Batterieverordnung", "NIS2",
        "AML-Verordnung", "Data Governance", "Data Act", "GPSR", "IFRS",
        "Markets in Crypto",
    ],
    # German laws: "§ N"
    "de_law": [
        "BDSG", "GewO", "HGB", "Abgabenordnung",
    ],
    # NIST: "Section X.Y" or control families "AC-1"
    "nist": [
        "NIST SP", "NIST Cybersecurity", "NIST AI",
    ],
    # OWASP: "A01:2021" or "V1.1"
    "owasp": [
        "OWASP",
    ],
    # EDPB: numbered paragraphs or sections
    "edpb": [
        "EDPB", "WP244", "WP251", "WP260",
    ],
    # ENISA: sections
    "enisa": [
        "ENISA", "CISA",
    ],
}

# Single-pass character fixes applied by normalize(). No replacement output is
# itself a key of this table, so one translate() pass is equivalent to applying
# the replacements one after another.
_CHAR_REPLACEMENTS = {
    '\u00ad': '',      # soft hyphen
    '\u200b': '',      # zero-width space
    '\u00a0': ' ',     # non-breaking space
    '\ufb01': 'fi',    # ligatures
    '\ufb02': 'fl',
    '\ufb00': 'ff',
    '\ufb03': 'ffi',
    '\ufb04': 'ffl',
    '\u2019': "'",     # smart quotes
    '\u2018': "'",
    '\u201c': '"',
    '\u201d': '"',
    '\u2013': '-',     # en dash
    '\u2014': '-',     # em dash
    '\u2022': '-',     # bullet
    '\u00b7': '-',     # middle dot
}
# Control characters dropped entirely: 0x00-0x08, 0x0b, 0x0c, 0x0e-0x1f
# (tab, LF and CR are kept so they can be collapsed as whitespace).
_CONTROL_CHARS = [
    chr(code) for code in (*range(0x00, 0x09), 0x0b, 0x0c, *range(0x0e, 0x20))
]
_NORMALIZE_TABLE = str.maketrans(
    {**_CHAR_REPLACEMENTS, **dict.fromkeys(_CONTROL_CHARS, '')}
)
_WHITESPACE_RUN = re.compile(r'\s+')


def classify_doc(source_name):
    """Classify document type based on source name.

    Returns the first key of DOC_TYPE_MAP that has a keyword contained
    (case-insensitively) in ``source_name``, or "unknown" when the name is
    empty/None or nothing matches.
    """
    if not source_name:
        return "unknown"
    lowered = source_name.lower()
    for doc_type, keywords in DOC_TYPE_MAP.items():
        if any(kw.lower() in lowered for kw in keywords):
            return doc_type
    return "unknown"


def normalize(s):
    """Remove soft hyphens, normalize whitespace, handle PDF encoding issues.

    Applies the character fixes in _NORMALIZE_TABLE, NFC-normalizes the
    result, collapses every whitespace run to a single space and strips
    leading/trailing whitespace.
    """
    s = s.translate(_NORMALIZE_TABLE)
    s = unicodedata.normalize('NFC', s)
    return _WHITESPACE_RUN.sub(' ', s).strip()


def read_file(filename):
    """Read PDF or text file, return full text.

    Looks for ``filename`` in PDF_DIR. If it does not exist there, falls back
    to ``<stem>.txt`` in TEXT_DIR. Returns None when no file is found, when a
    PDF is requested but PyMuPDF is not installed, or when the suffix is not
    one of .pdf/.txt/.html.
    """
    path = PDF_DIR / filename
    if not path.exists():
        # Try text dir
        txt_path = TEXT_DIR / (path.stem + ".txt")
        if txt_path.exists():
            return txt_path.read_text(encoding='utf-8', errors='replace')
        return None

    if path.suffix == '.pdf':
        if not HAS_FITZ:
            return None
        doc = fitz.open(str(path))
        try:
            # One trailing newline per page, as page separator.
            return "".join(page.get_text() + "\n" for page in doc)
        finally:
            # Release the document even if text extraction fails.
            doc.close()

    if path.suffix in ('.txt', '.html'):
        return path.read_text(encoding='utf-8', errors='replace')
    return None


def _first_per_label(items):
    """Sort (position, label, type) items by position, keep the first hit per label."""
    seen = set()
    unique = []
    for item in sorted(items, key=lambda entry: entry[0]):
        label = item[1]
        if label not in seen:
            seen.add(label)
            unique.append(item)
    return unique


def build_eu_article_index(text, max_article=None):
    """Build article heading index for EU regulations.

    Args:
        text: Raw (un-normalized) document text.
        max_article: Optional highest valid article number; "Artikel N"
            headings with a larger number are ignored.

    Returns:
        List of (position, label, type) sorted by position, where type is
        'article', 'preamble' or 'annex'. Only the first occurrence of each
        label is kept.
    """
    items = []

    # Recitals are numbered (1), (2), ... and are only searched for in the
    # text before the first "Artikel 1" heading (whole text if none is found).
    art1_match = re.search(r'\nArtikel\s+1\s*\n', text)
    art1_pos = art1_match.start() if art1_match else len(text)

    for m in re.finditer(r'(?:^|\n)\s*\((\d+)\)', text[:art1_pos]):
        items.append((m.start(), f"Erwägungsgrund ({m.group(1)})", "preamble"))

    # Article headings: "Artikel N" on its own line
    for m in re.finditer(r'(?:^|\n)\s*Artikel\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        art_num_str = m.group(1)
        art_num = int(re.match(r'(\d+)', art_num_str).group(1))
        # Filter by max article number if known
        if max_article and art_num > max_article:
            continue
        items.append((m.start(), f"Artikel {art_num_str}", "article"))

    # Annex markers with a Roman numeral
    for m in re.finditer(r'(?:^|\n)\s*ANHANG\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        items.append((m.start(), f"Anhang {m.group(1)}", "annex"))

    # Also try "ANHANG" without Roman numeral (single annex)
    for m in re.finditer(r'(?:^|\n)\s*ANHANG\s*\n', text, re.MULTILINE):
        items.append((m.start(), "Anhang", "annex"))

    return _first_per_label(items)


def build_de_law_index(text):
    """Build section index for German laws (§ N).

    Returns a position-sorted list of (position, "§ N", "section") with the
    first occurrence of each label.
    """
    items = [
        (m.start(), f"§ {m.group(1)}", "section")
        for m in re.finditer(r'(?:^|\n)\s*§\s+(\d+[a-z]?)\b', text, re.MULTILINE)
    ]
    return _first_per_label(items)


def build_nist_index(text):
    """Build section index for NIST documents.

    Indexes numbered sections ("2.1 Section Name" → "Section 2.1", type
    'section') and control identifiers ("AC-1", type 'control'). Returns a
    position-sorted list with the first occurrence of each label.
    """
    items = []
    # NIST sections: "2.1 Section Name"
    for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))
    # Control families: "AC-1"
    for m in re.finditer(r'(?:^|\n)\s*([A-Z]{2}-\d+)\b', text, re.MULTILINE):
        items.append((m.start(), f"{m.group(1)}", "control"))
    return _first_per_label(items)


def build_owasp_index(text, source_name):
    """Build index for OWASP documents.

    The marker pattern is chosen from ``source_name`` (Top 10, API Top 10,
    ASVS, SAMM, MASVS). If nothing is found, generic "N.N" section numbers
    are used as a fallback. Returns a position-sorted list of
    (position, label, type) with the first occurrence of each label.
    """
    items = []

    if "Top 10" in source_name and "API" not in source_name:
        # OWASP Top 10: A01:2021, A02:2021, etc.
        for m in re.finditer(r'(A\d{2}:\d{4})', text):
            items.append((m.start(), m.group(1), "category"))
    elif "API" in source_name:
        # OWASP API Top 10: API1:2023, API2:2023, etc.
        for m in re.finditer(r'(API\d+:\d{4})', text):
            items.append((m.start(), m.group(1), "category"))
    elif "ASVS" in source_name:
        # OWASP ASVS: V1.1, V2.1.1, etc.
        # NOTE: "ASVS" is also a substring of "MASVS", so MASVS source names
        # take this branch and the MASVS branch below is never reached.
        for m in re.finditer(r'(?:^|\n)\s*(V\d+\.\d+(?:\.\d+)?)\b', text, re.MULTILINE):
            items.append((m.start(), m.group(1), "requirement"))
    elif "SAMM" in source_name:
        # OWASP SAMM: practices are named ("Strategy & Metrics", ...);
        # use section numbers instead.
        for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
            items.append((m.start(), f"Section {m.group(1)}", "section"))
    elif "MASVS" in source_name:
        # OWASP MASVS: MASVS-STORAGE-1, MASVS-CRYPTO-1, etc.
        for m in re.finditer(r'(MASVS-[A-Z]+-\d+)', text):
            items.append((m.start(), m.group(1), "requirement"))

    # Fallback: also find generic section numbers
    if not items:
        for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
            items.append((m.start(), f"Section {m.group(1)}", "section"))

    return _first_per_label(items)


def build_generic_index(text):
    """Build a generic section index using numbered headings.

    Matches headings such as "1. Title" or "1.1. Title" (number followed by a
    dot, whitespace and an uppercase letter). Returns a position-sorted list
    of (position, "Section N", "section") with the first occurrence of each
    label.
    """
    items = [
        (m.start(), f"Section {m.group(1)}", "section")
        for m in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE)
    ]
    return _first_per_label(items)


# Known max article numbers for EU regulations
MAX_ARTICLES = {
    "Batterieverordnung (EU) 2023/1542": 96,
    "KI-Verordnung (EU) 2024/1689": 113,
    "Maschinenverordnung (EU) 2023/1230": 54,
    "Cyber Resilience Act (CRA)": 71,
    "NIS2-Richtlinie (EU) 2022/2555": 46,
    "DSGVO (EU) 2016/679": 99,
    "Markets in Crypto-Assets (MiCA)": 149,
    "AML-Verordnung": 95,
    "Data Governance Act (DGA)": 38,
    "Data Act": 50,
    "GPSR (EU) 2023/988": 52,
}


def find_text_in_doc(orig_text, full_norm, index, index_norm_positions):
    """Find control text in document and return (article_label, article_type) or None.

    Args:
        orig_text: Source text of the control (normalized internally).
        full_norm: The document text, already passed through normalize().
        index: Heading index in raw-text positions. Not used by the lookup;
            kept for interface compatibility.
        index_norm_positions: (position, label, type) entries whose positions
            refer to ``full_norm``, sorted by position.

    Returns:
        (label, type) of the last heading at or before the match,
        ("Unknown", "unknown") if the match precedes every heading, or None
        if the text is shorter than 30 characters after normalization or no
        snippet is found.
    """
    orig_norm = normalize(orig_text)
    if len(orig_norm) < 30:
        return None

    # Try progressively shorter substrings from different positions
    for start_frac in (0.25, 0.1, 0.5, 0.0, 0.75):
        start = max(0, int(len(orig_norm) * start_frac))
        for length in (80, 60, 40, 30, 20):
            snippet = orig_norm[start:start + length]
            if len(snippet) < 15:
                continue
            pos = full_norm.find(snippet)
            if pos < 0:
                continue
            # Find which section precedes this position
            for h_pos, h_label, h_type in reversed(index_norm_positions):
                if h_pos <= pos:
                    return (h_label, h_type)
            return ("Unknown", "unknown")

    return None


def _build_index(doc_type, text, source_name):
    """Dispatch to the index builder matching ``doc_type``."""
    if doc_type == "eu_regulation":
        return build_eu_article_index(text, max_article=MAX_ARTICLES.get(source_name))
    if doc_type == "de_law":
        return build_de_law_index(text)
    if doc_type == "nist":
        return build_nist_index(text)
    if doc_type == "owasp":
        return build_owasp_index(text, source_name)
    return build_generic_index(text)


def _normalized_offsets(text, index):
    """Translate raw-text index positions into positions in normalize(text).

    Each offset is the length of the normalized prefix before the heading, so
    the whole prefix is re-normalized once per index entry.
    """
    return [(len(normalize(text[:pos])), label, typ) for pos, label, typ in index]


def _fetch_controls():
    """Load all controls with a source text longer than 50 characters.

    Connects using the DATABASE_URL environment variable and always closes
    the connection, including when the query fails.
    """
    if psycopg2 is None:
        raise ImportError("psycopg2 is required to read controls from the database")

    parsed = urllib.parse.urlparse(os.environ['DATABASE_URL'])
    conn = psycopg2.connect(
        host=parsed.hostname,
        port=parsed.port or 5432,
        user=parsed.username,
        password=parsed.password,
        dbname=parsed.path.lstrip('/'),
        options="-c search_path=compliance,public"
    )
    try:
        cur = conn.cursor()
        # Get all controls with source_original_text
        cur.execute("""
            SELECT id, control_id, title, source_original_text,
                   source_citation->>'source' as source_name,
                   source_citation->>'article' as existing_article,
                   source_citation as citation_json,
                   release_state
            FROM compliance.canonical_controls
            WHERE source_original_text IS NOT NULL
              AND length(source_original_text) > 50
            ORDER BY source_citation->>'source', control_id
        """)
        return cur.fetchall()
    finally:
        conn.close()


# ── Main ─────────────────────────────────────────────────────────────

def main():
    """Match every control against its source document and report the result.

    Prints a per-source report and a summary, and writes all matches (also
    unchanged ones) to /tmp/pdf_qa_results.json. The database is only read.
    """
    inactive_states = ('duplicate', 'too_close')

    controls = _fetch_controls()
    print(f"Total controls with source text: {len(controls)}")

    # Group by source
    by_source = {}
    for ctrl in controls:
        src = ctrl[4] or "(null)"
        by_source.setdefault(src, []).append(ctrl)

    # Process each source
    total_found = 0
    total_not_found = 0
    total_new_article = 0
    total_changed = 0
    total_skipped_no_file = 0

    updates = []  # (ctrl_id, new_article_label, article_type, control_id, source_name)

    # Largest sources first
    for source_name in sorted(by_source.keys(), key=lambda s: -len(by_source[s])):
        ctrls = by_source[source_name]
        filename = SOURCE_FILE_MAP.get(source_name)
        doc_type = classify_doc(source_name)
        active = sum(1 for c in ctrls if c[7] not in inactive_states)

        if filename is None:
            total_skipped_no_file += len(ctrls)
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} ({len(ctrls)} controls, {active} active) — no PDF")
            continue

        # Read file
        text = read_file(filename)
        if text is None:
            total_skipped_no_file += len(ctrls)
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} — file not readable: {filename}")
            continue

        text_norm = normalize(text)

        # Build index based on doc type, then precompute normalized positions
        index = _build_index(doc_type, text, source_name)
        index_norm = _normalized_offsets(text, index)

        print(f"\n{'='*60}")
        print(f"{source_name} ({len(ctrls)} controls, {active} active)")
        print(f" File: {filename} ({len(text):,} chars)")
        print(f" Index: {len(index)} sections ({doc_type})")

        src_found = 0

        for ctrl in ctrls:
            ctrl_id, control_id, title, orig_text, _, existing_art, _citation, state = ctrl

            result = find_text_in_doc(orig_text, text_norm, index, index_norm)
            if not result:
                total_not_found += 1
                # title may be NULL in the database
                print(f" {control_id:10s}: NOT FOUND {(title or '')[:50]}")
                continue

            new_label, art_type = result
            src_found += 1
            total_found += 1

            # Compare with existing
            existing_clean = (existing_art or "").strip()
            if not existing_clean:
                status = "NEW"
                total_new_article += 1
            elif existing_clean == new_label:
                status = "OK"
            else:
                status = f"CHANGED({existing_clean}→{new_label})"
                total_changed += 1

            updates.append((ctrl_id, new_label, art_type, control_id, source_name))

            if status != "OK":
                is_active = "" if state not in inactive_states else " [DUP]"
                print(f" {control_id:10s}: {new_label:25s} [{art_type:8s}] {status}{is_active}")

        pct = src_found / len(ctrls) * 100 if ctrls else 0
        print(f" → {src_found}/{len(ctrls)} matched ({pct:.0f}%)")

    # ── Summary ──────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f" Total controls with text: {len(controls)}")
    print(f" Matched to PDF: {total_found}")
    print(f" Not found in PDF: {total_not_found}")
    print(f" Skipped (no PDF file): {total_skipped_no_file}")
    print(f" New articles assigned: {total_new_article}")
    print(f" Articles changed: {total_changed}")

    # Save results for later application
    results = [
        {
            "ctrl_id": str(ctrl_id),
            "control_id": control_id,
            "source": source,
            "article_label": label,
            "article_type": art_type,
        }
        for ctrl_id, label, art_type, control_id, source in updates
    ]

    out_path = "/tmp/pdf_qa_results.json"
    # ensure_ascii=False emits non-ASCII labels verbatim, so the file encoding
    # must be explicit rather than depend on the locale.
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n Results saved to {out_path} ({len(results)} entries)")

    # Type distribution
    type_counts = Counter(r["article_type"] for r in results)
    print(f"\n Article type distribution:")
    for t, c in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f" {t:12s}: {c:5d}")


if __name__ == "__main__":
    main()