Files
breakpilot-compliance/scripts/qa/blue_guide_en_match.py
Benjamin Admin 643b26618f
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
feat: Control Library UI, dedup migration, QA tooling, docs
- Control Library: parent control display, ObligationTypeBadge,
  GenerationStrategyBadge variants, evidence string fallback
- API: expose parent_control_uuid/id/title in canonical controls
- Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility
- Migration 074: control_parent_links + control_dedup_reviews tables
- QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup,
  phase5 normalize, phase74 gap fill, sync_db, run_job
- Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 11:56:08 +01:00

201 lines
6.6 KiB
Python

"""Match unmatched Blue Guide controls against the English PDF."""
import os
import re
import json
import unicodedata
import psycopg2
import urllib.parse
try:
import fitz
except ImportError:
print("ERROR: PyMuPDF (fitz) not installed")
exit(1)
# Location of the English Blue Guide PDF; "~" is expanded to the current
# user's home directory.
PDF_PATH = os.path.expanduser("~/rag-ingestion/pdfs/blue_guide_2022_en.pdf")
# Single-character substitutions applied by normalize() in one pass:
# invisible characters are dropped, ligatures are expanded and typographic
# punctuation is folded to plain ASCII.
_NORMALIZE_TABLE = str.maketrans({
    '\u00ad': '',      # soft hyphen ('\xad' is the same code point)
    '\u200b': '',      # zero-width space
    '\u00a0': ' ',     # no-break space
    '\ufb00': 'ff',
    '\ufb01': 'fi',
    '\ufb02': 'fl',
    '\ufb03': 'ffi',
    '\ufb04': 'ffl',
    '\u2018': "'",
    '\u2019': "'",
    '\u201c': '"',
    '\u201d': '"',
    '\u2013': '-',     # en dash
    '\u2014': '-',     # em dash
    '\u2022': '-',     # bullet
    '\u00b7': '-',     # middle dot
})
# Control characters other than tab, newline and carriage return; those three
# are left in place so the whitespace collapse below turns them into a space.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f]')
_WHITESPACE_RE = re.compile(r'\s+')


def normalize(s):
    """Return *s* in a canonical form suitable for substring matching.

    Steps, in order: drop soft hyphens and zero-width spaces, expand the
    ff/fi/fl/ffi/ffl ligatures, fold curly quotes, dashes and bullets to
    ASCII, remove control characters, apply Unicode NFC, collapse every
    whitespace run to a single space and strip both ends.

    Args:
        s: Text to normalize.

    Returns:
        The normalized string.
    """
    s = s.translate(_NORMALIZE_TABLE)
    s = _CONTROL_CHARS_RE.sub('', s)
    s = unicodedata.normalize('NFC', s)
    return _WHITESPACE_RE.sub(' ', s).strip()
# Read EN PDF
print(f"Reading {PDF_PATH}...")
# Open the document as a context manager so it is closed even when text
# extraction raises, and join the page texts in one pass instead of growing
# a string page by page.
with fitz.open(PDF_PATH) as doc:
    text = "".join(page.get_text() + "\n" for page in doc)
print(f" {len(text):,} chars")
# Normalized copy of the full text; control excerpts are searched in this.
text_norm = normalize(text)
# Build article index for EN Blue Guide
# EN Blue Guide uses "Article N" headings (not "Artikel N")
# Each entry is (offset in raw text, label, type).
items = []
# An "Article 1" heading on its own line decides which heading scheme is used.
art1_match = re.search(r'\nArticle\s+1\s*\n', text)
if not art1_match:
    # Try section-based structure instead
    print(" No 'Article N' headings found, trying section-based index...")
    for m in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))
else:
    # NOTE(review): the nesting of the annex/section scans was reconstructed;
    # they are assumed to belong to this branch -- confirm against the repo.
    # Article headings
    for m in re.finditer(r'(?:^|\n)\s*Article\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        items.append((m.start(), f"Article {m.group(1)}", "article"))
    # Annex markers
    for m in re.finditer(r'(?:^|\n)\s*ANNEX\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        items.append((m.start(), f"Annex {m.group(1)}", "annex"))
    # Also try numbered section headings as fallback
    for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))

# Order by position in the document, then keep only the first occurrence of
# each label.
items.sort(key=lambda x: x[0])
seen = set()
unique = []
for pos, label, typ in items:
    if label not in seen:
        seen.add(label)
        unique.append((pos, label, typ))

print(f" Index: {len(unique)} sections")
for pos, label, typ in unique[:5]:
    print(f" {label} [{typ}] @ pos {pos}")

