feat: DSFA vollständiges DB-Schema + PDF-Ingest + Tests
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 36s
CI / test-python-backend-compliance (push) Successful in 37s
CI / test-python-document-crawler (push) Successful in 23s
CI / test-python-dsms-gateway (push) Successful in 22s

- Migration 030: alle fehlenden Spalten für compliance_dsfas (Sections 0-7)
  flat fields: processing_description, legal_basis, dpo_*, authority_*, ...
  JSONB arrays: risks, mitigations, wp248_criteria_met, ai_trigger_ids, ...
  JSONB objects: section_progress, threshold_analysis, review_schedule, metadata
- dsfa_routes.py: DSFACreate/DSFAUpdate erweitert (60+ neue Optional-Felder)
  _dsfa_to_response: alle neuen Felder mit safe _get() Helper
  PUT-Handler: vollständige JSONB_FIELDS-Liste (22 Felder)
- Tests: 101 (+49) Tests — TestAIUseCaseModules + TestDSFAFullSchema
- ingest-dsfa-bundesland.sh: KNOWN_PDF_URLS (15 direkte URLs), download_pdfs()
  find_pdf_for_state() Helper, PDF-first mit Text-Fallback in ingest_all()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-05 10:03:09 +01:00
parent ff765b2d71
commit 789c215e5e
4 changed files with 774 additions and 42 deletions

View File

