chore(qa): add PDF-based control QA scripts and results
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 36s
CI/CD / test-python-backend-compliance (push) Successful in 32s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 36s
CI/CD / test-python-backend-compliance (push) Successful in 32s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
QA pipeline that matches each control's source_original_text directly against the original PDF documents to verify article/paragraph assignments. Covers backfill, deduplication, source normalization, Qdrant cleanup, and prod sync.

Key results (2026-03-20):
- 4,110/7,943 controls matched to a PDF (100% for the major EU regulations)
- 3,366 article corrections; 705 new assignments
- 1,290 controls identified as originating from Erwägungsgründe (recitals/preamble)
- 779 controls identified as originating from Anhänge (annexes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
306
scripts/qa/qa_article_map_all_chunks.py
Normal file
306
scripts/qa/qa_article_map_all_chunks.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""
|
||||
Step 2: Build article/paragraph mapping for ALL regulations that have controls.
|
||||
Scan chunks sequentially by chunk_index, track current article heading.
|
||||
|
||||
Handles both EU regulations (Artikel X) and German laws (§ X).
|
||||
"""
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
|
||||
# HTTP helper: prefer httpx when it is installed, otherwise fall back to
# requests. Both libraries expose a compatible ``post(url, json=, timeout=)``
# call, so a single function body serves either backend.
try:
    import httpx as _http
except ImportError:
    import requests as _http


def http_post(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and return the parsed JSON reply."""
    return _http.post(url, json=data, timeout=timeout).json()
|
||||
|
||||
from sqlalchemy import create_engine, text as sql_text

# Required Postgres connection string (raises KeyError when unset — fail fast).
DB_URL = os.environ['DATABASE_URL']
# Qdrant endpoint; the default targets the Docker host from inside a container.
QDRANT_URL = os.environ.get('QDRANT_URL', 'http://host.docker.internal:6333')
# search_path puts the `compliance` schema first so unqualified table names
# resolve there before falling back to `public`.
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
|
||||
|
||||
# ── Patterns for different document types ─────────────────────────────

# EU regulations: article headings such as "Artikel 26" at line start
# (also matches letter-suffixed articles like "Artikel 26a").
EU_ARTICLE = re.compile(r'(?:^|\n)\s*Artikel\s+(\d+[a-z]?)\b', re.IGNORECASE)
# German laws: section markers "§ 26" or "§26" at line start.
DE_PARAGRAPH = re.compile(r'(?:^|\n)\s*§\s*(\d+[a-z]?)\b')
# NIST-style control IDs at line start: "AC-1", "PR.AC-1", etc.
# NOTE(review): OWASP codes like "A01:2021" are NOT matched here — they are
# handled by OWASP_SECTION below.
NIST_CONTROL = re.compile(r'(?:^|\n)\s*([A-Z]{2}(?:\.[A-Z]{2})?-\d+)', re.MULTILINE)
# OWASP Top-10 headings: "A01:2021", optionally followed by a dashed title.
OWASP_SECTION = re.compile(r'(A\d{2}:\d{4}(?:\s*[–—-]\s*[^\n]+)?)')
# Absatz (paragraph) markers: "(1)", "(2)", ... at line start.
ABSATZ = re.compile(r'(?:^|\n)\s*\((\d+)\)')
# ENISA/CISA numbered sections ("2.1", "3.2.1") followed by a capitalized title.
SECTION_NUM = re.compile(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]')
|
||||
|
||||
# Regulation types: each set routes its regulation_ids to a dedicated mapper
# in map_regulation(); anything not listed falls through to map_generic().

# EU regulations structured by "Artikel N" headings.
EU_REGS = {
    'eu_2016_679', 'eu_2024_1689', 'eu_2022_2555', 'eu_2024_2847',
    'eu_2023_1230', 'eu_2023_1542', 'eu_2022_2065', 'eu_2022_1925',
    'eu_2022_868', 'eu_2019_770', 'eu_2021_914', 'eu_2002_58',
    'eu_2000_31', 'eu_2023_1803', 'eu_2023_988', 'gpsr', 'eucsa',
    'dataact', 'dora', 'ehds', 'mica', 'psd2', 'dpf', 'dsm', 'amlr',
    'eaa', 'eu_blue_guide_2022',
}
# German laws structured by "§ N" sections.
DE_LAWS = {
    'bdsg', 'bdsg_2018_komplett', 'gewo', 'elektrog', 'verpackg',
    'battdg', 'bfsg', 'ddg', 'uwg', 'de_tkg', 'prodhaftg',
    'tmg_komplett', 'urhg_komplett', 'bgb_komplett', 'hgb_komplett',
    'ao_komplett', 'egbgb_komplett', 'de_betrvg', 'de_geschgehg',
    'vsbg', 'pangv', 'mstv', 'de_dlinfov', 'de_ustg_ret',
}
# OWASP documents with "A01:2021"-style section codes.
OWASP = {
    'owasp_top10_2021', 'owasp_asvs', 'owasp_samm', 'owasp_api_top10_2023',
    'owasp_masvs', 'owasp_mobile_top10',
}
# NIST publications with control IDs ("AC-1") or numbered sections.
NIST = {
    'nist_sp800_53r5', 'nist_sp_800_53', 'nist_sp_800_218', 'nist_sp800_218',
    'nist_sp_800_63b', 'nist_sp800_63_3', 'nist_csf_2_0', 'nist_sp800_207',
    'nist_ai_rmf', 'nist_privacy_1_0', 'nistir_8259a',
}
|
||||
|
||||
|
||||
def scan_regulation(collection, regulation_id):
    """Fetch every chunk of *regulation_id* from the Qdrant *collection*.

    Scrolls through the collection in pages of 250 points and returns a
    list of dicts with keys ``hash`` (sha256 of the chunk text), ``idx``
    (the ``chunk_index`` payload field) and ``text``, sorted by ``idx``.
    Returns an empty list when the regulation has no points.
    """
    chunks = []
    offset = None
    while True:
        params = {
            "filter": {"must": [{"key": "regulation_id", "match": {"value": regulation_id}}]},
            "limit": 250,
            "with_payload": ["chunk_text", "chunk_index"],
            "with_vectors": False,
        }
        # A scroll offset of 0 is a valid integer point id, so compare
        # against None instead of relying on truthiness (a falsy offset
        # would silently restart the scroll from the beginning).
        if offset is not None:
            params["offset"] = offset
        result = http_post(f"{QDRANT_URL}/collections/{collection}/points/scroll", params, timeout=30)
        points = result.get("result", {}).get("points", [])
        next_offset = result.get("result", {}).get("next_page_offset")
        for p in points:
            t = p["payload"].get("chunk_text", "")
            chunks.append({
                "hash": hashlib.sha256(t.encode()).hexdigest(),
                "idx": p["payload"].get("chunk_index", 0),
                "text": t,
            })
        # Qdrant signals the end of the scroll with a null next_page_offset;
        # again, 0 would be a legitimate offset, so test for None explicitly.
        if next_offset is None:
            break
        offset = next_offset
    chunks.sort(key=lambda c: c["idx"])
    return chunks
|
||||
|
||||
|
||||
def map_eu_articles(chunks):
    """Assign an Artikel/Absatz label to each EU-regulation chunk.

    Walks the chunks in order, carrying the most recent "Artikel N"
    heading forward; the first "(n)" marker inside a chunk sets the
    Absatz. Chunks seen before any article heading stay unmapped.
    """
    mapping = {}
    article, paragraph = "", ""
    for chunk in chunks:
        body = chunk["text"]
        heading = EU_ARTICLE.search(body)
        if heading is not None:
            article = f"Art. {heading.group(1)}"
            paragraph = ""  # a new article resets the carried Absatz
        absatz_hits = ABSATZ.findall(body)
        if absatz_hits:
            paragraph = f"Abs. {absatz_hits[0]}"
        if article:
            mapping[chunk["hash"]] = {"article": article, "paragraph": paragraph}
    return mapping
|
||||
|
||||
|
||||
def map_de_paragraphs(chunks):
    """Assign a §/Absatz label to each German-law chunk.

    The most recent "§ N" heading is carried forward across chunks; the
    first "(n)" marker inside a chunk sets the Absatz. Chunks before the
    first § heading stay unmapped.
    """
    mapping = {}
    para_label, abs_label = "", ""
    for chunk in chunks:
        body = chunk["text"]
        hit = DE_PARAGRAPH.search(body)
        if hit is not None:
            para_label = f"§ {hit.group(1)}"
            abs_label = ""  # a new § resets the carried Absatz
        found = ABSATZ.findall(body)
        if found:
            abs_label = f"Abs. {found[0]}"
        if para_label:
            mapping[chunk["hash"]] = {"article": para_label, "paragraph": abs_label}
    return mapping
|
||||
|
||||
|
||||
def map_owasp(chunks):
    """Label each OWASP chunk with its section code (e.g. "A01:2021").

    The most recent section heading is carried forward; headings are
    normalized down to the bare "Ann:yyyy" code, dropping any title text.
    """
    mapping = {}
    section = ""
    for chunk in chunks:
        hit = OWASP_SECTION.search(chunk["text"])
        if hit is not None:
            section = hit.group(1).strip()
            # Keep only the code part; discard the trailing dashed title.
            bare = re.match(r'(A\d{2}:\d{4})', section)
            if bare:
                section = bare.group(1)
        if section:
            mapping[chunk["hash"]] = {"article": section, "paragraph": ""}
    return mapping
|
||||
|
||||
|
||||
def map_nist(chunks):
    """Label each NIST chunk with a control ID ("AC-1") or section number.

    Control IDs take precedence and are carried forward once seen; the
    numbered-section fallback only fires while no control ID has ever
    matched (deliberately sticky — control families outrank sections).
    """
    mapping = {}
    section = ""
    for chunk in chunks:
        text = chunk["text"]
        hit = NIST_CONTROL.search(text)
        if hit is not None:
            section = hit.group(1)
        if not section:
            # No control ID seen so far: fall back to numbered sections
            # such as "2.1" or "3.2.1".
            fallback = SECTION_NUM.search(text)
            if fallback is not None:
                section = fallback.group(1)
        if section:
            mapping[chunk["hash"]] = {"article": section, "paragraph": ""}
    return mapping
|
||||
|
||||
|
||||
def map_generic(chunks):
    """Fallback mapper: EU-style Artikel headings, then numbered sections.

    The most recent heading is carried forward; the Absatz is determined
    per chunk (not carried) from the first "(n)" marker it contains.
    """
    mapping = {}
    section = ""
    for chunk in chunks:
        text = chunk["text"]
        art = EU_ARTICLE.search(text)
        if art is not None:
            section = f"Art. {art.group(1)}"
        else:
            # No Artikel heading in this chunk: try numbered sections.
            num = SECTION_NUM.search(text)
            if num is not None:
                section = num.group(1)
        hits = ABSATZ.findall(text)
        absatz = f"Abs. {hits[0]}" if hits else ""
        if section:
            mapping[chunk["hash"]] = {"article": section, "paragraph": absatz}
    return mapping
|
||||
|
||||
|
||||
def map_regulation(collection, regulation_id):
    """Map one regulation's chunks to article labels.

    Returns ``(mapping, chunk_count)`` where *mapping* is keyed by chunk
    hash. ``({}, 0)`` means the regulation was not found in *collection*.
    """
    chunks = scan_regulation(collection, regulation_id)
    if not chunks:
        return {}, 0

    # Dispatch on document family; unknown regulations get the generic mapper.
    for family, mapper in (
        (EU_REGS, map_eu_articles),
        (DE_LAWS, map_de_paragraphs),
        (OWASP, map_owasp),
        (NIST, map_nist),
    ):
        if regulation_id in family:
            return mapper(chunks), len(chunks)
    return map_generic(chunks), len(chunks)
|
||||
|
||||
|
||||
# ── Main: Get all regulations that have controls ─────────────────────
with engine.connect() as conn:
    # Controls without a citation (legacy v1/v2 rows) or without a source
    # regulation are skipped, as are rejected controls.
    r = conn.execute(sql_text("""
        SELECT DISTINCT
            generation_metadata->>'source_regulation' as reg,
            source_citation->>'source' as source_name
        FROM compliance.canonical_controls
        WHERE source_citation IS NOT NULL
          AND generation_metadata->>'source_regulation' IS NOT NULL
          AND release_state NOT IN ('rejected')
        ORDER BY 1
    """))
    # Each row is (regulation_id, human-readable source name).
    regulations = [(row[0], row[1]) for row in r.fetchall()]

print(f"Regulations with controls: {len(regulations)}")
|
||||
|
||||
# Determine which Qdrant collection each regulation lives in.
# (Most are in bp_compliance_ce, some in bp_compliance_datenschutz.)

# CE collection: all EU regulations plus ENISA/CISA/OECD guidance docs.
CE_REGS = EU_REGS | {'enisa_ics_scada_dependencies', 'enisa_supply_chain_good_practices',
                     'enisa_threat_landscape_supply_chain', 'enisa_cybersecurity_state_2024',
                     'cisa_secure_by_design', 'oecd_ai_principles', 'nistir_8259a'}
# Datenschutz collection: OWASP, NIST, EDPB guidance and related papers.
# NOTE(review): nistir_8259a appears in both CE_REGS and DS_REGS; the
# routing below checks CE_REGS first, so CE wins — confirm intended.
DS_REGS = {'owasp_top10_2021', 'owasp_asvs', 'owasp_samm', 'owasp_api_top10_2023',
           'owasp_masvs', 'owasp_mobile_top10', 'nist_sp800_53r5', 'nist_sp_800_218',
           'nist_sp800_218', 'nist_sp800_63_3', 'nist_sp800_207', 'nist_csf_2_0',
           'nist_ai_rmf', 'nist_privacy_1_0', 'nistir_8259a',
           'edpb_bcr_01_2022', 'edpb_05_2020', 'edpb_09_2022',
           'edpb_certification_01_2019', 'edpb_connected_vehicles_01_2020',
           'edpb_dpbd_04_2019', 'edpb_legitimate_interest', 'edpb_legitimate_interest_01_2024',
           'edpb_social_media_08_2020', 'edpb_transfers_01_2020', 'edpb_transfers_07_2020',
           'edpb_breach_09_2022', 'edpb_01_2020',
           'wp244_profiling', 'wp251_profiling', 'wp260_transparency',
           'hleg_trustworthy_ai', 'edpb_guidelines_7_2020'}
# Gesetze collection: German laws plus other national laws and BSI standards.
GE_REGS = DE_LAWS | {'at_dsg', 'at_tkg', 'es_lopdgdd', 'fr_loi_informatique',
                     'hu_info_tv', 'bsi_200_1', 'bsi_200_2', 'bsi_200_3', 'bsi_200_4',
                     'bsi_c5_2020'}
|
||||
|
||||
# Build all mappings
all_mappings = {}  # chunk_hash -> {article, paragraph}
stats = []  # (reg_id, source_name, total_chunks, mapped_chunks, collection)

for reg_id, source_name in regulations:
    # Skip eu_2023_988 (duplicate of gpsr)
    if reg_id == 'eu_2023_988':
        continue

    # Route the regulation to its collection: explicit set membership
    # first, then prefix heuristics, then the CE collection as default.
    if reg_id in CE_REGS or reg_id.startswith('eu_') or reg_id.startswith('enisa_') or reg_id.startswith('cisa_') or reg_id.startswith('oecd_'):
        collection = 'bp_compliance_ce'
    elif reg_id in DS_REGS or reg_id.startswith('owasp_') or reg_id.startswith('nist_') or reg_id.startswith('edpb_') or reg_id.startswith('wp') or reg_id.startswith('hleg_'):
        collection = 'bp_compliance_datenschutz'
    elif reg_id in GE_REGS or reg_id.startswith('bsi_') or reg_id.startswith('at_') or reg_id.startswith('ch_'):
        collection = 'bp_compliance_gesetze'
    else:
        collection = 'bp_compliance_ce'  # default

    # Single-line progress indicator (overwritten in place via \r).
    sys.stdout.write(f"\r Mapping {reg_id:40s} ({collection})...")
    sys.stdout.flush()

    mapping, total = map_regulation(collection, reg_id)

    # If the guessed collection had no chunks, probe the other collections.
    if total == 0:
        for alt_coll in ['bp_compliance_ce', 'bp_compliance_datenschutz', 'bp_compliance_gesetze']:
            if alt_coll != collection:
                mapping, total = map_regulation(alt_coll, reg_id)
                if total > 0:
                    collection = alt_coll
                    break

    all_mappings.update(mapping)
    stats.append((reg_id, source_name, total, len(mapping), collection))
|
||||
|
||||
# ── Report per-regulation coverage and persist the combined mapping ──
print(f"\r{'=' * 70}")
print("ARTICLE MAPPING RESULTS")
print(f"{'=' * 70}")
print(f"\n {'Regulation':35s} {'Source':35s} {'Chunks':>6s} {'Mapped':>7s} {'%':>5s}")
print(f" {'-' * 90}")

total_chunks = 0
total_mapped = 0
# Largest regulations (by chunk count) first.
for reg_id, source_name, chunks, mapped, coll in sorted(stats, key=lambda x: -x[2]):
    pct = f"{mapped/chunks*100:.0f}%" if chunks > 0 else "N/A"
    name = (source_name or "")[:35]
    print(f" {reg_id:35s} {name:35s} {chunks:6d} {mapped:7d} {pct:>5s}")
    total_chunks += chunks
    total_mapped += mapped

# Guard the overall percentage: total_chunks is 0 when nothing was found,
# which previously raised ZeroDivisionError here.
overall = f"{total_mapped/total_chunks*100:.0f}%" if total_chunks else "N/A"
print(f"\n TOTAL: {total_chunks} chunks, {total_mapped} mapped ({overall})")

# Save the chunk-hash -> {article, paragraph} mapping for the next QA step.
with open("/tmp/all_article_mappings.json", "w") as f:
    json.dump(all_mappings, f)
print(f"\n Saved to /tmp/all_article_mappings.json ({len(all_mappings)} entries)")
|
||||
Reference in New Issue
Block a user