# Precompute normalized positions
# Heading offsets refer to the raw text; translate each one into an offset in
# the normalized text by normalizing the prefix that precedes the heading.
index_norm = [
    (len(normalize(text[:pos])), label, typ)
    for pos, label, typ in unique
]
# Connect to DB
# DATABASE_URL must be set; a missing variable raises KeyError.
db_url = os.environ['DATABASE_URL']
parsed = urllib.parse.urlparse(db_url)
# urlparse() returns the user, password and path still percent-encoded, so
# decode them before passing them to psycopg2 as plain keyword arguments
# (otherwise a password such as "p%40ss" would be sent literally).
db_user = None if parsed.username is None else urllib.parse.unquote(parsed.username)
db_password = None if parsed.password is None else urllib.parse.unquote(parsed.password)
conn = psycopg2.connect(
    host=parsed.hostname, port=parsed.port or 5432,
    user=db_user, password=db_password,
    dbname=urllib.parse.unquote(parsed.path.lstrip('/')),
    options="-c search_path=compliance,public"
)
cur = conn.cursor()

# Get Blue Guide controls without article_type (unmatched)
cur.execute("""
    SELECT id, control_id, title, source_original_text,
           source_citation->>'article' as existing_article,
           source_citation->>'article_type' as existing_type,
           release_state
    FROM compliance.canonical_controls
    WHERE source_citation->>'source' = 'EU Blue Guide 2022'
      AND source_original_text IS NOT NULL
      AND length(source_original_text) > 50
      AND (source_citation->>'article_type' IS NULL)
    ORDER BY control_id
""")
controls = cur.fetchall()
print(f"\nUnmatched Blue Guide controls: {len(controls)}")
# Match each control
# For every control, look for an excerpt of its normalized source text inside
# the normalized PDF text and attribute the hit to the closest preceding
# heading of the index.
results = []
found = 0
not_found = 0
for row in controls:
    ctrl_id, control_id, title, orig_text, _existing_art, _existing_type, state = row
    needle = normalize(orig_text)
    if len(needle) < 30:
        # Too little text to match reliably; counted as not found, no output.
        not_found += 1
        continue

    # Try excerpts starting at several relative offsets, longest first, and
    # stop at the first one that occurs in the PDF text.
    hit = -1
    for start_frac in (0.25, 0.1, 0.5, 0.0, 0.75):
        offset = max(0, int(len(needle) * start_frac))
        for length in (80, 60, 40, 30, 20):
            excerpt = needle[offset:offset + length]
            if len(excerpt) < 15:
                continue
            hit = text_norm.find(excerpt)
            if hit >= 0:
                break
        if hit >= 0:
            break

    if hit < 0:
        not_found += 1
        print(f" {control_id:10s}: NOT FOUND {title[:50]}")
        continue

    # Find section: last heading located at or before the hit position.
    label, typ = next(
        (
            (h_label, h_type)
            for h_pos, h_label, h_type in reversed(index_norm)
            if h_pos <= hit
        ),
        ("Unknown", "unknown"),
    )
    results.append({
        "ctrl_id": str(ctrl_id),
        "control_id": control_id,
        "source": "EU Blue Guide 2022",
        "article_label": label,
        "article_type": typ,
    })
    found += 1
    dup_marker = " [DUP]" if state in ('duplicate', 'too_close') else ""
    print(f" {control_id:10s}: {label:25s} [{typ:8s}]{dup_marker}")

print(f"\n{'='*50}")
print(f"Results: {found} matched, {not_found} not found out of {len(controls)}")
# Save results
out_path = "/tmp/blue_guide_en_results.json"
# ensure_ascii=False writes non-ASCII characters verbatim, so pin the file
# encoding to UTF-8 instead of relying on the platform default.
with open(out_path, 'w', encoding='utf-8') as f:
    json.dump(results, f, indent=2, ensure_ascii=False)
print(f"Saved to {out_path}")
# Apply results to DB
# Merge the matched article label/type into each control's source_citation;
# rows that already hold the same values are skipped by the WHERE clause and
# therefore not counted as applied.
if results:
    print(f"\nApplying {len(results)} results to DB...")
    update_sql = """
        UPDATE compliance.canonical_controls
        SET source_citation = source_citation ||
            jsonb_build_object('article', %s, 'article_type', %s)
        WHERE id = %s::uuid
          AND (source_citation->>'article' IS DISTINCT FROM %s
               OR source_citation->>'article_type' IS DISTINCT FROM %s)
    """
    applied = 0
    for entry in results:
        article = entry["article_label"]
        kind = entry["article_type"]
        cur.execute(update_sql, (article, kind, entry["ctrl_id"], article, kind))
        applied += 1 if cur.rowcount > 0 else 0
    conn.commit()
    print(f" Applied: {applied} controls updated")
# Show type distribution
# Tally matched controls per article type and print the tally, most frequent
# type first; then release the database connection.
type_counts = {}
for kind in (entry["article_type"] for entry in results):
    type_counts.setdefault(kind, 0)
    type_counts[kind] += 1

if type_counts:
    print(f"\nArticle type distribution:")
    ranked = sorted(type_counts.items(), key=lambda kv: kv[1], reverse=True)
    for kind, count in ranked:
        print(f" {kind:12s}: {count:5d}")

conn.close()