feat: Control Library UI, dedup migration, QA tooling, docs
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
- Control Library: parent control display, ObligationTypeBadge, GenerationStrategyBadge variants, evidence string fallback
- API: expose parent_control_uuid/id/title in canonical controls
- Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility
- Migration 074: control_parent_links + control_dedup_reviews tables
- QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup, phase5 normalize, phase74 gap fill, sync_db, run_job
- Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
200
scripts/qa/blue_guide_en_match.py
Normal file
200
scripts/qa/blue_guide_en_match.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Match unmatched Blue Guide controls against the English PDF."""
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import unicodedata
|
||||
import psycopg2
|
||||
import urllib.parse
|
||||
|
||||
try:
|
||||
import fitz
|
||||
except ImportError:
|
||||
print("ERROR: PyMuPDF (fitz) not installed")
|
||||
exit(1)
|
||||
|
||||
PDF_PATH = os.path.expanduser("~/rag-ingestion/pdfs/blue_guide_2022_en.pdf")
|
||||
|
||||
# Single-pass character cleanup table (replaces a chain of 15 .replace()
# calls): delete soft hyphens / zero-width spaces, expand PDF ligatures,
# and map typographic punctuation to ASCII equivalents.
_CHAR_MAP = str.maketrans({
    '\u00ad': None,            # soft hyphen — delete
    '\u200b': None,            # zero-width space — delete
    '\u00a0': ' ',             # no-break space
    '\ufb01': 'fi', '\ufb02': 'fl',
    '\ufb00': 'ff', '\ufb03': 'ffi', '\ufb04': 'ffl',
    '\u2019': "'", '\u2018': "'",      # curly single quotes
    '\u201c': '"', '\u201d': '"',      # curly double quotes
    '\u2013': '-', '\u2014': '-',      # en/em dash
    '\u2022': '-', '\u00b7': '-',      # bullet / middle dot
})
# Compiled once at module load: C0 control chars (except \t \n \r), and
# whitespace runs to collapse.
_CTRL_RE = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f]')
_WS_RE = re.compile(r'\s+')


def normalize(s):
    """Normalize PDF-extracted text for robust substring matching.

    Applies, in order: character cleanup (ligatures, typographic
    punctuation, zero-width chars), control-character removal, Unicode
    NFC composition, and whitespace collapsing to single spaces.
    Returns the stripped result.
    """
    s = s.translate(_CHAR_MAP)
    s = _CTRL_RE.sub('', s)
    # NFC after the replacements, matching the original processing order.
    s = unicodedata.normalize('NFC', s)
    return _WS_RE.sub(' ', s).strip()
|
||||
|
||||
# Extract plain text from every page of the English PDF, one newline
# appended per page so headings at page boundaries stay separated.
print(f"Reading {PDF_PATH}...")
doc = fitz.open(PDF_PATH)
text = "".join(page.get_text() + "\n" for page in doc)
doc.close()
print(f" {len(text):,} chars")

# Normalized copy used for all snippet searches below.
text_norm = normalize(text)
|
||||
|
||||
# Build article index for EN Blue Guide.
# The EN Blue Guide uses "Article N" headings (not "Artikel N").
# Each entry is (raw-text position, display label, heading type).
items = []

# Find where "Article 1" starts — content before is preamble/intro.
art1_match = re.search(r'\nArticle\s+1\s*\n', text)
if not art1_match:
    # Try section-based structure instead
    print(" No 'Article N' headings found, trying section-based index...")
    for m in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))
else:
    # Article headings.
    # Fix: dropped unused locals `art1_pos` and `art_num` — both were
    # computed but never read anywhere in the script.
    for m in re.finditer(r'(?:^|\n)\s*Article\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        items.append((m.start(), f"Article {m.group(1)}", "article"))

    # Annex markers (Roman numerals, optional lowercase suffix)
    for m in re.finditer(r'(?:^|\n)\s*ANNEX\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        items.append((m.start(), f"Annex {m.group(1)}", "annex"))

    # Also try numbered section headings as fallback
    for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
        items.append((m.start(), f"Section {m.group(1)}", "section"))

# Sort by document position and keep only the first occurrence of each
# label (duplicate headings can appear in tables of contents).
items.sort(key=lambda x: x[0])
seen = set()
unique = []
for pos, label, typ in items:
    if label not in seen:
        seen.add(label)
        unique.append((pos, label, typ))

print(f" Index: {len(unique)} sections")
# Fix: `if unique[:5]:` is truthy exactly when `unique` is non-empty.
if unique:
    for pos, label, typ in unique[:5]:
        print(f" {label} [{typ}] @ pos {pos}")
|
||||
|
||||
# Translate each heading's position in the raw text into the coordinate
# space of `text_norm`, so hits found there can be attributed to a
# heading. NOTE(review): normalize() on every prefix is O(n^2) overall,
# but runs once per heading and is fast enough for this one-shot script.
index_norm = [
    (len(normalize(text[:raw_pos])), heading_label, heading_type)
    for raw_pos, heading_label, heading_type in unique
]
|
||||
|
||||
# Connect to the compliance database; DATABASE_URL must be set
# (KeyError here is deliberate — the script cannot proceed without it).
parsed = urllib.parse.urlparse(os.environ['DATABASE_URL'])
conn = psycopg2.connect(
    host=parsed.hostname,
    port=parsed.port or 5432,
    user=parsed.username,
    password=parsed.password,
    dbname=parsed.path.lstrip('/'),
    options="-c search_path=compliance,public",
)
cur = conn.cursor()
|
||||
|
||||
# Get Blue Guide controls without article_type (unmatched).
# Row shape: (uuid, control_id, title, source text, current 'article'
# citation, current 'article_type', release_state). The length filter
# skips stub texts too short to match reliably.
cur.execute("""
SELECT id, control_id, title, source_original_text,
source_citation->>'article' as existing_article,
source_citation->>'article_type' as existing_type,
release_state
FROM compliance.canonical_controls
WHERE source_citation->>'source' = 'EU Blue Guide 2022'
AND source_original_text IS NOT NULL
AND length(source_original_text) > 50
AND (source_citation->>'article_type' IS NULL)
ORDER BY control_id
""")
controls = cur.fetchall()
print(f"\nUnmatched Blue Guide controls: {len(controls)}")
|
||||
|
||||
# Match each control's source text against the normalized PDF text.
results = []
found = 0
not_found = 0

# Probe schedule: sample the control text at several relative offsets
# (middle-ish positions first) and, per offset, progressively shorter
# snippet lengths. Iteration order matches the original nested loops.
probe_schedule = [
    (frac, snip_len)
    for frac in (0.25, 0.1, 0.5, 0.0, 0.75)
    for snip_len in (80, 60, 40, 30, 20)
]

for ctrl in controls:
    ctrl_id, control_id, title, orig_text, existing_art, existing_type, state = ctrl
    orig_norm = normalize(orig_text)
    if len(orig_norm) < 30:
        # Too little text to probe meaningfully.
        not_found += 1
        continue

    for frac, snip_len in probe_schedule:
        begin = max(0, int(len(orig_norm) * frac))
        snippet = orig_norm[begin:begin + snip_len]
        if len(snippet) < 15:
            continue
        pos = text_norm.find(snippet)
        if pos < 0:
            continue

        # Hit: attribute it to the nearest preceding heading.
        label = "Unknown"
        typ = "unknown"
        for h_pos, h_label, h_type in reversed(index_norm):
            if h_pos <= pos:
                label = h_label
                typ = h_type
                break

        results.append({
            "ctrl_id": str(ctrl_id),
            "control_id": control_id,
            "source": "EU Blue Guide 2022",
            "article_label": label,
            "article_type": typ,
        })
        found += 1
        is_active = " [DUP]" if state in ('duplicate', 'too_close') else ""
        print(f" {control_id:10s}: {label:25s} [{typ:8s}]{is_active}")
        break
    else:
        # No probe produced a hit anywhere in the PDF text.
        not_found += 1
        print(f" {control_id:10s}: NOT FOUND {title[:50]}")

print(f"\n{'='*50}")
print(f"Results: {found} matched, {not_found} not found out of {len(controls)}")
|
||||
|
||||
# Persist match results as JSON for later inspection / re-application.
out_path = "/tmp/blue_guide_en_results.json"
with open(out_path, 'w') as fh:
    fh.write(json.dumps(results, indent=2, ensure_ascii=False))
print(f"Saved to {out_path}")
|
||||
|
||||
# Write matched article labels back to the database. The WHERE clause
# makes the update idempotent: rows already carrying the same values are
# skipped, so `applied` counts only genuinely changed controls.
if results:
    print(f"\nApplying {len(results)} results to DB...")
    applied = 0
    for rec in results:
        params = (
            rec["article_label"], rec["article_type"],
            rec["ctrl_id"], rec["article_label"], rec["article_type"],
        )
        cur.execute("""
UPDATE compliance.canonical_controls
SET source_citation = source_citation ||
jsonb_build_object('article', %s, 'article_type', %s)
WHERE id = %s::uuid
AND (source_citation->>'article' IS DISTINCT FROM %s
OR source_citation->>'article_type' IS DISTINCT FROM %s)
""", params)
        applied += 1 if cur.rowcount > 0 else 0
    conn.commit()
    print(f" Applied: {applied} controls updated")
|
||||
|
||||
# Show type distribution of the matched controls, most common first.
# Fix: hand-rolled counting dict replaced with collections.Counter;
# most_common() orders by count desc with first-encountered tie order,
# matching the original stable sort on -count.
from collections import Counter

type_counts = Counter(r["article_type"] for r in results)
if type_counts:
    print("\nArticle type distribution:")
    for t, c in type_counts.most_common():
        print(f" {t:12s}: {c:5d}")

conn.close()
|
||||
Reference in New Issue
Block a user