Merge pull request 'fix(api): F821-Regression (Extract-Service-Halb-Refactor) — 7 Route-Dateien' (#44) from fix/api-f821-extract-service-regression into main
CI / detect-changes (push) Successful in 8s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 9s
CI / validate-canonical-controls (push) Successful in 7s
CI / loc-budget (push) Successful in 22s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 27s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

This commit was merged in pull request #44.
This commit is contained in:
2026-06-30 09:06:08 +00:00
7 changed files with 31 additions and 71 deletions
@@ -162,7 +162,7 @@ async def update_ai_system(
db: Session = Depends(get_db),
):
"""Update an AI system."""
from datetime import datetime
from datetime import datetime, timezone
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
if not system:
@@ -226,7 +226,7 @@ async def assess_ai_system(
db: Session = Depends(get_db),
):
"""Run AI Act risk assessment for an AI system."""
from datetime import datetime
from datetime import datetime, timezone
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
if not system:
@@ -47,6 +47,8 @@ from compliance.services.canonical_control_service import (
_control_row, # re-exported for legacy test imports
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"])
@@ -14,7 +14,7 @@ Endpoints:
"""
import logging
from datetime import datetime, date, timedelta
from datetime import datetime, date, timedelta, timezone
from calendar import month_abbr
from typing import Optional, Dict, Any, List
from decimal import Decimal
@@ -26,10 +26,11 @@ versions). Module-level helpers re-exported for legacy tests.
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from fastapi.responses import Response
from sqlalchemy.orm import Session
from sqlalchemy import text
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
@@ -484,6 +485,7 @@ async def list_dsfas(
async def create_dsfa(
request: DSFACreate,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Neue DSFA erstellen."""
@@ -16,6 +16,11 @@ from the legacy path.
"""
import logging
import os
import json
import hashlib
import uuid as uuid_module
from datetime import datetime, timedelta
from typing import Any, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
@@ -30,14 +35,15 @@ from ..db import (
EvidenceConfidenceEnum,
EvidenceTruthStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB, AuditTrailDB
from ..db.models import EvidenceDB, AuditTrailDB
from ..services.auto_risk_updater import AutoRiskUpdater
from ..services.evidence_service import EvidenceService
from ..services.evidence_service import EvidenceService, _update_risks as _update_risks_impl
from .schemas import (
EvidenceCreate, EvidenceResponse, EvidenceListResponse,
EvidenceRejectRequest,
)
from .audit_trail_utils import log_audit_trail
from ._http_errors import translate_domain_errors
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-evidence"])
@@ -146,6 +152,7 @@ async def list_evidence(
status: Optional[str] = None,
page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceListResponse:
"""List evidence with optional filters and pagination."""
@@ -186,9 +193,11 @@ async def list_evidence(
@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
evidence_data: EvidenceCreate,
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Create new evidence record."""
dsms_cid = None
repo = EvidenceRepository(db)
# Get control UUID
@@ -257,6 +266,7 @@ async def create_evidence(
@router.delete("/evidence/{evidence_id}")
async def delete_evidence(
evidence_id: str,
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Delete an evidence record."""
@@ -275,6 +285,7 @@ async def upload_evidence(
title: str = Query(...),
file: UploadFile = File(...),
description: Optional[str] = Query(None),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> EvidenceResponse:
"""Upload evidence file."""
@@ -674,6 +685,7 @@ async def collect_ci_evidence(
async def get_ci_evidence_status(
control_id: Optional[str] = Query(None, description="Filter by control ID"),
days: int = Query(30, description="Look back N days"),
db: Session = Depends(get_db),
service: EvidenceService = Depends(get_evidence_service),
) -> dict[str, Any]:
"""Get CI/CD evidence collection status overview."""
@@ -681,70 +693,8 @@ async def get_ci_evidence_status(
return service.ci_status(control_id, days)
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
if control_id:
ctrl_repo = ControlRepository(db)
control = ctrl_repo.get_by_control_id(control_id)
if control:
query = query.filter(EvidenceDB.control_id == control.id)
evidence_list = query.order_by(EvidenceDB.collected_at.desc()).limit(100).all()
# Group by control and calculate stats
control_stats = defaultdict(lambda: {
"total": 0,
"valid": 0,
"failed": 0,
"last_collected": None,
"evidence": [],
})
for e in evidence_list:
# Get control_id string
control = db.query(ControlDB).filter(ControlDB.id == e.control_id).first()
ctrl_id = control.control_id if control else "unknown"
stats = control_stats[ctrl_id]
stats["total"] += 1
if e.status:
if e.status.value == "valid":
stats["valid"] += 1
elif e.status.value == "failed":
stats["failed"] += 1
if not stats["last_collected"] or e.collected_at > stats["last_collected"]:
stats["last_collected"] = e.collected_at
# Add evidence summary
stats["evidence"].append({
"id": e.id,
"type": e.evidence_type,
"status": e.status.value if e.status else None,
"collected_at": e.collected_at.isoformat() if e.collected_at else None,
"ci_job_id": e.ci_job_id,
})
# Convert to list and sort
result = []
for ctrl_id, stats in control_stats.items():
result.append({
"control_id": ctrl_id,
"total_evidence": stats["total"],
"valid_count": stats["valid"],
"failed_count": stats["failed"],
"last_collected": stats["last_collected"].isoformat() if stats["last_collected"] else None,
"recent_evidence": stats["evidence"][:5],
})
result.sort(key=lambda x: x["last_collected"] or "", reverse=True)
return {
"period_days": days,
"total_evidence": len(evidence_list),
"controls": result,
}
# (Alte CI-Status-Implementierung entfernt — unerreichbarer Code nach `return
# service.ci_status(...)`; durch den Service ersetzt, `query` war nie initialisiert.)
# ============================================================================
@@ -772,6 +722,7 @@ async def review_evidence(
approval_status='first_approved'. A second (different) reviewer then
sets second_reviewer and approval_status='approved'.
"""
dsms_cid = None
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")
@@ -851,6 +802,7 @@ async def reject_evidence(
db: Session = Depends(get_db),
):
"""Reject evidence (sets approval_status='rejected')."""
dsms_cid = None
evidence = db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
if not evidence:
raise HTTPException(status_code=404, detail=f"Evidence {evidence_id} not found")
@@ -24,6 +24,7 @@ from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db.models import EvidenceDB
from .audit_trail_utils import log_audit_trail
from ..db import (
@@ -310,6 +311,7 @@ async def list_controls_paginated(
)
async def get_control(
control_id: str,
db: Session = Depends(get_db),
svc: ControlExportService = Depends(get_ctrl_export_service),
) -> ControlResponse:
"""Get a specific control by control_id."""
@@ -354,6 +356,7 @@ async def get_control(
async def update_control(
control_id: str,
update: ControlUpdate,
db: Session = Depends(get_db),
svc: ControlExportService = Depends(get_ctrl_export_service),
) -> ControlResponse:
"""Update a control."""
@@ -443,6 +446,7 @@ async def update_control(
async def review_control(
control_id: str,
review: ControlReviewRequest,
db: Session = Depends(get_db),
svc: ControlExportService = Depends(get_ctrl_export_service),
) -> ControlResponse:
"""Mark a control as reviewed with new status."""
@@ -21,7 +21,7 @@ Phase 1 Step 4 refactor: handlers delegate to VVTService.
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Query, Request
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session