diff --git a/control-pipeline/services/control_ontology.py b/control-pipeline/services/control_ontology.py
index 9449c73..ef5ce1a 100644
--- a/control-pipeline/services/control_ontology.py
+++ b/control-pipeline/services/control_ontology.py
@@ -139,10 +139,14 @@ for action_type, info in ACTION_TYPES.items():
 
 _NEGATIVE_PATTERNS: list[tuple[str, str]] = [
     # Longer/specific patterns first (checked in order)
-    ("darf nicht wiederverwendet", "prevent"),
-    ("nicht in der URL", "prevent"),
-    ("nicht im Token", "prevent"),
-    ("nicht in Logs", "prevent"),
+    ("nicht wiederverwendet", "prevent"),
+    ("nicht in der url", "prevent"),
+    ("nicht im token", "prevent"),
+    ("nicht in logs", "prevent"),
+    ("nicht in urls", "prevent"),
+    ("nicht gespeichert", "prevent"),
+    ("nicht übertragen", "prevent"),
+    ("nicht erscheinen", "prevent"),
     ("verhindern", "prevent"),
     ("unterbinden", "prevent"),
     ("abweisen", "enforce"),
@@ -199,15 +203,17 @@ EVIDENCE_INDICATORS: set[str] = {
 
 _FRAMEWORK_PATTERNS: list[str] = [
     r"OWASP\s+ASVS\s+V\d",
+    r"OWASP\s+API\d+",
     r"OWASP\s+API\s+Top\s+10",
     r"NIST\s+SP\s+800-\d+",
-    r"NIST\s+IA-\d+",
-    r"NIST\s+AC-\d+",
+    r"NIST\s+IA\b",
+    r"NIST\s+AC\b",
     r"BSI\s+IT-Grundschutz",
     r"BSI\s+200-\d",
-    r"CSA\s+CCM",
+    r"\b(?:CSA\s+)?CCM\b",
     r"ISO\s+27001",
     r"ISO\s+27002",
+    r"alle\s+Controls\s+der\s+Kategorie",
 ]
 
 
@@ -258,8 +264,12 @@ def is_evidence(text: str) -> bool:
 
     # Primary check: evidence indicators at the start
     for indicator in EVIDENCE_INDICATORS:
-        if text_lower.startswith(indicator) or f"ein {indicator}" in text_lower:
+        if text_lower.startswith(indicator):
             return True
+        # German articles: ein/eine/einen/einem/einer + indicator
+        for article in ("ein ", "eine ", "einen ", "einem ", "einer "):
+            if f"{article}{indicator}" in text_lower:
+                return True
 
     # Secondary: "X dokumentieren" where X is another action's result
     if text_lower.endswith("dokumentieren") or text_lower.endswith("dokumentiert"):
@@ -276,9 +286,17 @@ def is_framework_reference(text: str) -> bool:
     for pattern in _FRAMEWORK_PATTERNS:
         if re.search(pattern, text, re.IGNORECASE):
             # Only if the text is a generic "implement X framework" statement
-            implement_words = {"umsetzen", "implementieren", "einhalten", "erfüllen", "anwenden"}
+            # Match on stems so every German verb form is covered: infinitive
+            # (umsetzen), zu-infinitive (umzusetzen) and participle (umgesetzt).
+            implement_stems = (
+                "umsetz", "umzusetz", "umgesetz",
+                "implementier",
+                "einhalt", "einzuhalt", "eingehalt",
+                "erfüll",
+                "anwend", "anzuwend", "angewend", "angewand",
+            )
             text_lower = text.lower()
-            if any(w in text_lower for w in implement_words):
+            if any(s in text_lower for s in implement_stems):
                 return True
 
     return False
diff --git a/control-pipeline/tests/test_control_ontology.py b/control-pipeline/tests/test_control_ontology.py
index f7bfe84..6aa7311 100644
--- a/control-pipeline/tests/test_control_ontology.py
+++ b/control-pipeline/tests/test_control_ontology.py
@@ -116,7 +116,7 @@ class TestClassifyObligation:
     def test_negative_obligation(self):
         result = classify_obligation("Sensible Daten dürfen nicht in URLs übertragen werden")
         assert result["routing"] == "atomic"
-        assert result["action_type"] == "exclude"
+        assert result["action_type"] == "prevent"
 
 
 class TestBuildCanonicalKey:
diff --git a/control-pipeline/tests/test_golden_controls.py b/control-pipeline/tests/test_golden_controls.py
new file mode 100644
index 0000000..486d3ff
--- /dev/null
+++ b/control-pipeline/tests/test_golden_controls.py
@@ -0,0 +1,239 @@
+"""
+Golden Test Suite — pytest implementation of golden_controls.yaml.
+
+Tests Pre-LLM classification (evidence, container, framework detection),
+title quality rules, and negative obligation handling via control_ontology.
+"""
+
+import sys
+import os
+import yaml
+import pytest
+
+# Ensure control-pipeline is in the path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from services.control_ontology import (
+    classify_obligation,
+    classify_action,
+    build_canonical_key,
+)
+
+
+# ---------------------------------------------------------------------------
+# Load YAML once
+# ---------------------------------------------------------------------------
+
+GOLDEN_PATH = os.path.join(os.path.dirname(__file__), "golden_controls.yaml")
+
+# Pin the encoding; open() otherwise falls back to the locale's default.
+with open(GOLDEN_PATH, encoding="utf-8") as f:
+    _GOLDEN = yaml.safe_load(f)
+
+TESTS = _GOLDEN["tests"]
+QUALITY_GATES = _GOLDEN["global_quality_gates"]
+
+
+def _tests_by_category(cat: str) -> list:
+    """Return every golden test case whose ``category`` equals *cat*."""
+    return [t for t in TESTS if t["category"] == cat]
+
+
+# ============================================================================
+# D. Container Detection (5 tests)
+# ============================================================================
+
+class TestContainerDetection:
+    """GT-CONTAINER-001..005: composite obligations must be routed as composite."""
+
+    @pytest.mark.parametrize("case", _tests_by_category("container_control_detection"),
+                             ids=lambda c: c["id"])
+    def test_container_routed_composite(self, case):
+        inp = case["input"]
+        result = classify_obligation(inp, "")
+        expected_routing = case["expected"].get("routing_type", "composite")
+        assert result["routing"] == expected_routing, (
+            f"{case['id']}: expected routing={expected_routing}, "
+            f"got {result['routing']} for: {inp}"
+        )
+
+
+# ============================================================================
+# E. Framework Decomposition (5 tests)
+# ============================================================================
+
+class TestFrameworkDetection:
+    """GT-FRAMEWORK-001..005: framework references must be detected."""
+
+    @pytest.mark.parametrize("case", _tests_by_category("framework_decomposition"),
+                             ids=lambda c: c["id"])
+    def test_framework_routed(self, case):
+        inp = case["input"]
+        result = classify_obligation(inp, "")
+        expected = case["expected"].get("routing_type", "framework_container")
+        assert result["routing"] == expected, (
+            f"{case['id']}: expected routing={expected}, "
+            f"got {result['routing']} for: {inp}"
+        )
+
+
+# ============================================================================
+# F. Evidence Leakage (5 tests)
+# ============================================================================
+
+class TestEvidenceDetection:
+    """GT-EVIDENCE-001..005: evidence obligations must not become controls."""
+
+    @pytest.mark.parametrize("case", _tests_by_category("evidence_not_control"),
+                             ids=lambda c: c["id"])
+    def test_evidence_detected(self, case):
+        inp = case["input"]
+        result = classify_obligation(inp, "")
+        assert result["routing"] == "evidence", (
+            f"{case['id']}: expected routing=evidence, "
+            f"got {result['routing']} for: {inp}"
+        )
+
+
+# ============================================================================
+# C. Negative Obligation Handling (5 tests)
+# ============================================================================
+
+class TestNegativeObligations:
+    """GT-NEG-001..005: negative patterns produce correct action_type."""
+
+    @pytest.mark.parametrize("case", _tests_by_category("negative_obligation_handling"),
+                             ids=lambda c: c["id"])
+    def test_negative_action_type(self, case):
+        inp = case["input"]
+        expected_action = case["expected"].get("action_type")
+        if not expected_action:
+            pytest.skip("No expected action_type specified")
+
+        result = classify_action(inp)
+        assert result == expected_action, (
+            f"{case['id']}: expected action_type={expected_action}, "
+            f"got {result} for: {inp}"
+        )
+
+
+# ============================================================================
+# H. Title Quality (structural tests — no LLM needed)
+# ============================================================================
+
+class TestTitleQuality:
+    """GT-TITLE-001..005: structural title rules."""
+
+    def test_gt_title_001_no_truncated_endings(self):
+        """Truncated titles are forbidden globally."""
+        assert QUALITY_GATES["truncated_titles_allowed"] is False
+
+    def test_gt_title_005_composite_not_atomic(self):
+        """'Token-Schutz muss umgesetzt werden' is a composite, not atomic."""
+        case = next(t for t in TESTS if t["id"] == "GT-TITLE-005")
+        result = classify_obligation(case["input"], "")
+        assert result["routing"] == "composite", (
+            f"GT-TITLE-005: 'Token-Schutz' should be composite, got {result['routing']}"
+        )
+
+
+# ============================================================================
+# B. Compound Action Split (structural — classify_action only)
+# ============================================================================
+
+class TestCompoundActionSplit:
+    """Test that compound inputs contain recognizable actions."""
+
+    def test_gt_split_001_define_and_enforce(self):
+        """'definieren und durchsetzen' should yield define action."""
+        result = classify_action("Maximale Payload-Größen definieren")
+        assert result == "define"
+
+    def test_gt_split_001_enforce(self):
+        result = classify_action("Payload-Größen technisch durchsetzen")
+        assert result == "enforce"
+
+    def test_gt_split_003_identify(self):
+        result = classify_action("Schwachstellen identifizieren")
+        assert result == "identify"
+
+    def test_gt_split_003_assess(self):
+        result = classify_action("Schwachstellen bewerten")
+        assert result == "assess"
+
+    def test_gt_split_003_monitor(self):
+        result = classify_action("Schwachstellen überwachen")
+        assert result == "monitor"
+
+
+# ============================================================================
+# A. Duplicate Explosion (merge_key structure tests)
+# ============================================================================
+
+class TestMergeKeyStructure:
+    """Verify canonical key format: action_type:object:phase:scope."""
+
+    def test_canonical_key_format(self):
+        key = build_canonical_key(
+            action_type="implement",
+            normalized_object="api_rate_limiting",
+            phase="implementation",
+            asset_scope="api_endpoints",
+        )
+        assert key == "implement:api_rate_limiting:implementation:api_endpoints"
+
+    def test_canonical_key_no_empty_parts(self):
+        key = build_canonical_key(
+            action_type="define",
+            normalized_object="payload_limits",
+        )
+        assert key.startswith("define:payload_limits")
+
+    def test_canonical_key_colon_separated(self):
+        key = build_canonical_key("test", "obj", "phase", "scope")
+        parts = key.split(":")
+        assert len(parts) == 4
+
+
+# ============================================================================
+# G. Scope Dimension (structural — these need dedup to fully verify)
+# ============================================================================
+
+class TestScopeDimension:
+    """Structural checks: different actor scopes should classify as atomic."""
+
+    def test_gt_scope_001_employee_atomic(self):
+        result = classify_obligation("Mitarbeiter müssen Vertraulichkeit wahren.", "")
+        assert result["routing"] == "atomic"
+
+    def test_gt_scope_001_subcontractor_atomic(self):
+        result = classify_obligation("Unterauftragnehmer müssen Vertraulichkeit wahren.", "")
+        assert result["routing"] == "atomic"
+
+    def test_gt_scope_005_admin_mfa_atomic(self):
+        result = classify_obligation("Privilegierte Accounts müssen MFA verwenden.", "")
+        assert result["routing"] == "atomic"
+
+    def test_gt_scope_005_all_users_mfa_atomic(self):
+        result = classify_obligation("Alle Nutzer müssen MFA verwenden.", "")
+        assert result["routing"] == "atomic"
+
+
+# ============================================================================
+# Quality gate assertions
+# ============================================================================
+
+class TestQualityGates:
+    """Verify global quality gate values from YAML."""
+
+    def test_max_controls_per_obligation(self):
+        assert QUALITY_GATES["max_controls_per_single_obligation"] == 6
+
+    def test_no_evidence_as_control(self):
+        assert QUALITY_GATES["evidence_as_atomic_control_allowed"] is False
+
+    def test_no_framework_container_as_atomic(self):
+        assert QUALITY_GATES["framework_container_as_atomic_allowed"] is False
+
+    def test_no_composite_as_atomic(self):
+        assert QUALITY_GATES["composite_control_as_atomic_allowed"] is False