"""Import 776 missing NIST SP 800-53 Rev 5 controls from OSCAL into canonical_controls."""

import json
import os
import re
import urllib.parse
import uuid
from collections import Counter

OSCAL_DIR = os.path.expanduser("~/rag-ingestion/nist-oscal")
CATALOG_FILE = "sp800-53-rev5-catalog.json"

# Value of source_citation->>'source' used both to find existing rows and to
# tag newly inserted ones; kept in one place so the two can never drift apart.
SOURCE_NAME = "NIST SP 800-53 Rev. 5"

# framework_id written to every inserted row.
FRAMEWORK_ID = '14b1bdd2-abc7-4a43-adae-14471ee5c7cf'

# Map NIST families to our control_id prefixes
FAMILY_PREFIX = {
    "Access Control": "ACC",
    "Awareness and Training": "GOV",
    "Audit and Accountability": "LOG",
    "Assessment, Authorization, and Monitoring": "GOV",
    "Configuration Management": "COMP",
    "Contingency Planning": "INC",
    "Identification and Authentication": "AUTH",
    "Incident Response": "INC",
    "Maintenance": "COMP",
    "Media Protection": "DATA",
    "Physical and Environmental Protection": "SEC",
    "Planning": "GOV",
    "Program Management": "GOV",
    "Personnel Security": "GOV",
    "Personally Identifiable Information Processing and Transparency": "DATA",
    "Risk Assessment": "GOV",
    "System and Services Acquisition": "COMP",
    "System and Communications Protection": "NET",
    "System and Information Integrity": "SEC",
    "Supply Chain Risk Management": "COMP",
}

# Leading zeros after the family hyphen ("AC-01") and inside the enhancement
# parentheses ("(02)"); compiled once because normalize_label runs per control.
_LEADING_ZERO_NUMBER = re.compile(r'-0+(\d)')
_LEADING_ZERO_ENHANCEMENT = re.compile(r'\(0+(\d+)\)')


# ── Extract all OSCAL controls ──

def extract_controls(catalog):
    """Extract all active controls with full data.

    Walks every group (family) of the catalog and returns one dict per
    non-withdrawn control, each base control followed by its enhancements.
    An enhancement is extracted even when its base control is withdrawn.
    """
    controls = []
    for fam in catalog.get("groups", []):
        fam_title = fam.get("title", "")
        for ctrl in fam.get("controls", []):
            # Base control first, then its enhancements (nested "controls").
            for item in (ctrl, *ctrl.get("controls", [])):
                result = extract_single(item, fam_title)
                if result:
                    controls.append(result)
    return controls


def _item_label(item):
    """Return the value of the last "label" prop of a statement item, or ""."""
    label = ""
    for prop in item.get("props", []):
        if prop["name"] == "label":
            label = prop.get("value", "")
    return label


def _append_statement_items(lines, parts, depth=0):
    """Append one line per statement item in *parts*, recursing into sub-items.

    Items are indented by one space per nesting level below the first. The
    walk is recursive so that items nested deeper than two levels (e.g.
    "a." -> "1." -> "(a)") are kept rather than silently dropped.
    """
    indent = " " * depth
    for item in parts:
        prose = item.get("prose", "")
        label = _item_label(item)
        if label:
            lines.append(f"{indent}{label} {prose}")
        elif prose:
            lines.append(f"{indent}{prose}")
        _append_statement_items(lines, item.get("parts", []), depth + 1)


