diff --git a/backend-compliance/compliance/api/agent_check/_b13_wiring.py b/backend-compliance/compliance/api/agent_check/_b13_wiring.py new file mode 100644 index 00000000..608b4330 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b13_wiring.py @@ -0,0 +1,67 @@ +"""B13 wiring — Widerrufsbelehrung-Reachability. + +Hängt sich an `state["extra_findings"]` an und rendert einen +eigenständigen V2-HTML-Block (`widerruf_reach_html`). +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.widerrufsbelehrung_reachability_check import ( + check_widerrufsbelehrung_reachability, +) + +logger = logging.getLogger(__name__) + + +def run_b13(state: dict) -> None: + new = check_widerrufsbelehrung_reachability(state) + if not new: + return + extras = state.get("extra_findings") or [] + extras.extend(new) + state["extra_findings"] = extras + state["widerruf_reach_html"] = _render(new) + logger.info("B13 widerruf-reach: %d finding(s)", len(new)) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else "#f59e0b" + scope_tag = f.get("b2c_scope") or "" + scope_html = ( + f"Scope: {html.escape(scope_tag)}" + if scope_tag else "" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}{scope_html}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"
" + f"{html.escape(f.get('evidence') or '')}
" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "📜 Widerrufsbelehrung-Reachability (B2C-Pflicht)" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 653e8553..bdde8681 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -23,6 +23,7 @@ from ._b5_wiring import run_b5 from ._b6b7b8_wiring import run_b6b7b8 from ._b9b10_wiring import run_b9b10 from ._b12_wiring import run_b12 +from ._b13_wiring import run_b13 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -70,6 +71,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b6b7b8(state) # DPO-cross-doc + Doc-Staleness + CMP-fingerprint run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus run_b12(state) # Chatbot-Cookie-Klassifikation (B11 ist in B9B10) + run_b13(state) # Widerrufsbelehrung-Reachability (B2C-Pflicht) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index 3afe3c4a..ca1a2d62 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -48,6 +48,8 @@ def compose_v2(state: dict) -> str: state.get("extra_findings_html", ""), # B12 Chatbot-Cookie-Klassifikation state.get("chatbot_cookie_html", ""), + # B13 Widerrufsbelehrung-Reachability (B2C-Pflicht) + state.get("widerruf_reach_html", ""), # Browser-Matrix (Stage 1.c) state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves diff --git a/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py b/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py new file mode 
100644 index 00000000..9c04c258 --- /dev/null +++ b/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py @@ -0,0 +1,183 @@ +"""B13 — Widerrufsbelehrung-Reachability-Check. + +Erkennt: B2C-Shop ohne öffentlich erreichbare Widerrufsbelehrung. + +Norm: Art. 246a § 1 Abs. 2 Nr. 1 EGBGB i.V.m. § 312d BGB — +Widerrufsbelehrung muss dauerhaft + leicht zugänglich auf der Website +verfügbar sein. Footer ohne Widerruf-Link + alle Widerruf-Pfade 404 +verletzt das. + +Signale aus state: + - doc_entries: ein entry mit doc_type='widerruf'. Discovery hat es + versucht (discovery_attempted=True), aber Text ist leer / unter + Mindestlänge → Pfad nicht erreichbar. + - DSE / Homepage / sonstige Texte: B2C-Scope-Detection per Keywords. + +Ein einzelnes Finding mit Schweregrad HIGH bei B2C, MEDIUM bei +unklarem Scope, kein Finding bei klarem B2B. +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + + +# Min characters that count as 'reachable' widerruf doc. +_MIN_TEXT_CHARS = 400 + + +# Strong B2C-Shop signals (Endkunden-Verkauf / E-Commerce). +_B2C_STRONG = ( + "warenkorb", "in den warenkorb", "zur kasse", "kasse", + "bestellung aufgeben", "jetzt kaufen", "preis inkl. mwst", + "preis inkl. mehrwertsteuer", "lieferzeit", "versandkosten", + "rückgaberecht", "rueckgaberecht", "rücksende", + "endkunden", "endverbraucher", "verbraucher i.s.d. § 13 bgb", +) + +# Weaker B2C signals — only count when paired with a strong one or +# when AT LEAST TWO appear together. +_B2C_WEAK = ( + "shop", "store", "kaufen", "produkt", "ware", "rechnung", + "agb", "widerrufsfrist", "widerrufsrecht", "wallbox", "hardware", + "abonnement", "tarif buchen", "naturstrom", "ladetarif", +) + +# Hard B2B-only signals that override B2C-Verdacht. 
_B2B_ONLY = (
    "ausschließlich an unternehmer", "ausschliesslich an unternehmer",
    "nur für unternehmen", "b2b only", "kein verkauf an verbraucher",
    "ausschluss verbraucher",
)


