diff --git a/backend-compliance/compliance/data/use_case_registry.py b/backend-compliance/compliance/data/use_case_registry.py
new file mode 100644
index 00000000..887ba673
--- /dev/null
+++ b/backend-compliance/compliance/data/use_case_registry.py
@@ -0,0 +1,285 @@
+"""Use-case registry: single source of truth for use cases × verification
+methods.
+
+Every master control (MC) is mapped to >=1 use case (n:m) and exactly one
+verification method. Use cases are NOT only document based: >=50% are
+source code / IT process (Code Security, Network Security, CRA, ISMS, TISAX).
+
+New use case = 1 entry in `_USE_CASES`. No DB schema change required.
+This module is the canonical source; the doc_type lists scattered today
+(rag_document_checker._DOC_TYPE_MAP, legacy_url_discovery._SLUG_FAMILY,
+doc_type_classifier, migration 145) will later be reduced to it, test-gated.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from dataclasses import dataclass
+
+
+# How is an MC verified? Canonical taxonomy of verification methods.
+VERIFICATION_METHODS: tuple[str, ...] = (
+    "document",     # read a published document (imprint, privacy policy, ...)
+    "source_code",  # scan repo/code (SAST, secrets, dependencies, review)
+    "network",      # scan network/infra (ports, TLS, headers, config)
+    "it_process",   # process/evidence review (procedures, evidence)
+    "hybrid",       # combination of several methods
+    "manual",       # human attestation
+)
+
+USE_CASE_GROUPS: tuple[str, ...] = ("document", "security", "cross_cutting")
+
+
+@dataclass(frozen=True)
+class UseCase:
+    """One registry entry: a use case plus the signals that map MCs to it."""
+
+    key: str                      # stable, snake_case
+    label: str
+    group: str                    # one of USE_CASE_GROUPS
+    regulations: tuple[str, ...] = ()
+    verification_methods: tuple[str, ...] = ()
+    doc_types: tuple[str, ...] = ()        # operational doc_type aliases
+    scope_tokens: tuple[str, ...] = ()     # canonical_controls.scope_doc_type
+    categories: tuple[str, ...] = ()       # canonical_controls.category
+    keyword_tokens: tuple[str, ...] = ()   # canonical_name/title keywords
+    enabled: bool = True
+
+
+_USE_CASES: tuple[UseCase, ...] = (
+    # ── Document use cases ──────────────────────────────────────────
+    UseCase("impressum", "Impressum (§5 TMG/DDG)", "document",
+            regulations=("TMG", "DDG", "MStV"),
+            verification_methods=("document",),
+            doc_types=("impressum",), scope_tokens=("impressum",),
+            categories=("compliance",),
+            keyword_tokens=("impressum", "anbieterkennzeichnung")),
+    UseCase("dse", "Datenschutzerklärung", "document",
+            regulations=("DSGVO",),
+            verification_methods=("document",),
+            doc_types=("dse",), scope_tokens=("dse",),
+            categories=("privacy", "data_protection"),
+            keyword_tokens=("datenschutz", "privacy")),
+    UseCase("agb", "AGB", "document",
+            regulations=("BGB",),
+            verification_methods=("document",),
+            doc_types=("agb",), scope_tokens=("agb",),
+            categories=("compliance",),
+            keyword_tokens=("geschäftsbedingungen", "agb")),
+    UseCase("cookie_banner", "Cookie-Banner & -Richtlinie", "document",
+            regulations=("TDDDG", "ePrivacy", "DSGVO"),
+            verification_methods=("document", "source_code"),
+            doc_types=("cookie",),
+            scope_tokens=("cookie_richtlinie", "banner_implementation",
+                          "cmp_audit"),
+            categories=("privacy",),
+            keyword_tokens=("cookie", "consent", "einwilligung")),
+    UseCase("widerruf", "Widerrufsbelehrung", "document",
+            regulations=("BGB",),
+            verification_methods=("document",),
+            doc_types=("widerruf",), scope_tokens=("widerruf",),
+            categories=("compliance",),
+            keyword_tokens=("widerruf", "widerrufsbelehrung")),
+    UseCase("dsr", "Betroffenenrechte (DSR)", "document",
+            regulations=("DSGVO",),
+            verification_methods=("document", "it_process"),
+            scope_tokens=("process",),
+            categories=("privacy", "operations"),
+            keyword_tokens=("betroffenenrecht", "auskunft", "löschung", "dsr")),
+    UseCase("loeschkonzept", "Löschkonzept", "document",
+            regulations=("DSGVO",),
+            verification_methods=("document", "it_process"),
+            doc_types=("loeschkonzept",),
+            scope_tokens=("process", "accounting"),
+            categories=("data_protection",),
+            keyword_tokens=("löschung", "löschfrist", "aufbewahrung")),
+    UseCase("avv", "Auftragsverarbeitung (AVV)", "document",
+            regulations=("DSGVO",),
+            verification_methods=("document",),
+            doc_types=("avv",), scope_tokens=("avv", "jc"),
+            categories=("compliance",),
+            keyword_tokens=("auftragsverarbeitung", "avv")),
+    UseCase("dsfa", "Datenschutz-Folgenabschätzung", "document",
+            regulations=("DSGVO",),
+            verification_methods=("document", "it_process"),
+            doc_types=("dsfa",), scope_tokens=("tom", "process"),
+            categories=("risk", "privacy"),
+            keyword_tokens=("folgenabschätzung", "dsfa")),
+    # ── Security / code use cases ───────────────────────────────────
+    UseCase("code_security", "Code Security", "security",
+            regulations=("CRA", "OWASP", "ISO 27001"),
+            verification_methods=("source_code", "hybrid"),
+            categories=("testing", "application", "encryption",
+                        "authentication", "identity"),
+            keyword_tokens=("sast", "secret", "dependency", "vulnerability",
+                            "injection", "code")),
+    UseCase("network_security", "Network Security", "security",
+            regulations=("ISO 27001", "BSI", "NIS2"),
+            verification_methods=("network", "hybrid"),
+            categories=("network", "system", "operations"),
+            keyword_tokens=("firewall", "tls", "port", "segmentation",
+                            "network", "header")),
+    # ── Cross-cutting / multi-method ────────────────────────────────
+    UseCase("cra", "Cyber Resilience Act", "cross_cutting",
+            regulations=("CRA",),
+            verification_methods=("document", "source_code", "network",
+                                  "it_process"),
+            categories=("security", "supply_chain", "testing", "incident"),
+            keyword_tokens=("cra", "sbom", "konformität", "produktsicherheit")),
+    UseCase("isms", "ISMS (ISO 27001)", "cross_cutting",
+            regulations=("ISO 27001",),
+            verification_methods=("it_process", "document", "hybrid"),
+            categories=("governance", "security", "operations", "incident"),
+            keyword_tokens=("isms", "risikomanagement", "soa")),
+    UseCase("tisax", "TISAX", "cross_cutting",
+            regulations=("VDA ISA", "TISAX"),
+            verification_methods=("it_process", "document", "network",
+                                  "hybrid"),
+            categories=("security", "governance", "operations"),
+            keyword_tokens=("tisax", "vda", "prototypenschutz")),
+)
+
+
+REGISTRY: dict[str, UseCase] = {uc.key: uc for uc in _USE_CASES}
+
+
+# canonical_controls.evidence_type / .verification_method -> our method
+# (for the deterministic seed; the LLM pass refines it).
+_EVIDENCE_TO_METHOD: dict[str, str] = {
+    "document": "document",
+    "code": "source_code",
+    "code_review": "source_code",
+    "process": "it_process",
+    "tool": "network",
+    "hybrid": "hybrid",
+}
+
+
+def _reverse(attr: str) -> dict[str, list[str]]:
+    """Build a token -> [use case keys] index over the enabled use cases."""
+    out: dict[str, list[str]] = {}
+    for uc in _USE_CASES:
+        if not uc.enabled:
+            continue
+        for tok in getattr(uc, attr):
+            out.setdefault(tok, []).append(uc.key)
+    return out
+
+
+scope_token_to_use_cases: dict[str, list[str]] = _reverse("scope_tokens")
+category_to_use_cases: dict[str, list[str]] = _reverse("categories")
+doc_type_to_use_cases: dict[str, list[str]] = _reverse("doc_types")
+
+
+def is_valid_use_case(key: str) -> bool:
+    """Return True if `key` names a registered and enabled use case."""
+    return key in REGISTRY and REGISTRY[key].enabled
+
+
+def is_valid_verification_method(method: str) -> bool:
+    """Return True if `method` is part of VERIFICATION_METHODS."""
+    return method in VERIFICATION_METHODS
+
+
+def evidence_to_verification_method(value: str | None) -> str | None:
+    """Map an evidence_type / verification_method value to our taxonomy.
+
+    Heuristic mapping for the seed. The value is stripped and lower-cased;
+    values that already are canonical method names pass through unchanged.
+    Returns None for empty or unknown values.
+    """
+    if not value:
+        return None
+    token = value.strip().lower()
+    if token in VERIFICATION_METHODS:
+        return token
+    return _EVIDENCE_TO_METHOD.get(token)
+
+
+def enabled_use_cases() -> list[UseCase]:
+    """Return all enabled use cases in registry order."""
+    return [uc for uc in _USE_CASES if uc.enabled]
+
+
+def seed_classify(
+    scopes=(), categories=(), vmethods=(), etypes=(),
+) -> tuple[list[str], str | None]:
+    """Deterministic seed (no LLM) for one master control.
+
+    Derives (use_cases, verification_method) from the aggregated member
+    signals: scope_doc_type + category -> use cases;
+    verification_method / evidence_type -> method. Pure, hence testable.
+
+    String tokens are stripped and lower-cased before the lookup, the
+    same way evidence values are normalised; other entries are ignored.
+    The method is the first recognised value, scanning `vmethods` before
+    `etypes`, so it depends on the order of the inputs.
+
+    Returns the sorted use case keys and the method (None if unknown).
+    """
+    ucs: set[str] = set()
+    for index, tokens in (
+        (scope_token_to_use_cases, scopes),
+        (category_to_use_cases, categories),
+    ):
+        for tok in tokens or ():
+            if isinstance(tok, str):
+                ucs.update(index.get(tok.strip().lower(), ()))
+    method: str | None = None
+    for v in list(vmethods or ()) + list(etypes or ()):
+        m = evidence_to_verification_method(v)
+        if m:
+            method = m
+            break
+    return sorted(ucs), method
+
+
+def taxonomy_for_prompt() -> str:
+    """Compact anchor block for the LLM classifier prompt.
+
+    Lists every enabled use case and all verification methods. Nothing is
+    cached here; callers that need caching must do it themselves.
+    """
+    lines = ["USE CASES (key — Label — Regulierungen — Methoden):"]
+    for uc in enabled_use_cases():
+        lines.append(
+            f" {uc.key} — {uc.label} — {', '.join(uc.regulations) or '-'}"
+            f" — {', '.join(uc.verification_methods)}"
+        )
+    lines.append("VERIFIKATIONS-METHODEN: " + ", ".join(VERIFICATION_METHODS))
+    return "\n".join(lines)
+
+
+def frontend_list() -> list[dict]:
+    """Slim list for the frontend dropdown (twin: use-case-registry.ts)."""
+    return [
+        {"key": uc.key, "label": uc.label, "group": uc.group,
+         "verification_methods": list(uc.verification_methods)}
+        for uc in enabled_use_cases()
+    ]
+
+
+def registry_hash() -> str:
+    """Stable SHA-256 hex digest of everything that drives classification.
+
+    A changed hash signals that master controls must be re-classified.
+    Besides the core fields it covers `keyword_tokens` and the `enabled`
+    flag of every use case, the method taxonomy and the evidence mapping,
+    so disabling a use case or remapping an evidence type is not missed.
+    """
+    payload = json.dumps(
+        {
+            "use_cases": [
+                [uc.key, uc.group, list(uc.regulations),
+                 list(uc.verification_methods), list(uc.doc_types),
+                 list(uc.scope_tokens), list(uc.categories),
+                 list(uc.keyword_tokens), uc.enabled]
+                for uc in _USE_CASES
+            ],
+            "verification_methods": list(VERIFICATION_METHODS),
+            "evidence_to_method": _EVIDENCE_TO_METHOD,
+        },
+        sort_keys=True, ensure_ascii=False,
+    )
+    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
diff --git a/backend-compliance/migrations/149_mc_use_case_mappings.sql b/backend-compliance/migrations/149_mc_use_case_mappings.sql
new file mode 100644
index 00000000..953ec348
--- /dev/null
+++ b/backend-compliance/migrations/149_mc_use_case_mappings.sql
@@ -0,0 +1,64 @@
+-- Migration 149: MC <-> Use-Case Mappings (n:m) + Verifikations-Methode
+-- Fundament fuer das Use-Case-Mapping-System: jede Master Control auf
+-- >=1 Use
Case (n:m) + genau eine Verifikations-Methode. Strikt add-only.
+-- [migration-approved]
+
+SET search_path TO compliance, public;
+
+DO $$
+BEGIN
+    IF EXISTS (SELECT 1 FROM information_schema.tables
+               WHERE table_schema = 'compliance'
+                 AND table_name = 'master_controls') THEN
+
+        -- 1. n:m mapping: master control -> use case
+        CREATE TABLE IF NOT EXISTS mc_use_case_mappings (
+            id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+            master_control_uuid UUID NOT NULL
+                REFERENCES master_controls(id) ON DELETE CASCADE,
+            master_control_id   VARCHAR(60) NOT NULL,
+            -- Registry key; NO SQL CHECK -> new use case without a migration.
+            -- Validation happens in the app against use_case_registry.
+            use_case            VARCHAR(40) NOT NULL,
+            method              VARCHAR(20) NOT NULL DEFAULT 'auto'
+                CHECK (method IN ('auto', 'manual', 'seed')),
+            confidence          NUMERIC(3,2) DEFAULT 1.00
+                CHECK (confidence >= 0 AND confidence <= 1),
+            rationale           TEXT,
+            model               VARCHAR(60),
+            created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+            updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+            UNIQUE (master_control_uuid, use_case)
+        );
+        CREATE INDEX IF NOT EXISTS idx_mcuc_use_case
+            ON mc_use_case_mappings(use_case);
+        CREATE INDEX IF NOT EXISTS idx_mcuc_uc_mc
+            ON mc_use_case_mappings(use_case, master_control_uuid);
+
+        -- 2. Verification method per MC (1 row per MC)
+        CREATE TABLE IF NOT EXISTS mc_verification (
+            master_control_uuid UUID PRIMARY KEY
+                REFERENCES master_controls(id) ON DELETE CASCADE,
+            master_control_id   VARCHAR(60) NOT NULL,
+            verification_method VARCHAR(20) NOT NULL, -- app-validated
+            method              VARCHAR(20) NOT NULL DEFAULT 'auto'
+                CHECK (method IN ('auto', 'manual', 'seed')),
+            confidence          NUMERIC(3,2) DEFAULT 1.00 CHECK (confidence >= 0 AND confidence <= 1),
+            rationale           TEXT,
+            updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
+        );
+        CREATE INDEX IF NOT EXISTS idx_mcv_method
+            ON mc_verification(verification_method);
+
+        -- 3. Sync state: registry_hash -> re-classification on taxonomy change
+        CREATE TABLE IF NOT EXISTS mc_use_case_sync_state (
+            id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+            registry_hash   VARCHAR(64),
+            stage           VARCHAR(20),
+            total_mappings  INTEGER DEFAULT 0,
+            mcs_classified  INTEGER DEFAULT 0,
+            last_run_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
+        );
+
+    END IF;
+END $$;
diff --git a/backend-compliance/scripts/classify_mc_use_cases.py b/backend-compliance/scripts/classify_mc_use_cases.py
new file mode 100644
index 00000000..60421cdb
--- /dev/null
+++ b/backend-compliance/scripts/classify_mc_use_cases.py
@@ -0,0 +1,124 @@
+"""Classify master controls into use cases (n:m) + verification method.
+
+Stage 1 — seed (no LLM, free): from existing member signals
+  (canonical_controls.scope_doc_type / .category / .verification_method /
+  .evidence_type) via `use_case_registry.seed_classify`.
+Stage 2 — LLM (phase 3): multi-label against the registry taxonomy. [TODO]
+
+Run inside the container:
+  docker exec bp-compliance-backend \
+    python /app/scripts/classify_mc_use_cases.py [--limit N]
+"""
+
+from __future__ import annotations
+
+import argparse
+import asyncio
+import os
+
+import asyncpg
+
+from compliance.data import use_case_registry as reg
+
+_AGG_SQL = """
+SELECT mc.id AS mc_uuid, mc.master_control_id,
+       array_agg(DISTINCT cc.scope_doc_type)      AS scopes,
+       array_agg(DISTINCT cc.category)            AS categories,
+       array_agg(DISTINCT cc.verification_method) AS vmethods,
+       array_agg(DISTINCT cc.evidence_type)       AS etypes
+FROM compliance.master_controls mc
+JOIN compliance.master_control_members mcm
+  ON mcm.master_control_uuid = mc.id
+JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
+GROUP BY mc.id, mc.master_control_id
+"""
+
+
+def _rows_affected(status) -> int:
+    """Return the row count at the end of a command tag like 'INSERT 0 1'.
+
+    Yields 0 when the tag is missing or does not end in a number.
+    """
+    tail = str(status).rsplit(" ", 1)[-1]
+    return int(tail) if tail.isdigit() else 0
+
+
+async def run_seed(conn, limit: int = 0) -> dict:
+    """Run the deterministic seed into mc_use_case_mappings + mc_verification.
+
+    Idempotent: both inserts use ON CONFLICT DO NOTHING, so existing rows
+    (including 'manual' ones) are never overwritten. A row is appended to
+    mc_use_case_sync_state on every run.
+
+    Args:
+        conn: connection offering asyncpg-style fetch/fetchval/execute.
+        limit: process at most this many master controls; <= 0 means all.
+
+    Returns:
+        dict with `mcs` (MCs read), `mcs_with_use_case`, and
+        `use_case_rows` / `verification_rows` (rows this run inserted;
+        inserts skipped by ON CONFLICT are not counted).
+    """
+    # ORDER BY keeps --limit runs reproducible: LIMIT alone picks any rows.
+    sql = _AGG_SQL + "ORDER BY mc.master_control_id"
+    if limit > 0:
+        sql += f" LIMIT {int(limit)}"
+    rows = await conn.fetch(sql)
+    n_mc_with_uc = n_uc_rows = n_verif = 0
+    for r in rows:
+        ucs, method = reg.seed_classify(
+            r["scopes"], r["categories"], r["vmethods"], r["etypes"],
+        )
+        for uc in ucs:
+            status = await conn.execute(
+                """INSERT INTO compliance.mc_use_case_mappings
+                     (master_control_uuid, master_control_id, use_case,
+                      method, confidence, rationale)
+                   VALUES ($1,$2,$3,'seed',0.6,'deterministic seed')
+                   ON CONFLICT (master_control_uuid, use_case) DO NOTHING""",
+                r["mc_uuid"], r["master_control_id"], uc,
+            )
+            n_uc_rows += _rows_affected(status)
+        if ucs:
+            n_mc_with_uc += 1
+        if method:
+            status = await conn.execute(
+                """INSERT INTO compliance.mc_verification
+                     (master_control_uuid, master_control_id,
+                      verification_method, method, confidence, rationale)
+                   VALUES ($1,$2,$3,'seed',0.6,'deterministic seed')
+                   ON CONFLICT (master_control_uuid) DO NOTHING""",
+                r["mc_uuid"], r["master_control_id"], method,
+            )
+            n_verif += _rows_affected(status)
+    total = await conn.fetchval(
+        "SELECT count(*) FROM compliance.mc_use_case_mappings")
+    await conn.execute(
+        """INSERT INTO compliance.mc_use_case_sync_state
+             (registry_hash, stage, total_mappings, mcs_classified)
+           VALUES ($1,'seed',$2,$3)""",
+        reg.registry_hash(), total, n_mc_with_uc,
+    )
+    return {"mcs": len(rows), "mcs_with_use_case": n_mc_with_uc,
+            "use_case_rows": n_uc_rows, "verification_rows": n_verif}
+
+
+async def _main() -> None:
+    """Parse the CLI arguments, run the seed and print its counters."""
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--limit", type=int, default=0)
+    ap.add_argument("--with-llm", action="store_true",
+                    help="Phase 3 — noch nicht implementiert")
+    args = ap.parse_args()
+    if args.with_llm:
+        raise SystemExit("LLM-Stufe (Phase 3) noch nicht implementiert.")
+    conn = await asyncpg.connect(os.getenv("DATABASE_URL"))
+    try:
+        stats = await run_seed(conn, args.limit)
+    finally:
+        await conn.close()
+    print("Seed fertig:", stats)
+
+
+if __name__ == "__main__":
+    asyncio.run(_main())
diff --git a/backend-compliance/tests/test_use_case_registry.py b/backend-compliance/tests/test_use_case_registry.py
new file mode 100644
index 00000000..ba6815ed
--- /dev/null
+++ b/backend-compliance/tests/test_use_case_registry.py
@@ -0,0 +1,116 @@
+"""Tests for the use-case registry (phase 0)."""
+
+from __future__ import annotations
+
+from compliance.data import use_case_registry as reg
+
+
+def test_keys_unique_and_nonempty():
+    keys = [uc.key for uc in reg._USE_CASES]
+    assert len(keys) == len(set(keys))
+    for uc in reg._USE_CASES:
+        assert uc.key and uc.label
+        assert uc.group in reg.USE_CASE_GROUPS
+
+
+def test_every_use_case_has_a_verification_method_in_taxonomy():
+    for uc in reg._USE_CASES:
+        assert uc.verification_methods, uc.key
+        for m in uc.verification_methods:
+            assert m in reg.VERIFICATION_METHODS, (uc.key, m)
+
+
+def test_not_only_document_use_cases():
+    # The decisive point (user requirement): >=50% source code / IT process.
+    keys = set(reg.REGISTRY)
+    for k in ("code_security", "network_security", "cra", "isms", "tisax"):
+        assert k in keys
+    methods = {m for uc in reg._USE_CASES for m in uc.verification_methods}
+    assert {"source_code", "network", "it_process"} <= methods
+    # Measured per use case: at least one non-document verification method.
+    beyond_docs = [uc for uc in reg._USE_CASES
+                   if set(uc.verification_methods) - {"document"}]
+    assert 2 * len(beyond_docs) >= len(reg._USE_CASES)
+
+
+def test_scope_tokens_cover_migration_145():
+    # All meaningful migration-145 scope_doc_type values ('other' excluded)
+    # are assigned to at least one use case.
+    meaningful = {
+        "cookie_richtlinie", "dse", "banner_implementation", "cmp_audit",
+        "tom", "avv", "jc", "impressum", "agb", "widerruf", "process",
+        "accounting",
+    }
+    assert meaningful <= set(reg.scope_token_to_use_cases)
+
+
+def test_taxonomy_for_prompt_lists_all_enabled():
+    txt = reg.taxonomy_for_prompt()
+    for uc in reg.enabled_use_cases():
+        assert uc.key in txt
+    for m in reg.VERIFICATION_METHODS:
+        assert m in txt
+
+
+def test_validators():
+    assert reg.is_valid_use_case("impressum")
+    assert not reg.is_valid_use_case("ghost")
+    assert reg.is_valid_verification_method("source_code")
+    assert not reg.is_valid_verification_method("telepathy")
+
+
+def test_evidence_mapping():
+    assert reg.evidence_to_verification_method("code") == "source_code"
+    assert reg.evidence_to_verification_method("code_review") == "source_code"
+    assert reg.evidence_to_verification_method("process") == "it_process"
+    assert reg.evidence_to_verification_method("document") == "document"
+    assert reg.evidence_to_verification_method(None) is None
+    assert reg.evidence_to_verification_method("xyz") is None
+
+
+def test_registry_hash_stable_and_hex():
+    h1 = reg.registry_hash()
+    assert h1 == reg.registry_hash()
+    assert len(h1) == 64 and all(c in "0123456789abcdef" for c in h1)
+
+
+def test_frontend_list_shape():
+    fl = reg.frontend_list()
+    assert len(fl) == len(reg.enabled_use_cases())
+    for e in fl:
+        assert set(e) == {"key", "label", "group", "verification_methods"}
+
+
+# ── Seed classification (phase 1) ───────────────────────────────────
+
+
+def test_seed_scope_token_to_use_case():
+    ucs, _ = reg.seed_classify(scopes=["impressum"])
+    assert "impressum" in ucs
+
+
+def test_seed_category_to_use_case():
+    ucs, _ = reg.seed_classify(categories=["network"])
+    assert "network_security" in ucs
+
+
+def test_seed_verification_method_from_evidence_and_method():
+    _, m = reg.seed_classify(etypes=["code"])
+    assert m == "source_code"
+    _, m2 = reg.seed_classify(vmethods=["document"])
+    assert m2 == "document"
+    _, m3 = reg.seed_classify(etypes=["process"])
+    assert m3 == "it_process"
+
+
+def test_seed_multi_label():
+    # scope 'process' is attached to several use cases (dsr/loeschkonzept/dsfa)
+    ucs, _ = reg.seed_classify(scopes=["process"])
+    assert len(ucs) >= 2
+
+
+def test_seed_empty_and_none_safe():
+    ucs, m = reg.seed_classify(scopes=[None], categories=[None],
+                               vmethods=[None], etypes=[None])
+    assert ucs == [] and m is None
+    assert reg.seed_classify() == ([], None)