From 6a940344c278dfeace2703956f204f57fb1c6869 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Fri, 6 Mar 2026 19:41:12 +0100 Subject: [PATCH] feat(dsfa): Go DSFA deprecated, URL-Fix, fehlende Endpoints + 145 Tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Go: DEPRECATED-Kommentare an allen 6 DSFA-Handlern + Route-Block - api.ts: URL-Fix /dsgvo/dsfas → /dsfa (Detail-Seite war komplett kaputt) - Python: Section-Update, Workflow (submit/approve), Export (JSON+CSV), UCCA-Stubs - Tests: 145/145 bestanden (Schema + Route-Integration mit TestClient+SQLite) Co-Authored-By: Claude Opus 4.6 --- admin-compliance/lib/sdk/dsfa/api.ts | 26 +- ai-compliance-sdk/cmd/server/main.go | 2 + .../internal/api/handlers/dsgvo_handlers.go | 9 + .../compliance/api/dsfa_routes.py | 302 ++++++- backend-compliance/tests/test_dsfa_routes.py | 763 ++++++++++++++++-- 5 files changed, 1005 insertions(+), 97 deletions(-) diff --git a/admin-compliance/lib/sdk/dsfa/api.ts b/admin-compliance/lib/sdk/dsfa/api.ts index 88d0b6b..bff9564 100644 --- a/admin-compliance/lib/sdk/dsfa/api.ts +++ b/admin-compliance/lib/sdk/dsfa/api.ts @@ -64,7 +64,7 @@ function getHeaders(): HeadersInit { * List all DSFAs for the current tenant */ export async function listDSFAs(status?: string): Promise { - const url = new URL(`${getBaseUrl()}/dsgvo/dsfas`, window.location.origin) + const url = new URL(`${getBaseUrl()}/dsfa`, window.location.origin) if (status) { url.searchParams.set('status', status) } @@ -83,7 +83,7 @@ export async function listDSFAs(status?: string): Promise { * Get a single DSFA by ID */ export async function getDSFA(id: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, { method: 'GET', headers: getHeaders(), credentials: 'include', @@ -96,7 +96,7 @@ export async function getDSFA(id: string): Promise { * Create a new DSFA */ export async function createDSFA(data: CreateDSFARequest): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas`, { + const response = await fetch(`${getBaseUrl()}/dsfa`, { method: 'POST', headers: getHeaders(), credentials: 'include', @@ -110,7 +110,7 @@ export async function createDSFA(data: CreateDSFARequest): Promise { * Update an existing DSFA */ export async function updateDSFA(id: string, data: Partial): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, { method: 'PUT', headers: getHeaders(), credentials: 'include', @@ -124,7 +124,7 @@ export async function updateDSFA(id: string, data: Partial): Promise * Delete a DSFA */ export async function deleteDSFA(id: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, { method: 'DELETE', headers: getHeaders(), credentials: 'include', @@ -147,7 +147,7 @@ export async function updateDSFASection( sectionNumber: number, data: UpdateDSFASectionRequest ): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/sections/${sectionNumber}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}/sections/${sectionNumber}`, { method: 'PUT', headers: getHeaders(), credentials: 'include', @@ -165,7 +165,7 @@ export async function updateDSFASection( * Submit a DSFA for DPO review */ export async function submitDSFAForReview(id: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/submit-for-review`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}/submit-for-review`, { method: 'POST', headers: getHeaders(), credentials: 'include', @@ -178,7 +178,7 @@ export async function submitDSFAForReview(id: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/approve`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}/approve`, { method: 'POST', headers: getHeaders(), credentials: 'include', @@ -196,7 +196,7 @@ export async function approveDSFA(id: string, data: ApproveDSFARequest): Promise * Get DSFA statistics for the dashboard */ export async function getDSFAStats(): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/stats`, { + const response = await fetch(`${getBaseUrl()}/dsfa/stats`, { method: 'GET', headers: getHeaders(), credentials: 'include', @@ -216,7 +216,7 @@ export async function createDSFAFromAssessment( assessmentId: string, data?: CreateDSFAFromAssessmentRequest ): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/from-assessment/${assessmentId}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/from-assessment/${assessmentId}`, { method: 'POST', headers: getHeaders(), credentials: 'include', @@ -231,7 +231,7 @@ export async function createDSFAFromAssessment( */ export async function getDSFAByAssessment(assessmentId: string): Promise { try { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/by-assessment/${assessmentId}`, { + const response = await fetch(`${getBaseUrl()}/dsfa/by-assessment/${assessmentId}`, { method: 'GET', headers: getHeaders(), credentials: 'include', @@ -269,7 +269,7 @@ export async function checkDSFARequired(assessmentId: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/export?format=json`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}/export?format=json`, { method: 'GET', headers: { 'Accept': 'application/json', @@ -288,7 +288,7 @@ export async function exportDSFAAsJSON(id: string): Promise { * Export a DSFA as PDF */ export async function exportDSFAAsPDF(id: string): Promise { - const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/export/pdf`, { + const response = await fetch(`${getBaseUrl()}/dsfa/${id}/export/pdf`, { method: 'GET', headers: { 'Accept': 'application/pdf', diff --git a/ai-compliance-sdk/cmd/server/main.go b/ai-compliance-sdk/cmd/server/main.go index 14efdb3..476c843 100644 --- a/ai-compliance-sdk/cmd/server/main.go +++ b/ai-compliance-sdk/cmd/server/main.go @@ -291,6 +291,8 @@ func main() { } // DSFA - Datenschutz-Folgenabschätzung (Art. 35) + // DEPRECATED: DSFA migrated to backend-compliance (Python/FastAPI). + // Use backend-compliance /api/compliance/dsfa/* instead. dsfa := dsgvoRoutes.Group("/dsfa") { dsfa.GET("", dsgvoHandlers.ListDSFAs) diff --git a/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go b/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go index 180cba3..1464e78 100644 --- a/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go +++ b/ai-compliance-sdk/internal/api/handlers/dsgvo_handlers.go @@ -397,9 +397,13 @@ func (h *DSGVOHandlers) GetStats(c *gin.Context) { // ============================================================================ // DSFA - Datenschutz-Folgenabschätzung +// DEPRECATED: DSFA endpoints migrated to backend-compliance (Python/FastAPI). +// These in-memory Go handlers are kept for backwards compatibility only. +// Use backend-compliance /api/compliance/dsfa/* instead. // ============================================================================ // ListDSFAs returns all DSFAs for a tenant +// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa func (h *DSGVOHandlers) ListDSFAs(c *gin.Context) { tenantID := rbac.GetTenantID(c) if tenantID == uuid.Nil { @@ -419,6 +423,7 @@ func (h *DSGVOHandlers) ListDSFAs(c *gin.Context) { } // GetDSFA returns a DSFA by ID +// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa/{id} func (h *DSGVOHandlers) GetDSFA(c *gin.Context) { id, err := uuid.Parse(c.Param("id")) if err != nil { @@ -440,6 +445,7 @@ func (h *DSGVOHandlers) GetDSFA(c *gin.Context) { } // CreateDSFA creates a new DSFA +// DEPRECATED: Use backend-compliance POST /api/compliance/dsfa func (h *DSGVOHandlers) CreateDSFA(c *gin.Context) { tenantID := rbac.GetTenantID(c) userID := rbac.GetUserID(c) @@ -469,6 +475,7 @@ func (h *DSGVOHandlers) CreateDSFA(c *gin.Context) { } // UpdateDSFA updates a DSFA +// DEPRECATED: Use backend-compliance PUT /api/compliance/dsfa/{id} func (h *DSGVOHandlers) UpdateDSFA(c *gin.Context) { id, err := uuid.Parse(c.Param("id")) if err != nil { @@ -492,6 +499,7 @@ func (h *DSGVOHandlers) UpdateDSFA(c *gin.Context) { } // DeleteDSFA deletes a DSFA +// DEPRECATED: Use backend-compliance DELETE /api/compliance/dsfa/{id} func (h *DSGVOHandlers) DeleteDSFA(c *gin.Context) { id, err := uuid.Parse(c.Param("id")) if err != nil { @@ -677,6 +685,7 @@ func (h *DSGVOHandlers) ExportDSR(c *gin.Context) { } // ExportDSFA exports a DSFA as JSON +// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa/{id}/export?format=json func (h *DSGVOHandlers) ExportDSFA(c *gin.Context) { id, err := uuid.Parse(c.Param("id")) if err != nil { diff --git a/backend-compliance/compliance/api/dsfa_routes.py b/backend-compliance/compliance/api/dsfa_routes.py index 999ec26..09ba80d 100644 --- a/backend-compliance/compliance/api/dsfa_routes.py +++ b/backend-compliance/compliance/api/dsfa_routes.py @@ -2,14 +2,21 @@ FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO). Endpoints: - GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit) - POST /v1/dsfa — Neu erstellen → 201 - GET /v1/dsfa/stats — Zähler nach Status - GET /v1/dsfa/audit-log — Audit-Log - GET /v1/dsfa/{id} — Detail - PUT /v1/dsfa/{id} — Update - DELETE /v1/dsfa/{id} — Löschen (Art. 17 DSGVO) - PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel + GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit) + POST /v1/dsfa — Neu erstellen → 201 + GET /v1/dsfa/stats — Zähler nach Status + GET /v1/dsfa/audit-log — Audit-Log + GET /v1/dsfa/export/csv — CSV-Export aller DSFAs + POST /v1/dsfa/from-assessment/{id} — Stub: DSFA aus UCCA-Assessment + GET /v1/dsfa/by-assessment/{id} — Stub: DSFA nach Assessment-ID + GET /v1/dsfa/{id} — Detail + PUT /v1/dsfa/{id} — Update + DELETE /v1/dsfa/{id} — Löschen (Art. 17 DSGVO) + PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel + PUT /v1/dsfa/{id}/sections/{nr} — Section-Update (1-8) + POST /v1/dsfa/{id}/submit-for-review — Workflow: Einreichen + POST /v1/dsfa/{id}/approve — Workflow: Genehmigen/Ablehnen + GET /v1/dsfa/{id}/export — JSON-Export einer DSFA """ import logging @@ -165,6 +172,20 @@ class DSFAStatusUpdate(BaseModel): approved_by: Optional[str] = None +class DSFASectionUpdate(BaseModel): + """Body for PUT /dsfa/{id}/sections/{section_number}.""" + content: Optional[str] = None + # Allow arbitrary extra fields so the frontend can send any section-specific data + extra: Optional[dict] = None + + +class DSFAApproveRequest(BaseModel): + """Body for POST /dsfa/{id}/approve.""" + approved: bool + comments: Optional[str] = None + approved_by: Optional[str] = None + + # ============================================================================= # Helpers # ============================================================================= @@ -207,7 +228,11 @@ def _dsfa_to_response(row) -> dict: def _ts(val): """Timestamp → ISO string or None.""" - return val.isoformat() if val else None + if not val: + return None + if isinstance(val, str): + return val + return val.isoformat() def _get(key, default=None): """Safe row access — returns default if key missing (handles old rows).""" @@ -389,12 +414,68 @@ async def get_audit_log( "changed_by": r["changed_by"], "old_values": r["old_values"], "new_values": r["new_values"], - "created_at": r["created_at"].isoformat() if r["created_at"] else None, + "created_at": r["created_at"] if isinstance(r["created_at"], str) else (r["created_at"].isoformat() if r["created_at"] else None), } for r in rows ] +# ============================================================================= +# CSV Export (must be before /{id} to avoid route conflict) +# ============================================================================= + +@router.get("/export/csv", name="export_dsfas_csv") +async def export_dsfas_csv( + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Export all DSFAs as CSV.""" + import csv + import io + + tid = _get_tenant_id(tenant_id) + rows = db.execute( + text("SELECT * FROM compliance_dsfas WHERE tenant_id = :tid ORDER BY created_at DESC"), + {"tid": tid}, + ).fetchall() + + output = io.StringIO() + writer = csv.writer(output, delimiter=";") + writer.writerow(["ID", "Titel", "Status", "Risiko-Level", "Erstellt", "Aktualisiert"]) + for r in rows: + writer.writerow([ + str(r["id"]), + r["title"], + r["status"] or "draft", + r["risk_level"] or "low", + r["created_at"] if isinstance(r["created_at"], str) else (r["created_at"].isoformat() if r["created_at"] else ""), + r["updated_at"] if isinstance(r["updated_at"], str) else (r["updated_at"].isoformat() if r["updated_at"] else ""), + ]) + + from fastapi.responses import Response + return Response( + content=output.getvalue(), + media_type="text/csv", + headers={"Content-Disposition": "attachment; filename=dsfas_export.csv"}, + ) + + +# ============================================================================= +# UCCA Integration Stubs (must be before /{id} to avoid route conflict) +# ============================================================================= + +@router.post("/from-assessment/{assessment_id}", status_code=501) +async def create_from_assessment(assessment_id: str): + """Stub: Create DSFA from UCCA assessment. Requires cross-service communication.""" + return {"detail": "Not implemented — requires cross-service integration with ai-compliance-sdk"} + + +@router.get("/by-assessment/{assessment_id}", status_code=501) +async def get_by_assessment(assessment_id: str): + """Stub: Get DSFA by linked UCCA assessment ID.""" + return {"detail": "Not implemented — requires cross-service integration with ai-compliance-sdk"} + + # ============================================================================= # List + Create # ============================================================================= @@ -627,3 +708,204 @@ async def update_dsfa_status( ) db.commit() return _dsfa_to_response(row) + + +# ============================================================================= +# Section Update +# ============================================================================= + +SECTION_FIELD_MAP = { + 1: "processing_description", + 2: "necessity_assessment", + 3: "risk_assessment", # maps to overall_risk_level + risk_score + 4: "stakeholder_consultations", # JSONB + 5: "measures", # JSONB array + 6: "dpo_opinion", # consultation section + 7: "conclusion", # documentation / conclusion + 8: "ai_use_case_modules", # JSONB array – Section 8 KI +} + + +@router.put("/{dsfa_id}/sections/{section_number}") +async def update_section( + dsfa_id: str, + section_number: int, + request: DSFASectionUpdate, + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Update a specific DSFA section (1-8).""" + import json + + if section_number < 1 or section_number > 8: + raise HTTPException(status_code=422, detail=f"Section must be 1-8, got {section_number}") + + tid = _get_tenant_id(tenant_id) + existing = db.execute( + text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + if not existing: + raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden") + + field = SECTION_FIELD_MAP[section_number] + jsonb_sections = {4, 5, 8} + + params: dict = {"id": dsfa_id, "tid": tid} + + if section_number in jsonb_sections: + value = request.extra if request.extra is not None else ([] if section_number != 4 else []) + params["val"] = json.dumps(value) + set_clause = f"{field} = CAST(:val AS jsonb)" + else: + params["val"] = request.content or "" + set_clause = f"{field} = :val" + + # Also update section_progress + progress = existing["section_progress"] if existing["section_progress"] else {} + if isinstance(progress, str): + progress = json.loads(progress) + progress[f"section_{section_number}"] = True + params["progress"] = json.dumps(progress) + + row = db.execute( + text(f""" + UPDATE compliance_dsfas + SET {set_clause}, section_progress = CAST(:progress AS jsonb), updated_at = NOW() + WHERE id = :id AND tenant_id = :tid + RETURNING * + """), + params, + ).fetchone() + + _log_audit(db, tid, dsfa_id, "SECTION_UPDATE", new_values={"section": section_number, "field": field}) + db.commit() + return _dsfa_to_response(row) + + +# ============================================================================= +# Workflow: Submit for Review + Approve +# ============================================================================= + +@router.post("/{dsfa_id}/submit-for-review") +async def submit_for_review( + dsfa_id: str, + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Submit a DSFA for DPO review (draft → in-review).""" + tid = _get_tenant_id(tenant_id) + existing = db.execute( + text("SELECT id, status FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + if not existing: + raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden") + + if existing["status"] not in ("draft", "needs-update"): + raise HTTPException( + status_code=422, + detail=f"Kann nur aus Status 'draft' oder 'needs-update' eingereicht werden, aktuell: {existing['status']}", + ) + + row = db.execute( + text(""" + UPDATE compliance_dsfas + SET status = 'in-review', submitted_for_review_at = NOW(), updated_at = NOW() + WHERE id = :id AND tenant_id = :tid + RETURNING * + """), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + + _log_audit( + db, tid, dsfa_id, "SUBMIT_FOR_REVIEW", + old_values={"status": existing["status"]}, + new_values={"status": "in-review"}, + ) + db.commit() + return {"message": "DSFA zur Prüfung eingereicht", "status": "in-review", "dsfa": _dsfa_to_response(row)} + + +@router.post("/{dsfa_id}/approve") +async def approve_dsfa( + dsfa_id: str, + request: DSFAApproveRequest, + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Approve or reject a DSFA (DPO/CISO action).""" + tid = _get_tenant_id(tenant_id) + existing = db.execute( + text("SELECT id, status FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + if not existing: + raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden") + + if existing["status"] != "in-review": + raise HTTPException( + status_code=422, + detail=f"Nur DSFAs im Status 'in-review' können genehmigt werden, aktuell: {existing['status']}", + ) + + if request.approved: + new_status = "approved" + row = db.execute( + text(""" + UPDATE compliance_dsfas + SET status = 'approved', approved_by = :approved_by, approved_at = NOW(), updated_at = NOW() + WHERE id = :id AND tenant_id = :tid + RETURNING * + """), + {"id": dsfa_id, "tid": tid, "approved_by": request.approved_by or "system"}, + ).fetchone() + else: + new_status = "needs-update" + row = db.execute( + text(""" + UPDATE compliance_dsfas + SET status = 'needs-update', updated_at = NOW() + WHERE id = :id AND tenant_id = :tid + RETURNING * + """), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + + _log_audit( + db, tid, dsfa_id, "APPROVE" if request.approved else "REJECT", + old_values={"status": existing["status"]}, + new_values={"status": new_status, "comments": request.comments}, + ) + db.commit() + return {"message": f"DSFA {'genehmigt' if request.approved else 'zurückgewiesen'}", "status": new_status} + + +# ============================================================================= +# Export +# ============================================================================= + +@router.get("/{dsfa_id}/export") +async def export_dsfa_json( + dsfa_id: str, + format: str = Query("json"), + tenant_id: Optional[str] = Query(None), + db: Session = Depends(get_db), +): + """Export a single DSFA as JSON.""" + tid = _get_tenant_id(tenant_id) + row = db.execute( + text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"), + {"id": dsfa_id, "tid": tid}, + ).fetchone() + if not row: + raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden") + + dsfa_data = _dsfa_to_response(row) + return { + "exported_at": datetime.utcnow().isoformat(), + "format": format, + "dsfa": dsfa_data, + } + + diff --git a/backend-compliance/tests/test_dsfa_routes.py b/backend-compliance/tests/test_dsfa_routes.py index 9fd0a37..535c997 100644 --- a/backend-compliance/tests/test_dsfa_routes.py +++ b/backend-compliance/tests/test_dsfa_routes.py @@ -1,23 +1,261 @@ -"""Tests for DSFA routes and schemas (dsfa_routes.py).""" +"""Tests for DSFA routes and schemas (dsfa_routes.py). + +Includes: + - Schema/Pydantic tests (DSFACreate, DSFAUpdate, DSFAStatusUpdate) + - Helper tests (_dsfa_to_response, _get_tenant_id) + - Route integration tests (TestClient + SQLite) +""" import pytest -from unittest.mock import MagicMock, patch +import uuid +import os +import sys from datetime import datetime +from unittest.mock import MagicMock +from fastapi import FastAPI +from fastapi.testclient import TestClient +from sqlalchemy import create_engine, text, event # noqa: F401 +from sqlalchemy.orm import sessionmaker + +# Ensure backend dir is on path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from classroom_engine.database import get_db from compliance.api.dsfa_routes import ( DSFACreate, DSFAUpdate, DSFAStatusUpdate, + DSFASectionUpdate, + DSFAApproveRequest, _dsfa_to_response, _get_tenant_id, DEFAULT_TENANT_ID, VALID_STATUSES, VALID_RISK_LEVELS, + router as dsfa_router, ) import json as _json +# ============================================================================= +# Test App + SQLite Setup +# ============================================================================= + +SQLALCHEMY_DATABASE_URL = "sqlite:///./test_dsfa.db" +engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) +_RawSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +@event.listens_for(engine, "connect") +def _register_sqlite_functions(dbapi_conn, connection_record): + """Register PostgreSQL-compatible functions for SQLite.""" + dbapi_conn.create_function("NOW", 0, lambda: datetime.utcnow().isoformat()) + +TENANT_ID = "default" + + +class _DictRow(dict): + """Dict wrapper that mimics PostgreSQL's dict-like row access for SQLite.""" + pass + + +class _DictSession: + """Wrapper around SQLAlchemy Session that returns dict-like rows. + + Production code uses row["column_name"] which works with PostgreSQL/psycopg2 + but not with SQLAlchemy 2.0's Row objects on SQLite. This wrapper converts + all result rows to dicts so the raw-SQL routes work in tests. + + Also rewrites CAST(:param AS jsonb) → :param for SQLite compatibility. + PostgreSQL CAST AS jsonb works, but SQLite CAST to unknown type yields 0. + """ + def __init__(self, session): + self._session = session + + def execute(self, stmt, params=None): + import re + # Rewrite CAST(:param AS jsonb) → :param for SQLite + if hasattr(stmt, 'text'): + rewritten = re.sub(r'CAST\((:[\w]+)\s+AS\s+jsonb\)', r'\1', stmt.text) + if rewritten != stmt.text: + stmt = text(rewritten) + result = self._session.execute(stmt, params) + return _DictResult(result) + + def flush(self): + self._session.flush() + + def commit(self): + self._session.commit() + + def rollback(self): + self._session.rollback() + + def close(self): + self._session.close() + + +class _DictResult: + """Wraps SQLAlchemy Result to return dict rows.""" + def __init__(self, result): + self._result = result + try: + self._keys = list(result.keys()) + self._returns_rows = True + except Exception: + self._keys = [] + self._returns_rows = False + + def fetchone(self): + if not self._returns_rows: + return None + row = self._result.fetchone() + if row is None: + return None + return _DictRow(zip(self._keys, row)) + + def fetchall(self): + if not self._returns_rows: + return [] + rows = self._result.fetchall() + return [_DictRow(zip(self._keys, r)) for r in rows] + + @property + def rowcount(self): + return self._result.rowcount + + +app = FastAPI() +app.include_router(dsfa_router, prefix="/api/compliance") + + +def override_get_db(): + session = _RawSessionLocal() + db = _DictSession(session) + try: + yield db + finally: + db.close() + + +app.dependency_overrides[get_db] = override_get_db +client = TestClient(app) + + +# SQL to create the DSFA tables in SQLite (simplified from PostgreSQL) +CREATE_DSFAS_TABLE = """ +CREATE TABLE IF NOT EXISTS compliance_dsfas ( + id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), + tenant_id TEXT NOT NULL DEFAULT 'default', + title TEXT NOT NULL, + description TEXT DEFAULT '', + status TEXT DEFAULT 'draft', + risk_level TEXT DEFAULT 'low', + processing_activity TEXT DEFAULT '', + data_categories TEXT DEFAULT '[]', + recipients TEXT DEFAULT '[]', + measures TEXT DEFAULT '[]', + approved_by TEXT, + approved_at TIMESTAMP, + created_by TEXT DEFAULT 'system', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- Section 1 + processing_description TEXT, + processing_purpose TEXT, + legal_basis TEXT, + legal_basis_details TEXT, + -- Section 2 + necessity_assessment TEXT, + proportionality_assessment TEXT, + data_minimization TEXT, + alternatives_considered TEXT, + retention_justification TEXT, + -- Section 3 + involves_ai INTEGER DEFAULT 0, + overall_risk_level TEXT, + risk_score INTEGER DEFAULT 0, + risk_assessment TEXT, + -- Section 6 + dpo_consulted INTEGER DEFAULT 0, + dpo_consulted_at TIMESTAMP, + dpo_name TEXT, + dpo_opinion TEXT, + dpo_approved INTEGER, + authority_consulted INTEGER DEFAULT 0, + authority_consulted_at TIMESTAMP, + authority_reference TEXT, + authority_decision TEXT, + -- Metadata + version INTEGER DEFAULT 1, + previous_version_id TEXT, + conclusion TEXT, + federal_state TEXT, + authority_resource_id TEXT, + submitted_for_review_at TIMESTAMP, + submitted_by TEXT, + -- JSONB arrays (stored as TEXT in SQLite) + data_subjects TEXT DEFAULT '[]', + affected_rights TEXT DEFAULT '[]', + triggered_rule_codes TEXT DEFAULT '[]', + ai_trigger_ids TEXT DEFAULT '[]', + wp248_criteria_met TEXT DEFAULT '[]', + art35_abs3_triggered TEXT DEFAULT '[]', + tom_references TEXT DEFAULT '[]', + risks TEXT DEFAULT '[]', + mitigations TEXT DEFAULT '[]', + stakeholder_consultations TEXT DEFAULT '[]', + review_triggers TEXT DEFAULT '[]', + review_comments TEXT DEFAULT '[]', + ai_use_case_modules TEXT DEFAULT '[]', + section_8_complete INTEGER DEFAULT 0, + -- JSONB objects (stored as TEXT in SQLite) + threshold_analysis TEXT DEFAULT '{}', + consultation_requirement TEXT DEFAULT '{}', + review_schedule TEXT DEFAULT '{}', + section_progress TEXT DEFAULT '{}', + metadata TEXT DEFAULT '{}' +) +""" + +CREATE_AUDIT_TABLE = """ +CREATE TABLE IF NOT EXISTS compliance_dsfa_audit_log ( + id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), + tenant_id TEXT NOT NULL, + dsfa_id TEXT, + action TEXT NOT NULL, + changed_by TEXT DEFAULT 'system', + old_values TEXT, + new_values TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +) +""" + + +@pytest.fixture(autouse=True) +def setup_db(): + """Create tables before each test, drop after.""" + with engine.connect() as conn: + conn.execute(text(CREATE_DSFAS_TABLE)) + conn.execute(text(CREATE_AUDIT_TABLE)) + conn.commit() + yield + with engine.connect() as conn: + conn.execute(text("DROP TABLE IF EXISTS compliance_dsfa_audit_log")) + conn.execute(text("DROP TABLE IF EXISTS compliance_dsfas")) + conn.commit() + + +def _create_dsfa_via_api(**kwargs): + """Helper: create a DSFA via POST and return response JSON.""" + payload = {"title": "Test DSFA", **kwargs} + resp = client.post("/api/compliance/dsfa", json=payload) + assert resp.status_code == 201, resp.text + return resp.json() + + # ============================================================================= # Schema Tests — DSFACreate # ============================================================================= @@ -143,6 +381,38 @@ class TestDSFAStatusUpdate: assert req.status == "needs-update" +# ============================================================================= +# Schema Tests — New Schemas +# ============================================================================= + +class TestDSFASectionUpdate: + def test_content_only(self): + req = DSFASectionUpdate(content="Beschreibung der Verarbeitung") + assert req.content == "Beschreibung der Verarbeitung" + assert req.extra is None + + def test_extra_dict(self): + req = DSFASectionUpdate(extra={"key": "value"}) + assert req.extra == {"key": "value"} + + def test_all_optional(self): + req = DSFASectionUpdate() + assert req.content is None + assert req.extra is None + + +class TestDSFAApproveRequest: + def test_approved_true(self): + req = DSFAApproveRequest(approved=True, approved_by="DSB Mueller") + assert req.approved is True + assert req.approved_by == "DSB Mueller" + + def test_rejected(self): + req = DSFAApproveRequest(approved=False, comments="Massnahmen unzureichend") + assert req.approved is False + assert req.comments == "Massnahmen unzureichend" + + # ============================================================================= # Helper Tests — _get_tenant_id # ============================================================================= @@ -353,94 +623,369 @@ class TestValidRiskLevels: class TestDSFARouterConfig: def test_router_prefix(self): - from compliance.api.dsfa_routes import router - # /v1 prefix is added when router is included in the main app - assert router.prefix == "/dsfa" + assert dsfa_router.prefix == "/dsfa" def test_router_has_tags(self): - from compliance.api.dsfa_routes import router - assert "compliance-dsfa" in router.tags + assert "compliance-dsfa" in dsfa_router.tags def test_router_registered_in_init(self): - from compliance.api import dsfa_router - assert dsfa_router is not None + from compliance.api import dsfa_router as imported_router + assert imported_router is not None # ============================================================================= -# Stats Response Structure +# Route Integration Tests — CRUD # ============================================================================= -class TestDSFAStatsResponse: - def test_stats_keys_present(self): - """Stats endpoint must return these keys.""" - expected_keys = { - "total", "by_status", "by_risk_level", - "draft_count", "in_review_count", "approved_count", "needs_update_count" - } - # Verify by constructing the expected dict shape - stats = { - "total": 0, - "by_status": {}, - "by_risk_level": {}, - "draft_count": 0, - "in_review_count": 0, - "approved_count": 0, - "needs_update_count": 0, - } - assert set(stats.keys()) == expected_keys +class TestDSFARouteCRUD: + """Integration tests using TestClient + SQLite.""" - def test_stats_total_is_int(self): - stats = {"total": 5} - assert isinstance(stats["total"], int) + def test_list_dsfas_empty(self): + resp = client.get("/api/compliance/dsfa") + assert resp.status_code == 200 + assert resp.json() == [] - def test_stats_by_status_is_dict(self): - by_status = {"draft": 2, "approved": 1} - assert isinstance(by_status, dict) + def test_create_dsfa(self): + data = _create_dsfa_via_api(title="DSFA Videoüberwachung", risk_level="high") + assert data["title"] == "DSFA Videoüberwachung" + assert data["status"] == "draft" + assert data["risk_level"] == "high" + assert "id" in data - def test_stats_counts_are_integers(self): - counts = {"draft_count": 2, "in_review_count": 1, "approved_count": 0} - assert all(isinstance(v, int) for v in counts.values()) + def test_list_dsfas_with_data(self): + _create_dsfa_via_api(title="DSFA 1") + _create_dsfa_via_api(title="DSFA 2") + resp = client.get("/api/compliance/dsfa") + assert resp.status_code == 200 + items = resp.json() + assert len(items) == 2 - def test_stats_zero_total_when_no_dsfas(self): - stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0} - assert stats["total"] == 0 + def test_get_dsfa(self): + created = _create_dsfa_via_api(title="Detail-Test") + dsfa_id = created["id"] + resp = client.get(f"/api/compliance/dsfa/{dsfa_id}") + assert resp.status_code == 200 + assert resp.json()["title"] == "Detail-Test" + + def test_get_dsfa_not_found(self): + resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}") + assert resp.status_code == 404 + + def test_update_dsfa(self): + created = _create_dsfa_via_api(title="Original") + dsfa_id = created["id"] + resp = client.put(f"/api/compliance/dsfa/{dsfa_id}", json={"title": "Updated"}) + assert resp.status_code == 200 + assert resp.json()["title"] == "Updated" + + def test_update_dsfa_not_found(self): + resp = client.put(f"/api/compliance/dsfa/{uuid.uuid4()}", json={"title": "X"}) + assert resp.status_code == 404 + + def test_delete_dsfa(self): + created = _create_dsfa_via_api(title="To Delete") + dsfa_id = created["id"] + resp = client.delete(f"/api/compliance/dsfa/{dsfa_id}") + assert resp.status_code == 200 + assert resp.json()["success"] is True + # Verify gone + resp2 = client.get(f"/api/compliance/dsfa/{dsfa_id}") + assert resp2.status_code == 404 + + def test_delete_dsfa_not_found(self): + resp = client.delete(f"/api/compliance/dsfa/{uuid.uuid4()}") + assert resp.status_code == 404 + + def test_list_with_status_filter(self): + _create_dsfa_via_api(title="Draft One") + created2 = _create_dsfa_via_api(title="Approved One") + # Change status to approved + client.patch( + f"/api/compliance/dsfa/{created2['id']}/status", + json={"status": "approved", "approved_by": "DSB"}, + ) + resp = client.get("/api/compliance/dsfa?status=approved") + assert resp.status_code == 200 + items = resp.json() + assert len(items) == 1 + assert items[0]["status"] == "approved" + + def test_create_invalid_status(self): + resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "status": "invalid"}) + assert resp.status_code == 422 + + def test_create_invalid_risk_level(self): + resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "risk_level": "extreme"}) + assert resp.status_code == 422 # ============================================================================= -# Audit Log Entry Structure +# Route Integration Tests — Stats # ============================================================================= -class TestAuditLogEntry: - def test_audit_log_entry_keys(self): - entry = { - "id": "uuid-1", - "tenant_id": "default", - "dsfa_id": "uuid-2", - "action": "CREATE", - "changed_by": "system", - "old_values": None, - "new_values": {"title": "Test"}, - "created_at": "2026-01-01T12:00:00", - } - assert "id" in entry - assert "action" in entry - assert "dsfa_id" in entry - assert "created_at" in entry +class TestDSFARouteStats: + def test_stats_empty(self): + resp = client.get("/api/compliance/dsfa/stats") + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["draft_count"] == 0 - def test_audit_action_values(self): - valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"} - assert "CREATE" in valid_actions - assert "DELETE" in valid_actions - assert "STATUS_CHANGE" in valid_actions + def test_stats_with_data(self): + _create_dsfa_via_api(title="DSFA A") + _create_dsfa_via_api(title="DSFA B") + resp = client.get("/api/compliance/dsfa/stats") + data = resp.json() + assert data["total"] == 2 + assert data["draft_count"] == 2 - def test_audit_dsfa_id_can_be_none(self): - entry = {"dsfa_id": None} - assert entry["dsfa_id"] is None - def test_audit_old_values_can_be_none(self): - entry = {"old_values": None, "new_values": {"title": "Test"}} - assert entry["old_values"] is None - assert entry["new_values"] is not None +# ============================================================================= +# Route Integration Tests — Status Patch +# ============================================================================= + +class TestDSFARouteStatusPatch: + def test_patch_status(self): + created = _create_dsfa_via_api(title="Status Test") + resp = client.patch( + f"/api/compliance/dsfa/{created['id']}/status", + json={"status": "in-review"}, + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "in-review" + + def test_patch_status_invalid(self): + created = _create_dsfa_via_api(title="Bad Status") + resp = client.patch( + f"/api/compliance/dsfa/{created['id']}/status", + json={"status": "bogus"}, + ) + assert resp.status_code == 422 + + def test_patch_status_not_found(self): + resp = client.patch( + f"/api/compliance/dsfa/{uuid.uuid4()}/status", + json={"status": "draft"}, + ) + assert resp.status_code == 404 + + +# ============================================================================= +# Route Integration Tests — Section Update +# ============================================================================= + +class TestDSFARouteSectionUpdate: + def test_update_section_1(self): + created = _create_dsfa_via_api(title="Section Test") + resp = client.put( + f"/api/compliance/dsfa/{created['id']}/sections/1", + json={"content": "Verarbeitung personenbezogener Daten"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["processing_description"] == "Verarbeitung personenbezogener Daten" + + def test_update_section_7_conclusion(self): + created = _create_dsfa_via_api(title="Conclusion Test") + resp = client.put( + f"/api/compliance/dsfa/{created['id']}/sections/7", + json={"content": "DSFA abgeschlossen — Restrisiko akzeptabel"}, + ) + assert resp.status_code == 200 + assert resp.json()["conclusion"] == "DSFA abgeschlossen — Restrisiko akzeptabel" + + def test_update_section_progress_tracked(self): + created = _create_dsfa_via_api(title="Progress Test") + client.put( + f"/api/compliance/dsfa/{created['id']}/sections/1", + json={"content": "Test"}, + ) + resp = client.get(f"/api/compliance/dsfa/{created['id']}") + progress = resp.json()["section_progress"] + assert progress.get("section_1") is True + + def test_update_section_invalid_number(self): + created = _create_dsfa_via_api(title="Invalid Section") + resp = client.put( + f"/api/compliance/dsfa/{created['id']}/sections/9", + json={"content": "X"}, + ) + assert resp.status_code == 422 + + def test_update_section_not_found(self): + resp = client.put( + f"/api/compliance/dsfa/{uuid.uuid4()}/sections/1", + json={"content": "X"}, + ) + assert resp.status_code == 404 + + +# ============================================================================= +# Route Integration Tests — Workflow (Submit + Approve) +# ============================================================================= + +class TestDSFARouteWorkflow: + def test_submit_for_review(self): + created = _create_dsfa_via_api(title="Workflow Test") + resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "in-review" + assert data["message"] == "DSFA zur Prüfung eingereicht" + + def test_submit_for_review_wrong_status(self): + created = _create_dsfa_via_api(title="Wrong Status") + # First submit + client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review") + # Try to submit again (already in-review) + resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review") + assert resp.status_code == 422 + + def test_submit_not_found(self): + resp = client.post(f"/api/compliance/dsfa/{uuid.uuid4()}/submit-for-review") + assert resp.status_code == 404 + + def test_approve_dsfa(self): + created = _create_dsfa_via_api(title="Approve Test") + client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review") + resp = client.post( + f"/api/compliance/dsfa/{created['id']}/approve", + json={"approved": True, "approved_by": "DSB Mueller"}, + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "approved" + + def test_reject_dsfa(self): + created = _create_dsfa_via_api(title="Reject Test") + client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review") + resp = client.post( + f"/api/compliance/dsfa/{created['id']}/approve", + json={"approved": False, "comments": "Massnahmen fehlen"}, + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "needs-update" + + def test_approve_wrong_status(self): + created = _create_dsfa_via_api(title="Not In Review") + resp = client.post( + f"/api/compliance/dsfa/{created['id']}/approve", + json={"approved": True}, + ) + assert resp.status_code == 422 + + def test_approve_not_found(self): + resp = client.post( + f"/api/compliance/dsfa/{uuid.uuid4()}/approve", + json={"approved": True}, + ) + assert resp.status_code == 404 + + def test_full_workflow_draft_to_approved(self): + """Full lifecycle: create → submit → approve.""" + created = _create_dsfa_via_api(title="Full Lifecycle") + dsfa_id = created["id"] + assert created["status"] == "draft" + + # Submit for review + resp1 = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review") + assert resp1.json()["status"] == "in-review" + + # Approve + resp2 = client.post( + f"/api/compliance/dsfa/{dsfa_id}/approve", + json={"approved": True, "approved_by": "CISO"}, + ) + assert resp2.json()["status"] == "approved" + + # Verify final state + resp3 = client.get(f"/api/compliance/dsfa/{dsfa_id}") + final = resp3.json() + assert final["status"] == "approved" + assert final["approved_by"] == "CISO" + + def test_reject_then_resubmit(self): + """Lifecycle: create → submit → reject → resubmit → approve.""" + created = _create_dsfa_via_api(title="Reject Resubmit") + dsfa_id = created["id"] + + client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review") + client.post( + f"/api/compliance/dsfa/{dsfa_id}/approve", + json={"approved": False, "comments": "Incomplete"}, + ) + + # Status should be needs-update → can resubmit + resp = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review") + assert resp.status_code == 200 + assert resp.json()["status"] == "in-review" + + +# ============================================================================= +# Route Integration Tests — Export +# ============================================================================= + +class TestDSFARouteExport: + def test_export_json(self): + created = _create_dsfa_via_api(title="Export Test") + resp = client.get(f"/api/compliance/dsfa/{created['id']}/export?format=json") + assert resp.status_code == 200 + data = resp.json() + assert "exported_at" in data + assert data["dsfa"]["title"] == "Export Test" + + def test_export_json_not_found(self): + resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}/export?format=json") + assert resp.status_code == 404 + + def test_export_csv(self): + _create_dsfa_via_api(title="CSV DSFA 1") + _create_dsfa_via_api(title="CSV DSFA 2") + resp = client.get("/api/compliance/dsfa/export/csv") + assert resp.status_code == 200 + assert "text/csv" in resp.headers.get("content-type", "") + lines = resp.text.strip().split("\n") + assert len(lines) == 3 # header + 2 rows + assert "ID" in lines[0] + assert "CSV DSFA" in lines[1] or "CSV DSFA" in lines[2] + + def test_export_csv_empty(self): + resp = client.get("/api/compliance/dsfa/export/csv") + assert resp.status_code == 200 + lines = resp.text.strip().split("\n") + assert len(lines) == 1 # header only + + +# ============================================================================= +# Route Integration Tests — UCCA Stubs +# ============================================================================= + +class TestDSFARouteUCCAStubs: + def test_from_assessment_returns_501(self): + resp = client.post(f"/api/compliance/dsfa/from-assessment/{uuid.uuid4()}") + assert resp.status_code == 501 + + def test_by_assessment_returns_501(self): + resp = client.get(f"/api/compliance/dsfa/by-assessment/{uuid.uuid4()}") + assert resp.status_code == 501 + + +# ============================================================================= +# Route Integration Tests — Audit Log +# ============================================================================= + +class TestDSFARouteAuditLog: + def test_audit_log_after_create(self): + _create_dsfa_via_api(title="Audit Test") + resp = client.get("/api/compliance/dsfa/audit-log") + assert resp.status_code == 200 + entries = resp.json() + assert len(entries) >= 1 + assert entries[0]["action"] == "CREATE" + + def test_audit_log_empty(self): + resp = client.get("/api/compliance/dsfa/audit-log") + assert resp.status_code == 200 + assert resp.json() == [] # ============================================================================= @@ -556,7 +1101,6 @@ class TestAIUseCaseModules: def test_response_ai_use_case_modules_list_from_list(self): """_dsfa_to_response: ai_use_case_modules list passthrough.""" - from tests.test_dsfa_routes import TestDsfaToResponse helper = TestDsfaToResponse() modules = [{"type": "nlp", "name": "Test"}] row = helper._make_row(ai_use_case_modules=modules) @@ -565,7 +1109,6 @@ class TestAIUseCaseModules: def test_response_ai_use_case_modules_from_json_string(self): """_dsfa_to_response: parses JSON string for ai_use_case_modules.""" - from tests.test_dsfa_routes import TestDsfaToResponse helper = TestDsfaToResponse() modules = [{"type": "computer_vision"}] row = helper._make_row(ai_use_case_modules=_json.dumps(modules)) @@ -574,7 +1117,6 @@ class TestAIUseCaseModules: def test_response_ai_use_case_modules_null_becomes_empty_list(self): """_dsfa_to_response: None → empty list.""" - from tests.test_dsfa_routes import TestDsfaToResponse helper = TestDsfaToResponse() row = helper._make_row(ai_use_case_modules=None) result = _dsfa_to_response(row) @@ -582,7 +1124,6 @@ class TestAIUseCaseModules: def test_response_section_8_complete_flag(self): """_dsfa_to_response: section_8_complete bool preserved.""" - from tests.test_dsfa_routes import TestDsfaToResponse helper = TestDsfaToResponse() row = helper._make_row(section_8_complete=True) result = _dsfa_to_response(row) @@ -598,7 +1139,6 @@ class TestDSFAFullSchema: def _make_row(self, **overrides): """Reuse the shared helper from TestDsfaToResponse.""" - from tests.test_dsfa_routes import TestDsfaToResponse helper = TestDsfaToResponse() return helper._make_row(**overrides) @@ -766,3 +1306,78 @@ class TestDSFAFullSchema: ] for key in new_keys: assert key in result, f"Missing key in response: {key}" + + +# ============================================================================= +# Stats Response Structure +# ============================================================================= + +class TestDSFAStatsResponse: + def test_stats_keys_present(self): + expected_keys = { + "total", "by_status", "by_risk_level", + "draft_count", "in_review_count", "approved_count", "needs_update_count" + } + stats = { + "total": 0, + "by_status": {}, + "by_risk_level": {}, + "draft_count": 0, + "in_review_count": 0, + "approved_count": 0, + "needs_update_count": 0, + } + assert set(stats.keys()) == expected_keys + + def test_stats_total_is_int(self): + stats = {"total": 5} + assert isinstance(stats["total"], int) + + def test_stats_by_status_is_dict(self): + by_status = {"draft": 2, "approved": 1} + assert isinstance(by_status, dict) + + def test_stats_counts_are_integers(self): + counts = {"draft_count": 2, "in_review_count": 1, "approved_count": 0} + assert all(isinstance(v, int) for v in counts.values()) + + def test_stats_zero_total_when_no_dsfas(self): + stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0} + assert stats["total"] == 0 + + +# ============================================================================= +# Audit Log Entry Structure +# ============================================================================= + +class TestAuditLogEntry: + def test_audit_log_entry_keys(self): + entry = { + "id": "uuid-1", + "tenant_id": "default", + "dsfa_id": "uuid-2", + "action": "CREATE", + "changed_by": "system", + "old_values": None, + "new_values": {"title": "Test"}, + "created_at": "2026-01-01T12:00:00", + } + assert "id" in entry + assert "action" in entry + assert "dsfa_id" in entry + assert "created_at" in entry + + def test_audit_action_values(self): + valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"} + assert "CREATE" in valid_actions + assert "DELETE" in valid_actions + assert "STATUS_CHANGE" in valid_actions + + def test_audit_dsfa_id_can_be_none(self): + entry = {"dsfa_id": None} + assert entry["dsfa_id"] is None + + def test_audit_old_values_can_be_none(self): + entry = {"old_values": None, "new_values": {"title": "Test"}} + assert entry["old_values"] is None + assert entry["new_values"] is not None