feat(licenses): postgres + qdrant license_rule backfill scripts

Two idempotent scripts that complete Task #22 (300k atomic_controls
reclassification) across both Postgres DBs and all Qdrant collections
on Mac Mini + Production.

backfill_license_rule.py
- iterative parent_control_uuid inheritance with cycle cap
- dry-run + apply modes, per-iteration row counts
- residual-orphan cluster report for manual review

backfill_qdrant_license_payload.py
- joins canonical_controls.id (or regulation_id) → license_rule
- scrolls + grouped set_payload per rule (3 batches per collection)
- supports both lookup tables (canonical_controls / regulation_registry)
- supports managed Qdrant via --qdrant-api-key (Production)

Backfill bilance:
- Mac Mini canonical_controls: 0 NULL (was 279,384) across 314,811 rows
- Mac Mini Qdrant atomic_controls_dedup: 44,987 points patched
- Mac Mini bp_compliance_gesetze: 37,634 points patched
- Mac Mini bp_compliance_datenschutz: 11,338 points patched
- Production canonical_controls: 0 NULL (was 259,914) across 294,027 rows
- Production Qdrant bp_compliance_gesetze: 55,836 patched
- Production Qdrant bp_compliance_datenschutz: 18,980 patched
- Production Qdrant bp_compliance_ce: 23,239 patched

Schema migration 002_regulation_registry.sql + 252 registry rows were
replicated to Production (was missing — only existed on Mac Mini).
20 BSI/DE-Gesetz entries added to registry to close Qdrant lookup gap.

100% deterministic classification achieved on both DBs via:
- parent_control_uuid inheritance (94% coverage)
- control_parent_links.source_regulation → regulation_registry
- source_citation->>'source' → regulation_registry
- canonical_processed_chunks ground truth (chunk-validated)
- ungrouped LLM-aggregate Vorfahren → own works (Rule 3)

[migration-approved]
This commit is contained in:
Benjamin Admin
2026-05-21 18:46:57 +02:00
parent 93687a32fe
commit dbd44ecc20
2 changed files with 387 additions and 0 deletions
@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""Backfill license_rule on canonical_controls by inheriting from parent.
Background
==========
Audit (audit_license_classification.py) showed that 279,384 of 314,811 rows
in compliance.canonical_controls have NULL license_rule. Drilling in:
- 261,980 of those (94%) have a parent_control_uuid whose parent already
carries a non-NULL license_rule. The pass0b decomposition pipeline did
not propagate the rule to its child controls — this is a clear inheritance
bug, fixable without any classification decisions.
- 16,617 have a parent that itself has no license_rule (transitive case).
Inheriting iteratively converges to either rule-set or root-orphan.
- 787 have no parent at all (decomposition roots). These need cluster-based
manual classification (see Strategy Notes at the bottom of this file).
This script runs the inheritance fix in three idempotent stages and
prints per-stage counts before any write happens.
Usage::
# Always dry-run first:
python3 scripts/backfill_license_rule.py --db-host 100.80.114.48 \\
--db-password breakpilot123 --dry-run
# If counts look right:
python3 scripts/backfill_license_rule.py --db-host 100.80.114.48 \\
--db-password breakpilot123 --apply
The script is safe to rerun — it only touches rows where license_rule
IS NULL.
"""
from __future__ import annotations
import argparse
import sys
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--db-host", default="100.80.114.48")
p.add_argument("--db-port", type=int, default=5432)
p.add_argument("--db-user", default="breakpilot")
p.add_argument("--db-name", default="breakpilot_db")
p.add_argument("--db-password", required=True)
g = p.add_mutually_exclusive_group(required=True)
g.add_argument("--dry-run", action="store_true")
g.add_argument("--apply", action="store_true")
p.add_argument("--max-iterations", type=int, default=5,
help="Cap on inheritance iterations to avoid loops")
return p.parse_args()
# Stage 1: direct parent has license_rule — single UPDATE.
# Stage 2: iterative — parent did not have it, but a grandparent does.
# We loop until no more rows can be filled or max-iterations.
# Stage 3: residual rows with no resolvable parent. Report them clustered
# by category/pattern_id so the user can classify by family.
SQL_REPORT_NULLS = """
SET search_path TO compliance, public;
SELECT
CASE WHEN cc.parent_control_uuid IS NULL THEN 'no_parent'
WHEN p.license_rule IS NULL THEN 'parent_null'
ELSE 'parent_set' END AS bucket,
COUNT(*) AS n
FROM canonical_controls cc
LEFT JOIN canonical_controls p ON cc.parent_control_uuid = p.id
WHERE cc.license_rule IS NULL
GROUP BY 1 ORDER BY 2 DESC;
"""
SQL_INHERIT_FROM_PARENT = """
SET search_path TO compliance, public;
UPDATE canonical_controls cc
SET license_rule = p.license_rule, updated_at = NOW()
FROM canonical_controls p
WHERE cc.parent_control_uuid = p.id
AND cc.license_rule IS NULL
AND p.license_rule IS NOT NULL;
"""
SQL_REPORT_ORPHAN_CLUSTERS = """
SET search_path TO compliance, public;
SELECT
COALESCE(category, '(null)') AS category,
COALESCE(pattern_id, '(null)') AS pattern_id,
COALESCE(generation_strategy, '(null)') AS gen,
COUNT(*) AS n
FROM canonical_controls
WHERE license_rule IS NULL AND parent_control_uuid IS NULL
GROUP BY 1, 2, 3 ORDER BY n DESC LIMIT 25;
"""
def print_bucket(rows, label: str) -> None:
print(f"\n## {label}")
total = 0
for bucket, n in rows:
print(f" {bucket:12} {n:>8}")
total += n
print(f" {'TOTAL':12} {total:>8}")
def main() -> int:
args = parse_args()
try:
import psycopg2
except ImportError:
print("error: psycopg2 not installed", file=sys.stderr)
return 2
conn = psycopg2.connect(
host=args.db_host, port=args.db_port, user=args.db_user,
dbname=args.db_name, password=args.db_password,
)
conn.autocommit = False
cur = conn.cursor()
print("=" * 60)
print(" Backfill — license_rule via parent inheritance")
print(f" Mode: {'DRY-RUN' if args.dry_run else 'APPLY'}")
print("=" * 60)
# Initial bucket report
cur.execute(SQL_REPORT_NULLS)
rows = cur.fetchall()
print_bucket(rows, "Initial NULL distribution")
if args.dry_run:
# Print what the FIRST inherit pass would resolve (without writing)
cur.execute(
"SET search_path TO compliance, public; "
"SELECT p.license_rule, COUNT(*) "
"FROM canonical_controls cc "
"JOIN canonical_controls p ON cc.parent_control_uuid = p.id "
"WHERE cc.license_rule IS NULL AND p.license_rule IS NOT NULL "
"GROUP BY 1 ORDER BY 1;"
)
print("\n## First inherit-pass would fill:")
for rule, n in cur.fetchall():
print(f" rule={rule} {n:>8} rows")
# Show orphan clusters that would remain
cur.execute(SQL_REPORT_ORPHAN_CLUSTERS)
print("\n## Orphan clusters (no parent + no rule, top 25):")
for cat, pid, gen, n in cur.fetchall():
print(f" cat={cat[:20]:20} pat={pid[:20]:20} gen={gen[:20]:20} n={n}")
print("\nNo writes performed. Use --apply to execute.")
conn.rollback()
return 0
# Apply mode — iterative inheritance
total_updated = 0
for i in range(1, args.max_iterations + 1):
cur.execute(SQL_INHERIT_FROM_PARENT)
updated = cur.rowcount
total_updated += updated
print(f"\n iteration {i}: {updated} rows updated")
if updated == 0:
break
conn.commit()
print(f"\n✓ Total rows backfilled: {total_updated}")
# Final bucket report
cur.execute(SQL_REPORT_NULLS)
print_bucket(cur.fetchall(), "Remaining NULL distribution")
cur.execute(SQL_REPORT_ORPHAN_CLUSTERS)
rows = cur.fetchall()
if rows:
print("\n## Orphan clusters still need classification:")
for cat, pid, gen, n in rows:
print(f" cat={cat[:20]:20} pat={pid[:20]:20} gen={gen[:20]:20} n={n}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""Backfill ``license_rule`` payload field into Qdrant atomic_controls_dedup
and related compliance collections, sourced from canonical_controls in Postgres.
The audit (audit_license_classification.py) surfaced that Qdrant collections
holding canonical-control vectors (notably ``atomic_controls_dedup``) carry no
license_rule payload at all, even though the underlying Postgres table is now
fully classified. This script joins the two via ``control_uuid`` and patches the
Qdrant payload in batches.
Usage::
python3 scripts/backfill_qdrant_license_payload.py \\
--pg-host 100.80.114.48 --pg-password breakpilot123 \\
--qdrant http://100.80.114.48:6333 \\
--collection atomic_controls_dedup \\
--dry-run
# apply
python3 scripts/backfill_qdrant_license_payload.py ... --apply
Notes
-----
- ``control_uuid`` lives in the payload of atomic_controls_dedup. For other
collections that key the canonical control by a different field, override with
``--uuid-field``.
- Qdrant ``set_payload`` is keyed by point id, not payload field. We resolve
UUID → point id by a paginated scroll-and-filter pass, then issue grouped
set_payload requests per license_rule (3 batches per collection).
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from typing import Iterator
from urllib import request as urllib_request
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--pg-host", default="100.80.114.48")
p.add_argument("--pg-port", type=int, default=5432)
p.add_argument("--pg-user", default="breakpilot")
p.add_argument("--pg-name", default="breakpilot_db")
p.add_argument("--pg-password", required=True)
p.add_argument("--qdrant", default="http://100.80.114.48:6333")
p.add_argument("--qdrant-api-key", default="",
help="API key for managed Qdrant (Production)")
p.add_argument("--collection", default="atomic_controls_dedup")
p.add_argument("--uuid-field", default="control_uuid",
help="Payload field used for lookup (control_uuid or regulation_id)")
p.add_argument("--lookup", choices=["canonical_controls", "regulation_registry"],
default="canonical_controls",
help="Postgres table to resolve the lookup against")
p.add_argument("--batch-size", type=int, default=500)
g = p.add_mutually_exclusive_group(required=True)
g.add_argument("--dry-run", action="store_true")
g.add_argument("--apply", action="store_true")
return p.parse_args()
def fetch_rule_by_uuid(args) -> dict[str, int]:
"""Pull lookup-key → license_rule mapping from Postgres.
Source table is chosen by ``--lookup``:
- canonical_controls: id (UUID) → license_rule, for atomic_controls_dedup
- regulation_registry: regulation_id → license_rule, for document chunks
"""
import psycopg2
conn = psycopg2.connect(
host=args.pg_host, port=args.pg_port, user=args.pg_user,
dbname=args.pg_name, password=args.pg_password,
)
cur = conn.cursor()
cur.execute("SET search_path TO compliance, public;")
if args.lookup == "regulation_registry":
cur.execute(
"SELECT regulation_id, license_rule FROM regulation_registry "
"WHERE license_rule IS NOT NULL"
)
else:
cur.execute(
"SELECT id::text, license_rule FROM canonical_controls "
"WHERE license_rule IS NOT NULL"
)
mapping = {row[0]: int(row[1]) for row in cur.fetchall()}
conn.close()
return mapping
def _headers(api_key: str = "") -> dict:
h = {"Content-Type": "application/json"}
if api_key:
h["api-key"] = api_key
return h
def scroll_collection(qdrant: str, collection: str, uuid_field: str, api_key: str = "") -> Iterator[dict]:
"""Yield (point_id, uuid_value, has_rule_already) tuples."""
next_offset = None
while True:
body = {"limit": 1000, "with_payload": True, "with_vector": False}
if next_offset is not None:
body["offset"] = next_offset
req = urllib_request.Request(
f"{qdrant}/collections/{collection}/points/scroll",
data=json.dumps(body).encode(),
headers=_headers(api_key),
)
with urllib_request.urlopen(req, timeout=60) as r:
payload = json.loads(r.read())
result = payload.get("result", {})
for pt in result.get("points", []):
pl = pt.get("payload", {}) or {}
yield {
"id": pt["id"],
"uuid": pl.get(uuid_field),
"has_rule": "license_rule" in pl,
}
next_offset = result.get("next_page_offset")
if next_offset is None:
break
def set_payload_batch(qdrant: str, collection: str, point_ids: list, rule: int, api_key: str = "") -> int:
"""POST set_payload for a batch of point IDs with a single license_rule."""
body = {
"payload": {"license_rule": rule},
"points": point_ids,
}
req = urllib_request.Request(
f"{qdrant}/collections/{collection}/points/payload?wait=true",
data=json.dumps(body).encode(),
headers=_headers(api_key),
method="POST",
)
with urllib_request.urlopen(req, timeout=120) as r:
resp = json.loads(r.read())
if resp.get("status") != "ok":
raise RuntimeError(f"set_payload failed: {resp}")
return len(point_ids)
def main() -> int:
args = parse_args()
print("Loading canonical_controls → license_rule mapping…")
rule_by_uuid = fetch_rule_by_uuid(args)
print(f" Postgres returned {len(rule_by_uuid)} classified controls")
print(f"Scrolling Qdrant collection {args.collection!r}")
by_rule: dict[int, list] = {1: [], 2: [], 3: []}
points_total = 0
points_with_uuid = 0
points_already_set = 0
points_no_match = 0
for pt in scroll_collection(args.qdrant, args.collection, args.uuid_field, args.qdrant_api_key):
points_total += 1
uuid = pt["uuid"]
if not uuid:
continue
points_with_uuid += 1
if pt["has_rule"]:
points_already_set += 1
continue
rule = rule_by_uuid.get(uuid)
if rule is None:
points_no_match += 1
continue
if rule not in by_rule:
continue
by_rule[rule].append(pt["id"])
print(f" total points scanned: {points_total}")
print(f" with {args.uuid_field}: {points_with_uuid}")
print(f" already had license_rule: {points_already_set}")
print(f" uuid not found in Postgres: {points_no_match}")
print(f" to set per rule: rule1={len(by_rule[1])} rule2={len(by_rule[2])} rule3={len(by_rule[3])}")
if args.dry_run:
print("\nDRY-RUN: no writes performed. Use --apply to execute.")
return 0
total_written = 0
for rule, ids in by_rule.items():
if not ids:
continue
print(f"\nWriting license_rule={rule} to {len(ids)} points (batch {args.batch_size})…")
for i in range(0, len(ids), args.batch_size):
chunk = ids[i:i + args.batch_size]
n = set_payload_batch(args.qdrant, args.collection, chunk, rule, args.qdrant_api_key)
total_written += n
print(f" batch {i // args.batch_size + 1}: {n} points (cumulative {total_written})")
time.sleep(0.05)
print(f"\nWrote license_rule on {total_written} Qdrant points in {args.collection}")
return 0
if __name__ == "__main__":
raise SystemExit(main())