# ---------------------------------------------------------------------------
# Deterministic Atomic Control Composition Engine v2
# ---------------------------------------------------------------------------
# Transforms obligation candidates into atomic controls WITHOUT LLM.
#
# Pipeline:
#   1. split_compound_action()  — split "erstellen und implementieren" → 2
#   2. classify_action()        — 24 fine-grained action types
#   3. classify_object()        — policy / technical / process / register / …
#   4. trigger_qualifier()      — periodic / event / continuous → timing text
#   5. template lookup          — (action_type, object_class) → test + evidence
#   6. compose                  — assemble AtomicControlCandidate
# ---------------------------------------------------------------------------

# ── 1. Compound Action Splitter ──────────────────────────────────────────

# Conjunctions that separate two actions.  A comma directly in front of the
# conjunction is consumed together with it, so "a, und b" splits into
# "a" / "b" instead of leaving a trailing comma on the first part.
_COMPOUND_SPLIT_RE = re.compile(
    r"(?:\s*,\s*|\s+)(?:und|sowie|als\s+auch)\s+", re.IGNORECASE
)

# Phrases that should never be split (stylistic variants, not separate actions)
_NO_SPLIT_PHRASES: set[str] = {
    "pflegen und aufrechterhalten",
    "aufrechterhalten und pflegen",
    "erkennen und verhindern",
    "verhindern und erkennen",
    "sichern und schützen",
    "schützen und sichern",
    "schützen und absichern",
    "absichern und schützen",
    "ermitteln und bewerten",
    "bewerten und ermitteln",
    "prüfen und überwachen",
    "überwachen und prüfen",
}


def _split_compound_action(action: str) -> list[str]:
    """Split a compound action phrase into its individual sub-actions.

    The phrase is split on the conjunctions in ``_COMPOUND_SPLIT_RE``, but
    only when

    - the whole (lower-cased, stripped) phrase is not listed in
      ``_NO_SPLIT_PHRASES``, and
    - the resulting parts classify to at least two *different* action types.

    Phrases such as 'aufrechterhalten und pflegen' therefore stay together
    because both halves map to 'maintain'.

    Args:
        action: Raw action text of an obligation; may be empty.

    Returns:
        The stripped, non-empty parts when a split happens, otherwise a
        one-element list holding ``action`` unchanged.
    """
    if not action:
        return [action]

    # The no-split list is an exact match on the full phrase.
    if action.lower().strip() in _NO_SPLIT_PHRASES:
        return [action]

    parts = _COMPOUND_SPLIT_RE.split(action.strip())
    if len(parts) <= 1:
        return [action]

    # Only split when the parts really describe different kinds of action.
    distinct_types = {_classify_action(part.strip()) for part in parts}
    if len(distinct_types) > 1:
        return [part.strip() for part in parts if part.strip()]

    return [action]


# ── 2. Action Type Classification (24 types) ─────────────────────────────

# When several action types match, the one listed first here wins.
_ACTION_PRIORITY = [
    "implement", "configure", "encrypt", "restrict_access",
    "monitor", "review", "assess", "audit",
    "test", "verify", "validate",
    "report", "notify", "train",
    "delete", "retain", "ensure",
    "define", "document", "maintain",
    "approve", "remediate",
    "perform", "obtain",
]

_ACTION_KEYWORDS: list[tuple[str, str]] = [
    # (keyword, action_type).  Matching is by substring; list order is not
    # significant because _classify_action() evaluates the keywords
    # longest-first (see _ACTION_KEYWORDS_LONGEST_FIRST).
    ("aktuell halten", "maintain"),
    ("aufrechterhalten", "maintain"),
    ("sicherstellen", "ensure"),
    ("gewährleisten", "ensure"),
    ("benachrichtigen", "notify"),
    ("sensibilisieren", "train"),
    ("authentifizieren", "restrict_access"),
    ("verschlüsseln", "encrypt"),
    ("implementieren", "implement"),
    ("konfigurieren", "configure"),
    ("bereitstellen", "implement"),
    ("protokollieren", "document"),
    ("dokumentieren", "document"),
    ("kontrollieren", "monitor"),
    ("installieren", "implement"),
    ("autorisieren", "restrict_access"),
    ("beschränken", "restrict_access"),
    ("berechtigen", "restrict_access"),
    ("aufbewahren", "retain"),
    ("archivieren", "retain"),
    ("überwachen", "monitor"),
    ("überprüfen", "review"),
    ("auditieren", "audit"),
    ("informieren", "notify"),
    ("analysieren", "assess"),
    ("verifizieren", "verify"),
    ("validieren", "validate"),
    ("evaluieren", "assess"),
    ("integrieren", "implement"),
    ("aktivieren", "configure"),
    ("einrichten", "configure"),
    ("einführen", "implement"),
    ("unterweisen", "train"),
    ("durchführen", "perform"),
    ("verarbeiten", "perform"),
    ("vernichten", "delete"),
    ("entfernen", "delete"),
    ("absichern", "implement"),
    ("schützen", "implement"),
    ("bewerten", "assess"),
    ("umsetzen", "implement"),
    ("aufbauen", "implement"),
    ("erstellen", "document"),
    ("definieren", "define"),
    ("festlegen", "define"),
    ("vorgeben", "define"),
    ("verfassen", "document"),
    ("einholen", "obtain"),
    ("genehmigen", "approve"),
    ("freigeben", "approve"),
    ("zulassen", "approve"),
    ("beheben", "remediate"),
    ("korrigieren", "remediate"),
    ("beseitigen", "remediate"),
    ("nachbessern", "remediate"),
    ("speichern", "retain"),
    ("mitteilen", "notify"),
    ("berichten", "report"),
    ("schulen", "train"),
    ("melden", "report"),
    ("prüfen", "review"),
    ("testen", "test"),
    ("führen", "document"),
    ("pflegen", "maintain"),
    ("wahren", "maintain"),
    ("löschen", "delete"),
    ("angeben", "document"),
    ("beifügen", "document"),
    # English fallbacks
    ("implement", "implement"),
    ("configure", "configure"),
    ("establish", "define"),
    ("document", "document"),
    ("maintain", "maintain"),
    ("monitor", "monitor"),
    ("review", "review"),
    ("assess", "assess"),
    ("audit", "audit"),
    ("encrypt", "encrypt"),
    ("restrict", "restrict_access"),
    ("authorize", "restrict_access"),
    ("verify", "verify"),
    ("validate", "validate"),
    ("report", "report"),
    ("notify", "notify"),
    ("train", "train"),
    ("test", "test"),
    ("delete", "delete"),
    ("retain", "retain"),
    ("ensure", "ensure"),
    ("approve", "approve"),
    ("remediate", "remediate"),
    ("perform", "perform"),
    ("obtain", "obtain"),
]

# Same entries, longest keyword first (stable for equal lengths), so that a
# long keyword is consumed before a shorter keyword contained in it.
_ACTION_KEYWORDS_LONGEST_FIRST: list[tuple[str, str]] = sorted(
    _ACTION_KEYWORDS, key=lambda entry: len(entry[0]), reverse=True
)

# Placeholder written over matched keywords; cannot occur inside a keyword.
_ACTION_MATCH_MASK = "\x00"


def _classify_action(action: str) -> str:
    """Classify an obligation action string into one of the action types.

    Every keyword from ``_ACTION_KEYWORDS`` that occurs in the lower-cased
    text contributes its action type.  Keywords are tried longest-first and
    each hit is masked out, so a short keyword cannot match *inside* a longer
    one that was already recognised (e.g. "führen" inside "durchführen",
    which would otherwise turn "perform" into the higher-ranked "document").

    For compound actions the highest-priority matching type according to
    ``_ACTION_PRIORITY`` is returned.

    Args:
        action: Raw action text; may be empty.

    Returns:
        One of the types in ``_ACTION_PRIORITY``, or ``"default"`` when the
        text is empty or contains no known keyword.
    """
    if not action:
        return "default"

    remaining = action.lower().strip()
    matches: set[str] = set()
    for keyword, action_type in _ACTION_KEYWORDS_LONGEST_FIRST:
        if keyword in remaining:
            matches.add(action_type)
            # Consume the hit so contained shorter keywords do not re-match.
            remaining = remaining.replace(keyword, _ACTION_MATCH_MASK)

    if not matches:
        return "default"

    for candidate in _ACTION_PRIORITY:
        if candidate in matches:
            return candidate

    # Only reachable for a type missing from _ACTION_PRIORITY; pick
    # deterministically rather than by set iteration order.
    return min(matches)


# ── 3. Object Class Classification ──────────────────────────────────────

# Checked in insertion order by _classify_object(); the first class with a
# matching keyword wins.
_OBJECT_CLASS_KEYWORDS: dict[str, list[str]] = {
    # ── Governance / Documentation ────────────────────────────
    "policy": [
        "richtlinie", "policy", "konzept", "strategie", "leitlinie",
        "vorgabe", "regelung", "ordnung", "anweisung", "standard",
        "rahmenwerk", "sicherheitskonzept", "datenschutzkonzept",
    ],
    "procedure": [
        "verfahren", "workflow", "ablauf",
        "vorgehensweise", "methodik", "prozedur", "handlungsanweisung",
    ],
    "register": [
        "verzeichnis", "register", "inventar", "liste", "katalog",
        "übersicht", "bestandsaufnahme",
    ],
    "record": [
        "protokoll", "log", "aufzeichnung", "nachweis",
        "evidenz", "artefakt", "dokumentation",
    ],
    "report": [
        "meldung", "bericht", "report", "benachrichtigung",
        "mitteilung", "anzeige", "statusbericht",
    ],
    # ── Technical / Security ──────────────────────────────────
    "technical_control": [
        "mfa", "firewall", "verschlüsselung", "backup", "antivirus",
        "ids", "ips", "waf", "vpn", "tls", "ssl",
        "patch", "update", "härtung", "segmentierung",
        "alarmierung", "monitoring",
    ],
    "access_control": [
        "authentifizierung", "autorisierung", "zugriff",
        "berechtigung", "passwort", "kennwort", "anmeldung",
        "sso", "rbac", "session",
    ],
    "cryptographic_control": [
        "schlüssel", "zertifikat", "signatur", "kryptographi",
        "cipher", "hash", "token",
    ],
    "configuration": [
        "konfiguration", "einstellung", "parameter",
        "baseline", "hardening", "härtungsprofil",
    ],
    "account": [
        "account", "konto", "benutzer", "privilegiert",
        "admin", "root", "dienstkonto", "servicekonto",
    ],
    # ── Data / Systems ────────────────────────────────────────
    "system": [
        "system", "plattform", "dienst", "service", "anwendung",
        "software", "komponente", "infrastruktur", "netzwerk",
        "server", "datenbank", "produkt", "gerät", "endgerät",
    ],
    "data": [
        "daten", "information", "personenbezogen", "datensatz",
        "datei", "inhalt", "verarbeitungstätigkeit",
    ],
    "interface": [
        "schnittstelle", "interface", "api", "integration",
        "datenfluss", "datenübermittlung",
    ],
    # ── People / Org ──────────────────────────────────────────
    "role": [
        "mitarbeiter", "personal", "rolle", "beauftragter",
        "verantwortlicher", "team", "abteilung", "beschäftigte",
    ],
    "training": [
        "schulung", "training", "sensibilisierung", "awareness",
        "unterweisung", "fortbildung", "qualifikation",
    ],
    # ── Incident / Risk ───────────────────────────────────────
    "incident": [
        "vorfall", "incident", "sicherheitsvorfall", "störung",
        "notfall", "krise", "bedrohung",
    ],
    "risk_artifact": [
        "risiko", "schwachstelle", "vulnerability", "gefährdung",
        "risikoanalyse", "risikobewertung", "schutzbedarfsfeststellung",
    ],
    # ── Process / Consent ────────────────────────────────────
    "process": [
        "prozess", "geschäftsprozess", "betriebsprozess",
        "managementprozess", "steuerungsprozess",
    ],
    "consent": [
        "einwilligung", "consent", "einverständnis",
        "zustimmung", "opt-in", "opt-out",
    ],
}


