"""Klassifiziert Master Controls deterministisch auf Use Cases (n:m) + Quell-Regulierung (n:m) + Verifikations-Methode. DETERMINISTISCH (kein LLM): Die Zuordnung kommt aus der Quell-Regulierung jedes Controls — Lineage master_controls -> master_control_members -> control_parent_links.source_regulation. 117 Regulierungen -> Keyword-Mapper (use_case_registry.use_case_for_regulation) -> ~30 Domaenen-Use-Cases. Primaerzweck = dominante Quell-Regulierung (meiste Member); Mehrfachzwecke = die weiteren. LLM-Stufe (spaeter) nur Fallback fuer MCs ohne source_regulation. Lauf im Container: docker exec bp-compliance-backend \ python /app/scripts/classify_mc_use_cases.py [--limit N] """ from __future__ import annotations import argparse import asyncio import os import sys # /app auf den Pfad, damit `compliance...` als Standalone-Script importierbar sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncpg # noqa: E402 from compliance.data import use_case_registry as reg # noqa: E402 _REG_SQL = """ SELECT mc.id AS mc_uuid, mc.master_control_id, cpl.source_regulation AS reg, count(*) AS cnt FROM compliance.master_controls mc JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id JOIN compliance.control_parent_links cpl ON cpl.control_uuid = mcm.control_uuid WHERE cpl.source_regulation IS NOT NULL AND cpl.source_regulation <> '' GROUP BY mc.id, mc.master_control_id, cpl.source_regulation """ _VERIF_SQL = """ SELECT mc.id AS mc_uuid, array_agg(DISTINCT cc.verification_method) AS vmethods, array_agg(DISTINCT cc.evidence_type) AS etypes FROM compliance.master_controls mc JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid GROUP BY mc.id """ async def run_seed(conn, limit: int = 0) -> dict: """Deterministischer Seed aus der Quell-Regulierung. Ersetzt die bisherigen Seed-Zeilen; 'manual'-Korrekturen bleiben unangetastet.""" await conn.execute( "DELETE FROM compliance.mc_use_case_mappings WHERE method='seed'") await conn.execute( "DELETE FROM compliance.mc_verification WHERE method='seed'") await conn.execute( "DELETE FROM compliance.mc_regulations WHERE method='lineage'") by_mc: dict = {} for r in await conn.fetch(_REG_SQL): e = by_mc.setdefault( r["mc_uuid"], {"mc_id": r["master_control_id"], "regs": {}}) e["regs"][r["reg"]] = r["cnt"] if limit > 0: by_mc = dict(list(by_mc.items())[:limit]) verif: dict = {} for r in await conn.fetch(_VERIF_SQL): _ucs, m = reg.seed_classify(vmethods=r["vmethods"], etypes=r["etypes"]) if m: verif[r["mc_uuid"]] = m n_reg = n_uc = n_mc = n_v = 0 for mc_uuid, e in by_mc.items(): mc_id, regs = e["mc_id"], e["regs"] if not regs: continue primary_reg = max(regs, key=regs.get) uc_primary = reg.use_case_for_regulation(primary_reg) ucs: dict = {} for rg, cnt in regs.items(): await conn.execute( """INSERT INTO compliance.mc_regulations (master_control_uuid, master_control_id, source_regulation, is_primary, member_count) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (master_control_uuid, source_regulation) DO NOTHING""", mc_uuid, mc_id, rg[:160], rg == primary_reg, cnt) n_reg += 1 uc = reg.use_case_for_regulation(rg) if uc: ucs[uc] = ucs.get(uc, False) or (uc == uc_primary) for uc, is_prim in ucs.items(): await conn.execute( """INSERT INTO compliance.mc_use_case_mappings (master_control_uuid, master_control_id, use_case, method, confidence, rationale, is_primary) VALUES ($1,$2,$3,'seed',0.85,'source_regulation',$4) ON CONFLICT (master_control_uuid, use_case) DO UPDATE SET is_primary = mc_use_case_mappings.is_primary OR EXCLUDED.is_primary WHERE mc_use_case_mappings.method <> 'manual'""", mc_uuid, mc_id, uc, is_prim) n_uc += 1 n_mc += 1 # Verifikations-Methode: Member-evidence oder (Fallback) aus dem # Primaer-Use-Case ableiten (Member tragen oft kein evidence_type). m = verif.get(mc_uuid) or reg.primary_verification_method(uc_primary) if m: await conn.execute( """INSERT INTO compliance.mc_verification (master_control_uuid, master_control_id, verification_method, method, confidence, rationale) VALUES ($1,$2,$3,'seed',0.7,'member evidence_type') ON CONFLICT (master_control_uuid) DO NOTHING""", mc_uuid, mc_id, m) n_v += 1 total = await conn.fetchval( "SELECT count(*) FROM compliance.mc_use_case_mappings") await conn.execute( """INSERT INTO compliance.mc_use_case_sync_state (registry_hash, stage, total_mappings, mcs_classified) VALUES ($1,'seed_regulation',$2,$3)""", reg.registry_hash(), total, n_mc) return {"mcs_mapped": n_mc, "regulation_rows": n_reg, "use_case_rows": n_uc, "verification_rows": n_v} async def _main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--limit", type=int, default=0) ap.add_argument("--with-llm", action="store_true", help="Fallback fuer MCs ohne source_regulation (TODO)") args = ap.parse_args() if args.with_llm: raise SystemExit("LLM-Fallback (Phase 3) noch nicht implementiert.") conn = await asyncpg.connect(os.getenv("DATABASE_URL")) try: stats = await run_seed(conn, args.limit) finally: await conn.close() print("Seed (source_regulation) fertig:", stats) if __name__ == "__main__": asyncio.run(_main())