# mypy: disable-error-code="arg-type"
"""
FastAPI routes for Evidence management.

Endpoints:
- /evidence: Evidence listing and creation
- /evidence/upload: Evidence file upload
- /evidence/collect: CI/CD evidence collection
- /evidence/ci-status: CI/CD evidence status

Phase 1 Step 4 refactor: handlers delegate to EvidenceService. Pure helpers
(`_parse_ci_evidence`, `_extract_findings_detail`) and the SOURCE_CONTROL_MAP
constant are re-exported from this module so the existing tests
(tests/test_evidence_routes.py) continue to import them from the legacy path.
"""

import hashlib
import json
import logging
import os
import uuid as uuid_module
from datetime import datetime, timedelta
from typing import Any, Optional

from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from sqlalchemy.orm import Session

from classroom_engine.database import get_db

from ..db import (
    ControlRepository,
    EvidenceRepository,
    EvidenceStatusEnum,
    EvidenceConfidenceEnum,
    EvidenceTruthStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB, AuditTrailDB
from ..services.auto_risk_updater import AutoRiskUpdater
# NOTE(review): EvidenceService and translate_domain_errors were referenced in
# the original file but never imported — confirm this module path matches the
# actual location of the Phase 1 Step 4 service layer.
from ..services.evidence_service import EvidenceService, translate_domain_errors
from .schemas import (
    EvidenceCreate,
    EvidenceResponse,
    EvidenceListResponse,
    EvidenceRejectRequest,
)
from .audit_trail_utils import log_audit_trail

logger = logging.getLogger(__name__)

router = APIRouter(tags=["compliance-evidence"])


def get_evidence_service(db: Session = Depends(get_db)) -> EvidenceService:
    """Build an EvidenceService for a request.

    Read repo + auto-updater classes from this module's namespace at call
    time so test patches against compliance.api.evidence_routes.* propagate.
    """
    return EvidenceService(
        db,
        evidence_repo_cls=EvidenceRepository,
        control_repo_cls=ControlRepository,
        auto_updater_cls=AutoRiskUpdater,
    )


# ============================================================================
# Anti-Fake-Evidence: Four-Eyes Domain Check
# ============================================================================

FOUR_EYES_DOMAINS = {"gov", "priv"}


def _requires_four_eyes(control_domain: str) -> bool:
    """Controls in governance/privacy domains require two independent reviewers."""
    return control_domain in FOUR_EYES_DOMAINS


# ============================================================================
# Anti-Fake-Evidence: Auto-Classification Helpers
# ============================================================================

def _classify_confidence(
    source: Optional[str],
    evidence_type: Optional[str] = None,
    artifact_hash: Optional[str] = None,
) -> EvidenceConfidenceEnum:
    """Classify evidence confidence level based on source and metadata.

    Note: ``evidence_type`` and ``artifact_hash`` are currently not used in the
    decision (the original code had a redundant ``api and artifact_hash``
    branch with the same E3 result); they are kept for interface stability.
    """
    if source == "ci_pipeline":
        return EvidenceConfidenceEnum.E3
    if source == "api":
        # With or without an artifact hash, API-submitted evidence is E3.
        return EvidenceConfidenceEnum.E3
    if source in ("manual", "upload"):
        return EvidenceConfidenceEnum.E1
    if source == "generated":
        return EvidenceConfidenceEnum.E0
    # Default for unknown sources
    return EvidenceConfidenceEnum.E1


def _classify_truth_status(source: Optional[str]) -> EvidenceTruthStatusEnum:
    """Classify evidence truth status based on source."""
    if source == "ci_pipeline":
        return EvidenceTruthStatusEnum.OBSERVED
    if source in ("manual", "upload"):
        return EvidenceTruthStatusEnum.UPLOADED
    if source == "generated":
        return EvidenceTruthStatusEnum.GENERATED
    if source == "api":
        return EvidenceTruthStatusEnum.OBSERVED
    return EvidenceTruthStatusEnum.UPLOADED


def _build_evidence_response(e: EvidenceDB) -> EvidenceResponse:
    """Build an EvidenceResponse from an EvidenceDB, including anti-fake fields."""
    return EvidenceResponse(
        id=e.id,
        control_id=e.control_id,
        evidence_type=e.evidence_type,
        title=e.title,
        description=e.description,
        artifact_path=e.artifact_path,
        artifact_url=e.artifact_url,
        artifact_hash=e.artifact_hash,
        file_size_bytes=e.file_size_bytes,
        mime_type=e.mime_type,
        valid_from=e.valid_from,
        valid_until=e.valid_until,
        status=e.status.value if e.status else None,
        source=e.source,
        ci_job_id=e.ci_job_id,
        uploaded_by=e.uploaded_by,
        collected_at=e.collected_at,
        created_at=e.created_at,
        confidence_level=e.confidence_level.value if e.confidence_level else None,
        truth_status=e.truth_status.value if e.truth_status else None,
        generation_mode=e.generation_mode,
        may_be_used_as_evidence=e.may_be_used_as_evidence,
        reviewed_by=e.reviewed_by,
        reviewed_at=e.reviewed_at,
        approval_status=e.approval_status,
        first_reviewer=e.first_reviewer,
        first_reviewed_at=e.first_reviewed_at,
        second_reviewer=e.second_reviewer,
        second_reviewed_at=e.second_reviewed_at,
        requires_four_eyes=e.requires_four_eyes,
    )


# ============================================================================
# Evidence
# ============================================================================

@router.get("/evidence", response_model=EvidenceListResponse)
async def list_evidence(
    control_id: Optional[str] = None,
    evidence_type: Optional[str] = None,
    status: Optional[str] = None,
    page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
    limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
    db: Session = Depends(get_db),
) -> EvidenceListResponse:
    """List evidence with optional filters and pagination.

    NOTE(review): the original declared a ``service`` dependency but its body
    used an undefined ``db``; the dependency is fixed to the session the body
    actually needs (repo usage preserved so existing test patches still apply).
    """
    repo = EvidenceRepository(db)
    if control_id:
        # Resolve the human-readable control ID to the control's UUID first.
        ctrl_repo = ControlRepository(db)
        control = ctrl_repo.get_by_control_id(control_id)
        if not control:
            raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
        evidence = repo.get_by_control(control.id)
    else:
        evidence = repo.get_all()

    if evidence_type:
        evidence = [e for e in evidence if e.evidence_type == evidence_type]
    if status:
        try:
            status_enum = EvidenceStatusEnum(status)
            evidence = [e for e in evidence if e.status == status_enum]
        except ValueError:
            # Unknown status strings are ignored rather than rejected.
            pass

    total = len(evidence)

    # Apply pagination if requested (total reflects the unpaginated count).
    if page is not None and limit is not None:
        offset = (page - 1) * limit
        evidence = evidence[offset:offset + limit]

    results = [_build_evidence_response(e) for e in evidence]
    return EvidenceListResponse(evidence=results, total=total)


@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
    evidence_data: EvidenceCreate,
    db: Session = Depends(get_db),
) -> EvidenceResponse:
    """Create new evidence record.

    Classifies confidence/truth from the source, honours explicit overrides,
    flags Four-Eyes review for governance/privacy controls, and writes an
    audit-trail entry.
    """
    repo = EvidenceRepository(db)

    # Resolve the control UUID.
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(evidence_data.control_id)
    if not control:
        raise HTTPException(
            status_code=404, detail=f"Control {evidence_data.control_id} not found"
        )

    source = evidence_data.source or "api"
    confidence = _classify_confidence(source, evidence_data.evidence_type)
    truth = _classify_truth_status(source)

    # Allow explicit override from request; invalid values fall back silently.
    if evidence_data.confidence_level:
        try:
            confidence = EvidenceConfidenceEnum(evidence_data.confidence_level)
        except ValueError:
            pass
    if evidence_data.truth_status:
        try:
            truth = EvidenceTruthStatusEnum(evidence_data.truth_status)
        except ValueError:
            pass

    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_data.evidence_type,
        title=evidence_data.title,
        description=evidence_data.description,
        artifact_url=evidence_data.artifact_url,
        valid_from=evidence_data.valid_from,
        valid_until=evidence_data.valid_until,
        source=source,
        ci_job_id=evidence_data.ci_job_id,
    )

    # Set anti-fake-evidence fields.
    evidence.confidence_level = confidence
    evidence.truth_status = truth

    # Generated evidence should not be used as evidence by default.
    if truth == EvidenceTruthStatusEnum.GENERATED:
        evidence.may_be_used_as_evidence = False

    # Four-Eyes: check if the linked control's domain requires it.
    control_domain = control.domain.value if control.domain else ""
    if _requires_four_eyes(control_domain):
        evidence.requires_four_eyes = True
        evidence.approval_status = "pending_first"

    db.commit()

    # Audit trail
    log_audit_trail(
        db,
        "evidence",
        evidence.id,
        evidence.title,
        "create",
        performed_by=evidence_data.source or "api",
        change_summary=(
            f"Evidence created with confidence={confidence.value}, truth={truth.value}"
        ),
    )
    db.commit()

    return _build_evidence_response(evidence)