def extract_single(ctrl, family_title):
    """Extract a single control or enhancement.

    Returns None for controls whose "status" prop is "withdrawn"; otherwise a
    dict with the keys label, title, family, statement, guidance, related,
    params and is_enhancement.
    """
    props = {p["name"]: p.get("value", "") for p in ctrl.get("props", [])}
    if props.get("status") == "withdrawn":
        return None

    label = props.get("label", ctrl["id"].upper())
    title = ctrl.get("title", "")

    # Statement = main requirement text plus its labelled sub-items;
    # guidance = discussion text. If a part name repeats, the last one wins.
    statement = ""
    guidance = ""
    for part in ctrl.get("parts", []):
        name = part.get("name")
        if name == "statement":
            lines = [part.get("prose", "")]
            _append_statement_items(lines, part.get("parts", []))
            statement = "\n".join(lines)
        elif name == "guidance":
            guidance = part.get("prose", "")

    # Cross-references: hrefs are fragment links ("#ac-2"), stored without "#".
    related = [
        link["href"].lstrip("#")
        for link in ctrl.get("links", [])
        if link.get("rel") == "related"
    ]

    # Parameters
    params = []
    for p in ctrl.get("params", []):
        params.append({
            "id": p.get("id", ""),
            "label": p.get("label", ""),
            "guidelines": "".join(g.get("prose", "") for g in p.get("guidelines", [])),
            "choices": list(p["select"].get("choice", [])) if "select" in p else [],
        })

    return {
        "label": label,
        "title": title,
        "family": family_title,
        "statement": statement.strip(),
        "guidance": guidance.strip(),
        "related": related,
        "params": params,
        # Enhancement labels carry a parenthesised number, e.g. "AC-2(1)".
        "is_enhancement": "(" in label,
    }


# ── Normalize label for comparison ──

def normalize_label(label):
    """Return *label* upper-cased with zero padding removed ("ac-01(02)" -> "AC-1(2)")."""
    label = _LEADING_ZERO_NUMBER.sub(r'-\1', label)
    label = _LEADING_ZERO_ENHANCEMENT.sub(r'(\1)', label)
    return label.upper()


# ── DB connection ──

def db_connect_kwargs(db_url):
    """Translate a database URL into keyword arguments for psycopg2.connect.

    urlparse leaves the userinfo percent-encoded, so user name and password
    are unquoted here; otherwise credentials containing characters such as
    "@" or "/" would be sent to the server in their encoded form.
    """
    parsed = urllib.parse.urlparse(db_url)
    user = parsed.username
    password = parsed.password
    return {
        "host": parsed.hostname,
        "port": parsed.port or 5432,
        "user": urllib.parse.unquote(user) if user else user,
        "password": urllib.parse.unquote(password) if password else password,
        "dbname": parsed.path.lstrip('/'),
        "options": "-c search_path=compliance,public",
    }


def next_control_id(prefix, existing):
    """Return the highest number already used for *prefix* in *existing*.

    *existing* is an iterable of control_ids such as "SEC-1234". The result
    is 0 when the prefix is unused. Despite the name, this is the current
    maximum, not the next free number: the caller increments it before
    formatting each new control_id.
    """
    pattern = re.compile(rf'^{re.escape(prefix)}-(\d+)$')
    max_num = 0
    for eid in existing:
        m = pattern.match(eid)
        if m:
            max_num = max(max_num, int(m.group(1)))
    return max_num


def build_citation(ctrl):
    """Build the source_citation JSON object for an extracted control.

    Related controls are capped at 20 and parameters at 10 (id and label
    only); both keys are omitted when the control has none.
    """
    citation = {
        "source": SOURCE_NAME,
        "article": ctrl["label"],
        "article_type": "control",
        "source_type": "standard",
        "oscal_import": True,
    }
    if ctrl["related"]:
        citation["related_controls"] = ctrl["related"][:20]
    if ctrl["params"]:
        citation["parameters"] = [
            {"id": p["id"], "label": p["label"]} for p in ctrl["params"][:10]
        ]
    return citation


