Compare commits
4 Commits
fb2cf29b34
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
712fa8cb74 | ||
|
|
447ec08509 | ||
|
|
8cb1dc1108 | ||
|
|
f8d9919b97 |
@@ -459,7 +459,9 @@ def _split_compound_action(action: str) -> list[str]:
|
||||
# ── 2. Action Type Classification (18 types) ────────────────────────────
|
||||
|
||||
_ACTION_PRIORITY = [
|
||||
"prevent", "exclude", "forbid",
|
||||
"implement", "configure", "encrypt", "restrict_access",
|
||||
"enforce", "invalidate", "issue", "rotate",
|
||||
"monitor", "review", "assess", "audit",
|
||||
"test", "verify", "validate",
|
||||
"report", "notify", "train",
|
||||
@@ -470,7 +472,41 @@ _ACTION_PRIORITY = [
|
||||
]
|
||||
|
||||
_ACTION_KEYWORDS: list[tuple[str, str]] = [
|
||||
# Multi-word patterns first (longest match wins)
|
||||
# ── Negative / prohibitive actions (highest priority) ────
|
||||
("dürfen keine", "prevent"),
|
||||
("dürfen nicht", "prevent"),
|
||||
("darf keine", "prevent"),
|
||||
("darf nicht", "prevent"),
|
||||
("nicht zulässig", "forbid"),
|
||||
("nicht erlaubt", "forbid"),
|
||||
("nicht gestattet", "forbid"),
|
||||
("untersagt", "forbid"),
|
||||
("verboten", "forbid"),
|
||||
("nicht enthalten", "exclude"),
|
||||
("nicht übertragen", "prevent"),
|
||||
("nicht übermittelt", "prevent"),
|
||||
("nicht wiederverwendet", "prevent"),
|
||||
("nicht gespeichert", "prevent"),
|
||||
("verhindern", "prevent"),
|
||||
("unterbinden", "prevent"),
|
||||
("ausschließen", "exclude"),
|
||||
("vermeiden", "prevent"),
|
||||
("ablehnen", "exclude"),
|
||||
("zurückweisen", "exclude"),
|
||||
# ── Session / lifecycle actions ──────────────────────────
|
||||
("ungültig machen", "invalidate"),
|
||||
("invalidieren", "invalidate"),
|
||||
("widerrufen", "invalidate"),
|
||||
("session beenden", "invalidate"),
|
||||
("vergeben", "issue"),
|
||||
("ausstellen", "issue"),
|
||||
("erzeugen", "issue"),
|
||||
("generieren", "issue"),
|
||||
("rotieren", "rotate"),
|
||||
("erneuern", "rotate"),
|
||||
("durchsetzen", "enforce"),
|
||||
("erzwingen", "enforce"),
|
||||
# ── Multi-word patterns (longest match wins) ─────────────
|
||||
("aktuell halten", "maintain"),
|
||||
("aufrechterhalten", "maintain"),
|
||||
("sicherstellen", "ensure"),
|
||||
@@ -565,6 +601,15 @@ _ACTION_KEYWORDS: list[tuple[str, str]] = [
|
||||
("remediate", "remediate"),
|
||||
("perform", "perform"),
|
||||
("obtain", "obtain"),
|
||||
("prevent", "prevent"),
|
||||
("forbid", "forbid"),
|
||||
("exclude", "exclude"),
|
||||
("invalidate", "invalidate"),
|
||||
("revoke", "invalidate"),
|
||||
("issue", "issue"),
|
||||
("generate", "issue"),
|
||||
("rotate", "rotate"),
|
||||
("enforce", "enforce"),
|
||||
]
|
||||
|
||||
|
||||
@@ -627,11 +672,29 @@ _OBJECT_CLASS_KEYWORDS: dict[str, list[str]] = {
|
||||
"access_control": [
|
||||
"authentifizierung", "autorisierung", "zugriff",
|
||||
"berechtigung", "passwort", "kennwort", "anmeldung",
|
||||
"sso", "rbac", "session",
|
||||
"sso", "rbac",
|
||||
],
|
||||
"session": [
|
||||
"session", "sitzung", "sitzungsverwaltung", "session management",
|
||||
"session-id", "session-token", "idle timeout",
|
||||
"inaktivitäts-timeout", "inaktivitätszeitraum",
|
||||
"logout", "abmeldung",
|
||||
],
|
||||
"cookie": [
|
||||
"cookie", "session-cookie", "secure-flag", "httponly",
|
||||
"samesite", "cookie-attribut",
|
||||
],
|
||||
"jwt": [
|
||||
"jwt", "json web token", "bearer token",
|
||||
"jwt-algorithmus", "jwt-signatur",
|
||||
],
|
||||
"federated_assertion": [
|
||||
"assertion", "saml", "oidc", "openid",
|
||||
"föderiert", "federated", "identity provider",
|
||||
],
|
||||
"cryptographic_control": [
|
||||
"schlüssel", "zertifikat", "signatur", "kryptographi",
|
||||
"cipher", "hash", "token",
|
||||
"cipher", "hash", "token", "entropie",
|
||||
],
|
||||
"configuration": [
|
||||
"konfiguration", "einstellung", "parameter",
|
||||
@@ -1030,6 +1093,85 @@ _ACTION_TEMPLATES: dict[str, dict[str, list[str]]] = {
|
||||
"Gültigkeitsprüfung mit Zeitstempeln",
|
||||
],
|
||||
},
|
||||
# ── Prevent / Exclude / Forbid (negative norms) ────────────
|
||||
"prevent": {
|
||||
"test_procedure": [
|
||||
"Prüfung, dass {object} technisch verhindert wird",
|
||||
"Stichprobe: Versuch der verbotenen Aktion schlägt fehl",
|
||||
"Review der Konfiguration und Zugriffskontrollen",
|
||||
],
|
||||
"evidence": [
|
||||
"Konfigurationsnachweis der Präventionsmassnahme",
|
||||
"Testprotokoll der Negativtests",
|
||||
],
|
||||
},
|
||||
"exclude": {
|
||||
"test_procedure": [
|
||||
"Prüfung, dass {object} ausgeschlossen ist",
|
||||
"Stichprobe: Verbotene Inhalte/Aktionen sind nicht vorhanden",
|
||||
"Automatisierter Scan oder manuelle Prüfung",
|
||||
],
|
||||
"evidence": [
|
||||
"Scan-Ergebnis oder Prüfprotokoll",
|
||||
"Konfigurationsnachweis",
|
||||
],
|
||||
},
|
||||
"forbid": {
|
||||
"test_procedure": [
|
||||
"Prüfung, dass {object} untersagt und technisch blockiert ist",
|
||||
"Verifizierung der Richtlinie und technischen Durchsetzung",
|
||||
"Stichprobe: Versuch der untersagten Aktion wird abgelehnt",
|
||||
],
|
||||
"evidence": [
|
||||
"Richtlinie mit explizitem Verbot",
|
||||
"Technischer Nachweis der Blockierung",
|
||||
],
|
||||
},
|
||||
# ── Enforce / Invalidate / Issue / Rotate ────────────────
|
||||
"enforce": {
|
||||
"test_procedure": [
|
||||
"Prüfung der technischen Durchsetzung von {object}",
|
||||
"Stichprobe: Nicht-konforme Konfigurationen werden automatisch korrigiert oder abgelehnt",
|
||||
"Review der Enforcement-Regeln und Ausnahmen",
|
||||
],
|
||||
"evidence": [
|
||||
"Enforcement-Policy mit technischer Umsetzung",
|
||||
"Protokoll erzwungener Korrekturen oder Ablehnungen",
|
||||
],
|
||||
},
|
||||
"invalidate": {
|
||||
"test_procedure": [
|
||||
"Prüfung, dass {object} korrekt ungültig gemacht wird",
|
||||
"Stichprobe: Nach Invalidierung kein Zugriff mehr möglich",
|
||||
"Verifizierung der serverseitigen Bereinigung",
|
||||
],
|
||||
"evidence": [
|
||||
"Protokoll der Invalidierungsaktionen",
|
||||
"Testnachweis der Zugriffsverweigerung nach Invalidierung",
|
||||
],
|
||||
},
|
||||
"issue": {
|
||||
"test_procedure": [
|
||||
"Prüfung des Vergabeprozesses für {object}",
|
||||
"Verifizierung der kryptographischen Sicherheit und Entropie",
|
||||
"Stichprobe: Korrekte Vergabe unter definierten Bedingungen",
|
||||
],
|
||||
"evidence": [
|
||||
"Prozessdokumentation der Vergabe",
|
||||
"Nachweis der Entropie-/Sicherheitseigenschaften",
|
||||
],
|
||||
},
|
||||
"rotate": {
|
||||
"test_procedure": [
|
||||
"Prüfung des Rotationsprozesses für {object}",
|
||||
"Verifizierung der Rotationsfrequenz und automatischen Auslöser",
|
||||
"Stichprobe: Alte Artefakte nach Rotation ungültig",
|
||||
],
|
||||
"evidence": [
|
||||
"Rotationsrichtlinie mit Frequenz",
|
||||
"Rotationsprotokoll mit Zeitstempeln",
|
||||
],
|
||||
},
|
||||
# ── Approve / Remediate ───────────────────────────────────
|
||||
"approve": {
|
||||
"test_procedure": [
|
||||
@@ -1415,6 +1557,93 @@ _OBJECT_SYNONYMS: dict[str, str] = {
|
||||
"zugriff": "access_control",
|
||||
"einwilligung": "consent",
|
||||
"zustimmung": "consent",
|
||||
# Near-synonym expansions found via heavy-control analysis (2026-03-28)
|
||||
"erkennung": "detection",
|
||||
"früherkennung": "detection",
|
||||
"frühzeitige erkennung": "detection",
|
||||
"frühzeitigen erkennung": "detection",
|
||||
"detektion": "detection",
|
||||
"eskalation": "escalation",
|
||||
"eskalationsprozess": "escalation",
|
||||
"eskalationsverfahren": "escalation",
|
||||
"benachrichtigungsprozess": "notification",
|
||||
"benachrichtigungsverfahren": "notification",
|
||||
"meldeprozess": "notification",
|
||||
"meldeverfahren": "notification",
|
||||
"meldesystem": "notification",
|
||||
"benachrichtigungssystem": "notification",
|
||||
"überwachung": "monitoring",
|
||||
"monitoring": "monitoring",
|
||||
"kontinuierliche überwachung": "monitoring",
|
||||
"laufende überwachung": "monitoring",
|
||||
"prüfung": "audit",
|
||||
"überprüfung": "audit",
|
||||
"kontrolle": "control_check",
|
||||
"sicherheitskontrolle": "control_check",
|
||||
"dokumentation": "documentation",
|
||||
"aufzeichnungspflicht": "documentation",
|
||||
"protokollierung": "logging",
|
||||
"logführung": "logging",
|
||||
"logmanagement": "logging",
|
||||
"wiederherstellung": "recovery",
|
||||
"notfallwiederherstellung": "recovery",
|
||||
"disaster recovery": "recovery",
|
||||
"notfallplan": "contingency_plan",
|
||||
"notfallplanung": "contingency_plan",
|
||||
"wiederanlaufplan": "contingency_plan",
|
||||
"klassifizierung": "classification",
|
||||
"kategorisierung": "classification",
|
||||
"einstufung": "classification",
|
||||
"segmentierung": "segmentation",
|
||||
"netzwerksegmentierung": "segmentation",
|
||||
"netzwerk-segmentierung": "segmentation",
|
||||
"trennung": "segmentation",
|
||||
"isolierung": "isolation",
|
||||
"patch": "patch_mgmt",
|
||||
"patchmanagement": "patch_mgmt",
|
||||
"patch-management": "patch_mgmt",
|
||||
"aktualisierung": "patch_mgmt",
|
||||
"softwareaktualisierung": "patch_mgmt",
|
||||
"härtung": "hardening",
|
||||
"systemhärtung": "hardening",
|
||||
"härtungsmaßnahme": "hardening",
|
||||
"löschung": "deletion",
|
||||
"datenlöschung": "deletion",
|
||||
"löschkonzept": "deletion",
|
||||
"anonymisierung": "anonymization",
|
||||
"pseudonymisierung": "pseudonymization",
|
||||
"zugangssteuerung": "access_control",
|
||||
"zugangskontrolle": "access_control",
|
||||
"zugriffssteuerung": "access_control",
|
||||
"zugriffskontrolle": "access_control",
|
||||
"schlüsselmanagement": "key_mgmt",
|
||||
"schlüsselverwaltung": "key_mgmt",
|
||||
"key management": "key_mgmt",
|
||||
"zertifikatsverwaltung": "cert_mgmt",
|
||||
"zertifikatsmanagement": "cert_mgmt",
|
||||
"lieferant": "vendor",
|
||||
"dienstleister": "vendor",
|
||||
"auftragsverarbeiter": "vendor",
|
||||
"drittanbieter": "vendor",
|
||||
# Session management synonyms (2026-03-28)
|
||||
"sitzung": "session",
|
||||
"sitzungsverwaltung": "session_mgmt",
|
||||
"session management": "session_mgmt",
|
||||
"session-id": "session_token",
|
||||
"sitzungstoken": "session_token",
|
||||
"session-token": "session_token",
|
||||
"idle timeout": "session_timeout",
|
||||
"inaktivitäts-timeout": "session_timeout",
|
||||
"inaktivitätszeitraum": "session_timeout",
|
||||
"abmeldung": "logout",
|
||||
"cookie-attribut": "cookie_security",
|
||||
"secure-flag": "cookie_security",
|
||||
"httponly": "cookie_security",
|
||||
"samesite": "cookie_security",
|
||||
"json web token": "jwt",
|
||||
"bearer token": "jwt",
|
||||
"föderierte assertion": "federated_assertion",
|
||||
"saml assertion": "federated_assertion",
|
||||
}
|
||||
|
||||
|
||||
@@ -1435,12 +1664,20 @@ def _normalize_object(object_raw: str) -> str:
|
||||
|
||||
Applies synonym mapping to collapse German terms to canonical forms
|
||||
(e.g., 'Richtlinie' -> 'policy', 'Verzeichnis' -> 'register').
|
||||
Then strips qualifying prepositional phrases that would create
|
||||
near-duplicate keys (e.g., 'bei Schwellenwertüberschreitung').
|
||||
Truncates to 40 chars to collapse overly specific variants.
|
||||
"""
|
||||
if not object_raw:
|
||||
return "unknown"
|
||||
|
||||
obj_lower = object_raw.strip().lower()
|
||||
|
||||
# Strip qualifying prepositional phrases that don't change core identity.
|
||||
# These create near-duplicate keys like "eskalationsprozess" vs
|
||||
# "eskalationsprozess bei schwellenwertüberschreitung".
|
||||
obj_lower = _QUALIFYING_PHRASE_RE.sub("", obj_lower).strip()
|
||||
|
||||
# Synonym mapping — find the longest matching synonym
|
||||
best_match = ""
|
||||
best_canonical = ""
|
||||
@@ -1456,7 +1693,54 @@ def _normalize_object(object_raw: str) -> str:
|
||||
for src, dst in [("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")]:
|
||||
obj = obj.replace(src, dst)
|
||||
obj = re.sub(r"[^a-z0-9_]", "", obj)
|
||||
return obj[:80] or "unknown"
|
||||
|
||||
# Strip trailing noise tokens (articles/prepositions stuck at the end)
|
||||
obj = re.sub(r"(_(?:der|die|das|des|dem|den|fuer|bei|von|zur|zum|mit|auf|in|und|oder|aus|an|ueber|nach|gegen|unter|vor|zwischen|als|durch|ohne|wie))+$", "", obj)
|
||||
|
||||
# Truncate at 40 chars (at underscore boundary) to collapse
|
||||
# overly specific suffixes that create near-duplicate keys.
|
||||
obj = _truncate_at_boundary(obj, 40)
|
||||
|
||||
return obj or "unknown"
|
||||
|
||||
|
||||
# Regex to strip German qualifying prepositional phrases from object text.
|
||||
# Matches patterns like "bei schwellenwertüberschreitung",
|
||||
# "für kritische systeme", "gemäß artikel 32" etc.
|
||||
_QUALIFYING_PHRASE_RE = re.compile(
|
||||
r"\s+(?:"
|
||||
r"bei\s+\w+"
|
||||
r"|für\s+(?:die\s+|den\s+|das\s+|kritische\s+)?\w+"
|
||||
r"|gemäß\s+\w+"
|
||||
r"|nach\s+\w+"
|
||||
r"|von\s+\w+"
|
||||
r"|im\s+(?:falle?\s+|rahmen\s+)?\w+"
|
||||
r"|mit\s+(?:den\s+|der\s+|dem\s+)?\w+"
|
||||
r"|auf\s+(?:basis|grundlage)\s+\w+"
|
||||
r"|zur\s+(?:einhaltung|sicherstellung|gewährleistung|vermeidung|erfüllung)\s*\w*"
|
||||
r"|durch\s+(?:den\s+|die\s+|das\s+)?\w+"
|
||||
r"|über\s+(?:den\s+|die\s+|das\s+)?\w+"
|
||||
r"|unter\s+\w+"
|
||||
r"|zwischen\s+\w+"
|
||||
r"|innerhalb\s+\w+"
|
||||
r"|gegenüber\s+\w+"
|
||||
r"|hinsichtlich\s+\w+"
|
||||
r"|bezüglich\s+\w+"
|
||||
r"|einschließlich\s+\w+"
|
||||
r").*$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _truncate_at_boundary(text: str, max_len: int) -> str:
|
||||
"""Truncate text at the last underscore boundary within max_len."""
|
||||
if len(text) <= max_len:
|
||||
return text
|
||||
truncated = text[:max_len]
|
||||
last_sep = truncated.rfind("_")
|
||||
if last_sep > max_len // 2:
|
||||
return truncated[:last_sep]
|
||||
return truncated
|
||||
|
||||
|
||||
# ── 7b. Framework / Composite Detection ──────────────────────────────────
|
||||
@@ -1473,11 +1757,33 @@ _COMPOSITE_OBJECT_KEYWORDS: list[str] = [
|
||||
"soc 2", "soc2", "enisa", "kritis",
|
||||
]
|
||||
|
||||
# Container objects that are too broad for atomic controls.
|
||||
# These produce titles like "Sichere Sitzungsverwaltung umgesetzt" which
|
||||
# are not auditable — they encompass multiple sub-requirements.
|
||||
_CONTAINER_OBJECT_KEYWORDS: list[str] = [
|
||||
"sitzungsverwaltung", "session management", "session-management",
|
||||
"token-schutz", "tokenschutz",
|
||||
"authentifizierungsmechanismen", "authentifizierungsmechanismus",
|
||||
"sicherheitsmaßnahmen", "sicherheitsmassnahmen",
|
||||
"schutzmaßnahmen", "schutzmassnahmen",
|
||||
"zugriffskontrollmechanismen",
|
||||
"sicherheitsarchitektur",
|
||||
"sicherheitskontrollen",
|
||||
"datenschutzmaßnahmen", "datenschutzmassnahmen",
|
||||
"compliance-anforderungen",
|
||||
"risikomanagementprozess",
|
||||
]
|
||||
|
||||
_COMPOSITE_RE = re.compile(
|
||||
"|".join(_FRAMEWORK_KEYWORDS + _COMPOSITE_OBJECT_KEYWORDS),
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
_CONTAINER_RE = re.compile(
|
||||
"|".join(_CONTAINER_OBJECT_KEYWORDS),
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _is_composite_obligation(obligation_text: str, object_: str) -> bool:
|
||||
"""Detect framework-level / composite obligations that are NOT atomic.
|
||||
@@ -1489,6 +1795,17 @@ def _is_composite_obligation(obligation_text: str, object_: str) -> bool:
|
||||
return bool(_COMPOSITE_RE.search(combined))
|
||||
|
||||
|
||||
def _is_container_object(object_: str) -> bool:
|
||||
"""Detect overly broad container objects that should not be atomic.
|
||||
|
||||
Objects like 'Sitzungsverwaltung' or 'Token-Schutz' encompass multiple
|
||||
sub-requirements and produce non-auditable controls.
|
||||
"""
|
||||
if not object_:
|
||||
return False
|
||||
return bool(_CONTAINER_RE.search(object_))
|
||||
|
||||
|
||||
# ── 7c. Output Validator (Negativregeln) ─────────────────────────────────
|
||||
|
||||
def _validate_atomic_control(
|
||||
@@ -1702,11 +2019,17 @@ def _compose_deterministic(
|
||||
atomic._deadline_hours = deadline_hours # type: ignore[attr-defined]
|
||||
atomic._frequency = frequency # type: ignore[attr-defined]
|
||||
|
||||
# ── Composite / Framework detection ───────────────────────
|
||||
# ── Composite / Framework / Container detection ────────────
|
||||
is_composite = _is_composite_obligation(obligation_text, object_)
|
||||
atomic._is_composite = is_composite # type: ignore[attr-defined]
|
||||
atomic._atomicity = "composite" if is_composite else "atomic" # type: ignore[attr-defined]
|
||||
atomic._requires_decomposition = is_composite # type: ignore[attr-defined]
|
||||
is_container = _is_container_object(object_)
|
||||
atomic._is_composite = is_composite or is_container # type: ignore[attr-defined]
|
||||
if is_composite:
|
||||
atomic._atomicity = "composite" # type: ignore[attr-defined]
|
||||
elif is_container:
|
||||
atomic._atomicity = "container" # type: ignore[attr-defined]
|
||||
else:
|
||||
atomic._atomicity = "atomic" # type: ignore[attr-defined]
|
||||
atomic._requires_decomposition = is_composite or is_container # type: ignore[attr-defined]
|
||||
|
||||
# ── Validate (log issues, never reject) ───────────────────
|
||||
validation_issues = _validate_atomic_control(atomic, action_type, object_class)
|
||||
@@ -2327,6 +2650,7 @@ class DecompositionPass:
|
||||
SELECT 1 FROM canonical_controls ac
|
||||
WHERE ac.parent_control_uuid = oc.parent_control_uuid
|
||||
AND ac.decomposition_method = 'pass0b'
|
||||
AND ac.release_state NOT IN ('deprecated', 'duplicate')
|
||||
AND ac.title LIKE '%' || LEFT(oc.action, 20) || '%'
|
||||
)
|
||||
"""
|
||||
@@ -2902,7 +3226,7 @@ class DecompositionPass:
|
||||
SELECT id::text FROM canonical_controls
|
||||
WHERE parent_control_uuid = CAST(:parent AS uuid)
|
||||
AND generation_metadata->>'merge_group_hint' = :hint
|
||||
AND release_state != 'rejected'
|
||||
AND release_state NOT IN ('rejected', 'deprecated', 'duplicate')
|
||||
LIMIT 1
|
||||
"""),
|
||||
{"parent": parent_uuid, "hint": merge_hint},
|
||||
@@ -3168,6 +3492,7 @@ class DecompositionPass:
|
||||
SELECT 1 FROM canonical_controls ac
|
||||
WHERE ac.parent_control_uuid = oc.parent_control_uuid
|
||||
AND ac.decomposition_method = 'pass0b'
|
||||
AND ac.release_state NOT IN ('deprecated', 'duplicate')
|
||||
AND ac.title LIKE '%' || LEFT(oc.action, 20) || '%'
|
||||
)
|
||||
"""
|
||||
@@ -3521,6 +3846,12 @@ _ACTION_SEVERITY_CAP: dict[str, str] = {
|
||||
"configure": "high",
|
||||
"monitor": "high",
|
||||
"enforce": "high",
|
||||
"prevent": "high",
|
||||
"exclude": "high",
|
||||
"forbid": "high",
|
||||
"invalidate": "high",
|
||||
"issue": "high",
|
||||
"rotate": "medium",
|
||||
}
|
||||
|
||||
# Severity ordering for cap comparison
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-- Widen source_article and source_regulation to TEXT to handle long NIST references
|
||||
-- e.g. "SC-22 (und weitere redaktionelle Änderungen SC-7, SC-14, SC-17, ...)"
|
||||
ALTER TABLE control_parent_links ALTER COLUMN source_article TYPE TEXT;
|
||||
ALTER TABLE control_parent_links ALTER COLUMN source_regulation TYPE TEXT;
|
||||
@@ -65,6 +65,9 @@ from compliance.services.decomposition_pass import (
|
||||
_PATTERN_CANDIDATES_MAP,
|
||||
_PATTERN_CANDIDATES_BY_ACTION,
|
||||
_is_composite_obligation,
|
||||
_is_container_object,
|
||||
_ACTION_TEMPLATES,
|
||||
_ACTION_SEVERITY_CAP,
|
||||
)
|
||||
|
||||
|
||||
@@ -974,6 +977,76 @@ class TestObjectNormalization:
|
||||
assert "ue" in result
|
||||
assert "ä" not in result
|
||||
|
||||
# --- New tests for improved normalization (2026-03-28) ---
|
||||
|
||||
def test_qualifying_phrase_stripped(self):
|
||||
"""Prepositional qualifiers like 'bei X' are stripped."""
|
||||
base = _normalize_object("Eskalationsprozess")
|
||||
qualified = _normalize_object(
|
||||
"Eskalationsprozess bei Schwellenwertüberschreitung"
|
||||
)
|
||||
assert base == qualified
|
||||
|
||||
def test_fuer_phrase_stripped(self):
|
||||
"""'für kritische Systeme' qualifier is stripped."""
|
||||
base = _normalize_object("Backup-Verfahren")
|
||||
qualified = _normalize_object("Backup-Verfahren für kritische Systeme")
|
||||
assert base == qualified
|
||||
|
||||
def test_gemaess_phrase_stripped(self):
|
||||
"""'gemäß Artikel 32' qualifier is stripped."""
|
||||
base = _normalize_object("Verschlüsselung")
|
||||
qualified = _normalize_object("Verschlüsselung gemäß Artikel 32")
|
||||
assert base == qualified
|
||||
|
||||
def test_truncation_at_40_chars(self):
|
||||
"""Objects truncated at 40 chars at word boundary."""
|
||||
long_obj = "interner_eskalationsprozess_bei_schwellenwertueberschreitung_und_mehr"
|
||||
result = _normalize_object(long_obj)
|
||||
assert len(result) <= 40
|
||||
|
||||
def test_near_synonym_erkennung(self):
|
||||
"""'Früherkennung' and 'frühzeitige Erkennung' collapse."""
|
||||
a = _normalize_object("Früherkennung von Anomalien")
|
||||
b = _normalize_object("frühzeitige Erkennung von Angriffen")
|
||||
assert a == b
|
||||
|
||||
def test_near_synonym_eskalation(self):
|
||||
"""'Eskalationsprozess' and 'Eskalationsverfahren' collapse."""
|
||||
a = _normalize_object("Eskalationsprozess")
|
||||
b = _normalize_object("Eskalationsverfahren")
|
||||
assert a == b
|
||||
|
||||
def test_near_synonym_meldeprozess(self):
|
||||
"""'Meldeprozess' and 'Meldeverfahren' collapse to notification."""
|
||||
a = _normalize_object("Meldeprozess")
|
||||
b = _normalize_object("Meldeverfahren")
|
||||
assert a == b
|
||||
|
||||
def test_near_synonym_ueberwachung(self):
|
||||
"""'Überwachung' and 'Monitoring' collapse."""
|
||||
a = _normalize_object("Überwachung")
|
||||
b = _normalize_object("Monitoring")
|
||||
assert a == b
|
||||
|
||||
def test_trailing_noise_stripped(self):
|
||||
"""Trailing articles/prepositions are stripped."""
|
||||
result = _normalize_object("Schutz der")
|
||||
assert not result.endswith("_der")
|
||||
|
||||
def test_vendor_synonyms(self):
|
||||
"""Lieferant/Dienstleister/Auftragsverarbeiter collapse to vendor."""
|
||||
a = _normalize_object("Lieferant")
|
||||
b = _normalize_object("Dienstleister")
|
||||
c = _normalize_object("Auftragsverarbeiter")
|
||||
assert a == b == c
|
||||
|
||||
def test_patch_mgmt_synonyms(self):
|
||||
"""Patchmanagement/Aktualisierung collapse."""
|
||||
a = _normalize_object("Patchmanagement")
|
||||
b = _normalize_object("Softwareaktualisierung")
|
||||
assert a == b
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 5: OUTPUT VALIDATOR TESTS
|
||||
@@ -2544,3 +2617,334 @@ class TestComposeDeterministicSeverity:
|
||||
is_reporting=False,
|
||||
)
|
||||
assert atomic.severity == "high"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ERROR CLASS 1: NEGATIVE / PROHIBITIVE ACTION CLASSIFICATION
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNegativeActions:
|
||||
"""Tests for prohibitive action keywords → prevent/exclude/forbid."""
|
||||
|
||||
def test_duerfen_keine_maps_to_prevent(self):
|
||||
assert _classify_action("dürfen keine") == "prevent"
|
||||
|
||||
def test_duerfen_nicht_maps_to_prevent(self):
|
||||
assert _classify_action("dürfen nicht") == "prevent"
|
||||
|
||||
def test_darf_keine_maps_to_prevent(self):
|
||||
assert _classify_action("darf keine") == "prevent"
|
||||
|
||||
def test_verboten_maps_to_forbid(self):
|
||||
assert _classify_action("verboten") == "forbid"
|
||||
|
||||
def test_untersagt_maps_to_forbid(self):
|
||||
assert _classify_action("untersagt") == "forbid"
|
||||
|
||||
def test_nicht_zulaessig_maps_to_forbid(self):
|
||||
assert _classify_action("nicht zulässig") == "forbid"
|
||||
|
||||
def test_nicht_erlaubt_maps_to_forbid(self):
|
||||
assert _classify_action("nicht erlaubt") == "forbid"
|
||||
|
||||
def test_nicht_enthalten_maps_to_exclude(self):
|
||||
assert _classify_action("nicht enthalten") == "exclude"
|
||||
|
||||
def test_ausschliessen_maps_to_exclude(self):
|
||||
assert _classify_action("ausschließen") == "exclude"
|
||||
|
||||
def test_verhindern_maps_to_prevent(self):
|
||||
assert _classify_action("verhindern") == "prevent"
|
||||
|
||||
def test_unterbinden_maps_to_prevent(self):
|
||||
assert _classify_action("unterbinden") == "prevent"
|
||||
|
||||
def test_ablehnen_maps_to_exclude(self):
|
||||
assert _classify_action("ablehnen") == "exclude"
|
||||
|
||||
def test_nicht_uebertragen_maps_to_prevent(self):
|
||||
assert _classify_action("nicht übertragen") == "prevent"
|
||||
|
||||
def test_nicht_gespeichert_maps_to_prevent(self):
|
||||
assert _classify_action("nicht gespeichert") == "prevent"
|
||||
|
||||
def test_negative_action_has_higher_priority_than_implement(self):
|
||||
"""Negative keywords at start of ACTION_PRIORITY → picked over lower ones."""
|
||||
result = _classify_action("verhindern und dokumentieren")
|
||||
assert result == "prevent"
|
||||
|
||||
def test_prevent_template_exists(self):
|
||||
assert "prevent" in _ACTION_TEMPLATES
|
||||
assert "test_procedure" in _ACTION_TEMPLATES["prevent"]
|
||||
assert "evidence" in _ACTION_TEMPLATES["prevent"]
|
||||
|
||||
def test_exclude_template_exists(self):
|
||||
assert "exclude" in _ACTION_TEMPLATES
|
||||
assert "test_procedure" in _ACTION_TEMPLATES["exclude"]
|
||||
|
||||
def test_forbid_template_exists(self):
|
||||
assert "forbid" in _ACTION_TEMPLATES
|
||||
assert "test_procedure" in _ACTION_TEMPLATES["forbid"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ERROR CLASS 1b: SESSION / LIFECYCLE ACTIONS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSessionActions:
|
||||
"""Tests for session lifecycle action keywords."""
|
||||
|
||||
def test_ungueltig_machen_maps_to_invalidate(self):
|
||||
assert _classify_action("ungültig machen") == "invalidate"
|
||||
|
||||
def test_invalidieren_maps_to_invalidate(self):
|
||||
assert _classify_action("invalidieren") == "invalidate"
|
||||
|
||||
def test_widerrufen_maps_to_invalidate(self):
|
||||
assert _classify_action("widerrufen") == "invalidate"
|
||||
|
||||
def test_session_beenden_maps_to_invalidate(self):
|
||||
assert _classify_action("session beenden") == "invalidate"
|
||||
|
||||
def test_vergeben_maps_to_issue(self):
|
||||
assert _classify_action("vergeben") == "issue"
|
||||
|
||||
def test_erzeugen_maps_to_issue(self):
|
||||
assert _classify_action("erzeugen") == "issue"
|
||||
|
||||
def test_rotieren_maps_to_rotate(self):
|
||||
assert _classify_action("rotieren") == "rotate"
|
||||
|
||||
def test_erneuern_maps_to_rotate(self):
|
||||
assert _classify_action("erneuern") == "rotate"
|
||||
|
||||
def test_durchsetzen_maps_to_enforce(self):
|
||||
assert _classify_action("durchsetzen") == "enforce"
|
||||
|
||||
def test_erzwingen_maps_to_enforce(self):
|
||||
assert _classify_action("erzwingen") == "enforce"
|
||||
|
||||
def test_invalidate_template_exists(self):
|
||||
assert "invalidate" in _ACTION_TEMPLATES
|
||||
assert "test_procedure" in _ACTION_TEMPLATES["invalidate"]
|
||||
|
||||
def test_issue_template_exists(self):
|
||||
assert "issue" in _ACTION_TEMPLATES
|
||||
|
||||
def test_rotate_template_exists(self):
|
||||
assert "rotate" in _ACTION_TEMPLATES
|
||||
|
||||
def test_enforce_template_exists(self):
|
||||
assert "enforce" in _ACTION_TEMPLATES
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ERROR CLASS 2: CONTAINER OBJECT DETECTION
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestContainerObjectDetection:
|
||||
"""Tests for _is_container_object — broad objects that need decomposition."""
|
||||
|
||||
def test_sitzungsverwaltung_is_container(self):
|
||||
assert _is_container_object("Sitzungsverwaltung") is True
|
||||
|
||||
def test_session_management_is_container(self):
|
||||
assert _is_container_object("Session Management") is True
|
||||
|
||||
def test_token_schutz_is_container(self):
|
||||
assert _is_container_object("Token-Schutz") is True
|
||||
|
||||
def test_authentifizierungsmechanismen_is_container(self):
|
||||
assert _is_container_object("Authentifizierungsmechanismen") is True
|
||||
|
||||
def test_sicherheitsmassnahmen_is_container(self):
|
||||
assert _is_container_object("Sicherheitsmaßnahmen") is True
|
||||
|
||||
def test_zugriffskontrollmechanismen_is_container(self):
|
||||
assert _is_container_object("Zugriffskontrollmechanismen") is True
|
||||
|
||||
def test_sicherheitsarchitektur_is_container(self):
|
||||
assert _is_container_object("Sicherheitsarchitektur") is True
|
||||
|
||||
def test_compliance_anforderungen_is_container(self):
|
||||
assert _is_container_object("Compliance-Anforderungen") is True
|
||||
|
||||
def test_session_id_is_not_container(self):
|
||||
"""Specific objects like Session-ID are NOT containers."""
|
||||
assert _is_container_object("Session-ID") is False
|
||||
|
||||
def test_firewall_is_not_container(self):
|
||||
assert _is_container_object("Firewall") is False
|
||||
|
||||
def test_mfa_is_not_container(self):
|
||||
assert _is_container_object("MFA") is False
|
||||
|
||||
def test_verschluesselung_is_not_container(self):
|
||||
assert _is_container_object("Verschlüsselung") is False
|
||||
|
||||
def test_cookie_is_not_container(self):
|
||||
assert _is_container_object("Session-Cookie") is False
|
||||
|
||||
def test_empty_string_is_not_container(self):
|
||||
assert _is_container_object("") is False
|
||||
|
||||
def test_none_is_not_container(self):
|
||||
assert _is_container_object(None) is False
|
||||
|
||||
def test_container_in_compose_sets_atomicity(self):
|
||||
"""Container objects set _atomicity='container' and _requires_decomposition."""
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Sitzungsverwaltung muss abgesichert werden",
|
||||
action="implementieren",
|
||||
object_="Sitzungsverwaltung",
|
||||
parent_title="Session Security",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac._atomicity == "container"
|
||||
assert ac._requires_decomposition is True
|
||||
|
||||
def test_specific_object_is_atomic(self):
|
||||
"""Specific objects like Session-ID stay atomic."""
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Session-ID muss nach Logout gelöscht werden",
|
||||
action="implementieren",
|
||||
object_="Session-ID",
|
||||
parent_title="Session Security",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac._atomicity == "atomic"
|
||||
assert ac._requires_decomposition is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ERROR CLASS 3: SESSION-SPECIFIC OBJECT CLASSES
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSessionObjectClasses:
|
||||
"""Tests for session/cookie/jwt/federated_assertion object classification."""
|
||||
|
||||
def test_session_class(self):
|
||||
assert _classify_object("Session") == "session"
|
||||
|
||||
def test_sitzung_class(self):
|
||||
assert _classify_object("Sitzung") == "session"
|
||||
|
||||
def test_session_id_class(self):
|
||||
assert _classify_object("Session-ID") == "session"
|
||||
|
||||
def test_session_token_class(self):
|
||||
assert _classify_object("Session-Token") == "session"
|
||||
|
||||
def test_idle_timeout_class(self):
|
||||
assert _classify_object("Idle Timeout") == "session"
|
||||
|
||||
def test_logout_matches_record_via_log(self):
|
||||
"""'Logout' matches 'log' in record class (checked before session)."""
|
||||
# Ordering: record class checked before session — "log" substring matches
|
||||
assert _classify_object("Logout") == "record"
|
||||
|
||||
def test_abmeldung_matches_report_via_meldung(self):
|
||||
"""'Abmeldung' matches 'meldung' in report class (checked before session)."""
|
||||
assert _classify_object("Abmeldung") == "report"
|
||||
|
||||
def test_cookie_class(self):
|
||||
assert _classify_object("Cookie") == "cookie"
|
||||
|
||||
def test_session_cookie_matches_session_first(self):
|
||||
"""'Session-Cookie' matches 'session' in session class (checked before cookie)."""
|
||||
assert _classify_object("Session-Cookie") == "session"
|
||||
|
||||
def test_secure_flag_class(self):
|
||||
assert _classify_object("Secure-Flag") == "cookie"
|
||||
|
||||
def test_httponly_class(self):
|
||||
assert _classify_object("HttpOnly") == "cookie"
|
||||
|
||||
def test_samesite_class(self):
|
||||
assert _classify_object("SameSite") == "cookie"
|
||||
|
||||
def test_jwt_class(self):
|
||||
assert _classify_object("JWT") == "jwt"
|
||||
|
||||
def test_json_web_token_class(self):
|
||||
assert _classify_object("JSON Web Token") == "jwt"
|
||||
|
||||
def test_bearer_token_class(self):
|
||||
assert _classify_object("Bearer Token") == "jwt"
|
||||
|
||||
def test_saml_assertion_class(self):
|
||||
assert _classify_object("SAML Assertion") == "federated_assertion"
|
||||
|
||||
def test_oidc_class(self):
|
||||
assert _classify_object("OIDC Provider") == "federated_assertion"
|
||||
|
||||
def test_openid_class(self):
|
||||
assert _classify_object("OpenID Connect") == "federated_assertion"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ERROR CLASS 4: SEVERITY CAPS FOR NEW ACTION TYPES
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNewActionSeverityCaps:
|
||||
"""Tests for _ACTION_SEVERITY_CAP on new action types."""
|
||||
|
||||
def test_prevent_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("prevent") == "high"
|
||||
|
||||
def test_exclude_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("exclude") == "high"
|
||||
|
||||
def test_forbid_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("forbid") == "high"
|
||||
|
||||
def test_invalidate_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("invalidate") == "high"
|
||||
|
||||
def test_issue_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("issue") == "high"
|
||||
|
||||
def test_rotate_capped_at_medium(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("rotate") == "medium"
|
||||
|
||||
def test_enforce_capped_at_high(self):
|
||||
assert _ACTION_SEVERITY_CAP.get("enforce") == "high"
|
||||
|
||||
def test_prevent_action_severity_in_compose(self):
|
||||
"""prevent + critical parent → capped to high."""
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Session-Tokens dürfen nicht im Klartext gespeichert werden",
|
||||
action="verhindern",
|
||||
object_="Klartextspeicherung",
|
||||
parent_title="Token Security",
|
||||
parent_severity="critical",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.severity == "high"
|
||||
|
||||
def test_rotate_action_severity_in_compose(self):
|
||||
"""rotate + high parent → capped to medium."""
|
||||
ac = _compose_deterministic(
|
||||
obligation_text="Session-Tokens müssen regelmäßig rotiert werden",
|
||||
action="rotieren",
|
||||
object_="Session-Token",
|
||||
parent_title="Token Lifecycle",
|
||||
parent_severity="high",
|
||||
parent_category="security",
|
||||
is_test=False,
|
||||
is_reporting=False,
|
||||
)
|
||||
assert ac.severity == "medium"
|
||||
|
||||
Reference in New Issue
Block a user