diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py index 05e6290c..3af8bb1d 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py @@ -1,13 +1,22 @@ -"""Impressum-Agent v2 — konsolidierter BaseSpecialistAgent. +"""Impressum-Agent v3 — baut auf doc_check_controls (75 echte MCs aus DB). -Ablauf: - 1. Deterministische MCs durchlaufen → Findings + mc_coverage. - 2. Wenn unklare Felder (HIGH/MEDIUM missing) → LLM-Eskalation. - 3. LLM-Findings dedupen mit MC-Findings nach field_id. - 4. Rollup → Recommendations. - 5. Disclaimer-Lint → AgentOutput. +Sprint 1.12 (User-Vorgabe 2026-06-09): -Phase 1 (jetzt): qwen2.5:7b als Stage-1, OVH optional als Stage-2. +Architektur, 4 Layer: + Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py + regex_boost.py) + Layer 1 — Keyword-Match aus doc_check_controls.pass_criteria + (rag_document_checker.check_document_with_controls) + Layer 2 — BGE-M3 Embedding-Match (in rag_document_checker integriert) + Layer 3 — Semantic-Validator (LLM) für noch immer offene FAILs + + Auto-Learning der Labels + +Output-Layer (mein Stack bleibt 1:1): + - Disclaimer-Linter + - Rollup-Dedup + - Methodik-First-UI + +Vorteil: 75 statt 12 Master-Controls; DB-konsistent; Embedding-Fallback +gratis; meine Konzepte bleiben als Boost + Output-Sanitizer. """ from __future__ import annotations @@ -27,146 +36,131 @@ from .._base import ( SourceType, lint_output, ) -from .._escalation import cascade -from .._pattern_library import load_patterns_for, record as record_pattern +from .._pattern_library import record as record_pattern from .._rollup import rollup -from .._semantic_validator import ( - build_rename_action, - validate_present, -) -from .mcs import MC_IDS, MCS, detect_automotive, scope_matches +from .._semantic_validator import build_rename_action, validate_present +from .mcs import MC_IDS, MCS, detect_automotive +from .v3_engine import run_v3_pipeline logger = logging.getLogger(__name__) -_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus -§ 5 TMG / DDG (Anbieterkennzeichnung). Aufgabe: Impressum prüfen und -LÜCKEN aufzählen, die einer regex-basierten Vorprüfung entgangen sind. - -WICHTIG: - - KEINE Bewertung "rechtssicher" / "garantiert" / "konform". - - Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren. - - Wörtliches Zitat als evidence bei jeder Lücke. - -Antworte NUR mit JSON, Schema: - {"findings": [ - {"field_id": "...", "severity": "HIGH|MEDIUM|LOW", - "title": "...", "evidence": "wörtliches Zitat", - "action": "konkrete Empfehlung"} - ]} -""" +_SEV_TO_ENUM = { + "CRITICAL": Severity.HIGH, + "HIGH": Severity.HIGH, + "MEDIUM": Severity.MEDIUM, + "LOW": Severity.LOW, + "INFO": Severity.INFO, +} class ImpressumAgent(BaseSpecialistAgent): agent_id = "impressum" - agent_version = "2.0" + agent_version = "3.0" doc_type = "impressum" + # Owner-Liste der eigenen Boost-Pattern-IDs (Layer 0). + # Die DB-MC-IDs werden dynamisch ermittelt und sind kein Owner- + # Konflikt: doc_check_controls.control_id-Werte sind global, der + # Agent ist Caller, nicht Owner. owned_mc_ids = MC_IDS async def evaluate(self, agent_input: AgentInput) -> AgentOutput: start = datetime.now(timezone.utc) text = (agent_input.text or "").strip() scope = set(agent_input.business_scope or []) - # Auto-detect KFZ - is_automotive = detect_automotive(text) - if is_automotive: + if detect_automotive(text): scope.add("automotive") - mc_findings: list[Finding] = [] coverage: list[McCoverage] = [] + findings: list[Finding] = [] + esc_logs: list[EscalationLog] = [] + notes_parts: list[str] = [] - if len(text) < 50: - # Doc zu kurz → alle als skipped + if len(text) < 100: + # Doc zu kurz — alle eigenen Pattern-IDs als skipped for mc in MCS: coverage.append(McCoverage( mc_id=mc.mc_id, status="skipped", - reason="doc too short or empty", + reason="text too short", )) return self._finalize( - start, mc_findings, [], coverage, confidence=0.0, + start, findings, esc_logs, coverage, + confidence=0.0, notes="Impressum-Text zu kurz oder leer.", ) - for mc in MCS: - if not scope_matches(mc, scope, is_automotive): - coverage.append(McCoverage( - mc_id=mc.mc_id, status="na", - reason=f"scope mismatch (needs {mc.requires_scope})", - )) + # ── Layer 0 + 1 + 2 (Boost + Keyword + Embedding) ────────── + results, telemetry = await run_v3_pipeline(text, scope) + notes_parts.append( + f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · " + f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · " + f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides" + ) + + # DB-MCs → Findings + Coverage + seen_db_mcs: set[str] = set() + for r in results: + mc_id = r.get("control_id") or "" + if not mc_id or mc_id in seen_db_mcs: continue - found = any(p.search(text) for p in mc.patterns) - if not found: - # 1.11: Auto-Learning — gelernte Labels probieren. - # Wenn ein gelerntes Pattern matcht: als OK werten + - # Coverage-Reason markiert das. - learned = load_patterns_for(mc.field_id, self.agent_id) - if any(lp.search(text) for lp in learned): - coverage.append(McCoverage( - mc_id=mc.mc_id, status="ok", - reason=f"learned-pattern matched " - f"({len(learned)} gelernt)", - )) - continue - if found: - coverage.append(McCoverage( - mc_id=mc.mc_id, status="ok", - )) + seen_db_mcs.add(mc_id) + passed = bool(r.get("passed")) + sev = _SEV_TO_ENUM.get( + (r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM, + ) + coverage.append(McCoverage( + mc_id=mc_id, + status="ok" if passed else sev.value.lower(), + reason=str(r.get("matched_text") or r.get("hint") or "")[:120], + )) + if passed: continue - # Missing → Finding - sev = self._sev(mc.severity_if_missing) - action = self._build_action(mc, is_automotive) - mc_findings.append(Finding( - check_id=f"IMPRESSUM-AGENT-{mc.field_id.upper()}", + label = r.get("label") or r.get("hint") or "" + findings.append(Finding( + check_id=f"DBMC-{mc_id}", agent=self.agent_id, agent_version=self.agent_version, - field_id=mc.field_id, + field_id=mc_id, severity=sev, - severity_reason="missing", - title=f"Pflichtangabe '{mc.label}' fehlt im Impressum", - norm=mc.norm, + severity_reason="db_mc_failed", + title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", + norm=str(r.get("regulation") or "") + + (f" Art. {r.get('article')}" + if r.get("article") else ""), evidence="", - action=action, - confidence=0.95, + action=str(r.get("hint") or "")[:400] + or "Bitte gegen die Pflichtangaben prüfen.", + confidence=0.9, sources=[EvidenceSource( source_type=SourceType.MC, - source_id=mc.mc_id, - detail=f"regex check {len(mc.patterns)} pattern(s) negative", + source_id=mc_id, + detail=str(r.get("source") or "keyword_match")[:120], + confidence=0.9, )], )) + + # Layer 0: eigene Pattern-IDs immer mit ins coverage (für UI) + boost_ids = set(telemetry.get("layer_0_field_ids") or []) + for mc in MCS: + cov_status = "ok" if mc.field_id in boost_ids else "na" + cov_reason = ("regex-boost hit" + if mc.field_id in boost_ids + else "kein Pattern-Treffer (kein Veto)") coverage.append(McCoverage( - mc_id=mc.mc_id, - status=sev.value.lower(), - reason="missing", + mc_id=mc.mc_id, status=cov_status, reason=cov_reason, )) - # Semantic-Validator: prüft per LLM ob HIGH-Missings doch - # vorhanden sind (unter abweichendem Label). Demoted HIGH→LOW - # mit Rename-Empfehlung wenn ja. User-Vorgabe 2026-06-09. - await self._semantic_demote(text, mc_findings, coverage) + # ── Layer 3: Semantic-Validator nur für HIGH/MEDIUM-Fails ── + await self._semantic_demote(text, findings, coverage) - # Eskalation: für die identifizierten Lücken kann ein LLM - # zusätzliche Tiefen-Findings liefern (z.B. "Geschäftsführer - # genannt, aber ohne Nachname"). Confidence der MC-Findings - # ist hoch — eskalieren wir wegen "weitere subtile Lücken - # finden", nicht weil wir unsicher sind. - esc_findings, esc_logs = await self._maybe_escalate(text, scope) - - # Dedup per field_id (MC hat Priorität) - seen_fields = {f.field_id for f in mc_findings if f.field_id} - for f in esc_findings: - if f.field_id and f.field_id in seen_fields: - continue - mc_findings.append(f) - - # Confidence: harmonisches Mittel über alle Finding-Confidences - if mc_findings: - confs = [f.confidence for f in mc_findings if f.confidence] - overall = sum(confs) / len(confs) if confs else 0.8 - else: - overall = 0.95 # nichts gefunden → alle MCs ok + # Confidence: harmonic mean der Findings (oder hoch wenn 0) + confs = [f.confidence for f in findings if f.confidence] or [0.95] + overall = sum(confs) / len(confs) return self._finalize( - start, mc_findings, esc_logs, coverage, confidence=overall, + start, findings, esc_logs, coverage, + confidence=overall, + notes=" · ".join(notes_parts), ) async def _semantic_demote( @@ -175,45 +169,37 @@ class ImpressumAgent(BaseSpecialistAgent): findings: list[Finding], coverage: list[McCoverage], ) -> None: - """LLM-Layer für HIGH/MEDIUM-missings — demote zu LOW wenn da.""" - candidates: list[tuple[str, str, Finding]] = [] - for f in findings: - # Demote-Kandidaten: HIGH oder MEDIUM-Pattern-Misses. - # LOW/INFO bleiben unverändert (sind selbst schon Best- - # Practice-Empfehlungen). - if f.severity not in (Severity.HIGH.value, - Severity.MEDIUM.value): - continue - if f.severity_reason != "missing": - continue - # Suche zugehöriges MC für die Beschreibung - mc = next((m for m in MCS if m.field_id == f.field_id), None) - label = mc.label if mc else f.field_id - candidates.append((f.field_id, label, f)) + """LLM-Layer für HIGH/MEDIUM-DB-MCs: Label-Mismatch-Check. + Bei Fund → HIGH/MEDIUM → LOW + Rename-Action.""" + candidates = [ + f for f in findings + if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value) + and f.severity_reason == "db_mc_failed" + ] if not candidates: return result = await validate_present( - text, [(c[0], c[1]) for c in candidates], + text, [(f.field_id, f.title[:80]) for f in candidates], ) if not result: return - for field_id, label, finding in candidates: - row = result.get(field_id) + for finding in candidates: + row = result.get(finding.field_id) if not row or not row.get("found"): continue if row.get("confidence", 0) < 0.6: continue label_used = row.get("label_used") or "abweichendes Label" - # Demote in-place + conf = float(row.get("confidence") or 0.8) finding.severity = Severity.LOW.value finding.severity_reason = "label_mismatch" finding.title = ( - f"Label '{label_used}' weicht von Standard-" - f"Bezeichnung ab" + f"Label '{label_used}' weicht von Standard ab" + ) + finding.evidence = str(row.get("evidence") or "")[:200] + finding.action = build_rename_action( + finding.field_id, label_used, ) - finding.evidence = row.get("evidence", "")[:200] - finding.action = build_rename_action(field_id, label_used) - conf = float(row.get("confidence") or 0.8) finding.confidence = conf finding.sources.append(EvidenceSource( source_type=SourceType.LLM_LOCAL, @@ -221,79 +207,20 @@ class ImpressumAgent(BaseSpecialistAgent): detail=f"LLM-confirmed: '{label_used}'", confidence=conf, )) - # 1.11: Auto-Learning — Label-Match in der Library - # persistieren. Beim nächsten Run wird das gelernte - # Pattern bereits beim MC-Pass berücksichtigt, ohne - # erneuten LLM-Call. + # Coverage update + auto-learning + for c in coverage: + if c.mc_id and c.mc_id == f"DBMC-{finding.field_id}": + c.status = "low" + c.reason = f"label_mismatch: '{label_used}'" try: record_pattern( - field_id=field_id, + field_id=finding.field_id, label_used=label_used, confidence=conf, agent_id=self.agent_id, ) except Exception as e: - import logging - logging.getLogger(__name__).warning( - "pattern-library record failed: %s", e, - ) - # Update coverage status - for c in coverage: - if c.mc_id and c.mc_id.endswith(field_id.upper()): - continue - # Robuster: nach mc_id über MCS - mc = next((m for m in MCS if m.field_id == field_id), None) - if mc: - cov = next((c for c in coverage - if c.mc_id == mc.mc_id), None) - if cov: - cov.status = "low" - cov.reason = f"label_mismatch: '{label_used}'" - - async def _maybe_escalate( - self, text: str, scope: set[str], - ) -> tuple[list[Finding], list[EscalationLog]]: - """LLM-Stage. Aktivierbar via Agent-Settings (default an).""" - # Hard cap auf Text-Größe für den LLM-Pass - user_prompt = ( - f"BUSINESS-SCOPE: {', '.join(sorted(scope))}\n\n" - f"IMPRESSUM-TEXT:\n{text[:4000]}\n\n" - "Liste subtile Lücken nach § 5 TMG. Nur JSON." - ) - res, logs = await cascade(_SYSTEM_PROMPT, user_prompt) - if res is None or not isinstance(res.parsed, (dict, list)): - return [], logs - raw = (res.parsed.get("findings") - if isinstance(res.parsed, dict) else res.parsed) - if not isinstance(raw, list): - return [], logs - out: list[Finding] = [] - for item in raw: - if not isinstance(item, dict): - continue - fid = str(item.get("field_id") or "unknown")[:40] - sev_raw = str(item.get("severity") or "MEDIUM").upper() - sev = self._sev(sev_raw) - out.append(Finding( - check_id=f"IMPRESSUM-AGENT-LLM-{fid.upper()}", - agent=self.agent_id, - agent_version=self.agent_version, - field_id=fid, - severity=sev, - severity_reason="llm_detected", - title=str(item.get("title") or "")[:200], - norm="§ 5 TMG / DDG (LLM-Analyse)", - evidence=str(item.get("evidence") or "")[:300], - action=str(item.get("action") or "")[:400], - confidence=0.7, - sources=[EvidenceSource( - source_type=res.stage, - source_id=res.model, - detail=f"prompt_chars={len(user_prompt)}", - confidence=0.7, - )], - )) - return out, logs + logger.warning("pattern-library record failed: %s", e) def _finalize( self, @@ -326,29 +253,3 @@ class ImpressumAgent(BaseSpecialistAgent): mc_low=sum(1 for c in coverage if c.status == "low"), ) return lint_output(out) - - @staticmethod - def _sev(value: str) -> Severity: - v = (value or "").upper() - if v == "HIGH": - return Severity.HIGH - if v == "MEDIUM": - return Severity.MEDIUM - if v == "LOW": - return Severity.LOW - return Severity.INFO - - @staticmethod - def _build_action(mc, is_automotive: bool) -> str: - if mc.field_id == "aufsichtsbehoerde" and is_automotive: - return ( - "Aufsichtsbehörde im Impressum benennen. Für " - "KFZ-Hersteller/-Vertrieb typisch: Kraftfahrt-" - "Bundesamt (KBA), Fördestraße 16, 24944 Flensburg, " - "www.kba.de. Bei Ladestrom-Vertrieb zusätzlich " - "Bundesnetzagentur (BNetzA)." - ) - return ( - f"{mc.label} im Impressum ergänzen " - f"(Pflichtangabe nach {mc.norm})." - ) diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py new file mode 100644 index 00000000..f7c747b6 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py @@ -0,0 +1,117 @@ +"""Layer-0 Regex-Boost — meine 12 Patterns als deterministische Vor-Stufe +vor dem Keyword-Match aus doc_check_controls. + +Idee (Sprint 1.12, User-Vorgabe 2026-06-09): + - rag_document_checker.py macht Keyword-Match aus den natursprachlichen + pass_criteria der 75 Impressum-MCs in der DB. Wenn 60% der Keywords + matchen → PASS. + - Bei FAIL läuft Embedding-Match (Layer 2). + - MEIN Beitrag (Layer 0): meine 12 enger gefassten Regex-Pattern + laufen ZUERST. Wenn ein Pattern matched → das thematisch passende + MC wird zu PASS überschrieben (auch wenn Keyword-Match unklar war). + - Mapping: jedem meiner field_id → Liste von Schlüsselwörtern wie sie + in pass_criteria stehen würden. Wenn pass_criteria diese Wörter + enthält → dieses MC gehört zu meinem field_id. + +Damit gehen meine 12 Pattern nicht verloren, sondern boost'en das +bestehende System. +""" + +from __future__ import annotations + +import logging + +from .mcs import MCS, detect_automotive, scope_matches + +logger = logging.getLogger(__name__) + + +# Für jedes meiner field_id: welche Wörter erscheinen typisch in +# der pass_criteria der zugehörigen DB-MCs? Wenn diese Wörter im +# pass_criteria gefunden werden, ist es vermutlich derselbe MC. +BOOST_KEYWORDS: dict[str, tuple[str, ...]] = { + "name_anbieter": ( + "rechtsform", "anschrift", "anbieter", "firmensitz", "firmenname", + "diensteanbieter", "verantwortlich", + ), + "kontakt_email": ( + "e-mail", "email", "elektronische", "kontaktmöglichkeit", + "mailadresse", + ), + "kontakt_telefon": ( + "telefon", "rufnummer", "telefonnummer", "phone", "kontaktdaten", + "telekommunikation", + ), + "handelsregister": ( + "handelsregister", "registergericht", "hrb", "registernummer", + ), + "ust_id": ( + "umsatzsteuer", "ust-id", "umsatzsteueridentifikation", "ust-idnr", + ), + "vertretungsberechtigte": ( + "geschäftsführer", "vorstand", "vertretungsberechtigt", + "vertretung", "gesellschafter", + ), + "vertretungsberechtigte_label_korrekt": ( + "deutsche", "bezeichnung", "rechtsform", + ), + "aufsichtsbehoerde": ( + "aufsichtsbehörde", "aufsicht", "behörde", "regulierungsbehörde", + ), + "verantwortlicher_redaktion": ( + "redaktion", "verantwortlich", "rstv", "mstv", + "journalistisch", "publizistisch", + ), + "verbraucher_streitbeilegung": ( + "streitbeilegung", "vsbg", "verbraucherschlichtung", + "schlichtungsstelle", + ), + "berufsangaben": ( + "berufsbezeichnung", "berufsordnung", "kammer", "berufsrecht", + ), + "odr_link": ( + "online-streitbeilegung", "os-plattform", "odr", + "europäische kommission", + ), +} + + +def compute_regex_boosts(text: str, business_scope: set[str]) -> set[str]: + """Welche meiner field_ids hat das Pattern erkannt? + + Returns die Menge an gehit'ten field_ids für die später entschieden + wird ob ein doc_check_control darüber automatisch passed werden kann. + """ + if not text or len(text) < 50: + return set() + hits: set[str] = set() + is_auto = detect_automotive(text) + for mc in MCS: + if not scope_matches(mc, business_scope, is_auto): + continue + if any(p.search(text) for p in mc.patterns): + hits.add(mc.field_id) + return hits + + +def boost_matches_db_mc(boosts: set[str], pass_criteria: list) -> str | None: + """Hat ein gebooster field_id genug Keyword-Überlapp mit den + pass_criteria einer DB-MC, um den MC zu boost'en? + + Returns: field_id (matched), oder None. + Vorsichtig: ≥2 Boost-Keywords müssen im pass_criteria-Text auftauchen, + sonst zu permissiv. + """ + if not boosts or not pass_criteria: + return None + crit_text = " ".join( + str(c) for c in pass_criteria if c + ).lower() + best: tuple[int, str] | None = None + for field_id in boosts: + kws = BOOST_KEYWORDS.get(field_id) or () + match_count = sum(1 for kw in kws if kw in crit_text) + if match_count >= 2: + if best is None or match_count > best[0]: + best = (match_count, field_id) + return best[1] if best else None diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py new file mode 100644 index 00000000..ee722c82 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py @@ -0,0 +1,104 @@ +"""Sprint-1.12 v3-Engine: läuft die volle 4-Layer-Pipeline auf einem +Impressum-Text: + + Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py) + Layer 1 — Keyword-Match aus doc_check_controls.pass_criteria + (75 MCs in DB für Impressum) + Layer 2 — BGE-M3 Embedding-Match als Fallback (im + rag_document_checker integriert) + Layer 3 — Semantic-Validator (LLM) wenn auch Embedding nicht half + (im Agent angefasst, hier nur Ergebnisse durchgereicht) + +Output: Liste Result-Dicts kompatibel mit rag_document_checker (passed, +severity, control_id, regulation, ...). Der Agent konvertiert sie dann +zu Finding-Objekten. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from .regex_boost import boost_matches_db_mc, compute_regex_boosts + +logger = logging.getLogger(__name__) + + +async def run_v3_pipeline( + text: str, + business_scope: set[str], + db_url: str = "", +) -> tuple[list[dict[str, Any]], dict[str, Any]]: + """Returns (results, telemetry). + + results: pro DB-MC ein dict {control_id, passed, severity, ...} + telemetry: counters für Frontend-Anzeige (Layer-Aufschlüsselung) + """ + if not text or len(text) < 100: + return [], {"reason": "text too short"} + + # Layer 0: meine Regex-Patterns + boosts = compute_regex_boosts(text, business_scope) + boost_field_ids = sorted(boosts) + logger.info("v3 Layer-0 boosts: %d hits — %s", + len(boost_field_ids), boost_field_ids) + + # Layer 1+2: bestehender rag_document_checker (Keyword + Embedding) + try: + from compliance.services.rag_document_checker import ( + check_document_with_controls, + ) + results = await check_document_with_controls( + text=text, + doc_type="impressum", + doc_title="Impressum (Agent-Test)", + db_url=db_url, + max_controls=0, + use_agent=False, + business_scope=business_scope, + ) + except Exception as e: + logger.warning("rag_document_checker failed: %s — using boosts only", + e) + results = [] + + layer_1_pass = sum(1 for r in results if r.get("passed")) + layer_1_fail = sum(1 for r in results + if r.get("passed") is False) + + # Layer 0 Override: failed MCs deren pass_criteria zu einem meiner + # gebooster field_ids passt → überschreiben zu PASS + boost_overrides = 0 + for r in results: + if r.get("passed"): + continue + # rag_document_checker nimmt pass_criteria intern weg vor + # dem Return; wir laden sie nochmal (oder bekommen sie via + # 'hint'). Hier rufen wir das per Helper. + crit = r.get("_pass_criteria") or [] + if not crit: + # Fallback: aus dem Hint (= check_question) Boost-Match + # versuchen. + crit = [r.get("hint") or ""] + matched_field = boost_matches_db_mc(boosts, crit) + if matched_field: + r["passed"] = True + r["matched_text"] = ( + f"[regex-boost layer 0 — field {matched_field}]" + ) + existing_hint = r.get("hint") or "" + r["hint"] = (existing_hint + + " (passed via regex-boost layer 0)").strip() + r["source"] = (r.get("source") or "") + "+regex_boost" + boost_overrides += 1 + + telemetry = { + "layer_0_field_hits": len(boost_field_ids), + "layer_0_field_ids": boost_field_ids, + "layer_1_pass": layer_1_pass, + "layer_1_fail": layer_1_fail, + "layer_0_boost_overrides": boost_overrides, + "total_mcs": len(results), + } + logger.info("v3 telemetry: %s", telemetry) + return results, telemetry diff --git a/backend-compliance/tests/test_impressum_v3.py b/backend-compliance/tests/test_impressum_v3.py new file mode 100644 index 00000000..6b603aee --- /dev/null +++ b/backend-compliance/tests/test_impressum_v3.py @@ -0,0 +1,168 @@ +"""Tests für Impressum-Agent v3 (Sprint 1.12). + +Mockt rag_document_checker damit Tests offline laufen + prüft die +Layer-0-Boost-Logik isoliert. +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from compliance.services.specialist_agents import ( + AgentInput, + ImpressumAgent, + Severity, +) +from compliance.services.specialist_agents.impressum.regex_boost import ( + BOOST_KEYWORDS, + boost_matches_db_mc, + compute_regex_boosts, +) + + +TESLA_TEXT = ( + "Tesla Germany GmbH\nLudwig-Prandtl-Strasse 25-29\n12526 Berlin\n" + "E-Mail: info@tesla.com\n" + "Telefon: +49 89 1250 16 800\n" + "Management: Elon Musk\n" + "Handelsregister: HRB 218904 B Charlottenburg\n" + "USt-IdNr: DE123456789\n" +) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_compute_regex_boosts_detects_basic_fields(): + hits = compute_regex_boosts(TESLA_TEXT, business_scope=set()) + # Tesla hat klassische Pflichtangaben + assert "kontakt_email" in hits + assert "kontakt_telefon" in hits + assert "handelsregister" in hits + assert "ust_id" in hits + assert "vertretungsberechtigte" in hits # "Management" + # KFZ-Auto-Detect → aufsichtsbehoerde wäre relevant aber kein + # Pattern getroffen (KBA nicht genannt) + + +def test_compute_regex_boosts_short_text_empty(): + assert compute_regex_boosts("x", business_scope=set()) == set() + + +def test_boost_matches_db_mc_finds_telefon(): + boosts = {"kontakt_telefon"} + pass_crit = [ + "Telefonnummer angeben", + "Erreichbar per Telefon und E-Mail", + ] + matched = boost_matches_db_mc(boosts, pass_crit) + assert matched == "kontakt_telefon" + + +def test_boost_matches_db_mc_returns_none_when_unrelated(): + boosts = {"kontakt_telefon"} + pass_crit = [ + "Cookie-Banner muss zentriert sein", + ] + assert boost_matches_db_mc(boosts, pass_crit) is None + + +def test_boost_keywords_cover_all_field_ids(): + """Jedes mcs.py field_id muss in BOOST_KEYWORDS ein Eintrag haben.""" + from compliance.services.specialist_agents.impressum.mcs import MCS + for mc in MCS: + assert mc.field_id in BOOST_KEYWORDS, ( + f"BOOST_KEYWORDS missing for {mc.field_id}" + ) + + +@pytest.fixture +def mock_v3(monkeypatch): + """Mockt run_v3_pipeline mit deterministischen Fake-Results.""" + async def _fake_pipeline(text, scope, db_url=""): + results = [ + {"control_id": "AUTH-1954-A04", + "passed": True, + "label": "Anbieterkennzeichnung dokumentiert", + "severity": "HIGH", + "regulation": "TMG", + "article": "§ 5", + "hint": "", + "matched_text": "Tesla Germany GmbH", + "source": "keyword_match"}, + {"control_id": "DATA-2786-A04", + "passed": False, + "label": "Freiwilligkeit der TDDDG-Einwilligungen", + "severity": "MEDIUM", + "regulation": "TDDDG", + "article": "§ 25", + "hint": "Bitte Freiwilligkeit dokumentieren", + "matched_text": "", + "source": ""}, + ] + telemetry = { + "layer_0_field_hits": 5, + "layer_0_field_ids": ["kontakt_email", "kontakt_telefon", + "handelsregister", "ust_id", + "vertretungsberechtigte"], + "layer_1_pass": 1, + "layer_1_fail": 1, + "layer_0_boost_overrides": 0, + "total_mcs": 2, + } + return results, telemetry + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.run_v3_pipeline", + _fake_pipeline, + ) + async def _no_validator(*a, **kw): return {} + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.validate_present", + _no_validator, + ) + + +def test_agent_uses_db_mcs(mock_v3): + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput(doc_type="impressum", + text=TESLA_TEXT))) + db_mc_findings = [f for f in out.findings + if f.check_id.startswith("DBMC-")] + assert len(db_mc_findings) == 1 + assert db_mc_findings[0].check_id == "DBMC-DATA-2786-A04" + assert db_mc_findings[0].severity == Severity.MEDIUM.value + assert "TDDDG" in db_mc_findings[0].norm + + +def test_agent_emits_boost_coverage(mock_v3): + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput(doc_type="impressum", + text=TESLA_TEXT))) + # 2 DB-MCs + 12 Pattern-Boost-Slots = 14 coverage entries + assert out.mc_total >= 14 + boost_ok = [c for c in out.mc_coverage + if c.mc_id.startswith("IMP-MC-") and c.status == "ok"] + assert len(boost_ok) == 5 # 5 boost_ids im fake + + +def test_agent_notes_telemetry(mock_v3): + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput(doc_type="impressum", + text=TESLA_TEXT))) + assert "v3-pipeline" in out.notes + assert "Pattern-Boosts" in out.notes + + +def test_short_text_skipped(): + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput(doc_type="impressum", text="x"))) + assert all(c.status == "skipped" for c in out.mc_coverage) + assert not out.findings + + +def test_agent_version_is_three(): + agent = ImpressumAgent() + assert agent.agent_version == "3.0"