Block 7.1-7.2 from masterplan: - 26 action_types with German aliases + phase mapping - Negative obligation patterns (exclude, prevent, enforce) - Container detection (11 composite objects that must not become atomic) - Evidence detection (14 indicators + "X dokumentieren" pattern) - Framework reference detection (OWASP, NIST, BSI, CSA, ISO patterns) - classify_obligation() routes to: atomic, composite, evidence, framework_container - build_canonical_key() for deterministic dedup - 36 tests covering all classification functions Also: merge_key bug fix in _process_pass0b_control() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
343 lines
11 KiB
Python
343 lines
11 KiB
Python
"""
|
|
Control Ontology — Controlled vocabulary for action types, object classes,
|
|
normalized objects, and pre-LLM classification.
|
|
|
|
Used by:
|
|
1. Pre-LLM filter (classify obligations before sending to API)
|
|
2. Canonical key generation (deterministic dedup key)
|
|
3. Post-LLM validation (reject invalid action_type/object_class)
|
|
4. BatchDedup (merge_group from normalized_object)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Optional
|
|
|
|
# ============================================================================
|
|
# ACTION TYPES (26) — with German aliases
|
|
# ============================================================================
|
|
|
|
# Controlled vocabulary of action types.
#
# Each key is a canonical action_type. Its entry carries:
#   "aliases" - German verbs/phrases that are treated as this action_type
#   "phase"   - the control phase reported for this action_type
#
# Entry order matters to consumers that iterate this mapping (e.g. when an
# alias is listed under more than one action_type), so keep it stable.
ACTION_TYPES: dict[str, dict] = {
    "define": {
        "aliases": ["definieren", "festlegen", "bestimmen", "vorgeben"],
        "phase": "definition",
    },
    "document": {
        "aliases": [
            "dokumentieren",
            "aufzeichnen",
            "protokollieren",
            "schriftlich festhalten",
        ],
        "phase": "evidence",
    },
    "approve": {
        "aliases": ["freigeben", "genehmigen", "bestätigen"],
        "phase": "governance",
    },
    "implement": {
        "aliases": [
            "implementieren",
            "umsetzen",
            "einführen",
            "einsetzen",
            "bereitstellen",
            "etablieren",
            "einrichten",
            "aufbauen",
        ],
        "phase": "implementation",
    },
    "configure": {
        "aliases": ["konfigurieren", "einstellen", "parametrisieren"],
        "phase": "configuration",
    },
    "enforce": {
        "aliases": ["durchsetzen", "erzwingen", "technisch erzwingen"],
        "phase": "implementation",
    },
    "maintain": {
        "aliases": ["pflegen", "aktuell halten", "aufrechterhalten", "führen"],
        "phase": "operation",
    },
    "monitor": {
        "aliases": ["überwachen", "beobachten", "monitoren"],
        "phase": "monitoring",
    },
    "review": {
        "aliases": ["überprüfen", "prüfen", "reviewen", "kontrollieren"],
        "phase": "review",
    },
    "assess": {
        "aliases": ["bewerten", "beurteilen", "einschätzen", "analysieren"],
        "phase": "assessment",
    },
    "identify": {
        "aliases": ["identifizieren", "erkennen", "erfassen", "feststellen"],
        "phase": "assessment",
    },
    "remediate": {
        "aliases": ["beheben", "adressieren", "mitigieren", "behandeln", "abstellen"],
        "phase": "remediation",
    },
    "test": {
        "aliases": ["testen", "ausprobieren", "Test durchführen"],
        "phase": "testing",
    },
    "verify": {
        "aliases": ["verifizieren", "nachweisen", "bestätigen"],
        "phase": "testing",
    },
    "validate": {
        "aliases": ["validieren", "Wirksamkeit prüfen"],
        "phase": "validation",
    },
    "report": {
        "aliases": ["melden", "berichten", "Anzeige erstatten"],
        "phase": "reporting",
    },
    "notify": {
        "aliases": ["benachrichtigen", "informieren", "unterrichten"],
        "phase": "reporting",
    },
    "train": {
        "aliases": ["schulen", "unterweisen", "sensibilisieren"],
        "phase": "training",
    },
    "retain": {
        "aliases": ["aufbewahren", "archivieren", "speichern"],
        "phase": "evidence",
    },
    "delete": {
        "aliases": ["löschen", "vernichten", "entfernen"],
        "phase": "operation",
    },
    "prevent": {
        "aliases": ["verhindern", "vermeiden", "unterbinden"],
        "phase": "implementation",
    },
    "exclude": {
        "aliases": ["nicht zulassen", "ausschließen", "verbieten", "untersagen"],
        "phase": "implementation",
    },
    "restrict_access": {
        "aliases": ["Zugriff beschränken", "autorisieren", "berechtigen", "beschränken"],
        "phase": "implementation",
    },
    "encrypt": {
        "aliases": ["verschlüsseln", "kryptografisch schützen"],
        "phase": "implementation",
    },
    "invalidate": {
        "aliases": ["invalidieren", "ungültig machen", "widerrufen"],
        "phase": "operation",
    },
    "issue": {
        "aliases": ["ausstellen", "vergeben", "erzeugen", "generieren"],
        "phase": "operation",
    },
}
|
|
|
|
# Reverse lookup: lower-cased German alias -> action_type.
#
# Built with a comprehension so that no loop variables are left behind as
# module-level globals. When an alias is listed under several action types,
# the action type defined last in ACTION_TYPES wins.
# NOTE(review): "bestätigen" is listed under both "approve" and "verify" and
# therefore resolves to "verify" - confirm that this is the intended mapping.
_ALIAS_TO_ACTION: dict[str, str] = {
    alias.lower(): action_type
    for action_type, info in ACTION_TYPES.items()
    for alias in info["aliases"]
}
|
|
|
|
|
|
# ============================================================================
|
|
# NEGATIVE OBLIGATION PATTERNS
|
|
# ============================================================================
|
|
|
|
# (phrase, action_type) pairs for negatively worded obligations.
#
# The list is scanned in order and the first phrase found in the text decides
# the action_type, so specific phrases must precede the generic ones.
#
# All phrases are written in lower case because they are compared against
# lower-cased obligation text; a phrase containing an uppercase letter
# (previously "nicht in der URL", "nicht im Token", "nicht in Logs") could
# never match.
_NEGATIVE_PATTERNS: list[tuple[str, str]] = [
    # Longer/specific patterns first (checked in order)
    ("darf nicht wiederverwendet", "prevent"),
    ("nicht in der url", "prevent"),
    ("nicht im token", "prevent"),
    ("nicht in logs", "prevent"),
    ("verhindern", "prevent"),
    ("unterbinden", "prevent"),
    ("abweisen", "enforce"),
    ("blockieren", "enforce"),
    ("zurückweisen", "enforce"),
    # Generic negative patterns last
    ("dürfen nicht", "exclude"),
    ("dürfen keine", "exclude"),
    ("darf nicht", "exclude"),
    ("darf keine", "exclude"),
    ("nicht zulässig", "exclude"),
    ("nicht erlaubt", "exclude"),
    ("verboten", "exclude"),
    ("untersagt", "exclude"),
]
|
|
|
|
|
|
# ============================================================================
|
|
# CONTAINER / COMPOSITE OBJECTS (must NOT become atomic)
|
|
# ============================================================================
|
|
|
|
# Lower-case names of composite ("container") objects. Obligation text that
# contains one of these is routed as composite rather than atomic.
CONTAINER_OBJECTS: set[str] = {
    # Session / token / supply chain
    "sichere sitzungsverwaltung",
    "token-schutz",
    "sorgfaltspflichten für drittkomponenten",
    # Management systems and programmes
    "risikomanagementsystem",
    "secure development lifecycle",
    "informationssicherheitsmanagement",
    "datenschutzmanagement",
    "ki-governance",
    # Umbrella terms
    "sicherheitsmaßnahmen",
    "technische und organisatorische maßnahmen",
    "compliance-programm",
    "umfassendes risikomanagement",
    "sicherer softwareentwicklungsprozess",
}
|
|
|
|
|
|
# ============================================================================
|
|
# EVIDENCE INDICATORS (must NOT become a control)
|
|
# ============================================================================
|
|
|
|
# Lower-case terms that mark an obligation text as evidence (an artefact
# proving a control) rather than a control of its own.
EVIDENCE_INDICATORS: set[str] = {
    "nachweis",
    "dokumentation",
    "screenshot",
    "export",
    "auditbericht",
    "prüfbericht",
    "zertifizierung",
    "log-auszug",
    "jira-ticket",
    "servicenow-ticket",
    "sbom-nachweis",
    "freigabevermerk",
    "review-protokoll",
    "testprotokoll",
}
|
|
|
|
|
|
# ============================================================================
|
|
# FRAMEWORK REFERENCES (must NOT become atomic directly)
|
|
# ============================================================================
|
|
|
|
# Regular expressions (searched case-insensitively by the caller) that
# recognise a reference to an external framework or standard.
_FRAMEWORK_PATTERNS: list[str] = [
    # OWASP
    r"OWASP\s+ASVS\s+V\d",
    r"OWASP\s+API\s+Top\s+10",
    # NIST
    r"NIST\s+SP\s+800-\d+",
    r"NIST\s+IA-\d+",
    r"NIST\s+AC-\d+",
    # BSI
    r"BSI\s+IT-Grundschutz",
    r"BSI\s+200-\d",
    # CSA
    r"CSA\s+CCM",
    # ISO
    r"ISO\s+27001",
    r"ISO\s+27002",
]
|
|
|
|
|
|
# ============================================================================
|
|
# CLASSIFICATION FUNCTIONS
|
|
# ============================================================================
|
|
|
|
|
|
def classify_action(text: str) -> str:
    """Classify an obligation action text into a canonical action_type.

    Resolution order:

    1. Negative-obligation phrases from ``_NEGATIVE_PATTERNS``; the list is
       scanned in order and the first phrase contained in the text wins.
    2. The whole (lower-cased, stripped) text equals a known alias.
    3. The longest alias contained anywhere in the text; among aliases of
       equal length the one registered first wins.

    All matching is done on the lower-cased text.

    Args:
        text: Action verb or full obligation sentence (German).

    Returns:
        The matching action_type, or ``"implement"`` when nothing matches.
    """
    text_lower = text.lower().strip()

    # The text has been lower-cased, so each phrase must be lower-cased as
    # well; otherwise a phrase written with capitals (e.g. "nicht in der URL")
    # can never be found.
    for pattern, action_type in _NEGATIVE_PATTERNS:
        if pattern.lower() in text_lower:
            return action_type

    # Exact alias match.
    exact = _ALIAS_TO_ACTION.get(text_lower)
    if exact is not None:
        return exact

    # Substring match: prefer the longest alias. max() returns the first of
    # several equally long candidates, i.e. the alias registered first.
    candidates = [alias for alias in _ALIAS_TO_ACTION if alias and alias in text_lower]
    if not candidates:
        return "implement"  # default fallback
    return _ALIAS_TO_ACTION[max(candidates, key=len)]
|
|
|
|
|
|
def get_phase(action_type: str) -> str:
    """Return the control phase registered for *action_type*.

    Falls back to ``"implementation"`` when the action_type is not in
    ``ACTION_TYPES`` or its entry carries no ``"phase"`` value.
    """
    entry = ACTION_TYPES.get(action_type)
    if not entry:
        return "implementation"
    return entry.get("phase", "implementation")
|
|
|
|
|
|
def is_container(text: str) -> bool:
    """Tell whether *text* names a container/composite object.

    The text is lower-cased and stripped, then searched for any entry of
    ``CONTAINER_OBJECTS`` as a substring.
    """
    normalized = text.lower().strip()
    for name in CONTAINER_OBJECTS:
        if name in normalized:
            return True
    return False
|
|
|
|
|
|
def is_evidence(text: str) -> bool:
    """Tell whether *text* describes evidence rather than a control.

    Two rules are applied to the lower-cased, stripped text:

    * Indicator rule: the text starts with an entry of
      ``EVIDENCE_INDICATORS``, or contains ``"ein <indicator>"`` anywhere
      (plain substring search).
    * "X dokumentieren" rule: the text ends with "dokumentieren" or
      "dokumentiert" and mentions one of a fixed set of action-result nouns.
    """
    normalized = text.lower().strip()

    # Indicator rule.
    if any(
        normalized.startswith(indicator) or f"ein {indicator}" in normalized
        for indicator in EVIDENCE_INDICATORS
    ):
        return True

    # "X dokumentieren" rule: only relevant when the sentence ends on the verb.
    if not normalized.endswith(("dokumentieren", "dokumentiert")):
        return False

    # Nouns naming the outcome of another action; documenting them is
    # treated as evidence, not as a standalone duty.
    result_nouns = ("tests", "maßnahmen", "ergebnisse", "prüfungen", "änderungen")
    return any(noun in normalized for noun in result_nouns)
|
|
|
|
|
|
def is_framework_reference(text: str) -> bool:
    """Tell whether *text* is a generic "implement framework X" statement.

    True only when both conditions hold:

    * some regex in ``_FRAMEWORK_PATTERNS`` matches the text
      (case-insensitive search), and
    * the lower-cased text contains one of the generic compliance verbs
      below.
    """
    names_framework = any(
        re.search(pattern, text, re.IGNORECASE) for pattern in _FRAMEWORK_PATTERNS
    )
    if not names_framework:
        return False

    # A framework mention alone is not enough; it must be paired with a
    # generic "implement / comply with" verb.
    generic_verbs = ("umsetzen", "implementieren", "einhalten", "erfüllen", "anwenden")
    lowered = text.lower()
    return any(verb in lowered for verb in generic_verbs)
|
|
|
|
|
|
def classify_obligation(text: str, action: str = "") -> dict:
    """Classify an obligation for pre-LLM routing.

    Checks are applied in this order and the first hit decides the routing:
    evidence, container/composite, framework reference, otherwise atomic.

    Args:
        text: Full obligation text.
        action: Optional action phrase; when non-empty it is used instead of
            *text* to determine the action_type.

    Returns:
        {
            "routing": "atomic" | "composite" | "evidence" | "framework_container",
            "action_type": str,
            "phase": str,
            "reason": str,
        }
    """
    # Evidence uses a fixed action_type/phase and needs no action
    # classification.
    if is_evidence(text):
        return {
            "routing": "evidence",
            "action_type": "document",
            "phase": "evidence",
            "reason": "Evidence indicator detected",
        }

    if is_container(text):
        routing, reason = "composite", "Container/composite object detected"
    elif is_framework_reference(text):
        routing, reason = "framework_container", "Framework reference detected"
    else:
        routing, reason = "atomic", "Atomic obligation"

    # Classify the action once and derive the phase from that result.
    action_type = classify_action(action or text)
    return {
        "routing": routing,
        "action_type": action_type,
        "phase": get_phase(action_type),
        "reason": reason,
    }
|
|
|
|
|
|
def build_canonical_key(
    action_type: str,
    normalized_object: str,
    phase: Optional[str] = None,
    asset_scope: Optional[str] = None,
) -> str:
    """Build a canonical dedup key.

    The key is ``action_type:normalized_object`` followed by ``:phase`` and
    ``:asset_scope`` for each of those two that is truthy; ``None`` and empty
    strings are left out.
    """
    optional_parts = [part for part in (phase, asset_scope) if part]
    return ":".join([action_type, normalized_object, *optional_parts])
|