breakpilot-compliance/backend-compliance/scripts/classify_mc_use_cases.py
Benjamin Admin dca7740d8c
feat(use-cases): foundation (use-case registry + n:m mapping migration + seed) [migration-approved]
Layers 1+2 (foundation) of the use-case mapping system (plan approved):
- compliance/data/use_case_registry.py: single source of truth for 14 use
  cases x verification methods (documentation/source code/network/IT process).
  Extensible (new UC = 1 entry). code_security/network_security serve as
  handover points for the security team (SBOM/SAST/DAST/pentest).
- migrations/149_mc_use_case_mappings.sql: add-only n:m mc_use_case_mappings
  + mc_verification (one per MC) + sync_state. use_case has no SQL CHECK
  constraint (extensible).
- scripts/classify_mc_use_cases.py: seed stage (deterministic, no LLM).
  LLM stage (Phase 3) to follow.
- Tests: test_use_case_registry.py (14 passing).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 15:30:34 +02:00
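
The registry design described in the commit message (one entry per use case, no SQL CHECK, a registry hash recorded in the sync state) can be illustrated with a minimal sketch. This is not the content of use_case_registry.py: the entries, the field names `scope_doc_types` and `categories`, the `privacy_docs` use case, the method priority, and the bodies of `seed_classify` and `registry_hash` are all assumptions made for illustration. Only the two function names, the four-argument call shape, and the names `code_security`/`network_security` come from the source.

```python
"""Hypothetical sketch of a use-case registry; NOT the real use_case_registry.py."""
from __future__ import annotations

import hashlib
import json

# Assumed shape: one dict entry per use case, so adding a use case is one entry.
# All signal values below are invented for illustration.
REGISTRY: dict[str, dict[str, list[str]]] = {
    "code_security": {"categories": ["secure_development"], "scope_doc_types": []},
    "network_security": {"categories": ["network"], "scope_doc_types": []},
    "privacy_docs": {"categories": [], "scope_doc_types": ["privacy_policy"]},
}

# Assumed precedence for picking the single verification method per master control.
METHOD_PRIORITY = ["source_code", "network", "it_process", "documentation"]


def seed_classify(scopes, categories, vmethods, etypes):
    """Return (sorted use cases, one verification method or None).

    array_agg(DISTINCT ...) can contain NULLs, so None values are dropped.
    etypes is accepted to match the call shape but is unused in this sketch.
    """
    scope_set = {s for s in (scopes or []) if s is not None}
    cat_set = {c for c in (categories or []) if c is not None}
    ucs = sorted(
        uc for uc, sig in REGISTRY.items()
        if scope_set & set(sig["scope_doc_types"]) or cat_set & set(sig["categories"])
    )
    present = {m for m in (vmethods or []) if m is not None}
    method = next((m for m in METHOD_PRIORITY if m in present), None)
    return ucs, method


def registry_hash() -> str:
    """Stable fingerprint that changes whenever an entry is added or edited."""
    payload = json.dumps(REGISTRY, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the use cases live in plain data rather than in a SQL CHECK constraint, adding one changes only the registry, and the hash stored per run shows which registry version produced a given set of mappings.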

99 lines
3.6 KiB
Python

"""Classifies master controls into use cases (n:m) plus a verification method.

Stage 1, seed (no LLM, free): derived from existing member signals
(canonical_controls.scope_doc_type / .category / .verification_method /
.evidence_type) via `use_case_registry.seed_classify`.

Stage 2, LLM (Phase 3): multi-label against the registry taxonomy. [TODO]

Run inside the container:
    docker exec bp-compliance-backend \
        python /app/scripts/classify_mc_use_cases.py [--limit N]
"""
from __future__ import annotations

import argparse
import asyncio
import os

import asyncpg

from compliance.data import use_case_registry as reg

# One row per master control, with the distinct signals of all its members.
# ORDER BY keeps the row order stable, so an appended LIMIT picks a
# reproducible subset.
_AGG_SQL = """
SELECT mc.id AS mc_uuid, mc.master_control_id,
       array_agg(DISTINCT cc.scope_doc_type)      AS scopes,
       array_agg(DISTINCT cc.category)            AS categories,
       array_agg(DISTINCT cc.verification_method) AS vmethods,
       array_agg(DISTINCT cc.evidence_type)       AS etypes
FROM compliance.master_controls mc
JOIN compliance.master_control_members mcm
  ON mcm.master_control_uuid = mc.id
JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
GROUP BY mc.id, mc.master_control_id
ORDER BY mc.master_control_id
"""


async def run_seed(conn, limit: int = 0) -> dict:
    """Deterministic seed into mc_use_case_mappings + mc_verification.

    Idempotent (ON CONFLICT DO NOTHING); never overwrites 'manual' entries.
    """
    sql = _AGG_SQL + (f" LIMIT {limit}" if limit > 0 else "")
    rows = await conn.fetch(sql)
    # Counters track attempted inserts; rows skipped by ON CONFLICT are
    # counted as well.
    n_mc_with_uc = n_uc_rows = n_verif = 0
    for r in rows:
        ucs, method = reg.seed_classify(
            r["scopes"], r["categories"], r["vmethods"], r["etypes"],
        )
        # n:m: one mapping row per (master control, use case) pair.
        for uc in ucs:
            await conn.execute(
                """INSERT INTO compliance.mc_use_case_mappings
                       (master_control_uuid, master_control_id, use_case,
                        method, confidence, rationale)
                   VALUES ($1,$2,$3,'seed',0.6,'deterministic seed')
                   ON CONFLICT (master_control_uuid, use_case) DO NOTHING""",
                r["mc_uuid"], r["master_control_id"], uc,
            )
            n_uc_rows += 1
        if ucs:
            n_mc_with_uc += 1
        # At most one verification row per master control.
        if method:
            await conn.execute(
                """INSERT INTO compliance.mc_verification
                       (master_control_uuid, master_control_id,
                        verification_method, method, confidence, rationale)
                   VALUES ($1,$2,$3,'seed',0.6,'deterministic seed')
                   ON CONFLICT (master_control_uuid) DO NOTHING""",
                r["mc_uuid"], r["master_control_id"], method,
            )
            n_verif += 1
    # Record this run together with the registry version that produced it.
    total = await conn.fetchval(
        "SELECT count(*) FROM compliance.mc_use_case_mappings")
    await conn.execute(
        """INSERT INTO compliance.mc_use_case_sync_state
               (registry_hash, stage, total_mappings, mcs_classified)
           VALUES ($1,'seed',$2,$3)""",
        reg.registry_hash(), total, n_mc_with_uc,
    )
    return {"mcs": len(rows), "mcs_with_use_case": n_mc_with_uc,
            "use_case_rows": n_uc_rows, "verification_rows": n_verif}


async def _main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit", type=int, default=0,
                    help="process at most N master controls (0 = all)")
    ap.add_argument("--with-llm", action="store_true",
                    help="Phase 3, not implemented yet")
    args = ap.parse_args()
    if args.with_llm:
        raise SystemExit("LLM stage (Phase 3) is not implemented yet.")
    conn = await asyncpg.connect(os.getenv("DATABASE_URL"))
    try:
        stats = await run_seed(conn, args.limit)
    finally:
        await conn.close()
    print("Seed done:", stats)


if __name__ == "__main__":
    asyncio.run(_main())
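
The counters in run_seed are incremented once per executed INSERT, so on a re-run the reported numbers also include rows that ON CONFLICT DO NOTHING skipped. asyncpg's `Connection.execute` returns the command status string (for example `INSERT 0 1`), which can be parsed to count only the rows actually written. A minimal sketch of that idea follows; `rows_affected` is a hypothetical helper name, not part of the script.

```python
def rows_affected(status: str) -> int:
    """Parse a PostgreSQL command tag such as 'INSERT 0 1' or 'UPDATE 3'.

    The affected-row count is the last token of the tag. Tags without a
    trailing number (e.g. 'CREATE TABLE') yield 0.
    """
    last = status.rsplit(" ", 1)[-1]
    return int(last) if last.isdigit() else 0
```

Inside the loop this could be used as `n_uc_rows += rows_affected(await conn.execute(...))`, so that a second run over unchanged data reports zero new rows.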