Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
- Control Library: parent control display, ObligationTypeBadge, GenerationStrategyBadge variants, evidence string fallback - API: expose parent_control_uuid/id/title in canonical controls - Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility - Migration 074: control_parent_links + control_dedup_reviews tables - QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup, phase5 normalize, phase74 gap fill, sync_db, run_job - Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
189 lines
6.6 KiB
Python
189 lines
6.6 KiB
Python
"""
|
|
Phase 7.3: Gap Analysis — Identify articles/sections WITHOUT controls.
|
|
|
|
For each regulation PDF:
|
|
1. Extract all articles/sections from the PDF
|
|
2. Compare with controls in the DB that reference this article
|
|
3. Report gaps (articles with no controls)
|
|
|
|
Usage:
|
|
python3 gap_analysis.py # show all gaps
|
|
python3 gap_analysis.py --source "DSGVO" # filter by source
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import psycopg2
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
# Import from pdf_qa_all
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from pdf_qa_all import (
|
|
SOURCE_FILE_MAP, read_file, classify_doc, normalize,
|
|
build_eu_article_index, build_de_law_index, build_nist_index,
|
|
build_owasp_index, build_generic_index, MAX_ARTICLES
|
|
)
|
|
|
|
# Only analyze sources with significant control counts (skip sources with <5 controls)
|
|
MIN_CONTROLS = 5
|
|
|
|
|
|
def _parse_source_filter(argv):
    """Return the value following ``--source`` in *argv*, or None if absent."""
    if "--source" in argv:
        idx = argv.index("--source")
        if idx + 1 < len(argv):
            return argv[idx + 1]
    return None


def _connect():
    """Open a psycopg2 connection from $DATABASE_URL.

    Raises:
        KeyError: if DATABASE_URL is not set in the environment.
    """
    parsed = urllib.parse.urlparse(os.environ['DATABASE_URL'])
    return psycopg2.connect(
        host=parsed.hostname, port=parsed.port or 5432,
        user=parsed.username, password=parsed.password,
        dbname=parsed.path.lstrip('/'),
        # Make unqualified names resolve in the compliance schema first.
        options="-c search_path=compliance,public",
    )


def _load_controls(conn):
    """Return ``{source -> {article -> (article_type, control_count)}}``.

    Controls flagged as 'duplicate' or 'too_close' are excluded so coverage
    reflects only the released control set.  Rows without an article value
    are skipped.
    """
    controls_by_source = defaultdict(dict)
    with conn.cursor() as cur:
        cur.execute("""
            SELECT source_citation->>'source' as source,
                   source_citation->>'article' as article,
                   source_citation->>'article_type' as article_type,
                   count(*) as cnt
            FROM compliance.canonical_controls
            WHERE source_citation->>'source' IS NOT NULL
              AND release_state NOT IN ('duplicate', 'too_close')
            GROUP BY 1, 2, 3
            ORDER BY 1, 2
        """)
        for source, article, art_type, cnt in cur.fetchall():
            if article:
                controls_by_source[source][article] = (art_type or "unknown", cnt)
    return controls_by_source


def _build_index(doc_type, text, source_name, max_art):
    """Dispatch to the pdf_qa_all index builder matching *doc_type*.

    Returns a list of ``(position, label, type)`` tuples describing the
    document's article/section structure.
    """
    if doc_type == "eu_regulation":
        return build_eu_article_index(text, max_article=max_art)
    if doc_type == "de_law":
        return build_de_law_index(text)
    if doc_type == "nist":
        return build_nist_index(text)
    if doc_type == "owasp":
        return build_owasp_index(text, source_name)
    return build_generic_index(text)


def main():
    """Run the gap analysis and print a per-source + overall report.

    For each regulation in SOURCE_FILE_MAP: extract the article/section
    index from the PDF text, compare it against the controls stored in
    compliance.canonical_controls, and report articles with no controls.
    The full report is also written as JSON to /tmp/gap_analysis_results.json.

    CLI:
        --source <substring>: restrict analysis to sources whose name
            contains the substring (case-insensitive).  Also disables the
            MIN_CONTROLS skip so small sources are still shown.

    Environment:
        DATABASE_URL: PostgreSQL connection URL (required).
    """
    source_filter = _parse_source_filter(sys.argv)

    # Load control coverage from the DB; close the connection promptly and
    # exception-safely (the DB is not needed after this point).
    conn = _connect()
    try:
        controls_by_source = _load_controls(conn)
    finally:
        conn.close()

    total_gaps = 0
    total_articles_checked = 0
    total_covered = 0
    gap_report = []

    sources_to_check = sorted(SOURCE_FILE_MAP.keys())
    if source_filter:
        sources_to_check = [s for s in sources_to_check
                            if source_filter.lower() in s.lower()]

    for source_name in sources_to_check:
        filename = SOURCE_FILE_MAP.get(source_name)
        if filename is None:
            continue

        controls = controls_by_source.get(source_name, {})
        # Skip thinly-populated sources unless the user asked for them.
        if len(controls) < MIN_CONTROLS and not source_filter:
            continue

        # Read PDF text and build its article index.
        text = read_file(filename)
        if text is None:
            continue

        doc_type = classify_doc(source_name)
        index = _build_index(doc_type, text, source_name,
                             MAX_ARTICLES.get(source_name))
        if not index:
            continue

        # Only substantive units count toward coverage; preamble and annex
        # entries are tallied separately for context.
        substantive_types = {"article", "section", "control", "requirement", "category"}
        substantive_articles = [(pos, label, typ) for pos, label, typ in index
                                if typ in substantive_types]
        preamble_articles = [(pos, label, typ) for pos, label, typ in index
                             if typ == "preamble"]
        annex_articles = [(pos, label, typ) for pos, label, typ in index
                          if typ == "annex"]

        # Partition substantive articles into covered (has >=1 control)
        # versus gaps (no control references this article).
        covered = []
        gaps = []
        for _pos, label, typ in substantive_articles:
            if label in controls:
                covered.append(label)
            else:
                gaps.append((label, typ))

        total_articles_checked += len(substantive_articles)
        total_covered += len(covered)
        total_gaps += len(gaps)

        # Controls attached to preamble/annex rather than substantive text.
        preamble_controls = sum(1 for a in controls if controls[a][0] == "preamble")
        annex_controls = sum(1 for a in controls if controls[a][0] == "annex")

        coverage_pct = (len(covered) / len(substantive_articles) * 100
                        if substantive_articles else 0)

        print(f"\n{'='*70}")
        print(f"{source_name}")
        print(f" PDF articles: {len(substantive_articles)} substantive, "
              f"{len(preamble_articles)} preamble, {len(annex_articles)} annex")
        print(f" DB controls: {sum(v[1] for v in controls.values())} total "
              f"({preamble_controls} preamble, {annex_controls} annex)")
        print(f" Coverage: {len(covered)}/{len(substantive_articles)} "
              f"({coverage_pct:.0f}%)")

        if gaps:
            print(f" GAPS ({len(gaps)}):")
            for label, typ in gaps[:30]:  # limit console output
                print(f" - {label} [{typ}]")
            if len(gaps) > 30:
                print(f" ... and {len(gaps)-30} more")

        gap_report.append({
            "source": source_name,
            "total_articles": len(substantive_articles),
            "covered": len(covered),
            "gaps": len(gaps),
            "coverage_pct": round(coverage_pct, 1),
            "gap_articles": [{"label": l, "type": t} for l, t in gaps],
        })

    # Summary
    print(f"\n{'='*70}")
    print("GAP ANALYSIS SUMMARY")
    print(f"{'='*70}")
    # Fix: the previous expression double-counted by adding len(gap_report)
    # to the count of every mapped source in sources_to_check (including
    # skipped ones).  Each analyzed source yields exactly one report entry.
    print(f" Sources analyzed: {len(gap_report)}")
    print(f" Total articles in PDFs: {total_articles_checked}")
    print(f" Articles with controls: {total_covered}")
    print(f" Articles WITHOUT controls: {total_gaps}")
    if total_articles_checked:
        print(f" Overall coverage: {total_covered/total_articles_checked*100:.1f}%")

    print(f"\n Sources with gaps:")
    for r in sorted(gap_report, key=lambda x: -x["gaps"]):
        print(f" {r['source']:45s} {r['gaps']:4d} gaps "
              f"({r['covered']}/{r['total_articles']} = {r['coverage_pct']}%)")

    # Save the machine-readable report.
    out_path = "/tmp/gap_analysis_results.json"
    with open(out_path, 'w') as f:
        json.dump(gap_report, f, indent=2, ensure_ascii=False)
    print(f"\n Full report saved to {out_path}")


if __name__ == "__main__":
    main()
|