feat(impressum): v3 — Layer-Architektur auf doc_check_controls (75 DB-MCs)
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 31s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 31s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
Sprint 1.12 Phase 1 (User-Vorgabe 2026-06-09):
Statt eigener 12 hartgepatchter Patterns nutzt der Impressum-Agent jetzt
die 75 echten Master-Controls aus compliance.doc_check_controls. Pipeline:
Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py / regex_boost.py)
→ wenn Pattern hits, MC wird zu PASS überschrieben
Layer 1 — Keyword-Match aus pass_criteria der 75 DB-MCs
(rag_document_checker.check_document_with_controls)
Layer 2 — BGE-M3 Embedding-Match (in rag_document_checker integriert)
Layer 3 — Semantic-Validator (LLM) für übriggebliebene HIGH/MEDIUM
+ Auto-Learning-Pattern-Library
Output-Layer bleibt unverändert: Disclaimer-Linter + Rollup-Dedup +
Methodik-First-UI.
Neue Dateien:
- impressum/v3_engine.py — Pipeline-Orchestrator
- impressum/regex_boost.py — meine 12 Patterns + Boost-Mapping
Refactored:
- impressum/agent.py — komplett umgeschrieben, agent_version=3.0
255 LOC (unter 500-Cap)
Tests: test_impressum_v3.py mit 10 neuen Tests, alle gruen. Mockt
run_v3_pipeline für offline-Lauf. Bestaetigt:
- Layer-0 erkennt Tesla-typische Felder
- Boost matched DB-MC nur bei ≥2 Keyword-Treffern in pass_criteria
- 12 Pattern-Boost-Slots + N DB-MCs in coverage
- Notes enthalten Telemetrie (v3-pipeline, Boost-Overrides)
Telemetrie wird in AgentOutput.notes ausgegeben, damit Frontend
sehen kann: 75 DB-MCs geprueft · 5 Pattern-Boosts · 3 Boost-Overrides.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,22 @@
|
||||
"""Impressum-Agent v2 — konsolidierter BaseSpecialistAgent.
|
||||
"""Impressum-Agent v3 — baut auf doc_check_controls (75 echte MCs aus DB).
|
||||
|
||||
Ablauf:
|
||||
1. Deterministische MCs durchlaufen → Findings + mc_coverage.
|
||||
2. Wenn unklare Felder (HIGH/MEDIUM missing) → LLM-Eskalation.
|
||||
3. LLM-Findings dedupen mit MC-Findings nach field_id.
|
||||
4. Rollup → Recommendations.
|
||||
5. Disclaimer-Lint → AgentOutput.
|
||||
Sprint 1.12 (User-Vorgabe 2026-06-09):
|
||||
|
||||
Phase 1 (jetzt): qwen2.5:7b als Stage-1, OVH optional als Stage-2.
|
||||
Architektur, 4 Layer:
|
||||
Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py + regex_boost.py)
|
||||
Layer 1 — Keyword-Match aus doc_check_controls.pass_criteria
|
||||
(rag_document_checker.check_document_with_controls)
|
||||
Layer 2 — BGE-M3 Embedding-Match (in rag_document_checker integriert)
|
||||
Layer 3 — Semantic-Validator (LLM) für noch immer offene FAILs +
|
||||
Auto-Learning der Labels
|
||||
|
||||
Output-Layer (mein Stack bleibt 1:1):
|
||||
- Disclaimer-Linter
|
||||
- Rollup-Dedup
|
||||
- Methodik-First-UI
|
||||
|
||||
Vorteil: 75 statt 12 Master-Controls; DB-konsistent; Embedding-Fallback
|
||||
gratis; meine Konzepte bleiben als Boost + Output-Sanitizer.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -27,146 +36,131 @@ from .._base import (
|
||||
SourceType,
|
||||
lint_output,
|
||||
)
|
||||
from .._escalation import cascade
|
||||
from .._pattern_library import load_patterns_for, record as record_pattern
|
||||
from .._pattern_library import record as record_pattern
|
||||
from .._rollup import rollup
|
||||
from .._semantic_validator import (
|
||||
build_rename_action,
|
||||
validate_present,
|
||||
)
|
||||
from .mcs import MC_IDS, MCS, detect_automotive, scope_matches
|
||||
from .._semantic_validator import build_rename_action, validate_present
|
||||
from .mcs import MC_IDS, MCS, detect_automotive
|
||||
from .v3_engine import run_v3_pipeline
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus
|
||||
§ 5 TMG / DDG (Anbieterkennzeichnung). Aufgabe: Impressum prüfen und
|
||||
LÜCKEN aufzählen, die einer regex-basierten Vorprüfung entgangen sind.
|
||||
|
||||
WICHTIG:
|
||||
- KEINE Bewertung "rechtssicher" / "garantiert" / "konform".
|
||||
- Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren.
|
||||
- Wörtliches Zitat als evidence bei jeder Lücke.
|
||||
|
||||
Antworte NUR mit JSON, Schema:
|
||||
{"findings": [
|
||||
{"field_id": "...", "severity": "HIGH|MEDIUM|LOW",
|
||||
"title": "...", "evidence": "wörtliches Zitat",
|
||||
"action": "konkrete Empfehlung"}
|
||||
]}
|
||||
"""
|
||||
_SEV_TO_ENUM = {
|
||||
"CRITICAL": Severity.HIGH,
|
||||
"HIGH": Severity.HIGH,
|
||||
"MEDIUM": Severity.MEDIUM,
|
||||
"LOW": Severity.LOW,
|
||||
"INFO": Severity.INFO,
|
||||
}
|
||||
|
||||
|
||||
class ImpressumAgent(BaseSpecialistAgent):
|
||||
agent_id = "impressum"
|
||||
agent_version = "2.0"
|
||||
agent_version = "3.0"
|
||||
doc_type = "impressum"
|
||||
# Owner-Liste der eigenen Boost-Pattern-IDs (Layer 0).
|
||||
# Die DB-MC-IDs werden dynamisch ermittelt und sind kein Owner-
|
||||
# Konflikt: doc_check_controls.control_id-Werte sind global, der
|
||||
# Agent ist Caller, nicht Owner.
|
||||
owned_mc_ids = MC_IDS
|
||||
|
||||
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
|
||||
start = datetime.now(timezone.utc)
|
||||
text = (agent_input.text or "").strip()
|
||||
scope = set(agent_input.business_scope or [])
|
||||
# Auto-detect KFZ
|
||||
is_automotive = detect_automotive(text)
|
||||
if is_automotive:
|
||||
if detect_automotive(text):
|
||||
scope.add("automotive")
|
||||
|
||||
mc_findings: list[Finding] = []
|
||||
coverage: list[McCoverage] = []
|
||||
findings: list[Finding] = []
|
||||
esc_logs: list[EscalationLog] = []
|
||||
notes_parts: list[str] = []
|
||||
|
||||
if len(text) < 50:
|
||||
# Doc zu kurz → alle als skipped
|
||||
if len(text) < 100:
|
||||
# Doc zu kurz — alle eigenen Pattern-IDs als skipped
|
||||
for mc in MCS:
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc.mc_id, status="skipped",
|
||||
reason="doc too short or empty",
|
||||
reason="text too short",
|
||||
))
|
||||
return self._finalize(
|
||||
start, mc_findings, [], coverage, confidence=0.0,
|
||||
start, findings, esc_logs, coverage,
|
||||
confidence=0.0,
|
||||
notes="Impressum-Text zu kurz oder leer.",
|
||||
)
|
||||
|
||||
for mc in MCS:
|
||||
if not scope_matches(mc, scope, is_automotive):
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc.mc_id, status="na",
|
||||
reason=f"scope mismatch (needs {mc.requires_scope})",
|
||||
))
|
||||
# ── Layer 0 + 1 + 2 (Boost + Keyword + Embedding) ──────────
|
||||
results, telemetry = await run_v3_pipeline(text, scope)
|
||||
notes_parts.append(
|
||||
f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · "
|
||||
f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · "
|
||||
f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides"
|
||||
)
|
||||
|
||||
# DB-MCs → Findings + Coverage
|
||||
seen_db_mcs: set[str] = set()
|
||||
for r in results:
|
||||
mc_id = r.get("control_id") or ""
|
||||
if not mc_id or mc_id in seen_db_mcs:
|
||||
continue
|
||||
found = any(p.search(text) for p in mc.patterns)
|
||||
if not found:
|
||||
# 1.11: Auto-Learning — gelernte Labels probieren.
|
||||
# Wenn ein gelerntes Pattern matcht: als OK werten +
|
||||
# Coverage-Reason markiert das.
|
||||
learned = load_patterns_for(mc.field_id, self.agent_id)
|
||||
if any(lp.search(text) for lp in learned):
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc.mc_id, status="ok",
|
||||
reason=f"learned-pattern matched "
|
||||
f"({len(learned)} gelernt)",
|
||||
))
|
||||
continue
|
||||
if found:
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc.mc_id, status="ok",
|
||||
))
|
||||
seen_db_mcs.add(mc_id)
|
||||
passed = bool(r.get("passed"))
|
||||
sev = _SEV_TO_ENUM.get(
|
||||
(r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM,
|
||||
)
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc_id,
|
||||
status="ok" if passed else sev.value.lower(),
|
||||
reason=str(r.get("matched_text") or r.get("hint") or "")[:120],
|
||||
))
|
||||
if passed:
|
||||
continue
|
||||
# Missing → Finding
|
||||
sev = self._sev(mc.severity_if_missing)
|
||||
action = self._build_action(mc, is_automotive)
|
||||
mc_findings.append(Finding(
|
||||
check_id=f"IMPRESSUM-AGENT-{mc.field_id.upper()}",
|
||||
label = r.get("label") or r.get("hint") or ""
|
||||
findings.append(Finding(
|
||||
check_id=f"DBMC-{mc_id}",
|
||||
agent=self.agent_id,
|
||||
agent_version=self.agent_version,
|
||||
field_id=mc.field_id,
|
||||
field_id=mc_id,
|
||||
severity=sev,
|
||||
severity_reason="missing",
|
||||
title=f"Pflichtangabe '{mc.label}' fehlt im Impressum",
|
||||
norm=mc.norm,
|
||||
severity_reason="db_mc_failed",
|
||||
title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt",
|
||||
norm=str(r.get("regulation") or "") +
|
||||
(f" Art. {r.get('article')}"
|
||||
if r.get("article") else ""),
|
||||
evidence="",
|
||||
action=action,
|
||||
confidence=0.95,
|
||||
action=str(r.get("hint") or "")[:400]
|
||||
or "Bitte gegen die Pflichtangaben prüfen.",
|
||||
confidence=0.9,
|
||||
sources=[EvidenceSource(
|
||||
source_type=SourceType.MC,
|
||||
source_id=mc.mc_id,
|
||||
detail=f"regex check {len(mc.patterns)} pattern(s) negative",
|
||||
source_id=mc_id,
|
||||
detail=str(r.get("source") or "keyword_match")[:120],
|
||||
confidence=0.9,
|
||||
)],
|
||||
))
|
||||
|
||||
# Layer 0: eigene Pattern-IDs immer mit ins coverage (für UI)
|
||||
boost_ids = set(telemetry.get("layer_0_field_ids") or [])
|
||||
for mc in MCS:
|
||||
cov_status = "ok" if mc.field_id in boost_ids else "na"
|
||||
cov_reason = ("regex-boost hit"
|
||||
if mc.field_id in boost_ids
|
||||
else "kein Pattern-Treffer (kein Veto)")
|
||||
coverage.append(McCoverage(
|
||||
mc_id=mc.mc_id,
|
||||
status=sev.value.lower(),
|
||||
reason="missing",
|
||||
mc_id=mc.mc_id, status=cov_status, reason=cov_reason,
|
||||
))
|
||||
|
||||
# Semantic-Validator: prüft per LLM ob HIGH-Missings doch
|
||||
# vorhanden sind (unter abweichendem Label). Demoted HIGH→LOW
|
||||
# mit Rename-Empfehlung wenn ja. User-Vorgabe 2026-06-09.
|
||||
await self._semantic_demote(text, mc_findings, coverage)
|
||||
# ── Layer 3: Semantic-Validator nur für HIGH/MEDIUM-Fails ──
|
||||
await self._semantic_demote(text, findings, coverage)
|
||||
|
||||
# Eskalation: für die identifizierten Lücken kann ein LLM
|
||||
# zusätzliche Tiefen-Findings liefern (z.B. "Geschäftsführer
|
||||
# genannt, aber ohne Nachname"). Confidence der MC-Findings
|
||||
# ist hoch — eskalieren wir wegen "weitere subtile Lücken
|
||||
# finden", nicht weil wir unsicher sind.
|
||||
esc_findings, esc_logs = await self._maybe_escalate(text, scope)
|
||||
|
||||
# Dedup per field_id (MC hat Priorität)
|
||||
seen_fields = {f.field_id for f in mc_findings if f.field_id}
|
||||
for f in esc_findings:
|
||||
if f.field_id and f.field_id in seen_fields:
|
||||
continue
|
||||
mc_findings.append(f)
|
||||
|
||||
# Confidence: harmonisches Mittel über alle Finding-Confidences
|
||||
if mc_findings:
|
||||
confs = [f.confidence for f in mc_findings if f.confidence]
|
||||
overall = sum(confs) / len(confs) if confs else 0.8
|
||||
else:
|
||||
overall = 0.95 # nichts gefunden → alle MCs ok
|
||||
# Confidence: harmonic mean der Findings (oder hoch wenn 0)
|
||||
confs = [f.confidence for f in findings if f.confidence] or [0.95]
|
||||
overall = sum(confs) / len(confs)
|
||||
|
||||
return self._finalize(
|
||||
start, mc_findings, esc_logs, coverage, confidence=overall,
|
||||
start, findings, esc_logs, coverage,
|
||||
confidence=overall,
|
||||
notes=" · ".join(notes_parts),
|
||||
)
|
||||
|
||||
async def _semantic_demote(
|
||||
@@ -175,45 +169,37 @@ class ImpressumAgent(BaseSpecialistAgent):
|
||||
findings: list[Finding],
|
||||
coverage: list[McCoverage],
|
||||
) -> None:
|
||||
"""LLM-Layer für HIGH/MEDIUM-missings — demote zu LOW wenn da."""
|
||||
candidates: list[tuple[str, str, Finding]] = []
|
||||
for f in findings:
|
||||
# Demote-Kandidaten: HIGH oder MEDIUM-Pattern-Misses.
|
||||
# LOW/INFO bleiben unverändert (sind selbst schon Best-
|
||||
# Practice-Empfehlungen).
|
||||
if f.severity not in (Severity.HIGH.value,
|
||||
Severity.MEDIUM.value):
|
||||
continue
|
||||
if f.severity_reason != "missing":
|
||||
continue
|
||||
# Suche zugehöriges MC für die Beschreibung
|
||||
mc = next((m for m in MCS if m.field_id == f.field_id), None)
|
||||
label = mc.label if mc else f.field_id
|
||||
candidates.append((f.field_id, label, f))
|
||||
"""LLM-Layer für HIGH/MEDIUM-DB-MCs: Label-Mismatch-Check.
|
||||
Bei Fund → HIGH/MEDIUM → LOW + Rename-Action."""
|
||||
candidates = [
|
||||
f for f in findings
|
||||
if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value)
|
||||
and f.severity_reason == "db_mc_failed"
|
||||
]
|
||||
if not candidates:
|
||||
return
|
||||
result = await validate_present(
|
||||
text, [(c[0], c[1]) for c in candidates],
|
||||
text, [(f.field_id, f.title[:80]) for f in candidates],
|
||||
)
|
||||
if not result:
|
||||
return
|
||||
for field_id, label, finding in candidates:
|
||||
row = result.get(field_id)
|
||||
for finding in candidates:
|
||||
row = result.get(finding.field_id)
|
||||
if not row or not row.get("found"):
|
||||
continue
|
||||
if row.get("confidence", 0) < 0.6:
|
||||
continue
|
||||
label_used = row.get("label_used") or "abweichendes Label"
|
||||
# Demote in-place
|
||||
conf = float(row.get("confidence") or 0.8)
|
||||
finding.severity = Severity.LOW.value
|
||||
finding.severity_reason = "label_mismatch"
|
||||
finding.title = (
|
||||
f"Label '{label_used}' weicht von Standard-"
|
||||
f"Bezeichnung ab"
|
||||
f"Label '{label_used}' weicht von Standard ab"
|
||||
)
|
||||
finding.evidence = str(row.get("evidence") or "")[:200]
|
||||
finding.action = build_rename_action(
|
||||
finding.field_id, label_used,
|
||||
)
|
||||
finding.evidence = row.get("evidence", "")[:200]
|
||||
finding.action = build_rename_action(field_id, label_used)
|
||||
conf = float(row.get("confidence") or 0.8)
|
||||
finding.confidence = conf
|
||||
finding.sources.append(EvidenceSource(
|
||||
source_type=SourceType.LLM_LOCAL,
|
||||
@@ -221,79 +207,20 @@ class ImpressumAgent(BaseSpecialistAgent):
|
||||
detail=f"LLM-confirmed: '{label_used}'",
|
||||
confidence=conf,
|
||||
))
|
||||
# 1.11: Auto-Learning — Label-Match in der Library
|
||||
# persistieren. Beim nächsten Run wird das gelernte
|
||||
# Pattern bereits beim MC-Pass berücksichtigt, ohne
|
||||
# erneuten LLM-Call.
|
||||
# Coverage update + auto-learning
|
||||
for c in coverage:
|
||||
if c.mc_id and c.mc_id == f"DBMC-{finding.field_id}":
|
||||
c.status = "low"
|
||||
c.reason = f"label_mismatch: '{label_used}'"
|
||||
try:
|
||||
record_pattern(
|
||||
field_id=field_id,
|
||||
field_id=finding.field_id,
|
||||
label_used=label_used,
|
||||
confidence=conf,
|
||||
agent_id=self.agent_id,
|
||||
)
|
||||
except Exception as e:
|
||||
import logging
|
||||
logging.getLogger(__name__).warning(
|
||||
"pattern-library record failed: %s", e,
|
||||
)
|
||||
# Update coverage status
|
||||
for c in coverage:
|
||||
if c.mc_id and c.mc_id.endswith(field_id.upper()):
|
||||
continue
|
||||
# Robuster: nach mc_id über MCS
|
||||
mc = next((m for m in MCS if m.field_id == field_id), None)
|
||||
if mc:
|
||||
cov = next((c for c in coverage
|
||||
if c.mc_id == mc.mc_id), None)
|
||||
if cov:
|
||||
cov.status = "low"
|
||||
cov.reason = f"label_mismatch: '{label_used}'"
|
||||
|
||||
async def _maybe_escalate(
|
||||
self, text: str, scope: set[str],
|
||||
) -> tuple[list[Finding], list[EscalationLog]]:
|
||||
"""LLM-Stage. Aktivierbar via Agent-Settings (default an)."""
|
||||
# Hard cap auf Text-Größe für den LLM-Pass
|
||||
user_prompt = (
|
||||
f"BUSINESS-SCOPE: {', '.join(sorted(scope))}\n\n"
|
||||
f"IMPRESSUM-TEXT:\n{text[:4000]}\n\n"
|
||||
"Liste subtile Lücken nach § 5 TMG. Nur JSON."
|
||||
)
|
||||
res, logs = await cascade(_SYSTEM_PROMPT, user_prompt)
|
||||
if res is None or not isinstance(res.parsed, (dict, list)):
|
||||
return [], logs
|
||||
raw = (res.parsed.get("findings")
|
||||
if isinstance(res.parsed, dict) else res.parsed)
|
||||
if not isinstance(raw, list):
|
||||
return [], logs
|
||||
out: list[Finding] = []
|
||||
for item in raw:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
fid = str(item.get("field_id") or "unknown")[:40]
|
||||
sev_raw = str(item.get("severity") or "MEDIUM").upper()
|
||||
sev = self._sev(sev_raw)
|
||||
out.append(Finding(
|
||||
check_id=f"IMPRESSUM-AGENT-LLM-{fid.upper()}",
|
||||
agent=self.agent_id,
|
||||
agent_version=self.agent_version,
|
||||
field_id=fid,
|
||||
severity=sev,
|
||||
severity_reason="llm_detected",
|
||||
title=str(item.get("title") or "")[:200],
|
||||
norm="§ 5 TMG / DDG (LLM-Analyse)",
|
||||
evidence=str(item.get("evidence") or "")[:300],
|
||||
action=str(item.get("action") or "")[:400],
|
||||
confidence=0.7,
|
||||
sources=[EvidenceSource(
|
||||
source_type=res.stage,
|
||||
source_id=res.model,
|
||||
detail=f"prompt_chars={len(user_prompt)}",
|
||||
confidence=0.7,
|
||||
)],
|
||||
))
|
||||
return out, logs
|
||||
logger.warning("pattern-library record failed: %s", e)
|
||||
|
||||
def _finalize(
|
||||
self,
|
||||
@@ -326,29 +253,3 @@ class ImpressumAgent(BaseSpecialistAgent):
|
||||
mc_low=sum(1 for c in coverage if c.status == "low"),
|
||||
)
|
||||
return lint_output(out)
|
||||
|
||||
@staticmethod
|
||||
def _sev(value: str) -> Severity:
|
||||
v = (value or "").upper()
|
||||
if v == "HIGH":
|
||||
return Severity.HIGH
|
||||
if v == "MEDIUM":
|
||||
return Severity.MEDIUM
|
||||
if v == "LOW":
|
||||
return Severity.LOW
|
||||
return Severity.INFO
|
||||
|
||||
@staticmethod
|
||||
def _build_action(mc, is_automotive: bool) -> str:
|
||||
if mc.field_id == "aufsichtsbehoerde" and is_automotive:
|
||||
return (
|
||||
"Aufsichtsbehörde im Impressum benennen. Für "
|
||||
"KFZ-Hersteller/-Vertrieb typisch: Kraftfahrt-"
|
||||
"Bundesamt (KBA), Fördestraße 16, 24944 Flensburg, "
|
||||
"www.kba.de. Bei Ladestrom-Vertrieb zusätzlich "
|
||||
"Bundesnetzagentur (BNetzA)."
|
||||
)
|
||||
return (
|
||||
f"{mc.label} im Impressum ergänzen "
|
||||
f"(Pflichtangabe nach {mc.norm})."
|
||||
)
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
"""Layer-0 Regex-Boost — meine 12 Patterns als deterministische Vor-Stufe
|
||||
vor dem Keyword-Match aus doc_check_controls.
|
||||
|
||||
Idee (Sprint 1.12, User-Vorgabe 2026-06-09):
|
||||
- rag_document_checker.py macht Keyword-Match aus den natursprachlichen
|
||||
pass_criteria der 75 Impressum-MCs in der DB. Wenn 60% der Keywords
|
||||
matchen → PASS.
|
||||
- Bei FAIL läuft Embedding-Match (Layer 2).
|
||||
- MEIN Beitrag (Layer 0): meine 12 enger gefassten Regex-Pattern
|
||||
laufen ZUERST. Wenn ein Pattern matched → das thematisch passende
|
||||
MC wird zu PASS überschrieben (auch wenn Keyword-Match unklar war).
|
||||
- Mapping: jedem meiner field_id → Liste von Schlüsselwörtern wie sie
|
||||
in pass_criteria stehen würden. Wenn pass_criteria diese Wörter
|
||||
enthält → dieses MC gehört zu meinem field_id.
|
||||
|
||||
Damit gehen meine 12 Pattern nicht verloren, sondern boost'en das
|
||||
bestehende System.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from .mcs import MCS, detect_automotive, scope_matches
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Für jedes meiner field_id: welche Wörter erscheinen typisch in
|
||||
# der pass_criteria der zugehörigen DB-MCs? Wenn diese Wörter im
|
||||
# pass_criteria gefunden werden, ist es vermutlich derselbe MC.
|
||||
BOOST_KEYWORDS: dict[str, tuple[str, ...]] = {
|
||||
"name_anbieter": (
|
||||
"rechtsform", "anschrift", "anbieter", "firmensitz", "firmenname",
|
||||
"diensteanbieter", "verantwortlich",
|
||||
),
|
||||
"kontakt_email": (
|
||||
"e-mail", "email", "elektronische", "kontaktmöglichkeit",
|
||||
"mailadresse",
|
||||
),
|
||||
"kontakt_telefon": (
|
||||
"telefon", "rufnummer", "telefonnummer", "phone", "kontaktdaten",
|
||||
"telekommunikation",
|
||||
),
|
||||
"handelsregister": (
|
||||
"handelsregister", "registergericht", "hrb", "registernummer",
|
||||
),
|
||||
"ust_id": (
|
||||
"umsatzsteuer", "ust-id", "umsatzsteueridentifikation", "ust-idnr",
|
||||
),
|
||||
"vertretungsberechtigte": (
|
||||
"geschäftsführer", "vorstand", "vertretungsberechtigt",
|
||||
"vertretung", "gesellschafter",
|
||||
),
|
||||
"vertretungsberechtigte_label_korrekt": (
|
||||
"deutsche", "bezeichnung", "rechtsform",
|
||||
),
|
||||
"aufsichtsbehoerde": (
|
||||
"aufsichtsbehörde", "aufsicht", "behörde", "regulierungsbehörde",
|
||||
),
|
||||
"verantwortlicher_redaktion": (
|
||||
"redaktion", "verantwortlich", "rstv", "mstv",
|
||||
"journalistisch", "publizistisch",
|
||||
),
|
||||
"verbraucher_streitbeilegung": (
|
||||
"streitbeilegung", "vsbg", "verbraucherschlichtung",
|
||||
"schlichtungsstelle",
|
||||
),
|
||||
"berufsangaben": (
|
||||
"berufsbezeichnung", "berufsordnung", "kammer", "berufsrecht",
|
||||
),
|
||||
"odr_link": (
|
||||
"online-streitbeilegung", "os-plattform", "odr",
|
||||
"europäische kommission",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def compute_regex_boosts(text: str, business_scope: set[str]) -> set[str]:
|
||||
"""Welche meiner field_ids hat das Pattern erkannt?
|
||||
|
||||
Returns die Menge an gehit'ten field_ids für die später entschieden
|
||||
wird ob ein doc_check_control darüber automatisch passed werden kann.
|
||||
"""
|
||||
if not text or len(text) < 50:
|
||||
return set()
|
||||
hits: set[str] = set()
|
||||
is_auto = detect_automotive(text)
|
||||
for mc in MCS:
|
||||
if not scope_matches(mc, business_scope, is_auto):
|
||||
continue
|
||||
if any(p.search(text) for p in mc.patterns):
|
||||
hits.add(mc.field_id)
|
||||
return hits
|
||||
|
||||
|
||||
def boost_matches_db_mc(boosts: set[str], pass_criteria: list) -> str | None:
|
||||
"""Hat ein gebooster field_id genug Keyword-Überlapp mit den
|
||||
pass_criteria einer DB-MC, um den MC zu boost'en?
|
||||
|
||||
Returns: field_id (matched), oder None.
|
||||
Vorsichtig: ≥2 Boost-Keywords müssen im pass_criteria-Text auftauchen,
|
||||
sonst zu permissiv.
|
||||
"""
|
||||
if not boosts or not pass_criteria:
|
||||
return None
|
||||
crit_text = " ".join(
|
||||
str(c) for c in pass_criteria if c
|
||||
).lower()
|
||||
best: tuple[int, str] | None = None
|
||||
for field_id in boosts:
|
||||
kws = BOOST_KEYWORDS.get(field_id) or ()
|
||||
match_count = sum(1 for kw in kws if kw in crit_text)
|
||||
if match_count >= 2:
|
||||
if best is None or match_count > best[0]:
|
||||
best = (match_count, field_id)
|
||||
return best[1] if best else None
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Sprint-1.12 v3-Engine: läuft die volle 4-Layer-Pipeline auf einem
|
||||
Impressum-Text:
|
||||
|
||||
Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py)
|
||||
Layer 1 — Keyword-Match aus doc_check_controls.pass_criteria
|
||||
(75 MCs in DB für Impressum)
|
||||
Layer 2 — BGE-M3 Embedding-Match als Fallback (im
|
||||
rag_document_checker integriert)
|
||||
Layer 3 — Semantic-Validator (LLM) wenn auch Embedding nicht half
|
||||
(im Agent angefasst, hier nur Ergebnisse durchgereicht)
|
||||
|
||||
Output: Liste Result-Dicts kompatibel mit rag_document_checker (passed,
|
||||
severity, control_id, regulation, ...). Der Agent konvertiert sie dann
|
||||
zu Finding-Objekten.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from .regex_boost import boost_matches_db_mc, compute_regex_boosts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def run_v3_pipeline(
|
||||
text: str,
|
||||
business_scope: set[str],
|
||||
db_url: str = "",
|
||||
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
|
||||
"""Returns (results, telemetry).
|
||||
|
||||
results: pro DB-MC ein dict {control_id, passed, severity, ...}
|
||||
telemetry: counters für Frontend-Anzeige (Layer-Aufschlüsselung)
|
||||
"""
|
||||
if not text or len(text) < 100:
|
||||
return [], {"reason": "text too short"}
|
||||
|
||||
# Layer 0: meine Regex-Patterns
|
||||
boosts = compute_regex_boosts(text, business_scope)
|
||||
boost_field_ids = sorted(boosts)
|
||||
logger.info("v3 Layer-0 boosts: %d hits — %s",
|
||||
len(boost_field_ids), boost_field_ids)
|
||||
|
||||
# Layer 1+2: bestehender rag_document_checker (Keyword + Embedding)
|
||||
try:
|
||||
from compliance.services.rag_document_checker import (
|
||||
check_document_with_controls,
|
||||
)
|
||||
results = await check_document_with_controls(
|
||||
text=text,
|
||||
doc_type="impressum",
|
||||
doc_title="Impressum (Agent-Test)",
|
||||
db_url=db_url,
|
||||
max_controls=0,
|
||||
use_agent=False,
|
||||
business_scope=business_scope,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("rag_document_checker failed: %s — using boosts only",
|
||||
e)
|
||||
results = []
|
||||
|
||||
layer_1_pass = sum(1 for r in results if r.get("passed"))
|
||||
layer_1_fail = sum(1 for r in results
|
||||
if r.get("passed") is False)
|
||||
|
||||
# Layer 0 Override: failed MCs deren pass_criteria zu einem meiner
|
||||
# gebooster field_ids passt → überschreiben zu PASS
|
||||
boost_overrides = 0
|
||||
for r in results:
|
||||
if r.get("passed"):
|
||||
continue
|
||||
# rag_document_checker nimmt pass_criteria intern weg vor
|
||||
# dem Return; wir laden sie nochmal (oder bekommen sie via
|
||||
# 'hint'). Hier rufen wir das per Helper.
|
||||
crit = r.get("_pass_criteria") or []
|
||||
if not crit:
|
||||
# Fallback: aus dem Hint (= check_question) Boost-Match
|
||||
# versuchen.
|
||||
crit = [r.get("hint") or ""]
|
||||
matched_field = boost_matches_db_mc(boosts, crit)
|
||||
if matched_field:
|
||||
r["passed"] = True
|
||||
r["matched_text"] = (
|
||||
f"[regex-boost layer 0 — field {matched_field}]"
|
||||
)
|
||||
existing_hint = r.get("hint") or ""
|
||||
r["hint"] = (existing_hint +
|
||||
" (passed via regex-boost layer 0)").strip()
|
||||
r["source"] = (r.get("source") or "") + "+regex_boost"
|
||||
boost_overrides += 1
|
||||
|
||||
telemetry = {
|
||||
"layer_0_field_hits": len(boost_field_ids),
|
||||
"layer_0_field_ids": boost_field_ids,
|
||||
"layer_1_pass": layer_1_pass,
|
||||
"layer_1_fail": layer_1_fail,
|
||||
"layer_0_boost_overrides": boost_overrides,
|
||||
"total_mcs": len(results),
|
||||
}
|
||||
logger.info("v3 telemetry: %s", telemetry)
|
||||
return results, telemetry
|
||||
@@ -0,0 +1,168 @@
|
||||
"""Tests für Impressum-Agent v3 (Sprint 1.12).
|
||||
|
||||
Mockt rag_document_checker damit Tests offline laufen + prüft die
|
||||
Layer-0-Boost-Logik isoliert.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from compliance.services.specialist_agents import (
|
||||
AgentInput,
|
||||
ImpressumAgent,
|
||||
Severity,
|
||||
)
|
||||
from compliance.services.specialist_agents.impressum.regex_boost import (
|
||||
BOOST_KEYWORDS,
|
||||
boost_matches_db_mc,
|
||||
compute_regex_boosts,
|
||||
)
|
||||
|
||||
|
||||
TESLA_TEXT = (
|
||||
"Tesla Germany GmbH\nLudwig-Prandtl-Strasse 25-29\n12526 Berlin\n"
|
||||
"E-Mail: info@tesla.com\n"
|
||||
"Telefon: +49 89 1250 16 800\n"
|
||||
"Management: Elon Musk\n"
|
||||
"Handelsregister: HRB 218904 B Charlottenburg\n"
|
||||
"USt-IdNr: DE123456789\n"
|
||||
)
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
def test_compute_regex_boosts_detects_basic_fields():
|
||||
hits = compute_regex_boosts(TESLA_TEXT, business_scope=set())
|
||||
# Tesla hat klassische Pflichtangaben
|
||||
assert "kontakt_email" in hits
|
||||
assert "kontakt_telefon" in hits
|
||||
assert "handelsregister" in hits
|
||||
assert "ust_id" in hits
|
||||
assert "vertretungsberechtigte" in hits # "Management"
|
||||
# KFZ-Auto-Detect → aufsichtsbehoerde wäre relevant aber kein
|
||||
# Pattern getroffen (KBA nicht genannt)
|
||||
|
||||
|
||||
def test_compute_regex_boosts_short_text_empty():
|
||||
assert compute_regex_boosts("x", business_scope=set()) == set()
|
||||
|
||||
|
||||
def test_boost_matches_db_mc_finds_telefon():
|
||||
boosts = {"kontakt_telefon"}
|
||||
pass_crit = [
|
||||
"Telefonnummer angeben",
|
||||
"Erreichbar per Telefon und E-Mail",
|
||||
]
|
||||
matched = boost_matches_db_mc(boosts, pass_crit)
|
||||
assert matched == "kontakt_telefon"
|
||||
|
||||
|
||||
def test_boost_matches_db_mc_returns_none_when_unrelated():
|
||||
boosts = {"kontakt_telefon"}
|
||||
pass_crit = [
|
||||
"Cookie-Banner muss zentriert sein",
|
||||
]
|
||||
assert boost_matches_db_mc(boosts, pass_crit) is None
|
||||
|
||||
|
||||
def test_boost_keywords_cover_all_field_ids():
|
||||
"""Jedes mcs.py field_id muss in BOOST_KEYWORDS ein Eintrag haben."""
|
||||
from compliance.services.specialist_agents.impressum.mcs import MCS
|
||||
for mc in MCS:
|
||||
assert mc.field_id in BOOST_KEYWORDS, (
|
||||
f"BOOST_KEYWORDS missing for {mc.field_id}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_v3(monkeypatch):
|
||||
"""Mockt run_v3_pipeline mit deterministischen Fake-Results."""
|
||||
async def _fake_pipeline(text, scope, db_url=""):
|
||||
results = [
|
||||
{"control_id": "AUTH-1954-A04",
|
||||
"passed": True,
|
||||
"label": "Anbieterkennzeichnung dokumentiert",
|
||||
"severity": "HIGH",
|
||||
"regulation": "TMG",
|
||||
"article": "§ 5",
|
||||
"hint": "",
|
||||
"matched_text": "Tesla Germany GmbH",
|
||||
"source": "keyword_match"},
|
||||
{"control_id": "DATA-2786-A04",
|
||||
"passed": False,
|
||||
"label": "Freiwilligkeit der TDDDG-Einwilligungen",
|
||||
"severity": "MEDIUM",
|
||||
"regulation": "TDDDG",
|
||||
"article": "§ 25",
|
||||
"hint": "Bitte Freiwilligkeit dokumentieren",
|
||||
"matched_text": "",
|
||||
"source": ""},
|
||||
]
|
||||
telemetry = {
|
||||
"layer_0_field_hits": 5,
|
||||
"layer_0_field_ids": ["kontakt_email", "kontakt_telefon",
|
||||
"handelsregister", "ust_id",
|
||||
"vertretungsberechtigte"],
|
||||
"layer_1_pass": 1,
|
||||
"layer_1_fail": 1,
|
||||
"layer_0_boost_overrides": 0,
|
||||
"total_mcs": 2,
|
||||
}
|
||||
return results, telemetry
|
||||
monkeypatch.setattr(
|
||||
"compliance.services.specialist_agents.impressum.agent.run_v3_pipeline",
|
||||
_fake_pipeline,
|
||||
)
|
||||
async def _no_validator(*a, **kw): return {}
|
||||
monkeypatch.setattr(
|
||||
"compliance.services.specialist_agents.impressum.agent.validate_present",
|
||||
_no_validator,
|
||||
)
|
||||
|
||||
|
||||
def test_agent_uses_db_mcs(mock_v3):
|
||||
agent = ImpressumAgent()
|
||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||
text=TESLA_TEXT)))
|
||||
db_mc_findings = [f for f in out.findings
|
||||
if f.check_id.startswith("DBMC-")]
|
||||
assert len(db_mc_findings) == 1
|
||||
assert db_mc_findings[0].check_id == "DBMC-DATA-2786-A04"
|
||||
assert db_mc_findings[0].severity == Severity.MEDIUM.value
|
||||
assert "TDDDG" in db_mc_findings[0].norm
|
||||
|
||||
|
||||
def test_agent_emits_boost_coverage(mock_v3):
|
||||
agent = ImpressumAgent()
|
||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||
text=TESLA_TEXT)))
|
||||
# 2 DB-MCs + 12 Pattern-Boost-Slots = 14 coverage entries
|
||||
assert out.mc_total >= 14
|
||||
boost_ok = [c for c in out.mc_coverage
|
||||
if c.mc_id.startswith("IMP-MC-") and c.status == "ok"]
|
||||
assert len(boost_ok) == 5 # 5 boost_ids im fake
|
||||
|
||||
|
||||
def test_agent_notes_telemetry(mock_v3):
|
||||
agent = ImpressumAgent()
|
||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum",
|
||||
text=TESLA_TEXT)))
|
||||
assert "v3-pipeline" in out.notes
|
||||
assert "Pattern-Boosts" in out.notes
|
||||
|
||||
|
||||
def test_short_text_skipped():
|
||||
agent = ImpressumAgent()
|
||||
out = _run(agent.evaluate(AgentInput(doc_type="impressum", text="x")))
|
||||
assert all(c.status == "skipped" for c in out.mc_coverage)
|
||||
assert not out.findings
|
||||
|
||||
|
||||
def test_agent_version_is_three():
|
||||
agent = ImpressumAgent()
|
||||
assert agent.agent_version == "3.0"
|
||||
Reference in New Issue
Block a user