Improve object normalization: shorter keys, synonym expansion, qualifier stripping
- Truncate object keys to 40 chars (was 80) at underscore boundary
- Strip German qualifying prepositional phrases (bei/für/gemäß/von/zur/...)
- Add 65 new synonym mappings for near-duplicate patterns found in analysis
- Strip trailing noise tokens (articles/prepositions)
- Add _truncate_at_boundary() helper and _QUALIFYING_PHRASE_RE regex
- 11 new tests for normalization improvements (227 total pass)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1415,6 +1415,74 @@ _OBJECT_SYNONYMS: dict[str, str] = {
|
||||
"zugriff": "access_control",
|
||||
"einwilligung": "consent",
|
||||
"zustimmung": "consent",
|
||||
# Near-synonym expansions found via heavy-control analysis (2026-03-28)
|
||||
"erkennung": "detection",
|
||||
"früherkennung": "detection",
|
||||
"frühzeitige erkennung": "detection",
|
||||
"frühzeitigen erkennung": "detection",
|
||||
"detektion": "detection",
|
||||
"eskalation": "escalation",
|
||||
"eskalationsprozess": "escalation",
|
||||
"eskalationsverfahren": "escalation",
|
||||
"benachrichtigungsprozess": "notification",
|
||||
"benachrichtigungsverfahren": "notification",
|
||||
"meldeprozess": "notification",
|
||||
"meldeverfahren": "notification",
|
||||
"meldesystem": "notification",
|
||||
"benachrichtigungssystem": "notification",
|
||||
"überwachung": "monitoring",
|
||||
"monitoring": "monitoring",
|
||||
"kontinuierliche überwachung": "monitoring",
|
||||
"laufende überwachung": "monitoring",
|
||||
"prüfung": "audit",
|
||||
"überprüfung": "audit",
|
||||
"kontrolle": "control_check",
|
||||
"sicherheitskontrolle": "control_check",
|
||||
"dokumentation": "documentation",
|
||||
"aufzeichnungspflicht": "documentation",
|
||||
"protokollierung": "logging",
|
||||
"logführung": "logging",
|
||||
"logmanagement": "logging",
|
||||
"wiederherstellung": "recovery",
|
||||
"notfallwiederherstellung": "recovery",
|
||||
"disaster recovery": "recovery",
|
||||
"notfallplan": "contingency_plan",
|
||||
"notfallplanung": "contingency_plan",
|
||||
"wiederanlaufplan": "contingency_plan",
|
||||
"klassifizierung": "classification",
|
||||
"kategorisierung": "classification",
|
||||
"einstufung": "classification",
|
||||
"segmentierung": "segmentation",
|
||||
"netzwerksegmentierung": "segmentation",
|
||||
"netzwerk-segmentierung": "segmentation",
|
||||
"trennung": "segmentation",
|
||||
"isolierung": "isolation",
|
||||
"patch": "patch_mgmt",
|
||||
"patchmanagement": "patch_mgmt",
|
||||
"patch-management": "patch_mgmt",
|
||||
"aktualisierung": "patch_mgmt",
|
||||
"softwareaktualisierung": "patch_mgmt",
|
||||
"härtung": "hardening",
|
||||
"systemhärtung": "hardening",
|
||||
"härtungsmaßnahme": "hardening",
|
||||
"löschung": "deletion",
|
||||
"datenlöschung": "deletion",
|
||||
"löschkonzept": "deletion",
|
||||
"anonymisierung": "anonymization",
|
||||
"pseudonymisierung": "pseudonymization",
|
||||
"zugangssteuerung": "access_control",
|
||||
"zugangskontrolle": "access_control",
|
||||
"zugriffssteuerung": "access_control",
|
||||
"zugriffskontrolle": "access_control",
|
||||
"schlüsselmanagement": "key_mgmt",
|
||||
"schlüsselverwaltung": "key_mgmt",
|
||||
"key management": "key_mgmt",
|
||||
"zertifikatsverwaltung": "cert_mgmt",
|
||||
"zertifikatsmanagement": "cert_mgmt",
|
||||
"lieferant": "vendor",
|
||||
"dienstleister": "vendor",
|
||||
"auftragsverarbeiter": "vendor",
|
||||
"drittanbieter": "vendor",
|
||||
}
|
||||
|
||||
|
||||
@@ -1435,12 +1503,20 @@ def _normalize_object(object_raw: str) -> str:
|
||||
|
||||
Applies synonym mapping to collapse German terms to canonical forms
|
||||
(e.g., 'Richtlinie' -> 'policy', 'Verzeichnis' -> 'register').
|
||||
Then strips qualifying prepositional phrases that would create
|
||||
near-duplicate keys (e.g., 'bei Schwellenwertüberschreitung').
|
||||
Truncates to 40 chars to collapse overly specific variants.
|
||||
"""
|
||||
if not object_raw:
|
||||
return "unknown"
|
||||
|
||||
obj_lower = object_raw.strip().lower()
|
||||
|
||||
# Strip qualifying prepositional phrases that don't change core identity.
|
||||
# These create near-duplicate keys like "eskalationsprozess" vs
|
||||
# "eskalationsprozess bei schwellenwertüberschreitung".
|
||||
obj_lower = _QUALIFYING_PHRASE_RE.sub("", obj_lower).strip()
|
||||
|
||||
# Synonym mapping — find the longest matching synonym
|
||||
best_match = ""
|
||||
best_canonical = ""
|
||||
@@ -1456,7 +1532,54 @@ def _normalize_object(object_raw: str) -> str:
|
||||
for src, dst in [("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")]:
|
||||
obj = obj.replace(src, dst)
|
||||
obj = re.sub(r"[^a-z0-9_]", "", obj)
|
||||
return obj[:80] or "unknown"
|
||||
|
||||
# Strip trailing noise tokens (articles/prepositions stuck at the end)
|
||||
obj = re.sub(r"(_(?:der|die|das|des|dem|den|fuer|bei|von|zur|zum|mit|auf|in|und|oder|aus|an|ueber|nach|gegen|unter|vor|zwischen|als|durch|ohne|wie))+$", "", obj)
|
||||
|
||||
# Truncate at 40 chars (at underscore boundary) to collapse
|
||||
# overly specific suffixes that create near-duplicate keys.
|
||||
obj = _truncate_at_boundary(obj, 40)
|
||||
|
||||
return obj or "unknown"
|
||||
|
||||
|
||||
# Regex to strip German qualifying prepositional phrases from object text.
|
||||
# Matches patterns like "bei schwellenwertüberschreitung",
|
||||
# "für kritische systeme", "gemäß artikel 32" etc.
|
||||
_QUALIFYING_PHRASE_RE = re.compile(
|
||||
r"\s+(?:"
|
||||
r"bei\s+\w+"
|
||||
r"|für\s+(?:die\s+|den\s+|das\s+|kritische\s+)?\w+"
|
||||
r"|gemäß\s+\w+"
|
||||
r"|nach\s+\w+"
|
||||
r"|von\s+\w+"
|
||||
r"|im\s+(?:falle?\s+|rahmen\s+)?\w+"
|
||||
r"|mit\s+(?:den\s+|der\s+|dem\s+)?\w+"
|
||||
r"|auf\s+(?:basis|grundlage)\s+\w+"
|
||||
r"|zur\s+(?:einhaltung|sicherstellung|gewährleistung|vermeidung|erfüllung)\s*\w*"
|
||||
r"|durch\s+(?:den\s+|die\s+|das\s+)?\w+"
|
||||
r"|über\s+(?:den\s+|die\s+|das\s+)?\w+"
|
||||
r"|unter\s+\w+"
|
||||
r"|zwischen\s+\w+"
|
||||
r"|innerhalb\s+\w+"
|
||||
r"|gegenüber\s+\w+"
|
||||
r"|hinsichtlich\s+\w+"
|
||||
r"|bezüglich\s+\w+"
|
||||
r"|einschließlich\s+\w+"
|
||||
r").*$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _truncate_at_boundary(text: str, max_len: int) -> str:
|
||||
"""Truncate text at the last underscore boundary within max_len."""
|
||||
if len(text) <= max_len:
|
||||
return text
|
||||
truncated = text[:max_len]
|
||||
last_sep = truncated.rfind("_")
|
||||
if last_sep > max_len // 2:
|
||||
return truncated[:last_sep]
|
||||
return truncated
|
||||
|
||||
|
||||
# ── 7b. Framework / Composite Detection ──────────────────────────────────
|
||||
|
||||
@@ -974,6 +974,76 @@ class TestObjectNormalization:
|
||||
assert "ue" in result
|
||||
assert "ä" not in result
|
||||
|
||||
# --- New tests for improved normalization (2026-03-28) ---
|
||||
|
||||
def test_qualifying_phrase_stripped(self):
    """A trailing 'bei ...' qualifier must not change the normalized key."""
    plain_key = _normalize_object("Eskalationsprozess")
    with_qualifier = "Eskalationsprozess bei Schwellenwertüberschreitung"
    assert plain_key == _normalize_object(with_qualifier)
|
||||
|
||||
def test_fuer_phrase_stripped(self):
    """A trailing 'für kritische Systeme' must not change the normalized key."""
    plain_key = _normalize_object("Backup-Verfahren")
    qualified_key = _normalize_object("Backup-Verfahren für kritische Systeme")
    assert plain_key == qualified_key
|
||||
|
||||
def test_gemaess_phrase_stripped(self):
    """A trailing 'gemäß Artikel 32' must not change the normalized key."""
    plain_key = _normalize_object("Verschlüsselung")
    qualified_key = _normalize_object("Verschlüsselung gemäß Artikel 32")
    assert plain_key == qualified_key
|
||||
|
||||
def test_truncation_at_40_chars(self):
    """An over-long object name yields a key of at most 40 characters."""
    key = _normalize_object(
        "interner_eskalationsprozess_bei_schwellenwertueberschreitung_und_mehr"
    )
    assert not len(key) > 40
|
||||
|
||||
def test_near_synonym_erkennung(self):
    """'Früherkennung von ...' and 'frühzeitige Erkennung von ...' share one key."""
    compound_key = _normalize_object("Früherkennung von Anomalien")
    phrase_key = _normalize_object("frühzeitige Erkennung von Angriffen")
    assert compound_key == phrase_key
|
||||
|
||||
def test_near_synonym_eskalation(self):
    """'Eskalationsprozess' and 'Eskalationsverfahren' share one key."""
    process_key = _normalize_object("Eskalationsprozess")
    procedure_key = _normalize_object("Eskalationsverfahren")
    assert process_key == procedure_key
|
||||
|
||||
def test_near_synonym_meldeprozess(self):
    """'Meldeprozess' and 'Meldeverfahren' share one key."""
    process_key = _normalize_object("Meldeprozess")
    procedure_key = _normalize_object("Meldeverfahren")
    assert process_key == procedure_key
|
||||
|
||||
def test_near_synonym_ueberwachung(self):
    """German 'Überwachung' and English 'Monitoring' share one key."""
    german_key = _normalize_object("Überwachung")
    english_key = _normalize_object("Monitoring")
    assert german_key == english_key
|
||||
|
||||
def test_trailing_noise_stripped(self):
    """A dangling article at the end must not survive as a '_der' suffix."""
    key = _normalize_object("Schutz der")
    assert key[-4:] != "_der"
|
||||
|
||||
def test_vendor_synonyms(self):
    """'Lieferant', 'Dienstleister' and 'Auftragsverarbeiter' share one key."""
    supplier_key = _normalize_object("Lieferant")
    provider_key = _normalize_object("Dienstleister")
    processor_key = _normalize_object("Auftragsverarbeiter")
    assert supplier_key == provider_key
    assert provider_key == processor_key
|
||||
|
||||
def test_patch_mgmt_synonyms(self):
    """'Patchmanagement' and 'Softwareaktualisierung' share one key."""
    patch_key = _normalize_object("Patchmanagement")
    update_key = _normalize_object("Softwareaktualisierung")
    assert patch_key == update_key
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GAP 5: OUTPUT VALIDATOR TESTS
|
||||
|
||||
Reference in New Issue
Block a user