Files
breakpilot-compliance/scripts/qa/pdf_qa_all.py
Benjamin Admin 9b0f25c105
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 36s
CI/CD / test-python-backend-compliance (push) Successful in 32s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
chore(qa): add PDF-based control QA scripts and results
QA pipeline that matches control source_original_text directly against
original PDF documents to verify article/paragraph assignments. Covers
backfill, dedup, source normalization, Qdrant cleanup, and prod sync.

Key results (2026-03-20):
- 4,110/7,943 controls matched to PDF (100% for major EU regs)
- 3,366 article corrections, 705 new assignments
- 1,290 controls from Erwägungsgründe (preamble) identified
- 779 controls from Anhänge (annexes) identified

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 00:56:13 +01:00

476 lines
17 KiB
Python

"""
PDF-based QA: Match ALL controls' source_original_text against original PDFs.
Determine exact article/section/paragraph for each control.
Handle: EU regulations (Artikel), German laws (§), NIST sections, OWASP categories,
Erwägungsgründe (preamble), Anhänge (annexes).
"""
import os
import re
import json
import unicodedata
import psycopg2
import urllib.parse
from pathlib import Path
# PyMuPDF (imported as "fitz") is optional: when it is missing, PDF
# extraction is disabled and read_file() returns None for .pdf inputs.
try:
    import fitz  # PyMuPDF
    HAS_FITZ = True
except ImportError:
    HAS_FITZ = False
