""" FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO). Endpoints: GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit) POST /v1/dsfa — Neu erstellen -> 201 GET /v1/dsfa/stats — Zaehler nach Status GET /v1/dsfa/audit-log — Audit-Log GET /v1/dsfa/export/csv — CSV-Export aller DSFAs POST /v1/dsfa/from-assessment/{id} — Stub: DSFA aus UCCA-Assessment GET /v1/dsfa/by-assessment/{id} — Stub: DSFA nach Assessment-ID GET /v1/dsfa/{id} — Detail PUT /v1/dsfa/{id} — Update DELETE /v1/dsfa/{id} — Loeschen (Art. 17 DSGVO) PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel PUT /v1/dsfa/{id}/sections/{nr} — Section-Update (1-8) POST /v1/dsfa/{id}/submit-for-review — Workflow: Einreichen POST /v1/dsfa/{id}/approve — Workflow: Genehmigen/Ablehnen GET /v1/dsfa/{id}/export — JSON-Export einer DSFA Phase 1 Step 4 refactor: handlers delegate to DSFAService (CRUD/stats/ audit/csv) and DSFAWorkflowService (status/section/submit/approve/export/ versions). Module-level helpers re-exported for legacy tests. """ import logging from typing import Any, Optional from fastapi import APIRouter, Depends, Query from pydantic import BaseModel from fastapi.responses import Response from sqlalchemy.orm import Session from classroom_engine.database import get_db from compliance.api._http_errors import translate_domain_errors from compliance.schemas.dsfa import ( DSFAApproveRequest, DSFACreate, DSFASectionUpdate, DSFAStatusUpdate, DSFAUpdate, ) from compliance.services.dsfa_service import ( DEFAULT_TENANT_ID, VALID_RISK_LEVELS, VALID_STATUSES, DSFAService, _dsfa_to_response, # re-exported for legacy test imports _get_tenant_id, # re-exported for legacy test imports ) from compliance.services.dsfa_workflow_service import ( SECTION_FIELD_MAP, # noqa: F401 — re-export DSFAWorkflowService, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/dsfa", tags=["compliance-dsfa"]) def get_dsfa_service(db: Session = Depends(get_db)) -> DSFAService: return DSFAService(db) # ============================================================================= # Pydantic Schemas # ============================================================================= class DSFACreate(BaseModel): title: str description: str = "" status: str = "draft" risk_level: str = "low" processing_activity: str = "" data_categories: List[str] = [] recipients: List[str] = [] measures: List[str] = [] created_by: str = "system" # Section 1 processing_description: Optional[str] = None processing_purpose: Optional[str] = None legal_basis: Optional[str] = None legal_basis_details: Optional[str] = None # Section 2 necessity_assessment: Optional[str] = None proportionality_assessment: Optional[str] = None data_minimization: Optional[str] = None alternatives_considered: Optional[str] = None retention_justification: Optional[str] = None # Section 3 involves_ai: Optional[bool] = None overall_risk_level: Optional[str] = None risk_score: Optional[int] = None # Section 6 dpo_consulted: Optional[bool] = None dpo_name: Optional[str] = None dpo_opinion: Optional[str] = None dpo_approved: Optional[bool] = None authority_consulted: Optional[bool] = None authority_reference: Optional[str] = None authority_decision: Optional[str] = None # Metadata version: Optional[int] = None conclusion: Optional[str] = None federal_state: Optional[str] = None authority_resource_id: Optional[str] = None submitted_by: Optional[str] = None # JSONB Arrays data_subjects: Optional[List[str]] = None affected_rights: Optional[List[str]] = None triggered_rule_codes: Optional[List[str]] = None ai_trigger_ids: Optional[List[str]] = None wp248_criteria_met: Optional[List[str]] = None art35_abs3_triggered: Optional[List[str]] = None tom_references: Optional[List[str]] = None risks: Optional[List[dict]] = None mitigations: Optional[List[dict]] = None stakeholder_consultations: Optional[List[dict]] = None review_triggers: Optional[List[dict]] = None review_comments: Optional[List[dict]] = None ai_use_case_modules: Optional[List[dict]] = None section_8_complete: Optional[bool] = None # JSONB Objects threshold_analysis: Optional[dict] = None consultation_requirement: Optional[dict] = None review_schedule: Optional[dict] = None section_progress: Optional[dict] = None metadata: Optional[dict] = None class DSFAUpdate(BaseModel): title: Optional[str] = None description: Optional[str] = None status: Optional[str] = None risk_level: Optional[str] = None processing_activity: Optional[str] = None data_categories: Optional[List[str]] = None recipients: Optional[List[str]] = None measures: Optional[List[str]] = None approved_by: Optional[str] = None # Section 1 processing_description: Optional[str] = None processing_purpose: Optional[str] = None legal_basis: Optional[str] = None legal_basis_details: Optional[str] = None # Section 2 necessity_assessment: Optional[str] = None proportionality_assessment: Optional[str] = None data_minimization: Optional[str] = None alternatives_considered: Optional[str] = None retention_justification: Optional[str] = None # Section 3 involves_ai: Optional[bool] = None overall_risk_level: Optional[str] = None risk_score: Optional[int] = None # Section 6 dpo_consulted: Optional[bool] = None dpo_name: Optional[str] = None dpo_opinion: Optional[str] = None dpo_approved: Optional[bool] = None authority_consulted: Optional[bool] = None authority_reference: Optional[str] = None authority_decision: Optional[str] = None # Metadata version: Optional[int] = None conclusion: Optional[str] = None federal_state: Optional[str] = None authority_resource_id: Optional[str] = None submitted_by: Optional[str] = None # JSONB Arrays data_subjects: Optional[List[str]] = None affected_rights: Optional[List[str]] = None triggered_rule_codes: Optional[List[str]] = None ai_trigger_ids: Optional[List[str]] = None wp248_criteria_met: Optional[List[str]] = None art35_abs3_triggered: Optional[List[str]] = None tom_references: Optional[List[str]] = None risks: Optional[List[dict]] = None mitigations: Optional[List[dict]] = None stakeholder_consultations: Optional[List[dict]] = None review_triggers: Optional[List[dict]] = None review_comments: Optional[List[dict]] = None ai_use_case_modules: Optional[List[dict]] = None section_8_complete: Optional[bool] = None # JSONB Objects threshold_analysis: Optional[dict] = None consultation_requirement: Optional[dict] = None review_schedule: Optional[dict] = None section_progress: Optional[dict] = None metadata: Optional[dict] = None class DSFAStatusUpdate(BaseModel): status: str approved_by: Optional[str] = None class DSFASectionUpdate(BaseModel): """Body for PUT /dsfa/{id}/sections/{section_number}.""" content: Optional[str] = None # Allow arbitrary extra fields so the frontend can send any section-specific data extra: Optional[dict] = None class DSFAApproveRequest(BaseModel): """Body for POST /dsfa/{id}/approve.""" approved: bool comments: Optional[str] = None approved_by: Optional[str] = None # ============================================================================= # Helpers # ============================================================================= def _get_tenant_id(tenant_id: Optional[str]) -> str: return tenant_id or DEFAULT_TENANT_ID def _dsfa_to_response(row) -> dict: """Convert a DB row to a JSON-serializable dict.""" import json # SQLAlchemy 2.0: Row objects need ._mapping for string-key access if hasattr(row, "_mapping"): row = row._mapping def _parse_arr(val): """Parse a JSONB array field → list.""" if val is None: return [] if isinstance(val, list): return val if isinstance(val, str): try: parsed = json.loads(val) return parsed if isinstance(parsed, list) else [] except Exception: return [] return val def _parse_obj(val): """Parse a JSONB object field → dict.""" if val is None: return {} if isinstance(val, dict): return val if isinstance(val, str): try: parsed = json.loads(val) return parsed if isinstance(parsed, dict) else {} except Exception: return {} return val def _ts(val): """Timestamp → ISO string or None.""" if not val: return None if isinstance(val, str): return val return val.isoformat() def _get(key, default=None): """Safe row access — returns default if key missing (handles old rows).""" try: v = row[key] return default if v is None and default is not None else v except (KeyError, IndexError): return default return { # Core fields (always present since Migration 024) "id": str(row["id"]), "tenant_id": row["tenant_id"], "title": row["title"], "description": row["description"] or "", "status": row["status"] or "draft", "risk_level": row["risk_level"] or "low", "processing_activity": row["processing_activity"] or "", "data_categories": _parse_arr(row["data_categories"]), "recipients": _parse_arr(row["recipients"]), "measures": _parse_arr(row["measures"]), "approved_by": row["approved_by"], "approved_at": _ts(row["approved_at"]), "created_by": row["created_by"] or "system", "created_at": _ts(row["created_at"]), "updated_at": _ts(row["updated_at"]), # Section 1 (Migration 030) "processing_description": _get("processing_description"), "processing_purpose": _get("processing_purpose"), "legal_basis": _get("legal_basis"), "legal_basis_details": _get("legal_basis_details"), # Section 2 "necessity_assessment": _get("necessity_assessment"), "proportionality_assessment": _get("proportionality_assessment"), "data_minimization": _get("data_minimization"), "alternatives_considered": _get("alternatives_considered"), "retention_justification": _get("retention_justification"), # Section 3 "involves_ai": _get("involves_ai", False), "overall_risk_level": _get("overall_risk_level"), "risk_score": _get("risk_score", 0), # Section 6 "dpo_consulted": _get("dpo_consulted", False), "dpo_consulted_at": _ts(_get("dpo_consulted_at")), "dpo_name": _get("dpo_name"), "dpo_opinion": _get("dpo_opinion"), "dpo_approved": _get("dpo_approved"), "authority_consulted": _get("authority_consulted", False), "authority_consulted_at": _ts(_get("authority_consulted_at")), "authority_reference": _get("authority_reference"), "authority_decision": _get("authority_decision"), # Metadata / Versioning "version": _get("version", 1), "previous_version_id": str(_get("previous_version_id")) if _get("previous_version_id") else None, "conclusion": _get("conclusion"), "federal_state": _get("federal_state"), "authority_resource_id": _get("authority_resource_id"), "submitted_for_review_at": _ts(_get("submitted_for_review_at")), "submitted_by": _get("submitted_by"), # JSONB Arrays "data_subjects": _parse_arr(_get("data_subjects")), "affected_rights": _parse_arr(_get("affected_rights")), "triggered_rule_codes": _parse_arr(_get("triggered_rule_codes")), "ai_trigger_ids": _parse_arr(_get("ai_trigger_ids")), "wp248_criteria_met": _parse_arr(_get("wp248_criteria_met")), "art35_abs3_triggered": _parse_arr(_get("art35_abs3_triggered")), "tom_references": _parse_arr(_get("tom_references")), "risks": _parse_arr(_get("risks")), "mitigations": _parse_arr(_get("mitigations")), "stakeholder_consultations": _parse_arr(_get("stakeholder_consultations")), "review_triggers": _parse_arr(_get("review_triggers")), "review_comments": _parse_arr(_get("review_comments")), # Section 8 / AI (Migration 028) "ai_use_case_modules": _parse_arr(_get("ai_use_case_modules")), "section_8_complete": _get("section_8_complete", False), # JSONB Objects "threshold_analysis": _parse_obj(_get("threshold_analysis")), "consultation_requirement": _parse_obj(_get("consultation_requirement")), "review_schedule": _parse_obj(_get("review_schedule")), "section_progress": _parse_obj(_get("section_progress")), "metadata": _parse_obj(_get("metadata")), } def _log_audit( db: Session, tenant_id: str, dsfa_id, action: str, changed_by: str = "system", old_values=None, new_values=None, ): import json db.execute( text(""" INSERT INTO compliance_dsfa_audit_log (tenant_id, dsfa_id, action, changed_by, old_values, new_values) VALUES (:tenant_id, :dsfa_id, :action, :changed_by, CAST(:old_values AS jsonb), CAST(:new_values AS jsonb)) """), { "tenant_id": tenant_id, "dsfa_id": str(dsfa_id) if dsfa_id else None, "action": action, "changed_by": changed_by, "old_values": json.dumps(old_values) if old_values else None, "new_values": json.dumps(new_values) if new_values else None, }, ) # ============================================================================= # Stats (must be before /{id} to avoid route conflict) # ============================================================================= @router.get("/stats") async def get_stats( tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> dict[str, Any]: """Zaehler nach Status und Risiko-Level.""" with translate_domain_errors(): return service.stats(tenant_id) # ============================================================================= # Audit Log (must be before /{id} to avoid route conflict) # ============================================================================= @router.get("/audit-log") async def get_audit_log( tenant_id: Optional[str] = Query(None), limit: int = Query(50, ge=1, le=500), offset: int = Query(0, ge=0), service: DSFAService = Depends(get_dsfa_service), ) -> list[dict[str, Any]]: """DSFA Audit-Trail.""" with translate_domain_errors(): return service.audit_log(tenant_id, limit, offset) # ============================================================================= # CSV Export (must be before /{id} to avoid route conflict) # ============================================================================= @router.get("/export/csv", name="export_dsfas_csv") async def export_dsfas_csv( tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> Response: """Export all DSFAs as CSV.""" with translate_domain_errors(): csv_content = service.export_csv(tenant_id) return Response( content=csv_content, media_type="text/csv", headers={ "Content-Disposition": "attachment; filename=dsfas_export.csv" }, ) # ============================================================================= # UCCA Integration Stubs (must be before /{id} to avoid route conflict) # ============================================================================= @router.post("/from-assessment/{assessment_id}", status_code=501) async def create_from_assessment( assessment_id: str, ) -> dict[str, str]: """Stub: Create DSFA from UCCA assessment.""" return { "detail": ( "Not implemented — requires cross-service " "integration with ai-compliance-sdk" ) } @router.get("/by-assessment/{assessment_id}", status_code=501) async def get_by_assessment( assessment_id: str, ) -> dict[str, str]: """Stub: Get DSFA by linked UCCA assessment ID.""" return { "detail": ( "Not implemented — requires cross-service " "integration with ai-compliance-sdk" ) } # ============================================================================= # List + Create # ============================================================================= @router.get("") async def list_dsfas( tenant_id: Optional[str] = Query(None), status: Optional[str] = Query(None), risk_level: Optional[str] = Query(None), skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=500), service: DSFAService = Depends(get_dsfa_service), ) -> list[dict[str, Any]]: """Liste aller DSFAs fuer einen Tenant.""" with translate_domain_errors(): return service.list_dsfas(tenant_id, status, risk_level, skip, limit) @router.post("", status_code=201) async def create_dsfa( request: DSFACreate, tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> dict[str, Any]: """Neue DSFA erstellen.""" import json if request.status not in VALID_STATUSES: raise HTTPException(status_code=422, detail=f"Ungültiger Status: {request.status}") if request.risk_level not in VALID_RISK_LEVELS: raise HTTPException(status_code=422, detail=f"Ungültiges Risiko-Level: {request.risk_level}") tid = _get_tenant_id(tenant_id) row = db.execute( text(""" INSERT INTO compliance_dsfas (tenant_id, title, description, status, risk_level, processing_activity, data_categories, recipients, measures, created_by) VALUES (:tenant_id, :title, :description, :status, :risk_level, :processing_activity, CAST(:data_categories AS jsonb), CAST(:recipients AS jsonb), CAST(:measures AS jsonb), :created_by) RETURNING * """), { "tenant_id": tid, "title": request.title, "description": request.description, "status": request.status, "risk_level": request.risk_level, "processing_activity": request.processing_activity, "data_categories": json.dumps(request.data_categories), "recipients": json.dumps(request.recipients), "measures": json.dumps(request.measures), "created_by": request.created_by, }, ).fetchone() db.flush() row_id = row._mapping["id"] if hasattr(row, "_mapping") else row[0] _log_audit( db, tid, row_id, "CREATE", request.created_by, new_values={"title": request.title, "status": request.status}, ) db.commit() return _dsfa_to_response(row) # ============================================================================= # Single Item (GET / PUT / DELETE / PATCH status) # ============================================================================= @router.get("/{dsfa_id}") async def get_dsfa( dsfa_id: str, tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> dict[str, Any]: """Einzelne DSFA abrufen.""" with translate_domain_errors(): return service.get(dsfa_id, tenant_id) @router.put("/{dsfa_id}") async def update_dsfa( dsfa_id: str, request: DSFAUpdate, tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> dict[str, Any]: """DSFA aktualisieren.""" with translate_domain_errors(): return service.update(dsfa_id, tenant_id, request) @router.delete("/{dsfa_id}") async def delete_dsfa( dsfa_id: str, tenant_id: Optional[str] = Query(None), service: DSFAService = Depends(get_dsfa_service), ) -> dict[str, Any]: """DSFA loeschen (Art. 17 DSGVO).""" with translate_domain_errors(): return service.delete(dsfa_id, tenant_id) @router.patch("/{dsfa_id}/status") async def update_dsfa_status( dsfa_id: str, request: DSFAStatusUpdate, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> dict[str, Any]: """Schnell-Statuswechsel.""" with translate_domain_errors(): return wf.update_status(dsfa_id, tenant_id, request) # ============================================================================= # Section Update # ============================================================================= @router.put("/{dsfa_id}/sections/{section_number}") async def update_section( dsfa_id: str, section_number: int, request: DSFASectionUpdate, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> dict[str, Any]: """Update a specific DSFA section (1-8).""" with translate_domain_errors(): return wf.update_section(dsfa_id, section_number, tenant_id, request) # ============================================================================= # Workflow: Submit for Review + Approve # ============================================================================= @router.post("/{dsfa_id}/submit-for-review") async def submit_for_review( dsfa_id: str, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> dict[str, Any]: """Submit a DSFA for DPO review (draft -> in-review).""" with translate_domain_errors(): return wf.submit_for_review(dsfa_id, tenant_id) @router.post("/{dsfa_id}/approve") async def approve_dsfa( dsfa_id: str, request: DSFAApproveRequest, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> dict[str, Any]: """Approve or reject a DSFA (DPO/CISO action).""" with translate_domain_errors(): return wf.approve(dsfa_id, tenant_id, request) # ============================================================================= # Export # ============================================================================= @router.get("/{dsfa_id}/export") async def export_dsfa_json( dsfa_id: str, format: str = Query("json"), tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> dict[str, Any]: """Export a single DSFA as JSON.""" with translate_domain_errors(): return wf.export_json(dsfa_id, tenant_id, format) # ============================================================================= # Versioning # ============================================================================= @router.get("/{dsfa_id}/versions") async def list_dsfa_versions( dsfa_id: str, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> Any: """List all versions for a DSFA.""" with translate_domain_errors(): return wf.list_versions(dsfa_id, tenant_id) @router.get("/{dsfa_id}/versions/{version_number}") async def get_dsfa_version( dsfa_id: str, version_number: int, tenant_id: Optional[str] = Query(None), wf: DSFAWorkflowService = Depends(get_workflow_service), ) -> Any: """Get a specific DSFA version with full snapshot.""" with translate_domain_errors(): return wf.get_version(dsfa_id, version_number, tenant_id) # Legacy re-exports __all__ = [ "router", "DSFACreate", "DSFAUpdate", "DSFAStatusUpdate", "DSFASectionUpdate", "DSFAApproveRequest", "_dsfa_to_response", "_get_tenant_id", "DEFAULT_TENANT_ID", "VALID_STATUSES", "VALID_RISK_LEVELS", ]