diff --git a/control-pipeline/scripts/backfill_license_rule.py b/control-pipeline/scripts/backfill_license_rule.py new file mode 100644 index 0000000..b0d312d --- /dev/null +++ b/control-pipeline/scripts/backfill_license_rule.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +"""Backfill license_rule on canonical_controls by inheriting from parent. + +Background +========== + +Audit (audit_license_classification.py) showed that 279,384 of 314,811 rows +in compliance.canonical_controls have NULL license_rule. Drilling in: + +- 261,980 of those (94%) have a parent_control_uuid whose parent already + carries a non-NULL license_rule. The pass0b decomposition pipeline did + not propagate the rule to its child controls — this is a clear inheritance + bug, fixable without any classification decisions. +- 16,617 have a parent that itself has no license_rule (transitive case). + Inheriting iteratively converges to either rule-set or root-orphan. +- 787 have no parent at all (decomposition roots). These need cluster-based + manual classification (see Strategy Notes at the bottom of this file). + +This script runs the inheritance fix in three idempotent stages and +prints per-stage counts before any write happens. + +Usage:: + + # Always dry-run first: + python3 scripts/backfill_license_rule.py --db-host 100.80.114.48 \\ + --db-password breakpilot123 --dry-run + + # If counts look right: + python3 scripts/backfill_license_rule.py --db-host 100.80.114.48 \\ + --db-password breakpilot123 --apply + +The script is safe to rerun — it only touches rows where license_rule +IS NULL. +""" + +from __future__ import annotations + +import argparse +import sys + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--db-host", default="100.80.114.48") + p.add_argument("--db-port", type=int, default=5432) + p.add_argument("--db-user", default="breakpilot") + p.add_argument("--db-name", default="breakpilot_db") + p.add_argument("--db-password", required=True) + g = p.add_mutually_exclusive_group(required=True) + g.add_argument("--dry-run", action="store_true") + g.add_argument("--apply", action="store_true") + p.add_argument("--max-iterations", type=int, default=5, + help="Cap on inheritance iterations to avoid loops") + return p.parse_args() + + +# Stage 1: direct parent has license_rule — single UPDATE. +# Stage 2: iterative — parent did not have it, but a grandparent does. +# We loop until no more rows can be filled or max-iterations. +# Stage 3: residual rows with no resolvable parent. Report them clustered +# by category/pattern_id so the user can classify by family. + +SQL_REPORT_NULLS = """ +SET search_path TO compliance, public; +SELECT + CASE WHEN cc.parent_control_uuid IS NULL THEN 'no_parent' + WHEN p.license_rule IS NULL THEN 'parent_null' + ELSE 'parent_set' END AS bucket, + COUNT(*) AS n +FROM canonical_controls cc +LEFT JOIN canonical_controls p ON cc.parent_control_uuid = p.id +WHERE cc.license_rule IS NULL +GROUP BY 1 ORDER BY 2 DESC; +""" + +SQL_INHERIT_FROM_PARENT = """ +SET search_path TO compliance, public; +UPDATE canonical_controls cc +SET license_rule = p.license_rule, updated_at = NOW() +FROM canonical_controls p +WHERE cc.parent_control_uuid = p.id + AND cc.license_rule IS NULL + AND p.license_rule IS NOT NULL; +""" + +SQL_REPORT_ORPHAN_CLUSTERS = """ +SET search_path TO compliance, public; +SELECT + COALESCE(category, '(null)') AS category, + COALESCE(pattern_id, '(null)') AS pattern_id, + COALESCE(generation_strategy, '(null)') AS gen, + COUNT(*) AS n +FROM canonical_controls +WHERE license_rule IS NULL AND parent_control_uuid IS NULL +GROUP BY 1, 2, 3 ORDER BY n DESC LIMIT 25; +""" + + +def print_bucket(rows, label: str) -> None: + print(f"\n## {label}") + total = 0 + for bucket, n in rows: + print(f" {bucket:12} {n:>8}") + total += n + print(f" {'TOTAL':12} {total:>8}") + + +def main() -> int: + args = parse_args() + try: + import psycopg2 + except ImportError: + print("error: psycopg2 not installed", file=sys.stderr) + return 2 + + conn = psycopg2.connect( + host=args.db_host, port=args.db_port, user=args.db_user, + dbname=args.db_name, password=args.db_password, + ) + conn.autocommit = False + cur = conn.cursor() + + print("=" * 60) + print(" Backfill — license_rule via parent inheritance") + print(f" Mode: {'DRY-RUN' if args.dry_run else 'APPLY'}") + print("=" * 60) + + # Initial bucket report + cur.execute(SQL_REPORT_NULLS) + rows = cur.fetchall() + print_bucket(rows, "Initial NULL distribution") + + if args.dry_run: + # Print what the FIRST inherit pass would resolve (without writing) + cur.execute( + "SET search_path TO compliance, public; " + "SELECT p.license_rule, COUNT(*) " + "FROM canonical_controls cc " + "JOIN canonical_controls p ON cc.parent_control_uuid = p.id " + "WHERE cc.license_rule IS NULL AND p.license_rule IS NOT NULL " + "GROUP BY 1 ORDER BY 1;" + ) + print("\n## First inherit-pass would fill:") + for rule, n in cur.fetchall(): + print(f" rule={rule} {n:>8} rows") + + # Show orphan clusters that would remain + cur.execute(SQL_REPORT_ORPHAN_CLUSTERS) + print("\n## Orphan clusters (no parent + no rule, top 25):") + for cat, pid, gen, n in cur.fetchall(): + print(f" cat={cat[:20]:20} pat={pid[:20]:20} gen={gen[:20]:20} n={n}") + print("\nNo writes performed. Use --apply to execute.") + conn.rollback() + return 0 + + # Apply mode — iterative inheritance + total_updated = 0 + for i in range(1, args.max_iterations + 1): + cur.execute(SQL_INHERIT_FROM_PARENT) + updated = cur.rowcount + total_updated += updated + print(f"\n iteration {i}: {updated} rows updated") + if updated == 0: + break + + conn.commit() + print(f"\n✓ Total rows backfilled: {total_updated}") + + # Final bucket report + cur.execute(SQL_REPORT_NULLS) + print_bucket(cur.fetchall(), "Remaining NULL distribution") + + cur.execute(SQL_REPORT_ORPHAN_CLUSTERS) + rows = cur.fetchall() + if rows: + print("\n## Orphan clusters still need classification:") + for cat, pid, gen, n in rows: + print(f" cat={cat[:20]:20} pat={pid[:20]:20} gen={gen[:20]:20} n={n}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/control-pipeline/scripts/backfill_qdrant_license_payload.py b/control-pipeline/scripts/backfill_qdrant_license_payload.py new file mode 100644 index 0000000..72225a0 --- /dev/null +++ b/control-pipeline/scripts/backfill_qdrant_license_payload.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +"""Backfill ``license_rule`` payload field into Qdrant atomic_controls_dedup +and related compliance collections, sourced from canonical_controls in Postgres. + +The audit (audit_license_classification.py) surfaced that Qdrant collections +holding canonical-control vectors (notably ``atomic_controls_dedup``) carry no +license_rule payload at all, even though the underlying Postgres table is now +fully classified. This script joins the two via ``control_uuid`` and patches the +Qdrant payload in batches. + +Usage:: + + python3 scripts/backfill_qdrant_license_payload.py \\ + --pg-host 100.80.114.48 --pg-password breakpilot123 \\ + --qdrant http://100.80.114.48:6333 \\ + --collection atomic_controls_dedup \\ + --dry-run + + # apply + python3 scripts/backfill_qdrant_license_payload.py ... --apply + +Notes +----- +- ``control_uuid`` lives in the payload of atomic_controls_dedup. For other + collections that key the canonical control by a different field, override with + ``--uuid-field``. +- Qdrant ``set_payload`` is keyed by point id, not payload field. We resolve + UUID → point id by a paginated scroll-and-filter pass, then issue grouped + set_payload requests per license_rule (3 batches per collection). +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +from typing import Iterator +from urllib import request as urllib_request + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--pg-host", default="100.80.114.48") + p.add_argument("--pg-port", type=int, default=5432) + p.add_argument("--pg-user", default="breakpilot") + p.add_argument("--pg-name", default="breakpilot_db") + p.add_argument("--pg-password", required=True) + p.add_argument("--qdrant", default="http://100.80.114.48:6333") + p.add_argument("--qdrant-api-key", default="", + help="API key for managed Qdrant (Production)") + p.add_argument("--collection", default="atomic_controls_dedup") + p.add_argument("--uuid-field", default="control_uuid", + help="Payload field used for lookup (control_uuid or regulation_id)") + p.add_argument("--lookup", choices=["canonical_controls", "regulation_registry"], + default="canonical_controls", + help="Postgres table to resolve the lookup against") + p.add_argument("--batch-size", type=int, default=500) + g = p.add_mutually_exclusive_group(required=True) + g.add_argument("--dry-run", action="store_true") + g.add_argument("--apply", action="store_true") + return p.parse_args() + + +def fetch_rule_by_uuid(args) -> dict[str, int]: + """Pull lookup-key → license_rule mapping from Postgres. + + Source table is chosen by ``--lookup``: + - canonical_controls: id (UUID) → license_rule, for atomic_controls_dedup + - regulation_registry: regulation_id → license_rule, for document chunks + """ + import psycopg2 + conn = psycopg2.connect( + host=args.pg_host, port=args.pg_port, user=args.pg_user, + dbname=args.pg_name, password=args.pg_password, + ) + cur = conn.cursor() + cur.execute("SET search_path TO compliance, public;") + if args.lookup == "regulation_registry": + cur.execute( + "SELECT regulation_id, license_rule FROM regulation_registry " + "WHERE license_rule IS NOT NULL" + ) + else: + cur.execute( + "SELECT id::text, license_rule FROM canonical_controls " + "WHERE license_rule IS NOT NULL" + ) + mapping = {row[0]: int(row[1]) for row in cur.fetchall()} + conn.close() + return mapping + + +def _headers(api_key: str = "") -> dict: + h = {"Content-Type": "application/json"} + if api_key: + h["api-key"] = api_key + return h + + +def scroll_collection(qdrant: str, collection: str, uuid_field: str, api_key: str = "") -> Iterator[dict]: + """Yield (point_id, uuid_value, has_rule_already) tuples.""" + next_offset = None + while True: + body = {"limit": 1000, "with_payload": True, "with_vector": False} + if next_offset is not None: + body["offset"] = next_offset + req = urllib_request.Request( + f"{qdrant}/collections/{collection}/points/scroll", + data=json.dumps(body).encode(), + headers=_headers(api_key), + ) + with urllib_request.urlopen(req, timeout=60) as r: + payload = json.loads(r.read()) + result = payload.get("result", {}) + for pt in result.get("points", []): + pl = pt.get("payload", {}) or {} + yield { + "id": pt["id"], + "uuid": pl.get(uuid_field), + "has_rule": "license_rule" in pl, + } + next_offset = result.get("next_page_offset") + if next_offset is None: + break + + +def set_payload_batch(qdrant: str, collection: str, point_ids: list, rule: int, api_key: str = "") -> int: + """POST set_payload for a batch of point IDs with a single license_rule.""" + body = { + "payload": {"license_rule": rule}, + "points": point_ids, + } + req = urllib_request.Request( + f"{qdrant}/collections/{collection}/points/payload?wait=true", + data=json.dumps(body).encode(), + headers=_headers(api_key), + method="POST", + ) + with urllib_request.urlopen(req, timeout=120) as r: + resp = json.loads(r.read()) + if resp.get("status") != "ok": + raise RuntimeError(f"set_payload failed: {resp}") + return len(point_ids) + + +def main() -> int: + args = parse_args() + print("Loading canonical_controls → license_rule mapping…") + rule_by_uuid = fetch_rule_by_uuid(args) + print(f" Postgres returned {len(rule_by_uuid)} classified controls") + + print(f"Scrolling Qdrant collection {args.collection!r}…") + by_rule: dict[int, list] = {1: [], 2: [], 3: []} + points_total = 0 + points_with_uuid = 0 + points_already_set = 0 + points_no_match = 0 + + for pt in scroll_collection(args.qdrant, args.collection, args.uuid_field, args.qdrant_api_key): + points_total += 1 + uuid = pt["uuid"] + if not uuid: + continue + points_with_uuid += 1 + if pt["has_rule"]: + points_already_set += 1 + continue + rule = rule_by_uuid.get(uuid) + if rule is None: + points_no_match += 1 + continue + if rule not in by_rule: + continue + by_rule[rule].append(pt["id"]) + + print(f" total points scanned: {points_total}") + print(f" with {args.uuid_field}: {points_with_uuid}") + print(f" already had license_rule: {points_already_set}") + print(f" uuid not found in Postgres: {points_no_match}") + print(f" to set per rule: rule1={len(by_rule[1])} rule2={len(by_rule[2])} rule3={len(by_rule[3])}") + + if args.dry_run: + print("\nDRY-RUN: no writes performed. Use --apply to execute.") + return 0 + + total_written = 0 + for rule, ids in by_rule.items(): + if not ids: + continue + print(f"\nWriting license_rule={rule} to {len(ids)} points (batch {args.batch_size})…") + for i in range(0, len(ids), args.batch_size): + chunk = ids[i:i + args.batch_size] + n = set_payload_batch(args.qdrant, args.collection, chunk, rule, args.qdrant_api_key) + total_written += n + print(f" batch {i // args.batch_size + 1}: {n} points (cumulative {total_written})") + time.sleep(0.05) + print(f"\nWrote license_rule on {total_written} Qdrant points in {args.collection}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())