# Locations of the original source documents (PDFs) and of pre-extracted
# plain-text fallbacks. pathlib's own expanduser() replaces the
# os.path.expanduser/Path round-trip.
PDF_DIR = Path("~/rag-ingestion/pdfs").expanduser()
TEXT_DIR = Path("~/rag-ingestion/texts").expanduser()
# ── Source name → file path mapping ──────────────────────────────────
# Maps the source name stored in source_citation->>'source' to a local
# document filename (looked up via read_file under PDF_DIR / TEXT_DIR).
# None means no local document is available; those sources are counted
# as skipped by main().
SOURCE_FILE_MAP = {
    # EU Regulations (PDFs)
    "KI-Verordnung (EU) 2024/1689": "ai_act_2024_1689.pdf",
    "Maschinenverordnung (EU) 2023/1230": "machinery_regulation_2023_1230.pdf",
    "Cyber Resilience Act (CRA)": "cra_2024_2847.pdf",
    "EU Blue Guide 2022": "blue_guide_2022.pdf",
    "Markets in Crypto-Assets (MiCA)": "mica_2023_1114.pdf",
    "DSGVO (EU) 2016/679": "dsgvo_2016_679.pdf",
    "Batterieverordnung (EU) 2023/1542": "battery_2023_1542.pdf",
    "NIS2-Richtlinie (EU) 2022/2555": "nis2_2022_2555.pdf",
    "AML-Verordnung": "amlr_2024_1624.pdf",
    "Data Governance Act (DGA)": "dga_2022_868.pdf",
    "Data Act": "dataact_2023_2854.pdf",
    "GPSR (EU) 2023/988": "gpsr_2023_988.pdf",
    "IFRS-Übernahmeverordnung": "ifrs_regulation_2023_1803_de.pdf",
    # NIST (PDFs)
    "NIST SP 800-53 Rev. 5": None,  # TODO: Need to find/download
    "NIST SP 800-207 (Zero Trust)": None,
    "NIST SP 800-63-3": None,
    "NIST AI Risk Management Framework": None,
    "NIST SP 800-218 (SSDF)": "nist_sp_800_218_ssdf.pdf",
    "NIST Cybersecurity Framework 2.0": "nist_csf_2_0.pdf",
    # OWASP (no PDFs — these are web-based)
    "OWASP Top 10 (2021)": None,
    "OWASP ASVS 4.0": None,
    "OWASP SAMM 2.0": None,
    "OWASP API Security Top 10 (2023)": None,
    "OWASP MASVS 2.0": None,
    # ENISA (PDFs)
    "ENISA ICS/SCADA Dependencies": None,
    "ENISA Supply Chain Good Practices": "enisa_supply_chain_security.pdf",
    "ENISA Threat Landscape Supply Chain": "enisa_supply_chain_security.pdf",
    "ENISA Cybersecurity State 2024": None,
    "CISA Secure by Design": "enisa_secure_by_design.pdf",
    # German laws (PDFs or TXT)
    "Bundesdatenschutzgesetz (BDSG)": "bdsg.pdf",
    "Gewerbeordnung (GewO)": "gewo.pdf",
    "Handelsgesetzbuch (HGB)": "hgb.pdf",
    "Abgabenordnung (AO)": "ao.pdf",
    # Austrian DSG — only available as HTML on the RIS portal, no file
    "Österreichisches Datenschutzgesetz (DSG)": None,  # ris HTML
    # EDPB Guidelines (PDFs)
    "EDPB Leitlinien 01/2022 (BCR)": "edpb_bcr_01_2022.pdf",
    "EDPB Leitlinien 05/2020 - Einwilligung": None,  # txt
    "EDPB Leitlinien 08/2020 (Social Media)": "edpb_social_media_08_2020.pdf",
    "EDPB Leitlinien 01/2019 (Zertifizierung)": "edpb_certification_01_2019.pdf",
    "EDPB Leitlinien 07/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",
    "EDPB Leitlinien 09/2022 (Data Breach)": "edpb_breach_09_2022.pdf",
    "EDPB Leitlinien - Berechtigtes Interesse (Art. 6(1)(f))": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 01/2024 (Berechtigtes Interesse)": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 04/2019 (Data Protection by Design)": None,  # txt
    "EDPB Leitlinien 01/2020 (Vernetzte Fahrzeuge)": "edpb_connected_vehicles_01_2020.pdf",
    "EDPB Leitlinien 01/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",
    # WP (Working Party) Guidelines
    "WP244 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP251 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP260 Leitlinien (Transparenz)": "edpb_wp260_transparency.pdf",
    # OECD
    "OECD KI-Empfehlung": "oecd_ai_principles.pdf",
}
# ── Document type classification ─────────────────────────────────────
# Keyword lists used by classify_doc(): the first doc-type whose keyword
# appears (case-insensitively) in the source name wins. The doc type
# selects which heading-index builder main() uses.
DOC_TYPE_MAP = {
    # EU regulations: headings of the form "Artikel N"
    "eu_regulation": [
        "KI-Verordnung", "Maschinenverordnung", "Cyber Resilience",
        "Blue Guide", "MiCA", "DSGVO", "Batterieverordnung", "NIS2",
        "AML-Verordnung", "Data Governance", "Data Act", "GPSR",
        "IFRS", "Markets in Crypto",
    ],
    # German laws: headings of the form "§ N"
    "de_law": [
        "BDSG", "GewO", "HGB", "Abgabenordnung",
    ],
    # NIST: dotted sections "X.Y" or control families like "AC-1"
    "nist": [
        "NIST SP", "NIST Cybersecurity", "NIST AI",
    ],
    # OWASP: category IDs like "A01:2021" or "V1.1" (web-based, no PDFs)
    "owasp": [
        "OWASP",
    ],
    # EDPB: numbered paragraphs or sections
    "edpb": [
        "EDPB", "WP244", "WP251", "WP260",
    ],
    # ENISA / CISA: numbered sections
    "enisa": [
        "ENISA", "CISA",
    ],
}
def classify_doc(source_name):
    """Return the DOC_TYPE_MAP key matching *source_name*.

    Matching is case-insensitive substring search over each doc type's
    keyword list, in DOC_TYPE_MAP insertion order; returns "unknown"
    for an empty/None name or when no keyword matches.
    """
    if not source_name:
        return "unknown"
    haystack = source_name.lower()
    for doc_type, keywords in DOC_TYPE_MAP.items():
        if any(kw.lower() in haystack for kw in keywords):
            return doc_type
    return "unknown"
def normalize(s):
    """Normalize extracted text for robust substring matching.

    Strips soft hyphens and zero-width spaces, maps NBSP to a plain
    space, expands the fi/fl ligature codepoints, applies Unicode NFC,
    and collapses every whitespace run to a single space.

    Fix: the original chained ``.replace('\\u00ad', '').replace('\\xad', '')``
    — those are the *same* codepoint, so the second call was dead. A
    single str.translate pass does all character fixes at once.
    """
    s = s.translate({
        0x00AD: None,   # soft hyphen
        0x200B: None,   # zero-width space
        0x00A0: ' ',    # no-break space
        0xFB01: 'fi',   # fi ligature
        0xFB02: 'fl',   # fl ligature
    })
    s = unicodedata.normalize('NFC', s)
    return re.sub(r'\s+', ' ', s).strip()
