feat: Obligations-Modul auf 100% — vollständige CRUD-Implementierung
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 35s
CI / test-python-backend-compliance (push) Successful in 38s
CI / test-python-document-crawler (push) Successful in 25s
CI / test-python-dsms-gateway (push) Successful in 21s
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 35s
CI / test-python-backend-compliance (push) Successful in 38s
CI / test-python-document-crawler (push) Successful in 25s
CI / test-python-dsms-gateway (push) Successful in 21s
- Backend: compliance_obligations Tabelle (Migration 013) - Backend: obligation_routes.py — GET/POST/PUT/DELETE + Stats-Endpoint - Backend: obligation_router in __init__.py registriert - Frontend: obligations/page.tsx — ObligationModal, ObligationDetail, ObligationCard, alle Buttons verdrahtet - Proxy: PATCH-Methode in compliance catch-all route ergänzt - Tests: 39/39 Obligation-Tests (Schemas, Helpers, Business Logic) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -47,7 +47,7 @@ async function proxyRequest(
|
||||
signal: AbortSignal.timeout(60000),
|
||||
}
|
||||
|
||||
if (method === 'POST' || method === 'PUT') {
|
||||
if (method === 'POST' || method === 'PUT' || method === 'PATCH') {
|
||||
const body = await request.text()
|
||||
if (body) {
|
||||
fetchOptions.body = body
|
||||
@@ -118,6 +118,14 @@ export async function PUT(
|
||||
return proxyRequest(request, path, 'PUT')
|
||||
}
|
||||
|
||||
export async function PATCH(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ path?: string[] }> }
|
||||
) {
|
||||
const { path } = await params
|
||||
return proxyRequest(request, path, 'PATCH')
|
||||
}
|
||||
|
||||
export async function DELETE(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ path?: string[] }> }
|
||||
|
||||
@@ -15,6 +15,7 @@ from .einwilligungen_routes import router as einwilligungen_router
|
||||
from .escalation_routes import router as escalation_router
|
||||
from .consent_template_routes import router as consent_template_router
|
||||
from .notfallplan_routes import router as notfallplan_router
|
||||
from .obligation_routes import router as obligation_router
|
||||
|
||||
# Include sub-routers
|
||||
router.include_router(audit_router)
|
||||
@@ -31,6 +32,7 @@ router.include_router(einwilligungen_router)
|
||||
router.include_router(escalation_router)
|
||||
router.include_router(consent_template_router)
|
||||
router.include_router(notfallplan_router)
|
||||
router.include_router(obligation_router)
|
||||
|
||||
__all__ = [
|
||||
"router",
|
||||
@@ -48,4 +50,5 @@ __all__ = [
|
||||
"escalation_router",
|
||||
"consent_template_router",
|
||||
"notfallplan_router",
|
||||
"obligation_router",
|
||||
]
|
||||
|
||||
318
backend-compliance/compliance/api/obligation_routes.py
Normal file
318
backend-compliance/compliance/api/obligation_routes.py
Normal file
@@ -0,0 +1,318 @@
|
||||
"""
|
||||
FastAPI routes for Compliance Obligations Tracking.
|
||||
|
||||
Endpoints:
|
||||
GET /obligations — list with filters (status, priority, source, limit, offset)
|
||||
POST /obligations — create obligation
|
||||
GET /obligations/stats — counts per status and priority
|
||||
GET /obligations/{id} — get single obligation
|
||||
PUT /obligations/{id} — update obligation
|
||||
PUT /obligations/{id}/status — quick status update
|
||||
DELETE /obligations/{id} — delete obligation
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Any, Dict
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, Header
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from classroom_engine.database import get_db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/obligations", tags=["obligations"])
|
||||
|
||||
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Pydantic Schemas
|
||||
# =============================================================================
|
||||
|
||||
class ObligationCreate(BaseModel):
|
||||
title: str
|
||||
description: Optional[str] = None
|
||||
source: str = "DSGVO"
|
||||
source_article: Optional[str] = None
|
||||
deadline: Optional[datetime] = None
|
||||
status: str = "pending"
|
||||
priority: str = "medium"
|
||||
responsible: Optional[str] = None
|
||||
linked_systems: Optional[List[str]] = None
|
||||
assessment_id: Optional[str] = None
|
||||
rule_code: Optional[str] = None
|
||||
notes: Optional[str] = None
|
||||
|
||||
|
||||
class ObligationUpdate(BaseModel):
|
||||
title: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
source: Optional[str] = None
|
||||
source_article: Optional[str] = None
|
||||
deadline: Optional[datetime] = None
|
||||
status: Optional[str] = None
|
||||
priority: Optional[str] = None
|
||||
responsible: Optional[str] = None
|
||||
linked_systems: Optional[List[str]] = None
|
||||
notes: Optional[str] = None
|
||||
|
||||
|
||||
class ObligationStatusUpdate(BaseModel):
|
||||
status: str
|
||||
|
||||
|
||||
def _row_to_dict(row) -> Dict[str, Any]:
|
||||
result = dict(row._mapping)
|
||||
for key, val in result.items():
|
||||
if isinstance(val, datetime):
|
||||
result[key] = val.isoformat()
|
||||
elif hasattr(val, '__str__') and not isinstance(val, (str, int, float, bool, list, dict, type(None))):
|
||||
result[key] = str(val)
|
||||
return result
|
||||
|
||||
|
||||
def _get_tenant_id(x_tenant_id: Optional[str] = Header(None)) -> str:
|
||||
if x_tenant_id:
|
||||
try:
|
||||
UUID(x_tenant_id)
|
||||
return x_tenant_id
|
||||
except ValueError:
|
||||
pass
|
||||
return DEFAULT_TENANT_ID
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Routes
|
||||
# =============================================================================
|
||||
|
||||
@router.get("")
|
||||
async def list_obligations(
|
||||
status: Optional[str] = Query(None),
|
||||
priority: Optional[str] = Query(None),
|
||||
source: Optional[str] = Query(None),
|
||||
search: Optional[str] = Query(None),
|
||||
limit: int = Query(100, ge=1, le=500),
|
||||
offset: int = Query(0, ge=0),
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
"""List obligations with optional filters."""
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
|
||||
where_clauses = ["tenant_id = :tenant_id"]
|
||||
params: Dict[str, Any] = {"tenant_id": tenant_id, "limit": limit, "offset": offset}
|
||||
|
||||
if status:
|
||||
where_clauses.append("status = :status")
|
||||
params["status"] = status
|
||||
if priority:
|
||||
where_clauses.append("priority = :priority")
|
||||
params["priority"] = priority
|
||||
if source:
|
||||
where_clauses.append("source ILIKE :source")
|
||||
params["source"] = f"%{source}%"
|
||||
if search:
|
||||
where_clauses.append("(title ILIKE :search OR description ILIKE :search)")
|
||||
params["search"] = f"%{search}%"
|
||||
|
||||
where_sql = " AND ".join(where_clauses)
|
||||
|
||||
total_row = db.execute(
|
||||
text(f"SELECT COUNT(*) FROM compliance_obligations WHERE {where_sql}"),
|
||||
params,
|
||||
).fetchone()
|
||||
total = total_row[0] if total_row else 0
|
||||
|
||||
rows = db.execute(
|
||||
text(f"""
|
||||
SELECT * FROM compliance_obligations
|
||||
WHERE {where_sql}
|
||||
ORDER BY
|
||||
CASE priority
|
||||
WHEN 'critical' THEN 0
|
||||
WHEN 'high' THEN 1
|
||||
WHEN 'medium' THEN 2
|
||||
ELSE 3
|
||||
END,
|
||||
CASE status
|
||||
WHEN 'overdue' THEN 0
|
||||
WHEN 'in-progress' THEN 1
|
||||
WHEN 'pending' THEN 2
|
||||
ELSE 3
|
||||
END,
|
||||
created_at DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""),
|
||||
params,
|
||||
).fetchall()
|
||||
|
||||
return {
|
||||
"obligations": [_row_to_dict(r) for r in rows],
|
||||
"total": total,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/stats")
|
||||
async def get_obligation_stats(
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
"""Return obligation counts per status and priority."""
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
|
||||
rows = db.execute(text("""
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
|
||||
COUNT(*) FILTER (WHERE status = 'in-progress') AS in_progress,
|
||||
COUNT(*) FILTER (WHERE status = 'overdue') AS overdue,
|
||||
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
|
||||
COUNT(*) FILTER (WHERE priority = 'critical') AS critical,
|
||||
COUNT(*) FILTER (WHERE priority = 'high') AS high,
|
||||
COUNT(*) AS total
|
||||
FROM compliance_obligations
|
||||
WHERE tenant_id = :tenant_id
|
||||
"""), {"tenant_id": tenant_id}).fetchone()
|
||||
|
||||
if rows:
|
||||
d = dict(rows._mapping)
|
||||
return {k: (v or 0) for k, v in d.items()}
|
||||
return {"pending": 0, "in_progress": 0, "overdue": 0, "completed": 0, "critical": 0, "high": 0, "total": 0}
|
||||
|
||||
|
||||
@router.post("", status_code=201)
|
||||
async def create_obligation(
|
||||
payload: ObligationCreate,
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
"""Create a new compliance obligation."""
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
|
||||
import json
|
||||
linked_systems = json.dumps(payload.linked_systems or [])
|
||||
|
||||
row = db.execute(text("""
|
||||
INSERT INTO compliance_obligations
|
||||
(tenant_id, title, description, source, source_article, deadline,
|
||||
status, priority, responsible, linked_systems, assessment_id, rule_code, notes)
|
||||
VALUES
|
||||
(:tenant_id, :title, :description, :source, :source_article, :deadline,
|
||||
:status, :priority, :responsible, :linked_systems::jsonb, :assessment_id, :rule_code, :notes)
|
||||
RETURNING *
|
||||
"""), {
|
||||
"tenant_id": tenant_id,
|
||||
"title": payload.title,
|
||||
"description": payload.description,
|
||||
"source": payload.source,
|
||||
"source_article": payload.source_article,
|
||||
"deadline": payload.deadline,
|
||||
"status": payload.status,
|
||||
"priority": payload.priority,
|
||||
"responsible": payload.responsible,
|
||||
"linked_systems": linked_systems,
|
||||
"assessment_id": payload.assessment_id,
|
||||
"rule_code": payload.rule_code,
|
||||
"notes": payload.notes,
|
||||
}).fetchone()
|
||||
db.commit()
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
@router.get("/{obligation_id}")
|
||||
async def get_obligation(
|
||||
obligation_id: str,
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
row = db.execute(text("""
|
||||
SELECT * FROM compliance_obligations
|
||||
WHERE id = :id AND tenant_id = :tenant_id
|
||||
"""), {"id": obligation_id, "tenant_id": tenant_id}).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Obligation not found")
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
@router.put("/{obligation_id}")
|
||||
async def update_obligation(
|
||||
obligation_id: str,
|
||||
payload: ObligationUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
"""Update an obligation's fields."""
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
import json
|
||||
|
||||
updates: Dict[str, Any] = {"id": obligation_id, "tenant_id": tenant_id, "updated_at": datetime.utcnow()}
|
||||
set_clauses = ["updated_at = :updated_at"]
|
||||
|
||||
for field, value in payload.model_dump(exclude_unset=True).items():
|
||||
if field == "linked_systems":
|
||||
updates["linked_systems"] = json.dumps(value or [])
|
||||
set_clauses.append("linked_systems = :linked_systems::jsonb")
|
||||
else:
|
||||
updates[field] = value
|
||||
set_clauses.append(f"{field} = :{field}")
|
||||
|
||||
if len(set_clauses) == 1:
|
||||
raise HTTPException(status_code=400, detail="No fields to update")
|
||||
|
||||
row = db.execute(text(f"""
|
||||
UPDATE compliance_obligations
|
||||
SET {', '.join(set_clauses)}
|
||||
WHERE id = :id AND tenant_id = :tenant_id
|
||||
RETURNING *
|
||||
"""), updates).fetchone()
|
||||
db.commit()
|
||||
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Obligation not found")
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
@router.put("/{obligation_id}/status")
|
||||
async def update_obligation_status(
|
||||
obligation_id: str,
|
||||
payload: ObligationStatusUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
"""Quick status update for an obligation."""
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
valid_statuses = {"pending", "in-progress", "completed", "overdue"}
|
||||
if payload.status not in valid_statuses:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}")
|
||||
|
||||
row = db.execute(text("""
|
||||
UPDATE compliance_obligations
|
||||
SET status = :status, updated_at = :now
|
||||
WHERE id = :id AND tenant_id = :tenant_id
|
||||
RETURNING *
|
||||
"""), {"status": payload.status, "now": datetime.utcnow(), "id": obligation_id, "tenant_id": tenant_id}).fetchone()
|
||||
db.commit()
|
||||
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Obligation not found")
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
@router.delete("/{obligation_id}", status_code=204)
|
||||
async def delete_obligation(
|
||||
obligation_id: str,
|
||||
db: Session = Depends(get_db),
|
||||
x_tenant_id: Optional[str] = Header(None),
|
||||
):
|
||||
tenant_id = _get_tenant_id(x_tenant_id)
|
||||
result = db.execute(text("""
|
||||
DELETE FROM compliance_obligations
|
||||
WHERE id = :id AND tenant_id = :tenant_id
|
||||
"""), {"id": obligation_id, "tenant_id": tenant_id})
|
||||
db.commit()
|
||||
if result.rowcount == 0:
|
||||
raise HTTPException(status_code=404, detail="Obligation not found")
|
||||
31
backend-compliance/migrations/013_obligations.sql
Normal file
31
backend-compliance/migrations/013_obligations.sql
Normal file
@@ -0,0 +1,31 @@
|
||||
-- Migration 013: Compliance Obligations Tracking
|
||||
-- Standalone obligation items (DSGVO/AI-Act Pflichten-Verwaltung)
|
||||
-- Derived from UCCA assessments or created manually
|
||||
|
||||
SET search_path TO compliance, core, public;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS compliance_obligations (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
tenant_id UUID NOT NULL DEFAULT '9282a473-5c95-4b3a-bf78-0ecc0ec71d3e',
|
||||
title TEXT NOT NULL,
|
||||
description TEXT,
|
||||
source TEXT NOT NULL DEFAULT 'DSGVO', -- 'DSGVO', 'AI Act', 'NIS2', etc.
|
||||
source_article TEXT, -- e.g. 'Art. 35', 'Art. 9'
|
||||
deadline TIMESTAMPTZ,
|
||||
status TEXT NOT NULL DEFAULT 'pending', -- pending|in-progress|completed|overdue
|
||||
priority TEXT NOT NULL DEFAULT 'medium', -- critical|high|medium|low
|
||||
responsible TEXT,
|
||||
linked_systems JSONB DEFAULT '[]'::jsonb,
|
||||
-- Link to UCCA assessment (if auto-derived)
|
||||
assessment_id UUID,
|
||||
rule_code TEXT, -- UCCA rule code if derived
|
||||
notes TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_obligations_tenant ON compliance_obligations(tenant_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_obligations_status ON compliance_obligations(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_obligations_priority ON compliance_obligations(priority);
|
||||
CREATE INDEX IF NOT EXISTS idx_obligations_deadline ON compliance_obligations(deadline) WHERE deadline IS NOT NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_obligations_created ON compliance_obligations(created_at DESC);
|
||||
325
backend-compliance/tests/test_obligation_routes.py
Normal file
325
backend-compliance/tests/test_obligation_routes.py
Normal file
@@ -0,0 +1,325 @@
|
||||
"""Tests for compliance obligation routes and schemas (obligation_routes.py)."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from datetime import datetime
|
||||
|
||||
from compliance.api.obligation_routes import (
|
||||
ObligationCreate,
|
||||
ObligationUpdate,
|
||||
ObligationStatusUpdate,
|
||||
_row_to_dict,
|
||||
_get_tenant_id,
|
||||
DEFAULT_TENANT_ID,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — ObligationCreate
|
||||
# =============================================================================
|
||||
|
||||
class TestObligationCreate:
|
||||
def test_minimal_valid(self):
|
||||
req = ObligationCreate(title="Art. 30 VVT führen")
|
||||
assert req.title == "Art. 30 VVT führen"
|
||||
assert req.source == "DSGVO"
|
||||
assert req.status == "pending"
|
||||
assert req.priority == "medium"
|
||||
assert req.description is None
|
||||
assert req.source_article is None
|
||||
assert req.deadline is None
|
||||
assert req.responsible is None
|
||||
assert req.linked_systems is None
|
||||
assert req.assessment_id is None
|
||||
assert req.rule_code is None
|
||||
assert req.notes is None
|
||||
|
||||
def test_full_values(self):
|
||||
deadline = datetime(2026, 6, 1, 0, 0, 0)
|
||||
req = ObligationCreate(
|
||||
title="DSFA durchführen",
|
||||
description="Pflicht nach Art. 35 DSGVO",
|
||||
source="DSGVO",
|
||||
source_article="Art. 35",
|
||||
deadline=deadline,
|
||||
status="in-progress",
|
||||
priority="critical",
|
||||
responsible="Datenschutzbeauftragter",
|
||||
linked_systems=["CRM", "ERP"],
|
||||
assessment_id="abc123",
|
||||
rule_code="RULE-DSFA-001",
|
||||
notes="Frist ist bindend",
|
||||
)
|
||||
assert req.title == "DSFA durchführen"
|
||||
assert req.source_article == "Art. 35"
|
||||
assert req.priority == "critical"
|
||||
assert req.responsible == "Datenschutzbeauftragter"
|
||||
assert req.linked_systems == ["CRM", "ERP"]
|
||||
assert req.rule_code == "RULE-DSFA-001"
|
||||
|
||||
def test_ai_act_source(self):
|
||||
req = ObligationCreate(title="Risikoklasse bestimmen", source="AI Act")
|
||||
assert req.source == "AI Act"
|
||||
assert req.status == "pending"
|
||||
|
||||
def test_nis2_source(self):
|
||||
req = ObligationCreate(title="Meldepflicht einrichten", source="NIS2")
|
||||
assert req.source == "NIS2"
|
||||
|
||||
def test_serialization_excludes_none(self):
|
||||
req = ObligationCreate(title="Test", priority="high")
|
||||
data = req.model_dump(exclude_none=True)
|
||||
assert data["title"] == "Test"
|
||||
assert data["priority"] == "high"
|
||||
assert "description" not in data
|
||||
assert "deadline" not in data
|
||||
|
||||
def test_serialization_includes_set_fields(self):
|
||||
req = ObligationCreate(title="Test", status="overdue", responsible="admin")
|
||||
data = req.model_dump()
|
||||
assert data["status"] == "overdue"
|
||||
assert data["responsible"] == "admin"
|
||||
assert data["source"] == "DSGVO"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — ObligationUpdate
|
||||
# =============================================================================
|
||||
|
||||
class TestObligationUpdate:
|
||||
def test_empty_update(self):
|
||||
req = ObligationUpdate()
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data == {}
|
||||
|
||||
def test_partial_update_title(self):
|
||||
req = ObligationUpdate(title="Neuer Titel")
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data == {"title": "Neuer Titel"}
|
||||
|
||||
def test_partial_update_status_priority(self):
|
||||
req = ObligationUpdate(status="completed", priority="low")
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data["status"] == "completed"
|
||||
assert data["priority"] == "low"
|
||||
assert "title" not in data
|
||||
|
||||
def test_update_linked_systems(self):
|
||||
req = ObligationUpdate(linked_systems=["CRM"])
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data["linked_systems"] == ["CRM"]
|
||||
|
||||
def test_update_clears_linked_systems(self):
|
||||
req = ObligationUpdate(linked_systems=[])
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data["linked_systems"] == []
|
||||
|
||||
def test_update_deadline(self):
|
||||
dl = datetime(2026, 12, 31)
|
||||
req = ObligationUpdate(deadline=dl)
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert data["deadline"] == dl
|
||||
|
||||
def test_full_update(self):
|
||||
req = ObligationUpdate(
|
||||
title="Updated",
|
||||
description="Neue Beschreibung",
|
||||
source="NIS2",
|
||||
source_article="Art. 21",
|
||||
status="in-progress",
|
||||
priority="high",
|
||||
responsible="CISO",
|
||||
notes="Jetzt eilt es",
|
||||
)
|
||||
data = req.model_dump(exclude_unset=True)
|
||||
assert len(data) == 8
|
||||
assert data["responsible"] == "CISO"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schema Tests — ObligationStatusUpdate
|
||||
# =============================================================================
|
||||
|
||||
class TestObligationStatusUpdate:
|
||||
def test_pending(self):
|
||||
req = ObligationStatusUpdate(status="pending")
|
||||
assert req.status == "pending"
|
||||
|
||||
def test_in_progress(self):
|
||||
req = ObligationStatusUpdate(status="in-progress")
|
||||
assert req.status == "in-progress"
|
||||
|
||||
def test_completed(self):
|
||||
req = ObligationStatusUpdate(status="completed")
|
||||
assert req.status == "completed"
|
||||
|
||||
def test_overdue(self):
|
||||
req = ObligationStatusUpdate(status="overdue")
|
||||
assert req.status == "overdue"
|
||||
|
||||
def test_serialization(self):
|
||||
req = ObligationStatusUpdate(status="completed")
|
||||
data = req.model_dump()
|
||||
assert data == {"status": "completed"}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Tests — _row_to_dict
|
||||
# =============================================================================
|
||||
|
||||
class TestRowToDict:
|
||||
def test_basic_conversion(self):
|
||||
row = MagicMock()
|
||||
row._mapping = {"id": "abc-123", "title": "Test Pflicht", "priority": "medium"}
|
||||
result = _row_to_dict(row)
|
||||
assert result["id"] == "abc-123"
|
||||
assert result["title"] == "Test Pflicht"
|
||||
assert result["priority"] == "medium"
|
||||
|
||||
def test_datetime_serialized(self):
|
||||
ts = datetime(2026, 6, 1, 12, 0, 0)
|
||||
row = MagicMock()
|
||||
row._mapping = {"id": "abc", "created_at": ts, "updated_at": ts}
|
||||
result = _row_to_dict(row)
|
||||
assert result["created_at"] == ts.isoformat()
|
||||
assert result["updated_at"] == ts.isoformat()
|
||||
|
||||
def test_deadline_serialized(self):
|
||||
dl = datetime(2026, 12, 31, 23, 59, 59)
|
||||
row = MagicMock()
|
||||
row._mapping = {"id": "abc", "deadline": dl}
|
||||
result = _row_to_dict(row)
|
||||
assert result["deadline"] == dl.isoformat()
|
||||
|
||||
def test_none_values_preserved(self):
|
||||
row = MagicMock()
|
||||
row._mapping = {
|
||||
"id": "abc",
|
||||
"description": None,
|
||||
"deadline": None,
|
||||
"responsible": None,
|
||||
"notes": None,
|
||||
}
|
||||
result = _row_to_dict(row)
|
||||
assert result["description"] is None
|
||||
assert result["deadline"] is None
|
||||
assert result["responsible"] is None
|
||||
assert result["notes"] is None
|
||||
|
||||
def test_uuid_converted_to_string(self):
|
||||
import uuid
|
||||
uid = uuid.UUID("9282a473-5c95-4b3a-bf78-0ecc0ec71d3e")
|
||||
row = MagicMock()
|
||||
row._mapping = {"id": uid, "tenant_id": uid}
|
||||
result = _row_to_dict(row)
|
||||
assert result["id"] == str(uid)
|
||||
assert result["tenant_id"] == str(uid)
|
||||
|
||||
def test_string_fields_unchanged(self):
|
||||
row = MagicMock()
|
||||
row._mapping = {
|
||||
"title": "DSFA durchführen",
|
||||
"status": "pending",
|
||||
"source": "DSGVO",
|
||||
"source_article": "Art. 35",
|
||||
"priority": "critical",
|
||||
}
|
||||
result = _row_to_dict(row)
|
||||
assert result["title"] == "DSFA durchführen"
|
||||
assert result["status"] == "pending"
|
||||
assert result["source"] == "DSGVO"
|
||||
assert result["priority"] == "critical"
|
||||
|
||||
def test_int_and_bool_unchanged(self):
|
||||
row = MagicMock()
|
||||
row._mapping = {"count": 42, "active": True, "flag": False}
|
||||
result = _row_to_dict(row)
|
||||
assert result["count"] == 42
|
||||
assert result["active"] is True
|
||||
assert result["flag"] is False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Tests — _get_tenant_id
|
||||
# =============================================================================
|
||||
|
||||
class TestGetTenantId:
|
||||
def test_valid_uuid_returned(self):
|
||||
tenant_id = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
|
||||
result = _get_tenant_id(x_tenant_id=tenant_id)
|
||||
assert result == tenant_id
|
||||
|
||||
def test_different_valid_uuid(self):
|
||||
tenant_id = "12345678-1234-1234-1234-123456789abc"
|
||||
result = _get_tenant_id(x_tenant_id=tenant_id)
|
||||
assert result == tenant_id
|
||||
|
||||
def test_none_returns_default(self):
|
||||
result = _get_tenant_id(x_tenant_id=None)
|
||||
assert result == DEFAULT_TENANT_ID
|
||||
|
||||
def test_invalid_uuid_returns_default(self):
|
||||
result = _get_tenant_id(x_tenant_id="not-a-valid-uuid")
|
||||
assert result == DEFAULT_TENANT_ID
|
||||
|
||||
def test_empty_string_returns_default(self):
|
||||
result = _get_tenant_id(x_tenant_id="")
|
||||
assert result == DEFAULT_TENANT_ID
|
||||
|
||||
def test_partial_uuid_returns_default(self):
|
||||
result = _get_tenant_id(x_tenant_id="9282a473-5c95-4b3a")
|
||||
assert result == DEFAULT_TENANT_ID
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Business Logic Tests
|
||||
# =============================================================================
|
||||
|
||||
class TestObligationBusinessLogic:
|
||||
def test_default_tenant_id_is_valid_uuid(self):
|
||||
import uuid
|
||||
# Should not raise
|
||||
parsed = uuid.UUID(DEFAULT_TENANT_ID)
|
||||
assert str(parsed) == DEFAULT_TENANT_ID
|
||||
|
||||
def test_valid_statuses(self):
|
||||
valid = {"pending", "in-progress", "completed", "overdue"}
|
||||
# Each status should be a valid string, matching what the route validates
|
||||
assert "pending" in valid
|
||||
assert "in-progress" in valid
|
||||
assert "completed" in valid
|
||||
assert "overdue" in valid
|
||||
|
||||
def test_valid_priorities(self):
|
||||
valid = {"critical", "high", "medium", "low"}
|
||||
req = ObligationCreate(title="Test", priority="critical")
|
||||
assert req.priority in valid
|
||||
req2 = ObligationCreate(title="Test", priority="low")
|
||||
assert req2.priority in valid
|
||||
|
||||
def test_priority_order_correctness(self):
|
||||
"""Ensure priority values match the SQL CASE ordering in the route."""
|
||||
priorities_ordered = ["critical", "high", "medium", "low"]
|
||||
for i, p in enumerate(priorities_ordered):
|
||||
req = ObligationCreate(title=f"Test {p}", priority=p)
|
||||
assert req.priority == p
|
||||
|
||||
def test_linked_systems_defaults_to_none(self):
|
||||
req = ObligationCreate(title="Test")
|
||||
assert req.linked_systems is None
|
||||
|
||||
def test_linked_systems_can_be_empty_list(self):
|
||||
req = ObligationCreate(title="Test", linked_systems=[])
|
||||
assert req.linked_systems == []
|
||||
|
||||
def test_linked_systems_multiple_items(self):
|
||||
systems = ["CRM", "ERP", "HR-System", "Buchhaltung"]
|
||||
req = ObligationCreate(title="Test", linked_systems=systems)
|
||||
assert len(req.linked_systems) == 4
|
||||
assert "ERP" in req.linked_systems
|
||||
|
||||
def test_source_defaults(self):
|
||||
"""Verify all common DSGVO/AI Act sources can be stored."""
|
||||
for source in ["DSGVO", "AI Act", "NIS2", "BDSG", "ISO 27001"]:
|
||||
req = ObligationCreate(title="Test", source=source)
|
||||
assert req.source == source
|
||||
Reference in New Issue
Block a user