Core engine (dependency_engine.py):
- 5 dependency types: prerequisite, supersedes, compensating_control,
  conditional_requirement, scope_exclusion
- Generic condition evaluator (JSONB rules with AND/OR/NOT/field ops)
- Priority-based conflict resolution
- Cycle detection (DFS) + topological sort
- Full evaluation with MCP-compatible dependency_resolution trace
- 39 tests all passing (incl. GHV scenario from user requirements)

Automatic generator (dependency_generator.py):
- Ontology-based: same normalized_object + phase sequence -> prerequisite
- Pattern-based: define->implement, implement->monitor, etc.
- Domain packs: YAML rules for GDPR, AI Act, CRA, Security, Labor Contracts
- 14 tests all passing

API routes (dependency_routes.py):
- CRUD for dependencies
- POST /evaluate with dependency resolution
- POST /generate (auto-generation with dry_run)
- POST /validate (cycle detection)
- GET /graph (nodes + edges for visualization)

Prompt enhancement (decomposition_pass.py):
- Added dependency_hints + lifecycle_phase_order to Pass 0b prompt
- Stored in generation_metadata for post-processing

DB migration: control_dependencies + control_evaluation_results tables

126 tests total, all passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
507 lines
20 KiB
Python
507 lines
20 KiB
Python
"""
|
|
Tests for the Control Dependency Engine.
|
|
|
|
Pure Python tests — no DB required. Tests condition evaluation,
|
|
effect application, cycle detection, topological sort, and
|
|
full evaluation resolution.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
from services.dependency_engine import (
|
|
Dependency,
|
|
ControlState,
|
|
EvaluationResult,
|
|
EvaluationStatus,
|
|
evaluate_condition,
|
|
apply_effect,
|
|
detect_cycles,
|
|
topological_sort,
|
|
evaluate_controls,
|
|
_resolve_field,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Condition Evaluator
|
|
# ============================================================================
|
|
|
|
class TestConditionEvaluator:
    """Exercise ``evaluate_condition`` on leaf rules and AND/OR/NOT compounds."""

    def test_empty_condition_returns_true(self):
        """An empty rule evaluated against an empty context yields True."""
        outcome = evaluate_condition({}, {})
        assert outcome is True

    def test_simple_equals(self):
        """``==`` holds when the resolved field matches the expected value."""
        rule = {"field": "source.status", "op": "==", "value": "pass"}
        outcome = evaluate_condition(rule, {"source": {"status": "pass"}})
        assert outcome is True

    def test_simple_equals_false(self):
        """``==`` fails when the resolved field differs from the expected value."""
        rule = {"field": "source.status", "op": "==", "value": "pass"}
        outcome = evaluate_condition(rule, {"source": {"status": "fail"}})
        assert outcome is False

    def test_not_equals(self):
        """``!=`` holds when the resolved field differs from the given value."""
        rule = {"field": "source.status", "op": "!=", "value": "pass"}
        outcome = evaluate_condition(rule, {"source": {"status": "fail"}})
        assert outcome is True

    def test_in_operator(self):
        """``in`` holds when the resolved field is one of the listed values."""
        rule = {"field": "context.company_size", "op": "in", "value": ["large", "enterprise"]}
        outcome = evaluate_condition(rule, {"context": {"company_size": "large"}})
        assert outcome is True

    def test_in_operator_false(self):
        """``in`` fails when the resolved field is not among the listed values."""
        rule = {"field": "context.company_size", "op": "in", "value": ["large", "enterprise"]}
        outcome = evaluate_condition(rule, {"context": {"company_size": "small"}})
        assert outcome is False

    def test_not_in_operator(self):
        """``not_in`` holds when the resolved field is absent from the list."""
        rule = {"field": "context.industry", "op": "not_in", "value": ["finance", "healthcare"]}
        outcome = evaluate_condition(rule, {"context": {"industry": "retail"}})
        assert outcome is True

    def test_and_compound(self):
        """An AND compound is True when every clause is satisfied."""
        status_is_pass = {"field": "source.status", "op": "==", "value": "pass"}
        size_is_large = {"field": "context.company_size", "op": "in", "value": ["large"]}
        rule = {
            "operator": "AND",
            "clauses": [status_is_pass, size_is_large],
        }
        data = {"source": {"status": "pass"}, "context": {"company_size": "large"}}
        assert evaluate_condition(rule, data) is True

    def test_and_compound_one_false(self):
        """An AND compound is False as soon as one clause is not satisfied."""
        status_is_pass = {"field": "source.status", "op": "==", "value": "pass"}
        size_is_large = {"field": "context.company_size", "op": "in", "value": ["large"]}
        rule = {
            "operator": "AND",
            "clauses": [status_is_pass, size_is_large],
        }
        data = {"source": {"status": "pass"}, "context": {"company_size": "small"}}
        assert evaluate_condition(rule, data) is False

    def test_or_compound(self):
        """An OR compound is True when at least one clause is satisfied."""
        status_is_pass = {"field": "source.status", "op": "==", "value": "pass"}
        status_is_compensated = {"field": "source.status", "op": "==", "value": "compensated_fail"}
        rule = {
            "operator": "OR",
            "clauses": [status_is_pass, status_is_compensated],
        }
        outcome = evaluate_condition(rule, {"source": {"status": "compensated_fail"}})
        assert outcome is True

    def test_not_operator(self):
        """A NOT compound inverts the result of its single ``clause``."""
        inner = {"field": "source.status", "op": "==", "value": "pass"}
        rule = {
            "operator": "NOT",
            "clause": inner,
        }
        outcome = evaluate_condition(rule, {"source": {"status": "fail"}})
        assert outcome is True

    def test_nested_field_resolution(self):
        """``_resolve_field`` follows a dotted path through nested dicts."""
        data = {"context": {"scope_signals": {"uses_ai": True}}}
        resolved = _resolve_field("context.scope_signals.uses_ai", data)
        assert resolved is True

    def test_greater_than(self):
        """``>`` holds when the resolved number exceeds the threshold."""
        rule = {"field": "context.employee_count", "op": ">", "value": 250}
        outcome = evaluate_condition(rule, {"context": {"employee_count": 500}})
        assert outcome is True

    def test_contains(self):
        """``contains`` holds when the resolved list includes the given value."""
        rule = {"field": "context.scope_signals", "op": "contains", "value": "uses_ai"}
        data = {"context": {"scope_signals": ["uses_ai", "has_employees"]}}
        assert evaluate_condition(rule, data) is True
|
|
|
|
|
|
# ============================================================================
|
|
# Effect Applier
|
|
# ============================================================================
|
|
|
|
class TestEffectApplier:
    """Exercise ``apply_effect`` with recognised, unknown and empty effects."""

    def test_set_status_not_applicable(self):
        """``set_status: not_applicable`` replaces a current status of 'fail'."""
        effect = {"set_status": "not_applicable"}
        resolved = apply_effect(effect, "fail")
        assert resolved == "not_applicable"

    def test_set_status_compensated_fail(self):
        """``set_status: compensated_fail`` replaces a current status of 'fail'."""
        effect = {"set_status": "compensated_fail"}
        resolved = apply_effect(effect, "fail")
        assert resolved == "compensated_fail"

    def test_set_status_pass(self):
        """``set_status: pass`` replaces a current status of 'fail'."""
        effect = {"set_status": "pass"}
        resolved = apply_effect(effect, "fail")
        assert resolved == "pass"

    def test_unknown_effect_returns_original(self):
        """An effect with an unrecognised key leaves the status unchanged."""
        effect = {"unknown_key": "value"}
        resolved = apply_effect(effect, "fail")
        assert resolved == "fail"

    def test_invalid_status_returns_original(self):
        """A ``set_status`` of 'invalid_status' leaves the status unchanged."""
        effect = {"set_status": "invalid_status"}
        resolved = apply_effect(effect, "fail")
        assert resolved == "fail"

    def test_empty_effect(self):
        """An empty effect leaves the status unchanged."""
        resolved = apply_effect({}, "pass")
        assert resolved == "pass"
|
|
|
|
|
|
# ============================================================================
|
|
# Cycle Detection
|
|
# ============================================================================
|
|
|
|
class TestCycleDetection:
    """Exercise ``detect_cycles`` on acyclic, cyclic and partly inactive graphs."""

    def test_no_cycles(self):
        """A linear chain A -> B -> C reports no cycles."""
        chain = [
            Dependency(
                id="1",
                source_control_id="A",
                target_control_id="B",
                dependency_type="prerequisite",
            ),
            Dependency(
                id="2",
                source_control_id="B",
                target_control_id="C",
                dependency_type="prerequisite",
            ),
        ]
        found = detect_cycles(chain)
        assert len(found) == 0

    def test_simple_cycle(self):
        """A two-node loop A -> B -> A is reported and names both nodes."""
        loop = [
            Dependency(
                id="1",
                source_control_id="A",
                target_control_id="B",
                dependency_type="supersedes",
            ),
            Dependency(
                id="2",
                source_control_id="B",
                target_control_id="A",
                dependency_type="supersedes",
            ),
        ]
        found = detect_cycles(loop)
        assert len(found) > 0
        # Flatten every reported cycle; both A and B must appear somewhere.
        members = {node for cycle in found for node in cycle}
        assert "A" in members
        assert "B" in members

    def test_diamond_no_cycle(self):
        """A diamond (A -> B, A -> C, B -> D, C -> D) is not a cycle."""
        diamond = [
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="A", target_control_id="C"),
            Dependency(id="3", source_control_id="B", target_control_id="D"),
            Dependency(id="4", source_control_id="C", target_control_id="D"),
        ]
        found = detect_cycles(diamond)
        assert len(found) == 0

    def test_inactive_deps_ignored(self):
        """A back edge flagged ``is_active=False`` does not close a cycle."""
        edges = [
            Dependency(id="1", source_control_id="A", target_control_id="B", is_active=True),
            Dependency(id="2", source_control_id="B", target_control_id="A", is_active=False),
        ]
        found = detect_cycles(edges)
        assert len(found) == 0
|
|
|
|
|
|
# ============================================================================
|
|
# Topological Sort
|
|
# ============================================================================
|
|
|
|
class TestTopologicalSort:
    """Exercise ``topological_sort``: every source must precede its target."""

    def test_linear_chain(self):
        """For A -> B -> C the order places A before B and B before C."""
        edges = [
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="B", target_control_id="C"),
        ]
        position = topological_sort(edges).index
        assert position("A") < position("B")
        assert position("B") < position("C")

    def test_diamond_graph(self):
        """For a diamond, A precedes B and C, and both precede D."""
        edges = [
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="A", target_control_id="C"),
            Dependency(id="3", source_control_id="B", target_control_id="D"),
            Dependency(id="4", source_control_id="C", target_control_id="D"),
        ]
        position = topological_sort(edges).index
        assert position("A") < position("B")
        assert position("A") < position("C")
        assert position("B") < position("D")
        assert position("C") < position("D")

    def test_disconnected_components(self):
        """Two unrelated edges yield all four nodes, each pair correctly ordered."""
        edges = [
            Dependency(id="1", source_control_id="A", target_control_id="B"),
            Dependency(id="2", source_control_id="X", target_control_id="Y"),
        ]
        ordering = topological_sort(edges)
        assert len(ordering) == 4
        position = ordering.index
        assert position("A") < position("B")
        assert position("X") < position("Y")
|
|
|
|
|
|
# ============================================================================
|
|
# Full Evaluation
|
|
# ============================================================================
|
|
|
|
class TestEvaluateControls:
    """End-to-end checks of ``evaluate_controls`` across the dependency types."""

    @staticmethod
    def _states(raw_by_id):
        """Build a ``{control_id: ControlState}`` mapping from raw statuses."""
        return {
            control_id: ControlState(control_id=control_id, raw_status=raw)
            for control_id, raw in raw_by_id.items()
        }

    @staticmethod
    def _ghv_dependencies():
        """Return the dependency pair shared by both GHV scenarios.

        MC-001 supersedes MC-002 when MC-001 passes; MC-003 is set to
        not_applicable when MC-002 is anything other than pass.
        """
        return [
            Dependency(
                id="d1",
                source_control_id="MC-001",
                target_control_id="MC-002",
                dependency_type="supersedes",
                condition={"field": "source.status", "op": "==", "value": "pass"},
                effect={"set_status": "not_applicable"},
                priority=10,
            ),
            Dependency(
                id="d2",
                source_control_id="MC-002",
                target_control_id="MC-003",
                dependency_type="prerequisite",
                condition={"field": "source.status", "op": "!=", "value": "pass"},
                effect={"set_status": "not_applicable"},
                priority=50,
            ),
        ]

    def test_no_dependencies_passthrough(self):
        """Without dependencies each resolved status equals the raw status."""
        resolved = evaluate_controls(self._states({"A": "pass", "B": "fail"}), [], {})
        assert resolved["A"].resolved_status == "pass"
        assert resolved["B"].resolved_status == "fail"

    def test_supersedes_makes_not_applicable(self):
        """GHV clause (A) supersedes training (B)."""
        link = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="supersedes",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "not_applicable"},
            priority=10,
        )
        resolved = evaluate_controls(self._states({"A": "pass", "B": "fail"}), [link], {})
        assert resolved["A"].resolved_status == "pass"
        target = resolved["B"]
        assert target.resolved_status == "not_applicable"
        # Exactly one trace entry is recorded, and it reports the condition as met.
        assert len(target.dependency_resolution) == 1
        assert target.dependency_resolution[0]["condition_met"] is True

    def test_supersedes_not_met_preserves_status(self):
        """GHV clause (A) fails -> training (B) stays fail."""
        link = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="supersedes",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "not_applicable"},
            priority=10,
        )
        resolved = evaluate_controls(self._states({"A": "fail", "B": "fail"}), [link], {})
        assert resolved["B"].resolved_status == "fail"

    def test_prerequisite_fail_blocks_target(self):
        """define:policy (A) must pass before implement:policy (B)."""
        link = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="prerequisite",
            condition={"field": "source.status", "op": "!=", "value": "pass"},
            effect={"set_status": "review_required"},
            priority=50,
        )
        resolved = evaluate_controls(self._states({"A": "fail", "B": "pass"}), [link], {})
        assert resolved["B"].resolved_status == "review_required"

    def test_compensating_control(self):
        """ISO cert (A) compensates individual control (B)."""
        link = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="compensating_control",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "compensated_fail"},
            priority=80,
        )
        resolved = evaluate_controls(self._states({"A": "pass", "B": "fail"}), [link], {})
        assert resolved["B"].resolved_status == "compensated_fail"

    def test_scope_exclusion(self):
        """No AI usage -> AI controls not applicable."""
        link = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="scope_exclusion",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "not_applicable"},
            priority=20,
        )
        resolved = evaluate_controls(self._states({"A": "pass", "B": "fail"}), [link], {})
        assert resolved["B"].resolved_status == "not_applicable"

    def test_conditional_requirement_with_context(self):
        """Enhanced logging required only for large companies."""
        states = self._states({"A": "pass", "B": "fail"})
        source_passes = {"field": "source.status", "op": "==", "value": "pass"}
        company_is_big = {
            "field": "context.company_size",
            "op": "in",
            "value": ["large", "enterprise"],
        }
        deps = [
            Dependency(
                id="d1",
                source_control_id="A",
                target_control_id="B",
                dependency_type="conditional_requirement",
                condition={
                    "operator": "AND",
                    "clauses": [source_passes, company_is_big],
                },
                effect={"set_status": "pass"},
                priority=70,
            ),
        ]

        # Large company -> condition met, effect applied.
        for_large = evaluate_controls(states, deps, {"company_size": "large"})
        assert for_large["B"].resolved_status == "pass"

        # Small company -> condition not met, B keeps its raw 'fail'.
        for_small = evaluate_controls(states, deps, {"company_size": "small"})
        assert for_small["B"].resolved_status == "fail"

    def test_priority_conflict_resolution(self):
        """When scope_exclusion (prio 20) and compensating (prio 80) both match,
        scope_exclusion wins."""
        states = self._states({"A": "pass", "X": "pass", "B": "fail"})
        exclusion = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="scope_exclusion",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "not_applicable"},
            priority=20,
        )
        compensation = Dependency(
            id="d2",
            source_control_id="X",
            target_control_id="B",
            dependency_type="compensating_control",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "compensated_fail"},
            priority=80,
        )
        resolved = evaluate_controls(states, [exclusion, compensation], {})
        # The priority-20 scope_exclusion effect is the one that ends up applied.
        assert resolved["B"].resolved_status == "not_applicable"

    def test_cycle_controls_get_review_required(self):
        """Controls in a cycle get review_required."""
        states = self._states({"A": "pass", "B": "pass", "C": "fail"})
        loop = [
            Dependency(
                id="d1",
                source_control_id="A",
                target_control_id="B",
                dependency_type="supersedes",
            ),
            Dependency(
                id="d2",
                source_control_id="B",
                target_control_id="A",
                dependency_type="supersedes",
            ),
        ]
        resolved = evaluate_controls(states, loop, {})
        assert resolved["A"].resolved_status == "review_required"
        assert resolved["B"].resolved_status == "review_required"
        # C is not part of the cycle and keeps its raw status.
        assert resolved["C"].resolved_status == "fail"

    def test_chain_evaluation_order(self):
        """A -> B -> C: C's evaluation uses B's resolved status."""
        states = self._states({"A": "pass", "B": "fail", "C": "fail"})
        first_hop = Dependency(
            id="d1",
            source_control_id="A",
            target_control_id="B",
            dependency_type="supersedes",
            condition={"field": "source.status", "op": "==", "value": "pass"},
            effect={"set_status": "not_applicable"},
            priority=10,
        )
        second_hop = Dependency(
            id="d2",
            source_control_id="B",
            target_control_id="C",
            dependency_type="prerequisite",
            condition={"field": "source.status", "op": "==", "value": "not_applicable"},
            effect={"set_status": "not_applicable"},
            priority=50,
        )
        resolved = evaluate_controls(states, [first_hop, second_hop], {})
        assert resolved["A"].resolved_status == "pass"
        assert resolved["B"].resolved_status == "not_applicable"
        # C depends on B's RESOLVED status (not_applicable), not its raw one (fail).
        assert resolved["C"].resolved_status == "not_applicable"

    def test_ghv_full_scenario(self):
        """Complete GHV scenario from the user's example:

        MC-001: GHV clause in the contract
        MC-002: confidentiality training
        MC-003: annual refresher training

        Case 1: contract has GHV -> training + refresher training not_applicable
        """
        states = self._states({"MC-001": "pass", "MC-002": "fail", "MC-003": "fail"})
        resolved = evaluate_controls(states, self._ghv_dependencies(), {})
        assert resolved["MC-001"].resolved_status == "pass"
        assert resolved["MC-002"].resolved_status == "not_applicable"
        # MC-003: MC-002 resolved to not_applicable (!= pass) -> not_applicable
        assert resolved["MC-003"].resolved_status == "not_applicable"

    def test_ghv_no_contract(self):
        """GHV Case 2: contract missing, training present."""
        states = self._states({"MC-001": "fail", "MC-002": "pass", "MC-003": "pass"})
        resolved = evaluate_controls(states, self._ghv_dependencies(), {})
        assert resolved["MC-001"].resolved_status == "fail"
        # Supersedes condition (MC-001 == pass) is not met, so MC-002 is untouched.
        assert resolved["MC-002"].resolved_status == "pass"
        # MC-002 is pass, so the "!= pass" rule does not fire for MC-003.
        assert resolved["MC-003"].resolved_status == "pass"
|