def read_file(filename):
    """Read a source document and return its full text, or None.

    Looks for *filename* under PDF_DIR; if it does not exist there,
    falls back to a same-stem ".txt" file in TEXT_DIR. PDFs are
    extracted page by page with PyMuPDF (returns None when fitz is not
    installed); .txt/.html files are read as UTF-8 with replacement of
    undecodable bytes. Any other suffix returns None.

    Fixes vs. original: the fitz document is now opened via a context
    manager so the handle is closed even if extraction raises, and the
    page texts are assembled with one join instead of quadratic ``+=``.
    """
    path = PDF_DIR / filename
    if not path.exists():
        # Fall back to a pre-extracted text dump with the same stem.
        txt_path = TEXT_DIR / (path.stem + ".txt")
        if txt_path.exists():
            return txt_path.read_text(encoding='utf-8', errors='replace')
        return None
    if path.suffix == '.pdf':
        if not HAS_FITZ:
            return None
        with fitz.open(str(path)) as doc:
            return "".join(page.get_text() + "\n" for page in doc)
    if path.suffix in ('.txt', '.html'):
        return path.read_text(encoding='utf-8', errors='replace')
    return None
def build_eu_article_index(text, max_article=None):
    """Index EU-regulation headings: recitals, articles, annexes.

    Returns a list of (char_offset, label, kind) tuples sorted by
    offset, where kind is 'preamble', 'article', or 'annex'. Duplicate
    labels keep only their first (earliest) occurrence.
    """
    entries = []

    # Recitals "(N)" only count before the first "Artikel 1" heading;
    # anything after that is body text, not a recital number.
    first_article = re.search(r'\nArtikel\s+1\s*\n', text)
    preamble_end = first_article.start() if first_article else len(text)
    for hit in re.finditer(r'(?:^|\n)\s*\((\d+)\)', text[:preamble_end]):
        entries.append((hit.start(), f"Erwägungsgrund ({hit.group(1)})", "preamble"))

    # "Artikel N" (optional letter suffix) standing on its own line.
    for hit in re.finditer(r'(?:^|\n)\s*Artikel\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        number = int(re.match(r'(\d+)', hit.group(1)).group(1))
        # Reject spurious matches beyond the regulation's known last article.
        if max_article and number > max_article:
            continue
        entries.append((hit.start(), f"Artikel {hit.group(1)}", "article"))

    # Annex headings — with a Roman numeral, or a bare "ANHANG" line.
    for hit in re.finditer(r'(?:^|\n)\s*ANHANG\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        entries.append((hit.start(), f"Anhang {hit.group(1)}", "annex"))
    for hit in re.finditer(r'(?:^|\n)\s*ANHANG\s*\n', text, re.MULTILINE):
        entries.append((hit.start(), "Anhang", "annex"))

    entries.sort(key=lambda e: e[0])
    seen = set()
    result = []
    for offset, label, kind in entries:
        if label in seen:
            continue
        seen.add(label)
        result.append((offset, label, kind))
    return result
def build_de_law_index(text):
    """Index German-law section headings ("§ N") by character offset.

    Returns a list of (offset, label, 'section') tuples sorted by
    offset; repeated section numbers keep only the first occurrence.
    """
    # finditer yields matches in ascending position, so recording the
    # first hit per label is equivalent to sort-then-dedupe.
    first_seen = {}
    for hit in re.finditer(r'(?:^|\n)\s*§\s+(\d+[a-z]?)\b', text, re.MULTILINE):
        label = f"§ {hit.group(1)}"
        if label not in first_seen:
            first_seen[label] = (hit.start(), label, "section")
    return sorted(first_seen.values(), key=lambda e: e[0])
def build_nist_index(text):
    """Index NIST document headings by character offset.

    Recognizes dotted section numbers ("2.1", "3.4.2") followed by a
    capitalized title, plus control identifiers such as "AC-1".
    Returns (offset, label, kind) tuples sorted by offset with
    first-occurrence deduplication per label; kind is 'section' or
    'control'.
    """
    found = []
    found.extend(
        (hit.start(), f"Section {hit.group(1)}", "section")
        for hit in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE)
    )
    found.extend(
        (hit.start(), hit.group(1), "control")
        for hit in re.finditer(r'(?:^|\n)\s*([A-Z]{2}-\d+)\b', text, re.MULTILINE)
    )
    # Two passes produce two sorted runs; merge them by position, then
    # keep only each label's earliest occurrence.
    found.sort(key=lambda e: e[0])
    seen = set()
    result = []
    for offset, label, kind in found:
        if label not in seen:
            seen.add(label)
            result.append((offset, label, kind))
    return result
def build_generic_index(text):
    """Fallback heading index for documents with no specialized builder.

    Matches numbered headings like "1.", "1.1.", "1.1.1." followed by a
    capitalized word. Returns (offset, "Section N[.M...]", 'section')
    tuples in document order, first occurrence per label only.
    """
    seen = set()
    result = []
    # Matches arrive in ascending position, so appending unseen labels
    # directly yields a position-sorted, deduplicated index.
    for hit in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE):
        label = f"Section {hit.group(1)}"
        if label not in seen:
            seen.add(label)
            result.append((hit.start(), label, "section"))
    return result