@router.delete("/evidence/{evidence_id}")
async def delete_evidence(
    evidence_id: str,
    service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
    """Delete an evidence record."""
    with translate_domain_errors():
        return service.delete_evidence(evidence_id)


# ============================================================================
# Upload
# ============================================================================

@router.post("/evidence/upload")
async def upload_evidence(
    control_id: str = Query(...),
    evidence_type: str = Query(...),
    title: str = Query(...),
    file: UploadFile = File(...),
    description: Optional[str] = Query(None),
    db: Session = Depends(get_db),
) -> EvidenceResponse:
    """Upload evidence file.

    Stores the file under /tmp/compliance_evidence/<control_id>, records its
    SHA-256 hash, and classifies the evidence as E1/UPLOADED.
    """
    # Resolve the control UUID.
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(control_id)
    if not control:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    # Create upload directory.
    upload_dir = f"/tmp/compliance_evidence/{control_id}"
    os.makedirs(upload_dir, exist_ok=True)

    # Save file. basename() strips any client-supplied directory components
    # (path-traversal hardening); fall back to a fixed name if filename is empty.
    safe_name = os.path.basename(file.filename or "") or "upload.bin"
    file_path = os.path.join(upload_dir, safe_name)
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)

    # Calculate hash.
    file_hash = hashlib.sha256(content).hexdigest()

    # Create evidence record.
    repo = EvidenceRepository(db)
    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_type,
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=file_hash,
        file_size_bytes=len(content),
        mime_type=file.content_type,
        source="upload",
    )

    # Upload evidence → E1 + uploaded.
    evidence.confidence_level = EvidenceConfidenceEnum.E1
    evidence.truth_status = EvidenceTruthStatusEnum.UPLOADED

    # Four-Eyes: check if the linked control's domain requires it.
    control_domain = control.domain.value if control.domain else ""
    if _requires_four_eyes(control_domain):
        evidence.requires_four_eyes = True
        evidence.approval_status = "pending_first"

    db.commit()

    return _build_evidence_response(evidence)


# ============================================================================
# CI/CD Evidence Collection — helpers
# ============================================================================

# Map CI source names to the corresponding control IDs
SOURCE_CONTROL_MAP = {
    "sast": "SDLC-001",
    "dependency_scan": "SDLC-002",
    "secret_scan": "SDLC-003",
    "code_review": "SDLC-004",
    "sbom": "SDLC-005",
    "container_scan": "SDLC-006",
    "test_results": "AUD-001",
}


def _parse_ci_evidence(data: dict) -> dict:
    """
    Parse and validate incoming CI evidence data.

    Returns a dict with:
    - report_json: str (serialised JSON)
    - report_hash: str (SHA-256 hex digest)
    - evidence_status: str ("valid" or "failed")
    - findings_count: int
    - critical_findings: int
    """
    report_json = json.dumps(data) if data else "{}"
    report_hash = hashlib.sha256(report_json.encode()).hexdigest()

    findings_count = 0
    critical_findings = 0
    if data and isinstance(data, dict):
        # Semgrep format
        if "results" in data:
            findings_count = len(data.get("results", []))
            critical_findings = len([
                r for r in data.get("results", [])
                if r.get("extra", {}).get("severity", "").upper() in ["CRITICAL", "HIGH"]
            ])
        # Trivy format
        elif "Results" in data:
            for result in data.get("Results", []):
                vulns = result.get("Vulnerabilities", [])
                findings_count += len(vulns)
                critical_findings += len([
                    v for v in vulns
                    if v.get("Severity", "").upper() in ["CRITICAL", "HIGH"]
                ])
        # Generic findings array
        elif "findings" in data:
            findings_count = len(data.get("findings", []))
        # SBOM format - just count components
        elif "components" in data:
            findings_count = len(data.get("components", []))

    evidence_status = "failed" if critical_findings > 0 else "valid"

    return {
        "report_json": report_json,
        "report_hash": report_hash,
        "evidence_status": evidence_status,
        "findings_count": findings_count,
        "critical_findings": critical_findings,
    }


def _store_evidence(
    db: Session,
    *,
    control_db_id: str,
    source: str,
    parsed: dict,
    ci_job_id: Optional[str],
    ci_job_url: Optional[str],
    report_data: Optional[dict],
) -> EvidenceDB:
    """
    Persist a CI evidence item to the database and write the report file.

    Returns the created EvidenceDB instance (already committed).
    """
    findings_count = parsed["findings_count"]
    critical_findings = parsed["critical_findings"]

    # Build title and description.
    title = f"{source.upper()} Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    description = "Automatically collected from CI/CD pipeline"
    if findings_count > 0:
        description += f"\n- Total findings: {findings_count}"
    if critical_findings > 0:
        description += f"\n- Critical/High findings: {critical_findings}"
    if ci_job_id:
        description += f"\n- CI Job ID: {ci_job_id}"
    if ci_job_url:
        description += f"\n- CI Job URL: {ci_job_url}"

    # Store report file (hash prefix in the name keeps parallel runs distinct).
    upload_dir = f"/tmp/compliance_evidence/ci/{source}"
    os.makedirs(upload_dir, exist_ok=True)
    file_name = (
        f"{source}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        f"_{parsed['report_hash'][:8]}.json"
    )
    file_path = os.path.join(upload_dir, file_name)
    with open(file_path, "w") as f:
        json.dump(report_data or {}, f, indent=2)

    # Create evidence record with anti-fake-evidence classification.
    evidence = EvidenceDB(
        id=str(uuid_module.uuid4()),
        control_id=control_db_id,
        evidence_type=f"ci_{source}",
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=parsed["report_hash"],
        file_size_bytes=len(parsed["report_json"]),
        mime_type="application/json",
        source="ci_pipeline",
        ci_job_id=ci_job_id,
        valid_from=datetime.utcnow(),
        valid_until=datetime.utcnow() + timedelta(days=90),
        status=EvidenceStatusEnum(parsed["evidence_status"]),
        # CI pipeline evidence → E3 observed (system-observed, hash-verified)
        confidence_level=EvidenceConfidenceEnum.E3,
        truth_status=EvidenceTruthStatusEnum.OBSERVED,
        may_be_used_as_evidence=True,
    )
    db.add(evidence)
    db.commit()
    db.refresh(evidence)
    return evidence


