feat: Anti-Fake-Evidence System (Phase 1-4b)

Implement full evidence integrity pipeline to prevent compliance theater:
- Confidence levels (E0-E4), truth status tracking, assertion engine
- Four-Eyes approval workflow, audit trail, reject endpoint
- Evidence distribution dashboard, LLM audit routes
- Traceability matrix (backend endpoint + Compliance Hub UI tab)
- Anti-fake badges, control status machine, normative patterns
- 2 migrations, 4 test suites, MkDocs documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-23 17:15:45 +01:00
parent 48ca0a6bef
commit e6201d5239
36 changed files with 5627 additions and 189 deletions

View File

@@ -0,0 +1,562 @@
"""Tests for Anti-Fake-Evidence Phase 1 guardrails.
~45 tests covering:
- Evidence confidence classification
- Evidence truth status classification
- Control status transition state machine
- Multi-dimensional compliance score
- LLM generation audit
- Evidence review endpoint
"""
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.evidence_routes import router as evidence_router
from compliance.api.llm_audit_routes import router as llm_audit_router
from compliance.api.evidence_routes import _classify_confidence, _classify_truth_status
from compliance.services.control_status_machine import validate_transition
from compliance.db.models import (
EvidenceConfidenceEnum,
EvidenceTruthStatusEnum,
ControlStatusEnum,
)
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(evidence_router)
app.include_router(llm_audit_router, prefix="/compliance")
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
EVIDENCE_UUID = "eeeeeeee-aaaa-bbbb-cccc-ffffffffffff"
CONTROL_UUID = "cccccccc-aaaa-bbbb-cccc-dddddddddddd"
NOW = datetime(2026, 3, 23, 12, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_evidence(overrides=None):
e = MagicMock()
e.id = EVIDENCE_UUID
e.control_id = CONTROL_UUID
e.evidence_type = "test_results"
e.title = "Pytest Test Report"
e.description = "All tests passing"
e.artifact_url = "https://ci.example.com/job/123/artifact"
e.artifact_path = None
e.artifact_hash = "abc123def456"
e.file_size_bytes = None
e.mime_type = None
e.status = MagicMock()
e.status.value = "valid"
e.uploaded_by = None
e.source = "ci_pipeline"
e.ci_job_id = "job-123"
e.valid_from = NOW
e.valid_until = NOW + timedelta(days=90)
e.collected_at = NOW
e.created_at = NOW
# Anti-fake-evidence fields
e.confidence_level = EvidenceConfidenceEnum.E3
e.truth_status = EvidenceTruthStatusEnum.OBSERVED
e.generation_mode = None
e.may_be_used_as_evidence = True
e.reviewed_by = None
e.reviewed_at = None
# Phase 2 fields
e.approval_status = "none"
e.first_reviewer = None
e.first_reviewed_at = None
e.second_reviewer = None
e.second_reviewed_at = None
e.requires_four_eyes = False
if overrides:
for k, v in overrides.items():
setattr(e, k, v)
return e
def make_control(overrides=None):
c = MagicMock()
c.id = CONTROL_UUID
c.control_id = "GOV-001"
c.title = "Access Control"
c.status = ControlStatusEnum.PLANNED
if overrides:
for k, v in overrides.items():
setattr(c, k, v)
return c
# ===========================================================================
# 1. TestEvidenceConfidenceClassification
# ===========================================================================
class TestEvidenceConfidenceClassification:
"""Test automatic confidence level classification."""
def test_ci_pipeline_returns_e3(self):
assert _classify_confidence("ci_pipeline") == EvidenceConfidenceEnum.E3
def test_api_with_hash_returns_e3(self):
assert _classify_confidence("api", artifact_hash="sha256:abc") == EvidenceConfidenceEnum.E3
def test_api_without_hash_returns_e3(self):
assert _classify_confidence("api") == EvidenceConfidenceEnum.E3
def test_manual_returns_e1(self):
assert _classify_confidence("manual") == EvidenceConfidenceEnum.E1
def test_upload_returns_e1(self):
assert _classify_confidence("upload") == EvidenceConfidenceEnum.E1
def test_generated_returns_e0(self):
assert _classify_confidence("generated") == EvidenceConfidenceEnum.E0
def test_unknown_source_returns_e1(self):
assert _classify_confidence("some_random_source") == EvidenceConfidenceEnum.E1
def test_none_source_returns_e1(self):
assert _classify_confidence(None) == EvidenceConfidenceEnum.E1
# ===========================================================================
# 2. TestEvidenceTruthStatus
# ===========================================================================
class TestEvidenceTruthStatus:
"""Test automatic truth status classification."""
def test_ci_pipeline_returns_observed(self):
assert _classify_truth_status("ci_pipeline") == EvidenceTruthStatusEnum.OBSERVED
def test_manual_returns_uploaded(self):
assert _classify_truth_status("manual") == EvidenceTruthStatusEnum.UPLOADED
def test_upload_returns_uploaded(self):
assert _classify_truth_status("upload") == EvidenceTruthStatusEnum.UPLOADED
def test_generated_returns_generated(self):
assert _classify_truth_status("generated") == EvidenceTruthStatusEnum.GENERATED
def test_api_returns_observed(self):
assert _classify_truth_status("api") == EvidenceTruthStatusEnum.OBSERVED
def test_none_returns_uploaded(self):
assert _classify_truth_status(None) == EvidenceTruthStatusEnum.UPLOADED
# ===========================================================================
# 3. TestControlStatusTransitions
# ===========================================================================
class TestControlStatusTransitions:
"""Test the control status transition state machine."""
def test_planned_to_in_progress_allowed(self):
allowed, violations = validate_transition("planned", "in_progress")
assert allowed is True
assert violations == []
def test_in_progress_to_pass_without_evidence_blocked(self):
allowed, violations = validate_transition("in_progress", "pass", evidence_list=[])
assert allowed is False
assert len(violations) > 0
assert "pass" in violations[0].lower()
def test_in_progress_to_pass_with_e2_evidence_allowed(self):
e = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E2,
"truth_status": EvidenceTruthStatusEnum.VALIDATED_INTERNAL,
})
allowed, violations = validate_transition("in_progress", "pass", evidence_list=[e])
assert allowed is True
assert violations == []
def test_in_progress_to_pass_with_e1_evidence_blocked(self):
e = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E1,
"truth_status": EvidenceTruthStatusEnum.UPLOADED,
})
allowed, violations = validate_transition("in_progress", "pass", evidence_list=[e])
assert allowed is False
assert "E2" in violations[0]
def test_in_progress_to_partial_with_evidence_allowed(self):
e = make_evidence({"confidence_level": EvidenceConfidenceEnum.E0})
allowed, violations = validate_transition("in_progress", "partial", evidence_list=[e])
assert allowed is True
def test_in_progress_to_partial_without_evidence_blocked(self):
allowed, violations = validate_transition("in_progress", "partial", evidence_list=[])
assert allowed is False
def test_pass_to_fail_always_allowed(self):
allowed, violations = validate_transition("pass", "fail")
assert allowed is True
def test_any_to_na_requires_justification(self):
allowed, violations = validate_transition("in_progress", "n/a", status_justification=None)
assert allowed is False
assert "justification" in violations[0].lower()
def test_any_to_na_with_justification_allowed(self):
allowed, violations = validate_transition("in_progress", "n/a", status_justification="Not applicable for this project")
assert allowed is True
def test_any_to_planned_always_allowed(self):
allowed, violations = validate_transition("pass", "planned")
assert allowed is True
def test_same_status_noop_allowed(self):
allowed, violations = validate_transition("pass", "pass")
assert allowed is True
def test_bypass_for_auto_updater(self):
allowed, violations = validate_transition("in_progress", "pass", evidence_list=[], bypass_for_auto_updater=True)
assert allowed is True
def test_partial_to_pass_needs_e2(self):
e = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E1,
"truth_status": EvidenceTruthStatusEnum.UPLOADED,
})
allowed, violations = validate_transition("partial", "pass", evidence_list=[e])
assert allowed is False
def test_partial_to_pass_with_e3_allowed(self):
e = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E3,
"truth_status": EvidenceTruthStatusEnum.OBSERVED,
})
allowed, violations = validate_transition("partial", "pass", evidence_list=[e])
assert allowed is True
def test_in_progress_to_fail_allowed(self):
allowed, violations = validate_transition("in_progress", "fail")
assert allowed is True
# ===========================================================================
# 4. TestMultiDimensionalScore
# ===========================================================================
class TestMultiDimensionalScore:
"""Test multi-dimensional score calculation."""
def test_score_structure(self):
"""Score result should have all required keys."""
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
with patch.object(repo, 'get_all', return_value=[]):
result = repo.get_multi_dimensional_score()
assert "requirement_coverage" in result
assert "evidence_strength" in result
assert "validation_quality" in result
assert "evidence_freshness" in result
assert "control_effectiveness" in result
assert "overall_readiness" in result
assert "hard_blocks" in result
def test_empty_controls_returns_zeros(self):
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
with patch.object(repo, 'get_all', return_value=[]):
result = repo.get_multi_dimensional_score()
assert result["overall_readiness"] == 0.0
assert "Keine Controls" in result["hard_blocks"][0]
def test_hard_blocks_pass_without_evidence(self):
"""Controls on 'pass' without evidence should trigger hard block."""
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
ctrl = make_control({"status": ControlStatusEnum.PASS})
mock_db.query.return_value.all.return_value = [] # no evidence
mock_db.query.return_value.scalar.return_value = 0
with patch.object(repo, 'get_all', return_value=[ctrl]):
result = repo.get_multi_dimensional_score()
assert any("Evidence" in b or "evidence" in b.lower() for b in result["hard_blocks"])
def test_all_dimensions_are_floats(self):
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
with patch.object(repo, 'get_all', return_value=[]):
result = repo.get_multi_dimensional_score()
for key in ["requirement_coverage", "evidence_strength", "validation_quality",
"evidence_freshness", "control_effectiveness", "overall_readiness"]:
assert isinstance(result[key], float), f"{key} should be float"
def test_hard_blocks_is_list(self):
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
with patch.object(repo, 'get_all', return_value=[]):
result = repo.get_multi_dimensional_score()
assert isinstance(result["hard_blocks"], list)
def test_backwards_compatibility_with_old_score(self):
"""get_statistics should still work and return compliance_score."""
from compliance.db.repository import ControlRepository
repo = ControlRepository(mock_db)
mock_db.query.return_value.scalar.return_value = 0
mock_db.query.return_value.group_by.return_value.all.return_value = []
result = repo.get_statistics()
assert "compliance_score" in result
assert "total" in result
# ===========================================================================
# 5. TestForbiddenFormulations
# ===========================================================================
class TestForbiddenFormulations:
"""Test forbidden formulation detection (tested via the validate endpoint context)."""
def test_import_works(self):
"""Verify forbidden pattern check function is importable and callable."""
# This tests the Python-side schema, the actual check is in TypeScript
from compliance.api.schemas import MultiDimensionalScore, StatusTransitionError
score = MultiDimensionalScore()
assert score.overall_readiness == 0.0
err = StatusTransitionError(current_status="planned", requested_status="pass")
assert err.allowed is False
def test_status_transition_error_schema(self):
from compliance.api.schemas import StatusTransitionError
err = StatusTransitionError(
allowed=False,
current_status="in_progress",
requested_status="pass",
violations=["Need E2 evidence"],
)
assert err.violations == ["Need E2 evidence"]
def test_multi_dimensional_score_defaults(self):
from compliance.api.schemas import MultiDimensionalScore
score = MultiDimensionalScore()
assert score.requirement_coverage == 0.0
assert score.hard_blocks == []
def test_multi_dimensional_score_with_data(self):
from compliance.api.schemas import MultiDimensionalScore
score = MultiDimensionalScore(
requirement_coverage=80.0,
evidence_strength=60.0,
validation_quality=40.0,
evidence_freshness=90.0,
control_effectiveness=70.0,
overall_readiness=65.0,
hard_blocks=["3 Controls ohne Evidence"],
)
assert score.overall_readiness == 65.0
assert len(score.hard_blocks) == 1
def test_evidence_response_has_anti_fake_fields(self):
from compliance.api.schemas import EvidenceResponse
fields = EvidenceResponse.model_fields
assert "confidence_level" in fields
assert "truth_status" in fields
assert "generation_mode" in fields
assert "may_be_used_as_evidence" in fields
assert "reviewed_by" in fields
assert "reviewed_at" in fields
# ===========================================================================
# 6. TestLLMGenerationAudit
# ===========================================================================
class TestLLMGenerationAudit:
"""Test LLM generation audit trail."""
def test_create_audit_record(self):
"""POST /compliance/llm-audit should create a record."""
mock_record = MagicMock()
mock_record.id = "audit-001"
mock_record.tenant_id = None
mock_record.entity_type = "document"
mock_record.entity_id = None
mock_record.generation_mode = "draft_assistance"
mock_record.truth_status = EvidenceTruthStatusEnum.GENERATED
mock_record.may_be_used_as_evidence = False
mock_record.llm_model = "qwen2.5vl:32b"
mock_record.llm_provider = "ollama"
mock_record.prompt_hash = None
mock_record.input_summary = "Test input"
mock_record.output_summary = "Test output"
mock_record.extra_metadata = {}
mock_record.created_at = NOW
mock_db.add = MagicMock()
mock_db.commit = MagicMock()
mock_db.refresh = MagicMock(side_effect=lambda r: setattr(r, 'id', 'audit-001'))
# We need to patch the LLMGenerationAuditDB constructor
with patch('compliance.api.llm_audit_routes.LLMGenerationAuditDB', return_value=mock_record):
resp = client.post("/compliance/llm-audit", json={
"entity_type": "document",
"generation_mode": "draft_assistance",
"truth_status": "generated",
"may_be_used_as_evidence": False,
"llm_model": "qwen2.5vl:32b",
"llm_provider": "ollama",
})
assert resp.status_code == 200
data = resp.json()
assert data["entity_type"] == "document"
assert data["truth_status"] == "generated"
assert data["may_be_used_as_evidence"] is False
def test_truth_status_always_generated_for_llm(self):
"""LLM-generated content should always start with truth_status=generated."""
from compliance.db.models import LLMGenerationAuditDB, EvidenceTruthStatusEnum
audit = LLMGenerationAuditDB()
# Default should be GENERATED
assert audit.truth_status is None or audit.truth_status == EvidenceTruthStatusEnum.GENERATED
def test_may_be_used_as_evidence_defaults_false(self):
"""Generated content should NOT be usable as evidence by default."""
from compliance.db.models import LLMGenerationAuditDB
audit = LLMGenerationAuditDB()
assert audit.may_be_used_as_evidence is False or audit.may_be_used_as_evidence is None
def test_list_audit_records(self):
"""GET /compliance/llm-audit should return records."""
mock_query = MagicMock()
mock_query.count.return_value = 0
mock_query.filter.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.offset.return_value = mock_query
mock_query.limit.return_value = mock_query
mock_query.all.return_value = []
mock_db.query.return_value = mock_query
resp = client.get("/compliance/llm-audit")
assert resp.status_code == 200
data = resp.json()
assert "records" in data
assert "total" in data
assert data["total"] == 0
# ===========================================================================
# 7. TestEvidenceReview
# ===========================================================================
class TestEvidenceReview:
"""Test evidence review endpoint."""
def test_review_upgrades_confidence(self):
"""PATCH /evidence/{id}/review should update confidence and set reviewer."""
evidence = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E1,
"truth_status": EvidenceTruthStatusEnum.UPLOADED,
})
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.commit = MagicMock()
mock_db.refresh = MagicMock()
resp = client.patch(f"/evidence/{EVIDENCE_UUID}/review", json={
"confidence_level": "E2",
"truth_status": "validated_internal",
"reviewed_by": "auditor@example.com",
})
assert resp.status_code == 200
# Verify the evidence was updated
assert evidence.confidence_level == EvidenceConfidenceEnum.E2
assert evidence.truth_status == EvidenceTruthStatusEnum.VALIDATED_INTERNAL
assert evidence.reviewed_by == "auditor@example.com"
assert evidence.reviewed_at is not None
def test_review_nonexistent_evidence_returns_404(self):
mock_db.query.return_value.filter.return_value.first.return_value = None
resp = client.patch("/evidence/nonexistent-id/review", json={
"reviewed_by": "someone",
})
assert resp.status_code == 404
def test_review_invalid_confidence_returns_400(self):
evidence = make_evidence()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
resp = client.patch(f"/evidence/{EVIDENCE_UUID}/review", json={
"confidence_level": "INVALID",
"reviewed_by": "someone",
})
assert resp.status_code == 400
# ===========================================================================
# 8. TestControlUpdateIntegration
# ===========================================================================
class TestControlUpdateIntegration:
"""Test that ControlUpdate schema includes status_justification."""
def test_control_update_has_status_justification(self):
from compliance.api.schemas import ControlUpdate
fields = ControlUpdate.model_fields
assert "status_justification" in fields
def test_control_response_has_status_justification(self):
from compliance.api.schemas import ControlResponse
fields = ControlResponse.model_fields
assert "status_justification" in fields
def test_control_status_enum_has_in_progress(self):
assert ControlStatusEnum.IN_PROGRESS.value == "in_progress"
# ===========================================================================
# 9. TestEvidenceEnums
# ===========================================================================
class TestEvidenceEnums:
"""Test the new evidence enums."""
def test_confidence_enum_values(self):
assert EvidenceConfidenceEnum.E0.value == "E0"
assert EvidenceConfidenceEnum.E1.value == "E1"
assert EvidenceConfidenceEnum.E2.value == "E2"
assert EvidenceConfidenceEnum.E3.value == "E3"
assert EvidenceConfidenceEnum.E4.value == "E4"
def test_truth_status_enum_values(self):
assert EvidenceTruthStatusEnum.GENERATED.value == "generated"
assert EvidenceTruthStatusEnum.UPLOADED.value == "uploaded"
assert EvidenceTruthStatusEnum.OBSERVED.value == "observed"
assert EvidenceTruthStatusEnum.VALIDATED_INTERNAL.value == "validated_internal"
assert EvidenceTruthStatusEnum.REJECTED.value == "rejected"
assert EvidenceTruthStatusEnum.PROVIDED_TO_AUDITOR.value == "provided_to_auditor"
assert EvidenceTruthStatusEnum.ACCEPTED_BY_AUDITOR.value == "accepted_by_auditor"