# Known max article numbers for EU regulations. Passed to
# build_eu_article_index() to reject spurious "Artikel N" matches
# (e.g. cross-references or annex text) beyond the regulation's actual
# last article. Sources absent from this dict are not capped.
MAX_ARTICLES = {
    "Batterieverordnung (EU) 2023/1542": 96,
    "KI-Verordnung (EU) 2024/1689": 113,
    "Maschinenverordnung (EU) 2023/1230": 54,
    "Cyber Resilience Act (CRA)": 71,
    "NIS2-Richtlinie (EU) 2022/2555": 46,
    "DSGVO (EU) 2016/679": 99,
    "Markets in Crypto-Assets (MiCA)": 149,
    "AML-Verordnung": 95,
    "Data Governance Act (DGA)": 38,
    "Data Act": 50,
    "GPSR (EU) 2023/988": 52,
}
def find_text_in_doc(orig_text, full_norm, index, index_norm_positions):
    """Locate a control's source text inside a normalized document.

    Probes the document with snippets taken at several relative offsets
    and lengths of the normalized control text. On the first hit,
    returns (label, kind) of the nearest heading at or before the hit
    position in index_norm_positions — ("Unknown", "unknown") if none
    precedes it. Returns None when the control text is too short
    (< 30 chars after normalization) or no snippet is found.

    Note: the raw ``index`` argument is unused here and kept only for
    interface compatibility with callers.
    """
    needle = normalize(orig_text)
    if len(needle) < 30:
        return None
    # Offsets away from the very start are tried first: the head of a
    # control text is the part most likely to be paraphrased.
    for frac in (0.25, 0.1, 0.5, 0.0):
        offset = max(0, int(len(needle) * frac))
        for size in (80, 60, 40, 30):
            probe = needle[offset:offset + size]
            if len(probe) < 25:
                continue
            hit = full_norm.find(probe)
            if hit < 0:
                continue
            # Walk headings from the end; the first whose position is at
            # or before the hit is the section containing the snippet.
            label, kind = "Unknown", "unknown"
            for h_pos, h_label, h_kind in reversed(index_norm_positions):
                if h_pos <= hit:
                    label, kind = h_label, h_kind
                    break
            return (label, kind)
    return None