def _independent_hits(keywords: tuple[str, ...], hay: str) -> list[str]:
    """Return the keywords that occur in *hay* on their own.

    A keyword is dropped when every one of its occurrences lies inside a
    longer keyword that matched as well (e.g. "ware" inside "hardware"),
    so a single word can never be counted as two separate signals.
    The order of *keywords* is preserved in the result.
    """
    present = [kw for kw in keywords if kw in hay]
    kept: list[str] = []
    for kw in present:
        masked = hay
        for other in present:
            if other != kw and kw in other:
                # Blank out the longer match before re-testing the shorter
                # keyword; the NUL placeholder cannot create a new match.
                masked = masked.replace(other, "\x00")
        if kw in masked:
            kept.append(kw)
    return kept


def _detect_b2c_scope(state: dict) -> tuple[str, list[str]]:
    """Classify the site's customer scope from the texts held in *state*.

    The lower-cased ``text`` of every ``doc_entries`` item and the
    lower-cased ``home_text`` are joined into one haystack and searched
    for the keyword tuples by substring.

    Precedence: a B2B-only phrase wins over everything, then any strong
    B2C keyword, then at least two independent weak B2C keywords.

    Returns:
        ``(scope, signals_found)`` with scope being one of
        ``'b2b_only'``, ``'b2c_strong'``, ``'b2c_likely'``, ``'unknown'``.
        For the weak cases the signal list is truncated (5 resp. 3 items).
    """
    haystack_parts: list[str] = []
    for e in (state.get("doc_entries") or []):
        t = (e.get("text") or "").lower()
        if t:
            haystack_parts.append(t)
    home = (state.get("home_text") or "").lower()
    if home:
        haystack_parts.append(home)
    hay = "\n".join(haystack_parts)

    if not hay:
        return "unknown", []

    b2b_hits = [s for s in _B2B_ONLY if s in hay]
    if b2b_hits:
        return "b2b_only", b2b_hits

    strong_hits = [s for s in _B2C_STRONG if s in hay]
    if strong_hits:
        return "b2c_strong", strong_hits

    # Count only independent weak keywords: "hardware" must not also
    # score as "ware", otherwise one word alone reaches the two-signal
    # threshold and triggers a false b2c_likely.
    weak_hits = _independent_hits(_B2C_WEAK, hay)
    if len(weak_hits) >= 2:
        return "b2c_likely", weak_hits[:5]
    return "unknown", weak_hits[:3]


def _footer_has_widerruf_link(state: dict) -> bool:
    """Best-effort scan for a Widerruf-link in footer / discovered URLs.

    The discovery phase merges any same-owner widerruf URLs it finds
    into doc_entries[widerruf].url. If that URL exists AND the
    discovered text is non-empty, the page is reachable.
    """
    for e in (state.get("doc_entries") or []):
        if e.get("doc_type") != "widerruf":
            continue
        url = (e.get("url") or "").strip()
        text = (e.get("text") or "").strip()
        if url and len(text) >= _MIN_TEXT_CHARS:
            return True
    # Optional: scan a raw footer-snapshot if the orchestrator stored one.
