diff --git a/backend-compliance/compliance/services/decomposition_pass.py b/backend-compliance/compliance/services/decomposition_pass.py index 9c2a594..1b3271a 100644 --- a/backend-compliance/compliance/services/decomposition_pass.py +++ b/backend-compliance/compliance/services/decomposition_pass.py @@ -459,7 +459,9 @@ def _split_compound_action(action: str) -> list[str]: # ── 2. Action Type Classification (18 types) ──────────────────────────── _ACTION_PRIORITY = [ + "prevent", "exclude", "forbid", "implement", "configure", "encrypt", "restrict_access", + "enforce", "invalidate", "issue", "rotate", "monitor", "review", "assess", "audit", "test", "verify", "validate", "report", "notify", "train", @@ -470,7 +472,41 @@ _ACTION_PRIORITY = [ ] _ACTION_KEYWORDS: list[tuple[str, str]] = [ - # Multi-word patterns first (longest match wins) + # ── Negative / prohibitive actions (highest priority) ──── + ("dürfen keine", "prevent"), + ("dürfen nicht", "prevent"), + ("darf keine", "prevent"), + ("darf nicht", "prevent"), + ("nicht zulässig", "forbid"), + ("nicht erlaubt", "forbid"), + ("nicht gestattet", "forbid"), + ("untersagt", "forbid"), + ("verboten", "forbid"), + ("nicht enthalten", "exclude"), + ("nicht übertragen", "prevent"), + ("nicht übermittelt", "prevent"), + ("nicht wiederverwendet", "prevent"), + ("nicht gespeichert", "prevent"), + ("verhindern", "prevent"), + ("unterbinden", "prevent"), + ("ausschließen", "exclude"), + ("vermeiden", "prevent"), + ("ablehnen", "exclude"), + ("zurückweisen", "exclude"), + # ── Session / lifecycle actions ────────────────────────── + ("ungültig machen", "invalidate"), + ("invalidieren", "invalidate"), + ("widerrufen", "invalidate"), + ("session beenden", "invalidate"), + ("vergeben", "issue"), + ("ausstellen", "issue"), + ("erzeugen", "issue"), + ("generieren", "issue"), + ("rotieren", "rotate"), + ("erneuern", "rotate"), + ("durchsetzen", "enforce"), + ("erzwingen", "enforce"), + # ── Multi-word patterns (longest match wins) ───────────── ("aktuell halten", "maintain"), ("aufrechterhalten", "maintain"), ("sicherstellen", "ensure"), @@ -565,6 +601,15 @@ _ACTION_KEYWORDS: list[tuple[str, str]] = [ ("remediate", "remediate"), ("perform", "perform"), ("obtain", "obtain"), + ("prevent", "prevent"), + ("forbid", "forbid"), + ("exclude", "exclude"), + ("invalidate", "invalidate"), + ("revoke", "invalidate"), + ("issue", "issue"), + ("generate", "issue"), + ("rotate", "rotate"), + ("enforce", "enforce"), ] @@ -627,11 +672,29 @@ _OBJECT_CLASS_KEYWORDS: dict[str, list[str]] = { "access_control": [ "authentifizierung", "autorisierung", "zugriff", "berechtigung", "passwort", "kennwort", "anmeldung", - "sso", "rbac", "session", + "sso", "rbac", + ], + "session": [ + "session", "sitzung", "sitzungsverwaltung", "session management", + "session-id", "session-token", "idle timeout", + "inaktivitäts-timeout", "inaktivitätszeitraum", + "logout", "abmeldung", + ], + "cookie": [ + "cookie", "session-cookie", "secure-flag", "httponly", + "samesite", "cookie-attribut", + ], + "jwt": [ + "jwt", "json web token", "bearer token", + "jwt-algorithmus", "jwt-signatur", + ], + "federated_assertion": [ + "assertion", "saml", "oidc", "openid", + "föderiert", "federated", "identity provider", ], "cryptographic_control": [ "schlüssel", "zertifikat", "signatur", "kryptographi", - "cipher", "hash", "token", + "cipher", "hash", "token", "entropie", ], "configuration": [ "konfiguration", "einstellung", "parameter", @@ -1030,6 +1093,85 @@ _ACTION_TEMPLATES: dict[str, dict[str, list[str]]] = { "Gültigkeitsprüfung mit Zeitstempeln", ], }, + # ── Prevent / Exclude / Forbid (negative norms) ──────────── + "prevent": { + "test_procedure": [ + "Prüfung, dass {object} technisch verhindert wird", + "Stichprobe: Versuch der verbotenen Aktion schlägt fehl", + "Review der Konfiguration und Zugriffskontrollen", + ], + "evidence": [ + "Konfigurationsnachweis der Präventionsmassnahme", + "Testprotokoll der Negativtests", + ], + }, + "exclude": { + "test_procedure": [ + "Prüfung, dass {object} ausgeschlossen ist", + "Stichprobe: Verbotene Inhalte/Aktionen sind nicht vorhanden", + "Automatisierter Scan oder manuelle Prüfung", + ], + "evidence": [ + "Scan-Ergebnis oder Prüfprotokoll", + "Konfigurationsnachweis", + ], + }, + "forbid": { + "test_procedure": [ + "Prüfung, dass {object} untersagt und technisch blockiert ist", + "Verifizierung der Richtlinie und technischen Durchsetzung", + "Stichprobe: Versuch der untersagten Aktion wird abgelehnt", + ], + "evidence": [ + "Richtlinie mit explizitem Verbot", + "Technischer Nachweis der Blockierung", + ], + }, + # ── Enforce / Invalidate / Issue / Rotate ──────────────── + "enforce": { + "test_procedure": [ + "Prüfung der technischen Durchsetzung von {object}", + "Stichprobe: Nicht-konforme Konfigurationen werden automatisch korrigiert oder abgelehnt", + "Review der Enforcement-Regeln und Ausnahmen", + ], + "evidence": [ + "Enforcement-Policy mit technischer Umsetzung", + "Protokoll erzwungener Korrekturen oder Ablehnungen", + ], + }, + "invalidate": { + "test_procedure": [ + "Prüfung, dass {object} korrekt ungültig gemacht wird", + "Stichprobe: Nach Invalidierung kein Zugriff mehr möglich", + "Verifizierung der serverseitigen Bereinigung", + ], + "evidence": [ + "Protokoll der Invalidierungsaktionen", + "Testnachweis der Zugriffsverweigerung nach Invalidierung", + ], + }, + "issue": { + "test_procedure": [ + "Prüfung des Vergabeprozesses für {object}", + "Verifizierung der kryptographischen Sicherheit und Entropie", + "Stichprobe: Korrekte Vergabe unter definierten Bedingungen", + ], + "evidence": [ + "Prozessdokumentation der Vergabe", + "Nachweis der Entropie-/Sicherheitseigenschaften", + ], + }, + "rotate": { + "test_procedure": [ + "Prüfung des Rotationsprozesses für {object}", + "Verifizierung der Rotationsfrequenz und automatischen Auslöser", + "Stichprobe: Alte Artefakte nach Rotation ungültig", + ], + "evidence": [ + "Rotationsrichtlinie mit Frequenz", + "Rotationsprotokoll mit Zeitstempeln", + ], + }, # ── Approve / Remediate ─────────────────────────────────── "approve": { "test_procedure": [ @@ -1483,6 +1625,25 @@ _OBJECT_SYNONYMS: dict[str, str] = { "dienstleister": "vendor", "auftragsverarbeiter": "vendor", "drittanbieter": "vendor", + # Session management synonyms (2026-03-28) + "sitzung": "session", + "sitzungsverwaltung": "session_mgmt", + "session management": "session_mgmt", + "session-id": "session_token", + "sitzungstoken": "session_token", + "session-token": "session_token", + "idle timeout": "session_timeout", + "inaktivitäts-timeout": "session_timeout", + "inaktivitätszeitraum": "session_timeout", + "abmeldung": "logout", + "cookie-attribut": "cookie_security", + "secure-flag": "cookie_security", + "httponly": "cookie_security", + "samesite": "cookie_security", + "json web token": "jwt", + "bearer token": "jwt", + "föderierte assertion": "federated_assertion", + "saml assertion": "federated_assertion", } @@ -1596,11 +1757,33 @@ _COMPOSITE_OBJECT_KEYWORDS: list[str] = [ "soc 2", "soc2", "enisa", "kritis", ] +# Container objects that are too broad for atomic controls. +# These produce titles like "Sichere Sitzungsverwaltung umgesetzt" which +# are not auditable — they encompass multiple sub-requirements. +_CONTAINER_OBJECT_KEYWORDS: list[str] = [ + "sitzungsverwaltung", "session management", "session-management", + "token-schutz", "tokenschutz", + "authentifizierungsmechanismen", "authentifizierungsmechanismus", + "sicherheitsmaßnahmen", "sicherheitsmassnahmen", + "schutzmaßnahmen", "schutzmassnahmen", + "zugriffskontrollmechanismen", + "sicherheitsarchitektur", + "sicherheitskontrollen", + "datenschutzmaßnahmen", "datenschutzmassnahmen", + "compliance-anforderungen", + "risikomanagementprozess", +] + _COMPOSITE_RE = re.compile( "|".join(_FRAMEWORK_KEYWORDS + _COMPOSITE_OBJECT_KEYWORDS), re.IGNORECASE, ) +_CONTAINER_RE = re.compile( + "|".join(_CONTAINER_OBJECT_KEYWORDS), + re.IGNORECASE, +) + def _is_composite_obligation(obligation_text: str, object_: str) -> bool: """Detect framework-level / composite obligations that are NOT atomic. @@ -1612,6 +1795,17 @@ def _is_composite_obligation(obligation_text: str, object_: str) -> bool: return bool(_COMPOSITE_RE.search(combined)) +def _is_container_object(object_: str) -> bool: + """Detect overly broad container objects that should not be atomic. + + Objects like 'Sitzungsverwaltung' or 'Token-Schutz' encompass multiple + sub-requirements and produce non-auditable controls. + """ + if not object_: + return False + return bool(_CONTAINER_RE.search(object_)) + + # ── 7c. Output Validator (Negativregeln) ───────────────────────────────── def _validate_atomic_control( @@ -1825,11 +2019,17 @@ def _compose_deterministic( atomic._deadline_hours = deadline_hours # type: ignore[attr-defined] atomic._frequency = frequency # type: ignore[attr-defined] - # ── Composite / Framework detection ─────────────────────── + # ── Composite / Framework / Container detection ──────────── is_composite = _is_composite_obligation(obligation_text, object_) - atomic._is_composite = is_composite # type: ignore[attr-defined] - atomic._atomicity = "composite" if is_composite else "atomic" # type: ignore[attr-defined] - atomic._requires_decomposition = is_composite # type: ignore[attr-defined] + is_container = _is_container_object(object_) + atomic._is_composite = is_composite or is_container # type: ignore[attr-defined] + if is_composite: + atomic._atomicity = "composite" # type: ignore[attr-defined] + elif is_container: + atomic._atomicity = "container" # type: ignore[attr-defined] + else: + atomic._atomicity = "atomic" # type: ignore[attr-defined] + atomic._requires_decomposition = is_composite or is_container # type: ignore[attr-defined] # ── Validate (log issues, never reject) ─────────────────── validation_issues = _validate_atomic_control(atomic, action_type, object_class) @@ -3646,6 +3846,12 @@ _ACTION_SEVERITY_CAP: dict[str, str] = { "configure": "high", "monitor": "high", "enforce": "high", + "prevent": "high", + "exclude": "high", + "forbid": "high", + "invalidate": "high", + "issue": "high", + "rotate": "medium", } # Severity ordering for cap comparison diff --git a/backend-compliance/tests/test_decomposition_pass.py b/backend-compliance/tests/test_decomposition_pass.py index 5148202..cf91a54 100644 --- a/backend-compliance/tests/test_decomposition_pass.py +++ b/backend-compliance/tests/test_decomposition_pass.py @@ -65,6 +65,9 @@ from compliance.services.decomposition_pass import ( _PATTERN_CANDIDATES_MAP, _PATTERN_CANDIDATES_BY_ACTION, _is_composite_obligation, + _is_container_object, + _ACTION_TEMPLATES, + _ACTION_SEVERITY_CAP, ) @@ -2614,3 +2617,334 @@ class TestComposeDeterministicSeverity: is_reporting=False, ) assert atomic.severity == "high" + + +# --------------------------------------------------------------------------- +# ERROR CLASS 1: NEGATIVE / PROHIBITIVE ACTION CLASSIFICATION +# --------------------------------------------------------------------------- + + +class TestNegativeActions: + """Tests for prohibitive action keywords → prevent/exclude/forbid.""" + + def test_duerfen_keine_maps_to_prevent(self): + assert _classify_action("dürfen keine") == "prevent" + + def test_duerfen_nicht_maps_to_prevent(self): + assert _classify_action("dürfen nicht") == "prevent" + + def test_darf_keine_maps_to_prevent(self): + assert _classify_action("darf keine") == "prevent" + + def test_verboten_maps_to_forbid(self): + assert _classify_action("verboten") == "forbid" + + def test_untersagt_maps_to_forbid(self): + assert _classify_action("untersagt") == "forbid" + + def test_nicht_zulaessig_maps_to_forbid(self): + assert _classify_action("nicht zulässig") == "forbid" + + def test_nicht_erlaubt_maps_to_forbid(self): + assert _classify_action("nicht erlaubt") == "forbid" + + def test_nicht_enthalten_maps_to_exclude(self): + assert _classify_action("nicht enthalten") == "exclude" + + def test_ausschliessen_maps_to_exclude(self): + assert _classify_action("ausschließen") == "exclude" + + def test_verhindern_maps_to_prevent(self): + assert _classify_action("verhindern") == "prevent" + + def test_unterbinden_maps_to_prevent(self): + assert _classify_action("unterbinden") == "prevent" + + def test_ablehnen_maps_to_exclude(self): + assert _classify_action("ablehnen") == "exclude" + + def test_nicht_uebertragen_maps_to_prevent(self): + assert _classify_action("nicht übertragen") == "prevent" + + def test_nicht_gespeichert_maps_to_prevent(self): + assert _classify_action("nicht gespeichert") == "prevent" + + def test_negative_action_has_higher_priority_than_implement(self): + """Negative keywords at start of ACTION_PRIORITY → picked over lower ones.""" + result = _classify_action("verhindern und dokumentieren") + assert result == "prevent" + + def test_prevent_template_exists(self): + assert "prevent" in _ACTION_TEMPLATES + assert "test_procedure" in _ACTION_TEMPLATES["prevent"] + assert "evidence" in _ACTION_TEMPLATES["prevent"] + + def test_exclude_template_exists(self): + assert "exclude" in _ACTION_TEMPLATES + assert "test_procedure" in _ACTION_TEMPLATES["exclude"] + + def test_forbid_template_exists(self): + assert "forbid" in _ACTION_TEMPLATES + assert "test_procedure" in _ACTION_TEMPLATES["forbid"] + + +# --------------------------------------------------------------------------- +# ERROR CLASS 1b: SESSION / LIFECYCLE ACTIONS +# --------------------------------------------------------------------------- + + +class TestSessionActions: + """Tests for session lifecycle action keywords.""" + + def test_ungueltig_machen_maps_to_invalidate(self): + assert _classify_action("ungültig machen") == "invalidate" + + def test_invalidieren_maps_to_invalidate(self): + assert _classify_action("invalidieren") == "invalidate" + + def test_widerrufen_maps_to_invalidate(self): + assert _classify_action("widerrufen") == "invalidate" + + def test_session_beenden_maps_to_invalidate(self): + assert _classify_action("session beenden") == "invalidate" + + def test_vergeben_maps_to_issue(self): + assert _classify_action("vergeben") == "issue" + + def test_erzeugen_maps_to_issue(self): + assert _classify_action("erzeugen") == "issue" + + def test_rotieren_maps_to_rotate(self): + assert _classify_action("rotieren") == "rotate" + + def test_erneuern_maps_to_rotate(self): + assert _classify_action("erneuern") == "rotate" + + def test_durchsetzen_maps_to_enforce(self): + assert _classify_action("durchsetzen") == "enforce" + + def test_erzwingen_maps_to_enforce(self): + assert _classify_action("erzwingen") == "enforce" + + def test_invalidate_template_exists(self): + assert "invalidate" in _ACTION_TEMPLATES + assert "test_procedure" in _ACTION_TEMPLATES["invalidate"] + + def test_issue_template_exists(self): + assert "issue" in _ACTION_TEMPLATES + + def test_rotate_template_exists(self): + assert "rotate" in _ACTION_TEMPLATES + + def test_enforce_template_exists(self): + assert "enforce" in _ACTION_TEMPLATES + + +# --------------------------------------------------------------------------- +# ERROR CLASS 2: CONTAINER OBJECT DETECTION +# --------------------------------------------------------------------------- + + +class TestContainerObjectDetection: + """Tests for _is_container_object — broad objects that need decomposition.""" + + def test_sitzungsverwaltung_is_container(self): + assert _is_container_object("Sitzungsverwaltung") is True + + def test_session_management_is_container(self): + assert _is_container_object("Session Management") is True + + def test_token_schutz_is_container(self): + assert _is_container_object("Token-Schutz") is True + + def test_authentifizierungsmechanismen_is_container(self): + assert _is_container_object("Authentifizierungsmechanismen") is True + + def test_sicherheitsmassnahmen_is_container(self): + assert _is_container_object("Sicherheitsmaßnahmen") is True + + def test_zugriffskontrollmechanismen_is_container(self): + assert _is_container_object("Zugriffskontrollmechanismen") is True + + def test_sicherheitsarchitektur_is_container(self): + assert _is_container_object("Sicherheitsarchitektur") is True + + def test_compliance_anforderungen_is_container(self): + assert _is_container_object("Compliance-Anforderungen") is True + + def test_session_id_is_not_container(self): + """Specific objects like Session-ID are NOT containers.""" + assert _is_container_object("Session-ID") is False + + def test_firewall_is_not_container(self): + assert _is_container_object("Firewall") is False + + def test_mfa_is_not_container(self): + assert _is_container_object("MFA") is False + + def test_verschluesselung_is_not_container(self): + assert _is_container_object("Verschlüsselung") is False + + def test_cookie_is_not_container(self): + assert _is_container_object("Session-Cookie") is False + + def test_empty_string_is_not_container(self): + assert _is_container_object("") is False + + def test_none_is_not_container(self): + assert _is_container_object(None) is False + + def test_container_in_compose_sets_atomicity(self): + """Container objects set _atomicity='container' and _requires_decomposition.""" + ac = _compose_deterministic( + obligation_text="Sitzungsverwaltung muss abgesichert werden", + action="implementieren", + object_="Sitzungsverwaltung", + parent_title="Session Security", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac._atomicity == "container" + assert ac._requires_decomposition is True + + def test_specific_object_is_atomic(self): + """Specific objects like Session-ID stay atomic.""" + ac = _compose_deterministic( + obligation_text="Session-ID muss nach Logout gelöscht werden", + action="implementieren", + object_="Session-ID", + parent_title="Session Security", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac._atomicity == "atomic" + assert ac._requires_decomposition is False + + +# --------------------------------------------------------------------------- +# ERROR CLASS 3: SESSION-SPECIFIC OBJECT CLASSES +# --------------------------------------------------------------------------- + + +class TestSessionObjectClasses: + """Tests for session/cookie/jwt/federated_assertion object classification.""" + + def test_session_class(self): + assert _classify_object("Session") == "session" + + def test_sitzung_class(self): + assert _classify_object("Sitzung") == "session" + + def test_session_id_class(self): + assert _classify_object("Session-ID") == "session" + + def test_session_token_class(self): + assert _classify_object("Session-Token") == "session" + + def test_idle_timeout_class(self): + assert _classify_object("Idle Timeout") == "session" + + def test_logout_matches_record_via_log(self): + """'Logout' matches 'log' in record class (checked before session).""" + # Ordering: record class checked before session — "log" substring matches + assert _classify_object("Logout") == "record" + + def test_abmeldung_matches_report_via_meldung(self): + """'Abmeldung' matches 'meldung' in report class (checked before session).""" + assert _classify_object("Abmeldung") == "report" + + def test_cookie_class(self): + assert _classify_object("Cookie") == "cookie" + + def test_session_cookie_matches_session_first(self): + """'Session-Cookie' matches 'session' in session class (checked before cookie).""" + assert _classify_object("Session-Cookie") == "session" + + def test_secure_flag_class(self): + assert _classify_object("Secure-Flag") == "cookie" + + def test_httponly_class(self): + assert _classify_object("HttpOnly") == "cookie" + + def test_samesite_class(self): + assert _classify_object("SameSite") == "cookie" + + def test_jwt_class(self): + assert _classify_object("JWT") == "jwt" + + def test_json_web_token_class(self): + assert _classify_object("JSON Web Token") == "jwt" + + def test_bearer_token_class(self): + assert _classify_object("Bearer Token") == "jwt" + + def test_saml_assertion_class(self): + assert _classify_object("SAML Assertion") == "federated_assertion" + + def test_oidc_class(self): + assert _classify_object("OIDC Provider") == "federated_assertion" + + def test_openid_class(self): + assert _classify_object("OpenID Connect") == "federated_assertion" + + +# --------------------------------------------------------------------------- +# ERROR CLASS 4: SEVERITY CAPS FOR NEW ACTION TYPES +# --------------------------------------------------------------------------- + + +class TestNewActionSeverityCaps: + """Tests for _ACTION_SEVERITY_CAP on new action types.""" + + def test_prevent_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("prevent") == "high" + + def test_exclude_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("exclude") == "high" + + def test_forbid_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("forbid") == "high" + + def test_invalidate_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("invalidate") == "high" + + def test_issue_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("issue") == "high" + + def test_rotate_capped_at_medium(self): + assert _ACTION_SEVERITY_CAP.get("rotate") == "medium" + + def test_enforce_capped_at_high(self): + assert _ACTION_SEVERITY_CAP.get("enforce") == "high" + + def test_prevent_action_severity_in_compose(self): + """prevent + critical parent → capped to high.""" + ac = _compose_deterministic( + obligation_text="Session-Tokens dürfen nicht im Klartext gespeichert werden", + action="verhindern", + object_="Klartextspeicherung", + parent_title="Token Security", + parent_severity="critical", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac.severity == "high" + + def test_rotate_action_severity_in_compose(self): + """rotate + high parent → capped to medium.""" + ac = _compose_deterministic( + obligation_text="Session-Tokens müssen regelmäßig rotiert werden", + action="rotieren", + object_="Session-Token", + parent_title="Token Lifecycle", + parent_severity="high", + parent_category="security", + is_test=False, + is_reporting=False, + ) + assert ac.severity == "medium"