Files
Benjamin Admin 93687a32fe docs(licenses): freeze 3-rule license mapping + audit script
Defines the authoritative mapping from license_type to license_rule
in docs/LICENSE_RULES.md, and adds scripts/audit_license_classification.py
to surface classification gaps in registry/canonical_controls/Qdrant.

Key finding from first audit run against bp-core-postgres + Qdrant:

- regulation_registry: 232 rows, 224 rule=1, 8 rule=2, 0 rule=3;
  36 rows without license_type (need backfill)
- canonical_controls: 314,811 rows, 279,384 (89%) have NULL
  license_rule (target of Task #22 reclassification)
- Qdrant atomic_controls_dedup: 100% of sampled points lack both
  license and license_rule payload fields
- Qdrant bp_compliance_gesetze: 80.6% lack both fields
- Qdrant bp_compliance_ce + bp_compliance: nearly clean

Rule definitions clarified (was loosely remembered as
"law / cite / rewrite"):
- Rule 1 = verbatim, sovereign law (EU/DE/AT/CH/US, TRBS/TRGS/ASR,
  OSHA, NIST, EU guidelines, DGUV UVV)
- Rule 2 = verbatim with attribution (CC-BY, Apache, OWASP,
  OECD AI Principles, ENISA)
- Rule 3 = identifier citation only, no full text (DIN/EN/ISO,
  ANSI/UL/IEC, DGUV Regeln/Informationen/Grundsaetze, BSI,
  proprietary standards). Pipeline drops chunk_text when rule=3
  in pipeline_adapter.py:147.

The 4th category I had proposed ("R1-A") turned out to be already
implemented as rule=2; the mapping doc reflects the actual code
behaviour rather than the original 3-name verbal model.

No schema change. No data migration in this commit — reclassification
of the 279k controls is staged as Task #22 and will be cluster-based
by source/regulation_id.
2026-05-21 11:29:38 +02:00

257 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""Audit script for license classification gaps in the control pipeline.

Reports:

1. **regulation_registry coverage** — how many regulations are classified, by
   rule and license_type, and how many have no license_type at all.
2. **Controls without license_rule** — for every controls table that exists
   (``atomic_controls``, ``atomic_controls_dedup``, ``canonical_controls``),
   how many rows carry each ``license_rule`` and how many have none (NULL).
3. **Qdrant payload consistency** — for each indexed collection, how many
   sampled chunks carry both ``license`` and ``license_rule`` payload fields,
   only one of them, or neither.

The goal is to surface every record where the engine could in principle
extract or emit content but the license rule is unknown — those records are
the highest-risk material in a license audit.

Usage::

    python3 scripts/audit_license_classification.py --db-host 100.80.114.48

Add ``--check-qdrant`` to also probe ``http://<host>:6333`` collections.
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Optional
from urllib import request as urllib_request
# Put the directory one level above this script's folder at the front of the
# import path so modules located there can be imported when the script is
# run directly.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

# Connection defaults used by the command-line options; each one can be
# overridden with the matching --db-* flag.
DEFAULT_HOST: str = "100.80.114.48"
DEFAULT_PORT: int = 5432
DEFAULT_USER: str = "breakpilot"
DEFAULT_DB: str = "breakpilot_db"
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the audit script.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``; passing an explicit list allows the parser
            to be driven programmatically.

    Returns:
        Namespace with the attributes ``db_host``, ``db_port``, ``db_user``,
        ``db_name``, ``db_password``, ``check_qdrant``, ``qdrant_host``,
        ``qdrant_port`` and ``json``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring contains a numbered list and a usage example;
        # the default formatter would re-wrap them into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--db-host", default=DEFAULT_HOST)
    parser.add_argument("--db-port", type=int, default=DEFAULT_PORT)
    parser.add_argument("--db-user", default=DEFAULT_USER)
    parser.add_argument("--db-name", default=DEFAULT_DB)
    parser.add_argument("--db-password", default="")
    parser.add_argument("--check-qdrant", action="store_true")
    # Same default machine as the database; reuse the constant instead of
    # repeating the address literal.
    parser.add_argument("--qdrant-host", default=DEFAULT_HOST)
    parser.add_argument("--qdrant-port", type=int, default=6333)
    parser.add_argument("--json", action="store_true", help="Emit JSON result on stdout")
    return parser.parse_args(argv)
def audit_registry(conn) -> dict:
    """Summarise the license classification coverage of regulation_registry.

    Args:
        conn: Open database connection whose cursors follow the DB-API and
            can be used as context managers (``main()`` passes a psycopg2
            connection).

    Returns:
        A dict with the keys:

        * ``total`` — number of rows in ``regulation_registry``.
        * ``by_rule`` — row count per ``license_rule`` value (a NULL rule is
          keyed by ``None``).
        * ``by_rule_and_type`` — list of ``(license_rule, license_type,
          count)`` tuples; a NULL or empty license_type is shown as
          ``"(empty)"``.
        * ``missing_license_type`` — rows whose license_type is NULL or ''.
    """
    # Scope the cursor so it is closed even when one of the queries raises.
    with conn.cursor() as cur:
        # The search_path is set for the whole session, so the unqualified
        # table name in the follow-up queries below is looked up in the
        # ``compliance`` schema first, then in ``public``.
        cur.execute(
            "SET search_path TO compliance, public; "
            "SELECT license_rule, license_type, COUNT(*) "
            "FROM regulation_registry GROUP BY license_rule, license_type "
            "ORDER BY license_rule, license_type;"
        )
        grouped = cur.fetchall()
        cur.execute(
            "SELECT COUNT(*) FROM regulation_registry "
            "WHERE license_type IS NULL OR license_type = '';"
        )
        missing_type = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM regulation_registry;")
        total = cur.fetchone()[0]

    by_rule_and_type: list[tuple] = []
    by_rule: Counter = Counter()
    for rule, ltype, count in grouped:
        by_rule_and_type.append((rule, ltype or "(empty)", count))
        by_rule[rule] += count

    return {
        "total": total,
        "by_rule": dict(by_rule),
        "by_rule_and_type": by_rule_and_type,
        "missing_license_type": missing_type,
    }
def audit_atomic_controls(conn) -> dict:
    """Count controls per license_rule in every controls table that exists.

    Important: the schema differs between core (bp-core) and customer
    deployments. We probe a handful of likely table names in the
    ``compliance`` schema and audit whichever are present; a table without
    a ``license_rule`` column is reported as skipped.

    Args:
        conn: Open database connection whose cursors follow the DB-API and
            can be used as context managers (``main()`` passes a psycopg2
            connection).

    Returns:
        ``{"skipped": True, "reason": ...}`` when none of the candidate
        tables exists. Otherwise ``{"tables": {name: info}}`` where ``info``
        is either ``{"skipped": True, "reason": ...}`` or a dict with
        ``total`` (row count), ``by_rule`` (row count per license_rule, keys
        stringified so a NULL rule appears as ``"None"``) and
        ``missing_license_rule`` (rows with a NULL license_rule).
    """
    # Scope the cursor so it is closed even when one of the queries raises.
    with conn.cursor() as cur:
        # Detect which of the candidate controls tables are present.
        cur.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema='compliance' AND table_name IN "
            "('atomic_controls','atomic_controls_dedup','canonical_controls');"
        )
        tables = [row[0] for row in cur.fetchall()]
        if not tables:
            return {"skipped": True, "reason": "no controls table found"}

        result: dict = {"tables": {}}
        for tbl in tables:
            # Values are passed as bound parameters rather than interpolated.
            cur.execute(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema='compliance' AND table_name=%s;",
                (tbl,),
            )
            cols = {row[0] for row in cur.fetchall()}
            if "license_rule" not in cols:
                result["tables"][tbl] = {"skipped": True, "reason": "no license_rule column"}
                continue

            # Identifiers cannot be bound as parameters. Interpolating ``tbl``
            # is safe here because it can only be one of the three literal
            # names matched by the IN (...) filter above.
            cur.execute(f"SELECT COUNT(*) FROM compliance.{tbl};")
            total = cur.fetchone()[0]
            cur.execute(
                f"SELECT license_rule, COUNT(*) FROM compliance.{tbl} "
                f"GROUP BY license_rule ORDER BY license_rule;"
            )
            by_rule = {str(row[0]): row[1] for row in cur.fetchall()}
            cur.execute(
                f"SELECT COUNT(*) FROM compliance.{tbl} WHERE license_rule IS NULL;"
            )
            missing = cur.fetchone()[0]
            result["tables"][tbl] = {
                "total": total,
                "by_rule": by_rule,
                "missing_license_rule": missing,
            }
    return result