def _classify_object(object_: str) -> str:
    """Classify the obligation object into a domain class.

    The lower-cased text is checked against ``_OBJECT_CLASS_KEYWORDS`` in
    dictionary order; the first class owning a keyword that occurs as a
    substring is returned.

    Args:
        object_: Raw object text; may be empty.

    Returns:
        A key of ``_OBJECT_CLASS_KEYWORDS`` or ``"general"`` when the text is
        empty or nothing matches.
    """
    if not object_:
        return "general"

    obj_lower = object_.lower()
    # NOTE(review): plain substring matching lets very short keywords hit
    # inside unrelated words (e.g. "sso" in "ressource", "api" in "kapitel").
    # Left unchanged here because tightening it would also stop compounds
    # such as "logging" from matching "log" — needs a product decision.
    for obj_class, keywords in _OBJECT_CLASS_KEYWORDS.items():
        for keyword in keywords:
            if keyword in obj_lower:
                return obj_class
    return "general"
# ── 4. Trigger / Timing Qualifier ───────────────────────────────────────

# (regex, qualifier text).  Evaluated top to bottom, first hit wins.
_FREQUENCY_PATTERNS: list[tuple[str, str]] = [
    # The lookbehind keeps "halbjährlich" from being reported as "jährlich";
    # it falls through to the dedicated "halbjähr" entry instead.
    (r"(?<!halb)jährl", "jährlich"),
    (r"quartal", "quartalsweise"),
    (r"halbjähr", "halbjährlich"),
    (r"monatl", "monatlich"),
    (r"wöchentl", "wöchentlich"),
    # Accept both the "ß" and the "ss" spelling of the input word.
    (r"regelmä(?:ß|ss)ig", "regelmässig"),
    (r"72\s*Stunden", "innerhalb von 72 Stunden"),
    (r"unverzüglich", "unverzüglich"),
    # Allows one word in between, e.g. "ohne unangemessene Verzögerung".
    (r"ohne\s+(?:\w+\s+)?Verzögerung", "ohne unangemessene Verzögerung"),
    (r"vor\s+Inbetriebnahme", "vor Inbetriebnahme"),
    (r"vor\s+Markteinführung", "vor Markteinführung"),
    (r"vor\s+Freigabe", "vor Freigabe"),
]


def _extract_trigger_qualifier(
    trigger_type: Optional[str], obligation_text: str,
) -> str:
    """Extract timing/trigger context for test procedures.

    A concrete frequency or deadline found in the obligation text takes
    precedence; otherwise a generic phrase is derived from ``trigger_type``.

    Args:
        trigger_type: ``"event"``, ``"periodic"`` or anything else (treated
            as continuous).
        obligation_text: Original obligation wording; ``None`` or empty text
            is treated as "no timing information".

    Returns:
        The German qualifier phrase, or ``""`` when no qualifier applies.
    """
    text = obligation_text or ""

    # Try to find a specific frequency in the obligation text
    for pattern, qualifier in _FREQUENCY_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            return qualifier

    if trigger_type == "event":
        return "bei Eintreten des auslösenden Ereignisses"
    if trigger_type == "periodic":
        return "periodisch"
    return ""  # continuous → no special qualifier
# ── 4b. Structured Timing Extraction ────────────────────────────────────

# Evaluated top to bottom, first hit wins.
_STRUCTURED_FREQUENCY_MAP: list[tuple[str, Optional[int], Optional[str]]] = [
    # (regex_pattern, deadline_hours, frequency)
    (r"72\s*Stunden", 72, None),
    (r"48\s*Stunden", 48, None),
    (r"24\s*Stunden", 24, None),
    (r"unverzüglich", 0, None),
    # Allows one word in between, e.g. "ohne unangemessene Verzögerung".
    (r"ohne\s+(?:\w+\s+)?Verzögerung", 0, None),
    (r"sofort", 0, None),
    # The lookbehind keeps "halbjährlich" from being reported as "yearly";
    # it falls through to the dedicated "halbjähr" entry instead.
    (r"(?<!halb)jährl", None, "yearly"),
    (r"halbjähr", None, "semi_annually"),
    (r"quartal", None, "quarterly"),
    (r"monatl", None, "monthly"),
    (r"wöchentl", None, "weekly"),
    (r"täglich", None, "daily"),
    # Accept both the "ß" and the "ss" spelling of the input word.
    (r"regelmä(?:ß|ss)ig", None, "periodic"),
    (r"periodisch", None, "periodic"),
    (r"vor\s+Inbetriebnahme", None, "before_deployment"),
    (r"vor\s+Markteinführung", None, "before_launch"),
    (r"vor\s+Freigabe", None, "before_release"),
]


def _extract_structured_timing(
    obligation_text: str,
) -> tuple[Optional[int], Optional[str]]:
    """Extract deadline_hours and frequency from obligation text.

    Args:
        obligation_text: Original obligation wording; ``None`` or empty text
            yields no timing.

    Returns:
        ``(deadline_hours, frequency)`` of the first matching entry in
        ``_STRUCTURED_FREQUENCY_MAP``.  Both may be ``None``; each table
        entry sets at most one of them.
    """
    text = obligation_text or ""
    for pattern, deadline, freq in _STRUCTURED_FREQUENCY_MAP:
        if re.search(pattern, text, re.IGNORECASE):
            return (deadline, freq)
    return (None, None)
# ── 5. Template Matrix: (action_type, object_class) → templates ─────────
#
# Specific combos override base templates.  Lookup order:
#   1. _SPECIFIC_TEMPLATES[(action_type, object_class)]
#   2. _ACTION_TEMPLATES[action_type]
#   3. _DEFAULT_ACTION_TEMPLATE
#
# Every template is a dict with the keys "test_procedure" and "evidence",
# each holding a list of German text lines.  "{object}" is a placeholder for
# the obligation object (presumably substituted by the composer — the
# substitution code is outside this section).

