#!/usr/bin/env python3
"""Audit script for license classification gaps in the control pipeline.

Reports:

1. **regulation_registry coverage** — how many regulations are classified,
   by rule and license_type.
2. **atomic_controls without license_rule** — how many controls reference a
   regulation_id that has no entry (or no license_rule) in the registry.
3. **Qdrant payload consistency** — for each indexed collection, how many
   chunks carry both ``license`` and ``license_rule`` payload fields.

The goal is to surface every record where the engine could in principle
extract or emit content but the license rule is unknown — those records are
the highest-risk material in a license audit.

Usage::

    python3 scripts/audit_license_classification.py --db-host 100.80.114.48

Add ``--check-qdrant`` to also probe the collections served at
``http://<qdrant-host>:<qdrant-port>`` (port 6333 by default).
"""

from __future__ import annotations

import argparse
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Optional
from urllib import request as urllib_request
from urllib.parse import quote as urllib_quote

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

DEFAULT_HOST = "100.80.114.48"
DEFAULT_PORT = 5432
DEFAULT_USER = "breakpilot"
DEFAULT_DB = "breakpilot_db"
DEFAULT_QDRANT_PORT = 6333

# Number of points requested per collection by a single scroll call.
QDRANT_SAMPLE_SIZE = 500
# Only collections whose name contains one of these substrings are audited.
QDRANT_NAME_MARKERS = ("compliance", "atomic_controls")


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list without the program name. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        The populated namespace (``db_host``, ``db_port``, ``db_user``,
        ``db_name``, ``db_password``, ``check_qdrant``, ``qdrant_host``,
        ``qdrant_port``, ``json``).
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--db-host", default=DEFAULT_HOST)
    p.add_argument("--db-port", type=int, default=DEFAULT_PORT)
    p.add_argument("--db-user", default=DEFAULT_USER)
    p.add_argument("--db-name", default=DEFAULT_DB)
    p.add_argument("--db-password", default="")
    p.add_argument("--check-qdrant", action="store_true")
    p.add_argument("--qdrant-host", default=DEFAULT_HOST)
    p.add_argument("--qdrant-port", type=int, default=DEFAULT_QDRANT_PORT)
    p.add_argument("--json", action="store_true", help="Emit JSON result on stdout")
    return p.parse_args(argv)


def audit_registry(conn) -> dict:
    """Summarise license coverage of ``regulation_registry``.

    Args:
        conn: An open DB-API connection (psycopg2 in ``main``).

    Returns:
        A dict with ``total`` (row count), ``by_rule`` (rows per
        ``license_rule``), ``by_rule_and_type`` (list of
        ``(rule, license_type or "(empty)", count)`` tuples) and
        ``missing_license_type`` (rows whose ``license_type`` is NULL or "").
    """
    by_rule_and_type: list[tuple] = []
    by_rule: Counter = Counter()
    with conn.cursor() as cur:
        # The registry queries below use unqualified table names, so the
        # search path has to be set first; it stays in effect for the session.
        cur.execute("SET search_path TO compliance, public;")
        cur.execute(
            "SELECT license_rule, license_type, COUNT(*) "
            "FROM regulation_registry GROUP BY license_rule, license_type "
            "ORDER BY license_rule, license_type;"
        )
        for rule, ltype, count in cur.fetchall():
            by_rule_and_type.append((rule, ltype or "(empty)", count))
            by_rule[rule] += count

        cur.execute(
            "SELECT COUNT(*) FROM regulation_registry "
            "WHERE license_type IS NULL OR license_type = '';"
        )
        missing_type = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*) FROM regulation_registry;")
        total = cur.fetchone()[0]

    return {
        "total": total,
        "by_rule": dict(by_rule),
        "by_rule_and_type": by_rule_and_type,
        "missing_license_type": missing_type,
    }


