"""
Tests for the Control Dependency Engine.

Pure Python tests — no DB required. Covers condition evaluation, effect
application, cycle detection, topological sort, and full evaluation
resolution.
"""

import sys
import os
import pytest

# The engine is imported from the directory above this test file.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from services.dependency_engine import (
    Dependency,
    ControlState,
    EvaluationResult,
    EvaluationStatus,
    evaluate_condition,
    apply_effect,
    detect_cycles,
    topological_sort,
    evaluate_controls,
    _resolve_field,
)


# ============================================================================
# Shared builders (private to this test module)
# ============================================================================


def _leaf(field, op, value):
    """Return a single-field condition clause."""
    return {"field": field, "op": op, "value": value}


def _source_status(op, value):
    """Return a clause comparing ``source.status`` against *value*."""
    return _leaf("source.status", op, value)


def _set_status(status):
    """Return an effect dict that sets the target status to *status*."""
    return {"set_status": status}


def _edge(dep_id, source, target, **extra):
    """Return a Dependency from *source* to *target*.

    Only the keyword arguments given in *extra* are forwarded, so every
    omitted field keeps the default defined by ``Dependency``.
    """
    return Dependency(
        id=dep_id,
        source_control_id=source,
        target_control_id=target,
        **extra,
    )


def _states(raw_by_id):
    """Return ``{control_id: ControlState}`` built from ``{control_id: raw_status}``."""
    return {
        control_id: ControlState(control_id=control_id, raw_status=raw_status)
        for control_id, raw_status in raw_by_id.items()
    }


def _comes_before(order, earlier, later):
    """Return True when *earlier* is positioned before *later* in *order*."""
    return order.index(earlier) < order.index(later)


def _diamond_edges():
    """Return the edges of the diamond graph A -> (B, C) -> D."""
    return [
        _edge("1", "A", "B"),
        _edge("2", "A", "C"),
        _edge("3", "B", "D"),
        _edge("4", "C", "D"),
    ]


def _ghv_edges():
    """Return the two dependencies shared by the GHV scenario tests."""
    return [
        _edge(
            "d1",
            "MC-001",
            "MC-002",
            dependency_type="supersedes",
            condition=_source_status("==", "pass"),
            effect=_set_status("not_applicable"),
            priority=10,
        ),
        _edge(
            "d2",
            "MC-002",
            "MC-003",
            dependency_type="prerequisite",
            condition=_source_status("!=", "pass"),
            effect=_set_status("not_applicable"),
            priority=50,
        ),
    ]


# ============================================================================
# Condition Evaluator
# ============================================================================


class TestConditionEvaluator:
    """Checks for ``evaluate_condition`` and ``_resolve_field``."""

    def test_empty_condition_returns_true(self):
        assert evaluate_condition({}, {}) is True

    def test_simple_equals(self):
        condition = _source_status("==", "pass")
        context = {"source": {"status": "pass"}}
        assert evaluate_condition(condition, context) is True

    def test_simple_equals_false(self):
        condition = _source_status("==", "pass")
        context = {"source": {"status": "fail"}}
        assert evaluate_condition(condition, context) is False

    def test_not_equals(self):
        condition = _source_status("!=", "pass")
        context = {"source": {"status": "fail"}}
        assert evaluate_condition(condition, context) is True

    def test_in_operator(self):
        condition = _leaf("context.company_size", "in", ["large", "enterprise"])
        context = {"context": {"company_size": "large"}}
        assert evaluate_condition(condition, context) is True

    def test_in_operator_false(self):
        condition = _leaf("context.company_size", "in", ["large", "enterprise"])
        context = {"context": {"company_size": "small"}}
        assert evaluate_condition(condition, context) is False

    def test_not_in_operator(self):
        condition = _leaf("context.industry", "not_in", ["finance", "healthcare"])
        context = {"context": {"industry": "retail"}}
        assert evaluate_condition(condition, context) is True

    def test_and_compound(self):
        condition = {
            "operator": "AND",
            "clauses": [
                _source_status("==", "pass"),
                _leaf("context.company_size", "in", ["large"]),
            ],
        }
        context = {
            "source": {"status": "pass"},
            "context": {"company_size": "large"},
        }
        assert evaluate_condition(condition, context) is True

    def test_and_compound_one_false(self):
        condition = {
            "operator": "AND",
            "clauses": [
                _source_status("==", "pass"),
                _leaf("context.company_size", "in", ["large"]),
            ],
        }
        context = {
            "source": {"status": "pass"},
            "context": {"company_size": "small"},
        }
        assert evaluate_condition(condition, context) is False

    def test_or_compound(self):
        condition = {
            "operator": "OR",
            "clauses": [
                _source_status("==", "pass"),
                _source_status("==", "compensated_fail"),
            ],
        }
        context = {"source": {"status": "compensated_fail"}}
        assert evaluate_condition(condition, context) is True

    def test_not_operator(self):
        condition = {
            "operator": "NOT",
            "clause": _source_status("==", "pass"),
        }
        context = {"source": {"status": "fail"}}
        assert evaluate_condition(condition, context) is True

    def test_nested_field_resolution(self):
        context = {"context": {"scope_signals": {"uses_ai": True}}}
        assert _resolve_field("context.scope_signals.uses_ai", context) is True

    def test_greater_than(self):
        condition = _leaf("context.employee_count", ">", 250)
        context = {"context": {"employee_count": 500}}
        assert evaluate_condition(condition, context) is True

    def test_contains(self):
        condition = _leaf("context.scope_signals", "contains", "uses_ai")
        context = {"context": {"scope_signals": ["uses_ai", "has_employees"]}}
        assert evaluate_condition(condition, context) is True