def _extract_findings_detail(report_data: dict) -> dict:
    """
    Extract severity-bucketed finding counts from report data.

    Returns dict with keys: critical, high, medium, low.
    """
    findings_detail = {
        "critical": 0,
        "high": 0,
        "medium": 0,
        "low": 0,
    }
    if not report_data:
        return findings_detail

    # Semgrep format
    if "results" in report_data:
        for r in report_data.get("results", []):
            severity = r.get("extra", {}).get("severity", "").upper()
            if severity == "CRITICAL":
                findings_detail["critical"] += 1
            elif severity == "HIGH":
                findings_detail["high"] += 1
            elif severity == "MEDIUM":
                findings_detail["medium"] += 1
            elif severity in ["LOW", "INFO"]:
                findings_detail["low"] += 1
    # Trivy format
    elif "Results" in report_data:
        for result in report_data.get("Results", []):
            for v in result.get("Vulnerabilities", []):
                severity = v.get("Severity", "").upper()
                if severity == "CRITICAL":
                    findings_detail["critical"] += 1
                elif severity == "HIGH":
                    findings_detail["high"] += 1
                elif severity == "MEDIUM":
                    findings_detail["medium"] += 1
                elif severity == "LOW":
                    findings_detail["low"] += 1
    # Generic findings with severity
    elif "findings" in report_data:
        for f in report_data.get("findings", []):
            severity = f.get("severity", "").upper()
            if severity == "CRITICAL":
                findings_detail["critical"] += 1
            elif severity == "HIGH":
                findings_detail["high"] += 1
            elif severity == "MEDIUM":
                findings_detail["medium"] += 1
            else:
                findings_detail["low"] += 1

    return findings_detail


# ============================================================================
# CI/CD Evidence Collection
# ============================================================================

def _update_risks(
    db: Session,
    *,
    source: str,
    control_id: str,
    ci_job_id: Optional[str],
    report_data: Optional[dict[str, Any]],
) -> Any:
    """
    Update risk status based on new evidence.

    Uses AutoRiskUpdater to update Control status and linked Risks based on
    severity-bucketed findings. Returns the update result or None on error.

    NOTE(review): the original file contained two ``_update_risks``
    definitions — a truncated one (``try`` without ``except``) shadowed by a
    wrapper calling a nonexistent ``_update_risks_impl``. They are merged here
    into a single complete implementation; tests patching
    compliance.api.evidence_routes._update_risks still work.
    """
    findings_detail = _extract_findings_detail(report_data)
    try:
        auto_updater = AutoRiskUpdater(db)
        return auto_updater.process_evidence_collect_request(
            tool=source,
            control_id=control_id,
            evidence_type=f"ci_{source}",
            timestamp=datetime.utcnow().isoformat(),
            commit_sha=(
                report_data.get("commit_sha", "unknown") if report_data else "unknown"
            ),
            ci_job_id=ci_job_id,
            findings=findings_detail,
        )
    except Exception:
        # Best-effort: evidence collection must succeed even if the auto
        # risk update fails; caller treats None as "update skipped".
        logger.exception("Auto risk update failed for control %s", control_id)
        return None