def audit_atomic_controls(conn) -> dict:
    """Count controls per ``license_rule`` in every controls table present.

    Important: the schema differs between core (bp-core) and customer
    deployments. We probe a handful of likely table names in the
    ``compliance`` schema and, per table, skip it when it has no
    ``license_rule`` column.

    Args:
        conn: An open DB-API connection (psycopg2 in ``main``).

    Returns:
        ``{"skipped": True, "reason": ...}`` when none of the candidate tables
        exists; otherwise ``{"tables": {name: info}}`` where ``info`` is either
        a skip marker or a dict with ``total``, ``by_rule`` and
        ``missing_license_rule`` (rows whose ``license_rule`` is NULL).
    """
    with conn.cursor() as cur:
        # Detect which of the candidate controls tables exist.
        cur.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema='compliance' AND table_name IN "
            "('atomic_controls','atomic_controls_dedup','canonical_controls');"
        )
        tables = [r[0] for r in cur.fetchall()]
        if not tables:
            return {"skipped": True, "reason": "no controls table found"}

        result: dict = {"tables": {}}
        for tbl in tables:
            cur.execute(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema='compliance' AND table_name=%s;",
                (tbl,),
            )
            cols = {r[0] for r in cur.fetchall()}
            if "license_rule" not in cols:
                result["tables"][tbl] = {
                    "skipped": True,
                    "reason": "no license_rule column",
                }
                continue

            # Identifiers cannot be bound as parameters; ``tbl`` is safe to
            # interpolate because it can only be one of the three names in
            # the IN (...) list of the detection query above.
            cur.execute(f"SELECT COUNT(*) FROM compliance.{tbl};")
            total = cur.fetchone()[0]

            cur.execute(
                f"SELECT license_rule, COUNT(*) FROM compliance.{tbl} "
                f"GROUP BY license_rule ORDER BY license_rule;"
            )
            by_rule = {str(r[0]): r[1] for r in cur.fetchall()}

            cur.execute(
                f"SELECT COUNT(*) FROM compliance.{tbl} WHERE license_rule IS NULL;"
            )
            missing = cur.fetchone()[0]

            result["tables"][tbl] = {
                "total": total,
                "by_rule": by_rule,
                "missing_license_rule": missing,
            }
    return result


def summarize_points(points: list) -> dict:
    """Tally ``license`` / ``license_rule`` payload coverage of sampled points.

    ``license`` counts as set when it is truthy; ``license_rule`` counts as
    set when it is anything other than ``None`` (so a rule value of ``0``
    still counts). A missing or ``None`` payload counts as neither.

    Args:
        points: Point dicts as returned by the Qdrant scroll endpoint.

    Returns:
        A dict with ``sampled``, ``both_set``, ``only_license_field``,
        ``only_license_rule_field``, ``neither_set`` and ``neither_pct``
        (percentage rounded to one decimal, ``0`` for an empty sample).
    """
    sampled = len(points)
    both_set = only_license = only_rule = neither = 0
    for point in points:
        payload = point.get("payload", {}) or {}
        has_lic = bool(payload.get("license"))
        has_rule = payload.get("license_rule") is not None
        if has_lic and has_rule:
            both_set += 1
        elif has_lic:
            only_license += 1
        elif has_rule:
            only_rule += 1
        else:
            neither += 1
    return {
        "sampled": sampled,
        "both_set": both_set,
        "only_license_field": only_license,
        "only_license_rule_field": only_rule,
        "neither_set": neither,
        "neither_pct": round(neither / sampled * 100, 1) if sampled else 0,
    }


