Files
breakpilot-compliance/scripts/qa/oscal_analysis.py
Benjamin Admin 643b26618f
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
feat: Control Library UI, dedup migration, QA tooling, docs
- Control Library: parent control display, ObligationTypeBadge,
  GenerationStrategyBadge variants, evidence string fallback
- API: expose parent_control_uuid/id/title in canonical controls
- Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility
- Migration 074: control_parent_links + control_dedup_reviews tables
- QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup,
  phase5 normalize, phase74 gap fill, sync_db, run_job
- Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 11:56:08 +01:00

289 lines
10 KiB
Python

"""Analyze NIST OSCAL data and compare with existing controls in DB."""
import os
import re
import json
import psycopg2
import urllib.parse
from collections import defaultdict
# Directory that holds the NIST OSCAL catalog JSON files read by this script.
OSCAL_DIR = os.path.expanduser("~/rag-ingestion/nist-oscal")

# ── Load SP 800-53 Rev 5 ──
# JSON documents are UTF-8; pin the encoding so the read does not depend on
# the platform's locale default (which is not UTF-8 everywhere).
with open(
    os.path.join(OSCAL_DIR, "sp800-53-rev5-catalog.json"), encoding="utf-8"
) as f:
    # Only the top-level "catalog" object is used by the rest of the script.
    sp853 = json.load(f)["catalog"]

print("=" * 70)
print("NIST SP 800-53 Rev 5 — OSCAL Catalog Analysis")
print("=" * 70)
# Both fields are optional here; "?" is printed when one is absent.
print(f" UUID: {sp853.get('uuid', '?')}")
print(f" Last Modified: {sp853.get('metadata', {}).get('last-modified', '?')}")
# Count controls
def _status_of(item):
    """Return the value of *item*'s ``status`` prop, or None if it has none."""
    flattened = {
        prop["name"]: prop.get("value", "") for prop in item.get("props", [])
    }
    return flattened.get("status")


families = sp853.get("groups", [])

# One row per family: (id, title, base, enhancements, withdrawn).
family_stats = []
for fam in families:
    n_base = 0
    n_enhancements = 0
    n_withdrawn = 0
    for ctrl in fam.get("controls", []):
        # A base control is either withdrawn or counted as active.
        if _status_of(ctrl) == "withdrawn":
            n_withdrawn += 1
        else:
            n_base += 1
        # Enhancements are the controls nested under a base control; they
        # are tallied the same way, whatever the parent's status is.
        for enh in ctrl.get("controls", []):
            if _status_of(enh) == "withdrawn":
                n_withdrawn += 1
            else:
                n_enhancements += 1
    family_stats.append(
        (fam.get("id", "?"), fam.get("title", "?"), n_base, n_enhancements, n_withdrawn)
    )

# Grand totals are derived from the per-family rows.
total_base = sum(row[2] for row in family_stats)
total_enhancements = sum(row[3] for row in family_stats)
total_withdrawn = sum(row[4] for row in family_stats)
total_active = total_base + total_enhancements

print(f"\n Families: {len(families)}")
print(f" Base Controls: {total_base}")
print(f" Enhancements: {total_enhancements}")
print(f" Withdrawn: {total_withdrawn}")
print(f" TOTAL ACTIVE: {total_active}")

print(f"\n Per Family:")
print(f" {'ID':6s} {'Title':45s} {'Base':>5s} {'Enh':>5s} {'Wdrn':>5s}")
for fam_id, title, base, enh, wdrn in family_stats:
    # Titles are cut to the 45-character column width.
    print(f" {fam_id:6s} {title[:45]:45s} {base:5d} {enh:5d} {wdrn:5d}")
# Show example control structure
print(f"\n Example Control (AC-6 Least Privilege):")
for fam in families:
    for ctrl in fam.get("controls", []):
        if ctrl["id"] != "ac-6":
            continue

        props = {
            prop["name"]: prop.get("value", "") for prop in ctrl.get("props", [])
        }
        print(f" ID: {ctrl['id']}")
        print(f" Label: {props.get('label', '?')}")
        print(f" Title: {ctrl['title']}")

        # Print the first 150 characters of the statement and guidance prose.
        for part in ctrl.get("parts", []):
            part_name = part.get("name")
            if part_name == "statement":
                prose = part.get("prose", "")
                print(f" Statement: {prose[:150]}...")
            elif part_name == "guidance":
                prose = part.get("prose", "")
                print(f" Guidance: {prose[:150]}...")

        enh_count = len(ctrl.get("controls", []))
        print(f" Enhancements: {enh_count}")

        # Links with rel == "related" point at other controls as "#<id>"
        # fragments; show at most eight of them.
        links = [
            link["href"].lstrip("#")
            for link in ctrl.get("links", [])
            if link.get("rel") == "related"
        ]
        print(f" Related: {', '.join(links[:8])}...")

        # Leaves only the inner loop; the remaining families are still scanned.
        break