# Base templates, keyed by action type (one entry per _ACTION_PRIORITY type).
_ACTION_TEMPLATES: dict[str, dict[str, list[str]]] = {
    # ── Create / Define / Document ─────────────────────────────
    "define": {
        "test_procedure": [
            "Prüfung, ob {object} definiert und formal freigegeben ist",
            "Review der Inhalte auf Vollständigkeit und Angemessenheit",
            "Verifizierung, dass {object} den Betroffenen kommuniziert wurde",
        ],
        "evidence": [
            "Freigegebenes Dokument mit Geltungsbereich",
            "Kommunikationsnachweis (E-Mail, Intranet, Schulung)",
        ],
    },
    "document": {
        "test_procedure": [
            "Prüfung, ob {object} dokumentiert und aktuell ist",
            "Sichtung der Dokumentation auf Vollständigkeit",
            "Verifizierung der Versionierung und des Review-Zyklus",
        ],
        "evidence": [
            "Dokument mit Versionshistorie",
            "Freigabenachweis (Unterschrift/Approval)",
        ],
    },
    "maintain": {
        "test_procedure": [
            "Prüfung, ob {object} aktuell gehalten wird",
            "Vergleich der letzten Aktualisierung mit dem Review-Zyklus",
            "Stichprobe: Änderungen nach relevanten Ereignissen nachvollzogen",
        ],
        "evidence": [
            "Änderungshistorie mit Datum und Verantwortlichem",
            "Nachweis des letzten Reviews",
        ],
    },
    # ── Implement / Configure ──────────────────────────────────
    "implement": {
        "test_procedure": [
            "Prüfung der technischen Umsetzung von {object}",
            "Funktionstest der implementierten Massnahme",
            "Review der Konfiguration gegen die Anforderungsspezifikation",
        ],
        "evidence": [
            "Konfigurationsnachweis (Screenshot/Export)",
            "Implementierungsdokumentation",
        ],
    },
    "configure": {
        "test_procedure": [
            "Prüfung der Konfiguration von {object} gegen Soll-Vorgaben",
            "Vergleich mit Hardening-Baseline oder Best Practice",
            "Automatisierter Konfigurationsscan (falls verfügbar)",
        ],
        "evidence": [
            "Konfigurationsexport mit Soll-/Ist-Vergleich",
            "Scan-Bericht oder Compliance-Check-Ergebnis",
        ],
    },
    # ── Monitor / Review / Assess / Audit ──────────────────────
    "monitor": {
        "test_procedure": [
            "Prüfung der laufenden Überwachung von {object}",
            "Stichprobe der Protokolle/Logs der letzten 3 Monate",
            "Verifizierung der Alarmierungs- und Eskalationsprozesse",
        ],
        "evidence": [
            "Monitoring-Dashboard-Export oder Log-Auszüge",
            "Alarmierungsregeln und Eskalationsmatrix",
        ],
    },
    "review": {
        "test_procedure": [
            "Prüfung, ob {object} im vorgesehenen Zyklus überprüft wurde",
            "Sichtung des Review-Protokolls auf Massnahmenableitung",
            "Verifizierung der Umsetzung identifizierter Massnahmen",
        ],
        "evidence": [
            "Review-Protokoll mit Datum und Teilnehmern",
            "Massnahmenplan mit Umsetzungsstatus",
        ],
    },
    "assess": {
        "test_procedure": [
            "Prüfung der Bewertungsmethodik für {object}",
            "Sichtung der letzten Bewertungsergebnisse",
            "Verifizierung, dass Ergebnisse in Massnahmen überführt wurden",
        ],
        "evidence": [
            "Bewertungsbericht mit Methodik und Ergebnissen",
            "Abgeleiteter Massnahmenplan",
        ],
    },
    "audit": {
        "test_procedure": [
            "Prüfung des Audit-Plans und der Audit-Durchführung für {object}",
            "Sichtung der Audit-Berichte und Findings",
            "Verifizierung der Nachverfolgung offener Findings",
        ],
        "evidence": [
            "Audit-Bericht mit Findings und Empfehlungen",
            "Finding-Tracker mit Umsetzungsstatus",
        ],
    },
    # ── Test / Verify / Validate ───────────────────────────────
    "test": {
        "test_procedure": [
            "Review der Testpläne und -methodik für {object}",
            "Stichprobe der Testergebnisse und Massnahmenableitung",
            "Prüfung der Testabdeckung und -häufigkeit",
        ],
        "evidence": [
            "Testprotokoll mit Ergebnissen",
            "Testplan und Abdeckungsbericht",
        ],
    },
    "verify": {
        "test_procedure": [
            "Prüfung der Verifikationsmethodik für {object}",
            "Nachvollzug der Verifikationsergebnisse gegen Spezifikation",
            "Prüfung, ob alle Anforderungen abgedeckt sind",
        ],
        "evidence": [
            "Verifikationsbericht mit Soll-/Ist-Abgleich",
            "Anforderungs-Traceability-Matrix",
        ],
    },
    "validate": {
        "test_procedure": [
            "Prüfung der Validierungsmethodik für {object}",
            "Bewertung, ob die Massnahme den Zweck im Praxiseinsatz erfüllt",
            "Auswertung von Nutzerfeedback oder Betriebsdaten",
        ],
        "evidence": [
            "Validierungsbericht",
            "Praxisnachweis (Betriebsdaten, Nutzerfeedback)",
        ],
    },
    # ── Report / Notify ────────────────────────────────────────
    "report": {
        "test_procedure": [
            "Prüfung des Meldeprozesses für {object}",
            "Stichprobe gemeldeter Vorfälle auf Vollständigkeit und Fristeneinhaltung",
            "Verifizierung der Meldekanäle und Zuständigkeiten",
        ],
        "evidence": [
            "Meldeprozess-Dokumentation",
            "Nachweise über erfolgte Meldungen mit Zeitstempeln",
        ],
    },
    "notify": {
        "test_procedure": [
            "Prüfung des Benachrichtigungsprozesses für {object}",
            "Verifizierung der Empfänger und Kommunikationskanäle",
            "Stichprobe: Benachrichtigungen fristgerecht versendet",
        ],
        "evidence": [
            "Benachrichtigungsvorlagen und Verteiler",
            "Versandnachweise mit Zeitstempeln",
        ],
    },
    # ── Train ──────────────────────────────────────────────────
    "train": {
        "test_procedure": [
            "Prüfung der Schulungsinhalte und -unterlagen zu {object}",
            "Verifizierung der Teilnehmerlisten und Schulungsfrequenz",
            "Stichprobe: Wissensstand durch Befragung oder Kurztest",
        ],
        "evidence": [
            "Schulungsunterlagen und Schulungsplan",
            "Teilnehmerlisten mit Datum und Unterschrift",
            "Ergebnisse von Wissenstests (falls durchgeführt)",
        ],
    },
    # ── Access / Encrypt ───────────────────────────────────────
    "restrict_access": {
        "test_procedure": [
            "Review der Zugriffsberechtigungen für {object}",
            "Prüfung der Berechtigungsmatrix auf Aktualität und Least-Privilege",
            "Stichprobe: Entzug von Rechten bei Rollenwechsel/Austritt",
        ],
        "evidence": [
            "Aktuelle Berechtigungsmatrix",
            "Zugriffsprotokolle der letzten 3 Monate",
            "Nachweis des letzten Berechtigungs-Reviews",
        ],
    },
    "encrypt": {
        "test_procedure": [
            "Prüfung der Verschlüsselungskonfiguration für {object}",
            "Verifizierung der Algorithmen und Schlüssellängen gegen BSI-Empfehlungen",
            "Prüfung des Schlüsselmanagement-Prozesses (Rotation, Speicherung)",
        ],
        "evidence": [
            "Kryptographie-Konzept",
            "Zertifikats- und Schlüsselinventar",
            "Schlüsselrotations-Nachweis",
        ],
    },
    # ── Delete / Retain ────────────────────────────────────────
    "delete": {
        "test_procedure": [
            "Prüfung des Löschkonzepts für {object}",
            "Verifizierung der Löschfristen und automatisierten Löschmechanismen",
            "Stichprobe: Löschung nach Fristablauf tatsächlich durchgeführt",
        ],
        "evidence": [
            "Löschkonzept mit definierten Fristen",
            "Löschprotokolle oder -nachweise",
        ],
    },
    "retain": {
        "test_procedure": [
            "Prüfung der Aufbewahrungsfristen und -orte für {object}",
            "Verifizierung der Zugriffskontrollen auf archivierte Daten",
            "Prüfung der automatischen Löschung nach Ablauf der Aufbewahrungsfrist",
        ],
        "evidence": [
            "Aufbewahrungsrichtlinie mit Fristen",
            "Speicherort-Dokumentation mit Zugriffskonzept",
        ],
    },
    # ── Ensure (catch-all for sicherstellen/gewährleisten) ─────
    "ensure": {
        "test_procedure": [
            "Prüfung, ob Massnahmen für {object} wirksam umgesetzt sind",
            "Stichprobenprüfung der Einhaltung im operativen Betrieb",
            "Review der zugehörigen Prozessdokumentation",
        ],
        "evidence": [
            "Nachweis der Umsetzung (Konfiguration/Prozess)",
            "Prüfprotokoll der letzten Überprüfung",
        ],
    },
    # ── Perform / Obtain ───────────────────────────────────────
    "perform": {
        "test_procedure": [
            "Prüfung der Durchführung von {object}",
            "Verifizierung der Zuständigkeiten und Freigabeschritte",
            "Stichprobe der Durchführung anhand aktueller Fälle",
        ],
        "evidence": [
            "Durchführungsnachweise (Tickets, Protokolle)",
            "Prozessdokumentation mit Verantwortlichkeiten",
        ],
    },
    "obtain": {
        "test_procedure": [
            "Prüfung des Einholungsprozesses für {object}",
            "Verifizierung der Vollständigkeit und Gültigkeit",
            "Stichprobe: Einholung vor Beginn der Verarbeitung nachgewiesen",
        ],
        "evidence": [
            "Nachweise der Einholung (Einwilligungen, Freigaben)",
            "Gültigkeitsprüfung mit Zeitstempeln",
        ],
    },
    # ── Approve / Remediate ───────────────────────────────────
    "approve": {
        "test_procedure": [
            "Prüfung des Genehmigungsprozesses für {object}",
            "Verifizierung der Freigabeberechtigungen und Eskalationswege",
            "Stichprobe: Genehmigung vor Umsetzung/Nutzung nachgewiesen",
        ],
        "evidence": [
            "Freigabenachweis (Signatur, Ticket, Workflow)",
            "Genehmigungsmatrix mit Zuständigkeiten",
        ],
    },
    "remediate": {
        "test_procedure": [
            "Prüfung des Behebungsprozesses für {object}",
            "Verifizierung der Korrekturmassnahmen und Wirksamkeit",
            "Stichprobe: Abweichungen fristgerecht behoben",
        ],
        "evidence": [
            "Korrekturmassnahmen-Dokumentation",
            "Nachprüfprotokoll der Wirksamkeit",
        ],
    },
}

# ── Specific (action_type, object_class) overrides ───────────────────────