def _summarize_payload_coverage(points: list) -> dict:
    """Count how many points carry the license / license_rule payload fields."""
    both_set = only_license = only_rule = neither = 0
    for point in points:
        payload = point.get("payload", {}) or {}
        # An empty license string counts as missing, whereas any non-null
        # license_rule (including 0) counts as present.
        has_lic = bool(payload.get("license"))
        has_rule = payload.get("license_rule") is not None
        if has_lic and has_rule:
            both_set += 1
        elif has_lic:
            only_license += 1
        elif has_rule:
            only_rule += 1
        else:
            neither += 1
    sampled = len(points)
    return {
        "sampled": sampled,
        "both_set": both_set,
        "only_license_field": only_license,
        "only_license_rule_field": only_rule,
        "neither_set": neither,
        "neither_pct": round(neither / sampled * 100, 1) if sampled else 0,
    }


def audit_qdrant(host: str, port: int, sample_size: int = 500) -> dict:
    """Probe Qdrant collections for license + license_rule payload coverage.

    Only collections whose name contains ``compliance`` or
    ``atomic_controls`` are inspected. For each one, a single scroll request
    fetches up to ``sample_size`` points (the first page the endpoint
    returns, not a random sample) and the payloads are classified.

    Args:
        host: Qdrant host name or address.
        port: Qdrant HTTP port.
        sample_size: Maximum number of points requested per collection.

    Returns:
        ``{"error": message}`` when the collection list cannot be fetched.
        Otherwise ``{"collections": {name: info}}`` where ``info`` is either
        ``{"error": message}`` for a failed scroll or a dict with
        ``sampled``, ``both_set``, ``only_license_field``,
        ``only_license_rule_field``, ``neither_set`` and ``neither_pct``.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    base = f"http://{host}:{port}"
    try:
        with urllib_request.urlopen(f"{base}/collections", timeout=10) as resp:
            colls = json.loads(resp.read()).get("result", {}).get("collections", [])
    except Exception as e:  # best-effort probe: report the failure, don't crash
        return {"error": str(e)}

    out: dict = {"collections": {}}
    for coll in colls:
        name = coll["name"]
        if "compliance" not in name and "atomic_controls" not in name:
            continue
        body = {"limit": sample_size, "with_payload": True, "with_vector": False}
        req = urllib_request.Request(
            # Percent-encode the name so spaces or reserved characters in a
            # collection name cannot break the request path.
            f"{base}/collections/{quote(name, safe='')}/points/scroll",
            data=json.dumps(body).encode(),
            headers={"Content-Type": "application/json"},
        )
        try:
            with urllib_request.urlopen(req, timeout=15) as resp:
                points = json.loads(resp.read()).get("result", {}).get("points", [])
        except Exception as e:  # keep auditing the remaining collections
            out["collections"][name] = {"error": str(e)}
            continue
        out["collections"][name] = _summarize_payload_coverage(points)
    return out
def main() -> int:
    """Run the audit and print the result as JSON or as a text report.

    Returns:
        Process exit status: ``0`` on success, ``1`` when the database
        connection cannot be established, ``2`` when psycopg2 is not
        installed.
    """
    args = parse_args()
    try:
        # Imported lazily so a missing driver yields a clear message instead
        # of an import-time traceback.
        import psycopg2
    except ImportError:
        print("error: psycopg2 not installed (pip install psycopg2-binary)", file=sys.stderr)
        return 2

    try:
        conn = psycopg2.connect(
            host=args.db_host,
            port=args.db_port,
            user=args.db_user,
            dbname=args.db_name,
            # An empty --db-password is passed on as None (no password given).
            password=args.db_password or None,
        )
    except psycopg2.Error as e:
        print(f"error: cannot connect to database: {str(e).strip()}", file=sys.stderr)
        return 1
    try:
        registry = audit_registry(conn)
        controls = audit_atomic_controls(conn)
    finally:
        conn.close()

    qdrant: Optional[dict] = None
    if args.check_qdrant:
        qdrant = audit_qdrant(args.qdrant_host, args.qdrant_port)

    result = {"registry": registry, "atomic_controls": controls, "qdrant": qdrant}
    if args.json:
        # default=str stringifies any value json cannot encode natively.
        print(json.dumps(result, indent=2, default=str))
        return 0

    print("=" * 60)
    print(" Audit — License Classification")
    print("=" * 60)
    print()
    print(f"## regulation_registry ({registry['total']} rows)")
    print(f" By rule: {registry['by_rule']}")
    print(f" Missing license_type: {registry['missing_license_type']}")
    print()
    print("## atomic_controls")
    if controls.get("skipped"):
        # No controls table exists at all: say so instead of printing an
        # empty section.
        print(f" SKIPPED ({controls['reason']})")
    for tbl, info in controls.get("tables", {}).items():
        if info.get("skipped"):
            print(f" {tbl}: SKIPPED ({info['reason']})")
            continue
        print(f" {tbl}: {info['total']} rows")
        print(f" by_rule={info['by_rule']}")
        print(f" missing_license_rule={info['missing_license_rule']}")
    print()
    if qdrant:
        print("## qdrant")
        if "error" in qdrant:
            # The collection list itself could not be fetched.
            print(f" ERROR {qdrant['error']}")
        for name, info in qdrant.get("collections", {}).items():
            if "error" in info:
                print(f" {name}: ERROR {info['error']}")
                continue
            print(
                f" {name:30} sampled={info['sampled']:4} "
                f"both={info['both_set']:4} "
                f"neither={info['neither_set']:4} ({info['neither_pct']}%)"
            )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())