Files
breakpilot-compliance/backend-compliance/compliance/api/dsfa_routes.py
Benjamin Admin 8f2cc3b93b
Some checks failed
Build + Deploy / build-admin-compliance (push) Successful in 11s
Build + Deploy / build-backend-compliance (push) Successful in 14s
Build + Deploy / build-ai-sdk (push) Successful in 14s
Build + Deploy / build-developer-portal (push) Successful in 14s
Build + Deploy / build-tts (push) Successful in 12s
Build + Deploy / build-document-crawler (push) Successful in 13s
Build + Deploy / build-dsms-gateway (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 21s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m21s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 39s
CI / test-python-backend (push) Successful in 34s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 19s
CI / validate-canonical-controls (push) Successful in 12s
Build + Deploy / trigger-orca (push) Successful in 1m56s
fix: EvidenceService Import + get_workflow_service Factory
evidence_routes: fehlender EvidenceService Import
dsfa_routes: fehlende get_workflow_service Dependency-Factory

Erwartet: 41/41 sub-routers (vorher 39/41)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 23:01:44 +02:00

693 lines
24 KiB
Python

"""
FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO).
Endpoints:
GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit)
POST /v1/dsfa — Neu erstellen -> 201
GET /v1/dsfa/stats — Zaehler nach Status
GET /v1/dsfa/audit-log — Audit-Log
GET /v1/dsfa/export/csv — CSV-Export aller DSFAs
POST /v1/dsfa/from-assessment/{id} — Stub: DSFA aus UCCA-Assessment
GET /v1/dsfa/by-assessment/{id} — Stub: DSFA nach Assessment-ID
GET /v1/dsfa/{id} — Detail
PUT /v1/dsfa/{id} — Update
DELETE /v1/dsfa/{id} — Loeschen (Art. 17 DSGVO)
PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel
PUT /v1/dsfa/{id}/sections/{nr} — Section-Update (1-8)
POST /v1/dsfa/{id}/submit-for-review — Workflow: Einreichen
POST /v1/dsfa/{id}/approve — Workflow: Genehmigen/Ablehnen
GET /v1/dsfa/{id}/export — JSON-Export einer DSFA
Phase 1 Step 4 refactor: handlers delegate to DSFAService (CRUD/stats/
audit/csv) and DSFAWorkflowService (status/section/submit/approve/export/
versions). Module-level helpers re-exported for legacy tests.
"""
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from fastapi.responses import Response
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.dsfa import (
DSFAApproveRequest,
DSFACreate,
DSFASectionUpdate,
DSFAStatusUpdate,
DSFAUpdate,
)
from compliance.services.dsfa_service import (
DEFAULT_TENANT_ID,
VALID_RISK_LEVELS,
VALID_STATUSES,
DSFAService,
_dsfa_to_response, # re-exported for legacy test imports
_get_tenant_id, # re-exported for legacy test imports
)
from compliance.services.dsfa_workflow_service import (
SECTION_FIELD_MAP, # noqa: F401 — re-export
DSFAWorkflowService,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dsfa", tags=["compliance-dsfa"])
def get_workflow_service(db: Session = Depends(get_db)):
"""FastAPI dependency for DSFAWorkflowService."""
return DSFAWorkflowService(db)
def get_dsfa_service(db: Session = Depends(get_db)) -> DSFAService:
return DSFAService(db)
# =============================================================================
# Pydantic Schemas
# =============================================================================
class DSFACreate(BaseModel):
title: str
description: str = ""
status: str = "draft"
risk_level: str = "low"
processing_activity: str = ""
data_categories: List[str] = []
recipients: List[str] = []
measures: List[str] = []
created_by: str = "system"
# Section 1
processing_description: Optional[str] = None
processing_purpose: Optional[str] = None
legal_basis: Optional[str] = None
legal_basis_details: Optional[str] = None
# Section 2
necessity_assessment: Optional[str] = None
proportionality_assessment: Optional[str] = None
data_minimization: Optional[str] = None
alternatives_considered: Optional[str] = None
retention_justification: Optional[str] = None
# Section 3
involves_ai: Optional[bool] = None
overall_risk_level: Optional[str] = None
risk_score: Optional[int] = None
# Section 6
dpo_consulted: Optional[bool] = None
dpo_name: Optional[str] = None
dpo_opinion: Optional[str] = None
dpo_approved: Optional[bool] = None
authority_consulted: Optional[bool] = None
authority_reference: Optional[str] = None
authority_decision: Optional[str] = None
# Metadata
version: Optional[int] = None
conclusion: Optional[str] = None
federal_state: Optional[str] = None
authority_resource_id: Optional[str] = None
submitted_by: Optional[str] = None
# JSONB Arrays
data_subjects: Optional[List[str]] = None
affected_rights: Optional[List[str]] = None
triggered_rule_codes: Optional[List[str]] = None
ai_trigger_ids: Optional[List[str]] = None
wp248_criteria_met: Optional[List[str]] = None
art35_abs3_triggered: Optional[List[str]] = None
tom_references: Optional[List[str]] = None
risks: Optional[List[dict]] = None
mitigations: Optional[List[dict]] = None
stakeholder_consultations: Optional[List[dict]] = None
review_triggers: Optional[List[dict]] = None
review_comments: Optional[List[dict]] = None
ai_use_case_modules: Optional[List[dict]] = None
section_8_complete: Optional[bool] = None
# JSONB Objects
threshold_analysis: Optional[dict] = None
consultation_requirement: Optional[dict] = None
review_schedule: Optional[dict] = None
section_progress: Optional[dict] = None
metadata: Optional[dict] = None
class DSFAUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None
risk_level: Optional[str] = None
processing_activity: Optional[str] = None
data_categories: Optional[List[str]] = None
recipients: Optional[List[str]] = None
measures: Optional[List[str]] = None
approved_by: Optional[str] = None
# Section 1
processing_description: Optional[str] = None
processing_purpose: Optional[str] = None
legal_basis: Optional[str] = None
legal_basis_details: Optional[str] = None
# Section 2
necessity_assessment: Optional[str] = None
proportionality_assessment: Optional[str] = None
data_minimization: Optional[str] = None
alternatives_considered: Optional[str] = None
retention_justification: Optional[str] = None
# Section 3
involves_ai: Optional[bool] = None
overall_risk_level: Optional[str] = None
risk_score: Optional[int] = None
# Section 6
dpo_consulted: Optional[bool] = None
dpo_name: Optional[str] = None
dpo_opinion: Optional[str] = None
dpo_approved: Optional[bool] = None
authority_consulted: Optional[bool] = None
authority_reference: Optional[str] = None
authority_decision: Optional[str] = None
# Metadata
version: Optional[int] = None
conclusion: Optional[str] = None
federal_state: Optional[str] = None
authority_resource_id: Optional[str] = None
submitted_by: Optional[str] = None
# JSONB Arrays
data_subjects: Optional[List[str]] = None
affected_rights: Optional[List[str]] = None
triggered_rule_codes: Optional[List[str]] = None
ai_trigger_ids: Optional[List[str]] = None
wp248_criteria_met: Optional[List[str]] = None
art35_abs3_triggered: Optional[List[str]] = None
tom_references: Optional[List[str]] = None
risks: Optional[List[dict]] = None
mitigations: Optional[List[dict]] = None
stakeholder_consultations: Optional[List[dict]] = None
review_triggers: Optional[List[dict]] = None
review_comments: Optional[List[dict]] = None
ai_use_case_modules: Optional[List[dict]] = None
section_8_complete: Optional[bool] = None
# JSONB Objects
threshold_analysis: Optional[dict] = None
consultation_requirement: Optional[dict] = None
review_schedule: Optional[dict] = None
section_progress: Optional[dict] = None
metadata: Optional[dict] = None
class DSFAStatusUpdate(BaseModel):
status: str
approved_by: Optional[str] = None
class DSFASectionUpdate(BaseModel):
"""Body for PUT /dsfa/{id}/sections/{section_number}."""
content: Optional[str] = None
# Allow arbitrary extra fields so the frontend can send any section-specific data
extra: Optional[dict] = None
class DSFAApproveRequest(BaseModel):
"""Body for POST /dsfa/{id}/approve."""
approved: bool
comments: Optional[str] = None
approved_by: Optional[str] = None
# =============================================================================
# Helpers
# =============================================================================
def _get_tenant_id(tenant_id: Optional[str]) -> str:
return tenant_id or DEFAULT_TENANT_ID
def _dsfa_to_response(row) -> dict:
"""Convert a DB row to a JSON-serializable dict."""
import json
# SQLAlchemy 2.0: Row objects need ._mapping for string-key access
if hasattr(row, "_mapping"):
row = row._mapping
def _parse_arr(val):
"""Parse a JSONB array field → list."""
if val is None:
return []
if isinstance(val, list):
return val
if isinstance(val, str):
try:
parsed = json.loads(val)
return parsed if isinstance(parsed, list) else []
except Exception:
return []
return val
def _parse_obj(val):
"""Parse a JSONB object field → dict."""
if val is None:
return {}
if isinstance(val, dict):
return val
if isinstance(val, str):
try:
parsed = json.loads(val)
return parsed if isinstance(parsed, dict) else {}
except Exception:
return {}
return val
def _ts(val):
"""Timestamp → ISO string or None."""
if not val:
return None
if isinstance(val, str):
return val
return val.isoformat()
def _get(key, default=None):
"""Safe row access — returns default if key missing (handles old rows)."""
try:
v = row[key]
return default if v is None and default is not None else v
except (KeyError, IndexError):
return default
return {
# Core fields (always present since Migration 024)
"id": str(row["id"]),
"tenant_id": row["tenant_id"],
"title": row["title"],
"description": row["description"] or "",
"status": row["status"] or "draft",
"risk_level": row["risk_level"] or "low",
"processing_activity": row["processing_activity"] or "",
"data_categories": _parse_arr(row["data_categories"]),
"recipients": _parse_arr(row["recipients"]),
"measures": _parse_arr(row["measures"]),
"approved_by": row["approved_by"],
"approved_at": _ts(row["approved_at"]),
"created_by": row["created_by"] or "system",
"created_at": _ts(row["created_at"]),
"updated_at": _ts(row["updated_at"]),
# Section 1 (Migration 030)
"processing_description": _get("processing_description"),
"processing_purpose": _get("processing_purpose"),
"legal_basis": _get("legal_basis"),
"legal_basis_details": _get("legal_basis_details"),
# Section 2
"necessity_assessment": _get("necessity_assessment"),
"proportionality_assessment": _get("proportionality_assessment"),
"data_minimization": _get("data_minimization"),
"alternatives_considered": _get("alternatives_considered"),
"retention_justification": _get("retention_justification"),
# Section 3
"involves_ai": _get("involves_ai", False),
"overall_risk_level": _get("overall_risk_level"),
"risk_score": _get("risk_score", 0),
# Section 6
"dpo_consulted": _get("dpo_consulted", False),
"dpo_consulted_at": _ts(_get("dpo_consulted_at")),
"dpo_name": _get("dpo_name"),
"dpo_opinion": _get("dpo_opinion"),
"dpo_approved": _get("dpo_approved"),
"authority_consulted": _get("authority_consulted", False),
"authority_consulted_at": _ts(_get("authority_consulted_at")),
"authority_reference": _get("authority_reference"),
"authority_decision": _get("authority_decision"),
# Metadata / Versioning
"version": _get("version", 1),
"previous_version_id": str(_get("previous_version_id")) if _get("previous_version_id") else None,
"conclusion": _get("conclusion"),
"federal_state": _get("federal_state"),
"authority_resource_id": _get("authority_resource_id"),
"submitted_for_review_at": _ts(_get("submitted_for_review_at")),
"submitted_by": _get("submitted_by"),
# JSONB Arrays
"data_subjects": _parse_arr(_get("data_subjects")),
"affected_rights": _parse_arr(_get("affected_rights")),
"triggered_rule_codes": _parse_arr(_get("triggered_rule_codes")),
"ai_trigger_ids": _parse_arr(_get("ai_trigger_ids")),
"wp248_criteria_met": _parse_arr(_get("wp248_criteria_met")),
"art35_abs3_triggered": _parse_arr(_get("art35_abs3_triggered")),
"tom_references": _parse_arr(_get("tom_references")),
"risks": _parse_arr(_get("risks")),
"mitigations": _parse_arr(_get("mitigations")),
"stakeholder_consultations": _parse_arr(_get("stakeholder_consultations")),
"review_triggers": _parse_arr(_get("review_triggers")),
"review_comments": _parse_arr(_get("review_comments")),
# Section 8 / AI (Migration 028)
"ai_use_case_modules": _parse_arr(_get("ai_use_case_modules")),
"section_8_complete": _get("section_8_complete", False),
# JSONB Objects
"threshold_analysis": _parse_obj(_get("threshold_analysis")),
"consultation_requirement": _parse_obj(_get("consultation_requirement")),
"review_schedule": _parse_obj(_get("review_schedule")),
"section_progress": _parse_obj(_get("section_progress")),
"metadata": _parse_obj(_get("metadata")),
}
def _log_audit(
db: Session,
tenant_id: str,
dsfa_id,
action: str,
changed_by: str = "system",
old_values=None,
new_values=None,
):
import json
db.execute(
text("""
INSERT INTO compliance_dsfa_audit_log
(tenant_id, dsfa_id, action, changed_by, old_values, new_values)
VALUES
(:tenant_id, :dsfa_id, :action, :changed_by,
CAST(:old_values AS jsonb), CAST(:new_values AS jsonb))
"""),
{
"tenant_id": tenant_id,
"dsfa_id": str(dsfa_id) if dsfa_id else None,
"action": action,
"changed_by": changed_by,
"old_values": json.dumps(old_values) if old_values else None,
"new_values": json.dumps(new_values) if new_values else None,
},
)
# =============================================================================
# Stats (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/stats")
async def get_stats(
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Zaehler nach Status und Risiko-Level."""
with translate_domain_errors():
return service.stats(tenant_id)
# =============================================================================
# Audit Log (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/audit-log")
async def get_audit_log(
tenant_id: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
service: DSFAService = Depends(get_dsfa_service),
) -> list[dict[str, Any]]:
"""DSFA Audit-Trail."""
with translate_domain_errors():
return service.audit_log(tenant_id, limit, offset)
# =============================================================================
# CSV Export (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/export/csv", name="export_dsfas_csv")
async def export_dsfas_csv(
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> Response:
"""Export all DSFAs as CSV."""
with translate_domain_errors():
csv_content = service.export_csv(tenant_id)
return Response(
content=csv_content,
media_type="text/csv",
headers={
"Content-Disposition": "attachment; filename=dsfas_export.csv"
},
)
# =============================================================================
# UCCA Integration Stubs (must be before /{id} to avoid route conflict)
# =============================================================================
@router.post("/from-assessment/{assessment_id}", status_code=501)
async def create_from_assessment(
assessment_id: str,
) -> dict[str, str]:
"""Stub: Create DSFA from UCCA assessment."""
return {
"detail": (
"Not implemented — requires cross-service "
"integration with ai-compliance-sdk"
)
}
@router.get("/by-assessment/{assessment_id}", status_code=501)
async def get_by_assessment(
assessment_id: str,
) -> dict[str, str]:
"""Stub: Get DSFA by linked UCCA assessment ID."""
return {
"detail": (
"Not implemented — requires cross-service "
"integration with ai-compliance-sdk"
)
}
# =============================================================================
# List + Create
# =============================================================================
@router.get("")
async def list_dsfas(
tenant_id: Optional[str] = Query(None),
status: Optional[str] = Query(None),
risk_level: Optional[str] = Query(None),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=500),
service: DSFAService = Depends(get_dsfa_service),
) -> list[dict[str, Any]]:
"""Liste aller DSFAs fuer einen Tenant."""
with translate_domain_errors():
return service.list_dsfas(tenant_id, status, risk_level, skip, limit)
@router.post("", status_code=201)
async def create_dsfa(
request: DSFACreate,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Neue DSFA erstellen."""
import json
if request.status not in VALID_STATUSES:
raise HTTPException(status_code=422, detail=f"Ungültiger Status: {request.status}")
if request.risk_level not in VALID_RISK_LEVELS:
raise HTTPException(status_code=422, detail=f"Ungültiges Risiko-Level: {request.risk_level}")
tid = _get_tenant_id(tenant_id)
row = db.execute(
text("""
INSERT INTO compliance_dsfas
(tenant_id, title, description, status, risk_level,
processing_activity, data_categories, recipients, measures, created_by)
VALUES
(:tenant_id, :title, :description, :status, :risk_level,
:processing_activity,
CAST(:data_categories AS jsonb),
CAST(:recipients AS jsonb),
CAST(:measures AS jsonb),
:created_by)
RETURNING *
"""),
{
"tenant_id": tid,
"title": request.title,
"description": request.description,
"status": request.status,
"risk_level": request.risk_level,
"processing_activity": request.processing_activity,
"data_categories": json.dumps(request.data_categories),
"recipients": json.dumps(request.recipients),
"measures": json.dumps(request.measures),
"created_by": request.created_by,
},
).fetchone()
db.flush()
row_id = row._mapping["id"] if hasattr(row, "_mapping") else row[0]
_log_audit(
db, tid, row_id, "CREATE", request.created_by,
new_values={"title": request.title, "status": request.status},
)
db.commit()
return _dsfa_to_response(row)
# =============================================================================
# Single Item (GET / PUT / DELETE / PATCH status)
# =============================================================================
@router.get("/{dsfa_id}")
async def get_dsfa(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Einzelne DSFA abrufen."""
with translate_domain_errors():
return service.get(dsfa_id, tenant_id)
@router.put("/{dsfa_id}")
async def update_dsfa(
dsfa_id: str,
request: DSFAUpdate,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""DSFA aktualisieren."""
with translate_domain_errors():
return service.update(dsfa_id, tenant_id, request)
@router.delete("/{dsfa_id}")
async def delete_dsfa(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""DSFA loeschen (Art. 17 DSGVO)."""
with translate_domain_errors():
return service.delete(dsfa_id, tenant_id)
@router.patch("/{dsfa_id}/status")
async def update_dsfa_status(
dsfa_id: str,
request: DSFAStatusUpdate,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Schnell-Statuswechsel."""
with translate_domain_errors():
return wf.update_status(dsfa_id, tenant_id, request)
# =============================================================================
# Section Update
# =============================================================================
@router.put("/{dsfa_id}/sections/{section_number}")
async def update_section(
dsfa_id: str,
section_number: int,
request: DSFASectionUpdate,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Update a specific DSFA section (1-8)."""
with translate_domain_errors():
return wf.update_section(dsfa_id, section_number, tenant_id, request)
# =============================================================================
# Workflow: Submit for Review + Approve
# =============================================================================
@router.post("/{dsfa_id}/submit-for-review")
async def submit_for_review(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Submit a DSFA for DPO review (draft -> in-review)."""
with translate_domain_errors():
return wf.submit_for_review(dsfa_id, tenant_id)
@router.post("/{dsfa_id}/approve")
async def approve_dsfa(
dsfa_id: str,
request: DSFAApproveRequest,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Approve or reject a DSFA (DPO/CISO action)."""
with translate_domain_errors():
return wf.approve(dsfa_id, tenant_id, request)
# =============================================================================
# Export
# =============================================================================
@router.get("/{dsfa_id}/export")
async def export_dsfa_json(
dsfa_id: str,
format: str = Query("json"),
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Export a single DSFA as JSON."""
with translate_domain_errors():
return wf.export_json(dsfa_id, tenant_id, format)
# =============================================================================
# Versioning
# =============================================================================
@router.get("/{dsfa_id}/versions")
async def list_dsfa_versions(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> Any:
"""List all versions for a DSFA."""
with translate_domain_errors():
return wf.list_versions(dsfa_id, tenant_id)
@router.get("/{dsfa_id}/versions/{version_number}")
async def get_dsfa_version(
dsfa_id: str,
version_number: int,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> Any:
"""Get a specific DSFA version with full snapshot."""
with translate_domain_errors():
return wf.get_version(dsfa_id, version_number, tenant_id)
# Legacy re-exports
__all__ = [
"router",
"DSFACreate",
"DSFAUpdate",
"DSFAStatusUpdate",
"DSFASectionUpdate",
"DSFAApproveRequest",
"_dsfa_to_response",
"_get_tenant_id",
"DEFAULT_TENANT_ID",
"VALID_STATUSES",
"VALID_RISK_LEVELS",
]