# Keyed by (action type, object class); takes precedence over the base
# template of the same action type.
_SPECIFIC_TEMPLATES: dict[tuple[str, str], dict[str, list[str]]] = {
    ("implement", "policy"): {
        "test_procedure": [
            "Prüfung, ob {object} dokumentiert, freigegeben und in relevanten Prozessen umgesetzt ist",
            "Interview mit Prozessverantwortlichen zur tatsächlichen Umsetzung",
            "Stichprobe: Nachweis der Umsetzung in der Praxis",
        ],
        "evidence": [
            "Freigegebenes Richtliniendokument",
            "Nachweis der Kommunikation an Betroffene",
            "Stichproben der operativen Umsetzung",
        ],
    },
    ("implement", "technical_control"): {
        "test_procedure": [
            "Prüfung der technischen Konfiguration von {object}",
            "Funktionstest: Wirksamkeit der Massnahme verifizieren",
            "Vulnerability-Scan oder Penetrationstest (falls anwendbar)",
        ],
        "evidence": [
            "Konfigurationsnachweis (Screenshot/Export)",
            "Testprotokoll mit Ergebnissen",
            "Scan-Bericht (falls durchgeführt)",
        ],
    },
    ("implement", "process"): {
        "test_procedure": [
            "Prüfung der Prozessdokumentation für {object}",
            "Verifizierung, dass der Prozess operativ gelebt wird",
            "Stichprobe: Prozessdurchführung anhand aktueller Fälle",
        ],
        "evidence": [
            "Prozessdokumentation mit RACI-Matrix",
            "Durchführungsnachweise der letzten 3 Monate",
        ],
    },
    ("define", "policy"): {
        "test_procedure": [
            "Prüfung, ob {object} formal definiert und durch Management freigegeben ist",
            "Review des Geltungsbereichs und der Adressaten",
            "Verifizierung der regelmässigen Aktualisierung",
        ],
        "evidence": [
            "Freigegebene Policy mit Unterschrift der Geschäftsleitung",
            "Geltungsbereich und Verteiler",
            "Letzte Aktualisierung mit Änderungshistorie",
        ],
    },
    ("monitor", "system"): {
        "test_procedure": [
            "Prüfung der Monitoring-Konfiguration für {object}",
            "Stichprobe der System-Logs und Alerts der letzten 3 Monate",
            "Verifizierung: Alerts führen zu dokumentierten Reaktionen",
        ],
        "evidence": [
            "Monitoring-Dashboard-Export",
            "Alert-Konfiguration und Eskalationsregeln",
            "Incident-Tickets aus Alert-Eskalation",
        ],
    },
    ("monitor", "incident"): {
        "test_procedure": [
            "Prüfung des Incident-Monitoring-Prozesses für {object}",
            "Stichprobe erkannter Vorfälle auf Reaktionszeit",
            "Verifizierung der Eskalationswege",
        ],
        "evidence": [
            "Incident-Log mit Erkennungs- und Reaktionszeiten",
            "Eskalationsmatrix",
        ],
    },
    ("review", "policy"): {
        "test_procedure": [
            "Prüfung, ob {object} im vorgesehenen Zyklus durch Management reviewed wurde",
            "Sichtung des Review-Protokolls auf Aktualisierungsbedarf",
            "Verifizierung, dass Änderungen umgesetzt wurden",
        ],
        "evidence": [
            "Review-Protokoll mit Datum und Teilnehmern",
            "Aktualisierte Version der Richtlinie (falls geändert)",
        ],
    },
    ("assess", "incident"): {
        "test_procedure": [
            "Prüfung der Risikobewertung für {object}",
            "Nachvollzug der Bewertungskriterien (Schwere, Auswirkung, Wahrscheinlichkeit)",
            "Verifizierung der abgeleiteten Massnahmen",
        ],
        "evidence": [
            "Risikobewertungs-Matrix",
            "Massnahmenplan mit Priorisierung",
        ],
    },
    ("report", "incident"): {
        "test_procedure": [
            "Prüfung des Meldeprozesses für {object} an zuständige Behörden",
            "Verifizierung der Meldefristen (z.B. 72h DSGVO, 24h NIS2)",
            "Stichprobe: Meldungen fristgerecht und vollständig",
        ],
        "evidence": [
            "Meldeprozess mit Fristen und Zuständigkeiten",
            "Kopien erfolgter Behördenmeldungen",
            "Zeitstempel-Nachweis der Fristwahrung",
        ],
    },
    ("train", "role"): {
        "test_procedure": [
            "Prüfung der Schulungspflicht für {object}",
            "Verifizierung der Teilnahme aller betroffenen Personen",
            "Stichprobe: Wissensstand durch Kurztest oder Befragung",
        ],
        "evidence": [
            "Schulungsplan mit Zielgruppen und Frequenz",
            "Teilnehmerlisten mit Datum und Unterschrift",
            "Testergebnisse oder Teilnahmebestätigungen",
        ],
    },
    ("restrict_access", "data"): {
        "test_procedure": [
            "Prüfung der Zugriffskontrollen für {object}",
            "Review der Berechtigungen nach Need-to-Know-Prinzip",
            "Stichprobe: Keine überprivilegierten Zugänge",
        ],
        "evidence": [
            "Berechtigungsmatrix mit Rollen und Datenklassen",
            "Zugriffsprotokolle",
            "Ergebnis des letzten Access Reviews",
        ],
    },
    ("restrict_access", "system"): {
        "test_procedure": [
            "Prüfung der Zugriffskontrollen für {object}",
            "Review der Admin-/Privileged-Zugänge",
            "Stichprobe: MFA aktiv, Passwort-Policy eingehalten",
        ],
        "evidence": [
            "Berechtigungsmatrix",
            "Audit-Log privilegierter Zugriffe",
            "MFA-Konfigurationsnachweis",
        ],
    },
    ("encrypt", "data"): {
        "test_procedure": [
            "Prüfung der Verschlüsselung von {object} at Rest und in Transit",
            "Verifizierung der Algorithmen gegen BSI TR-02102",
            "Prüfung des Key-Management-Prozesses",
        ],
        "evidence": [
            "Kryptographie-Konzept mit Algorithmen und Schlüssellängen",
            "TLS-Konfigurationsnachweis",
            "Key-Rotation-Protokoll",
        ],
    },
    ("delete", "data"): {
        "test_procedure": [
            "Prüfung des Löschkonzepts für {object}",
            "Verifizierung der automatisierten Löschmechanismen",
            "Stichprobe: Löschung personenbezogener Daten nach Fristablauf",
        ],
        "evidence": [
            "Löschkonzept mit Datenklassen und Fristen",
            "Löschprotokolle",
            "Nachweis der Vernichtung (bei physischen Medien)",
        ],
    },
    ("retain", "data"): {
        "test_procedure": [
            "Prüfung der Aufbewahrungsfristen für {object}",
            "Verifizierung der Speicherorte und Zugriffskontrollen",
            "Prüfung: Keine Aufbewahrung über gesetzliche Frist hinaus",
        ],
        "evidence": [
            "Aufbewahrungsrichtlinie mit gesetzlichen Grundlagen",
            "Speicherort-Inventar mit Zugriffskonzept",
        ],
    },
    ("obtain", "data"): {
        "test_procedure": [
            "Prüfung des Einwilligungsprozesses für {object}",
            "Verifizierung der Einwilligungstexte auf Rechtskonformität",
            "Stichprobe: Einwilligung vor Verarbeitungsbeginn eingeholt",
        ],
        "evidence": [
            "Einwilligungsformulare/-dialoge",
            "Consent-Log mit Zeitstempeln",
            "Widerrufsprozess-Dokumentation",
        ],
    },
}

# Last-resort template, third in the lookup order documented above.
_DEFAULT_ACTION_TEMPLATE: dict[str, list[str]] = {
    "test_procedure": [
        "Prüfung der Umsetzung von {object}",
        "Verifizierung der zugehörigen Dokumentation und Nachweisführung",
    ],
    "evidence": [
        "Umsetzungsnachweis",
        "Zugehörige Dokumentation",
    ],
}
# ── 6. Title Suffix (action_type → past participle / state) ─────────────

# German state phrase per action type; one entry for each type listed in
# _ACTION_PRIORITY.
_ACTION_STATE_SUFFIX: dict[str, str] = {
    "define": "definiert und freigegeben",
    "document": "dokumentiert",
    "maintain": "aktuell gehalten",
    "implement": "umgesetzt",
    "configure": "konfiguriert",
    "monitor": "überwacht",
    "review": "überprüft",
    "assess": "bewertet",
    "audit": "auditiert",
    "test": "getestet",
    "verify": "verifiziert",
    "validate": "validiert",
    "report": "gemeldet",
    "notify": "benachrichtigt",
    "train": "geschult",
    "restrict_access": "zugriffsbeschränkt",
    "encrypt": "verschlüsselt",
    "delete": "gelöscht",
    "retain": "aufbewahrt",
    "ensure": "sichergestellt",
    "approve": "genehmigt",
    "remediate": "behoben",
    "perform": "durchgeführt",
    "obtain": "eingeholt",
}


# ── 6b. Pattern Candidates ──────────────────────────────────────────────

# Pattern identifiers for specific (action type, object class) pairs.
_PATTERN_CANDIDATES_MAP: dict[tuple[str, str], list[str]] = {
    ("define", "policy"): ["policy_documented", "policy_approved"],
    ("document", "policy"): ["policy_documented"],
    ("implement", "technical_control"): ["technical_safeguard_enabled", "security_control_tested"],
    ("implement", "policy"): ["policy_implemented", "policy_communicated"],
    ("implement", "process"): ["process_established", "process_operational"],
    ("monitor", "system"): ["continuous_monitoring_active"],
    ("monitor", "incident"): ["incident_detection_active"],
    ("review", "policy"): ["policy_review_completed"],
    ("review", "risk_artifact"): ["risk_review_completed"],
    ("assess", "risk_artifact"): ["risk_assessment_completed"],
    ("restrict_access", "data"): ["access_control_enforced"],
    ("restrict_access", "system"): ["access_control_enforced", "privilege_management_active"],
    ("restrict_access", "access_control"): ["access_control_enforced"],
    ("encrypt", "data"): ["encryption_at_rest", "encryption_in_transit"],
    ("encrypt", "cryptographic_control"): ["encryption_at_rest", "key_management_active"],
    ("train", "role"): ["awareness_training_completed"],
    ("train", "training"): ["awareness_training_completed"],
    ("report", "incident"): ["incident_reported_timely"],
    ("notify", "incident"): ["notification_sent_timely"],
    ("delete", "data"): ["data_deletion_completed"],
    ("retain", "data"): ["data_retention_enforced"],
    ("audit", "system"): ["audit_completed"],
    ("test", "technical_control"): ["security_control_tested"],
    ("obtain", "consent"): ["consent_obtained"],
    ("obtain", "data"): ["consent_obtained"],
    ("approve", "policy"): ["policy_approved"],
}

# Pattern identifiers keyed by action type alone (presumably the fallback
# when no pair entry above exists — the lookup code is outside this section).
_PATTERN_CANDIDATES_BY_ACTION: dict[str, list[str]] = {
    "define": ["policy_documented"],
    "document": ["policy_documented"],
    "implement": ["control_implemented"],
    "monitor": ["continuous_monitoring_active"],
    "review": ["review_completed"],
    "assess": ["assessment_completed"],
    "audit": ["audit_completed"],
    "test": ["security_control_tested"],
    "report": ["incident_reported_timely"],
    "notify": ["notification_sent_timely"],
    "train": ["awareness_training_completed"],
    "restrict_access": ["access_control_enforced"],
    "encrypt": ["encryption_at_rest"],
    "delete": ["data_deletion_completed"],
    "retain": ["data_retention_enforced"],
    "ensure": ["control_implemented"],
    "approve": ["policy_approved"],
    "remediate": ["remediation_completed"],
    "perform": ["activity_performed"],
    "obtain": ["consent_obtained"],
    "configure": ["technical_safeguard_enabled"],
    "verify": ["verification_completed"],
    "validate": ["validation_completed"],
    "maintain": ["control_maintained"],
}
# ── 6c. Raw Infinitives (for the validator's negative rules) ────────────

_RAW_INFINITIVES: set[str] = {
    "implementieren", "dokumentieren", "definieren", "konfigurieren",
    "überwachen", "überprüfen", "auditieren", "testen", "verifizieren",
    "validieren", "melden", "benachrichtigen", "schulen", "verschlüsseln",
    "löschen", "aufbewahren", "sicherstellen", "gewährleisten",
    "genehmigen", "beheben", "durchführen", "einholen", "erstellen",
    "festlegen", "bereitstellen", "installieren", "einrichten",
    "bewerten", "analysieren", "kontrollieren", "protokollieren",
}

