All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 41s
CI/CD / test-python-backend-compliance (push) Successful in 31s
CI/CD / test-python-document-crawler (push) Successful in 21s
CI/CD / test-python-dsms-gateway (push) Successful in 16s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Successful in 4s
Tests were failing due to stale mock objects after schema extensions: - DSFA: add _mapping property to _DictRow, use proper mock instead of MagicMock - Company Profile: add 6 missing fields (project_id, offering_urls, etc.) - Legal Templates/Policy: update document type count 52→58 - VVT: add 13 missing attributes to activity mock - Legal Documents: align consent test assertions with production behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1395 lines
51 KiB
Python
1395 lines
51 KiB
Python
"""Tests for DSFA routes and schemas (dsfa_routes.py).
|
|
|
|
Includes:
|
|
- Schema/Pydantic tests (DSFACreate, DSFAUpdate, DSFAStatusUpdate)
|
|
- Helper tests (_dsfa_to_response, _get_tenant_id)
|
|
- Route integration tests (TestClient + SQLite)
|
|
"""
|
|
|
|
import pytest
|
|
import uuid
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from unittest.mock import MagicMock
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine, text, event # noqa: F401
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Ensure backend dir is on path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from classroom_engine.database import get_db
|
|
from compliance.api.dsfa_routes import (
|
|
DSFACreate,
|
|
DSFAUpdate,
|
|
DSFAStatusUpdate,
|
|
DSFASectionUpdate,
|
|
DSFAApproveRequest,
|
|
_dsfa_to_response,
|
|
_get_tenant_id,
|
|
DEFAULT_TENANT_ID,
|
|
VALID_STATUSES,
|
|
VALID_RISK_LEVELS,
|
|
router as dsfa_router,
|
|
)
|
|
|
|
import json as _json
|
|
|
|
|
|
# =============================================================================
|
|
# Test App + SQLite Setup
|
|
# =============================================================================
|
|
|
|
SQLALCHEMY_DATABASE_URL = "sqlite:///./test_dsfa.db"
|
|
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
|
|
_RawSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
|
|
@event.listens_for(engine, "connect")
|
|
def _register_sqlite_functions(dbapi_conn, connection_record):
|
|
"""Register PostgreSQL-compatible functions for SQLite."""
|
|
dbapi_conn.create_function("NOW", 0, lambda: datetime.utcnow().isoformat())
|
|
|
|
TENANT_ID = "default"
|
|
|
|
|
|
class _DictRow(dict):
|
|
"""Dict wrapper that mimics PostgreSQL's dict-like row access for SQLite.
|
|
|
|
Provides a ``_mapping`` property (returns self) so that production code
|
|
such as ``row._mapping["id"]`` works, and supports integer indexing via
|
|
``row[0]`` which returns the first value (used as fallback in create_dsfa).
|
|
"""
|
|
|
|
@property
|
|
def _mapping(self):
|
|
return self
|
|
|
|
def __getitem__(self, key):
|
|
if isinstance(key, int):
|
|
return list(self.values())[key]
|
|
return super().__getitem__(key)
|
|
|
|
|
|
class _DictSession:
|
|
"""Wrapper around SQLAlchemy Session that returns dict-like rows.
|
|
|
|
Production code uses row["column_name"] which works with PostgreSQL/psycopg2
|
|
but not with SQLAlchemy 2.0's Row objects on SQLite. This wrapper converts
|
|
all result rows to dicts so the raw-SQL routes work in tests.
|
|
|
|
Also rewrites CAST(:param AS jsonb) → :param for SQLite compatibility.
|
|
PostgreSQL CAST AS jsonb works, but SQLite CAST to unknown type yields 0.
|
|
"""
|
|
def __init__(self, session):
|
|
self._session = session
|
|
|
|
def execute(self, stmt, params=None):
|
|
import re
|
|
# Rewrite CAST(:param AS jsonb) → :param for SQLite
|
|
if hasattr(stmt, 'text'):
|
|
rewritten = re.sub(r'CAST\((:[\w]+)\s+AS\s+jsonb\)', r'\1', stmt.text)
|
|
if rewritten != stmt.text:
|
|
stmt = text(rewritten)
|
|
result = self._session.execute(stmt, params)
|
|
return _DictResult(result)
|
|
|
|
def flush(self):
|
|
self._session.flush()
|
|
|
|
def commit(self):
|
|
self._session.commit()
|
|
|
|
def rollback(self):
|
|
self._session.rollback()
|
|
|
|
def close(self):
|
|
self._session.close()
|
|
|
|
|
|
class _DictResult:
|
|
"""Wraps SQLAlchemy Result to return dict rows."""
|
|
def __init__(self, result):
|
|
self._result = result
|
|
try:
|
|
self._keys = list(result.keys())
|
|
self._returns_rows = True
|
|
except Exception:
|
|
self._keys = []
|
|
self._returns_rows = False
|
|
|
|
def fetchone(self):
|
|
if not self._returns_rows:
|
|
return None
|
|
row = self._result.fetchone()
|
|
if row is None:
|
|
return None
|
|
return _DictRow(zip(self._keys, row))
|
|
|
|
def fetchall(self):
|
|
if not self._returns_rows:
|
|
return []
|
|
rows = self._result.fetchall()
|
|
return [_DictRow(zip(self._keys, r)) for r in rows]
|
|
|
|
@property
|
|
def rowcount(self):
|
|
return self._result.rowcount
|
|
|
|
|
|
app = FastAPI()
|
|
app.include_router(dsfa_router, prefix="/api/compliance")
|
|
|
|
|
|
def override_get_db():
|
|
session = _RawSessionLocal()
|
|
db = _DictSession(session)
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
client = TestClient(app)
|
|
|
|
|
|
# SQL to create the DSFA tables in SQLite (simplified from PostgreSQL)
|
|
CREATE_DSFAS_TABLE = """
|
|
CREATE TABLE IF NOT EXISTS compliance_dsfas (
|
|
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
|
|
tenant_id TEXT NOT NULL DEFAULT 'default',
|
|
title TEXT NOT NULL,
|
|
description TEXT DEFAULT '',
|
|
status TEXT DEFAULT 'draft',
|
|
risk_level TEXT DEFAULT 'low',
|
|
processing_activity TEXT DEFAULT '',
|
|
data_categories TEXT DEFAULT '[]',
|
|
recipients TEXT DEFAULT '[]',
|
|
measures TEXT DEFAULT '[]',
|
|
approved_by TEXT,
|
|
approved_at TIMESTAMP,
|
|
created_by TEXT DEFAULT 'system',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
-- Section 1
|
|
processing_description TEXT,
|
|
processing_purpose TEXT,
|
|
legal_basis TEXT,
|
|
legal_basis_details TEXT,
|
|
-- Section 2
|
|
necessity_assessment TEXT,
|
|
proportionality_assessment TEXT,
|
|
data_minimization TEXT,
|
|
alternatives_considered TEXT,
|
|
retention_justification TEXT,
|
|
-- Section 3
|
|
involves_ai INTEGER DEFAULT 0,
|
|
overall_risk_level TEXT,
|
|
risk_score INTEGER DEFAULT 0,
|
|
risk_assessment TEXT,
|
|
-- Section 6
|
|
dpo_consulted INTEGER DEFAULT 0,
|
|
dpo_consulted_at TIMESTAMP,
|
|
dpo_name TEXT,
|
|
dpo_opinion TEXT,
|
|
dpo_approved INTEGER,
|
|
authority_consulted INTEGER DEFAULT 0,
|
|
authority_consulted_at TIMESTAMP,
|
|
authority_reference TEXT,
|
|
authority_decision TEXT,
|
|
-- Metadata
|
|
version INTEGER DEFAULT 1,
|
|
previous_version_id TEXT,
|
|
conclusion TEXT,
|
|
federal_state TEXT,
|
|
authority_resource_id TEXT,
|
|
submitted_for_review_at TIMESTAMP,
|
|
submitted_by TEXT,
|
|
-- JSONB arrays (stored as TEXT in SQLite)
|
|
data_subjects TEXT DEFAULT '[]',
|
|
affected_rights TEXT DEFAULT '[]',
|
|
triggered_rule_codes TEXT DEFAULT '[]',
|
|
ai_trigger_ids TEXT DEFAULT '[]',
|
|
wp248_criteria_met TEXT DEFAULT '[]',
|
|
art35_abs3_triggered TEXT DEFAULT '[]',
|
|
tom_references TEXT DEFAULT '[]',
|
|
risks TEXT DEFAULT '[]',
|
|
mitigations TEXT DEFAULT '[]',
|
|
stakeholder_consultations TEXT DEFAULT '[]',
|
|
review_triggers TEXT DEFAULT '[]',
|
|
review_comments TEXT DEFAULT '[]',
|
|
ai_use_case_modules TEXT DEFAULT '[]',
|
|
section_8_complete INTEGER DEFAULT 0,
|
|
-- JSONB objects (stored as TEXT in SQLite)
|
|
threshold_analysis TEXT DEFAULT '{}',
|
|
consultation_requirement TEXT DEFAULT '{}',
|
|
review_schedule TEXT DEFAULT '{}',
|
|
section_progress TEXT DEFAULT '{}',
|
|
metadata TEXT DEFAULT '{}'
|
|
)
|
|
"""
|
|
|
|
CREATE_AUDIT_TABLE = """
|
|
CREATE TABLE IF NOT EXISTS compliance_dsfa_audit_log (
|
|
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
|
|
tenant_id TEXT NOT NULL,
|
|
dsfa_id TEXT,
|
|
action TEXT NOT NULL,
|
|
changed_by TEXT DEFAULT 'system',
|
|
old_values TEXT,
|
|
new_values TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_db():
|
|
"""Create tables before each test, drop after."""
|
|
with engine.connect() as conn:
|
|
conn.execute(text(CREATE_DSFAS_TABLE))
|
|
conn.execute(text(CREATE_AUDIT_TABLE))
|
|
conn.commit()
|
|
yield
|
|
with engine.connect() as conn:
|
|
conn.execute(text("DROP TABLE IF EXISTS compliance_dsfa_audit_log"))
|
|
conn.execute(text("DROP TABLE IF EXISTS compliance_dsfas"))
|
|
conn.commit()
|
|
|
|
|
|
def _create_dsfa_via_api(**kwargs):
|
|
"""Helper: create a DSFA via POST and return response JSON."""
|
|
payload = {"title": "Test DSFA", **kwargs}
|
|
resp = client.post("/api/compliance/dsfa", json=payload)
|
|
assert resp.status_code == 201, resp.text
|
|
return resp.json()
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — DSFACreate
|
|
# =============================================================================
|
|
|
|
class TestDSFACreate:
|
|
def test_minimal_valid(self):
|
|
req = DSFACreate(title="DSFA - Mitarbeiter-Monitoring")
|
|
assert req.title == "DSFA - Mitarbeiter-Monitoring"
|
|
assert req.status == "draft"
|
|
assert req.risk_level == "low"
|
|
assert req.description == ""
|
|
assert req.processing_activity == ""
|
|
assert req.data_categories == []
|
|
assert req.recipients == []
|
|
assert req.measures == []
|
|
assert req.created_by == "system"
|
|
|
|
def test_full_values(self):
|
|
req = DSFACreate(
|
|
title="DSFA - Video-Ueberwachung",
|
|
description="Videoueberwachung im Buero",
|
|
status="in-review",
|
|
risk_level="high",
|
|
processing_activity="Videoueberwachung zu Sicherheitszwecken",
|
|
data_categories=["Bilddaten", "Bewegungsdaten"],
|
|
recipients=["Sicherheitsdienst"],
|
|
measures=["Loeschfristen", "Hinweisschilder"],
|
|
created_by="admin",
|
|
)
|
|
assert req.title == "DSFA - Video-Ueberwachung"
|
|
assert req.status == "in-review"
|
|
assert req.risk_level == "high"
|
|
assert req.data_categories == ["Bilddaten", "Bewegungsdaten"]
|
|
assert req.recipients == ["Sicherheitsdienst"]
|
|
assert req.measures == ["Loeschfristen", "Hinweisschilder"]
|
|
assert req.created_by == "admin"
|
|
|
|
def test_draft_is_default_status(self):
|
|
req = DSFACreate(title="Test")
|
|
assert req.status == "draft"
|
|
|
|
def test_low_is_default_risk_level(self):
|
|
req = DSFACreate(title="Test")
|
|
assert req.risk_level == "low"
|
|
|
|
def test_empty_arrays_default(self):
|
|
req = DSFACreate(title="Test")
|
|
assert isinstance(req.data_categories, list)
|
|
assert isinstance(req.recipients, list)
|
|
assert isinstance(req.measures, list)
|
|
assert len(req.data_categories) == 0
|
|
|
|
def test_serialization_model_dump(self):
|
|
req = DSFACreate(title="Test", risk_level="critical")
|
|
data = req.model_dump()
|
|
assert data["title"] == "Test"
|
|
assert data["risk_level"] == "critical"
|
|
assert "status" in data
|
|
assert "data_categories" in data
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — DSFAUpdate
|
|
# =============================================================================
|
|
|
|
class TestDSFAUpdate:
|
|
def test_all_optional(self):
|
|
req = DSFAUpdate()
|
|
assert req.title is None
|
|
assert req.description is None
|
|
assert req.status is None
|
|
assert req.risk_level is None
|
|
assert req.processing_activity is None
|
|
assert req.data_categories is None
|
|
assert req.recipients is None
|
|
assert req.measures is None
|
|
assert req.approved_by is None
|
|
|
|
def test_partial_update_title_only(self):
|
|
req = DSFAUpdate(title="Neuer Titel")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data == {"title": "Neuer Titel"}
|
|
|
|
def test_partial_update_status_and_risk(self):
|
|
req = DSFAUpdate(status="approved", risk_level="medium")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["status"] == "approved"
|
|
assert data["risk_level"] == "medium"
|
|
assert "title" not in data
|
|
|
|
def test_update_arrays(self):
|
|
req = DSFAUpdate(data_categories=["Kontaktdaten"], measures=["Verschluesselung"])
|
|
assert req.data_categories == ["Kontaktdaten"]
|
|
assert req.measures == ["Verschluesselung"]
|
|
|
|
def test_exclude_none_removes_unset(self):
|
|
req = DSFAUpdate(approved_by="DSB Mueller")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data == {"approved_by": "DSB Mueller"}
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — DSFAStatusUpdate
|
|
# =============================================================================
|
|
|
|
class TestDSFAStatusUpdate:
|
|
def test_status_only(self):
|
|
req = DSFAStatusUpdate(status="approved")
|
|
assert req.status == "approved"
|
|
assert req.approved_by is None
|
|
|
|
def test_status_with_approved_by(self):
|
|
req = DSFAStatusUpdate(status="approved", approved_by="DSB Mueller")
|
|
assert req.status == "approved"
|
|
assert req.approved_by == "DSB Mueller"
|
|
|
|
def test_in_review_status(self):
|
|
req = DSFAStatusUpdate(status="in-review")
|
|
assert req.status == "in-review"
|
|
|
|
def test_needs_update_status(self):
|
|
req = DSFAStatusUpdate(status="needs-update")
|
|
assert req.status == "needs-update"
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Tests — New Schemas
|
|
# =============================================================================
|
|
|
|
class TestDSFASectionUpdate:
|
|
def test_content_only(self):
|
|
req = DSFASectionUpdate(content="Beschreibung der Verarbeitung")
|
|
assert req.content == "Beschreibung der Verarbeitung"
|
|
assert req.extra is None
|
|
|
|
def test_extra_dict(self):
|
|
req = DSFASectionUpdate(extra={"key": "value"})
|
|
assert req.extra == {"key": "value"}
|
|
|
|
def test_all_optional(self):
|
|
req = DSFASectionUpdate()
|
|
assert req.content is None
|
|
assert req.extra is None
|
|
|
|
|
|
class TestDSFAApproveRequest:
|
|
def test_approved_true(self):
|
|
req = DSFAApproveRequest(approved=True, approved_by="DSB Mueller")
|
|
assert req.approved is True
|
|
assert req.approved_by == "DSB Mueller"
|
|
|
|
def test_rejected(self):
|
|
req = DSFAApproveRequest(approved=False, comments="Massnahmen unzureichend")
|
|
assert req.approved is False
|
|
assert req.comments == "Massnahmen unzureichend"
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Tests — _get_tenant_id
|
|
# =============================================================================
|
|
|
|
class TestGetTenantId:
|
|
def test_none_returns_default(self):
|
|
assert _get_tenant_id(None) == DEFAULT_TENANT_ID
|
|
|
|
def test_empty_string_returns_empty(self):
|
|
# Empty string is falsy → returns default
|
|
assert _get_tenant_id("") == DEFAULT_TENANT_ID
|
|
|
|
def test_custom_tenant_id(self):
|
|
assert _get_tenant_id("my-tenant") == "my-tenant"
|
|
|
|
def test_default_constant_value(self):
|
|
assert DEFAULT_TENANT_ID == "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Tests — _dsfa_to_response
|
|
# =============================================================================
|
|
|
|
class TestDsfaToResponse:
|
|
def _make_row(self, **overrides):
|
|
defaults = {
|
|
# Core fields
|
|
"id": "abc123",
|
|
"tenant_id": "default",
|
|
"title": "Test DSFA",
|
|
"description": "Testbeschreibung",
|
|
"status": "draft",
|
|
"risk_level": "low",
|
|
"processing_activity": "Test-Verarbeitung",
|
|
"data_categories": ["Kontaktdaten"],
|
|
"recipients": ["HR"],
|
|
"measures": ["Verschluesselung"],
|
|
"approved_by": None,
|
|
"approved_at": None,
|
|
"created_by": "system",
|
|
"created_at": datetime(2026, 1, 1, 12, 0, 0),
|
|
"updated_at": datetime(2026, 1, 2, 12, 0, 0),
|
|
# Section 1 (Migration 030)
|
|
"processing_description": None,
|
|
"processing_purpose": None,
|
|
"legal_basis": None,
|
|
"legal_basis_details": None,
|
|
# Section 2
|
|
"necessity_assessment": None,
|
|
"proportionality_assessment": None,
|
|
"data_minimization": None,
|
|
"alternatives_considered": None,
|
|
"retention_justification": None,
|
|
# Section 3
|
|
"involves_ai": False,
|
|
"overall_risk_level": None,
|
|
"risk_score": 0,
|
|
# Section 6
|
|
"dpo_consulted": False,
|
|
"dpo_consulted_at": None,
|
|
"dpo_name": None,
|
|
"dpo_opinion": None,
|
|
"dpo_approved": None,
|
|
"authority_consulted": False,
|
|
"authority_consulted_at": None,
|
|
"authority_reference": None,
|
|
"authority_decision": None,
|
|
# Metadata
|
|
"version": 1,
|
|
"previous_version_id": None,
|
|
"conclusion": None,
|
|
"federal_state": None,
|
|
"authority_resource_id": None,
|
|
"submitted_for_review_at": None,
|
|
"submitted_by": None,
|
|
# JSONB Arrays
|
|
"data_subjects": [],
|
|
"affected_rights": [],
|
|
"triggered_rule_codes": [],
|
|
"ai_trigger_ids": [],
|
|
"wp248_criteria_met": [],
|
|
"art35_abs3_triggered": [],
|
|
"tom_references": [],
|
|
"risks": [],
|
|
"mitigations": [],
|
|
"stakeholder_consultations": [],
|
|
"review_triggers": [],
|
|
"review_comments": [],
|
|
# Section 8 (Migration 028)
|
|
"ai_use_case_modules": [],
|
|
"section_8_complete": False,
|
|
# JSONB Objects
|
|
"threshold_analysis": None,
|
|
"consultation_requirement": None,
|
|
"review_schedule": None,
|
|
"section_progress": {},
|
|
"metadata": {},
|
|
}
|
|
defaults.update(overrides)
|
|
return _DictRow(defaults)
|
|
|
|
def test_basic_fields(self):
|
|
row = self._make_row()
|
|
result = _dsfa_to_response(row)
|
|
assert result["id"] == "abc123"
|
|
assert result["title"] == "Test DSFA"
|
|
assert result["status"] == "draft"
|
|
assert result["risk_level"] == "low"
|
|
|
|
def test_dates_as_iso_strings(self):
|
|
row = self._make_row()
|
|
result = _dsfa_to_response(row)
|
|
assert result["created_at"] == "2026-01-01T12:00:00"
|
|
assert result["updated_at"] == "2026-01-02T12:00:00"
|
|
|
|
def test_approved_at_none_when_not_set(self):
|
|
row = self._make_row(approved_at=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["approved_at"] is None
|
|
|
|
def test_approved_at_iso_when_set(self):
|
|
row = self._make_row(approved_at=datetime(2026, 3, 1, 10, 0, 0))
|
|
result = _dsfa_to_response(row)
|
|
assert result["approved_at"] == "2026-03-01T10:00:00"
|
|
|
|
def test_null_description_becomes_empty_string(self):
|
|
row = self._make_row(description=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["description"] == ""
|
|
|
|
def test_json_string_data_categories_parsed(self):
|
|
import json
|
|
row = self._make_row(data_categories=json.dumps(["Kontaktdaten", "Finanzdaten"]))
|
|
result = _dsfa_to_response(row)
|
|
assert result["data_categories"] == ["Kontaktdaten", "Finanzdaten"]
|
|
|
|
def test_null_arrays_become_empty_lists(self):
|
|
row = self._make_row(data_categories=None, recipients=None, measures=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["data_categories"] == []
|
|
assert result["recipients"] == []
|
|
assert result["measures"] == []
|
|
|
|
def test_null_status_defaults_to_draft(self):
|
|
row = self._make_row(status=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["status"] == "draft"
|
|
|
|
def test_null_risk_level_defaults_to_low(self):
|
|
row = self._make_row(risk_level=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["risk_level"] == "low"
|
|
|
|
|
|
# =============================================================================
|
|
# Valid Status Values
|
|
# =============================================================================
|
|
|
|
class TestValidStatusValues:
|
|
def test_draft_is_valid(self):
|
|
assert "draft" in VALID_STATUSES
|
|
|
|
def test_in_review_is_valid(self):
|
|
assert "in-review" in VALID_STATUSES
|
|
|
|
def test_approved_is_valid(self):
|
|
assert "approved" in VALID_STATUSES
|
|
|
|
def test_needs_update_is_valid(self):
|
|
assert "needs-update" in VALID_STATUSES
|
|
|
|
def test_invalid_status_not_in_set(self):
|
|
assert "invalid_status" not in VALID_STATUSES
|
|
|
|
def test_all_four_statuses_covered(self):
|
|
assert len(VALID_STATUSES) == 4
|
|
|
|
|
|
# =============================================================================
|
|
# Valid Risk Levels
|
|
# =============================================================================
|
|
|
|
class TestValidRiskLevels:
|
|
def test_low_is_valid(self):
|
|
assert "low" in VALID_RISK_LEVELS
|
|
|
|
def test_medium_is_valid(self):
|
|
assert "medium" in VALID_RISK_LEVELS
|
|
|
|
def test_high_is_valid(self):
|
|
assert "high" in VALID_RISK_LEVELS
|
|
|
|
def test_critical_is_valid(self):
|
|
assert "critical" in VALID_RISK_LEVELS
|
|
|
|
def test_invalid_risk_not_in_set(self):
|
|
assert "extreme" not in VALID_RISK_LEVELS
|
|
|
|
def test_all_four_levels_covered(self):
|
|
assert len(VALID_RISK_LEVELS) == 4
|
|
|
|
|
|
# =============================================================================
|
|
# Router Config
|
|
# =============================================================================
|
|
|
|
class TestDSFARouterConfig:
|
|
def test_router_prefix(self):
|
|
assert dsfa_router.prefix == "/dsfa"
|
|
|
|
def test_router_has_tags(self):
|
|
assert "compliance-dsfa" in dsfa_router.tags
|
|
|
|
def test_router_registered_in_init(self):
|
|
from compliance.api.dsfa_routes import router as imported_router
|
|
assert imported_router is not None
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — CRUD
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteCRUD:
|
|
"""Integration tests using TestClient + SQLite."""
|
|
|
|
def test_list_dsfas_empty(self):
|
|
resp = client.get("/api/compliance/dsfa")
|
|
assert resp.status_code == 200
|
|
assert resp.json() == []
|
|
|
|
def test_create_dsfa(self):
|
|
data = _create_dsfa_via_api(title="DSFA Videoüberwachung", risk_level="high")
|
|
assert data["title"] == "DSFA Videoüberwachung"
|
|
assert data["status"] == "draft"
|
|
assert data["risk_level"] == "high"
|
|
assert "id" in data
|
|
|
|
def test_list_dsfas_with_data(self):
|
|
_create_dsfa_via_api(title="DSFA 1")
|
|
_create_dsfa_via_api(title="DSFA 2")
|
|
resp = client.get("/api/compliance/dsfa")
|
|
assert resp.status_code == 200
|
|
items = resp.json()
|
|
assert len(items) == 2
|
|
|
|
def test_get_dsfa(self):
|
|
created = _create_dsfa_via_api(title="Detail-Test")
|
|
dsfa_id = created["id"]
|
|
resp = client.get(f"/api/compliance/dsfa/{dsfa_id}")
|
|
assert resp.status_code == 200
|
|
assert resp.json()["title"] == "Detail-Test"
|
|
|
|
def test_get_dsfa_not_found(self):
|
|
resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}")
|
|
assert resp.status_code == 404
|
|
|
|
def test_update_dsfa(self):
|
|
created = _create_dsfa_via_api(title="Original")
|
|
dsfa_id = created["id"]
|
|
resp = client.put(f"/api/compliance/dsfa/{dsfa_id}", json={"title": "Updated"})
|
|
assert resp.status_code == 200
|
|
assert resp.json()["title"] == "Updated"
|
|
|
|
def test_update_dsfa_not_found(self):
|
|
resp = client.put(f"/api/compliance/dsfa/{uuid.uuid4()}", json={"title": "X"})
|
|
assert resp.status_code == 404
|
|
|
|
def test_delete_dsfa(self):
|
|
created = _create_dsfa_via_api(title="To Delete")
|
|
dsfa_id = created["id"]
|
|
resp = client.delete(f"/api/compliance/dsfa/{dsfa_id}")
|
|
assert resp.status_code == 200
|
|
assert resp.json()["success"] is True
|
|
# Verify gone
|
|
resp2 = client.get(f"/api/compliance/dsfa/{dsfa_id}")
|
|
assert resp2.status_code == 404
|
|
|
|
def test_delete_dsfa_not_found(self):
|
|
resp = client.delete(f"/api/compliance/dsfa/{uuid.uuid4()}")
|
|
assert resp.status_code == 404
|
|
|
|
def test_list_with_status_filter(self):
|
|
_create_dsfa_via_api(title="Draft One")
|
|
created2 = _create_dsfa_via_api(title="Approved One")
|
|
# Change status to approved
|
|
client.patch(
|
|
f"/api/compliance/dsfa/{created2['id']}/status",
|
|
json={"status": "approved", "approved_by": "DSB"},
|
|
)
|
|
resp = client.get("/api/compliance/dsfa?status=approved")
|
|
assert resp.status_code == 200
|
|
items = resp.json()
|
|
assert len(items) == 1
|
|
assert items[0]["status"] == "approved"
|
|
|
|
def test_create_invalid_status(self):
|
|
resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "status": "invalid"})
|
|
assert resp.status_code == 422
|
|
|
|
def test_create_invalid_risk_level(self):
|
|
resp = client.post("/api/compliance/dsfa", json={"title": "Bad", "risk_level": "extreme"})
|
|
assert resp.status_code == 422
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Stats
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteStats:
|
|
def test_stats_empty(self):
|
|
resp = client.get("/api/compliance/dsfa/stats")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 0
|
|
assert data["draft_count"] == 0
|
|
|
|
def test_stats_with_data(self):
|
|
_create_dsfa_via_api(title="DSFA A")
|
|
_create_dsfa_via_api(title="DSFA B")
|
|
resp = client.get("/api/compliance/dsfa/stats")
|
|
data = resp.json()
|
|
assert data["total"] == 2
|
|
assert data["draft_count"] == 2
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Status Patch
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteStatusPatch:
|
|
def test_patch_status(self):
|
|
created = _create_dsfa_via_api(title="Status Test")
|
|
resp = client.patch(
|
|
f"/api/compliance/dsfa/{created['id']}/status",
|
|
json={"status": "in-review"},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["status"] == "in-review"
|
|
|
|
def test_patch_status_invalid(self):
|
|
created = _create_dsfa_via_api(title="Bad Status")
|
|
resp = client.patch(
|
|
f"/api/compliance/dsfa/{created['id']}/status",
|
|
json={"status": "bogus"},
|
|
)
|
|
assert resp.status_code == 422
|
|
|
|
def test_patch_status_not_found(self):
|
|
resp = client.patch(
|
|
f"/api/compliance/dsfa/{uuid.uuid4()}/status",
|
|
json={"status": "draft"},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Section Update
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteSectionUpdate:
|
|
def test_update_section_1(self):
|
|
created = _create_dsfa_via_api(title="Section Test")
|
|
resp = client.put(
|
|
f"/api/compliance/dsfa/{created['id']}/sections/1",
|
|
json={"content": "Verarbeitung personenbezogener Daten"},
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["processing_description"] == "Verarbeitung personenbezogener Daten"
|
|
|
|
def test_update_section_7_conclusion(self):
|
|
created = _create_dsfa_via_api(title="Conclusion Test")
|
|
resp = client.put(
|
|
f"/api/compliance/dsfa/{created['id']}/sections/7",
|
|
json={"content": "DSFA abgeschlossen — Restrisiko akzeptabel"},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["conclusion"] == "DSFA abgeschlossen — Restrisiko akzeptabel"
|
|
|
|
def test_update_section_progress_tracked(self):
|
|
created = _create_dsfa_via_api(title="Progress Test")
|
|
client.put(
|
|
f"/api/compliance/dsfa/{created['id']}/sections/1",
|
|
json={"content": "Test"},
|
|
)
|
|
resp = client.get(f"/api/compliance/dsfa/{created['id']}")
|
|
progress = resp.json()["section_progress"]
|
|
assert progress.get("section_1") is True
|
|
|
|
def test_update_section_invalid_number(self):
|
|
created = _create_dsfa_via_api(title="Invalid Section")
|
|
resp = client.put(
|
|
f"/api/compliance/dsfa/{created['id']}/sections/9",
|
|
json={"content": "X"},
|
|
)
|
|
assert resp.status_code == 422
|
|
|
|
def test_update_section_not_found(self):
|
|
resp = client.put(
|
|
f"/api/compliance/dsfa/{uuid.uuid4()}/sections/1",
|
|
json={"content": "X"},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Workflow (Submit + Approve)
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteWorkflow:
|
|
def test_submit_for_review(self):
|
|
created = _create_dsfa_via_api(title="Workflow Test")
|
|
resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["status"] == "in-review"
|
|
assert data["message"] == "DSFA zur Prüfung eingereicht"
|
|
|
|
def test_submit_for_review_wrong_status(self):
|
|
created = _create_dsfa_via_api(title="Wrong Status")
|
|
# First submit
|
|
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
|
|
# Try to submit again (already in-review)
|
|
resp = client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
|
|
assert resp.status_code == 422
|
|
|
|
def test_submit_not_found(self):
|
|
resp = client.post(f"/api/compliance/dsfa/{uuid.uuid4()}/submit-for-review")
|
|
assert resp.status_code == 404
|
|
|
|
def test_approve_dsfa(self):
|
|
created = _create_dsfa_via_api(title="Approve Test")
|
|
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
|
|
resp = client.post(
|
|
f"/api/compliance/dsfa/{created['id']}/approve",
|
|
json={"approved": True, "approved_by": "DSB Mueller"},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["status"] == "approved"
|
|
|
|
def test_reject_dsfa(self):
|
|
created = _create_dsfa_via_api(title="Reject Test")
|
|
client.post(f"/api/compliance/dsfa/{created['id']}/submit-for-review")
|
|
resp = client.post(
|
|
f"/api/compliance/dsfa/{created['id']}/approve",
|
|
json={"approved": False, "comments": "Massnahmen fehlen"},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["status"] == "needs-update"
|
|
|
|
def test_approve_wrong_status(self):
|
|
created = _create_dsfa_via_api(title="Not In Review")
|
|
resp = client.post(
|
|
f"/api/compliance/dsfa/{created['id']}/approve",
|
|
json={"approved": True},
|
|
)
|
|
assert resp.status_code == 422
|
|
|
|
def test_approve_not_found(self):
|
|
resp = client.post(
|
|
f"/api/compliance/dsfa/{uuid.uuid4()}/approve",
|
|
json={"approved": True},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
def test_full_workflow_draft_to_approved(self):
|
|
"""Full lifecycle: create → submit → approve."""
|
|
created = _create_dsfa_via_api(title="Full Lifecycle")
|
|
dsfa_id = created["id"]
|
|
assert created["status"] == "draft"
|
|
|
|
# Submit for review
|
|
resp1 = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
|
|
assert resp1.json()["status"] == "in-review"
|
|
|
|
# Approve
|
|
resp2 = client.post(
|
|
f"/api/compliance/dsfa/{dsfa_id}/approve",
|
|
json={"approved": True, "approved_by": "CISO"},
|
|
)
|
|
assert resp2.json()["status"] == "approved"
|
|
|
|
# Verify final state
|
|
resp3 = client.get(f"/api/compliance/dsfa/{dsfa_id}")
|
|
final = resp3.json()
|
|
assert final["status"] == "approved"
|
|
assert final["approved_by"] == "CISO"
|
|
|
|
def test_reject_then_resubmit(self):
|
|
"""Lifecycle: create → submit → reject → resubmit → approve."""
|
|
created = _create_dsfa_via_api(title="Reject Resubmit")
|
|
dsfa_id = created["id"]
|
|
|
|
client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
|
|
client.post(
|
|
f"/api/compliance/dsfa/{dsfa_id}/approve",
|
|
json={"approved": False, "comments": "Incomplete"},
|
|
)
|
|
|
|
# Status should be needs-update → can resubmit
|
|
resp = client.post(f"/api/compliance/dsfa/{dsfa_id}/submit-for-review")
|
|
assert resp.status_code == 200
|
|
assert resp.json()["status"] == "in-review"
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Export
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteExport:
|
|
def test_export_json(self):
|
|
created = _create_dsfa_via_api(title="Export Test")
|
|
resp = client.get(f"/api/compliance/dsfa/{created['id']}/export?format=json")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "exported_at" in data
|
|
assert data["dsfa"]["title"] == "Export Test"
|
|
|
|
def test_export_json_not_found(self):
|
|
resp = client.get(f"/api/compliance/dsfa/{uuid.uuid4()}/export?format=json")
|
|
assert resp.status_code == 404
|
|
|
|
def test_export_csv(self):
|
|
_create_dsfa_via_api(title="CSV DSFA 1")
|
|
_create_dsfa_via_api(title="CSV DSFA 2")
|
|
resp = client.get("/api/compliance/dsfa/export/csv")
|
|
assert resp.status_code == 200
|
|
assert "text/csv" in resp.headers.get("content-type", "")
|
|
lines = resp.text.strip().split("\n")
|
|
assert len(lines) == 3 # header + 2 rows
|
|
assert "ID" in lines[0]
|
|
assert "CSV DSFA" in lines[1] or "CSV DSFA" in lines[2]
|
|
|
|
def test_export_csv_empty(self):
|
|
resp = client.get("/api/compliance/dsfa/export/csv")
|
|
assert resp.status_code == 200
|
|
lines = resp.text.strip().split("\n")
|
|
assert len(lines) == 1 # header only
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — UCCA Stubs
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteUCCAStubs:
|
|
def test_from_assessment_returns_501(self):
|
|
resp = client.post(f"/api/compliance/dsfa/from-assessment/{uuid.uuid4()}")
|
|
assert resp.status_code == 501
|
|
|
|
def test_by_assessment_returns_501(self):
|
|
resp = client.get(f"/api/compliance/dsfa/by-assessment/{uuid.uuid4()}")
|
|
assert resp.status_code == 501
|
|
|
|
|
|
# =============================================================================
|
|
# Route Integration Tests — Audit Log
|
|
# =============================================================================
|
|
|
|
class TestDSFARouteAuditLog:
|
|
def test_audit_log_after_create(self):
|
|
_create_dsfa_via_api(title="Audit Test")
|
|
resp = client.get("/api/compliance/dsfa/audit-log")
|
|
assert resp.status_code == 200
|
|
entries = resp.json()
|
|
assert len(entries) >= 1
|
|
assert entries[0]["action"] == "CREATE"
|
|
|
|
def test_audit_log_empty(self):
|
|
resp = client.get("/api/compliance/dsfa/audit-log")
|
|
assert resp.status_code == 200
|
|
assert resp.json() == []
|
|
|
|
|
|
# =============================================================================
|
|
# TestAIUseCaseModules — Section 8 KI-Anwendungsfälle (Migration 028)
|
|
# =============================================================================
|
|
|
|
class TestAIUseCaseModules:
|
|
"""Tests for ai_use_case_modules field (DSFACreate/DSFAUpdate Pydantic schemas)."""
|
|
|
|
def test_ai_use_case_modules_field_accepted_in_create(self):
|
|
req = DSFACreate(title="Test", ai_use_case_modules=[{"type": "generative_ai"}])
|
|
assert req.ai_use_case_modules == [{"type": "generative_ai"}]
|
|
|
|
def test_ai_use_case_modules_default_none_in_create(self):
|
|
req = DSFACreate(title="Test")
|
|
assert req.ai_use_case_modules is None
|
|
|
|
def test_ai_use_case_modules_field_accepted_in_update(self):
|
|
req = DSFAUpdate(ai_use_case_modules=[{"type": "computer_vision", "name": "Bilderkennung"}])
|
|
assert req.ai_use_case_modules == [{"type": "computer_vision", "name": "Bilderkennung"}]
|
|
|
|
def test_ai_use_case_modules_empty_list_accepted(self):
|
|
req = DSFAUpdate(ai_use_case_modules=[])
|
|
assert req.ai_use_case_modules == []
|
|
|
|
def test_ai_use_case_modules_multiple_modules(self):
|
|
modules = [
|
|
{"type": "generative_ai", "name": "LLM-Assistent"},
|
|
{"type": "predictive_analytics", "name": "Risikobewertung"},
|
|
]
|
|
req = DSFAUpdate(ai_use_case_modules=modules)
|
|
assert len(req.ai_use_case_modules) == 2
|
|
|
|
def test_module_generative_ai_type(self):
|
|
module = {"type": "generative_ai", "name": "Text-Generator"}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
assert req.ai_use_case_modules[0]["type"] == "generative_ai"
|
|
|
|
def test_module_art22_assessment_structure(self):
|
|
module = {
|
|
"type": "decision_support",
|
|
"art22_relevant": True,
|
|
"art22_assessment": {"automated_decision": True, "human_oversight": True},
|
|
}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
assert req.ai_use_case_modules[0]["art22_relevant"] is True
|
|
|
|
def test_module_ai_act_risk_class_values(self):
|
|
for risk_class in ["minimal", "limited", "high", "unacceptable"]:
|
|
module = {"type": "nlp", "ai_act_risk_class": risk_class}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
assert req.ai_use_case_modules[0]["ai_act_risk_class"] == risk_class
|
|
|
|
def test_module_risk_criteria_structure(self):
|
|
module = {
|
|
"type": "computer_vision",
|
|
"risk_criteria": [
|
|
{"criterion": "K1", "met": True, "justification": "Scoring vorhanden"},
|
|
{"criterion": "K3", "met": True, "justification": "Systematische Überwachung"},
|
|
],
|
|
}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
assert len(req.ai_use_case_modules[0]["risk_criteria"]) == 2
|
|
|
|
def test_module_privacy_by_design_measures(self):
|
|
module = {
|
|
"type": "recommendation",
|
|
"privacy_by_design": ["data_minimization", "pseudonymization"],
|
|
}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
assert "data_minimization" in req.ai_use_case_modules[0]["privacy_by_design"]
|
|
|
|
def test_module_review_triggers(self):
|
|
req = DSFAUpdate(review_triggers=[{"trigger": "model_update", "date": "2026-06-01"}])
|
|
assert req.review_triggers[0]["trigger"] == "model_update"
|
|
|
|
def test_section_8_complete_flag_in_create(self):
|
|
req = DSFACreate(title="Test", section_8_complete=True)
|
|
assert req.section_8_complete is True
|
|
|
|
def test_section_8_complete_flag_in_update(self):
|
|
req = DSFAUpdate(section_8_complete=True)
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["section_8_complete"] is True
|
|
|
|
def test_section_8_complete_default_none(self):
|
|
req = DSFAUpdate()
|
|
assert req.section_8_complete is None
|
|
|
|
def test_ai_use_case_modules_excluded_when_none(self):
|
|
req = DSFAUpdate(title="Test")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert "ai_use_case_modules" not in data
|
|
|
|
def test_ai_use_case_modules_included_when_set(self):
|
|
req = DSFAUpdate(ai_use_case_modules=[{"type": "nlp"}])
|
|
data = req.model_dump(exclude_none=True)
|
|
assert "ai_use_case_modules" in data
|
|
|
|
def test_module_with_all_common_fields(self):
|
|
module = {
|
|
"type": "predictive_analytics",
|
|
"name": "Fraud Detection",
|
|
"description": "Erkennung betrügerischer Aktivitäten",
|
|
"data_inputs": ["Transaktionsdaten", "Verhaltensdaten"],
|
|
"ai_act_risk_class": "high",
|
|
"art22_relevant": True,
|
|
}
|
|
req = DSFAUpdate(ai_use_case_modules=[module])
|
|
m = req.ai_use_case_modules[0]
|
|
assert m["name"] == "Fraud Detection"
|
|
assert m["ai_act_risk_class"] == "high"
|
|
|
|
def test_response_ai_use_case_modules_list_from_list(self):
|
|
"""_dsfa_to_response: ai_use_case_modules list passthrough."""
|
|
helper = TestDsfaToResponse()
|
|
modules = [{"type": "nlp", "name": "Test"}]
|
|
row = helper._make_row(ai_use_case_modules=modules)
|
|
result = _dsfa_to_response(row)
|
|
assert result["ai_use_case_modules"] == modules
|
|
|
|
def test_response_ai_use_case_modules_from_json_string(self):
|
|
"""_dsfa_to_response: parses JSON string for ai_use_case_modules."""
|
|
helper = TestDsfaToResponse()
|
|
modules = [{"type": "computer_vision"}]
|
|
row = helper._make_row(ai_use_case_modules=_json.dumps(modules))
|
|
result = _dsfa_to_response(row)
|
|
assert result["ai_use_case_modules"] == modules
|
|
|
|
def test_response_ai_use_case_modules_null_becomes_empty_list(self):
|
|
"""_dsfa_to_response: None → empty list."""
|
|
helper = TestDsfaToResponse()
|
|
row = helper._make_row(ai_use_case_modules=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["ai_use_case_modules"] == []
|
|
|
|
def test_response_section_8_complete_flag(self):
|
|
"""_dsfa_to_response: section_8_complete bool preserved."""
|
|
helper = TestDsfaToResponse()
|
|
row = helper._make_row(section_8_complete=True)
|
|
result = _dsfa_to_response(row)
|
|
assert result["section_8_complete"] is True
|
|
|
|
|
|
# =============================================================================
|
|
# TestDSFAFullSchema — Migration 030 neue Felder
|
|
# =============================================================================
|
|
|
|
class TestDSFAFullSchema:
|
|
"""Tests for all new fields added in Migration 030."""
|
|
|
|
def _make_row(self, **overrides):
|
|
"""Reuse the shared helper from TestDsfaToResponse."""
|
|
helper = TestDsfaToResponse()
|
|
return helper._make_row(**overrides)
|
|
|
|
# --- Pydantic Schema Tests ---
|
|
|
|
def test_processing_description_accepted(self):
|
|
req = DSFAUpdate(processing_description="Verarbeitung von Kundendaten zur Risikoanalyse")
|
|
assert req.processing_description == "Verarbeitung von Kundendaten zur Risikoanalyse"
|
|
|
|
def test_legal_basis_accepted(self):
|
|
req = DSFAUpdate(legal_basis="Art. 6 Abs. 1f DSGVO")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["legal_basis"] == "Art. 6 Abs. 1f DSGVO"
|
|
|
|
def test_dpo_consulted_bool(self):
|
|
req = DSFAUpdate(dpo_consulted=True, dpo_name="Dr. Müller")
|
|
assert req.dpo_consulted is True
|
|
assert req.dpo_name == "Dr. Müller"
|
|
|
|
def test_dpo_approved_bool(self):
|
|
req = DSFAUpdate(dpo_approved=True)
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["dpo_approved"] is True
|
|
|
|
def test_authority_consulted_bool(self):
|
|
req = DSFAUpdate(authority_consulted=True, authority_reference="AZ-2026-001")
|
|
assert req.authority_consulted is True
|
|
assert req.authority_reference == "AZ-2026-001"
|
|
|
|
def test_risks_jsonb_structure(self):
|
|
risks = [
|
|
{"id": "R1", "title": "Datenpanne", "likelihood": "medium", "impact": "high"},
|
|
{"id": "R2", "title": "Unbefugter Zugriff", "likelihood": "low", "impact": "critical"},
|
|
]
|
|
req = DSFAUpdate(risks=risks)
|
|
assert len(req.risks) == 2
|
|
assert req.risks[0]["title"] == "Datenpanne"
|
|
|
|
def test_mitigations_jsonb_structure(self):
|
|
mitigations = [
|
|
{"id": "M1", "measure": "Verschlüsselung", "risk_ref": "R1"},
|
|
]
|
|
req = DSFAUpdate(mitigations=mitigations)
|
|
assert req.mitigations[0]["measure"] == "Verschlüsselung"
|
|
|
|
def test_review_schedule_jsonb(self):
|
|
schedule = {"next_review": "2027-01-01", "frequency": "annual", "responsible": "DSB"}
|
|
req = DSFAUpdate(review_schedule=schedule)
|
|
assert req.review_schedule["frequency"] == "annual"
|
|
|
|
def test_section_progress_jsonb(self):
|
|
progress = {"section_1": True, "section_2": False, "section_3": True}
|
|
req = DSFAUpdate(section_progress=progress)
|
|
assert req.section_progress["section_1"] is True
|
|
|
|
def test_threshold_analysis_jsonb(self):
|
|
analysis = {"wp248_criteria_count": 3, "dsfa_required": True}
|
|
req = DSFAUpdate(threshold_analysis=analysis)
|
|
assert req.threshold_analysis["dsfa_required"] is True
|
|
|
|
def test_involves_ai_bool(self):
|
|
req = DSFAUpdate(involves_ai=True)
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["involves_ai"] is True
|
|
|
|
def test_federal_state_accepted(self):
|
|
req = DSFAUpdate(federal_state="Bayern")
|
|
data = req.model_dump(exclude_none=True)
|
|
assert data["federal_state"] == "Bayern"
|
|
|
|
def test_data_subjects_list(self):
|
|
req = DSFAUpdate(data_subjects=["Kunden", "Mitarbeiter", "Minderjährige"])
|
|
assert len(req.data_subjects) == 3
|
|
|
|
def test_wp248_criteria_met_list(self):
|
|
req = DSFAUpdate(wp248_criteria_met=["K1", "K3", "K5"])
|
|
assert "K3" in req.wp248_criteria_met
|
|
|
|
def test_conclusion_text(self):
|
|
req = DSFAUpdate(conclusion="DSFA erforderlich — hohe Risiken verbleiben nach Maßnahmen.")
|
|
assert "DSFA erforderlich" in req.conclusion
|
|
|
|
def test_all_new_fields_optional_in_update(self):
|
|
req = DSFAUpdate()
|
|
for field in [
|
|
"processing_description", "processing_purpose", "legal_basis",
|
|
"necessity_assessment", "proportionality_assessment",
|
|
"involves_ai", "dpo_consulted", "dpo_opinion", "dpo_approved",
|
|
"authority_consulted", "risks", "mitigations", "section_progress",
|
|
"threshold_analysis", "federal_state", "conclusion",
|
|
]:
|
|
assert getattr(req, field) is None, f"{field} should default to None"
|
|
|
|
# --- _dsfa_to_response Tests ---
|
|
|
|
def test_response_processing_description(self):
|
|
row = self._make_row(processing_description="Test-Beschreibung")
|
|
result = _dsfa_to_response(row)
|
|
assert result["processing_description"] == "Test-Beschreibung"
|
|
|
|
def test_response_risks_parsed_from_json_string(self):
|
|
risks = [{"id": "R1", "title": "Datenpanne"}]
|
|
row = self._make_row(risks=_json.dumps(risks))
|
|
result = _dsfa_to_response(row)
|
|
assert result["risks"] == risks
|
|
|
|
def test_response_section_progress_object(self):
|
|
progress = {"section_1": True, "section_3": False}
|
|
row = self._make_row(section_progress=progress)
|
|
result = _dsfa_to_response(row)
|
|
assert result["section_progress"]["section_1"] is True
|
|
|
|
def test_response_section_progress_from_json_string(self):
|
|
progress = {"section_2": True}
|
|
row = self._make_row(section_progress=_json.dumps(progress))
|
|
result = _dsfa_to_response(row)
|
|
assert result["section_progress"] == progress
|
|
|
|
def test_response_involves_ai_bool(self):
|
|
row = self._make_row(involves_ai=True)
|
|
result = _dsfa_to_response(row)
|
|
assert result["involves_ai"] is True
|
|
|
|
def test_response_dpo_consulted_bool(self):
|
|
row = self._make_row(dpo_consulted=True, dpo_name="Dr. Müller")
|
|
result = _dsfa_to_response(row)
|
|
assert result["dpo_consulted"] is True
|
|
assert result["dpo_name"] == "Dr. Müller"
|
|
|
|
def test_response_version_defaults_to_1(self):
|
|
row = self._make_row(version=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["version"] == 1
|
|
|
|
def test_response_null_risks_becomes_empty_list(self):
|
|
row = self._make_row(risks=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["risks"] == []
|
|
|
|
def test_response_null_section_progress_becomes_empty_dict(self):
|
|
row = self._make_row(section_progress=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["section_progress"] == {}
|
|
|
|
def test_response_threshold_analysis_null_becomes_empty_dict(self):
|
|
row = self._make_row(threshold_analysis=None)
|
|
result = _dsfa_to_response(row)
|
|
assert result["threshold_analysis"] == {}
|
|
|
|
def test_response_federal_state(self):
|
|
row = self._make_row(federal_state="NRW")
|
|
result = _dsfa_to_response(row)
|
|
assert result["federal_state"] == "NRW"
|
|
|
|
def test_response_all_new_keys_present(self):
|
|
"""All new fields must be present in response even with defaults."""
|
|
row = self._make_row()
|
|
result = _dsfa_to_response(row)
|
|
new_keys = [
|
|
"processing_description", "legal_basis", "necessity_assessment",
|
|
"involves_ai", "dpo_consulted", "authority_consulted",
|
|
"risks", "mitigations", "section_progress", "threshold_analysis",
|
|
"ai_use_case_modules", "section_8_complete", "federal_state",
|
|
"version", "conclusion",
|
|
]
|
|
for key in new_keys:
|
|
assert key in result, f"Missing key in response: {key}"
|
|
|
|
|
|
# =============================================================================
|
|
# Stats Response Structure
|
|
# =============================================================================
|
|
|
|
class TestDSFAStatsResponse:
|
|
def test_stats_keys_present(self):
|
|
expected_keys = {
|
|
"total", "by_status", "by_risk_level",
|
|
"draft_count", "in_review_count", "approved_count", "needs_update_count"
|
|
}
|
|
stats = {
|
|
"total": 0,
|
|
"by_status": {},
|
|
"by_risk_level": {},
|
|
"draft_count": 0,
|
|
"in_review_count": 0,
|
|
"approved_count": 0,
|
|
"needs_update_count": 0,
|
|
}
|
|
assert set(stats.keys()) == expected_keys
|
|
|
|
def test_stats_total_is_int(self):
|
|
stats = {"total": 5}
|
|
assert isinstance(stats["total"], int)
|
|
|
|
def test_stats_by_status_is_dict(self):
|
|
by_status = {"draft": 2, "approved": 1}
|
|
assert isinstance(by_status, dict)
|
|
|
|
def test_stats_counts_are_integers(self):
|
|
counts = {"draft_count": 2, "in_review_count": 1, "approved_count": 0}
|
|
assert all(isinstance(v, int) for v in counts.values())
|
|
|
|
def test_stats_zero_total_when_no_dsfas(self):
|
|
stats = {"total": 0, "draft_count": 0, "in_review_count": 0, "approved_count": 0}
|
|
assert stats["total"] == 0
|
|
|
|
|
|
# =============================================================================
|
|
# Audit Log Entry Structure
|
|
# =============================================================================
|
|
|
|
class TestAuditLogEntry:
|
|
def test_audit_log_entry_keys(self):
|
|
entry = {
|
|
"id": "uuid-1",
|
|
"tenant_id": "default",
|
|
"dsfa_id": "uuid-2",
|
|
"action": "CREATE",
|
|
"changed_by": "system",
|
|
"old_values": None,
|
|
"new_values": {"title": "Test"},
|
|
"created_at": "2026-01-01T12:00:00",
|
|
}
|
|
assert "id" in entry
|
|
assert "action" in entry
|
|
assert "dsfa_id" in entry
|
|
assert "created_at" in entry
|
|
|
|
def test_audit_action_values(self):
|
|
valid_actions = {"CREATE", "UPDATE", "DELETE", "STATUS_CHANGE"}
|
|
assert "CREATE" in valid_actions
|
|
assert "DELETE" in valid_actions
|
|
assert "STATUS_CHANGE" in valid_actions
|
|
|
|
def test_audit_dsfa_id_can_be_none(self):
|
|
entry = {"dsfa_id": None}
|
|
assert entry["dsfa_id"] is None
|
|
|
|
def test_audit_old_values_can_be_none(self):
|
|
entry = {"old_values": None, "new_values": {"title": "Test"}}
|
|
assert entry["old_values"] is None
|
|
assert entry["new_values"] is not None
|