# File: scripts/qa/db_status.py
"""Quick DB status check.

Prints two summaries of ``compliance.canonical_controls``:

* the number of rows per ``release_state``, followed by TOTAL and ACTIVE
  counts (ACTIVE excludes the states listed in ``INACTIVE_STATES``);
* the number of active rows per ``source_citation->>'article_type'``.

The connection target is read from the ``DATABASE_URL`` environment variable.
"""
import os
import urllib.parse

# Release states that are excluded from the ACTIVE total. Keep this in sync
# with the NOT IN (...) list of the article-type query in main().
INACTIVE_STATES = ('duplicate', 'too_close', 'deprecated')


def _unquote(value):
    """Percent-decode a URL component, passing ``None`` through unchanged."""
    return None if value is None else urllib.parse.unquote(value)


def connection_kwargs(db_url):
    """Translate a database URL into keyword arguments for ``psycopg2.connect``.

    ``urlparse`` returns the user name, password and path still
    percent-encoded, so they are decoded here; otherwise credentials
    containing escaped characters (``%40`` for ``@`` and so on) would be sent
    to the server verbatim.

    Args:
        db_url: URL of the form ``scheme://user:password@host:port/dbname``.

    Returns:
        A dict with ``host``, ``port`` (5432 when the URL has none), ``user``,
        ``password``, ``dbname`` and ``options`` (which sets the search path
        to ``compliance,public``).
    """
    parsed = urllib.parse.urlparse(db_url)
    return {
        'host': parsed.hostname,
        'port': parsed.port or 5432,
        'user': _unquote(parsed.username),
        'password': _unquote(parsed.password),
        'dbname': _unquote(parsed.path.lstrip('/')),
        'options': "-c search_path=compliance,public",
    }


def main():
    """Run both status queries and print their results to stdout.

    Raises:
        KeyError: if ``DATABASE_URL`` is not set.
    """
    # Imported lazily so connection_kwargs() can be imported and tested
    # without the database driver being installed.
    import psycopg2

    conn = psycopg2.connect(**connection_kwargs(os.environ['DATABASE_URL']))
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT release_state, count(*) FROM compliance.canonical_controls
                GROUP BY 1 ORDER BY count(*) DESC
            """)
            total = 0
            active = 0
            print("Release state distribution:")
            for state, count in cur.fetchall():
                # str() keeps the format spec valid for a NULL release_state.
                print(f" {str(state):15s} {count:6d}")
                total += count
                if state not in INACTIVE_STATES:
                    active += count
            print(f" {'TOTAL':15s} {total:6d}")
            print(f" {'ACTIVE':15s} {active:6d}")

            # Article type distribution for active controls
            cur.execute("""
                SELECT source_citation->>'article_type', count(*)
                FROM compliance.canonical_controls
                WHERE release_state NOT IN ('duplicate', 'too_close', 'deprecated')
                AND source_citation->>'article_type' IS NOT NULL
                GROUP BY 1 ORDER BY count(*) DESC
            """)
            print("\nArticle types (active controls):")
            for article_type, count in cur.fetchall():
                print(f" {str(article_type):12s} {count:5d}")
    finally:
        # Close the connection even when a query fails.
        conn.close()


if __name__ == "__main__":
    main()
# File: scripts/qa/preamble_dedup.py
"""
Step 4: Preamble vs. Article dedup.

If a preamble control covers the same topic as an article control
(same regulation, similar title), mark the preamble as duplicate.
Article controls always take priority.

By default the script only prints the candidate pairs (dry run). Set the
environment variable ``APPLY=1`` to actually set ``release_state`` to
``'duplicate'`` on the matched preamble controls.

