"""Tests for Anti-Fake-Evidence Phase 1 guardrails. ~45 tests covering: - Evidence confidence classification - Evidence truth status classification - Control status transition state machine - Multi-dimensional compliance score - LLM generation audit - Evidence review endpoint """ from datetime import datetime, timedelta from unittest.mock import MagicMock, patch from fastapi import FastAPI from fastapi.testclient import TestClient from compliance.api.evidence_routes import router as evidence_router from compliance.api.llm_audit_routes import router as llm_audit_router from compliance.api.evidence_routes import _classify_confidence, _classify_truth_status from compliance.services.control_status_machine import validate_transition from compliance.db.models import ( EvidenceConfidenceEnum, EvidenceTruthStatusEnum, ControlStatusEnum, ) from classroom_engine.database import get_db # --------------------------------------------------------------------------- # App setup with mocked DB dependency # --------------------------------------------------------------------------- app = FastAPI() app.include_router(evidence_router) app.include_router(llm_audit_router, prefix="/compliance") mock_db = MagicMock() def override_get_db(): yield mock_db app.dependency_overrides[get_db] = override_get_db client = TestClient(app) EVIDENCE_UUID = "eeeeeeee-aaaa-bbbb-cccc-ffffffffffff" CONTROL_UUID = "cccccccc-aaaa-bbbb-cccc-dddddddddddd" NOW = datetime(2026, 3, 23, 12, 0, 0) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def make_evidence(overrides=None): e = MagicMock() e.id = EVIDENCE_UUID e.control_id = CONTROL_UUID e.evidence_type = "test_results" e.title = "Pytest Test Report" e.description = "All tests passing" e.artifact_url = "https://ci.example.com/job/123/artifact" e.artifact_path = None e.artifact_hash = "abc123def456" e.file_size_bytes = None e.mime_type = None 
e.status = MagicMock() e.status.value = "valid" e.uploaded_by = None e.source = "ci_pipeline" e.ci_job_id = "job-123" e.valid_from = NOW e.valid_until = NOW + timedelta(days=90) e.collected_at = NOW e.created_at = NOW # Anti-fake-evidence fields e.confidence_level = EvidenceConfidenceEnum.E3 e.truth_status = EvidenceTruthStatusEnum.OBSERVED e.generation_mode = None e.may_be_used_as_evidence = True e.reviewed_by = None e.reviewed_at = None # Phase 2 fields e.approval_status = "none" e.first_reviewer = None e.first_reviewed_at = None e.second_reviewer = None e.second_reviewed_at = None e.requires_four_eyes = False if overrides: for k, v in overrides.items(): setattr(e, k, v) return e def make_control(overrides=None): c = MagicMock() c.id = CONTROL_UUID c.control_id = "GOV-001" c.title = "Access Control" c.status = ControlStatusEnum.PLANNED if overrides: for k, v in overrides.items(): setattr(c, k, v) return c # =========================================================================== # 1. 
# TestEvidenceConfidenceClassification
# ===========================================================================
class TestEvidenceConfidenceClassification:
    """Test automatic confidence level classification.

    Machine-collected sources (CI, API) classify as E3; human uploads as E1;
    LLM-generated content as E0; anything unknown defaults to E1.
    """

    def test_ci_pipeline_returns_e3(self):
        assert _classify_confidence("ci_pipeline") == EvidenceConfidenceEnum.E3

    def test_api_with_hash_returns_e3(self):
        assert _classify_confidence("api", artifact_hash="sha256:abc") == EvidenceConfidenceEnum.E3

    def test_api_without_hash_returns_e3(self):
        assert _classify_confidence("api") == EvidenceConfidenceEnum.E3

    def test_manual_returns_e1(self):
        assert _classify_confidence("manual") == EvidenceConfidenceEnum.E1

    def test_upload_returns_e1(self):
        assert _classify_confidence("upload") == EvidenceConfidenceEnum.E1

    def test_generated_returns_e0(self):
        assert _classify_confidence("generated") == EvidenceConfidenceEnum.E0

    def test_unknown_source_returns_e1(self):
        # Unknown sources must not be over-trusted: default to the low tier.
        assert _classify_confidence("some_random_source") == EvidenceConfidenceEnum.E1

    def test_none_source_returns_e1(self):
        assert _classify_confidence(None) == EvidenceConfidenceEnum.E1


