feat(agents): Specialist-Agents Phase 2 Foundation + Cookie-Policy-Agent

Sprint 1 — Foundation (User-Vorgabe 2026-06-08):

Foundation:
- _base.py: BaseSpecialistAgent ABC + Pydantic Contract
  (AgentInput/AgentOutput/Finding/Recommendation/McCoverage/EscalationLog).
- _base.lint_output(): Disclaimer-Linter verbietet "rechtssicher" /
  "garantiert" / "gesetzeskonform" — scrubbed inline + Log in notes.
- _registry.py: AgentRegistry mit MC-Owner-Mapping (verhindert
  Doppel-Ownership).
- _escalation.py: cascade(local → ovh). qwen2.5:7b default,
  OVH 120b als Stage-2 (deaktiviert wenn OVH_URL leer).
- _rollup.py: deterministisches Dedup ähnlicher actions zu
  Recommendations mit related_finding_ids[].
- _evidence_vault.py: Pro-Run File-Vault für Playwright-Videos,
  Screenshots, CSV. SHA256 + manifest.json. DSR-tauglich (delete_run).

Agenten:
- ImpressumAgent v2 (impressum/agent.py + mcs.py) — konsolidiert
  v1-Pattern-Match + v2-LLM-MVP unter dem neuen Contract. 12 MCs.
- CookiePolicyAgent v1 (cookie_policy/agent.py + mcs.py) — 12 MCs
  zu Cookie-Richtlinie-Vollständigkeit + KB-Layer für
  CMP-Vendor-Cross-Check.

Tests: 25/25 grün (10 Impressum + 9 Vault + 6 Cookie-Policy).