+ footer = (state.get("footer_html") or "") + " " + \ + (state.get("footer_text") or "") + if footer and re.search( + r"widerruf|cancellation|withdrawal|rückgabe|rueckgabe", + footer, re.IGNORECASE, + ): + return True + return False + + +def check_widerrufsbelehrung_reachability(state: dict) -> list[dict]: + """Emit a single finding when a B2C-Shop has no reachable Widerruf + document and no footer link to one.""" + widerruf_entry = next( + (e for e in (state.get("doc_entries") or []) + if e.get("doc_type") == "widerruf"), + None, + ) + if not widerruf_entry: + # No widerruf processing happened at all → don't fabricate. + return [] + + discovery_tried = bool(widerruf_entry.get("discovery_attempted")) + text_len = len((widerruf_entry.get("text") or "").strip()) + has_url = bool((widerruf_entry.get("url") or "").strip()) + + if text_len >= _MIN_TEXT_CHARS: + # widerruf doc is actually reachable — no finding. + return [] + + if not discovery_tried and not has_url: + # User did not submit a widerruf URL and discovery did not run + # (e.g. no homepage to crawl). Cannot make a claim. + return [] + + if _footer_has_widerruf_link(state): + return [] + + scope, signals = _detect_b2c_scope(state) + if scope == "b2b_only": + return [] + + if scope == "unknown": + # Without B2C-Indikatoren remain silent — false positives at + # pure agency / B2B sites would erode trust in the report. + return [] + + sev = "HIGH" if scope == "b2c_strong" else "MEDIUM" + sev_reason = "missing" if scope == "b2c_strong" else "unverifiable" + + tried_url = (widerruf_entry.get("url") + or widerruf_entry.get("rejected_url") or "").strip() + tried_hint = f" (probiert: {tried_url})" if tried_url else "" + + return [{ + "check_id": "WIDERRUF-REACH-001", + "severity": sev, + "severity_reason": sev_reason, + "title": ( + "Widerrufsbelehrung nicht öffentlich erreichbar " + "trotz B2C-Shop-Merkmalen" + ), + "norm": ( + "Art. 246a § 1 Abs. 2 Nr. 1 EGBGB i.V.m. 
§ 312d BGB" + ), + "evidence": ( + f"Discovery hat Widerruf-Pfade versucht{tried_hint} — keine " + f"erreichbare Belehrung gefunden. Footer enthält keinen " + f"Widerruf-Link. B2C-Signale: " + f"{', '.join(signals[:3]) if signals else 'keine direkten'}." + ), + "action": ( + "Eigenständige Widerrufsbelehrungs-Seite (z.B. " + "/widerrufsbelehrung) anlegen UND im Footer dauerhaft " + "verlinken. Gesetzliche Musterbelehrung nach Anlage 1 zu " + "Art. 246a EGBGB verwenden — eigene Formulierungen sind " + "abmahnfähig." + ), + "b2c_scope": scope, + }] diff --git a/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py b/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py new file mode 100644 index 00000000..eebb8cb8 --- /dev/null +++ b/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py @@ -0,0 +1,137 @@ +"""Tests for B13 Widerrufsbelehrung-Reachability-Check.""" + +from compliance.services.widerrufsbelehrung_reachability_check import ( + _detect_b2c_scope, + check_widerrufsbelehrung_reachability, +) + + +def _state(widerruf_entry=None, home_text="", doc_entries=None, + footer_text=""): + entries = list(doc_entries or []) + if widerruf_entry is not None: + entries.append({"doc_type": "widerruf", **widerruf_entry}) + return { + "doc_entries": entries, + "home_text": home_text, + "footer_text": footer_text, + } + + +class TestDetectB2CScope: + def test_strong_b2c_warenkorb(self): + s = _state(home_text="Legen Sie das Produkt in den Warenkorb.") + scope, hits = _detect_b2c_scope(s) + assert scope == "b2c_strong" + assert any("warenkorb" in h for h in hits) + + def test_b2b_only_overrides_b2c(self): + s = _state(home_text="Wir verkaufen ausschließlich an Unternehmer. 
" + "Warenkorb für Großkunden.") + scope, _ = _detect_b2c_scope(s) + assert scope == "b2b_only" + + def test_weak_signals_two_promote_to_likely(self): + s = _state(home_text="Unser Shop bietet Wallbox-Produkte mit " + "Rechnung zur Bestellung.") + scope, _ = _detect_b2c_scope(s) + assert scope == "b2c_likely" + + def test_single_weak_signal_stays_unknown(self): + s = _state(home_text="Wir bieten einen Shop.") + scope, _ = _detect_b2c_scope(s) + assert scope == "unknown" + + def test_empty_state(self): + s = _state() + scope, _ = _detect_b2c_scope(s) + assert scope == "unknown" + + +class TestCheck: + def test_no_widerruf_entry_no_finding(self): + out = check_widerrufsbelehrung_reachability(_state()) + assert out == [] + + def test_widerruf_reachable_no_finding(self): + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "https://shop.de/widerruf", + "text": "Belehrung " * 100, + "discovery_attempted": True, + }, + home_text="Warenkorb / zur Kasse.", + )) + assert out == [] + + def test_unreachable_plus_b2c_strong_high_finding(self): + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": True, + "rejected_url": "https://shop.de/widerruf", + }, + home_text="In den Warenkorb. Lieferzeit 2 Tage. Preis inkl. MwSt.", + )) + assert len(out) == 1 + f = out[0] + assert f["check_id"] == "WIDERRUF-REACH-001" + assert f["severity"] == "HIGH" + assert f["b2c_scope"] == "b2c_strong" + assert "Art. 
246a" in f["norm"] + assert "shop.de/widerruf" in f["evidence"] + + def test_unreachable_plus_b2c_likely_medium(self): + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": True, + }, + home_text="Shop bietet Wallbox und Tarif buchen.", + )) + assert len(out) == 1 + assert out[0]["severity"] == "MEDIUM" + assert out[0]["b2c_scope"] == "b2c_likely" + + def test_unreachable_plus_b2b_only_no_finding(self): + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": True, + }, + home_text="B2B only — kein Verkauf an Verbraucher.", + )) + assert out == [] + + def test_unreachable_plus_unknown_scope_no_finding(self): + # Pure agency / B2B-services without clear shop signals — silent. + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": True, + }, + home_text="Wir sind eine Beratungsagentur für Mittelstand.", + )) + assert out == [] + + def test_discovery_not_attempted_no_finding(self): + # Avoid false positives when discovery had no homepage to crawl. + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": False, + }, + home_text="In den Warenkorb. Preis inkl. MwSt.", + )) + assert out == [] + + def test_footer_widerruf_link_suppresses_finding(self): + out = check_widerrufsbelehrung_reachability(_state( + widerruf_entry={ + "url": "", "text": "", + "discovery_attempted": True, + }, + home_text="Warenkorb. Lieferzeit. Preis inkl. MwSt.", + footer_text='Widerruf', + )) + assert out == []