"""Auto-learning pattern library.

Every label confirmed by the semantic validator (e.g. 'Telefonnr.' for
``kontakt_telefon``) is persisted as an auto-generated regex pattern. On
the next run the agent tries the learned patterns first, so the LLM call
is only needed for labels that are genuinely new.

Storage: one JSON file. The path comes from the AGENT_PATTERN_LIBRARY
environment variable (default /tmp/breakpilot/agent_learned_patterns.json).
Format:
    {
      "version": "1",
      "updated_at": "2026-06-09T08:30:00+00:00",
      "patterns": [
        {
          "field_id": "kontakt_telefon",
          "label_used": "Telefonnr.",
          "regex_pattern": "<regex generated by _label_to_regex>",
          "first_seen": "2026-06-09T08:30:00+00:00",
          "last_seen": "2026-06-09T08:30:00+00:00",
          "observed_count": 1,
          "confidence_sum": 0.9,
          "agent_id": "impressum"
        }
      ]
    }

Lifecycle:
  - record(): store a confirmed label or bump its counters
  - load_patterns_for(field_id, agent_id): return compiled patterns
  - prune_low_confidence(): drop patterns whose average confidence is
    too low once they were observed often enough (hallucination guard)

Concurrency: the writers (record, prune_low_confidence) serialise on a
module-level lock, which only covers threads of one process; nothing in
this module coordinates several processes writing the same file. Reads
go through a cache that is invalidated when the file's mtime changes.
"""

from __future__ import annotations

import json
import logging
import math
import os
import re
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Pattern


logger = logging.getLogger(__name__)

# Substrings that mark a *label* as introducing a phone number / e-mail
# address. Only consulted when no field_id hint is available.
_PHONE_LABEL_HINTS = ("tel", "phone", "fon", "anschluss", "rufnummer", "rufnr")
_EMAIL_LABEL_HINTS = ("email", "e-mail", "mail")
# Substrings that mark a *field_id* as a phone / e-mail field.
_PHONE_FIELD_HINTS = ("telefon", "phone")
_EMAIL_FIELD_HINTS = ("email", "e-mail", "mail")


def _library_path() -> Path:
    """Return the library file path.

    Resolved at call time so tests can monkeypatch the env var.
    """
    return Path(os.environ.get(
        "AGENT_PATTERN_LIBRARY",
        "/tmp/breakpilot/agent_learned_patterns.json",
    ))


_lock = threading.Lock()
_cache: dict[str, list[dict]] = {}
_cache_mtime: float = 0.0
# Path the cache was built from; None means "nothing cached yet". Needed
# so that an empty library is cached too instead of being re-read on
# every call.
_cache_path: str | None = None


def _empty_library() -> dict:
    """Return a fresh, empty library document."""
    return {"version": "1", "patterns": []}


def _load_raw() -> dict:
    """Read the library file and return it as a dict.

    Always returns a dict whose "patterns" value is a list of dicts. A
    missing file yields an empty library; an unreadable file or one whose
    JSON has the wrong shape is logged and treated as empty, so callers
    never have to validate the structure themselves.
    """
    p = _library_path()
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return _empty_library()
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("pattern library corrupt, reset: %s", e)
        return _empty_library()
    if not isinstance(data, dict):
        logger.warning(
            "pattern library corrupt, reset: %s",
            "top-level JSON value is not an object",
        )
        return _empty_library()
    entries = data.get("patterns")
    data["patterns"] = (
        [e for e in entries if isinstance(e, dict)]
        if isinstance(entries, list) else []
    )
    return data


def _save_raw(data: dict) -> None:
    """Write *data* to the library file, stamping ``updated_at``.

    The document is written to a sibling temp file first and then moved
    over the target, so readers never observe a half-written file.
    """
    p = _library_path()
    p.parent.mkdir(parents=True, exist_ok=True)
    data["updated_at"] = datetime.now(timezone.utc).isoformat()
    tmp = p.with_name(p.name + ".tmp")
    tmp.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
    tmp.replace(p)


