Files
Sharang Parnerkar c43d9da6d0 merge: sync with origin/main, take upstream on conflicts
# Conflicts:
#	admin-compliance/lib/sdk/types.ts
#	admin-compliance/lib/sdk/vendor-compliance/types.ts
2026-04-16 16:26:48 +02:00

375 lines
13 KiB
Python

"""
FastAPI routes for VVT — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO).
Endpoints:
GET /vvt/organization — Load organization header
PUT /vvt/organization — Save organization header
GET /vvt/activities — List activities
POST /vvt/activities — Create new activity
GET /vvt/activities/{id} — Get single activity
PUT /vvt/activities/{id} — Update activity
DELETE /vvt/activities/{id} — Delete activity
GET /vvt/audit-log — Audit trail
GET /vvt/export — JSON or CSV export
GET /vvt/stats — Statistics
GET /vvt/activities/{id}/versions — List activity versions
GET /vvt/activities/{id}/versions/{n} — Get specific version
Phase 1 Step 4 refactor: handlers delegate to VVTService.
"""
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.api.tenant_utils import get_tenant_id
from compliance.schemas.vvt import (
VVTActivityCreate,
VVTActivityResponse,
VVTActivityUpdate,
VVTAuditLogEntry,
VVTOrganizationResponse,
VVTOrganizationUpdate,
VVTStatsResponse,
)
from compliance.services.vvt_service import (
VVTService,
_activity_to_response, # re-exported for legacy test imports
_export_csv, # re-exported for legacy test imports
_log_audit, # re-exported for legacy test imports
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/vvt", tags=["compliance-vvt"])
def get_vvt_service(db: Session = Depends(get_db)) -> VVTService:
return VVTService(db)
# ============================================================================
# Organization Header
# ============================================================================
@router.get("/organization", response_model=Optional[VVTOrganizationResponse])
async def get_organization(
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Optional[VVTOrganizationResponse]:
"""Load the VVT organization header for the given tenant."""
with translate_domain_errors():
return service.get_organization(tid)
@router.put("/organization", response_model=VVTOrganizationResponse)
async def upsert_organization(
request: VVTOrganizationUpdate,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTOrganizationResponse:
"""Create or update the VVT organization header."""
with translate_domain_errors():
return service.upsert_organization(tid, request)
# ============================================================================
# Activities
# ============================================================================
def _activity_to_response(act: VVTActivityDB) -> VVTActivityResponse:
return VVTActivityResponse(
id=str(act.id),
vvt_id=act.vvt_id,
name=act.name,
description=act.description,
purposes=act.purposes or [],
legal_bases=act.legal_bases or [],
data_subject_categories=act.data_subject_categories or [],
personal_data_categories=act.personal_data_categories or [],
recipient_categories=act.recipient_categories or [],
third_country_transfers=act.third_country_transfers or [],
retention_period=act.retention_period or {},
tom_description=act.tom_description,
business_function=act.business_function,
systems=act.systems or [],
deployment_model=act.deployment_model,
data_sources=act.data_sources or [],
data_flows=act.data_flows or [],
protection_level=act.protection_level or 'MEDIUM',
dpia_required=act.dpia_required or False,
structured_toms=act.structured_toms or {},
status=act.status or 'DRAFT',
responsible=act.responsible,
owner=act.owner,
last_reviewed_at=act.last_reviewed_at,
next_review_at=act.next_review_at,
created_by=act.created_by,
dsfa_id=str(act.dsfa_id) if act.dsfa_id else None,
# Library refs
purpose_refs=act.purpose_refs,
legal_basis_refs=act.legal_basis_refs,
data_subject_refs=act.data_subject_refs,
data_category_refs=act.data_category_refs,
recipient_refs=act.recipient_refs,
retention_rule_ref=act.retention_rule_ref,
transfer_mechanism_refs=act.transfer_mechanism_refs,
tom_refs=act.tom_refs,
source_template_id=act.source_template_id,
risk_score=act.risk_score,
linked_loeschfristen_ids=act.linked_loeschfristen_ids,
linked_tom_measure_ids=act.linked_tom_measure_ids,
art30_completeness=act.art30_completeness,
created_at=act.created_at,
updated_at=act.updated_at,
)
@router.get("/activities", response_model=List[VVTActivityResponse])
async def list_activities(
status: Optional[str] = Query(None),
business_function: Optional[str] = Query(None),
search: Optional[str] = Query(None),
review_overdue: Optional[bool] = Query(None),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> List[VVTActivityResponse]:
"""List all processing activities with optional filters."""
with translate_domain_errors():
return service.list_activities(
tid, status, business_function, search, review_overdue
)
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
async def create_activity(
request: VVTActivityCreate,
http_request: Request,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Create a new processing activity."""
with translate_domain_errors():
return service.create_activity(
tid, request, http_request.headers.get("X-User-ID")
)
@router.get("/activities/{activity_id}", response_model=VVTActivityResponse)
async def get_activity(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Get a single processing activity by ID."""
with translate_domain_errors():
return service.get_activity(tid, activity_id)
@router.put("/activities/{activity_id}", response_model=VVTActivityResponse)
async def update_activity(
activity_id: str,
request: VVTActivityUpdate,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
"""Update a processing activity."""
with translate_domain_errors():
return service.update_activity(tid, activity_id, request)
@router.delete("/activities/{activity_id}")
async def delete_activity(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> dict[str, Any]:
"""Delete a processing activity."""
with translate_domain_errors():
return service.delete_activity(tid, activity_id)
# ============================================================================
# Art. 30 Completeness Check
# ============================================================================
@router.get("/activities/{activity_id}/completeness")
async def get_activity_completeness(
activity_id: str,
tid: str = Depends(get_tenant_id),
db: Session = Depends(get_db),
):
"""Calculate Art. 30 completeness score for a VVT activity."""
act = db.query(VVTActivityDB).filter(
VVTActivityDB.id == activity_id,
VVTActivityDB.tenant_id == tid,
).first()
if not act:
raise HTTPException(status_code=404, detail=f"Activity {activity_id} not found")
return _calculate_completeness(act)
def _calculate_completeness(act: VVTActivityDB) -> dict:
"""Calculate Art. 30 completeness — required fields per DSGVO Art. 30 Abs. 1."""
missing = []
warnings = []
total_checks = 10
passed = 0
# 1. Name/Zweck
if act.name:
passed += 1
else:
missing.append("name")
# 2. Verarbeitungszwecke
has_purposes = bool(act.purposes) or bool(act.purpose_refs)
if has_purposes:
passed += 1
else:
missing.append("purposes")
# 3. Rechtsgrundlage
has_legal = bool(act.legal_bases) or bool(act.legal_basis_refs)
if has_legal:
passed += 1
else:
missing.append("legal_bases")
# 4. Betroffenenkategorien
has_subjects = bool(act.data_subject_categories) or bool(act.data_subject_refs)
if has_subjects:
passed += 1
else:
missing.append("data_subjects")
# 5. Datenkategorien
has_categories = bool(act.personal_data_categories) or bool(act.data_category_refs)
if has_categories:
passed += 1
else:
missing.append("data_categories")
# 6. Empfaenger
has_recipients = bool(act.recipient_categories) or bool(act.recipient_refs)
if has_recipients:
passed += 1
else:
missing.append("recipients")
# 7. Drittland-Uebermittlung (checked but not strictly required)
passed += 1 # always passes — no transfer is valid state
# 8. Loeschfristen
has_retention = bool(act.retention_period and act.retention_period.get('description')) or bool(act.retention_rule_ref)
if has_retention:
passed += 1
else:
missing.append("retention_period")
# 9. TOM-Beschreibung
has_tom = bool(act.tom_description) or bool(act.tom_refs) or bool(act.structured_toms)
if has_tom:
passed += 1
else:
missing.append("tom_description")
# 10. Verantwortlicher
if act.responsible:
passed += 1
else:
missing.append("responsible")
# Warnings
if act.dpia_required and not act.dsfa_id:
warnings.append("dpia_required_but_no_dsfa_linked")
if act.third_country_transfers and not act.transfer_mechanism_refs:
warnings.append("third_country_transfer_without_mechanism")
score = int((passed / total_checks) * 100)
return {"score": score, "missing": missing, "warnings": warnings, "passed": passed, "total": total_checks}
# ============================================================================
# Audit Log
# ============================================================================
@router.get("/audit-log", response_model=List[VVTAuditLogEntry])
async def get_audit_log(
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> List[VVTAuditLogEntry]:
"""Get the VVT audit trail."""
with translate_domain_errors():
return service.audit_log(tid, limit, offset)
# ============================================================================
# Export & Stats
# ============================================================================
@router.get("/export")
async def export_activities(
format: str = Query("json", pattern="^(json|csv)$"),
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""Export all activities as JSON or CSV (semicolon-separated, DE locale)."""
with translate_domain_errors():
return service.export(tid, format)
@router.get("/stats", response_model=VVTStatsResponse)
async def get_stats(
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> VVTStatsResponse:
"""Get VVT statistics summary."""
with translate_domain_errors():
return service.stats(tid)
# ============================================================================
# Versioning
# ============================================================================
@router.get("/activities/{activity_id}/versions")
async def list_activity_versions(
activity_id: str,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""List all versions for a VVT activity."""
with translate_domain_errors():
return service.list_versions(tid, activity_id)
@router.get("/activities/{activity_id}/versions/{version_number}")
async def get_activity_version(
activity_id: str,
version_number: int,
tid: str = Depends(get_tenant_id),
service: VVTService = Depends(get_vvt_service),
) -> Any:
"""Get a specific VVT activity version with full snapshot."""
with translate_domain_errors():
return service.get_version(tid, activity_id, version_number)
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers directly.
# ----------------------------------------------------------------------------
__all__ = [
"router",
"_activity_to_response",
"_log_audit",
"_export_csv",
]