feat(b17): DSMS-CID-Anchor für Audit-Walk-Video (Stufe 3, #7)

Video + walk.json werden nach Aufnahme zu DSMS-IPFS hochgeladen.
Die zurückgegebenen CIDs sind manipulationssichere Audit-Anker —
Reviewer können das Walk-Video Monate später noch verifizieren und
auf Unverändertheit prüfen.

consent-tester:
  - _upload_to_dsms(): Best-Effort-Upload zu /api/v1/documents
    (Bearer-Token, document_type=audit_walk_video|meta). DSMS-Down
    bricht den Walk nicht ab — CID fehlt einfach im result.
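  - record_audit_walk(): nachdem video.webm + walk.json erzeugt wurden,
    werden beide hochgeladen. Anschließend wird die lokale walk.json
    neu geschrieben, sodass sie BEIDE CIDs enthält; die in DSMS
    verankerte Fassung enthält nur den Video-CID (ihr eigener CID kann
    nicht in ihr selbst stehen).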
  - ENV: DSMS_GATEWAY_URL + DSMS_BEARER konfigurierbar.

backend:
  - _b17_wiring._publicize_gateway_url(): DSMS gibt intern
    http://dsms-node:8080/ipfs/{cid} zurück. Für die Audit-Mail
    wird das via env DSMS_PUBLIC_GATEWAY (default
    https://dsms-dev.breakpilot.ai) durch eine extern erreichbare
    URL ersetzt.
  - Render-Block: gelber DSMS-Anchor-Hinweis mit Video-CID +
    walk.json-CID, beide als klickbare Links zur public Gateway.

Real-World-Smoke gegen Elli:
  - Video-CID: QmbdFwtSymPuWGYYdC6eNZ1eEvVLsTYmoRRxEo5L6BXgwt
  - walk.json-CID: QmWaTqwZq4KVd5wYFVAKB12uZtAosPqoG1X4m1azysXYJi
  - DSMS-Upload erfolgreich, gateway_url im response

Tests: 12/12 grün (+2 für DSMS-Anchor-Render-Pfade inkl.
Internal-Host → Public-Gateway-Rewrite).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-07 17:32:34 +02:00
parent 80c4778017
commit c7d2038ad9
3 changed files with 134 additions and 1 deletions
@@ -13,6 +13,7 @@ from __future__ import annotations
import html
import logging
import os
from urllib.parse import urlparse
import httpx
@@ -21,6 +22,24 @@ from ._constants import CONSENT_TESTER_URL
logger = logging.getLogger(__name__)

# Optional override for the public IPFS gateway URL. DSMS returns the
# internal http://dsms-node:8080/ipfs/{cid}, but reviewers reading the
# mail need an externally reachable URL.
DSMS_PUBLIC_GATEWAY = os.environ.get(
    "DSMS_PUBLIC_GATEWAY", "https://dsms-dev.breakpilot.ai",
)

# Internal base URLs that are rewritten to the public gateway.
_DSMS_INTERNAL_BASES = (
    "http://dsms-node:8080",
    "http://bp-compliance-dsms-node:8080",
)


def _publicize_gateway_url(internal_url: str) -> str:
    """Replace an internal dsms-node base URL with the public gateway.

    Args:
        internal_url: Gateway URL as returned by DSMS. May be empty or
            ``None``.

    Returns:
        ``internal_url`` with every known internal base replaced by
        ``DSMS_PUBLIC_GATEWAY``; an empty string for falsy input. URLs
        that contain no internal base are returned unchanged.
    """
    if not internal_url:
        return ""
    # A trailing slash in the configured gateway would otherwise yield
    # "https://host//ipfs/<cid>" because the internal path starts with "/".
    public_base = DSMS_PUBLIC_GATEWAY.rstrip("/")
    public_url = internal_url
    for internal_base in _DSMS_INTERNAL_BASES:
        public_url = public_url.replace(internal_base, public_base)
    return public_url
async def run_b17(state: dict) -> None:
"""Trigger walk recording + store metadata in state."""
@@ -81,6 +100,36 @@ def _render(walk: dict) -> str:
walk_link = _video_link(wid)
meta_link = f"{CONSENT_TESTER_URL}/audit-walks/{wid}/walk.json"
# Stufe-3 DSMS-Anchor
video_dsms = (video.get("dsms") or {})
meta_dsms = (walk.get("walk_json_dsms") or {})
video_cid = video_dsms.get("cid") or ""
meta_cid = meta_dsms.get("cid") or ""
video_gw = _publicize_gateway_url(video_dsms.get("gateway_url") or "")
meta_gw = _publicize_gateway_url(meta_dsms.get("gateway_url") or "")
dsms_html = ""
if video_cid or meta_cid:
parts = []
if video_cid:
link = (f"<a href='{html.escape(video_gw)}' style='color:#0369a1;'>"
f"<code>{html.escape(video_cid[:20])}…</code></a>"
if video_gw else
f"<code>{html.escape(video_cid)}</code>")
parts.append(f"Video-CID: {link}")
if meta_cid:
link = (f"<a href='{html.escape(meta_gw)}' style='color:#0369a1;'>"
f"<code>{html.escape(meta_cid[:20])}…</code></a>"
if meta_gw else
f"<code>{html.escape(meta_cid)}</code>")
parts.append(f"walk.json-CID: {link}")
dsms_html = (
"<p style='margin:0 0 8px;padding:6px 10px;background:#fef3c7;"
"border-radius:4px;font-size:12px;color:#78350f;'>"
"<strong>🔒 DSMS-Anchor (manipulationssicher):</strong> "
+ " · ".join(parts) +
"</p>"
)
rows = []
for a in actions:
ts = (a.get("timestamp") or "")[11:19] # HH:MM:SS
@@ -126,6 +175,7 @@ def _render(walk: dict) -> str:
f"{nav_count} Compliance-Seiten besucht, jede 4 Sek "
"verweilt — Reviewer kann den Audit-Walk nachverfolgen."
"</p>"
+ dsms_html +
"<table style='font-size:12px;width:100%;border-collapse:collapse;"
"background:#fff;border-radius:4px;'>"
"<thead><tr style='background:#e0f2fe;'>"
@@ -72,6 +72,31 @@ class TestRender:
html = _render(walk)
assert "Keine Akkordeons gefunden" in html
def test_dsms_anchor_rendered_when_cid_present(self):
    """The anchor block is rendered and internal hosts are rewritten."""
    video_with_anchor = {
        **_FAKE_WALK["video"],
        "dsms": {
            "cid": "QmTestCidVideoXX1234567890ABCDEFGHJKLMN",
            "gateway_url": "http://dsms-node:8080/ipfs/QmTestCidVideo",
        },
    }
    anchored_walk = {
        **_FAKE_WALK,
        "video": video_with_anchor,
        "walk_json_dsms": {
            "cid": "QmTestCidMetaXX1234567890ABCDEFGHJKLMN",
            "gateway_url": "http://dsms-node:8080/ipfs/QmTestCidMeta",
        },
    }

    rendered = _render(anchored_walk)

    assert "DSMS-Anchor" in rendered
    assert "QmTestCidVideoXX1234" in rendered
    # The internal gateway host must be rewritten to the public one for the mail.
    assert "dsms-node:8080" not in rendered
def test_no_dsms_block_when_cid_absent(self):
    """Without any DSMS data the anchor block is not rendered."""
    plain_video = {
        key: value
        for key, value in _FAKE_WALK["video"].items()
        if key != "dsms"
    }
    plain_walk = {
        key: value
        for key, value in _FAKE_WALK.items()
        if key != "walk_json_dsms"
    }
    plain_walk["video"] = plain_video

    rendered = _render(plain_walk)

    assert "DSMS-Anchor" not in rendered
class TestRunB17:
def test_no_request_skipped(self):
+59 -1
View File
@@ -36,6 +36,13 @@ logger = logging.getLogger(__name__)
# Root directory for walk output (volume mount: /data is defined in
# docker-compose).
WALK_ROOT = os.environ.get("AUDIT_WALK_DIR", "/data/audit-walks")

# Internal DSMS gateway (no public hostname needed). Set via the
# docker-compose environment; used for the stage-3 DSMS anchor.
DSMS_GATEWAY_URL = os.environ.get(
    "DSMS_GATEWAY_URL",
    "http://bp-compliance-dsms-gateway:8082",
)
DSMS_BEARER = os.environ.get("DSMS_BEARER", "audit-walk-uploader")
# Footer-Link-Text-Hints — was wir als relevante Compliance-Anker
# erkennen. Wir laden NICHT jeden Footer-Link (sonst riesige Videos),
# sondern nur die compliance-relevanten.
@@ -62,6 +69,35 @@ def _ts() -> str:
return datetime.now(timezone.utc).isoformat()
async def _upload_to_dsms(
    path: Path, document_type: str, document_id: str,
) -> dict:
    """Upload a single file to DSMS (best effort).

    Args:
        path: File to upload; its name is sent as the multipart filename.
        document_type: Value of the ``document_type`` form field.
        document_id: Value of the ``document_id`` form field.

    Returns:
        ``{"cid", "size", "gateway_url"}`` taken from the JSON response on
        HTTP 200/201, otherwise ``{"error": <message>}``. Never raises: an
        unreachable DSMS must not abort the walk.
    """
    try:
        # Imported inside the try so that an import failure is reported
        # as an upload error as well.
        import httpx

        async with httpx.AsyncClient(timeout=60.0) as client:
            with path.open("rb") as f:
                files = {"file": (path.name, f.read())}
            r = await client.post(
                f"{DSMS_GATEWAY_URL}/api/v1/documents",
                files=files,
                data={"document_type": document_type,
                      "document_id": document_id},
                headers={"Authorization": f"Bearer {DSMS_BEARER}"},
            )
            if r.status_code in (200, 201):
                data = r.json() or {}
                return {
                    "cid": data.get("cid"),
                    "size": data.get("size"),
                    "gateway_url": data.get("gateway_url") or "",
                }
            error = f"HTTP {r.status_code}: {r.text[:200]}"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would
        # produce a falsy error value; always include the exception type.
        error = f"{type(e).__name__}: {e}"[:200]
    logger.warning(
        "DSMS upload of %s (%s) failed: %s", path.name, document_type, error,
    )
    return {"error": error}
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
@@ -329,8 +365,30 @@ async def record_audit_walk(
"actions": actions,
"video": video_meta,
}
# Stufe 3: DSMS-CID-Anchor — Video + walk.json zu IPFS hochladen
# bevor walk.json final geschrieben wird, damit der CID in der
# walk.json selbst stehen kann (self-referential audit anchor).
video_path = out_dir / "video.webm"
if video_path.exists():
video_dsms = await _upload_to_dsms(
video_path, document_type="audit_walk_video",
document_id=walk_id,
)
walk_doc["video"]["dsms"] = video_dsms
try:
(out_dir / "walk.json").write_text(
walk_json_path = out_dir / "walk.json"
walk_json_path.write_text(
json.dumps(walk_doc, indent=2, ensure_ascii=False),
)
walk_dsms = await _upload_to_dsms(
walk_json_path, document_type="audit_walk_meta",
document_id=walk_id,
)
walk_doc["walk_json_dsms"] = walk_dsms
# Re-write so the on-disk walk.json contains BOTH CIDs
walk_json_path.write_text(
json.dumps(walk_doc, indent=2, ensure_ascii=False),
)
except Exception as e: