Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
- Control Library: parent control display, ObligationTypeBadge, GenerationStrategyBadge variants, evidence string fallback
- API: expose parent_control_uuid/id/title in canonical controls
- Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility
- Migration 074: control_parent_links + control_dedup_reviews tables
- QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup, phase5 normalize, phase74 gap fill, sync_db, run_job
- Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
201 lines
6.6 KiB
Python
201 lines
6.6 KiB
Python
"""Match unmatched Blue Guide controls against the English PDF."""
|
|
import os
|
|
import re
|
|
import json
|
|
import unicodedata
|
|
import psycopg2
|
|
import urllib.parse
|
|
|
|
# PyMuPDF is required for PDF text extraction; fail fast with a readable
# message instead of a bare ImportError traceback.
try:
    import fitz
except ImportError:
    print("ERROR: PyMuPDF (fitz) not installed")
    exit(1)

# English-language Blue Guide 2022 PDF, pre-downloaded by the ingestion setup.
PDF_PATH = os.path.expanduser("~/rag-ingestion/pdfs/blue_guide_2022_en.pdf")
|
|
|
|
# One-pass translation table for PDF-extraction artifacts: soft hyphens and
# zero-width spaces dropped, NBSP -> space, Latin ligatures expanded, curly
# quotes / dashes / bullets mapped to ASCII.  (The original chained ~14
# sequential .replace() passes and replaced U+00AD twice — '\u00ad' and
# '\xad' are the same codepoint.)
_CLEAN_TABLE = str.maketrans({
    '\u00ad': None,                   # soft hyphen
    '\u200b': None,                   # zero-width space
    '\u00a0': ' ',                    # no-break space
    '\ufb01': 'fi', '\ufb02': 'fl',   # fi / fl ligatures
    '\ufb00': 'ff', '\ufb03': 'ffi', '\ufb04': 'ffl',
    '\u2019': "'", '\u2018': "'",     # curly single quotes
    '\u201c': '"', '\u201d': '"',     # curly double quotes
    '\u2013': '-', '\u2014': '-',     # en / em dash
    '\u2022': '-', '\u00b7': '-',     # bullet / middle dot
})
# Also strip C0 control characters except tab, LF, CR (matches the original
# removal class [\x00-\x08\x0b\x0c\x0e-\x1f]).
for _c in range(0x20):
    if _c not in (0x09, 0x0a, 0x0d):
        _CLEAN_TABLE.setdefault(_c, None)

# Hoisted so the pattern is not recompiled on every call.
_WS_RE = re.compile(r'\s+')


def normalize(s: str) -> str:
    """Normalize PDF-extracted text for exact substring matching.

    Removes extraction artifacts (soft hyphens, zero-width spaces,
    ligatures, smart punctuation, control characters), applies Unicode
    NFC composition, and collapses every whitespace run to a single space.

    Args:
        s: Raw text extracted from a PDF.

    Returns:
        The cleaned, NFC-normalized, whitespace-collapsed string.
    """
    s = s.translate(_CLEAN_TABLE)
    s = unicodedata.normalize('NFC', s)
    return _WS_RE.sub(' ', s).strip()
|
|
|
|
# Extract the full text of the English Blue Guide, one chunk per page, and
# normalize it once so every later snippet search runs against the same view.
print(f"Reading {PDF_PATH}...")
doc = fitz.open(PDF_PATH)
pages = [page.get_text() + "\n" for page in doc]
doc.close()
text = "".join(pages)
print(f" {len(text):,} chars")

text_norm = normalize(text)
|
|
|
|
# ---------------------------------------------------------------------------
# Build an index of (raw_char_position, label, type) heading markers for the
# EN Blue Guide.  The document may be structured by "Article N" headings
# (not "Artikel N" — this is the English edition) or only by numbered
# sections; probe for "Article 1" to decide which layout applies.
# ---------------------------------------------------------------------------
items = []

art1_match = re.search(r'\nArticle\s+1\s*\n', text)
if not art1_match:
    # No article headings: fall back to a section-number-based index.
    print(" No 'Article N' headings found, trying section-based index...")
    for m in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))
else:
    # NOTE(review): the original computed art1_match.start() to mark where
    # the preamble ends but never used it — headings matched inside the
    # preamble are still indexed.  Confirm whether pre-Article-1 hits
    # should be excluded.  (Also removed: an unused art_num local.)

    # Article headings, e.g. "Article 12" or "Article 12a".
    for m in re.finditer(r'(?:^|\n)\s*Article\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        items.append((m.start(), f"Article {m.group(1)}", "article"))

    # Annex markers, e.g. "ANNEX IV".
    for m in re.finditer(r'(?:^|\n)\s*ANNEX\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        items.append((m.start(), f"Annex {m.group(1)}", "annex"))

    # Numbered section headings as an additional fallback, e.g. "4.1.3 Title".
    for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))

# Sort by position, then keep only the first occurrence of each label.
items.sort(key=lambda x: x[0])
seen = set()
unique = []
for pos, label, typ in items:
    if label not in seen:
        seen.add(label)
        unique.append((pos, label, typ))

print(f" Index: {len(unique)} sections")
# Preview the first few entries (loop simply does nothing when empty; the
# original's `if unique[:5]:` slice guard was redundant).
for pos, label, typ in unique[:5]:
    print(f" {label} [{typ}] @ pos {pos}")
