feat(dsfa): Go DSFA deprecated, URL-Fix, fehlende Endpoints + 145 Tests
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 34s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 27s
CI / test-python-dsms-gateway (push) Successful in 18s

- Go: DEPRECATED-Kommentare an allen 6 DSFA-Handlern + Route-Block
- api.ts: URL-Fix /dsgvo/dsfas → /dsfa (Detail-Seite war komplett kaputt)
- Python: Section-Update, Workflow (submit/approve), Export (JSON+CSV), UCCA-Stubs
- Tests: 145/145 bestanden (Schema + Route-Integration mit TestClient+SQLite)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-06 19:41:12 +01:00
parent 095eff26d9
commit 6a940344c2
5 changed files with 1005 additions and 97 deletions

View File

@@ -64,7 +64,7 @@ function getHeaders(): HeadersInit {
* List all DSFAs for the current tenant
*/
export async function listDSFAs(status?: string): Promise<DSFA[]> {
const url = new URL(`${getBaseUrl()}/dsgvo/dsfas`, window.location.origin)
const url = new URL(`${getBaseUrl()}/dsfa`, window.location.origin)
if (status) {
url.searchParams.set('status', status)
}
@@ -83,7 +83,7 @@ export async function listDSFAs(status?: string): Promise<DSFA[]> {
* Get a single DSFA by ID
*/
export async function getDSFA(id: string): Promise<DSFA> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, {
method: 'GET',
headers: getHeaders(),
credentials: 'include',
@@ -96,7 +96,7 @@ export async function getDSFA(id: string): Promise<DSFA> {
* Create a new DSFA
*/
export async function createDSFA(data: CreateDSFARequest): Promise<DSFA> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas`, {
const response = await fetch(`${getBaseUrl()}/dsfa`, {
method: 'POST',
headers: getHeaders(),
credentials: 'include',
@@ -110,7 +110,7 @@ export async function createDSFA(data: CreateDSFARequest): Promise<DSFA> {
* Update an existing DSFA
*/
export async function updateDSFA(id: string, data: Partial<DSFA>): Promise<DSFA> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, {
method: 'PUT',
headers: getHeaders(),
credentials: 'include',
@@ -124,7 +124,7 @@ export async function updateDSFA(id: string, data: Partial<DSFA>): Promise<DSFA>
* Delete a DSFA
*/
export async function deleteDSFA(id: string): Promise<void> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}`, {
method: 'DELETE',
headers: getHeaders(),
credentials: 'include',
@@ -147,7 +147,7 @@ export async function updateDSFASection(
sectionNumber: number,
data: UpdateDSFASectionRequest
): Promise<DSFA> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/sections/${sectionNumber}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}/sections/${sectionNumber}`, {
method: 'PUT',
headers: getHeaders(),
credentials: 'include',
@@ -165,7 +165,7 @@ export async function updateDSFASection(
* Submit a DSFA for DPO review
*/
export async function submitDSFAForReview(id: string): Promise<SubmitForReviewResponse> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/submit-for-review`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}/submit-for-review`, {
method: 'POST',
headers: getHeaders(),
credentials: 'include',
@@ -178,7 +178,7 @@ export async function submitDSFAForReview(id: string): Promise<SubmitForReviewRe
* Approve or reject a DSFA (DPO/CISO/GF action)
*/
export async function approveDSFA(id: string, data: ApproveDSFARequest): Promise<{ message: string }> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/approve`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}/approve`, {
method: 'POST',
headers: getHeaders(),
credentials: 'include',
@@ -196,7 +196,7 @@ export async function approveDSFA(id: string, data: ApproveDSFARequest): Promise
* Get DSFA statistics for the dashboard
*/
export async function getDSFAStats(): Promise<DSFAStatsResponse> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/stats`, {
const response = await fetch(`${getBaseUrl()}/dsfa/stats`, {
method: 'GET',
headers: getHeaders(),
credentials: 'include',
@@ -216,7 +216,7 @@ export async function createDSFAFromAssessment(
assessmentId: string,
data?: CreateDSFAFromAssessmentRequest
): Promise<CreateDSFAFromAssessmentResponse> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/from-assessment/${assessmentId}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/from-assessment/${assessmentId}`, {
method: 'POST',
headers: getHeaders(),
credentials: 'include',
@@ -231,7 +231,7 @@ export async function createDSFAFromAssessment(
*/
export async function getDSFAByAssessment(assessmentId: string): Promise<DSFA | null> {
try {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/by-assessment/${assessmentId}`, {
const response = await fetch(`${getBaseUrl()}/dsfa/by-assessment/${assessmentId}`, {
method: 'GET',
headers: getHeaders(),
credentials: 'include',
@@ -269,7 +269,7 @@ export async function checkDSFARequired(assessmentId: string): Promise<DSFATrigg
* Export a DSFA as JSON
*/
export async function exportDSFAAsJSON(id: string): Promise<Blob> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/export?format=json`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}/export?format=json`, {
method: 'GET',
headers: {
'Accept': 'application/json',
@@ -288,7 +288,7 @@ export async function exportDSFAAsJSON(id: string): Promise<Blob> {
* Export a DSFA as PDF
*/
export async function exportDSFAAsPDF(id: string): Promise<Blob> {
const response = await fetch(`${getBaseUrl()}/dsgvo/dsfas/${id}/export/pdf`, {
const response = await fetch(`${getBaseUrl()}/dsfa/${id}/export/pdf`, {
method: 'GET',
headers: {
'Accept': 'application/pdf',

View File

@@ -291,6 +291,8 @@ func main() {
}
// DSFA - Datenschutz-Folgenabschätzung (Art. 35)
// DEPRECATED: DSFA migrated to backend-compliance (Python/FastAPI).
// Use backend-compliance /api/compliance/dsfa/* instead.
dsfa := dsgvoRoutes.Group("/dsfa")
{
dsfa.GET("", dsgvoHandlers.ListDSFAs)

View File

@@ -397,9 +397,13 @@ func (h *DSGVOHandlers) GetStats(c *gin.Context) {
// ============================================================================
// DSFA - Datenschutz-Folgenabschätzung
// DEPRECATED: DSFA endpoints migrated to backend-compliance (Python/FastAPI).
// These in-memory Go handlers are kept for backwards compatibility only.
// Use backend-compliance /api/compliance/dsfa/* instead.
// ============================================================================
// ListDSFAs returns all DSFAs for a tenant
// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa
func (h *DSGVOHandlers) ListDSFAs(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
if tenantID == uuid.Nil {
@@ -419,6 +423,7 @@ func (h *DSGVOHandlers) ListDSFAs(c *gin.Context) {
}
// GetDSFA returns a DSFA by ID
// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa/{id}
func (h *DSGVOHandlers) GetDSFA(c *gin.Context) {
id, err := uuid.Parse(c.Param("id"))
if err != nil {
@@ -440,6 +445,7 @@ func (h *DSGVOHandlers) GetDSFA(c *gin.Context) {
}
// CreateDSFA creates a new DSFA
// DEPRECATED: Use backend-compliance POST /api/compliance/dsfa
func (h *DSGVOHandlers) CreateDSFA(c *gin.Context) {
tenantID := rbac.GetTenantID(c)
userID := rbac.GetUserID(c)
@@ -469,6 +475,7 @@ func (h *DSGVOHandlers) CreateDSFA(c *gin.Context) {
}
// UpdateDSFA updates a DSFA
// DEPRECATED: Use backend-compliance PUT /api/compliance/dsfa/{id}
func (h *DSGVOHandlers) UpdateDSFA(c *gin.Context) {
id, err := uuid.Parse(c.Param("id"))
if err != nil {
@@ -492,6 +499,7 @@ func (h *DSGVOHandlers) UpdateDSFA(c *gin.Context) {
}
// DeleteDSFA deletes a DSFA
// DEPRECATED: Use backend-compliance DELETE /api/compliance/dsfa/{id}
func (h *DSGVOHandlers) DeleteDSFA(c *gin.Context) {
id, err := uuid.Parse(c.Param("id"))
if err != nil {
@@ -677,6 +685,7 @@ func (h *DSGVOHandlers) ExportDSR(c *gin.Context) {
}
// ExportDSFA exports a DSFA as JSON
// DEPRECATED: Use backend-compliance GET /api/compliance/dsfa/{id}/export?format=json
func (h *DSGVOHandlers) ExportDSFA(c *gin.Context) {
id, err := uuid.Parse(c.Param("id"))
if err != nil {

View File

@@ -2,14 +2,21 @@
FastAPI routes for DSFA — Datenschutz-Folgenabschaetzung (Art. 35 DSGVO).
Endpoints:
GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit)
POST /v1/dsfa — Neu erstellen → 201
GET /v1/dsfa/stats — Zähler nach Status
GET /v1/dsfa/audit-log — Audit-Log
GET /v1/dsfa/{id} — Detail
PUT /v1/dsfa/{id} — Update
DELETE /v1/dsfa/{id}Löschen (Art. 17 DSGVO)
PATCH /v1/dsfa/{id}/statusSchnell-Statuswechsel
GET /v1/dsfa — Liste (tenant_id + status-filter + skip/limit)
POST /v1/dsfa — Neu erstellen → 201
GET /v1/dsfa/stats — Zähler nach Status
GET /v1/dsfa/audit-log — Audit-Log
GET /v1/dsfa/export/csv — CSV-Export aller DSFAs
POST /v1/dsfa/from-assessment/{id} — Stub: DSFA aus UCCA-Assessment
GET /v1/dsfa/by-assessment/{id}Stub: DSFA nach Assessment-ID
GET /v1/dsfa/{id} Detail
PUT /v1/dsfa/{id} — Update
DELETE /v1/dsfa/{id} — Löschen (Art. 17 DSGVO)
PATCH /v1/dsfa/{id}/status — Schnell-Statuswechsel
PUT /v1/dsfa/{id}/sections/{nr} — Section-Update (1-8)
POST /v1/dsfa/{id}/submit-for-review — Workflow: Einreichen
POST /v1/dsfa/{id}/approve — Workflow: Genehmigen/Ablehnen
GET /v1/dsfa/{id}/export — JSON-Export einer DSFA
"""
import logging
@@ -165,6 +172,20 @@ class DSFAStatusUpdate(BaseModel):
approved_by: Optional[str] = None
class DSFASectionUpdate(BaseModel):
"""Body for PUT /dsfa/{id}/sections/{section_number}."""
content: Optional[str] = None
# Allow arbitrary extra fields so the frontend can send any section-specific data
extra: Optional[dict] = None
class DSFAApproveRequest(BaseModel):
"""Body for POST /dsfa/{id}/approve."""
approved: bool
comments: Optional[str] = None
approved_by: Optional[str] = None
# =============================================================================
# Helpers
# =============================================================================
@@ -207,7 +228,11 @@ def _dsfa_to_response(row) -> dict:
def _ts(val):
"""Timestamp → ISO string or None."""
return val.isoformat() if val else None
if not val:
return None
if isinstance(val, str):
return val
return val.isoformat()
def _get(key, default=None):
"""Safe row access — returns default if key missing (handles old rows)."""
@@ -389,12 +414,68 @@ async def get_audit_log(
"changed_by": r["changed_by"],
"old_values": r["old_values"],
"new_values": r["new_values"],
"created_at": r["created_at"].isoformat() if r["created_at"] else None,
"created_at": r["created_at"] if isinstance(r["created_at"], str) else (r["created_at"].isoformat() if r["created_at"] else None),
}
for r in rows
]
# =============================================================================
# CSV Export (must be before /{id} to avoid route conflict)
# =============================================================================
@router.get("/export/csv", name="export_dsfas_csv")
async def export_dsfas_csv(
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Export all DSFAs as CSV."""
import csv
import io
tid = _get_tenant_id(tenant_id)
rows = db.execute(
text("SELECT * FROM compliance_dsfas WHERE tenant_id = :tid ORDER BY created_at DESC"),
{"tid": tid},
).fetchall()
output = io.StringIO()
writer = csv.writer(output, delimiter=";")
writer.writerow(["ID", "Titel", "Status", "Risiko-Level", "Erstellt", "Aktualisiert"])
for r in rows:
writer.writerow([
str(r["id"]),
r["title"],
r["status"] or "draft",
r["risk_level"] or "low",
r["created_at"] if isinstance(r["created_at"], str) else (r["created_at"].isoformat() if r["created_at"] else ""),
r["updated_at"] if isinstance(r["updated_at"], str) else (r["updated_at"].isoformat() if r["updated_at"] else ""),
])
from fastapi.responses import Response
return Response(
content=output.getvalue(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=dsfas_export.csv"},
)
# =============================================================================
# UCCA Integration Stubs (must be before /{id} to avoid route conflict)
# =============================================================================
@router.post("/from-assessment/{assessment_id}", status_code=501)
async def create_from_assessment(assessment_id: str):
"""Stub: Create DSFA from UCCA assessment. Requires cross-service communication."""
return {"detail": "Not implemented — requires cross-service integration with ai-compliance-sdk"}
@router.get("/by-assessment/{assessment_id}", status_code=501)
async def get_by_assessment(assessment_id: str):
"""Stub: Get DSFA by linked UCCA assessment ID."""
return {"detail": "Not implemented — requires cross-service integration with ai-compliance-sdk"}
# =============================================================================
# List + Create
# =============================================================================
@@ -627,3 +708,204 @@ async def update_dsfa_status(
)
db.commit()
return _dsfa_to_response(row)
# =============================================================================
# Section Update
# =============================================================================
SECTION_FIELD_MAP = {
1: "processing_description",
2: "necessity_assessment",
3: "risk_assessment", # maps to overall_risk_level + risk_score
4: "stakeholder_consultations", # JSONB
5: "measures", # JSONB array
6: "dpo_opinion", # consultation section
7: "conclusion", # documentation / conclusion
8: "ai_use_case_modules", # JSONB array Section 8 KI
}
@router.put("/{dsfa_id}/sections/{section_number}")
async def update_section(
dsfa_id: str,
section_number: int,
request: DSFASectionUpdate,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Update a specific DSFA section (1-8)."""
import json
if section_number < 1 or section_number > 8:
raise HTTPException(status_code=422, detail=f"Section must be 1-8, got {section_number}")
tid = _get_tenant_id(tenant_id)
existing = db.execute(
text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
{"id": dsfa_id, "tid": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")
field = SECTION_FIELD_MAP[section_number]
jsonb_sections = {4, 5, 8}
params: dict = {"id": dsfa_id, "tid": tid}
if section_number in jsonb_sections:
value = request.extra if request.extra is not None else ([] if section_number != 4 else [])
params["val"] = json.dumps(value)
set_clause = f"{field} = CAST(:val AS jsonb)"
else:
params["val"] = request.content or ""
set_clause = f"{field} = :val"
# Also update section_progress
progress = existing["section_progress"] if existing["section_progress"] else {}
if isinstance(progress, str):
progress = json.loads(progress)
progress[f"section_{section_number}"] = True
params["progress"] = json.dumps(progress)
row = db.execute(
text(f"""
UPDATE compliance_dsfas
SET {set_clause}, section_progress = CAST(:progress AS jsonb), updated_at = NOW()
WHERE id = :id AND tenant_id = :tid
RETURNING *
"""),
params,
).fetchone()
_log_audit(db, tid, dsfa_id, "SECTION_UPDATE", new_values={"section": section_number, "field": field})
db.commit()
return _dsfa_to_response(row)
# =============================================================================
# Workflow: Submit for Review + Approve
# =============================================================================
@router.post("/{dsfa_id}/submit-for-review")
async def submit_for_review(
dsfa_id: str,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Submit a DSFA for DPO review (draft → in-review)."""
tid = _get_tenant_id(tenant_id)
existing = db.execute(
text("SELECT id, status FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
{"id": dsfa_id, "tid": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")
if existing["status"] not in ("draft", "needs-update"):
raise HTTPException(
status_code=422,
detail=f"Kann nur aus Status 'draft' oder 'needs-update' eingereicht werden, aktuell: {existing['status']}",
)
row = db.execute(
text("""
UPDATE compliance_dsfas
SET status = 'in-review', submitted_for_review_at = NOW(), updated_at = NOW()
WHERE id = :id AND tenant_id = :tid
RETURNING *
"""),
{"id": dsfa_id, "tid": tid},
).fetchone()
_log_audit(
db, tid, dsfa_id, "SUBMIT_FOR_REVIEW",
old_values={"status": existing["status"]},
new_values={"status": "in-review"},
)
db.commit()
return {"message": "DSFA zur Prüfung eingereicht", "status": "in-review", "dsfa": _dsfa_to_response(row)}
@router.post("/{dsfa_id}/approve")
async def approve_dsfa(
dsfa_id: str,
request: DSFAApproveRequest,
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Approve or reject a DSFA (DPO/CISO action)."""
tid = _get_tenant_id(tenant_id)
existing = db.execute(
text("SELECT id, status FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
{"id": dsfa_id, "tid": tid},
).fetchone()
if not existing:
raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")
if existing["status"] != "in-review":
raise HTTPException(
status_code=422,
detail=f"Nur DSFAs im Status 'in-review' können genehmigt werden, aktuell: {existing['status']}",
)
if request.approved:
new_status = "approved"
row = db.execute(
text("""
UPDATE compliance_dsfas
SET status = 'approved', approved_by = :approved_by, approved_at = NOW(), updated_at = NOW()
WHERE id = :id AND tenant_id = :tid
RETURNING *
"""),
{"id": dsfa_id, "tid": tid, "approved_by": request.approved_by or "system"},
).fetchone()
else:
new_status = "needs-update"
row = db.execute(
text("""
UPDATE compliance_dsfas
SET status = 'needs-update', updated_at = NOW()
WHERE id = :id AND tenant_id = :tid
RETURNING *
"""),
{"id": dsfa_id, "tid": tid},
).fetchone()
_log_audit(
db, tid, dsfa_id, "APPROVE" if request.approved else "REJECT",
old_values={"status": existing["status"]},
new_values={"status": new_status, "comments": request.comments},
)
db.commit()
return {"message": f"DSFA {'genehmigt' if request.approved else 'zurückgewiesen'}", "status": new_status}
# =============================================================================
# Export
# =============================================================================
@router.get("/{dsfa_id}/export")
async def export_dsfa_json(
dsfa_id: str,
format: str = Query("json"),
tenant_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Export a single DSFA as JSON."""
tid = _get_tenant_id(tenant_id)
row = db.execute(
text("SELECT * FROM compliance_dsfas WHERE id = :id AND tenant_id = :tid"),
{"id": dsfa_id, "tid": tid},
).fetchone()
if not row:
raise HTTPException(status_code=404, detail=f"DSFA {dsfa_id} nicht gefunden")
dsfa_data = _dsfa_to_response(row)
return {
"exported_at": datetime.utcnow().isoformat(),
"format": format,
"dsfa": dsfa_data,
}

View File

@@ -1,23 +1,261 @@
"""Tests for DSFA routes and schemas (dsfa_routes.py)."""
"""Tests for DSFA routes and schemas (dsfa_routes.py).
Includes:
- Schema/Pydantic tests (DSFACreate, DSFAUpdate, DSFAStatusUpdate)
- Helper tests (_dsfa_to_response, _get_tenant_id)
- Route integration tests (TestClient + SQLite)
"""
import pytest
from unittest.mock import MagicMock, patch
import uuid
import os
import sys
from datetime import datetime
from unittest.mock import MagicMock
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, text, event # noqa: F401
from sqlalchemy.orm import sessionmaker
# Ensure backend dir is on path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from classroom_engine.database import get_db
from compliance.api.dsfa_routes import (
DSFACreate,
DSFAUpdate,
DSFAStatusUpdate,
DSFASectionUpdate,
DSFAApproveRequest,
_dsfa_to_response,
_get_tenant_id,
DEFAULT_TENANT_ID,
VALID_STATUSES,
VALID_RISK_LEVELS,
router as dsfa_router,
)
import json as _json
# =============================================================================
# Test App + SQLite Setup
# =============================================================================
SQLALCHEMY_DATABASE_URL = "sqlite:///./test_dsfa.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
_RawSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@event.listens_for(engine, "connect")
def _register_sqlite_functions(dbapi_conn, connection_record):
"""Register PostgreSQL-compatible functions for SQLite."""
dbapi_conn.create_function("NOW", 0, lambda: datetime.utcnow().isoformat())
TENANT_ID = "default"
class _DictRow(dict):
"""Dict wrapper that mimics PostgreSQL's dict-like row access for SQLite."""
pass
class _DictSession:
"""Wrapper around SQLAlchemy Session that returns dict-like rows.
Production code uses row["column_name"] which works with PostgreSQL/psycopg2
but not with SQLAlchemy 2.0's Row objects on SQLite. This wrapper converts
all result rows to dicts so the raw-SQL routes work in tests.
Also rewrites CAST(:param AS jsonb) → :param for SQLite compatibility.
PostgreSQL CAST AS jsonb works, but SQLite CAST to unknown type yields 0.
"""
def __init__(self, session):
self._session = session
def execute(self, stmt, params=None):
import re
# Rewrite CAST(:param AS jsonb) → :param for SQLite
if hasattr(stmt, 'text'):
rewritten = re.sub(r'CAST\((:[\w]+)\s+AS\s+jsonb\)', r'\1', stmt.text)
if rewritten != stmt.text:
stmt = text(rewritten)
result = self._session.execute(stmt, params)
return _DictResult(result)
def flush(self):
self._session.flush()
def commit(self):
self._session.commit()
def rollback(self):
self._session.rollback()
def close(self):
self._session.close()
class _DictResult:
"""Wraps SQLAlchemy Result to return dict rows."""
def __init__(self, result):
self._result = result
try:
self._keys = list(result.keys())
self._returns_rows = True
except Exception:
self._keys = []
self._returns_rows = False
def fetchone(self):
if not self._returns_rows:
return None
row = self._result.fetchone()
if row is None:
return None
return _DictRow(zip(self._keys, row))
def fetchall(self):
if not self._returns_rows:
return []
rows = self._result.fetchall()
return [_DictRow(zip(self._keys, r)) for r in rows]
@property
def rowcount(self):
return self._result.rowcount
app = FastAPI()
app.include_router(dsfa_router, prefix="/api/compliance")
def override_get_db():
session = _RawSessionLocal()
db = _DictSession(session)
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
# SQL to create the DSFA tables in SQLite (simplified from PostgreSQL)
CREATE_DSFAS_TABLE = """
CREATE TABLE IF NOT EXISTS compliance_dsfas (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
tenant_id TEXT NOT NULL DEFAULT 'default',
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'draft',
risk_level TEXT DEFAULT 'low',
processing_activity TEXT DEFAULT '',
data_categories TEXT DEFAULT '[]',
recipients TEXT DEFAULT '[]',
measures TEXT DEFAULT '[]',
approved_by TEXT,
approved_at TIMESTAMP,
created_by TEXT DEFAULT 'system',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Section 1
processing_description TEXT,
processing_purpose TEXT,
legal_basis TEXT,
legal_basis_details TEXT,
-- Section 2
necessity_assessment TEXT,
proportionality_assessment TEXT,
data_minimization TEXT,
alternatives_considered TEXT,
retention_justification TEXT,
-- Section 3
involves_ai INTEGER DEFAULT 0,
overall_risk_level TEXT,
risk_score INTEGER DEFAULT 0,
risk_assessment TEXT,
-- Section 6
dpo_consulted INTEGER DEFAULT 0,
dpo_consulted_at TIMESTAMP,
dpo_name TEXT,
dpo_opinion TEXT,
dpo_approved INTEGER,
authority_consulted INTEGER DEFAULT 0,
authority_consulted_at TIMESTAMP,
authority_reference TEXT,
authority_decision TEXT,
-- Metadata
version INTEGER DEFAULT 1,
previous_version_id TEXT,
conclusion TEXT,
federal_state TEXT,
authority_resource_id TEXT,
submitted_for_review_at TIMESTAMP,
submitted_by TEXT,
-- JSONB arrays (stored as TEXT in SQLite)
data_subjects TEXT DEFAULT '[]',
affected_rights TEXT DEFAULT '[]',
triggered_rule_codes TEXT DEFAULT '[]',
ai_trigger_ids TEXT DEFAULT '[]',
wp248_criteria_met TEXT DEFAULT '[]',
art35_abs3_triggered TEXT DEFAULT '[]',
tom_references TEXT DEFAULT '[]',
risks TEXT DEFAULT '[]',
mitigations TEXT DEFAULT '[]',
stakeholder_consultations TEXT DEFAULT '[]',
review_triggers TEXT DEFAULT '[]',
review_comments TEXT DEFAULT '[]',
ai_use_case_modules TEXT DEFAULT '[]',
section_8_complete INTEGER DEFAULT 0,
-- JSONB objects (stored as TEXT in SQLite)
threshold_analysis TEXT DEFAULT '{}',
consultation_requirement TEXT DEFAULT '{}',
review_schedule TEXT DEFAULT '{}',
section_progress TEXT DEFAULT '{}',
metadata TEXT DEFAULT '{}'
)
"""
CREATE_AUDIT_TABLE = """
CREATE TABLE IF NOT EXISTS compliance_dsfa_audit_log (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
tenant_id TEXT NOT NULL,
dsfa_id TEXT,
action TEXT NOT NULL,
changed_by TEXT DEFAULT 'system',
old_values TEXT,
new_values TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
@pytest.fixture(autouse=True)
def setup_db():
"""Create tables before each test, drop after."""
with engine.connect() as conn:
conn.execute(text(CREATE_DSFAS_TABLE))
conn.execute(text(CREATE_AUDIT_TABLE))
conn.commit()
yield
with engine.connect() as conn:
conn.execute(text("DROP TABLE IF EXISTS compliance_dsfa_audit_log"))
conn.execute(text("DROP TABLE IF EXISTS compliance_dsfas"))
conn.commit()
def _create_dsfa_via_api(**kwargs):
"""Helper: create a DSFA via POST and return response JSON."""
payload = {"title": "Test DSFA", **kwargs}
resp = client.post("/api/compliance/dsfa", json=payload)
assert resp.status_code == 201, resp.text
return resp.json()
# =============================================================================
# Schema Tests — DSFACreate
# =============================================================================
@@ -143,6 +381,38 @@ class TestDSFAStatusUpdate:
assert req.status == "needs-update"
# =============================================================================
# Schema Tests — New Schemas
# =============================================================================
class TestDSFASectionUpdate:
def test_content_only(self):
req = DSFASectionUpdate(content="Beschreibung der Verarbeitung")
assert req.content == "Beschreibung der Verarbeitung"
assert req.extra is None
def test_extra_dict(self):
req = DSFASectionUpdate(extra={"key": "value"})
assert req.extra == {"key": "value"}
def test_all_optional(self):
req = DSFASectionUpdate()
assert req.content is None
assert req.extra is None
class TestDSFAApproveRequest:
def test_approved_true(self):
req = DSFAApproveRequest(approved=True, approved_by="DSB Mueller")
assert req.approved is True
assert req.approved_by == "DSB Mueller"
def test_rejected(self):
req = DSFAApproveRequest(approved=False, comments="Massnahmen unzureichend")
assert req.approved is False
assert req.comments == "Massnahmen unzureichend"
# =============================================================================
# Helper Tests — _get_tenant_id
# =============================================================================
@@ -353,94 +623,369 @@ class TestValidRiskLevels:
class TestDSFARouterConfig:
def test_router_prefix(self):
from compliance.api.dsfa_routes import router
# /v1 prefix is added when router is included in the main app
assert router.prefix == "/dsfa"
assert dsfa_router.prefix == "/dsfa"
def test_router_has_tags(self):
from compliance.api.dsfa_routes import router
assert "compliance-dsfa" in router.tags
assert "compliance-dsfa" in dsfa_router.tags
def test_router_registered_in_init(self):
from compliance.api import dsfa_router
assert dsfa_router is not None
from compliance.api import dsfa_router as imported_router
assert imported_router is not None
# =============================================================================
# Stats Response Structure
# Route Integration Tests — CRUD
# =============================================================================
class TestDSFAStatsResponse:
def test_stats_keys_present(self):
"""Stats endpoint must return these keys."""
expected_keys = {
"total", "by_status", "by_risk_level",
"draft_count", "in_review_count", "approved_count", "needs_update_count"
}
# Verify by constructing the expected dict shape
stats = {
"total": 0,
"by_status": {},
"by_risk_level": {},
"draft_count": 0,
"in_review_count": 0,
"approved_count": 0,
"needs_update_count": 0,
}
assert set(stats.keys()) == expected_keys
class TestDSFARouteCRUD:
"""Integration tests using TestClient + SQLite."""
def test_stats_total_is_int(self):
stats = {"total": 5}
assert isinstance(stats["total"], int)
def test_list_dsfas_empty(self):
resp = client.get("/api/compliance/dsfa")
assert resp.status_code == 200
assert resp.json() == []
def test_stats_by_status_is_dict(self):
by_status = {"draft": 2, "approved": 1}
assert isinstance(by_status, dict)
def test_create_dsfa(self):
data = _create_dsfa_via_api(title="DSFA Videoüberwachung", risk_level="high")
assert data["title"] == "DSFA Videoüberwachung"
assert data["status"] == "draft"
assert data["risk_level"] == "high"
assert "id" in data
def test_stats_counts_are_integers(self):
counts = {"draft_count": 2, "in_review_count": 1, "approved_count": 0}
assert all(isinstance(v, int) for v in counts.values())
def test_list_dsfas_with_data(self):
_create_dsfa_via_api(title="DSFA 1")
_create_dsfa_via_api(title="DSFA 2")
resp = client.get("/api/compliance/dsfa")
assert resp.status_code == 200
items = resp.json()
assert len(items) == 2
def test_stats_zero_total_when_no_dsfas(self):
stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0}
assert stats["total"] == 0
def test_get_dsfa(self):
created = _create_dsfa_via_api(title="Detail-Test")
dsfa_id = created["id"]
resp = client.get(f"/api/compliance/dsfa/{dsfa_id}")
assert resp.status_code == 200
assert resp.json()["title"] == "Detail-Test"
def test_get_dsfa_not_found(self):
resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}")
assert resp.status_code == 404
def test_update_dsfa(self):
created = _create_dsfa_via_api(title="Original")
dsfa_id = created["id"]
resp = client.put(f"/api/compliance/dsfa/{dsfa_id}", json={"title": "Updated"})
assert resp.status_code == 200
assert resp.json()["title"] == "Updated"
def test_update_dsfa_not_found(self):
resp = client.put(f"/api/compliance/dsfa/{uuid.uuid4()}", json={"title": "X"})
assert resp.status_code == 404
def test_delete_dsfa(self):
created = _create_dsfa_via_api(title="To Delete")
dsfa_id = created["id"]
resp = client.delete(f"/api/compliance/dsfa/{dsfa_id}")
assert resp.status_code == 200
assert resp.json()["success"] is True
# Verify gone
resp2 = client.get(f"/api/compliance/dsfa/{dsfa_id}")
assert resp2.status_code == 404
def test_delete_dsfa_not_found(self):
resp = client.delete(f"/api/compliance/dsfa/{uuid.uuid4()}")
assert resp.status_code == 404
def test_list_with_status_filter(self):
_create_dsfa_via_api(title="Draft One")
created2 = _create_dsfa_via_api(title="Approved One")
# Change status to approved
client.patch(
f"/api/compliance/dsfa/{created2['id']}/status",
json={"status": "approved", "approved_by": "DSB"},
)
resp = client.get("/api/compliance/dsfa?status=approved")
assert resp.status_code == 200
items = resp.json()
assert len(items) == 1
assert items[0]["status"] == "approved"
def test_create_invalid_status(self):
resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "status": "invalid"})
assert resp.status_code == 422
def test_create_invalid_risk_level(self):
resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "risk_level": "extreme"})
assert resp.status_code == 422
# =============================================================================
# Audit Log Entry Structure
# Route Integration Tests — Stats
# =============================================================================
class TestAuditLogEntry:
def test_audit_log_entry_keys(self):
entry = {
"id": "uuid-1",
"tenant_id": "default",
"dsfa_id": "uuid-2",
"action": "CREATE",
"changed_by": "system",
"old_values": None,
"new_values": {"title": "Test"},
"created_at": "2026-01-01T12:00:00",
}
assert "id" in entry
assert "action" in entry
assert "dsfa_id" in entry
assert "created_at" in entry
class TestDSFARouteStats:
def test_stats_empty(self):
resp = client.get("/api/compliance/dsfa/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["draft_count"] == 0
def test_audit_action_values(self):
valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"}
assert "CREATE" in valid_actions
assert "DELETE" in valid_actions
assert "STATUS_CHANGE" in valid_actions
def test_stats_with_data(self):
_create_dsfa_via_api(title="DSFA A")
_create_dsfa_via_api(title="DSFA B")
resp = client.get("/api/compliance/dsfa/stats")
data = resp.json()
assert data["total"] == 2
assert data["draft_count"] == 2
def test_audit_dsfa_id_can_be_none(self):
entry = {"dsfa_id": None}
assert entry["dsfa_id"] is None
def test_audit_old_values_can_be_none(self):
entry = {"old_values": None, "new_values": {"title": "Test"}}
assert entry["old_values"] is None
assert entry["new_values"] is not None
# =============================================================================
# Route Integration Tests — Status Patch
# =============================================================================
class TestDSFARouteStatusPatch:
def test_patch_status(self):
created = _create_dsfa_via_api(title="Status Test")
resp = client.patch(
f"/api/compliance/dsfa/{created['id']}/status",
json={"status": "in-review"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "in-review"
def test_patch_status_invalid(self):
created = _create_dsfa_via_api(title="Bad Status")
resp = client.patch(
f"/api/compliance/dsfa/{created['id']}/status",
json={"status": "bogus"},
)
assert resp.status_code == 422
def test_patch_status_not_found(self):
resp = client.patch(
f"/api/compliance/dsfa/{uuid.uuid4()}/status",
json={"status": "draft"},
)
assert resp.status_code == 404
# =============================================================================
# Route Integration Tests — Section Update
# =============================================================================
class TestDSFARouteSectionUpdate:
def test_update_section_1(self):
created = _create_dsfa_via_api(title="Section Test")
resp = client.put(
f"/api/compliance/dsfa/{created['id']}/sections/1",
json={"content": "Verarbeitung personenbezogener Daten"},
)
assert resp.status_code == 200
data = resp.json()
assert data["processing_description"] == "Verarbeitung personenbezogener Daten"
def test_update_section_7_conclusion(self):
created = _create_dsfa_via_api(title="Conclusion Test")
resp = client.put(
f"/api/compliance/dsfa/{created['id']}/sections/7",
json={"content": "DSFA abgeschlossen — Restrisiko akzeptabel"},
)
assert resp.status_code == 200
assert resp.json()["conclusion"] == "DSFA abgeschlossen — Restrisiko akzeptabel"
def test_update_section_progress_tracked(self):
created = _create_dsfa_via_api(title="Progress Test")
client.put(
f"/api/compliance/dsfa/{created['id']}/sections/1",
json={"content": "Test"},
)
resp = client.get(f"/api/compliance/dsfa/{created['id']}")
progress = resp.json()["section_progress"]
assert progress.get("section_1") is True
def test_update_section_invalid_number(self):
created = _create_dsfa_via_api(title="Invalid Section")
resp = client.put(
f"/api/compliance/dsfa/{created['id']}/sections/9",
json={"content": "X"},
)
assert resp.status_code == 422
def test_update_section_not_found(self):
resp = client.put(
f"/api/compliance/dsfa/{uuid.uuid4()}/sections/1",
json={"content": "X"},
)
assert resp.status_code == 404
# =============================================================================
# Route Integration Tests — Workflow (Submit + Approve)
# =============================================================================
class TestDSFARouteWorkflow:
def test_submit_for_review(self):
created = _create_dsfa_via_api(title="Workflow Test")
resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "in-review"
assert data["message"] == "DSFA zur Prüfung eingereicht"
def test_submit_for_review_wrong_status(self):
created = _create_dsfa_via_api(title="Wrong Status")
# First submit
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
# Try to submit again (already in-review)
resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
assert resp.status_code == 422
def test_submit_not_found(self):
resp = client.post(f"/api/compliance/dsfa/{uuid.uuid4()}/submit-for-review")
assert resp.status_code == 404
def test_approve_dsfa(self):
created = _create_dsfa_via_api(title="Approve Test")
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
resp = client.post(
f"/api/compliance/dsfa/{created['id']}/approve",
json={"approved": True, "approved_by": "DSB Mueller"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "approved"
def test_reject_dsfa(self):
created = _create_dsfa_via_api(title="Reject Test")
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
resp = client.post(
f"/api/compliance/dsfa/{created['id']}/approve",
json={"approved": False, "comments": "Massnahmen fehlen"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "needs-update"
def test_approve_wrong_status(self):
created = _create_dsfa_via_api(title="Not In Review")
resp = client.post(
f"/api/compliance/dsfa/{created['id']}/approve",
json={"approved": True},
)
assert resp.status_code == 422
def test_approve_not_found(self):
resp = client.post(
f"/api/compliance/dsfa/{uuid.uuid4()}/approve",
json={"approved": True},
)
assert resp.status_code == 404
def test_full_workflow_draft_to_approved(self):
"""Full lifecycle: create → submit → approve."""
created = _create_dsfa_via_api(title="Full Lifecycle")
dsfa_id = created["id"]
assert created["status"] == "draft"
# Submit for review
resp1 = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
assert resp1.json()["status"] == "in-review"
# Approve
resp2 = client.post(
f"/api/compliance/dsfa/{dsfa_id}/approve",
json={"approved": True, "approved_by": "CISO"},
)
assert resp2.json()["status"] == "approved"
# Verify final state
resp3 = client.get(f"/api/compliance/dsfa/{dsfa_id}")
final = resp3.json()
assert final["status"] == "approved"
assert final["approved_by"] == "CISO"
def test_reject_then_resubmit(self):
"""Lifecycle: create → submit → reject → resubmit → approve."""
created = _create_dsfa_via_api(title="Reject Resubmit")
dsfa_id = created["id"]
client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
client.post(
f"/api/compliance/dsfa/{dsfa_id}/approve",
json={"approved": False, "comments": "Incomplete"},
)
# Status should be needs-update → can resubmit
resp = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
assert resp.status_code == 200
assert resp.json()["status"] == "in-review"
# =============================================================================
# Route Integration Tests — Export
# =============================================================================
class TestDSFARouteExport:
def test_export_json(self):
created = _create_dsfa_via_api(title="Export Test")
resp = client.get(f"/api/compliance/dsfa/{created['id']}/export?format=json")
assert resp.status_code == 200
data = resp.json()
assert "exported_at" in data
assert data["dsfa"]["title"] == "Export Test"
def test_export_json_not_found(self):
resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}/export?format=json")
assert resp.status_code == 404
def test_export_csv(self):
_create_dsfa_via_api(title="CSV DSFA 1")
_create_dsfa_via_api(title="CSV DSFA 2")
resp = client.get("/api/compliance/dsfa/export/csv")
assert resp.status_code == 200
assert "text/csv" in resp.headers.get("content-type", "")
lines = resp.text.strip().split("\n")
assert len(lines) == 3 # header + 2 rows
assert "ID" in lines[0]
assert "CSV DSFA" in lines[1] or "CSV DSFA" in lines[2]
def test_export_csv_empty(self):
resp = client.get("/api/compliance/dsfa/export/csv")
assert resp.status_code == 200
lines = resp.text.strip().split("\n")
assert len(lines) == 1 # header only
# =============================================================================
# Route Integration Tests — UCCA Stubs
# =============================================================================
class TestDSFARouteUCCAStubs:
def test_from_assessment_returns_501(self):
resp = client.post(f"/api/compliance/dsfa/from-assessment/{uuid.uuid4()}")
assert resp.status_code == 501
def test_by_assessment_returns_501(self):
resp = client.get(f"/api/compliance/dsfa/by-assessment/{uuid.uuid4()}")
assert resp.status_code == 501
# =============================================================================
# Route Integration Tests — Audit Log
# =============================================================================
class TestDSFARouteAuditLog:
def test_audit_log_after_create(self):
_create_dsfa_via_api(title="Audit Test")
resp = client.get("/api/compliance/dsfa/audit-log")
assert resp.status_code == 200
entries = resp.json()
assert len(entries) >= 1
assert entries[0]["action"] == "CREATE"
def test_audit_log_empty(self):
resp = client.get("/api/compliance/dsfa/audit-log")
assert resp.status_code == 200
assert resp.json() == []
# =============================================================================
@@ -556,7 +1101,6 @@ class TestAIUseCaseModules:
def test_response_ai_use_case_modules_list_from_list(self):
"""_dsfa_to_response: ai_use_case_modules list passthrough."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
modules = [{"type": "nlp", "name": "Test"}]
row = helper._make_row(ai_use_case_modules=modules)
@@ -565,7 +1109,6 @@ class TestAIUseCaseModules:
def test_response_ai_use_case_modules_from_json_string(self):
"""_dsfa_to_response: parses JSON string for ai_use_case_modules."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
modules = [{"type": "computer_vision"}]
row = helper._make_row(ai_use_case_modules=_json.dumps(modules))
@@ -574,7 +1117,6 @@ class TestAIUseCaseModules:
def test_response_ai_use_case_modules_null_becomes_empty_list(self):
"""_dsfa_to_response: None → empty list."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
row = helper._make_row(ai_use_case_modules=None)
result = _dsfa_to_response(row)
@@ -582,7 +1124,6 @@ class TestAIUseCaseModules:
def test_response_section_8_complete_flag(self):
"""_dsfa_to_response: section_8_complete bool preserved."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
row = helper._make_row(section_8_complete=True)
result = _dsfa_to_response(row)
@@ -598,7 +1139,6 @@ class TestDSFAFullSchema:
def _make_row(self, **overrides):
"""Reuse the shared helper from TestDsfaToResponse."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
return helper._make_row(**overrides)
@@ -766,3 +1306,78 @@ class TestDSFAFullSchema:
]
for key in new_keys:
assert key in result, f"Missing key in response: {key}"
# =============================================================================
# Stats Response Structure
# =============================================================================
class TestDSFAStatsResponse:
def test_stats_keys_present(self):
expected_keys = {
"total", "by_status", "by_risk_level",
"draft_count", "in_review_count", "approved_count", "needs_update_count"
}
stats = {
"total": 0,
"by_status": {},
"by_risk_level": {},
"draft_count": 0,
"in_review_count": 0,
"approved_count": 0,
"needs_update_count": 0,
}
assert set(stats.keys()) == expected_keys
def test_stats_total_is_int(self):
stats = {"total": 5}
assert isinstance(stats["total"], int)
def test_stats_by_status_is_dict(self):
by_status = {"draft": 2, "approved": 1}
assert isinstance(by_status, dict)
def test_stats_counts_are_integers(self):
counts = {"draft_count": 2, "in_review_count": 1, "approved_count": 0}
assert all(isinstance(v, int) for v in counts.values())
def test_stats_zero_total_when_no_dsfas(self):
stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0}
assert stats["total"] == 0
# =============================================================================
# Audit Log Entry Structure
# =============================================================================
class TestAuditLogEntry:
def test_audit_log_entry_keys(self):
entry = {
"id": "uuid-1",
"tenant_id": "default",
"dsfa_id": "uuid-2",
"action": "CREATE",
"changed_by": "system",
"old_values": None,
"new_values": {"title": "Test"},
"created_at": "2026-01-01T12:00:00",
}
assert "id" in entry
assert "action" in entry
assert "dsfa_id" in entry
assert "created_at" in entry
def test_audit_action_values(self):
valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"}
assert "CREATE" in valid_actions
assert "DELETE" in valid_actions
assert "STATUS_CHANGE" in valid_actions
def test_audit_dsfa_id_can_be_none(self):
entry = {"dsfa_id": None}
assert entry["dsfa_id"] is None
def test_audit_old_values_can_be_none(self):
entry = {"old_values": None, "new_values": {"title": "Test"}}
assert entry["old_values"] is None
assert entry["new_values"] is not None