# One precompiled alternation instead of one regex per infinitive and test
# step.  Matches an article directly followed by a raw infinitive
# ("Prüfung der implementieren").  The alternatives are sorted so the
# pattern — and therefore the reported word — is the same on every run.
_RAW_INFINITIVE_LEAK_RE = re.compile(
    r"\b(?:der|des|die)\s+("
    + "|".join(re.escape(inf) for inf in sorted(_RAW_INFINITIVES))
    + r")\b",
    re.IGNORECASE,
)


# ── 7. Object Normalization (with synonym mapping) ──────────────────────

_OBJECT_SYNONYMS: dict[str, str] = {
    "verzeichnis": "register",
    "inventar": "register",
    "katalog": "register",
    "bestandsaufnahme": "register",
    "richtlinie": "policy",
    "konzept": "policy",
    "strategie": "policy",
    "leitlinie": "policy",
    "vorgabe": "policy",
    "regelung": "policy",
    "anweisung": "policy",
    "rahmenwerk": "policy",
    "sicherheitskonzept": "policy",
    "datenschutzkonzept": "policy",
    "verfahren": "procedure",
    "ablauf": "procedure",
    "vorgehensweise": "procedure",
    "methodik": "procedure",
    "prozedur": "procedure",
    "handlungsanweisung": "procedure",
    "protokoll": "record",
    "aufzeichnung": "record",
    "nachweis": "record",
    "evidenz": "record",
    "vorfall": "incident",
    "störung": "incident",
    "sicherheitsvorfall": "incident",
    "notfall": "incident",
    "krise": "incident",
    "schwachstelle": "risk_artifact",
    "gefährdung": "risk_artifact",
    "risikoanalyse": "risk_artifact",
    "risikobewertung": "risk_artifact",
    "mitarbeiter": "role",
    "personal": "role",
    "beauftragter": "role",
    "verantwortlicher": "role",
    "schulung": "training",
    "sensibilisierung": "training",
    "unterweisung": "training",
    "verschlüsselung": "technical_control",
    "firewall": "technical_control",
    "backup": "technical_control",
    "meldung": "report",
    "bericht": "report",
    "benachrichtigung": "report",
    "berechtigung": "access_control",
    "authentifizierung": "access_control",
    "zugriff": "access_control",
    "einwilligung": "consent",
    "zustimmung": "consent",
}


def _normalize_object(object_raw: str) -> str:
    """Normalize object text to a snake_case key for merge hints.

    Steps: lower-case and strip; replace the first occurrence of the longest
    synonym from ``_OBJECT_SYNONYMS`` found in the text with its canonical
    form (e.g., 'Richtlinie' -> 'policy', 'Verzeichnis' -> 'register');
    turn whitespace runs into ``_``; transliterate ä/ö/ü/ß; drop every
    character outside ``[a-z0-9_]``; truncate to 80 characters.

    Args:
        object_raw: Raw object text; may be empty.

    Returns:
        The normalized key, or ``"unknown"`` when the input is empty or
        nothing survives normalization.
    """
    if not object_raw:
        return "unknown"

    obj_lower = object_raw.strip().lower()

    # Synonym mapping — find the longest matching synonym (first one wins
    # among equally long candidates).
    best_match = ""
    best_canonical = ""
    for synonym, canonical in _OBJECT_SYNONYMS.items():
        if synonym in obj_lower and len(synonym) > len(best_match):
            best_match = synonym
            best_canonical = canonical

    if best_canonical:
        obj_lower = obj_lower.replace(best_match, best_canonical, 1)

    obj = re.sub(r"\s+", "_", obj_lower.strip())
    for src, dst in (("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")):
        obj = obj.replace(src, dst)
    obj = re.sub(r"[^a-z0-9_]", "", obj)
    return obj[:80] or "unknown"


# ── 7b. Output Validator (negative rules) ───────────────────────────────

def _validate_atomic_control(
    atomic: "AtomicControlCandidate",
    action_type: str,
    object_class: str,
) -> list[str]:
    """Validate an atomic control against mandatory fields and negative rules.

    The function only reports: it logs every finding (errors at WARNING,
    warnings at DEBUG level) and never rejects or modifies the control.
    Missing (``None``) fields are treated like empty ones so that a broken
    candidate produces findings instead of an exception.

    Args:
        atomic: Candidate to check.  Read attributes: ``title``,
            ``objective``, ``test_procedure``, ``evidence`` and the optional
            ``_decomposition_confidence``.
        action_type: Action type of the candidate (not evaluated by the
            current rules).
        object_class: Object class of the candidate; ``"general"`` yields a
            warning.

    Returns:
        A list of issue strings, each prefixed with ``ERROR:`` or ``WARN:``.
    """
    issues: list[str] = []

    title = atomic.title or ""
    objective = atomic.objective or ""
    test_procedure = atomic.test_procedure or []
    evidence = atomic.evidence or []

    # ── Mandatory fields ───────────────────────────────────
    if not title.strip():
        issues.append("ERROR: title is empty")
    if not objective.strip():
        issues.append("ERROR: objective is empty")
    if not test_procedure:
        issues.append("ERROR: test_procedure is empty")
    if not evidence:
        issues.append("ERROR: evidence is empty")

    # ── Negative rules ─────────────────────────────────────
    if len(title) > 80:
        issues.append(f"ERROR: title exceeds 80 chars ({len(title)})")

    # Detect garbage pattern: "Prüfung der {raw_infinitive}" (leaked action).
    # At most one finding per test step: the leftmost leak.
    for i, step in enumerate(test_procedure):
        leak = _RAW_INFINITIVE_LEAK_RE.search(step or "")
        if leak:
            issues.append(
                f"ERROR: test_procedure[{i}] contains raw infinitive "
                f"'{leak.group(1).lower()}'"
            )

    for i, ev in enumerate(evidence):
        if not (ev or "").strip():
            issues.append(f"ERROR: evidence[{i}] is empty string")

    # ── Warning rules ──────────────────────────────────────
    confidence = getattr(atomic, "_decomposition_confidence", None)
    if confidence is not None and confidence < 0.5:
        issues.append(f"WARN: low confidence ({confidence})")

    if object_class == "general":
        issues.append("WARN: object_class is 'general' (unclassified)")

    for issue in issues:
        if issue.startswith("ERROR:"):
            logger.warning("Validation: %s — title=%s", issue, title[:60])
        else:
            logger.debug("Validation: %s — title=%s", issue, title[:60])

    return issues