def _value_kind(label: str, field_id: str | None) -> str:
    """Classify what follows the label: "phone", "email" or "generic".

    A field_id hint takes precedence because substring checks on the
    label alone misfire on ordinary words ("Stelle" and "Stellvertreter"
    both contain "tel"). Without a hint the label heuristic is used,
    phone keywords first.
    """
    if field_id:
        fid = field_id.lower()
        if any(k in fid for k in _PHONE_FIELD_HINTS):
            return "phone"
        if any(k in fid for k in _EMAIL_FIELD_HINTS):
            return "email"
        return "generic"
    label_lc = label.lower()
    if any(k in label_lc for k in _PHONE_LABEL_HINTS):
        return "phone"
    if any(k in label_lc for k in _EMAIL_LABEL_HINTS):
        return "email"
    return "generic"


def _label_to_regex(label: str, field_id: str | None = None) -> str:
    r"""Generate a permissive regex source string from a label.

    Examples (no field_id hint):
        'Telefonnr.'       -> \bTelefonnr\.?\s*[:.\s(]?\s*[\+\d]
        'Funkanschluss'    -> \bFunkanschluss\s*[:.\s(]?\s*[\+\d]
        'Geschäftsleitung' -> \bGeschäftsleitung\s*[:.\s(]

    Args:
        label: The label text as found in the document.
        field_id: Optional field the label belongs to; when given it
            decides whether a phone number, an e-mail address or just a
            separator is expected after the label.

    Returns:
        The regex source (not compiled).
    """
    words = label.split()
    # Whitespace inside the label may appear as several blanks or a line
    # break in other documents, so words are joined with \s+.
    base = r"\s+".join(re.escape(w) for w in words)
    # A trailing period is made optional ('Telefonnr.' vs 'Telefonnr').
    if base.endswith(r"\."):
        base = base[:-2] + r"\.?"
    # \b only matches at a word/non-word transition; in front of a label
    # that starts with a symbol it would prevent every match.
    starts_with_word_char = not words or re.match(r"\w", words[0]) is not None
    prefix = r"\b" if starts_with_word_char else ""

    kind = _value_kind(label, field_id)
    if kind == "phone":
        return rf"{prefix}{base}\s*[:.\s(]?\s*[\+\d]"
    if kind == "email":
        return rf"{prefix}{base}\s*[:.\s(]?\s*[\w.+-]+@"
    return rf"{prefix}{base}\s*[:.\s(]"


def _entry_counts(entry: dict) -> tuple[int, float]:
    """Return ``(observed_count, confidence_sum)`` of a stored entry.

    Missing, non-numeric, negative or non-finite values yield ``(0, 0.0)``
    so a damaged entry is treated as "never observed" instead of raising.
    """
    try:
        obs = int(entry.get("observed_count", 0))
        conf_sum = float(entry.get("confidence_sum", 0.0))
    except (TypeError, ValueError, OverflowError):
        return 0, 0.0
    if obs < 0 or not math.isfinite(conf_sum):
        return 0, 0.0
    return obs, conf_sum


def _avg_confidence(obs: int, conf_sum: float) -> float:
    """Average confidence per observation; 0.0 when never observed."""
    return conf_sum / obs if obs else 0.0


def _invalidate_cache() -> None:
    """Drop the in-memory cache so the next read hits the file again."""
    global _cache, _cache_mtime, _cache_path
    _cache = {}
    _cache_mtime = 0.0
    _cache_path = None


def _refresh_cache() -> None:
    """Re-read the library if the file path or its mtime changed."""
    global _cache, _cache_mtime, _cache_path
    p = _library_path()
    try:
        mtime = p.stat().st_mtime
    except OSError:
        _invalidate_cache()
        return
    if _cache_path == str(p) and mtime == _cache_mtime:
        return
    data = _load_raw()
    grouped: dict[str, list[dict]] = {}
    for pat in data.get("patterns", []):
        key = f"{pat.get('agent_id', '')}/{pat.get('field_id', '')}"
        grouped.setdefault(key, []).append(pat)
    _cache = grouped
    _cache_mtime = mtime
    _cache_path = str(p)


