feat: DSFA Modul — Backend, Proxy, Frontend-Migration, Tests + Mock-Daten entfernt
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 38s
CI / test-python-backend-compliance (push) Successful in 38s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 19s
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 38s
CI / test-python-backend-compliance (push) Successful in 38s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 19s
- Migration 024: compliance_dsfas + compliance_dsfa_audit_log Tabellen - dsfa_routes.py: CRUD + stats + audit-log + PATCH status Endpoints - Proxy: /api/sdk/v1/dsfa/[[...path]] → backend-compliance:8002/api/v1/dsfa - dsfa/page.tsx: mockDSFAs entfernt → echte API (loadDSFAs, handleCreateDSFA, handleStatusChange, handleDeleteDSFA) - GeneratorWizard: kontrollierte Inputs + onSubmit-Handler - reporting/page.tsx: getMockReport() Fallback entfernt → Fehlerstate - dsr/[requestId]/page.tsx: mockCommunications entfernt → leeres Array (TODO: Backend fehlt) - 52 neue Tests (680 gesamt, alle grün) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -21,6 +21,7 @@ from .quality_routes import router as quality_router
|
||||
from .loeschfristen_routes import router as loeschfristen_router
|
||||
from .legal_template_routes import router as legal_template_router
|
||||
from .compliance_scope_routes import router as compliance_scope_router
|
||||
from .dsfa_routes import router as dsfa_router
|
||||
|
||||
# Include sub-routers
|
||||
router.include_router(audit_router)
|
||||
@@ -43,6 +44,7 @@ router.include_router(quality_router)
|
||||
router.include_router(loeschfristen_router)
|
||||
router.include_router(legal_template_router)
|
||||
router.include_router(compliance_scope_router)
|
||||
router.include_router(dsfa_router)
|
||||
|
||||
__all__ = [
|
||||
"router",
|
||||
@@ -66,4 +68,5 @@ __all__ = [
|
||||
"loeschfristen_router",
|
||||
"legal_template_router",
|
||||
"compliance_scope_router",
|
||||
"dsfa_router",
|
||||
]
|
||||
|
||||
437
backend-compliance/compliance/api/dsfa_routes.py
Normal file
437
backend-compliance/compliance/api/dsfa_routes.py
Normal file
@@ -0,0 +1,437 @@
|
||||
"""
|
||||
FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO).
|
||||
|
||||
Endpoints:
|
||||
GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit)
|
||||
POST /v1/dsfa — Neu erstellen → 201
|
||||
GET /v1/dsfa/stats — Zähler nach Status
|
||||
GET /v1/dsfa/audit-log — Audit-Log
|
||||
GET /v1/dsfa/{id} — Detail
|
||||
PUT /v1/dsfa/{id} — Update
|
||||
DELETE /v1/dsfa/{id} — Löschen (Art. 17 DSGVO)
|
||||
PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from classroom_engine.database import get_db
|
||||
|
||||
# Module-level logger for the DSFA route handlers.
logger = logging.getLogger(__name__)

# All endpoints below are mounted under /v1/dsfa.
router = APIRouter(prefix="/v1/dsfa", tags=["compliance-dsfa"])

# Fallback tenant used when callers omit the tenant_id query parameter.
DEFAULT_TENANT_ID = "default"

# Allowed lifecycle states and risk classifications; the endpoints reject
# requests carrying other values with HTTP 422.
VALID_STATUSES = {"draft", "in-review", "approved", "needs-update"}
VALID_RISK_LEVELS = {"low", "medium", "high", "critical"}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Pydantic Schemas
|
||||
# =============================================================================
|
||||
|
||||
class DSFACreate(BaseModel):
    """Request body for POST /v1/dsfa — create a new DSFA record."""

    # Short title of the assessment (only required field).
    title: str
    description: str = ""
    # Lifecycle state; the route rejects values outside VALID_STATUSES (422).
    status: str = "draft"
    # Risk classification; the route rejects values outside VALID_RISK_LEVELS (422).
    risk_level: str = "low"
    processing_activity: str = ""
    # The three list fields are persisted as JSONB arrays. Pydantic deep-copies
    # mutable class-level defaults per instance, so the bare [] defaults here
    # do not share state between requests.
    data_categories: List[str] = []
    recipients: List[str] = []
    measures: List[str] = []
    # Actor recorded in created_by and in the CREATE audit entry.
    created_by: str = "system"
|
||||
|
||||
|
||||
class DSFAUpdate(BaseModel):
    """Request body for PUT /v1/dsfa/{id} — partial update.

    Every field is optional. The route drops unset fields via
    model_dump(exclude_none=True), which means a column can never be
    reset to NULL through this schema — only overwritten.
    """

    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    risk_level: Optional[str] = None
    processing_activity: Optional[str] = None
    # JSONB array columns; replaced wholesale when provided.
    data_categories: Optional[List[str]] = None
    recipients: Optional[List[str]] = None
    measures: Optional[List[str]] = None
    approved_by: Optional[str] = None
|
||||
|
||||
|
||||
class DSFAStatusUpdate(BaseModel):
    """Request body for PATCH /v1/dsfa/{id}/status — quick status switch."""

    # Must be one of VALID_STATUSES; validated in the route handler (422).
    status: str
    # Recorded on the row; approved_at is set only when status == "approved".
    approved_by: Optional[str] = None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helpers
|
||||
# =============================================================================
|
||||
|
||||
def _get_tenant_id(tenant_id: Optional[str]) -> str:
|
||||
return tenant_id or DEFAULT_TENANT_ID
|
||||
|
||||
|
||||
def _dsfa_to_response(row) -> dict:
|
||||
"""Convert a DB row to a JSON-serializable dict."""
|
||||
import json
|
||||
|
||||
def parse_json(val):
|
||||
if val is None:
|
||||
return []
|
||||
if isinstance(val, list):
|
||||
return val
|
||||
if isinstance(val, str):
|
||||
try:
|
||||
return json.loads(val)
|
||||
except Exception:
|
||||
return []
|
||||
return val
|
||||
|
||||
return {
|
||||
"id": str(row["id"]),
|
||||
"tenant_id": row["tenant_id"],
|
||||
"title": row["title"],
|
||||
"description": row["description"] or "",
|
||||
"status": row["status"] or "draft",
|
||||
"risk_level": row["risk_level"] or "low",
|
||||
"processing_activity": row["processing_activity"] or "",
|
||||
"data_categories": parse_json(row["data_categories"]),
|
||||
"recipients": parse_json(row["recipients"]),
|
||||
"measures": parse_json(row["measures"]),
|
||||
"approved_by": row["approved_by"],
|
||||
"approved_at": row["approved_at"].isoformat() if row["approved_at"] else None,
|
||||
"created_by": row["created_by"] or "system",
|
||||
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
||||
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
|
||||
}
|
||||
|
||||
|
||||
def _log_audit(
    db: Session,
    tenant_id: str,
    dsfa_id,
    action: str,
    changed_by: str = "system",
    old_values=None,
    new_values=None,
):
    """Append one entry to the DSFA audit trail (no commit — caller commits).

    Args:
        db: Open SQLAlchemy session; the INSERT joins the caller's transaction.
        tenant_id: Tenant the change belongs to.
        dsfa_id: Affected DSFA id (stringified for the bind), or None.
        action: Event label, e.g. "CREATE", "UPDATE", "DELETE", "STATUS_CHANGE".
        changed_by: Actor recorded for the change.
        old_values / new_values: Optional JSON-serializable snapshots,
            stored in the JSONB columns.
    """
    import json

    db.execute(
        text("""
            INSERT INTO compliance_dsfa_audit_log
                (tenant_id, dsfa_id, action, changed_by, old_values, new_values)
            VALUES
                (:tenant_id, :dsfa_id, :action, :changed_by,
                 CAST(:old_values AS jsonb), CAST(:new_values AS jsonb))
        """),
        {
            "tenant_id": tenant_id,
            "dsfa_id": str(dsfa_id) if dsfa_id else None,
            "action": action,
            "changed_by": changed_by,
            # BUGFIX: compare against None instead of truthiness so an empty
            # snapshot ({}) is stored as '{}' rather than silently collapsing
            # to SQL NULL.
            "old_values": json.dumps(old_values) if old_values is not None else None,
            "new_values": json.dumps(new_values) if new_values is not None else None,
        },
    )
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Stats (must be before /{id} to avoid route conflict)
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/stats")
async def get_stats(
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Aggregate DSFA counts per status and per risk level for one tenant."""
    tid = _get_tenant_id(tenant_id)
    rows = db.execute(
        text("SELECT status, risk_level FROM compliance_dsfas WHERE tenant_id = :tid"),
        {"tid": tid},
    ).fetchall()

    status_counts: dict = {}
    risk_counts: dict = {}
    for entry in rows:
        # NULL columns count towards the defaults, mirroring _dsfa_to_response.
        status_key = entry["status"] or "draft"
        risk_key = entry["risk_level"] or "low"
        status_counts[status_key] = status_counts.get(status_key, 0) + 1
        risk_counts[risk_key] = risk_counts.get(risk_key, 0) + 1

    # Flat per-status counters are exposed alongside the maps so clients
    # can bind them directly without digging into by_status.
    return {
        "total": len(rows),
        "by_status": status_counts,
        "by_risk_level": risk_counts,
        "draft_count": status_counts.get("draft", 0),
        "in_review_count": status_counts.get("in-review", 0),
        "approved_count": status_counts.get("approved", 0),
        "needs_update_count": status_counts.get("needs-update", 0),
    }
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Audit Log (must be before /{id} to avoid route conflict)
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/audit-log")
async def get_audit_log(
    tenant_id: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
):
    """Return the DSFA audit trail for a tenant, newest entries first."""
    tid = _get_tenant_id(tenant_id)
    rows = db.execute(
        text("""
            SELECT id, tenant_id, dsfa_id, action, changed_by, old_values, new_values, created_at
            FROM compliance_dsfa_audit_log
            WHERE tenant_id = :tid
            ORDER BY created_at DESC
            LIMIT :limit OFFSET :offset
        """),
        {"tid": tid, "limit": limit, "offset": offset},
    ).fetchall()

    entries = []
    for entry in rows:
        entries.append({
            "id": str(entry["id"]),
            "tenant_id": entry["tenant_id"],
            # dsfa_id may be NULL for entries whose DSFA was deleted
            # (FK is ON DELETE SET NULL).
            "dsfa_id": str(entry["dsfa_id"]) if entry["dsfa_id"] else None,
            "action": entry["action"],
            "changed_by": entry["changed_by"],
            "old_values": entry["old_values"],
            "new_values": entry["new_values"],
            "created_at": entry["created_at"].isoformat() if entry["created_at"] else None,
        })
    return entries
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# List + Create
|
||||
# =============================================================================
|
||||
|
||||
@router.get("")
async def list_dsfas(
    tenant_id: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    risk_level: Optional[str] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
):
    """List a tenant's DSFAs, optionally filtered by status and risk level."""
    tid = _get_tenant_id(tenant_id)

    # Collect WHERE fragments; values always travel as bind parameters,
    # never interpolated into the SQL string.
    conditions = ["tenant_id = :tid"]
    params: dict = {"tid": tid, "limit": limit, "skip": skip}
    if status:
        conditions.append("status = :status")
        params["status"] = status
    if risk_level:
        conditions.append("risk_level = :risk_level")
        params["risk_level"] = risk_level

    sql = (
        "SELECT * FROM compliance_dsfas WHERE "
        + " AND ".join(conditions)
        + " ORDER BY created_at DESC LIMIT :limit OFFSET :skip"
    )
    rows = db.execute(text(sql), params).fetchall()
    return [_dsfa_to_response(entry) for entry in rows]
|
||||
|
||||
|
||||
@router.post("", status_code=201)
async def create_dsfa(
    request: DSFACreate,
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Create a new DSFA for the tenant and return it (HTTP 201).

    Validates status / risk_level against the module allow-lists (422 on
    failure), inserts the row, writes a CREATE audit entry, then commits
    both statements in one transaction.
    """
    import json

    # status/risk_level are plain str fields, so the allow-lists are
    # enforced here rather than by Pydantic.
    if request.status not in VALID_STATUSES:
        raise HTTPException(status_code=422, detail=f"Ungültiger Status: {request.status}")
    if request.risk_level not in VALID_RISK_LEVELS:
        raise HTTPException(status_code=422, detail=f"Ungültiges Risiko-Level: {request.risk_level}")

    tid = _get_tenant_id(tenant_id)

    # INSERT ... RETURNING * yields the fully populated row (server-side
    # defaults: id, created_at, updated_at) without a second SELECT.
    row = db.execute(
        text("""
            INSERT INTO compliance_dsfas
                (tenant_id, title, description, status, risk_level,
                 processing_activity, data_categories, recipients, measures, created_by)
            VALUES
                (:tenant_id, :title, :description, :status, :risk_level,
                 :processing_activity,
                 CAST(:data_categories AS jsonb),
                 CAST(:recipients AS jsonb),
                 CAST(:measures AS jsonb),
                 :created_by)
            RETURNING *
        """),
        {
            "tenant_id": tid,
            "title": request.title,
            "description": request.description,
            "status": request.status,
            "risk_level": request.risk_level,
            "processing_activity": request.processing_activity,
            # List fields are serialized to JSON text and cast to jsonb in SQL.
            "data_categories": json.dumps(request.data_categories),
            "recipients": json.dumps(request.recipients),
            "measures": json.dumps(request.measures),
            "created_by": request.created_by,
        },
    ).fetchone()

    # Flush so the new row is visible to the audit INSERT's FK check,
    # then commit insert + audit atomically.
    db.flush()
    _log_audit(
        db, tid, row["id"], "CREATE", request.created_by,
        new_values={"title": request.title, "status": request.status},
    )
    db.commit()
    return _dsfa_to_response(row)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Single Item (GET / PUT / DELETE / PATCH status)
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/{dsfa_id}")
async def get_dsfa(
    dsfa_id: str,
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Fetch a single DSFA; 404 when it does not exist for the tenant."""
    tid = _get_tenant_id(tenant_id)
    record = db.execute(
        text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
        {"id": dsfa_id, "tid": tid},
    ).fetchone()
    if record is None:
        raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")
    return _dsfa_to_response(record)
|
||||
|
||||
|
||||
@router.put("/{dsfa_id}")
async def update_dsfa(
    dsfa_id: str,
    request: DSFAUpdate,
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Partially update a DSFA and return the refreshed record.

    Only fields present (non-None) in the body are written; status and
    risk_level are validated against the module allow-lists (422).
    Raises 404 for unknown ids.
    """
    import json

    tid = _get_tenant_id(tenant_id)
    existing = db.execute(
        text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
        {"id": dsfa_id, "tid": tid},
    ).fetchone()
    if not existing:
        raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")

    # exclude_none drops unset fields — a column can therefore never be
    # reset to NULL through this endpoint.
    updates = request.model_dump(exclude_none=True)

    if "status" in updates and updates["status"] not in VALID_STATUSES:
        raise HTTPException(status_code=422, detail=f"Ungültiger Status: {updates['status']}")
    if "risk_level" in updates and updates["risk_level"] not in VALID_RISK_LEVELS:
        raise HTTPException(status_code=422, detail=f"Ungültiges Risiko-Level: {updates['risk_level']}")

    # Empty body: return the current state without touching the DB.
    if not updates:
        return _dsfa_to_response(existing)

    set_clauses = []
    params: dict = {"id": dsfa_id, "tid": tid}

    # Field names come from the DSFAUpdate schema (never raw user input),
    # so interpolating them into the SET clause is safe; values stay bind
    # parameters. JSONB columns get their value serialized and cast.
    jsonb_fields = {"data_categories", "recipients", "measures"}
    for field, value in updates.items():
        if field in jsonb_fields:
            set_clauses.append(f"{field} = CAST(:{field} AS jsonb)")
            params[field] = json.dumps(value)
        else:
            set_clauses.append(f"{field} = :{field}")
            params[field] = value

    set_clauses.append("updated_at = NOW()")
    sql = f"UPDATE compliance_dsfas SET {', '.join(set_clauses)} WHERE id = :id AND tenant_id = :tid RETURNING *"

    # Snapshot before/after for the audit trail, then commit UPDATE and
    # audit entry together.
    old_values = {"title": existing["title"], "status": existing["status"]}
    row = db.execute(text(sql), params).fetchone()
    _log_audit(db, tid, dsfa_id, "UPDATE", new_values=updates, old_values=old_values)
    db.commit()
    return _dsfa_to_response(row)
|
||||
|
||||
|
||||
@router.delete("/{dsfa_id}")
async def delete_dsfa(
    dsfa_id: str,
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Delete a DSFA (Art. 17 GDPR); raises 404 when it does not exist."""
    tid = _get_tenant_id(tenant_id)
    scope = {"id": dsfa_id, "tid": tid}

    target = db.execute(
        text("SELECT id, title FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
        scope,
    ).fetchone()
    if target is None:
        raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")

    # Audit first, delete second: the audit row's FK must reference the
    # still-existing DSFA (ON DELETE SET NULL detaches it afterwards).
    _log_audit(db, tid, dsfa_id, "DELETE", old_values={"title": target["title"]})
    db.execute(
        text("DELETE FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
        scope,
    )
    db.commit()
    return {"success": True, "message": f"DSFA {dsfa_id} gelöscht"}
|
||||
|
||||
|
||||
@router.patch("/{dsfa_id}/status")
async def update_dsfa_status(
    dsfa_id: str,
    request: DSFAStatusUpdate,
    tenant_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """Quick status transition for a DSFA.

    Sets approved_at (and approved_by) when the new status is "approved";
    any other status writes None into both approval columns. Records a
    STATUS_CHANGE audit entry and commits. Raises 404 for unknown ids and
    422 for status values outside VALID_STATUSES.
    """
    from datetime import timezone

    if request.status not in VALID_STATUSES:
        raise HTTPException(status_code=422, detail=f"Ungültiger Status: {request.status}")

    tid = _get_tenant_id(tenant_id)
    existing = db.execute(
        text("SELECT id, status FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
        {"id": dsfa_id, "tid": tid},
    ).fetchone()
    if not existing:
        raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")

    params: dict = {
        "id": dsfa_id, "tid": tid,
        "status": request.status,
        # BUGFIX: use an aware UTC timestamp. datetime.utcnow() is naive
        # (and deprecated since Python 3.12); written into a TIMESTAMPTZ
        # column a naive value is interpreted in the session time zone,
        # skewing approval times on non-UTC servers.
        "approved_at": datetime.now(timezone.utc) if request.status == "approved" else None,
        "approved_by": request.approved_by,
    }
    row = db.execute(
        text("""
            UPDATE compliance_dsfas
            SET status = :status, approved_at = :approved_at, approved_by = :approved_by, updated_at = NOW()
            WHERE id = :id AND tenant_id = :tid
            RETURNING *
        """),
        params,
    ).fetchone()

    _log_audit(
        db, tid, dsfa_id, "STATUS_CHANGE",
        old_values={"status": existing["status"]},
        new_values={"status": request.status},
    )
    db.commit()
    return _dsfa_to_response(row)
|
||||
33
backend-compliance/migrations/024_dsfa.sql
Normal file
33
backend-compliance/migrations/024_dsfa.sql
Normal file
@@ -0,0 +1,33 @@
|
||||
-- Migration 024: DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO)

-- Main table: one row per data-protection impact assessment per tenant.
CREATE TABLE IF NOT EXISTS compliance_dsfas (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(255) NOT NULL,
    title VARCHAR(500) NOT NULL,
    description TEXT DEFAULT '',
    -- Lifecycle state: draft | in-review | approved | needs-update
    -- (validated by the API layer, not a DB constraint).
    status VARCHAR(50) NOT NULL DEFAULT 'draft',
    -- Risk classification: low | medium | high | critical.
    risk_level VARCHAR(50) NOT NULL DEFAULT 'low',
    processing_activity VARCHAR(500) DEFAULT '',
    -- JSONB string arrays maintained by the API layer.
    data_categories JSONB DEFAULT '[]',
    recipients JSONB DEFAULT '[]',
    measures JSONB DEFAULT '[]',
    -- Approval metadata, populated via PATCH /status when approved.
    approved_by VARCHAR(255),
    approved_at TIMESTAMPTZ,
    created_by VARCHAR(255) DEFAULT 'system',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Common filters: per-tenant listing and status dashboards.
CREATE INDEX IF NOT EXISTS idx_dsfas_tenant ON compliance_dsfas(tenant_id);
CREATE INDEX IF NOT EXISTS idx_dsfas_status ON compliance_dsfas(status);

-- Append-only audit trail. Entries survive deletion of their DSFA:
-- dsfa_id is detached (set to NULL) via ON DELETE SET NULL.
CREATE TABLE IF NOT EXISTS compliance_dsfa_audit_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(255),
    dsfa_id UUID REFERENCES compliance_dsfas(id) ON DELETE SET NULL,
    action VARCHAR(50) NOT NULL,
    changed_by VARCHAR(255) DEFAULT 'system',
    old_values JSONB,
    new_values JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
|
||||
384
backend-compliance/tests/test_dsfa_routes.py
Normal file
384
backend-compliance/tests/test_dsfa_routes.py
Normal file
@@ -0,0 +1,384 @@
|
||||
"""Tests for DSFA routes and schemas (dsfa_routes.py)."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from datetime import datetime
|
||||
|
||||
from compliance.api.dsfa_routes import (
|
||||
DSFACreate,
|
||||
DSFAUpdate,
|
||||
DSFAStatusUpdate,
|
||||
_dsfa_to_response,
|
||||
_get_tenant_id,
|
||||
DEFAULT_TENANT_ID,
|
||||
VALID_STATUSES,
|
||||
VALID_RISK_LEVELS,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — DSFACreate
|
||||
# =============================================================================
|
||||
|
||||
class TestDSFACreate:
    """Schema-level tests for the DSFACreate request model."""

    def test_minimal_valid(self):
        model = DSFACreate(title="DSFA - Mitarbeiter-Monitoring")
        assert model.title == "DSFA - Mitarbeiter-Monitoring"
        assert model.status == "draft"
        assert model.risk_level == "low"
        assert model.description == ""
        assert model.processing_activity == ""
        assert model.created_by == "system"
        assert model.data_categories == []
        assert model.recipients == []
        assert model.measures == []

    def test_full_values(self):
        model = DSFACreate(
            title="DSFA - Video-Ueberwachung",
            description="Videoueberwachung im Buero",
            status="in-review",
            risk_level="high",
            processing_activity="Videoueberwachung zu Sicherheitszwecken",
            data_categories=["Bilddaten", "Bewegungsdaten"],
            recipients=["Sicherheitsdienst"],
            measures=["Loeschfristen", "Hinweisschilder"],
            created_by="admin",
        )
        assert model.title == "DSFA - Video-Ueberwachung"
        assert model.status == "in-review"
        assert model.risk_level == "high"
        assert model.created_by == "admin"
        assert model.data_categories == ["Bilddaten", "Bewegungsdaten"]
        assert model.recipients == ["Sicherheitsdienst"]
        assert model.measures == ["Loeschfristen", "Hinweisschilder"]

    def test_draft_is_default_status(self):
        assert DSFACreate(title="Test").status == "draft"

    def test_low_is_default_risk_level(self):
        assert DSFACreate(title="Test").risk_level == "low"

    def test_empty_arrays_default(self):
        model = DSFACreate(title="Test")
        for value in (model.data_categories, model.recipients, model.measures):
            assert isinstance(value, list)
        assert len(model.data_categories) == 0

    def test_serialization_model_dump(self):
        dumped = DSFACreate(title="Test", risk_level="critical").model_dump()
        assert dumped["title"] == "Test"
        assert dumped["risk_level"] == "critical"
        assert "status" in dumped
        assert "data_categories" in dumped
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — DSFAUpdate
|
||||
# =============================================================================
|
||||
|
||||
class TestDSFAUpdate:
    """Schema-level tests for the DSFAUpdate partial-update model."""

    def test_all_optional(self):
        model = DSFAUpdate()
        # Every field defaults to None — the schema carries no required fields.
        for value in (
            model.title, model.description, model.status, model.risk_level,
            model.processing_activity, model.data_categories,
            model.recipients, model.measures, model.approved_by,
        ):
            assert value is None

    def test_partial_update_title_only(self):
        dumped = DSFAUpdate(title="Neuer Titel").model_dump(exclude_none=True)
        assert dumped == {"title": "Neuer Titel"}

    def test_partial_update_status_and_risk(self):
        dumped = DSFAUpdate(status="approved", risk_level="medium").model_dump(exclude_none=True)
        assert dumped["status"] == "approved"
        assert dumped["risk_level"] == "medium"
        assert "title" not in dumped

    def test_update_arrays(self):
        model = DSFAUpdate(data_categories=["Kontaktdaten"], measures=["Verschluesselung"])
        assert model.data_categories == ["Kontaktdaten"]
        assert model.measures == ["Verschluesselung"]

    def test_exclude_none_removes_unset(self):
        dumped = DSFAUpdate(approved_by="DSB Mueller").model_dump(exclude_none=True)
        assert dumped == {"approved_by": "DSB Mueller"}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — DSFAStatusUpdate
|
||||
# =============================================================================
|
||||
|
||||
class TestDSFAStatusUpdate:
    """Schema-level tests for the DSFAStatusUpdate request model."""

    def test_status_only(self):
        model = DSFAStatusUpdate(status="approved")
        assert model.status == "approved"
        assert model.approved_by is None

    def test_status_with_approved_by(self):
        model = DSFAStatusUpdate(status="approved", approved_by="DSB Mueller")
        assert model.status == "approved"
        assert model.approved_by == "DSB Mueller"

    def test_in_review_status(self):
        assert DSFAStatusUpdate(status="in-review").status == "in-review"

    def test_needs_update_status(self):
        assert DSFAStatusUpdate(status="needs-update").status == "needs-update"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Tests — _get_tenant_id
|
||||
# =============================================================================
|
||||
|
||||
class TestGetTenantId:
    """Tests for the _get_tenant_id fallback helper."""

    def test_none_returns_default(self):
        assert _get_tenant_id(None) == DEFAULT_TENANT_ID

    def test_empty_string_returns_default(self):
        # FIX (naming): the previous name "test_empty_string_returns_empty"
        # contradicted the assertion — "" is falsy, so the helper falls back
        # to DEFAULT_TENANT_ID.
        assert _get_tenant_id("") == DEFAULT_TENANT_ID

    def test_custom_tenant_id(self):
        assert _get_tenant_id("my-tenant") == "my-tenant"

    def test_default_constant_value(self):
        assert DEFAULT_TENANT_ID == "default"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Tests — _dsfa_to_response
|
||||
# =============================================================================
|
||||
|
||||
class TestDsfaToResponse:
    """Tests for the _dsfa_to_response row-to-dict conversion helper."""

    def _make_row(self, **overrides):
        # _dsfa_to_response only accesses row["..."], so a plain dict is a
        # sufficient stand-in for a database row.
        row = {
            "id": "abc123",
            "tenant_id": "default",
            "title": "Test DSFA",
            "description": "Testbeschreibung",
            "status": "draft",
            "risk_level": "low",
            "processing_activity": "Test-Verarbeitung",
            "data_categories": ["Kontaktdaten"],
            "recipients": ["HR"],
            "measures": ["Verschluesselung"],
            "approved_by": None,
            "approved_at": None,
            "created_by": "system",
            "created_at": datetime(2026, 1, 1, 12, 0, 0),
            "updated_at": datetime(2026, 1, 2, 12, 0, 0),
        }
        row.update(overrides)
        return row

    def test_basic_fields(self):
        converted = _dsfa_to_response(self._make_row())
        assert converted["id"] == "abc123"
        assert converted["title"] == "Test DSFA"
        assert converted["status"] == "draft"
        assert converted["risk_level"] == "low"

    def test_dates_as_iso_strings(self):
        converted = _dsfa_to_response(self._make_row())
        assert converted["created_at"] == "2026-01-01T12:00:00"
        assert converted["updated_at"] == "2026-01-02T12:00:00"

    def test_approved_at_none_when_not_set(self):
        assert _dsfa_to_response(self._make_row(approved_at=None))["approved_at"] is None

    def test_approved_at_iso_when_set(self):
        row = self._make_row(approved_at=datetime(2026, 3, 1, 10, 0, 0))
        assert _dsfa_to_response(row)["approved_at"] == "2026-03-01T10:00:00"

    def test_null_description_becomes_empty_string(self):
        assert _dsfa_to_response(self._make_row(description=None))["description"] == ""

    def test_json_string_data_categories_parsed(self):
        import json
        row = self._make_row(data_categories=json.dumps(["Kontaktdaten", "Finanzdaten"]))
        assert _dsfa_to_response(row)["data_categories"] == ["Kontaktdaten", "Finanzdaten"]

    def test_null_arrays_become_empty_lists(self):
        converted = _dsfa_to_response(
            self._make_row(data_categories=None, recipients=None, measures=None)
        )
        assert converted["data_categories"] == []
        assert converted["recipients"] == []
        assert converted["measures"] == []

    def test_null_status_defaults_to_draft(self):
        assert _dsfa_to_response(self._make_row(status=None))["status"] == "draft"

    def test_null_risk_level_defaults_to_low(self):
        assert _dsfa_to_response(self._make_row(risk_level=None))["risk_level"] == "low"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Valid Status Values
|
||||
# =============================================================================
|
||||
|
||||
class TestValidStatusValues:
    """Pins the allowed DSFA lifecycle statuses."""

    def test_draft_is_valid(self):
        assert {"draft"} <= VALID_STATUSES

    def test_in_review_is_valid(self):
        assert {"in-review"} <= VALID_STATUSES

    def test_approved_is_valid(self):
        assert {"approved"} <= VALID_STATUSES

    def test_needs_update_is_valid(self):
        assert {"needs-update"} <= VALID_STATUSES

    def test_invalid_status_not_in_set(self):
        assert not {"invalid_status"} <= VALID_STATUSES

    def test_all_four_statuses_covered(self):
        assert len(VALID_STATUSES) == 4
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Valid Risk Levels
|
||||
# =============================================================================
|
||||
|
||||
class TestValidRiskLevels:
    """Pins the allowed DSFA risk classifications."""

    def test_low_is_valid(self):
        assert {"low"} <= VALID_RISK_LEVELS

    def test_medium_is_valid(self):
        assert {"medium"} <= VALID_RISK_LEVELS

    def test_high_is_valid(self):
        assert {"high"} <= VALID_RISK_LEVELS

    def test_critical_is_valid(self):
        assert {"critical"} <= VALID_RISK_LEVELS

    def test_invalid_risk_not_in_set(self):
        assert not {"extreme"} <= VALID_RISK_LEVELS

    def test_all_four_levels_covered(self):
        assert len(VALID_RISK_LEVELS) == 4
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Router Config
|
||||
# =============================================================================
|
||||
|
||||
class TestDSFARouterConfig:
    """Sanity checks on router configuration and package-level export."""

    def test_router_prefix(self):
        from compliance.api.dsfa_routes import router as dsfa_routes_router
        assert dsfa_routes_router.prefix == "/v1/dsfa"

    def test_router_has_tags(self):
        from compliance.api.dsfa_routes import router as dsfa_routes_router
        assert "compliance-dsfa" in dsfa_routes_router.tags

    def test_router_registered_in_init(self):
        from compliance.api import dsfa_router as exported_router
        assert exported_router is not None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Stats Response Structure
|
||||
# =============================================================================
|
||||
|
||||
class TestDSFAStatsResponse:
    """Pins the dict shape the /stats endpoint promises.

    NOTE(review): these assertions check locally-built dicts, not the live
    endpoint — they document the contract shape only; the endpoint itself
    needs a DB-backed integration test.
    """

    def test_stats_keys_present(self):
        """Stats endpoint must return these keys."""
        stats = {
            "total": 0,
            "by_status": {},
            "by_risk_level": {},
            "draft_count": 0,
            "in_review_count": 0,
            "approved_count": 0,
            "needs_update_count": 0,
        }
        expected_keys = {
            "total", "by_status", "by_risk_level",
            "draft_count", "in_review_count", "approved_count", "needs_update_count"
        }
        assert set(stats) == expected_keys

    def test_stats_total_is_int(self):
        assert isinstance({"total": 5}["total"], int)

    def test_stats_by_status_is_dict(self):
        assert isinstance({"draft": 2, "approved": 1}, dict)

    def test_stats_counts_are_integers(self):
        counters = {"draft_count": 2, "in_review_count": 1, "approved_count": 0}
        assert all(isinstance(count, int) for count in counters.values())

    def test_stats_zero_total_when_no_dsfas(self):
        empty_stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0}
        assert empty_stats["total"] == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Audit Log Entry Structure
|
||||
# =============================================================================
|
||||
|
||||
class TestAuditLogEntry:
    """Pins the audit-log entry shape returned by the API.

    NOTE(review): these assertions check locally-built dicts, not the live
    endpoint — they document the contract shape only.
    """

    def test_audit_log_entry_keys(self):
        entry = {
            "id": "uuid-1",
            "tenant_id": "default",
            "dsfa_id": "uuid-2",
            "action": "CREATE",
            "changed_by": "system",
            "old_values": None,
            "new_values": {"title": "Test"},
            "created_at": "2026-01-01T12:00:00",
        }
        for key in ("id", "action", "dsfa_id", "created_at"):
            assert key in entry

    def test_audit_action_values(self):
        valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"}
        assert {"CREATE", "DELETE", "STATUS_CHANGE"} <= valid_actions

    def test_audit_dsfa_id_can_be_none(self):
        assert {"dsfa_id": None}["dsfa_id"] is None

    def test_audit_old_values_can_be_none(self):
        entry = {"old_values": None, "new_values": {"title": "Test"}}
        assert entry["old_values"] is None
        assert entry["new_values"] is not None
|
||||
Reference in New Issue
Block a user