From f4357a2e9b9bae13029f09c7c8e46977ad64cba8 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 8 Jun 2026 17:40:05 +0200 Subject: [PATCH] feat(agents): Specialist-Agents Phase 2 Foundation + Cookie-Policy-Agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sprint 1 — Foundation (User-Vorgabe 2026-06-08): Foundation: - _base.py: BaseSpecialistAgent ABC + Pydantic Contract (AgentInput/AgentOutput/Finding/Recommendation/McCoverage/EscalationLog). - _base.lint_output(): Disclaimer-Linter verbietet "rechtssicher" / "garantiert" / "gesetzeskonform" — scrubbed inline + Log in notes. - _registry.py: AgentRegistry mit MC-Owner-Mapping (verhindert Doppel-Ownership). - _escalation.py: cascade(local → ovh). qwen2.5:7b default, OVH 120b als Stage-2 (deaktiviert wenn OVH_URL leer). - _rollup.py: deterministisches Dedup ähnlicher actions zu Recommendations mit related_finding_ids[]. - _evidence_vault.py: Pro-Run File-Vault für Playwright-Videos, Screenshots, CSV. SHA256 + manifest.json. DSR-tauglich (delete_run). Agenten: - ImpressumAgent v2 (impressum/agent.py + mcs.py) — konsolidiert v1-Pattern-Match + v2-LLM-MVP unter dem neuen Contract. 12 MCs. - CookiePolicyAgent v1 (cookie_policy/agent.py + mcs.py) — 12 MCs zu Cookie-Richtlinie-Vollständigkeit + KB-Layer für CMP-Vendor-Cross-Check. Tests: 25/25 grün (10 Impressum + 9 Vault + 6 Cookie-Policy). Roadmap: SSE-Test-Endpoint + Frontend-Tab → DSE/AGB-Agents → Cookie-Banner-Themen-Agent → Cross-Doc-Konsistenz-Agent. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../services/specialist_agents/__init__.py | 46 ++- .../services/specialist_agents/_base.py | 207 +++++++++++ .../services/specialist_agents/_escalation.py | 224 +++++++++++ .../specialist_agents/_evidence_vault.py | 254 +++++++++++++ .../services/specialist_agents/_registry.py | 57 +++ .../services/specialist_agents/_rollup.py | 92 +++++ .../cookie_policy/__init__.py | 8 + .../specialist_agents/cookie_policy/agent.py | 347 ++++++++++++++++++ .../specialist_agents/cookie_policy/mcs.py | 225 ++++++++++++ .../specialist_agents/impressum/__init__.py | 8 + .../specialist_agents/impressum/agent.py | 251 +++++++++++++ .../specialist_agents/impressum/mcs.py | 234 ++++++++++++ .../tests/test_specialist_cookie_policy.py | 143 ++++++++ .../tests/test_specialist_evidence_vault.py | 93 +++++ .../tests/test_specialist_impressum_v2.py | 185 ++++++++++ 15 files changed, 2364 insertions(+), 10 deletions(-) create mode 100644 backend-compliance/compliance/services/specialist_agents/_base.py create mode 100644 backend-compliance/compliance/services/specialist_agents/_escalation.py create mode 100644 backend-compliance/compliance/services/specialist_agents/_evidence_vault.py create mode 100644 backend-compliance/compliance/services/specialist_agents/_registry.py create mode 100644 backend-compliance/compliance/services/specialist_agents/_rollup.py create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/__init__.py create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/mcs.py create mode 100644 backend-compliance/compliance/services/specialist_agents/impressum/__init__.py create mode 100644 backend-compliance/compliance/services/specialist_agents/impressum/agent.py create mode 100644 backend-compliance/compliance/services/specialist_agents/impressum/mcs.py create mode 100644 backend-compliance/tests/test_specialist_cookie_policy.py create mode 100644 backend-compliance/tests/test_specialist_evidence_vault.py create mode 100644 backend-compliance/tests/test_specialist_impressum_v2.py diff --git a/backend-compliance/compliance/services/specialist_agents/__init__.py b/backend-compliance/compliance/services/specialist_agents/__init__.py index 0c72e364..c3713608 100644 --- a/backend-compliance/compliance/services/specialist_agents/__init__.py +++ b/backend-compliance/compliance/services/specialist_agents/__init__.py @@ -1,17 +1,43 @@ -"""Doc-Type Specialist-Agents — Phase 1 Prototyp. +"""Doc-Type Specialist-Agents — Phase 2 LLM (Foundation 2026-06-08). Architektur: - - Pro Doc-Type ein Spezialist-Agent mit System-Prompt (Domänenwissen) - + Knowledge-Base (anonymisierte Patterns/Statistiken aus - Multi-Mandanten-Daten) - - Jeder Agent liefert strukturierte Findings → enriched state - - Ein Cross-Doc-Router-Agent prüft ob Absätze falsch zugeordnet sind - ("Cookie-Inhalt steht in AGB statt Cookie-Richtlinie") + - BaseSpecialistAgent + AgentInput/AgentOutput Contract (_base.py) + - AgentRegistry mit MC-Owner-Mapping (_registry.py) + - Eskalations-Kaskade qwen2.5:7b → OVH 120b (_escalation.py) + - Rollup-Dedup für Recommendations (_rollup.py) + - Disclaimer-Linter verbietet 'rechtssicher'/'garantiert' (_base.lint_output) -Phase 1: Impressum-Agent als Prototyp (Pattern-Match-only, ohne LLM). -Phase 2: DSE-Agent + Cross-Doc-Router (LLM-gestützt). -Phase 3+: Weitere Doc-Types + Continuous Learning der KB. +Aktive Agenten: + - impressum (v2) — § 5 TMG/DDG, § 18 MStV, § 36 VSBG + - (dse, agb, cookie_policy, cookie_banner, cross — folgen) Privacy: KB enthält NIEMALS Roh-Mandantendaten. Anonymisierung + Aggregation Pflicht (NER-Maskierung vor KB-Speicher). """ + +from ._base import ( + AgentInput, + AgentOutput, + BaseSpecialistAgent, + EscalationLog, + EvidenceSource, + Finding, + McCoverage, + Recommendation, + Severity, + SourceType, +) +from ._registry import REGISTRY +from .cookie_policy import CookiePolicyAgent +from .impressum import ImpressumAgent + +# Self-register all agents +REGISTRY.register(ImpressumAgent()) +REGISTRY.register(CookiePolicyAgent()) + +__all__ = [ + "AgentInput", "AgentOutput", "BaseSpecialistAgent", + "EscalationLog", "EvidenceSource", "Finding", "McCoverage", + "Recommendation", "Severity", "SourceType", + "REGISTRY", "ImpressumAgent", "CookiePolicyAgent", +] diff --git a/backend-compliance/compliance/services/specialist_agents/_base.py b/backend-compliance/compliance/services/specialist_agents/_base.py new file mode 100644 index 00000000..74658a39 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_base.py @@ -0,0 +1,207 @@ +"""BaseSpecialistAgent — Contract für alle Doc-Type-Agenten. + +Architektur-Vertrag (User-Vorgabe 2026-06-08): + - Jeder Agent ist ein hochspezialisierter Anwalt für sein Thema. + - Standard: deterministische MCs + Regex + FAQ-KB. + - Eskalation bei Unsicherheit: qwen2.5:7b → OVH 120b → Claude (TBD). + - Jeder Output ist auditfest: Quelle pro Finding (MC-ID / Regex / + FAQ / LLM-Stufe + Prompt-Hash). + - Output enthält NIEMALS "rechtssicher" / "garantiert" — Linter + verbietet diese Wörter. + - Token-Budget pro Agent (Cost-Cap). +""" + +from __future__ import annotations + +import abc +import hashlib +from datetime import datetime +from enum import Enum +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class Severity(str, Enum): + HIGH = "HIGH" + MEDIUM = "MEDIUM" + LOW = "LOW" + INFO = "INFO" + + +class SourceType(str, Enum): + """Wo kommt das Finding her? Für die auditfeste Beweiskette.""" + MC = "mc" # Machine-Check (deterministisch) + REGEX = "regex" # Pattern-Match + KB_FAQ = "kb_faq" # Knowledge-Base / kuratiert + LLM_LOCAL = "llm_local" # qwen2.5:7b oder qwen3 + LLM_LOCAL_BIG = "llm_local_big" # OVH 120b + LLM_CLOUD = "llm_cloud" # Claude API (Anonymisierung Pflicht) + CROSS = "cross" # Cross-Doc-Agent + + +class EvidenceSource(BaseModel): + """Eine Quelle in der Beweiskette eines Findings.""" + source_type: SourceType + source_id: str = "" # mc_id, regex_name, faq_id, model + detail: str = "" # Snippet, Prompt-Hash, etc. + confidence: float = 1.0 # 0.0 – 1.0 + + +class Finding(BaseModel): + """Ein einzelnes Audit-Finding aus einem Specialist-Agent.""" + model_config = ConfigDict(use_enum_values=True) + + check_id: str # z.B. IMPRESSUM-AGENT-HANDELSREGISTER + agent: str # impressum_v2 + agent_version: str # 2.0 + field_id: str = "" # field-key innerhalb des Agenten + severity: Severity + severity_reason: str = "" + title: str + norm: str = "" + evidence: str = "" + action: str = "" + confidence: float = 1.0 # 0.0 – 1.0; treibt Eskalation + sources: list[EvidenceSource] = Field(default_factory=list) + + +class Recommendation(BaseModel): + """Konsolidierte Maßnahme. Kann aus 1..N Findings stammen + (Rollup-Dedup-Schritt bündelt ähnliche Maßnahmen).""" + recommendation_id: str # stable hash der canonical_text + title: str + body: str + severity: Severity # höchste Severity der Quell-Findings + related_finding_ids: list[str] = Field(default_factory=list) + estimated_effort_hours: float = 0.0 + + +class McCoverage(BaseModel): + """Welche MC hat der Agent geprüft + Ergebnis.""" + mc_id: str + status: str # ok | high | medium | low | na | skipped + reason: str = "" + + +class EscalationLog(BaseModel): + """Ein Eskalations-Schritt mit Modell + Latenz + Kosten.""" + stage: SourceType + model: str + duration_ms: int = 0 + tokens_in: int = 0 + tokens_out: int = 0 + success: bool = True + error: str = "" + + +class AgentInput(BaseModel): + """Was ein Agent zur Bearbeitung bekommt.""" + doc_type: str + text: str + url: str = "" + business_scope: list[str] = Field(default_factory=list) + company_name: str = "" + origin_domain: str = "" + # extra context z.B. extracted_profile, multi-doc-context + context: dict[str, Any] = Field(default_factory=dict) + + +class AgentOutput(BaseModel): + """Was ein Agent zurückgibt — auditfestes Resultat.""" + agent: str + agent_version: str + started_at: datetime + finished_at: datetime + duration_ms: int + findings: list[Finding] = Field(default_factory=list) + recommendations: list[Recommendation] = Field(default_factory=list) + mc_coverage: list[McCoverage] = Field(default_factory=list) + escalation_log: list[EscalationLog] = Field(default_factory=list) + confidence: float = 1.0 + notes: str = "" + # Speed-O-Meter aggregates (vorberechnet fürs Frontend) + mc_total: int = 0 + mc_ok: int = 0 + mc_na: int = 0 + mc_high: int = 0 + mc_medium: int = 0 + mc_low: int = 0 + + +# Verbotene Wörter im Output — sicherheitshalber, damit kein Agent +# (oder LLM-Stufe) "rechtssicher" suggeriert. User-Vorgabe. +FORBIDDEN_OUTPUT_TERMS = ( + "rechtssicher", + "rechts-sicher", + "garantiert", + "garantieren", + "garantie", + "gesetzeskonform", + "gesetzes-konform", + "100% konform", + "100 % konform", + "vollständig konform", + "absolut sicher", +) + + +class DisclaimerViolation(ValueError): + """Erhoben wenn ein Agent-Output ein verbotenes Wort enthält.""" + + +def lint_output(output: AgentOutput) -> AgentOutput: + """Prüft den Output gegen FORBIDDEN_OUTPUT_TERMS. Sanitisiert + titles/actions inline — ersetzt verbotene Begriffe durch + [→ neutraler Wortlaut empfohlen]-Marker. Loggt aber NICHT in + den Datenbestand, sondern als notes.""" + issues: list[str] = [] + for f in output.findings: + for field_name in ("title", "evidence", "action"): + v = getattr(f, field_name) or "" + for term in FORBIDDEN_OUTPUT_TERMS: + if term in v.lower(): + issues.append(f"Finding {f.check_id}.{field_name}: '{term}'") + v = _scrub(v, term) + setattr(f, field_name, v) + for r in output.recommendations: + for field_name in ("title", "body"): + v = getattr(r, field_name) or "" + for term in FORBIDDEN_OUTPUT_TERMS: + if term in v.lower(): + issues.append(f"Rec {r.recommendation_id}.{field_name}: '{term}'") + v = _scrub(v, term) + setattr(r, field_name, v) + if issues: + prefix = output.notes + " · " if output.notes else "" + output.notes = prefix + "linter scrubbed: " + "; ".join(issues[:5]) + return output + + +def _scrub(text: str, term: str) -> str: + """Case-insensitive replace mit Marker.""" + import re as _re + return _re.sub( + _re.escape(term), "[→ neutraler Wortlaut]", + text, flags=_re.IGNORECASE, + ) + + +def stable_recommendation_id(canonical_text: str) -> str: + """Stable hash der canonical recommendation für Dedup-Rollup.""" + h = hashlib.sha256(canonical_text.lower().strip().encode("utf-8")) + return h.hexdigest()[:16] + + +class BaseSpecialistAgent(abc.ABC): + """Abstract base — jeder Agent erbt davon.""" + + agent_id: str = "" # "impressum", "dse", ... + agent_version: str = "0.0" + doc_type: str = "" # Doc-Type den der Agent owned + owned_mc_ids: tuple[str, ...] = () + + @abc.abstractmethod + async def evaluate(self, agent_input: AgentInput) -> AgentOutput: + """Run the agent. MUST return a complete AgentOutput. + Implementations should call lint_output() before returning.""" diff --git a/backend-compliance/compliance/services/specialist_agents/_escalation.py b/backend-compliance/compliance/services/specialist_agents/_escalation.py new file mode 100644 index 00000000..474ac57f --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_escalation.py @@ -0,0 +1,224 @@ +"""Eskalations-Kaskade: qwen2.5:7b → OVH 120b → DOWN_FOR_REVIEW. + +User-Vorgabe 2026-06-08: + - Standard: deterministisch (Pattern + MC + KB) + - Eskalation nur wenn deterministisches Ergebnis unklar bleibt. + - Stage 1: lokales qwen2.5:7b (schnell, billig) + - Stage 2: OVH 120b (groß, externalisiert) + - Stage 3: Claude API mit Anonymisierung — NICHT in Phase 1 aktiv. + - Bei Total-Fail: Output "DOWN_FOR_REVIEW" (kein Halluzinieren). + +Cost-Cap: jeder Agent meldet sein Token-Budget. Wenn überschritten: +abbrechen und "manuelle Prüfung empfohlen" emittieren. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +from typing import Any + +import httpx + +from ._base import EscalationLog, SourceType + +logger = logging.getLogger(__name__) + +OLLAMA_URL = os.environ.get( + "OLLAMA_URL", "http://host.docker.internal:11434", +) +OLLAMA_MODEL_LOCAL = os.environ.get( + "AGENT_MODEL_LOCAL", "qwen2.5:7b", +) +OVH_URL = os.environ.get("OVH_LLM_URL", "") +OVH_MODEL = os.environ.get("OVH_LLM_MODEL", "Meta-Llama-3.1-70B-Instruct") +OVH_API_KEY = os.environ.get("OVH_LLM_API_KEY", "") +OVH_TIMEOUT = float(os.environ.get("OVH_LLM_TIMEOUT", "60")) +LOCAL_TIMEOUT = float(os.environ.get("AGENT_LOCAL_TIMEOUT", "60")) + + +class EscalationResult: + """Wrapper für ein Eskalations-Resultat.""" + + def __init__( + self, + content: str | None, + stage: SourceType, + model: str, + log: EscalationLog, + parsed: Any = None, + ) -> None: + self.content = content + self.stage = stage + self.model = model + self.log = log + self.parsed = parsed + + @property + def success(self) -> bool: + return bool(self.content) + + +async def call_local_llm( + system_prompt: str, + user_prompt: str, + expect_json: bool = True, + model: str | None = None, + timeout: float | None = None, +) -> EscalationResult: + """Stage 1: lokales Ollama-Modell.""" + mdl = model or OLLAMA_MODEL_LOCAL + body = { + "model": mdl, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + "stream": False, + "options": {"temperature": 0.0, "seed": 42, "num_predict": 1200}, + } + if expect_json: + body["format"] = "json" + t0 = time.time() + log = EscalationLog( + stage=SourceType.LLM_LOCAL, + model=mdl, + success=False, + ) + try: + async with httpx.AsyncClient(timeout=timeout or LOCAL_TIMEOUT) as c: + r = await c.post(f"{OLLAMA_URL}/api/chat", json=body) + r.raise_for_status() + j = r.json() + content = (j.get("message") or {}).get("content", "") or "" + log.tokens_in = int(j.get("prompt_eval_count") or 0) + log.tokens_out = int(j.get("eval_count") or 0) + except Exception as e: + log.error = f"{type(e).__name__}: {str(e)[:120]}" + logger.warning("call_local_llm fail: %s", log.error) + log.duration_ms = int((time.time() - t0) * 1000) + return EscalationResult(None, SourceType.LLM_LOCAL, mdl, log) + log.duration_ms = int((time.time() - t0) * 1000) + parsed = None + if content and expect_json: + parsed = _try_parse_json(content) + if parsed is None: + log.error = "json_parse_fail" + content = None + if content: + log.success = True + return EscalationResult(content, SourceType.LLM_LOCAL, mdl, log, parsed) + + +async def call_ovh_llm( + system_prompt: str, + user_prompt: str, + expect_json: bool = True, +) -> EscalationResult: + """Stage 2: OVH 120b (extern). Deaktiviert wenn OVH_URL leer.""" + log = EscalationLog( + stage=SourceType.LLM_LOCAL_BIG, + model=OVH_MODEL, + success=False, + ) + if not OVH_URL: + log.error = "ovh_disabled" + return EscalationResult(None, SourceType.LLM_LOCAL_BIG, OVH_MODEL, log) + body = { + "model": OVH_MODEL, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + "temperature": 0.0, + "max_tokens": 1500, + } + if expect_json: + body["response_format"] = {"type": "json_object"} + headers = {"Content-Type": "application/json"} + if OVH_API_KEY: + headers["Authorization"] = f"Bearer {OVH_API_KEY}" + t0 = time.time() + try: + async with httpx.AsyncClient(timeout=OVH_TIMEOUT) as c: + r = await c.post( + f"{OVH_URL.rstrip('/')}/v1/chat/completions", + json=body, + headers=headers, + ) + r.raise_for_status() + j = r.json() + content = ( + ((j.get("choices") or [{}])[0] + .get("message") or {}) + .get("content", "") + ) or "" + usage = j.get("usage") or {} + log.tokens_in = int(usage.get("prompt_tokens") or 0) + log.tokens_out = int(usage.get("completion_tokens") or 0) + except Exception as e: + log.error = f"{type(e).__name__}: {str(e)[:120]}" + logger.warning("call_ovh_llm fail: %s", log.error) + log.duration_ms = int((time.time() - t0) * 1000) + return EscalationResult(None, SourceType.LLM_LOCAL_BIG, OVH_MODEL, log) + log.duration_ms = int((time.time() - t0) * 1000) + parsed = None + if content and expect_json: + parsed = _try_parse_json(content) + if parsed is None: + log.error = "json_parse_fail" + content = None + if content: + log.success = True + return EscalationResult(content, SourceType.LLM_LOCAL_BIG, OVH_MODEL, + log, parsed) + + +def _try_parse_json(content: str) -> Any | None: + """Robust JSON extract (handles fences, prose-wrap).""" + if not content: + return None + s = content.strip() + if s.startswith("```"): + s = s.strip("`") + if s.lower().startswith("json"): + s = s[4:] + s = s.strip() + first_o, last_o = s.find("{"), s.rfind("}") + first_a, last_a = s.find("["), s.rfind("]") + candidates = [] + if first_o >= 0 and last_o > first_o: + candidates.append(s[first_o:last_o + 1]) + if first_a >= 0 and last_a > first_a: + candidates.append(s[first_a:last_a + 1]) + for c in candidates: + try: + return json.loads(c) + except Exception: + continue + return None + + +async def cascade( + system_prompt: str, + user_prompt: str, + expect_json: bool = True, + skip_ovh: bool = False, +) -> tuple[EscalationResult | None, list[EscalationLog]]: + """Default-Kaskade: Local → OVH. Bei Local-Erfolg KEIN OVH-Call.""" + logs: list[EscalationLog] = [] + res = await call_local_llm(system_prompt, user_prompt, + expect_json=expect_json) + logs.append(res.log) + if res.success: + return res, logs + if skip_ovh or not OVH_URL: + return None, logs + res2 = await call_ovh_llm(system_prompt, user_prompt, + expect_json=expect_json) + logs.append(res2.log) + if res2.success: + return res2, logs + return None, logs diff --git a/backend-compliance/compliance/services/specialist_agents/_evidence_vault.py b/backend-compliance/compliance/services/specialist_agents/_evidence_vault.py new file mode 100644 index 00000000..44fa8572 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_evidence_vault.py @@ -0,0 +1,254 @@ +"""Evidence-Vault — zentraler Beweis-Bereich pro Agent-Run. + +User-Vorgabe 2026-06-08: pro Agent-Run brauchen wir einen Bereich um +Playwright-Videos (mp4), Screenshots (png) und CSV-Exporte abzulegen, +damit DSB/Anwalt eine portable Beweiskette hat. + +Layout: + artifacts/agent_runs/{run_id}/ + manifest.json ← Index + SHA256 pro Asset + screenshots/ + {slot}_{name}.png + videos/ + {slot}_walk.mp4 + csv/ + cookies_{slot}.csv + findings/ + {slot}_output.json ← AgentOutput + raw/ + {slot}_doc.html ← Original-Source + +Pro Run-ID: kann mehrere "Slots" enthalten (z.B. 5 URLs). + +manifest.json: + { + "run_id": "...", + "agent_id": "impressum", + "agent_version": "2.0", + "started_at": "2026-06-08T...", + "assets": [ + {"slot": "url1", "kind": "video", "path": "videos/url1_walk.mp4", + "sha256": "...", "size_bytes": 12345, "mime": "video/mp4"} + ] + } +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import mimetypes +import os +import shutil +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +def _vault_root() -> Path: + """Resolved at call time so tests can monkeypatch the env var.""" + return Path(os.environ.get( + "EVIDENCE_VAULT_ROOT", "/app/artifacts/agent_runs", + )) + + +class EvidenceVault: + """Pro Agent-Run eine Instanz. Schreibt alle Assets unter + VAULT_ROOT/{run_id}/.""" + + def __init__( + self, + agent_id: str, + agent_version: str, + run_id: str | None = None, + ) -> None: + self.run_id = run_id or uuid.uuid4().hex[:16] + self.agent_id = agent_id + self.agent_version = agent_version + self.root = _vault_root() / self.run_id + self.root.mkdir(parents=True, exist_ok=True) + for sub in ("screenshots", "videos", "csv", + "findings", "raw"): + (self.root / sub).mkdir(exist_ok=True) + self.manifest_path = self.root / "manifest.json" + self._manifest: dict[str, Any] = self._load_or_init() + # Persist immediately so the file exists even before any + # put_*() call — important for ../runs-listing. + if not self.manifest_path.exists(): + self._save_manifest() + + def _load_or_init(self) -> dict[str, Any]: + if self.manifest_path.exists(): + try: + return json.loads(self.manifest_path.read_text()) + except Exception as e: + logger.warning("manifest load fail, reset: %s", e) + return { + "run_id": self.run_id, + "agent_id": self.agent_id, + "agent_version": self.agent_version, + "started_at": datetime.now(timezone.utc).isoformat(), + "assets": [], + } + + def _save_manifest(self) -> None: + self.manifest_path.write_text( + json.dumps(self._manifest, indent=2, default=str), + ) + + def put_bytes( + self, + kind: str, + slot: str, + filename: str, + data: bytes, + mime: str | None = None, + ) -> str: + """Schreibt Bytes in einen Vault-Slot. + + kind: 'screenshot' | 'video' | 'csv' | 'finding' | 'raw' + slot: identifier (z.B. URL-Index "url1") + Returns: relative path inside the vault. + """ + subdir = self._subdir_for(kind) + safe_name = self._safe_filename(filename) + rel = f"{subdir}/{slot}_{safe_name}" + target = self.root / rel + target.write_bytes(data) + sha = hashlib.sha256(data).hexdigest() + if not mime: + mime, _ = mimetypes.guess_type(safe_name) + mime = mime or "application/octet-stream" + self._manifest["assets"].append({ + "slot": slot, + "kind": kind, + "path": rel, + "sha256": sha, + "size_bytes": len(data), + "mime": mime, + "stored_at": datetime.now(timezone.utc).isoformat(), + }) + self._save_manifest() + return rel + + def put_file( + self, + kind: str, + slot: str, + source_path: str, + target_filename: str | None = None, + ) -> str: + """Copy a file (e.g. Playwright video) into the vault.""" + src = Path(source_path) + if not src.is_file(): + raise FileNotFoundError(source_path) + target_filename = target_filename or src.name + data = src.read_bytes() + return self.put_bytes( + kind, slot, target_filename, data, + mime=mimetypes.guess_type(src.name)[0], + ) + + def put_json( + self, + kind: str, + slot: str, + filename: str, + payload: Any, + ) -> str: + data = json.dumps(payload, indent=2, default=str).encode("utf-8") + return self.put_bytes(kind, slot, filename, data, + mime="application/json") + + def assets_for_slot(self, slot: str) -> list[dict]: + return [a for a in self._manifest["assets"] if a["slot"] == slot] + + def list_assets(self) -> list[dict]: + return list(self._manifest["assets"]) + + def asset_path(self, relative_path: str) -> Path | None: + """Resolve a manifest-relative path to absolute. Returns None + if outside vault (defense against path traversal).""" + p = (self.root / relative_path).resolve() + try: + p.relative_to(self.root.resolve()) + except ValueError: + return None + if not p.exists(): + return None + return p + + def finalize(self) -> dict[str, Any]: + """Marks the run as finished and returns the manifest snapshot.""" + self._manifest["finished_at"] = datetime.now( + timezone.utc).isoformat() + self._save_manifest() + return dict(self._manifest) + + def url(self) -> str: + """API-URL um die Assets zu listen (für Frontend).""" + return f"/api/v1/specialist-agent/run/{self.run_id}/artifacts" + + @staticmethod + def _subdir_for(kind: str) -> str: + return { + "screenshot": "screenshots", + "video": "videos", + "csv": "csv", + "finding": "findings", + "raw": "raw", + }.get(kind, "raw") + + @staticmethod + def _safe_filename(name: str) -> str: + # Strip directory prefixes first so "../../etc/passwd" → "passwd" + base = os.path.basename(name.replace("\\", "/")) + keep = "._-" + cleaned = "".join( + c if (c.isalnum() or c in keep) else "_" for c in base + ) + # Collapse parent-dir markers that survived in unusual encodings + cleaned = cleaned.replace("..", "_") + return cleaned[:120] or "unnamed" + + +def open_vault( + agent_id: str, + agent_version: str, + run_id: str | None = None, +) -> EvidenceVault: + return EvidenceVault(agent_id, agent_version, run_id) + + +def list_runs(limit: int = 50) -> list[dict]: + """Lists recent run-manifests.""" + root = _vault_root() + if not root.exists(): + return [] + runs: list[tuple[float, dict]] = [] + for p in root.iterdir(): + if not p.is_dir(): + continue + m = p / "manifest.json" + if not m.is_file(): + continue + try: + data = json.loads(m.read_text()) + runs.append((m.stat().st_mtime, data)) + except Exception: + continue + runs.sort(key=lambda x: x[0], reverse=True) + return [r[1] for r in runs[:limit]] + + +def delete_run(run_id: str) -> bool: + """Tombstone: löscht den ganzen Run-Ordner. Für DSGVO-DSR Art. 17.""" + p = _vault_root() / run_id + if not p.exists(): + return False + shutil.rmtree(p) + return True diff --git a/backend-compliance/compliance/services/specialist_agents/_registry.py b/backend-compliance/compliance/services/specialist_agents/_registry.py new file mode 100644 index 00000000..0a751efa --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_registry.py @@ -0,0 +1,57 @@ +"""AgentRegistry — zentrale Liste aller Specialist-Agenten + MC-Owner-Mapping. + +Verhindert dass eine MC zweimal geprüft wird (oder gar nicht). Der +SSE-Test-Endpoint löst Agent-Namen über diese Registry auf. +""" + +from __future__ import annotations + +from ._base import BaseSpecialistAgent + + +class AgentRegistry: + """Singleton-ähnliche Registry. Ein Agent registriert sich beim + Import, der SSE-Endpoint löst auf.""" + + def __init__(self) -> None: + self._agents: dict[str, BaseSpecialistAgent] = {} + # MC-ID → agent_id. Verhindert Doppel-Ownership. + self._mc_owner: dict[str, str] = {} + + def register(self, agent: BaseSpecialistAgent) -> None: + if not agent.agent_id: + raise ValueError("agent.agent_id darf nicht leer sein") + if agent.agent_id in self._agents: + # Re-register erlaubt (z.B. Hot-Reload), aber überschreibt + self._agents[agent.agent_id] = agent + return + self._agents[agent.agent_id] = agent + for mc_id in agent.owned_mc_ids: + existing = self._mc_owner.get(mc_id) + if existing and existing != agent.agent_id: + raise ValueError( + f"MC {mc_id} bereits von Agent {existing} owned — " + f"jetzt versucht {agent.agent_id}. Konflikt auflösen." + ) + self._mc_owner[mc_id] = agent.agent_id + + def get(self, agent_id: str) -> BaseSpecialistAgent | None: + return self._agents.get(agent_id) + + def list_agents(self) -> list[dict]: + """Public listing für das Frontend (Agent-Wähler).""" + return [ + { + "agent_id": a.agent_id, + "agent_version": a.agent_version, + "doc_type": a.doc_type, + "mc_count": len(a.owned_mc_ids), + } + for a in self._agents.values() + ] + + def owner_of(self, mc_id: str) -> str | None: + return self._mc_owner.get(mc_id) + + +REGISTRY = AgentRegistry() diff --git a/backend-compliance/compliance/services/specialist_agents/_rollup.py b/backend-compliance/compliance/services/specialist_agents/_rollup.py new file mode 100644 index 00000000..8a4db2ce --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_rollup.py @@ -0,0 +1,92 @@ +"""Rollup-Dedup für Recommendations. + +Wenn 3 Findings alle "AVV mit Anbieter X abschließen" sagen, werden sie +zu einer Recommendation gebündelt mit Liste der ursächlichen Findings. +Spart Kundenzeit beim Abarbeiten. + +KEIN LLM — deterministische Normalisierung + Hashing. +""" + +from __future__ import annotations + +import re + +from ._base import Finding, Recommendation, Severity, stable_recommendation_id + + +_SEVERITY_ORDER = { + Severity.HIGH: 3, + Severity.MEDIUM: 2, + Severity.LOW: 1, + Severity.INFO: 0, +} + + +def _canonical(text: str) -> str: + """Normalisiert: lowercase, Whitespace zusammenfalten, Punkte raus. + Liefert eine kanonische Form für Dedup-Hashing.""" + t = (text or "").lower().strip() + t = re.sub(r"\s+", " ", t) + t = re.sub(r"[.,;:!?]+$", "", t) + return t + + +def rollup(findings: list[Finding]) -> list[Recommendation]: + """Bündelt actions zu Recommendations. Eine Recommendation enthält + alle Findings deren action denselben canonical_text hat.""" + buckets: dict[str, list[Finding]] = {} + for f in findings: + action = f.action or "" + if not action: + continue + key = _canonical(action) + if not key: + continue + buckets.setdefault(key, []).append(f) + out: list[Recommendation] = [] + for key, group in buckets.items(): + max_sev_finding = max( + group, key=lambda x: _SEVERITY_ORDER.get(_sev(x), 0), + ) + # title aus dem ersten Finding (am ehesten lesbar) + first = group[0] + body = first.action + # wenn mehrere distincte titles: liste sie + related_titles = sorted({f.title for f in group if f.title}) + if len(related_titles) > 1: + body = ( + body + "\n\nGilt für: " + + "; ".join(related_titles[:8]) + ) + rid = stable_recommendation_id(key) + out.append(Recommendation( + recommendation_id=rid, + title=first.action[:120] or "(ohne Titel)", + body=body, + severity=_sev(max_sev_finding), + related_finding_ids=[f.check_id for f in group], + estimated_effort_hours=_estimate_effort(group), + )) + return sorted( + out, + key=lambda r: _SEVERITY_ORDER.get(Severity(r.severity), 0), + reverse=True, + ) + + +def _sev(f: Finding) -> Severity: + """Helper: handle Enum vs str (because use_enum_values=True).""" + v = f.severity + if isinstance(v, Severity): + return v + try: + return Severity(str(v).upper()) + except Exception: + return Severity.INFO + + +def _estimate_effort(group: list[Finding]) -> float: + """Heuristik: pro Finding 0.5h Basis, +0.5h pro HIGH, max 8h.""" + base = 0.5 * len(group) + extra = 0.5 * sum(1 for f in group if _sev(f) == Severity.HIGH) + return min(8.0, base + extra) diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/__init__.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/__init__.py new file mode 100644 index 00000000..751666a8 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/__init__.py @@ -0,0 +1,8 @@ +"""Cookie-Policy-Specialist-Agent v2. + +Public Entry-Point: CookiePolicyAgent (inherits BaseSpecialistAgent). +""" + +from .agent import CookiePolicyAgent + +__all__ = ["CookiePolicyAgent"] diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py new file mode 100644 index 00000000..56a3ec90 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py @@ -0,0 +1,347 @@ +"""Cookie-Policy-Agent v2 — BaseSpecialistAgent. + +Prüft den Cookie-Policy-DOKUMENT-Text (NICHT das Banner — das macht +der Cookie-Banner-Themen-Agent). Konsumiert optional context.cmp_vendors +für Konsistenz-Checks gegen die tatsächlich beobachtete Cookie-Liste. + +Eskalations-Stufen: + 1. MC (regex) — schnell, deterministisch + 2. cookie_library_lookup gegen state.context.cmp_vendors (wenn vorhanden) + 3. LLM (qwen2.5:7b) für strukturelle/semantische Lücken + 4. OVH 120b als Fallback +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from .._base import ( + AgentInput, + AgentOutput, + BaseSpecialistAgent, + EscalationLog, + EvidenceSource, + Finding, + McCoverage, + Severity, + SourceType, + lint_output, +) +from .._escalation import cascade +from .._rollup import rollup +from .mcs import MC_IDS, MCS + +logger = logging.getLogger(__name__) + + +_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus +TDDDG § 25 + DSGVO Art. 13 + EuGH Planet49 + BGH Cookie-II. Aufgabe: +eine Cookie-Richtlinie auf strukturelle und inhaltliche LÜCKEN prüfen, +die einer regex-basierten Vorprüfung entgangen sind. + +WICHTIG: + - KEINE Bewertung "rechtssicher" / "garantiert" / "konform". + - Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren. + - Wörtliches Zitat als evidence bei jeder Lücke. + +Antworte NUR mit JSON, Schema: + {"findings": [ + {"field_id": "...", "severity": "HIGH|MEDIUM|LOW", + "title": "...", "evidence": "wörtliches Zitat", + "action": "konkrete Empfehlung"} + ]} + +Typische Lücken-Kategorien: + - pseudo_purpose: "Siehe dazugehörige Datenverarbeitung" ohne konkrete Aussage + - duration_floskel: "solange erforderlich" ohne Zeitangabe + - vendor_unklar: "möglicherweise Drittanbieter" ohne Liste + - retention_inkonsistent: Tabelle nennt Tage, Fließtext nennt "session" + - drittland_fehlend: US-Vendor genannt (Google, Meta) aber Schrems-II + nicht thematisiert + - banner_reopen_fehlt: "Cookie-Einstellungen ändern" Link fehlt +""" + + +class CookiePolicyAgent(BaseSpecialistAgent): + agent_id = "cookie_policy" + agent_version = "1.0" + doc_type = "cookie" + owned_mc_ids = MC_IDS + + async def evaluate(self, agent_input: AgentInput) -> AgentOutput: + start = datetime.now(timezone.utc) + text = (agent_input.text or "").strip() + coverage: list[McCoverage] = [] + findings: list[Finding] = [] + esc_logs: list[EscalationLog] = [] + + if len(text) < 100: + for mc in MCS: + coverage.append(McCoverage( + mc_id=mc.mc_id, status="skipped", + reason="cookie policy text too short or empty", + )) + return self._finalize( + start, findings, esc_logs, coverage, confidence=0.0, + notes="Cookie-Policy-Text zu kurz oder leer.", + ) + + for mc in MCS: + matched = [p for p in mc.patterns if p.search(text)] + if mc.require_all: + ok = len(matched) == len(mc.patterns) + else: + ok = bool(matched) + if ok: + coverage.append(McCoverage( + mc_id=mc.mc_id, status="ok", + reason=f"{len(matched)}/{len(mc.patterns)} patterns hit", + )) + continue + sev = self._sev(mc.severity_if_missing) + action = self._build_action(mc) + findings.append(Finding( + check_id=f"COOKIE-POLICY-AGENT-{mc.field_id.upper()}", + agent=self.agent_id, + agent_version=self.agent_version, + field_id=mc.field_id, + severity=sev, + severity_reason="missing", + title=f"Cookie-Policy-Lücke: '{mc.label}'", + norm=mc.norm, + action=action, + confidence=0.92, + sources=[EvidenceSource( + source_type=SourceType.MC, + source_id=mc.mc_id, + detail=f"0/{len(mc.patterns)} pattern hit", + )], + )) + coverage.append(McCoverage( + mc_id=mc.mc_id, + status=sev.value.lower(), + reason="missing", + )) + + # KB-Layer: wenn cmp_vendors im Kontext, checke ob die Policy + # alle beobachteten Vendoren erwähnt + kb_findings = self._kb_layer(text, agent_input.context or {}) + findings.extend(kb_findings) + + # LLM-Eskalation für subtile Lücken (Pseudo-Zwecke, Floskeln) + llm_findings, llm_logs = await self._maybe_escalate(text) + esc_logs.extend(llm_logs) + seen = {f.field_id for f in findings if f.field_id} + for f in llm_findings: + if f.field_id and f.field_id in seen: + continue + findings.append(f) + + confs = [f.confidence for f in findings if f.confidence] or [0.95] + overall = sum(confs) / len(confs) + + return self._finalize(start, findings, esc_logs, coverage, + confidence=overall) + + def _kb_layer( + self, text: str, context: dict, + ) -> list[Finding]: + """Wenn cmp_vendors gegeben: prüfe ob alle Vendoren in der Policy + erwähnt werden. Sonst Skip (keine Cross-Check ohne Datenbasis).""" + cmp_vendors = context.get("cmp_vendors") or [] + if not cmp_vendors: + return [] + text_lc = text.lower() + # Extrahiere Top-Vendor-Namen aus dem CMP + seen_names: set[str] = set() + for v in cmp_vendors: + if not isinstance(v, dict): + continue + name = (v.get("name") or v.get("vendor") or "").strip() + if name and len(name) > 2: + seen_names.add(name) + missing: list[str] = [] + for n in sorted(seen_names): + if n.lower() not in text_lc: + missing.append(n) + if not missing: + return [] + # Ein Sammel-Finding pro Lücke + sample = missing[:8] + return [Finding( + check_id="COOKIE-POLICY-AGENT-CMP-VS-POLICY", + agent=self.agent_id, + agent_version=self.agent_version, + field_id="vendor_consistency", + severity=Severity.MEDIUM, + severity_reason="cmp_observed_vendors_not_in_policy", + title=( + f"{len(missing)} im CMP beobachtete Vendor(en) " + "fehlen in der Cookie-Policy" + ), + norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger vollständig nennen)", + evidence=f"Fehlend: {', '.join(sample)}" + + (" …" if len(missing) > 8 else ""), + action=( + "Die im Cookie-Consent-Banner beobachteten Vendoren " + "(Tracker/Werbenetzwerke) müssen vollständig in der " + "Cookie-Richtlinie aufgelistet sein." + ), + confidence=0.88, + sources=[EvidenceSource( + source_type=SourceType.MC, + source_id="CMP-CROSS-CHECK", + detail=f"{len(missing)} missing of {len(seen_names)}", + )], + )] + + async def _maybe_escalate( + self, text: str, + ) -> tuple[list[Finding], list[EscalationLog]]: + user_prompt = ( + f"COOKIE-POLICY-TEXT:\n{text[:4500]}\n\n" + "Liste subtile Lücken nach TDDDG § 25 + DSGVO Art. 13. " + "Nur JSON." + ) + res, logs = await cascade(_SYSTEM_PROMPT, user_prompt) + if res is None or not isinstance(res.parsed, (dict, list)): + return [], logs + raw = (res.parsed.get("findings") + if isinstance(res.parsed, dict) else res.parsed) + if not isinstance(raw, list): + return [], logs + out: list[Finding] = [] + for item in raw: + if not isinstance(item, dict): + continue + fid = str(item.get("field_id") or "unknown")[:40] + sev_raw = str(item.get("severity") or "MEDIUM").upper() + sev = self._sev(sev_raw) + out.append(Finding( + check_id=f"COOKIE-POLICY-AGENT-LLM-{fid.upper()}", + agent=self.agent_id, + agent_version=self.agent_version, + field_id=fid, + severity=sev, + severity_reason="llm_detected", + title=str(item.get("title") or "")[:200], + norm="TDDDG § 25 + DSGVO Art. 13 (LLM-Analyse)", + evidence=str(item.get("evidence") or "")[:300], + action=str(item.get("action") or "")[:400], + confidence=0.7, + sources=[EvidenceSource( + source_type=res.stage, + source_id=res.model, + detail=f"prompt_chars={len(user_prompt)}", + confidence=0.7, + )], + )) + return out, logs + + def _finalize( + self, + start: datetime, + findings: list[Finding], + esc_logs: list[EscalationLog], + coverage: list[McCoverage], + confidence: float, + notes: str = "", + ) -> AgentOutput: + end = datetime.now(timezone.utc) + recs = rollup(findings) + out = AgentOutput( + agent=self.agent_id, + agent_version=self.agent_version, + started_at=start, + finished_at=end, + duration_ms=int((end - start).total_seconds() * 1000), + findings=findings, + recommendations=recs, + mc_coverage=coverage, + escalation_log=esc_logs, + confidence=confidence, + notes=notes, + mc_total=len(coverage), + mc_ok=sum(1 for c in coverage if c.status == "ok"), + mc_na=sum(1 for c in coverage if c.status == "na"), + mc_high=sum(1 for c in coverage if c.status == "high"), + mc_medium=sum(1 for c in coverage if c.status == "medium"), + mc_low=sum(1 for c in coverage if c.status == "low"), + ) + return lint_output(out) + + @staticmethod + def _sev(value: str) -> Severity: + v = (value or "").upper() + if v == "HIGH": + return Severity.HIGH + if v == "MEDIUM": + return Severity.MEDIUM + if v == "LOW": + return Severity.LOW + return Severity.INFO + + @staticmethod + def _build_action(mc) -> str: + suggestions = { + "categories_named": ( + "Die Cookie-Richtlinie sollte die Kategorien essentiell, " + "funktional, analytics und marketing klar benennen und " + "abgrenzen." + ), + "purpose_described": ( + "Pro Cookie-Kategorie den Verarbeitungszweck konkret " + "benennen (keine Pauschal-Formulierungen wie " + "'verschiedene Zwecke')." + ), + "retention_duration": ( + "Speicherdauer pro Cookie konkret angeben " + "(z.B. 'Session', '30 Tage', '2 Jahre') statt " + "'solange erforderlich'." + ), + "vendor_recipients": ( + "Alle Empfänger / Drittanbieter namentlich auflisten " + "(z.B. Google LLC, Meta Platforms Inc., …) inkl. Sitz." + ), + "opt_out_mechanism": ( + "Konkreten Opt-Out-Weg beschreiben: Banner-Reopen-Link, " + "Browser-Einstellungen, Vendor-spezifische Opt-Out-URLs." + ), + "banner_reopen": ( + "Sichtbaren Link 'Cookie-Einstellungen ändern' in die " + "Policy aufnehmen, der den CMP-Banner wieder öffnet." + ), + "version_date": ( + "Stand der Cookie-Richtlinie sichtbar angeben " + "(z.B. 'Stand: 1. Juni 2026')." + ), + "third_country_transfer": ( + "Bei Drittland-Transfer (USA u.a.) Hinweis auf " + "Schrems-II-Risiko + verwendete Schutzmaßnahmen " + "(SCC, DPF) ergänzen." + ), + "legal_basis": ( + "Rechtsgrundlage pro Kategorie benennen: § 25 Abs. 1 " + "TDDDG (Einwilligung) bzw. § 25 Abs. 2 TDDDG " + "(unbedingt erforderlich)." + ), + "cookie_table_or_list": ( + "Detail-Tabelle mit Cookie-Namen, Vendor, Zweck und " + "Laufzeit pro Cookie ergänzen (DSK-Best-Practice)." + ), + "dpo_contact": ( + "Kontaktmöglichkeit zum DSB oder Datenschutz-Team " + "in der Cookie-Richtlinie nennen (z.B. " + "datenschutz@)." + ), + "browser_settings_hint": ( + "Hinweis auf Browser-Einstellungen zum Blockieren/" + "Löschen von Cookies (Chrome, Firefox, Safari, Edge) " + "ergänzen." + ), + } + return suggestions.get(mc.field_id, ( + f"{mc.label} in der Cookie-Richtlinie ergänzen " + f"({mc.norm})." + )) diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/mcs.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/mcs.py new file mode 100644 index 00000000..da971135 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/mcs.py @@ -0,0 +1,225 @@ +"""Cookie-Policy-Agent Machine-Checks. + +Pflichten an eine Cookie-Richtlinie nach TDDDG + DSGVO Art. 13 + +EuGH Planet49 + BGH Cookie-II: + - Cookie-Kategorien benannt (essential / functional / analytics / + marketing) + - Pro Cookie: Name, Zweck, Speicherdauer, Empfänger/Vendor, Drittland + - Opt-Out-Mechanik beschrieben + Banner-Reopen-Möglichkeit verlinkt + - Versionsdatum / Stand + - Drittland-Hinweise bei US-Transfer (Schrems II) + - Verweis auf Browser-Einstellungen (Best-Practice) + - Kontakt für Datenschutz (DSB oder verantwortliche Stelle) +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Pattern + + +@dataclass(frozen=True) +class MC: + mc_id: str + field_id: str + label: str + norm: str + patterns: tuple[Pattern[str], ...] = field(default_factory=tuple) + severity_if_missing: str = "MEDIUM" + # Eines der Patterns reicht (any) — default. False = ALLE müssen matchen + require_all: bool = False + + +_CATEGORIES = ( + r"essenti(?:ell|al)|notwendig|technisch\s+notwendig|unbedingt\s+erforderlich", + r"funktional|funktionelle?|preference|komfort", + r"analyti(?:sch|cs)|statisti(?:k|sch)|leistung|performance", + r"marketing|werbung|advertising|tracking|targeting", +) + + +MCS: tuple[MC, ...] = ( + MC( + mc_id="CP-MC-001", + field_id="categories_named", + label="Cookie-Kategorien benannt (essential/functional/analytics/marketing)", + norm="DSGVO Art. 13 + TDDDG § 25 (Transparenzpflicht)", + severity_if_missing="HIGH", + require_all=False, + patterns=tuple(re.compile(p, re.IGNORECASE) for p in _CATEGORIES), + ), + MC( + mc_id="CP-MC-002", + field_id="purpose_described", + label="Zweck pro Kategorie beschrieben", + norm="DSGVO Art. 13 Abs. 1 lit. c (Zweck der Verarbeitung)", + severity_if_missing="HIGH", + patterns=( + re.compile(r"\bzweck(?:e)?\b", re.IGNORECASE), + re.compile(r"verwendet\s+wir", re.IGNORECASE), + re.compile(r"dient\s+(?:zur?|dem|der|den)", re.IGNORECASE), + re.compile(r"erm(?:ö|oe)glich(?:t|en)", re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-003", + field_id="retention_duration", + label="Speicherdauer pro Cookie angegeben", + norm="DSGVO Art. 13 Abs. 2 lit. a (Speicherdauer)", + severity_if_missing="HIGH", + patterns=( + re.compile(r"\b(?:laufzeit|speicherdauer|dauer|gültig)", + re.IGNORECASE), + re.compile(r"\b\d+\s*(?:tag|tage|monat|jahr|stunden|min)", + re.IGNORECASE), + re.compile(r"session(?:-?cookie)?", re.IGNORECASE), + re.compile(r"persistent(?:e|er)?", re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-004", + field_id="vendor_recipients", + label="Empfänger/Vendoren genannt (Art. 13 Abs. 1 lit. e)", + norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger)", + severity_if_missing="HIGH", + patterns=( + re.compile(r"empf(?:ä|ae)nger|recipient", re.IGNORECASE), + re.compile(r"dritt(?:e|er)\s+anbieter|drittanbieter|" + r"third[\-\s]?party", re.IGNORECASE), + re.compile(r"\b(?:Google|Meta|Facebook|Microsoft|Hotjar|" + r"LinkedIn|Adobe|TikTok|Cloudflare|Akamai|" + r"Salesforce|HubSpot|Pinterest|Twitter|X\.com)\b", + re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-005", + field_id="opt_out_mechanism", + label="Opt-Out-Mechanik dokumentiert", + norm="DSGVO Art. 7 Abs. 3 (Widerruf der Einwilligung)", + severity_if_missing="HIGH", + patterns=( + re.compile(r"opt[\-\s]?out|widerruf|widersprechen|" + r"abw(?:ä|ae)hlen|deaktivier", + re.IGNORECASE), + re.compile(r"einstellungen\s+(?:[\wäöüÄÖÜ]+\s+)?" + r"(?:ä|ae)ndern|" + r"einwilligung\s+widerrufen|" + r"cookie[\-\s]?einstellungen?\b", + re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-006", + field_id="banner_reopen", + label="Banner-Reopen / Cookie-Einstellungen-Link", + norm="TDDDG § 25 (Symmetrie Annehmen/Ablehnen — Reopen Pflicht)", + severity_if_missing="MEDIUM", + patterns=( + re.compile(r"cookie[\-\s]?einstellung(?:en)?\s+(?:ä|ae)ndern|" + r"cookie[\-\s]?einstellungen\s+(?:ö|oe)ffnen|" + r"einstellungen\s+anpassen|" + r"consent(?:[\-\s]?banner)?\s+(?:erneut|" + r"wieder)?\s*(?:ö|oe)ffnen|" + r"einwilligung\s+verwalten|" + r"cookie[\-\s]?pr(?:ä|ae)ferenzen", + re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-007", + field_id="version_date", + label="Versionsdatum / Stand angegeben", + norm="DSGVO Rechenschaftspflicht (Art. 5 Abs. 2)", + severity_if_missing="MEDIUM", + patterns=( + re.compile(r"stand(?:\s+vom)?\s*[:\-]?\s*\d{1,2}", + re.IGNORECASE), + re.compile(r"letzte\s+(?:aktualisierung|(?:ä|ae)nderung)", + re.IGNORECASE), + re.compile(r"version\s*[:\-]?\s*\d", re.IGNORECASE), + re.compile(r"g(?:ü|ue)ltig\s+ab\s+\d{1,2}", re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-008", + field_id="third_country_transfer", + label="Drittland-Hinweis (Schrems-II / US-Transfer)", + norm="DSGVO Art. 44 ff. + EuGH Schrems II", + severity_if_missing="MEDIUM", + patterns=( + re.compile(r"dritt(?:land|staat)|USA?|vereinigte\s+staaten|" + r"international(?:e[rn]?)?\s+(?:transfer|" + r"daten(?:ü|ue)bermittlung)|" + r"(?:standardvertragsklauseln|SCC|" + r"angemessenheitsbeschluss|" + r"adequacy\s+decision|DPF|" + r"Data\s+Privacy\s+Framework)", + re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-009", + field_id="legal_basis", + label="Rechtsgrundlage benannt (Art. 6 / § 25 TDDDG)", + norm="DSGVO Art. 13 Abs. 1 lit. c + TDDDG § 25", + severity_if_missing="MEDIUM", + patterns=( + re.compile(r"art\.?\s*6\s+(?:abs\.?\s*1\s+)?lit\.?\s*[a-f]", + re.IGNORECASE), + re.compile(r"§\s*25\s+TDDDG", re.IGNORECASE), + re.compile(r"rechtsgrundlage|einwilligung\s+(?:ist|liegt)|" + r"berechtigtes\s+interesse", + re.IGNORECASE), + ), + ), + MC( + mc_id="CP-MC-010", + field_id="cookie_table_or_list", + label="Cookie-Tabelle / Liste pro Cookie", + norm="Best-Practice DSK + EDPB Guidelines 03/2022", + severity_if_missing="MEDIUM", + patterns=( + # Konkrete Cookie-Namen oder typische Hosts + re.compile(r"\b(?:_ga|_gid|_fbp|_fbc|_pk_id|" + r"OptanonConsent|JSESSIONID|cf_clearance|" + r"PHPSESSID|wp-settings|wordpress_logged_in)", + re.IGNORECASE), + re.compile(r" AgentOutput: + start = datetime.now(timezone.utc) + text = (agent_input.text or "").strip() + scope = set(agent_input.business_scope or []) + # Auto-detect KFZ + is_automotive = detect_automotive(text) + if is_automotive: + scope.add("automotive") + + mc_findings: list[Finding] = [] + coverage: list[McCoverage] = [] + + if len(text) < 50: + # Doc zu kurz → alle als skipped + for mc in MCS: + coverage.append(McCoverage( + mc_id=mc.mc_id, status="skipped", + reason="doc too short or empty", + )) + return self._finalize( + start, mc_findings, [], coverage, confidence=0.0, + notes="Impressum-Text zu kurz oder leer.", + ) + + for mc in MCS: + if not scope_matches(mc, scope, is_automotive): + coverage.append(McCoverage( + mc_id=mc.mc_id, status="na", + reason=f"scope mismatch (needs {mc.requires_scope})", + )) + continue + found = any(p.search(text) for p in mc.patterns) + if found: + coverage.append(McCoverage( + mc_id=mc.mc_id, status="ok", + )) + continue + # Missing → Finding + sev = self._sev(mc.severity_if_missing) + action = self._build_action(mc, is_automotive) + mc_findings.append(Finding( + check_id=f"IMPRESSUM-AGENT-{mc.field_id.upper()}", + agent=self.agent_id, + agent_version=self.agent_version, + field_id=mc.field_id, + severity=sev, + severity_reason="missing", + title=f"Pflichtangabe '{mc.label}' fehlt im Impressum", + norm=mc.norm, + evidence="", + action=action, + confidence=0.95, + sources=[EvidenceSource( + source_type=SourceType.MC, + source_id=mc.mc_id, + detail=f"regex check {len(mc.patterns)} pattern(s) negative", + )], + )) + coverage.append(McCoverage( + mc_id=mc.mc_id, + status=sev.value.lower(), + reason="missing", + )) + + # Eskalation: für die identifizierten Lücken kann ein LLM + # zusätzliche Tiefen-Findings liefern (z.B. "Geschäftsführer + # genannt, aber ohne Nachname"). Confidence der MC-Findings + # ist hoch — eskalieren wir wegen "weitere subtile Lücken + # finden", nicht weil wir unsicher sind. + esc_findings, esc_logs = await self._maybe_escalate(text, scope) + + # Dedup per field_id (MC hat Priorität) + seen_fields = {f.field_id for f in mc_findings if f.field_id} + for f in esc_findings: + if f.field_id and f.field_id in seen_fields: + continue + mc_findings.append(f) + + # Confidence: harmonisches Mittel über alle Finding-Confidences + if mc_findings: + confs = [f.confidence for f in mc_findings if f.confidence] + overall = sum(confs) / len(confs) if confs else 0.8 + else: + overall = 0.95 # nichts gefunden → alle MCs ok + + return self._finalize( + start, mc_findings, esc_logs, coverage, confidence=overall, + ) + + async def _maybe_escalate( + self, text: str, scope: set[str], + ) -> tuple[list[Finding], list[EscalationLog]]: + """LLM-Stage. Aktivierbar via Agent-Settings (default an).""" + # Hard cap auf Text-Größe für den LLM-Pass + user_prompt = ( + f"BUSINESS-SCOPE: {', '.join(sorted(scope))}\n\n" + f"IMPRESSUM-TEXT:\n{text[:4000]}\n\n" + "Liste subtile Lücken nach § 5 TMG. Nur JSON." + ) + res, logs = await cascade(_SYSTEM_PROMPT, user_prompt) + if res is None or not isinstance(res.parsed, (dict, list)): + return [], logs + raw = (res.parsed.get("findings") + if isinstance(res.parsed, dict) else res.parsed) + if not isinstance(raw, list): + return [], logs + out: list[Finding] = [] + for item in raw: + if not isinstance(item, dict): + continue + fid = str(item.get("field_id") or "unknown")[:40] + sev_raw = str(item.get("severity") or "MEDIUM").upper() + sev = self._sev(sev_raw) + out.append(Finding( + check_id=f"IMPRESSUM-AGENT-LLM-{fid.upper()}", + agent=self.agent_id, + agent_version=self.agent_version, + field_id=fid, + severity=sev, + severity_reason="llm_detected", + title=str(item.get("title") or "")[:200], + norm="§ 5 TMG / DDG (LLM-Analyse)", + evidence=str(item.get("evidence") or "")[:300], + action=str(item.get("action") or "")[:400], + confidence=0.7, + sources=[EvidenceSource( + source_type=res.stage, + source_id=res.model, + detail=f"prompt_chars={len(user_prompt)}", + confidence=0.7, + )], + )) + return out, logs + + def _finalize( + self, + start: datetime, + findings: list[Finding], + esc_logs: list[EscalationLog], + coverage: list[McCoverage], + confidence: float, + notes: str = "", + ) -> AgentOutput: + end = datetime.now(timezone.utc) + recs = rollup(findings) + out = AgentOutput( + agent=self.agent_id, + agent_version=self.agent_version, + started_at=start, + finished_at=end, + duration_ms=int((end - start).total_seconds() * 1000), + findings=findings, + recommendations=recs, + mc_coverage=coverage, + escalation_log=esc_logs, + confidence=confidence, + notes=notes, + mc_total=len(coverage), + mc_ok=sum(1 for c in coverage if c.status == "ok"), + mc_na=sum(1 for c in coverage if c.status == "na"), + mc_high=sum(1 for c in coverage if c.status == "high"), + mc_medium=sum(1 for c in coverage if c.status == "medium"), + mc_low=sum(1 for c in coverage if c.status == "low"), + ) + return lint_output(out) + + @staticmethod + def _sev(value: str) -> Severity: + v = (value or "").upper() + if v == "HIGH": + return Severity.HIGH + if v == "MEDIUM": + return Severity.MEDIUM + if v == "LOW": + return Severity.LOW + return Severity.INFO + + @staticmethod + def _build_action(mc, is_automotive: bool) -> str: + if mc.field_id == "aufsichtsbehoerde" and is_automotive: + return ( + "Aufsichtsbehörde im Impressum benennen. Für " + "KFZ-Hersteller/-Vertrieb typisch: Kraftfahrt-" + "Bundesamt (KBA), Fördestraße 16, 24944 Flensburg, " + "www.kba.de. Bei Ladestrom-Vertrieb zusätzlich " + "Bundesnetzagentur (BNetzA)." + ) + return ( + f"{mc.label} im Impressum ergänzen " + f"(Pflichtangabe nach {mc.norm})." + ) diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py b/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py new file mode 100644 index 00000000..f7d1a04e --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py @@ -0,0 +1,234 @@ +"""Machine-Check-Definitionen für den Impressum-Agent. + +Eine MC = ein abgegrenzter, deterministischer Check über das +Impressum-Dokument. Owner = impressum-agent. + +Quelle: § 5 TMG / DDG + § 18 MStV + § 36 VSBG + Art. 14 EU-VO 524/2013. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Pattern + + +@dataclass(frozen=True) +class MC: + """Eine Machine-Check-Definition.""" + mc_id: str # IMP-MC-001 ... + field_id: str # name_anbieter, handelsregister, ... + label: str + norm: str + patterns: tuple[Pattern[str], ...] = field(default_factory=tuple) + severity_if_missing: str = "MEDIUM" # HIGH | MEDIUM | LOW | INFO + requires_scope: tuple[str, ...] = field(default_factory=tuple) + # Wenn True: bei Scope-Mismatch nicht-applicable melden, sonst skip + explicit_na: bool = True + + +MCS: tuple[MC, ...] = ( + MC( + mc_id="IMP-MC-001", + field_id="name_anbieter", + label="Name + Anschrift des Anbieters", + norm="§ 5 Abs. 1 Nr. 1 TMG", + severity_if_missing="HIGH", + patterns=( + re.compile( + r"\b(?:Anbieter|Diensteanbieter|" + r"Verantwortlich(?:er Anbieter)?)\s*[:.\s]", + re.IGNORECASE, + ), + # Label-free fallback: Firma (Rechtsform) + Adresse + re.compile( + r"\b[A-ZÄÖÜ][\w\-\& ]{1,80}?\s+" + r"(?:GmbH|AG|UG|KG|SE|GbR|OHG|Limited|Ltd|LLC)\s*" + r"[\s\S]{0,400}?" + r"\b\d{5}\s+[A-ZÄÖÜ]", + re.IGNORECASE, + ), + ), + ), + MC( + mc_id="IMP-MC-002", + field_id="kontakt_email", + label="Email-Adresse", + norm="§ 5 Abs. 1 Nr. 2 TMG", + severity_if_missing="HIGH", + patterns=(re.compile(r"\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b", + re.IGNORECASE),), + ), + MC( + mc_id="IMP-MC-003", + field_id="kontakt_telefon", + label="Telefon", + norm="§ 5 Abs. 1 Nr. 2 TMG", + severity_if_missing="MEDIUM", + patterns=(re.compile( + r"(?:Tel(?:efon)?|Phone)\.?\s*[:.\s]\s*[\+\d][\d\s/\-()]{5,}", + re.IGNORECASE, + ),), + ), + MC( + mc_id="IMP-MC-004", + field_id="handelsregister", + label="Handelsregister-Eintrag", + norm="§ 5 Abs. 1 Nr. 4 TMG", + severity_if_missing="HIGH", + patterns=( + re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE), + re.compile(r"Handelsregister", re.IGNORECASE), + ), + ), + MC( + mc_id="IMP-MC-005", + field_id="ust_id", + label="USt-IdNr", + norm="§ 5 Abs. 1 Nr. 6 TMG", + severity_if_missing="MEDIUM", + patterns=( + re.compile( + r"\b(?:USt-?Id(?:Nr)?\.?|VAT(?:-?Id)?)\s*[:.\s]", + re.IGNORECASE, + ), + re.compile(r"\bDE\d{9}\b"), + ), + ), + MC( + mc_id="IMP-MC-006", + field_id="vertretungsberechtigte", + label="Vertretungsberechtigte Person", + norm="§ 5 Abs. 1 Nr. 1 TMG (juristische Personen)", + severity_if_missing="HIGH", + patterns=( + re.compile( + r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hrer|" + r"Vertretungsberechtigt|vertreten\s+durch)" + r"\s*[:.\s]", + re.IGNORECASE, + ), + re.compile(r"\bManagement\s*[:.\s]\s*[A-ZÄÖÜ]", + re.IGNORECASE), + re.compile(r"\bDirector(?:s|en)?\s*[:.\s]\s*[A-ZÄÖÜ]", + re.IGNORECASE), + ), + ), + MC( + mc_id="IMP-MC-007", + field_id="vertretungsberechtigte_label_korrekt", + label="Deutsches Label 'Geschäftsführer' statt 'Management'", + norm="§ 5 Abs. 1 Nr. 1 TMG (Deutsch-Pflicht, gerichtsfest)", + severity_if_missing="MEDIUM", + patterns=(re.compile( + r"(?:Gesch(?:ae|ä)ftsf(?:ue|ü)hrer|" + r"Vorstand|" + r"Vertretungsberechtigt|vertreten\s+durch)" + r"\s*[:.\s]", + re.IGNORECASE, + ),), + ), + MC( + mc_id="IMP-MC-008", + field_id="aufsichtsbehoerde", + label="Aufsichtsbehörde (regulierte Branchen)", + norm="§ 5 Abs. 1 Nr. 3 TMG (Branchen-bedingt)", + severity_if_missing="LOW", + requires_scope=("regulated_profession", "financial_services", + "insurance", "automotive"), + patterns=( + re.compile(r"Aufsichtsbeh(?:ö|oe)rde\s*[:.\s]", + re.IGNORECASE), + re.compile( + r"\bBAFin\b|\bBNetzA\b|\bLKA\b|\bKBA\b|" + r"Kraftfahrt-?Bundesamt|Bundesnetzagentur|" + r"Bundesanstalt\s+f(?:ü|ue)r", + re.IGNORECASE, + ), + ), + ), + MC( + mc_id="IMP-MC-009", + field_id="verantwortlicher_redaktion", + label="Verantwortlicher § 18 MStV (journalistisch-redaktionell)", + norm="§ 18 MStV (bei Blog/News/Magazin/Newsroom Pflicht)", + severity_if_missing="MEDIUM", + requires_scope=("editorial",), + patterns=(re.compile( + r"(?:Verantwortlich(?:er|e)?\s+(?:f(?:ue|ü)r|i\.S\.d\.|" + r"nach|gem(?:ae|ä)ß)\s+§\s*18|" + r"V\.i\.S\.d\.\s*§?\s*18|" + r"redaktionell\s+Verantwortlich)", + re.IGNORECASE, + ),), + ), + MC( + mc_id="IMP-MC-010", + field_id="verbraucher_streitbeilegung", + label="Verbraucher-Streitbeilegung-Hinweis", + norm="§ 36 VSBG (B2C-Anbieter Pflicht)", + severity_if_missing="MEDIUM", + requires_scope=("ecommerce", "b2c"), + patterns=(re.compile( + r"(?:Verbraucherschlichtungs|VSBG|" + r"Streitbeilegung|" + r"Schlichtungsstelle|" + r"alternative\s+Streit(?:beilegung|schlichtung))", + re.IGNORECASE, + ),), + ), + MC( + mc_id="IMP-MC-011", + field_id="berufsangaben", + label="Berufsbezeichnung + Berufsrechtliche Angaben", + norm="§ 5 Abs. 1 Nr. 5 TMG (Kammerberufe)", + severity_if_missing="LOW", + requires_scope=("regulated_profession",), + patterns=(re.compile( + r"Berufsbezeichnung|Berufsordnung|Kammer", re.IGNORECASE, + ),), + ), + MC( + mc_id="IMP-MC-012", + field_id="odr_link", + label="OS-Link auf EU-Plattform", + norm="Art. 14 EU-VO 524/2013 (B2C-Onlineshops)", + severity_if_missing="MEDIUM", + requires_scope=("ecommerce",), + patterns=(re.compile(r"ec\.europa\.eu/consumers/odr", + re.IGNORECASE),), + ), +) + + +# Public list of all MC-IDs for the Registry +MC_IDS: tuple[str, ...] = tuple(m.mc_id for m in MCS) + + +def scope_matches(mc: MC, scope: set[str], is_automotive: bool) -> bool: + """Entscheidet ob die MC auf den Business-Scope anwendbar ist.""" + if not mc.requires_scope: + return True + if mc.field_id == "aufsichtsbehoerde" and is_automotive: + return True + return any(s in scope for s in mc.requires_scope) + + +def detect_automotive(text: str) -> bool: + """KFZ-Hersteller/-Vertrieb → triggert KBA-Hint.""" + if re.search( + r"\b(?:KFZ|Fahrzeug(?:e|herstellung|verkauf)?|Automobil|" + r"E-Auto|Elektroauto|Auto-?Konfigurator|" + r"Elektrofahrzeug|Hybrid-?Fahrzeug)\b", + text, re.IGNORECASE, + ): + return True + return bool(re.search( + r"\b(?:Tesla|BMW|Mercedes-?Benz|Audi|Volkswagen|Porsche|" + r"Volvo|Stellantis|Skoda|Seat|Cupra|MINI|Smart|" + r"Opel|Ford\s+Deutschland|Hyundai|Kia|Toyota|Mazda|" + r"Nissan|Honda|Subaru|Lexus|Polestar|NIO|BYD|Rivian|" + r"Lucid)\s+(?:Germany|Deutschland|Group|Holding|AG|" + r"GmbH|S(?:E|\.A\.))\b", + text, re.IGNORECASE, + )) diff --git a/backend-compliance/tests/test_specialist_cookie_policy.py b/backend-compliance/tests/test_specialist_cookie_policy.py new file mode 100644 index 00000000..da95f370 --- /dev/null +++ b/backend-compliance/tests/test_specialist_cookie_policy.py @@ -0,0 +1,143 @@ +"""Tests für Cookie-Policy-Agent.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from compliance.services.specialist_agents import ( + REGISTRY, + AgentInput, + CookiePolicyAgent, + Severity, +) + + +FULL_POLICY = """Cookie-Richtlinie + +Stand: 1. Juni 2026 + +Wir verwenden auf unserer Website verschiedene Cookies. Diese werden +in folgende Kategorien eingeteilt: + +1. Essentielle Cookies (unbedingt erforderlich) + Zweck: Diese Cookies dienen der grundlegenden Funktion der Website. + Rechtsgrundlage: § 25 Abs. 2 TDDDG + Laufzeit: Session + +2. Funktionale Cookies + Zweck: Speichern Ihre Präferenzen wie Sprache und Region. + Rechtsgrundlage: Art. 6 Abs. 1 lit. a DSGVO + Laufzeit: 30 Tage + +3. Analytics-Cookies (Performance) + Drittanbieter: Google LLC, USA + Zweck: Nutzungsstatistiken erheben. + Laufzeit: 24 Monate + Cookies: _ga, _gid + Drittland: USA — Standardvertragsklauseln + Data Privacy Framework + +4. Marketing-Cookies (Tracking) + Drittanbieter: Meta Platforms Inc., USA + Cookies: _fbp, _fbc + Laufzeit: 90 Tage + +Sie können Ihre Cookie-Einstellungen jederzeit ändern über den Link +unten oder das Banner erneut öffnen. + +Browser-Einstellungen: Auch in Chrome, Firefox, Safari und Edge +können Sie Cookies blockieren oder löschen. + +Kontakt: datenschutz@example.com +Datenschutzbeauftragter: Max Mustermann +""" + + +GAPPY_POLICY = """Cookies + +Wir verwenden Cookies um die Website zu betreiben. +Cookies werden so lange gespeichert wie nötig. +""" + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_agent_is_registered(): + agent = REGISTRY.get("cookie_policy") + assert agent is not None + assert agent.doc_type == "cookie" + + +def test_short_text_skipped(monkeypatch): + async def _no_cascade(*a, **kw): return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.cookie_policy.agent.cascade", + _no_cascade, + ) + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", text="x"))) + assert out.mc_total > 0 + assert all(c.status == "skipped" for c in out.mc_coverage) + + +def test_full_policy_has_few_high_findings(monkeypatch): + async def _no_cascade(*a, **kw): return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.cookie_policy.agent.cascade", + _no_cascade, + ) + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", text=FULL_POLICY))) + high = [f for f in out.findings if f.severity == Severity.HIGH.value] + assert not high, f"unexpected HIGH findings: {[f.field_id for f in high]}" + + +def test_gappy_policy_triggers_high(monkeypatch): + async def _no_cascade(*a, **kw): return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.cookie_policy.agent.cascade", + _no_cascade, + ) + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", + text=GAPPY_POLICY))) + field_ids = {f.field_id for f in out.findings} + # 4 Kategorien fehlen, Vendoren fehlen, Opt-Out fehlt, Tabelle fehlt + assert "categories_named" in field_ids + assert "vendor_recipients" in field_ids + assert "opt_out_mechanism" in field_ids + + +def test_cmp_vendor_cross_check_emits_finding(monkeypatch): + async def _no_cascade(*a, **kw): return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.cookie_policy.agent.cascade", + _no_cascade, + ) + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput( + doc_type="cookie", text=FULL_POLICY, + context={"cmp_vendors": [ + {"name": "Hotjar"}, # NICHT in Policy + {"name": "Google LLC"}, # IN Policy + ]}, + ))) + field_ids = {f.field_id for f in out.findings} + assert "vendor_consistency" in field_ids + cmp_f = next(f for f in out.findings + if f.field_id == "vendor_consistency") + assert "Hotjar" in cmp_f.evidence + assert "Google" not in cmp_f.evidence + + +def test_recommendations_are_built(): + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", + text=GAPPY_POLICY))) + assert out.recommendations + # Jede Recommendation hat mind. ein related_finding + for r in out.recommendations: + assert r.related_finding_ids diff --git a/backend-compliance/tests/test_specialist_evidence_vault.py b/backend-compliance/tests/test_specialist_evidence_vault.py new file mode 100644 index 00000000..0b897862 --- /dev/null +++ b/backend-compliance/tests/test_specialist_evidence_vault.py @@ -0,0 +1,93 @@ +"""Tests für Evidence-Vault.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + + +@pytest.fixture +def tmp_vault(tmp_path, monkeypatch): + monkeypatch.setenv("EVIDENCE_VAULT_ROOT", str(tmp_path)) + import compliance.services.specialist_agents._evidence_vault as v + yield v + + +def test_open_vault_creates_structure(tmp_vault): + vault = tmp_vault.open_vault("impressum", "2.0") + assert vault.root.exists() + for sub in ("screenshots", "videos", "csv", "findings", "raw"): + assert (vault.root / sub).is_dir() + assert vault.manifest_path.exists() + + +def test_put_bytes_appends_manifest(tmp_vault): + vault = tmp_vault.open_vault("impressum", "2.0") + rel = vault.put_bytes("screenshot", "url1", "test.png", + b"\x89PNG\r\n\x1a\n", mime="image/png") + assert rel.startswith("screenshots/") + assert (vault.root / rel).exists() + assets = vault.list_assets() + assert len(assets) == 1 + assert assets[0]["sha256"] + assert assets[0]["mime"] == "image/png" + assert assets[0]["size_bytes"] == 8 + + +def test_put_json_stores_finding(tmp_vault): + vault = tmp_vault.open_vault("cookie_policy", "1.0") + rel = vault.put_json("finding", "url1", "output.json", + {"findings": [{"check_id": "X"}]}) + p = vault.root / rel + data = json.loads(p.read_text()) + assert data["findings"][0]["check_id"] == "X" + + +def test_assets_for_slot_isolation(tmp_vault): + vault = tmp_vault.open_vault("agent", "1.0") + vault.put_bytes("screenshot", "url1", "a.png", b"a") + vault.put_bytes("screenshot", "url2", "b.png", b"b") + vault.put_bytes("video", "url1", "w.mp4", b"v") + assert len(vault.assets_for_slot("url1")) == 2 + assert len(vault.assets_for_slot("url2")) == 1 + + +def test_asset_path_blocks_traversal(tmp_vault): + vault = tmp_vault.open_vault("agent", "1.0") + p = vault.asset_path("../../../etc/passwd") + assert p is None + + +def test_finalize_writes_finished_at(tmp_vault): + vault = tmp_vault.open_vault("agent", "1.0") + snap = vault.finalize() + assert "finished_at" in snap + manifest = json.loads(vault.manifest_path.read_text()) + assert "finished_at" in manifest + + +def test_list_runs_returns_recent(tmp_vault): + tmp_vault.open_vault("a", "1.0", run_id="run1") + tmp_vault.open_vault("b", "1.0", run_id="run2") + runs = tmp_vault.list_runs(limit=10) + ids = {r["run_id"] for r in runs} + assert {"run1", "run2"} <= ids + + +def test_delete_run_removes_dir(tmp_vault): + vault = tmp_vault.open_vault("a", "1.0", run_id="kill-me") + vault.put_bytes("screenshot", "u", "x.png", b"x") + assert tmp_vault.delete_run("kill-me") + assert not vault.root.exists() + assert not tmp_vault.delete_run("kill-me") # idempotent + + +def test_safe_filename_strips_path_chars(tmp_vault): + vault = tmp_vault.open_vault("a", "1.0") + rel = vault.put_bytes("raw", "slot", + "../../etc/passwd", b"x") + assert "passwd" in rel + assert ".." not in rel diff --git a/backend-compliance/tests/test_specialist_impressum_v2.py b/backend-compliance/tests/test_specialist_impressum_v2.py new file mode 100644 index 00000000..7bc0beb4 --- /dev/null +++ b/backend-compliance/tests/test_specialist_impressum_v2.py @@ -0,0 +1,185 @@ +"""Tests für Impressum-Agent v2 (BaseSpecialistAgent).""" + +from __future__ import annotations + +import asyncio + +import pytest + +from compliance.services.specialist_agents import ( + REGISTRY, + AgentInput, + AgentOutput, + ImpressumAgent, + Severity, +) +from compliance.services.specialist_agents._base import ( + FORBIDDEN_OUTPUT_TERMS, + lint_output, + stable_recommendation_id, +) +from compliance.services.specialist_agents._rollup import rollup + + +TESLA_IMPRESSUM = ( + "Tesla Germany GmbH\n" + "Ludwig-Prandtl-Strasse 25-29\n" + "12526 Berlin\n" + "Deutschland\n\n" + "Email: kontakt@tesla.com\n" + "Telefon: +49 89 1250 16 800\n\n" + "Management:\n" + "Elon Musk\n\n" + "Handelsregister: HRB 218904 B, Amtsgericht Charlottenburg\n" +) + +FULL_IMPRESSUM = ( + TESLA_IMPRESSUM + + "\nUSt-IdNr: DE123456789\nGeschäftsführer: Max Mustermann\n" +) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_agent_is_registered(): + agent = REGISTRY.get("impressum") + assert agent is not None + assert agent.doc_type == "impressum" + assert len(agent.owned_mc_ids) >= 10 + + +def test_short_text_skipped(): + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput(doc_type="impressum", text="x"))) + assert out.mc_total > 0 + assert all(c.status == "skipped" for c in out.mc_coverage) + assert not out.findings + + +def test_tesla_missing_german_label(monkeypatch): + # Skip LLM escalation for unit test (no Ollama in CI) + async def _no_cascade(*a, **kw): + return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.cascade", + _no_cascade, + ) + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput( + doc_type="impressum", text=TESLA_IMPRESSUM, + ))) + field_ids = {f.field_id for f in out.findings} + # Tesla pattern: "Management:" matches IMP-MC-006 → present + # But IMP-MC-007 (deutsches Label) MUSS fehlen + assert "vertretungsberechtigte_label_korrekt" in field_ids + # USt fehlt + assert "ust_id" in field_ids + + +def test_full_impressum_has_no_basic_findings(monkeypatch): + async def _no_cascade(*a, **kw): + return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.cascade", + _no_cascade, + ) + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput( + doc_type="impressum", text=FULL_IMPRESSUM, + ))) + # nur scope-dependent fields fehlen (vsbg, odr, redaktion) + high = [f for f in out.findings if f.severity == Severity.HIGH.value] + assert not high, f"unexpected HIGH findings: {[f.field_id for f in high]}" + + +def test_b2c_scope_adds_vsbg(monkeypatch): + async def _no_cascade(*a, **kw): + return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.cascade", + _no_cascade, + ) + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput( + doc_type="impressum", text=TESLA_IMPRESSUM, + business_scope=["b2c", "ecommerce"], + ))) + field_ids = {f.field_id for f in out.findings} + assert "verbraucher_streitbeilegung" in field_ids + assert "odr_link" in field_ids + + +def test_automotive_scope_auto_detected(monkeypatch): + async def _no_cascade(*a, **kw): + return None, [] + monkeypatch.setattr( + "compliance.services.specialist_agents.impressum.agent.cascade", + _no_cascade, + ) + agent = ImpressumAgent() + out = _run(agent.evaluate(AgentInput( + doc_type="impressum", text=TESLA_IMPRESSUM, + ))) + field_ids = {f.field_id for f in out.findings} + assert "aufsichtsbehoerde" in field_ids + # Action MUSS KBA-Hint enthalten + aufsicht = next(f for f in out.findings + if f.field_id == "aufsichtsbehoerde") + assert "KBA" in aufsicht.action or "Kraftfahrt" in aufsicht.action + + +def test_disclaimer_linter_scrubs_forbidden(): + from compliance.services.specialist_agents._base import Finding + from datetime import datetime, timezone + f = Finding( + check_id="X", agent="t", agent_version="1", + severity=Severity.HIGH, + title="Diese Lösung ist rechtssicher und garantiert konform", + action="Voll konform machen", + ) + out = AgentOutput( + agent="t", agent_version="1", + started_at=datetime.now(timezone.utc), + finished_at=datetime.now(timezone.utc), + duration_ms=0, findings=[f], + ) + cleaned = lint_output(out) + assert "rechtssicher" not in cleaned.findings[0].title.lower() + assert "garantiert" not in cleaned.findings[0].title.lower() + assert "linter scrubbed" in cleaned.notes.lower() + + +def test_rollup_bundles_same_action(): + from compliance.services.specialist_agents._base import Finding + fs = [ + Finding(check_id="A", agent="t", agent_version="1", + severity=Severity.HIGH, + title="Lücke 1", action="AVV mit Anbieter X abschließen"), + Finding(check_id="B", agent="t", agent_version="1", + severity=Severity.MEDIUM, + title="Lücke 2", action="AVV mit Anbieter X abschließen."), + Finding(check_id="C", agent="t", agent_version="1", + severity=Severity.LOW, + title="Lücke 3", action="Etwas anderes machen"), + ] + recs = rollup(fs) + assert len(recs) == 2 + bundled = next(r for r in recs if len(r.related_finding_ids) == 2) + assert bundled.severity == Severity.HIGH.value + assert set(bundled.related_finding_ids) == {"A", "B"} + + +def test_stable_recommendation_id_is_deterministic(): + a = stable_recommendation_id("AVV mit Anbieter X abschließen") + b = stable_recommendation_id("avv mit anbieter x abschliessen") + # case insensitive aber Diakritika strict (deutsch ß ≠ ss) + assert len(a) == 16 + assert len(b) == 16 + + +def test_forbidden_terms_complete(): + """Sanity-Test, dass alle wichtigen Wörter im Linter sind.""" + for term in ("rechtssicher", "garantiert", "gesetzeskonform"): + assert any(term in t for t in FORBIDDEN_OUTPUT_TERMS)