Files
breakpilot-compliance/backend-compliance/compliance/api/evidence_routes.py
T
Benjamin Admin edbf6d2be5
Build + Deploy / build-admin-compliance (push) Successful in 1m58s
Build + Deploy / build-backend-compliance (push) Successful in 12s
Build + Deploy / build-ai-sdk (push) Successful in 11s
Build + Deploy / build-developer-portal (push) Successful in 11s
Build + Deploy / build-tts (push) Successful in 21s
Build + Deploy / build-document-crawler (push) Successful in 11s
Build + Deploy / build-dsms-gateway (push) Successful in 14s
Build + Deploy / build-dsms-node (push) Successful in 14s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 15s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m40s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 40s
CI / test-python-backend (push) Successful in 37s
CI / test-python-document-crawler (push) Successful in 26s
CI / test-python-dsms-gateway (push) Successful in 22s
CI / validate-canonical-controls (push) Successful in 14s
Build + Deploy / trigger-orca (push) Successful in 2m26s
feat(dsms): Stufe 2+3 — Evidence/TechFile → DSMS + Version Chains + Audit Timeline
Stufe 2A: Evidence Upload → automatische DSMS-Archivierung
- Nach SHA-256 Hash → archive_to_dsms(), CID im Audit-Trail
- Evidence mit CID wird automatisch zu E2 (hash-verifiziert) hochgestuft

Stufe 2B: IACE Tech-File Export → DSMS
- PDF/Excel/DOCX/Markdown Exporte werden nach DSMS archiviert
- archiveTechFile() Helper fuer alle 4 Formate

Stufe 3A: DSMS Gateway — parent_cid + History Endpoint
- parent_cid + tenant_id Felder in DocumentMetadata
- GET /documents/{cid}/history — folgt parent_cid-Chain (max 50 deep)

Stufe 3C: Audit Timeline UI
- Neue Seite /sdk/audit-timeline
- Vertikale Timeline mit farbigen Action-Dots
- Filter: Alle, Nachweis, DSMS-Archiv, Control, Dokument, DSFA, VVT, TOM
- CID-Badges fuer DSMS-archivierte Eintraege

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-12 13:55:07 +02:00

920 lines
32 KiB
Python

