From f8d9919b978e97134cf548bb2eb6f6172160ae05 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sat, 28 Mar 2026 08:55:48 +0100 Subject: [PATCH] Improve object normalization: shorter keys, synonym expansion, qualifier stripping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Truncate object keys to 40 chars (was 80) at underscore boundary - Strip German qualifying prepositional phrases (bei/für/gemäß/von/zur/...) - Add 65 new synonym mappings for near-duplicate patterns found in analysis - Strip trailing noise tokens (articles/prepositions) - Add _truncate_at_boundary() helper and _QUALIFYING_PHRASE_RE regex - 11 new tests for normalization improvements (227 total pass) Co-Authored-By: Claude Opus 4.6 --- .../compliance/services/decomposition_pass.py | 125 +++++++++++++++++- .../tests/test_decomposition_pass.py | 70 ++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) diff --git a/backend-compliance/compliance/services/decomposition_pass.py b/backend-compliance/compliance/services/decomposition_pass.py index 3672159..7078fe2 100644 --- a/backend-compliance/compliance/services/decomposition_pass.py +++ b/backend-compliance/compliance/services/decomposition_pass.py @@ -1415,6 +1415,74 @@ _OBJECT_SYNONYMS: dict[str, str] = { "zugriff": "access_control", "einwilligung": "consent", "zustimmung": "consent", + # Near-synonym expansions found via heavy-control analysis (2026-03-28) + "erkennung": "detection", + "früherkennung": "detection", + "frühzeitige erkennung": "detection", + "frühzeitigen erkennung": "detection", + "detektion": "detection", + "eskalation": "escalation", + "eskalationsprozess": "escalation", + "eskalationsverfahren": "escalation", + "benachrichtigungsprozess": "notification", + "benachrichtigungsverfahren": "notification", + "meldeprozess": "notification", + "meldeverfahren": "notification", + "meldesystem": "notification", + "benachrichtigungssystem": "notification", + "überwachung": "monitoring", + "monitoring": "monitoring", + "kontinuierliche überwachung": "monitoring", + "laufende überwachung": "monitoring", + "prüfung": "audit", + "überprüfung": "audit", + "kontrolle": "control_check", + "sicherheitskontrolle": "control_check", + "dokumentation": "documentation", + "aufzeichnungspflicht": "documentation", + "protokollierung": "logging", + "logführung": "logging", + "logmanagement": "logging", + "wiederherstellung": "recovery", + "notfallwiederherstellung": "recovery", + "disaster recovery": "recovery", + "notfallplan": "contingency_plan", + "notfallplanung": "contingency_plan", + "wiederanlaufplan": "contingency_plan", + "klassifizierung": "classification", + "kategorisierung": "classification", + "einstufung": "classification", + "segmentierung": "segmentation", + "netzwerksegmentierung": "segmentation", + "netzwerk-segmentierung": "segmentation", + "trennung": "segmentation", + "isolierung": "isolation", + "patch": "patch_mgmt", + "patchmanagement": "patch_mgmt", + "patch-management": "patch_mgmt", + "aktualisierung": "patch_mgmt", + "softwareaktualisierung": "patch_mgmt", + "härtung": "hardening", + "systemhärtung": "hardening", + "härtungsmaßnahme": "hardening", + "löschung": "deletion", + "datenlöschung": "deletion", + "löschkonzept": "deletion", + "anonymisierung": "anonymization", + "pseudonymisierung": "pseudonymization", + "zugangssteuerung": "access_control", + "zugangskontrolle": "access_control", + "zugriffssteuerung": "access_control", + "zugriffskontrolle": "access_control", + "schlüsselmanagement": "key_mgmt", + "schlüsselverwaltung": "key_mgmt", + "key management": "key_mgmt", + "zertifikatsverwaltung": "cert_mgmt", + "zertifikatsmanagement": "cert_mgmt", + "lieferant": "vendor", + "dienstleister": "vendor", + "auftragsverarbeiter": "vendor", + "drittanbieter": "vendor", } @@ -1435,12 +1503,20 @@ def _normalize_object(object_raw: str) -> str: Applies synonym mapping to collapse German terms to canonical forms (e.g., 'Richtlinie' -> 'policy', 'Verzeichnis' -> 'register'). + Then strips qualifying prepositional phrases that would create + near-duplicate keys (e.g., 'bei Schwellenwertüberschreitung'). + Truncates to 40 chars to collapse overly specific variants. """ if not object_raw: return "unknown" obj_lower = object_raw.strip().lower() + # Strip qualifying prepositional phrases that don't change core identity. + # These create near-duplicate keys like "eskalationsprozess" vs + # "eskalationsprozess bei schwellenwertüberschreitung". + obj_lower = _QUALIFYING_PHRASE_RE.sub("", obj_lower).strip() + # Synonym mapping — find the longest matching synonym best_match = "" best_canonical = "" @@ -1456,7 +1532,54 @@ def _normalize_object(object_raw: str) -> str: for src, dst in [("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")]: obj = obj.replace(src, dst) obj = re.sub(r"[^a-z0-9_]", "", obj) - return obj[:80] or "unknown" + + # Strip trailing noise tokens (articles/prepositions stuck at the end) + obj = re.sub(r"(_(?:der|die|das|des|dem|den|fuer|bei|von|zur|zum|mit|auf|in|und|oder|aus|an|ueber|nach|gegen|unter|vor|zwischen|als|durch|ohne|wie))+$", "", obj) + + # Truncate at 40 chars (at underscore boundary) to collapse + # overly specific suffixes that create near-duplicate keys. + obj = _truncate_at_boundary(obj, 40) + + return obj or "unknown" + + +# Regex to strip German qualifying prepositional phrases from object text. +# Matches patterns like "bei schwellenwertüberschreitung", +# "für kritische systeme", "gemäß artikel 32" etc. +_QUALIFYING_PHRASE_RE = re.compile( + r"\s+(?:" + r"bei\s+\w+" + r"|für\s+(?:die\s+|den\s+|das\s+|kritische\s+)?\w+" + r"|gemäß\s+\w+" + r"|nach\s+\w+" + r"|von\s+\w+" + r"|im\s+(?:falle?\s+|rahmen\s+)?\w+" + r"|mit\s+(?:den\s+|der\s+|dem\s+)?\w+" + r"|auf\s+(?:basis|grundlage)\s+\w+" + r"|zur\s+(?:einhaltung|sicherstellung|gewährleistung|vermeidung|erfüllung)\s*\w*" + r"|durch\s+(?:den\s+|die\s+|das\s+)?\w+" + r"|über\s+(?:den\s+|die\s+|das\s+)?\w+" + r"|unter\s+\w+" + r"|zwischen\s+\w+" + r"|innerhalb\s+\w+" + r"|gegenüber\s+\w+" + r"|hinsichtlich\s+\w+" + r"|bezüglich\s+\w+" + r"|einschließlich\s+\w+" + r").*$", + re.IGNORECASE, +) + + +def _truncate_at_boundary(text: str, max_len: int) -> str: + """Truncate text at the last underscore boundary within max_len.""" + if len(text) <= max_len: + return text + truncated = text[:max_len] + last_sep = truncated.rfind("_") + if last_sep > max_len // 2: + return truncated[:last_sep] + return truncated # ── 7b. Framework / Composite Detection ────────────────────────────────── diff --git a/backend-compliance/tests/test_decomposition_pass.py b/backend-compliance/tests/test_decomposition_pass.py index 8788f18..5148202 100644 --- a/backend-compliance/tests/test_decomposition_pass.py +++ b/backend-compliance/tests/test_decomposition_pass.py @@ -974,6 +974,76 @@ class TestObjectNormalization: assert "ue" in result assert "ä" not in result + # --- New tests for improved normalization (2026-03-28) --- + + def test_qualifying_phrase_stripped(self): + """Prepositional qualifiers like 'bei X' are stripped.""" + base = _normalize_object("Eskalationsprozess") + qualified = _normalize_object( + "Eskalationsprozess bei Schwellenwertüberschreitung" + ) + assert base == qualified + + def test_fuer_phrase_stripped(self): + """'für kritische Systeme' qualifier is stripped.""" + base = _normalize_object("Backup-Verfahren") + qualified = _normalize_object("Backup-Verfahren für kritische Systeme") + assert base == qualified + + def test_gemaess_phrase_stripped(self): + """'gemäß Artikel 32' qualifier is stripped.""" + base = _normalize_object("Verschlüsselung") + qualified = _normalize_object("Verschlüsselung gemäß Artikel 32") + assert base == qualified + + def test_truncation_at_40_chars(self): + """Objects truncated at 40 chars at word boundary.""" + long_obj = "interner_eskalationsprozess_bei_schwellenwertueberschreitung_und_mehr" + result = _normalize_object(long_obj) + assert len(result) <= 40 + + def test_near_synonym_erkennung(self): + """'Früherkennung' and 'frühzeitige Erkennung' collapse.""" + a = _normalize_object("Früherkennung von Anomalien") + b = _normalize_object("frühzeitige Erkennung von Angriffen") + assert a == b + + def test_near_synonym_eskalation(self): + """'Eskalationsprozess' and 'Eskalationsverfahren' collapse.""" + a = _normalize_object("Eskalationsprozess") + b = _normalize_object("Eskalationsverfahren") + assert a == b + + def test_near_synonym_meldeprozess(self): + """'Meldeprozess' and 'Meldeverfahren' collapse to notification.""" + a = _normalize_object("Meldeprozess") + b = _normalize_object("Meldeverfahren") + assert a == b + + def test_near_synonym_ueberwachung(self): + """'Überwachung' and 'Monitoring' collapse.""" + a = _normalize_object("Überwachung") + b = _normalize_object("Monitoring") + assert a == b + + def test_trailing_noise_stripped(self): + """Trailing articles/prepositions are stripped.""" + result = _normalize_object("Schutz der") + assert not result.endswith("_der") + + def test_vendor_synonyms(self): + """Lieferant/Dienstleister/Auftragsverarbeiter collapse to vendor.""" + a = _normalize_object("Lieferant") + b = _normalize_object("Dienstleister") + c = _normalize_object("Auftragsverarbeiter") + assert a == b == c + + def test_patch_mgmt_synonyms(self): + """Patchmanagement/Aktualisierung collapse.""" + a = _normalize_object("Patchmanagement") + b = _normalize_object("Softwareaktualisierung") + assert a == b + # --------------------------------------------------------------------------- # GAP 5: OUTPUT VALIDATOR TESTS