Files
breakpilot-compliance/backend-compliance/scripts/classify_mc_use_cases.py
T
Benjamin Admin ef746ea8f0
CI / detect-changes (push) Successful in 6s
CI / branch-name (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 30s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / nodejs-build (push) Has been skipped
fix(use-cases): Verifikations-Methode aus Primaer-Use-Case ableiten (Fallback)
Member-canonical_controls tragen meist kein evidence_type/verification_method
(wie schon source_citation). primary_verification_method() leitet die Methode
deterministisch aus dem Primaer-Use-Case ab (impressum->document,
code_security->source_code, ...). Populiert mc_verification beim naechsten Seed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 17:01:42 +02:00

152 lines
6.0 KiB
Python

"""Klassifiziert Master Controls deterministisch auf Use Cases (n:m) +
Quell-Regulierung (n:m) + Verifikations-Methode.
DETERMINISTISCH (kein LLM): Die Zuordnung kommt aus der Quell-Regulierung
jedes Controls — Lineage master_controls -> master_control_members ->
control_parent_links.source_regulation. 117 Regulierungen -> Keyword-Mapper
(use_case_registry.use_case_for_regulation) -> ~30 Domaenen-Use-Cases.
Primaerzweck = dominante Quell-Regulierung (meiste Member); Mehrfachzwecke =
die weiteren. LLM-Stufe (spaeter) nur Fallback fuer MCs ohne source_regulation.
Lauf im Container:
docker exec bp-compliance-backend \
python /app/scripts/classify_mc_use_cases.py [--limit N]
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
# /app auf den Pfad, damit `compliance...` als Standalone-Script importierbar
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncpg # noqa: E402
from compliance.data import use_case_registry as reg # noqa: E402
# Lineage query: one row per (master control, source regulation) pair.
# Walks master_controls -> master_control_members -> control_parent_links and
# skips links whose source_regulation is NULL or the empty string.
# NOTE(review): `cnt` is count(*) over the joined rows, i.e. it counts parent
# links, not distinct members -- a member with several links to the same
# regulation contributes more than once. Confirm this is the intended weight.
# The query has no ORDER BY, so the row order is whatever the database returns.
_REG_SQL = """
SELECT mc.id AS mc_uuid, mc.master_control_id,
cpl.source_regulation AS reg, count(*) AS cnt
FROM compliance.master_controls mc
JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
JOIN compliance.control_parent_links cpl ON cpl.control_uuid = mcm.control_uuid
WHERE cpl.source_regulation IS NOT NULL AND cpl.source_regulation <> ''
GROUP BY mc.id, mc.master_control_id, cpl.source_regulation
"""
# Evidence query: one row per master control that has at least one member in
# canonical_controls, carrying the distinct verification_method and
# evidence_type values of those members as two arrays.
# NOTE(review): array_agg does not drop NULLs, so members without a value
# presumably show up as a NULL element in the array -- the consumer has to
# tolerate that.
_VERIF_SQL = """
SELECT mc.id AS mc_uuid,
array_agg(DISTINCT cc.verification_method) AS vmethods,
array_agg(DISTINCT cc.evidence_type) AS etypes
FROM compliance.master_controls mc
JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
GROUP BY mc.id
"""
async def run_seed(conn, limit: int = 0) -> dict:
    """Rebuild the seeded classification rows from each control's source regulation.

    Rows written by an earlier seed run (``method='seed'`` in
    ``mc_use_case_mappings`` and ``mc_verification``, ``method='lineage'`` in
    ``mc_regulations``) are deleted and re-created. Rows with any other
    method, such as ``'manual'`` corrections, are neither deleted nor
    overwritten.

    For every master control returned by ``_REG_SQL`` the function

    * stores each source regulation in ``mc_regulations`` and flags the
      dominant one (highest count) as primary,
    * maps every regulation to a use case via
      ``reg.use_case_for_regulation`` and stores the resulting set in
      ``mc_use_case_mappings``,
    * stores a verification method in ``mc_verification``: the one derived
      from the members' evidence (``reg.seed_classify``) if there is one,
      otherwise the one derived from the primary use case
      (``reg.primary_verification_method``),

    and finally appends one row to ``mc_use_case_sync_state``.

    No transaction is opened here; every statement is issued directly on
    ``conn``. Callers that need all-or-nothing behaviour must wrap the call
    in a transaction themselves.

    Args:
        conn: Database connection offering awaitable ``execute``, ``fetch``
            and ``fetchval`` with ``$n`` placeholders (asyncpg style).
        limit: If greater than 0, only the first ``limit`` master controls
            (in the row order of ``_REG_SQL``) are processed. The initial
            DELETEs are not restricted by ``limit``, so seed rows of all
            other master controls stay deleted after a limited run.

    Returns:
        A dict with the counters ``mcs_mapped``, ``regulation_rows``,
        ``use_case_rows`` and ``verification_rows``. The row counters count
        issued INSERT statements; a statement skipped by ``ON CONFLICT`` is
        still counted.
    """
    await conn.execute(
        "DELETE FROM compliance.mc_use_case_mappings WHERE method='seed'")
    await conn.execute(
        "DELETE FROM compliance.mc_verification WHERE method='seed'")
    await conn.execute(
        "DELETE FROM compliance.mc_regulations WHERE method='lineage'")

    # mc_uuid -> {"mc_id": <master_control_id>, "regs": {regulation: count}}
    by_mc: dict = {}
    for row in await conn.fetch(_REG_SQL):
        entry = by_mc.setdefault(
            row["mc_uuid"],
            {"mc_id": row["master_control_id"], "regs": {}})
        entry["regs"][row["reg"]] = row["cnt"]
    if limit > 0:
        by_mc = dict(list(by_mc.items())[:limit])

    # mc_uuid -> verification method derived from the members' own evidence.
    verif: dict = {}
    for row in await conn.fetch(_VERIF_SQL):
        _ucs, member_method = reg.seed_classify(
            vmethods=row["vmethods"], etypes=row["etypes"])
        if member_method:
            verif[row["mc_uuid"]] = member_method

    n_reg = n_uc = n_mc = n_v = 0
    for mc_uuid, entry in by_mc.items():
        mc_id, regs = entry["mc_id"], entry["regs"]
        if not regs:
            continue

        # Highest count wins; equal counts are broken by regulation name so
        # the result does not depend on the (unordered) row order of the
        # query.
        primary_reg = min(regs, key=lambda name: (-regs[name], name))
        uc_primary = reg.use_case_for_regulation(primary_reg)

        # use case -> True if it is the use case of the primary regulation.
        ucs: dict = {}
        for rg, cnt in regs.items():
            # NOTE(review): the INSERT does not set `method`; the DELETE
            # above filters on method='lineage', so that is presumably the
            # column default -- confirm against the schema.
            await conn.execute(
                """INSERT INTO compliance.mc_regulations
                (master_control_uuid, master_control_id, source_regulation,
                is_primary, member_count)
                VALUES ($1,$2,$3,$4,$5)
                ON CONFLICT (master_control_uuid, source_regulation)
                DO NOTHING""",
                mc_uuid, mc_id, rg[:160], rg == primary_reg, cnt)
            n_reg += 1
            uc = reg.use_case_for_regulation(rg)
            if uc:
                ucs[uc] = ucs.get(uc, False) or (uc == uc_primary)

        for uc, is_prim in ucs.items():
            await conn.execute(
                """INSERT INTO compliance.mc_use_case_mappings
                (master_control_uuid, master_control_id, use_case,
                method, confidence, rationale, is_primary)
                VALUES ($1,$2,$3,'seed',0.85,'source_regulation',$4)
                ON CONFLICT (master_control_uuid, use_case)
                DO UPDATE SET is_primary =
                mc_use_case_mappings.is_primary OR EXCLUDED.is_primary
                WHERE mc_use_case_mappings.method <> 'manual'""",
                mc_uuid, mc_id, uc, is_prim)
            n_uc += 1
        n_mc += 1

        # Verification method: prefer the members' evidence; otherwise fall
        # back to the primary use case (members often carry no
        # evidence_type). The rationale records which of the two sources was
        # actually used.
        v_method = verif.get(mc_uuid)
        v_rationale = "member evidence_type"
        if not v_method:
            v_method = reg.primary_verification_method(uc_primary)
            v_rationale = "primary use_case"
        if v_method:
            await conn.execute(
                """INSERT INTO compliance.mc_verification
                (master_control_uuid, master_control_id,
                verification_method, method, confidence, rationale)
                VALUES ($1,$2,$3,'seed',0.7,$4)
                ON CONFLICT (master_control_uuid) DO NOTHING""",
                mc_uuid, mc_id, v_method, v_rationale)
            n_v += 1

    total = await conn.fetchval(
        "SELECT count(*) FROM compliance.mc_use_case_mappings")
    await conn.execute(
        """INSERT INTO compliance.mc_use_case_sync_state
        (registry_hash, stage, total_mappings, mcs_classified)
        VALUES ($1,'seed_source_regulation',$2,$3)""",
        reg.registry_hash(), total, n_mc)
    return {"mcs_mapped": n_mc, "regulation_rows": n_reg,
            "use_case_rows": n_uc, "verification_rows": n_v}
async def _main() -> None:
    """Parse the command line, run the seed once and print its counters.

    Options:
        --limit N   Forwarded to ``run_seed`` (0, the default, means no limit).
        --with-llm  Not implemented yet; exits immediately with a message.

    The connection is opened from the ``DATABASE_URL`` environment variable
    and always closed again. The seed runs inside a single transaction:
    ``run_seed`` first deletes the previously seeded rows and then re-inserts
    them statement by statement, so without a transaction a failure halfway
    through would leave the tables partially empty. With it, an error rolls
    everything back and the previous seed stays in place.

    Raises:
        SystemExit: If ``--with-llm`` is passed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--limit", type=int, default=0)
    parser.add_argument("--with-llm", action="store_true",
                        help="Fallback fuer MCs ohne source_regulation (TODO)")
    args = parser.parse_args()
    if args.with_llm:
        raise SystemExit("LLM-Fallback (Phase 3) noch nicht implementiert.")

    conn = await asyncpg.connect(os.getenv("DATABASE_URL"))
    try:
        # Commit on success, roll back on any exception raised by the seed.
        async with conn.transaction():
            stats = await run_seed(conn, args.limit)
    finally:
        await conn.close()
    print("Seed (source_regulation) fertig:", stats)


# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    asyncio.run(_main())