|
|
|
|
# Translate each heading's raw-text offset into its offset in the normalized
# text, so headings can be compared against normalized-snippet match positions.
index_norm = [
    (len(normalize(text[:pos])), label, typ)
    for pos, label, typ in unique
]
|
|
|
|
# Connect to Postgres via the DATABASE_URL env var.  The search_path option
# puts the compliance schema first so unqualified names resolve there.
db_url = os.environ['DATABASE_URL']
parsed = urllib.parse.urlparse(db_url)
conn_kwargs = {
    'host': parsed.hostname,
    'port': parsed.port or 5432,
    'user': parsed.username,
    'password': parsed.password,
    'dbname': parsed.path.lstrip('/'),
    'options': "-c search_path=compliance,public",
}
conn = psycopg2.connect(**conn_kwargs)
cur = conn.cursor()
|
|
|
|
# Fetch Blue Guide controls whose source_citation JSON still lacks an
# 'article_type' key — the unmatched remainder from earlier passes.
# Controls without original text, or with <=50 chars of it, are excluded
# up front as unmatchable.
cur.execute("""
SELECT id, control_id, title, source_original_text,
source_citation->>'article' as existing_article,
source_citation->>'article_type' as existing_type,
release_state
FROM compliance.canonical_controls
WHERE source_citation->>'source' = 'EU Blue Guide 2022'
AND source_original_text IS NOT NULL
AND length(source_original_text) > 50
AND (source_citation->>'article_type' IS NULL)
ORDER BY control_id
""")
controls = cur.fetchall()
print(f"\nUnmatched Blue Guide controls: {len(controls)}")
|
|
|
|
# ---------------------------------------------------------------------------
# Match each control's original text against the normalized PDF text.
# For every control, snippets taken at several start offsets and lengths are
# searched for verbatim in text_norm; the first hit wins, and the nearest
# heading at or before the hit position (from index_norm) supplies the
# article label and type.
# ---------------------------------------------------------------------------
results = []
found = 0
not_found = 0

for ctrl in controls:
    ctrl_id, control_id, title, orig_text, existing_art, existing_type, state = ctrl
    orig_norm = normalize(orig_text)
    # Too little text after normalization to match reliably.
    if len(orig_norm) < 30:
        not_found += 1
        continue

    matched = False
    # Start fractions are tried in this specific order (mid-text first) —
    # snippets away from the control's boilerplate edges tend to be more
    # distinctive; longer snippets are tried before shorter ones.
    for start_frac in [0.25, 0.1, 0.5, 0.0, 0.75]:
        for length in [80, 60, 40, 30, 20]:
            start = max(0, int(len(orig_norm) * start_frac))
            snippet = orig_norm[start:start+length]
            # Skip degenerate snippets (near the end of short texts).
            if not snippet or len(snippet) < 15:
                continue
            pos = text_norm.find(snippet)
            if pos >= 0:
                # Find the nearest heading at or before the match position
                # by scanning the (position-sorted) index backwards.
                label = "Unknown"
                typ = "unknown"
                for h_pos, h_label, h_type in reversed(index_norm):
                    if h_pos <= pos:
                        label = h_label
                        typ = h_type
                        break
                results.append({
                    "ctrl_id": str(ctrl_id),
                    "control_id": control_id,
                    "source": "EU Blue Guide 2022",
                    "article_label": label,
                    "article_type": typ,
                })
                found += 1
                # Flag controls already marked duplicate/near-duplicate.
                is_active = "" if state not in ('duplicate', 'too_close') else " [DUP]"
                print(f" {control_id:10s}: {label:25s} [{typ:8s}]{is_active}")
                matched = True
                break
        if matched:
            break

    if not matched:
        not_found += 1
        print(f" {control_id:10s}: NOT FOUND {title[:50]}")
|
|
|
|
# Print a summary banner and persist the match results as JSON for review.
separator = '=' * 50
print(f"\n{separator}")
print(f"Results: {found} matched, {not_found} not found out of {len(controls)}")

out_path = "/tmp/blue_guide_en_results.json"
with open(out_path, 'w') as f:
    f.write(json.dumps(results, indent=2, ensure_ascii=False))
print(f"Saved to {out_path}")
|
|
|
|
# ---------------------------------------------------------------------------
# Apply results: merge article label/type into each control's
# source_citation JSON (jsonb || overwrites existing keys).  The WHERE
# guard makes the update idempotent — rows that already carry these exact
# values are skipped, so rowcount counts real changes only.
# ---------------------------------------------------------------------------
if results:
    print(f"\nApplying {len(results)} results to DB...")
    applied = 0
    for r in results:
        cur.execute("""
UPDATE compliance.canonical_controls
SET source_citation = source_citation ||
jsonb_build_object('article', %s, 'article_type', %s)
WHERE id = %s::uuid
AND (source_citation->>'article' IS DISTINCT FROM %s
OR source_citation->>'article_type' IS DISTINCT FROM %s)
""", (r["article_label"], r["article_type"],
      r["ctrl_id"], r["article_label"], r["article_type"]))
        if cur.rowcount > 0:
            applied += 1
    conn.commit()
    print(f" Applied: {applied} controls updated")
|
|
|
|
# Summarize how many matches landed in each article type (article / annex /
# section / unknown), most frequent first.  Counter replaces the hand-rolled
# dict-and-get counting; most_common() gives counts in descending order with
# ties kept in first-seen order, matching the original stable sort.
type_counts = Counter(r["article_type"] for r in results)
if type_counts:
    print("\nArticle type distribution:")
    for t, c in type_counts.most_common():
        print(f" {t:12s}: {c:5d}")

conn.close()
|