Control rows are handled as tuples in the column order of the SELECT in
``main()``: ``(id, control_id, title, source, article, article_type,
release_state)``.
"""
import os
import re
import urllib.parse

STOPWORDS = {
    'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einer', 'eines',
    'und', 'oder', 'für', 'von', 'zu', 'mit', 'auf', 'in', 'an', 'bei', 'nach',
    'über', 'unter', 'durch', 'als', 'aus', 'zur', 'zum', 'im', 'am', 'um',
    'ist', 'sind', 'wird', 'werden', 'hat', 'haben', 'kann', 'können',
    'the', 'and', 'or', 'for', 'of', 'to', 'with', 'on', 'in', 'at', 'by',
    'is', 'are', 'be', 'was', 'were', 'been', 'has', 'have', 'had',
    'a', 'an', 'not', 'no', 'from',
}

# Minimum Jaccard similarity of two titles for them to count as the same topic.
SIMILARITY_THRESHOLD = 0.40

# Maximum number of pairs listed for review in a dry run.
REVIEW_LIMIT = 30


def tokenize(title):
    """Return the set of lower-cased words in *title*, minus ``STOPWORDS``.

    A missing title (``None`` or empty string) yields an empty set.
    """
    if not title:
        return set()
    words = set(re.findall(r'\w+', title.lower()))
    return words - STOPWORDS


def jaccard(a, b):
    """Return the Jaccard similarity of two word sets (0.0 if either is empty)."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def _unquote(value):
    """Percent-decode a URL component, passing ``None`` through unchanged."""
    return None if value is None else urllib.parse.unquote(value)


def connection_kwargs(db_url):
    """Translate a database URL into keyword arguments for ``psycopg2.connect``.

    ``urlparse`` returns the user name, password and path still
    percent-encoded, so they are decoded here; otherwise credentials
    containing escaped characters would be sent to the server verbatim.

    Returns:
        A dict with ``host``, ``port`` (5432 when the URL has none), ``user``,
        ``password``, ``dbname`` and ``options`` (which sets the search path
        to ``compliance,public``).
    """
    parsed = urllib.parse.urlparse(db_url)
    return {
        'host': parsed.hostname,
        'port': parsed.port or 5432,
        'user': _unquote(parsed.username),
        'password': _unquote(parsed.password),
        'dbname': _unquote(parsed.path.lstrip('/')),
        'options': "-c search_path=compliance,public",
    }


def find_preamble_duplicates(controls, threshold=SIMILARITY_THRESHOLD):
    """Pair each preamble control with its most similar article control.

    Controls are grouped by their source (a missing source is grouped under
    ``"(null)"``). Within each group, every preamble title is compared with
    every article title; the article with the highest Jaccard score is kept
    (the first one wins ties) and the pair is reported when that score
    reaches *threshold*.

    Args:
        controls: iterable of control row tuples (see module docstring).
        threshold: minimum score for a pair to be reported.

    Returns:
        A list of ``(source, preambles, articles, pairs)`` tuples, one per
        source that has both preamble and article controls, ordered by
        descending number of controls in the source. ``pairs`` is a list of
        ``(preamble_row, article_row, score)`` and may be empty.
    """
    by_source = {}
    for ctrl in controls:
        by_source.setdefault(ctrl[3] or "(null)", []).append(ctrl)

    results = []
    for source, ctrls in sorted(by_source.items(), key=lambda item: -len(item[1])):
        articles = [c for c in ctrls if c[5] == 'article']
        preambles = [c for c in ctrls if c[5] == 'preamble']
        if not preambles or not articles:
            continue

        # Tokenize every article title once instead of once per preamble.
        article_tokens = [(c, tokenize(c[2])) for c in articles]

        pairs = []
        for p_ctrl in preambles:
            p_tokens = tokenize(p_ctrl[2])
            if not p_tokens:
                continue

            best_match = None
            best_score = 0
            for a_ctrl, a_tokens in article_tokens:
                score = jaccard(p_tokens, a_tokens)
                if score > best_score:
                    best_score = score
                    best_match = a_ctrl

            if best_match is not None and best_score >= threshold:
                pairs.append((p_ctrl, best_match, best_score))

        results.append((source, preambles, articles, pairs))
    return results