def record(
    field_id: str,
    label_used: str,
    confidence: float,
    agent_id: str,
) -> None:
    """Persist a learned label.

    If an entry for (field_id, label_used, agent_id) already exists
    (label compared case-insensitively) its ``observed_count`` and
    ``confidence_sum`` are increased; otherwise a new entry with a
    generated regex is appended.

    The call is a no-op when an identifier is empty, when the label is
    shorter than two characters after trimming, or when *confidence* is
    not a finite number. Finite confidences are clamped to [0.0, 1.0].
    """
    if not field_id or not label_used or not agent_id:
        return
    label_used = label_used.strip()[:60].rstrip()
    if len(label_used) < 2:
        return
    try:
        conf = float(confidence)
    except (TypeError, ValueError):
        logger.warning("pattern library: ignoring non-numeric confidence %r",
                       confidence)
        return
    if not math.isfinite(conf):
        # NaN would make every later average NaN, and NaN slips through
        # "avg < threshold" comparisons.
        logger.warning("pattern library: ignoring non-finite confidence %r",
                       confidence)
        return
    conf = min(1.0, max(0.0, conf))

    with _lock:
        data = _load_raw()
        patterns = data.setdefault("patterns", [])
        now = datetime.now(timezone.utc).isoformat()
        wanted = label_used.lower()
        match = None
        for p in patterns:
            if (p.get("field_id") == field_id
                    and p.get("agent_id") == agent_id
                    and str(p.get("label_used") or "").strip().lower()
                    == wanted):
                match = p
                break
        if match is not None:
            obs, conf_sum = _entry_counts(match)
            match["observed_count"] = obs + 1
            match["confidence_sum"] = conf_sum + conf
            match["last_seen"] = now
        else:
            patterns.append({
                "field_id": field_id,
                "label_used": label_used,
                "regex_pattern": _label_to_regex(label_used, field_id),
                "first_seen": now,
                "last_seen": now,
                "observed_count": 1,
                "confidence_sum": conf,
                "agent_id": agent_id,
            })
        _save_raw(data)
        _invalidate_cache()


def load_patterns_for(
    field_id: str,
    agent_id: str,
    min_observed: int = 1,
    min_avg_confidence: float = 0.5,
) -> list[Pattern[str]]:
    """Return the compiled learned patterns for (field_id, agent_id).

    Entries observed fewer than *min_observed* times or with an average
    confidence below *min_avg_confidence* are left out, as are entries
    whose stored regex is missing or does not compile. Patterns are
    compiled case-insensitively.
    """
    _refresh_cache()
    out: list[Pattern[str]] = []
    for entry in _cache.get(f"{agent_id}/{field_id}", []):
        obs, conf_sum = _entry_counts(entry)
        avg = _avg_confidence(obs, conf_sum)
        if obs < min_observed or avg < min_avg_confidence:
            continue
        try:
            out.append(re.compile(entry["regex_pattern"], re.IGNORECASE))
        except (KeyError, TypeError, OverflowError, re.error) as e:
            logger.debug("skipping unusable learned pattern %r: %s",
                         entry.get("label_used"), e)
    return out


def list_all() -> list[dict]:
    """Return every learned pattern (for debugging / the frontend).

    Each returned dict is a copy of the stored entry plus an
    ``avg_confidence`` key rounded to three decimals; the list is sorted
    by observation count, most observed first.
    """
    _refresh_cache()
    ranked: list[tuple[int, dict]] = []
    for entries in _cache.values():
        for p in entries:
            obs, conf_sum = _entry_counts(p)
            avg = _avg_confidence(obs, conf_sum)
            ranked.append((obs, {**p, "avg_confidence": round(avg, 3)}))
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in ranked]


