# mypy: disable-error-code="arg-type"
"""
FastAPI routes for Evidence management.

Endpoints:
- /evidence: Evidence listing and creation
- /evidence/upload: Evidence file upload
- /evidence/collect: CI/CD evidence collection
- /evidence/ci-status: CI/CD evidence status

Phase 1 Step 4 refactor: handlers delegate to EvidenceService.
Pure helpers (`_parse_ci_evidence`, `_extract_findings_detail`) and the
SOURCE_CONTROL_MAP constant are re-exported from this module so the existing
tests (tests/test_evidence_routes.py) continue to import them from the legacy path.
"""

import logging
import os
import json
import hashlib
import uuid as uuid_module
from datetime import datetime, timedelta
from typing import Any, Optional

from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from sqlalchemy.orm import Session

from classroom_engine.database import get_db

from ..db import (
    ControlRepository,
    EvidenceRepository,
    EvidenceStatusEnum,
    EvidenceConfidenceEnum,
    EvidenceTruthStatusEnum,
)
from ..db.models import EvidenceDB, AuditTrailDB
from ..services.auto_risk_updater import AutoRiskUpdater
from ..services.evidence_service import EvidenceService, _update_risks as _update_risks_impl
from .schemas import (
    EvidenceCreate,
    EvidenceResponse,
    EvidenceListResponse,
    EvidenceRejectRequest,
)
from .audit_trail_utils import log_audit_trail
from ._http_errors import translate_domain_errors

logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-evidence"])


def get_evidence_service(db: Session = Depends(get_db)) -> EvidenceService:
    """FastAPI dependency that builds an EvidenceService for the request session."""
    # Read repo + auto-updater classes from this module's namespace at call
    # time so test patches against compliance.api.evidence_routes.* propagate.
    return EvidenceService(
        db,
        evidence_repo_cls=EvidenceRepository,
        control_repo_cls=ControlRepository,
        auto_updater_cls=AutoRiskUpdater,
    )


# ============================================================================
# Anti-Fake-Evidence: Four-Eyes Domain Check
# ============================================================================

FOUR_EYES_DOMAINS = {"gov", "priv"}


def _requires_four_eyes(control_domain: str) -> bool:
    """Controls in governance/privacy domains require two independent reviewers."""
    return control_domain in FOUR_EYES_DOMAINS


# ============================================================================
# Anti-Fake-Evidence: Auto-Classification Helpers
# ============================================================================

def _classify_confidence(
    source: Optional[str],
    evidence_type: Optional[str] = None,
    artifact_hash: Optional[str] = None,
) -> EvidenceConfidenceEnum:
    """Classify evidence confidence level based on its source.

    ``evidence_type`` and ``artifact_hash`` are accepted for interface
    compatibility; the classification currently depends on ``source`` only
    ("api" maps to E3 with or without an artifact hash).
    """
    if source in ("ci_pipeline", "api"):
        return EvidenceConfidenceEnum.E3
    if source == "generated":
        return EvidenceConfidenceEnum.E0
    # "manual", "upload" and any unknown source share the same default.
    return EvidenceConfidenceEnum.E1


def _classify_truth_status(source: Optional[str]) -> EvidenceTruthStatusEnum:
    """Classify evidence truth status based on source."""
    if source in ("ci_pipeline", "api"):
        return EvidenceTruthStatusEnum.OBSERVED
    if source == "generated":
        return EvidenceTruthStatusEnum.GENERATED
    # "manual", "upload" and any unknown source are treated as uploaded.
    return EvidenceTruthStatusEnum.UPLOADED


def _build_evidence_response(e: EvidenceDB) -> EvidenceResponse:
    """Build an EvidenceResponse from an EvidenceDB, including anti-fake fields."""
    return EvidenceResponse(
        id=e.id,
        control_id=e.control_id,
        evidence_type=e.evidence_type,
        title=e.title,
        description=e.description,
        artifact_path=e.artifact_path,
        artifact_url=e.artifact_url,
        artifact_hash=e.artifact_hash,
        file_size_bytes=e.file_size_bytes,
        mime_type=e.mime_type,
        valid_from=e.valid_from,
        valid_until=e.valid_until,
        # Enum-valued columns are serialised by value; unset columns stay None.
        status=e.status.value if e.status else None,
        source=e.source,
        ci_job_id=e.ci_job_id,
        uploaded_by=e.uploaded_by,
        collected_at=e.collected_at,
        created_at=e.created_at,
        confidence_level=e.confidence_level.value if e.confidence_level else None,
        truth_status=e.truth_status.value if e.truth_status else None,
        generation_mode=e.generation_mode,
        may_be_used_as_evidence=e.may_be_used_as_evidence,
        reviewed_by=e.reviewed_by,
        reviewed_at=e.reviewed_at,
        approval_status=e.approval_status,
        first_reviewer=e.first_reviewer,
        first_reviewed_at=e.first_reviewed_at,
        second_reviewer=e.second_reviewer,
        second_reviewed_at=e.second_reviewed_at,
        requires_four_eyes=e.requires_four_eyes,
    )


