Files
breakpilot-compliance/backend-compliance/compliance/api/vvt_routes.py
Sharang Parnerkar 4fa0dd6f6d refactor(backend/api): extract VVTService (Step 4 — file 5 of 18)
compliance/api/vvt_routes.py (550 LOC) -> 225 LOC thin routes + 475-line
VVTService. Covers the organization header, processing activities CRUD,
audit log, JSON/CSV export, stats, and version lookups for the Art. 30
DSGVO Verzeichnis.

Single-service split: organization + activities + audit + stats all
revolve around the same tenant's VVT document, and the existing test
suite (tests/test_vvt_routes.py — 768 LOC, tests/test_vvt_tenant_isolation.py
— 205 LOC) exercises them together.

Module-level helpers (_activity_to_response, _log_audit, _export_csv)
stay module-level in compliance.services.vvt_service and are re-exported
from compliance.api.vvt_routes so the two test files keep importing
from the old path.

Pydantic schemas already live in compliance.schemas.vvt from Step 3 —
no new schema file needed this round.

mypy.ini flips compliance.api.vvt_routes from ignore_errors=True to
False. Two SQLAlchemy Column[str] vs str dict-index errors fixed with
explicit str() casts on status/business_function in the stats loop.

Verified:
  - 242/242 pytest (173 core + 69 VVT integration) pass
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 128 source files
  - vvt_routes.py 550 -> 225 LOC
  - vvt_service.py 475 LOC (under 500 hard cap)
  - Hard-cap violations: 14 -> 13

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:50:40 +02:00

226 lines
7.9 KiB
Python