def prune_low_confidence(min_avg: float = 0.5,
                         min_runs_before_prune: int = 100) -> int:
    """Delete patterns whose average confidence stayed too low.

    Hallucination guard: an entry is removed once it was observed at
    least *min_runs_before_prune* times and its average confidence is
    below *min_avg*. The file is only rewritten when something was
    removed.

    Returns:
        The number of removed entries.
    """
    with _lock:
        data = _load_raw()
        patterns = data.get("patterns", [])
        kept = []
        for p in patterns:
            obs, conf_sum = _entry_counts(p)
            if (obs >= min_runs_before_prune
                    and _avg_confidence(obs, conf_sum) < min_avg):
                continue
            kept.append(p)
        removed = len(patterns) - len(kept)
        if removed:
            data["patterns"] = kept
            _save_raw(data)
            _invalidate_cache()
    return removed
+ +Vorteile: + - Pattern bleiben schlank + - Output für Kunden ist konkret: "Bitte 'Management' in 'Geschäftsführer' + umbenennen" + - 1 LLM-Call pro Slot statt N → Cost-effizient + - Self-correcting: Pattern-Lücken werden vom LLM gefangen +""" + +from __future__ import annotations + +import logging + +from ._escalation import cascade + +logger = logging.getLogger(__name__) + + +# Standard-Bezeichnungen pro field_id — der Soll-Wortlaut den der +# Kunde verwenden sollte (für die Umbenennungs-Empfehlung). +STANDARD_LABELS: dict[str, str] = { + # Impressum + "kontakt_telefon": "Telefon", + "kontakt_email": "E-Mail", + "handelsregister": "Handelsregister", + "ust_id": "Umsatzsteuer-Identifikationsnummer (USt-IdNr.)", + "vertretungsberechtigte": "Geschäftsführer (bei GmbH) / Vorstand (bei AG)", + "vertretungsberechtigte_label_korrekt": + "Geschäftsführer (bei GmbH) / Vorstand (bei AG)", + "name_anbieter": "Anbieter / Anschrift", + "aufsichtsbehoerde": "Aufsichtsbehörde", + "verantwortlicher_redaktion": "Inhaltlich Verantwortlicher nach § 18 MStV", + "verbraucher_streitbeilegung": "Verbraucherstreitbeilegung (VSBG)", + "berufsangaben": "Berufsbezeichnung", + "odr_link": "OS-Plattform der EU", + # Cookie-Policy + "categories_named": "Cookie-Kategorien (essentiell, funktional, analytics, marketing)", + "purpose_described": "Verarbeitungszweck", + "retention_duration": "Speicherdauer / Laufzeit", + "vendor_recipients": "Empfänger / Drittanbieter", + "opt_out_mechanism": "Opt-Out-Mechanismus", + "banner_reopen": "Cookie-Einstellungen ändern", + "version_date": "Stand / Letzte Aktualisierung", + "third_country_transfer": "Drittland-Übermittlung (Schrems II)", + "legal_basis": "Rechtsgrundlage (Art. 6 DSGVO / § 25 TDDDG)", + "cookie_table_or_list": "Cookie-Tabelle", + "dpo_contact": "Datenschutzbeauftragter (DSB)", + "browser_settings_hint": "Browser-Einstellungen", +} + + +_SYSTEM_PROMPT = """Du bist Compliance-Pruefer. 
# String spellings an LLM may use for a positive "found" flag.
_TRUE_STRINGS = frozenset({"true", "yes", "ja", "1"})


