Files
breakpilot-compliance/backend-compliance/compliance/api/vvt_routes.py
T
Benjamin Admin 3389fa3e7a
CI / detect-changes (pull_request) Successful in 5s
CI / branch-name (pull_request) Successful in 1s
CI / guardrail-integrity (pull_request) Successful in 5s
CI / secret-scan (pull_request) Successful in 8s
CI / dep-audit (pull_request) Failing after 57s
CI / sbom-scan (pull_request) Failing after 56s
CI / build-sha-integrity (pull_request) Successful in 6s
CI / validate-canonical-controls (pull_request) Successful in 5s
CI / loc-budget (pull_request) Successful in 22s
CI / go-lint (pull_request) Successful in 46s
CI / python-lint (pull_request) Failing after 17s
CI / nodejs-lint (pull_request) Failing after 1m8s
CI / nodejs-build (pull_request) Successful in 3m1s
CI / test-go (pull_request) Successful in 1m2s
CI / iace-gt-coverage (pull_request) Successful in 18s
CI / test-python-backend (pull_request) Successful in 25s
CI / test-python-document-crawler (pull_request) Successful in 14s
CI / test-python-dsms-gateway (pull_request) Successful in 10s
fix(api): F821-Regression in 6 weiteren Route-Dateien beheben
Gleiche Wurzel wie evidence_routes (Extract-Service-Refactor a638d0e5 ff.):
Signaturen/Imports halb umgestellt → undefined names → NameError beim Aufruf.

- routes.py: db-Param in get_control/update_control/review_control + EvidenceDB-Import
- dsfa_routes.py: db-Param in create_dsfa + HTTPException/text-Import
- dashboard_routes.py: timezone-Import
- canonical_control_routes.py: logger-Definition
- ai_routes.py: timezone in den lokalen datetime-Imports
- vvt_routes.py: HTTPException-Import

Verifiziert: ruff F821 0 über das gesamte compliance/api/, alle 6 py_compile,
294 Tests grün auf den betroffenen Modulen (die 2 dsfa-invalid-status/risk-Failures
sind vorbestehend = 400-vs-422, unabhängig von diesem Fix).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-30 10:51:00 +02:00

376 lines
13 KiB
Python

