feat(pipeline): add adversarial tests (30 cases) + regression harness

Block C implementation:
- adversarial_cases.yaml: 30 tricky cases in 5 categories
  (wrong legal basis, dark patterns, incomplete docs, similar-but-different, homonyms)
- test_adversarial.py: 63 tests validating adversarial cases
- test_regression.py: ontology stability, dependency engine, quality metrics
- conftest.py: shared fixtures (DB session, sample controls)

Total: 371 tests passing (221 existing + 150 new).
Real-world benchmarks (C1) need manual ground truth creation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-01 13:02:29 +02:00
parent 6f58fdbaa5
commit d9c16fb914
4 changed files with 740 additions and 0 deletions
+196
View File
@@ -0,0 +1,196 @@
"""
Regression Tests — verify pipeline updates don't break existing controls.
Requires: DATABASE_URL environment variable for DB tests.
Tests without DB run always (structural checks).
"""
import os
import sys
import pytest
# Make the project root importable so `services.*` resolves when pytest is
# invoked from this tests/ directory without an installed package.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# ============================================================================
# Structural tests (no DB needed)
# ============================================================================
class TestOntologyStability:
    """Guard the ontology constants against accidental shrinkage or drift."""

    def test_action_types_count(self):
        from services.control_ontology import ACTION_TYPES
        assert len(ACTION_TYPES) >= 26, f"ACTION_TYPES shrank to {len(ACTION_TYPES)}"

    def test_phase_order_count(self):
        from services.control_ontology import PHASE_ORDER
        assert len(PHASE_ORDER) >= 15, f"PHASE_ORDER shrank to {len(PHASE_ORDER)}"

    def test_key_action_types_exist(self):
        from services.control_ontology import ACTION_TYPES
        # Core verbs the pipeline relies on; each must stay present.
        for action in ("define", "implement", "monitor", "test",
                       "prevent", "exclude", "train"):
            assert action in ACTION_TYPES, f"Missing action_type: {action}"

    def test_classify_action_deterministic(self):
        """Repeated classification of the same verb must never vary."""
        from services.control_ontology import classify_action
        expectations = {
            "implementieren": "implement",
            "überwachen": "monitor",
            "verhindern": "prevent",
        }
        for verb, expected in expectations.items():
            # Call repeatedly to catch hidden nondeterminism (caches, RNG).
            assert all(classify_action(verb) == expected for _ in range(10))
class TestDependencyEngineStability:
    """Core dependency-engine functions must keep their documented behavior."""

    def test_evaluate_condition_empty(self):
        # An empty condition is vacuously true regardless of context.
        from services.dependency_engine import evaluate_condition
        assert evaluate_condition({}, {}) is True

    def test_evaluate_condition_simple(self):
        from services.dependency_engine import evaluate_condition
        cond = {"field": "source.status", "op": "==", "value": "pass"}
        for status, expected in (("pass", True), ("fail", False)):
            ctx = {"source": {"status": status}}
            assert evaluate_condition(cond, ctx) is expected

    def test_apply_effect_not_applicable(self):
        from services.dependency_engine import apply_effect
        effect = {"set_status": "not_applicable"}
        assert apply_effect(effect, "fail") == "not_applicable"

    def test_default_priorities_unchanged(self):
        from services.dependency_engine import DEFAULT_PRIORITIES
        # Pinned priority values; changing them reorders effect resolution.
        expected = {
            "supersedes": 10,
            "scope_exclusion": 20,
            "prerequisite": 50,
            "compensating_control": 80,
        }
        for dep_kind, priority in expected.items():
            assert DEFAULT_PRIORITIES[dep_kind] == priority
class TestDocumentComplianceStability:
    """Document-scope rules for a plain website must stay stable."""

    def test_basic_website_requires_impressum(self):
        from services.document_scope_resolver import resolve_required_documents
        outcome = resolve_required_documents({"has_website": True})
        # Entries may be dicts or objects; normalize to the type string.
        names = []
        for doc in outcome.get("required_documents", []):
            names.append(doc["document_type"] if isinstance(doc, dict) else doc.document_type)
        for required in ("impressum", "privacy_policy"):
            assert required in names
