diff --git a/backend-compliance/compliance/api/agent_check/_b18_wiring.py b/backend-compliance/compliance/api/agent_check/_b18_wiring.py
new file mode 100644
index 00000000..8f550243
--- /dev/null
+++ b/backend-compliance/compliance/api/agent_check/_b18_wiring.py
@@ -0,0 +1,130 @@
+"""B18 wiring — Specialist-Agents Phase 2 (Impressum LLM).
+
+Ruft den LLM-Agent (impressum_agent_llm.evaluate_llm) auf, mergt das
+Ergebnis mit dem Pattern-Match-Agent und deduplet nach field_id.
+Rendert einen V2-HTML-Block (impressum_agent_html).
+"""
+
+from __future__ import annotations
+
+import html
+import logging
+import os
+
+from compliance.services.specialist_agents.impressum_agent import (
+ PFLICHTANGABEN, evaluate as evaluate_pattern,
+)
+from compliance.services.specialist_agents.impressum_agent_llm import (
+ evaluate_llm,
+)
+
+logger = logging.getLogger(__name__)
+
+
+_DISABLED = os.environ.get("IMPRESSUM_AGENT_DISABLED", "").lower() in (
+ "1", "true", "yes",
+)
+
+
+async def run_b18(state: dict) -> None:
+    """Run the Impressum specialist agents (pattern KB + LLM) for B18.
+
+    Reads ``state["doc_texts"]["impressum"]`` and ``state["profile_dict"]``.
+    When findings remain after de-duplication they are appended to
+    ``state["extra_findings"]`` and the rendered block is stored in
+    ``state["impressum_agent_html"]``. Returns without touching ``state``
+    when the agent is disabled via ``IMPRESSUM_AGENT_DISABLED``, when the
+    Impressum text is shorter than 100 characters, or when neither agent
+    reports anything.
+    """
+    if _DISABLED:
+        return
+    doc_texts = state.get("doc_texts") or {}
+    imp = (doc_texts.get("impressum") or "").strip()
+    if len(imp) < 100:
+        return
+
+    # Infer the business scope from the profile, if one is present.
+    profile_dict = state.get("profile_dict") or {}
+    scope: set[str] = set()
+    if profile_dict.get("has_online_shop"):
+        scope.add("ecommerce")
+    if profile_dict.get("is_regulated_profession"):
+        scope.add("regulated_profession")
+    # Compare case-insensitively so "Insurance" / "FINANCE" are treated
+    # like the lower-case spellings instead of being silently ignored.
+    industry = str(profile_dict.get("industry") or "").strip().lower()
+    if industry in ("insurance", "finance"):
+        scope.add("insurance")
+
+    pattern_findings = evaluate_pattern(imp, scope)
+    llm_findings = await evaluate_llm(imp, scope)
+
+    # De-duplicate: both agents can report the same field_id. Pattern
+    # findings come first in the iteration, so they win over LLM findings
+    # (deterministic + stable).
+    seen_keys: set[str] = set()
+    merged: list[dict] = []
+    for f in pattern_findings + llm_findings:
+        key = (f.get("field_id") or "").lower()
+        # Findings without a field_id are never folded into each other.
+        if key and key in seen_keys:
+            continue
+        seen_keys.add(key)
+        merged.append(f)
+
+    if not merged:
+        return
+
+    extras = state.get("extra_findings") or []
+    extras.extend(merged)
+    state["extra_findings"] = extras
+    state["impressum_agent_html"] = _render(merged, pattern_findings,
+                                            llm_findings)
+    logger.info(
+        "B18 impressum-agent: pattern=%d llm=%d merged=%d",
+        len(pattern_findings), len(llm_findings), len(merged),
+    )
+
+
+def _render(merged: list[dict], pattern: list[dict],
+ llm: list[dict]) -> str:
+ cards = []
+ for f in merged:
+ sev = (f.get("severity") or "").upper()
+ color = "#dc2626" if sev == "HIGH" else (
+ "#f59e0b" if sev == "MEDIUM" else "#64748b"
+ )
+ agent_tag = f.get("agent") or ""
+ tag_html = ""
+ if agent_tag:
+ short = "LLM" if "llm" in agent_tag.lower() else "KB"
+ bg = "#dbeafe" if short == "LLM" else "#f1f5f9"
+ col = "#1e40af" if short == "LLM" else "#475569"
+ tag_html = (
+ f"{short}"
+ )
+ evidence_html = ""
+ if f.get("evidence"):
+ evidence_html = (
+ "
"
+ f"{html.escape(f['evidence'])}
"
+ )
+ cards.append(
+ f""
+ f"
"
+ f"{sev} · {html.escape(f.get('check_id') or '')}{tag_html}
"
+ f"
"
+ f"{html.escape(f.get('title') or '')}
"
+ f"
"
+ f"{html.escape(f.get('norm') or '')}
"
+ f"{evidence_html}"
+ f"
"
+ f"→ Empfehlung: "
+ f"{html.escape(f.get('action') or '')}
"
+ "
"
+ )
+ return (
+ ""
+ "
"
+ "🤖 Impressum-Specialist-Agent (Pattern-KB + LLM)"
+ "
"
+ f"
"
+ f"Pattern-Match: {len(pattern)} · LLM-Analyse: {len(llm)} · "
+ f"dedupliziert: {len(merged)}
"
+ + "".join(cards) +
+ "
"
+ )
diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py
index e99d2645..053a7a11 100644
--- a/backend-compliance/compliance/api/agent_check/_orchestrator.py
+++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py
@@ -28,6 +28,7 @@ from ._b14_wiring import run_b14
from ._b15_wiring import run_b15
from ._b16_wiring import run_b16
from ._b17_wiring import run_b17
+from ._b18_wiring import run_b18
from ._constants import _compliance_check_jobs
from ._phase_a_resolve import run_phase_a
from ._phase_b_profile_check import run_phase_b
@@ -42,6 +43,9 @@ from ._phase_d3_blocks_top import run_phase_d3_top
from ._phase_e_email import run_phase_e
from ._phase_f_persist import run_phase_f
from ._state import new_state
+from compliance.services.chatbot_policy_discovery import (
+ enrich_dse_with_chatbot_policies,
+)
logger = logging.getLogger(__name__)
@@ -54,6 +58,13 @@ async def run_compliance_check(check_id: str, req) -> None:
continue_run = await run_phase_a(state)
if not continue_run:
return # TDM denied — job already marked skipped_tdm
+ # DSE-Enrichment: Sub-Chatbot-Policies anhängen (Westfield-iAdvize,
+ # vergleichbare Pattern). Best-effort, läuft VOR Phase B damit
+ # die enrichte DSE in alle per-doc-checks fließt.
+ try:
+ await enrich_dse_with_chatbot_policies(state)
+ except Exception as e:
+ logger.warning("chatbot-policy enrichment skipped: %s", e)
# Phase B: Step 2 (profile detect) + Step 3 (per-doc checks)
await run_phase_b(state)
# Phase C: Step 3b-d (banner + cross-check + TCF) + Step 4
@@ -80,6 +91,7 @@ async def run_compliance_check(check_id: str, req) -> None:
run_b15(state) # AI-Act Rechtsgrundlage (LLM-Vendor auf lit. f)
run_b16(state) # Footer-Label-vs-URL-Slug-Drift
await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung)
+ await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM)
# Phase D-3 top/mid/bot: Step 5 HTML blocks
await run_phase_d3_top(state)
await run_phase_d3_mid(state)
diff --git a/backend-compliance/compliance/api/agent_check/_phase_f_persist.py b/backend-compliance/compliance/api/agent_check/_phase_f_persist.py
index 413a9ef6..c2b8359c 100644
--- a/backend-compliance/compliance/api/agent_check/_phase_f_persist.py
+++ b/backend-compliance/compliance/api/agent_check/_phase_f_persist.py
@@ -72,6 +72,24 @@ def run_phase_f(state: dict) -> None:
"total_findings": total_findings,
"email_status": email_result.get("status", "failed"),
"checked_at": datetime.now(timezone.utc).isoformat(),
+ # P125: B-Wiring-Output (B1, B3-B18) ins API-Response-Payload.
+ # Bisher landeten diese nur im Audit-Mail-HTML — externe Aufrufer
+ # (Admin-UI) sahen sie nicht. Schema additiv; legacy clients
+ # ignorieren unbekannte Felder.
+ "extra_findings": state.get("extra_findings") or [],
+ "audit_walk": state.get("audit_walk") or None,
+ "html_blocks": {
+ "widerruf_reach": state.get("widerruf_reach_html", ""),
+ "retention_conflict": state.get("retention_conflict_html", ""),
+ "ai_legal_basis": state.get("ai_legal_basis_html", ""),
+ "url_slug_drift": state.get("url_slug_drift_html", ""),
+ "chatbot_cookie": state.get("chatbot_cookie_html", ""),
+ "audit_walk": state.get("audit_walk_html", ""),
+ "browser_matrix": state.get("browser_matrix_html", ""),
+ "vendor_consistency": state.get("vendor_consistency_html", ""),
+ "ai_act": state.get("ai_act_html", ""),
+ "impressum_agent": state.get("impressum_agent_html", ""),
+ },
}
_compliance_check_jobs[check_id]["status"] = "completed"
diff --git a/backend-compliance/compliance/services/chatbot_policy_discovery.py b/backend-compliance/compliance/services/chatbot_policy_discovery.py
new file mode 100644
index 00000000..29bc74da
--- /dev/null
+++ b/backend-compliance/compliance/services/chatbot_policy_discovery.py
@@ -0,0 +1,161 @@
+"""Discover separate chatbot-/AI-policy pages and merge them into the
+main DSE text.
+
+Many sites publish their chatbot data-protection notice on a separate
+URL (e.g. westfield.com/germany/privacypolicychatbot) that the regular
+auto-discovery misses because it doesn't classify as 'dse'. As a
+result, B12/B15 (chatbot-cookie classification, AI-Act legal basis)
+never see the iAdvize/Vertex provider names.
+
+Strategy:
+ 1. From the discovered URLs derive the base host.
+ 2. Probe a fixed list of well-known chatbot-policy paths.
+ 3. For each 2xx-response with > 300 words, merge the text into
+ state['doc_texts']['dse'] with a separator.
+
+Best-effort: a probe failure NEVER aborts the check.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import re
+from urllib.parse import urlparse
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+# Slug-Kandidaten, sortiert von häufigsten zu seltensten.
+_CHATBOT_POLICY_SLUGS = (
+ "privacypolicychatbot",
+ "chatbot-datenschutz", "chatbot/datenschutz",
+ "datenschutz-chatbot", "datenschutz/chatbot",
+ "ai-policy", "ai-datenschutz", "ki-datenschutz",
+ "privacy-chatbot", "privacy-ai",
+ "datenschutz-ki", "datenschutz-assistent",
+ "chatbot-privacy", "ai-privacy",
+)
+
+
+# Sprach-Prefixe die wir abklopfen.
+_LANG_PREFIXES = ("", "/de", "/de_DE", "/en", "/germany")
+
+
+def _build_candidate_urls(base_origin: str) -> list[str]:
+    """Build all (slug x lang) candidate URLs for one origin.
+
+    The list is ordered slug-major: the most common slug is emitted for
+    every language prefix before the next slug starts. A caller that only
+    probes the head of the list therefore still covers every prefix
+    (e.g. ``/germany/privacypolicychatbot``) instead of exhausting all
+    slugs of the first prefix.
+    """
+    # Normalise a trailing slash once instead of repairing "//" afterwards.
+    origin = base_origin.rstrip("/")
+    out: list[str] = []
+    seen: set[str] = set()
+    for slug in _CHATBOT_POLICY_SLUGS:
+        for lang in _LANG_PREFIXES:
+            url = f"{origin}{lang}/{slug}"
+            if url not in seen:
+                seen.add(url)
+                out.append(url)
+    return out
+
+
+async def _probe(url: str, timeout_s: float = 4.0) -> tuple[str, str] | None:
+    """Fetch *url* and return ``(url, text)`` for a substantial page.
+
+    ``text`` is the response body with ``<script>``/``<style>`` elements
+    and all remaining tags removed and whitespace collapsed. Returns
+    ``None`` for status codes >= 400, for bodies with fewer than 300
+    words, and on any request error (the probe is best-effort and must
+    never raise).
+    """
+    try:
+        async with httpx.AsyncClient(
+            timeout=timeout_s, follow_redirects=True,
+        ) as c:
+            r = await c.get(url)
+            if r.status_code >= 400:
+                return None
+            # Drop script/style elements including their content first.
+            # An empty pattern here would match between every character
+            # and shred the text into single letters, which also defeats
+            # the word-count gate below.
+            text = re.sub(r"<script\b[^>]*>.*?</script\s*>", " ",
+                          r.text, flags=re.S | re.I)
+            text = re.sub(r"<style\b[^>]*>.*?</style\s*>", " ",
+                          text, flags=re.S | re.I)
+            text = re.sub(r"<[^>]+>", " ", text)
+            text = re.sub(r"\s+", " ", text).strip()
+            if len(text.split()) < 300:
+                return None
+            return url, text
+    except Exception as e:
+        # Deliberately broad: a failing probe must not abort the check.
+        logger.debug("chatbot-policy probe failed for %s: %s", url, e)
+        return None
+
+
+def _base_origins(doc_entries: list[dict]) -> list[str]:
+    """Return the distinct ``scheme://netloc`` origins of the entry URLs.
+
+    Entries without a URL, or whose URL lacks a scheme or a host, are
+    skipped. Origins keep the order in which they were first seen.
+    """
+    # A dict keeps insertion order and gives the de-duplication for free.
+    origins: dict[str, None] = {}
+    for entry in doc_entries:
+        raw = (entry.get("url") or "").strip()
+        if not raw:
+            continue
+        try:
+            parts = urlparse(raw)
+        except Exception:
+            continue
+        if parts.scheme and parts.netloc:
+            origins.setdefault(f"{parts.scheme}://{parts.netloc}")
+    return list(origins)
+
+
+async def enrich_dse_with_chatbot_policies(state: dict) -> dict:
+    """Probe known chatbot-policy paths and merge hits into the DSE text.
+
+    Candidate URLs are built for at most the first two origins found in
+    ``state["doc_entries"]`` (first 20 candidates each) and probed
+    concurrently. Every distinct page text is appended to
+    ``state["doc_texts"]["dse"]`` behind a separator line naming its URL;
+    the first entry with ``doc_type == "dse"`` additionally receives the
+    merged text and the list of source URLs.
+
+    Returns a dict with ``probed`` (number of candidate URLs), ``found``
+    (URLs whose text was merged) and ``merged_chars`` (characters added,
+    separators excluded).
+    """
+    doc_entries = state.get("doc_entries") or []
+    origins = _base_origins(doc_entries)
+    if not origins:
+        return {"probed": 0, "found": [], "merged_chars": 0}
+
+    # Build candidate URL list, capped per origin to avoid noise.
+    candidates: list[str] = []
+    for origin in origins[:2]:  # cap origins for safety
+        candidates.extend(_build_candidate_urls(origin)[:20])
+
+    if not candidates:
+        return {"probed": 0, "found": [], "merged_chars": 0}
+
+    results = await asyncio.gather(
+        *[_probe(u) for u in candidates],
+        return_exceptions=True,
+    )
+    # Exceptions returned by gather and None results are dropped here.
+    found = [r for r in results if isinstance(r, tuple) and r]
+
+    if not found:
+        return {"probed": len(candidates), "found": [], "merged_chars": 0}
+
+    # Merge into DSE text. Tolerate an explicit None under "doc_texts".
+    doc_texts = state.get("doc_texts")
+    if doc_texts is None:
+        doc_texts = state["doc_texts"] = {}
+    parts: list[str] = [doc_texts.get("dse") or ""]
+    appended_chars = 0
+    appended_urls: list[str] = []
+    seen_texts: set[str] = set()
+    for url, text in found:
+        # Several candidates can resolve to the same page (redirects,
+        # catch-all pages); merge each distinct text only once.
+        if text in seen_texts:
+            continue
+        seen_texts.add(text)
+        parts.append(
+            f"\n\n--- ergänzt aus {url} (chatbot-policy-discovery) ---\n\n"
+        )
+        parts.append(text)
+        appended_chars += len(text)
+        appended_urls.append(url)
+    dse_text = "".join(parts)
+    doc_texts["dse"] = dse_text
+
+    # Also record on the dse-entry (audit trail).
+    for e in doc_entries:
+        if e.get("doc_type") == "dse":
+            e["chatbot_policy_sources"] = appended_urls
+            e["text"] = dse_text
+            break
+
+    logger.info(
+        "chatbot-policy enrichment: %d candidate(s) probed, %d found, "
+        "+%d chars merged into DSE",
+        len(candidates), len(appended_urls), appended_chars,
+    )
+    return {
+        "probed": len(candidates),
+        "found": appended_urls,
+        "merged_chars": appended_chars,
+    }
diff --git a/backend-compliance/compliance/services/finding_plausibility_check.py b/backend-compliance/compliance/services/finding_plausibility_check.py
index daecc07d..a3434d17 100644
--- a/backend-compliance/compliance/services/finding_plausibility_check.py
+++ b/backend-compliance/compliance/services/finding_plausibility_check.py
@@ -132,54 +132,102 @@ def _build_user_prompt(items: list[dict], doc_title: str,
)
+async def _post_llm(body: dict) -> str:
+    """POST one chat request to ``{OLLAMA_URL}/api/chat``.
+
+    Returns the ``message.content`` string of the JSON reply. Any failure
+    (transport error, HTTP error status, undecodable reply) is logged as
+    a warning and reported as ``""`` so the caller can choose a fallback.
+    """
+    endpoint = f"{OLLAMA_URL}/api/chat"
+    try:
+        async with httpx.AsyncClient(timeout=TIMEOUT) as client:
+            response = await client.post(endpoint, json=body)
+            response.raise_for_status()
+            message = response.json().get("message") or {}
+            return message.get("content", "") or ""
+    except Exception as e:
+        logger.warning("plausibility LLM call failed: %s", e)
+        return ""
+
+
+def _try_extract_json(content: str) -> dict | None:
+    """Extract a JSON object from free-form LLM output.
+
+    Handles markdown-fenced and prose-wrapped responses by stripping a
+    leading code fence and cutting from the first ``{`` to the last ``}``.
+    Returns ``None`` when nothing parses or when the parsed value is not
+    a JSON object, so callers can rely on getting a dict.
+    """
+    if not content:
+        return None
+    s = content.strip()
+    # Strip ```json ... ``` fences.
+    if s.startswith("```"):
+        s = s.strip("`")
+        if s.lower().startswith("json"):
+            s = s[4:]
+        s = s.strip()
+    # Heuristic: cut from first { to last }.
+    first = s.find("{")
+    last = s.rfind("}")
+    if first >= 0 and last > first:
+        s = s[first:last + 1]
+    try:
+        data = json.loads(s)
+    except (ValueError, RecursionError):
+        return None
+    # A bare array or scalar is valid JSON but not the promised object.
+    return data if isinstance(data, dict) else None
+
+
async def _ask_llm_batch(items: list[dict], doc_title: str,
doc_excerpt: str) -> dict[str, dict]:
- """Send a batch of up to BATCH_SIZE findings to the LLM."""
- body = {
+ """Send a batch of up to BATCH_SIZE findings to the LLM.
+
+ Resilience strategy (P125 fix for empty-response bug):
+ A. format='json' (strict) — current default
+ B. If A returns empty: format='' (loose), extract JSON manually
+ C. If B also empty AND batch >2: split batch + recurse
+ D. Else: give up, return {} (callers stamp llm_skipped=true)
+ """
+ user_prompt = _build_user_prompt(items, doc_title, doc_excerpt)
+ base_body = {
"model": MODEL,
"messages": [
{"role": "system", "content": _SYSTEM_PROMPT},
- {"role": "user", "content": _build_user_prompt(
- items, doc_title, doc_excerpt,
- )},
+ {"role": "user", "content": user_prompt},
],
- "format": "json",
"stream": False,
"options": {"temperature": 0.0, "seed": 42, "num_predict": 1500},
}
out: dict[str, dict] = {}
input_ids = [it["id"] for it in items]
try:
- async with httpx.AsyncClient(timeout=TIMEOUT) as c:
- r = await c.post(f"{OLLAMA_URL}/api/chat", json=body)
- r.raise_for_status()
- content = (r.json().get("message") or {}).get("content", "")
- if not content:
- # Single retry with smaller batch — qwen3 sometimes
- # rejects ≥6-item prompts under format='json'.
- if len(items) > 2:
- half = len(items) // 2
- logger.info(
- "plausibility empty → retry split %d → %dx2",
- len(items), half,
- )
- first = await _ask_llm_batch(
- items[:half], doc_title, doc_excerpt,
- )
- second = await _ask_llm_batch(
- items[half:], doc_title, doc_excerpt,
- )
- out.update(first)
- out.update(second)
- return out
- logger.warning("plausibility LLM returned empty content")
+ # Strategy A: format='json'
+ content = await _post_llm({**base_body, "format": "json"})
+ if not content:
+ # Strategy B: format-free, parse-on-our-side
+ logger.info(
+ "plausibility A→empty, trying B (format-free) batch=%d",
+ len(items),
+ )
+ content = await _post_llm(base_body)
+
+ if not content:
+ # Strategy C: split + recurse
+ if len(items) > 2:
+ half = len(items) // 2
+ logger.info(
+ "plausibility A+B empty → split %d → %dx2",
+ len(items), half,
+ )
+ first = await _ask_llm_batch(
+ items[:half], doc_title, doc_excerpt,
+ )
+ second = await _ask_llm_batch(
+ items[half:], doc_title, doc_excerpt,
+ )
+ out.update(first)
+ out.update(second)
return out
- try:
- data = json.loads(content)
- except json.JSONDecodeError as je:
+ # Strategy D: give up
+ logger.warning(
+ "plausibility gave up after A+B for batch=%d", len(items),
+ )
+ return out
+ data = _try_extract_json(content)
+ if data is None:
logger.warning(
- "plausibility LLM JSON parse failed: %s; raw=%s",
- je, content[:300],
+ "plausibility LLM JSON parse failed (after fallback); "
+ "raw=%s", content[:300],
)
return out
llm_findings = data.get("findings") or []
diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py
index eab6b1bd..bf481050 100644
--- a/backend-compliance/compliance/services/mail_render_v2/_compose.py
+++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py
@@ -58,6 +58,8 @@ def compose_v2(state: dict) -> str:
state.get("url_slug_drift_html", ""),
# B17 Audit-Walk-Video (Beweis-Aufzeichnung)
state.get("audit_walk_html", ""),
+ # B18 Impressum-Specialist-Agent (Pattern + LLM)
+ state.get("impressum_agent_html", ""),
# Browser-Matrix (Stage 1.c)
state.get("browser_matrix_html", ""),
# All legacy build_*_html() wrapped in V2 sections — preserves
diff --git a/backend-compliance/compliance/services/specialist_agents/impressum_agent_llm.py b/backend-compliance/compliance/services/specialist_agents/impressum_agent_llm.py
new file mode 100644
index 00000000..98df701c
--- /dev/null
+++ b/backend-compliance/compliance/services/specialist_agents/impressum_agent_llm.py
@@ -0,0 +1,166 @@
+"""Impressum-Specialist-Agent Phase 2 — LLM-gestützt.
+
+Komplementiert den Pattern-Match-Agent (impressum_agent.py) durch
+eine LLM-Pass. Beide Output-Formate sind identisch, sodass das B-Wiring
+beide kombinieren / dedupen kann.
+
+LLM-Setup:
+ - Modell: qwen3:30b-a3b (Standard Ollama, siehe Plausibility-Check)
+ - System-Prompt: KB der § 5 TMG Pflichtangaben
+ - User-Prompt: Impressum-Text + business_scope-Hinweis
+ - Output: JSON-Liste mit {field_id, severity, hint, evidence}
+
+Phase-2-Ziel: schwer-mit-Regex-erfassbare Lücken finden, z.B.
+ - "Geschäftsführer" wird genannt aber ohne Vor- oder Nachname
+ - Aufsichtsbehörde-Pflicht erkannt, aber für falsche Branche
+ - Vertretungsberechtigte einer GmbH bei mehreren Personen unvollständig
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import re
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+OLLAMA_URL = os.environ.get(
+ "OLLAMA_URL", "http://bp-core-ollama:11434",
+)
+MODEL = os.environ.get("IMPRESSUM_AGENT_MODEL", "qwen3:30b-a3b")
+TIMEOUT = float(os.environ.get("IMPRESSUM_AGENT_TIMEOUT", "60"))
+
+
+_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus
+§ 5 TMG / DDG (Anbieterkennzeichnung). Deine Aufgabe: einen Impressum-
+Text auf Vollständigkeit der Pflichtangaben prüfen und Lücken /
+Mängel strukturiert auflisten.
+
+Pflichtangaben nach § 5 TMG (Standard):
+ - Anbieter-Name + Anschrift (juristische Person: Firma + Sitz)
+ - Vertretungsberechtigte (bei juristischen Personen: ALLE Geschäftsführer
+ mit Vor- und Nachname)
+ - E-Mail UND Telefon (Schnelle elektronische Kontaktaufnahme + UNMITTELBAR)
+ - Handelsregister-Eintrag (HRB/HRA + Registergericht)
+ - USt-IdNr. (falls vorhanden — DE\\d{9})
+ - Bei B2C/Onlineshop: Verbraucherschlichtung + OS-Plattform
+ - Bei reglementiertem Beruf: Berufsbezeichnung + Kammer
+ - Bei genehmigungspflichtigen Tätigkeiten: Aufsichtsbehörde
+
+Ausgabe: NUR gültiges JSON mit Feld "findings", jedes Element:
+ {
+ "field_id": "kurzer-id",
+ "severity": "HIGH"|"MEDIUM"|"LOW",
+ "title": "kurze Lücken-Beschreibung",
+ "evidence": "wörtliches Zitat aus dem Impressum, das das Problem belegt",
+ "action": "konkrete Empfehlung"
+ }
+
+Keine Erklärung außerhalb JSON. Keine Prosa. Wenn alles vollständig:
+gib {"findings": []} zurück.
+"""
+
+
+def _user_prompt(impressum_text: str,
+                 business_scope: set[str] | None) -> str:
+    """Compose the user message for the LLM.
+
+    An optional business-scope hint line (sorted, comma-separated) is
+    followed by the Impressum text, truncated to 4000 characters, and
+    the closing instruction.
+    """
+    pieces: list[str] = []
+    if business_scope:
+        hints = ", ".join(sorted(business_scope))
+        pieces.append(f"BUSINESS-SCOPE-HINTS: {hints}\n\n")
+    pieces.append(f"IMPRESSUM-TEXT:\n{impressum_text[:4000]}\n\n")
+    pieces.append("Liste Lücken nach § 5 TMG. Nur JSON.")
+    return "".join(pieces)
+
+
+def _parse_response(content: str) -> list[dict]:
+    """Extract the findings list from raw LLM output.
+
+    Accepts a JSON object with a ``findings`` list or a bare JSON array,
+    either on its own, inside a markdown code fence, or wrapped in prose.
+    The reply is parsed as a whole first so that a bare array such as
+    ``[{"field_id": "x"}]`` is not cut down to its inner object and lost.
+    Returns ``[]`` when nothing usable is found.
+    """
+    if not content:
+        return []
+    s = content.strip()
+    if s.startswith("```"):
+        s = s.strip("`")
+        if s.lower().startswith("json"):
+            s = s[4:]
+        s = s.strip()
+
+    def _load(fragment: str):
+        """Parse *fragment* as JSON; None when it is not valid JSON."""
+        try:
+            return json.loads(fragment)
+        except (ValueError, RecursionError):
+            return None
+
+    def _cut(opener: str, closer: str) -> str:
+        """Return the outermost opener..closer span of s, or ''."""
+        first = s.find(opener)
+        last = s.rfind(closer)
+        return s[first:last + 1] if first >= 0 and last > first else ""
+
+    # 1. The reply is clean JSON.
+    whole = _load(s)
+    if isinstance(whole, list):
+        return whole
+    if isinstance(whole, dict):
+        findings = whole.get("findings")
+        return findings if isinstance(findings, list) else []
+
+    # 2. Prose-wrapped object carrying a findings list.
+    wrapped = _load(_cut("{", "}"))
+    if isinstance(wrapped, dict) and isinstance(wrapped.get("findings"),
+                                                list):
+        return wrapped["findings"]
+
+    # 3. Prose-wrapped bare array.
+    bare = _load(_cut("[", "]"))
+    return bare if isinstance(bare, list) else []
+
+
+async def evaluate_llm(
+    impressum_text: str,
+    business_scope: set[str] | None = None,
+) -> list[dict]:
+    """Analyse an Impressum text with the LLM and return finding dicts.
+
+    Returns ``[]`` for texts shorter than 100 characters (after
+    stripping) and when the LLM call fails; failures are logged, never
+    raised. Each returned dict carries ``check_id``, ``agent``,
+    ``field_id``, ``severity``, ``severity_reason``, ``title``, ``norm``,
+    ``evidence`` and ``action``. The shape is intended to match
+    ``impressum_agent.evaluate()`` so callers can merge both lists.
+    """
+    if not impressum_text or len(impressum_text.strip()) < 100:
+        return []
+    body = {
+        "model": MODEL,
+        "messages": [
+            {"role": "system", "content": _SYSTEM_PROMPT},
+            {"role": "user", "content": _user_prompt(
+                impressum_text, business_scope,
+            )},
+        ],
+        "format": "json",
+        "stream": False,
+        "options": {"temperature": 0.0, "seed": 42, "num_predict": 1200},
+    }
+    try:
+        async with httpx.AsyncClient(timeout=TIMEOUT) as c:
+            r = await c.post(f"{OLLAMA_URL}/api/chat", json=body)
+            r.raise_for_status()
+            content = (r.json().get("message") or {}).get("content", "") or ""
+    except Exception as e:
+        logger.warning("impressum_agent_llm call failed: %s", e)
+        return []
+
+    raw_findings = _parse_response(content)
+    out: list[dict] = []
+    for f in raw_findings:
+        if not isinstance(f, dict):
+            continue
+        # Restrict the id to word characters and dashes, max 40 chars.
+        fid = re.sub(r"[^\w\-]", "_",
+                     str(f.get("field_id") or "unknown"))[:40]
+        # The model output is untrusted: severity may be a number or a
+        # list. Coerce to str so .upper() cannot raise outside the try.
+        sev = str(f.get("severity") or "MEDIUM").strip().upper()
+        if sev not in ("HIGH", "MEDIUM", "LOW", "INFO"):
+            sev = "MEDIUM"
+        out.append({
+            "check_id": f"IMPRESSUM-AGENT-LLM-{fid.upper()}",
+            "agent": "impressum_agent_v2_llm",
+            "field_id": fid,
+            "severity": sev,
+            "severity_reason": "missing",
+            "title": str(f.get("title") or "")[:200],
+            "norm": "§ 5 TMG / DDG (LLM-Analyse)",
+            "evidence": str(f.get("evidence") or "")[:300],
+            "action": str(f.get("action") or "")[:400],
+        })
+    if out:
+        logger.info("impressum_agent_llm: %d finding(s)", len(out))
+    return out
diff --git a/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py b/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py
index 9c04c258..4c9223fc 100644
--- a/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py
+++ b/backend-compliance/compliance/services/widerrufsbelehrung_reachability_check.py
@@ -44,6 +44,17 @@ _B2C_WEAK = (
"shop", "store", "kaufen", "produkt", "ware", "rechnung",
"agb", "widerrufsfrist", "widerrufsrecht", "wallbox", "hardware",
"abonnement", "tarif buchen", "naturstrom", "ladetarif",
+ # Versicherungs- / Finanz-B2C
+ "reiseversicherung", "versicherung abschließen",
+ "versicherung kaufen", "online abschließen", "online-antrag",
+ "antrag stellen", "police", "vertrag abschließen",
+ "tarifrechner", "beitrag berechnen", "jetzt online",
+ # Telekom / Energie / Mobilfunk B2C
+ "vertrag buchen", "tarif wechseln", "stromtarif",
+ "gastarif", "mobilfunkvertrag", "dsl-tarif",
+ # Reise / Hotel / Mobility B2C
+ "buchen", "reservieren", "buchung", "ticket kaufen",
+ "fahrkarte", "flug buchen",
)
# Hard B2B-only signals that override B2C-Verdacht.
diff --git a/backend-compliance/tests/test_b18_impressum_agent.py b/backend-compliance/tests/test_b18_impressum_agent.py
new file mode 100644
index 00000000..0f9cb31a
--- /dev/null
+++ b/backend-compliance/tests/test_b18_impressum_agent.py
@@ -0,0 +1,132 @@
+"""Tests for B18 Impressum-Specialist-Agent (Pattern + LLM)."""
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from compliance.api.agent_check._b18_wiring import _render, run_b18
+from compliance.services.specialist_agents.impressum_agent_llm import (
+ _parse_response,
+)
+
+
+_GOOD_IMPRESSUM = """
+Acme GmbH
+Musterstraße 1
+10115 Berlin
+
+Handelsregister: HRB 12345 Berlin
+USt-IdNr: DE123456789
+Geschäftsführer: Max Mustermann
+
+Telefon: +49 30 12345
+E-Mail: info@acme.example
+"""
+
+_BAD_IMPRESSUM = (
+ "Acme GmbH, Musterstraße 1, 10115 Berlin. "
+ "Kontakt: info@acme.example. "
+ "Wir freuen uns ueber Ihren Besuch auf unserer Website "
+ "und ueber Ihr Interesse an unserem Unternehmen und unseren "
+ "Produkten. Bitte beachten Sie auch unsere weiteren Hinweise."
+)
+
+
+class TestParseResponse:
+    """Parsing of raw LLM replies into a list of finding dicts."""
+
+    def test_pure_json(self):
+        raw = '{"findings":[{"field_id":"foo","severity":"HIGH"}]}'
+        findings = _parse_response(raw)
+        assert len(findings) == 1
+        assert findings[0]["field_id"] == "foo"
+
+    def test_markdown_fenced_json(self):
+        raw = '```json\n{"findings":[{"field_id":"x"}]}\n```'
+        assert len(_parse_response(raw)) == 1
+
+    def test_prose_wrapped(self):
+        raw = 'Hier ist die Analyse: {"findings":[{"field_id":"y"}]} Ende.'
+        assert len(_parse_response(raw)) == 1
+
+    def test_empty(self):
+        assert _parse_response("") == []
+
+    def test_garbage(self):
+        assert _parse_response("not json at all") == []
+
+
+class TestRunB18Wiring:
+    """run_b18: skip conditions, merging and de-duplication."""
+
+    def test_short_impressum_skipped(self):
+        state = {"doc_texts": {"impressum": "tiny"}}
+        asyncio.run(run_b18(state))
+        assert "impressum_agent_html" not in state
+        assert "extra_findings" not in state
+
+    def test_no_impressum_skipped(self):
+        # Previously this only checked "does not raise"; also pin that
+        # the state stays untouched.
+        state = {"doc_texts": {}}
+        asyncio.run(run_b18(state))
+        assert "impressum_agent_html" not in state
+        assert "extra_findings" not in state
+
+    def test_merges_pattern_and_llm(self):
+        # Pattern-agent will likely find no gaps in _GOOD_IMPRESSUM.
+        # Mock the LLM to return a fake additional finding.
+        async def fake_llm(text, scope):
+            return [{
+                "check_id": "IMPRESSUM-AGENT-LLM-DPO",
+                "agent": "impressum_agent_v2_llm",
+                "field_id": "dpo",
+                "severity": "MEDIUM",
+                "title": "DSB-Verweis fehlt",
+                "norm": "§ 5 TMG / DDG (LLM)",
+                "evidence": "kein Hinweis auf DSB",
+                "action": "DSB im Impressum verlinken",
+            }]
+        with patch(
+            "compliance.api.agent_check._b18_wiring.evaluate_llm",
+            new=fake_llm,
+        ):
+            state = {"doc_texts": {"impressum": _GOOD_IMPRESSUM},
+                     "profile_dict": {}}
+            asyncio.run(run_b18(state))
+        assert "impressum_agent_html" in state
+        extras = state.get("extra_findings") or []
+        ids = [f.get("check_id") for f in extras]
+        assert any("LLM-DPO" in i for i in ids)
+
+    def test_dedup_pattern_vs_llm_same_field(self):
+        # Pattern agent returns ust_id; mocked LLM also returns ust_id —
+        # only one should survive the dedup.
+        async def fake_llm(text, scope):
+            return [{
+                "check_id": "IMPRESSUM-AGENT-LLM-UST_ID",
+                "agent": "impressum_agent_v2_llm",
+                "field_id": "ust_id",
+                "severity": "HIGH",
+                "title": "duplicate ust_id finding",
+                "norm": "§ 5 TMG",
+                "evidence": "—",
+                "action": "—",
+            }]
+        with patch(
+            "compliance.api.agent_check._b18_wiring.evaluate_llm",
+            new=fake_llm,
+        ):
+            state = {"doc_texts": {"impressum": _BAD_IMPRESSUM},
+                     "profile_dict": {}}
+            asyncio.run(run_b18(state))
+        ust_findings = [
+            f for f in state.get("extra_findings") or []
+            if (f.get("field_id") or "").lower() == "ust_id"
+        ]
+        assert len(ust_findings) == 1
+
+
+class TestRender:
+    """Rendering of the merged findings into the HTML block."""
+
+    def test_render_with_two_findings(self):
+        pattern_finding = {
+            "check_id": "X", "title": "A", "severity": "HIGH",
+            "agent": "impressum_agent_v1", "norm": "n", "action": "do",
+        }
+        llm_finding = {
+            "check_id": "Y", "title": "B", "severity": "MEDIUM",
+            "agent": "impressum_agent_v2_llm", "norm": "n", "action": "do",
+        }
+        rendered = _render(
+            [pattern_finding, llm_finding],
+            [pattern_finding],
+            [llm_finding],
+        )
+        # One tag per agent kind plus the per-agent counters.
+        for expected in ("KB", "LLM", "Pattern-Match: 1", "LLM-Analyse: 1"):
+            assert expected in rendered
diff --git a/backend-compliance/tests/test_chatbot_policy_discovery.py b/backend-compliance/tests/test_chatbot_policy_discovery.py
new file mode 100644
index 00000000..75d855a3
--- /dev/null
+++ b/backend-compliance/tests/test_chatbot_policy_discovery.py
@@ -0,0 +1,107 @@
+"""Tests for chatbot-policy DSE-enrichment."""
+
+import asyncio
+from unittest.mock import patch
+
+from compliance.services.chatbot_policy_discovery import (
+ _base_origins,
+ _build_candidate_urls,
+ enrich_dse_with_chatbot_policies,
+)
+
+
+class TestBuildCandidates:
+    """Candidate URL generation for a single origin."""
+
+    def test_includes_known_slug(self):
+        candidates = _build_candidate_urls("https://example.com")
+        matching = [u for u in candidates if "privacypolicychatbot" in u]
+        assert matching
+
+    def test_includes_lang_prefix_variants(self):
+        candidates = _build_candidate_urls("https://example.com")
+        # Both the root variant and a /de variant must be present.
+        assert any("/de/" in u for u in candidates)
+        assert "https://example.com/privacypolicychatbot" in candidates
+
+
+class TestBaseOrigins:
+    """Origin extraction from discovered document entries."""
+
+    def test_dedup(self):
+        urls = (
+            "https://example.com/a",
+            "https://example.com/b",
+            "https://other.de/x",
+        )
+        origins = _base_origins([{"url": u} for u in urls])
+        assert origins == ["https://example.com", "https://other.de"]
+
+    def test_skip_empty(self):
+        origins = _base_origins(
+            [{"url": ""}, {"url": "https://example.com/"}]
+        )
+        assert origins == ["https://example.com"]
+
+
+class TestEnrichment:
+    """enrich_dse_with_chatbot_policies with a mocked _probe."""
+
+    def test_no_entries_returns_zero(self):
+        result = asyncio.run(enrich_dse_with_chatbot_policies({}))
+        assert result["probed"] == 0
+
+    def test_all_404_no_merge(self):
+        async def fake_probe(url, timeout_s=4.0):
+            return None
+        with patch(
+            "compliance.services.chatbot_policy_discovery._probe",
+            new=fake_probe,
+        ):
+            state = {
+                "doc_entries": [{"url": "https://x.de/dse"}],
+                "doc_texts": {"dse": "original"},
+            }
+            result = asyncio.run(enrich_dse_with_chatbot_policies(state))
+        assert result["found"] == []
+        assert state["doc_texts"]["dse"] == "original"
+
+    def test_mocked_probe_merges_short_text(self):
+        # When _probe is mocked, the word-count gate of the real _probe
+        # is bypassed; this is the helper-level contract.
+        async def fake_probe(url, timeout_s=4.0):
+            if "privacypolicychatbot" in url:
+                return (url, "short text")
+            return None
+        with patch(
+            "compliance.services.chatbot_policy_discovery._probe",
+            new=fake_probe,
+        ):
+            state = {
+                "doc_entries": [
+                    {"url": "https://x.de/dse", "doc_type": "dse",
+                     "text": "main dse"},
+                ],
+                "doc_texts": {"dse": "main dse"},
+            }
+            result = asyncio.run(enrich_dse_with_chatbot_policies(state))
+        assert len(result["found"]) >= 1
+        # The probed text must actually land behind the original DSE.
+        assert state["doc_texts"]["dse"].startswith("main dse")
+        assert "short text" in state["doc_texts"]["dse"]
+
+    def test_long_enough_text_is_merged(self):
+        # Repeat first, strip afterwards: stripping the literal before
+        # repeating would glue the words together ("iadvizechatbot").
+        long_text = ("chatbot iadvize " * 200).strip()  # 400 words
+
+        async def fake_probe(url, timeout_s=4.0):
+            if "privacypolicychatbot" in url:
+                return (url, long_text)
+            return None
+        with patch(
+            "compliance.services.chatbot_policy_discovery._probe",
+            new=fake_probe,
+        ):
+            state = {
+                "doc_entries": [
+                    {"url": "https://x.de/dse", "doc_type": "dse",
+                     "text": "original"},
+                ],
+                "doc_texts": {"dse": "original"},
+            }
+            asyncio.run(enrich_dse_with_chatbot_policies(state))
+        assert "iadvize" in state["doc_texts"]["dse"]
+        assert state["doc_texts"]["dse"].startswith("original")
+        # dse-entry should record source for audit trail
+        dse_entry = next(
+            e for e in state["doc_entries"] if e["doc_type"] == "dse"
+        )
+        assert dse_entry["chatbot_policy_sources"]
+        assert dse_entry["text"] == state["doc_texts"]["dse"]
diff --git a/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py b/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py
index eebb8cb8..f4a6f5b6 100644
--- a/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py
+++ b/backend-compliance/tests/test_widerrufsbelehrung_reachability_check.py
@@ -42,6 +42,17 @@ class TestDetectB2CScope:
scope, _ = _detect_b2c_scope(s)
assert scope == "unknown"
+ def test_versicherung_combo_promotes_to_likely(self):
+ s = _state(home_text="Reiseversicherung jetzt online "
+ "abschließen. Tarifrechner verfügbar.")
+ scope, _ = _detect_b2c_scope(s)
+ assert scope == "b2c_likely"
+
+ def test_buchung_combo_promotes_to_likely(self):
+ s = _state(home_text="Flug buchen oder Hotel reservieren.")
+ scope, _ = _detect_b2c_scope(s)
+ assert scope == "b2c_likely"
+
def test_empty_state(self):
s = _state()
scope, _ = _detect_b2c_scope(s)