# ===========================================================================
# 2.
# TestEvidenceTruthStatus
# ===========================================================================
class TestEvidenceTruthStatus:
    """Test automatic truth status classification.

    Machine sources yield OBSERVED, human sources UPLOADED, and LLM output
    GENERATED; missing source information defaults to UPLOADED.
    """

    def test_ci_pipeline_returns_observed(self):
        assert _classify_truth_status("ci_pipeline") == EvidenceTruthStatusEnum.OBSERVED

    def test_manual_returns_uploaded(self):
        assert _classify_truth_status("manual") == EvidenceTruthStatusEnum.UPLOADED

    def test_upload_returns_uploaded(self):
        assert _classify_truth_status("upload") == EvidenceTruthStatusEnum.UPLOADED

    def test_generated_returns_generated(self):
        assert _classify_truth_status("generated") == EvidenceTruthStatusEnum.GENERATED

    def test_api_returns_observed(self):
        assert _classify_truth_status("api") == EvidenceTruthStatusEnum.OBSERVED

    def test_none_returns_uploaded(self):
        assert _classify_truth_status(None) == EvidenceTruthStatusEnum.UPLOADED


# ===========================================================================
# 3. TestControlStatusTransitions
# ===========================================================================
class TestControlStatusTransitions:
    """Test the control status transition state machine."""

    def test_planned_to_in_progress_allowed(self):
        allowed, violations = validate_transition("planned", "in_progress")
        assert allowed is True
        assert violations == []

    def test_in_progress_to_pass_without_evidence_blocked(self):
        allowed, violations = validate_transition("in_progress", "pass", evidence_list=[])
        assert allowed is False
        assert len(violations) > 0
        assert "pass" in violations[0].lower()

    def test_in_progress_to_pass_with_e2_evidence_allowed(self):
        e = make_evidence({
            "confidence_level": EvidenceConfidenceEnum.E2,
            "truth_status": EvidenceTruthStatusEnum.VALIDATED_INTERNAL,
        })
        allowed, violations = validate_transition("in_progress", "pass", evidence_list=[e])
        assert allowed is True
        assert violations == []

    def test_in_progress_to_pass_with_e1_evidence_blocked(self):
        # E1 (uploaded-only) evidence is not strong enough for 'pass'.
        e = make_evidence({
            "confidence_level": EvidenceConfidenceEnum.E1,
            "truth_status": EvidenceTruthStatusEnum.UPLOADED,
        })
        allowed, violations = validate_transition("in_progress", "pass", evidence_list=[e])
        assert allowed is False
        assert "E2" in violations[0]

    def test_in_progress_to_partial_with_evidence_allowed(self):
        e = make_evidence({"confidence_level": EvidenceConfidenceEnum.E0})
        allowed, violations = validate_transition("in_progress", "partial", evidence_list=[e])
        assert allowed is True

    def test_in_progress_to_partial_without_evidence_blocked(self):
        allowed, violations = validate_transition("in_progress", "partial", evidence_list=[])
        assert allowed is False

    def test_pass_to_fail_always_allowed(self):
        # Downgrades are never blocked.
        allowed, violations = validate_transition("pass", "fail")
        assert allowed is True

    def test_any_to_na_requires_justification(self):
        allowed, violations = validate_transition("in_progress", "n/a", status_justification=None)
        assert allowed is False
        assert "justification" in violations[0].lower()

    def test_any_to_na_with_justification_allowed(self):
        allowed, violations = validate_transition(
            "in_progress", "n/a", status_justification="Not applicable for this project"
        )
        assert allowed is True

    def test_any_to_planned_always_allowed(self):
        allowed, violations = validate_transition("pass", "planned")
        assert allowed is True

    def test_same_status_noop_allowed(self):
        allowed, violations = validate_transition("pass", "pass")
        assert allowed is True

    def test_bypass_for_auto_updater(self):
        # The automated updater may skip the evidence checks entirely.
        allowed, violations = validate_transition(
            "in_progress", "pass", evidence_list=[], bypass_for_auto_updater=True
        )
        assert allowed is True

    def test_partial_to_pass_needs_e2(self):
        e = make_evidence({
            "confidence_level": EvidenceConfidenceEnum.E1,
            "truth_status": EvidenceTruthStatusEnum.UPLOADED,
        })
        allowed, violations = validate_transition("partial", "pass", evidence_list=[e])
        assert allowed is False

    def test_partial_to_pass_with_e3_allowed(self):
        e = make_evidence({
            "confidence_level": EvidenceConfidenceEnum.E3,
            "truth_status": EvidenceTruthStatusEnum.OBSERVED,
        })
        allowed, violations = validate_transition("partial", "pass", evidence_list=[e])
        assert allowed is True

    def test_in_progress_to_fail_allowed(self):
        allowed, violations = validate_transition("in_progress", "fail")
        assert allowed is True