"""
FastAPI routes for VVT — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO).
Endpoints:
GET /vvt/organization — Load organization header
PUT /vvt/organization — Save organization header
GET /vvt/activities — List activities
POST /vvt/activities — Create new activity
GET /vvt/activities/{id} — Get single activity
PUT /vvt/activities/{id} — Update activity
DELETE /vvt/activities/{id} — Delete activity
GET /vvt/audit-log — Audit trail
GET /vvt/export — JSON or CSV export
GET /vvt/stats — Statistics
GET /vvt/activities/{id}/versions — List activity versions
GET /vvt/activities/{id}/versions/{n} — Get specific version
Phase 1 Step 4 refactor: handlers delegate to VVTService.
"""
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.api.tenant_utils import get_tenant_id
from compliance.schemas.vvt import (
VVTActivityCreate,
VVTActivityResponse,
VVTActivityUpdate,
VVTAuditLogEntry,
VVTOrganizationResponse,
VVTOrganizationUpdate,
VVTStatsResponse,
)
from compliance.services.vvt_service import (
VVTService,
_activity_to_response, # re-exported for legacy test imports
_export_csv, # re-exported for legacy test imports
_log_audit, # re-exported for legacy test imports
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Every endpoint below is registered on this router under the /vvt prefix.
router = APIRouter(
    prefix="/vvt",
    tags=["compliance-vvt"],
)
def get_vvt_service(db: Session = Depends(get_db)) -> VVTService:
    """Build the VVTService instance used by the handlers in this module.

    Meant to be used as a FastAPI dependency: ``db`` is supplied by the
    ``get_db`` dependency and passed straight to the ``VVTService``
    constructor.
    """
    vvt_service = VVTService(db)
    return vvt_service
# ============================================================================
# Organization Header
# ============================================================================
@router.get("/organization", response_model=Optional[VVTOrganizationResponse])
async def get_organization(
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Optional[VVTOrganizationResponse]:
    """Return the VVT organization header of the requesting tenant.

    ``tid`` is provided by the ``get_tenant_id`` dependency and ``service``
    by ``get_vvt_service``. The lookup itself is delegated to
    ``VVTService.get_organization``; the declared response model permits
    a ``None`` result.
    """
    # NOTE(review): translate_domain_errors presumably converts service-level
    # domain exceptions into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        organization = service.get_organization(tid)
        return organization
@router.put("/organization", response_model=VVTOrganizationResponse)
async def upsert_organization(
    request: VVTOrganizationUpdate,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTOrganizationResponse:
    """Save the VVT organization header, creating it if necessary.

    The request body (``VVTOrganizationUpdate``) and the tenant id from
    ``get_tenant_id`` are forwarded to ``VVTService.upsert_organization``,
    whose result is returned unchanged.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        saved = service.upsert_organization(tid, request)
        return saved
# ============================================================================
# Activities
# ============================================================================
@router.get("/activities", response_model=List[VVTActivityResponse])
async def list_activities(
    status: Optional[str] = Query(None),
    business_function: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    review_overdue: Optional[bool] = Query(None),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> List[VVTActivityResponse]:
    """List the tenant's processing activities, optionally filtered.

    All four filters are optional query parameters that default to ``None``
    and are passed positionally, in declaration order, to
    ``VVTService.list_activities`` after the tenant id. How each filter is
    applied is decided by the service.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        activities = service.list_activities(
            tid, status, business_function, search, review_overdue
        )
        return activities
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
async def create_activity(
    request: VVTActivityCreate,
    http_request: Request,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Create a processing activity and respond with HTTP 201.

    The body (``VVTActivityCreate``) is forwarded to
    ``VVTService.create_activity`` together with the tenant id and the
    value of the ``X-User-ID`` request header.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        # The header is optional; a missing header yields None.
        user_id = http_request.headers.get("X-User-ID")
        created = service.create_activity(tid, request, user_id)
        return created
@router.get("/activities/{activity_id}", response_model=VVTActivityResponse)
async def get_activity(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Fetch one processing activity by the id given in the URL path.

    Delegates to ``VVTService.get_activity`` with the tenant id from
    ``get_tenant_id`` and the ``activity_id`` path parameter.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # (e.g. an unknown id) to HTTP errors — confirm.
    with translate_domain_errors():
        activity = service.get_activity(tid, activity_id)
        return activity
@router.put("/activities/{activity_id}", response_model=VVTActivityResponse)
async def update_activity(
    activity_id: str,
    request: VVTActivityUpdate,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Apply an update to the processing activity named in the URL path.

    The body (``VVTActivityUpdate``), the ``activity_id`` path parameter
    and the tenant id are handed to ``VVTService.update_activity``; its
    result is returned as the response.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        updated = service.update_activity(tid, activity_id, request)
        return updated
@router.delete("/activities/{activity_id}")
async def delete_activity(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> dict[str, Any]:
    """Remove the processing activity named in the URL path.

    Delegates to ``VVTService.delete_activity`` and returns the dictionary
    it produces. No response model is declared for this route, so the
    dictionary's keys are defined by the service.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        outcome = service.delete_activity(tid, activity_id)
        return outcome
# ============================================================================
# Audit Log
# ============================================================================
@router.get("/audit-log", response_model=List[VVTAuditLogEntry])
async def get_audit_log(
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> List[VVTAuditLogEntry]:
    """Return a page of the tenant's VVT audit trail.

    ``limit`` defaults to 50 and is validated to lie between 1 and 500;
    ``offset`` defaults to 0 and must not be negative. Both are forwarded,
    after the tenant id, to ``VVTService.audit_log``.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        entries = service.audit_log(tid, limit, offset)
        return entries
# ============================================================================
# Export & Stats
# ============================================================================
@router.get("/export")
async def export_activities(
    format: str = Query("json", pattern="^(json|csv)$"),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """Export the tenant's activities as JSON or CSV (semicolon-separated, DE locale).

    ``format`` is a query parameter that defaults to ``"json"`` and is
    restricted by its pattern to ``json`` or ``csv``. The response object
    is whatever ``VVTService.export`` returns for that format.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        exported = service.export(tid, format)
        return exported
@router.get("/stats", response_model=VVTStatsResponse)
async def get_stats(
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTStatsResponse:
    """Return the VVT statistics summary for the requesting tenant.

    Delegates to ``VVTService.stats`` with the tenant id obtained from the
    ``get_tenant_id`` dependency.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        summary = service.stats(tid)
        return summary
# ============================================================================
# Versioning
# ============================================================================
@router.get("/activities/{activity_id}/versions")
async def list_activity_versions(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """List the stored versions of a single VVT activity.

    Delegates to ``VVTService.list_versions`` with the tenant id and the
    ``activity_id`` path parameter. No response model is declared, so the
    payload shape is defined by the service.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        versions = service.list_versions(tid, activity_id)
        return versions
@router.get("/activities/{activity_id}/versions/{version_number}")
async def get_activity_version(
    activity_id: str,
    version_number: int,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """Fetch one specific version of a VVT activity with its full snapshot.

    ``activity_id`` and the integer ``version_number`` are both taken from
    the URL path and passed, after the tenant id, to
    ``VVTService.get_version``.
    """
    # NOTE(review): translate_domain_errors presumably maps domain exceptions
    # raised by the service to HTTP errors — confirm.
    with translate_domain_errors():
        version = service.get_version(tid, activity_id, version_number)
        return version
# ----------------------------------------------------------------------------
# Public names of this module. The underscore-prefixed helpers are imported
# from compliance.services.vvt_service and listed here so that tests which
# still import them from this module keep working.
# ----------------------------------------------------------------------------
__all__ = ["router", "_activity_to_response", "_log_audit", "_export_csv"]