def main():
    """Report preamble duplicates and, when ``APPLY=1``, mark them in the DB.

    Raises:
        KeyError: if ``DATABASE_URL`` is not set.
    """
    # Imported lazily so the pure helpers above can be imported and tested
    # without the database driver being installed.
    import psycopg2

    conn = psycopg2.connect(**connection_kwargs(os.environ['DATABASE_URL']))
    try:
        with conn.cursor() as cur:
            # Get all active controls with article_type.
            # NOTE(review): unlike scripts/qa/db_status.py, 'deprecated' is
            # not excluded here, so deprecated controls take part in the
            # matching and can be re-marked as duplicate — confirm intended.
            cur.execute("""
                SELECT id, control_id, title,
                       source_citation->>'source' as source,
                       source_citation->>'article' as article,
                       source_citation->>'article_type' as article_type,
                       release_state
                FROM compliance.canonical_controls
                WHERE release_state NOT IN ('duplicate', 'too_close')
                AND source_citation->>'article_type' IS NOT NULL
                ORDER BY source_citation->>'source', control_id
            """)
            controls = cur.fetchall()
            print(f"Active controls with article_type: {len(controls)}")

            dupe_pairs = []
            for source, preambles, articles, pairs in find_preamble_duplicates(controls):
                if not pairs:
                    continue
                print(f"\n{source}: {len(pairs)} preamble duplicates (of {len(preambles)} preambles, {len(articles)} articles)")
                # Show first 3 pairs
                for p, a, score in pairs[:3]:
                    print(f" PREAMBLE {p[1]}: {p[2][:60]}")
                    print(f" ARTICLE {a[1]}: {a[2][:60]}")
                    print(f" Jaccard: {score:.2f} ({p[4]} vs {a[4]})")
                    print()
                dupe_pairs.extend(pairs)

            total_dupes = len(dupe_pairs)
            # Counts every preamble, including those in sources without articles.
            preamble_total = sum(1 for c in controls if c[5] == 'preamble')

            print(f"\n{'='*60}")
            print("SUMMARY")
            print(f"{'='*60}")
            print(f" Total preamble controls checked: {preamble_total}")
            print(f" Preamble duplicates found: {total_dupes}")
            print(f" Unique preamble controls: {preamble_total - total_dupes}")

            if not dupe_pairs:
                return

            # Preview only — nothing is written unless APPLY=1.
            print(f"\n=== DRY RUN: Would mark {total_dupes} as duplicate ===")
            # Score distribution
            scores = [s for _, _, s in dupe_pairs]
            print(f" Score range: {min(scores):.2f} - {max(scores):.2f}")
            print(f" Score >= 0.50: {sum(1 for s in scores if s >= 0.50)}")
            print(f" Score 0.40-0.49: {sum(1 for s in scores if 0.40 <= s < 0.50)}")

            # Ask for confirmation
            print("\n Apply? Set APPLY=1 env var to mark duplicates.")
            if os.environ.get('APPLY') == '1':
                print(f"\n Applying {total_dupes} duplicate marks...")
                applied = 0
                for p_ctrl, _a_ctrl, _score in dupe_pairs:
                    cur.execute("""
                        UPDATE compliance.canonical_controls
                        SET release_state = 'duplicate',
                            updated_at = now()
                        WHERE id = %s
                        AND release_state NOT IN ('duplicate', 'too_close')
                    """, (str(p_ctrl[0]),))
                    if cur.rowcount > 0:
                        applied += 1
                # Single commit: if any UPDATE fails, closing the connection
                # below discards the whole batch.
                conn.commit()
                print(f" Applied: {applied} marked as duplicate")
            else:
                # Show the highest-scoring pairs for review
                shown = sorted(dupe_pairs, key=lambda pair: -pair[2])[:REVIEW_LIMIT]
                print(f"\n=== Top {len(shown)} of {total_dupes} pairs by score ===")
                for p, a, score in shown:
                    print(f" [{score:.2f}] {p[1]:10s} ({p[4]}) → {a[1]:10s} ({a[4]})")
                    print(f" P: {p[2][:65]}")
                    print(f" A: {a[2][:65]}")
    finally:
        # Close the connection even when a query fails.
        conn.close()


if __name__ == "__main__":
    main()