Files
breakpilot-compliance/backend-compliance/compliance/api/obligation_routes.py
Benjamin Admin 3467bce222
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 31s
CI / test-python-backend-compliance (push) Successful in 29s
CI / test-python-document-crawler (push) Successful in 19s
CI / test-python-dsms-gateway (push) Successful in 26s
feat(obligations): Go PARTIAL DEPRECATED, Python x-user-id, UCCA Proxy Headers, 62 Tests
- Go obligations_handlers.go: CRUD-Overlap als deprecated markiert, AI-Features (Assess/Gap/TOM/Export) bleiben aktiv
- Python obligation_routes.py: x-user-id Header + Audit-Logging an 4 Write-Endpoints
- 3 UCCA Proxy-Dateien: Default X-Tenant-ID + X-User-ID Headers
- Tests von 39 auf 62 erweitert (+23 Route-Integration-Tests mit mock_db/TestClient)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 21:31:14 +01:00

327 lines
11 KiB
Python

"""
FastAPI routes for Compliance Obligations Tracking.
Endpoints:
GET /obligations — list with filters (status, priority, source, limit, offset)
POST /obligations — create obligation
GET /obligations/stats — counts per status and priority
GET /obligations/{id} — get single obligation
PUT /obligations/{id} — update obligation
PUT /obligations/{id}/status — quick status update
DELETE /obligations/{id} — delete obligation
"""
import logging
from datetime import datetime
from typing import Optional, List, Any, Dict
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/obligations", tags=["obligations"])
DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================
# Pydantic Schemas
# =============================================================================
class ObligationCreate(BaseModel):
title: str
description: Optional[str] = None
source: str = "DSGVO"
source_article: Optional[str] = None
deadline: Optional[datetime] = None
status: str = "pending"
priority: str = "medium"
responsible: Optional[str] = None
linked_systems: Optional[List[str]] = None
assessment_id: Optional[str] = None
rule_code: Optional[str] = None
notes: Optional[str] = None
class ObligationUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
source: Optional[str] = None
source_article: Optional[str] = None
deadline: Optional[datetime] = None
status: Optional[str] = None
priority: Optional[str] = None
responsible: Optional[str] = None
linked_systems: Optional[List[str]] = None
notes: Optional[str] = None
class ObligationStatusUpdate(BaseModel):
status: str
def _row_to_dict(row) -> Dict[str, Any]:
result = dict(row._mapping)
for key, val in result.items():
if isinstance(val, datetime):
result[key] = val.isoformat()
elif hasattr(val, '__str__') and not isinstance(val, (str, int, float, bool, list, dict, type(None))):
result[key] = str(val)
return result
def _get_tenant_id(x_tenant_id: Optional[str] = Header(None)) -> str:
if x_tenant_id:
try:
UUID(x_tenant_id)
return x_tenant_id
except ValueError:
pass
return DEFAULT_TENANT_ID
# =============================================================================
# Routes
# =============================================================================
@router.get("")
async def list_obligations(
status: Optional[str] = Query(None),
priority: Optional[str] = Query(None),
source: Optional[str] = Query(None),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
):
"""List obligations with optional filters."""
tenant_id = _get_tenant_id(x_tenant_id)
where_clauses = ["tenant_id = :tenant_id"]
params: Dict[str, Any] = {"tenant_id": tenant_id, "limit": limit, "offset": offset}
if status:
where_clauses.append("status = :status")
params["status"] = status
if priority:
where_clauses.append("priority = :priority")
params["priority"] = priority
if source:
where_clauses.append("source ILIKE :source")
params["source"] = f"%{source}%"
if search:
where_clauses.append("(title ILIKE :search OR description ILIKE :search)")
params["search"] = f"%{search}%"
where_sql = " AND ".join(where_clauses)
total_row = db.execute(
text(f"SELECT COUNT(*) FROM compliance_obligations WHERE {where_sql}"),
params,
).fetchone()
total = total_row[0] if total_row else 0
rows = db.execute(
text(f"""
SELECT * FROM compliance_obligations
WHERE {where_sql}
ORDER BY
CASE priority
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END,
CASE status
WHEN 'overdue' THEN 0
WHEN 'in-progress' THEN 1
WHEN 'pending' THEN 2
ELSE 3
END,
created_at DESC
LIMIT :limit OFFSET :offset
"""),
params,
).fetchall()
return {
"obligations": [_row_to_dict(r) for r in rows],
"total": total,
}
@router.get("/stats")
async def get_obligation_stats(
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
):
"""Return obligation counts per status and priority."""
tenant_id = _get_tenant_id(x_tenant_id)
rows = db.execute(text("""
SELECT
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'in-progress') AS in_progress,
COUNT(*) FILTER (WHERE status = 'overdue') AS overdue,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
COUNT(*) FILTER (WHERE priority = 'critical') AS critical,
COUNT(*) FILTER (WHERE priority = 'high') AS high,
COUNT(*) AS total
FROM compliance_obligations
WHERE tenant_id = :tenant_id
"""), {"tenant_id": tenant_id}).fetchone()
if rows:
d = dict(rows._mapping)
return {k: (v or 0) for k, v in d.items()}
return {"pending": 0, "in_progress": 0, "overdue": 0, "completed": 0, "critical": 0, "high": 0, "total": 0}
@router.post("", status_code=201)
async def create_obligation(
payload: ObligationCreate,
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
x_user_id: Optional[str] = Header(None),
):
"""Create a new compliance obligation."""
tenant_id = _get_tenant_id(x_tenant_id)
logger.info("create_obligation user_id=%s tenant_id=%s title=%s", x_user_id, tenant_id, payload.title)
import json
linked_systems = json.dumps(payload.linked_systems or [])
row = db.execute(text("""
INSERT INTO compliance_obligations
(tenant_id, title, description, source, source_article, deadline,
status, priority, responsible, linked_systems, assessment_id, rule_code, notes)
VALUES
(:tenant_id, :title, :description, :source, :source_article, :deadline,
:status, :priority, :responsible, CAST(:linked_systems AS jsonb), :assessment_id, :rule_code, :notes)
RETURNING *
"""), {
"tenant_id": tenant_id,
"title": payload.title,
"description": payload.description,
"source": payload.source,
"source_article": payload.source_article,
"deadline": payload.deadline,
"status": payload.status,
"priority": payload.priority,
"responsible": payload.responsible,
"linked_systems": linked_systems,
"assessment_id": payload.assessment_id,
"rule_code": payload.rule_code,
"notes": payload.notes,
}).fetchone()
db.commit()
return _row_to_dict(row)
@router.get("/{obligation_id}")
async def get_obligation(
obligation_id: str,
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
):
tenant_id = _get_tenant_id(x_tenant_id)
row = db.execute(text("""
SELECT * FROM compliance_obligations
WHERE id = :id AND tenant_id = :tenant_id
"""), {"id": obligation_id, "tenant_id": tenant_id}).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Obligation not found")
return _row_to_dict(row)
@router.put("/{obligation_id}")
async def update_obligation(
obligation_id: str,
payload: ObligationUpdate,
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
x_user_id: Optional[str] = Header(None),
):
"""Update an obligation's fields."""
tenant_id = _get_tenant_id(x_tenant_id)
logger.info("update_obligation user_id=%s tenant_id=%s id=%s", x_user_id, tenant_id, obligation_id)
import json
updates: Dict[str, Any] = {"id": obligation_id, "tenant_id": tenant_id, "updated_at": datetime.utcnow()}
set_clauses = ["updated_at = :updated_at"]
for field, value in payload.model_dump(exclude_unset=True).items():
if field == "linked_systems":
updates["linked_systems"] = json.dumps(value or [])
set_clauses.append("linked_systems = CAST(:linked_systems AS jsonb)")
else:
updates[field] = value
set_clauses.append(f"{field} = :{field}")
if len(set_clauses) == 1:
raise HTTPException(status_code=400, detail="No fields to update")
row = db.execute(text(f"""
UPDATE compliance_obligations
SET {', '.join(set_clauses)}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING *
"""), updates).fetchone()
db.commit()
if not row:
raise HTTPException(status_code=404, detail="Obligation not found")
return _row_to_dict(row)
@router.put("/{obligation_id}/status")
async def update_obligation_status(
obligation_id: str,
payload: ObligationStatusUpdate,
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
x_user_id: Optional[str] = Header(None),
):
"""Quick status update for an obligation."""
tenant_id = _get_tenant_id(x_tenant_id)
logger.info("update_obligation_status user_id=%s tenant_id=%s id=%s status=%s", x_user_id, tenant_id, obligation_id, payload.status)
valid_statuses = {"pending", "in-progress", "completed", "overdue"}
if payload.status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}")
row = db.execute(text("""
UPDATE compliance_obligations
SET status = :status, updated_at = :now
WHERE id = :id AND tenant_id = :tenant_id
RETURNING *
"""), {"status": payload.status, "now": datetime.utcnow(), "id": obligation_id, "tenant_id": tenant_id}).fetchone()
db.commit()
if not row:
raise HTTPException(status_code=404, detail="Obligation not found")
return _row_to_dict(row)
@router.delete("/{obligation_id}", status_code=204)
async def delete_obligation(
obligation_id: str,
db: Session = Depends(get_db),
x_tenant_id: Optional[str] = Header(None),
x_user_id: Optional[str] = Header(None),
):
tenant_id = _get_tenant_id(x_tenant_id)
logger.info("delete_obligation user_id=%s tenant_id=%s id=%s", x_user_id, tenant_id, obligation_id)
result = db.execute(text("""
DELETE FROM compliance_obligations
WHERE id = :id AND tenant_id = :tenant_id
"""), {"id": obligation_id, "tenant_id": tenant_id})
db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Obligation not found")