feat(pipeline): implement golden test suite + fix ontology patterns

- Add test_golden_controls.py: 37 tests covering all 8 YAML categories
  (container, framework, evidence, negative, title, split, scope, merge_key)
- Fix evidence detection: handle German feminine articles (eine/einer/etc.)
- Fix framework detection: use verb stems for conjugated German verbs
- Add framework patterns: OWASP API6, CCM without CSA prefix, generic category
- Fix negative patterns: use "nicht übertragen/gespeichert/erscheinen" before
  generic "dürfen nicht" to correctly route prevent vs exclude

All 73 tests passing (36 ontology + 37 golden).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-26 09:48:12 +02:00
parent d1f3b9ffcd
commit d660a45bb5
3 changed files with 259 additions and 11 deletions

View File

@@ -139,10 +139,14 @@ for action_type, info in ACTION_TYPES.items():
_NEGATIVE_PATTERNS: list[tuple[str, str]] = [
# Longer/specific patterns first (checked in order)
("darf nicht wiederverwendet", "prevent"),
("nicht in der URL", "prevent"),
("nicht im Token", "prevent"),
("nicht in Logs", "prevent"),
("nicht wiederverwendet", "prevent"),
("nicht in der url", "prevent"),
("nicht im token", "prevent"),
("nicht in logs", "prevent"),
("nicht in urls", "prevent"),
("nicht gespeichert", "prevent"),
("nicht übertragen", "prevent"),
("nicht erscheinen", "prevent"),
("verhindern", "prevent"),
("unterbinden", "prevent"),
("abweisen", "enforce"),
@@ -199,15 +203,17 @@ EVIDENCE_INDICATORS: set[str] = {
_FRAMEWORK_PATTERNS: list[str] = [
r"OWASP\s+ASVS\s+V\d",
r"OWASP\s+API\d+",
r"OWASP\s+API\s+Top\s+10",
r"NIST\s+SP\s+800-\d+",
r"NIST\s+IA-\d+",
r"NIST\s+AC-\d+",
r"NIST\s+IA[\s-]",
r"NIST\s+AC[\s-]",
r"BSI\s+IT-Grundschutz",
r"BSI\s+200-\d",
r"CSA\s+CCM",
r"(?:CSA\s+)?CCM[\s-]",
r"ISO\s+27001",
r"ISO\s+27002",
r"alle\s+Controls\s+der\s+Kategorie",
]
@@ -258,7 +264,11 @@ def is_evidence(text: str) -> bool:
# Primary check: evidence indicators at the start
for indicator in EVIDENCE_INDICATORS:
if text_lower.startswith(indicator) or f"ein {indicator}" in text_lower:
if text_lower.startswith(indicator):
return True
# German articles: ein/eine/einen/einem/einer + indicator
for article in ("ein ", "eine ", "einen ", "einem ", "einer "):
if f"{article}{indicator}" in text_lower:
return True
# Secondary: "X dokumentieren" where X is another action's result
@@ -276,9 +286,10 @@ def is_framework_reference(text: str) -> bool:
for pattern in _FRAMEWORK_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
# Only if the text is a generic "implement X framework" statement
implement_words = {"umsetzen", "implementieren", "einhalten", "erfüllen", "anwenden"}
# Use stems to handle German conjugation (umsetzen/umzusetzen/umgesetzt)
implement_stems = ("umsetz", "umzusetz", "implementier", "einhalt", "erfüll", "anwend")
text_lower = text.lower()
if any(w in text_lower for w in implement_words):
if any(s in text_lower for s in implement_stems):
return True
return False

View File

@@ -116,7 +116,7 @@ class TestClassifyObligation:
def test_negative_obligation(self):
result = classify_obligation("Sensible Daten dürfen nicht in URLs übertragen werden")
assert result["routing"] == "atomic"
assert result["action_type"] == "exclude"
assert result["action_type"] == "prevent"
class TestBuildCanonicalKey:

View File

@@ -0,0 +1,237 @@
"""
Golden Test Suite — pytest implementation of golden_controls.yaml.
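Tests pre-LLM classification (container, framework, and evidence detection),
negative obligation handling, title quality rules, compound action splitting,
canonical merge-key structure, actor-scope routing, and the global quality
gates, all via control_ontology and the values declared in the YAML.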
"""
import sys
import os
import yaml
import pytest
# Ensure control-pipeline is in the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from services.control_ontology import (
classify_obligation,
classify_action,
build_canonical_key,
)
# ---------------------------------------------------------------------------
# Load YAML once
# ---------------------------------------------------------------------------
# The fixture lives next to this test module.
GOLDEN_PATH = os.path.join(os.path.dirname(__file__), "golden_controls.yaml")

# Decode explicitly as UTF-8 instead of relying on the platform default
# encoding: the fixture holds German-language inputs (presumably with
# umlauts), which would fail to decode or be mangled on non-UTF-8 locales.
with open(GOLDEN_PATH, encoding="utf-8") as f:
    _GOLDEN = yaml.safe_load(f)

# Loaded at import time because the parametrize decorators below consume
# TESTS while pytest is still collecting.
TESTS = _GOLDEN["tests"]
QUALITY_GATES = _GOLDEN["global_quality_gates"]
def _tests_by_category(cat: str) -> list:
    """Return every golden case whose ``category`` equals *cat*, in file order."""
    selected = []
    for entry in TESTS:
        if entry["category"] == cat:
            selected.append(entry)
    return selected
# ============================================================================
# D. Container Detection (5 tests)
# ============================================================================
class TestContainerDetection:
    """GT-CONTAINER-001..005: composite obligations must be routed as composite."""

    @pytest.mark.parametrize(
        "case",
        _tests_by_category("container_control_detection"),
        ids=lambda c: c["id"],
    )
    def test_container_routed_composite(self, case):
        """Classify the case input and compare its routing with the expected one.

        The expected routing comes from the case's ``routing_type`` and
        falls back to ``composite`` when the case does not specify it.
        """
        text = case["input"]
        outcome = classify_obligation(text, "")
        wanted = case["expected"].get("routing_type", "composite")
        routing = outcome["routing"]
        assert routing == wanted, (
            f"{case['id']}: expected routing={wanted}, "
            f"got {routing} for: {text}"
        )
# ============================================================================
# E. Framework Decomposition (5 tests)
# ============================================================================
class TestFrameworkDetection:
    """GT-FRAMEWORK-001..005: framework references must be detected."""

    @pytest.mark.parametrize(
        "case",
        _tests_by_category("framework_decomposition"),
        ids=lambda c: c["id"],
    )
    def test_framework_routed(self, case):
        """Classify the case input and compare its routing with the expected one.

        The expected routing comes from the case's ``routing_type`` and
        falls back to ``framework_container`` when the case omits it.
        """
        text = case["input"]
        outcome = classify_obligation(text, "")
        wanted = case["expected"].get("routing_type", "framework_container")
        routing = outcome["routing"]
        assert routing == wanted, (
            f"{case['id']}: expected routing={wanted}, "
            f"got {routing} for: {text}"
        )
# ============================================================================
# F. Evidence Leakage (5 tests)
# ============================================================================
class TestEvidenceDetection:
    """GT-EVIDENCE-001..005: evidence obligations must not become controls."""

    @pytest.mark.parametrize(
        "case",
        _tests_by_category("evidence_not_control"),
        ids=lambda c: c["id"],
    )
    def test_evidence_detected(self, case):
        """Every case in this category must be routed as ``evidence``."""
        text = case["input"]
        routing = classify_obligation(text, "")["routing"]
        assert routing == "evidence", (
            f"{case['id']}: expected routing=evidence, "
            f"got {routing} for: {text}"
        )
# ============================================================================
# C. Negative Obligation Handling (5 tests)
# ============================================================================
class TestNegativeObligations:
    """GT-NEG-001..005: negative patterns produce correct action_type."""

    @pytest.mark.parametrize(
        "case",
        _tests_by_category("negative_obligation_handling"),
        ids=lambda c: c["id"],
    )
    def test_negative_action_type(self, case):
        """Compare ``classify_action`` output with the case's expected action type.

        Cases without an expected ``action_type`` are skipped rather than failed.
        """
        text = case["input"]
        wanted = case["expected"].get("action_type")
        if wanted:
            actual = classify_action(text)
            assert actual == wanted, (
                f"{case['id']}: expected action_type={wanted}, "
                f"got {actual} for: {text}"
            )
        else:
            pytest.skip("No expected action_type specified")
# ============================================================================
# H. Title Quality (structural tests — no LLM needed)
# ============================================================================
class TestTitleQuality:
    """GT-TITLE-001..005: structural title rules.

    Only the checks that need no LLM are implemented here: the global
    truncated-title gate and the routing of the GT-TITLE-005 input.
    """

    def test_gt_title_001_no_truncated_endings(self):
        """Truncated titles are forbidden globally."""
        assert QUALITY_GATES["truncated_titles_allowed"] is False

    def test_gt_title_005_composite_not_atomic(self):
        """'Token-Schutz muss umgesetzt werden' is a composite, not atomic."""
        # Use a default so a missing fixture entry fails with a readable
        # message instead of an opaque StopIteration from a bare next().
        case = next((t for t in TESTS if t["id"] == "GT-TITLE-005"), None)
        assert case is not None, "GT-TITLE-005 is missing from golden_controls.yaml"
        result = classify_obligation(case["input"], "")
        assert result["routing"] == "composite", (
            f"GT-TITLE-005: 'Token-Schutz' should be composite, got {result['routing']}"
        )
# ============================================================================
# B. Compound Action Split (structural — classify_action only)
# ============================================================================
class TestCompoundActionSplit:
    """Test that compound inputs contain recognizable actions.

    Each test feeds one already-split clause to ``classify_action``; the
    splitting itself is not exercised here.
    """

    def test_gt_split_001_define_and_enforce(self):
        """The 'definieren' clause on its own classifies as ``define``."""
        assert classify_action("Maximale Payload-Größen definieren") == "define"

    def test_gt_split_001_enforce(self):
        """The 'durchsetzen' clause on its own classifies as ``enforce``."""
        assert classify_action("Payload-Größen technisch durchsetzen") == "enforce"

    def test_gt_split_003_identify(self):
        """The 'identifizieren' clause classifies as ``identify``."""
        assert classify_action("Schwachstellen identifizieren") == "identify"

    def test_gt_split_003_assess(self):
        """The 'bewerten' clause classifies as ``assess``."""
        assert classify_action("Schwachstellen bewerten") == "assess"

    def test_gt_split_003_monitor(self):
        """The 'überwachen' clause classifies as ``monitor``."""
        assert classify_action("Schwachstellen überwachen") == "monitor"
# ============================================================================
# A. Duplicate Explosion (merge_key structure tests)
# ============================================================================
class TestMergeKeyStructure:
    """Verify canonical key format: action_type:object:phase:scope."""

    def test_canonical_key_format(self):
        """All four parts, passed by keyword, are joined with colons in order."""
        built = build_canonical_key(
            action_type="implement",
            normalized_object="api_rate_limiting",
            phase="implementation",
            asset_scope="api_endpoints",
        )
        expected = "implement:api_rate_limiting:implementation:api_endpoints"
        assert built == expected

    def test_canonical_key_no_empty_parts(self):
        """With phase and scope omitted, the key still starts with action and object."""
        # NOTE(review): only the prefix is pinned here; what follows depends on
        # build_canonical_key's defaults, which are not visible in this file.
        built = build_canonical_key(
            action_type="define",
            normalized_object="payload_limits",
        )
        assert built.startswith("define:payload_limits")

    def test_canonical_key_colon_separated(self):
        """Four positional parts yield exactly four colon-separated segments."""
        built = build_canonical_key("test", "obj", "phase", "scope")
        assert len(built.split(":")) == 4
# ============================================================================
# G. Scope Dimension (structural — these need dedup to fully verify)
# ============================================================================
class TestScopeDimension:
    """Structural checks: different actor scopes should classify as atomic."""

    def test_gt_scope_001_employee_atomic(self):
        """The employee confidentiality obligation routes as atomic."""
        routing = classify_obligation("Mitarbeiter müssen Vertraulichkeit wahren.", "")["routing"]
        assert routing == "atomic"

    def test_gt_scope_001_subcontractor_atomic(self):
        """The subcontractor confidentiality obligation routes as atomic."""
        routing = classify_obligation("Unterauftragnehmer müssen Vertraulichkeit wahren.", "")["routing"]
        assert routing == "atomic"

    def test_gt_scope_005_admin_mfa_atomic(self):
        """The MFA obligation for privileged accounts routes as atomic."""
        routing = classify_obligation("Privilegierte Accounts müssen MFA verwenden.", "")["routing"]
        assert routing == "atomic"

    def test_gt_scope_005_all_users_mfa_atomic(self):
        """The MFA obligation for all users routes as atomic."""
        routing = classify_obligation("Alle Nutzer müssen MFA verwenden.", "")["routing"]
        assert routing == "atomic"
# ============================================================================
# Quality gate assertions
# ============================================================================
class TestQualityGates:
    """Verify global quality gate values from YAML."""

    def test_max_controls_per_obligation(self):
        """A single obligation may expand into at most six controls."""
        limit = QUALITY_GATES["max_controls_per_single_obligation"]
        assert limit == 6

    def test_no_evidence_as_control(self):
        """The gate for evidence as an atomic control must be exactly False."""
        allowed = QUALITY_GATES["evidence_as_atomic_control_allowed"]
        assert allowed is False

    def test_no_framework_container_as_atomic(self):
        """The gate for a framework container as atomic must be exactly False."""
        allowed = QUALITY_GATES["framework_container_as_atomic_allowed"]
        assert allowed is False

    def test_no_composite_as_atomic(self):
        """The gate for a composite control as atomic must be exactly False."""
        allowed = QUALITY_GATES["composite_control_as_atomic_allowed"]
        assert allowed is False