# ── Load CSF 2.0 ──
print(f"\n{'='*70}")
print("NIST CSF 2.0 — OSCAL Catalog Analysis")
print("=" * 70)

# JSON documents are UTF-8; pin the encoding so the read does not depend on
# the platform's locale default.
with open(os.path.join(OSCAL_DIR, "csf-2.0-catalog.json"), encoding="utf-8") as f:
    csf = json.load(f)["catalog"]

# The catalog is walked as: top-level groups → nested groups → controls.
# The report labels these levels functions, categories and subcategories.
csf_groups = csf.get("groups", [])
csf_total = 0
for grp in csf_groups:
    func_title = grp.get("title", "?")
    cats = grp.get("groups", [])
    subcats = sum(len(cat.get("controls", [])) for cat in cats)
    csf_total += subcats
    print(f" {func_title:25s}: {len(cats):2d} categories, {subcats:3d} subcategories")
print(f" TOTAL: {csf_total} subcategories")
# ── Compare with existing DB controls ──
print(f"\n{'='*70}")
print("VERGLEICH: OSCAL vs. bestehende Controls in DB")
print("=" * 70)


def _percent_decoded(component):
    """Return *component* with %XX escapes decoded; None stays None."""
    return None if component is None else urllib.parse.unquote(component)


# The connection settings come from the DATABASE_URL environment variable;
# a missing variable raises KeyError.
db_url = os.environ['DATABASE_URL']
parsed = urllib.parse.urlparse(db_url)

# urlparse() hands back the URL components still percent-encoded, so a
# password such as "p%40ss" must be decoded before it is passed on as a
# keyword argument (keyword values are taken literally, not as URL text).
conn = psycopg2.connect(
    host=parsed.hostname,
    port=parsed.port or 5432,  # fall back to the default PostgreSQL port
    user=_percent_decoded(parsed.username),
    password=_percent_decoded(parsed.password),
    dbname=_percent_decoded(parsed.path.lstrip('/')),
    # Resolve unqualified table names in the compliance schema first.
    options="-c search_path=compliance,public",
)
cur = conn.cursor()
# Get existing NIST controls
# Row layout: (control_id, title, source, article, art_type, release_state).
cur.execute("""
SELECT control_id, title,
source_citation->>'source' as source,
source_citation->>'article' as article,
source_citation->>'article_type' as art_type,
release_state
FROM compliance.canonical_controls
WHERE source_citation->>'source' LIKE 'NIST%%'
ORDER BY source_citation->>'source', control_id
""")
nist_controls = cur.fetchall()

# Group by source
by_source = defaultdict(list)
for row in nist_controls:
    source_name = row[2]
    by_source[source_name].append(row)

print(f"\n Bestehende NIST Controls in DB:")
for src in sorted(by_source):
    ctrls = by_source[src]
    # Rows whose release_state is 'duplicate' or 'too_close' are not active.
    active = len([c for c in ctrls if c[5] not in ('duplicate', 'too_close')])
    # Rows that carry a non-empty article reference.
    with_article = len([c for c in ctrls if c[3]])
    print(f" {src:40s}: {len(ctrls):4d} total, {active:4d} active, {with_article:4d} mit article")
# For SP 800-53: which control families do we have?
sp853_existing = [row for row in nist_controls if 'SP 800-53' in (row[2] or '')]

existing_families = set()
existing_articles = set()
family_prefix = re.compile(r'([A-Z]{2})-')
for row in sp853_existing:
    article = row[3] or ""
    if not article:
        # Rows without an article reference contribute to neither set.
        continue
    existing_articles.add(article)
    # Extract family prefix (e.g., "AC-6" → "AC")
    prefix_match = family_prefix.match(article)
    if prefix_match:
        existing_families.add(prefix_match.group(1))

print(f"\n SP 800-53 in DB:")
print(f" Total: {len(sp853_existing)}")
print(f" Families covered: {len(existing_families)}")
print(f" Unique articles: {len(existing_articles)}")
print(f" Families: {', '.join(sorted(existing_families))}")
# Compare: which OSCAL controls are NOT in our DB?
def _flatten_props(item):
    """Collapse *item*'s ``props`` list into a ``name → value`` dict.

    Prop names are not unique within an item: several props may share a name
    and differ only in ``class``. When that happens the un-classed prop wins,
    so that e.g. ``label`` resolves to the plain label rather than to a
    classed variant that merely appears later in the list.
    NOTE(review): this targets catalogs that ship a second ``label`` prop
    (class ``zero-padded``, e.g. ``AC-06``) — confirm against the catalog
    file in use. With a single prop per name the result is unchanged.
    """
    props = item.get("props", [])
    flattened = {p["name"]: p.get("value", "") for p in props if p.get("class")}
    flattened.update(
        {p["name"]: p.get("value", "") for p in props if not p.get("class")}
    )
    return flattened


def _statement_and_guidance(item):
    """Return ``(statement, guidance)`` prose for a control or enhancement.

    The statement is the part's own prose followed by the prose of its
    direct sub-parts, space-separated (deeper nesting is not visited).
    Missing parts yield empty strings.
    """
    statement = ""
    guidance = ""
    for part in item.get("parts", []):
        part_name = part.get("name")
        if part_name == "statement":
            pieces = [part.get("prose", "")]
            # Also check sub-items
            pieces.extend(sub.get("prose", "") for sub in part.get("parts", []))
            statement = " ".join(pieces)
        elif part_name == "guidance":
            guidance = part.get("prose", "")
    return statement, guidance


def _register_if_active(registry, item):
    """Store *item* in *registry* unless it is withdrawn.

    Returns True when the item was stored, False when it was skipped.
    """
    props = _flatten_props(item)
    if props.get("status") == "withdrawn":
        return False
    # Fall back to the upper-cased id when the item carries no label prop.
    label = props.get("label", item["id"].upper())
    statement, guidance = _statement_and_guidance(item)
    # Texts are capped at 500 characters each.
    registry[label] = (item["title"], statement[:500], guidance[:500])
    return True


oscal_controls = {}  # label → (title, statement[:500], guidance[:500])
for fam in families:
    for ctrl in fam.get("controls", []):
        if not _register_if_active(oscal_controls, ctrl):
            # A withdrawn base control is skipped together with its
            # enhancements.
            continue
        # Enhancements
        for enh in ctrl.get("controls", []):
            _register_if_active(oscal_controls, enh)

print(f"\n OSCAL SP 800-53 aktive Controls: {len(oscal_controls)}")
# Find missing: in OSCAL but not in DB
# A control counts as covered when its label equals an article stored in
# the DB exactly; both lists end up in sorted label order.
covered = []
missing = []
for label in sorted(oscal_controls):
    bucket = covered if label in existing_articles else missing
    bucket.append(label)
print(f" In DB vorhanden: {len(covered)}")
print(f" FEHLEND in DB: {len(missing)}")

# Missing by family
# The family is the part of the label before the first "-".
missing_by_fam = {}
for label in missing:
    family_key = label.partition("-")[0]
    missing_by_fam.setdefault(family_key, []).append(label)

print(f"\n Fehlende Controls nach Family:")
for fam in sorted(missing_by_fam):
    ctrls = missing_by_fam[fam]
    # Show up to five labels, then how many more were left out.
    examples = ", ".join(ctrls[:5])
    more = f" +{len(ctrls)-5}" if len(ctrls) > 5 else ""
    print(f" {fam:4s}: {len(ctrls):3d} fehlend ({examples}{more})")
# Also check CSF 2.0
print(f"\n{'='*70}")
print("NIST CSF 2.0 — Vergleich mit DB")
print("=" * 70)

# One row: (total rows, rows whose release_state is not duplicate/too_close).
cur.execute("""
SELECT count(*), count(*) FILTER (WHERE release_state NOT IN ('duplicate', 'too_close'))
FROM compliance.canonical_controls
WHERE source_citation->>'source' LIKE 'NIST Cybersecurity%%'
""")
csf_row = cur.fetchone()
print(f" CSF Controls in DB: {csf_row[0]} total, {csf_row[1]} active")


def _subcat_label(subcat):
    """Return the subcategory's ``label`` prop, falling back to its id."""
    props = {p["name"]: p.get("value", "") for p in subcat.get("props", [])}
    return props.get("label", subcat["id"])


# Collect one display id per subcategory, in catalog order.
csf_ids = [
    _subcat_label(subcat)
    for grp in csf_groups
    for cat in grp.get("groups", [])
    for subcat in cat.get("controls", [])
]
csf_subcats = len(csf_ids)

print(f" CSF 2.0 OSCAL Subcategories: {csf_subcats}")
print(f" Beispiele: {', '.join(csf_ids[:10])}")
# ── Summary / Potential ──
# Closing report: restates the gap counts computed earlier in the script and
# lists the proposed next steps.
for heading_line in (f"\n{'='*70}", "POTENTIAL: Was OSCAL uns bringt", "=" * 70):
    print(heading_line)

summary_text = f"""
SP 800-53 Rev 5:
- {len(missing)} neue Controls möglich (aktuell {len(covered)} in DB)
- Jeder Control hat: Statement + Guidance + Assessment-Methoden
- Cross-References zwischen Controls (für Mapping)
- Maschinenlesbare Parameter (ODP)
- Public Domain — keine Lizenzprobleme
CSF 2.0:
- {csf_subcats} Subcategories als Compliance-Controls
- 6 Functions (Govern, Identify, Protect, Detect, Respond, Recover)
- Direkte Mappings zu SP 800-53 Controls
Nächste Schritte:
1. Fehlende SP 800-53 Controls importieren ({len(missing)} Controls)
2. Statement-Text als source_original_text verwenden
3. article_type='control', article=Label (z.B. 'AC-6')
4. CSF 2.0 als eigene Regulation importieren
5. Cross-References als Grundlage für Control-Mappings nutzen
"""
print(summary_text)

# All queries are done; release the database connection.
conn.close()