"""
FastAPI routes for VVT — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO).
Endpoints:
GET /vvt/organization — Load organization header
PUT /vvt/organization — Save organization header
GET /vvt/activities — List activities
POST /vvt/activities — Create new activity
GET /vvt/activities/{id} — Get single activity
PUT /vvt/activities/{id} — Update activity
DELETE /vvt/activities/{id} — Delete activity
GET /vvt/audit-log — Audit trail
GET /vvt/export — JSON or CSV export
GET /vvt/stats — Statistics
GET /vvt/activities/{id}/versions — List activity versions
GET /vvt/activities/{id}/versions/{n} — Get specific version
Phase 1 Step 4 refactor: handlers delegate to VVTService.
"""
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.api.tenant_utils import get_tenant_id
from compliance.schemas.vvt import (
VVTActivityCreate,
VVTActivityResponse,
VVTActivityUpdate,
VVTAuditLogEntry,
VVTOrganizationResponse,
VVTOrganizationUpdate,
VVTStatsResponse,
)
from compliance.db.vvt_models import VVTActivityDB
from compliance.services.vvt_service import (
VVTService,
_activity_to_response, # re-exported for legacy test imports
_export_csv, # re-exported for legacy test imports
_log_audit, # re-exported for legacy test imports
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/vvt", tags=["compliance-vvt"])
def get_vvt_service(db: Session = Depends(get_db)) -> VVTService:
return VVTService(db)
# ============================================================================
# Organization Header
# ============================================================================
@router.get("/organization", response_model=Optional[VVTOrganizationResponse])
async def get_organization(
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Optional[VVTOrganizationResponse]:
"""Load the VVT organization header for the given tenant."""
with translate_domain_errors():
return service.get_organization(tid)
@router.put("/organization", response_model=VVTOrganizationResponse)
async def upsert_organization(
request: VVTOrganizationUpdate,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTOrganizationResponse:
"""Create or update the VVT organization header."""
with translate_domain_errors():
return service.upsert_organization(tid, request)
# ============================================================================
# Activities
# ============================================================================
def _activity_to_response(act: VVTActivityDB) -> VVTActivityResponse:
return VVTActivityResponse(
id=str(act.id),
vvt_id=act.vvt_id,
name=act.name,
description=act.description,
purposes=act.purposes or [],
legal_bases=act.legal_bases or [],
data_subject_categories=act.data_subject_categories or [],
personal_data_categories=act.personal_data_categories or [],
recipient_categories=act.recipient_categories or [],
third_country_transfers=act.third_country_transfers or [],
retention_period=act.retention_period or {},
tom_description=act.tom_description,
business_function=act.business_function,
systems=act.systems or [],
deployment_model=act.deployment_model,
data_sources=act.data_sources or [],
data_flows=act.data_flows or [],
protection_level=act.protection_level or 'MEDIUM',
dpia_required=act.dpia_required or False,
structured_toms=act.structured_toms or {},
status=act.status or 'DRAFT',
responsible=act.responsible,
owner=act.owner,
last_reviewed_at=act.last_reviewed_at,
next_review_at=act.next_review_at,
created_by=act.created_by,
dsfa_id=str(act.dsfa_id) if act.dsfa_id else None,
# Library refs
purpose_refs=act.purpose_refs,
legal_basis_refs=act.legal_basis_refs,
data_subject_refs=act.data_subject_refs,
data_category_refs=act.data_category_refs,
recipient_refs=act.recipient_refs,
retention_rule_ref=act.retention_rule_ref,
transfer_mechanism_refs=act.transfer_mechanism_refs,
tom_refs=act.tom_refs,
source_template_id=act.source_template_id,
risk_score=act.risk_score,
linked_loeschfristen_ids=act.linked_loeschfristen_ids,
linked_tom_measure_ids=act.linked_tom_measure_ids,
art30_completeness=act.art30_completeness,
created_at=act.created_at,
updated_at=act.updated_at,
)
@router.get("/activities", response_model=List[VVTActivityResponse])
async def list_activities(
status: Optional[str] = Query(None),
business_function: Optional[str] = Query(None),
search: Optional[str] = Query(None),
review_overdue: Optional[bool] = Query(None),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> List[VVTActivityResponse]:
"""List all processing activities with optional filters."""
with translate_domain_errors():
return service.list_activities(
tid, status, business_function, search, review_overdue
)
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
async def create_activity(
request: VVTActivityCreate,
http_request: Request,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Create a new processing activity."""
with translate_domain_errors():
return service.create_activity(
tid, request, http_request.headers.get("X-User-ID")
)
@router.get("/activities/{activity_id}", response_model=VVTActivityResponse)
async def get_activity(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Get a single processing activity by ID."""
with translate_domain_errors():
return service.get_activity(tid, activity_id)
@router.put("/activities/{activity_id}", response_model=VVTActivityResponse)
async def update_activity(
activity_id: str,
request: VVTActivityUpdate,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Update a processing activity."""
with translate_domain_errors():
return service.update_activity(tid, activity_id, request)
@router.delete("/activities/{activity_id}")
async def delete_activity(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> dict[str, Any]:
"""Delete a processing activity."""
with translate_domain_errors():
return service.delete_activity(tid, activity_id)
# ============================================================================
# Art. 30 Completeness Check
# ============================================================================
@router.get("/activities/{activity_id}/completeness")
async def get_activity_completeness(
activity_id: str,
tid: str = Depends(get_tenant_id),
db: Session = Depends(get_db),
):
"""Calculate Art. 30 completeness score for a VVT activity."""
act = db.query(VVTActivityDB).filter(
VVTActivityDB.id == activity_id,
VVTActivityDB.tenant_id == tid,
).first()
if not act:
raise HTTPException(status_code=404, detail=f"Activity {activity_id} not found")
return _calculate_completeness(act)
def _calculate_completeness(act: VVTActivityDB) -> dict:
"""Calculate Art. 30 completeness — required fields per DSGVO Art. 30 Abs. 1."""
missing = []
warnings = []
total_checks = 10
passed = 0
# 1. Name/Zweck
if act.name:
passed += 1
else:
missing.append("name")
# 2. Verarbeitungszwecke
has_purposes = bool(act.purposes) or bool(act.purpose_refs)
if has_purposes:
passed += 1
else:
missing.append("purposes")
# 3. Rechtsgrundlage
has_legal = bool(act.legal_bases) or bool(act.legal_basis_refs)
if has_legal:
passed += 1
else:
missing.append("legal_bases")
# 4. Betroffenenkategorien
has_subjects = bool(act.data_subject_categories) or bool(act.data_subject_refs)
if has_subjects:
passed += 1
else:
missing.append("data_subjects")
# 5. Datenkategorien
has_categories = bool(act.personal_data_categories) or bool(act.data_category_refs)
if has_categories:
passed += 1
else:
missing.append("data_categories")
# 6. Empfaenger
has_recipients = bool(act.recipient_categories) or bool(act.recipient_refs)
if has_recipients:
passed += 1
else:
missing.append("recipients")
# 7. Drittland-Uebermittlung (checked but not strictly required)
passed += 1 # always passes — no transfer is valid state
# 8. Loeschfristen
has_retention = bool(act.retention_period and act.retention_period.get('description')) or bool(act.retention_rule_ref)
if has_retention:
passed += 1
else:
missing.append("retention_period")
# 9. TOM-Beschreibung
has_tom = bool(act.tom_description) or bool(act.tom_refs) or bool(act.structured_toms)
if has_tom:
passed += 1
else:
missing.append("tom_description")
# 10. Verantwortlicher
if act.responsible:
passed += 1
else:
missing.append("responsible")
# Warnings
if act.dpia_required and not act.dsfa_id:
warnings.append("dpia_required_but_no_dsfa_linked")
if act.third_country_transfers and not act.transfer_mechanism_refs:
warnings.append("third_country_transfer_without_mechanism")
score = int((passed / total_checks) * 100)
return {"score": score, "missing": missing, "warnings": warnings, "passed": passed, "total": total_checks}
# ============================================================================
# Audit Log
# ============================================================================
@router.get("/audit-log", response_model=List[VVTAuditLogEntry])
async def get_audit_log(
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> List[VVTAuditLogEntry]:
"""Get the VVT audit trail."""
with translate_domain_errors():
return service.audit_log(tid, limit, offset)
# ============================================================================
# Export & Stats
# ============================================================================
@router.get("/export")
async def export_activities(
format: str = Query("json", pattern="^(json|csv)$"),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""Export all activities as JSON or CSV (semicolon-separated, DE locale)."""
with translate_domain_errors():
return service.export(tid, format)
@router.get("/stats", response_model=VVTStatsResponse)
async def get_stats(
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTStatsResponse:
"""Get VVT statistics summary."""
with translate_domain_errors():
return service.stats(tid)
# ============================================================================
# Versioning
# ============================================================================
@router.get("/activities/{activity_id}/versions")
async def list_activity_versions(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""List all versions for a VVT activity."""
with translate_domain_errors():
return service.list_versions(tid, activity_id)
@router.get("/activities/{activity_id}/versions/{version_number}")
async def get_activity_version(
activity_id: str,
version_number: int,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""Get a specific VVT activity version with full snapshot."""
with translate_domain_errors():
return service.get_version(tid, activity_id, version_number)
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
__all__ = [
"router",
"_activity_to_response",
"_log_audit",
"_export_csv",
]