6ca4dcde3e
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 31s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Use-Case-Zuordnung jetzt DETERMINISTISCH aus der Quell-Regulierung (statt LLM/scope-category): control_parent_links.source_regulation (79% der 13.588 MCs) -> Keyword-Mapper -> ~30 Domaenen-Use-Cases. 117/117 Regulierungen gemappt (dse 44 Leitlinien, code_security 10, network_security 9, ...). - use_case_registry.py: 37 Use Cases (Doku + Security + Produkt/Sektor: cra/ai_act/mica/mdr/maschinen/batterie/ehds/dsa/dma/psd2/aml/lksg/...) + use_case_for_regulation() Keyword-Mapper (117 Regulierungen abgedeckt). - migration 150: is_primary auf mc_use_case_mappings + neue mc_regulations (MC->source_regulation, n:m, is_primary) als feine Filter-Dimension. - classify_mc_use_cases.py: source_regulation-getriebener Seed; Primaerzweck = dominante Regulierung, Mehrfachzwecke = weitere. PYTHONPATH-Bootstrap. - 18 Registry-Tests gruen (Mapper-Abdeckung + Konsistenz-Invariante). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
150 lines
5.8 KiB
Python
150 lines
5.8 KiB
Python
"""Klassifiziert Master Controls deterministisch auf Use Cases (n:m) +
|
|
Quell-Regulierung (n:m) + Verifikations-Methode.
|
|
|
|
DETERMINISTISCH (kein LLM): Die Zuordnung kommt aus der Quell-Regulierung
|
|
jedes Controls — Lineage master_controls -> master_control_members ->
|
|
control_parent_links.source_regulation. 117 Regulierungen -> Keyword-Mapper
|
|
(use_case_registry.use_case_for_regulation) -> ~30 Domaenen-Use-Cases.
|
|
Primaerzweck = dominante Quell-Regulierung (meiste Member); Mehrfachzwecke =
|
|
die weiteren. LLM-Stufe (spaeter) nur Fallback fuer MCs ohne source_regulation.
|
|
|
|
Lauf im Container:
|
|
docker exec bp-compliance-backend \
|
|
python /app/scripts/classify_mc_use_cases.py [--limit N]
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
# /app auf den Pfad, damit `compliance...` als Standalone-Script importierbar
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import asyncpg # noqa: E402
|
|
|
|
from compliance.data import use_case_registry as reg # noqa: E402
|
|
|
|
_REG_SQL = """
|
|
SELECT mc.id AS mc_uuid, mc.master_control_id,
|
|
cpl.source_regulation AS reg, count(*) AS cnt
|
|
FROM compliance.master_controls mc
|
|
JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
|
|
JOIN compliance.control_parent_links cpl ON cpl.control_uuid = mcm.control_uuid
|
|
WHERE cpl.source_regulation IS NOT NULL AND cpl.source_regulation <> ''
|
|
GROUP BY mc.id, mc.master_control_id, cpl.source_regulation
|
|
"""
|
|
|
|
_VERIF_SQL = """
|
|
SELECT mc.id AS mc_uuid,
|
|
array_agg(DISTINCT cc.verification_method) AS vmethods,
|
|
array_agg(DISTINCT cc.evidence_type) AS etypes
|
|
FROM compliance.master_controls mc
|
|
JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
|
|
JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
|
|
GROUP BY mc.id
|
|
"""
|
|
|
|
|
|
# One row per (master control, source regulation). The statement does not set
# `method`; the cleanup in run_seed deletes rows with method='lineage', so the
# column presumably defaults to 'lineage' -- confirm against the migration.
_INSERT_REGULATION_SQL = """
    INSERT INTO compliance.mc_regulations
        (master_control_uuid, master_control_id, source_regulation,
         is_primary, member_count)
    VALUES ($1,$2,$3,$4,$5)
    ON CONFLICT (master_control_uuid, source_regulation)
    DO NOTHING"""

# Upsert of a seeded use-case mapping. On conflict only is_primary is OR-ed
# in, and rows with method='manual' are never modified.
_INSERT_USE_CASE_SQL = """
    INSERT INTO compliance.mc_use_case_mappings
        (master_control_uuid, master_control_id, use_case,
         method, confidence, rationale, is_primary)
    VALUES ($1,$2,$3,'seed',0.85,'source_regulation',$4)
    ON CONFLICT (master_control_uuid, use_case)
    DO UPDATE SET is_primary =
        mc_use_case_mappings.is_primary OR EXCLUDED.is_primary
    WHERE mc_use_case_mappings.method <> 'manual'"""

# At most one verification row per master control; an existing row wins.
_INSERT_VERIFICATION_SQL = """
    INSERT INTO compliance.mc_verification
        (master_control_uuid, master_control_id,
         verification_method, method, confidence, rationale)
    VALUES ($1,$2,$3,'seed',0.7,'member evidence_type')
    ON CONFLICT (master_control_uuid) DO NOTHING"""

# Audit row written once per seed run.
_INSERT_SYNC_STATE_SQL = """
    INSERT INTO compliance.mc_use_case_sync_state
        (registry_hash, stage, total_mappings, mcs_classified)
    VALUES ($1,'seed_source_regulation',$2,$3)"""


async def run_seed(conn, limit: int = 0) -> dict:
    """Re-seed regulation, use-case and verification rows from the lineage.

    Existing rows with method='seed' (use-case mappings, verification) and
    method='lineage' (regulations) are deleted and rebuilt. Rows with any
    other method are not deleted, and the use-case upsert never modifies a
    row whose method is 'manual'.

    The whole run executes inside one database transaction, so a failure
    part-way rolls back the deletes instead of leaving the tables wiped or
    half-filled.

    For each master control the primary regulation is the one with the
    highest member count. Ties are broken by regulation name (ascending) so
    the result does not depend on the row order of the lineage query.

    Args:
        conn: Database connection providing ``transaction()``, ``execute()``,
            ``fetch()`` and ``fetchval()`` (an asyncpg connection in the CLI).
        limit: If greater than 0, only the first ``limit`` master controls
            returned by the lineage query are seeded. The deletes above are
            still unconditional.

    Returns:
        A dict with the keys ``mcs_mapped``, ``regulation_rows``,
        ``use_case_rows`` and ``verification_rows``. The row counters count
        issued INSERT statements, including ones skipped by ON CONFLICT.
    """
    async with conn.transaction():
        await conn.execute(
            "DELETE FROM compliance.mc_use_case_mappings WHERE method='seed'")
        await conn.execute(
            "DELETE FROM compliance.mc_verification WHERE method='seed'")
        await conn.execute(
            "DELETE FROM compliance.mc_regulations WHERE method='lineage'")

        # mc_uuid -> {"mc_id": ..., "regs": {regulation: member count}}
        by_mc: dict = {}
        for row in await conn.fetch(_REG_SQL):
            entry = by_mc.setdefault(
                row["mc_uuid"],
                {"mc_id": row["master_control_id"], "regs": {}})
            entry["regs"][row["reg"]] = row["cnt"]
        if limit > 0:
            by_mc = dict(list(by_mc.items())[:limit])

        # mc_uuid -> verification method derived from the member controls.
        verif: dict = {}
        for row in await conn.fetch(_VERIF_SQL):
            _ucs, vmethod = reg.seed_classify(
                vmethods=row["vmethods"], etypes=row["etypes"])
            if vmethod:
                verif[row["mc_uuid"]] = vmethod

        n_reg = n_uc = n_mc = n_v = 0
        for mc_uuid, entry in by_mc.items():
            mc_id, regs = entry["mc_id"], entry["regs"]
            if not regs:
                continue

            # Highest count wins; equal counts fall back to the name so the
            # choice is stable across runs.
            primary_reg = min(regs, key=lambda name: (-regs[name], name))
            uc_primary = reg.use_case_for_regulation(primary_reg)

            # use case -> is_primary, merged over all regulations of this MC.
            ucs: dict = {}
            for rg, cnt in regs.items():
                await conn.execute(
                    _INSERT_REGULATION_SQL,
                    mc_uuid, mc_id, rg[:160], rg == primary_reg, cnt)
                n_reg += 1
                uc = reg.use_case_for_regulation(rg)
                if uc:
                    ucs[uc] = ucs.get(uc, False) or (uc == uc_primary)

            for uc, is_prim in ucs.items():
                await conn.execute(
                    _INSERT_USE_CASE_SQL, mc_uuid, mc_id, uc, is_prim)
                n_uc += 1
            n_mc += 1

            vmethod = verif.get(mc_uuid)
            if vmethod:
                await conn.execute(
                    _INSERT_VERIFICATION_SQL, mc_uuid, mc_id, vmethod)
                n_v += 1

        # Counts every mapping row in the table, not only the seeded ones.
        total = await conn.fetchval(
            "SELECT count(*) FROM compliance.mc_use_case_mappings")
        await conn.execute(
            _INSERT_SYNC_STATE_SQL, reg.registry_hash(), total, n_mc)

    return {"mcs_mapped": n_mc, "regulation_rows": n_reg,
            "use_case_rows": n_uc, "verification_rows": n_v}
|
|
|
|
|
|
async def _main() -> None:
    """Parse the CLI options, run the seed once and print its statistics.

    The connection is opened from the DATABASE_URL environment variable and
    is closed again even when the seed raises.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--limit", type=int, default=0)
    parser.add_argument(
        "--with-llm",
        action="store_true",
        help="Fallback fuer MCs ohne source_regulation (TODO)",
    )
    options = parser.parse_args()

    # The LLM fallback is only a declared flag so far; abort rather than
    # silently ignoring it.
    if options.with_llm:
        raise SystemExit("LLM-Fallback (Phase 3) noch nicht implementiert.")

    dsn = os.getenv("DATABASE_URL")
    connection = await asyncpg.connect(dsn)
    try:
        summary = await run_seed(connection, options.limit)
    finally:
        await connection.close()

    print("Seed (source_regulation) fertig:", summary)
|
|
|
|
|
|
# Run the async CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(_main())
|