From c7d2038ad9785139aaabf01b79cae6a4b7ca1a80 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Sun, 7 Jun 2026 17:32:34 +0200
Subject: [PATCH] =?UTF-8?q?feat(b17):=20DSMS-CID-Anchor=20f=C3=BCr=20Audit?=
=?UTF-8?q?-Walk-Video=20(Stufe=203,=20#7)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Video + walk.json werden nach Aufnahme zu DSMS-IPFS hochgeladen.
Die zurückgegebenen CIDs sind manipulationssichere Audit-Anker —
Reviewer können das Walk-Video Monate später noch verifizieren und
auf Unverändertheit prüfen.
consent-tester:
- _upload_to_dsms(): Best-Effort-Upload zu /api/v1/documents
(Bearer-Token, document_type=audit_walk_video|meta). DSMS-Down
bricht den Walk nicht ab — CID fehlt einfach im result.
- record_audit_walk(): nach video.webm + walk.json erzeugt, beide
hochladen. walk.json wird re-written sodass es BEIDE CIDs
selbstreferenziell enthält.
- ENV: DSMS_GATEWAY_URL + DSMS_BEARER konfigurierbar.
backend:
- _b17_wiring._publicize_gateway_url(): DSMS gibt intern
http://dsms-node:8080/ipfs/{cid} zurück. Für die Audit-Mail
wird das via env DSMS_PUBLIC_GATEWAY (default
https://dsms-dev.breakpilot.ai) durch eine extern erreichbare
URL ersetzt.
- Render-Block: gelber DSMS-Anchor-Hinweis mit Video-CID +
walk.json-CID, beide als klickbare Links zur public Gateway.
Real-World-Smoke gegen Elli:
- Video-CID: QmbdFwtSymPuWGYYdC6eNZ1eEvVLsTYmoRRxEo5L6BXgwt
- walk.json-CID: QmWaTqwZq4KVd5wYFVAKB12uZtAosPqoG1X4m1azysXYJi
- DSMS-Upload erfolgreich, gateway_url im response
Tests: 12/12 grün (+2 für DSMS-Anchor-Render-Pfade inkl.
Internal-Host → Public-Gateway-Rewrite).
Co-Authored-By: Claude Opus 4.7 (1M context)
---
.../compliance/api/agent_check/_b17_wiring.py | 50 ++++++++++++++++
.../tests/test_b17_audit_walk.py | 25 ++++++++
.../services/audit_walk_recorder.py | 60 ++++++++++++++++++-
3 files changed, 134 insertions(+), 1 deletion(-)
diff --git a/backend-compliance/compliance/api/agent_check/_b17_wiring.py b/backend-compliance/compliance/api/agent_check/_b17_wiring.py
index 892c7813..58929d43 100644
--- a/backend-compliance/compliance/api/agent_check/_b17_wiring.py
+++ b/backend-compliance/compliance/api/agent_check/_b17_wiring.py
@@ -13,6 +13,7 @@ from __future__ import annotations
import html
import logging
+import os
from urllib.parse import urlparse
import httpx
@@ -21,6 +22,24 @@ from ._constants import CONSENT_TESTER_URL
logger = logging.getLogger(__name__)
+# Optionaler Override für die öffentliche IPFS-Gateway-URL. DSMS gibt
+# intern http://dsms-node:8080/ipfs/{cid} zurück — für die Mail brauchen
+# Reviewer aber eine extern erreichbare URL.
+DSMS_PUBLIC_GATEWAY = os.environ.get(
+ "DSMS_PUBLIC_GATEWAY", "https://dsms-dev.breakpilot.ai",
+)
+
+
+def _publicize_gateway_url(internal_url: str) -> str:
+ """Replace internal dsms-node host with the public gateway."""
+ if not internal_url:
+ return ""
+ return internal_url.replace(
+ "http://dsms-node:8080", DSMS_PUBLIC_GATEWAY,
+ ).replace(
+ "http://bp-compliance-dsms-node:8080", DSMS_PUBLIC_GATEWAY,
+ )
+
async def run_b17(state: dict) -> None:
"""Trigger walk recording + store metadata in state."""
@@ -81,6 +100,36 @@ def _render(walk: dict) -> str:
walk_link = _video_link(wid)
meta_link = f"{CONSENT_TESTER_URL}/audit-walks/{wid}/walk.json"
+ # Stufe-3 DSMS-Anchor
+ video_dsms = (video.get("dsms") or {})
+ meta_dsms = (walk.get("walk_json_dsms") or {})
+ video_cid = video_dsms.get("cid") or ""
+ meta_cid = meta_dsms.get("cid") or ""
+ video_gw = _publicize_gateway_url(video_dsms.get("gateway_url") or "")
+ meta_gw = _publicize_gateway_url(meta_dsms.get("gateway_url") or "")
+ dsms_html = ""
+ if video_cid or meta_cid:
+ parts = []
+ if video_cid:
+ link = (f""
+ f"{html.escape(video_cid[:20])}…"
+ if video_gw else
+ f"{html.escape(video_cid)}")
+ parts.append(f"Video-CID: {link}")
+ if meta_cid:
+ link = (f""
+ f"{html.escape(meta_cid[:20])}…"
+ if meta_gw else
+ f"{html.escape(meta_cid)}")
+ parts.append(f"walk.json-CID: {link}")
+ dsms_html = (
+ ""
+ "🔒 DSMS-Anchor (manipulationssicher): "
+ + " · ".join(parts) +
+ "
"
+ )
+
rows = []
for a in actions:
ts = (a.get("timestamp") or "")[11:19] # HH:MM:SS
@@ -126,6 +175,7 @@ def _render(walk: dict) -> str:
f"{nav_count} Compliance-Seiten besucht, jede 4 Sek "
"verweilt — Reviewer kann den Audit-Walk nachverfolgen."
"
"
+ + dsms_html +
""
""
diff --git a/backend-compliance/tests/test_b17_audit_walk.py b/backend-compliance/tests/test_b17_audit_walk.py
index 89078cc7..b9bcd87a 100644
--- a/backend-compliance/tests/test_b17_audit_walk.py
+++ b/backend-compliance/tests/test_b17_audit_walk.py
@@ -72,6 +72,31 @@ class TestRender:
html = _render(walk)
assert "Keine Akkordeons gefunden" in html
+ def test_dsms_anchor_rendered_when_cid_present(self):
+ walk = dict(_FAKE_WALK)
+ walk["video"] = dict(walk["video"])
+ walk["video"]["dsms"] = {
+ "cid": "QmTestCidVideoXX1234567890ABCDEFGHJKLMN",
+ "gateway_url": "http://dsms-node:8080/ipfs/QmTestCidVideo",
+ }
+ walk["walk_json_dsms"] = {
+ "cid": "QmTestCidMetaXX1234567890ABCDEFGHJKLMN",
+ "gateway_url": "http://dsms-node:8080/ipfs/QmTestCidMeta",
+ }
+ html = _render(walk)
+ assert "DSMS-Anchor" in html
+ assert "QmTestCidVideoXX1234" in html
+ # Internal gateway-host must be rewritten to public for the mail
+ assert "dsms-node:8080" not in html
+
+ def test_no_dsms_block_when_cid_absent(self):
+ walk = dict(_FAKE_WALK)
+ walk["video"] = dict(walk["video"])
+ walk["video"].pop("dsms", None)
+ walk.pop("walk_json_dsms", None)
+ html = _render(walk)
+ assert "DSMS-Anchor" not in html
+
class TestRunB17:
def test_no_request_skipped(self):
diff --git a/consent-tester/services/audit_walk_recorder.py b/consent-tester/services/audit_walk_recorder.py
index a2ddf35c..47300d70 100644
--- a/consent-tester/services/audit_walk_recorder.py
+++ b/consent-tester/services/audit_walk_recorder.py
@@ -36,6 +36,13 @@ logger = logging.getLogger(__name__)
# Walk-Output-Root (Volume mount: /data ist im docker-compose definiert)
WALK_ROOT = os.getenv("AUDIT_WALK_DIR", "/data/audit-walks")
+# DSMS-Gateway intern (kein Public-Hostname nötig). Setzt der
+# docker-compose env. Wird Stufe-3-Anchor benutzt.
+DSMS_GATEWAY_URL = os.getenv(
+ "DSMS_GATEWAY_URL", "http://bp-compliance-dsms-gateway:8082",
+)
+DSMS_BEARER = os.getenv("DSMS_BEARER", "audit-walk-uploader")
+
# Footer-Link-Text-Hints — was wir als relevante Compliance-Anker
# erkennen. Wir laden NICHT jeden Footer-Link (sonst riesige Videos),
# sondern nur die compliance-relevanten.
@@ -62,6 +69,35 @@ def _ts() -> str:
return datetime.now(timezone.utc).isoformat()
+async def _upload_to_dsms(
+ path: Path, document_type: str, document_id: str,
+) -> dict:
+ """Upload a single file to DSMS. Returns {cid, size, gateway_url}
+ or {error}. Best-effort: a DSMS-down doesn't abort the walk."""
+ try:
+ import httpx
+ async with httpx.AsyncClient(timeout=60.0) as client:
+ with path.open("rb") as f:
+ files = {"file": (path.name, f.read())}
+ r = await client.post(
+ f"{DSMS_GATEWAY_URL}/api/v1/documents",
+ files=files,
+ data={"document_type": document_type,
+ "document_id": document_id},
+ headers={"Authorization": f"Bearer {DSMS_BEARER}"},
+ )
+ if r.status_code in (200, 201):
+ data = r.json() or {}
+ return {
+ "cid": data.get("cid"),
+ "size": data.get("size"),
+ "gateway_url": data.get("gateway_url") or "",
+ }
+ return {"error": f"HTTP {r.status_code}: {r.text[:200]}"}
+ except Exception as e:
+ return {"error": str(e)[:200]}
+
+
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
@@ -329,8 +365,30 @@ async def record_audit_walk(
"actions": actions,
"video": video_meta,
}
+
+ # Stufe 3: DSMS-CID-Anchor — Video + walk.json zu IPFS hochladen
+ # bevor walk.json final geschrieben wird, damit der CID in der
+ # walk.json selbst stehen kann (self-referential audit anchor).
+ video_path = out_dir / "video.webm"
+ if video_path.exists():
+ video_dsms = await _upload_to_dsms(
+ video_path, document_type="audit_walk_video",
+ document_id=walk_id,
+ )
+ walk_doc["video"]["dsms"] = video_dsms
+
try:
- (out_dir / "walk.json").write_text(
+ walk_json_path = out_dir / "walk.json"
+ walk_json_path.write_text(
+ json.dumps(walk_doc, indent=2, ensure_ascii=False),
+ )
+ walk_dsms = await _upload_to_dsms(
+ walk_json_path, document_type="audit_walk_meta",
+ document_id=walk_id,
+ )
+ walk_doc["walk_json_dsms"] = walk_dsms
+ # Re-write so the on-disk walk.json contains BOTH CIDs
+ walk_json_path.write_text(
json.dumps(walk_doc, indent=2, ensure_ascii=False),
)
except Exception as e: