From b3fbbbacfe668cb0bc9e823339c4f763cb5397c4 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 26 Apr 2026 09:06:39 +0200 Subject: [PATCH] =?UTF-8?q?feat(control-pipeline):=20Control=20Ontology=20?= =?UTF-8?q?v1=20=E2=80=94=20action=20types,=20evidence/container/framework?= =?UTF-8?q?=20detection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Block 7.1-7.2 from masterplan: - 26 action_types with German aliases + phase mapping - Negative obligation patterns (exclude, prevent, enforce) - Container detection (11 composite objects that must not become atomic) - Evidence detection (14 indicators + "X dokumentieren" pattern) - Framework reference detection (OWASP, NIST, BSI, CSA, ISO patterns) - classify_obligation() routes to: atomic, composite, evidence, framework_container - build_canonical_key() for deterministic dedup - 36 tests covering all classification functions Also: merge_key bug fix in _process_pass0b_control() Co-Authored-By: Claude Opus 4.6 (1M context) --- control-pipeline/services/control_ontology.py | 342 ++++++++++++++++++ .../services/decomposition_pass.py | 3 +- .../tests/test_control_ontology.py | 150 ++++++++ 3 files changed, 494 insertions(+), 1 deletion(-) create mode 100644 control-pipeline/services/control_ontology.py create mode 100644 control-pipeline/tests/test_control_ontology.py diff --git a/control-pipeline/services/control_ontology.py b/control-pipeline/services/control_ontology.py new file mode 100644 index 0000000..9449c73 --- /dev/null +++ b/control-pipeline/services/control_ontology.py @@ -0,0 +1,342 @@ +""" +Control Ontology — Controlled vocabulary for action types, object classes, +normalized objects, and pre-LLM classification. + +Used by: + 1. Pre-LLM filter (classify obligations before sending to API) + 2. Canonical key generation (deterministic dedup key) + 3. Post-LLM validation (reject invalid action_type/object_class) + 4. 
BatchDedup (merge_group from normalized_object) +""" + +from __future__ import annotations + +import re +from typing import Optional + +# ============================================================================ +# ACTION TYPES (26) — with German aliases +# ============================================================================ + +ACTION_TYPES: dict[str, dict] = { + "define": { + "aliases": ["definieren", "festlegen", "bestimmen", "vorgeben"], + "phase": "definition", + }, + "document": { + "aliases": ["dokumentieren", "aufzeichnen", "protokollieren", "schriftlich festhalten"], + "phase": "evidence", + }, + "approve": { + "aliases": ["freigeben", "genehmigen", "bestätigen"], + "phase": "governance", + }, + "implement": { + "aliases": ["implementieren", "umsetzen", "einführen", "einsetzen", "bereitstellen", + "etablieren", "einrichten", "aufbauen"], + "phase": "implementation", + }, + "configure": { + "aliases": ["konfigurieren", "einstellen", "parametrisieren"], + "phase": "configuration", + }, + "enforce": { + "aliases": ["durchsetzen", "erzwingen", "technisch erzwingen"], + "phase": "implementation", + }, + "maintain": { + "aliases": ["pflegen", "aktuell halten", "aufrechterhalten", "führen"], + "phase": "operation", + }, + "monitor": { + "aliases": ["überwachen", "beobachten", "monitoren"], + "phase": "monitoring", + }, + "review": { + "aliases": ["überprüfen", "prüfen", "reviewen", "kontrollieren"], + "phase": "review", + }, + "assess": { + "aliases": ["bewerten", "beurteilen", "einschätzen", "analysieren"], + "phase": "assessment", + }, + "identify": { + "aliases": ["identifizieren", "erkennen", "erfassen", "feststellen"], + "phase": "assessment", + }, + "remediate": { + "aliases": ["beheben", "adressieren", "mitigieren", "behandeln", "abstellen"], + "phase": "remediation", + }, + "test": { + "aliases": ["testen", "ausprobieren", "Test durchführen"], + "phase": "testing", + }, + "verify": { + "aliases": ["verifizieren", "nachweisen", "bestätigen"], + 
"phase": "testing", + }, + "validate": { + "aliases": ["validieren", "Wirksamkeit prüfen"], + "phase": "validation", + }, + "report": { + "aliases": ["melden", "berichten", "Anzeige erstatten"], + "phase": "reporting", + }, + "notify": { + "aliases": ["benachrichtigen", "informieren", "unterrichten"], + "phase": "reporting", + }, + "train": { + "aliases": ["schulen", "unterweisen", "sensibilisieren"], + "phase": "training", + }, + "retain": { + "aliases": ["aufbewahren", "archivieren", "speichern"], + "phase": "evidence", + }, + "delete": { + "aliases": ["löschen", "vernichten", "entfernen"], + "phase": "operation", + }, + "prevent": { + "aliases": ["verhindern", "vermeiden", "unterbinden"], + "phase": "implementation", + }, + "exclude": { + "aliases": ["nicht zulassen", "ausschließen", "verbieten", "untersagen"], + "phase": "implementation", + }, + "restrict_access": { + "aliases": ["Zugriff beschränken", "autorisieren", "berechtigen", "beschränken"], + "phase": "implementation", + }, + "encrypt": { + "aliases": ["verschlüsseln", "kryptografisch schützen"], + "phase": "implementation", + }, + "invalidate": { + "aliases": ["invalidieren", "ungültig machen", "widerrufen"], + "phase": "operation", + }, + "issue": { + "aliases": ["ausstellen", "vergeben", "erzeugen", "generieren"], + "phase": "operation", + }, +} + +# Build reverse lookup: German alias → action_type +_ALIAS_TO_ACTION: dict[str, str] = {} +for action_type, info in ACTION_TYPES.items(): + for alias in info["aliases"]: + _ALIAS_TO_ACTION[alias.lower()] = action_type + + +# ============================================================================ +# NEGATIVE OBLIGATION PATTERNS +# ============================================================================ + +_NEGATIVE_PATTERNS: list[tuple[str, str]] = [ + # Longer/specific patterns first (checked in order) + ("darf nicht wiederverwendet", "prevent"), + ("nicht in der URL", "prevent"), + ("nicht im Token", "prevent"), + ("nicht in Logs", "prevent"), + 
("verhindern", "prevent"), + ("unterbinden", "prevent"), + ("abweisen", "enforce"), + ("blockieren", "enforce"), + ("zurückweisen", "enforce"), + # Generic negative patterns last + ("dürfen nicht", "exclude"), + ("dürfen keine", "exclude"), + ("darf nicht", "exclude"), + ("darf keine", "exclude"), + ("nicht zulässig", "exclude"), + ("nicht erlaubt", "exclude"), + ("verboten", "exclude"), + ("untersagt", "exclude"), +] + + +# ============================================================================ +# CONTAINER / COMPOSITE OBJECTS (must NOT become atomic) +# ============================================================================ + +CONTAINER_OBJECTS: set[str] = { + "sichere sitzungsverwaltung", + "token-schutz", + "sorgfaltspflichten für drittkomponenten", + "risikomanagementsystem", + "secure development lifecycle", + "informationssicherheitsmanagement", + "datenschutzmanagement", + "ki-governance", + "sicherheitsmaßnahmen", + "technische und organisatorische maßnahmen", + "compliance-programm", + "umfassendes risikomanagement", + "sicherer softwareentwicklungsprozess", +} + + +# ============================================================================ +# EVIDENCE INDICATORS (must NOT become a control) +# ============================================================================ + +EVIDENCE_INDICATORS: set[str] = { + "nachweis", "dokumentation", "screenshot", "export", "auditbericht", + "prüfbericht", "zertifizierung", "log-auszug", "jira-ticket", + "servicenow-ticket", "sbom-nachweis", "freigabevermerk", + "review-protokoll", "testprotokoll", +} + + +# ============================================================================ +# FRAMEWORK REFERENCES (must NOT become atomic directly) +# ============================================================================ + +_FRAMEWORK_PATTERNS: list[str] = [ + r"OWASP\s+ASVS\s+V\d", + r"OWASP\s+API\s+Top\s+10", + r"NIST\s+SP\s+800-\d+", + r"NIST\s+IA-\d+", + r"NIST\s+AC-\d+", + r"BSI\s+IT-Grundschutz", + 
r"BSI\s+200-\d", + r"CSA\s+CCM", + r"ISO\s+27001", + r"ISO\s+27002", +] + + +# ============================================================================ +# CLASSIFICATION FUNCTIONS +# ============================================================================ + + +def classify_action(text: str) -> str: + """Classify an obligation action text into a canonical action_type.""" + text_lower = text.lower().strip() + + # Check negative patterns first + for pattern, action_type in _NEGATIVE_PATTERNS: + if pattern in text_lower: + return action_type + + # Direct alias match + if text_lower in _ALIAS_TO_ACTION: + return _ALIAS_TO_ACTION[text_lower] + + # Substring match (longest first) + best_match = "" + best_action = "implement" # default fallback + for alias, action_type in sorted(_ALIAS_TO_ACTION.items(), key=lambda x: -len(x[0])): + if alias in text_lower and len(alias) > len(best_match): + best_match = alias + best_action = action_type + + return best_action + + +def get_phase(action_type: str) -> str: + """Get the control_phase for an action_type.""" + info = ACTION_TYPES.get(action_type, {}) + return info.get("phase", "implementation") + + +def is_container(text: str) -> bool: + """Check if obligation text describes a container/composite — not atomic.""" + text_lower = text.lower().strip() + return any(container in text_lower for container in CONTAINER_OBJECTS) + + +def is_evidence(text: str) -> bool: + """Check if obligation text is actually evidence, not a control.""" + text_lower = text.lower().strip() + + # Primary check: evidence indicators at the start + for indicator in EVIDENCE_INDICATORS: + if text_lower.startswith(indicator) or f"ein {indicator}" in text_lower: + return True + + # Secondary: "X dokumentieren" where X is another action's result + if text_lower.endswith("dokumentieren") or text_lower.endswith("dokumentiert"): + # Check if the primary subject is an action result, not a standalone duty + action_words = {"tests", "maßnahmen", "ergebnisse", 
"prüfungen", "änderungen"} + if any(w in text_lower for w in action_words): + return True + + return False + + +def is_framework_reference(text: str) -> bool: + """Check if obligation references a framework that should be decomposed, not atomic.""" + for pattern in _FRAMEWORK_PATTERNS: + if re.search(pattern, text, re.IGNORECASE): + # Only if the text is a generic "implement X framework" statement + implement_words = {"umsetzen", "implementieren", "einhalten", "erfüllen", "anwenden"} + text_lower = text.lower() + if any(w in text_lower for w in implement_words): + return True + return False + + +def classify_obligation(text: str, action: str = "") -> dict: + """Classify an obligation for pre-LLM routing. + + Returns: + { + "routing": "atomic" | "composite" | "evidence" | "framework_container", + "action_type": str, + "phase": str, + "reason": str, + } + """ + if is_evidence(text): + return { + "routing": "evidence", + "action_type": "document", + "phase": "evidence", + "reason": f"Evidence indicator detected", + } + + if is_container(text): + return { + "routing": "composite", + "action_type": classify_action(action or text), + "phase": get_phase(classify_action(action or text)), + "reason": "Container/composite object detected", + } + + if is_framework_reference(text): + return { + "routing": "framework_container", + "action_type": classify_action(action or text), + "phase": get_phase(classify_action(action or text)), + "reason": "Framework reference detected", + } + + action_type = classify_action(action or text) + return { + "routing": "atomic", + "action_type": action_type, + "phase": get_phase(action_type), + "reason": "Atomic obligation", + } + + +def build_canonical_key( + action_type: str, + normalized_object: str, + phase: Optional[str] = None, + asset_scope: Optional[str] = None, +) -> str: + """Build a canonical dedup key.""" + parts = [action_type, normalized_object] + if phase: + parts.append(phase) + if asset_scope: + parts.append(asset_scope) + 
"""Tests for Control Ontology — action classification, evidence/container detection."""

# Fix: dropped unused `import pytest` (ruff F401) — no fixtures, marks, or
# pytest APIs are used; plain asserts are collected by pytest as-is.
from services.control_ontology import (
    build_canonical_key,
    classify_action,
    classify_obligation,
    get_phase,
    is_container,
    is_evidence,
    is_framework_reference,
)


class TestClassifyAction:
    def test_implement(self):
        assert classify_action("implementieren") == "implement"
        assert classify_action("umsetzen") == "implement"
        assert classify_action("einführen") == "implement"
        assert classify_action("etablieren") == "implement"

    def test_monitor(self):
        assert classify_action("überwachen") == "monitor"

    def test_test(self):
        assert classify_action("testen") == "test"

    def test_prevent(self):
        assert classify_action("verhindern") == "prevent"

    def test_exclude(self):
        assert classify_action("nicht zulassen") == "exclude"

    def test_negative_pattern(self):
        assert classify_action("dürfen nicht verwendet werden") == "exclude"
        assert classify_action("darf nicht wiederverwendet werden") == "prevent"

    def test_compound_picks_first(self):
        # Compound text — should pick the dominant action
        result = classify_action("identifizieren und bewerten")
        assert result in ("identify", "assess")

    def test_schulen(self):
        assert classify_action("schulen") == "train"

    def test_melden(self):
        assert classify_action("melden") == "report"


class TestIsContainer:
    def test_session_management(self):
        assert is_container("Sichere Sitzungsverwaltung muss umgesetzt werden")

    def test_token_protection(self):
        assert is_container("Token-Schutz muss umgesetzt werden")

    def test_risk_management(self):
        assert is_container("Umfassendes Risikomanagement einrichten")

    def test_not_container(self):
        assert not is_container("Rate-Limiting für API-Endpunkte konfigurieren")
        assert not is_container("MFA für privilegierte Accounts aktivieren")


class TestIsEvidence:
    def test_sbom_nachweis(self):
        assert is_evidence("Ein SBOM-Nachweis muss vorliegen")

    def test_screenshot(self):
        assert is_evidence("Ein Screenshot der MFA-Konfiguration ist vorzulegen")

    def test_auditbericht(self):
        assert is_evidence("Ein Auditbericht zur Zugriffskontrolle muss vorhanden sein")

    def test_tests_dokumentieren(self):
        assert is_evidence("Tests dokumentieren")
        assert is_evidence("Tests des Verfahrens dokumentiert")

    def test_massnahmen_dokumentieren(self):
        assert is_evidence("Ergriffene Maßnahmen dokumentieren")

    def test_not_evidence_standalone_doc_duty(self):
        # VVT führen is a standalone documentation duty, not evidence
        assert not is_evidence("Verarbeitungsverzeichnis führen")

    def test_not_evidence_implement(self):
        assert not is_evidence("Rate-Limiting implementieren")
        assert not is_evidence("MFA für privilegierte Accounts aktivieren")


class TestIsFrameworkReference:
    def test_owasp_asvs(self):
        assert is_framework_reference("OWASP ASVS V3 Session Management umsetzen")

    def test_nist(self):
        assert is_framework_reference("NIST SP 800-53 IA-Anforderungen implementieren")

    def test_not_framework_specific(self):
        assert not is_framework_reference("Rate-Limiting konfigurieren")
        assert not is_framework_reference("MFA aktivieren")


class TestClassifyObligation:
    def test_atomic(self):
        result = classify_obligation("Rate-Limiting für API-Endpunkte konfigurieren", "konfigurieren")
        assert result["routing"] == "atomic"
        assert result["action_type"] == "configure"

    def test_evidence_routed(self):
        result = classify_obligation("Ein SBOM-Nachweis muss vorliegen")
        assert result["routing"] == "evidence"

    def test_container_routed(self):
        result = classify_obligation("Sichere Sitzungsverwaltung muss umgesetzt werden")
        assert result["routing"] == "composite"

    def test_framework_routed(self):
        result = classify_obligation("OWASP ASVS V3 umsetzen", "umsetzen")
        assert result["routing"] == "framework_container"

    def test_negative_obligation(self):
        result = classify_obligation("Sensible Daten dürfen nicht in URLs übertragen werden")
        assert result["routing"] == "atomic"
        assert result["action_type"] == "exclude"


class TestBuildCanonicalKey:
    def test_minimal(self):
        key = build_canonical_key("implement", "api_rate_limiting")
        assert key == "implement:api_rate_limiting"

    def test_with_phase(self):
        key = build_canonical_key("implement", "api_rate_limiting", phase="implementation")
        assert key == "implement:api_rate_limiting:implementation"

    def test_full(self):
        key = build_canonical_key("implement", "api_rate_limiting", "implementation", "api_endpoints")
        assert key == "implement:api_rate_limiting:implementation:api_endpoints"


class TestGetPhase:
    def test_implement(self):
        assert get_phase("implement") == "implementation"

    def test_monitor(self):
        assert get_phase("monitor") == "monitoring"

    def test_test(self):
        assert get_phase("test") == "testing"

    def test_document(self):
        assert get_phase("document") == "evidence"

    def test_unknown(self):
        assert get_phase("unknown_action") == "implementation"