# ── Main ─────────────────────────────────────────────────────────────
def main():
    """Run the PDF QA pass over every control that carries source text.

    Reads controls from the compliance database (read-only), groups
    them by source document, builds a heading index per document, and
    tries to locate each control's source_original_text in that
    document. Prints a per-source and overall report, then saves the
    proposed article assignments to /tmp/pdf_qa_results.json for later
    application by a separate script.

    Requires DATABASE_URL in the environment (KeyError if unset).
    """
    db_url = os.environ['DATABASE_URL']
    parsed = urllib.parse.urlparse(db_url)
    conn = psycopg2.connect(
        host=parsed.hostname, port=parsed.port or 5432,
        user=parsed.username, password=parsed.password,
        dbname=parsed.path.lstrip('/'),
        options="-c search_path=compliance,public"
    )
    cur = conn.cursor()
    # Get all controls with source_original_text; very short texts
    # (<= 50 chars) are excluded as too unspecific to match reliably.
    cur.execute("""
        SELECT id, control_id, title, source_original_text,
               source_citation->>'source' as source_name,
               source_citation->>'article' as existing_article,
               source_citation as citation_json,
               release_state
        FROM compliance.canonical_controls
        WHERE source_original_text IS NOT NULL
          AND length(source_original_text) > 50
        ORDER BY source_citation->>'source', control_id
    """)
    controls = cur.fetchall()
    print(f"Total controls with source text: {len(controls)}")
    # Group by source document name (row index 4 = source_name).
    by_source = {}
    for ctrl in controls:
        src = ctrl[4] or "(null)"
        by_source.setdefault(src, []).append(ctrl)
    # Overall counters for the final summary.
    total_found = 0
    total_not_found = 0
    total_updated = 0  # NOTE(review): never incremented below — vestigial
    total_new_article = 0
    total_changed = 0
    total_skipped_no_file = 0
    updates = []  # (ctrl_id, new_article_label, article_type, control_id, source)
    # Process largest sources first so the big regulations lead the report.
    for source_name in sorted(by_source.keys(), key=lambda s: -len(by_source[s])):
        ctrls = by_source[source_name]
        filename = SOURCE_FILE_MAP.get(source_name)
        doc_type = classify_doc(source_name)
        if filename is None:
            # No local document for this source — count, report, skip.
            total_skipped_no_file += len(ctrls)
            active = sum(1 for c in ctrls if c[7] not in ('duplicate', 'too_close'))
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} ({len(ctrls)} controls, {active} active) — no PDF")
            continue
        # Read file
        text = read_file(filename)
        if text is None:
            total_skipped_no_file += len(ctrls)
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} — file not readable: (unknown)")
            continue
        text_norm = normalize(text)
        # Build the heading index appropriate for this document type.
        max_art = MAX_ARTICLES.get(source_name)
        if doc_type == "eu_regulation":
            index = build_eu_article_index(text, max_article=max_art)
        elif doc_type == "de_law":
            index = build_de_law_index(text)
        elif doc_type == "nist":
            index = build_nist_index(text)
        else:
            index = build_generic_index(text)
        # Precompute heading positions in the *normalized* text, since
        # matching below is done against text_norm, not raw text.
        index_norm = []
        for pos, label, typ in index:
            norm_pos = len(normalize(text[:pos]))
            index_norm.append((norm_pos, label, typ))
        active = sum(1 for c in ctrls if c[7] not in ('duplicate', 'too_close'))
        print(f"\n{'='*60}")
        print(f"{source_name} ({len(ctrls)} controls, {active} active)")
        print(f" File: (unknown) ({len(text):,} chars)")
        print(f" Index: {len(index)} sections ({doc_type})")
        src_found = 0
        src_not_found = 0
        for ctrl in ctrls:
            ctrl_id, control_id, title, orig_text, _, existing_art, citation_json, state = ctrl
            result = find_text_in_doc(orig_text, text_norm, index, index_norm)
            if result:
                new_label, art_type = result
                src_found += 1
                total_found += 1
                # Compare the located heading with the stored article.
                existing_clean = (existing_art or "").strip()
                if not existing_clean:
                    status = "NEW"
                    total_new_article += 1
                elif existing_clean == new_label:
                    status = "OK"
                else:
                    status = f"CHANGED({existing_clean} → {new_label})"
                    total_changed += 1
                updates.append((ctrl_id, new_label, art_type, control_id, source_name))
                if status != "OK":
                    is_active = "" if state not in ('duplicate', 'too_close') else " [DUP]"
                    print(f" {control_id:10s}: {new_label:25s} [{art_type:8s}] {status}{is_active}")
            else:
                src_not_found += 1
                total_not_found += 1
                print(f" {control_id:10s}: NOT FOUND {title[:50]}")
        pct = src_found / len(ctrls) * 100 if ctrls else 0
        print(f"{src_found}/{len(ctrls)} matched ({pct:.0f}%)")
    # ── Summary ──────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f" Total controls with text: {len(controls)}")
    print(f" Matched to PDF: {total_found}")
    print(f" Not found in PDF: {total_not_found}")
    print(f" Skipped (no PDF file): {total_skipped_no_file}")
    print(f" New articles assigned: {total_new_article}")
    print(f" Articles changed: {total_changed}")
    # Save results for later application by a separate update script;
    # this QA pass itself never writes to the database.
    results = []
    for ctrl_id, label, art_type, control_id, source in updates:
        results.append({
            "ctrl_id": str(ctrl_id),
            "control_id": control_id,
            "source": source,
            "article_label": label,
            "article_type": art_type,
        })
    out_path = "/tmp/pdf_qa_results.json"
    with open(out_path, 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n Results saved to {out_path} ({len(results)} entries)")
    # Distribution of matched heading kinds (article/preamble/annex/...).
    type_counts = {}
    for r in results:
        t = r["article_type"]
        type_counts[t] = type_counts.get(t, 0) + 1
    print(f"\n Article type distribution:")
    for t, c in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f" {t:12s}: {c:5d}")
    conn.close()
# Script entry point — run the QA pass only when executed directly.
if __name__ == "__main__":
    main()