Roadmap: SSE-Test-Endpoint + Frontend-Tab → DSE/AGB-Agents →
Cookie-Banner-Themen-Agent → Cross-Doc-Konsistenz-Agent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-08 17:40:05 +02:00
parent d6b8bf87c2
commit f4357a2e9b
15 changed files with 2364 additions and 10 deletions
@@ -1,17 +1,43 @@
"""Doc-Type Specialist-Agents — Phase 1 Prototyp.
"""Doc-Type Specialist-Agents — Phase 2 LLM (Foundation 2026-06-08).
Architektur:
- Pro Doc-Type ein Spezialist-Agent mit System-Prompt (Domänenwissen)
+ Knowledge-Base (anonymisierte Patterns/Statistiken aus
Multi-Mandanten-Daten)
- Jeder Agent liefert strukturierte Findings → enriched state
- Ein Cross-Doc-Router-Agent prüft ob Absätze falsch zugeordnet sind
("Cookie-Inhalt steht in AGB statt Cookie-Richtlinie")
- BaseSpecialistAgent + AgentInput/AgentOutput Contract (_base.py)
- AgentRegistry mit MC-Owner-Mapping (_registry.py)
- Eskalations-Kaskade qwen2.5:7b → OVH 120b (_escalation.py)
- Rollup-Dedup für Recommendations (_rollup.py)
- Disclaimer-Linter verbietet 'rechtssicher'/'garantiert' (_base.lint_output)
Phase 1: Impressum-Agent als Prototyp (Pattern-Match-only, ohne LLM).
Phase 2: DSE-Agent + Cross-Doc-Router (LLM-gestützt).
Phase 3+: Weitere Doc-Types + Continuous Learning der KB.
Aktive Agenten:
- impressum (v2) — § 5 TMG/DDG, § 18 MStV, § 36 VSBG
- (dse, agb, cookie_policy, cookie_banner, cross — folgen)
Privacy: KB enthält NIEMALS Roh-Mandantendaten. Anonymisierung +
Aggregation Pflicht (NER-Maskierung vor KB-Speicher).
"""
from ._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
EscalationLog,
EvidenceSource,
Finding,
McCoverage,
Recommendation,
Severity,
SourceType,
)
from ._registry import REGISTRY
from .cookie_policy import CookiePolicyAgent
from .impressum import ImpressumAgent
# Self-register all agents
REGISTRY.register(ImpressumAgent())
REGISTRY.register(CookiePolicyAgent())
__all__ = [
"AgentInput", "AgentOutput", "BaseSpecialistAgent",
"EscalationLog", "EvidenceSource", "Finding", "McCoverage",
"Recommendation", "Severity", "SourceType",
"REGISTRY", "ImpressumAgent", "CookiePolicyAgent",
]
@@ -0,0 +1,207 @@
"""BaseSpecialistAgent — Contract für alle Doc-Type-Agenten.
Architektur-Vertrag (User-Vorgabe 2026-06-08):
- Jeder Agent ist ein hochspezialisierter Anwalt für sein Thema.
- Standard: deterministische MCs + Regex + FAQ-KB.
- Eskalation bei Unsicherheit: qwen2.5:7b → OVH 120b → Claude (TBD).
- Jeder Output ist auditfest: Quelle pro Finding (MC-ID / Regex /
FAQ / LLM-Stufe + Prompt-Hash).
- Output enthält NIEMALS "rechtssicher" / "garantiert" — Linter
verbietet diese Wörter.
- Token-Budget pro Agent (Cost-Cap).
"""
from __future__ import annotations
import abc
import hashlib
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class Severity(str, Enum):
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
INFO = "INFO"
class SourceType(str, Enum):
"""Wo kommt das Finding her? Für die auditfeste Beweiskette."""
MC = "mc" # Machine-Check (deterministisch)
REGEX = "regex" # Pattern-Match
KB_FAQ = "kb_faq" # Knowledge-Base / kuratiert
LLM_LOCAL = "llm_local" # qwen2.5:7b oder qwen3
LLM_LOCAL_BIG = "llm_local_big" # OVH 120b
LLM_CLOUD = "llm_cloud" # Claude API (Anonymisierung Pflicht)
CROSS = "cross" # Cross-Doc-Agent
class EvidenceSource(BaseModel):
"""Eine Quelle in der Beweiskette eines Findings."""
source_type: SourceType
source_id: str = "" # mc_id, regex_name, faq_id, model
detail: str = "" # Snippet, Prompt-Hash, etc.
confidence: float = 1.0 # 0.0 1.0
class Finding(BaseModel):
"""Ein einzelnes Audit-Finding aus einem Specialist-Agent."""
model_config = ConfigDict(use_enum_values=True)
check_id: str # z.B. IMPRESSUM-AGENT-HANDELSREGISTER
agent: str # impressum_v2
agent_version: str # 2.0
field_id: str = "" # field-key innerhalb des Agenten
severity: Severity
severity_reason: str = ""
title: str
norm: str = ""
evidence: str = ""
action: str = ""
confidence: float = 1.0 # 0.0 1.0; treibt Eskalation
sources: list[EvidenceSource] = Field(default_factory=list)
class Recommendation(BaseModel):
"""Konsolidierte Maßnahme. Kann aus 1..N Findings stammen
(Rollup-Dedup-Schritt bündelt ähnliche Maßnahmen)."""
recommendation_id: str # stable hash der canonical_text
title: str
body: str
severity: Severity # höchste Severity der Quell-Findings
related_finding_ids: list[str] = Field(default_factory=list)
estimated_effort_hours: float = 0.0
class McCoverage(BaseModel):
"""Welche MC hat der Agent geprüft + Ergebnis."""
mc_id: str
status: str # ok | high | medium | low | na | skipped
reason: str = ""
class EscalationLog(BaseModel):
"""Ein Eskalations-Schritt mit Modell + Latenz + Kosten."""
stage: SourceType
model: str
duration_ms: int = 0
tokens_in: int = 0
tokens_out: int = 0
success: bool = True
error: str = ""
class AgentInput(BaseModel):
"""Was ein Agent zur Bearbeitung bekommt."""
doc_type: str
text: str
url: str = ""
business_scope: list[str] = Field(default_factory=list)
company_name: str = ""
origin_domain: str = ""
# extra context z.B. extracted_profile, multi-doc-context
context: dict[str, Any] = Field(default_factory=dict)
class AgentOutput(BaseModel):
"""Was ein Agent zurückgibt — auditfestes Resultat."""
agent: str
agent_version: str
started_at: datetime
finished_at: datetime
duration_ms: int
findings: list[Finding] = Field(default_factory=list)
recommendations: list[Recommendation] = Field(default_factory=list)
mc_coverage: list[McCoverage] = Field(default_factory=list)
escalation_log: list[EscalationLog] = Field(default_factory=list)
confidence: float = 1.0
notes: str = ""
# Speed-O-Meter aggregates (vorberechnet fürs Frontend)
mc_total: int = 0
mc_ok: int = 0
mc_na: int = 0
mc_high: int = 0
mc_medium: int = 0
mc_low: int = 0
# Verbotene Wörter im Output — sicherheitshalber, damit kein Agent
# (oder LLM-Stufe) "rechtssicher" suggeriert. User-Vorgabe.
FORBIDDEN_OUTPUT_TERMS = (
"rechtssicher",
"rechts-sicher",
"garantiert",
"garantieren",
"garantie",
"gesetzeskonform",
"gesetzes-konform",
"100% konform",
"100 % konform",
"vollständig konform",
"absolut sicher",
)
class DisclaimerViolation(ValueError):
"""Erhoben wenn ein Agent-Output ein verbotenes Wort enthält."""
def lint_output(output: AgentOutput) -> AgentOutput:
"""Prüft den Output gegen FORBIDDEN_OUTPUT_TERMS. Sanitisiert
titles/actions inline — ersetzt verbotene Begriffe durch
[→ neutraler Wortlaut empfohlen]-Marker. Loggt aber NICHT in
den Datenbestand, sondern als notes."""
issues: list[str] = []
for f in output.findings:
for field_name in ("title", "evidence", "action"):
v = getattr(f, field_name) or ""
for term in FORBIDDEN_OUTPUT_TERMS:
if term in v.lower():
issues.append(f"Finding {f.check_id}.{field_name}: '{term}'")
v = _scrub(v, term)
setattr(f, field_name, v)
for r in output.recommendations:
for field_name in ("title", "body"):
v = getattr(r, field_name) or ""
for term in FORBIDDEN_OUTPUT_TERMS:
if term in v.lower():
issues.append(f"Rec {r.recommendation_id}.{field_name}: '{term}'")
v = _scrub(v, term)
setattr(r, field_name, v)
if issues:
prefix = output.notes + " · " if output.notes else ""
output.notes = prefix + "linter scrubbed: " + "; ".join(issues[:5])
return output
def _scrub(text: str, term: str) -> str:
"""Case-insensitive replace mit Marker."""
import re as _re
return _re.sub(
_re.escape(term), "[→ neutraler Wortlaut]",
text, flags=_re.IGNORECASE,
)
def stable_recommendation_id(canonical_text: str) -> str:
"""Stable hash der canonical recommendation für Dedup-Rollup."""
h = hashlib.sha256(canonical_text.lower().strip().encode("utf-8"))
return h.hexdigest()[:16]
class BaseSpecialistAgent(abc.ABC):
"""Abstract base — jeder Agent erbt davon."""
agent_id: str = "" # "impressum", "dse", ...
agent_version: str = "0.0"
doc_type: str = "" # Doc-Type den der Agent owned
owned_mc_ids: tuple[str, ...] = ()
@abc.abstractmethod
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
"""Run the agent. MUST return a complete AgentOutput.
Implementations should call lint_output() before returning."""
@@ -0,0 +1,224 @@
"""Eskalations-Kaskade: qwen2.5:7b → OVH 120b → DOWN_FOR_REVIEW.
User-Vorgabe 2026-06-08:
- Standard: deterministisch (Pattern + MC + KB)
- Eskalation nur wenn deterministisches Ergebnis unklar bleibt.
- Stage 1: lokales qwen2.5:7b (schnell, billig)
- Stage 2: OVH 120b (groß, externalisiert)
- Stage 3: Claude API mit Anonymisierung — NICHT in Phase 1 aktiv.
- Bei Total-Fail: Output "DOWN_FOR_REVIEW" (kein Halluzinieren).
Cost-Cap: jeder Agent meldet sein Token-Budget. Wenn überschritten:
abbrechen und "manuelle Prüfung empfohlen" emittieren.
"""
from __future__ import annotations
import json
import logging
import os
import time
from typing import Any
import httpx
from ._base import EscalationLog, SourceType
logger = logging.getLogger(__name__)
OLLAMA_URL = os.environ.get(
"OLLAMA_URL", "http://host.docker.internal:11434",
)
OLLAMA_MODEL_LOCAL = os.environ.get(
"AGENT_MODEL_LOCAL", "qwen2.5:7b",
)
OVH_URL = os.environ.get("OVH_LLM_URL", "")
OVH_MODEL = os.environ.get("OVH_LLM_MODEL", "Meta-Llama-3.1-70B-Instruct")
OVH_API_KEY = os.environ.get("OVH_LLM_API_KEY", "")
OVH_TIMEOUT = float(os.environ.get("OVH_LLM_TIMEOUT", "60"))
LOCAL_TIMEOUT = float(os.environ.get("AGENT_LOCAL_TIMEOUT", "60"))
class EscalationResult:
"""Wrapper für ein Eskalations-Resultat."""
def __init__(
self,
content: str | None,
stage: SourceType,
model: str,
log: EscalationLog,
parsed: Any = None,
) -> None:
self.content = content
self.stage = stage
self.model = model
self.log = log
self.parsed = parsed
@property
def success(self) -> bool:
return bool(self.content)
async def call_local_llm(
system_prompt: str,
user_prompt: str,
expect_json: bool = True,
model: str | None = None,
timeout: float | None = None,
) -> EscalationResult:
"""Stage 1: lokales Ollama-Modell."""
mdl = model or OLLAMA_MODEL_LOCAL
body = {
"model": mdl,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"stream": False,
"options": {"temperature": 0.0, "seed": 42, "num_predict": 1200},
}
if expect_json:
body["format"] = "json"
t0 = time.time()
log = EscalationLog(
stage=SourceType.LLM_LOCAL,
model=mdl,
success=False,
)
try:
async with httpx.AsyncClient(timeout=timeout or LOCAL_TIMEOUT) as c:
r = await c.post(f"{OLLAMA_URL}/api/chat", json=body)
r.raise_for_status()
j = r.json()
content = (j.get("message") or {}).get("content", "") or ""
log.tokens_in = int(j.get("prompt_eval_count") or 0)
log.tokens_out = int(j.get("eval_count") or 0)
except Exception as e:
log.error = f"{type(e).__name__}: {str(e)[:120]}"
logger.warning("call_local_llm fail: %s", log.error)
log.duration_ms = int((time.time() - t0) * 1000)
return EscalationResult(None, SourceType.LLM_LOCAL, mdl, log)
log.duration_ms = int((time.time() - t0) * 1000)
parsed = None
if content and expect_json:
parsed = _try_parse_json(content)
if parsed is None:
log.error = "json_parse_fail"
content = None
if content:
log.success = True
return EscalationResult(content, SourceType.LLM_LOCAL, mdl, log, parsed)
async def call_ovh_llm(
system_prompt: str,
user_prompt: str,
expect_json: bool = True,
) -> EscalationResult:
"""Stage 2: OVH 120b (extern). Deaktiviert wenn OVH_URL leer."""
log = EscalationLog(
stage=SourceType.LLM_LOCAL_BIG,
model=OVH_MODEL,
success=False,
)
if not OVH_URL:
log.error = "ovh_disabled"
return EscalationResult(None, SourceType.LLM_LOCAL_BIG, OVH_MODEL, log)
body = {
"model": OVH_MODEL,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": 0.0,
"max_tokens": 1500,
}
if expect_json:
body["response_format"] = {"type": "json_object"}
headers = {"Content-Type": "application/json"}
if OVH_API_KEY:
headers["Authorization"] = f"Bearer {OVH_API_KEY}"
t0 = time.time()
try:
async with httpx.AsyncClient(timeout=OVH_TIMEOUT) as c:
r = await c.post(
f"{OVH_URL.rstrip('/')}/v1/chat/completions",
json=body,
headers=headers,
)
r.raise_for_status()
j = r.json()
content = (
((j.get("choices") or [{}])[0]
.get("message") or {})
.get("content", "")
) or ""
usage = j.get("usage") or {}
log.tokens_in = int(usage.get("prompt_tokens") or 0)
log.tokens_out = int(usage.get("completion_tokens") or 0)
except Exception as e:
log.error = f"{type(e).__name__}: {str(e)[:120]}"
logger.warning("call_ovh_llm fail: %s", log.error)
log.duration_ms = int((time.time() - t0) * 1000)
return EscalationResult(None, SourceType.LLM_LOCAL_BIG, OVH_MODEL, log)
log.duration_ms = int((time.time() - t0) * 1000)
parsed = None
if content and expect_json:
parsed = _try_parse_json(content)
if parsed is None:
log.error = "json_parse_fail"
content = None
if content:
log.success = True
return EscalationResult(content, SourceType.LLM_LOCAL_BIG, OVH_MODEL,
log, parsed)
def _try_parse_json(content: str) -> Any | None:
"""Robust JSON extract (handles fences, prose-wrap)."""
if not content:
return None
s = content.strip()
if s.startswith("```"):
s = s.strip("`")
if s.lower().startswith("json"):
s = s[4:]
s = s.strip()
first_o, last_o = s.find("{"), s.rfind("}")
first_a, last_a = s.find("["), s.rfind("]")
candidates = []
if first_o >= 0 and last_o > first_o:
candidates.append(s[first_o:last_o + 1])
if first_a >= 0 and last_a > first_a:
candidates.append(s[first_a:last_a + 1])
for c in candidates:
try:
return json.loads(c)
except Exception:
continue
return None
async def cascade(
system_prompt: str,
user_prompt: str,
expect_json: bool = True,
skip_ovh: bool = False,
) -> tuple[EscalationResult | None, list[EscalationLog]]:
"""Default-Kaskade: Local → OVH. Bei Local-Erfolg KEIN OVH-Call."""
logs: list[EscalationLog] = []
res = await call_local_llm(system_prompt, user_prompt,
expect_json=expect_json)
logs.append(res.log)
if res.success:
return res, logs
if skip_ovh or not OVH_URL:
return None, logs
res2 = await call_ovh_llm(system_prompt, user_prompt,
expect_json=expect_json)
logs.append(res2.log)
if res2.success:
return res2, logs
return None, logs
@@ -0,0 +1,254 @@
"""Evidence-Vault — zentraler Beweis-Bereich pro Agent-Run.
User-Vorgabe 2026-06-08: pro Agent-Run brauchen wir einen Bereich um
Playwright-Videos (mp4), Screenshots (png) und CSV-Exporte abzulegen,
damit DSB/Anwalt eine portable Beweiskette hat.
Layout:
artifacts/agent_runs/{run_id}/
manifest.json ← Index + SHA256 pro Asset
screenshots/
{slot}_{name}.png
videos/
{slot}_walk.mp4
csv/
cookies_{slot}.csv
findings/
{slot}_output.json ← AgentOutput
raw/
{slot}_doc.html ← Original-Source
Pro Run-ID: kann mehrere "Slots" enthalten (z.B. 5 URLs).
manifest.json:
{
"run_id": "...",
"agent_id": "impressum",
"agent_version": "2.0",
"started_at": "2026-06-08T...",
"assets": [
{"slot": "url1", "kind": "video", "path": "videos/url1_walk.mp4",
"sha256": "...", "size_bytes": 12345, "mime": "video/mp4"}
]
}
"""
from __future__ import annotations
import hashlib
import json
import logging
import mimetypes
import os
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
def _vault_root() -> Path:
"""Resolved at call time so tests can monkeypatch the env var."""
return Path(os.environ.get(
"EVIDENCE_VAULT_ROOT", "/app/artifacts/agent_runs",
))
class EvidenceVault:
"""Pro Agent-Run eine Instanz. Schreibt alle Assets unter
VAULT_ROOT/{run_id}/."""
def __init__(
self,
agent_id: str,
agent_version: str,
run_id: str | None = None,
) -> None:
self.run_id = run_id or uuid.uuid4().hex[:16]
self.agent_id = agent_id
self.agent_version = agent_version
self.root = _vault_root() / self.run_id
self.root.mkdir(parents=True, exist_ok=True)
for sub in ("screenshots", "videos", "csv",
"findings", "raw"):
(self.root / sub).mkdir(exist_ok=True)
self.manifest_path = self.root / "manifest.json"
self._manifest: dict[str, Any] = self._load_or_init()
# Persist immediately so the file exists even before any
# put_*() call — important for ../runs-listing.
if not self.manifest_path.exists():
self._save_manifest()
def _load_or_init(self) -> dict[str, Any]:
if self.manifest_path.exists():
try:
return json.loads(self.manifest_path.read_text())
except Exception as e:
logger.warning("manifest load fail, reset: %s", e)
return {
"run_id": self.run_id,
"agent_id": self.agent_id,
"agent_version": self.agent_version,
"started_at": datetime.now(timezone.utc).isoformat(),
"assets": [],
}
def _save_manifest(self) -> None:
self.manifest_path.write_text(
json.dumps(self._manifest, indent=2, default=str),
)
def put_bytes(
self,
kind: str,
slot: str,
filename: str,
data: bytes,
mime: str | None = None,
) -> str:
"""Schreibt Bytes in einen Vault-Slot.
kind: 'screenshot' | 'video' | 'csv' | 'finding' | 'raw'
slot: identifier (z.B. URL-Index "url1")
Returns: relative path inside the vault.
"""
subdir = self._subdir_for(kind)
safe_name = self._safe_filename(filename)
rel = f"{subdir}/{slot}_{safe_name}"
target = self.root / rel
target.write_bytes(data)
sha = hashlib.sha256(data).hexdigest()
if not mime:
mime, _ = mimetypes.guess_type(safe_name)
mime = mime or "application/octet-stream"
self._manifest["assets"].append({
"slot": slot,
"kind": kind,
"path": rel,
"sha256": sha,
"size_bytes": len(data),
"mime": mime,
"stored_at": datetime.now(timezone.utc).isoformat(),
})
self._save_manifest()
return rel
def put_file(
self,
kind: str,
slot: str,
source_path: str,
target_filename: str | None = None,
) -> str:
"""Copy a file (e.g. Playwright video) into the vault."""
src = Path(source_path)
if not src.is_file():
raise FileNotFoundError(source_path)
target_filename = target_filename or src.name
data = src.read_bytes()
return self.put_bytes(
kind, slot, target_filename, data,
mime=mimetypes.guess_type(src.name)[0],
)
def put_json(
self,
kind: str,
slot: str,
filename: str,
payload: Any,
) -> str:
data = json.dumps(payload, indent=2, default=str).encode("utf-8")
return self.put_bytes(kind, slot, filename, data,
mime="application/json")
def assets_for_slot(self, slot: str) -> list[dict]:
return [a for a in self._manifest["assets"] if a["slot"] == slot]
def list_assets(self) -> list[dict]:
return list(self._manifest["assets"])
def asset_path(self, relative_path: str) -> Path | None:
"""Resolve a manifest-relative path to absolute. Returns None
if outside vault (defense against path traversal)."""
p = (self.root / relative_path).resolve()
try:
p.relative_to(self.root.resolve())
except ValueError:
return None
if not p.exists():
return None
return p
def finalize(self) -> dict[str, Any]:
"""Marks the run as finished and returns the manifest snapshot."""
self._manifest["finished_at"] = datetime.now(
timezone.utc).isoformat()
self._save_manifest()
return dict(self._manifest)
def url(self) -> str:
"""API-URL um die Assets zu listen (für Frontend)."""
return f"/api/v1/specialist-agent/run/{self.run_id}/artifacts"
@staticmethod
def _subdir_for(kind: str) -> str:
return {
"screenshot": "screenshots",
"video": "videos",
"csv": "csv",
"finding": "findings",
"raw": "raw",
}.get(kind, "raw")
@staticmethod
def _safe_filename(name: str) -> str:
# Strip directory prefixes first so "../../etc/passwd" → "passwd"
base = os.path.basename(name.replace("\\", "/"))
keep = "._-"
cleaned = "".join(
c if (c.isalnum() or c in keep) else "_" for c in base
)
# Collapse parent-dir markers that survived in unusual encodings
cleaned = cleaned.replace("..", "_")
return cleaned[:120] or "unnamed"
def open_vault(
agent_id: str,
agent_version: str,
run_id: str | None = None,
) -> EvidenceVault:
return EvidenceVault(agent_id, agent_version, run_id)
def list_runs(limit: int = 50) -> list[dict]:
"""Lists recent run-manifests."""
root = _vault_root()
if not root.exists():
return []
runs: list[tuple[float, dict]] = []
for p in root.iterdir():
if not p.is_dir():
continue
m = p / "manifest.json"
if not m.is_file():
continue
try:
data = json.loads(m.read_text())
runs.append((m.stat().st_mtime, data))
except Exception:
continue
runs.sort(key=lambda x: x[0], reverse=True)
return [r[1] for r in runs[:limit]]
def delete_run(run_id: str) -> bool:
"""Tombstone: löscht den ganzen Run-Ordner. Für DSGVO-DSR Art. 17."""
p = _vault_root() / run_id
if not p.exists():
return False
shutil.rmtree(p)
return True
@@ -0,0 +1,57 @@
"""AgentRegistry — zentrale Liste aller Specialist-Agenten + MC-Owner-Mapping.
Verhindert dass eine MC zweimal geprüft wird (oder gar nicht). Der
SSE-Test-Endpoint löst Agent-Namen über diese Registry auf.
"""
from __future__ import annotations
from ._base import BaseSpecialistAgent
class AgentRegistry:
"""Singleton-ähnliche Registry. Ein Agent registriert sich beim
Import, der SSE-Endpoint löst auf."""
def __init__(self) -> None:
self._agents: dict[str, BaseSpecialistAgent] = {}
# MC-ID → agent_id. Verhindert Doppel-Ownership.
self._mc_owner: dict[str, str] = {}
def register(self, agent: BaseSpecialistAgent) -> None:
if not agent.agent_id:
raise ValueError("agent.agent_id darf nicht leer sein")
if agent.agent_id in self._agents:
# Re-register erlaubt (z.B. Hot-Reload), aber überschreibt
self._agents[agent.agent_id] = agent
return
self._agents[agent.agent_id] = agent
for mc_id in agent.owned_mc_ids:
existing = self._mc_owner.get(mc_id)
if existing and existing != agent.agent_id:
raise ValueError(
f"MC {mc_id} bereits von Agent {existing} owned — "
f"jetzt versucht {agent.agent_id}. Konflikt auflösen."
)
self._mc_owner[mc_id] = agent.agent_id
def get(self, agent_id: str) -> BaseSpecialistAgent | None:
return self._agents.get(agent_id)
def list_agents(self) -> list[dict]:
"""Public listing für das Frontend (Agent-Wähler)."""
return [
{
"agent_id": a.agent_id,
"agent_version": a.agent_version,
"doc_type": a.doc_type,
"mc_count": len(a.owned_mc_ids),
}
for a in self._agents.values()
]
def owner_of(self, mc_id: str) -> str | None:
return self._mc_owner.get(mc_id)
REGISTRY = AgentRegistry()
@@ -0,0 +1,92 @@
"""Rollup-Dedup für Recommendations.
Wenn 3 Findings alle "AVV mit Anbieter X abschließen" sagen, werden sie
zu einer Recommendation gebündelt mit Liste der ursächlichen Findings.
Spart Kundenzeit beim Abarbeiten.
KEIN LLM — deterministische Normalisierung + Hashing.
"""
from __future__ import annotations
import re
from ._base import Finding, Recommendation, Severity, stable_recommendation_id
_SEVERITY_ORDER = {
Severity.HIGH: 3,
Severity.MEDIUM: 2,
Severity.LOW: 1,
Severity.INFO: 0,
}
def _canonical(text: str) -> str:
"""Normalisiert: lowercase, Whitespace zusammenfalten, Punkte raus.
Liefert eine kanonische Form für Dedup-Hashing."""
t = (text or "").lower().strip()
t = re.sub(r"\s+", " ", t)
t = re.sub(r"[.,;:!?]+$", "", t)
return t
def rollup(findings: list[Finding]) -> list[Recommendation]:
"""Bündelt actions zu Recommendations. Eine Recommendation enthält
alle Findings deren action denselben canonical_text hat."""
buckets: dict[str, list[Finding]] = {}
for f in findings:
action = f.action or ""
if not action:
continue
key = _canonical(action)
if not key:
continue
buckets.setdefault(key, []).append(f)
out: list[Recommendation] = []
for key, group in buckets.items():
max_sev_finding = max(
group, key=lambda x: _SEVERITY_ORDER.get(_sev(x), 0),
)
# title aus dem ersten Finding (am ehesten lesbar)
first = group[0]
body = first.action
# wenn mehrere distincte titles: liste sie
related_titles = sorted({f.title for f in group if f.title})
if len(related_titles) > 1:
body = (
body + "\n\nGilt für: "
+ "; ".join(related_titles[:8])
)
rid = stable_recommendation_id(key)
out.append(Recommendation(
recommendation_id=rid,
title=first.action[:120] or "(ohne Titel)",
body=body,
severity=_sev(max_sev_finding),
related_finding_ids=[f.check_id for f in group],
estimated_effort_hours=_estimate_effort(group),
))
return sorted(
out,
key=lambda r: _SEVERITY_ORDER.get(Severity(r.severity), 0),
reverse=True,
)
def _sev(f: Finding) -> Severity:
"""Helper: handle Enum vs str (because use_enum_values=True)."""
v = f.severity
if isinstance(v, Severity):
return v
try:
return Severity(str(v).upper())
except Exception:
return Severity.INFO
def _estimate_effort(group: list[Finding]) -> float:
"""Heuristik: pro Finding 0.5h Basis, +0.5h pro HIGH, max 8h."""
base = 0.5 * len(group)
extra = 0.5 * sum(1 for f in group if _sev(f) == Severity.HIGH)
return min(8.0, base + extra)
@@ -0,0 +1,8 @@
"""Cookie-Policy-Specialist-Agent v2.
Public Entry-Point: CookiePolicyAgent (inherits BaseSpecialistAgent).
"""
from .agent import CookiePolicyAgent
__all__ = ["CookiePolicyAgent"]
@@ -0,0 +1,347 @@
"""Cookie-Policy-Agent v2 — BaseSpecialistAgent.
Prüft den Cookie-Policy-DOKUMENT-Text (NICHT das Banner — das macht
der Cookie-Banner-Themen-Agent). Konsumiert optional context.cmp_vendors
für Konsistenz-Checks gegen die tatsächlich beobachtete Cookie-Liste.
Eskalations-Stufen:
1. MC (regex) — schnell, deterministisch
2. cookie_library_lookup gegen state.context.cmp_vendors (wenn vorhanden)
3. LLM (qwen2.5:7b) für strukturelle/semantische Lücken
4. OVH 120b als Fallback
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from .._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
EscalationLog,
EvidenceSource,
Finding,
McCoverage,
Severity,
SourceType,
lint_output,
)
from .._escalation import cascade
from .._rollup import rollup
from .mcs import MC_IDS, MCS
logger = logging.getLogger(__name__)
_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus
TDDDG § 25 + DSGVO Art. 13 + EuGH Planet49 + BGH Cookie-II. Aufgabe:
eine Cookie-Richtlinie auf strukturelle und inhaltliche LÜCKEN prüfen,
die einer regex-basierten Vorprüfung entgangen sind.
WICHTIG:
- KEINE Bewertung "rechtssicher" / "garantiert" / "konform".
- Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren.
- Wörtliches Zitat als evidence bei jeder Lücke.
Antworte NUR mit JSON, Schema:
{"findings": [
{"field_id": "...", "severity": "HIGH|MEDIUM|LOW",
"title": "...", "evidence": "wörtliches Zitat",
"action": "konkrete Empfehlung"}
]}
Typische Lücken-Kategorien:
- pseudo_purpose: "Siehe dazugehörige Datenverarbeitung" ohne konkrete Aussage
- duration_floskel: "solange erforderlich" ohne Zeitangabe
- vendor_unklar: "möglicherweise Drittanbieter" ohne Liste
- retention_inkonsistent: Tabelle nennt Tage, Fließtext nennt "session"
- drittland_fehlend: US-Vendor genannt (Google, Meta) aber Schrems-II
nicht thematisiert
- banner_reopen_fehlt: "Cookie-Einstellungen ändern" Link fehlt
"""
class CookiePolicyAgent(BaseSpecialistAgent):
agent_id = "cookie_policy"
agent_version = "1.0"
doc_type = "cookie"
owned_mc_ids = MC_IDS
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
start = datetime.now(timezone.utc)
text = (agent_input.text or "").strip()
coverage: list[McCoverage] = []
findings: list[Finding] = []
esc_logs: list[EscalationLog] = []
if len(text) < 100:
for mc in MCS:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="skipped",
reason="cookie policy text too short or empty",
))
return self._finalize(
start, findings, esc_logs, coverage, confidence=0.0,
notes="Cookie-Policy-Text zu kurz oder leer.",
)
for mc in MCS:
matched = [p for p in mc.patterns if p.search(text)]
if mc.require_all:
ok = len(matched) == len(mc.patterns)
else:
ok = bool(matched)
if ok:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="ok",
reason=f"{len(matched)}/{len(mc.patterns)} patterns hit",
))
continue
sev = self._sev(mc.severity_if_missing)
action = self._build_action(mc)
findings.append(Finding(
check_id=f"COOKIE-POLICY-AGENT-{mc.field_id.upper()}",
agent=self.agent_id,
agent_version=self.agent_version,
field_id=mc.field_id,
severity=sev,
severity_reason="missing",
title=f"Cookie-Policy-Lücke: '{mc.label}'",
norm=mc.norm,
action=action,
confidence=0.92,
sources=[EvidenceSource(
source_type=SourceType.MC,
source_id=mc.mc_id,
detail=f"0/{len(mc.patterns)} pattern hit",
)],
))
coverage.append(McCoverage(
mc_id=mc.mc_id,
status=sev.value.lower(),
reason="missing",
))
# KB-Layer: wenn cmp_vendors im Kontext, checke ob die Policy
# alle beobachteten Vendoren erwähnt
kb_findings = self._kb_layer(text, agent_input.context or {})
findings.extend(kb_findings)
# LLM-Eskalation für subtile Lücken (Pseudo-Zwecke, Floskeln)
llm_findings, llm_logs = await self._maybe_escalate(text)
esc_logs.extend(llm_logs)
seen = {f.field_id for f in findings if f.field_id}
for f in llm_findings:
if f.field_id and f.field_id in seen:
continue
findings.append(f)
confs = [f.confidence for f in findings if f.confidence] or [0.95]
overall = sum(confs) / len(confs)
return self._finalize(start, findings, esc_logs, coverage,
confidence=overall)
def _kb_layer(
self, text: str, context: dict,
) -> list[Finding]:
"""Wenn cmp_vendors gegeben: prüfe ob alle Vendoren in der Policy
erwähnt werden. Sonst Skip (keine Cross-Check ohne Datenbasis)."""
cmp_vendors = context.get("cmp_vendors") or []
if not cmp_vendors:
return []
text_lc = text.lower()
# Extrahiere Top-Vendor-Namen aus dem CMP
seen_names: set[str] = set()
for v in cmp_vendors:
if not isinstance(v, dict):
continue
name = (v.get("name") or v.get("vendor") or "").strip()
if name and len(name) > 2:
seen_names.add(name)
missing: list[str] = []
for n in sorted(seen_names):
if n.lower() not in text_lc:
missing.append(n)
if not missing:
return []
# Ein Sammel-Finding pro Lücke
sample = missing[:8]
return [Finding(
check_id="COOKIE-POLICY-AGENT-CMP-VS-POLICY",
agent=self.agent_id,
agent_version=self.agent_version,
field_id="vendor_consistency",
severity=Severity.MEDIUM,
severity_reason="cmp_observed_vendors_not_in_policy",
title=(
f"{len(missing)} im CMP beobachtete Vendor(en) "
"fehlen in der Cookie-Policy"
),
norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger vollständig nennen)",
evidence=f"Fehlend: {', '.join(sample)}"
+ ("" if len(missing) > 8 else ""),
action=(
"Die im Cookie-Consent-Banner beobachteten Vendoren "
"(Tracker/Werbenetzwerke) müssen vollständig in der "
"Cookie-Richtlinie aufgelistet sein."
),
confidence=0.88,
sources=[EvidenceSource(
source_type=SourceType.MC,
source_id="CMP-CROSS-CHECK",
detail=f"{len(missing)} missing of {len(seen_names)}",
)],
)]
async def _maybe_escalate(
self, text: str,
) -> tuple[list[Finding], list[EscalationLog]]:
user_prompt = (
f"COOKIE-POLICY-TEXT:\n{text[:4500]}\n\n"
"Liste subtile Lücken nach TDDDG § 25 + DSGVO Art. 13. "
"Nur JSON."
)
res, logs = await cascade(_SYSTEM_PROMPT, user_prompt)
if res is None or not isinstance(res.parsed, (dict, list)):
return [], logs
raw = (res.parsed.get("findings")
if isinstance(res.parsed, dict) else res.parsed)
if not isinstance(raw, list):
return [], logs
out: list[Finding] = []
for item in raw:
if not isinstance(item, dict):
continue
fid = str(item.get("field_id") or "unknown")[:40]
sev_raw = str(item.get("severity") or "MEDIUM").upper()
sev = self._sev(sev_raw)
out.append(Finding(
check_id=f"COOKIE-POLICY-AGENT-LLM-{fid.upper()}",
agent=self.agent_id,
agent_version=self.agent_version,
field_id=fid,
severity=sev,
severity_reason="llm_detected",
title=str(item.get("title") or "")[:200],
norm="TDDDG § 25 + DSGVO Art. 13 (LLM-Analyse)",
evidence=str(item.get("evidence") or "")[:300],
action=str(item.get("action") or "")[:400],
confidence=0.7,
sources=[EvidenceSource(
source_type=res.stage,
source_id=res.model,
detail=f"prompt_chars={len(user_prompt)}",
confidence=0.7,
)],
))
return out, logs
def _finalize(
self,
start: datetime,
findings: list[Finding],
esc_logs: list[EscalationLog],
coverage: list[McCoverage],
confidence: float,
notes: str = "",
) -> AgentOutput:
end = datetime.now(timezone.utc)
recs = rollup(findings)
out = AgentOutput(
agent=self.agent_id,
agent_version=self.agent_version,
started_at=start,
finished_at=end,
duration_ms=int((end - start).total_seconds() * 1000),
findings=findings,
recommendations=recs,
mc_coverage=coverage,
escalation_log=esc_logs,
confidence=confidence,
notes=notes,
mc_total=len(coverage),
mc_ok=sum(1 for c in coverage if c.status == "ok"),
mc_na=sum(1 for c in coverage if c.status == "na"),
mc_high=sum(1 for c in coverage if c.status == "high"),
mc_medium=sum(1 for c in coverage if c.status == "medium"),
mc_low=sum(1 for c in coverage if c.status == "low"),
)
return lint_output(out)
@staticmethod
def _sev(value: str) -> Severity:
v = (value or "").upper()
if v == "HIGH":
return Severity.HIGH
if v == "MEDIUM":
return Severity.MEDIUM
if v == "LOW":
return Severity.LOW
return Severity.INFO
@staticmethod
def _build_action(mc) -> str:
suggestions = {
"categories_named": (
"Die Cookie-Richtlinie sollte die Kategorien essentiell, "
"funktional, analytics und marketing klar benennen und "
"abgrenzen."
),
"purpose_described": (
"Pro Cookie-Kategorie den Verarbeitungszweck konkret "
"benennen (keine Pauschal-Formulierungen wie "
"'verschiedene Zwecke')."
),
"retention_duration": (
"Speicherdauer pro Cookie konkret angeben "
"(z.B. 'Session', '30 Tage', '2 Jahre') statt "
"'solange erforderlich'."
),
"vendor_recipients": (
"Alle Empfänger / Drittanbieter namentlich auflisten "
"(z.B. Google LLC, Meta Platforms Inc., …) inkl. Sitz."
),
"opt_out_mechanism": (
"Konkreten Opt-Out-Weg beschreiben: Banner-Reopen-Link, "
"Browser-Einstellungen, Vendor-spezifische Opt-Out-URLs."
),
"banner_reopen": (
"Sichtbaren Link 'Cookie-Einstellungen ändern' in die "
"Policy aufnehmen, der den CMP-Banner wieder öffnet."
),
"version_date": (
"Stand der Cookie-Richtlinie sichtbar angeben "
"(z.B. 'Stand: 1. Juni 2026')."
),
"third_country_transfer": (
"Bei Drittland-Transfer (USA u.a.) Hinweis auf "
"Schrems-II-Risiko + verwendete Schutzmaßnahmen "
"(SCC, DPF) ergänzen."
),
"legal_basis": (
"Rechtsgrundlage pro Kategorie benennen: § 25 Abs. 1 "
"TDDDG (Einwilligung) bzw. § 25 Abs. 2 TDDDG "
"(unbedingt erforderlich)."
),
"cookie_table_or_list": (
"Detail-Tabelle mit Cookie-Namen, Vendor, Zweck und "
"Laufzeit pro Cookie ergänzen (DSK-Best-Practice)."
),
"dpo_contact": (
"Kontaktmöglichkeit zum DSB oder Datenschutz-Team "
"in der Cookie-Richtlinie nennen (z.B. "
"datenschutz@<domain>)."
),
"browser_settings_hint": (
"Hinweis auf Browser-Einstellungen zum Blockieren/"
"Löschen von Cookies (Chrome, Firefox, Safari, Edge) "
"ergänzen."
),
}
return suggestions.get(mc.field_id, (
f"{mc.label} in der Cookie-Richtlinie ergänzen "
f"({mc.norm})."
))
@@ -0,0 +1,225 @@
"""Cookie-Policy-Agent Machine-Checks.
Pflichten an eine Cookie-Richtlinie nach TDDDG + DSGVO Art. 13 +
EuGH Planet49 + BGH Cookie-II:
- Cookie-Kategorien benannt (essential / functional / analytics /
marketing)
- Pro Cookie: Name, Zweck, Speicherdauer, Empfänger/Vendor, Drittland
- Opt-Out-Mechanik beschrieben + Banner-Reopen-Möglichkeit verlinkt
- Versionsdatum / Stand
- Drittland-Hinweise bei US-Transfer (Schrems II)
- Verweis auf Browser-Einstellungen (Best-Practice)
- Kontakt für Datenschutz (DSB oder verantwortliche Stelle)
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Pattern
@dataclass(frozen=True)
class MC:
mc_id: str
field_id: str
label: str
norm: str
patterns: tuple[Pattern[str], ...] = field(default_factory=tuple)
severity_if_missing: str = "MEDIUM"
# Eines der Patterns reicht (any) — default. False = ALLE müssen matchen
require_all: bool = False
_CATEGORIES = (
r"essenti(?:ell|al)|notwendig|technisch\s+notwendig|unbedingt\s+erforderlich",
r"funktional|funktionelle?|preference|komfort",
r"analyti(?:sch|cs)|statisti(?:k|sch)|leistung|performance",
r"marketing|werbung|advertising|tracking|targeting",
)
MCS: tuple[MC, ...] = (
MC(
mc_id="CP-MC-001",
field_id="categories_named",
label="Cookie-Kategorien benannt (essential/functional/analytics/marketing)",
norm="DSGVO Art. 13 + TDDDG § 25 (Transparenzpflicht)",
severity_if_missing="HIGH",
require_all=False,
patterns=tuple(re.compile(p, re.IGNORECASE) for p in _CATEGORIES),
),
MC(
mc_id="CP-MC-002",
field_id="purpose_described",
label="Zweck pro Kategorie beschrieben",
norm="DSGVO Art. 13 Abs. 1 lit. c (Zweck der Verarbeitung)",
severity_if_missing="HIGH",
patterns=(
re.compile(r"\bzweck(?:e)?\b", re.IGNORECASE),
re.compile(r"verwendet\s+wir", re.IGNORECASE),
re.compile(r"dient\s+(?:zur?|dem|der|den)", re.IGNORECASE),
re.compile(r"erm(?:ö|oe)glich(?:t|en)", re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-003",
field_id="retention_duration",
label="Speicherdauer pro Cookie angegeben",
norm="DSGVO Art. 13 Abs. 2 lit. a (Speicherdauer)",
severity_if_missing="HIGH",
patterns=(
re.compile(r"\b(?:laufzeit|speicherdauer|dauer|gültig)",
re.IGNORECASE),
re.compile(r"\b\d+\s*(?:tag|tage|monat|jahr|stunden|min)",
re.IGNORECASE),
re.compile(r"session(?:-?cookie)?", re.IGNORECASE),
re.compile(r"persistent(?:e|er)?", re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-004",
field_id="vendor_recipients",
label="Empfänger/Vendoren genannt (Art. 13 Abs. 1 lit. e)",
norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger)",
severity_if_missing="HIGH",
patterns=(
re.compile(r"empf(?:ä|ae)nger|recipient", re.IGNORECASE),
re.compile(r"dritt(?:e|er)\s+anbieter|drittanbieter|"
r"third[\-\s]?party", re.IGNORECASE),
re.compile(r"\b(?:Google|Meta|Facebook|Microsoft|Hotjar|"
r"LinkedIn|Adobe|TikTok|Cloudflare|Akamai|"
r"Salesforce|HubSpot|Pinterest|Twitter|X\.com)\b",
re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-005",
field_id="opt_out_mechanism",
label="Opt-Out-Mechanik dokumentiert",
norm="DSGVO Art. 7 Abs. 3 (Widerruf der Einwilligung)",
severity_if_missing="HIGH",
patterns=(
re.compile(r"opt[\-\s]?out|widerruf|widersprechen|"
r"abw(?:ä|ae)hlen|deaktivier",
re.IGNORECASE),
re.compile(r"einstellungen\s+(?:[\wäöüÄÖÜ]+\s+)?"
r"(?:ä|ae)ndern|"
r"einwilligung\s+widerrufen|"
r"cookie[\-\s]?einstellungen?\b",
re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-006",
field_id="banner_reopen",
label="Banner-Reopen / Cookie-Einstellungen-Link",
norm="TDDDG § 25 (Symmetrie Annehmen/Ablehnen — Reopen Pflicht)",
severity_if_missing="MEDIUM",
patterns=(
re.compile(r"cookie[\-\s]?einstellung(?:en)?\s+(?:ä|ae)ndern|"
r"cookie[\-\s]?einstellungen\s+(?:ö|oe)ffnen|"
r"einstellungen\s+anpassen|"
r"consent(?:[\-\s]?banner)?\s+(?:erneut|"
r"wieder)?\s*(?:ö|oe)ffnen|"
r"einwilligung\s+verwalten|"
r"cookie[\-\s]?pr(?:ä|ae)ferenzen",
re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-007",
field_id="version_date",
label="Versionsdatum / Stand angegeben",
norm="DSGVO Rechenschaftspflicht (Art. 5 Abs. 2)",
severity_if_missing="MEDIUM",
patterns=(
re.compile(r"stand(?:\s+vom)?\s*[:\-]?\s*\d{1,2}",
re.IGNORECASE),
re.compile(r"letzte\s+(?:aktualisierung|(?:ä|ae)nderung)",
re.IGNORECASE),
re.compile(r"version\s*[:\-]?\s*\d", re.IGNORECASE),
re.compile(r"g(?:ü|ue)ltig\s+ab\s+\d{1,2}", re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-008",
field_id="third_country_transfer",
label="Drittland-Hinweis (Schrems-II / US-Transfer)",
norm="DSGVO Art. 44 ff. + EuGH Schrems II",
severity_if_missing="MEDIUM",
patterns=(
re.compile(r"dritt(?:land|staat)|USA?|vereinigte\s+staaten|"
r"international(?:e[rn]?)?\s+(?:transfer|"
r"daten(?:ü|ue)bermittlung)|"
r"(?:standardvertragsklauseln|SCC|"
r"angemessenheitsbeschluss|"
r"adequacy\s+decision|DPF|"
r"Data\s+Privacy\s+Framework)",
re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-009",
field_id="legal_basis",
label="Rechtsgrundlage benannt (Art. 6 / § 25 TDDDG)",
norm="DSGVO Art. 13 Abs. 1 lit. c + TDDDG § 25",
severity_if_missing="MEDIUM",
patterns=(
re.compile(r"art\.?\s*6\s+(?:abs\.?\s*1\s+)?lit\.?\s*[a-f]",
re.IGNORECASE),
re.compile(r"§\s*25\s+TDDDG", re.IGNORECASE),
re.compile(r"rechtsgrundlage|einwilligung\s+(?:ist|liegt)|"
r"berechtigtes\s+interesse",
re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-010",
field_id="cookie_table_or_list",
label="Cookie-Tabelle / Liste pro Cookie",
norm="Best-Practice DSK + EDPB Guidelines 03/2022",
severity_if_missing="MEDIUM",
patterns=(
# Konkrete Cookie-Namen oder typische Hosts
re.compile(r"\b(?:_ga|_gid|_fbp|_fbc|_pk_id|"
r"OptanonConsent|JSESSIONID|cf_clearance|"
r"PHPSESSID|wp-settings|wordpress_logged_in)",
re.IGNORECASE),
re.compile(r"<table|<tr|<td", re.IGNORECASE),
re.compile(r"\|.*?\|.*?\|", re.IGNORECASE), # markdown-Tabelle
),
),
MC(
mc_id="CP-MC-011",
field_id="dpo_contact",
label="Kontakt zum Datenschutzbeauftragten (DSB)",
norm="DSGVO Art. 13 Abs. 1 lit. b (Kontaktdaten DSB)",
severity_if_missing="MEDIUM",
patterns=(
re.compile(r"datenschutzbeauftragt|DSB|DPO",
re.IGNORECASE),
re.compile(r"datenschutz[\-\s]?(?:team|kontakt|"
r"anfrag(?:e|en))",
re.IGNORECASE),
re.compile(r"\bdatenschutz@", re.IGNORECASE),
),
),
MC(
mc_id="CP-MC-012",
field_id="browser_settings_hint",
label="Verweis auf Browser-Einstellungen (Cookies blockieren)",
norm="Best-Practice — informierter Nutzer",
severity_if_missing="LOW",
patterns=(
re.compile(r"browser[\-\s]?einstellung", re.IGNORECASE),
re.compile(r"chrome|firefox|safari|edge", re.IGNORECASE),
re.compile(r"cookies?\s+(?:l(?:ö|oe)schen|deaktivieren|"
r"blockieren)",
re.IGNORECASE),
),
),
)
MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS)
@@ -0,0 +1,8 @@
"""Impressum-Specialist-Agent v2 — konsolidiert Pattern (v1) + LLM (v2-mvp).
Public Entry-Point: ImpressumAgent (inherits BaseSpecialistAgent).
"""
from .agent import ImpressumAgent
__all__ = ["ImpressumAgent"]
@@ -0,0 +1,251 @@
"""Impressum-Agent v2 — konsolidierter BaseSpecialistAgent.
Ablauf:
1. Deterministische MCs durchlaufen → Findings + mc_coverage.
2. Wenn unklare Felder (HIGH/MEDIUM missing) → LLM-Eskalation.
3. LLM-Findings dedupen mit MC-Findings nach field_id.
4. Rollup → Recommendations.
5. Disclaimer-Lint → AgentOutput.
Phase 1 (jetzt): qwen2.5:7b als Stage-1, OVH optional als Stage-2.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from .._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
EscalationLog,
EvidenceSource,
Finding,
McCoverage,
Severity,
SourceType,
lint_output,
)
from .._escalation import cascade
from .._rollup import rollup
from .mcs import MC_IDS, MCS, detect_automotive, scope_matches
logger = logging.getLogger(__name__)
_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus
§ 5 TMG / DDG (Anbieterkennzeichnung). Aufgabe: Impressum prüfen und
LÜCKEN aufzählen, die einer regex-basierten Vorprüfung entgangen sind.
WICHTIG:
- KEINE Bewertung "rechtssicher" / "garantiert" / "konform".
- Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren.
- Wörtliches Zitat als evidence bei jeder Lücke.
Antworte NUR mit JSON, Schema:
{"findings": [
{"field_id": "...", "severity": "HIGH|MEDIUM|LOW",
"title": "...", "evidence": "wörtliches Zitat",
"action": "konkrete Empfehlung"}
]}
"""
class ImpressumAgent(BaseSpecialistAgent):
agent_id = "impressum"
agent_version = "2.0"
doc_type = "impressum"
owned_mc_ids = MC_IDS
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
start = datetime.now(timezone.utc)
text = (agent_input.text or "").strip()
scope = set(agent_input.business_scope or [])
# Auto-detect KFZ
is_automotive = detect_automotive(text)
if is_automotive:
scope.add("automotive")
mc_findings: list[Finding] = []
coverage: list[McCoverage] = []
if len(text) < 50:
# Doc zu kurz → alle als skipped
for mc in MCS:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="skipped",
reason="doc too short or empty",
))
return self._finalize(
start, mc_findings, [], coverage, confidence=0.0,
notes="Impressum-Text zu kurz oder leer.",
)
for mc in MCS:
if not scope_matches(mc, scope, is_automotive):
coverage.append(McCoverage(
mc_id=mc.mc_id, status="na",
reason=f"scope mismatch (needs {mc.requires_scope})",
))
continue
found = any(p.search(text) for p in mc.patterns)
if found:
coverage.append(McCoverage(
mc_id=mc.mc_id, status="ok",
))
continue
# Missing → Finding
sev = self._sev(mc.severity_if_missing)
action = self._build_action(mc, is_automotive)
mc_findings.append(Finding(
check_id=f"IMPRESSUM-AGENT-{mc.field_id.upper()}",
agent=self.agent_id,
agent_version=self.agent_version,
field_id=mc.field_id,
severity=sev,
severity_reason="missing",
title=f"Pflichtangabe '{mc.label}' fehlt im Impressum",
norm=mc.norm,
evidence="",
action=action,
confidence=0.95,
sources=[EvidenceSource(
source_type=SourceType.MC,
source_id=mc.mc_id,
detail=f"regex check {len(mc.patterns)} pattern(s) negative",
)],
))
coverage.append(McCoverage(
mc_id=mc.mc_id,
status=sev.value.lower(),
reason="missing",
))
# Eskalation: für die identifizierten Lücken kann ein LLM
# zusätzliche Tiefen-Findings liefern (z.B. "Geschäftsführer
# genannt, aber ohne Nachname"). Confidence der MC-Findings
# ist hoch — eskalieren wir wegen "weitere subtile Lücken
# finden", nicht weil wir unsicher sind.
esc_findings, esc_logs = await self._maybe_escalate(text, scope)
# Dedup per field_id (MC hat Priorität)
seen_fields = {f.field_id for f in mc_findings if f.field_id}
for f in esc_findings:
if f.field_id and f.field_id in seen_fields:
continue
mc_findings.append(f)
# Confidence: harmonisches Mittel über alle Finding-Confidences
if mc_findings:
confs = [f.confidence for f in mc_findings if f.confidence]
overall = sum(confs) / len(confs) if confs else 0.8
else:
overall = 0.95 # nichts gefunden → alle MCs ok
return self._finalize(
start, mc_findings, esc_logs, coverage, confidence=overall,
)
async def _maybe_escalate(
self, text: str, scope: set[str],
) -> tuple[list[Finding], list[EscalationLog]]:
"""LLM-Stage. Aktivierbar via Agent-Settings (default an)."""
# Hard cap auf Text-Größe für den LLM-Pass
user_prompt = (
f"BUSINESS-SCOPE: {', '.join(sorted(scope))}\n\n"
f"IMPRESSUM-TEXT:\n{text[:4000]}\n\n"
"Liste subtile Lücken nach § 5 TMG. Nur JSON."
)
res, logs = await cascade(_SYSTEM_PROMPT, user_prompt)
if res is None or not isinstance(res.parsed, (dict, list)):
return [], logs
raw = (res.parsed.get("findings")
if isinstance(res.parsed, dict) else res.parsed)
if not isinstance(raw, list):
return [], logs
out: list[Finding] = []
for item in raw:
if not isinstance(item, dict):
continue
fid = str(item.get("field_id") or "unknown")[:40]
sev_raw = str(item.get("severity") or "MEDIUM").upper()
sev = self._sev(sev_raw)
out.append(Finding(
check_id=f"IMPRESSUM-AGENT-LLM-{fid.upper()}",
agent=self.agent_id,
agent_version=self.agent_version,
field_id=fid,
severity=sev,
severity_reason="llm_detected",
title=str(item.get("title") or "")[:200],
norm="§ 5 TMG / DDG (LLM-Analyse)",
evidence=str(item.get("evidence") or "")[:300],
action=str(item.get("action") or "")[:400],
confidence=0.7,
sources=[EvidenceSource(
source_type=res.stage,
source_id=res.model,
detail=f"prompt_chars={len(user_prompt)}",
confidence=0.7,
)],
))
return out, logs
def _finalize(
self,
start: datetime,
findings: list[Finding],
esc_logs: list[EscalationLog],
coverage: list[McCoverage],
confidence: float,
notes: str = "",
) -> AgentOutput:
end = datetime.now(timezone.utc)
recs = rollup(findings)
out = AgentOutput(
agent=self.agent_id,
agent_version=self.agent_version,
started_at=start,
finished_at=end,
duration_ms=int((end - start).total_seconds() * 1000),
findings=findings,
recommendations=recs,
mc_coverage=coverage,
escalation_log=esc_logs,
confidence=confidence,
notes=notes,
mc_total=len(coverage),
mc_ok=sum(1 for c in coverage if c.status == "ok"),
mc_na=sum(1 for c in coverage if c.status == "na"),
mc_high=sum(1 for c in coverage if c.status == "high"),
mc_medium=sum(1 for c in coverage if c.status == "medium"),
mc_low=sum(1 for c in coverage if c.status == "low"),
)
return lint_output(out)
@staticmethod
def _sev(value: str) -> Severity:
v = (value or "").upper()
if v == "HIGH":
return Severity.HIGH
if v == "MEDIUM":
return Severity.MEDIUM
if v == "LOW":
return Severity.LOW
return Severity.INFO
@staticmethod
def _build_action(mc, is_automotive: bool) -> str:
if mc.field_id == "aufsichtsbehoerde" and is_automotive:
return (
"Aufsichtsbehörde im Impressum benennen. Für "
"KFZ-Hersteller/-Vertrieb typisch: Kraftfahrt-"
"Bundesamt (KBA), Fördestraße 16, 24944 Flensburg, "
"www.kba.de. Bei Ladestrom-Vertrieb zusätzlich "
"Bundesnetzagentur (BNetzA)."
)
return (
f"{mc.label} im Impressum ergänzen "
f"(Pflichtangabe nach {mc.norm})."
)
@@ -0,0 +1,234 @@
"""Machine-Check-Definitionen für den Impressum-Agent.
Eine MC = ein abgegrenzter, deterministischer Check über das
Impressum-Dokument. Owner = impressum-agent.
Quelle: § 5 TMG / DDG + § 18 MStV + § 36 VSBG + Art. 14 EU-VO 524/2013.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Pattern
@dataclass(frozen=True)
class MC:
"""Eine Machine-Check-Definition."""
mc_id: str # IMP-MC-001 ...
field_id: str # name_anbieter, handelsregister, ...
label: str
norm: str
patterns: tuple[Pattern[str], ...] = field(default_factory=tuple)
severity_if_missing: str = "MEDIUM" # HIGH | MEDIUM | LOW | INFO
requires_scope: tuple[str, ...] = field(default_factory=tuple)
# Wenn True: bei Scope-Mismatch nicht-applicable melden, sonst skip
explicit_na: bool = True
MCS: tuple[MC, ...] = (
MC(
mc_id="IMP-MC-001",
field_id="name_anbieter",
label="Name + Anschrift des Anbieters",
norm="§ 5 Abs. 1 Nr. 1 TMG",
severity_if_missing="HIGH",
patterns=(
re.compile(
r"\b(?:Anbieter|Diensteanbieter|"
r"Verantwortlich(?:er Anbieter)?)\s*[:.\s]",
re.IGNORECASE,
),
# Label-free fallback: Firma (Rechtsform) + Adresse
re.compile(
r"\b[A-ZÄÖÜ][\w\-\& ]{1,80}?\s+"
r"(?:GmbH|AG|UG|KG|SE|GbR|OHG|Limited|Ltd|LLC)\s*"
r"[\s\S]{0,400}?"
r"\b\d{5}\s+[A-ZÄÖÜ]",
re.IGNORECASE,
),
),
),
MC(
mc_id="IMP-MC-002",
field_id="kontakt_email",
label="Email-Adresse",
norm="§ 5 Abs. 1 Nr. 2 TMG",
severity_if_missing="HIGH",
patterns=(re.compile(r"\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b",
re.IGNORECASE),),
),
MC(
mc_id="IMP-MC-003",
field_id="kontakt_telefon",
label="Telefon",
norm="§ 5 Abs. 1 Nr. 2 TMG",
severity_if_missing="MEDIUM",
patterns=(re.compile(
r"(?:Tel(?:efon)?|Phone)\.?\s*[:.\s]\s*[\+\d][\d\s/\-()]{5,}",
re.IGNORECASE,
),),
),
MC(
mc_id="IMP-MC-004",
field_id="handelsregister",
label="Handelsregister-Eintrag",
norm="§ 5 Abs. 1 Nr. 4 TMG",
severity_if_missing="HIGH",
patterns=(
re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE),
re.compile(r"Handelsregister", re.IGNORECASE),
),
),
MC(
mc_id="IMP-MC-005",
field_id="ust_id",
label="USt-IdNr",
norm="§ 5 Abs. 1 Nr. 6 TMG",
severity_if_missing="MEDIUM",
patterns=(
re.compile(
r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]",
re.IGNORECASE,
),
re.compile(r"\bDE\d{9}\b"),
),
),
MC(
mc_id="IMP-MC-006",
field_id="vertretungsberechtigte",
label="Vertretungsberechtigte Person",
norm="§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)",
severity_if_missing="HIGH",
patterns=(
re.compile(
r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hrer|"
r"Vertretungsberechtigt|vertreten\s+durch)"
r"\s*[:.\s]",
re.IGNORECASE,
),
re.compile(r"\bManagement\s*[:.\s]\s*[A-ZÄÖÜ]",
re.IGNORECASE),
re.compile(r"\bDirector(?:s|en)?\s*[:.\s]\s*[A-ZÄÖÜ]",
re.IGNORECASE),
),
),
MC(
mc_id="IMP-MC-007",
field_id="vertretungsberechtigte_label_korrekt",
label="Deutsches Label 'Geschäftsführer' statt 'Management'",
norm="§ 5 Abs. 1 Nr. 1 TMG (Deutsch-Pflicht, gerichtsfest)",
severity_if_missing="MEDIUM",
patterns=(re.compile(
r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hrer|"
r"Vorstand|"
r"Vertretungsberechtigt|vertreten\s+durch)"
r"\s*[:.\s]",
re.IGNORECASE,
),),
),
MC(
mc_id="IMP-MC-008",
field_id="aufsichtsbehoerde",
label="Aufsichtsbehörde (regulierte Branchen)",
norm="§ 5 Abs. 1 Nr. 3 TMG (Branchen-bedingt)",
severity_if_missing="LOW",
requires_scope=("regulated_profession", "financial_services",
"insurance", "automotive"),
patterns=(
re.compile(r"Aufsichtsbeh(?:ö|oe)rde\s*[:.\s]",
re.IGNORECASE),
re.compile(
r"\bBAFin\b|\bBNetzA\b|\bLKA\b|\bKBA\b|"
r"Kraftfahrt-?Bundesamt|Bundesnetzagentur|"
r"Bundesanstalt\s+f(?:ü|ue)r",
re.IGNORECASE,
),
),
),
MC(
mc_id="IMP-MC-009",
field_id="verantwortlicher_redaktion",
label="Verantwortlicher § 18 MStV (journalistisch-redaktionell)",
norm="§ 18 MStV (bei Blog/News/Magazin/Newsroom Pflicht)",
severity_if_missing="MEDIUM",
requires_scope=("editorial",),
patterns=(re.compile(
r"(?:Verantwortlich(?:er|e)?\s+(?:f(?:ue|ü)r|i\.S\.d\.|"
r"nach|gem(?:ae|ä)ß)\s+§\s*18|"
r"V\.i\.S\.d\.\s*§?\s*18|"
r"redaktionell\s+Verantwortlich)",
re.IGNORECASE,
),),
),
MC(
mc_id="IMP-MC-010",
field_id="verbraucher_streitbeilegung",
label="Verbraucher-Streitbeilegung-Hinweis",
norm="§ 36 VSBG (B2C-Anbieter Pflicht)",
severity_if_missing="MEDIUM",
requires_scope=("ecommerce", "b2c"),
patterns=(re.compile(
r"(?:Verbraucherschlichtungs|VSBG|"
r"Streitbeilegung|"
r"Schlichtungsstelle|"
r"alternative\s+Streit(?:beilegung|schlichtung))",
re.IGNORECASE,
),),
),
MC(
mc_id="IMP-MC-011",
field_id="berufsangaben",
label="Berufsbezeichnung + Berufsrechtliche Angaben",
norm="§ 5 Abs. 1 Nr. 5 TMG (Kammerberufe)",
severity_if_missing="LOW",
requires_scope=("regulated_profession",),
patterns=(re.compile(
r"Berufsbezeichnung|Berufsordnung|Kammer", re.IGNORECASE,
),),
),
MC(
mc_id="IMP-MC-012",
field_id="odr_link",
label="OS-Link auf EU-Plattform",
norm="Art. 14 EU-VO 524/2013 (B2C-Onlineshops)",
severity_if_missing="MEDIUM",
requires_scope=("ecommerce",),
patterns=(re.compile(r"ec\.europa\.eu/consumers/odr",
re.IGNORECASE),),
),
)
# Public list of all MC-IDs for the Registry
MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS)
def scope_matches(mc: MC, scope: set[str], is_automotive: bool) -> bool:
"""Entscheidet ob die MC auf den Business-Scope anwendbar ist."""
if not mc.requires_scope:
return True
if mc.field_id == "aufsichtsbehoerde" and is_automotive:
return True
return any(s in scope for s in mc.requires_scope)
def detect_automotive(text: str) -> bool:
"""KFZ-Hersteller/-Vertrieb → triggert KBA-Hint."""
if re.search(
r"\b(?:KFZ|Fahrzeug(?:e|herstellung|verkauf)?|Automobil|"
r"E-Auto|Elektroauto|Auto-?Konfigurator|"
r"Elektrofahrzeug|Hybrid-?Fahrzeug)\b",
text, re.IGNORECASE,
):
return True
return bool(re.search(
r"\b(?:Tesla|BMW|Mercedes-?Benz|Audi|Volkswagen|Porsche|"
r"Volvo|Stellantis|Skoda|Seat|Cupra|MINI|Smart|"
r"Opel|Ford\s+Deutschland|Hyundai|Kia|Toyota|Mazda|"
r"Nissan|Honda|Subaru|Lexus|Polestar|NIO|BYD|Rivian|"
r"Lucid)\s+(?:Germany|Deutschland|Group|Holding|AG|"
r"GmbH|S(?:E|\.A\.))\b",
text, re.IGNORECASE,
))
@@ -0,0 +1,143 @@
"""Tests für Cookie-Policy-Agent."""
from __future__ import annotations
import asyncio
import pytest
from compliance.services.specialist_agents import (
REGISTRY,
AgentInput,
CookiePolicyAgent,
Severity,
)
FULL_POLICY = """Cookie-Richtlinie
Stand: 1. Juni 2026
Wir verwenden auf unserer Website verschiedene Cookies. Diese werden
in folgende Kategorien eingeteilt:
1. Essentielle Cookies (unbedingt erforderlich)
Zweck: Diese Cookies dienen der grundlegenden Funktion der Website.
Rechtsgrundlage: § 25 Abs. 2 TDDDG
Laufzeit: Session
2. Funktionale Cookies
Zweck: Speichern Ihre Präferenzen wie Sprache und Region.
Rechtsgrundlage: Art. 6 Abs. 1 lit. a DSGVO
Laufzeit: 30 Tage
3. Analytics-Cookies (Performance)
Drittanbieter: Google LLC, USA
Zweck: Nutzungsstatistiken erheben.
Laufzeit: 24 Monate
Cookies: _ga, _gid
Drittland: USA — Standardvertragsklauseln + Data Privacy Framework
4. Marketing-Cookies (Tracking)
Drittanbieter: Meta Platforms Inc., USA
Cookies: _fbp, _fbc
Laufzeit: 90 Tage
Sie können Ihre Cookie-Einstellungen jederzeit ändern über den Link
unten oder das Banner erneut öffnen.
Browser-Einstellungen: Auch in Chrome, Firefox, Safari und Edge
können Sie Cookies blockieren oder löschen.
Kontakt: datenschutz@example.com
Datenschutzbeauftragter: Max Mustermann
"""
GAPPY_POLICY = """Cookies
Wir verwenden Cookies um die Website zu betreiben.
Cookies werden so lange gespeichert wie nötig.
"""
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_agent_is_registered():
agent = REGISTRY.get("cookie_policy")
assert agent is not None
assert agent.doc_type == "cookie"
def test_short_text_skipped(monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.cookie_policy.agent.cascade",
_no_cascade,
)
agent = CookiePolicyAgent()
out = _run(agent.evaluate(AgentInput(doc_type="cookie", text="x")))
assert out.mc_total > 0
assert all(c.status == "skipped" for c in out.mc_coverage)
def test_full_policy_has_few_high_findings(monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.cookie_policy.agent.cascade",
_no_cascade,
)
agent = CookiePolicyAgent()
out = _run(agent.evaluate(AgentInput(doc_type="cookie", text=FULL_POLICY)))
high = [f for f in out.findings if f.severity == Severity.HIGH.value]
assert not high, f"unexpected HIGH findings: {[f.field_id for f in high]}"
def test_gappy_policy_triggers_high(monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.cookie_policy.agent.cascade",
_no_cascade,
)
agent = CookiePolicyAgent()
out = _run(agent.evaluate(AgentInput(doc_type="cookie",
text=GAPPY_POLICY)))
field_ids = {f.field_id for f in out.findings}
# 4 Kategorien fehlen, Vendoren fehlen, Opt-Out fehlt, Tabelle fehlt
assert "categories_named" in field_ids
assert "vendor_recipients" in field_ids
assert "opt_out_mechanism" in field_ids
def test_cmp_vendor_cross_check_emits_finding(monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.cookie_policy.agent.cascade",
_no_cascade,
)
agent = CookiePolicyAgent()
out = _run(agent.evaluate(AgentInput(
doc_type="cookie", text=FULL_POLICY,
context={"cmp_vendors": [
{"name": "Hotjar"}, # NICHT in Policy
{"name": "Google LLC"}, # IN Policy
]},
)))
field_ids = {f.field_id for f in out.findings}
assert "vendor_consistency" in field_ids
cmp_f = next(f for f in out.findings
if f.field_id == "vendor_consistency")
assert "Hotjar" in cmp_f.evidence
assert "Google" not in cmp_f.evidence
def test_recommendations_are_built():
agent = CookiePolicyAgent()
out = _run(agent.evaluate(AgentInput(doc_type="cookie",
text=GAPPY_POLICY)))
assert out.recommendations
# Jede Recommendation hat mind. ein related_finding
for r in out.recommendations:
assert r.related_finding_ids
@@ -0,0 +1,93 @@
"""Tests für Evidence-Vault."""
from __future__ import annotations
import json
import os
from pathlib import Path
import pytest
@pytest.fixture
def tmp_vault(tmp_path, monkeypatch):
monkeypatch.setenv("EVIDENCE_VAULT_ROOT", str(tmp_path))
import compliance.services.specialist_agents._evidence_vault as v
yield v
def test_open_vault_creates_structure(tmp_vault):
vault = tmp_vault.open_vault("impressum", "2.0")
assert vault.root.exists()
for sub in ("screenshots", "videos", "csv", "findings", "raw"):
assert (vault.root / sub).is_dir()
assert vault.manifest_path.exists()
def test_put_bytes_appends_manifest(tmp_vault):
vault = tmp_vault.open_vault("impressum", "2.0")
rel = vault.put_bytes("screenshot", "url1", "test.png",
b"\x89PNG\r\n\x1a\n", mime="image/png")
assert rel.startswith("screenshots/")
assert (vault.root / rel).exists()
assets = vault.list_assets()
assert len(assets) == 1
assert assets[0]["sha256"]
assert assets[0]["mime"] == "image/png"
assert assets[0]["size_bytes"] == 8
def test_put_json_stores_finding(tmp_vault):
vault = tmp_vault.open_vault("cookie_policy", "1.0")
rel = vault.put_json("finding", "url1", "output.json",
{"findings": [{"check_id": "X"}]})
p = vault.root / rel
data = json.loads(p.read_text())
assert data["findings"][0]["check_id"] == "X"
def test_assets_for_slot_isolation(tmp_vault):
vault = tmp_vault.open_vault("agent", "1.0")
vault.put_bytes("screenshot", "url1", "a.png", b"a")
vault.put_bytes("screenshot", "url2", "b.png", b"b")
vault.put_bytes("video", "url1", "w.mp4", b"v")
assert len(vault.assets_for_slot("url1")) == 2
assert len(vault.assets_for_slot("url2")) == 1
def test_asset_path_blocks_traversal(tmp_vault):
vault = tmp_vault.open_vault("agent", "1.0")
p = vault.asset_path("../../../etc/passwd")
assert p is None
def test_finalize_writes_finished_at(tmp_vault):
vault = tmp_vault.open_vault("agent", "1.0")
snap = vault.finalize()
assert "finished_at" in snap
manifest = json.loads(vault.manifest_path.read_text())
assert "finished_at" in manifest
def test_list_runs_returns_recent(tmp_vault):
tmp_vault.open_vault("a", "1.0", run_id="run1")
tmp_vault.open_vault("b", "1.0", run_id="run2")
runs = tmp_vault.list_runs(limit=10)
ids = {r["run_id"] for r in runs}
assert {"run1", "run2"} <= ids
def test_delete_run_removes_dir(tmp_vault):
vault = tmp_vault.open_vault("a", "1.0", run_id="kill-me")
vault.put_bytes("screenshot", "u", "x.png", b"x")
assert tmp_vault.delete_run("kill-me")
assert not vault.root.exists()
assert not tmp_vault.delete_run("kill-me") # idempotent
def test_safe_filename_strips_path_chars(tmp_vault):
vault = tmp_vault.open_vault("a", "1.0")
rel = vault.put_bytes("raw", "slot",
"../../etc/passwd", b"x")
assert "passwd" in rel
assert ".." not in rel
@@ -0,0 +1,185 @@
"""Tests für Impressum-Agent v2 (BaseSpecialistAgent)."""
from __future__ import annotations
import asyncio
import pytest
from compliance.services.specialist_agents import (
REGISTRY,
AgentInput,
AgentOutput,
ImpressumAgent,
Severity,
)
from compliance.services.specialist_agents._base import (
FORBIDDEN_OUTPUT_TERMS,
lint_output,
stable_recommendation_id,
)
from compliance.services.specialist_agents._rollup import rollup
TESLA_IMPRESSUM = (
"Tesla Germany GmbH\n"
"Ludwig-Prandtl-Strasse 25-29\n"
"12526 Berlin\n"
"Deutschland\n\n"
"Email: kontakt@tesla.com\n"
"Telefon: +49 89 1250 16 800\n\n"
"Management:\n"
"Elon Musk\n\n"
"Handelsregister: HRB 218904 B, Amtsgericht Charlottenburg\n"
)
FULL_IMPRESSUM = (
TESLA_IMPRESSUM
+ "\nUSt-IdNr: DE123456789\nGeschäftsführer: Max Mustermann\n"
)
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_agent_is_registered():
agent = REGISTRY.get("impressum")
assert agent is not None
assert agent.doc_type == "impressum"
assert len(agent.owned_mc_ids) >= 10
def test_short_text_skipped():
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(doc_type="impressum", text="x")))
assert out.mc_total > 0
assert all(c.status == "skipped" for c in out.mc_coverage)
assert not out.findings
def test_tesla_missing_german_label(monkeypatch):
# Skip LLM escalation for unit test (no Ollama in CI)
async def _no_cascade(*a, **kw):
return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(
doc_type="impressum", text=TESLA_IMPRESSUM,
)))
field_ids = {f.field_id for f in out.findings}
# Tesla pattern: "Management:" matches IMP-MC-006 → present
# But IMP-MC-007 (deutsches Label) MUSS fehlen
assert "vertretungsberechtigte_label_korrekt" in field_ids
# USt fehlt
assert "ust_id" in field_ids
def test_full_impressum_has_no_basic_findings(monkeypatch):
async def _no_cascade(*a, **kw):
return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(
doc_type="impressum", text=FULL_IMPRESSUM,
)))
# nur scope-dependent fields fehlen (vsbg, odr, redaktion)
high = [f for f in out.findings if f.severity == Severity.HIGH.value]
assert not high, f"unexpected HIGH findings: {[f.field_id for f in high]}"
def test_b2c_scope_adds_vsbg(monkeypatch):
async def _no_cascade(*a, **kw):
return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(
doc_type="impressum", text=TESLA_IMPRESSUM,
business_scope=["b2c", "ecommerce"],
)))
field_ids = {f.field_id for f in out.findings}
assert "verbraucher_streitbeilegung" in field_ids
assert "odr_link" in field_ids
def test_automotive_scope_auto_detected(monkeypatch):
async def _no_cascade(*a, **kw):
return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
agent = ImpressumAgent()
out = _run(agent.evaluate(AgentInput(
doc_type="impressum", text=TESLA_IMPRESSUM,
)))
field_ids = {f.field_id for f in out.findings}
assert "aufsichtsbehoerde" in field_ids
# Action MUSS KBA-Hint enthalten
aufsicht = next(f for f in out.findings
if f.field_id == "aufsichtsbehoerde")
assert "KBA" in aufsicht.action or "Kraftfahrt" in aufsicht.action
def test_disclaimer_linter_scrubs_forbidden():
from compliance.services.specialist_agents._base import Finding
from datetime import datetime, timezone
f = Finding(
check_id="X", agent="t", agent_version="1",
severity=Severity.HIGH,
title="Diese Lösung ist rechtssicher und garantiert konform",
action="Voll konform machen",
)
out = AgentOutput(
agent="t", agent_version="1",
started_at=datetime.now(timezone.utc),
finished_at=datetime.now(timezone.utc),
duration_ms=0, findings=[f],
)
cleaned = lint_output(out)
assert "rechtssicher" not in cleaned.findings[0].title.lower()
assert "garantiert" not in cleaned.findings[0].title.lower()
assert "linter scrubbed" in cleaned.notes.lower()
def test_rollup_bundles_same_action():
from compliance.services.specialist_agents._base import Finding
fs = [
Finding(check_id="A", agent="t", agent_version="1",
severity=Severity.HIGH,
title="Lücke 1", action="AVV mit Anbieter X abschließen"),
Finding(check_id="B", agent="t", agent_version="1",
severity=Severity.MEDIUM,
title="Lücke 2", action="AVV mit Anbieter X abschließen."),
Finding(check_id="C", agent="t", agent_version="1",
severity=Severity.LOW,
title="Lücke 3", action="Etwas anderes machen"),
]
recs = rollup(fs)
assert len(recs) == 2
bundled = next(r for r in recs if len(r.related_finding_ids) == 2)
assert bundled.severity == Severity.HIGH.value
assert set(bundled.related_finding_ids) == {"A", "B"}
def test_stable_recommendation_id_is_deterministic():
a = stable_recommendation_id("AVV mit Anbieter X abschließen")
b = stable_recommendation_id("avv mit anbieter x abschliessen")
# case insensitive aber Diakritika strict (deutsch ß ≠ ss)
assert len(a) == 16
assert len(b) == 16
def test_forbidden_terms_complete():
"""Sanity-Test, dass alle wichtigen Wörter im Linter sind."""
for term in ("rechtssicher", "garantiert", "gesetzeskonform"):
assert any(term in t for t in FORBIDDEN_OUTPUT_TERMS)