@router.post("/evidence/collect")
async def collect_ci_evidence(
    source: str = Query(
        ...,
        description="Evidence source: sast, dependency_scan, sbom, container_scan, test_results",
    ),
    ci_job_id: Optional[str] = Query(None, description="CI/CD Job ID for traceability"),
    ci_job_url: Optional[str] = Query(None, description="URL to CI/CD job"),
    report_data: Optional[dict[str, Any]] = None,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """
    Collect evidence from CI/CD pipeline.

    Handler stays inline so tests can patch
    ``compliance.api.evidence_routes._store_evidence`` /
    ``compliance.api.evidence_routes._update_risks`` directly.
    """
    if source not in SOURCE_CONTROL_MAP:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown source '{source}'. Supported: {list(SOURCE_CONTROL_MAP.keys())}",
        )

    control_id = SOURCE_CONTROL_MAP[source]
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(control_id)
    if not control:
        raise HTTPException(
            status_code=404,
            detail=f"Control {control_id} not found. Please seed the database first.",
        )

    parsed = _parse_ci_evidence(report_data or {})
    evidence = _store_evidence(
        db,
        control_db_id=control.id,
        source=source,
        parsed=parsed,
        ci_job_id=ci_job_id,
        ci_job_url=ci_job_url,
        report_data=report_data,
    )
    risk_update_result = _update_risks(
        db,
        source=source,
        control_id=control_id,
        ci_job_id=ci_job_id,
        report_data=report_data,
    )

    return {
        "success": True,
        "evidence_id": evidence.id,
        "control_id": control_id,
        "source": source,
        "status": parsed["evidence_status"],
        "findings_count": parsed["findings_count"],
        "critical_findings": parsed["critical_findings"],
        "artifact_path": evidence.artifact_path,
        "message": f"Evidence collected successfully for control {control_id}",
        "auto_risk_update": (
            {
                "enabled": True,
                "control_updated": risk_update_result.control_updated,
                "old_status": risk_update_result.old_status,
                "new_status": risk_update_result.new_status,
                "risks_affected": risk_update_result.risks_affected,
                "alerts_generated": risk_update_result.alerts_generated,
            }
            if risk_update_result
            else {"enabled": False, "error": "Auto-update skipped"}
        ),
    }


@router.get("/evidence/ci-status")
async def get_ci_evidence_status(
    control_id: Optional[str] = Query(None, description="Filter by control ID"),
    days: int = Query(30, description="Look back N days"),
    service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
    """Get CI/CD evidence collection status overview."""
    with translate_domain_errors():
        return service.ci_status(control_id, days)


# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
# The helpers (`_parse_ci_evidence`, `_extract_findings_detail`) and
# SOURCE_CONTROL_MAP are defined in this module, so legacy test imports
# resolve without explicit re-export statements.
#
# NOTE(review): the pre-refactor body of the ci-status handler was left
# dangling here at module level in the original file (a `return` outside any
# function, referencing undefined names such as `query`, `days` and
# `defaultdict`) — it has been removed; /evidence/ci-status delegates to
# EvidenceService.ci_status.


# ============================================================================
# Evidence Review (Anti-Fake-Evidence)
# ============================================================================

from pydantic import BaseModel as _BaseModel


class _EvidenceReviewRequest(_BaseModel):
    """Payload for PATCH /evidence/{id}/review."""
    # Optional upgrades; validated against the enums inside the handler.
    confidence_level: Optional[str] = None
    truth_status: Optional[str] = None
    reviewed_by: str


@router.patch("/evidence/{evidence_id}/review", response_model=EvidenceResponse)
async def review_evidence(
    evidence_id: str,
    review: _EvidenceReviewRequest,
    db: Session = Depends(get_db),
):
    """
    Review evidence: upgrade confidence level and/or change truth status.

    For Four-Eyes evidence, the first reviewer sets first_reviewer and
    approval_status='first_approved'. A second (different) reviewer then sets
    second_reviewer and approval_status='approved'.
    """
    evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
    if not evidence:
        raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")

    # Capture old values for the audit trail before mutating.
    old_confidence = evidence.confidence_level.value if evidence.confidence_level else None
    old_truth = evidence.truth_status.value if evidence.truth_status else None

    if review.confidence_level:
        try:
            evidence.confidence_level = EvidenceConfidenceEnum(review.confidence_level)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid confidence_level: {review.confidence_level}",
            )
    if review.truth_status:
        try:
            evidence.truth_status = EvidenceTruthStatusEnum(review.truth_status)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid truth_status: {review.truth_status}",
            )

    # Four-Eyes branching
    if evidence.requires_four_eyes:
        status = evidence.approval_status or "none"
        if status in ("none", "pending_first"):
            evidence.first_reviewer = review.reviewed_by
            evidence.first_reviewed_at = datetime.utcnow()
            evidence.approval_status = "first_approved"
        elif status == "first_approved":
            # Independence check: the second reviewer must differ from the first.
            if review.reviewed_by == evidence.first_reviewer:
                raise HTTPException(
                    status_code=400,
                    detail="Four-Eyes: second reviewer must be different from first reviewer",
                )
            evidence.second_reviewer = review.reviewed_by
            evidence.second_reviewed_at = datetime.utcnow()
            evidence.approval_status = "approved"
        elif status == "approved":
            raise HTTPException(status_code=400, detail="Evidence already approved")
        elif status == "rejected":
            raise HTTPException(
                status_code=400,
                detail="Evidence was rejected — create new evidence instead",
            )

    evidence.reviewed_by = review.reviewed_by
    evidence.reviewed_at = datetime.utcnow()
    db.commit()

    # Audit trail: log each field that actually changed.
    new_confidence = evidence.confidence_level.value if evidence.confidence_level else None
    if old_confidence != new_confidence:
        log_audit_trail(
            db,
            "evidence",
            evidence_id,
            evidence.title,
            "review",
            performed_by=review.reviewed_by,
            field_changed="confidence_level",
            old_value=old_confidence,
            new_value=new_confidence,
        )
    new_truth = evidence.truth_status.value if evidence.truth_status else None
    if old_truth != new_truth:
        log_audit_trail(
            db,
            "evidence",
            evidence_id,
            evidence.title,
            "review",
            performed_by=review.reviewed_by,
            field_changed="truth_status",
            old_value=old_truth,
            new_value=new_truth,
        )
    db.commit()
    db.refresh(evidence)
    return _build_evidence_response(evidence)