@@ -15,6 +15,8 @@ from compliance.api.dsfa_routes import (
VALID_RISK_LEVELS,
)
import json as _json
# =============================================================================
# Schema Tests — DSFACreate
@@ -167,6 +169,7 @@ class TestGetTenantId:
class TestDsfaToResponse:
def _make_row(self, **overrides):
defaults = {
# Core fields
"id": "abc123",
"tenant_id": "default",
"title": "Test DSFA",
@@ -182,6 +185,61 @@ class TestDsfaToResponse:
"created_by": "system",
"created_at": datetime(2026, 1, 1, 12, 0, 0),
"updated_at": datetime(2026, 1, 2, 12, 0, 0),
# Section 1 (Migration 030)
"processing_description": None,
"processing_purpose": None,
"legal_basis": None,
"legal_basis_details": None,
# Section 2
"necessity_assessment": None,
"proportionality_assessment": None,
"data_minimization": None,
"alternatives_considered": None,
"retention_justification": None,
# Section 3
"involves_ai": False,
"overall_risk_level": None,
"risk_score": 0,
# Section 6
"dpo_consulted": False,
"dpo_consulted_at": None,
"dpo_name": None,
"dpo_opinion": None,
"dpo_approved": None,
"authority_consulted": False,
"authority_consulted_at": None,
"authority_reference": None,
"authority_decision": None,
# Metadata
"version": 1,
"previous_version_id": None,
"conclusion": None,
"federal_state": None,
"authority_resource_id": None,
"submitted_for_review_at": None,
"submitted_by": None,
# JSONB Arrays
"data_subjects": [],
"affected_rights": [],
"triggered_rule_codes": [],
"ai_trigger_ids": [],
"wp248_criteria_met": [],
"art35_abs3_triggered": [],
"tom_references": [],
"risks": [],
"mitigations": [],
"stakeholder_consultations": [],
"review_triggers": [],
"review_comments": [],
# Section 8 (Migration 028)
"ai_use_case_modules": [],
"section_8_complete": False,
# JSONB Objects
"threshold_analysis": None,
"consultation_requirement": None,
"review_schedule": None,
"section_progress": {},
"metadata": {},
}
defaults.update(overrides)
row = MagicMock()
@@ -296,7 +354,8 @@ class TestValidRiskLevels:
class TestDSFARouterConfig:
def test_router_prefix(self):
from compliance.api.dsfa_routes import router
assert router.prefix == "/v1/dsfa"
# /v1 prefix is added when router is included in the main app
assert router.prefix == "/dsfa"
def test_router_has_tags(self):
from compliance.api.dsfa_routes import router
@@ -382,3 +441,328 @@ class TestAuditLogEntry:
entry = {"old_values": None, "new_values": {"title": "Test"}}
assert entry["old_values"] is None
assert entry["new_values"] is not None
# =============================================================================
# TestAIUseCaseModules — Section 8 KI-Anwendungsfälle (Migration 028)
# =============================================================================
class TestAIUseCaseModules:
"""Tests for ai_use_case_modules field (DSFACreate/DSFAUpdate Pydantic schemas)."""
def test_ai_use_case_modules_field_accepted_in_create(self):
req = DSFACreate(title="Test", ai_use_case_modules=[{"type": "generative_ai"}])
assert req.ai_use_case_modules == [{"type": "generative_ai"}]
def test_ai_use_case_modules_default_none_in_create(self):
req = DSFACreate(title="Test")
assert req.ai_use_case_modules is None
def test_ai_use_case_modules_field_accepted_in_update(self):
req = DSFAUpdate(ai_use_case_modules=[{"type": "computer_vision", "name": "Bilderkennung"}])
assert req.ai_use_case_modules == [{"type": "computer_vision", "name": "Bilderkennung"}]
def test_ai_use_case_modules_empty_list_accepted(self):
req = DSFAUpdate(ai_use_case_modules=[])
assert req.ai_use_case_modules == []
def test_ai_use_case_modules_multiple_modules(self):
modules = [
{"type": "generative_ai", "name": "LLM-Assistent"},
{"type": "predictive_analytics", "name": "Risikobewertung"},
]
req = DSFAUpdate(ai_use_case_modules=modules)
assert len(req.ai_use_case_modules) == 2
def test_module_generative_ai_type(self):
module = {"type": "generative_ai", "name": "Text-Generator"}
req = DSFAUpdate(ai_use_case_modules=[module])
assert req.ai_use_case_modules[0]["type"] == "generative_ai"
def test_module_art22_assessment_structure(self):
module = {
"type": "decision_support",
"art22_relevant": True,
"art22_assessment": {"automated_decision": True, "human_oversight": True},
}
req = DSFAUpdate(ai_use_case_modules=[module])
assert req.ai_use_case_modules[0]["art22_relevant"] is True
def test_module_ai_act_risk_class_values(self):
for risk_class in ["minimal", "limited", "high", "unacceptable"]:
module = {"type": "nlp", "ai_act_risk_class": risk_class}
req = DSFAUpdate(ai_use_case_modules=[module])
assert req.ai_use_case_modules[0]["ai_act_risk_class"] == risk_class
def test_module_risk_criteria_structure(self):
module = {
"type": "computer_vision",
"risk_criteria": [
{"criterion": "K1", "met": True, "justification": "Scoring vorhanden"},
{"criterion": "K3", "met": True, "justification": "Systematische Überwachung"},
],
}
req = DSFAUpdate(ai_use_case_modules=[module])
assert len(req.ai_use_case_modules[0]["risk_criteria"]) == 2
def test_module_privacy_by_design_measures(self):
module = {
"type": "recommendation",
"privacy_by_design": ["data_minimization", "pseudonymization"],
}
req = DSFAUpdate(ai_use_case_modules=[module])
assert "data_minimization" in req.ai_use_case_modules[0]["privacy_by_design"]
def test_module_review_triggers(self):
req = DSFAUpdate(review_triggers=[{"trigger": "model_update", "date": "2026-06-01"}])
assert req.review_triggers[0]["trigger"] == "model_update"
def test_section_8_complete_flag_in_create(self):
req = DSFACreate(title="Test", section_8_complete=True)
assert req.section_8_complete is True
def test_section_8_complete_flag_in_update(self):
req = DSFAUpdate(section_8_complete=True)
data = req.model_dump(exclude_none=True)
assert data["section_8_complete"] is True
def test_section_8_complete_default_none(self):
req = DSFAUpdate()
assert req.section_8_complete is None
def test_ai_use_case_modules_excluded_when_none(self):
req = DSFAUpdate(title="Test")
data = req.model_dump(exclude_none=True)
assert "ai_use_case_modules" not in data
def test_ai_use_case_modules_included_when_set(self):
req = DSFAUpdate(ai_use_case_modules=[{"type": "nlp"}])
data = req.model_dump(exclude_none=True)
assert "ai_use_case_modules" in data
def test_module_with_all_common_fields(self):
module = {
"type": "predictive_analytics",
"name": "Fraud Detection",
"description": "Erkennung betrügerischer Aktivitäten",
"data_inputs": ["Transaktionsdaten", "Verhaltensdaten"],
"ai_act_risk_class": "high",
"art22_relevant": True,
}
req = DSFAUpdate(ai_use_case_modules=[module])
m = req.ai_use_case_modules[0]
assert m["name"] == "Fraud Detection"
assert m["ai_act_risk_class"] == "high"
def test_response_ai_use_case_modules_list_from_list(self):
"""_dsfa_to_response: ai_use_case_modules list passthrough."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
modules = [{"type": "nlp", "name": "Test"}]
row = helper._make_row(ai_use_case_modules=modules)
result = _dsfa_to_response(row)
assert result["ai_use_case_modules"] == modules
def test_response_ai_use_case_modules_from_json_string(self):
"""_dsfa_to_response: parses JSON string for ai_use_case_modules."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
modules = [{"type": "computer_vision"}]
row = helper._make_row(ai_use_case_modules=_json.dumps(modules))
result = _dsfa_to_response(row)
assert result["ai_use_case_modules"] == modules
def test_response_ai_use_case_modules_null_becomes_empty_list(self):
"""_dsfa_to_response: None → empty list."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
row = helper._make_row(ai_use_case_modules=None)
result = _dsfa_to_response(row)
assert result["ai_use_case_modules"] == []
def test_response_section_8_complete_flag(self):
"""_dsfa_to_response: section_8_complete bool preserved."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
row = helper._make_row(section_8_complete=True)
result = _dsfa_to_response(row)
assert result["section_8_complete"] is True
# =============================================================================
# TestDSFAFullSchema — Migration 030 neue Felder
# =============================================================================
class TestDSFAFullSchema:
"""Tests for all new fields added in Migration 030."""
def _make_row(self, **overrides):
"""Reuse the shared helper from TestDsfaToResponse."""
from tests.test_dsfa_routes import TestDsfaToResponse
helper = TestDsfaToResponse()
return helper._make_row(**overrides)
# --- Pydantic Schema Tests ---
def test_processing_description_accepted(self):
req = DSFAUpdate(processing_description="Verarbeitung von Kundendaten zur Risikoanalyse")
assert req.processing_description == "Verarbeitung von Kundendaten zur Risikoanalyse"
def test_legal_basis_accepted(self):
req = DSFAUpdate(legal_basis="Art. 6 Abs. 1f DSGVO")
data = req.model_dump(exclude_none=True)
assert data["legal_basis"] == "Art. 6 Abs. 1f DSGVO"
def test_dpo_consulted_bool(self):
req = DSFAUpdate(dpo_consulted=True, dpo_name="Dr. Müller")
assert req.dpo_consulted is True
assert req.dpo_name == "Dr. Müller"
def test_dpo_approved_bool(self):
req = DSFAUpdate(dpo_approved=True)
data = req.model_dump(exclude_none=True)
assert data["dpo_approved"] is True
def test_authority_consulted_bool(self):
req = DSFAUpdate(authority_consulted=True, authority_reference="AZ-2026-001")
assert req.authority_consulted is True
assert req.authority_reference == "AZ-2026-001"
def test_risks_jsonb_structure(self):
risks = [
{"id": "R1", "title": "Datenpanne", "likelihood": "medium", "impact": "high"},
{"id": "R2", "title": "Unbefugter Zugriff", "likelihood": "low", "impact": "critical"},
]
req = DSFAUpdate(risks=risks)
assert len(req.risks) == 2
assert req.risks[0]["title"] == "Datenpanne"
def test_mitigations_jsonb_structure(self):
mitigations = [
{"id": "M1", "measure": "Verschlüsselung", "risk_ref": "R1"},
]
req = DSFAUpdate(mitigations=mitigations)
assert req.mitigations[0]["measure"] == "Verschlüsselung"
def test_review_schedule_jsonb(self):
schedule = {"next_review": "2027-01-01", "frequency": "annual", "responsible": "DSB"}
req = DSFAUpdate(review_schedule=schedule)
assert req.review_schedule["frequency"] == "annual"
def test_section_progress_jsonb(self):
progress = {"section_1": True, "section_2": False, "section_3": True}
req = DSFAUpdate(section_progress=progress)
assert req.section_progress["section_1"] is True
def test_threshold_analysis_jsonb(self):
analysis = {"wp248_criteria_count": 3, "dsfa_required": True}
req = DSFAUpdate(threshold_analysis=analysis)
assert req.threshold_analysis["dsfa_required"] is True
def test_involves_ai_bool(self):
req = DSFAUpdate(involves_ai=True)
data = req.model_dump(exclude_none=True)
assert data["involves_ai"] is True
def test_federal_state_accepted(self):
req = DSFAUpdate(federal_state="Bayern")
data = req.model_dump(exclude_none=True)
assert data["federal_state"] == "Bayern"
def test_data_subjects_list(self):
req = DSFAUpdate(data_subjects=["Kunden", "Mitarbeiter", "Minderjährige"])
assert len(req.data_subjects) == 3
def test_wp248_criteria_met_list(self):
req = DSFAUpdate(wp248_criteria_met=["K1", "K3", "K5"])
assert "K3" in req.wp248_criteria_met
def test_conclusion_text(self):
req = DSFAUpdate(conclusion="DSFA erforderlich — hohe Risiken verbleiben nach Maßnahmen.")
assert "DSFA erforderlich" in req.conclusion
def test_all_new_fields_optional_in_update(self):
req = DSFAUpdate()
for field in [
"processing_description", "processing_purpose", "legal_basis",
"necessity_assessment", "proportionality_assessment",
"involves_ai", "dpo_consulted", "dpo_opinion", "dpo_approved",
"authority_consulted", "risks", "mitigations", "section_progress",
"threshold_analysis", "federal_state", "conclusion",
]:
assert getattr(req, field) is None, f"{field} should default to None"
# --- _dsfa_to_response Tests ---
def test_response_processing_description(self):
row = self._make_row(processing_description="Test-Beschreibung")
result = _dsfa_to_response(row)
assert result["processing_description"] == "Test-Beschreibung"
def test_response_risks_parsed_from_json_string(self):
risks = [{"id": "R1", "title": "Datenpanne"}]
row = self._make_row(risks=_json.dumps(risks))
result = _dsfa_to_response(row)
assert result["risks"] == risks
def test_response_section_progress_object(self):
progress = {"section_1": True, "section_3": False}
row = self._make_row(section_progress=progress)
result = _dsfa_to_response(row)
assert result["section_progress"]["section_1"] is True
def test_response_section_progress_from_json_string(self):
progress = {"section_2": True}
row = self._make_row(section_progress=_json.dumps(progress))
result = _dsfa_to_response(row)
assert result["section_progress"] == progress
def test_response_involves_ai_bool(self):
row = self._make_row(involves_ai=True)
result = _dsfa_to_response(row)
assert result["involves_ai"] is True
def test_response_dpo_consulted_bool(self):
row = self._make_row(dpo_consulted=True, dpo_name="Dr. Müller")
result = _dsfa_to_response(row)
assert result["dpo_consulted"] is True
assert result["dpo_name"] == "Dr. Müller"
def test_response_version_defaults_to_1(self):
row = self._make_row(version=None)
result = _dsfa_to_response(row)
assert result["version"] == 1
def test_response_null_risks_becomes_empty_list(self):
row = self._make_row(risks=None)
result = _dsfa_to_response(row)
assert result["risks"] == []
def test_response_null_section_progress_becomes_empty_dict(self):
row = self._make_row(section_progress=None)
result = _dsfa_to_response(row)
assert result["section_progress"] == {}
def test_response_threshold_analysis_null_becomes_empty_dict(self):
row = self._make_row(threshold_analysis=None)
result = _dsfa_to_response(row)
assert result["threshold_analysis"] == {}
def test_response_federal_state(self):
row = self._make_row(federal_state="NRW")
result = _dsfa_to_response(row)
assert result["federal_state"] == "NRW"
def test_response_all_new_keys_present(self):
"""All new fields must be present in response even with defaults."""
row = self._make_row()
result = _dsfa_to_response(row)
new_keys = [
"processing_description", "legal_basis", "necessity_assessment",
"involves_ai", "dpo_consulted", "authority_consulted",
"risks", "mitigations", "section_progress", "threshold_analysis",
"ai_use_case_modules", "section_8_complete", "federal_state",
"version", "conclusion",
]
for key in new_keys:
assert key in result, f"Missing key in response: {key}"