chore(qa): preamble vs article dedup — 190 duplicates marked
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 33s
CI/CD / test-python-backend-compliance (push) Successful in 32s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 16s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped

Preamble controls that duplicate article controls (same regulation,
Jaccard title similarity >= 0.40) are marked as duplicate.
Article controls always take priority.

Result: 6,183 active controls (was 6,373), 648 unique preamble controls remain.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-20 08:08:04 +01:00
parent 0e16640c28
commit 92d37a1660
2 changed files with 187 additions and 0 deletions

35
scripts/qa/db_status.py Normal file
View File

@@ -0,0 +1,35 @@
"""Quick DB status check."""
import os, psycopg2, urllib.parse
# Release states that take a control out of the "active" set. Kept in one
# place so the Python tally and the SQL filter below cannot drift apart.
INACTIVE_STATES = ('duplicate', 'too_close', 'deprecated')

# Connection parameters come from the DATABASE_URL environment variable;
# a missing variable raises KeyError before any connection is attempted.
db_url = os.environ['DATABASE_URL']
parsed = urllib.parse.urlparse(db_url)
conn = psycopg2.connect(
    host=parsed.hostname,
    port=parsed.port or 5432,
    user=parsed.username,
    password=parsed.password,
    dbname=parsed.path.lstrip('/'),
    options="-c search_path=compliance,public",
)
try:
    cur = conn.cursor()

    # Row count per release_state, largest group first.
    cur.execute("""
        SELECT release_state, count(*) FROM compliance.canonical_controls
        GROUP BY 1 ORDER BY count(*) DESC
    """)
    total = 0
    active = 0
    print("Release state distribution:")
    for state, count in cur.fetchall():
        print(f" {str(state):15s} {count:6d}")
        total += count
        if state not in INACTIVE_STATES:
            active += count
    print(f" {'TOTAL':15s} {total:6d}")
    print(f" {'ACTIVE':15s} {active:6d}")

    # Article type distribution for active controls.
    # psycopg2 adapts the Python tuple to a SQL "(a, b, c)" list for NOT IN.
    cur.execute("""
        SELECT source_citation->>'article_type', count(*)
        FROM compliance.canonical_controls
        WHERE release_state NOT IN %s
        AND source_citation->>'article_type' IS NOT NULL
        GROUP BY 1 ORDER BY count(*) DESC
    """, (INACTIVE_STATES,))
    print("\nArticle types (active controls):")
    for article_type, count in cur.fetchall():
        print(f" {str(article_type):12s} {count:5d}")
finally:
    # Always release the connection, even when a query fails.
    conn.close()

View File

@@ -0,0 +1,152 @@
"""
Step 4: Preamble vs. Article dedup.
If a preamble control covers the same topic as an article control
(same regulation, similar title), mark the preamble as duplicate.
Article controls always take priority.
"""
import os
import re
import psycopg2
import urllib.parse
# Function words ignored when comparing control titles.
STOPWORDS = {
    # German
    'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einer', 'eines',
    'und', 'oder', 'für', 'von', 'zu', 'mit', 'auf', 'in', 'an', 'bei', 'nach',
    'über', 'unter', 'durch', 'als', 'aus', 'zur', 'zum', 'im', 'am', 'um',
    'ist', 'sind', 'wird', 'werden', 'hat', 'haben', 'kann', 'können',
    # English ('in' and 'an' are already listed in the German group above)
    'the', 'and', 'or', 'for', 'of', 'to', 'with', 'on', 'at', 'by',
    'is', 'are', 'be', 'was', 'were', 'been', 'has', 'have', 'had',
    'a', 'not', 'no', 'from',
}


def tokenize(title):
    """Return the set of lowercased words in *title*, minus STOPWORDS.

    Words are maximal runs of ``\\w`` characters, so punctuation and
    whitespace act as separators. A missing or empty title (``None`` or
    ``""``) yields an empty set instead of raising.
    """
    if not title:
        return set()
    words = set(re.findall(r'\w+', title.lower()))
    return words - STOPWORDS
def jaccard(a, b):
    """Return |a ∩ b| / |a ∪ b| for two word sets.

    The result is 0.0 when either set is empty.
    """
    if a and b:
        shared = a & b
        combined = a | b
        return len(shared) / len(combined)
    return 0.0
# Release states that take a control out of the "active" set. 'deprecated'
# is included so that a deprecated article can never suppress a live
# preamble, and a deprecated preamble is never relabelled as 'duplicate'.
INACTIVE_STATES = ('duplicate', 'too_close', 'deprecated')

# Minimum Jaccard title similarity for a preamble to count as a duplicate.
SIMILARITY_THRESHOLD = 0.40

# Row layout of the SELECT below:
#   0 id, 1 control_id, 2 title, 3 source, 4 article, 5 article_type,
#   6 release_state


def _find_preamble_dupes(ctrls):
    """Match the preamble controls of one source against its article controls.

    Returns ``(preambles, articles, pairs)`` where *pairs* is a list of
    ``(preamble_row, best_article_row, score)`` for every preamble whose
    best-matching article title reaches SIMILARITY_THRESHOLD. On equal
    scores the first article in input order wins.
    """
    articles = [c for c in ctrls if c[5] == 'article']
    preambles = [c for c in ctrls if c[5] == 'preamble']
    if not preambles or not articles:
        return preambles, articles, []

    # Tokenize each article title once; NULL titles become empty token sets
    # and therefore never match anything.
    article_tokens = [(c, tokenize(c[2] or "")) for c in articles]

    pairs = []
    for p_ctrl in preambles:
        p_tokens = tokenize(p_ctrl[2] or "")
        if not p_tokens:
            continue
        best_match, best_score = None, 0.0
        for a_ctrl, a_tokens in article_tokens:
            score = jaccard(p_tokens, a_tokens)
            if score > best_score:
                best_match, best_score = a_ctrl, score
        if best_match is not None and best_score >= SIMILARITY_THRESHOLD:
            pairs.append((p_ctrl, best_match, best_score))
    return preambles, articles, pairs


# Connection parameters come from the DATABASE_URL environment variable.
db_url = os.environ['DATABASE_URL']
parsed = urllib.parse.urlparse(db_url)
conn = psycopg2.connect(
    host=parsed.hostname, port=parsed.port or 5432,
    user=parsed.username, password=parsed.password,
    dbname=parsed.path.lstrip('/'),
    options="-c search_path=compliance,public"
)
try:
    cur = conn.cursor()

    # Get all active controls with article_type.
    # psycopg2 adapts the Python tuple to a SQL "(a, b, c)" list for NOT IN.
    cur.execute("""
        SELECT id, control_id, title,
               source_citation->>'source' as source,
               source_citation->>'article' as article,
               source_citation->>'article_type' as article_type,
               release_state
        FROM compliance.canonical_controls
        WHERE release_state NOT IN %s
        AND source_citation->>'article_type' IS NOT NULL
        ORDER BY source_citation->>'source', control_id
    """, (INACTIVE_STATES,))
    controls = cur.fetchall()
    print(f"Active controls with article_type: {len(controls)}")

    # Group by source; rows without a source share the "(null)" bucket.
    by_source = {}
    for c in controls:
        by_source.setdefault(c[3] or "(null)", []).append(c)

    # For each source (largest first): find preamble controls that
    # duplicate article controls.
    dupe_pairs = []
    preambles_checked = 0
    for source, ctrls in sorted(by_source.items(), key=lambda x: -len(x[1])):
        preambles, articles, source_pairs = _find_preamble_dupes(ctrls)
        preambles_checked += len(preambles)
        if not source_pairs:
            continue
        dupe_pairs.extend(source_pairs)
        print(f"\n{source}: {len(source_pairs)} preamble duplicates (of {len(preambles)} preambles, {len(articles)} articles)")
        # Show first 3 pairs. The per-source list is used directly so the
        # "(null)" bucket gets a preview too.
        for p, a, score in source_pairs[:3]:
            print(f" PREAMBLE {p[1]}: {p[2][:60]}")
            print(f" ARTICLE {a[1]}: {a[2][:60]}")
            print(f" Jaccard: {score:.2f} ({p[4]} vs {a[4]})")
            print()
    total_dupes = len(dupe_pairs)

    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f" Total preamble controls checked: {preambles_checked}")
    print(f" Preamble duplicates found: {total_dupes}")
    print(f" Unique preamble controls: {preambles_checked - total_dupes}")

    # Preview only — nothing is written unless APPLY=1 is set.
    if dupe_pairs:
        print(f"\n=== DRY RUN: Would mark {total_dupes} as duplicate ===")
        # Score distribution
        scores = [s for _, _, s in dupe_pairs]
        print(f" Score range: {min(scores):.2f} - {max(scores):.2f}")
        print(f" Score >= 0.50: {sum(1 for s in scores if s >= 0.50)}")
        print(f" Score 0.40-0.49: {sum(1 for s in scores if 0.40 <= s < 0.50)}")
        # Ask for confirmation
        print("\n Apply? Set APPLY=1 env var to mark duplicates.")
        if os.environ.get('APPLY') == '1':
            print(f"\n Applying {total_dupes} duplicate marks...")
            applied = 0
            for p_ctrl, _a_ctrl, _score in dupe_pairs:
                # The state guard keeps the update a no-op for rows that
                # became inactive since they were selected.
                cur.execute("""
                    UPDATE compliance.canonical_controls
                    SET release_state = 'duplicate',
                        updated_at = now()
                    WHERE id = %s
                    AND release_state NOT IN %s
                """, (str(p_ctrl[0]), INACTIVE_STATES))
                if cur.rowcount > 0:
                    applied += 1
            # Single commit: either every mark is stored or none is.
            conn.commit()
            print(f" Applied: {applied} marked as duplicate")
        else:
            # Show the highest-scoring pairs for review (capped at 30).
            review = sorted(dupe_pairs, key=lambda x: -x[2])[:30]
            print(f"\n=== Top {len(review)} of {total_dupes} pairs (by score) ===")
            for p, a, score in review:
                print(f" [{score:.2f}] {p[1]:10s} ({p[4]}) → {a[1]:10s} ({a[4]})")
                print(f" P: {p[2][:65]}")
                print(f" A: {a[2][:65]}")
finally:
    # Always release the connection; closing discards uncommitted work.
    conn.close()