Files
breakpilot-compliance/dsms-gateway/routers/documents.py
T
Benjamin Admin 299375e486 feat(dsms): version chain history + diff endpoint + Audit Timeline UI
DSMS Stufe 3 — making the parent_cid chain useful end-to-end.

Gateway (dsms-gateway):
- /api/v1/documents/{cid}/history alias added next to the legacy
  /documents/{cid}/history (history endpoint itself was already there,
  just under an inconsistent prefix).
- NEW /api/v1/documents/{cid_a}/diff/{cid_b}: fetches both packages from
  IPFS, computes a metadata diff (per-field old/new), and renders a
  unified text diff for utf-8 payloads. Binary payloads return only
  metadata diff with a "binary — compare via rendered export" note.
- 4 new pytest cases (mocking ipfs_cat): text diff, binary fallback,
  fetch error, history chain depth — all green.

Frontend (admin-compliance):
- CIDHistoryModal: lazy-loads /dsms/documents/:cid/history, renders the
  version chain as a vertical timeline, marks the AKTUELL entry, and
  per-step exposes a "Diff zu V<n>" button that loads + renders the diff
  inline (metadata table + unified text diff in a monospace panel).
- AuditTimelinePage: existing CID badge now sits next to a "Verlauf
  anzeigen" link that opens the modal. Handles both Python's plain-CID
  audit values and the Go techfile flow's JSON envelope {cid, filename,
  size} via extractCID() helper.

This makes "show me how this CE-Akte changed between V2 and V3"
self-service in the UI instead of a curl-against-IPFS workflow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 10:10:07 +02:00

385 lines
12 KiB
Python

