# Conflicts: # admin-compliance/lib/sdk/types.ts # admin-compliance/lib/sdk/vendor-compliance/types.ts
375 lines
13 KiB
Python
375 lines
13 KiB
Python
"""
FastAPI routes for VVT — Verzeichnis von Verarbeitungstaetigkeiten
(record of processing activities, Art. 30 DSGVO).

Endpoints:
    GET    /vvt/organization                     — Load organization header
    PUT    /vvt/organization                     — Save organization header
    GET    /vvt/activities                       — List activities
    POST   /vvt/activities                       — Create new activity
    GET    /vvt/activities/{id}                  — Get single activity
    PUT    /vvt/activities/{id}                  — Update activity
    DELETE /vvt/activities/{id}                  — Delete activity
    GET    /vvt/activities/{id}/completeness     — Art. 30 completeness score
    GET    /vvt/audit-log                        — Audit trail
    GET    /vvt/export                           — JSON or CSV export
    GET    /vvt/stats                            — Statistics
    GET    /vvt/activities/{id}/versions         — List activity versions
    GET    /vvt/activities/{id}/versions/{n}     — Get specific version

Phase 1 Step 4 refactor: handlers delegate to VVTService (the completeness
endpoint is the exception and still queries the database session directly).
"""
|
|
|
|
import logging
|
|
from typing import Any, List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, Query, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from classroom_engine.database import get_db
|
|
from compliance.api._http_errors import translate_domain_errors
|
|
from compliance.api.tenant_utils import get_tenant_id
|
|
from compliance.schemas.vvt import (
|
|
VVTActivityCreate,
|
|
VVTActivityResponse,
|
|
VVTActivityUpdate,
|
|
VVTAuditLogEntry,
|
|
VVTOrganizationResponse,
|
|
VVTOrganizationUpdate,
|
|
VVTStatsResponse,
|
|
)
|
|
from compliance.services.vvt_service import (
|
|
VVTService,
|
|
_activity_to_response, # re-exported for legacy test imports
|
|
_export_csv, # re-exported for legacy test imports
|
|
_log_audit, # re-exported for legacy test imports
|
|
)
|
|
|
|
# Module-level logger named after this module; none of the handlers visible in
# this file call it directly.
logger = logging.getLogger(__name__)
|
|
# Every route registered below is served under the "/vvt" prefix and grouped
# under the "compliance-vvt" tag.
router = APIRouter(prefix="/vvt", tags=["compliance-vvt"])
|
|
|
|
|
|
def get_vvt_service(db: Session = Depends(get_db)) -> VVTService:
    """Dependency provider: wrap the injected database session in a VVTService.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A new ``VVTService`` constructed with that session.
    """
    service = VVTService(db)
    return service