def _attach_dsms_cid(resp: EvidenceResponse, dsms_cid: Optional[str]) -> EvidenceResponse:
    """Attach the DSMS content id to a response object, if there is one.

    The previous code used ``resp["dsms_cid"] = ...``; the response is an
    object built by ``EvidenceResponse(...)``, not a dict, so item assignment
    would fail after the evidence had already been committed.
    NOTE(review): presumably EvidenceResponse is a Pydantic model that may or
    may not declare ``dsms_cid`` -- confirm against ``.schemas``. If the
    attribute cannot be set, the response is returned without it.
    """
    if not dsms_cid:
        return resp
    try:
        setattr(resp, "dsms_cid", dsms_cid)
    except (AttributeError, TypeError, ValueError):
        logger.debug("Response object does not accept dsms_cid; omitting it")
    return resp


def _safe_upload_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied filename to a bare, non-empty file name.

    Strips directory components (both ``/`` and ``\\`` separators) and NUL
    characters so the name cannot escape the upload directory when joined.
    """
    raw = (filename or "").replace("\x00", "").replace("\\", "/")
    candidate = os.path.basename(raw).strip()
    if candidate in ("", ".", ".."):
        return "upload.bin"
    return candidate


# ============================================================================
# Evidence
# ============================================================================

@router.get("/evidence", response_model=EvidenceListResponse)
async def list_evidence(
    control_id: Optional[str] = None,
    evidence_type: Optional[str] = None,
    status: Optional[str] = None,
    page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
    limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
    db: Session = Depends(get_db),
    service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceListResponse:
    """List evidence with optional filters and pagination.

    Raises:
        HTTPException: 404 when ``control_id`` is given but no such control exists.

    ``total`` in the response is the number of matching records before
    pagination. Pagination is applied only when both ``page`` and ``limit``
    are supplied. The ``service`` dependency is not used by this handler.
    """
    repo = EvidenceRepository(db)

    if control_id:
        # The repository is keyed by the control's DB id, so resolve it first.
        control = ControlRepository(db).get_by_control_id(control_id)
        if not control:
            raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
        evidence = repo.get_by_control(control.id)
    else:
        evidence = repo.get_all()

    if evidence_type:
        evidence = [e for e in evidence if e.evidence_type == evidence_type]

    if status:
        try:
            status_enum = EvidenceStatusEnum(status)
        except ValueError:
            # Unknown status values do not filter anything (existing behaviour).
            logger.warning("Ignoring unknown evidence status filter: %r", status)
        else:
            evidence = [e for e in evidence if e.status == status_enum]

    total = len(evidence)

    # Apply pagination if requested
    if page is not None and limit is not None:
        offset = (page - 1) * limit
        evidence = evidence[offset:offset + limit]

    results = [_build_evidence_response(e) for e in evidence]
    return EvidenceListResponse(evidence=results, total=total)


@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
    evidence_data: EvidenceCreate,
    db: Session = Depends(get_db),
    service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
    """Create new evidence record.

    Confidence level and truth status are derived from the source (default
    "api") and may be overridden by valid values in the request.

    Raises:
        HTTPException: 404 when the referenced control does not exist.

    The ``service`` dependency is not used by this handler.
    """
    repo = EvidenceRepository(db)

    # Get control UUID
    control = ControlRepository(db).get_by_control_id(evidence_data.control_id)
    if not control:
        raise HTTPException(
            status_code=404,
            detail=f"Control {evidence_data.control_id} not found",
        )

    source = evidence_data.source or "api"
    confidence = _classify_confidence(source, evidence_data.evidence_type)
    truth = _classify_truth_status(source)

    # Allow explicit override from request; invalid values keep the derived
    # classification (existing behaviour) but are no longer dropped silently.
    if evidence_data.confidence_level:
        try:
            confidence = EvidenceConfidenceEnum(evidence_data.confidence_level)
        except ValueError:
            logger.warning(
                "Ignoring invalid confidence_level override: %r",
                evidence_data.confidence_level,
            )
    if evidence_data.truth_status:
        try:
            truth = EvidenceTruthStatusEnum(evidence_data.truth_status)
        except ValueError:
            logger.warning(
                "Ignoring invalid truth_status override: %r",
                evidence_data.truth_status,
            )

    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_data.evidence_type,
        title=evidence_data.title,
        description=evidence_data.description,
        artifact_url=evidence_data.artifact_url,
        valid_from=evidence_data.valid_from,
        valid_until=evidence_data.valid_until,
        source=source,
        ci_job_id=evidence_data.ci_job_id,
    )

    # Set anti-fake-evidence fields
    evidence.confidence_level = confidence
    evidence.truth_status = truth

    # Generated evidence should not be used as evidence by default
    if truth == EvidenceTruthStatusEnum.GENERATED:
        evidence.may_be_used_as_evidence = False

    # Four-Eyes: check if the linked control's domain requires it
    control_domain = control.domain.value if control.domain else ""
    if _requires_four_eyes(control_domain):
        evidence.requires_four_eyes = True
        evidence.approval_status = "pending_first"

    db.commit()

    # Audit trail
    log_audit_trail(
        db, "evidence", evidence.id, evidence.title, "create",
        performed_by=source,
        change_summary=f"Evidence created with confidence={confidence.value}, truth={truth.value}",
    )
    db.commit()

    return _build_evidence_response(evidence)


@router.delete("/evidence/{evidence_id}")
async def delete_evidence(
    evidence_id: str,
    db: Session = Depends(get_db),
    service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
    """Delete an evidence record (delegates to EvidenceService.delete_evidence)."""
    with translate_domain_errors():
        return service.delete_evidence(evidence_id)


# ============================================================================
# Upload
# ============================================================================

@router.post("/evidence/upload")
async def upload_evidence(
    control_id: str = Query(...),
    evidence_type: str = Query(...),
    title: str = Query(...),
    file: UploadFile = File(...),
    description: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
    """Upload evidence file.

    Stores the file under /tmp/compliance_evidence/<control_id>/, records its
    SHA-256 hash, and attempts a best-effort DSMS archive.

    Raises:
        HTTPException: 404 when the referenced control does not exist.

    The ``service`` dependency is not used by this handler.
    """
    # Get control UUID
    control = ControlRepository(db).get_by_control_id(control_id)
    if not control:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    # Create upload directory
    upload_dir = f"/tmp/compliance_evidence/{control_id}"
    os.makedirs(upload_dir, exist_ok=True)

    # Save file. The client-supplied name is untrusted: keep only its final
    # path component so it cannot point outside the upload directory.
    safe_name = _safe_upload_filename(file.filename)
    file_path = os.path.join(upload_dir, safe_name)
    content = await file.read()

    with open(file_path, "wb") as f:
        f.write(content)

    # Calculate hash
    file_hash = hashlib.sha256(content).hexdigest()

    # Create evidence record
    repo = EvidenceRepository(db)
    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_type,
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=file_hash,
        file_size_bytes=len(content),
        mime_type=file.content_type,
        source="upload",
    )

    # Upload evidence → E1 + uploaded
    evidence.confidence_level = EvidenceConfidenceEnum.E1
    evidence.truth_status = EvidenceTruthStatusEnum.UPLOADED

    # Archive to DSMS (best-effort, non-blocking)
    dsms_cid = None
    try:
        from compliance.services.dsms_client import archive_to_dsms

        dsms_result = await archive_to_dsms(
            content=content,
            filename=safe_name,
            document_type="evidence",
            document_id=str(evidence.id),
            version="1",
            tenant_id=control_id,
        )
        dsms_cid = dsms_result.get("cid")
        if dsms_cid:
            # A successful archive raises the confidence level to E2.
            evidence.confidence_level = EvidenceConfidenceEnum.E2
            from compliance.api.audit_trail_utils import log_audit_trail

            log_audit_trail(
                db, "evidence", str(evidence.id), title, "archive", "system",
                field_changed="dsms_cid", new_value=dsms_cid,
                change_summary=f"Evidence archived to DSMS: {dsms_cid}",
            )
    except Exception:
        # DSMS unavailable: the upload itself must still succeed, but the
        # failure should be visible in the logs.
        logger.warning(
            "DSMS archiving failed for evidence %s", evidence.id, exc_info=True
        )

    # Four-Eyes: check if the linked control's domain requires it
    control_domain = control.domain.value if control.domain else ""
    if _requires_four_eyes(control_domain):
        evidence.requires_four_eyes = True
        evidence.approval_status = "pending_first"

    db.commit()

    return _attach_dsms_cid(_build_evidence_response(evidence), dsms_cid)


# ============================================================================
# CI/CD Evidence Collection — helpers
# ============================================================================

# Map CI source names to the corresponding control IDs
SOURCE_CONTROL_MAP = {
    "sast": "SDLC-001",
    "dependency_scan": "SDLC-002",
    "secret_scan": "SDLC-003",
    "code_review": "SDLC-004",
    "sbom": "SDLC-005",
    "container_scan": "SDLC-006",
    "test_results": "AUD-001",
}

# Severities that mark CI evidence as "failed".
_BLOCKING_SEVERITIES = ("CRITICAL", "HIGH")


def _parse_ci_evidence(data: dict) -> dict:
    """
    Parse and validate incoming CI evidence data.

    Returns a dict with:
      - report_json: str (serialised JSON)
      - report_hash: str (SHA-256 hex digest)
      - evidence_status: str ("valid" or "failed")
      - findings_count: int
      - critical_findings: int

    List-valued keys that are present but ``null`` are treated as empty.
    """
    report_json = json.dumps(data) if data else "{}"
    report_hash = hashlib.sha256(report_json.encode()).hexdigest()

    findings_count = 0
    critical_findings = 0

    if data and isinstance(data, dict):
        # Semgrep format
        if "results" in data:
            results = data.get("results") or []
            findings_count = len(results)
            critical_findings = sum(
                1
                for r in results
                if ((r.get("extra") or {}).get("severity") or "").upper()
                in _BLOCKING_SEVERITIES
            )
        # Trivy format
        elif "Results" in data:
            for result in data.get("Results") or []:
                vulns = result.get("Vulnerabilities") or []
                findings_count += len(vulns)
                critical_findings += sum(
                    1
                    for v in vulns
                    if (v.get("Severity") or "").upper() in _BLOCKING_SEVERITIES
                )
        # Generic findings array
        elif "findings" in data:
            findings_count = len(data.get("findings") or [])
        # SBOM format - just count components
        elif "components" in data:
            findings_count = len(data.get("components") or [])

    evidence_status = "failed" if critical_findings > 0 else "valid"

    return {
        "report_json": report_json,
        "report_hash": report_hash,
        "evidence_status": evidence_status,
        "findings_count": findings_count,
        "critical_findings": critical_findings,
    }


def _store_evidence(
    db: Session,
    *,
    control_db_id: str,
    source: str,
    parsed: dict,
    ci_job_id: Optional[str],
    ci_job_url: Optional[str],
    report_data: Optional[dict],
) -> EvidenceDB:
    """
    Persist a CI evidence item to the database and write the report file.

    The file receives exactly ``parsed["report_json"]``, i.e. the text that
    ``parsed["report_hash"]`` was computed from, so the stored artifact can be
    checked against ``artifact_hash``. ``report_data`` is kept in the
    signature for callers but is not re-serialised here.

    Returns the created EvidenceDB instance (already committed).
    """
    findings_count = parsed["findings_count"]
    critical_findings = parsed["critical_findings"]
    report_bytes = parsed["report_json"].encode("utf-8")

    # Read each clock once so title/file name and validity window are consistent.
    now_local = datetime.now()
    now_utc = datetime.utcnow()

    # Build title and description
    title = f"{source.upper()} Report - {now_local.strftime('%Y-%m-%d %H:%M')}"
    description = "Automatically collected from CI/CD pipeline"
    if findings_count > 0:
        description += f"\n- Total findings: {findings_count}"
    if critical_findings > 0:
        description += f"\n- Critical/High findings: {critical_findings}"
    if ci_job_id:
        description += f"\n- CI Job ID: {ci_job_id}"
    if ci_job_url:
        description += f"\n- CI Job URL: {ci_job_url}"

    # Store report file
    upload_dir = f"/tmp/compliance_evidence/ci/{source}"
    os.makedirs(upload_dir, exist_ok=True)
    file_name = (
        f"{source}_{now_local.strftime('%Y%m%d_%H%M%S')}_{parsed['report_hash'][:8]}.json"
    )
    file_path = os.path.join(upload_dir, file_name)

    with open(file_path, "wb") as f:
        f.write(report_bytes)

    # Create evidence record with anti-fake-evidence classification
    evidence = EvidenceDB(
        id=str(uuid_module.uuid4()),
        control_id=control_db_id,
        evidence_type=f"ci_{source}",
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=parsed["report_hash"],
        file_size_bytes=len(report_bytes),
        mime_type="application/json",
        source="ci_pipeline",
        ci_job_id=ci_job_id,
        valid_from=now_utc,
        valid_until=now_utc + timedelta(days=90),
        status=EvidenceStatusEnum(parsed["evidence_status"]),
        # CI pipeline evidence → E3 observed (system-observed, hash-verified)
        confidence_level=EvidenceConfidenceEnum.E3,
        truth_status=EvidenceTruthStatusEnum.OBSERVED,
        may_be_used_as_evidence=True,
    )
    db.add(evidence)
    db.commit()
    db.refresh(evidence)

    return evidence


def _extract_findings_detail(report_data: dict) -> dict:
    """
    Extract severity-bucketed finding counts from report data.

    Returns dict with keys: critical, high, medium, low.

    Per format:
      - "results" (Semgrep): LOW and INFO count as low; other labels are skipped.
      - "Results" (Trivy): only LOW counts as low; other labels are skipped.
      - "findings" (generic): anything not critical/high/medium counts as low.

    List-valued keys that are present but ``null`` are treated as empty.
    """
    findings_detail = {
        "critical": 0,
        "high": 0,
        "medium": 0,
        "low": 0,
    }

    if not report_data:
        return findings_detail

    # Collect raw severity labels plus the format-specific "low" rule.
    low_labels: tuple = ()
    unmatched_is_low = False

    # Semgrep format
    if "results" in report_data:
        severities = [
            (r.get("extra") or {}).get("severity")
            for r in report_data.get("results") or []
        ]
        low_labels = ("LOW", "INFO")
    # Trivy format
    elif "Results" in report_data:
        severities = [
            v.get("Severity")
            for result in report_data.get("Results") or []
            for v in result.get("Vulnerabilities") or []
        ]
        low_labels = ("LOW",)
    # Generic findings with severity
    elif "findings" in report_data:
        severities = [f.get("severity") for f in report_data.get("findings") or []]
        unmatched_is_low = True
    else:
        severities = []

    named_buckets = {"CRITICAL": "critical", "HIGH": "high", "MEDIUM": "medium"}
    for raw in severities:
        label = (raw or "").upper()
        bucket = named_buckets.get(label)
        if bucket is None and (unmatched_is_low or label in low_labels):
            bucket = "low"
        if bucket is not None:
            findings_detail[bucket] += 1

    return findings_detail


# ============================================================================
# CI/CD Evidence Collection
# ============================================================================

def _update_risks(
    db: Session,
    *,
    source: str,
    control_id: str,
    ci_job_id: Optional[str],
    report_data: Optional[dict[str, Any]],
) -> Any:
    """Thin wrapper so test patches against this module's _update_risks take effect.

    Delegates to the service-layer implementation, passing this module's
    AutoRiskUpdater. (An earlier inline implementation with the same name was
    shadowed by this definition and has been removed.)
    """
    return _update_risks_impl(
        db,
        source=source,
        control_id=control_id,
        ci_job_id=ci_job_id,
        report_data=report_data,
        auto_updater_cls=AutoRiskUpdater,
    )


@router.post("/evidence/collect")
async def collect_ci_evidence(
    source: str = Query(
        ...,
        description="Evidence source: sast, dependency_scan, sbom, container_scan, test_results",
    ),
    ci_job_id: Optional[str] = Query(None, description="CI/CD Job ID for traceability"),
    ci_job_url: Optional[str] = Query(None, description="URL to CI/CD job"),
    report_data: Optional[dict[str, Any]] = None,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """
    Collect evidence from CI/CD pipeline.

    Handler stays inline so tests can patch
    ``compliance.api.evidence_routes._store_evidence`` /
    ``compliance.api.evidence_routes._update_risks`` directly.

    Raises:
        HTTPException: 400 for a source not in SOURCE_CONTROL_MAP, 404 when the
            mapped control is missing from the database.
    """
    if source not in SOURCE_CONTROL_MAP:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown source '{source}'. Supported: {list(SOURCE_CONTROL_MAP.keys())}",
        )

    control_id = SOURCE_CONTROL_MAP[source]

    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(control_id)
    if not control:
        raise HTTPException(
            status_code=404,
            detail=f"Control {control_id} not found. Please seed the database first.",
        )

    parsed = _parse_ci_evidence(report_data or {})

    evidence = _store_evidence(
        db,
        control_db_id=control.id,
        source=source,
        parsed=parsed,
        ci_job_id=ci_job_id,
        ci_job_url=ci_job_url,
        report_data=report_data,
    )

    risk_update_result = _update_risks(
        db,
        source=source,
        control_id=control_id,
        ci_job_id=ci_job_id,
        report_data=report_data,
    )

    # A falsy result from _update_risks is reported as "auto-update skipped".
    if risk_update_result:
        auto_risk_update: dict[str, Any] = {
            "enabled": True,
            "control_updated": risk_update_result.control_updated,
            "old_status": risk_update_result.old_status,
            "new_status": risk_update_result.new_status,
            "risks_affected": risk_update_result.risks_affected,
            "alerts_generated": risk_update_result.alerts_generated,
        }
    else:
        auto_risk_update = {"enabled": False, "error": "Auto-update skipped"}

    return {
        "success": True,
        "evidence_id": evidence.id,
        "control_id": control_id,
        "source": source,
        "status": parsed["evidence_status"],
        "findings_count": parsed["findings_count"],
        "critical_findings": parsed["critical_findings"],
        "artifact_path": evidence.artifact_path,
        "message": f"Evidence collected successfully for control {control_id}",
        "auto_risk_update": auto_risk_update,
    }


@router.get("/evidence/ci-status")
async def get_ci_evidence_status(
    control_id: Optional[str] = Query(None, description="Filter by control ID"),
    days: int = Query(30, description="Look back N days"),
    db: Session = Depends(get_db),
    service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
    """Get CI/CD evidence collection status overview."""
    with translate_domain_errors():
        return service.ci_status(control_id, days)


# (Old CI status implementation removed: it was unreachable code after
# `return service.ci_status(...)` and has been replaced by the service;
# `query` was never initialised.)
# ============================================================================
# Evidence Review (Anti-Fake-Evidence)
# ============================================================================

from pydantic import BaseModel as _BaseModel


class _EvidenceReviewRequest(_BaseModel):
    """Request body for PATCH /evidence/{evidence_id}/review."""

    # Optional new values; a field left unset (or empty) is not changed.
    confidence_level: Optional[str] = None
    truth_status: Optional[str] = None
    # Identifier of the reviewer performing this review.
    reviewed_by: str


@router.patch("/evidence/{evidence_id}/review", response_model=EvidenceResponse)
async def review_evidence(
    evidence_id: str,
    review: _EvidenceReviewRequest,
    db: Session = Depends(get_db),
) -> EvidenceResponse:
    """
    Review evidence: upgrade confidence level and/or change truth status.

    For Four-Eyes evidence, the first reviewer sets first_reviewer and
    approval_status='first_approved'. A second (different) reviewer then
    sets second_reviewer and approval_status='approved'.

    Raises:
        HTTPException: 404 when the evidence does not exist; 400 for an
            invalid confidence_level / truth_status, for a second review by
            the first reviewer, or when Four-Eyes evidence is already
            approved or rejected.

    All checks run before the record is modified, so a rejected request
    leaves the loaded object untouched.
    """
    evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
    if not evidence:
        raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")

    old_confidence = evidence.confidence_level.value if evidence.confidence_level else None
    old_truth = evidence.truth_status.value if evidence.truth_status else None

    # --- Validate the requested values (no mutation yet) ---------------------
    requested_confidence = None
    if review.confidence_level:
        try:
            requested_confidence = EvidenceConfidenceEnum(review.confidence_level)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid confidence_level: {review.confidence_level}",
            ) from None

    requested_truth = None
    if review.truth_status:
        try:
            requested_truth = EvidenceTruthStatusEnum(review.truth_status)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid truth_status: {review.truth_status}",
            ) from None

    # --- Validate the Four-Eyes state transition -----------------------------
    approval_state = None
    if evidence.requires_four_eyes:
        approval_state = evidence.approval_status or "none"
        if approval_state == "first_approved" and review.reviewed_by == evidence.first_reviewer:
            raise HTTPException(
                status_code=400,
                detail="Four-Eyes: second reviewer must be different from first reviewer",
            )
        if approval_state == "approved":
            raise HTTPException(status_code=400, detail="Evidence already approved")
        if approval_state == "rejected":
            raise HTTPException(
                status_code=400,
                detail="Evidence was rejected — create new evidence instead",
            )

    # --- Apply the review -----------------------------------------------------
    if requested_confidence is not None:
        evidence.confidence_level = requested_confidence
    if requested_truth is not None:
        evidence.truth_status = requested_truth

    reviewed_at = datetime.utcnow()
    if approval_state in ("none", "pending_first"):
        evidence.first_reviewer = review.reviewed_by
        evidence.first_reviewed_at = reviewed_at
        evidence.approval_status = "first_approved"
    elif approval_state == "first_approved":
        evidence.second_reviewer = review.reviewed_by
        evidence.second_reviewed_at = reviewed_at
        evidence.approval_status = "approved"
    # Any other approval state (or no Four-Eyes requirement) changes nothing here.

    evidence.reviewed_by = review.reviewed_by
    evidence.reviewed_at = reviewed_at
    db.commit()

    # Audit trail: one entry per field whose value actually changed.
    new_confidence = evidence.confidence_level.value if evidence.confidence_level else None
    if old_confidence != new_confidence:
        log_audit_trail(
            db, "evidence", evidence_id, evidence.title, "review",
            performed_by=review.reviewed_by,
            field_changed="confidence_level",
            old_value=old_confidence,
            new_value=new_confidence,
        )

    new_truth = evidence.truth_status.value if evidence.truth_status else None
    if old_truth != new_truth:
        log_audit_trail(
            db, "evidence", evidence_id, evidence.title, "review",
            performed_by=review.reviewed_by,
            field_changed="truth_status",
            old_value=old_truth,
            new_value=new_truth,
        )

    db.commit()
    db.refresh(evidence)
    return _build_evidence_response(evidence)


@router.patch("/evidence/{evidence_id}/reject", response_model=EvidenceResponse)
async def reject_evidence(
    evidence_id: str,
    body: EvidenceRejectRequest,
    db: Session = Depends(get_db),
) -> EvidenceResponse:
    """Reject evidence (sets approval_status='rejected').

    Records the rejecting reviewer and time, then writes a "reject" audit
    entry whose summary is the rejection reason (or "Evidence rejected").

    Raises:
        HTTPException: 404 when the evidence does not exist.
    """
    evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
    if not evidence:
        raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")

    evidence.approval_status = "rejected"
    evidence.reviewed_by = body.reviewed_by
    evidence.reviewed_at = datetime.utcnow()
    db.commit()

    log_audit_trail(
        db, "evidence", evidence_id, evidence.title, "reject",
        performed_by=body.reviewed_by,
        change_summary=body.rejection_reason or "Evidence rejected",
    )
    db.commit()
    db.refresh(evidence)
    return _build_evidence_response(evidence)


# ============================================================================
# Audit Trail Query
# ============================================================================

@router.get("/audit-trail")
async def get_audit_trail(
    entity_type: Optional[str] = Query(None),
    entity_id: Optional[str] = Query(None),
    action: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """Query audit trail entries, newest first, with optional filters.

    Each of ``entity_type``, ``entity_id`` and ``action`` narrows the result
    when provided. At most ``limit`` entries are returned; ``total`` is the
    number of entries in this response, not the number of matching rows.
    """
    query = db.query(AuditTrailDB)
    if entity_type:
        query = query.filter(AuditTrailDB.entity_type == entity_type)
    if entity_id:
        query = query.filter(AuditTrailDB.entity_id == entity_id)
    if action:
        query = query.filter(AuditTrailDB.action == action)

    records = query.order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()

    entries = [
        {
            "id": r.id,
            "entity_type": r.entity_type,
            "entity_id": r.entity_id,
            "entity_name": r.entity_name,
            "action": r.action,
            "field_changed": r.field_changed,
            "old_value": r.old_value,
            "new_value": r.new_value,
            "change_summary": r.change_summary,
            "performed_by": r.performed_by,
            "performed_at": r.performed_at.isoformat() if r.performed_at else None,
            "checksum": r.checksum,
        }
        for r in records
    ]
    return {
        "entries": entries,
        "total": len(records),
    }