fix(api): evidence_routes F821-Regression beheben (Extract-Service-Halb-Refactor)

a638d0e5 ("extract EvidenceService") stellte Signaturen auf service=Depends um,
ließ aber Bodies + Imports auf dem alten Stand → 43 F821 (NameError zur Laufzeit).

- gelöschte stdlib-Imports restauriert (os/json/hashlib/uuid/datetime/timedelta)
- db: Session = Depends(get_db) an den betroffenen Endpoints restauriert
- translate_domain_errors + _update_risks_impl (=evidence_service._update_risks) importiert
- unerreichbaren toten Block (alte get_ci_evidence_status-Impl nach dem return) entfernt
- dsms_cid=None no-op in create/review/reject (DSMS-Commit-Copy-Paste)

Verifiziert: ruff F821 0, py_compile, test_evidence_routes.py 35 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-30 10:19:28 +02:00
parent d5925e57af
commit 79abf23ea8
@@ -16,6 +16,11 @@ from the legacy path.
"""
import logging
import os
import json
import hashlib
import uuid as uuid_module
from datetime import datetime, timedelta
from typing import Any, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
@@ -30,14 +35,15 @@ from ..db import (
EvidenceConfidenceEnum,
EvidenceTruthStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB, AuditTrailDB
from ..db.models import EvidenceDB, AuditTrailDB
from ..services.auto_risk_updater import AutoRiskUpdater
from ..services.evidence_service import EvidenceService
from ..services.evidence_service import EvidenceService, _update_risks as _update_risks_impl
from .schemas import (
EvidenceCreate, EvidenceResponse, EvidenceListResponse,
EvidenceRejectRequest,
)
from .audit_trail_utils import log_audit_trail
from ._http_errors import translate_domain_errors
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-evidence"])
@@ -146,6 +152,7 @@ async def list_evidence(
status: Optional[str] = None,
page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceListResponse:
"""List evidence with optional filters and pagination."""
@@ -186,9 +193,11 @@ async def list_evidence(
@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
evidence_data: EvidenceCreate,
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Create new evidence record."""
dsms_cid = None
repo = EvidenceRepository(db)
# Get control UUID
@@ -257,6 +266,7 @@ async def create_evidence(
@router.delete("/evidence/{evidence_id}")
async def delete_evidence(
evidence_id: str,
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Delete an evidence record."""
@@ -275,6 +285,7 @@ async def upload_evidence(
title: str = Query(...),
file: UploadFile = File(...),
description: Optional[str] = Query(None),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Upload evidence file."""
@@ -674,6 +685,7 @@ async def collect_ci_evidence(
async def get_ci_evidence_status(
control_id: Optional[str] = Query(None, description="Filter by control ID"),
days: int = Query(30, description="Look back N days"),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Get CI/CD evidence collection status overview."""
@@ -681,70 +693,8 @@ async def get_ci_evidence_status(
return service.ci_status(control_id, days)
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
if control_id:
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if control:
query = query.filter(EvidenceDB.control_id == control.id)
evidence_list = query.order_by(EvidenceDB.collected_at.desc()).limit(100).all()
# Group by control and calculate stats
control_stats = defaultdict(lambda: {
"total": 0,
"valid": 0,
"failed": 0,
"last_collected": None,
"evidence": [],
})
for e in evidence_list:
# Get control_id string
control = db.query(ControlDB).filter(ControlDB.id == e.control_id).first()
ctrl_id = control.control_id if control else "unknown"
stats = control_stats[ctrl_id]
stats["total"] += 1
if e.status:
if e.status.value == "valid":
stats["valid"] += 1
elif e.status.value == "failed":
stats["failed"] += 1
if not stats["last_collected"] or e.collected_at > stats["last_collected"]:
stats["last_collected"] = e.collected_at
# Add evidence summary
stats["evidence"].append({
"id": e.id,
"type": e.evidence_type,
"status": e.status.value if e.status else None,
"collected_at": e.collected_at.isoformat() if e.collected_at else None,
"ci_job_id": e.ci_job_id,
})
# Convert to list and sort
result = []
for ctrl_id, stats in control_stats.items():
result.append({
"control_id": ctrl_id,
"total_evidence": stats["total"],
"valid_count": stats["valid"],
"failed_count": stats["failed"],
"last_collected": stats["last_collected"].isoformat() if stats["last_collected"] else None,
"recent_evidence": stats["evidence"][:5],
})
result.sort(key=lambda x: x["last_collected"] or "", reverse=True)
return {
"period_days": days,
"total_evidence": len(evidence_list),
"controls": result,
}
# (Alte CI-Status-Implementierung entfernt — unerreichbarer Code nach `return
# service.ci_status(...)`; durch den Service ersetzt, `query` war nie initialisiert.)
# ============================================================================
@@ -772,6 +722,7 @@ async def review_evidence(
approval_status='first_approved'. A second (different) reviewer then
sets second_reviewer and approval_status='approved'.
"""
dsms_cid = None
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")
@@ -851,6 +802,7 @@ async def reject_evidence(
db: Session = Depends(get_db),
):
"""Reject evidence (sets approval_status='rejected')."""
dsms_cid = None
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")