|
|
|
|
|
|
# ============================================================================
|
|
# Organization Header
|
|
# ============================================================================
|
|
|
|
@router.get("/organization", response_model=Optional[VVTOrganizationResponse])
async def get_organization(
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Optional[VVTOrganizationResponse]:
    """Load the VVT organization header for the given tenant."""
    # The docstring is kept verbatim: FastAPI publishes handler docstrings as
    # the OpenAPI operation description.
    # `tid` is resolved by the get_tenant_id dependency. The response model is
    # Optional, so a None result from the service is a valid response.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        organization = service.get_organization(tid)
        return organization
|
|
|
|
|
|
@router.put("/organization", response_model=VVTOrganizationResponse)
async def upsert_organization(
    request: VVTOrganizationUpdate,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTOrganizationResponse:
    """Create or update the VVT organization header."""
    # `request` is the parsed request body; `tid` is resolved by the
    # get_tenant_id dependency. Both are handed to the service unchanged.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        saved = service.upsert_organization(tid, request)
        return saved
|
|
|
|
|
|
# ============================================================================
|
|
# Activities
|
|
# ============================================================================
|
|
|
|
def _activity_to_response(act: "VVTActivityDB") -> VVTActivityResponse:
    """Convert an activity record into the ``VVTActivityResponse`` API schema.

    The parameter annotation is a forward-reference string because
    ``VVTActivityDB`` is not imported in this module; an eagerly evaluated
    annotation would raise ``NameError`` while the module is being imported.

    NOTE(review): this definition shadows the ``_activity_to_response`` that is
    imported from ``compliance.services.vvt_service`` at the top of the file.
    Confirm which of the two is meant to be the canonical converter.

    Args:
        act: Activity record; only attribute access is performed on it.

    Returns:
        A ``VVTActivityResponse`` in which empty collection fields are replaced
        by empty containers and the two id fields are rendered as strings.
    """
    return VVTActivityResponse(
        # Identity and descriptive fields.
        id=str(act.id),
        vvt_id=act.vvt_id,
        name=act.name,
        description=act.description,
        # Falsy collection values (e.g. None) are replaced by empty containers.
        purposes=act.purposes or [],
        legal_bases=act.legal_bases or [],
        data_subject_categories=act.data_subject_categories or [],
        personal_data_categories=act.personal_data_categories or [],
        recipient_categories=act.recipient_categories or [],
        third_country_transfers=act.third_country_transfers or [],
        retention_period=act.retention_period or {},
        tom_description=act.tom_description,
        business_function=act.business_function,
        systems=act.systems or [],
        deployment_model=act.deployment_model,
        data_sources=act.data_sources or [],
        data_flows=act.data_flows or [],
        # Defaults applied when the stored value is falsy.
        protection_level=act.protection_level or "MEDIUM",
        dpia_required=act.dpia_required or False,
        structured_toms=act.structured_toms or {},
        status=act.status or "DRAFT",
        responsible=act.responsible,
        owner=act.owner,
        last_reviewed_at=act.last_reviewed_at,
        next_review_at=act.next_review_at,
        created_by=act.created_by,
        # Stringified only when a DSFA link is present.
        dsfa_id=str(act.dsfa_id) if act.dsfa_id else None,
        # Library refs — passed through unchanged, without defaults.
        purpose_refs=act.purpose_refs,
        legal_basis_refs=act.legal_basis_refs,
        data_subject_refs=act.data_subject_refs,
        data_category_refs=act.data_category_refs,
        recipient_refs=act.recipient_refs,
        retention_rule_ref=act.retention_rule_ref,
        transfer_mechanism_refs=act.transfer_mechanism_refs,
        tom_refs=act.tom_refs,
        source_template_id=act.source_template_id,
        risk_score=act.risk_score,
        linked_loeschfristen_ids=act.linked_loeschfristen_ids,
        linked_tom_measure_ids=act.linked_tom_measure_ids,
        art30_completeness=act.art30_completeness,
        created_at=act.created_at,
        updated_at=act.updated_at,
    )
|
|
|
|
|
|
@router.get("/activities", response_model=List[VVTActivityResponse])
async def list_activities(
    status: Optional[str] = Query(None),
    business_function: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    review_overdue: Optional[bool] = Query(None),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> List[VVTActivityResponse]:
    """List all processing activities with optional filters."""
    # All four filters are optional query parameters defaulting to None. They
    # are forwarded positionally, in the same order as the original call.
    filters = (status, business_function, search, review_overdue)
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        activities = service.list_activities(tid, *filters)
        return activities
|
|
|
|
|
|
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
async def create_activity(
    request: VVTActivityCreate,
    http_request: Request,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Create a new processing activity."""
    # `request` is the parsed body; `http_request` is the raw request and is
    # only used to read the X-User-ID header.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        # headers.get() yields None when the header is absent; that value is
        # forwarded to the service as-is.
        user_id = http_request.headers.get("X-User-ID")
        created = service.create_activity(tid, request, user_id)
        return created
|
|
|
|
|
|
@router.get("/activities/{activity_id}", response_model=VVTActivityResponse)
async def get_activity(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Get a single processing activity by ID."""
    # `activity_id` is taken from the URL path; `tid` is resolved by the
    # get_tenant_id dependency.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors (such as an unknown id) into HTTP errors — confirm in
    # compliance.api._http_errors.
    with translate_domain_errors():
        activity = service.get_activity(tid, activity_id)
        return activity
|
|
|
|
|
|
@router.put("/activities/{activity_id}", response_model=VVTActivityResponse)
async def update_activity(
    activity_id: str,
    request: VVTActivityUpdate,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTActivityResponse:
    """Update a processing activity."""
    # `activity_id` is taken from the URL path and `request` is the parsed
    # body; both are handed to the service together with the tenant id.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        updated = service.update_activity(tid, activity_id, request)
        return updated
|
|
|
|
|
|
@router.delete("/activities/{activity_id}")
async def delete_activity(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> dict[str, Any]:
    """Delete a processing activity."""
    # No response_model is declared on the route; whatever mapping the service
    # returns is sent back as the response body.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        outcome = service.delete_activity(tid, activity_id)
        return outcome
|
|
|
|
|
|
# ============================================================================
|
|
# Art. 30 Completeness Check
|
|
# ============================================================================
|
|
|
|
@router.get("/activities/{activity_id}/completeness")
async def get_activity_completeness(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    db: Session = Depends(get_db),
):
    """Calculate Art. 30 completeness score for a VVT activity."""
    # HTTPException is not part of this module's top-level fastapi import, so
    # the not-found branch below used to die with a NameError instead of
    # answering 404. Import it here, where it is needed.
    from fastapi import HTTPException

    # NOTE(review): VVTActivityDB is not imported anywhere in this module
    # either. Its defining module is not visible from this file, so the import
    # still has to be added to the top-level import block before this endpoint
    # can run.
    # The lookup is scoped to the calling tenant, so an activity that belongs
    # to another tenant is treated as not found.
    tenant_activity = (
        db.query(VVTActivityDB)
        .filter(
            VVTActivityDB.id == activity_id,
            VVTActivityDB.tenant_id == tid,
        )
        .first()
    )
    if not tenant_activity:
        raise HTTPException(
            status_code=404,
            detail=f"Activity {activity_id} not found",
        )
    return _calculate_completeness(tenant_activity)
|
|
|
|
|
|
def _calculate_completeness(act: VVTActivityDB) -> dict:
|
|
"""Calculate Art. 30 completeness — required fields per DSGVO Art. 30 Abs. 1."""
|
|
missing = []
|
|
warnings = []
|
|
total_checks = 10
|
|
passed = 0
|
|
|
|
# 1. Name/Zweck
|
|
if act.name:
|
|
passed += 1
|
|
else:
|
|
missing.append("name")
|
|
|
|
# 2. Verarbeitungszwecke
|
|
has_purposes = bool(act.purposes) or bool(act.purpose_refs)
|
|
if has_purposes:
|
|
passed += 1
|
|
else:
|
|
missing.append("purposes")
|
|
|
|
# 3. Rechtsgrundlage
|
|
has_legal = bool(act.legal_bases) or bool(act.legal_basis_refs)
|
|
if has_legal:
|
|
passed += 1
|
|
else:
|
|
missing.append("legal_bases")
|
|
|
|
# 4. Betroffenenkategorien
|
|
has_subjects = bool(act.data_subject_categories) or bool(act.data_subject_refs)
|
|
if has_subjects:
|
|
passed += 1
|
|
else:
|
|
missing.append("data_subjects")
|
|
|
|
# 5. Datenkategorien
|
|
has_categories = bool(act.personal_data_categories) or bool(act.data_category_refs)
|
|
if has_categories:
|
|
passed += 1
|
|
else:
|
|
missing.append("data_categories")
|
|
|
|
# 6. Empfaenger
|
|
has_recipients = bool(act.recipient_categories) or bool(act.recipient_refs)
|
|
if has_recipients:
|
|
passed += 1
|
|
else:
|
|
missing.append("recipients")
|
|
|
|
# 7. Drittland-Uebermittlung (checked but not strictly required)
|
|
passed += 1 # always passes — no transfer is valid state
|
|
|
|
# 8. Loeschfristen
|
|
has_retention = bool(act.retention_period and act.retention_period.get('description')) or bool(act.retention_rule_ref)
|
|
if has_retention:
|
|
passed += 1
|
|
else:
|
|
missing.append("retention_period")
|
|
|
|
# 9. TOM-Beschreibung
|
|
has_tom = bool(act.tom_description) or bool(act.tom_refs) or bool(act.structured_toms)
|
|
if has_tom:
|
|
passed += 1
|
|
else:
|
|
missing.append("tom_description")
|
|
|
|
# 10. Verantwortlicher
|
|
if act.responsible:
|
|
passed += 1
|
|
else:
|
|
missing.append("responsible")
|
|
|
|
# Warnings
|
|
if act.dpia_required and not act.dsfa_id:
|
|
warnings.append("dpia_required_but_no_dsfa_linked")
|
|
if act.third_country_transfers and not act.transfer_mechanism_refs:
|
|
warnings.append("third_country_transfer_without_mechanism")
|
|
|
|
score = int((passed / total_checks) * 100)
|
|
return {"score": score, "missing": missing, "warnings": warnings, "passed": passed, "total": total_checks}
|
|
|
|
|
|
# ============================================================================
|
|
# Audit Log
|
|
# ============================================================================
|
|
|
|
@router.get("/audit-log", response_model=List[VVTAuditLogEntry])
async def get_audit_log(
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> List[VVTAuditLogEntry]:
    """Get the VVT audit trail."""
    # Paging is validated by the Query constraints: `limit` must lie in 1..500
    # (default 50) and `offset` must be >= 0 (default 0).
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        entries = service.audit_log(tid, limit, offset)
        return entries
|
|
|
|
|
|
# ============================================================================
|
|
# Export & Stats
|
|
# ============================================================================
|
|
|
|
@router.get("/export")
async def export_activities(
    format: str = Query("json", pattern="^(json|csv)$"),
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """Export all activities as JSON or CSV (semicolon-separated, DE locale)."""
    # `format` shadows the builtin, but it is the public query-parameter name
    # and must stay. The pattern restricts it to exactly "json" or "csv".
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        payload = service.export(tid, format)
        return payload
|
|
|
|
|
|
@router.get("/stats", response_model=VVTStatsResponse)
async def get_stats(
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> VVTStatsResponse:
    """Get VVT statistics summary."""
    # `tid` is resolved by the get_tenant_id dependency and is the only input
    # passed to the service.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        summary = service.stats(tid)
        return summary
|
|
|
|
|
|
# ============================================================================
|
|
# Versioning
|
|
# ============================================================================
|
|
|
|
@router.get("/activities/{activity_id}/versions")
async def list_activity_versions(
    activity_id: str,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """List all versions for a VVT activity."""
    # No response_model is declared on the route; the service result is
    # returned as-is.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        versions = service.list_versions(tid, activity_id)
        return versions
|
|
|
|
|
|
@router.get("/activities/{activity_id}/versions/{version_number}")
async def get_activity_version(
    activity_id: str,
    version_number: int,
    tid: str = Depends(get_tenant_id),
    service: VVTService = Depends(get_vvt_service),
) -> Any:
    """Get a specific VVT activity version with full snapshot."""
    # Both path parameters are forwarded to the service; `version_number` is
    # declared as int, so it is parsed from the URL as an integer.
    # NOTE(review): translate_domain_errors() presumably converts service-layer
    # errors into HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        snapshot = service.get_version(tid, activity_id, version_number)
        return snapshot
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Legacy re-exports for tests that import helpers directly.
|
|
# ----------------------------------------------------------------------------
|
|
|
|
# Names exported by `from <this module> import *`: the router plus three
# underscore-prefixed helpers, which a star-import would otherwise skip.
__all__ = ["router", "_activity_to_response", "_log_audit", "_export_csv"]
|