Confidence Scoring ─────────────────────────────────────────────── + +def _score_pass0b_confidence( + action_type: str, + object_class: str, + trigger_q: str, + has_specific_template: bool, +) -> float: + """Score decomposition confidence for a Pass 0b candidate.""" + score = 0.3 # base + if action_type != "default": + score += 0.25 + if object_class != "general": + score += 0.20 + if trigger_q: + score += 0.10 + if has_specific_template: + score += 0.15 + return round(min(score, 1.0), 2) + + +# ── 9. Compose Function ───────────────────────────────────────────────── + + +def _compose_deterministic( + obligation_text: str, + action: str, + object_: str, + parent_title: str, + parent_severity: str, + parent_category: str, + is_test: bool, + is_reporting: bool, + trigger_type: Optional[str] = None, + condition: Optional[str] = None, +) -> "AtomicControlCandidate": + """Compose an atomic control deterministically from obligation data. + + No LLM required. Uses action-type classification, object-class + matching, and trigger-aware templates. 
Generates: + - Title as '{Object} {state suffix}' + - Statement as '{condition_prefix} {object} ist {trigger} {action}' + - Evidence/test bundles from (action_type, object_class) matrix + - Pattern candidates for downstream categorization + - Merge hint for downstream dedup + - Structured timing (deadline_hours, frequency) + - Confidence score + - Validation issues (Negativregeln) + """ + # Override action type for flagged obligations + if is_test: + action_type = "test" + elif is_reporting: + action_type = "report" + else: + action_type = _classify_action(action) + + object_class = _classify_object(object_) + + # Template lookup: specific combo → action base → default + has_specific = (action_type, object_class) in _SPECIFIC_TEMPLATES + template = ( + _SPECIFIC_TEMPLATES.get((action_type, object_class)) + or _ACTION_TEMPLATES.get(action_type) + or _DEFAULT_ACTION_TEMPLATE + ) + + # Object for template substitution (fallback to parent title) + obj_display = object_.strip() if object_ else parent_title + + # ── Title: "{Object} {Zustand}" ─────────────────────────── + state = _ACTION_STATE_SUFFIX.get(action_type, "umgesetzt") + if object_: + title = f"{object_.strip()} {state}"[:80] + elif action: + title = f"{action.strip().capitalize()} {state}"[:80] + else: + title = f"{parent_title} {state}"[:80] + + # ── Objective = obligation text (the normative statement) ─ + objective = obligation_text.strip()[:2000] + + # ── Requirements = obligation as concrete requirement ───── + requirements = [obligation_text.strip()] if obligation_text else [] + + # ── Test procedure from templates with object substitution + test_procedure = [ + tp.replace("{object}", obj_display) + for tp in template["test_procedure"] + ] + + # ── Trigger qualifier → add timing test step ────────────── + trigger_q = _extract_trigger_qualifier(trigger_type, obligation_text) + if trigger_q and test_procedure: + test_procedure.append( + f"Prüfung der Frist-/Trigger-Einhaltung: {trigger_q}" + ) + + # ── 
Evidence from templates ─────────────────────────────── + evidence = list(template["evidence"]) + + # ── Merge hint for downstream dedup ─────────────────────── + norm_obj = _normalize_object(object_) + trigger_key = trigger_type or "none" + merge_hint = f"{action_type}:{norm_obj}:{trigger_key}" + + # ── Statement: structured normative sentence ────────────── + condition_prefix = "" + if condition and condition.strip(): + condition_prefix = condition.strip().rstrip(",") + "," + trigger_clause = trigger_q if trigger_q else "" + obj_for_stmt = object_.strip() if object_ else parent_title + if obj_for_stmt: + parts = [p for p in [condition_prefix, obj_for_stmt, "ist", trigger_clause, state] if p] + statement = " ".join(parts) + else: + statement = "" + + # ── Pattern candidates ──────────────────────────────────── + pattern_candidates = _PATTERN_CANDIDATES_MAP.get( + (action_type, object_class), + _PATTERN_CANDIDATES_BY_ACTION.get(action_type, []), + ) + + # ── Structured timing ───────────────────────────────────── + deadline_hours, frequency = _extract_structured_timing(obligation_text) + + # ── Confidence score ────────────────────────────────────── + confidence = _score_pass0b_confidence( + action_type, object_class, trigger_q, has_specific, + ) + + atomic = AtomicControlCandidate( + title=title, + objective=objective, + requirements=requirements, + test_procedure=test_procedure, + evidence=evidence, + severity=_normalize_severity(parent_severity), + category=parent_category or "governance", + ) + # Attach extra metadata (stored in generation_metadata) + atomic.domain = f"{action_type}:{object_class}" + atomic.source_regulation = merge_hint + atomic._decomposition_confidence = confidence # type: ignore[attr-defined] + atomic._statement = statement # type: ignore[attr-defined] + atomic._pattern_candidates = list(pattern_candidates) # type: ignore[attr-defined] + atomic._deadline_hours = deadline_hours # type: ignore[attr-defined] + atomic._frequency = frequency # 
type: ignore[attr-defined] + + # ── Validate (log issues, never reject) ─────────────────── + validation_issues = _validate_atomic_control(atomic, action_type, object_class) + atomic._validation_issues = validation_issues # type: ignore[attr-defined] + + return atomic + + def _build_pass0b_prompt( obligation_text: str, action: str, object_: str, parent_title: str, parent_category: str, source_ref: str, @@ -845,7 +2119,7 @@ class DecompositionPass: "controls_skipped_empty": 0, "llm_calls": 0, "errors": 0, - "provider": "anthropic" if use_anthropic else "ollama", + "provider": "anthropic" if use_anthropic else "deterministic", "batch_size": batch_size, } @@ -1022,15 +2296,16 @@ class DecompositionPass: Args: limit: Max candidates to process (0 = no limit). - batch_size: Obligations per LLM call (0 = auto). - use_anthropic: Use Anthropic API (True) or Ollama (False). + batch_size: Commit interval (0 = auto). For LLM: API batch size. + use_anthropic: Use Anthropic API (True) or deterministic engine (False). 
""" if batch_size <= 0: - batch_size = DECOMPOSITION_BATCH_SIZE if use_anthropic else 1 + batch_size = DECOMPOSITION_BATCH_SIZE if use_anthropic else 50 query = """ SELECT oc.id, oc.candidate_id, oc.parent_control_uuid, oc.obligation_text, oc.action, oc.object, + oc.condition, oc.is_test_obligation, oc.is_reporting_obligation, cc.title AS parent_title, cc.category AS parent_category, @@ -1061,7 +2336,7 @@ class DecompositionPass: "llm_failures": 0, "llm_calls": 0, "errors": 0, - "provider": "anthropic" if use_anthropic else "ollama", + "provider": "anthropic" if use_anthropic else "deterministic", "batch_size": batch_size, "dedup_enabled": self._dedup is not None, "dedup_linked": 0, @@ -1079,16 +2354,17 @@ class DecompositionPass: "obligation_text": row[3] or "", "action": row[4] or "", "object": row[5] or "", - "is_test": row[6], - "is_reporting": row[7], - "parent_title": row[8] or "", - "parent_category": row[9] or "", - "parent_citation": row[10] or "", - "parent_severity": row[11] or "medium", - "parent_control_id": row[12] or "", - "source_ref": _format_citation(row[10] or ""), - "trigger_type": row[13] or "continuous", - "is_implementation_specific": row[14] or False, + "condition": row[6] or "", + "is_test": row[7], + "is_reporting": row[8], + "parent_title": row[9] or "", + "parent_category": row[10] or "", + "parent_citation": row[11] or "", + "parent_severity": row[12] or "medium", + "parent_control_id": row[13] or "", + "source_ref": _format_citation(row[11] or ""), + "trigger_type": row[14] or "continuous", + "is_implementation_specific": row[15] or False, }) # Process in batches @@ -1125,22 +2401,25 @@ class DecompositionPass: parsed = _parse_json_object(llm_response) await self._process_pass0b_control(obl, parsed, stats) else: - from compliance.services.obligation_extractor import _llm_ollama - obl = batch[0] - prompt = _build_pass0b_prompt( - obligation_text=obl["obligation_text"], - action=obl["action"], object_=obl["object"], - 
parent_title=obl["parent_title"], - parent_category=obl["parent_category"], - source_ref=obl["source_ref"], - ) - llm_response = await _llm_ollama( - prompt=prompt, - system_prompt=_PASS0B_SYSTEM_PROMPT, - ) - stats["llm_calls"] += 1 - parsed = _parse_json_object(llm_response) - await self._process_pass0b_control(obl, parsed, stats) + # Deterministic engine — no LLM required + for obl in batch: + sub_actions = _split_compound_action(obl["action"]) + for sub_action in sub_actions: + atomic = _compose_deterministic( + obligation_text=obl["obligation_text"], + action=sub_action, + object_=obl["object"], + parent_title=obl["parent_title"], + parent_severity=obl["parent_severity"], + parent_category=obl["parent_category"], + is_test=obl["is_test"], + is_reporting=obl["is_reporting"], + trigger_type=obl.get("trigger_type"), + condition=obl.get("condition"), + ) + await self._process_pass0b_control( + obl, {}, stats, atomic=atomic, + ) # Commit after each successful sub-batch self.db.commit() @@ -1158,16 +2437,21 @@ class DecompositionPass: async def _process_pass0b_control( self, obl: dict, parsed: dict, stats: dict, + atomic: Optional[AtomicControlCandidate] = None, ) -> None: - """Create atomic control from parsed LLM output or template fallback. + """Create atomic control from deterministic engine, LLM output, or fallback. 
If dedup is enabled, checks for duplicates before insertion: - LINK: adds parent link to existing control instead of creating new - REVIEW: queues for human review, does not create control - NEW: creates new control and indexes in Qdrant """ - if not parsed or not parsed.get("title"): - atomic = _template_fallback( + if atomic is not None: + # Deterministic engine — atomic already composed + pass + elif not parsed or not parsed.get("title"): + # LLM failed → use deterministic engine as fallback + atomic = _compose_deterministic( obligation_text=obl["obligation_text"], action=obl["action"], object_=obl["object"], parent_title=obl["parent_title"], @@ -1175,6 +2459,7 @@ class DecompositionPass: parent_category=obl["parent_category"], is_test=obl["is_test"], is_reporting=obl["is_reporting"], + condition=obl.get("condition"), ) stats["llm_failures"] += 1 else: @@ -1559,6 +2844,17 @@ class DecompositionPass: "gen_meta": json.dumps({ "decomposition_source": candidate_id, "decomposition_method": "pass0b", + "engine_version": "v2", + "action_object_class": getattr(atomic, "domain", ""), + "merge_group_hint": atomic.source_regulation or "", + "decomposition_confidence": getattr( + atomic, "_decomposition_confidence", None + ), + "statement": getattr(atomic, "_statement", ""), + "pattern_candidates": getattr(atomic, "_pattern_candidates", []), + "deadline_hours": getattr(atomic, "_deadline_hours", None), + "frequency": getattr(atomic, "_frequency", None), + "validation_issues": getattr(atomic, "_validation_issues", []), }), "framework_id": "14b1bdd2-abc7-4a43-adae-14471ee5c7cf", }, @@ -2094,31 +3390,4 @@ def _normalize_severity(val: str) -> str: return "medium" -def _template_fallback( - obligation_text: str, action: str, object_: str, - parent_title: str, parent_severity: str, parent_category: str, - is_test: bool, is_reporting: bool, -) -> AtomicControlCandidate: - """Create an atomic control candidate from template when LLM fails.""" - if is_test: - title = f"Test: 
{object_[:60]}" if object_ else f"Test: {action[:60]}" - test_proc = [f"Prüfung der {object_ or action}"] - evidence = ["Testprotokoll", "Prüfbericht"] - elif is_reporting: - title = f"Meldepflicht: {object_[:60]}" if object_ else f"Meldung: {action[:60]}" - test_proc = ["Prüfung des Meldeprozesses", "Stichprobe gemeldeter Vorfälle"] - evidence = ["Meldeprozess-Dokumentation", "Meldeformulare"] - else: - title = f"{action.capitalize()}: {object_[:60]}" if object_ else parent_title[:80] - test_proc = [f"Prüfung der {action}"] - evidence = ["Dokumentation", "Konfigurationsnachweis"] - - return AtomicControlCandidate( - title=title[:200], - objective=obligation_text[:2000], - requirements=[obligation_text] if obligation_text else [], - test_procedure=test_proc, - evidence=evidence, - severity=_normalize_severity(parent_severity), - category=parent_category, - ) +# _template_fallback removed — replaced by _compose_deterministic engine diff --git a/backend-compliance/tests/test_decomposition_pass.py b/backend-compliance/tests/test_decomposition_pass.py index d2fcc94..d90bf36 100644 --- a/backend-compliance/tests/test_decomposition_pass.py +++ b/backend-compliance/tests/test_decomposition_pass.py @@ -9,7 +9,7 @@ Covers: - _parse_json_array / _parse_json_object - _format_field / _format_citation - _normalize_severity -- _template_fallback +- _compose_deterministic / _classify_action - _build_pass0a_prompt / _build_pass0b_prompt - DecompositionPass.run_pass0a (mocked LLM + DB) - DecompositionPass.run_pass0b (mocked LLM + DB) @@ -40,7 +40,11 @@ from compliance.services.decomposition_pass import ( _format_citation, _compute_extraction_confidence, _normalize_severity, - _template_fallback, + _compose_deterministic, + _classify_action, + _classify_object, + _split_compound_action, + _extract_trigger_qualifier, _fallback_obligation, _build_pass0a_prompt, _build_pass0b_prompt, @@ -53,6 +57,11 @@ from compliance.services.decomposition_pass import ( 
_is_implementation_specific_text, _text_similar, _is_more_implementation_specific, + _extract_structured_timing, + _normalize_object, + _validate_atomic_control, + _PATTERN_CANDIDATES_MAP, + _PATTERN_CANDIDATES_BY_ACTION, ) @@ -495,11 +504,98 @@ class TestNormalizeSeverity: assert _normalize_severity(None) == "medium" -class TestTemplateFallback: - """Tests for _template_fallback.""" +class TestClassifyAction: + """Tests for _classify_action.""" - def test_normal_obligation(self): - ac = _template_fallback( + def test_simple_document_action(self): + assert _classify_action("dokumentieren") == "document" + + def test_simple_implement_action(self): + assert _classify_action("implementieren") == "implement" + + def test_compound_action_picks_highest_priority(self): + # "erstellen" → document, "implementieren" → implement + # implement has higher priority + assert _classify_action("erstellen und implementieren") == "implement" + + def test_maintain_action(self): + assert _classify_action("aktuell halten") == "maintain" + assert _classify_action("pflegen") == "maintain" + + def test_ensure_action(self): + assert _classify_action("sicherstellen") == "ensure" + assert _classify_action("gewährleisten") == "ensure" + + def test_reporting_action(self): + assert _classify_action("melden") == "report" + assert _classify_action("informieren") == "notify" + + def test_empty_action(self): + assert _classify_action("") == "default" + + def test_unknown_action(self): + assert _classify_action("xyzzy") == "default" + + def test_access_action(self): + assert _classify_action("beschränken") == "restrict_access" + assert _classify_action("autorisieren") == "restrict_access" + + def test_encrypt_action(self): + assert _classify_action("verschlüsseln") == "encrypt" + + def test_english_fallback(self): + assert _classify_action("implement") == "implement" + assert _classify_action("monitor") == "monitor" + + def test_aufbewahren(self): + assert _classify_action("aufbewahren") == "retain" 
+ + def test_beifuegen(self): + assert _classify_action("beifügen") == "document" + + def test_angeben(self): + assert _classify_action("angeben") == "document" + + def test_review_vs_monitor(self): + """review and monitor are now separate types.""" + assert _classify_action("überprüfen") == "review" + assert _classify_action("überwachen") == "monitor" + + def test_verify_vs_validate(self): + """verify and validate are separate types.""" + assert _classify_action("verifizieren") == "verify" + assert _classify_action("validieren") == "validate" + + def test_define_vs_document(self): + """define and document are separate types.""" + assert _classify_action("definieren") == "define" + assert _classify_action("festlegen") == "define" + assert _classify_action("dokumentieren") == "document" + + def test_approve_action(self): + assert _classify_action("genehmigen") == "approve" + assert _classify_action("freigeben") == "approve" + assert _classify_action("zulassen") == "approve" + + def test_remediate_action(self): + assert _classify_action("beheben") == "remediate" + assert _classify_action("korrigieren") == "remediate" + assert _classify_action("beseitigen") == "remediate" + + def test_process_object_class(self): + assert _classify_object("Geschäftsprozess") == "process" + assert _classify_object("Managementprozess") == "process" + + def test_consent_object_class(self): + assert _classify_object("Einwilligung") == "consent" + assert _classify_object("Consent-Management") == "consent" + + +class TestComposeDeterministic: + """Tests for _compose_deterministic engine.""" + + def test_implement_obligation(self): + ac = _compose_deterministic( obligation_text="Betreiber müssen MFA implementieren", action="implementieren", object_="MFA", @@ -509,12 +605,49 @@ class TestTemplateFallback: is_test=False, is_reporting=False, ) - assert "Implementieren" in ac.title + assert ac.title == "MFA umgesetzt" assert ac.severity == "high" assert len(ac.requirements) == 1 + assert 
len(ac.test_procedure) == 3 + assert "technischen Konfiguration" in ac.test_procedure[0] + assert "Funktionstest" in ac.test_procedure[1] + assert "Konfigurationsnachweis" in ac.evidence[0] - def test_test_obligation(self): - ac = _template_fallback( + def test_document_obligation(self): + ac = _compose_deterministic( + obligation_text="Unternehmen müssen Sicherheitsrichtlinie erstellen", + action="erstellen", + object_="Sicherheitsrichtlinie", + parent_title="Security Policy", + parent_severity="medium", + parent_category="governance", + is_test=False, + is_reporting=False, + ) + assert ac.title == "Sicherheitsrichtlinie dokumentiert" + assert "dokumentiert und aktuell" in ac.test_procedure[0] + assert "Vollständigkeit" in ac.test_procedure[1] + + def test_compound_action_uses_implement_template(self): + """'erstellen und implementieren' should use implement template.""" + ac = _compose_deterministic( + obligation_text="Wartungsrichtlinie erstellen und implementieren", + action="erstellen und implementieren", + object_="Wartungsrichtlinie", + parent_title="Maintenance", + parent_severity="high", + parent_category="operations", + is_test=False, + is_reporting=False, + ) + assert ac.title == "Wartungsrichtlinie umgesetzt" + assert "umgesetzt" in ac.test_procedure[0] + # Must NOT contain "Prüfung der erstellen und implementieren" + for tp in ac.test_procedure: + assert "erstellen und implementieren" not in tp + + def test_test_obligation_overrides_type(self): + ac = _compose_deterministic( obligation_text="MFA muss regelmäßig getestet werden", action="testen", object_="MFA-Wirksamkeit", @@ -524,11 +657,11 @@ class TestTemplateFallback: is_test=True, is_reporting=False, ) - assert "Test:" in ac.title - assert "Testprotokoll" in ac.evidence + assert "Testpläne" in ac.test_procedure[0] + assert "Testprotokoll" in ac.evidence[0] - def test_reporting_obligation(self): - ac = _template_fallback( + def test_reporting_obligation_overrides_type(self): + ac = 
_compose_deterministic( obligation_text="Behörden sind über Vorfälle zu informieren", action="informieren", object_="zuständige Behörden", @@ -538,8 +671,383 @@ class TestTemplateFallback: is_test=False, is_reporting=True, ) - assert "Meldepflicht:" in ac.title - assert "Meldeprozess-Dokumentation" in ac.evidence + assert "Meldeprozess" in ac.test_procedure[0] + assert "Meldeprozess-Dokumentation" in ac.evidence[0] + + def test_no_action_uses_default(self): + ac = _compose_deterministic( + obligation_text="Allgemeine Pflicht", + action="", + object_="Datenschutzkonzept", + parent_title="Privacy", + parent_severity="medium", + parent_category="privacy", + is_test=False, + is_reporting=False, + ) + assert ac.title == "Datenschutzkonzept umgesetzt" + assert len(ac.test_procedure) >= 2 + + def test_no_object_uses_parent_title(self): + ac = _compose_deterministic( + obligation_text="System muss gesichert werden", + action="absichern", + object_="", + parent_title="System Security", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac.title == "Absichern umgesetzt" + # Object placeholder should use parent_title + assert "System Security" in ac.test_procedure[0] + + def test_severity_inherited(self): + ac = _compose_deterministic( + obligation_text="Kritische Pflicht", + action="implementieren", + object_="Firewall", + parent_title="Net", + parent_severity="critical", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac.severity == "critical" + + def test_category_inherited(self): + ac = _compose_deterministic( + obligation_text="Pflicht", + action="dokumentieren", + object_="X", + parent_title="Y", + parent_severity="low", + parent_category="privacy", + is_test=False, + is_reporting=False, + ) + assert ac.category == "privacy" + + def test_empty_category_defaults_to_governance(self): + ac = _compose_deterministic( + obligation_text="Pflicht", + action="dokumentieren", + object_="X", 
+ parent_title="Y", + parent_severity="low", + parent_category="", + is_test=False, + is_reporting=False, + ) + assert ac.category == "governance" + + +# --------------------------------------------------------------------------- +# GAP 1: STATEMENT FIELD TESTS +# --------------------------------------------------------------------------- + + +class TestStatementField: + """Tests for the statement field in _compose_deterministic.""" + + def test_statement_with_condition_and_trigger(self): + ac = _compose_deterministic( + obligation_text="Bei Vorfall müssen Behörden innerhalb von 72 Stunden informiert werden", + action="informieren", + object_="zuständige Behörden", + parent_title="Incident Reporting", + parent_severity="high", + parent_category="governance", + is_test=False, + is_reporting=True, + trigger_type="event", + condition="bei Sicherheitsvorfall", + ) + assert "bei Sicherheitsvorfall," in ac._statement + assert "zuständige Behörden" in ac._statement + assert "ist" in ac._statement + + def test_statement_without_condition(self): + ac = _compose_deterministic( + obligation_text="Richtlinie muss dokumentiert werden", + action="dokumentieren", + object_="Sicherheitsrichtlinie", + parent_title="Policy", + parent_severity="medium", + parent_category="governance", + is_test=False, + is_reporting=False, + ) + assert ac._statement.startswith("Sicherheitsrichtlinie ist") + assert "dokumentiert" in ac._statement + + def test_statement_without_trigger(self): + ac = _compose_deterministic( + obligation_text="MFA implementieren", + action="implementieren", + object_="MFA", + parent_title="Auth", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + trigger_type="continuous", + ) + assert "MFA ist umgesetzt" == ac._statement + + def test_statement_empty_object_uses_parent(self): + ac = _compose_deterministic( + obligation_text="Absichern", + action="absichern", + object_="", + parent_title="System Security", + 
parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert "System Security" in ac._statement + + +# --------------------------------------------------------------------------- +# GAP 2: PATTERN CANDIDATES TESTS +# --------------------------------------------------------------------------- + + +class TestPatternCandidates: + """Tests for pattern_candidates in _compose_deterministic.""" + + def test_specific_combo_returns_candidates(self): + ac = _compose_deterministic( + obligation_text="Verschlüsselung implementieren", + action="implementieren", + object_="Verschlüsselung", + parent_title="Crypto", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + # implement + technical_control → specific combo + assert "technical_safeguard_enabled" in ac._pattern_candidates + + def test_fallback_by_action(self): + ac = _compose_deterministic( + obligation_text="XYZ bewerten", + action="bewerten", + object_="Spezialthema", + parent_title="X", + parent_severity="medium", + parent_category="governance", + is_test=False, + is_reporting=False, + ) + # assess + general → no specific combo, uses action fallback + assert "assessment_completed" in ac._pattern_candidates + + def test_unknown_combo_returns_action_fallback(self): + ac = _compose_deterministic( + obligation_text="Pflicht", + action="", + object_="", + parent_title="Y", + parent_severity="low", + parent_category="governance", + is_test=False, + is_reporting=False, + ) + # default action → no pattern candidates + assert ac._pattern_candidates == [] + + def test_encrypt_data_gets_encryption_patterns(self): + ac = _compose_deterministic( + obligation_text="Daten verschlüsseln", + action="verschlüsseln", + object_="personenbezogene Daten", + parent_title="Crypto", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert "encryption_at_rest" in ac._pattern_candidates + assert 
"encryption_in_transit" in ac._pattern_candidates + + +# --------------------------------------------------------------------------- +# GAP 3: STRUCTURED TIMING TESTS +# --------------------------------------------------------------------------- + + +class TestStructuredTiming: + """Tests for _extract_structured_timing and fields on atomic controls.""" + + def test_72_stunden_deadline(self): + hours, freq = _extract_structured_timing("innerhalb von 72 Stunden melden") + assert hours == 72 + assert freq is None + + def test_unverzueglich_deadline(self): + hours, freq = _extract_structured_timing("unverzüglich melden") + assert hours == 0 + assert freq is None + + def test_yearly_frequency(self): + hours, freq = _extract_structured_timing("jährliche Überprüfung") + assert hours is None + assert freq == "yearly" + + def test_monthly_frequency(self): + hours, freq = _extract_structured_timing("monatliche Kontrolle") + assert hours is None + assert freq == "monthly" + + def test_quarterly_frequency(self): + hours, freq = _extract_structured_timing("quartalsweise Berichterstattung") + assert hours is None + assert freq == "quarterly" + + def test_before_deployment(self): + hours, freq = _extract_structured_timing("vor Inbetriebnahme prüfen") + assert hours is None + assert freq == "before_deployment" + + def test_no_timing_returns_none(self): + hours, freq = _extract_structured_timing("MFA implementieren") + assert hours is None + assert freq is None + + def test_timing_stored_on_atomic(self): + ac = _compose_deterministic( + obligation_text="Jährliche Überprüfung der Sicherheitsrichtlinie", + action="überprüfen", + object_="Sicherheitsrichtlinie", + parent_title="Review", + parent_severity="medium", + parent_category="governance", + is_test=False, + is_reporting=False, + trigger_type="periodic", + ) + assert ac._frequency == "yearly" + assert ac._deadline_hours is None + + +# --------------------------------------------------------------------------- +# GAP 4: OBJECT 
# NORMALIZATION (SYNONYMS) TESTS
# ---------------------------------------------------------------------------


class TestObjectNormalization:
    """Tests for synonym-enhanced _normalize_object."""

    def test_richtlinie_to_policy(self):
        """'Sicherheitsrichtlinie' normalizes to a string containing 'policy'."""
        result = _normalize_object("Sicherheitsrichtlinie")
        assert "policy" in result

    def test_verzeichnis_to_register(self):
        """'Verzeichnis ...' normalizes to a string containing 'register'."""
        result = _normalize_object("Verzeichnis der Verarbeitungstätigkeiten")
        assert "register" in result

    def test_vorfall_to_incident(self):
        """'Sicherheitsvorfall' normalizes to a string containing 'incident'."""
        result = _normalize_object("Sicherheitsvorfall")
        assert "incident" in result

    def test_einwilligung_to_consent(self):
        """'Einwilligung ...' normalizes to a string containing 'consent'."""
        result = _normalize_object("Einwilligung der Betroffenen")
        assert "consent" in result

    def test_no_synonym_preserves_text(self):
        """Input without a synonym comes back as its lowercased text."""
        result = _normalize_object("MFA")
        assert result == "mfa"

    def test_empty_returns_unknown(self):
        """An empty object string normalizes to 'unknown'."""
        assert _normalize_object("") == "unknown"

    def test_umlaut_normalization(self):
        """The umlaut in 'Prüfbericht' is transliterated to 'ue' and not kept."""
        result = _normalize_object("Prüfbericht")
        assert "ue" in result
        # The input contains "ü", so that is the character whose absence
        # proves the transliteration happened. The previous check for "ä"
        # could never fail for this input.
        assert "ü" not in result
        assert "ä" not in result


# ---------------------------------------------------------------------------
# GAP 5: OUTPUT VALIDATOR TESTS
# ---------------------------------------------------------------------------


class TestOutputValidator:
    """Tests for _validate_atomic_control."""

    def test_clean_control_passes(self):
        """A deterministically composed control carries no 'ERROR:' issues."""
        ac = _compose_deterministic(
            obligation_text="MFA implementieren",
            action="implementieren",
            object_="MFA",
            parent_title="Auth",
            parent_severity="high",
            parent_category="security",
            is_test=False,
            is_reporting=False,
        )
        errors = [i for i in ac._validation_issues if i.startswith("ERROR:")]
        # Comparing against [] makes pytest print the offending issues on failure.
        assert errors == []

    def test_empty_title_flagged(self):
        """An empty title produces an issue mentioning 'title is empty'."""
        ac = AtomicControlCandidate(title="", objective="x", test_procedure=["tp"], evidence=["ev"])
        issues = _validate_atomic_control(ac, "implement", "general")
        assert any("title is empty" in i for i in issues)

    def test_empty_objective_flagged(self):
        """An empty objective produces an issue mentioning 'objective is empty'."""
        ac = AtomicControlCandidate(title="OK", objective="", test_procedure=["tp"], evidence=["ev"])
        issues = _validate_atomic_control(ac, "implement", "general")
        assert any("objective is empty" in i for i in issues)

    def test_empty_test_procedure_flagged(self):
        """An empty test_procedure list produces a 'test_procedure is empty' issue."""
        ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=[], evidence=["ev"])
        issues = _validate_atomic_control(ac, "implement", "general")
        assert any("test_procedure is empty" in i for i in issues)

    def test_empty_evidence_flagged(self):
        """An empty evidence list produces an 'evidence is empty' issue."""
        ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=[])
        issues = _validate_atomic_control(ac, "implement", "general")
        assert any("evidence is empty" in i for i in issues)

    def test_general_class_warns(self):
        """Validating with object class 'general' yields an issue mentioning 'general'."""
        ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["ev"])
        issues = _validate_atomic_control(ac, "implement", "general")
        assert any("general" in i for i in issues)

    def test_low_confidence_warns(self):
        """A decomposition confidence of 0.3 yields a 'low confidence' issue."""
        ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["ev"])
        ac._decomposition_confidence = 0.3
        issues = _validate_atomic_control(ac, "default", "general")
        assert any("low confidence" in i for i in issues)

    def test_empty_evidence_item_flagged(self):
        """A blank entry inside evidence is reported by index ('evidence[0] is empty')."""
        ac = AtomicControlCandidate(title="OK", objective="x", test_procedure=["tp"], evidence=["", "ok"])
        issues = _validate_atomic_control(ac, "implement", "policy")
        assert any("evidence[0] is empty" in i for i in issues)

    def test_garbage_infinitive_detected(self):
        """'Prüfung der implementieren' pattern must be flagged."""
        ac = AtomicControlCandidate(
            title="OK", objective="x",
            test_procedure=["Prüfung der implementieren und dokumentieren"],
            evidence=["ev"],
        )
        issues = _validate_atomic_control(ac, "implement", "policy")
        assert any("raw infinitive" in i for i in issues)

    def test_valid_infinitive_not_flagged(self):
        """'Funktionstest: Wirksamkeit verifizieren' is valid German."""
        ac = AtomicControlCandidate(
            title="OK", objective="x",
            test_procedure=["Funktionstest: Wirksamkeit verifizieren"],
            evidence=["ev"],
        )
        issues = _validate_atomic_control(ac, "implement", "policy")
        assert not any("raw infinitive" in i for i in issues)