# ============================================================================
# Effect Applier
# ============================================================================


class TestEffectApplier:
    """Checks for ``apply_effect``."""

    def test_set_status_not_applicable(self):
        outcome = apply_effect(_set_status("not_applicable"), "fail")
        assert outcome == "not_applicable"

    def test_set_status_compensated_fail(self):
        outcome = apply_effect(_set_status("compensated_fail"), "fail")
        assert outcome == "compensated_fail"

    def test_set_status_pass(self):
        outcome = apply_effect(_set_status("pass"), "fail")
        assert outcome == "pass"

    def test_unknown_effect_returns_original(self):
        outcome = apply_effect({"unknown_key": "value"}, "fail")
        assert outcome == "fail"

    def test_invalid_status_returns_original(self):
        outcome = apply_effect(_set_status("invalid_status"), "fail")
        assert outcome == "fail"

    def test_empty_effect(self):
        outcome = apply_effect({}, "pass")
        assert outcome == "pass"


# ============================================================================
# Cycle Detection
# ============================================================================


class TestCycleDetection:
    """Checks for ``detect_cycles``."""

    def test_no_cycles(self):
        edges = [
            _edge("1", "A", "B", dependency_type="prerequisite"),
            _edge("2", "B", "C", dependency_type="prerequisite"),
        ]
        found = detect_cycles(edges)
        assert len(found) == 0

    def test_simple_cycle(self):
        edges = [
            _edge("1", "A", "B", dependency_type="supersedes"),
            _edge("2", "B", "A", dependency_type="supersedes"),
        ]
        found = detect_cycles(edges)
        assert len(found) > 0
        # Both A and B should be in the cycle
        members = {node for cycle in found for node in cycle}
        assert "A" in members
        assert "B" in members

    def test_diamond_no_cycle(self):
        found = detect_cycles(_diamond_edges())
        assert len(found) == 0

    def test_inactive_deps_ignored(self):
        edges = [
            _edge("1", "A", "B", is_active=True),
            _edge("2", "B", "A", is_active=False),
        ]
        found = detect_cycles(edges)
        assert len(found) == 0


# ============================================================================
# Topological Sort
# ============================================================================


class TestTopologicalSort:
    """Checks for ``topological_sort``."""

    def test_linear_chain(self):
        edges = [
            _edge("1", "A", "B"),
            _edge("2", "B", "C"),
        ]
        order = topological_sort(edges)
        assert _comes_before(order, "A", "B")
        assert _comes_before(order, "B", "C")

    def test_diamond_graph(self):
        order = topological_sort(_diamond_edges())
        assert _comes_before(order, "A", "B")
        assert _comes_before(order, "A", "C")
        assert _comes_before(order, "B", "D")
        assert _comes_before(order, "C", "D")

    def test_disconnected_components(self):
        edges = [
            _edge("1", "A", "B"),
            _edge("2", "X", "Y"),
        ]
        order = topological_sort(edges)
        assert len(order) == 4
        assert _comes_before(order, "A", "B")
        assert _comes_before(order, "X", "Y")


# ============================================================================
# Full Evaluation
# ============================================================================