# ============================================================================
# DB tests (require DATABASE_URL)
# ============================================================================
@pytest.mark.skipif(
    not os.getenv("DATABASE_URL"),
    reason="DATABASE_URL not set"
)
class TestControlCountStability:
    """Draft count must stay within expected range."""

    # Base COUNT query over the pass0b draft population; individual tests
    # append further predicates to this string.
    _DRAFT_SQL = (
        "SELECT COUNT(*) FROM compliance.canonical_controls "
        "WHERE release_state = 'draft' AND decomposition_method = 'pass0b'"
    )

    def _scalar(self, db_session, sql):
        """Execute *sql* and return its single scalar result."""
        from sqlalchemy import text
        return db_session.execute(text(sql)).scalar()

    def test_draft_count_minimum(self, db_session):
        count = self._scalar(db_session, self._DRAFT_SQL)
        assert count > 140000, f"Draft count too low: {count} (expected >140k)"

    def test_draft_count_maximum(self, db_session):
        count = self._scalar(db_session, self._DRAFT_SQL)
        assert count < 200000, f"Draft count too high: {count} (expected <200k)"

    def test_no_null_titles(self, db_session):
        null_count = self._scalar(
            db_session,
            self._DRAFT_SQL + " AND (title IS NULL OR title = '')",
        )
        assert null_count == 0, f"{null_count} controls without title"

    def test_assertion_coverage(self, db_session):
        # Controls whose generation metadata carries no assertion text.
        no_assertion = self._scalar(
            db_session,
            self._DRAFT_SQL
            + " AND (generation_metadata->>'assertion' IS NULL "
            + "     OR generation_metadata->>'assertion' = '')",
        )
        total = self._scalar(db_session, self._DRAFT_SQL)
        # max(total, 1) guards against division by zero on an empty table.
        coverage = (total - no_assertion) / max(total, 1) * 100
        assert coverage > 99, f"Assertion coverage only {coverage:.1f}% (expected >99%)"
@pytest.mark.skipif(
    not os.getenv("DATABASE_URL"),
    reason="DATABASE_URL not set"
)
class TestDependencyGraphStability:
    """Dependency graph must be valid and within expected size."""

    def _scalar(self, db_session, sql):
        """Execute *sql* and return its single scalar result."""
        from sqlalchemy import text
        return db_session.execute(text(sql)).scalar()

    def test_dependency_count_minimum(self, db_session):
        count = self._scalar(
            db_session,
            "SELECT COUNT(*) FROM compliance.control_dependencies WHERE is_active = true",
        )
        assert count > 10000, f"Too few dependencies: {count} (expected >10k)"

    def test_no_self_dependencies(self, db_session):
        self_deps = self._scalar(
            db_session,
            "SELECT COUNT(*) FROM compliance.control_dependencies "
            "WHERE source_control_id = target_control_id AND is_active = true",
        )
        assert self_deps == 0, f"{self_deps} self-referencing dependencies"

    def test_no_orphan_dependencies(self, db_session):
        from sqlalchemy import text
        orphans = db_session.execute(text("""
            SELECT COUNT(*) FROM compliance.control_dependencies d
            WHERE d.is_active = true
            AND NOT EXISTS (
                SELECT 1 FROM compliance.canonical_controls c
                WHERE c.id = d.source_control_id AND c.release_state = 'draft'
            )
        """)).scalar()
        # A small number of orphans is acceptable — they may point at
        # deprecated or duplicate controls.
        assert orphans < 1000, f"Too many orphan dependencies: {orphans}"
@pytest.mark.skipif(
not os.getenv("DATABASE_URL"),
reason="DATABASE_URL not set"
)
class TestQualityMetrics:
"""Quality metrics must stay within target ranges."""
def test_duplicate_rate(self, db_session):
from sqlalchemy import text
total = db_session.execute(text(
"SELECT COUNT(DISTINCT generation_metadata->>'merge_group_hint') "
"FROM compliance.canonical_controls "
"WHERE release_state = 'draft' AND decomposition_method = 'pass0b' "
"AND generation_metadata->>'merge_group_hint' IS NOT NULL"
)).scalar()
dups = db_session.execute(text("""
SELECT COUNT(*) FROM (
SELECT generation_metadata->>'merge_group_hint', COUNT(*)
FROM compliance.canonical_controls
WHERE release_state = 'draft' AND decomposition_method = 'pass0b'
AND generation_metadata->>'merge_group_hint' IS NOT NULL
GROUP BY generation_metadata->>'merge_group_hint'
HAVING COUNT(*) > 1
) sub
""")).scalar()
rate = dups / max(total, 1) * 100
assert rate < 5, f"Duplicate merge_key rate {rate:.1f}% exceeds 5% threshold"