# ---------------------------------------------------------------------------
# NOTE(review): the remainder of the original wrapped line is unified-diff
# hunk residue for TestDecompositionPassRun0b with elided context. It cannot
# be reconstructed into complete definitions from here, so it is preserved
# verbatim below (one comment line per hunk) instead of being guessed at.
# @@ -757,6 +1265,7 @@ class TestDecompositionPassRun0b: "oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1", "Betreiber müssen Kontinuität sicherstellen", "sicherstellen", "Dienstleistungskontinuität", + "", # condition False, False, # is_test, is_reporting "Service Continuity", "finance", '{"source": "MiCA", "article": "Art. 8"}',
# @@ -802,7 +1311,8 @@ class TestDecompositionPassRun0b: assert stats["llm_failures"] == 0 @pytest.mark.asyncio - async def test_pass0b_template_fallback(self): + async def test_pass0b_deterministic_engine(self): + """Deterministic mode (use_anthropic=False) uses engine, no LLM.""" mock_db = MagicMock() mock_rows = MagicMock()
# @@ -811,6 +1321,7 @@ class TestDecompositionPassRun0b: "oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1", "Betreiber müssen MFA implementieren", "implementieren", "MFA", + "", # condition False, False, "Auth Controls", "authentication", "", "high", "AUTH-001",
# @@ -821,6 +1332,9 @@ class TestDecompositionPassRun0b: mock_seq = MagicMock() mock_seq.fetchone.return_value = (0,) + mock_insert = MagicMock() + mock_insert.fetchone.return_value = ("new-uuid-1",) + call_count = [0] def side_effect(*args, **kwargs): call_count[0] += 1
# @@ -828,18 +1342,19 @@ class TestDecompositionPassRun0b: return mock_rows if call_count[0] == 2: return mock_seq + if call_count[0] == 3: + return mock_insert # INSERT RETURNING return MagicMock() mock_db.execute.side_effect = side_effect - with patch("compliance.services.obligation_extractor._llm_ollama", new_callable=AsyncMock) as mock_llm: - mock_llm.return_value = "Sorry, invalid response" # LLM fails + # No LLM mock needed —
deterministic engine + decomp = DecompositionPass(db=mock_db) + stats = await decomp.run_pass0b(limit=10) - decomp = DecompositionPass(db=mock_db) - stats = await decomp.run_pass0b(limit=10) - - assert stats["controls_created"] == 1 - assert stats["llm_failures"] == 1 + assert stats["controls_created"] == 1 + assert stats["provider"] == "deterministic" + assert stats["llm_calls"] == 0 class TestDecompositionStatus: @@ -1098,12 +1613,14 @@ class TestDecompositionPassAnthropicBatch: mock_rows.fetchall.return_value = [ ("oc-uuid-1", "OC-CTRL-001-01", "parent-uuid-1", "MFA implementieren", "implementieren", "MFA", + "", # condition False, False, "Auth", "security", '{"source": "DSGVO", "article": "Art. 32"}', "high", "CTRL-001", "continuous", False), ("oc-uuid-2", "OC-CTRL-001-02", "parent-uuid-1", "MFA testen", "testen", "MFA", + "", # condition True, False, "Auth", "security", '{"source": "DSGVO", "article": "Art. 32"}', "high", "CTRL-001",