View File

@@ -0,0 +1,528 @@
"""Tests for Anti-Fake-Evidence Phase 2.
~35 tests covering:
- Audit trail extension (evidence review/create logging)
- Assertion engine (extraction, CRUD, verify, summary)
- Four-Eyes review (domain check, first/second review, same-person reject)
- UI badge data (response schema includes new fields)
"""
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.evidence_routes import (
router as evidence_router,
_requires_four_eyes,
_classify_confidence,
_classify_truth_status,
)
from compliance.api.assertion_routes import router as assertion_router
from compliance.services.assertion_engine import extract_assertions, _classify_sentence
from compliance.db.models import (
EvidenceConfidenceEnum,
EvidenceTruthStatusEnum,
ControlStatusEnum,
AssertionDB,
)
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(evidence_router)
app.include_router(assertion_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
EVIDENCE_UUID = "eeee0002-aaaa-bbbb-cccc-ffffffffffff"
CONTROL_UUID = "cccc0002-aaaa-bbbb-cccc-dddddddddddd"
ASSERTION_UUID = "aaaa0002-bbbb-cccc-dddd-eeeeeeeeeeee"
NOW = datetime(2026, 3, 23, 14, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_evidence(overrides=None):
e = MagicMock()
e.id = EVIDENCE_UUID
e.control_id = CONTROL_UUID
e.evidence_type = "test_results"
e.title = "Phase 2 Test Evidence"
e.description = "Testing four-eyes"
e.artifact_url = "https://ci.example.com/artifact"
e.artifact_path = None
e.artifact_hash = "abc123"
e.file_size_bytes = None
e.mime_type = None
e.status = MagicMock()
e.status.value = "valid"
e.uploaded_by = None
e.source = "api"
e.ci_job_id = None
e.valid_from = NOW
e.valid_until = NOW + timedelta(days=90)
e.collected_at = NOW
e.created_at = NOW
e.confidence_level = EvidenceConfidenceEnum.E1
e.truth_status = EvidenceTruthStatusEnum.UPLOADED
e.generation_mode = None
e.may_be_used_as_evidence = True
e.reviewed_by = None
e.reviewed_at = None
# Phase 2 fields
e.approval_status = "none"
e.first_reviewer = None
e.first_reviewed_at = None
e.second_reviewer = None
e.second_reviewed_at = None
e.requires_four_eyes = False
if overrides:
for k, v in overrides.items():
setattr(e, k, v)
return e
def make_assertion(overrides=None):
a = MagicMock()
a.id = ASSERTION_UUID
a.tenant_id = "tenant-001"
a.entity_type = "control"
a.entity_id = CONTROL_UUID
a.sentence_text = "Test assertion sentence"
a.sentence_index = 0
a.assertion_type = "assertion"
a.evidence_ids = []
a.confidence = 0.0
a.normative_tier = "pflicht"
a.verified_by = None
a.verified_at = None
a.created_at = NOW
a.updated_at = NOW
if overrides:
for k, v in overrides.items():
setattr(a, k, v)
return a
# ===========================================================================
# 1. TestAuditTrailExtension
# ===========================================================================
class TestAuditTrailExtension:
"""Test that evidence review and create log audit trail entries."""
def test_review_evidence_logs_audit_trail(self):
evidence = make_evidence()
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"confidence_level": "E2", "reviewed_by": "auditor@test.com"},
)
assert resp.status_code == 200
# db.add should be called for audit trail entries
assert mock_db.add.called
def test_review_evidence_records_old_and_new_confidence(self):
evidence = make_evidence({"confidence_level": EvidenceConfidenceEnum.E1})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"confidence_level": "E3", "reviewed_by": "reviewer@test.com"},
)
assert resp.status_code == 200
def test_review_evidence_records_truth_status_change(self):
evidence = make_evidence({"truth_status": EvidenceTruthStatusEnum.UPLOADED})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"truth_status": "validated_internal", "reviewed_by": "reviewer@test.com"},
)
assert resp.status_code == 200
def test_review_nonexistent_evidence_returns_404(self):
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = None
resp = client.patch(
"/evidence/nonexistent/review",
json={"reviewed_by": "someone"},
)
assert resp.status_code == 404
def test_reject_evidence_logs_audit_trail(self):
evidence = make_evidence()
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/reject",
json={"reviewed_by": "auditor@test.com", "rejection_reason": "Fake evidence"},
)
assert resp.status_code == 200
data = resp.json()
assert data["approval_status"] == "rejected"
def test_reject_nonexistent_evidence_returns_404(self):
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = None
resp = client.patch(
"/evidence/nonexistent/reject",
json={"reviewed_by": "someone"},
)
assert resp.status_code == 404
def test_audit_trail_query_endpoint(self):
mock_db.reset_mock()
trail_entry = MagicMock()
trail_entry.id = "trail-001"
trail_entry.entity_type = "evidence"
trail_entry.entity_id = EVIDENCE_UUID
trail_entry.entity_name = "Test"
trail_entry.action = "review"
trail_entry.field_changed = "confidence_level"
trail_entry.old_value = "E1"
trail_entry.new_value = "E2"
trail_entry.change_summary = None
trail_entry.performed_by = "auditor"
trail_entry.performed_at = NOW
trail_entry.checksum = "abc"
mock_db.query.return_value.filter.return_value.filter.return_value.order_by.return_value.limit.return_value.all.return_value = [trail_entry]
resp = client.get(f"/audit-trail?entity_type=evidence&entity_id={EVIDENCE_UUID}")
assert resp.status_code == 200
data = resp.json()
assert data["total"] >= 1
def test_audit_trail_checksum_present(self):
"""Audit trail entries should have a checksum for integrity."""
from compliance.api.audit_trail_utils import create_signature
sig = create_signature("evidence|123|review|user@test.com")
assert len(sig) == 64 # SHA-256 hex digest
# ===========================================================================
# 2. TestAssertionEngine
# ===========================================================================
class TestAssertionEngine:
"""Test assertion extraction and classification."""
def test_pflicht_sentence_classified_as_assertion(self):
result = _classify_sentence("Die Organisation muss ein ISMS implementieren.")
assert result == ("assertion", "pflicht")
def test_empfehlung_sentence_classified(self):
result = _classify_sentence("Die Organisation sollte regelmäßige Audits durchführen.")
assert result == ("assertion", "empfehlung")
def test_kann_sentence_classified(self):
result = _classify_sentence("Optional kann ein externes Audit durchgeführt werden.")
assert result == ("assertion", "kann")
def test_rationale_sentence_classified(self):
result = _classify_sentence("Dies ist erforderlich, weil Datenverlust schwere Folgen hat.")
assert result == ("rationale", None)
def test_fact_sentence_with_evidence_keyword(self):
result = _classify_sentence("Das Zertifikat wurde am 15.03.2026 ausgestellt.")
assert result == ("fact", None)
def test_extract_assertions_splits_sentences(self):
text = "Die Organisation muss Daten schützen. Sie sollte regelmäßig prüfen."
results = extract_assertions(text, "control", "ctrl-001")
assert len(results) == 2
assert results[0]["assertion_type"] == "assertion"
assert results[0]["normative_tier"] == "pflicht"
assert results[1]["normative_tier"] == "empfehlung"
def test_extract_assertions_empty_text(self):
results = extract_assertions("", "control", "ctrl-001")
assert results == []
def test_extract_assertions_single_sentence(self):
results = extract_assertions("Der Betreiber muss ein Audit durchführen.", "control", "ctrl-001")
assert len(results) == 1
assert results[0]["normative_tier"] == "pflicht"
def test_mixed_text_with_rationale(self):
text = "Die Organisation muss ein ISMS implementieren. Dies ist notwendig, weil Compliance gefordert ist."
results = extract_assertions(text, "control", "ctrl-001")
assert len(results) == 2
types = [r["assertion_type"] for r in results]
assert "assertion" in types
assert "rationale" in types
def test_assertion_crud_create(self):
mock_db.reset_mock()
mock_db.refresh.return_value = None
# Mock the added object to return proper values
def side_effect_add(obj):
obj.id = ASSERTION_UUID
obj.created_at = NOW
obj.updated_at = NOW
obj.sentence_index = 0
obj.confidence = 0.0
mock_db.add.side_effect = side_effect_add
resp = client.post(
"/assertions?tenant_id=tenant-001",
json={
"entity_type": "control",
"entity_id": CONTROL_UUID,
"sentence_text": "Die Organisation muss ein ISMS implementieren.",
"assertion_type": "assertion",
"normative_tier": "pflicht",
},
)
assert resp.status_code == 200
def test_assertion_verify_endpoint(self):
a = make_assertion()
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = a
mock_db.refresh.return_value = None
resp = client.post(f"/assertions/{ASSERTION_UUID}/verify?verified_by=auditor@test.com")
assert resp.status_code == 200
assert a.assertion_type == "fact"
assert a.verified_by == "auditor@test.com"
def test_assertion_summary(self):
mock_db.reset_mock()
a1 = make_assertion({"assertion_type": "assertion", "verified_by": None})
a2 = make_assertion({"assertion_type": "fact", "verified_by": "user"})
a3 = make_assertion({"assertion_type": "rationale", "verified_by": None})
mock_db.query.return_value.filter.return_value.filter.return_value.filter.return_value.all.return_value = [a1, a2, a3]
# Direct .all() for no-filter case
mock_db.query.return_value.all.return_value = [a1, a2, a3]
resp = client.get("/assertions/summary")
assert resp.status_code == 200
data = resp.json()
assert data["total_assertions"] == 3
assert data["total_facts"] == 1
assert data["total_rationale"] == 1
assert data["unverified_count"] == 1
# ===========================================================================
# 3. TestFourEyesReview
# ===========================================================================
class TestFourEyesReview:
"""Test Four-Eyes review process."""
def test_gov_domain_requires_four_eyes(self):
assert _requires_four_eyes("gov") is True
def test_priv_domain_requires_four_eyes(self):
assert _requires_four_eyes("priv") is True
def test_ops_domain_does_not_require_four_eyes(self):
assert _requires_four_eyes("ops") is False
def test_sdlc_domain_does_not_require_four_eyes(self):
assert _requires_four_eyes("sdlc") is False
def test_first_review_sets_first_approved(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "pending_first",
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer1@test.com"},
)
assert resp.status_code == 200
assert evidence.first_reviewer == "reviewer1@test.com"
assert evidence.approval_status == "first_approved"
def test_second_review_different_person_approves(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "first_approved",
"first_reviewer": "reviewer1@test.com",
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer2@test.com"},
)
assert resp.status_code == 200
assert evidence.second_reviewer == "reviewer2@test.com"
assert evidence.approval_status == "approved"
def test_same_person_second_review_rejected(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "first_approved",
"first_reviewer": "reviewer1@test.com",
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer1@test.com"},
)
assert resp.status_code == 400
assert "different" in resp.json()["detail"].lower()
def test_already_approved_blocked(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "approved",
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer3@test.com"},
)
assert resp.status_code == 400
assert "already" in resp.json()["detail"].lower()
def test_rejected_evidence_cannot_be_reviewed(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "rejected",
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer@test.com"},
)
assert resp.status_code == 400
def test_reject_endpoint(self):
evidence = make_evidence({"requires_four_eyes": True})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/reject",
json={"reviewed_by": "auditor@test.com", "rejection_reason": "Not authentic"},
)
assert resp.status_code == 200
assert evidence.approval_status == "rejected"
# ===========================================================================
# 4. TestUIBadgeData
# ===========================================================================
class TestUIBadgeData:
"""Test that evidence response includes all Phase 2 fields."""
def test_evidence_response_includes_approval_status(self):
evidence = make_evidence({
"approval_status": "first_approved",
"first_reviewer": "reviewer1@test.com",
"first_reviewed_at": NOW,
"requires_four_eyes": True,
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
mock_db.refresh.return_value = None
resp = client.patch(
f"/evidence/{EVIDENCE_UUID}/review",
json={"reviewed_by": "reviewer2@test.com"},
)
assert resp.status_code == 200
data = resp.json()
assert "approval_status" in data
assert "requires_four_eyes" in data
assert data["requires_four_eyes"] is True
def test_evidence_response_includes_four_eyes_fields(self):
evidence = make_evidence({
"requires_four_eyes": True,
"approval_status": "approved",
"first_reviewer": "r1@test.com",
"first_reviewed_at": NOW,
"second_reviewer": "r2@test.com",
"second_reviewed_at": NOW,
})
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = evidence
# Use list endpoint
mock_db.query.return_value.filter.return_value.all.return_value = [evidence]
mock_db.query.return_value.all.return_value = [evidence]
# Direct test via _build_evidence_response
from compliance.api.evidence_routes import _build_evidence_response
resp = _build_evidence_response(evidence)
assert resp.approval_status == "approved"
assert resp.first_reviewer == "r1@test.com"
assert resp.second_reviewer == "r2@test.com"
assert resp.requires_four_eyes is True
def test_assertion_response_schema(self):
a = make_assertion()
mock_db.reset_mock()
mock_db.query.return_value.filter.return_value.first.return_value = a
resp = client.get(f"/assertions/{ASSERTION_UUID}")
assert resp.status_code == 200
data = resp.json()
assert "assertion_type" in data
assert "normative_tier" in data
assert "evidence_ids" in data
assert "verified_by" in data
def test_evidence_response_includes_confidence_and_truth(self):
evidence = make_evidence({
"confidence_level": EvidenceConfidenceEnum.E3,
"truth_status": EvidenceTruthStatusEnum.OBSERVED,
})
from compliance.api.evidence_routes import _build_evidence_response
resp = _build_evidence_response(evidence)
assert resp.confidence_level == "E3"
assert resp.truth_status == "observed"
def test_evidence_response_none_four_eyes_fields_default(self):
evidence = make_evidence()
from compliance.api.evidence_routes import _build_evidence_response
resp = _build_evidence_response(evidence)
assert resp.approval_status == "none"
assert resp.requires_four_eyes is False
assert resp.first_reviewer is None

View File

@@ -0,0 +1,191 @@
"""Tests for Anti-Fake-Evidence Phase 3: Enforcement.
~8 tests covering:
- Evidence distribution endpoint (confidence counts, four-eyes pending)
- Dashboard multi-score presence
"""
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch, PropertyMock
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.dashboard_routes import router as dashboard_router
from compliance.db.models import EvidenceConfidenceEnum, EvidenceTruthStatusEnum
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(dashboard_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
NOW = datetime(2026, 3, 23, 14, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_evidence(confidence="E1", requires_four_eyes=False, approval_status="none"):
e = MagicMock()
e.confidence_level = MagicMock()
e.confidence_level.value = confidence
e.requires_four_eyes = requires_four_eyes
e.approval_status = approval_status
return e
# ===========================================================================
# 1. TestEvidenceDistributionEndpoint
# ===========================================================================
class TestEvidenceDistributionEndpoint:
"""Test GET /dashboard/evidence-distribution endpoint."""
def _setup_evidence(self, evidence_list):
"""Configure mock DB to return evidence list via EvidenceRepository."""
mock_db.reset_mock()
# EvidenceRepository(db).get_all() internally does db.query(...).all()
# We patch the EvidenceRepository class to return our list
return evidence_list
@patch("compliance.api.dashboard_routes.EvidenceRepository")
def test_empty_db_returns_zero_counts(self, mock_repo_cls):
mock_repo = MagicMock()
mock_repo.get_all.return_value = []
mock_repo_cls.return_value = mock_repo
resp = client.get("/dashboard/evidence-distribution")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["four_eyes_pending"] == 0
assert data["by_confidence"] == {"E0": 0, "E1": 0, "E2": 0, "E3": 0, "E4": 0}
@patch("compliance.api.dashboard_routes.EvidenceRepository")
def test_counts_by_confidence_level(self, mock_repo_cls):
evidence = [
make_evidence("E0"),
make_evidence("E1"),
make_evidence("E1"),
make_evidence("E2"),
make_evidence("E3"),
make_evidence("E3"),
make_evidence("E3"),
make_evidence("E4"),
]
mock_repo = MagicMock()
mock_repo.get_all.return_value = evidence
mock_repo_cls.return_value = mock_repo
resp = client.get("/dashboard/evidence-distribution")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 8
assert data["by_confidence"]["E0"] == 1
assert data["by_confidence"]["E1"] == 2
assert data["by_confidence"]["E2"] == 1
assert data["by_confidence"]["E3"] == 3
assert data["by_confidence"]["E4"] == 1
@patch("compliance.api.dashboard_routes.EvidenceRepository")
def test_four_eyes_pending_count(self, mock_repo_cls):
evidence = [
make_evidence("E1", requires_four_eyes=True, approval_status="pending_first"),
make_evidence("E2", requires_four_eyes=True, approval_status="first_approved"),
make_evidence("E2", requires_four_eyes=True, approval_status="approved"),
make_evidence("E1", requires_four_eyes=True, approval_status="rejected"),
make_evidence("E1", requires_four_eyes=False, approval_status="none"),
]
mock_repo = MagicMock()
mock_repo.get_all.return_value = evidence
mock_repo_cls.return_value = mock_repo
resp = client.get("/dashboard/evidence-distribution")
assert resp.status_code == 200
data = resp.json()
# pending_first and first_approved are pending; approved and rejected are not
assert data["four_eyes_pending"] == 2
assert data["total"] == 5
@patch("compliance.api.dashboard_routes.EvidenceRepository")
def test_null_confidence_defaults_to_e1(self, mock_repo_cls):
e = MagicMock()
e.confidence_level = None
e.requires_four_eyes = False
e.approval_status = "none"
mock_repo = MagicMock()
mock_repo.get_all.return_value = [e]
mock_repo_cls.return_value = mock_repo
resp = client.get("/dashboard/evidence-distribution")
assert resp.status_code == 200
data = resp.json()
assert data["by_confidence"]["E1"] == 1
assert data["total"] == 1
@patch("compliance.api.dashboard_routes.EvidenceRepository")
def test_all_four_eyes_approved_zero_pending(self, mock_repo_cls):
evidence = [
make_evidence("E2", requires_four_eyes=True, approval_status="approved"),
make_evidence("E3", requires_four_eyes=True, approval_status="approved"),
]
mock_repo = MagicMock()
mock_repo.get_all.return_value = evidence
mock_repo_cls.return_value = mock_repo
resp = client.get("/dashboard/evidence-distribution")
assert resp.status_code == 200
data = resp.json()
assert data["four_eyes_pending"] == 0
# ===========================================================================
# 2. TestDashboardMultiScore
# ===========================================================================
class TestDashboardMultiScore:
"""Test that dashboard response includes multi_score."""
def test_dashboard_response_schema_includes_multi_score(self):
"""DashboardResponse schema must include the multi_score field."""
from compliance.api.schemas import DashboardResponse
fields = DashboardResponse.model_fields
assert "multi_score" in fields, "DashboardResponse must have multi_score field"
def test_multi_score_schema_has_required_fields(self):
"""MultiDimensionalScore schema should have all 7 fields."""
from compliance.api.schemas import MultiDimensionalScore
fields = MultiDimensionalScore.model_fields
required = [
"requirement_coverage",
"evidence_strength",
"validation_quality",
"evidence_freshness",
"control_effectiveness",
"overall_readiness",
"hard_blocks",
]
for field in required:
assert field in fields, f"Missing field: {field}"
def test_multi_score_default_values(self):
"""MultiDimensionalScore defaults should be sensible."""
from compliance.api.schemas import MultiDimensionalScore
score = MultiDimensionalScore()
assert score.overall_readiness == 0.0
assert score.hard_blocks == []
assert score.requirement_coverage == 0.0

View File

@@ -0,0 +1,277 @@
"""Tests for Anti-Fake-Evidence Phase 4a: Traceability Matrix.
6 tests covering:
- Empty DB returns empty controls + zero summary
- Nested structure: Control → Evidence → Assertions
- Assertions appear under correct evidence
- Coverage flags computed correctly
- Control without evidence has correct coverage
- Summary counts match
"""
from datetime import datetime
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.dashboard_routes import router as dashboard_router
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(dashboard_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_control(id="c1", control_id="CTRL-001", title="Test Control", status="pass", domain="gov"):
ctrl = MagicMock()
ctrl.id = id
ctrl.control_id = control_id
ctrl.title = title
ctrl.status = MagicMock()
ctrl.status.value = status
ctrl.domain = MagicMock()
ctrl.domain.value = domain
return ctrl
def make_evidence(id="e1", control_id="c1", title="Evidence 1", evidence_type="scan_report",
confidence="E2", status="valid"):
e = MagicMock()
e.id = id
e.control_id = control_id
e.title = title
e.evidence_type = evidence_type
e.confidence_level = MagicMock()
e.confidence_level.value = confidence
e.status = MagicMock()
e.status.value = status
return e
def make_assertion(id="a1", entity_id="e1", sentence_text="System encrypts data at rest.",
assertion_type="assertion", confidence=0.85, verified_by=None):
a = MagicMock()
a.id = id
a.entity_id = entity_id
a.sentence_text = sentence_text
a.assertion_type = assertion_type
a.confidence = confidence
a.verified_by = verified_by
return a
# ===========================================================================
# Tests
# ===========================================================================
class TestTraceabilityMatrix:
"""Test GET /dashboard/traceability-matrix endpoint."""
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_empty_db_returns_empty_matrix(self, mock_ctrl_cls, mock_ev_cls):
"""Empty DB should return zero controls and zero summary counts."""
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = []
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = []
mock_ev_cls.return_value = mock_ev
# Mock db.query(AssertionDB).filter(...).all()
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = []
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
data = resp.json()
assert data["controls"] == []
assert data["summary"]["total_controls"] == 0
assert data["summary"]["covered_controls"] == 0
assert data["summary"]["fully_verified"] == 0
assert data["summary"]["uncovered_controls"] == 0
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_nested_structure(self, mock_ctrl_cls, mock_ev_cls):
"""Control with evidence and assertions should return nested structure."""
ctrl = make_control(id="c1", control_id="PRIV-001", title="Privacy Control")
ev = make_evidence(id="e1", control_id="c1", confidence="E3")
assertion = make_assertion(id="a1", entity_id="e1", verified_by="auditor@example.com")
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = [ctrl]
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = [ev]
mock_ev_cls.return_value = mock_ev
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = [assertion]
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
data = resp.json()
assert len(data["controls"]) == 1
c = data["controls"][0]
assert c["control_id"] == "PRIV-001"
assert len(c["evidence"]) == 1
assert c["evidence"][0]["confidence_level"] == "E3"
assert len(c["evidence"][0]["assertions"]) == 1
assert c["evidence"][0]["assertions"][0]["verified"] is True
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_assertions_grouped_under_correct_evidence(self, mock_ctrl_cls, mock_ev_cls):
"""Assertions should only appear under the evidence they reference."""
ctrl = make_control(id="c1")
ev1 = make_evidence(id="e1", control_id="c1", title="Evidence A")
ev2 = make_evidence(id="e2", control_id="c1", title="Evidence B")
a1 = make_assertion(id="a1", entity_id="e1", sentence_text="Assertion for E1")
a2 = make_assertion(id="a2", entity_id="e2", sentence_text="Assertion for E2")
a3 = make_assertion(id="a3", entity_id="e2", sentence_text="Second assertion for E2")
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = [ctrl]
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = [ev1, ev2]
mock_ev_cls.return_value = mock_ev
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = [a1, a2, a3]
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
data = resp.json()
c = data["controls"][0]
ev1_data = next(e for e in c["evidence"] if e["id"] == "e1")
ev2_data = next(e for e in c["evidence"] if e["id"] == "e2")
assert len(ev1_data["assertions"]) == 1
assert len(ev2_data["assertions"]) == 2
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_coverage_flags_correct(self, mock_ctrl_cls, mock_ev_cls):
"""Coverage flags should reflect evidence, assertions, and verification state."""
ctrl = make_control(id="c1")
ev = make_evidence(id="e1", control_id="c1", confidence="E2")
# One verified, one not
a1 = make_assertion(id="a1", entity_id="e1", verified_by="alice")
a2 = make_assertion(id="a2", entity_id="e1", verified_by=None)
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = [ctrl]
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = [ev]
mock_ev_cls.return_value = mock_ev
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = [a1, a2]
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
cov = resp.json()["controls"][0]["coverage"]
assert cov["has_evidence"] is True
assert cov["has_assertions"] is True
assert cov["all_assertions_verified"] is False # a2 not verified
assert cov["min_confidence_level"] == "E2"
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_coverage_without_evidence(self, mock_ctrl_cls, mock_ev_cls):
"""Control with no evidence should have all coverage flags False/None."""
ctrl = make_control(id="c1")
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = [ctrl]
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = []
mock_ev_cls.return_value = mock_ev
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = []
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
cov = resp.json()["controls"][0]["coverage"]
assert cov["has_evidence"] is False
assert cov["has_assertions"] is False
assert cov["all_assertions_verified"] is False
assert cov["min_confidence_level"] is None
@patch("compliance.api.dashboard_routes.EvidenceRepository")
@patch("compliance.api.dashboard_routes.ControlRepository")
def test_summary_counts(self, mock_ctrl_cls, mock_ev_cls):
"""Summary should count total, covered, fully verified, and uncovered controls."""
# c1: has evidence + verified assertions → fully verified
# c2: has evidence but no assertions → covered, not fully verified
# c3: no evidence → uncovered
c1 = make_control(id="c1", control_id="C-001")
c2 = make_control(id="c2", control_id="C-002")
c3 = make_control(id="c3", control_id="C-003")
ev1 = make_evidence(id="e1", control_id="c1", confidence="E3")
ev2 = make_evidence(id="e2", control_id="c2", confidence="E1")
a1 = make_assertion(id="a1", entity_id="e1", verified_by="auditor")
mock_ctrl = MagicMock()
mock_ctrl.get_all.return_value = [c1, c2, c3]
mock_ctrl_cls.return_value = mock_ctrl
mock_ev = MagicMock()
mock_ev.get_all.return_value = [ev1, ev2]
mock_ev_cls.return_value = mock_ev
mock_db.reset_mock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = [a1]
mock_db.query.return_value = mock_query
resp = client.get("/dashboard/traceability-matrix")
assert resp.status_code == 200
summary = resp.json()["summary"]
assert summary["total_controls"] == 3
assert summary["covered_controls"] == 2
assert summary["fully_verified"] == 1
assert summary["uncovered_controls"] == 1

View File

@@ -61,6 +61,7 @@ def make_control(overrides=None):
c.status = MagicMock()
c.status.value = "planned"
c.status_notes = None
c.status_justification = None
c.last_reviewed_at = None
c.next_review_at = None
c.created_at = NOW
@@ -249,15 +250,15 @@ class TestUpdateControl:
assert response.status_code == 404
def test_update_status_with_valid_enum(self):
"""Status must be a valid ControlStatusEnum value."""
"""Status must be a valid ControlStatusEnum value (planned → in_progress is always allowed)."""
updated = make_control()
updated.status.value = "pass"
updated.status.value = "in_progress"
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.update.return_value = updated
response = client.put(
"/compliance/controls/GOV-001",
json={"status": "pass"},
json={"status": "in_progress"},
)
assert response.status_code == 200

View File

@@ -56,6 +56,22 @@ def make_evidence(overrides=None):
e.valid_until = None
e.collected_at = NOW
e.created_at = NOW
# Anti-Fake-Evidence fields
e.confidence_level = MagicMock()
e.confidence_level.value = "E1"
e.truth_status = MagicMock()
e.truth_status.value = "uploaded"
e.generation_mode = None
e.may_be_used_as_evidence = True
e.reviewed_by = None
e.reviewed_at = None
# Phase 2 fields
e.approval_status = "none"
e.first_reviewer = None
e.first_reviewed_at = None
e.second_reviewer = None
e.second_reviewed_at = None
e.requires_four_eyes = False
if overrides:
for k, v in overrides.items():
setattr(e, k, v)