diff --git a/backend-compliance/compliance/api/tom_routes.py b/backend-compliance/compliance/api/tom_routes.py index 4752c62..24b021f 100644 --- a/backend-compliance/compliance/api/tom_routes.py +++ b/backend-compliance/compliance/api/tom_routes.py @@ -11,276 +11,94 @@ Endpoints: POST /tom/measures/bulk — Bulk upsert (for deriveTOMs sync) GET /tom/stats — Statistics GET /tom/export — Export as CSV or JSON + GET /tom/measures/{id}/versions — List measure versions + GET /tom/measures/{id}/versions/{n} — Get specific version + +Phase 1 Step 4 refactor: handlers are thin and delegate to TOMService. """ -import csv -import io -import json -import logging -from datetime import datetime, timezone -from typing import Optional, List, Any, Dict +from typing import Any, Optional from uuid import UUID -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, Query from fastapi.responses import StreamingResponse -from pydantic import BaseModel -from sqlalchemy import func from sqlalchemy.orm import Session from classroom_engine.database import get_db +from compliance.api._http_errors import translate_domain_errors +from compliance.schemas.tom import ( + TOMMeasureBulkBody, + TOMMeasureBulkItem, # re-exported for backwards compat (legacy test imports) + TOMMeasureCreate, + TOMMeasureUpdate, + TOMStateBody, +) -from ..db.tom_models import TOMStateDB, TOMMeasureDB +# Keep the legacy import path ``from compliance.api.tom_routes import TOMMeasureBulkItem`` +# working — it was the public name before the Step 3 schemas split. +__all__ = [ + "router", + "TOMMeasureBulkBody", + "TOMMeasureBulkItem", + "TOMMeasureCreate", + "TOMMeasureUpdate", + "TOMStateBody", + "DEFAULT_TENANT_ID", + "_parse_dt", + "_measure_to_dict", +] +from compliance.services.tom_service import ( + DEFAULT_TENANT_ID, + TOMService, + _measure_to_dict, # re-exported for legacy test imports + _parse_dt, # re-exported for legacy test imports +) -logger = logging.getLogger(__name__) router = APIRouter(prefix="/tom", tags=["tom"]) -DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + +def get_tom_service(db: Session = Depends(get_db)) -> TOMService: + return TOMService(db) # ============================================================================= -# Pydantic Schemas (kept close to routes like loeschfristen pattern) -# ============================================================================= - -class TOMStateBody(BaseModel): - tenant_id: Optional[str] = None - tenantId: Optional[str] = None # Accept camelCase from frontend - state: Dict[str, Any] - version: Optional[int] = None - - def get_tenant_id(self) -> str: - return self.tenant_id or self.tenantId or DEFAULT_TENANT_ID - - -class TOMMeasureCreate(BaseModel): - control_id: str - name: str - description: Optional[str] = None - category: str - type: str - applicability: str = "REQUIRED" - applicability_reason: Optional[str] = None - implementation_status: str = "NOT_IMPLEMENTED" - responsible_person: Optional[str] = None - responsible_department: Optional[str] = None - implementation_date: Optional[str] = None - review_date: Optional[str] = None - review_frequency: Optional[str] = None - priority: Optional[str] = None - complexity: Optional[str] = None - linked_evidence: Optional[List[Any]] = None - evidence_gaps: Optional[List[Any]] = None - related_controls: Optional[Dict[str, Any]] = None - verified_at: Optional[str] = None - verified_by: Optional[str] = None - effectiveness_rating: Optional[str] = None - - -class TOMMeasureUpdate(BaseModel): - name: Optional[str] = None - description: Optional[str] = None - category: Optional[str] = None - type: Optional[str] = None - applicability: Optional[str] = None - applicability_reason: Optional[str] = None - implementation_status: Optional[str] = None - responsible_person: Optional[str] = None - responsible_department: Optional[str] = None - implementation_date: Optional[str] = None - review_date: Optional[str] = None - review_frequency: Optional[str] = None - priority: Optional[str] = None - complexity: Optional[str] = None - linked_evidence: Optional[List[Any]] = None - evidence_gaps: Optional[List[Any]] = None - related_controls: Optional[Dict[str, Any]] = None - verified_at: Optional[str] = None - verified_by: Optional[str] = None - effectiveness_rating: Optional[str] = None - - -class TOMMeasureBulkItem(BaseModel): - control_id: str - name: str - description: Optional[str] = None - category: str - type: str - applicability: str = "REQUIRED" - applicability_reason: Optional[str] = None - implementation_status: str = "NOT_IMPLEMENTED" - responsible_person: Optional[str] = None - responsible_department: Optional[str] = None - implementation_date: Optional[str] = None - review_date: Optional[str] = None - review_frequency: Optional[str] = None - priority: Optional[str] = None - complexity: Optional[str] = None - linked_evidence: Optional[List[Any]] = None - evidence_gaps: Optional[List[Any]] = None - related_controls: Optional[Dict[str, Any]] = None - - -class TOMMeasureBulkBody(BaseModel): - tenant_id: Optional[str] = None - measures: List[TOMMeasureBulkItem] - - -# ============================================================================= -# Helper: parse optional datetime strings -# ============================================================================= - -def _parse_dt(val: Optional[str]) -> Optional[datetime]: - if not val: - return None - try: - return datetime.fromisoformat(val.replace("Z", "+00:00")) - except (ValueError, AttributeError): - return None - - -def _measure_to_dict(m: TOMMeasureDB) -> dict: - return { - "id": str(m.id), - "tenant_id": m.tenant_id, - "control_id": m.control_id, - "name": m.name, - "description": m.description, - "category": m.category, - "type": m.type, - "applicability": m.applicability, - "applicability_reason": m.applicability_reason, - "implementation_status": m.implementation_status, - "responsible_person": m.responsible_person, - "responsible_department": m.responsible_department, - "implementation_date": m.implementation_date.isoformat() if m.implementation_date else None, - "review_date": m.review_date.isoformat() if m.review_date else None, - "review_frequency": m.review_frequency, - "priority": m.priority, - "complexity": m.complexity, - "linked_evidence": m.linked_evidence or [], - "evidence_gaps": m.evidence_gaps or [], - "related_controls": m.related_controls or {}, - "verified_at": m.verified_at.isoformat() if m.verified_at else None, - "verified_by": m.verified_by, - "effectiveness_rating": m.effectiveness_rating, - "created_by": m.created_by, - "created_at": m.created_at.isoformat() if m.created_at else None, - "updated_at": m.updated_at.isoformat() if m.updated_at else None, - } - - -# ============================================================================= -# STATE ENDPOINTS +# STATE # ============================================================================= @router.get("/state") async def get_tom_state( tenant_id: Optional[str] = Query(None, alias="tenant_id"), tenantId: Optional[str] = Query(None), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Load TOM generator state for a tenant.""" - tid = tenant_id or tenantId or DEFAULT_TENANT_ID - - row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() - - if not row: - return { - "success": True, - "data": { - "tenantId": tid, - "state": {}, - "version": 0, - "isNew": True, - }, - } - - return { - "success": True, - "data": { - "tenantId": tid, - "state": row.state, - "version": row.version, - "lastModified": row.updated_at.isoformat() if row.updated_at else None, - }, - } + with translate_domain_errors(): + return service.get_state(tenant_id or tenantId or DEFAULT_TENANT_ID) @router.post("/state") -async def save_tom_state(body: TOMStateBody, db: Session = Depends(get_db)): +async def save_tom_state( + body: TOMStateBody, + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Save TOM generator state with optimistic locking (version check).""" - tid = body.get_tenant_id() - - existing = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() - - # Version conflict check - if body.version is not None and existing and existing.version != body.version: - raise HTTPException( - status_code=409, - detail={ - "success": False, - "error": "Version conflict. State was modified by another request.", - "code": "VERSION_CONFLICT", - }, - ) - - now = datetime.now(timezone.utc) - - if existing: - existing.state = body.state - existing.version = existing.version + 1 - existing.updated_at = now - else: - existing = TOMStateDB( - tenant_id=tid, - state=body.state, - version=1, - created_at=now, - updated_at=now, - ) - db.add(existing) - - db.commit() - db.refresh(existing) - - return { - "success": True, - "data": { - "tenantId": tid, - "state": existing.state, - "version": existing.version, - "lastModified": existing.updated_at.isoformat() if existing.updated_at else None, - }, - } + with translate_domain_errors(): + return service.save_state(body) @router.delete("/state") async def delete_tom_state( tenant_id: Optional[str] = Query(None, alias="tenant_id"), tenantId: Optional[str] = Query(None), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Clear TOM generator state for a tenant.""" - tid = tenant_id or tenantId - if not tid: - raise HTTPException(status_code=400, detail="tenant_id is required") - - row = db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() - deleted = False - if row: - db.delete(row) - db.commit() - deleted = True - - return { - "success": True, - "tenantId": tid, - "deleted": deleted, - "deletedAt": datetime.now(timezone.utc).isoformat(), - } + with translate_domain_errors(): + return service.delete_state(tenant_id or tenantId) # ============================================================================= -# MEASURES ENDPOINTS +# MEASURES # ============================================================================= @router.get("/measures") @@ -292,188 +110,51 @@ async def list_measures( search: Optional[str] = Query(None), limit: int = Query(100, ge=1, le=500), offset: int = Query(0, ge=0), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """List TOM measures with optional filters.""" - tid = tenant_id or DEFAULT_TENANT_ID - q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid) - - if category: - q = q.filter(TOMMeasureDB.category == category) - if implementation_status: - q = q.filter(TOMMeasureDB.implementation_status == implementation_status) - if priority: - q = q.filter(TOMMeasureDB.priority == priority) - if search: - pattern = f"%{search}%" - q = q.filter( - (TOMMeasureDB.name.ilike(pattern)) - | (TOMMeasureDB.description.ilike(pattern)) - | (TOMMeasureDB.control_id.ilike(pattern)) + with translate_domain_errors(): + return service.list_measures( + tenant_id=tenant_id or DEFAULT_TENANT_ID, + category=category, + implementation_status=implementation_status, + priority=priority, + search=search, + limit=limit, + offset=offset, ) - total = q.count() - rows = q.order_by(TOMMeasureDB.control_id).offset(offset).limit(limit).all() - - return { - "measures": [_measure_to_dict(r) for r in rows], - "total": total, - "limit": limit, - "offset": offset, - } - @router.post("/measures", status_code=201) async def create_measure( body: TOMMeasureCreate, tenant_id: Optional[str] = Query(None), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Create a single TOM measure.""" - tid = tenant_id or DEFAULT_TENANT_ID - - # Check for duplicate control_id - existing = ( - db.query(TOMMeasureDB) - .filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == body.control_id) - .first() - ) - if existing: - raise HTTPException(status_code=409, detail=f"Measure with control_id '{body.control_id}' already exists") - - now = datetime.now(timezone.utc) - measure = TOMMeasureDB( - tenant_id=tid, - control_id=body.control_id, - name=body.name, - description=body.description, - category=body.category, - type=body.type, - applicability=body.applicability, - applicability_reason=body.applicability_reason, - implementation_status=body.implementation_status, - responsible_person=body.responsible_person, - responsible_department=body.responsible_department, - implementation_date=_parse_dt(body.implementation_date), - review_date=_parse_dt(body.review_date), - review_frequency=body.review_frequency, - priority=body.priority, - complexity=body.complexity, - linked_evidence=body.linked_evidence or [], - evidence_gaps=body.evidence_gaps or [], - related_controls=body.related_controls or {}, - verified_at=_parse_dt(body.verified_at), - verified_by=body.verified_by, - effectiveness_rating=body.effectiveness_rating, - created_at=now, - updated_at=now, - ) - db.add(measure) - db.commit() - db.refresh(measure) - - return _measure_to_dict(measure) + with translate_domain_errors(): + return service.create_measure(tenant_id or DEFAULT_TENANT_ID, body) @router.put("/measures/{measure_id}") async def update_measure( measure_id: UUID, body: TOMMeasureUpdate, - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Update a TOM measure.""" - row = db.query(TOMMeasureDB).filter(TOMMeasureDB.id == measure_id).first() - if not row: - raise HTTPException(status_code=404, detail="Measure not found") - - update_data = body.model_dump(exclude_unset=True) - for key, val in update_data.items(): - if key in ("implementation_date", "review_date", "verified_at"): - val = _parse_dt(val) - setattr(row, key, val) - - row.updated_at = datetime.now(timezone.utc) - db.commit() - db.refresh(row) - - return _measure_to_dict(row) + with translate_domain_errors(): + return service.update_measure(measure_id, body) @router.post("/measures/bulk") async def bulk_upsert_measures( body: TOMMeasureBulkBody, - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Bulk upsert measures — used by deriveTOMs sync from frontend.""" - tid = body.tenant_id or DEFAULT_TENANT_ID - now = datetime.now(timezone.utc) - - created = 0 - updated = 0 - - for item in body.measures: - existing = ( - db.query(TOMMeasureDB) - .filter(TOMMeasureDB.tenant_id == tid, TOMMeasureDB.control_id == item.control_id) - .first() - ) - - if existing: - existing.name = item.name - existing.description = item.description - existing.category = item.category - existing.type = item.type - existing.applicability = item.applicability - existing.applicability_reason = item.applicability_reason - existing.implementation_status = item.implementation_status - existing.responsible_person = item.responsible_person - existing.responsible_department = item.responsible_department - existing.implementation_date = _parse_dt(item.implementation_date) - existing.review_date = _parse_dt(item.review_date) - existing.review_frequency = item.review_frequency - existing.priority = item.priority - existing.complexity = item.complexity - existing.linked_evidence = item.linked_evidence or [] - existing.evidence_gaps = item.evidence_gaps or [] - existing.related_controls = item.related_controls or {} - existing.updated_at = now - updated += 1 - else: - measure = TOMMeasureDB( - tenant_id=tid, - control_id=item.control_id, - name=item.name, - description=item.description, - category=item.category, - type=item.type, - applicability=item.applicability, - applicability_reason=item.applicability_reason, - implementation_status=item.implementation_status, - responsible_person=item.responsible_person, - responsible_department=item.responsible_department, - implementation_date=_parse_dt(item.implementation_date), - review_date=_parse_dt(item.review_date), - review_frequency=item.review_frequency, - priority=item.priority, - complexity=item.complexity, - linked_evidence=item.linked_evidence or [], - evidence_gaps=item.evidence_gaps or [], - related_controls=item.related_controls or {}, - created_at=now, - updated_at=now, - ) - db.add(measure) - created += 1 - - db.commit() - - return { - "success": True, - "tenant_id": tid, - "created": created, - "updated": updated, - "total": created + updated, - } + with translate_domain_errors(): + return service.bulk_upsert(body) # ============================================================================= @@ -483,96 +164,22 @@ async def bulk_upsert_measures( @router.get("/stats") async def get_tom_stats( tenant_id: Optional[str] = Query(None), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> dict[str, Any]: """Return TOM statistics for a tenant.""" - tid = tenant_id or DEFAULT_TENANT_ID - - base_q = db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tid) - total = base_q.count() - - # By status - status_rows = ( - db.query(TOMMeasureDB.implementation_status, func.count(TOMMeasureDB.id)) - .filter(TOMMeasureDB.tenant_id == tid) - .group_by(TOMMeasureDB.implementation_status) - .all() - ) - by_status = {row[0]: row[1] for row in status_rows} - - # By category - cat_rows = ( - db.query(TOMMeasureDB.category, func.count(TOMMeasureDB.id)) - .filter(TOMMeasureDB.tenant_id == tid) - .group_by(TOMMeasureDB.category) - .all() - ) - by_category = {row[0]: row[1] for row in cat_rows} - - # Overdue reviews - now = datetime.now(timezone.utc) - overdue = ( - base_q.filter( - TOMMeasureDB.review_date.isnot(None), - TOMMeasureDB.review_date < now, - ) - .count() - ) - - return { - "total": total, - "by_status": by_status, - "by_category": by_category, - "overdue_review_count": overdue, - "implemented": by_status.get("IMPLEMENTED", 0), - "partial": by_status.get("PARTIAL", 0), - "not_implemented": by_status.get("NOT_IMPLEMENTED", 0), - } + with translate_domain_errors(): + return service.stats(tenant_id or DEFAULT_TENANT_ID) @router.get("/export") async def export_measures( tenant_id: Optional[str] = Query(None), format: str = Query("csv"), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> StreamingResponse: """Export TOM measures as CSV (semicolon-separated) or JSON.""" - tid = tenant_id or DEFAULT_TENANT_ID - - rows = ( - db.query(TOMMeasureDB) - .filter(TOMMeasureDB.tenant_id == tid) - .order_by(TOMMeasureDB.control_id) - .all() - ) - measures = [_measure_to_dict(r) for r in rows] - - if format == "json": - return StreamingResponse( - io.BytesIO(json.dumps(measures, ensure_ascii=False, indent=2).encode("utf-8")), - media_type="application/json", - headers={"Content-Disposition": "attachment; filename=tom_export.json"}, - ) - - # CSV (semicolon, like VVT) - output = io.StringIO() - fieldnames = [ - "control_id", "name", "description", "category", "type", - "applicability", "implementation_status", "responsible_person", - "responsible_department", "implementation_date", "review_date", - "review_frequency", "priority", "complexity", "effectiveness_rating", - ] - writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=";", extrasaction="ignore") - writer.writeheader() - for m in measures: - writer.writerow(m) - - output.seek(0) - return StreamingResponse( - io.BytesIO(output.getvalue().encode("utf-8")), - media_type="text/csv; charset=utf-8", - headers={"Content-Disposition": "attachment; filename=tom_export.csv"}, - ) + with translate_domain_errors(): + return service.export(tenant_id or DEFAULT_TENANT_ID, format) # ============================================================================= @@ -584,12 +191,13 @@ async def list_measure_versions( measure_id: str, tenant_id: Optional[str] = Query(None, alias="tenant_id"), tenantId: Optional[str] = Query(None, alias="tenantId"), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> Any: """List all versions for a TOM measure.""" - from .versioning_utils import list_versions - tid = tenant_id or tenantId or DEFAULT_TENANT_ID - return list_versions(db, "tom", measure_id, tid) + with translate_domain_errors(): + return service.list_versions( + measure_id, tenant_id or tenantId or DEFAULT_TENANT_ID + ) @router.get("/measures/{measure_id}/versions/{version_number}") @@ -598,12 +206,10 @@ async def get_measure_version( version_number: int, tenant_id: Optional[str] = Query(None, alias="tenant_id"), tenantId: Optional[str] = Query(None, alias="tenantId"), - db: Session = Depends(get_db), -): + service: TOMService = Depends(get_tom_service), +) -> Any: """Get a specific TOM measure version with full snapshot.""" - from .versioning_utils import get_version - tid = tenant_id or tenantId or DEFAULT_TENANT_ID - v = get_version(db, "tom", measure_id, version_number, tid) - if not v: - raise HTTPException(status_code=404, detail=f"Version {version_number} not found") - return v + with translate_domain_errors(): + return service.get_version( + measure_id, version_number, tenant_id or tenantId or DEFAULT_TENANT_ID + ) diff --git a/backend-compliance/compliance/schemas/tom.py b/backend-compliance/compliance/schemas/tom.py index 488b26f..d741a9f 100644 --- a/backend-compliance/compliance/schemas/tom.py +++ b/backend-compliance/compliance/schemas/tom.py @@ -21,6 +21,98 @@ from compliance.schemas.common import ( # TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO) # ============================================================================ +# ---- Request bodies (extracted from compliance/api/tom_routes.py) ----------- + +class TOMStateBody(BaseModel): + """Request body for POST /tom/state (save with optimistic locking).""" + tenant_id: Optional[str] = None + tenantId: Optional[str] = None # Accept camelCase from frontend + state: Dict[str, Any] + version: Optional[int] = None + + def get_tenant_id(self) -> str: + return self.tenant_id or self.tenantId or "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + + +class TOMMeasureCreate(BaseModel): + """Request body for POST /tom/measures.""" + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + verified_at: Optional[str] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + + +class TOMMeasureUpdate(BaseModel): + """Request body for PUT /tom/measures/{id} (all fields optional).""" + name: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + type: Optional[str] = None + applicability: Optional[str] = None + applicability_reason: Optional[str] = None + implementation_status: Optional[str] = None + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + verified_at: Optional[str] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + + +class TOMMeasureBulkItem(BaseModel): + """Single item in a TOMMeasureBulkBody — no verification fields.""" + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[str] = None + review_date: Optional[str] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: Optional[List[Any]] = None + evidence_gaps: Optional[List[Any]] = None + related_controls: Optional[Dict[str, Any]] = None + + +class TOMMeasureBulkBody(BaseModel): + """Request body for POST /tom/measures/bulk.""" + tenant_id: Optional[str] = None + measures: List["TOMMeasureBulkItem"] = [] + + +# ---- Response models -------------------------------------------------------- + class TOMStateResponse(BaseModel): tenant_id: str state: Dict[str, Any] = {} diff --git a/backend-compliance/compliance/services/tom_service.py b/backend-compliance/compliance/services/tom_service.py new file mode 100644 index 0000000..1e13cbb --- /dev/null +++ b/backend-compliance/compliance/services/tom_service.py @@ -0,0 +1,434 @@ +# mypy: disable-error-code="arg-type,assignment" +# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime. +""" +TOM service — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO). + +Phase 1 Step 4: extracted from ``compliance.api.tom_routes``. Covers TOM +generator state persistence, the measures CRUD + bulk upsert, stats, +CSV/JSON export, and version lookups via the shared +``compliance.api.versioning_utils``. +""" + +import csv +import io +import json +from datetime import datetime, timezone +from typing import Any, Optional + +from fastapi.responses import StreamingResponse +from sqlalchemy import func +from sqlalchemy.orm import Session + +from compliance.db.tom_models import TOMMeasureDB, TOMStateDB +from compliance.domain import ConflictError, NotFoundError, ValidationError +from compliance.schemas.tom import ( + TOMMeasureBulkBody, + TOMMeasureCreate, + TOMMeasureUpdate, + TOMStateBody, +) + +DEFAULT_TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" + +_CSV_FIELDS = [ + "control_id", "name", "description", "category", "type", + "applicability", "implementation_status", "responsible_person", + "responsible_department", "implementation_date", "review_date", + "review_frequency", "priority", "complexity", "effectiveness_rating", +] + + +def _parse_dt(val: Optional[str]) -> Optional[datetime]: + """Parse an ISO-8601 string (accepting trailing 'Z') or return None.""" + if not val: + return None + try: + return datetime.fromisoformat(val.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return None + + +def _measure_to_dict(m: TOMMeasureDB) -> dict[str, Any]: + return { + "id": str(m.id), + "tenant_id": m.tenant_id, + "control_id": m.control_id, + "name": m.name, + "description": m.description, + "category": m.category, + "type": m.type, + "applicability": m.applicability, + "applicability_reason": m.applicability_reason, + "implementation_status": m.implementation_status, + "responsible_person": m.responsible_person, + "responsible_department": m.responsible_department, + "implementation_date": m.implementation_date.isoformat() if m.implementation_date else None, + "review_date": m.review_date.isoformat() if m.review_date else None, + "review_frequency": m.review_frequency, + "priority": m.priority, + "complexity": m.complexity, + "linked_evidence": m.linked_evidence or [], + "evidence_gaps": m.evidence_gaps or [], + "related_controls": m.related_controls or {}, + "verified_at": m.verified_at.isoformat() if m.verified_at else None, + "verified_by": m.verified_by, + "effectiveness_rating": m.effectiveness_rating, + "created_by": m.created_by, + "created_at": m.created_at.isoformat() if m.created_at else None, + "updated_at": m.updated_at.isoformat() if m.updated_at else None, + } + + +class TOMService: + """Business logic for TOM state, measures, stats, and export.""" + + def __init__(self, db: Session) -> None: + self.db = db + + # ------------------------------------------------------------------ + # State endpoints + # ------------------------------------------------------------------ + + def get_state(self, tenant_id: str) -> dict[str, Any]: + row = ( + self.db.query(TOMStateDB) + .filter(TOMStateDB.tenant_id == tenant_id) + .first() + ) + if not row: + return { + "success": True, + "data": { + "tenantId": tenant_id, + "state": {}, + "version": 0, + "isNew": True, + }, + } + return { + "success": True, + "data": { + "tenantId": tenant_id, + "state": row.state, + "version": row.version, + "lastModified": row.updated_at.isoformat() if row.updated_at else None, + }, + } + + def save_state(self, body: TOMStateBody) -> dict[str, Any]: + tid = body.get_tenant_id() + existing = self.db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tid).first() + + if body.version is not None and existing and existing.version != body.version: + raise ConflictError( + "Version conflict. State was modified by another request." + ) + + now = datetime.now(timezone.utc) + if existing: + existing.state = body.state + existing.version = existing.version + 1 + existing.updated_at = now + else: + existing = TOMStateDB( + tenant_id=tid, + state=body.state, + version=1, + created_at=now, + updated_at=now, + ) + self.db.add(existing) + + self.db.commit() + self.db.refresh(existing) + return { + "success": True, + "data": { + "tenantId": tid, + "state": existing.state, + "version": existing.version, + "lastModified": existing.updated_at.isoformat() if existing.updated_at else None, + }, + } + + def delete_state(self, tenant_id: Optional[str]) -> dict[str, Any]: + if not tenant_id: + raise ValidationError("tenant_id is required") + row = ( + self.db.query(TOMStateDB).filter(TOMStateDB.tenant_id == tenant_id).first() + ) + deleted = False + if row: + self.db.delete(row) + self.db.commit() + deleted = True + return { + "success": True, + "tenantId": tenant_id, + "deleted": deleted, + "deletedAt": datetime.now(timezone.utc).isoformat(), + } + + # ------------------------------------------------------------------ + # Measures CRUD + # ------------------------------------------------------------------ + + def list_measures( + self, + tenant_id: str, + category: Optional[str], + implementation_status: Optional[str], + priority: Optional[str], + search: Optional[str], + limit: int, + offset: int, + ) -> dict[str, Any]: + q = self.db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tenant_id) + if category: + q = q.filter(TOMMeasureDB.category == category) + if implementation_status: + q = q.filter(TOMMeasureDB.implementation_status == implementation_status) + if priority: + q = q.filter(TOMMeasureDB.priority == priority) + if search: + pattern = f"%{search}%" + q = q.filter( + (TOMMeasureDB.name.ilike(pattern)) + | (TOMMeasureDB.description.ilike(pattern)) + | (TOMMeasureDB.control_id.ilike(pattern)) + ) + total = q.count() + rows = q.order_by(TOMMeasureDB.control_id).offset(offset).limit(limit).all() + return { + "measures": [_measure_to_dict(r) for r in rows], + "total": total, + "limit": limit, + "offset": offset, + } + + def create_measure( + self, tenant_id: str, body: TOMMeasureCreate + ) -> dict[str, Any]: + existing = ( + self.db.query(TOMMeasureDB) + .filter( + TOMMeasureDB.tenant_id == tenant_id, + TOMMeasureDB.control_id == body.control_id, + ) + .first() + ) + if existing: + raise ConflictError( + f"Measure with control_id '{body.control_id}' already exists" + ) + + now = datetime.now(timezone.utc) + measure = TOMMeasureDB( + tenant_id=tenant_id, + control_id=body.control_id, + name=body.name, + description=body.description, + category=body.category, + type=body.type, + applicability=body.applicability, + applicability_reason=body.applicability_reason, + implementation_status=body.implementation_status, + responsible_person=body.responsible_person, + responsible_department=body.responsible_department, + implementation_date=_parse_dt(body.implementation_date), + review_date=_parse_dt(body.review_date), + review_frequency=body.review_frequency, + priority=body.priority, + complexity=body.complexity, + linked_evidence=body.linked_evidence or [], + evidence_gaps=body.evidence_gaps or [], + related_controls=body.related_controls or {}, + verified_at=_parse_dt(body.verified_at), + verified_by=body.verified_by, + effectiveness_rating=body.effectiveness_rating, + created_at=now, + updated_at=now, + ) + self.db.add(measure) + self.db.commit() + self.db.refresh(measure) + return _measure_to_dict(measure) + + def update_measure(self, measure_id: Any, body: TOMMeasureUpdate) -> dict[str, Any]: + row = self.db.query(TOMMeasureDB).filter(TOMMeasureDB.id == measure_id).first() + if not row: + raise NotFoundError("Measure not found") + + for key, val in body.model_dump(exclude_unset=True).items(): + if key in ("implementation_date", "review_date", "verified_at"): + val = _parse_dt(val) + setattr(row, key, val) + row.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(row) + return _measure_to_dict(row) + + def bulk_upsert(self, body: TOMMeasureBulkBody) -> dict[str, Any]: + tid = body.tenant_id or DEFAULT_TENANT_ID + now = datetime.now(timezone.utc) + created = 0 + updated = 0 + + for item in body.measures: + existing = ( + self.db.query(TOMMeasureDB) + .filter( + TOMMeasureDB.tenant_id == tid, + TOMMeasureDB.control_id == item.control_id, + ) + .first() + ) + if existing: + existing.name = item.name + existing.description = item.description + existing.category = item.category + existing.type = item.type + existing.applicability = item.applicability + existing.applicability_reason = item.applicability_reason + existing.implementation_status = item.implementation_status + existing.responsible_person = item.responsible_person + existing.responsible_department = item.responsible_department + existing.implementation_date = _parse_dt(item.implementation_date) + existing.review_date = _parse_dt(item.review_date) + existing.review_frequency = item.review_frequency + existing.priority = item.priority + existing.complexity = item.complexity + existing.linked_evidence = item.linked_evidence or [] + existing.evidence_gaps = item.evidence_gaps or [] + existing.related_controls = item.related_controls or {} + existing.updated_at = now + updated += 1 + else: + self.db.add( + TOMMeasureDB( + tenant_id=tid, + control_id=item.control_id, + name=item.name, + description=item.description, + category=item.category, + type=item.type, + applicability=item.applicability, + applicability_reason=item.applicability_reason, + implementation_status=item.implementation_status, + responsible_person=item.responsible_person, + responsible_department=item.responsible_department, + implementation_date=_parse_dt(item.implementation_date), + review_date=_parse_dt(item.review_date), + review_frequency=item.review_frequency, + priority=item.priority, + complexity=item.complexity, + linked_evidence=item.linked_evidence or [], + evidence_gaps=item.evidence_gaps or [], + related_controls=item.related_controls or {}, + created_at=now, + updated_at=now, + ) + ) + created += 1 + + self.db.commit() + return { + "success": True, + "tenant_id": tid, + "created": created, + "updated": updated, + "total": created + updated, + } + + # ------------------------------------------------------------------ + # Stats + export + # ------------------------------------------------------------------ + + def stats(self, tenant_id: str) -> dict[str, Any]: + base_q = self.db.query(TOMMeasureDB).filter(TOMMeasureDB.tenant_id == tenant_id) + total = base_q.count() + + status_rows = ( + self.db.query( + TOMMeasureDB.implementation_status, func.count(TOMMeasureDB.id) + ) + .filter(TOMMeasureDB.tenant_id == tenant_id) + .group_by(TOMMeasureDB.implementation_status) + .all() + ) + by_status: dict[str, int] = {row[0]: row[1] for row in status_rows} + + cat_rows = ( + self.db.query(TOMMeasureDB.category, func.count(TOMMeasureDB.id)) + .filter(TOMMeasureDB.tenant_id == tenant_id) + .group_by(TOMMeasureDB.category) + .all() + ) + by_category: dict[str, int] = {row[0]: row[1] for row in cat_rows} + + now = datetime.now(timezone.utc) + overdue = base_q.filter( + TOMMeasureDB.review_date.isnot(None), + TOMMeasureDB.review_date < now, + ).count() + + return { + "total": total, + "by_status": by_status, + "by_category": by_category, + "overdue_review_count": overdue, + "implemented": by_status.get("IMPLEMENTED", 0), + "partial": by_status.get("PARTIAL", 0), + "not_implemented": by_status.get("NOT_IMPLEMENTED", 0), + } + + def export(self, tenant_id: str, fmt: str) -> StreamingResponse: + rows = ( + self.db.query(TOMMeasureDB) + .filter(TOMMeasureDB.tenant_id == tenant_id) + .order_by(TOMMeasureDB.control_id) + .all() + ) + measures = [_measure_to_dict(r) for r in rows] + + if fmt == "json": + return StreamingResponse( + io.BytesIO( + json.dumps(measures, ensure_ascii=False, indent=2).encode("utf-8") + ), + media_type="application/json", + headers={"Content-Disposition": "attachment; filename=tom_export.json"}, + ) + + # CSV (semicolon-separated to match VVT convention) + output = io.StringIO() + writer = csv.DictWriter( + output, fieldnames=_CSV_FIELDS, delimiter=";", extrasaction="ignore" + ) + writer.writeheader() + for m in measures: + writer.writerow(m) + + output.seek(0) + return StreamingResponse( + io.BytesIO(output.getvalue().encode("utf-8")), + media_type="text/csv; charset=utf-8", + headers={"Content-Disposition": "attachment; filename=tom_export.csv"}, + ) + + # ------------------------------------------------------------------ + # Versioning (delegates to shared versioning_utils) + # ------------------------------------------------------------------ + + def list_versions(self, measure_id: str, tenant_id: str) -> Any: + from compliance.api.versioning_utils import list_versions + return list_versions(self.db, "tom", measure_id, tenant_id) + + def get_version( + self, measure_id: str, version_number: int, tenant_id: str + ) -> Any: + from compliance.api.versioning_utils import get_version + v = get_version(self.db, "tom", measure_id, version_number, tenant_id) + if not v: + raise NotFoundError(f"Version {version_number} not found") + return v diff --git a/backend-compliance/mypy.ini b/backend-compliance/mypy.ini index abd8118..d68cc10 100644 --- a/backend-compliance/mypy.ini +++ b/backend-compliance/mypy.ini @@ -75,5 +75,7 @@ ignore_errors = True ignore_errors = False [mypy-compliance.api.banner_routes] ignore_errors = False +[mypy-compliance.api.tom_routes] +ignore_errors = False [mypy-compliance.api._http_errors] ignore_errors = False diff --git a/backend-compliance/tests/contracts/openapi.baseline.json b/backend-compliance/tests/contracts/openapi.baseline.json index 7b48249..bf6045e 100644 --- a/backend-compliance/tests/contracts/openapi.baseline.json +++ b/backend-compliance/tests/contracts/openapi.baseline.json @@ -16948,8 +16948,10 @@ "type": "object" }, "TOMMeasureBulkBody": { + "description": "Request body for POST /tom/measures/bulk.", "properties": { "measures": { + "default": [], "items": { "$ref": "#/components/schemas/TOMMeasureBulkItem" }, @@ -16968,13 +16970,11 @@ "title": "Tenant Id" } }, - "required": [ - "measures" - ], "title": "TOMMeasureBulkBody", "type": "object" }, "TOMMeasureBulkItem": { + "description": "Single item in a TOMMeasureBulkBody \u2014 no verification fields.", "properties": { "applicability": { "default": "REQUIRED", @@ -17148,6 +17148,7 @@ "type": "object" }, "TOMMeasureCreate": { + "description": "Request body for POST /tom/measures.", "properties": { "applicability": { "default": "REQUIRED", @@ -17354,6 +17355,7 @@ "type": "object" }, "TOMMeasureUpdate": { + "description": "Request body for PUT /tom/measures/{id} (all fields optional).", "properties": { "applicability": { "anyOf": [ @@ -17583,6 +17585,7 @@ "type": "object" }, "TOMStateBody": { + "description": "Request body for POST /tom/state (save with optimistic locking).", "properties": { "state": { "additionalProperties": true, @@ -40733,7 +40736,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response List Measures Api Compliance Tom Measures Get", + "type": "object" + } } }, "description": "Successful Response" @@ -40790,7 +40797,11 @@ "201": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Create Measure Api Compliance Tom Measures Post", + "type": "object" + } } }, "description": "Successful Response" @@ -40831,7 +40842,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Bulk Upsert Measures Api Compliance Tom Measures Bulk Post", + "type": "object" + } } }, "description": "Successful Response" @@ -40884,7 +40899,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Update Measure Api Compliance Tom Measures Measure Id Put", + "type": "object" + } } }, "description": "Successful Response" @@ -40958,7 +40977,9 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "title": "Response List Measure Versions Api Compliance Tom Measures Measure Id Versions Get" + } } }, "description": "Successful Response" @@ -41041,7 +41062,9 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "title": "Response Get Measure Version Api Compliance Tom Measures Measure Id Versions Version Number Get" + } } }, "description": "Successful Response" @@ -41106,7 +41129,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Delete Tom State Api Compliance Tom State Delete", + "type": "object" + } } }, "description": "Successful Response" @@ -41169,7 +41196,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Get Tom State Api Compliance Tom State Get", + "type": "object" + } } }, "description": "Successful Response" @@ -41208,7 +41239,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Save Tom State Api Compliance Tom State Post", + "type": "object" + } } }, "description": "Successful Response" @@ -41257,7 +41292,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Get Tom Stats Api Compliance Tom Stats Get", + "type": "object" + } } }, "description": "Successful Response"