Files
Benjamin Admin 95fcba34cd
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 30s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s
fix(quality): Ruff/CVE/TS-Fixes, 104 neue Tests, Complexity-Refactoring
- Ruff: 144 auto-fixes (unused imports, == None → is None), F821/F811/F841 manuell
- CVEs: python-multipart>=0.0.22, weasyprint>=68.0, pillow>=12.1.1, npm audit fix (0 vulns)
- TS: 5 tote Drafting-Engine-Dateien entfernt, allowed-facts/sanitizer/StepHeader/context fixes
- Tests: +104 (ISMS 58, Evidence 18, VVT 14, Generation 14) → 1449 passed
- Refactoring: collect_ci_evidence (F→A), row_to_response (E→A), extract_requirements (E→A)
- Dead Code: pca-platform, 7 Go-Handler, dsr_api.py, duplicate Schemas entfernt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 19:00:33 +01:00

226 lines
7.2 KiB
Python

"""
Shared versioning utilities for all compliance document types.
Provides create_version_snapshot() and list_versions() helpers that work
with all 5 version tables (DSFA, VVT, TOM, Loeschfristen, Obligations).
"""
import json
import logging
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id
logger = logging.getLogger(__name__)
# Table → FK column mapping
VERSION_TABLES = {
"dsfa": ("compliance_dsfa_versions", "dsfa_id", "compliance_dsfas"),
"vvt_activity": ("compliance_vvt_activity_versions", "activity_id", "compliance_vvt_activities"),
"tom": ("compliance_tom_versions", "measure_id", "compliance_tom_measures"),
"loeschfristen": ("compliance_loeschfristen_versions", "policy_id", "compliance_loeschfristen"),
"obligation": ("compliance_obligation_versions", "obligation_id", "compliance_obligations"),
}
def create_version_snapshot(
db: Session,
doc_type: str,
doc_id: str,
tenant_id: str,
snapshot: dict,
change_summary: str = "",
changed_sections: list = None,
created_by: str = "system",
) -> dict:
"""Create a new version snapshot for any document type.
Args:
doc_type: One of "dsfa", "vvt_activity", "tom", "loeschfristen", "obligation"
doc_id: UUID of the source document
tenant_id: Tenant UUID
snapshot: Full JSONB snapshot of the document state
change_summary: Human-readable summary of changes
changed_sections: List of section identifiers that changed
created_by: User who created this version
Returns:
Dict with version info (id, version_number, created_at)
"""
if doc_type not in VERSION_TABLES:
raise ValueError(f"Unknown document type: {doc_type}")
version_table, fk_column, source_table = VERSION_TABLES[doc_type]
# Get next version number
result = db.execute(
text(f"SELECT COALESCE(MAX(version_number), 0) FROM {version_table} WHERE {fk_column} = :doc_id"),
{"doc_id": doc_id},
)
next_version = result.scalar() + 1
# Insert version
result = db.execute(
text(f"""
INSERT INTO {version_table}
({fk_column}, tenant_id, version_number, snapshot, change_summary,
changed_sections, created_by)
VALUES (:doc_id, :tenant_id, :version_number, CAST(:snapshot AS jsonb),
:change_summary, CAST(:changed_sections AS jsonb), :created_by)
RETURNING id, version_number, created_at
"""),
{
"doc_id": doc_id,
"tenant_id": tenant_id,
"version_number": next_version,
"snapshot": json.dumps(snapshot),
"change_summary": change_summary,
"changed_sections": json.dumps(changed_sections or []),
"created_by": created_by,
},
)
row = result.fetchone()
# Update current_version on the source table
db.execute(
text(f"UPDATE {source_table} SET current_version = :v WHERE id = :doc_id"),
{"v": next_version, "doc_id": doc_id},
)
return {
"id": str(row[0]),
"version_number": row[1],
"created_at": row[2].isoformat() if row[2] else None,
}
def list_versions(
db: Session,
doc_type: str,
doc_id: str,
tenant_id: str,
) -> List[dict]:
"""List all versions for a document, newest first."""
if doc_type not in VERSION_TABLES:
raise ValueError(f"Unknown document type: {doc_type}")
version_table, fk_column, _ = VERSION_TABLES[doc_type]
result = db.execute(
text(f"""
SELECT id, version_number, status, change_summary, changed_sections,
created_by, approved_by, approved_at, created_at
FROM {version_table}
WHERE {fk_column} = :doc_id AND tenant_id = :tenant_id
ORDER BY version_number DESC
"""),
{"doc_id": doc_id, "tenant_id": tenant_id},
)
rows = result.fetchall()
return [
{
"id": str(r[0]),
"version_number": r[1],
"status": r[2],
"change_summary": r[3],
"changed_sections": r[4] or [],
"created_by": r[5],
"approved_by": r[6],
"approved_at": r[7].isoformat() if r[7] else None,
"created_at": r[8].isoformat() if r[8] else None,
}
for r in rows
]
def get_version(
db: Session,
doc_type: str,
doc_id: str,
version_number: int,
tenant_id: str,
) -> Optional[dict]:
"""Get a specific version with its full snapshot."""
if doc_type not in VERSION_TABLES:
raise ValueError(f"Unknown document type: {doc_type}")
version_table, fk_column, _ = VERSION_TABLES[doc_type]
result = db.execute(
text(f"""
SELECT id, version_number, status, snapshot, change_summary,
changed_sections, created_by, approved_by, approved_at, created_at
FROM {version_table}
WHERE {fk_column} = :doc_id AND version_number = :v AND tenant_id = :tenant_id
"""),
{"doc_id": doc_id, "v": version_number, "tenant_id": tenant_id},
)
r = result.fetchone()
if not r:
return None
return {
"id": str(r[0]),
"version_number": r[1],
"status": r[2],
"snapshot": r[3],
"change_summary": r[4],
"changed_sections": r[5] or [],
"created_by": r[6],
"approved_by": r[7],
"approved_at": r[8].isoformat() if r[8] else None,
"created_at": r[9].isoformat() if r[9] else None,
}
def register_version_routes(
router: APIRouter,
doc_type: str,
id_param: str = "item_id",
resource_name: str = "Item",
):
"""Register GET /{id}/versions and GET /{id}/versions/{v} on an existing router.
Uses a standardized path param name `item_id` in the generated routes.
The actual URL path parameter can be customized via `id_param`.
Args:
router: The APIRouter to add version routes to
doc_type: One of the keys in VERSION_TABLES
id_param: Path parameter name in the URL (e.g. "obligation_id")
resource_name: Human-readable name for error messages
"""
# Capture doc_type and resource_name in closure
_doc_type = doc_type
_resource_name = resource_name
@router.get(f"/{{{id_param}}}/versions")
async def list_item_versions(
request: Request,
db: Session = Depends(get_db),
tid: str = Depends(get_tenant_id),
):
doc_id = request.path_params[id_param]
return list_versions(db, _doc_type, doc_id, tid)
@router.get(f"/{{{id_param}}}/versions/{{version_number}}")
async def get_item_version(
version_number: int,
request: Request,
db: Session = Depends(get_db),
tid: str = Depends(get_tenant_id),
):
doc_id = request.path_params[id_param]
v = get_version(db, _doc_type, doc_id, version_number, tid)
if not v:
raise HTTPException(
status_code=404,
detail=f"{_resource_name} version {version_number} not found",
)
return v