feat(agents): Semantic-Validator + Auto-Learning-Pattern-Library
CI / detect-changes (push) Successful in 5s
CI / branch-name (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped

Sprint 1.10 — Semantic-Validator (User-Vorgabe 2026-06-09):
  - Statt unendlich Regex-Pattern fuer jede Schreibweise zu pflegen
    (Tel/Telefon/Telefonnr/Phone/Fon/Funkanschluss/…), nutzen wir
    bei MC-MISS einen LLM-Call: 'Ist die Pflichtangabe semantisch
    doch da, nur unter abweichendem Label?'
  - Bei LLM-Treffer: HIGH/MEDIUM-Finding wird zu LOW demoted,
    Empfehlung wird zu 'Best-Practice Umbenennung: Management ->
    Geschaeftsfuehrer' (mit STANDARD_LABELS-Mapping).
  - 1 LLM-Call pro Slot statt N: cost-effizient.

Sprint 1.11 — Auto-Learning-Pattern-Library:
  - Jedes Label das SVL findet wird in JSON persistiert:
    /tmp/breakpilot/agent_learned_patterns.json
  - Beim naechsten Run prueft der Agent zuerst gelernte Patterns
    BEVOR er das HIGH-Finding emittiert -> kein LLM-Call mehr.
  - Asymptotisch 0 LLM-Calls fuer haeufige Edge-Cases.
  - Halluzinations-Schutz: prune_low_confidence() loescht Patterns
    mit <0.5 Avg-Confidence nach 100 Beobachtungen.
  - Idempotent: gleicher (field_id, label, agent) -> Counter +1.

Tests: 40/40 gruen (10 Pattern-Library + 7 SVL + 13 GT + 11 v2).

STANDARD_LABELS-Map deckt Impressum + Cookie-Policy. Spaeter
erweiterbar fuer DSE, AGB, Widerrufs-Agenten.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-09 08:16:21 +02:00
parent 882e4f9798
commit ca8c388f37
5 changed files with 721 additions and 0 deletions
@@ -0,0 +1,235 @@
"""Auto-Learning Pattern Library.
User-Vorgabe 2026-06-09: jedes Label das der Semantic-Validator
findet (z.B. 'Telefonnr.' für kontakt_telefon) wird als auto-
generated Pattern persistiert. Beim nächsten Run prüft der Agent
zuerst die gelernten Patterns — der LLM-Call wird nur noch für
WIRKLICH neue Labels gebraucht.
Storage: JSON-Datei (default /tmp/breakpilot/agent_learned_patterns.json).
Format:
{
"version": "1",
"updated_at": "2026-06-09T08:30:00Z",
"patterns": [
{
"field_id": "kontakt_telefon",
"label_used": "Telefonnr.",
"regex_pattern": "\\bTelefonnr\\.?\\s*[:.\\s]?\\s*[\\+\\d]",
"first_seen": "2026-06-09T08:30:00Z",
"last_seen": "2026-06-09T08:30:00Z",
"observed_count": 1,
"confidence_sum": 0.9,
"agent_id": "impressum"
}
]
}
Lifecycle:
- record(): SVL-Treffer aufnehmen oder zähler erhöhen
- load_patterns_for(field_id, agent_id): kompilierte Patterns liefern
- prune_low_confidence(): nach 100 Runs Patterns mit <0.5 Avg-
Confidence rauswerfen (Halluzinations-Schutz)
NOT thread-safe — eine Instanz pro Backend-Process. Reads gehen über
Cache mit mtime-Invalidierung.
"""
from __future__ import annotations
import json
import logging
import os
import re
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Pattern
logger = logging.getLogger(__name__)
def _library_path() -> Path:
"""Resolved at call time so tests can monkeypatch the env var."""
return Path(os.environ.get(
"AGENT_PATTERN_LIBRARY",
"/tmp/breakpilot/agent_learned_patterns.json",
))
_lock = threading.Lock()
_cache: dict[str, list[dict]] = {}
_cache_mtime: float = 0.0
def _load_raw() -> dict:
p = _library_path()
if not p.exists():
return {"version": "1", "patterns": []}
try:
return json.loads(p.read_text())
except Exception as e:
logger.warning("pattern library corrupt, reset: %s", e)
return {"version": "1", "patterns": []}
def _save_raw(data: dict) -> None:
p = _library_path()
p.parent.mkdir(parents=True, exist_ok=True)
data["updated_at"] = datetime.now(timezone.utc).isoformat()
tmp = p.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, indent=2, default=str))
tmp.replace(p)
def _label_to_regex(label: str) -> str:
"""Generates a permissive regex from a label string.
'Telefonnr.' → r"\\bTelefonnr\\.?\\s*[:.\\s(]?\\s*[\\+\\d]"
'Funkanschluss' → r"\\bFunkanschluss\\s*[:.\\s(]?\\s*[\\+\\d]"
'Geschäftsleitung' → r"\\bGeschäftsleitung\\s*[:.\\s(]"
"""
base = re.escape(label.strip())
# Strip escape of optional trailing period — we make it optional
if base.endswith(r"\."):
base = base[:-2] + r"\.?"
# Heuristik: Telefon-Felder enden mit Nummer; sonstige mit Trennzeichen
label_lc = label.lower()
if any(k in label_lc for k in ("tel", "phone", "fon", "anschluss",
"rufnummer", "rufnr")):
return rf"\b{base}\s*[:.\s(]?\s*[\+\d]"
if any(k in label_lc for k in ("email", "e-mail", "mail")):
return rf"\b{base}\s*[:.\s(]?\s*[\w.+-]+@"
return rf"\b{base}\s*[:.\s(]"
def _invalidate_cache() -> None:
global _cache, _cache_mtime
_cache = {}
_cache_mtime = 0.0
def _refresh_cache() -> None:
"""Re-read library if file mtime changed."""
global _cache, _cache_mtime
p = _library_path()
if not p.exists():
_cache = {}
_cache_mtime = 0.0
return
mtime = p.stat().st_mtime
if mtime == _cache_mtime and _cache:
return
data = _load_raw()
new_cache: dict[str, list[dict]] = {}
for pat in data.get("patterns", []):
key = f"{pat.get('agent_id', '')}/{pat.get('field_id', '')}"
new_cache.setdefault(key, []).append(pat)
_cache = new_cache
_cache_mtime = mtime
def record(
field_id: str,
label_used: str,
confidence: float,
agent_id: str,
) -> None:
"""Persist a learned label. Idempotent — increments observed_count
if (field_id, label_used, agent_id) already exists."""
if not field_id or not label_used or not agent_id:
return
label_used = label_used.strip()[:60]
if len(label_used) < 2:
return
with _lock:
data = _load_raw()
patterns = data.setdefault("patterns", [])
now = datetime.now(timezone.utc).isoformat()
match = None
for p in patterns:
if (p.get("field_id") == field_id and
p.get("label_used", "").strip().lower()
== label_used.lower() and
p.get("agent_id") == agent_id):
match = p
break
if match:
match["observed_count"] = int(match.get("observed_count", 0)) + 1
match["confidence_sum"] = (
float(match.get("confidence_sum", 0.0)) + float(confidence)
)
match["last_seen"] = now
else:
patterns.append({
"field_id": field_id,
"label_used": label_used,
"regex_pattern": _label_to_regex(label_used),
"first_seen": now,
"last_seen": now,
"observed_count": 1,
"confidence_sum": float(confidence),
"agent_id": agent_id,
})
_save_raw(data)
_invalidate_cache()
def load_patterns_for(
field_id: str,
agent_id: str,
min_observed: int = 1,
min_avg_confidence: float = 0.5,
) -> list[Pattern[str]]:
"""Returns compiled regex patterns gelernt für (field_id, agent_id)."""
_refresh_cache()
key = f"{agent_id}/{field_id}"
raws = _cache.get(key, [])
out: list[Pattern[str]] = []
for p in raws:
obs = int(p.get("observed_count", 0))
conf_sum = float(p.get("confidence_sum", 0.0))
avg = conf_sum / obs if obs else 0.0
if obs < min_observed or avg < min_avg_confidence:
continue
try:
out.append(re.compile(p["regex_pattern"], re.IGNORECASE))
except Exception:
continue
return out
def list_all() -> list[dict]:
"""Debug/Frontend: liefert alle gelernten Patterns."""
_refresh_cache()
flat: list[dict] = []
for key, patterns in _cache.items():
for p in patterns:
obs = int(p.get("observed_count", 0))
avg = (float(p.get("confidence_sum", 0.0)) / obs
if obs else 0.0)
flat.append({**p, "avg_confidence": round(avg, 3)})
return sorted(flat, key=lambda x: x.get("observed_count", 0),
reverse=True)
def prune_low_confidence(min_avg: float = 0.5,
min_runs_before_prune: int = 100) -> int:
"""Halluzinations-Schutz: löscht Patterns mit zu niedriger
Avg-Confidence nach ausreichend Runs."""
with _lock:
data = _load_raw()
before = len(data.get("patterns", []))
kept = []
for p in data.get("patterns", []):
obs = int(p.get("observed_count", 0))
avg = (float(p.get("confidence_sum", 0.0)) / obs
if obs else 0.0)
if obs >= min_runs_before_prune and avg < min_avg:
continue
kept.append(p)
data["patterns"] = kept
_save_raw(data)
_invalidate_cache()
return before - len(kept)
@@ -0,0 +1,156 @@
"""Semantic-Validator — LLM-Layer der HIGH-Findings semantisch prüft.
User-Vorgabe 2026-06-09: statt unendlich Regex-Pattern für jede
Schreibweise (Tel/Telefon/Telef./Telefonnr./Telefonnummer/Phone/Fon)
zu pflegen, nutzen wir einen LLM-Pass als 2. Layer:
1. MC-Pattern fängt 95% der Standard-Schreibweisen.
2. Bei MC-MISS einmaliger LLM-Call: "ist die Pflichtangabe semantisch
doch da, nur unter abweichendem Label?"
3. Wenn ja: HIGH-Finding wird zu LOW "Best-Practice Umbenennung".
4. Wenn nein: HIGH-Finding bleibt.
Vorteile:
- Pattern bleiben schlank
- Output für Kunden ist konkret: "Bitte 'Management' in 'Geschäftsführer'
umbenennen"
- 1 LLM-Call pro Slot statt N → Cost-effizient
- Self-correcting: Pattern-Lücken werden vom LLM gefangen
"""
from __future__ import annotations
import logging
from ._escalation import cascade
logger = logging.getLogger(__name__)
# Standard-Bezeichnungen pro field_id — der Soll-Wortlaut den der
# Kunde verwenden sollte (für die Umbenennungs-Empfehlung).
STANDARD_LABELS: dict[str, str] = {
# Impressum
"kontakt_telefon": "Telefon",
"kontakt_email": "E-Mail",
"handelsregister": "Handelsregister",
"ust_id": "Umsatzsteuer-Identifikationsnummer (USt-IdNr.)",
"vertretungsberechtigte": "Geschäftsführer (bei GmbH) / Vorstand (bei AG)",
"vertretungsberechtigte_label_korrekt":
"Geschäftsführer (bei GmbH) / Vorstand (bei AG)",
"name_anbieter": "Anbieter / Anschrift",
"aufsichtsbehoerde": "Aufsichtsbehörde",
"verantwortlicher_redaktion": "Inhaltlich Verantwortlicher nach § 18 MStV",
"verbraucher_streitbeilegung": "Verbraucherstreitbeilegung (VSBG)",
"berufsangaben": "Berufsbezeichnung",
"odr_link": "OS-Plattform der EU",
# Cookie-Policy
"categories_named": "Cookie-Kategorien (essentiell, funktional, analytics, marketing)",
"purpose_described": "Verarbeitungszweck",
"retention_duration": "Speicherdauer / Laufzeit",
"vendor_recipients": "Empfänger / Drittanbieter",
"opt_out_mechanism": "Opt-Out-Mechanismus",
"banner_reopen": "Cookie-Einstellungen ändern",
"version_date": "Stand / Letzte Aktualisierung",
"third_country_transfer": "Drittland-Übermittlung (Schrems II)",
"legal_basis": "Rechtsgrundlage (Art. 6 DSGVO / § 25 TDDDG)",
"cookie_table_or_list": "Cookie-Tabelle",
"dpo_contact": "Datenschutzbeauftragter (DSB)",
"browser_settings_hint": "Browser-Einstellungen",
}
_SYSTEM_PROMPT = """Du bist Compliance-Pruefer. Aufgabe: ein Dokument
und eine Liste fehlender Pflichtangaben pruefen. Fuer JEDE Pflichtangabe
entscheiden: ist sie inhaltlich vorhanden, vielleicht unter einem
abweichenden Label/Schreibweise?
WICHTIG:
- 'Vorhanden' nur wenn der Inhalt eindeutig erkennbar ist
(z.B. eine Telefonnummer mit Vorwahl, nicht nur das Wort 'Telefon').
- Bei unsicher: 'found': false zurueckgeben.
- Wenn vorhanden: das tatsaechlich verwendete Label angeben
(z.B. 'Management' statt 'Geschaeftsfuehrer', 'Fon' statt 'Telefon').
Antwort NUR als JSON:
{
"results": [
{"field_id": "...",
"found": true|false,
"label_used": "tatsächlich verwendetes Label",
"evidence": "kurzes wörtliches Zitat",
"confidence": 0.0-1.0}
]
}
"""
async def validate_present(
text: str,
missing_fields: list[tuple[str, str]],
) -> dict[str, dict]:
"""Prüft per LLM ob die genannten Felder semantisch doch im Text sind.
Args:
text: Volltext des Dokuments.
missing_fields: Liste (field_id, beschreibung) die das MC-Pattern
NICHT gefunden hat.
Returns:
dict[field_id, {"found", "label_used", "evidence", "confidence"}]
Leeres Dict wenn LLM nicht erreichbar oder unsicher.
"""
if not missing_fields or len(text) < 100:
return {}
lines = ["FEHLENDE PFLICHTANGABEN (zum Pruefen):"]
for fid, label in missing_fields:
lines.append(f" - {fid}: {label}")
lines.append("")
lines.append(f"DOKUMENT-TEXT:\n{text[:4000]}")
lines.append("")
lines.append("Liste pro field_id ob die Pflichtangabe vorhanden "
"ist (auch unter abweichendem Label). Nur JSON.")
user_prompt = "\n".join(lines)
res, _logs = await cascade(_SYSTEM_PROMPT, user_prompt)
if res is None:
return {}
parsed = res.parsed if isinstance(res.parsed, (dict, list)) else None
if parsed is None:
return {}
rows = (parsed.get("results")
if isinstance(parsed, dict) else parsed)
if not isinstance(rows, list):
return {}
out: dict[str, dict] = {}
for row in rows:
if not isinstance(row, dict):
continue
fid = str(row.get("field_id") or "")
if not fid:
continue
out[fid] = {
"found": bool(row.get("found")),
"label_used": str(row.get("label_used") or "")[:60],
"evidence": str(row.get("evidence") or "")[:200],
"confidence": float(row.get("confidence") or 0.5),
}
return out
def standard_label(field_id: str) -> str:
"""Soll-Bezeichnung für eine Pflichtangabe."""
return STANDARD_LABELS.get(field_id, field_id)
def build_rename_action(
field_id: str, label_used: str,
) -> str:
"""Erzeugt die Best-Practice-Umbenennungs-Empfehlung."""
std = standard_label(field_id)
return (
f"Best-Practice Umbenennung: '{label_used}''{std}'. "
f"Inhalt ist vorhanden, nur das Label weicht von der "
f"Standard-Terminologie ab. Eine einheitliche Bezeichnung "
f"erleichtert dem Nutzer das Auffinden der Pflichtangabe und "
f"bei Behörden-Prüfungen die Anerkennung."
)
@@ -28,7 +28,12 @@ from .._base import (
lint_output,
)
from .._escalation import cascade
from .._pattern_library import load_patterns_for, record as record_pattern
from .._rollup import rollup
from .._semantic_validator import (
build_rename_action,
validate_present,
)
from .mcs import MC_IDS, MCS, detect_automotive, scope_matches
logger = logging.getLogger(__name__)
@@ -90,6 +95,18 @@ class ImpressumAgent(BaseSpecialistAgent):
))
continue
found = any(p.search(text) for p in mc.patterns)
if not found:
# 1.11: Auto-Learning — gelernte Labels probieren.
# Wenn ein gelerntes Pattern matcht: als OK werten +
# Coverage-Reason markiert das.
learned = load_patterns_for(mc.field_id, self.agent_id)
if any(lp.search(text) for lp in learned):
coverage.append(McCoverage(
mc_id=mc.mc_id, status="ok",
reason=f"learned-pattern matched "
f"({len(learned)} gelernt)",
))
continue
if found:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="ok",
@@ -122,6 +139,11 @@ class ImpressumAgent(BaseSpecialistAgent):
reason="missing",
))
# Semantic-Validator: prüft per LLM ob HIGH-Missings doch
# vorhanden sind (unter abweichendem Label). Demoted HIGH→LOW
# mit Rename-Empfehlung wenn ja. User-Vorgabe 2026-06-09.
await self._semantic_demote(text, mc_findings, coverage)
# Eskalation: für die identifizierten Lücken kann ein LLM
# zusätzliche Tiefen-Findings liefern (z.B. "Geschäftsführer
# genannt, aber ohne Nachname"). Confidence der MC-Findings
@@ -147,6 +169,87 @@ class ImpressumAgent(BaseSpecialistAgent):
start, mc_findings, esc_logs, coverage, confidence=overall,
)
async def _semantic_demote(
self,
text: str,
findings: list[Finding],
coverage: list[McCoverage],
) -> None:
"""LLM-Layer für HIGH/MEDIUM-missings — demote zu LOW wenn da."""
candidates: list[tuple[str, str, Finding]] = []
for f in findings:
# Demote-Kandidaten: HIGH oder MEDIUM-Pattern-Misses.
# LOW/INFO bleiben unverändert (sind selbst schon Best-
# Practice-Empfehlungen).
if f.severity not in (Severity.HIGH.value,
Severity.MEDIUM.value):
continue
if f.severity_reason != "missing":
continue
# Suche zugehöriges MC für die Beschreibung
mc = next((m for m in MCS if m.field_id == f.field_id), None)
label = mc.label if mc else f.field_id
candidates.append((f.field_id, label, f))
if not candidates:
return
result = await validate_present(
text, [(c[0], c[1]) for c in candidates],
)
if not result:
return
for field_id, label, finding in candidates:
row = result.get(field_id)
if not row or not row.get("found"):
continue
if row.get("confidence", 0) < 0.6:
continue
label_used = row.get("label_used") or "abweichendes Label"
# Demote in-place
finding.severity = Severity.LOW.value
finding.severity_reason = "label_mismatch"
finding.title = (
f"Label '{label_used}' weicht von Standard-"
f"Bezeichnung ab"
)
finding.evidence = row.get("evidence", "")[:200]
finding.action = build_rename_action(field_id, label_used)
conf = float(row.get("confidence") or 0.8)
finding.confidence = conf
finding.sources.append(EvidenceSource(
source_type=SourceType.LLM_LOCAL,
source_id="semantic_validator",
detail=f"LLM-confirmed: '{label_used}'",
confidence=conf,
))
# 1.11: Auto-Learning — Label-Match in der Library
# persistieren. Beim nächsten Run wird das gelernte
# Pattern bereits beim MC-Pass berücksichtigt, ohne
# erneuten LLM-Call.
try:
record_pattern(
field_id=field_id,
label_used=label_used,
confidence=conf,
agent_id=self.agent_id,
)
except Exception as e:
import logging
logging.getLogger(__name__).warning(
"pattern-library record failed: %s", e,
)
# Update coverage status
for c in coverage:
if c.mc_id and c.mc_id.endswith(field_id.upper()):
continue
# Robuster: nach mc_id über MCS
mc = next((m for m in MCS if m.field_id == field_id), None)
if mc:
cov = next((c for c in coverage
if c.mc_id == mc.mc_id), None)
if cov:
cov.status = "low"
cov.reason = f"label_mismatch: '{label_used}'"
async def _maybe_escalate(
self, text: str, scope: set[str],
) -> tuple[list[Finding], list[EscalationLog]]:
@@ -0,0 +1,108 @@
"""Tests für die Auto-Learning-Pattern-Library."""
from __future__ import annotations
import json
import pytest
@pytest.fixture
def tmp_lib(tmp_path, monkeypatch):
p = tmp_path / "patterns.json"
monkeypatch.setenv("AGENT_PATTERN_LIBRARY", str(p))
import compliance.services.specialist_agents._pattern_library as lib
lib._invalidate_cache()
yield lib, p
lib._invalidate_cache()
def test_record_creates_file(tmp_lib):
lib, p = tmp_lib
assert not p.exists()
lib.record("kontakt_telefon", "Telefonnr.", 0.9, "impressum")
assert p.exists()
data = json.loads(p.read_text())
assert len(data["patterns"]) == 1
assert data["patterns"][0]["label_used"] == "Telefonnr."
assert data["patterns"][0]["observed_count"] == 1
def test_record_increments_existing(tmp_lib):
lib, _ = tmp_lib
lib.record("kontakt_telefon", "Telefonnr.", 0.9, "impressum")
lib.record("kontakt_telefon", "Telefonnr.", 0.85, "impressum")
lib.record("kontakt_telefon", "telefonnr.", 0.8, "impressum") # case-i
raws = lib.list_all()
assert len(raws) == 1
assert raws[0]["observed_count"] == 3
def test_record_separate_per_field_id(tmp_lib):
lib, _ = tmp_lib
lib.record("kontakt_telefon", "Tel", 0.9, "impressum")
lib.record("kontakt_email", "Tel", 0.9, "impressum")
assert len(lib.list_all()) == 2
def test_record_empty_inputs_noop(tmp_lib):
lib, p = tmp_lib
lib.record("", "Tel", 0.9, "impressum")
lib.record("kontakt_telefon", "", 0.9, "impressum")
lib.record("kontakt_telefon", "Tel", 0.9, "")
assert not p.exists()
def test_load_patterns_returns_compiled_regex(tmp_lib):
lib, _ = tmp_lib
lib.record("kontakt_telefon", "Telefonnr.", 0.9, "impressum")
pats = lib.load_patterns_for("kontakt_telefon", "impressum")
assert len(pats) == 1
m = pats[0].search("Hier: Telefonnr. 0761/12345")
assert m is not None
def test_load_patterns_filters_low_confidence(tmp_lib):
lib, _ = tmp_lib
lib.record("kontakt_telefon", "WeakLabel", 0.3, "impressum")
pats = lib.load_patterns_for(
"kontakt_telefon", "impressum", min_avg_confidence=0.5,
)
assert pats == []
# observed_count filter
pats = lib.load_patterns_for(
"kontakt_telefon", "impressum", min_observed=2,
)
assert pats == []
def test_label_to_regex_telefon():
from compliance.services.specialist_agents._pattern_library import (
_label_to_regex,
)
rx = _label_to_regex("Telefonnr.")
import re
assert re.search(rx, "Telefonnr. 0761/12345", re.I)
assert re.search(rx, "Telefonnr 0761", re.I)
def test_label_to_regex_email():
from compliance.services.specialist_agents._pattern_library import (
_label_to_regex,
)
rx = _label_to_regex("Mailadresse")
import re
assert re.search(rx, "Mailadresse: x@y.de", re.I)
def test_prune_low_confidence_keeps_recent(tmp_lib):
lib, _ = tmp_lib
lib.record("kontakt_telefon", "Tel", 0.9, "impressum")
pruned = lib.prune_low_confidence(min_runs_before_prune=100)
assert pruned == 0 # Nur einmal observed → noch nicht prunen
assert len(lib.list_all()) == 1
def test_load_patterns_for_nonexistent_returns_empty(tmp_lib):
lib, _ = tmp_lib
assert lib.load_patterns_for("ghost", "impressum") == []
@@ -0,0 +1,119 @@
"""Tests für den Semantic-Validator-Layer."""
from __future__ import annotations
import asyncio
import pytest
from compliance.services.specialist_agents import AgentInput, ImpressumAgent
from compliance.services.specialist_agents._semantic_validator import (
STANDARD_LABELS,
build_rename_action,
standard_label,
validate_present,
)
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_standard_labels_cover_impressum_fields():
"""Alle Impressum-Pflichtangaben müssen ein Standard-Label haben."""
for fid in (
"kontakt_telefon", "kontakt_email", "vertretungsberechtigte",
"handelsregister", "ust_id", "name_anbieter",
):
assert fid in STANDARD_LABELS, f"missing standard label: {fid}"
def test_build_rename_action_includes_old_and_new():
a = build_rename_action("kontakt_telefon", "Telefonnr.")
assert "Telefonnr." in a
assert "Telefon" in a
assert "Best-Practice" in a or "Umbenennung" in a
def test_standard_label_falls_back_to_field_id():
assert standard_label("kontakt_telefon") == "Telefon"
assert standard_label("ghost_field") == "ghost_field"
def test_validate_present_short_text_returns_empty():
out = _run(validate_present(
"x", [("kontakt_telefon", "Telefon")],
))
assert out == {}
def test_validate_present_no_fields_returns_empty():
out = _run(validate_present("Long impressum text" * 100, []))
assert out == {}
def test_semantic_demotion_high_to_low(monkeypatch):
"""Wenn LLM bestätigt dass Pflichtangabe da ist: HIGH→LOW.
Test-Setup: Impressum-Text OHNE jegliche Telefon-Markierung
(Pattern matched nicht). LLM-Mock behauptet aber 'Funkanschluss'
wäre ein abweichendes Label für die Telefonnummer.
"""
from compliance.services.specialist_agents._escalation import (
EscalationResult, SourceType,
)
from compliance.services.specialist_agents._base import EscalationLog
async def _fake_cascade(sys_prompt, user_prompt,
expect_json=True, skip_ovh=False):
# Nur auf den SVL-Prompt reagieren
if "FEHLENDE PFLICHTANGABEN" not in user_prompt:
return None, []
log = EscalationLog(
stage=SourceType.LLM_LOCAL, model="qwen2.5:7b",
duration_ms=42, success=True,
)
res = EscalationResult(
content='{"results":[]}',
stage=SourceType.LLM_LOCAL,
model="qwen2.5:7b",
log=log,
parsed={"results": [{
"field_id": "kontakt_telefon",
"found": True,
"label_used": "Funkanschluss",
"evidence": "Funkanschluss 0761/123456",
"confidence": 0.9,
}]},
)
return res, [log]
monkeypatch.setattr(
"compliance.services.specialist_agents._semantic_validator.cascade",
_fake_cascade,
)
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_fake_cascade,
)
# Text OHNE Telefon-Label → MC matched nicht → HIGH-Finding
text = (
"Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n"
"E-Mail: x@y.de\nFunkanschluss 0761/123456\n"
"Geschäftsführer: Max Mustermann\n"
"Handelsregister Berlin HRB 12345\n"
"USt-IdNr: DE123456789"
)
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(doc_type="impressum", text=text)))
telefon_findings = [f for f in out.findings
if f.field_id == "kontakt_telefon"]
assert telefon_findings, "expected MC-miss → finding"
f = telefon_findings[0]
# Erwartet: SVL hat demoted zu LOW
assert f.severity == "LOW", (
f"Erwartet: LOW nach semantic-demote, got: {f.severity}. "
f"Finding: {f}"
)
assert f.severity_reason == "label_mismatch"
assert "Funkanschluss" in f.action
assert "Telefon" in f.action