def main():
    """Load the OSCAL catalog, insert controls missing from the DB, print a report.

    Reads the connection string from the DATABASE_URL environment variable.
    All inserts happen in one transaction that is committed once at the end;
    if anything fails before that, the connection is closed uncommitted.
    """
    # Imported here rather than at module level so the pure OSCAL helpers
    # above stay importable on machines without a PostgreSQL driver.
    import psycopg2

    with open(os.path.join(OSCAL_DIR, CATALOG_FILE), encoding="utf-8") as f:
        sp853 = json.load(f)["catalog"]

    all_oscal = extract_controls(sp853)
    print(f"Total OSCAL active controls: {len(all_oscal)}")

    conn = psycopg2.connect(**db_connect_kwargs(os.environ['DATABASE_URL']))
    try:
        cur = conn.cursor()

        # Get existing labels
        cur.execute("""
            SELECT DISTINCT source_citation->>'article' as article
            FROM compliance.canonical_controls
            WHERE source_citation->>'source' = %s
              AND source_citation->>'article' IS NOT NULL
        """, (SOURCE_NAME,))
        existing_labels = {normalize_label(r[0]) for r in cur.fetchall()}
        print(f"Existing DB labels (normalized): {len(existing_labels)}")

        # Get highest control_id numbers per prefix
        cur.execute("""
            SELECT control_id
            FROM compliance.canonical_controls
            WHERE control_id ~ '^[A-Z]+-[0-9]+$'
            ORDER BY control_id
        """)
        existing_ids = {r[0] for r in cur.fetchall()}

        # Track the last used number per prefix; incremented before each use.
        prefix_counters = {
            prefix: next_control_id(prefix, existing_ids)
            for prefix in sorted(set(FAMILY_PREFIX.values()))
        }
        print(f"Starting counters: {prefix_counters}")

        # ── Filter to only new controls ──
        to_import = [
            ctrl for ctrl in all_oscal
            if normalize_label(ctrl["label"]) not in existing_labels
        ]
        print(f"\nControls to import: {len(to_import)}")

        # ── Import ──
        imported = 0
        for ctrl in to_import:
            prefix = FAMILY_PREFIX.get(ctrl["family"], "COMP")
            prefix_counters[prefix] += 1
            control_id = f"{prefix}-{prefix_counters[prefix]:04d}"

            # Build title: "NIST {label}: {title}"
            title = f"NIST {ctrl['label']}: {ctrl['title']}"

            # source_original_text = statement (the official requirement
            # text), falling back to a guidance excerpt, then to the title.
            source_text = ctrl["statement"] or ctrl["guidance"][:500] or ctrl["title"]

            # objective = guidance text
            objective = ctrl["guidance"][:2000]

            citation = build_citation(ctrl)
            new_id = str(uuid.uuid4())

            cur.execute("""
                INSERT INTO compliance.canonical_controls
                    (id, framework_id, control_id, title, objective, rationale,
                     severity, source_original_text, source_citation,
                     pipeline_version, release_state, generation_strategy, category)
                VALUES (%s, %s, %s, %s, %s, '', 'medium', %s, %s, 4, 'draft',
                        'oscal_import', %s)
            """, (
                new_id,
                FRAMEWORK_ID,
                control_id,
                title[:500],
                objective[:5000],
                source_text[:10000],
                json.dumps(citation, ensure_ascii=False),
                ctrl["family"],
            ))
            imported += 1

        conn.commit()
        print(f"\nImported: {imported} new controls")

        # ── Verify ──
        cur.execute("""
            SELECT count(*),
                   count(*) FILTER (WHERE release_state NOT IN ('duplicate', 'too_close'))
            FROM compliance.canonical_controls
            WHERE source_citation->>'source' = %s
        """, (SOURCE_NAME,))
        total, active = cur.fetchone()
        print(f"\nSP 800-53 after import: {total} total, {active} active")

        cur.execute("""
            SELECT release_state, count(*)
            FROM compliance.canonical_controls
            GROUP BY release_state
            ORDER BY count(*) DESC
        """)
        print("\nDB release_state gesamt:")
        for row in cur.fetchall():
            # str() so a NULL release_state is printed instead of raising.
            print(f" {str(row[0]):15s}: {row[1]:5d}")

        cur.execute("""
            SELECT count(*)
            FROM compliance.canonical_controls
            WHERE release_state NOT IN ('duplicate', 'too_close')
        """)
        print(f"\nAktive Controls gesamt: {cur.fetchone()[0]}")

        # ── Import stats by family ──
        fam_counts = Counter(ctrl["family"] for ctrl in to_import)
        print("\nImportiert nach Family:")
        for fam in sorted(fam_counts):
            print(f" {fam[:45]:45s}: {fam_counts[fam]:3d}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()