"""Analyze NIST OSCAL data and compare with existing controls in DB.

Reads the SP 800-53 Rev 5 and CSF 2.0 OSCAL catalogs from ``OSCAL_DIR``,
prints summary statistics, and reports which active SP 800-53 controls have
no matching ``article`` in ``compliance.canonical_controls``.  The database
is located through the ``DATABASE_URL`` environment variable.
"""

import json
import os
import re
import urllib.parse
from collections import defaultdict

OSCAL_DIR = os.path.expanduser("~/rag-ingestion/nist-oscal")

# release_state values that are not counted as "active" DB controls.
INACTIVE_STATES = ("duplicate", "too_close")

# NOTE: execute() is called without parameters for these queries, so the
# doubled "%%" reaches the server unchanged; in LIKE, two consecutive
# wildcards match the same rows as a single one.
NIST_CONTROLS_SQL = """
    SELECT control_id, title,
           source_citation->>'source' as source,
           source_citation->>'article' as article,
           source_citation->>'article_type' as art_type,
           release_state
    FROM compliance.canonical_controls
    WHERE source_citation->>'source' LIKE 'NIST%%'
    ORDER BY source_citation->>'source', control_id
"""

CSF_COUNT_SQL = """
    SELECT count(*),
           count(*) FILTER (WHERE release_state NOT IN ('duplicate', 'too_close'))
    FROM compliance.canonical_controls
    WHERE source_citation->>'source' LIKE 'NIST Cybersecurity%%'
"""


# ── Generic helpers ──

def print_banner(title: str, *, leading_blank: bool = True) -> None:
    """Print *title* framed by two 70-character rules."""
    rule = "=" * 70
    print(f"\n{rule}" if leading_blank else rule)
    print(title)
    print(rule)


def load_catalog(filename: str) -> dict:
    """Return the ``catalog`` object of an OSCAL JSON file in ``OSCAL_DIR``."""
    path = os.path.join(OSCAL_DIR, filename)
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(path, encoding="utf-8") as f:
        return json.load(f)["catalog"]


def props_of(node: dict) -> dict:
    """Map each prop name of *node* to its value ('' when no value is set).

    When several props share a name, the last one wins.
    NOTE(review): if the catalog carries more than one ``label`` prop per
    control (e.g. an additional zero-padded variant), confirm that the one
    kept here has the same format as the DB ``article`` values.
    """
    return {p["name"]: p.get("value", "") for p in node.get("props", [])}


def is_withdrawn(node: dict) -> bool:
    """Return True when *node* has a ``status`` prop equal to ``withdrawn``."""
    return props_of(node).get("status") == "withdrawn"


# ── SP 800-53 helpers ──

def count_family(fam: dict) -> tuple:
    """Return ``(base, enhancements, withdrawn)`` counts for one family.

    Enhancements are inspected for every base control, including withdrawn
    ones; each withdrawn control or enhancement adds one to ``withdrawn``.
    """
    base = enhancements = withdrawn = 0
    for ctrl in fam.get("controls", []):
        if is_withdrawn(ctrl):
            withdrawn += 1
        else:
            base += 1
        for enh in ctrl.get("controls", []):
            if is_withdrawn(enh):
                withdrawn += 1
            else:
                enhancements += 1
    return base, enhancements, withdrawn


def find_control(families: list, control_id: str):
    """Return the first base control whose ``id`` is *control_id*, or None."""
    for fam in families:
        for ctrl in fam.get("controls", []):
            if ctrl.get("id") == control_id:
                return ctrl
    return None


def extract_text(node: dict) -> tuple:
    """Return ``(statement, guidance)`` prose of a control or enhancement.

    The statement is the part's own prose followed by the prose of its direct
    sub-parts, joined with single spaces.  If a part name occurs more than
    once, the last occurrence is used.
    """
    statement = ""
    guidance = ""
    for part in node.get("parts", []):
        name = part.get("name")
        if name == "statement":
            pieces = [part.get("prose", "")]
            pieces.extend(sub.get("prose", "") for sub in part.get("parts", []))
            statement = " ".join(pieces)
        elif name == "guidance":
            guidance = part.get("prose", "")
    return statement, guidance


def control_entry(node: dict) -> tuple:
    """Return ``(label, (title, statement[:500], guidance[:500]))`` for *node*.

    The label falls back to the upper-cased ``id`` when no label prop exists.
    """
    label = props_of(node).get("label", node["id"].upper())
    statement, guidance = extract_text(node)
    return label, (node["title"], statement[:500], guidance[:500])


def collect_active_controls(families: list) -> dict:
    """Return ``{label: (title, statement, guidance)}`` for active controls.

    Withdrawn enhancements are skipped.  A withdrawn base control is skipped
    together with all of its enhancements.
    """
    active = {}
    for fam in families:
        for ctrl in fam.get("controls", []):
            if is_withdrawn(ctrl):
                continue
            label, entry = control_entry(ctrl)
            active[label] = entry
            for enh in ctrl.get("controls", []):
                if is_withdrawn(enh):
                    continue
                label, entry = control_entry(enh)
                active[label] = entry
    return active