def _as_bool(value: object) -> bool:
    """Interpret a JSON-ish truth value.

    Strings are compared against known "true" spellings, because
    ``bool("false")`` would be True.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUE_STRINGS
    return bool(value)


def _as_confidence(value: object, default: float = 0.5) -> float:
    """Coerce a confidence value to a float in [0.0, 1.0].

    Missing, non-numeric, NaN or out-of-range values fall back to
    *default*; an explicit 0.0 is kept as 0.0.
    """
    if value is None or isinstance(value, bool):
        return default
    try:
        conf = float(value)
    except (TypeError, ValueError):
        return default
    # The chained comparison is False for NaN as well.
    return conf if 0.0 <= conf <= 1.0 else default


def _normalize_rows(parsed: object, wanted: set[str]) -> dict[str, dict]:
    """Turn the parsed LLM answer into ``{field_id: result-dict}``.

    Accepts either ``{"results": [...]}`` or a bare list. Rows that are
    not dicts, have no field_id, or name a field_id that was not asked
    about are dropped.
    """
    rows = parsed.get("results") if isinstance(parsed, dict) else parsed
    if not isinstance(rows, list):
        return {}
    out: dict[str, dict] = {}
    for row in rows:
        if not isinstance(row, dict):
            continue
        fid = str(row.get("field_id") or "").strip()
        if not fid or fid not in wanted:
            continue
        out[fid] = {
            "found": _as_bool(row.get("found")),
            "label_used": str(row.get("label_used") or "").strip()[:60],
            "evidence": str(row.get("evidence") or "")[:200],
            "confidence": _as_confidence(row.get("confidence")),
        }
    return out


async def validate_present(
    text: str,
    missing_fields: list[tuple[str, str]],
    *,
    max_chars: int = 4000,
) -> dict[str, dict]:
    """Ask the LLM whether the given fields are semantically present.

    Args:
        text: Full text of the document.
        missing_fields: List of (field_id, description) pairs that the
            MC patterns did NOT find.
        max_chars: Number of leading characters of *text* that are put
            into the prompt.

    Returns:
        dict[field_id, {"found", "label_used", "evidence", "confidence"}]
        restricted to the requested field_ids. Empty dict when there is
        nothing to check, the text is shorter than 100 characters, the
        cascade returned no result, or its answer has an unexpected
        shape.
    """
    if not missing_fields or len(text) < 100:
        return {}
    lines = ["FEHLENDE PFLICHTANGABEN (zum Pruefen):"]
    for fid, label in missing_fields:
        lines.append(f" - {fid}: {label}")
    lines.append("")
    lines.append(f"DOKUMENT-TEXT:\n{text[:max_chars]}")
    lines.append("")
    lines.append("Liste pro field_id ob die Pflichtangabe vorhanden "
                 "ist (auch unter abweichendem Label). Nur JSON.")
    user_prompt = "\n".join(lines)
    res, _logs = await cascade(_SYSTEM_PROMPT, user_prompt)
    if res is None:
        return {}
    parsed = res.parsed if isinstance(res.parsed, (dict, list)) else None
    if parsed is None:
        return {}
    return _normalize_rows(parsed, {fid for fid, _ in missing_fields})


def standard_label(field_id: str) -> str:
    """Return the standard wording for a mandatory field.

    Falls back to *field_id* itself when no standard label is registered.
    """
    return STANDARD_LABELS.get(field_id, field_id)


def build_rename_action(
    field_id: str, label_used: str,
) -> str:
    """Build the best-practice rename recommendation text."""
    std = standard_label(field_id)
    return (
        f"Best-Practice Umbenennung: '{label_used}' → '{std}'. "
        f"Inhalt ist vorhanden, nur das Label weicht von der "
        f"Standard-Terminologie ab. Eine einheitliche Bezeichnung "
        f"erleichtert dem Nutzer das Auffinden der Pflichtangabe und "
        f"bei Behörden-Prüfungen die Anerkennung."
    )
async def _semantic_demote(
    self,
    text: str,
    findings: list[Finding],
    coverage: list[McCoverage],
    *,
    min_confidence: float = 0.6,
) -> None:
    """LLM layer for HIGH/MEDIUM "missing" findings: demote to LOW if present.

    Findings whose severity is HIGH or MEDIUM and whose severity_reason
    is "missing" are sent to ``validate_present`` in one call. For each
    one the validator reports as found with at least *min_confidence*,
    the finding is rewritten in place into a LOW "label_mismatch"
    finding carrying a rename recommendation, the matching coverage entry
    is set to "low", and the label actually used is stored in the pattern
    library.

    Args:
        text: Full document text.
        findings: Findings of the MC pass; mutated in place.
        coverage: Coverage entries of the MC pass; mutated in place.
        min_confidence: Minimum validator confidence required to demote.
    """
    # First MC per field_id wins, like a linear first-match search.
    mc_by_field: dict = {}
    for m in MCS:
        mc_by_field.setdefault(m.field_id, m)

    candidates: list[tuple[str, str, Finding]] = []
    for f in findings:
        # LOW/INFO findings are already best-practice recommendations
        # and stay untouched.
        if f.severity not in (Severity.HIGH.value, Severity.MEDIUM.value):
            continue
        if f.severity_reason != "missing":
            continue
        mc = mc_by_field.get(f.field_id)
        candidates.append((f.field_id, mc.label if mc else f.field_id, f))
    if not candidates:
        return

    result = await validate_present(
        text, [(fid, label) for fid, label, _ in candidates],
    )
    if not result:
        return

    recorded: set[tuple[str, str]] = set()
    for field_id, _label, finding in candidates:
        row = result.get(field_id)
        if not row or not row.get("found"):
            continue
        try:
            conf = float(row.get("confidence") or 0.0)
        except (TypeError, ValueError):
            continue
        if conf < min_confidence:
            continue

        llm_label = str(row.get("label_used") or "").strip()
        # The placeholder is for display only; it must never be learned.
        label_used = llm_label or "abweichendes Label"

        # Demote in place.
        finding.severity = Severity.LOW.value
        finding.severity_reason = "label_mismatch"
        finding.title = (
            f"Label '{label_used}' weicht von Standard-"
            f"Bezeichnung ab"
        )
        finding.evidence = str(row.get("evidence") or "")[:200]
        finding.action = build_rename_action(field_id, label_used)
        finding.confidence = conf
        finding.sources.append(EvidenceSource(
            source_type=SourceType.LLM_LOCAL,
            source_id="semantic_validator",
            detail=f"LLM-confirmed: '{label_used}'",
            confidence=conf,
        ))

        # Auto-learning: persist the confirmed label so the next run can
        # match it during the MC pass without another LLM call. Only real
        # labels are stored, and each (field, label) once per run.
        learn_key = (field_id, llm_label.lower())
        if llm_label and learn_key not in recorded:
            recorded.add(learn_key)
            try:
                record_pattern(
                    field_id=field_id,
                    label_used=llm_label,
                    confidence=conf,
                    agent_id=self.agent_id,
                )
            except Exception as e:
                # Best effort: a failing library write must not break
                # the evaluation itself.
                logger.warning("pattern-library record failed: %s", e)

        # Update the coverage entry that belongs to this field's MC.
        mc = mc_by_field.get(field_id)
        if mc:
            cov = next((c for c in coverage if c.mc_id == mc.mc_id), None)
            if cov:
                cov.status = "low"
                cov.reason = f"label_mismatch: '{label_used}'"
+ assert data["patterns"][0]["observed_count"] == 1 + + +def test_record_increments_existing(tmp_lib): + lib, _ = tmp_lib + lib.record("kontakt_telefon", "Telefonnr.", 0.9, "impressum") + lib.record("kontakt_telefon", "Telefonnr.", 0.85, "impressum") + lib.record("kontakt_telefon", "telefonnr.", 0.8, "impressum") # case-i + raws = lib.list_all() + assert len(raws) == 1 + assert raws[0]["observed_count"] == 3 + + +def test_record_separate_per_field_id(tmp_lib): + lib, _ = tmp_lib + lib.record("kontakt_telefon", "Tel", 0.9, "impressum") + lib.record("kontakt_email", "Tel", 0.9, "impressum") + assert len(lib.list_all()) == 2 + + +def test_record_empty_inputs_noop(tmp_lib): + lib, p = tmp_lib + lib.record("", "Tel", 0.9, "impressum") + lib.record("kontakt_telefon", "", 0.9, "impressum") + lib.record("kontakt_telefon", "Tel", 0.9, "") + assert not p.exists() + + +def test_load_patterns_returns_compiled_regex(tmp_lib): + lib, _ = tmp_lib + lib.record("kontakt_telefon", "Telefonnr.", 0.9, "impressum") + pats = lib.load_patterns_for("kontakt_telefon", "impressum") + assert len(pats) == 1 + m = pats[0].search("Hier: Telefonnr. 0761/12345") + assert m is not None + + +def test_load_patterns_filters_low_confidence(tmp_lib): + lib, _ = tmp_lib + lib.record("kontakt_telefon", "WeakLabel", 0.3, "impressum") + pats = lib.load_patterns_for( + "kontakt_telefon", "impressum", min_avg_confidence=0.5, + ) + assert pats == [] + # observed_count filter + pats = lib.load_patterns_for( + "kontakt_telefon", "impressum", min_observed=2, + ) + assert pats == [] + + +def test_label_to_regex_telefon(): + from compliance.services.specialist_agents._pattern_library import ( + _label_to_regex, + ) + rx = _label_to_regex("Telefonnr.") + import re + assert re.search(rx, "Telefonnr. 
def _run(coro):
    """Run the coroutine *coro* to completion and return its result.

    Uses ``asyncio.run`` so every call gets a fresh event loop;
    ``asyncio.get_event_loop()`` without a running loop is deprecated.
    """
    return asyncio.run(coro)
def test_semantic_demotion_high_to_low(monkeypatch, tmp_path):
    """When the LLM confirms the mandatory item is present: HIGH→LOW.

    Setup: an Impressum text WITHOUT any phone label, so the MC pattern
    does not match. The LLM mock claims 'Funkanschluss' is a deviating
    label for the phone number.

    The pattern library is pointed at a temp file: the demotion persists
    'Funkanschluss' as a learned pattern, and with the shared default
    file a second test run would match it in the MC pass and produce no
    finding at all.
    """
    from compliance.services.specialist_agents import _pattern_library
    from compliance.services.specialist_agents._escalation import (
        EscalationResult, SourceType,
    )
    from compliance.services.specialist_agents._base import EscalationLog

    monkeypatch.setenv(
        "AGENT_PATTERN_LIBRARY", str(tmp_path / "patterns.json"),
    )
    _pattern_library._invalidate_cache()

    async def _fake_cascade(sys_prompt, user_prompt,
                            expect_json=True, skip_ovh=False):
        # Only react to the semantic-validator prompt.
        if "FEHLENDE PFLICHTANGABEN" not in user_prompt:
            return None, []
        log = EscalationLog(
            stage=SourceType.LLM_LOCAL, model="qwen2.5:7b",
            duration_ms=42, success=True,
        )
        res = EscalationResult(
            content='{"results":[]}',
            stage=SourceType.LLM_LOCAL,
            model="qwen2.5:7b",
            log=log,
            parsed={"results": [{
                "field_id": "kontakt_telefon",
                "found": True,
                "label_used": "Funkanschluss",
                "evidence": "Funkanschluss 0761/123456",
                "confidence": 0.9,
            }]},
        )
        return res, [log]
    monkeypatch.setattr(
        "compliance.services.specialist_agents._semantic_validator.cascade",
        _fake_cascade,
    )
    monkeypatch.setattr(
        "compliance.services.specialist_agents.impressum.agent.cascade",
        _fake_cascade,
    )
    # Text WITHOUT a phone label → MC does not match → HIGH finding.
    text = (
        "Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n"
        "E-Mail: x@y.de\nFunkanschluss 0761/123456\n"
        "Geschäftsführer: Max Mustermann\n"
        "Handelsregister Berlin HRB 12345\n"
        "USt-IdNr: DE123456789"
    )
    agent = ImpressumAgent()
    try:
        out = _run(agent.evaluate(AgentInput(doc_type="impressum", text=text)))
    finally:
        # Do not leak cached entries of the temp library into other tests.
        _pattern_library._invalidate_cache()
    telefon_findings = [f for f in out.findings
                        if f.field_id == "kontakt_telefon"]
    assert telefon_findings, "expected MC-miss → finding"
    f = telefon_findings[0]
    # Expected: the semantic validator demoted the finding to LOW.
    assert f.severity == "LOW", (
        f"Erwartet: LOW nach semantic-demote, got: {f.severity}. "
        f"Finding: {f}"
    )
    assert f.severity_reason == "label_mismatch"
    assert "Funkanschluss" in f.action
    assert "Telefon" in f.action