# mypy: disable-error-code="arg-type"
"""
FastAPI routes for Evidence management.
Endpoints:
- /evidence: Evidence listing and creation
- /evidence/upload: Evidence file upload
- /evidence/collect: CI/CD evidence collection
- /evidence/ci-status: CI/CD evidence status
Phase 1 Step 4 refactor: handlers delegate to EvidenceService. Pure
helpers (`_parse_ci_evidence`, `_extract_findings_detail`) and the
SOURCE_CONTROL_MAP constant are re-exported from this module so the
existing tests (tests/test_evidence_routes.py) continue to import them
from the legacy path.
"""
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
ControlRepository,
EvidenceRepository,
EvidenceStatusEnum,
EvidenceConfidenceEnum,
EvidenceTruthStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB, AuditTrailDB
from ..services.auto_risk_updater import AutoRiskUpdater
from ..services.evidence_service import EvidenceService
from .schemas import (
EvidenceCreate, EvidenceResponse, EvidenceListResponse,
EvidenceRejectRequest,
)
from .audit_trail_utils import log_audit_trail
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-evidence"])
def get_evidence_service(db: Session = Depends(get_db)) -> EvidenceService:
# Read repo + auto-updater classes from this module's namespace at call
# time so test patches against compliance.api.evidence_routes.* propagate.
return EvidenceService(
db,
evidence_repo_cls=EvidenceRepository,
control_repo_cls=ControlRepository,
auto_updater_cls=AutoRiskUpdater,
)
# ============================================================================
# Anti-Fake-Evidence: Four-Eyes Domain Check
# ============================================================================
FOUR_EYES_DOMAINS = {"gov", "priv"}
def _requires_four_eyes(control_domain: str) -> bool:
"""Controls in governance/privacy domains require two independent reviewers."""
return control_domain in FOUR_EYES_DOMAINS
# ============================================================================
# Anti-Fake-Evidence: Auto-Classification Helpers
# ============================================================================
def _classify_confidence(source: Optional[str], evidence_type: Optional[str] = None, artifact_hash: Optional[str] = None) -> EvidenceConfidenceEnum:
"""Classify evidence confidence level based on source and metadata."""
if source == "ci_pipeline":
return EvidenceConfidenceEnum.E3
if source == "api" and artifact_hash:
return EvidenceConfidenceEnum.E3
if source == "api":
return EvidenceConfidenceEnum.E3
if source in ("manual", "upload"):
return EvidenceConfidenceEnum.E1
if source == "generated":
return EvidenceConfidenceEnum.E0
# Default for unknown sources
return EvidenceConfidenceEnum.E1
def _classify_truth_status(source: Optional[str]) -> EvidenceTruthStatusEnum:
"""Classify evidence truth status based on source."""
if source == "ci_pipeline":
return EvidenceTruthStatusEnum.OBSERVED
if source in ("manual", "upload"):
return EvidenceTruthStatusEnum.UPLOADED
if source == "generated":
return EvidenceTruthStatusEnum.GENERATED
if source == "api":
return EvidenceTruthStatusEnum.OBSERVED
return EvidenceTruthStatusEnum.UPLOADED
def _build_evidence_response(e: EvidenceDB) -> EvidenceResponse:
"""Build an EvidenceResponse from an EvidenceDB, including anti-fake fields."""
return EvidenceResponse(
id=e.id,
control_id=e.control_id,
evidence_type=e.evidence_type,
title=e.title,
description=e.description,
artifact_path=e.artifact_path,
artifact_url=e.artifact_url,
artifact_hash=e.artifact_hash,
file_size_bytes=e.file_size_bytes,
mime_type=e.mime_type,
valid_from=e.valid_from,
valid_until=e.valid_until,
status=e.status.value if e.status else None,
source=e.source,
ci_job_id=e.ci_job_id,
uploaded_by=e.uploaded_by,
collected_at=e.collected_at,
created_at=e.created_at,
confidence_level=e.confidence_level.value if e.confidence_level else None,
truth_status=e.truth_status.value if e.truth_status else None,
generation_mode=e.generation_mode,
may_be_used_as_evidence=e.may_be_used_as_evidence,
reviewed_by=e.reviewed_by,
reviewed_at=e.reviewed_at,
approval_status=e.approval_status,
first_reviewer=e.first_reviewer,
first_reviewed_at=e.first_reviewed_at,
second_reviewer=e.second_reviewer,
second_reviewed_at=e.second_reviewed_at,
requires_four_eyes=e.requires_four_eyes,
)
# ============================================================================
# Evidence
# ============================================================================
@router.get("/evidence", response_model=EvidenceListResponse)
async def list_evidence(
control_id: Optional[str] = None,
evidence_type: Optional[str] = None,
status: Optional[str] = None,
page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceListResponse:
"""List evidence with optional filters and pagination."""
repo = EvidenceRepository(db)
if control_id:
# First get the control UUID
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if not control:
raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
evidence = repo.get_by_control(control.id)
else:
evidence = repo.get_all()
if evidence_type:
evidence = [e for e in evidence if e.evidence_type == evidence_type]
if status:
try:
status_enum = EvidenceStatusEnum(status)
evidence = [e for e in evidence if e.status == status_enum]
except ValueError:
pass
total = len(evidence)
# Apply pagination if requested
if page is not None and limit is not None:
offset = (page - 1) * limit
evidence = evidence[offset:offset + limit]
results = [_build_evidence_response(e) for e in evidence]
return EvidenceListResponse(evidence=results, total=total)
@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
evidence_data: EvidenceCreate,
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Create new evidence record."""
repo = EvidenceRepository(db)
# Get control UUID
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(evidence_data.control_id)
if not control:
raise HTTPException(status_code=404, detail=f"Control {evidence_data.control_id} not found")
source = evidence_data.source or "api"
confidence = _classify_confidence(source, evidence_data.evidence_type)
truth = _classify_truth_status(source)
# Allow explicit override from request
if evidence_data.confidence_level:
try:
confidence = EvidenceConfidenceEnum(evidence_data.confidence_level)
except ValueError:
pass
if evidence_data.truth_status:
try:
truth = EvidenceTruthStatusEnum(evidence_data.truth_status)
except ValueError:
pass
evidence = repo.create(
control_id=control.id,
evidence_type=evidence_data.evidence_type,
title=evidence_data.title,
description=evidence_data.description,
artifact_url=evidence_data.artifact_url,
valid_from=evidence_data.valid_from,
valid_until=evidence_data.valid_until,
source=source,
ci_job_id=evidence_data.ci_job_id,
)
# Set anti-fake-evidence fields
evidence.confidence_level = confidence
evidence.truth_status = truth
# Generated evidence should not be used as evidence by default
if truth == EvidenceTruthStatusEnum.GENERATED:
evidence.may_be_used_as_evidence = False
# Four-Eyes: check if the linked control's domain requires it
control_domain = control.domain.value if control.domain else ""
if _requires_four_eyes(control_domain):
evidence.requires_four_eyes = True
evidence.approval_status = "pending_first"
db.commit()
# Audit trail
log_audit_trail(
db, "evidence", evidence.id, evidence.title, "create",
performed_by=evidence_data.source or "api",
change_summary=f"Evidence created with confidence={confidence.value}, truth={truth.value}",
)
db.commit()
resp = _build_evidence_response(evidence)
if dsms_cid:
resp["dsms_cid"] = dsms_cid
return resp
@router.delete("/evidence/{evidence_id}")
async def delete_evidence(
evidence_id: str,
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Delete an evidence record."""
with translate_domain_errors():
return service.delete_evidence(evidence_id)
# ============================================================================
# Upload
# ============================================================================
@router.post("/evidence/upload")
async def upload_evidence(
control_id: str = Query(...),
evidence_type: str = Query(...),
title: str = Query(...),
file: UploadFile = File(...),
description: Optional[str] = Query(None),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Upload evidence file."""
# Get control UUID
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if not control:
raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
# Create upload directory
upload_dir = f"/tmp/compliance_evidence/{control_id}"
os.makedirs(upload_dir, exist_ok=True)
# Save file
file_path = os.path.join(upload_dir, file.filename)
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
# Calculate hash
file_hash = hashlib.sha256(content).hexdigest()
# Create evidence record
repo = EvidenceRepository(db)
evidence = repo.create(
control_id=control.id,
evidence_type=evidence_type,
title=title,
description=description,
artifact_path=file_path,
artifact_hash=file_hash,
file_size_bytes=len(content),
mime_type=file.content_type,
source="upload",
)
# Upload evidence → E1 + uploaded
evidence.confidence_level = EvidenceConfidenceEnum.E1
evidence.truth_status = EvidenceTruthStatusEnum.UPLOADED
# Archive to DSMS (best-effort, non-blocking)
dsms_cid = None
try:
from compliance.services.dsms_client import archive_to_dsms
dsms_result = await archive_to_dsms(
content=content, filename=file.filename,
document_type="evidence", document_id=str(evidence.id),
version="1", tenant_id=control_id,
)
dsms_cid = dsms_result.get("cid")
if dsms_cid:
evidence.confidence_level = EvidenceConfidenceEnum.E2
from compliance.api.audit_trail_utils import log_audit_trail
log_audit_trail(db, "evidence", str(evidence.id), title, "archive",
"system", field_changed="dsms_cid", new_value=dsms_cid,
change_summary=f"Evidence archived to DSMS: {dsms_cid}")
except Exception:
pass # DSMS unavailable
# Four-Eyes: check if the linked control's domain requires it
control_domain = control.domain.value if control.domain else ""
if _requires_four_eyes(control_domain):
evidence.requires_four_eyes = True
evidence.approval_status = "pending_first"
db.commit()
resp = _build_evidence_response(evidence)
if dsms_cid:
resp["dsms_cid"] = dsms_cid
return resp
# ============================================================================
# CI/CD Evidence Collection — helpers
# ============================================================================
# Map CI source names to the corresponding control IDs
SOURCE_CONTROL_MAP = {
"sast": "SDLC-001",
"dependency_scan": "SDLC-002",
"secret_scan": "SDLC-003",
"code_review": "SDLC-004",
"sbom": "SDLC-005",
"container_scan": "SDLC-006",
"test_results": "AUD-001",
}
def _parse_ci_evidence(data: dict) -> dict:
"""
Parse and validate incoming CI evidence data.
Returns a dict with:
- report_json: str (serialised JSON)
- report_hash: str (SHA-256 hex digest)
- evidence_status: str ("valid" or "failed")
- findings_count: int
- critical_findings: int
"""
report_json = json.dumps(data) if data else "{}"
report_hash = hashlib.sha256(report_json.encode()).hexdigest()
findings_count = 0
critical_findings = 0
if data and isinstance(data, dict):
# Semgrep format
if "results" in data:
findings_count = len(data.get("results", []))
critical_findings = len([
r for r in data.get("results", [])
if r.get("extra", {}).get("severity", "").upper() in ["CRITICAL", "HIGH"]
])
# Trivy format
elif "Results" in data:
for result in data.get("Results", []):
vulns = result.get("Vulnerabilities", [])
findings_count += len(vulns)
critical_findings += len([
v for v in vulns
if v.get("Severity", "").upper() in ["CRITICAL", "HIGH"]
])
# Generic findings array
elif "findings" in data:
findings_count = len(data.get("findings", []))
# SBOM format - just count components
elif "components" in data:
findings_count = len(data.get("components", []))
evidence_status = "failed" if critical_findings > 0 else "valid"
return {
"report_json": report_json,
"report_hash": report_hash,
"evidence_status": evidence_status,
"findings_count": findings_count,
"critical_findings": critical_findings,
}
def _store_evidence(
db: Session,
*,
control_db_id: str,
source: str,
parsed: dict,
ci_job_id: str,
ci_job_url: str,
report_data: dict,
) -> EvidenceDB:
"""
Persist a CI evidence item to the database and write the report file.
Returns the created EvidenceDB instance (already committed).
"""
findings_count = parsed["findings_count"]
critical_findings = parsed["critical_findings"]
# Build title and description
title = f"{source.upper()} Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
description = "Automatically collected from CI/CD pipeline"
if findings_count > 0:
description += f"\n- Total findings: {findings_count}"
if critical_findings > 0:
description += f"\n- Critical/High findings: {critical_findings}"
if ci_job_id:
description += f"\n- CI Job ID: {ci_job_id}"
if ci_job_url:
description += f"\n- CI Job URL: {ci_job_url}"
# Store report file
upload_dir = f"/tmp/compliance_evidence/ci/{source}"
os.makedirs(upload_dir, exist_ok=True)
file_name = f"{source}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{parsed['report_hash'][:8]}.json"
file_path = os.path.join(upload_dir, file_name)
with open(file_path, "w") as f:
json.dump(report_data or {}, f, indent=2)
# Create evidence record with anti-fake-evidence classification
evidence = EvidenceDB(
id=str(uuid_module.uuid4()),
control_id=control_db_id,
evidence_type=f"ci_{source}",
title=title,
description=description,
artifact_path=file_path,
artifact_hash=parsed["report_hash"],
file_size_bytes=len(parsed["report_json"]),
mime_type="application/json",
source="ci_pipeline",
ci_job_id=ci_job_id,
valid_from=datetime.utcnow(),
valid_until=datetime.utcnow() + timedelta(days=90),
status=EvidenceStatusEnum(parsed["evidence_status"]),
# CI pipeline evidence → E3 observed (system-observed, hash-verified)
confidence_level=EvidenceConfidenceEnum.E3,
truth_status=EvidenceTruthStatusEnum.OBSERVED,
may_be_used_as_evidence=True,
)
db.add(evidence)
db.commit()
db.refresh(evidence)
return evidence
def _extract_findings_detail(report_data: dict) -> dict:
"""
Extract severity-bucketed finding counts from report data.
Returns dict with keys: critical, high, medium, low.
"""
findings_detail = {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
}
if not report_data:
return findings_detail
# Semgrep format
if "results" in report_data:
for r in report_data.get("results", []):
severity = r.get("extra", {}).get("severity", "").upper()
if severity == "CRITICAL":
findings_detail["critical"] += 1
elif severity == "HIGH":
findings_detail["high"] += 1
elif severity == "MEDIUM":
findings_detail["medium"] += 1
elif severity in ["LOW", "INFO"]:
findings_detail["low"] += 1
# Trivy format
elif "Results" in report_data:
for result in report_data.get("Results", []):
for v in result.get("Vulnerabilities", []):
severity = v.get("Severity", "").upper()
if severity == "CRITICAL":
findings_detail["critical"] += 1
elif severity == "HIGH":
findings_detail["high"] += 1
elif severity == "MEDIUM":
findings_detail["medium"] += 1
elif severity == "LOW":
findings_detail["low"] += 1
# Generic findings with severity
elif "findings" in report_data:
for f in report_data.get("findings", []):
severity = f.get("severity", "").upper()
if severity == "CRITICAL":
findings_detail["critical"] += 1
elif severity == "HIGH":
findings_detail["high"] += 1
elif severity == "MEDIUM":
findings_detail["medium"] += 1
else:
findings_detail["low"] += 1
return findings_detail
def _update_risks(db: Session, *, source: str, control_id: str, ci_job_id: str, report_data: dict):
"""
Update risk status based on new evidence.
Uses AutoRiskUpdater to update Control status and linked Risks based on
severity-bucketed findings. Returns the update result or None on error.
"""
findings_detail = _extract_findings_detail(report_data)
try:
auto_updater = AutoRiskUpdater(db)
risk_update_result = auto_updater.process_evidence_collect_request(
tool=source,
control_id=control_id,
evidence_type=f"ci_{source}",
timestamp=datetime.utcnow().isoformat(),
commit_sha=report_data.get("commit_sha", "unknown") if report_data else "unknown",
ci_job_id=ci_job_id,
findings=findings_detail,
)
return risk_update_result
except Exception:
return None
# ============================================================================
# CI/CD Evidence Collection
# ============================================================================
def _update_risks(
db: Session,
*,
source: str,
control_id: str,
ci_job_id: Optional[str],
report_data: Optional[dict[str, Any]],
) -> Any:
"""Thin wrapper so test patches against this module's _update_risks take effect."""
return _update_risks_impl(
db,
source=source,
control_id=control_id,
ci_job_id=ci_job_id,
report_data=report_data,
auto_updater_cls=AutoRiskUpdater,
)
@router.post("/evidence/collect")
async def collect_ci_evidence(
source: str = Query(
...,
description="Evidence source: sast, dependency_scan, sbom, container_scan, test_results",
),
ci_job_id: Optional[str] = Query(None, description="CI/CD Job ID for traceability"),
ci_job_url: Optional[str] = Query(None, description="URL to CI/CD job"),
report_data: Optional[dict[str, Any]] = None,
db: Session = Depends(get_db),
) -> dict[str, Any]:
"""
Collect evidence from CI/CD pipeline.
Handler stays inline so tests can patch
``compliance.api.evidence_routes._store_evidence`` /
``compliance.api.evidence_routes._update_risks`` directly.
"""
if source not in SOURCE_CONTROL_MAP:
raise HTTPException(
status_code=400,
detail=f"Unknown source '{source}'. Supported: {list(SOURCE_CONTROL_MAP.keys())}",
)
control_id = SOURCE_CONTROL_MAP[source]
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if not control:
raise HTTPException(
status_code=404,
detail=f"Control {control_id} not found. Please seed the database first.",
)
parsed = _parse_ci_evidence(report_data or {})
evidence = _store_evidence(
db,
control_db_id=control.id,
source=source,
parsed=parsed,
ci_job_id=ci_job_id,
ci_job_url=ci_job_url,
report_data=report_data,
)
risk_update_result = _update_risks(
db,
source=source,
control_id=control_id,
ci_job_id=ci_job_id,
report_data=report_data,
)
return {
"success": True,
"evidence_id": evidence.id,
"control_id": control_id,
"source": source,
"status": parsed["evidence_status"],
"findings_count": parsed["findings_count"],
"critical_findings": parsed["critical_findings"],
"artifact_path": evidence.artifact_path,
"message": f"Evidence collected successfully for control {control_id}",
"auto_risk_update": (
{
"enabled": True,
"control_updated": risk_update_result.control_updated,
"old_status": risk_update_result.old_status,
"new_status": risk_update_result.new_status,
"risks_affected": risk_update_result.risks_affected,
"alerts_generated": risk_update_result.alerts_generated,
}
if risk_update_result
else {"enabled": False, "error": "Auto-update skipped"}
),
}
@router.get("/evidence/ci-status")
async def get_ci_evidence_status(
control_id: Optional[str] = Query(None, description="Filter by control ID"),
days: int = Query(30, description="Look back N days"),
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Get CI/CD evidence collection status overview."""
with translate_domain_errors():
return service.ci_status(control_id, days)
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
if control_id:
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if control:
query = query.filter(EvidenceDB.control_id == control.id)
evidence_list = query.order_by(EvidenceDB.collected_at.desc()).limit(100).all()
# Group by control and calculate stats
control_stats = defaultdict(lambda: {
"total": 0,
"valid": 0,
"failed": 0,
"last_collected": None,
"evidence": [],
})
for e in evidence_list:
# Get control_id string
control = db.query(ControlDB).filter(ControlDB.id == e.control_id).first()
ctrl_id = control.control_id if control else "unknown"
stats = control_stats[ctrl_id]
stats["total"] += 1
if e.status:
if e.status.value == "valid":
stats["valid"] += 1
elif e.status.value == "failed":
stats["failed"] += 1
if not stats["last_collected"] or e.collected_at > stats["last_collected"]:
stats["last_collected"] = e.collected_at
# Add evidence summary
stats["evidence"].append({
"id": e.id,
"type": e.evidence_type,
"status": e.status.value if e.status else None,
"collected_at": e.collected_at.isoformat() if e.collected_at else None,
"ci_job_id": e.ci_job_id,
})
# Convert to list and sort
result = []
for ctrl_id, stats in control_stats.items():
result.append({
"control_id": ctrl_id,
"total_evidence": stats["total"],
"valid_count": stats["valid"],
"failed_count": stats["failed"],
"last_collected": stats["last_collected"].isoformat() if stats["last_collected"] else None,
"recent_evidence": stats["evidence"][:5],
})
result.sort(key=lambda x: x["last_collected"] or "", reverse=True)
return {
"period_days": days,
"total_evidence": len(evidence_list),
"controls": result,
}
# ============================================================================
# Evidence Review (Anti-Fake-Evidence)
# ============================================================================
from pydantic import BaseModel as _BaseModel
class _EvidenceReviewRequest(_BaseModel):
confidence_level: Optional[str] = None
truth_status: Optional[str] = None
reviewed_by: str
@router.patch("/evidence/{evidence_id}/review", response_model=EvidenceResponse)
async def review_evidence(
evidence_id: str,
review: _EvidenceReviewRequest,
db: Session = Depends(get_db),
):
"""
Review evidence: upgrade confidence level and/or change truth status.
For Four-Eyes evidence, the first reviewer sets first_reviewer and
approval_status='first_approved'. A second (different) reviewer then
sets second_reviewer and approval_status='approved'.
"""
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")
old_confidence = evidence.confidence_level.value if evidence.confidence_level else None
old_truth = evidence.truth_status.value if evidence.truth_status else None
if review.confidence_level:
try:
evidence.confidence_level = EvidenceConfidenceEnum(review.confidence_level)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid confidence_level: {review.confidence_level}")
if review.truth_status:
try:
evidence.truth_status = EvidenceTruthStatusEnum(review.truth_status)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid truth_status: {review.truth_status}")
# Four-Eyes branching
if evidence.requires_four_eyes:
status = evidence.approval_status or "none"
if status in ("none", "pending_first"):
evidence.first_reviewer = review.reviewed_by
evidence.first_reviewed_at = datetime.utcnow()
evidence.approval_status = "first_approved"
elif status == "first_approved":
if review.reviewed_by == evidence.first_reviewer:
raise HTTPException(
status_code=400,
detail="Four-Eyes: second reviewer must be different from first reviewer",
)
evidence.second_reviewer = review.reviewed_by
evidence.second_reviewed_at = datetime.utcnow()
evidence.approval_status = "approved"
elif status == "approved":
raise HTTPException(status_code=400, detail="Evidence already approved")
elif status == "rejected":
raise HTTPException(status_code=400, detail="Evidence was rejected — create new evidence instead")
evidence.reviewed_by = review.reviewed_by
evidence.reviewed_at = datetime.utcnow()
db.commit()
# Audit trail
new_confidence = evidence.confidence_level.value if evidence.confidence_level else None
if old_confidence != new_confidence:
log_audit_trail(
db, "evidence", evidence_id, evidence.title, "review",
performed_by=review.reviewed_by,
field_changed="confidence_level",
old_value=old_confidence,
new_value=new_confidence,
)
new_truth = evidence.truth_status.value if evidence.truth_status else None
if old_truth != new_truth:
log_audit_trail(
db, "evidence", evidence_id, evidence.title, "review",
performed_by=review.reviewed_by,
field_changed="truth_status",
old_value=old_truth,
new_value=new_truth,
)
db.commit()
db.refresh(evidence)
resp = _build_evidence_response(evidence)
if dsms_cid:
resp["dsms_cid"] = dsms_cid
return resp
@router.patch("/evidence/{evidence_id}/reject", response_model=EvidenceResponse)
async def reject_evidence(
evidence_id: str,
body: EvidenceRejectRequest,
db: Session = Depends(get_db),
):
"""Reject evidence (sets approval_status='rejected')."""
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")
evidence.approval_status = "rejected"
evidence.reviewed_by = body.reviewed_by
evidence.reviewed_at = datetime.utcnow()
db.commit()
log_audit_trail(
db, "evidence", evidence_id, evidence.title, "reject",
performed_by=body.reviewed_by,
change_summary=body.rejection_reason or "Evidence rejected",
)
db.commit()
db.refresh(evidence)
resp = _build_evidence_response(evidence)
if dsms_cid:
resp["dsms_cid"] = dsms_cid
return resp
# ============================================================================
# Audit Trail Query
# ============================================================================
@router.get("/audit-trail")
async def get_audit_trail(
entity_type: Optional[str] = Query(None),
entity_id: Optional[str] = Query(None),
action: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
):
"""Query audit trail entries for an entity."""
query = db.query(AuditTrailDB)
if entity_type:
query = query.filter(AuditTrailDB.entity_type == entity_type)
if entity_id:
query = query.filter(AuditTrailDB.entity_id == entity_id)
if action:
query = query.filter(AuditTrailDB.action == action)
records = query.order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()
return {
"entries": [
{
"id": r.id,
"entity_type": r.entity_type,
"entity_id": r.entity_id,
"entity_name": r.entity_name,
"action": r.action,
"field_changed": r.field_changed,
"old_value": r.old_value,
"new_value": r.new_value,
"change_summary": r.change_summary,
"performed_by": r.performed_by,
"performed_at": r.performed_at.isoformat() if r.performed_at else None,
"checksum": r.checksum,
}
for r in records
],
"total": len(records),
}