@router.patch("/evidence/{evidence_id}/reject", response_model=EvidenceResponse)
async def reject_evidence(
    evidence_id: str,
    body: EvidenceRejectRequest,
    db: Session = Depends(get_db),
):
    """Reject evidence (sets approval_status='rejected')."""
    evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
    if not evidence:
        raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")

    evidence.approval_status = "rejected"
    evidence.reviewed_by = body.reviewed_by
    evidence.reviewed_at = datetime.utcnow()
    db.commit()

    log_audit_trail(
        db,
        "evidence",
        evidence_id,
        evidence.title,
        "reject",
        performed_by=body.reviewed_by,
        change_summary=body.rejection_reason or "Evidence rejected",
    )
    db.commit()
    db.refresh(evidence)
    return _build_evidence_response(evidence)


# ============================================================================
# Audit Trail Query
# ============================================================================

@router.get("/audit-trail")
async def get_audit_trail(
    entity_type: Optional[str] = Query(None),
    entity_id: Optional[str] = Query(None),
    action: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
):
    """Query audit trail entries for an entity."""
    query = db.query(AuditTrailDB)
    if entity_type:
        query = query.filter(AuditTrailDB.entity_type == entity_type)
    if entity_id:
        query = query.filter(AuditTrailDB.entity_id == entity_id)
    if action:
        query = query.filter(AuditTrailDB.action == action)

    records = query.order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()
    return {
        "entries": [
            {
                "id": r.id,
                "entity_type": r.entity_type,
                "entity_id": r.entity_id,
                "entity_name": r.entity_name,
                "action": r.action,
                "field_changed": r.field_changed,
                "old_value": r.old_value,
                "new_value": r.new_value,
                "change_summary": r.change_summary,
                "performed_by": r.performed_by,
                "performed_at": r.performed_at.isoformat() if r.performed_at else None,
                "checksum": r.checksum,
            }
            for r in records
        ],
        "total": len(records),
    }