class TestEvaluateControls:
    """End-to-end checks for ``evaluate_controls``."""

    def test_no_dependencies_passthrough(self):
        states = _states({"A": "pass", "B": "fail"})
        resolved = evaluate_controls(states, [], {})
        assert resolved["A"].resolved_status == "pass"
        assert resolved["B"].resolved_status == "fail"

    def test_supersedes_makes_not_applicable(self):
        """GHV clause (A) supersedes training (B)."""
        states = _states({"A": "pass", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="supersedes",
                condition=_source_status("==", "pass"),
                effect=_set_status("not_applicable"),
                priority=10,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["A"].resolved_status == "pass"
        assert resolved["B"].resolved_status == "not_applicable"
        assert len(resolved["B"].dependency_resolution) == 1
        assert resolved["B"].dependency_resolution[0]["condition_met"] is True

    def test_supersedes_not_met_preserves_status(self):
        """GHV clause (A) fails -> training (B) stays fail."""
        states = _states({"A": "fail", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="supersedes",
                condition=_source_status("==", "pass"),
                effect=_set_status("not_applicable"),
                priority=10,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["B"].resolved_status == "fail"

    def test_prerequisite_fail_blocks_target(self):
        """define:policy (A) must pass before implement:policy (B)."""
        states = _states({"A": "fail", "B": "pass"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="prerequisite",
                condition=_source_status("!=", "pass"),
                effect=_set_status("review_required"),
                priority=50,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["B"].resolved_status == "review_required"

    def test_compensating_control(self):
        """ISO cert (A) compensates individual control (B)."""
        states = _states({"A": "pass", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="compensating_control",
                condition=_source_status("==", "pass"),
                effect=_set_status("compensated_fail"),
                priority=80,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["B"].resolved_status == "compensated_fail"

    def test_scope_exclusion(self):
        """No AI usage -> AI controls not applicable."""
        states = _states({"A": "pass", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="scope_exclusion",
                condition=_source_status("==", "pass"),
                effect=_set_status("not_applicable"),
                priority=20,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["B"].resolved_status == "not_applicable"

    def test_conditional_requirement_with_context(self):
        """Enhanced logging required only for large companies."""
        states = _states({"A": "pass", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="conditional_requirement",
                condition={
                    "operator": "AND",
                    "clauses": [
                        _source_status("==", "pass"),
                        _leaf("context.company_size", "in", ["large", "enterprise"]),
                    ],
                },
                effect=_set_status("pass"),
                priority=70,
            ),
        ]

        # Large company -> condition met
        for_large = evaluate_controls(states, edges, {"company_size": "large"})
        assert for_large["B"].resolved_status == "pass"

        # Small company -> condition not met, stays fail
        for_small = evaluate_controls(states, edges, {"company_size": "small"})
        assert for_small["B"].resolved_status == "fail"

    def test_priority_conflict_resolution(self):
        """When scope_exclusion (prio 20) and compensating (prio 80) both match, scope_exclusion wins."""
        states = _states({"A": "pass", "X": "pass", "B": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="scope_exclusion",
                condition=_source_status("==", "pass"),
                effect=_set_status("not_applicable"),
                priority=20,
            ),
            _edge(
                "d2",
                "X",
                "B",
                dependency_type="compensating_control",
                condition=_source_status("==", "pass"),
                effect=_set_status("compensated_fail"),
                priority=80,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        # scope_exclusion wins
        assert resolved["B"].resolved_status == "not_applicable"

    def test_cycle_controls_get_review_required(self):
        """Controls in a cycle get review_required."""
        states = _states({"A": "pass", "B": "pass", "C": "fail"})
        edges = [
            _edge("d1", "A", "B", dependency_type="supersedes"),
            _edge("d2", "B", "A", dependency_type="supersedes"),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["A"].resolved_status == "review_required"
        assert resolved["B"].resolved_status == "review_required"
        # Not in cycle, unaffected
        assert resolved["C"].resolved_status == "fail"

    def test_chain_evaluation_order(self):
        """A -> B -> C: C's evaluation uses B's resolved status."""
        states = _states({"A": "pass", "B": "fail", "C": "fail"})
        edges = [
            _edge(
                "d1",
                "A",
                "B",
                dependency_type="supersedes",
                condition=_source_status("==", "pass"),
                effect=_set_status("not_applicable"),
                priority=10,
            ),
            _edge(
                "d2",
                "B",
                "C",
                dependency_type="prerequisite",
                condition=_source_status("==", "not_applicable"),
                effect=_set_status("not_applicable"),
                priority=50,
            ),
        ]
        resolved = evaluate_controls(states, edges, {})
        assert resolved["A"].resolved_status == "pass"
        assert resolved["B"].resolved_status == "not_applicable"
        # C depends on B's RESOLVED status (not_applicable), not raw (fail)
        assert resolved["C"].resolved_status == "not_applicable"

    def test_ghv_full_scenario(self):
        """Complete GHV scenario from the user's example.

        MC-001: GHV clause in the contract
        MC-002: Confidentiality training
        MC-003: Annual refresher training

        Case 1: contract has the GHV clause -> training and refresher
        training become not_applicable.
        """
        states = _states({"MC-001": "pass", "MC-002": "fail", "MC-003": "fail"})
        resolved = evaluate_controls(states, _ghv_edges(), {})
        assert resolved["MC-001"].resolved_status == "pass"
        assert resolved["MC-002"].resolved_status == "not_applicable"
        # MC-003: MC-002 resolved to not_applicable (!=pass) -> not_applicable
        assert resolved["MC-003"].resolved_status == "not_applicable"

    def test_ghv_no_contract(self):
        """GHV case 2: contract missing, training present."""
        states = _states({"MC-001": "fail", "MC-002": "pass", "MC-003": "pass"})
        resolved = evaluate_controls(states, _ghv_edges(), {})
        assert resolved["MC-001"].resolved_status == "fail"
        # Supersedes condition not met
        assert resolved["MC-002"].resolved_status == "pass"
        # Prereq met (MC-002 == pass)
        assert resolved["MC-003"].resolved_status == "pass"