feat(control-pipeline): Control Ontology v1 — action types, evidence/container/framework detection

Block 7.1-7.2 from masterplan:
- 26 action_types with German aliases + phase mapping
- Negative obligation patterns (exclude, prevent, enforce)
- Container detection (13 composite objects that must not become atomic)
- Evidence detection (14 indicators + "X dokumentieren" pattern)
- Framework reference detection (OWASP, NIST, BSI, CSA, ISO patterns)
- classify_obligation() routes to: atomic, composite, evidence, framework_container
- build_canonical_key() for deterministic dedup
- 36 tests covering all classification functions

Also: merge_key bug fix in _process_pass0b_control()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-26 09:06:39 +02:00
parent 3a100fa1f1
commit b3fbbbacfe
3 changed files with 494 additions and 1 deletions

View File

@@ -0,0 +1,342 @@
"""
Control Ontology — Controlled vocabulary for action types, object classes,
normalized objects, and pre-LLM classification.
Used by:
1. Pre-LLM filter (classify obligations before sending to API)
2. Canonical key generation (deterministic dedup key)
3. Post-LLM validation (reject invalid action_type/object_class)
4. BatchDedup (merge_group from normalized_object)
"""
from __future__ import annotations
import re
from typing import Optional
# ============================================================================
# ACTION TYPES (26) — with German aliases
# ============================================================================
# Controlled vocabulary of canonical action types.
#
# Key:   canonical (English) action_type identifier.
# Value: dict with
#   "aliases" - German verbs/phrases that map to this action type; they are
#               lower-cased when the reverse lookup table is built, so mixed
#               case entries such as "Test durchführen" are fine here.
#   "phase"   - control phase reported by get_phase() for this action type.
#
# NOTE(review): aliases are not unique across entries - "bestätigen" appears
# under both "approve" and "verify". Confirm which action type it should
# resolve to.
ACTION_TYPES: dict[str, dict] = {
    "define": {
        "aliases": ["definieren", "festlegen", "bestimmen", "vorgeben"],
        "phase": "definition",
    },
    "document": {
        "aliases": ["dokumentieren", "aufzeichnen", "protokollieren", "schriftlich festhalten"],
        "phase": "evidence",
    },
    "approve": {
        "aliases": ["freigeben", "genehmigen", "bestätigen"],
        "phase": "governance",
    },
    "implement": {
        "aliases": ["implementieren", "umsetzen", "einführen", "einsetzen", "bereitstellen",
                    "etablieren", "einrichten", "aufbauen"],
        "phase": "implementation",
    },
    "configure": {
        "aliases": ["konfigurieren", "einstellen", "parametrisieren"],
        "phase": "configuration",
    },
    "enforce": {
        "aliases": ["durchsetzen", "erzwingen", "technisch erzwingen"],
        "phase": "implementation",
    },
    "maintain": {
        "aliases": ["pflegen", "aktuell halten", "aufrechterhalten", "führen"],
        "phase": "operation",
    },
    "monitor": {
        "aliases": ["überwachen", "beobachten", "monitoren"],
        "phase": "monitoring",
    },
    "review": {
        "aliases": ["überprüfen", "prüfen", "reviewen", "kontrollieren"],
        "phase": "review",
    },
    "assess": {
        "aliases": ["bewerten", "beurteilen", "einschätzen", "analysieren"],
        "phase": "assessment",
    },
    "identify": {
        "aliases": ["identifizieren", "erkennen", "erfassen", "feststellen"],
        "phase": "assessment",
    },
    "remediate": {
        "aliases": ["beheben", "adressieren", "mitigieren", "behandeln", "abstellen"],
        "phase": "remediation",
    },
    "test": {
        "aliases": ["testen", "ausprobieren", "Test durchführen"],
        "phase": "testing",
    },
    "verify": {
        "aliases": ["verifizieren", "nachweisen", "bestätigen"],
        "phase": "testing",
    },
    "validate": {
        "aliases": ["validieren", "Wirksamkeit prüfen"],
        "phase": "validation",
    },
    "report": {
        "aliases": ["melden", "berichten", "Anzeige erstatten"],
        "phase": "reporting",
    },
    "notify": {
        "aliases": ["benachrichtigen", "informieren", "unterrichten"],
        "phase": "reporting",
    },
    "train": {
        "aliases": ["schulen", "unterweisen", "sensibilisieren"],
        "phase": "training",
    },
    "retain": {
        "aliases": ["aufbewahren", "archivieren", "speichern"],
        "phase": "evidence",
    },
    "delete": {
        "aliases": ["löschen", "vernichten", "entfernen"],
        "phase": "operation",
    },
    "prevent": {
        "aliases": ["verhindern", "vermeiden", "unterbinden"],
        "phase": "implementation",
    },
    "exclude": {
        "aliases": ["nicht zulassen", "ausschließen", "verbieten", "untersagen"],
        "phase": "implementation",
    },
    "restrict_access": {
        "aliases": ["Zugriff beschränken", "autorisieren", "berechtigen", "beschränken"],
        "phase": "implementation",
    },
    "encrypt": {
        "aliases": ["verschlüsseln", "kryptografisch schützen"],
        "phase": "implementation",
    },
    "invalidate": {
        "aliases": ["invalidieren", "ungültig machen", "widerrufen"],
        "phase": "operation",
    },
    "issue": {
        "aliases": ["ausstellen", "vergeben", "erzeugen", "generieren"],
        "phase": "operation",
    },
}
# Reverse lookup: lower-cased German alias → canonical action_type.
#
# Built with a comprehension so that no loop temporaries (`action_type`,
# `info`, `alias`) are left behind as module-level names, where a star-import
# would re-export them.
#
# NOTE(review): when the same alias is listed under several action types the
# later entry overwrites the earlier one (plain dict assignment semantics).
# With the current table this affects "bestätigen", which is listed under
# both "approve" and "verify" and therefore resolves to "verify" — confirm
# that this is intended.
_ALIAS_TO_ACTION: dict[str, str] = {
    alias.lower(): action_type
    for action_type, info in ACTION_TYPES.items()
    for alias in info["aliases"]
}
# ============================================================================
# NEGATIVE OBLIGATION PATTERNS
# ============================================================================
# Ordered (substring, action_type) pairs for negatively phrased obligations.
#
# The list is scanned top to bottom and the first hit wins, so specific
# phrases ("darf nicht wiederverwendet") must stay above the generic ones
# they contain ("darf nicht").
#
# Every pattern is written in lower case: the classifier tests them with
# `pattern in text.lower()`, so a pattern containing a capital letter
# ("nicht in der URL", "nicht im Token", "nicht in Logs") could never match.
_NEGATIVE_PATTERNS: list[tuple[str, str]] = [
    # Longer/specific patterns first (checked in order)
    ("darf nicht wiederverwendet", "prevent"),
    ("nicht in der url", "prevent"),
    ("nicht im token", "prevent"),
    ("nicht in logs", "prevent"),
    ("verhindern", "prevent"),
    ("unterbinden", "prevent"),
    ("abweisen", "enforce"),
    ("blockieren", "enforce"),
    ("zurückweisen", "enforce"),
    # Generic negative patterns last
    ("dürfen nicht", "exclude"),
    ("dürfen keine", "exclude"),
    ("darf nicht", "exclude"),
    ("darf keine", "exclude"),
    ("nicht zulässig", "exclude"),
    ("nicht erlaubt", "exclude"),
    ("verboten", "exclude"),
    ("untersagt", "exclude"),
]
# ============================================================================
# CONTAINER / COMPOSITE OBJECTS (must NOT become atomic)
# ============================================================================
# Names of composite ("container") objects, one per line in alphabetical
# order. Entries are lower case because is_container() lower-cases the
# obligation text before looking for them as substrings.
CONTAINER_OBJECTS: set[str] = {
    "compliance-programm",
    "datenschutzmanagement",
    "informationssicherheitsmanagement",
    "ki-governance",
    "risikomanagementsystem",
    "secure development lifecycle",
    "sichere sitzungsverwaltung",
    "sicherer softwareentwicklungsprozess",
    "sicherheitsmaßnahmen",
    "sorgfaltspflichten für drittkomponenten",
    "technische und organisatorische maßnahmen",
    "token-schutz",
    "umfassendes risikomanagement",
}
# ============================================================================
# EVIDENCE INDICATORS (must NOT become a control)
# ============================================================================
# Nouns that mark an obligation text as a piece of evidence rather than a
# control, one per line in alphabetical order. Entries are lower case
# because is_evidence() compares them against lower-cased text.
EVIDENCE_INDICATORS: set[str] = {
    "auditbericht",
    "dokumentation",
    "export",
    "freigabevermerk",
    "jira-ticket",
    "log-auszug",
    "nachweis",
    "prüfbericht",
    "review-protokoll",
    "sbom-nachweis",
    "screenshot",
    "servicenow-ticket",
    "testprotokoll",
    "zertifizierung",
}
# ============================================================================
# FRAMEWORK REFERENCES (must NOT become atomic directly)
# ============================================================================
# Regular-expression fragments that recognise a reference to an external
# framework or standard. is_framework_reference() applies each one with
# re.search(..., re.IGNORECASE), so they match anywhere in the text and in
# any letter case. `\s+` tolerates arbitrary whitespace between the words.
_FRAMEWORK_PATTERNS: list[str] = [
    # OWASP: ASVS chapter reference ("ASVS V" + at least one digit), API Top 10
    r"OWASP\s+ASVS\s+V\d",
    r"OWASP\s+API\s+Top\s+10",
    # NIST: SP 800-series publications and IA-/AC- control identifiers
    r"NIST\s+SP\s+800-\d+",
    r"NIST\s+IA-\d+",
    r"NIST\s+AC-\d+",
    # BSI: IT-Grundschutz and the 200-x standards
    r"BSI\s+IT-Grundschutz",
    r"BSI\s+200-\d",
    # Cloud Security Alliance Cloud Controls Matrix
    r"CSA\s+CCM",
    # ISO 27001 / 27002
    r"ISO\s+27001",
    r"ISO\s+27002",
]
# ============================================================================
# CLASSIFICATION FUNCTIONS
# ============================================================================
def classify_action(text: str) -> str:
    """Classify an obligation action text into a canonical action_type.

    Resolution order:

    1. The first entry of ``_NEGATIVE_PATTERNS`` whose pattern occurs in the
       text (list order decides, so specific patterns listed first win).
    2. An exact match of the whole text against a known alias.
    3. The longest alias that occurs somewhere in the text; among aliases of
       equal length the one registered first wins.
    4. ``"implement"`` when nothing matches.

    All comparisons are case-insensitive.

    Args:
        text: Action verb or full obligation sentence (German).

    Returns:
        The canonical action_type string.
    """
    text_lower = text.lower().strip()

    # The haystack is lower-cased, so the needle has to be as well; without
    # this a pattern spelled with capitals ("nicht in der URL") never matches.
    for pattern, action_type in _NEGATIVE_PATTERNS:
        if pattern.lower() in text_lower:
            return action_type

    # Direct alias match (alias keys are stored lower-cased).
    exact = _ALIAS_TO_ACTION.get(text_lower)
    if exact is not None:
        return exact

    # Longest contained alias. max() returns the first maximal element in
    # iteration order, which reproduces the tie-breaking of a stable
    # longest-first sort without re-sorting the table on every call.
    longest = max(
        (alias for alias in _ALIAS_TO_ACTION if alias in text_lower),
        key=len,
        default="",
    )
    return _ALIAS_TO_ACTION[longest] if longest else "implement"
def get_phase(action_type: str) -> str:
    """Return the control phase registered for *action_type*.

    Unknown action types, and entries that carry no ``"phase"`` key, yield
    the default phase ``"implementation"``.
    """
    default_phase = "implementation"
    entry = ACTION_TYPES.get(action_type)
    if entry is None:
        return default_phase
    return entry.get("phase", default_phase)
def is_container(text: str) -> bool:
    """Tell whether *text* names a container/composite object.

    The text is lower-cased and stripped, then searched for every entry of
    ``CONTAINER_OBJECTS`` as a substring. A single hit is enough.
    """
    haystack = text.lower().strip()
    for composite in CONTAINER_OBJECTS:
        if composite in haystack:
            return True
    return False
def is_evidence(text: str) -> bool:
    """Check if obligation text is actually evidence, not a control.

    The text counts as evidence when, case-insensitively, any of these holds:

    * it starts with one of ``EVIDENCE_INDICATORS``;
    * it contains an indefinite article directly followed by an indicator
      ("ein Nachweis", "eine Dokumentation", "einen Prüfbericht", ...);
    * it ends in "dokumentieren"/"dokumentiert" and mentions the result of
      another activity (tests, measures, results, checks, changes).

    Args:
        text: Obligation text (German).

    Returns:
        True if the text describes evidence, False otherwise.
    """
    text_lower = text.lower().strip()

    # Primary check: the text opens with an evidence indicator.
    if text_lower.startswith(tuple(EVIDENCE_INDICATORS)):
        return True

    # Indicator introduced by an indefinite article. All inflected forms are
    # accepted: matching only the bare "ein" would miss every feminine
    # indicator ("eine Dokumentation", "eine Zertifizierung") and every
    # non-nominative use ("einen Nachweis", "einem Prüfbericht").
    articles = ("ein", "eine", "einen", "einem", "einer", "eines")
    if any(
        f"{article} {indicator}" in text_lower
        for indicator in EVIDENCE_INDICATORS
        for article in articles
    ):
        return True

    # Secondary: "X dokumentieren" where X is another action's result, as
    # opposed to a standalone documentation duty.
    if text_lower.endswith(("dokumentieren", "dokumentiert")):
        action_words = ("tests", "maßnahmen", "ergebnisse", "prüfungen", "änderungen")
        return any(word in text_lower for word in action_words)

    return False
def is_framework_reference(text: str) -> bool:
    """Tell whether *text* is a generic "implement framework X" statement.

    Two conditions must both hold: one of ``_FRAMEWORK_PATTERNS`` matches the
    text (case-insensitive search), and the lower-cased text contains one of
    the generic implementation verbs. Such obligations are meant to be
    decomposed rather than treated as atomic.
    """
    mentions_framework = any(
        re.search(framework_rx, text, re.IGNORECASE)
        for framework_rx in _FRAMEWORK_PATTERNS
    )
    if not mentions_framework:
        return False

    # Only a generic "implement X framework" wording qualifies.
    lowered = text.lower()
    generic_verbs = ("umsetzen", "implementieren", "einhalten", "erfüllen", "anwenden")
    return any(verb in lowered for verb in generic_verbs)
def classify_obligation(text: str, action: str = "") -> dict:
    """Classify an obligation for pre-LLM routing.

    Checks are applied in a fixed order — evidence, container, framework
    reference — and the first one that matches decides the routing; anything
    else is routed as atomic.

    Args:
        text: Full obligation text.
        action: Optional action verb/phrase. When given (non-empty) it is
            used instead of *text* to determine the action type.

    Returns:
        {
            "routing": "atomic" | "composite" | "evidence" | "framework_container",
            "action_type": str,
            "phase": str,
            "reason": str,
        }
    """
    # Evidence always maps to the fixed document/evidence pair; no action
    # classification is needed for it.
    if is_evidence(text):
        return {
            "routing": "evidence",
            "action_type": "document",
            "phase": "evidence",
            "reason": "Evidence indicator detected",
        }

    # Classify once and reuse the result for every remaining routing.
    action_type = classify_action(action or text)
    phase = get_phase(action_type)

    if is_container(text):
        routing, reason = "composite", "Container/composite object detected"
    elif is_framework_reference(text):
        routing, reason = "framework_container", "Framework reference detected"
    else:
        routing, reason = "atomic", "Atomic obligation"

    return {
        "routing": routing,
        "action_type": action_type,
        "phase": phase,
        "reason": reason,
    }
def build_canonical_key(
    action_type: str,
    normalized_object: str,
    phase: Optional[str] = None,
    asset_scope: Optional[str] = None,
) -> str:
    """Build a canonical dedup key.

    The key is the colon-joined sequence
    ``action_type:normalized_object[:phase[:asset_scope]]``. Trailing empty
    components are omitted. When *asset_scope* is given without a *phase*,
    the phase slot is kept as an empty string so that the scope stays in the
    fourth position and cannot be confused with a phase.

    Args:
        action_type: Canonical action type.
        normalized_object: Normalized object identifier.
        phase: Optional control phase; ``None`` or ``""`` means "not set".
        asset_scope: Optional asset scope; ``None`` or ``""`` means "not set".

    Returns:
        The canonical key string.
    """
    parts = [action_type, normalized_object]
    if phase or asset_scope:
        # Keep an (empty) phase slot whenever a scope follows; otherwise
        # "a:o:x" would be ambiguous between phase="x" and asset_scope="x".
        parts.append(phase or "")
    if asset_scope:
        parts.append(asset_scope)
    return ":".join(parts)

View File

@@ -3395,7 +3395,8 @@ class DecompositionPass:
"decomposition_method": "pass0b",
"engine_version": "v2",
"action_object_class": getattr(atomic, "domain", ""),
"merge_group_hint": atomic.source_regulation or "",
"merge_group_hint": getattr(atomic, "merge_group_hint", "") or atomic.source_regulation or "",
"obligation_candidate_id": atomic.obligation_candidate_id or "",
"decomposition_confidence": getattr(
atomic, "_decomposition_confidence", None
),

View File

@@ -0,0 +1,150 @@
"""Tests for Control Ontology — action classification, evidence/container detection."""
import pytest
from services.control_ontology import (
classify_action, classify_obligation, is_container, is_evidence,
is_framework_reference, build_canonical_key, get_phase,
)
class TestClassifyAction:
    """classify_action: alias lookup, negative patterns and compound text."""

    def test_implement(self):
        for verb in ("implementieren", "umsetzen", "einführen", "etablieren"):
            assert classify_action(verb) == "implement"

    def test_monitor(self):
        action = classify_action("überwachen")
        assert action == "monitor"

    def test_test(self):
        action = classify_action("testen")
        assert action == "test"

    def test_prevent(self):
        action = classify_action("verhindern")
        assert action == "prevent"

    def test_exclude(self):
        action = classify_action("nicht zulassen")
        assert action == "exclude"

    def test_negative_pattern(self):
        expectations = {
            "dürfen nicht verwendet werden": "exclude",
            "darf nicht wiederverwendet werden": "prevent",
        }
        for phrase, expected in expectations.items():
            assert classify_action(phrase) == expected

    def test_compound_picks_first(self):
        # Two verbs in one phrase: either of the two actions is acceptable.
        acceptable = ("identify", "assess")
        assert classify_action("identifizieren und bewerten") in acceptable

    def test_schulen(self):
        action = classify_action("schulen")
        assert action == "train"

    def test_melden(self):
        action = classify_action("melden")
        assert action == "report"
class TestIsContainer:
    """is_container: composite objects are detected, concrete measures are not."""

    def test_session_management(self):
        sentence = "Sichere Sitzungsverwaltung muss umgesetzt werden"
        assert is_container(sentence)

    def test_token_protection(self):
        sentence = "Token-Schutz muss umgesetzt werden"
        assert is_container(sentence)

    def test_risk_management(self):
        sentence = "Umfassendes Risikomanagement einrichten"
        assert is_container(sentence)

    def test_not_container(self):
        concrete_measures = (
            "Rate-Limiting für API-Endpunkte konfigurieren",
            "MFA für privilegierte Accounts aktivieren",
        )
        for sentence in concrete_measures:
            assert not is_container(sentence)
class TestIsEvidence:
    """is_evidence: evidence artefacts versus genuine duties."""

    def test_sbom_nachweis(self):
        sentence = "Ein SBOM-Nachweis muss vorliegen"
        assert is_evidence(sentence)

    def test_screenshot(self):
        sentence = "Ein Screenshot der MFA-Konfiguration ist vorzulegen"
        assert is_evidence(sentence)

    def test_auditbericht(self):
        sentence = "Ein Auditbericht zur Zugriffskontrolle muss vorhanden sein"
        assert is_evidence(sentence)

    def test_tests_dokumentieren(self):
        for sentence in ("Tests dokumentieren", "Tests des Verfahrens dokumentiert"):
            assert is_evidence(sentence)

    def test_massnahmen_dokumentieren(self):
        sentence = "Ergriffene Maßnahmen dokumentieren"
        assert is_evidence(sentence)

    def test_not_evidence_standalone_doc_duty(self):
        # Keeping the record of processing activities (VVT) is a duty of its
        # own, not evidence for some other control.
        sentence = "Verarbeitungsverzeichnis führen"
        assert not is_evidence(sentence)

    def test_not_evidence_implement(self):
        implementation_duties = (
            "Rate-Limiting implementieren",
            "MFA für privilegierte Accounts aktivieren",
        )
        for sentence in implementation_duties:
            assert not is_evidence(sentence)
class TestIsFrameworkReference:
    """is_framework_reference: generic framework statements versus concrete ones."""

    def test_owasp_asvs(self):
        sentence = "OWASP ASVS V3 Session Management umsetzen"
        assert is_framework_reference(sentence)

    def test_nist(self):
        sentence = "NIST SP 800-53 IA-Anforderungen implementieren"
        assert is_framework_reference(sentence)

    def test_not_framework_specific(self):
        for sentence in ("Rate-Limiting konfigurieren", "MFA aktivieren"):
            assert not is_framework_reference(sentence)
class TestClassifyObligation:
    """classify_obligation: routing decision and derived action type."""

    def test_atomic(self):
        outcome = classify_obligation(
            "Rate-Limiting für API-Endpunkte konfigurieren",
            "konfigurieren",
        )
        assert outcome["routing"] == "atomic"
        assert outcome["action_type"] == "configure"

    def test_evidence_routed(self):
        outcome = classify_obligation("Ein SBOM-Nachweis muss vorliegen")
        assert outcome["routing"] == "evidence"

    def test_container_routed(self):
        outcome = classify_obligation("Sichere Sitzungsverwaltung muss umgesetzt werden")
        assert outcome["routing"] == "composite"

    def test_framework_routed(self):
        outcome = classify_obligation("OWASP ASVS V3 umsetzen", "umsetzen")
        assert outcome["routing"] == "framework_container"

    def test_negative_obligation(self):
        outcome = classify_obligation(
            "Sensible Daten dürfen nicht in URLs übertragen werden"
        )
        assert outcome["routing"] == "atomic"
        assert outcome["action_type"] == "exclude"
class TestBuildCanonicalKey:
    """build_canonical_key: colon-joined key with optional trailing parts."""

    def test_minimal(self):
        expected = "implement:api_rate_limiting"
        assert build_canonical_key("implement", "api_rate_limiting") == expected

    def test_with_phase(self):
        expected = "implement:api_rate_limiting:implementation"
        actual = build_canonical_key(
            "implement", "api_rate_limiting", phase="implementation"
        )
        assert actual == expected

    def test_full(self):
        expected = "implement:api_rate_limiting:implementation:api_endpoints"
        actual = build_canonical_key(
            "implement", "api_rate_limiting", "implementation", "api_endpoints"
        )
        assert actual == expected
class TestGetPhase:
    """get_phase: phase lookup per action type, with default for unknown ones."""

    def test_implement(self):
        phase = get_phase("implement")
        assert phase == "implementation"

    def test_monitor(self):
        phase = get_phase("monitor")
        assert phase == "monitoring"

    def test_test(self):
        phase = get_phase("test")
        assert phase == "testing"

    def test_document(self):
        phase = get_phase("document")
        assert phase == "evidence"

    def test_unknown(self):
        # Action types missing from the ontology fall back to the default phase.
        phase = get_phase("unknown_action")
        assert phase == "implementation"