def audit_qdrant(host: str, port: int, sample_size: int = QDRANT_SAMPLE_SIZE) -> dict:
    """Probe Qdrant collections for license + license_rule payload coverage.

    Requests up to ``sample_size`` points per collection with one scroll call
    and reports how many have neither field populated. Only collections whose
    name contains ``compliance`` or ``atomic_controls`` are inspected.

    Args:
        host: Qdrant host name or address.
        port: Qdrant HTTP port.
        sample_size: Maximum number of points requested per collection.

    Returns:
        ``{"error": message}`` when the collection list cannot be fetched;
        otherwise ``{"collections": {name: info}}`` where ``info`` is either
        ``{"error": message}`` or the dict produced by ``summarize_points``.
    """
    out: dict = {"collections": {}}
    base = f"http://{host}:{port}"
    try:
        with urllib_request.urlopen(f"{base}/collections", timeout=10) as r:
            colls = json.loads(r.read()).get("result", {}).get("collections", [])
    except Exception as e:  # best-effort probe: report the failure, never crash
        return {"error": str(e)}

    body = json.dumps(
        {"limit": sample_size, "with_payload": True, "with_vector": False}
    ).encode()
    for c in colls:
        name = c["name"]
        if not any(marker in name for marker in QDRANT_NAME_MARKERS):
            continue
        # Percent-encode the name so it cannot alter the URL path.
        req = urllib_request.Request(
            f"{base}/collections/{urllib_quote(name, safe='')}/points/scroll",
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib_request.urlopen(req, timeout=15) as r:
                points = json.loads(r.read()).get("result", {}).get("points", [])
        except Exception as e:  # one broken collection must not stop the audit
            out["collections"][name] = {"error": str(e)}
            continue
        out["collections"][name] = summarize_points(points)
    return out


def render_report(result: dict) -> str:
    """Render the human-readable report for the combined audit result.

    Args:
        result: Dict with the keys ``registry``, ``atomic_controls`` and
            ``qdrant`` (the latter may be ``None`` when Qdrant was not probed).

    Returns:
        The report text without a trailing newline.
    """
    registry = result["registry"]
    controls = result["atomic_controls"]
    qdrant = result.get("qdrant")

    lines = [
        "=" * 60,
        " Audit — License Classification",
        "=" * 60,
        "",
        f"## regulation_registry ({registry['total']} rows)",
        f" By rule: {registry['by_rule']}",
        f" Missing license_type: {registry['missing_license_type']}",
        "",
        "## atomic_controls",
    ]
    if controls.get("skipped"):
        # Whole section skipped (no controls table): say why instead of
        # leaving the section silently empty.
        lines.append(f" SKIPPED ({controls['reason']})")
    for tbl, info in controls.get("tables", {}).items():
        if info.get("skipped"):
            lines.append(f" {tbl}: SKIPPED ({info['reason']})")
            continue
        lines.append(f" {tbl}: {info['total']} rows")
        lines.append(f" by_rule={info['by_rule']}")
        lines.append(f" missing_license_rule={info['missing_license_rule']}")
    lines.append("")

    if qdrant:
        lines.append("## qdrant")
        if "error" in qdrant:
            # Top-level failure (e.g. Qdrant unreachable) has no per-collection
            # entries, so it must be reported here or it is lost.
            lines.append(f" ERROR {qdrant['error']}")
        for name, info in qdrant.get("collections", {}).items():
            if "error" in info:
                lines.append(f" {name}: ERROR {info['error']}")
                continue
            lines.append(
                f" {name:30} sampled={info['sampled']:4} "
                f"both={info['both_set']:4} "
                f"neither={info['neither_set']:4} ({info['neither_pct']}%)"
            )
    return "\n".join(lines)


def main(argv: Optional[list[str]] = None) -> int:
    """Run the audit and print the result as text or JSON.

    Args:
        argv: Argument list without the program name; ``None`` reads
            ``sys.argv[1:]``.

    Returns:
        ``0`` on success, ``2`` when psycopg2 is not installed.
    """
    args = parse_args(argv)

    # Imported lazily so a missing driver yields a readable message and exit
    # code instead of an import traceback.
    try:
        import psycopg2
    except ImportError:
        print(
            "error: psycopg2 not installed (pip install psycopg2-binary)",
            file=sys.stderr,
        )
        return 2

    conn = psycopg2.connect(
        host=args.db_host,
        port=args.db_port,
        user=args.db_user,
        dbname=args.db_name,
        password=args.db_password or None,
    )
    try:
        registry = audit_registry(conn)
        controls = audit_atomic_controls(conn)
    finally:
        conn.close()

    qdrant: Optional[dict] = None
    if args.check_qdrant:
        qdrant = audit_qdrant(args.qdrant_host, args.qdrant_port)

    result = {"registry": registry, "atomic_controls": controls, "qdrant": qdrant}

    if args.json:
        print(json.dumps(result, indent=2, default=str))
        return 0

    print(render_report(result))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())