# ── Database helpers ──

def db_connect_kwargs(db_url: str) -> dict:
    """Translate a ``postgresql://`` URL into ``psycopg2.connect`` kwargs.

    ``urlparse`` leaves user, password and path percent-encoded, so they are
    decoded here; otherwise credentials containing reserved characters
    (``@``, ``/``, ``:`` ...) would be sent to the server in encoded form.
    The port defaults to 5432 and the search path is fixed to
    ``compliance,public``.
    """
    parsed = urllib.parse.urlparse(db_url)

    def decode(value):
        return urllib.parse.unquote(value) if value is not None else None

    return {
        "host": parsed.hostname,
        "port": parsed.port or 5432,
        "user": decode(parsed.username),
        "password": decode(parsed.password),
        "dbname": decode(parsed.path.lstrip("/")),
        "options": "-c search_path=compliance,public",
    }


def open_db_connection(db_url: str):
    """Open a psycopg2 connection for *db_url*."""
    # Imported here so the pure analysis helpers above stay importable on
    # machines without the database driver.
    import psycopg2

    return psycopg2.connect(**db_connect_kwargs(db_url))


# ── Report sections ──

def print_example_control(families: list) -> None:
    """Print the structure of AC-6 as an example, if the catalog has it."""
    print("\n Example Control (AC-6 Least Privilege):")
    ctrl = find_control(families, "ac-6")
    if ctrl is None:
        return
    props = props_of(ctrl)
    print(f" ID: {ctrl['id']}")
    print(f" Label: {props.get('label', '?')}")
    print(f" Title: {ctrl['title']}")
    for part in ctrl.get("parts", []):
        prose = part.get("prose", "")
        if part.get("name") == "statement":
            print(f" Statement: {prose[:150]}...")
        elif part.get("name") == "guidance":
            print(f" Guidance: {prose[:150]}...")
    print(f" Enhancements: {len(ctrl.get('controls', []))}")
    related = [
        link["href"].lstrip("#")
        for link in ctrl.get("links", [])
        if link.get("rel") == "related"
    ]
    print(f" Related: {', '.join(related[:8])}...")


def report_sp853(sp853: dict) -> list:
    """Print SP 800-53 catalog statistics and return its family groups."""
    print_banner("NIST SP 800-53 Rev 5 — OSCAL Catalog Analysis",
                 leading_blank=False)
    print(f" UUID: {sp853.get('uuid', '?')}")
    print(f" Last Modified: {sp853.get('metadata', {}).get('last-modified', '?')}")

    families = sp853.get("groups", [])
    family_stats = [
        (fam.get("id", "?"), fam.get("title", "?"), *count_family(fam))
        for fam in families
    ]
    total_base = sum(row[2] for row in family_stats)
    total_enhancements = sum(row[3] for row in family_stats)
    total_withdrawn = sum(row[4] for row in family_stats)
    total_active = total_base + total_enhancements

    print(f"\n Families: {len(families)}")
    print(f" Base Controls: {total_base}")
    print(f" Enhancements: {total_enhancements}")
    print(f" Withdrawn: {total_withdrawn}")
    print(f" TOTAL ACTIVE: {total_active}")

    print("\n Per Family:")
    print(f" {'ID':6s} {'Title':45s} {'Base':>5s} {'Enh':>5s} {'Wdrn':>5s}")
    for fam_id, title, base, enh, wdrn in family_stats:
        print(f" {fam_id:6s} {title[:45]:45s} {base:5d} {enh:5d} {wdrn:5d}")

    print_example_control(families)
    return families


def report_csf(csf_groups: list) -> None:
    """Print category and subcategory counts per CSF 2.0 function."""
    csf_total = 0
    for grp in csf_groups:
        func_title = grp.get("title", "?")
        cats = grp.get("groups", [])
        subcats = sum(len(cat.get("controls", [])) for cat in cats)
        csf_total += subcats
        print(f" {func_title:25s}: {len(cats):2d} categories, {subcats:3d} subcategories")
    print(f" TOTAL: {csf_total} subcategories")


def report_sp853_db_gap(cur, families: list) -> tuple:
    """Compare active OSCAL SP 800-53 controls with the DB.

    Returns ``(missing, covered)``: sorted label lists of active OSCAL
    controls without / with an identical ``article`` among the DB rows whose
    source contains 'SP 800-53'.
    """
    cur.execute(NIST_CONTROLS_SQL)
    # Row layout: (control_id, title, source, article, art_type, release_state)
    nist_controls = cur.fetchall()

    by_source = defaultdict(list)
    for row in nist_controls:
        by_source[row[2]].append(row)

    print("\n Bestehende NIST Controls in DB:")
    for src in sorted(by_source):
        ctrls = by_source[src]
        active = sum(1 for c in ctrls if c[5] not in INACTIVE_STATES)
        with_article = sum(1 for c in ctrls if c[3])
        print(f" {src:40s}: {len(ctrls):4d} total, {active:4d} active, {with_article:4d} mit article")

    # For SP 800-53: which control families do we have?
    sp853_existing = [c for c in nist_controls if "SP 800-53" in (c[2] or "")]
    existing_families = set()
    existing_articles = set()
    for row in sp853_existing:
        article = row[3] or ""
        if not article:
            continue
        # Extract family prefix (e.g., "AC-6" -> "AC").
        m = re.match(r"([A-Z]{2})-", article)
        if m:
            existing_families.add(m.group(1))
        existing_articles.add(article)

    print("\n SP 800-53 in DB:")
    print(f" Total: {len(sp853_existing)}")
    print(f" Families covered: {len(existing_families)}")
    print(f" Unique articles: {len(existing_articles)}")
    print(f" Families: {', '.join(sorted(existing_families))}")

    # Which OSCAL controls are NOT in our DB?
    oscal_controls = collect_active_controls(families)
    print(f"\n OSCAL SP 800-53 aktive Controls: {len(oscal_controls)}")

    labels = sorted(oscal_controls)
    covered = [label for label in labels if label in existing_articles]
    missing = [label for label in labels if label not in existing_articles]
    print(f" In DB vorhanden: {len(covered)}")
    print(f" FEHLEND in DB: {len(missing)}")

    missing_by_fam = defaultdict(list)
    for label in missing:
        missing_by_fam[label.split("-")[0]].append(label)

    print("\n Fehlende Controls nach Family:")
    for fam in sorted(missing_by_fam):
        ctrls = missing_by_fam[fam]
        examples = ", ".join(ctrls[:5])
        more = f" +{len(ctrls)-5}" if len(ctrls) > 5 else ""
        print(f" {fam:4s}: {len(ctrls):3d} fehlend ({examples}{more})")

    return missing, covered


def report_csf_db(cur, csf_groups: list) -> int:
    """Print CSF 2.0 DB counts next to the OSCAL subcategory count.

    Returns the number of CSF 2.0 subcategories found in the OSCAL catalog.
    """
    print_banner("NIST CSF 2.0 — Vergleich mit DB")
    cur.execute(CSF_COUNT_SQL)
    csf_row = cur.fetchone()
    print(f" CSF Controls in DB: {csf_row[0]} total, {csf_row[1]} active")

    csf_ids = [
        props_of(subcat).get("label", subcat["id"])
        for grp in csf_groups
        for cat in grp.get("groups", [])
        for subcat in cat.get("controls", [])
    ]
    csf_subcats = len(csf_ids)
    print(f" CSF 2.0 OSCAL Subcategories: {csf_subcats}")
    print(f" Beispiele: {', '.join(csf_ids[:10])}")
    return csf_subcats


def report_summary(missing: list, covered: list, csf_subcats: int) -> None:
    """Print the closing summary of what an OSCAL import would add."""
    print_banner("POTENTIAL: Was OSCAL uns bringt")
    print(f"""
  SP 800-53 Rev 5:
    - {len(missing)} neue Controls möglich (aktuell {len(covered)} in DB)
    - Jeder Control hat: Statement + Guidance + Assessment-Methoden
    - Cross-References zwischen Controls (für Mapping)
    - Maschinenlesbare Parameter (ODP)
    - Public Domain — keine Lizenzprobleme

  CSF 2.0:
    - {csf_subcats} Subcategories als Compliance-Controls
    - 6 Functions (Govern, Identify, Protect, Detect, Respond, Recover)
    - Direkte Mappings zu SP 800-53 Controls

  Nächste Schritte:
    1. Fehlende SP 800-53 Controls importieren ({len(missing)} Controls)
    2. Statement-Text als source_original_text verwenden
    3. article_type='control', article=Label (z.B. 'AC-6')
    4. CSF 2.0 als eigene Regulation importieren
    5. Cross-References als Grundlage für Control-Mappings nutzen
""")


def main() -> None:
    """Run the full analysis: SP 800-53, CSF 2.0, then the DB comparison.

    Raises:
        KeyError: if ``DATABASE_URL`` is not set (raised after the catalog
            sections have been printed).
    """
    # ── Load SP 800-53 Rev 5 ──
    sp853 = load_catalog("sp800-53-rev5-catalog.json")
    families = report_sp853(sp853)

    # ── Load CSF 2.0 ──
    print_banner("NIST CSF 2.0 — OSCAL Catalog Analysis")
    csf_groups = load_catalog("csf-2.0-catalog.json").get("groups", [])
    report_csf(csf_groups)

    # ── Compare with existing DB controls ──
    print_banner("VERGLEICH: OSCAL vs. bestehende Controls in DB")
    conn = open_db_connection(os.environ["DATABASE_URL"])
    try:
        cur = conn.cursor()
        missing, covered = report_sp853_db_gap(cur, families)
        csf_subcats = report_csf_db(cur, csf_groups)

        # ── Summary / Potential ──
        report_summary(missing, covered, csf_subcats)
    finally:
        # Close the connection even when a query or report step fails.
        conn.close()


if __name__ == "__main__":
    main()