"""
Documents router — handles /api/v1/documents and /api/v1/legal-documents endpoints.
"""
import hashlib
import json
import io
from datetime import datetime
from typing import Optional
import httpx
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from models import DocumentList, DocumentMetadata, StoredDocument
from dependencies import verify_token, ipfs_add, ipfs_cat, ipfs_pin_ls
from config import IPFS_API_URL, IPFS_GATEWAY_URL
router = APIRouter()
@router.post("/api/v1/documents", response_model=StoredDocument)
async def store_document(
file: UploadFile = File(...),
document_type: str = "legal_document",
document_id: Optional[str] = None,
version: Optional[str] = None,
language: str = "de",
_auth: dict = Depends(verify_token)
):
"""
Speichert ein Dokument im DSMS.
- **file**: Das zu speichernde Dokument
- **document_type**: Typ des Dokuments (legal_document, consent_record, audit_log)
- **document_id**: Optionale ID des Dokuments
- **version**: Optionale Versionsnummer
- **language**: Sprache (default: de)
"""
content = await file.read()
# Checksum berechnen
checksum = hashlib.sha256(content).hexdigest()
# Metadaten erstellen
metadata = DocumentMetadata(
document_type=document_type,
document_id=document_id,
version=version,
language=language,
created_at=datetime.utcnow().isoformat(),
checksum=checksum,
encrypted=False
)
# Dokument mit Metadaten als JSON verpacken
package = {
"metadata": metadata.model_dump(),
"content_base64": content.hex(), # Hex-encodiert für JSON
"filename": file.filename
}
package_bytes = json.dumps(package).encode()
# Zu IPFS hinzufügen
result = await ipfs_add(package_bytes)
cid = result.get("Hash")
size = int(result.get("Size", 0))
return StoredDocument(
cid=cid,
size=size,
metadata=metadata,
gateway_url=f"{IPFS_GATEWAY_URL}/ipfs/{cid}",
timestamp=datetime.utcnow().isoformat()
)
@router.get("/api/v1/documents/{cid}")
async def get_document(
cid: str,
_auth: dict = Depends(verify_token)
):
"""
Ruft ein Dokument aus dem DSMS ab.
- **cid**: Content Identifier (IPFS Hash)
"""
content = await ipfs_cat(cid)
try:
package = json.loads(content)
metadata = package.get("metadata", {})
original_content = bytes.fromhex(package.get("content_base64", ""))
filename = package.get("filename", "document")
return StreamingResponse(
io.BytesIO(original_content),
media_type="application/octet-stream",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"X-DSMS-Document-Type": metadata.get("document_type", "unknown"),
"X-DSMS-Checksum": metadata.get("checksum", ""),
"X-DSMS-Created-At": metadata.get("created_at", "")
}
)
except json.JSONDecodeError:
# Wenn es kein DSMS-Paket ist, gib rohen Inhalt zurück
return StreamingResponse(
io.BytesIO(content),
media_type="application/octet-stream"
)
@router.get("/api/v1/documents/{cid}/metadata")
async def get_document_metadata(
cid: str,
_auth: dict = Depends(verify_token)
):
"""
Ruft nur die Metadaten eines Dokuments ab.
- **cid**: Content Identifier (IPFS Hash)
"""
content = await ipfs_cat(cid)
try:
package = json.loads(content)
return {
"cid": cid,
"metadata": package.get("metadata", {}),
"filename": package.get("filename"),
"size": len(bytes.fromhex(package.get("content_base64", "")))
}
except json.JSONDecodeError:
return {
"cid": cid,
"metadata": {},
"raw_size": len(content)
}
@router.get("/api/v1/documents", response_model=DocumentList)
async def list_documents(
_auth: dict = Depends(verify_token)
):
"""
Listet alle gespeicherten Dokumente auf.
"""
cids = await ipfs_pin_ls()
documents = []
for cid in cids[:100]: # Limit auf 100 für Performance
try:
content = await ipfs_cat(cid)
package = json.loads(content)
documents.append({
"cid": cid,
"metadata": package.get("metadata", {}),
"filename": package.get("filename")
})
except Exception:
# Überspringe nicht-DSMS Objekte
continue
return DocumentList(
documents=documents,
total=len(documents)
)
@router.delete("/api/v1/documents/{cid}")
async def unpin_document(
cid: str,
_auth: dict = Depends(verify_token)
):
"""
Entfernt ein Dokument aus dem lokalen Pin-Set.
Das Dokument bleibt im Netzwerk, wird aber bei GC entfernt.
- **cid**: Content Identifier (IPFS Hash)
"""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{IPFS_API_URL}/api/v0/pin/rm",
params={"arg": cid}
)
if response.status_code != 200:
raise HTTPException(
status_code=404,
detail=f"Konnte Pin nicht entfernen: {cid}"
)
return {
"status": "unpinned",
"cid": cid,
"message": "Dokument wird bei nächster Garbage Collection entfernt"
}
@router.post("/api/v1/legal-documents/archive")
async def archive_legal_document(
document_id: str,
version: str,
content: str,
language: str = "de",
_auth: dict = Depends(verify_token)
):
"""
Archiviert eine rechtliche Dokumentversion dauerhaft.
Speziell für AGB, Datenschutzerklärung, etc.
- **document_id**: ID des Legal Documents
- **version**: Versionsnummer
- **content**: HTML/Markdown Inhalt
- **language**: Sprache
"""
# Checksum berechnen
content_bytes = content.encode('utf-8')
checksum = hashlib.sha256(content_bytes).hexdigest()
# Metadaten
metadata = {
"document_type": "legal_document",
"document_id": document_id,
"version": version,
"language": language,
"created_at": datetime.utcnow().isoformat(),
"checksum": checksum,
"content_type": "text/html"
}
# Paket erstellen
package = {
"metadata": metadata,
"content": content,
"archived_at": datetime.utcnow().isoformat()
}
package_bytes = json.dumps(package, ensure_ascii=False).encode('utf-8')
# Zu IPFS hinzufügen
result = await ipfs_add(package_bytes)
cid = result.get("Hash")
return {
"cid": cid,
"document_id": document_id,
"version": version,
"checksum": checksum,
"archived_at": datetime.utcnow().isoformat(),
"verification_url": f"{IPFS_GATEWAY_URL}/ipfs/{cid}"
}
@router.get("/api/v1/documents/{cid}/history")
@router.get("/documents/{cid}/history") # legacy path, kept for backwards compatibility
async def get_document_history(cid: str):
"""Follow the parent_cid chain to reconstruct version history."""
history = []
current_cid = cid
max_depth = 50 # prevent infinite loops
for _ in range(max_depth):
try:
raw = await ipfs_cat(current_cid)
package = json.loads(raw)
metadata = package.get("metadata", {})
history.append({
"cid": current_cid,
"version": metadata.get("version"),
"document_type": metadata.get("document_type"),
"document_id": metadata.get("document_id"),
"parent_cid": metadata.get("parent_cid"),
"created_at": metadata.get("created_at"),
"checksum": metadata.get("checksum"),
})
parent = metadata.get("parent_cid")
if not parent:
break
current_cid = parent
except Exception:
break
return {"cid": cid, "history": history, "depth": len(history)}
@router.get("/api/v1/documents/{cid_a}/diff/{cid_b}")
async def diff_documents(cid_a: str, cid_b: str):
"""
Compare two DSMS document versions by their CIDs.
Returns a unified diff of the textual content when both documents are
text-decodable (UTF-8). For binary documents the response indicates
"binary" and returns just the metadata differences. Used by the Audit
Timeline UI to render "what changed between V2 and V3 of CE-Akte X".
"""
try:
raw_a = await ipfs_cat(cid_a)
raw_b = await ipfs_cat(cid_b)
except Exception as exc:
return {"error": f"could not fetch one of the CIDs: {exc}", "cid_a": cid_a, "cid_b": cid_b}
try:
pkg_a = json.loads(raw_a)
pkg_b = json.loads(raw_b)
except Exception:
# Documents are not the wrapped-package JSON shape — treat as raw.
pkg_a = {"metadata": {}, "content_base64": ""}
pkg_b = {"metadata": {}, "content_base64": ""}
meta_a = pkg_a.get("metadata", {}) or {}
meta_b = pkg_b.get("metadata", {}) or {}
meta_diff = _diff_metadata(meta_a, meta_b)
# Try to decode the content. The Archive flow stores files as base64 in
# `content_base64`; older payloads may use `content` (utf-8 text).
text_a, text_b, is_binary = _extract_texts(pkg_a, pkg_b)
if is_binary:
return {
"cid_a": cid_a,
"cid_b": cid_b,
"kind": "binary",
"metadata_diff": meta_diff,
"note": "Binary payload — text diff omitted. Compare via the rendered tech-file export instead.",
}
diff_lines = list(
_unified_diff(text_a.splitlines(), text_b.splitlines(), fromfile=cid_a, tofile=cid_b, lineterm="")
)
return {
"cid_a": cid_a,
"cid_b": cid_b,
"kind": "text",
"metadata_diff": meta_diff,
"diff": "\n".join(diff_lines),
"added_lines": sum(1 for ln in diff_lines if ln.startswith("+") and not ln.startswith("+++")),
"removed_lines": sum(1 for ln in diff_lines if ln.startswith("-") and not ln.startswith("---")),
}
def _diff_metadata(a: dict, b: dict) -> dict:
"""Return per-field change list: {field: {"old": ..., "new": ...}}."""
keys = set(a.keys()) | set(b.keys())
changes = {}
for k in sorted(keys):
if a.get(k) != b.get(k):
changes[k] = {"old": a.get(k), "new": b.get(k)}
return changes
def _extract_texts(pkg_a: dict, pkg_b: dict) -> tuple[str, str, bool]:
"""Return (text_a, text_b, is_binary). Falls back to base64-decode."""
import base64
def to_text(pkg: dict) -> tuple[str, bool]:
if isinstance(pkg.get("content"), str):
return pkg["content"], False
b64 = pkg.get("content_base64")
if not b64:
return "", False
try:
raw = base64.b64decode(b64)
except Exception:
return "", True
try:
return raw.decode("utf-8"), False
except UnicodeDecodeError:
return "", True
text_a, bin_a = to_text(pkg_a)
text_b, bin_b = to_text(pkg_b)
return text_a, text_b, (bin_a or bin_b)
def _unified_diff(a, b, fromfile, tofile, lineterm):
"""Tiny shim around difflib.unified_diff so the function reads cleanly."""
import difflib
return difflib.unified_diff(a, b, fromfile=fromfile, tofile=tofile, lineterm=lineterm, n=2)