Files
breakpilot-compliance/backend-compliance/compliance/api/dsfa_routes.py
Sharang Parnerkar c43d9da6d0 merge: sync with origin/main, take upstream on conflicts
# Conflicts:
#	admin-compliance/lib/sdk/types.ts
#	admin-compliance/lib/sdk/vendor-compliance/types.ts
2026-04-16 16:26:48 +02:00

687 lines
24 KiB
Python

"""
FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO).
Endpoints:
GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit)
POST /v1/dsfa — Neu erstellen -> 201
GET /v1/dsfa/stats — Zaehler nach Status
GET /v1/dsfa/audit-log — Audit-Log
GET /v1/dsfa/export/csv — CSV-Export aller DSFAs
POST /v1/dsfa/from-assessment/{id} — Stub: DSFA aus UCCA-Assessment
GET /v1/dsfa/by-assessment/{id} — Stub: DSFA nach Assessment-ID
GET /v1/dsfa/{id} — Detail
PUT /v1/dsfa/{id} — Update
DELETE /v1/dsfa/{id} — Loeschen (Art. 17 DSGVO)
PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel
PUT /v1/dsfa/{id}/sections/{nr} — Section-Update (1-8)
POST /v1/dsfa/{id}/submit-for-review — Workflow: Einreichen
POST /v1/dsfa/{id}/approve — Workflow: Genehmigen/Ablehnen
GET /v1/dsfa/{id}/export — JSON-Export einer DSFA
Phase 1 Step 4 refactor: handlers delegate to DSFAService (CRUD/stats/
audit/csv) and DSFAWorkflowService (status/section/submit/approve/export/
versions). Module-level helpers re-exported for legacy tests.
"""
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, Query
from fastapi.responses import Response
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.dsfa import (
DSFAApproveRequest,
DSFACreate,
DSFASectionUpdate,
DSFAStatusUpdate,
DSFAUpdate,
)
from compliance.services.dsfa_service import (
DEFAULT_TENANT_ID,
VALID_RISK_LEVELS,
VALID_STATUSES,
DSFAService,
_dsfa_to_response, # re-exported for legacy test imports
_get_tenant_id, # re-exported for legacy test imports
)
from compliance.services.dsfa_workflow_service import (
SECTION_FIELD_MAP, # noqa: F401 — re-export
DSFAWorkflowService,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects every DSFA endpoint defined below; all route paths in
# this module are relative to the "/dsfa" prefix.
router = APIRouter(prefix="/dsfa", tags=["compliance-dsfa"])
def get_dsfa_service(db: Session = Depends(get_db)) -> DSFAService:
    """Build the DSFAService used by the CRUD/stats/audit/CSV handlers.

    Args:
        db: Session injected by FastAPI through ``get_db``.

    Returns:
        A ``DSFAService`` wrapping ``db``.
    """
    return DSFAService(db)


def get_workflow_service(db: Session = Depends(get_db)) -> DSFAWorkflowService:
    """Build the DSFAWorkflowService used by the workflow handlers.

    The status, section, submit, approve, export and version handlers in this
    module declare ``Depends(get_workflow_service)``. Because that default is
    evaluated when each handler is defined, a missing provider makes the whole
    module fail with ``NameError`` at import time.

    Args:
        db: Session injected by FastAPI through ``get_db``.

    Returns:
        A ``DSFAWorkflowService`` wrapping ``db``.
    """
    # NOTE(review): assumes DSFAWorkflowService takes the session as its only
    # constructor argument, like DSFAService — confirm against the service.
    return DSFAWorkflowService(db)
# =============================================================================
# Pydantic Schemas
# =============================================================================
# Request body declared by the create handler (POST on the router root).
# Only `title` is required; every other field carries a default.
# NOTE(review): this class rebinds the DSFACreate name imported from
# compliance.schemas.dsfa at the top of the module, and neither BaseModel nor
# List is imported here — this looks like merge residue; confirm which
# definition is meant to be used.
class DSFACreate(BaseModel):
    title: str
    description: str = ""
    status: str = "draft"
    risk_level: str = "low"
    processing_activity: str = ""
    data_categories: List[str] = []
    recipients: List[str] = []
    measures: List[str] = []
    created_by: str = "system"
    # Section 1 — processing description and legal basis
    processing_description: Optional[str] = None
    processing_purpose: Optional[str] = None
    legal_basis: Optional[str] = None
    legal_basis_details: Optional[str] = None
    # Section 2 — necessity and proportionality
    necessity_assessment: Optional[str] = None
    proportionality_assessment: Optional[str] = None
    data_minimization: Optional[str] = None
    alternatives_considered: Optional[str] = None
    retention_justification: Optional[str] = None
    # Section 3 — risk fields
    involves_ai: Optional[bool] = None
    overall_risk_level: Optional[str] = None
    risk_score: Optional[int] = None
    # Section 6 — DPO and supervisory-authority consultation
    dpo_consulted: Optional[bool] = None
    dpo_name: Optional[str] = None
    dpo_opinion: Optional[str] = None
    dpo_approved: Optional[bool] = None
    authority_consulted: Optional[bool] = None
    authority_reference: Optional[str] = None
    authority_decision: Optional[str] = None
    # Metadata
    version: Optional[int] = None
    conclusion: Optional[str] = None
    federal_state: Optional[str] = None
    authority_resource_id: Optional[str] = None
    submitted_by: Optional[str] = None
    # JSONB Arrays
    data_subjects: Optional[List[str]] = None
    affected_rights: Optional[List[str]] = None
    triggered_rule_codes: Optional[List[str]] = None
    ai_trigger_ids: Optional[List[str]] = None
    wp248_criteria_met: Optional[List[str]] = None
    art35_abs3_triggered: Optional[List[str]] = None
    tom_references: Optional[List[str]] = None
    risks: Optional[List[dict]] = None
    mitigations: Optional[List[dict]] = None
    stakeholder_consultations: Optional[List[dict]] = None
    review_triggers: Optional[List[dict]] = None
    review_comments: Optional[List[dict]] = None
    ai_use_case_modules: Optional[List[dict]] = None
    section_8_complete: Optional[bool] = None
    # JSONB Objects
    threshold_analysis: Optional[dict] = None
    consultation_requirement: Optional[dict] = None
    review_schedule: Optional[dict] = None
    section_progress: Optional[dict] = None
    metadata: Optional[dict] = None
# Request body declared by the update handler (PUT /{dsfa_id}).
# Every field is optional and defaults to None.
# NOTE(review): this class rebinds the DSFAUpdate name imported from
# compliance.schemas.dsfa at the top of the module, and neither BaseModel nor
# List is imported here — this looks like merge residue; confirm which
# definition is meant to be used.
class DSFAUpdate(BaseModel):
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    risk_level: Optional[str] = None
    processing_activity: Optional[str] = None
    data_categories: Optional[List[str]] = None
    recipients: Optional[List[str]] = None
    measures: Optional[List[str]] = None
    approved_by: Optional[str] = None
    # Section 1 — processing description and legal basis
    processing_description: Optional[str] = None
    processing_purpose: Optional[str] = None
    legal_basis: Optional[str] = None
    legal_basis_details: Optional[str] = None
    # Section 2 — necessity and proportionality
    necessity_assessment: Optional[str] = None
    proportionality_assessment: Optional[str] = None
    data_minimization: Optional[str] = None
    alternatives_considered: Optional[str] = None
    retention_justification: Optional[str] = None
    # Section 3 — risk fields
    involves_ai: Optional[bool] = None
    overall_risk_level: Optional[str] = None
    risk_score: Optional[int] = None
    # Section 6 — DPO and supervisory-authority consultation
    dpo_consulted: Optional[bool] = None
    dpo_name: Optional[str] = None
    dpo_opinion: Optional[str] = None
    dpo_approved: Optional[bool] = None
    authority_consulted: Optional[bool] = None
    authority_reference: Optional[str] = None
    authority_decision: Optional[str] = None
    # Metadata
    version: Optional[int] = None
    conclusion: Optional[str] = None
    federal_state: Optional[str] = None
    authority_resource_id: Optional[str] = None
    submitted_by: Optional[str] = None
    # JSONB Arrays
    data_subjects: Optional[List[str]] = None
    affected_rights: Optional[List[str]] = None
    triggered_rule_codes: Optional[List[str]] = None
    ai_trigger_ids: Optional[List[str]] = None
    wp248_criteria_met: Optional[List[str]] = None
    art35_abs3_triggered: Optional[List[str]] = None
    tom_references: Optional[List[str]] = None
    risks: Optional[List[dict]] = None
    mitigations: Optional[List[dict]] = None
    stakeholder_consultations: Optional[List[dict]] = None
    review_triggers: Optional[List[dict]] = None
    review_comments: Optional[List[dict]] = None
    ai_use_case_modules: Optional[List[dict]] = None
    section_8_complete: Optional[bool] = None
    # JSONB Objects
    threshold_analysis: Optional[dict] = None
    consultation_requirement: Optional[dict] = None
    review_schedule: Optional[dict] = None
    section_progress: Optional[dict] = None
    metadata: Optional[dict] = None
# Request body declared by the status handler (PATCH /{dsfa_id}/status):
# the new status plus an optional approver name.
# NOTE(review): rebinds the DSFAStatusUpdate name imported from
# compliance.schemas.dsfa, and BaseModel is not imported in this module —
# likely merge residue; confirm which definition is meant to be used.
class DSFAStatusUpdate(BaseModel):
    status: str
    approved_by: Optional[str] = None
# NOTE(review): rebinds the DSFASectionUpdate name imported from
# compliance.schemas.dsfa, and BaseModel is not imported in this module —
# likely merge residue; confirm which definition is meant to be used.
class DSFASectionUpdate(BaseModel):
    """Body for PUT /dsfa/{id}/sections/{section_number}."""
    # Optional content value for the section.
    content: Optional[str] = None
    # Allow arbitrary extra fields so the frontend can send any section-specific data
    extra: Optional[dict] = None
# NOTE(review): rebinds the DSFAApproveRequest name imported from
# compliance.schemas.dsfa, and BaseModel is not imported in this module —
# likely merge residue; confirm which definition is meant to be used.
class DSFAApproveRequest(BaseModel):
    """Body for POST /dsfa/{id}/approve."""
    # Required decision flag; the approve handler's docstring describes the
    # endpoint as "approve or reject".
    approved: bool
    # Optional free-text remarks accompanying the decision.
    comments: Optional[str] = None
    # Optional name of the person making the decision.
    approved_by: Optional[str] = None
# =============================================================================
# Helpers
# =============================================================================
def _get_tenant_id(tenant_id: Optional[str]) -> str:
    """Resolve the effective tenant, falling back to ``DEFAULT_TENANT_ID``.

    Any falsy value (``None`` or an empty string) selects the default.
    """
    if tenant_id:
        return tenant_id
    return DEFAULT_TENANT_ID
def _dsfa_to_response(row) -> dict:
    """Serialize one DSFA database row into a JSON-ready dictionary.

    Accepts a SQLAlchemy ``Row`` (unwrapped through ``_mapping``) or any
    object that supports string-key item access. Core columns are read with
    plain item access, so a missing one raises; all later columns are read
    leniently and fall back to a default when missing or NULL.
    """
    import json

    # SQLAlchemy 2.0: Row objects need ._mapping for string-key access.
    record = row._mapping if hasattr(row, "_mapping") else row

    def _decode(raw, kind):
        """Coerce a JSONB value to ``kind`` (``list`` or ``dict``).

        ``None``, unparsable text and text that decodes to another type give
        an empty ``kind``; values of any other Python type pass through.
        """
        if raw is None:
            return kind()
        if isinstance(raw, kind):
            return raw
        if not isinstance(raw, str):
            return raw
        try:
            decoded = json.loads(raw)
        except Exception:
            return kind()
        return decoded if isinstance(decoded, kind) else kind()

    def _iso(stamp):
        """Render a timestamp as an ISO string; falsy values become ``None``."""
        if not stamp:
            return None
        return stamp if isinstance(stamp, str) else stamp.isoformat()

    def _lenient(column, fallback=None):
        """Read ``column``; use ``fallback`` when it is missing or NULL (old rows)."""
        try:
            value = record[column]
        except (KeyError, IndexError):
            return fallback
        return fallback if value is None else value

    # Core fields (always present since Migration 024).
    payload = {
        "id": str(record["id"]),
        "tenant_id": record["tenant_id"],
        "title": record["title"],
        "description": record["description"] or "",
        "status": record["status"] or "draft",
        "risk_level": record["risk_level"] or "low",
        "processing_activity": record["processing_activity"] or "",
    }
    for column in ("data_categories", "recipients", "measures"):
        payload[column] = _decode(record[column], list)
    payload["approved_by"] = record["approved_by"]
    payload["approved_at"] = _iso(record["approved_at"])
    payload["created_by"] = record["created_by"] or "system"
    payload["created_at"] = _iso(record["created_at"])
    payload["updated_at"] = _iso(record["updated_at"])

    # Section 1 (Migration 030) followed by Section 2: plain values, None when unset.
    for column in (
        "processing_description",
        "processing_purpose",
        "legal_basis",
        "legal_basis_details",
        "necessity_assessment",
        "proportionality_assessment",
        "data_minimization",
        "alternatives_considered",
        "retention_justification",
    ):
        payload[column] = _lenient(column)

    # Section 3
    payload["involves_ai"] = _lenient("involves_ai", False)
    payload["overall_risk_level"] = _lenient("overall_risk_level")
    payload["risk_score"] = _lenient("risk_score", 0)

    # Section 6
    payload["dpo_consulted"] = _lenient("dpo_consulted", False)
    payload["dpo_consulted_at"] = _iso(_lenient("dpo_consulted_at"))
    payload["dpo_name"] = _lenient("dpo_name")
    payload["dpo_opinion"] = _lenient("dpo_opinion")
    payload["dpo_approved"] = _lenient("dpo_approved")
    payload["authority_consulted"] = _lenient("authority_consulted", False)
    payload["authority_consulted_at"] = _iso(_lenient("authority_consulted_at"))
    payload["authority_reference"] = _lenient("authority_reference")
    payload["authority_decision"] = _lenient("authority_decision")

    # Metadata / versioning
    payload["version"] = _lenient("version", 1)
    previous = _lenient("previous_version_id")
    payload["previous_version_id"] = str(previous) if previous else None
    payload["conclusion"] = _lenient("conclusion")
    payload["federal_state"] = _lenient("federal_state")
    payload["authority_resource_id"] = _lenient("authority_resource_id")
    payload["submitted_for_review_at"] = _iso(_lenient("submitted_for_review_at"))
    payload["submitted_by"] = _lenient("submitted_by")

    # JSONB arrays
    for column in (
        "data_subjects",
        "affected_rights",
        "triggered_rule_codes",
        "ai_trigger_ids",
        "wp248_criteria_met",
        "art35_abs3_triggered",
        "tom_references",
        "risks",
        "mitigations",
        "stakeholder_consultations",
        "review_triggers",
        "review_comments",
    ):
        payload[column] = _decode(_lenient(column), list)

    # Section 8 / AI (Migration 028)
    payload["ai_use_case_modules"] = _decode(_lenient("ai_use_case_modules"), list)
    payload["section_8_complete"] = _lenient("section_8_complete", False)

    # JSONB objects
    for column in (
        "threshold_analysis",
        "consultation_requirement",
        "review_schedule",
        "section_progress",
        "metadata",
    ):
        payload[column] = _decode(_lenient(column), dict)

    return payload
def _log_audit(
    db: Session,
    tenant_id: str,
    dsfa_id,
    action: str,
    changed_by: str = "system",
    old_values=None,
    new_values=None,
):
    """Insert one entry into ``compliance_dsfa_audit_log``.

    The statement is executed on ``db`` but not committed here; committing is
    left to the caller.

    Args:
        db: Session the INSERT is executed on.
        tenant_id: Tenant the entry belongs to.
        dsfa_id: ID of the affected DSFA; stringified, or NULL when falsy.
        action: Action label stored with the entry.
        changed_by: Actor stored with the entry.
        old_values: Snapshot before the change; stored as NULL when falsy.
        new_values: Snapshot after the change; stored as NULL when falsy.
    """
    import json

    # ``text`` is not imported at module level, so the call below used to
    # raise NameError; import it locally, like ``json`` above.
    from sqlalchemy import text

    def _jsonb(values):
        """Serialize a non-empty snapshot for the jsonb cast; falsy → NULL."""
        return json.dumps(values) if values else None

    insert_sql = """
INSERT INTO compliance_dsfa_audit_log
(tenant_id, dsfa_id, action, changed_by, old_values, new_values)
VALUES
(:tenant_id, :dsfa_id, :action, :changed_by,
CAST(:old_values AS jsonb), CAST(:new_values AS jsonb))
"""
    db.execute(
        text(insert_sql),
        {
            "tenant_id": tenant_id,
            "dsfa_id": str(dsfa_id) if dsfa_id else None,
            "action": action,
            "changed_by": changed_by,
            "old_values": _jsonb(old_values),
            "new_values": _jsonb(new_values),
        },
    )
# =============================================================================
# Stats (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/stats")
async def get_stats(
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Zaehler nach Status und Risiko-Level."""
with translate_domain_errors():
return service.stats(tenant_id)
# =============================================================================
# Audit Log (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/audit-log")
async def get_audit_log(
tenant_id: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
service: DSFAService = Depends(get_dsfa_service),
) -> list[dict[str, Any]]:
"""DSFA Audit-Trail."""
with translate_domain_errors():
return service.audit_log(tenant_id, limit, offset)
# =============================================================================
# CSV Export (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/export/csv", name="export_dsfas_csv")
async def export_dsfas_csv(
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> Response:
"""Export all DSFAs as CSV."""
with translate_domain_errors():
csv_content = service.export_csv(tenant_id)
return Response(
content=csv_content,
media_type="text/csv",
headers={
"Content-Disposition": "attachment; filename=dsfas_export.csv"
},
)
# =============================================================================
# UCCA Integration Stubs (must be before /{id} to avoid route conflict)
# =============================================================================
@router.post("/from-assessment/{assessment_id}", status_code=501)
async def create_from_assessment(
assessment_id: str,
) -> dict[str, str]:
"""Stub: Create DSFA from UCCA assessment."""
return {
"detail": (
"Not implemented — requires cross-service "
"integration with ai-compliance-sdk"
)
}
@router.get("/by-assessment/{assessment_id}", status_code=501)
async def get_by_assessment(
assessment_id: str,
) -> dict[str, str]:
"""Stub: Get DSFA by linked UCCA assessment ID."""
return {
"detail": (
"Not implemented — requires cross-service "
"integration with ai-compliance-sdk"
)
}
# =============================================================================
# List + Create
# =============================================================================
@router.get("")
async def list_dsfas(
tenant_id: Optional[str] = Query(None),
status: Optional[str] = Query(None),
risk_level: Optional[str] = Query(None),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=500),
service: DSFAService = Depends(get_dsfa_service),
) -> list[dict[str, Any]]:
"""Liste aller DSFAs fuer einen Tenant."""
with translate_domain_errors():
return service.list_dsfas(tenant_id, status, risk_level, skip, limit)
@router.post("", status_code=201)
async def create_dsfa(
request: DSFACreate,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Neue DSFA erstellen."""
import json
if request.status not in VALID_STATUSES:
raise HTTPException(status_code=422, detail=f"Ungültiger Status: {request.status}")
if request.risk_level not in VALID_RISK_LEVELS:
raise HTTPException(status_code=422, detail=f"Ungültiges Risiko-Level: {request.risk_level}")
tid = _get_tenant_id(tenant_id)
row = db.execute(
text("""
INSERT INTO compliance_dsfas
(tenant_id, title, description, status, risk_level,
processing_activity, data_categories, recipients, measures, created_by)
VALUES
(:tenant_id, :title, :description, :status, :risk_level,
:processing_activity,
CAST(:data_categories AS jsonb),
CAST(:recipients AS jsonb),
CAST(:measures AS jsonb),
:created_by)
RETURNING *
"""),
{
"tenant_id": tid,
"title": request.title,
"description": request.description,
"status": request.status,
"risk_level": request.risk_level,
"processing_activity": request.processing_activity,
"data_categories": json.dumps(request.data_categories),
"recipients": json.dumps(request.recipients),
"measures": json.dumps(request.measures),
"created_by": request.created_by,
},
).fetchone()
db.flush()
row_id = row._mapping["id"] if hasattr(row, "_mapping") else row[0]
_log_audit(
db, tid, row_id, "CREATE", request.created_by,
new_values={"title": request.title, "status": request.status},
)
db.commit()
return _dsfa_to_response(row)
# =============================================================================
# Single Item (GET / PUT / DELETE / PATCH status)
# =============================================================================
@router.get("/{dsfa_id}")
async def get_dsfa(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""Einzelne DSFA abrufen."""
with translate_domain_errors():
return service.get(dsfa_id, tenant_id)
@router.put("/{dsfa_id}")
async def update_dsfa(
dsfa_id: str,
request: DSFAUpdate,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""DSFA aktualisieren."""
with translate_domain_errors():
return service.update(dsfa_id, tenant_id, request)
@router.delete("/{dsfa_id}")
async def delete_dsfa(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
service: DSFAService = Depends(get_dsfa_service),
) -> dict[str, Any]:
"""DSFA loeschen (Art. 17 DSGVO)."""
with translate_domain_errors():
return service.delete(dsfa_id, tenant_id)
@router.patch("/{dsfa_id}/status")
async def update_dsfa_status(
dsfa_id: str,
request: DSFAStatusUpdate,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Schnell-Statuswechsel."""
with translate_domain_errors():
return wf.update_status(dsfa_id, tenant_id, request)
# =============================================================================
# Section Update
# =============================================================================
@router.put("/{dsfa_id}/sections/{section_number}")
async def update_section(
dsfa_id: str,
section_number: int,
request: DSFASectionUpdate,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Update a specific DSFA section (1-8)."""
with translate_domain_errors():
return wf.update_section(dsfa_id, section_number, tenant_id, request)
# =============================================================================
# Workflow: Submit for Review + Approve
# =============================================================================
@router.post("/{dsfa_id}/submit-for-review")
async def submit_for_review(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Submit a DSFA for DPO review (draft -> in-review)."""
with translate_domain_errors():
return wf.submit_for_review(dsfa_id, tenant_id)
@router.post("/{dsfa_id}/approve")
async def approve_dsfa(
dsfa_id: str,
request: DSFAApproveRequest,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Approve or reject a DSFA (DPO/CISO action)."""
with translate_domain_errors():
return wf.approve(dsfa_id, tenant_id, request)
# =============================================================================
# Export
# =============================================================================
@router.get("/{dsfa_id}/export")
async def export_dsfa_json(
dsfa_id: str,
format: str = Query("json"),
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> dict[str, Any]:
"""Export a single DSFA as JSON."""
with translate_domain_errors():
return wf.export_json(dsfa_id, tenant_id, format)
# =============================================================================
# Versioning
# =============================================================================
@router.get("/{dsfa_id}/versions")
async def list_dsfa_versions(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> Any:
"""List all versions for a DSFA."""
with translate_domain_errors():
return wf.list_versions(dsfa_id, tenant_id)
@router.get("/{dsfa_id}/versions/{version_number}")
async def get_dsfa_version(
dsfa_id: str,
version_number: int,
tenant_id: Optional[str] = Query(None),
wf: DSFAWorkflowService = Depends(get_workflow_service),
) -> Any:
"""Get a specific DSFA version with full snapshot."""
with translate_domain_errors():
return wf.get_version(dsfa_id, version_number, tenant_id)
# Legacy re-exports: the names exposed by a star-import of this module.
# NOTE(review): SECTION_FIELD_MAP is imported above with a "re-export" marker
# but is not listed here — confirm whether it should be.
__all__ = [
    "router",
    "DSFACreate",
    "DSFAUpdate",
    "DSFAStatusUpdate",
    "DSFASectionUpdate",
    "DSFAApproveRequest",
    "_dsfa_to_response",
    "_get_tenant_id",
    "DEFAULT_TENANT_ID",
    "VALID_STATUSES",
    "VALID_RISK_LEVELS",
]