# ===========================================================================
# 4. TestMultiDimensionalScore
# ===========================================================================
class TestMultiDimensionalScore:
    """Test multi-dimensional score calculation."""

    def test_score_structure(self):
        """Score result should have all required keys."""
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        with patch.object(repo, 'get_all', return_value=[]):
            result = repo.get_multi_dimensional_score()
        assert "requirement_coverage" in result
        assert "evidence_strength" in result
        assert "validation_quality" in result
        assert "evidence_freshness" in result
        assert "control_effectiveness" in result
        assert "overall_readiness" in result
        assert "hard_blocks" in result

    def test_empty_controls_returns_zeros(self):
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        with patch.object(repo, 'get_all', return_value=[]):
            result = repo.get_multi_dimensional_score()
        assert result["overall_readiness"] == 0.0
        assert "Keine Controls" in result["hard_blocks"][0]

    def test_hard_blocks_pass_without_evidence(self):
        """Controls on 'pass' without evidence should trigger hard block."""
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        ctrl = make_control({"status": ControlStatusEnum.PASS})
        mock_db.query.return_value.all.return_value = []  # no evidence
        mock_db.query.return_value.scalar.return_value = 0
        with patch.object(repo, 'get_all', return_value=[ctrl]):
            result = repo.get_multi_dimensional_score()
        assert any("Evidence" in b or "evidence" in b.lower() for b in result["hard_blocks"])

    def test_all_dimensions_are_floats(self):
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        with patch.object(repo, 'get_all', return_value=[]):
            result = repo.get_multi_dimensional_score()
        for key in ["requirement_coverage", "evidence_strength", "validation_quality",
                    "evidence_freshness", "control_effectiveness", "overall_readiness"]:
            assert isinstance(result[key], float), f"{key} should be float"

    def test_hard_blocks_is_list(self):
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        with patch.object(repo, 'get_all', return_value=[]):
            result = repo.get_multi_dimensional_score()
        assert isinstance(result["hard_blocks"], list)

    def test_backwards_compatibility_with_old_score(self):
        """get_statistics should still work and return compliance_score."""
        from compliance.db.repository import ControlRepository
        repo = ControlRepository(mock_db)
        mock_db.query.return_value.scalar.return_value = 0
        mock_db.query.return_value.group_by.return_value.all.return_value = []
        result = repo.get_statistics()
        assert "compliance_score" in result
        assert "total" in result


# ===========================================================================
# 5.
# TestForbiddenFormulations
# ===========================================================================
class TestForbiddenFormulations:
    """Test forbidden formulation detection (tested via the validate endpoint context)."""

    def test_import_works(self):
        """Verify forbidden pattern check function is importable and callable."""
        # This tests the Python-side schema, the actual check is in TypeScript
        from compliance.api.schemas import MultiDimensionalScore, StatusTransitionError
        score = MultiDimensionalScore()
        assert score.overall_readiness == 0.0
        err = StatusTransitionError(current_status="planned", requested_status="pass")
        assert err.allowed is False

    def test_status_transition_error_schema(self):
        from compliance.api.schemas import StatusTransitionError
        err = StatusTransitionError(
            allowed=False,
            current_status="in_progress",
            requested_status="pass",
            violations=["Need E2 evidence"],
        )
        assert err.violations == ["Need E2 evidence"]

    def test_multi_dimensional_score_defaults(self):
        from compliance.api.schemas import MultiDimensionalScore
        score = MultiDimensionalScore()
        assert score.requirement_coverage == 0.0
        assert score.hard_blocks == []

    def test_multi_dimensional_score_with_data(self):
        from compliance.api.schemas import MultiDimensionalScore
        score = MultiDimensionalScore(
            requirement_coverage=80.0,
            evidence_strength=60.0,
            validation_quality=40.0,
            evidence_freshness=90.0,
            control_effectiveness=70.0,
            overall_readiness=65.0,
            hard_blocks=["3 Controls ohne Evidence"],
        )
        assert score.overall_readiness == 65.0
        assert len(score.hard_blocks) == 1

    def test_evidence_response_has_anti_fake_fields(self):
        # model_fields is the Pydantic v2 field registry.
        from compliance.api.schemas import EvidenceResponse
        fields = EvidenceResponse.model_fields
        assert "confidence_level" in fields
        assert "truth_status" in fields
        assert "generation_mode" in fields
        assert "may_be_used_as_evidence" in fields
        assert "reviewed_by" in fields
        assert "reviewed_at" in fields


# ===========================================================================
# 6.
# TestLLMGenerationAudit
# ===========================================================================
class TestLLMGenerationAudit:
    """Test LLM generation audit trail."""

    def test_create_audit_record(self):
        """POST /compliance/llm-audit should create a record."""
        mock_record = MagicMock()
        mock_record.id = "audit-001"
        mock_record.tenant_id = None
        mock_record.entity_type = "document"
        mock_record.entity_id = None
        mock_record.generation_mode = "draft_assistance"
        mock_record.truth_status = EvidenceTruthStatusEnum.GENERATED
        mock_record.may_be_used_as_evidence = False
        mock_record.llm_model = "qwen2.5vl:32b"
        mock_record.llm_provider = "ollama"
        mock_record.prompt_hash = None
        mock_record.input_summary = "Test input"
        mock_record.output_summary = "Test output"
        mock_record.extra_metadata = {}
        mock_record.created_at = NOW
        mock_db.add = MagicMock()
        mock_db.commit = MagicMock()
        mock_db.refresh = MagicMock(side_effect=lambda r: setattr(r, 'id', 'audit-001'))
        # We need to patch the LLMGenerationAuditDB constructor
        with patch('compliance.api.llm_audit_routes.LLMGenerationAuditDB', return_value=mock_record):
            resp = client.post("/compliance/llm-audit", json={
                "entity_type": "document",
                "generation_mode": "draft_assistance",
                "truth_status": "generated",
                "may_be_used_as_evidence": False,
                "llm_model": "qwen2.5vl:32b",
                "llm_provider": "ollama",
            })
        assert resp.status_code == 200
        data = resp.json()
        assert data["entity_type"] == "document"
        assert data["truth_status"] == "generated"
        assert data["may_be_used_as_evidence"] is False

    def test_truth_status_always_generated_for_llm(self):
        """LLM-generated content should always start with truth_status=generated."""
        from compliance.db.models import LLMGenerationAuditDB, EvidenceTruthStatusEnum
        audit = LLMGenerationAuditDB()
        # Default should be GENERATED (column default may only apply on flush,
        # so None is also acceptable on an unflushed instance)
        assert audit.truth_status is None or audit.truth_status == EvidenceTruthStatusEnum.GENERATED

    def test_may_be_used_as_evidence_defaults_false(self):
        """Generated content should NOT be usable as evidence by default."""
        from compliance.db.models import LLMGenerationAuditDB
        audit = LLMGenerationAuditDB()
        assert audit.may_be_used_as_evidence is False or audit.may_be_used_as_evidence is None

    def test_list_audit_records(self):
        """GET /compliance/llm-audit should return records."""
        mock_query = MagicMock()
        mock_query.count.return_value = 0
        mock_query.filter.return_value = mock_query
        mock_query.order_by.return_value = mock_query
        mock_query.offset.return_value = mock_query
        mock_query.limit.return_value = mock_query
        mock_query.all.return_value = []
        mock_db.query.return_value = mock_query
        resp = client.get("/compliance/llm-audit")
        assert resp.status_code == 200
        data = resp.json()
        assert "records" in data
        assert "total" in data
        assert data["total"] == 0


# ===========================================================================
# 7. TestEvidenceReview
# ===========================================================================
class TestEvidenceReview:
    """Test evidence review endpoint."""

    def test_review_upgrades_confidence(self):
        """PATCH /evidence/{id}/review should update confidence and set reviewer."""
        evidence = make_evidence({
            "confidence_level": EvidenceConfidenceEnum.E1,
            "truth_status": EvidenceTruthStatusEnum.UPLOADED,
        })
        mock_db.query.return_value.filter.return_value.first.return_value = evidence
        mock_db.commit = MagicMock()
        mock_db.refresh = MagicMock()
        resp = client.patch(f"/evidence/{EVIDENCE_UUID}/review", json={
            "confidence_level": "E2",
            "truth_status": "validated_internal",
            "reviewed_by": "auditor@example.com",
        })
        assert resp.status_code == 200
        # Verify the evidence was updated
        assert evidence.confidence_level == EvidenceConfidenceEnum.E2
        assert evidence.truth_status == EvidenceTruthStatusEnum.VALIDATED_INTERNAL
        assert evidence.reviewed_by == "auditor@example.com"
        assert evidence.reviewed_at is not None

    def test_review_nonexistent_evidence_returns_404(self):
        mock_db.query.return_value.filter.return_value.first.return_value = None
        resp = client.patch("/evidence/nonexistent-id/review", json={
            "reviewed_by": "someone",
        })
        assert resp.status_code == 404

    def test_review_invalid_confidence_returns_400(self):
        evidence = make_evidence()
        mock_db.query.return_value.filter.return_value.first.return_value = evidence
        resp = client.patch(f"/evidence/{EVIDENCE_UUID}/review", json={
            "confidence_level": "INVALID",
            "reviewed_by": "someone",
        })
        assert resp.status_code == 400


# ===========================================================================
# 8. TestControlUpdateIntegration
# ===========================================================================
class TestControlUpdateIntegration:
    """Test that ControlUpdate schema includes status_justification."""

    def test_control_update_has_status_justification(self):
        from compliance.api.schemas import ControlUpdate
        fields = ControlUpdate.model_fields
        assert "status_justification" in fields

    def test_control_response_has_status_justification(self):
        from compliance.api.schemas import ControlResponse
        fields = ControlResponse.model_fields
        assert "status_justification" in fields

    def test_control_status_enum_has_in_progress(self):
        assert ControlStatusEnum.IN_PROGRESS.value == "in_progress"


# ===========================================================================
# 9. TestEvidenceEnums
# ===========================================================================
class TestEvidenceEnums:
    """Test the new evidence enums."""

    def test_confidence_enum_values(self):
        assert EvidenceConfidenceEnum.E0.value == "E0"
        assert EvidenceConfidenceEnum.E1.value == "E1"
        assert EvidenceConfidenceEnum.E2.value == "E2"
        assert EvidenceConfidenceEnum.E3.value == "E3"
        assert EvidenceConfidenceEnum.E4.value == "E4"

    def test_truth_status_enum_values(self):
        assert EvidenceTruthStatusEnum.GENERATED.value == "generated"
        assert EvidenceTruthStatusEnum.UPLOADED.value == "uploaded"
        assert EvidenceTruthStatusEnum.OBSERVED.value == "observed"
        assert EvidenceTruthStatusEnum.VALIDATED_INTERNAL.value == "validated_internal"
        assert EvidenceTruthStatusEnum.REJECTED.value == "rejected"
        assert EvidenceTruthStatusEnum.PROVIDED_TO_AUDITOR.value == "provided_to_auditor"
        assert EvidenceTruthStatusEnum.ACCEPTED_BY_AUDITOR.value == "accepted_by_auditor"