From bd4882e143c14cc21381fe39e73f4f363d71846c Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 9 Jun 2026 09:23:12 +0200 Subject: [PATCH] =?UTF-8?q?feat(agents):=20Sprint=201.12=20Phase=202=20?= =?UTF-8?q?=E2=80=94=20Cookie-Policy=20v3=20+=20ImpressumAgent=20v3=20fine?= =?UTF-8?q?tune?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ImpressumAgent v3 (Refactor): - v3_engine: laedt direkt alle 75 doc_check_controls['impressum'] ohne Sidecar-Filter (Sidecar war zu streng, lieferte nur 3 von 75 MCs). - Layer 0 Boost prueft pass+fail_criteria gegen meine 12 Patterns mit erweiterten Initial-Seeds (User-Vorgabe 2026-06-09: manuelle Initial-Seeds OK, Auto-Learning erweitert zur Laufzeit). - ETO-Smoke: 75 DB-MCs · 7 Pattern-Boosts · 24 Boost-Overrides (versus 3 DB-MCs vorher). CookiePolicyAgent v3 (Refactor): - cookie_policy/v3_engine.py + cookie_policy/regex_boost.py - Laedt direkt alle 381 Cookie-MCs aus doc_check_controls - Layer 0 mit 12 eigenen Patterns als Initial-Seed - KB-Layer (CMP-Vendor-Cross-Check) bleibt erhalten - agent_version='3.0' Tests: 27/27 gruen (12 v3-impressum, 6 cookie-policy, 9 cross-placement). Alte v2-cookie-tests umgeschrieben auf v3-Pipeline-Mock. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../specialist_agents/cookie_policy/agent.py | 348 +++++++----------- .../cookie_policy/regex_boost.py | 115 ++++++ .../cookie_policy/v3_engine.py | 141 +++++++ .../impressum/regex_boost.py | 83 +++-- .../specialist_agents/impressum/v3_engine.py | 130 +++++-- backend-compliance/tests/test_impressum_v3.py | 21 ++ .../tests/test_specialist_cookie_policy.py | 173 ++++----- 7 files changed, 659 insertions(+), 352 deletions(-) create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/regex_boost.py create mode 100644 backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py index 56a3ec90..d6d2073d 100644 --- a/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py @@ -1,14 +1,12 @@ -"""Cookie-Policy-Agent v2 — BaseSpecialistAgent. +"""Cookie-Policy-Agent v3 — baut auf doc_check_controls (381 DB-MCs). -Prüft den Cookie-Policy-DOKUMENT-Text (NICHT das Banner — das macht -der Cookie-Banner-Themen-Agent). Konsumiert optional context.cmp_vendors -für Konsistenz-Checks gegen die tatsächlich beobachtete Cookie-Liste. +Sprint 1.12 Phase 2 — analog zu impressum/agent.py: + Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py) + Layer 1 — Keyword-Match aus pass_criteria der 381 Cookie-MCs + Layer 2 — BGE-M3 Embedding-Match + Layer 3 — Semantic-Validator (LLM) + Auto-Learning-Library -Eskalations-Stufen: - 1. MC (regex) — schnell, deterministisch - 2. cookie_library_lookup gegen state.context.cmp_vendors (wenn vorhanden) - 3. LLM (qwen2.5:7b) für strukturelle/semantische Lücken - 4. OVH 120b als Fallback +Output-Layer (Linter / Rollup / Methodik-UI) bleibt 1:1. """ from __future__ import annotations @@ -28,132 +26,128 @@ from .._base import ( SourceType, lint_output, ) -from .._escalation import cascade +from .._pattern_library import record as record_pattern from .._rollup import rollup +from .._semantic_validator import build_rename_action, validate_present from .mcs import MC_IDS, MCS +from .v3_engine import run_v3_pipeline logger = logging.getLogger(__name__) -_SYSTEM_PROMPT = """Du bist ein deutscher Datenschutz-Anwalt mit Fokus -TDDDG § 25 + DSGVO Art. 13 + EuGH Planet49 + BGH Cookie-II. Aufgabe: -eine Cookie-Richtlinie auf strukturelle und inhaltliche LÜCKEN prüfen, -die einer regex-basierten Vorprüfung entgangen sind. - -WICHTIG: - - KEINE Bewertung "rechtssicher" / "garantiert" / "konform". - - Wenn unsicher: leeres Array zurückgeben statt zu halluzinieren. - - Wörtliches Zitat als evidence bei jeder Lücke. - -Antworte NUR mit JSON, Schema: - {"findings": [ - {"field_id": "...", "severity": "HIGH|MEDIUM|LOW", - "title": "...", "evidence": "wörtliches Zitat", - "action": "konkrete Empfehlung"} - ]} - -Typische Lücken-Kategorien: - - pseudo_purpose: "Siehe dazugehörige Datenverarbeitung" ohne konkrete Aussage - - duration_floskel: "solange erforderlich" ohne Zeitangabe - - vendor_unklar: "möglicherweise Drittanbieter" ohne Liste - - retention_inkonsistent: Tabelle nennt Tage, Fließtext nennt "session" - - drittland_fehlend: US-Vendor genannt (Google, Meta) aber Schrems-II - nicht thematisiert - - banner_reopen_fehlt: "Cookie-Einstellungen ändern" Link fehlt -""" +_SEV_TO_ENUM = { + "CRITICAL": Severity.HIGH, + "HIGH": Severity.HIGH, + "MEDIUM": Severity.MEDIUM, + "LOW": Severity.LOW, + "INFO": Severity.INFO, +} class CookiePolicyAgent(BaseSpecialistAgent): agent_id = "cookie_policy" - agent_version = "1.0" + agent_version = "3.0" doc_type = "cookie" owned_mc_ids = MC_IDS async def evaluate(self, agent_input: AgentInput) -> AgentOutput: start = datetime.now(timezone.utc) text = (agent_input.text or "").strip() + scope = set(agent_input.business_scope or []) coverage: list[McCoverage] = [] findings: list[Finding] = [] esc_logs: list[EscalationLog] = [] + notes_parts: list[str] = [] if len(text) < 100: for mc in MCS: coverage.append(McCoverage( mc_id=mc.mc_id, status="skipped", - reason="cookie policy text too short or empty", + reason="text too short", )) return self._finalize( - start, findings, esc_logs, coverage, confidence=0.0, + start, findings, esc_logs, coverage, + confidence=0.0, notes="Cookie-Policy-Text zu kurz oder leer.", ) - for mc in MCS: - matched = [p for p in mc.patterns if p.search(text)] - if mc.require_all: - ok = len(matched) == len(mc.patterns) - else: - ok = bool(matched) - if ok: - coverage.append(McCoverage( - mc_id=mc.mc_id, status="ok", - reason=f"{len(matched)}/{len(mc.patterns)} patterns hit", - )) + results, telemetry = await run_v3_pipeline(text, scope) + notes_parts.append( + f"v3-pipeline: {telemetry.get('total_mcs', 0)} DB-MCs · " + f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · " + f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides" + ) + + seen: set[str] = set() + for r in results: + mc_id = r.get("control_id") or "" + if not mc_id or mc_id in seen: continue - sev = self._sev(mc.severity_if_missing) - action = self._build_action(mc) + seen.add(mc_id) + passed = bool(r.get("passed")) + sev = _SEV_TO_ENUM.get( + (r.get("severity") or "MEDIUM").upper(), Severity.MEDIUM, + ) + coverage.append(McCoverage( + mc_id=mc_id, + status="ok" if passed else sev.value.lower(), + reason=str(r.get("matched_text") or r.get("hint") or "")[:120], + )) + if passed: + continue + label = r.get("label") or r.get("hint") or "" findings.append(Finding( - check_id=f"COOKIE-POLICY-AGENT-{mc.field_id.upper()}", + check_id=f"DBMC-{mc_id}", agent=self.agent_id, agent_version=self.agent_version, - field_id=mc.field_id, + field_id=mc_id, severity=sev, - severity_reason="missing", - title=f"Cookie-Policy-Lücke: '{mc.label}'", - norm=mc.norm, - action=action, - confidence=0.92, + severity_reason="db_mc_failed", + title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", + norm=str(r.get("regulation") or "") + + (f" Art. {r.get('article')}" + if r.get("article") else ""), + evidence="", + action=str(r.get("hint") or "")[:400] + or "Bitte gegen die Cookie-Pflichten prüfen.", + confidence=0.9, sources=[EvidenceSource( source_type=SourceType.MC, - source_id=mc.mc_id, - detail=f"0/{len(mc.patterns)} pattern hit", + source_id=mc_id, + detail=str(r.get("source") or "keyword_match")[:120], + confidence=0.9, )], )) + + boost_ids = set(telemetry.get("layer_0_field_ids") or []) + for mc in MCS: coverage.append(McCoverage( mc_id=mc.mc_id, - status=sev.value.lower(), - reason="missing", + status="ok" if mc.field_id in boost_ids else "na", + reason=("regex-boost hit" + if mc.field_id in boost_ids + else "kein Pattern-Treffer (kein Veto)"), )) - # KB-Layer: wenn cmp_vendors im Kontext, checke ob die Policy - # alle beobachteten Vendoren erwähnt + await self._semantic_demote(text, findings, coverage) + kb_findings = self._kb_layer(text, agent_input.context or {}) findings.extend(kb_findings) - # LLM-Eskalation für subtile Lücken (Pseudo-Zwecke, Floskeln) - llm_findings, llm_logs = await self._maybe_escalate(text) - esc_logs.extend(llm_logs) - seen = {f.field_id for f in findings if f.field_id} - for f in llm_findings: - if f.field_id and f.field_id in seen: - continue - findings.append(f) - confs = [f.confidence for f in findings if f.confidence] or [0.95] overall = sum(confs) / len(confs) - return self._finalize(start, findings, esc_logs, coverage, - confidence=overall) + return self._finalize( + start, findings, esc_logs, coverage, + confidence=overall, notes=" · ".join(notes_parts), + ) - def _kb_layer( - self, text: str, context: dict, - ) -> list[Finding]: - """Wenn cmp_vendors gegeben: prüfe ob alle Vendoren in der Policy - erwähnt werden. Sonst Skip (keine Cross-Check ohne Datenbasis).""" + def _kb_layer(self, text: str, context: dict) -> list[Finding]: + """Wenn cmp_vendors im Kontext: prüfe ob alle in Policy genannt.""" cmp_vendors = context.get("cmp_vendors") or [] if not cmp_vendors: return [] text_lc = text.lower() - # Extrahiere Top-Vendor-Namen aus dem CMP seen_names: set[str] = set() for v in cmp_vendors: if not isinstance(v, dict): @@ -161,13 +155,10 @@ class CookiePolicyAgent(BaseSpecialistAgent): name = (v.get("name") or v.get("vendor") or "").strip() if name and len(name) > 2: seen_names.add(name) - missing: list[str] = [] - for n in sorted(seen_names): - if n.lower() not in text_lc: - missing.append(n) + missing = [n for n in sorted(seen_names) + if n.lower() not in text_lc] if not missing: return [] - # Ein Sammel-Finding pro Lücke sample = missing[:8] return [Finding( check_id="COOKIE-POLICY-AGENT-CMP-VS-POLICY", @@ -177,76 +168,82 @@ class CookiePolicyAgent(BaseSpecialistAgent): severity=Severity.MEDIUM, severity_reason="cmp_observed_vendors_not_in_policy", title=( - f"{len(missing)} im CMP beobachtete Vendor(en) " + f"{len(missing)} im CMP beobachtete Vendoren " "fehlen in der Cookie-Policy" ), - norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger vollständig nennen)", + norm="DSGVO Art. 13 Abs. 1 lit. e (Empfänger vollständig)", evidence=f"Fehlend: {', '.join(sample)}" + (" …" if len(missing) > 8 else ""), action=( "Die im Cookie-Consent-Banner beobachteten Vendoren " - "(Tracker/Werbenetzwerke) müssen vollständig in der " - "Cookie-Richtlinie aufgelistet sein." + "müssen vollständig in der Cookie-Richtlinie genannt sein." ), confidence=0.88, sources=[EvidenceSource( - source_type=SourceType.MC, + source_type=SourceType.CROSS, source_id="CMP-CROSS-CHECK", detail=f"{len(missing)} missing of {len(seen_names)}", )], )] - async def _maybe_escalate( - self, text: str, - ) -> tuple[list[Finding], list[EscalationLog]]: - user_prompt = ( - f"COOKIE-POLICY-TEXT:\n{text[:4500]}\n\n" - "Liste subtile Lücken nach TDDDG § 25 + DSGVO Art. 13. " - "Nur JSON." + async def _semantic_demote( + self, text: str, findings: list[Finding], + coverage: list[McCoverage], + ) -> None: + candidates = [ + f for f in findings + if f.severity in (Severity.HIGH.value, Severity.MEDIUM.value) + and f.severity_reason == "db_mc_failed" + ] + if not candidates: + return + result = await validate_present( + text, [(f.field_id, f.title[:80]) for f in candidates], ) - res, logs = await cascade(_SYSTEM_PROMPT, user_prompt) - if res is None or not isinstance(res.parsed, (dict, list)): - return [], logs - raw = (res.parsed.get("findings") - if isinstance(res.parsed, dict) else res.parsed) - if not isinstance(raw, list): - return [], logs - out: list[Finding] = [] - for item in raw: - if not isinstance(item, dict): + if not result: + return + for finding in candidates: + row = result.get(finding.field_id) + if not row or not row.get("found"): continue - fid = str(item.get("field_id") or "unknown")[:40] - sev_raw = str(item.get("severity") or "MEDIUM").upper() - sev = self._sev(sev_raw) - out.append(Finding( - check_id=f"COOKIE-POLICY-AGENT-LLM-{fid.upper()}", - agent=self.agent_id, - agent_version=self.agent_version, - field_id=fid, - severity=sev, - severity_reason="llm_detected", - title=str(item.get("title") or "")[:200], - norm="TDDDG § 25 + DSGVO Art. 13 (LLM-Analyse)", - evidence=str(item.get("evidence") or "")[:300], - action=str(item.get("action") or "")[:400], - confidence=0.7, - sources=[EvidenceSource( - source_type=res.stage, - source_id=res.model, - detail=f"prompt_chars={len(user_prompt)}", - confidence=0.7, - )], + if row.get("confidence", 0) < 0.6: + continue + label_used = row.get("label_used") or "abweichendes Label" + conf = float(row.get("confidence") or 0.8) + finding.severity = Severity.LOW.value + finding.severity_reason = "label_mismatch" + finding.title = ( + f"Label '{label_used}' weicht von Standard ab" + ) + finding.evidence = str(row.get("evidence") or "")[:200] + finding.action = build_rename_action( + finding.field_id, label_used, + ) + finding.confidence = conf + finding.sources.append(EvidenceSource( + source_type=SourceType.LLM_LOCAL, + source_id="semantic_validator", + detail=f"LLM-confirmed: '{label_used}'", + confidence=conf, )) - return out, logs + for c in coverage: + if c.mc_id == f"DBMC-{finding.field_id}": + c.status = "low" + c.reason = f"label_mismatch: '{label_used}'" + try: + record_pattern( + field_id=finding.field_id, + label_used=label_used, + confidence=conf, + agent_id=self.agent_id, + ) + except Exception as e: + logger.warning("pattern-library record failed: %s", e) def _finalize( - self, - start: datetime, - findings: list[Finding], - esc_logs: list[EscalationLog], - coverage: list[McCoverage], - confidence: float, - notes: str = "", + self, start: datetime, findings: list[Finding], + esc_logs: list[EscalationLog], coverage: list[McCoverage], + confidence: float, notes: str = "", ) -> AgentOutput: end = datetime.now(timezone.utc) recs = rollup(findings) @@ -270,78 +267,3 @@ class CookiePolicyAgent(BaseSpecialistAgent): mc_low=sum(1 for c in coverage if c.status == "low"), ) return lint_output(out) - - @staticmethod - def _sev(value: str) -> Severity: - v = (value or "").upper() - if v == "HIGH": - return Severity.HIGH - if v == "MEDIUM": - return Severity.MEDIUM - if v == "LOW": - return Severity.LOW - return Severity.INFO - - @staticmethod - def _build_action(mc) -> str: - suggestions = { - "categories_named": ( - "Die Cookie-Richtlinie sollte die Kategorien essentiell, " - "funktional, analytics und marketing klar benennen und " - "abgrenzen." - ), - "purpose_described": ( - "Pro Cookie-Kategorie den Verarbeitungszweck konkret " - "benennen (keine Pauschal-Formulierungen wie " - "'verschiedene Zwecke')." - ), - "retention_duration": ( - "Speicherdauer pro Cookie konkret angeben " - "(z.B. 'Session', '30 Tage', '2 Jahre') statt " - "'solange erforderlich'." - ), - "vendor_recipients": ( - "Alle Empfänger / Drittanbieter namentlich auflisten " - "(z.B. Google LLC, Meta Platforms Inc., …) inkl. Sitz." - ), - "opt_out_mechanism": ( - "Konkreten Opt-Out-Weg beschreiben: Banner-Reopen-Link, " - "Browser-Einstellungen, Vendor-spezifische Opt-Out-URLs." - ), - "banner_reopen": ( - "Sichtbaren Link 'Cookie-Einstellungen ändern' in die " - "Policy aufnehmen, der den CMP-Banner wieder öffnet." - ), - "version_date": ( - "Stand der Cookie-Richtlinie sichtbar angeben " - "(z.B. 'Stand: 1. Juni 2026')." - ), - "third_country_transfer": ( - "Bei Drittland-Transfer (USA u.a.) Hinweis auf " - "Schrems-II-Risiko + verwendete Schutzmaßnahmen " - "(SCC, DPF) ergänzen." - ), - "legal_basis": ( - "Rechtsgrundlage pro Kategorie benennen: § 25 Abs. 1 " - "TDDDG (Einwilligung) bzw. § 25 Abs. 2 TDDDG " - "(unbedingt erforderlich)." - ), - "cookie_table_or_list": ( - "Detail-Tabelle mit Cookie-Namen, Vendor, Zweck und " - "Laufzeit pro Cookie ergänzen (DSK-Best-Practice)." - ), - "dpo_contact": ( - "Kontaktmöglichkeit zum DSB oder Datenschutz-Team " - "in der Cookie-Richtlinie nennen (z.B. " - "datenschutz@)." - ), - "browser_settings_hint": ( - "Hinweis auf Browser-Einstellungen zum Blockieren/" - "Löschen von Cookies (Chrome, Firefox, Safari, Edge) " - "ergänzen." - ), - } - return suggestions.get(mc.field_id, ( - f"{mc.label} in der Cookie-Richtlinie ergänzen " - f"({mc.norm})." - )) diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/regex_boost.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/regex_boost.py new file mode 100644 index 00000000..64e006e0 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/regex_boost.py @@ -0,0 +1,115 @@ +"""Layer-0 Regex-Boost für Cookie-Policy-Agent v3. + +Analog zu impressum/regex_boost.py: meine 12 Cookie-Policy-Patterns +(aus mcs.py) werden als Vor-Stufe vor dem Keyword-Match aus +doc_check_controls (381 Cookie-MCs) genutzt. Wenn Pattern hits, kann +das thematisch passende DB-MC zu PASS überschrieben werden. + +User-Vorgabe 2026-06-09: manuelle Initial-Seeds sind erlaubt, das +Auto-Learning ergänzt zur Laufzeit. +""" + +from __future__ import annotations + +import logging + +from .mcs import MCS + +logger = logging.getLogger(__name__) + + +# Initial-Seed pro field_id — auf Cookie-Policy-Pflichten abgestimmt. +BOOST_KEYWORDS: dict[str, tuple[str, ...]] = { + "categories_named": ( + "kategorie", "essentiell", "funktional", "analytics", + "marketing", "notwendig", "tracking", + ), + "purpose_described": ( + "zweck", "zwecke", "verarbeitungszweck", "verwendungszweck", + "dient zu", "dient zur", + ), + "retention_duration": ( + "speicherdauer", "laufzeit", "dauer", "gültigkeitsdauer", + "session", "persistent", "tag", "monat", "jahr", + ), + "vendor_recipients": ( + "empfänger", "vendor", "drittanbieter", "third-party", + "drittland", "anbieter", "verantwortlicher", + ), + "opt_out_mechanism": ( + "opt-out", "widerruf", "widerrufen", "deaktivieren", + "abwählen", "einstellungen ändern", + ), + "banner_reopen": ( + "cookie-einstellungen", "banner", "präferenzen", + "einwilligung verwalten", "consent", + ), + "version_date": ( + "stand", "aktualisierung", "version", "letzte änderung", + "gültig ab", + ), + "third_country_transfer": ( + "drittland", "drittstaat", "usa", "scc", + "standardvertragsklauseln", "angemessenheitsbeschluss", + "data privacy framework", "dpf", + ), + "legal_basis": ( + "rechtsgrundlage", "einwilligung", "berechtigtes interesse", + "art. 6", "§ 25 tdddg", "tdddg", + ), + "cookie_table_or_list": ( + "tabelle", "liste", "cookie-name", "_ga", "_fbp", + "optanonconsent", + ), + "dpo_contact": ( + "datenschutzbeauftragter", "datenschutz-team", "dsb", + "datenschutz@", + ), + "browser_settings_hint": ( + "browser-einstellungen", "chrome", "firefox", "safari", + "edge", "cookies löschen", "cookies blockieren", + ), +} + + +def compute_regex_boosts(text: str) -> set[str]: + """Welche field_ids wurden im Cookie-Policy-Text durch Patterns + erkannt?""" + if not text or len(text) < 50: + return set() + hits: set[str] = set() + for mc in MCS: + # require_all / any-Logik aus mcs.py respektieren + if mc.require_all: + ok = all(p.search(text) for p in mc.patterns) + else: + ok = any(p.search(text) for p in mc.patterns) + if ok: + hits.add(mc.field_id) + return hits + + +def boost_matches_db_mc( + boosts: set[str], + pass_criteria: list, + fail_criteria: list | None = None, +) -> str | None: + """≥2 Boost-Keywords im kombinierten pass+fail-Text → match.""" + if not boosts: + return None + parts: list[str] = [] + for c in (pass_criteria or []): + if c: parts.append(str(c).lower()) + for c in (fail_criteria or []): + if c: parts.append(str(c).lower()) + if not parts: + return None + crit_text = " ".join(parts) + best: tuple[int, str] | None = None + for field_id in boosts: + kws = BOOST_KEYWORDS.get(field_id) or () + match_count = sum(1 for kw in kws if kw in crit_text) + if match_count >= 2: + if best is None or match_count > best[0]: + best = (match_count, field_id) + return best[1] if best else None diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py new file mode 100644 index 00000000..317502e9 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py @@ -0,0 +1,141 @@ +"""Cookie-Policy v3-Pipeline — analog zu impressum/v3_engine.py. + +Lädt 381 Cookie-MCs aus compliance.doc_check_controls (doc_type='cookie'), +ruft den deterministischen Keyword-Check + Embedding-Match + Boost-Override. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from .regex_boost import boost_matches_db_mc, compute_regex_boosts + +logger = logging.getLogger(__name__) + + +async def run_v3_pipeline( + text: str, business_scope: set[str], +) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if not text or len(text) < 100: + return [], {"reason": "text too short"} + + # Layer 0: meine Pattern-Boosts + boosts = compute_regex_boosts(text) + boost_field_ids = sorted(boosts) + + # Layer 1: alle 381 Cookie-MCs aus DB laden + controls = await _load_cookie_mcs() + results: list[dict[str, Any]] = [] + if controls: + try: + from compliance.services.rag_document_checker import ( + _check_mc_deterministic, + ) + text_lower = text.lower().replace("\xad", "") + for mc in controls: + r = _check_mc_deterministic(text_lower, mc) + if r: + r["_pass_criteria"] = mc.get("pass_criteria") + r["_fail_criteria"] = mc.get("fail_criteria") + results.append(r) + except Exception as e: + logger.warning("layer-1 keyword check failed: %s", e) + + # Layer 2: Embedding-Match für failed MCs + failed_for_embed = [ + c for c, r in zip(controls, results) + if r and not r.get("passed") + ] + if failed_for_embed: + try: + from compliance.services.mc_embedding_matcher import ( + ensure_mc_embeddings, embedding_match, + ) + await ensure_mc_embeddings() + semantic_passes = await embedding_match( + text, failed_for_embed, doc_type="cookie", + ) + if semantic_passes: + for r in results: + cid = r.get("control_id") + if cid in semantic_passes and not r.get("passed"): + r["passed"] = True + r["matched_text"] = "[layer-2 embedding match]" + r["source"] = (r.get("source") or "") + "+embedding" + except Exception as e: + logger.warning("layer-2 embedding skipped: %s", e) + + # Layer 0 Boost-Override + boost_overrides = 0 + for r in results: + if r.get("passed"): + continue + pass_crit = r.get("_pass_criteria") or [] + fail_crit = r.get("_fail_criteria") or [] + if not pass_crit and not fail_crit: + pass_crit = [r.get("hint") or r.get("label") or ""] + matched_field = boost_matches_db_mc(boosts, pass_crit, fail_crit) + if matched_field: + r["passed"] = True + r["matched_text"] = f"[regex-boost layer 0 — {matched_field}]" + r["source"] = (r.get("source") or "") + "+regex_boost" + boost_overrides += 1 + + layer_1_pass = sum(1 for r in results if r.get("passed") + and "+regex_boost" not in (r.get("source") or "") + and "+embedding" not in (r.get("source") or "")) + telemetry = { + "layer_0_field_hits": len(boost_field_ids), + "layer_0_field_ids": boost_field_ids, + "layer_1_pass": layer_1_pass, + "layer_0_boost_overrides": boost_overrides, + "total_mcs": len(results), + } + return results, telemetry + + +async def _load_cookie_mcs() -> list[dict]: + """Lädt alle 381 Cookie-MCs aus compliance.doc_check_controls.""" + try: + import json + from classroom_engine.database import SessionLocal + from sqlalchemy import text as _sa_text + db = SessionLocal() + try: + rows = db.execute(_sa_text( + "SELECT id, control_id, control_uuid, title, regulation, " + " article, check_question, pass_criteria, " + " fail_criteria, severity " + "FROM compliance.doc_check_controls " + "WHERE doc_type='cookie' " + "ORDER BY severity DESC, title" + )).fetchall() + finally: + db.close() + out = [] + for r in rows: + def _parse(v): + if isinstance(v, list): return v + if isinstance(v, str): + try: + j = json.loads(v) + return j if isinstance(j, list) else [v] + except Exception: return [v] + return [] + out.append({ + "id": str(r[0]), + "control_id": r[1], + "control_uuid": str(r[2]) if r[2] else "", + "title": r[3] or "", + "regulation": r[4] or "", + "article": r[5] or "", + "check_question": r[6] or "", + "pass_criteria": _parse(r[7]), + "fail_criteria": _parse(r[8]), + "severity": r[9] or "MEDIUM", + }) + return out + except Exception as e: + logger.warning("_load_cookie_mcs failed: %s", e) + return [] diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py index f7c747b6..1137fb35 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py @@ -29,49 +29,70 @@ logger = logging.getLogger(__name__) # Für jedes meiner field_id: welche Wörter erscheinen typisch in # der pass_criteria der zugehörigen DB-MCs? Wenn diese Wörter im # pass_criteria gefunden werden, ist es vermutlich derselbe MC. +# Initial-Seed der Standard-Synonyme pro field_id. User-Vorgabe +# 2026-06-09: manuelle Erweiterung als Initial-Seed ist OK; das +# LLM-basierte Auto-Learning (Sprint 1.10/1.11) ergänzt zur Laufzeit +# weitere Tail-Schreibweisen, sodass über die Zeit asymptotisch +# weniger LLM-Calls nötig sind. BOOST_KEYWORDS: dict[str, tuple[str, ...]] = { "name_anbieter": ( - "rechtsform", "anschrift", "anbieter", "firmensitz", "firmenname", - "diensteanbieter", "verantwortlich", + # Adresse / Anschrift + "anschrift", "adresse", "postadresse", "postalisch", + "geschäftsadresse", "geschäftssitz", "firmensitz", + "niederlassung", "niederlassungsort", "sitz", "ort", + "straße", "hausnummer", "plz", + # Firmenname / Rechtsform + "firma", "firmenname", "rechtsform", "kaufmann", + "anbieter", "diensteanbieter", "verantwortlich", + "anbieterkennzeichnung", "unternehmen", ), "kontakt_email": ( "e-mail", "email", "elektronische", "kontaktmöglichkeit", - "mailadresse", + "kontaktdaten", "mailadresse", "e-mail-adresse", ), "kontakt_telefon": ( - "telefon", "rufnummer", "telefonnummer", "phone", "kontaktdaten", - "telekommunikation", + "telefon", "rufnummer", "telefonnummer", "phone", + "kontaktdaten", "telekommunikation", "fax", ), "handelsregister": ( - "handelsregister", "registergericht", "hrb", "registernummer", + "handelsregister", "registergericht", "hrb", "hra", + "registernummer", "registereintrag", + "handelsregisternummer", "handelsregisterauszug", ), "ust_id": ( - "umsatzsteuer", "ust-id", "umsatzsteueridentifikation", "ust-idnr", + "umsatzsteuer", "ust-id", "ust-idnr", + "umsatzsteueridentifikation", + "umsatzsteueridentifikationsnummer", "vat", ), "vertretungsberechtigte": ( - "geschäftsführer", "vorstand", "vertretungsberechtigt", - "vertretung", "gesellschafter", + "geschäftsführer", "geschäftsführung", "vorstand", + "vorsitzender", "vorstandsvorsitzender", + "vertretungsberechtigt", "vertretung", "vertreten", + "gesellschafter", "kaufmann", "inhaber", ), "vertretungsberechtigte_label_korrekt": ( - "deutsche", "bezeichnung", "rechtsform", + "geschäftsführer", "vorstand", "deutsche", "bezeichnung", + "rechtsform", ), "aufsichtsbehoerde": ( - "aufsichtsbehörde", "aufsicht", "behörde", "regulierungsbehörde", + "aufsichtsbehörde", "aufsicht", "behörde", + "regulierungsbehörde", "ihk", "bafin", "bnetza", "kba", ), "verantwortlicher_redaktion": ( "redaktion", "verantwortlich", "rstv", "mstv", - "journalistisch", "publizistisch", + "journalistisch", "publizistisch", "v.i.s.d.p", ), "verbraucher_streitbeilegung": ( "streitbeilegung", "vsbg", "verbraucherschlichtung", - "schlichtungsstelle", + "schlichtungsstelle", "verbraucherschlichtungsstelle", ), "berufsangaben": ( - "berufsbezeichnung", "berufsordnung", "kammer", "berufsrecht", + "berufsbezeichnung", "berufsordnung", "kammer", + "berufsrecht", "berufsverband", ), "odr_link": ( "online-streitbeilegung", "os-plattform", "odr", - "europäische kommission", + "europäische kommission", "ec.europa.eu", ), } @@ -94,22 +115,36 @@ def compute_regex_boosts(text: str, business_scope: set[str]) -> set[str]: return hits -def boost_matches_db_mc(boosts: set[str], pass_criteria: list) -> str | None: +def boost_matches_db_mc( + boosts: set[str], + pass_criteria: list, + fail_criteria: list | None = None, +) -> str | None: """Hat ein gebooster field_id genug Keyword-Überlapp mit den - pass_criteria einer DB-MC, um den MC zu boost'en? + pass_criteria + fail_criteria einer DB-MC, um den MC zu boost'en? - Returns: field_id (matched), oder None. - Vorsichtig: ≥2 Boost-Keywords müssen im pass_criteria-Text auftauchen, - sonst zu permissiv. + Returns: field_id (matched, mit höchstem Keyword-Match-Count), oder None. + + Schwelle: ≥2 unique Boost-Keywords im kombinierten Text. + Beide criteria-Listen werden berücksichtigt — fail_criteria-Wörter + wie 'Keine Adresse angegeben' helfen das MC eindeutig zuzuordnen. """ - if not boosts or not pass_criteria: + if not boosts: return None - crit_text = " ".join( - str(c) for c in pass_criteria if c - ).lower() + crit_parts: list[str] = [] + for c in (pass_criteria or []): + if c: + crit_parts.append(str(c).lower()) + for c in (fail_criteria or []): + if c: + crit_parts.append(str(c).lower()) + if not crit_parts: + return None + crit_text = " ".join(crit_parts) best: tuple[int, str] | None = None for field_id in boosts: kws = BOOST_KEYWORDS.get(field_id) or () + # zähle UNIQUE hits — gleiches keyword im selben Text zählt einmal match_count = sum(1 for kw in kws if kw in crit_text) if match_count >= 2: if best is None or match_count > best[0]: diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py index ee722c82..b9e75afd 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py @@ -43,44 +43,67 @@ async def run_v3_pipeline( logger.info("v3 Layer-0 boosts: %d hits — %s", len(boost_field_ids), boost_field_ids) - # Layer 1+2: bestehender rag_document_checker (Keyword + Embedding) - try: - from compliance.services.rag_document_checker import ( - check_document_with_controls, - ) - results = await check_document_with_controls( - text=text, - doc_type="impressum", - doc_title="Impressum (Agent-Test)", - db_url=db_url, - max_controls=0, - use_agent=False, - business_scope=business_scope, - ) - except Exception as e: - logger.warning("rag_document_checker failed: %s — using boosts only", - e) - results = [] + # Layer 1: lade ALLE 75 doc_check_controls für 'impressum' direkt + # aus DB. Sidecar-Klassifizierung wird bewusst übersprungen — der + # Agent soll auf der vollen MC-Liste arbeiten (Layer 3 LLM-Validator + # demoted Pattern-Misses zu LOW, sodass Breitenwirkung kein Risiko ist). + controls = await _load_impressum_mcs() + results: list[dict[str, Any]] = [] + if controls: + try: + from compliance.services.rag_document_checker import ( + _check_mc_deterministic, + ) + text_lower = text.lower().replace("\xad", "") + for mc in controls: + r = _check_mc_deterministic(text_lower, mc) + if r: + # pass_criteria im Result behalten für Boost-Layer + r["_pass_criteria"] = mc.get("pass_criteria") + r["_fail_criteria"] = mc.get("fail_criteria") + results.append(r) + except Exception as e: + logger.warning("layer-1 keyword check failed: %s", e) + results = [] + + # Layer 2: Embedding-Match für die failed MCs + failed_for_embed = [c for c, r in zip(controls, results) + if r and not r.get("passed")] + if failed_for_embed: + try: + from compliance.services.mc_embedding_matcher import ( + ensure_mc_embeddings, embedding_match, + ) + await ensure_mc_embeddings() + semantic_passes = await embedding_match( + text, failed_for_embed, doc_type="impressum", + ) + if semantic_passes: + for r in results: + cid = r.get("control_id") + if cid in semantic_passes and not r.get("passed"): + r["passed"] = True + r["matched_text"] = "[layer-2 embedding match]" + r["source"] = (r.get("source") or "") + "+embedding" + except Exception as e: + logger.warning("layer-2 embedding skipped: %s", e) layer_1_pass = sum(1 for r in results if r.get("passed")) layer_1_fail = sum(1 for r in results if r.get("passed") is False) - # Layer 0 Override: failed MCs deren pass_criteria zu einem meiner - # gebooster field_ids passt → überschreiben zu PASS + # Layer 0 Override: failed MCs deren pass/fail_criteria zu einem meiner + # gebooster field_ids passen → überschreiben zu PASS. Wir haben + # pass_criteria + fail_criteria in r drin (Layer-1 hat sie behalten). boost_overrides = 0 for r in results: if r.get("passed"): continue - # rag_document_checker nimmt pass_criteria intern weg vor - # dem Return; wir laden sie nochmal (oder bekommen sie via - # 'hint'). Hier rufen wir das per Helper. - crit = r.get("_pass_criteria") or [] - if not crit: - # Fallback: aus dem Hint (= check_question) Boost-Match - # versuchen. - crit = [r.get("hint") or ""] - matched_field = boost_matches_db_mc(boosts, crit) + pass_crit = r.get("_pass_criteria") or [] + fail_crit = r.get("_fail_criteria") or [] + if not pass_crit and not fail_crit: + pass_crit = [r.get("hint") or r.get("label") or ""] + matched_field = boost_matches_db_mc(boosts, pass_crit, fail_crit) if matched_field: r["passed"] = True r["matched_text"] = ( @@ -102,3 +125,52 @@ async def run_v3_pipeline( } logger.info("v3 telemetry: %s", telemetry) return results, telemetry + + +async def _load_impressum_mcs() -> list[dict]: + """Lädt alle Impressum-MCs aus compliance.doc_check_controls — ohne + Sidecar-Filter. v3_engine nimmt die volle Breite.""" + try: + import json + from classroom_engine.database import SessionLocal + from sqlalchemy import text as _sa_text + db = SessionLocal() + try: + rows = db.execute(_sa_text( + "SELECT id, control_id, control_uuid, title, regulation, " + " article, check_question, pass_criteria, " + " fail_criteria, severity " + "FROM compliance.doc_check_controls " + "WHERE doc_type='impressum' " + "ORDER BY severity DESC, title" + )).fetchall() + finally: + db.close() + out: list[dict] = [] + for r in rows: + def _parse(v): + if isinstance(v, list): + return v + if isinstance(v, str): + try: + j = json.loads(v) + return j if isinstance(j, list) else [v] + except Exception: + return [v] + return [] + out.append({ + "id": str(r[0]), + "control_id": r[1], + "control_uuid": str(r[2]) if r[2] else "", + "title": r[3] or "", + "regulation": r[4] or "", + "article": r[5] or "", + "check_question": r[6] or "", + "pass_criteria": _parse(r[7]), + "fail_criteria": _parse(r[8]), + "severity": r[9] or "MEDIUM", + }) + return out + except Exception as e: + logger.warning("_load_impressum_mcs failed: %s", e) + return [] diff --git a/backend-compliance/tests/test_impressum_v3.py b/backend-compliance/tests/test_impressum_v3.py index 6b603aee..bffc354b 100644 --- a/backend-compliance/tests/test_impressum_v3.py +++ b/backend-compliance/tests/test_impressum_v3.py @@ -70,6 +70,27 @@ def test_boost_matches_db_mc_returns_none_when_unrelated(): assert boost_matches_db_mc(boosts, pass_crit) is None +def test_boost_matches_db_mc_uses_fail_criteria(): + """Wörter aus fail_criteria sollen die Zuordnung mit unterstützen.""" + boosts = {"name_anbieter"} + pass_crit = ["Sichtbar"] + fail_crit = ["Keine Postadresse angegeben", "Adresse fehlt"] + matched = boost_matches_db_mc(boosts, pass_crit, fail_crit) + assert matched == "name_anbieter" + + +def test_boost_matches_db_mc_eto_address_case(): + """Konkreter ETO-Fall: AUTH-1954-A07 'Postadresse + Geschäftssitz'.""" + boosts = {"name_anbieter"} + pass_crit = [ + "Vollständige Postadresse (Straße, Hausnummer, PLZ, Ort, Land)", + "Oder: Eindeutige Angabe des Geschäftssitzes", + "Adresse ist aktuell und korrekt", + ] + matched = boost_matches_db_mc(boosts, pass_crit) + assert matched == "name_anbieter" + + def test_boost_keywords_cover_all_field_ids(): """Jedes mcs.py field_id muss in BOOST_KEYWORDS ein Eintrag haben.""" from compliance.services.specialist_agents.impressum.mcs import MCS diff --git a/backend-compliance/tests/test_specialist_cookie_policy.py b/backend-compliance/tests/test_specialist_cookie_policy.py index da95f370..f47f61f3 100644 --- a/backend-compliance/tests/test_specialist_cookie_policy.py +++ b/backend-compliance/tests/test_specialist_cookie_policy.py @@ -1,4 +1,4 @@ -"""Tests für Cookie-Policy-Agent.""" +"""Tests für Cookie-Policy-Agent v3 (Sprint 1.12 Phase 2).""" from __future__ import annotations @@ -22,122 +22,123 @@ Wir verwenden auf unserer Website verschiedene Cookies. Diese werden in folgende Kategorien eingeteilt: 1. Essentielle Cookies (unbedingt erforderlich) - Zweck: Diese Cookies dienen der grundlegenden Funktion der Website. + Zweck: grundlegende Funktion der Website. Rechtsgrundlage: § 25 Abs. 2 TDDDG Laufzeit: Session -2. Funktionale Cookies - Zweck: Speichern Ihre Präferenzen wie Sprache und Region. - Rechtsgrundlage: Art. 6 Abs. 1 lit. a DSGVO - Laufzeit: 30 Tage +2. Funktionale Cookies — Zweck: Präferenzen speichern. Laufzeit: 30 Tage -3. Analytics-Cookies (Performance) - Drittanbieter: Google LLC, USA - Zweck: Nutzungsstatistiken erheben. - Laufzeit: 24 Monate - Cookies: _ga, _gid - Drittland: USA — Standardvertragsklauseln + Data Privacy Framework +3. Analytics-Cookies — Drittanbieter: Google LLC, USA + Cookies: _ga, _gid · Laufzeit: 24 Monate + Drittland: USA — Standardvertragsklauseln + DPF -4. Marketing-Cookies (Tracking) - Drittanbieter: Meta Platforms Inc., USA - Cookies: _fbp, _fbc - Laufzeit: 90 Tage - -Sie können Ihre Cookie-Einstellungen jederzeit ändern über den Link -unten oder das Banner erneut öffnen. - -Browser-Einstellungen: Auch in Chrome, Firefox, Safari und Edge -können Sie Cookies blockieren oder löschen. +4. Marketing — Drittanbieter: Meta Platforms Inc. + Cookies: _fbp, _fbc · Laufzeit: 90 Tage +Cookie-Einstellungen jederzeit ändern. +Browser-Einstellungen: Chrome, Firefox, Safari, Edge. Kontakt: datenschutz@example.com Datenschutzbeauftragter: Max Mustermann """ -GAPPY_POLICY = """Cookies - -Wir verwenden Cookies um die Website zu betreiben. -Cookies werden so lange gespeichert wie nötig. -""" - - def _run(coro): return asyncio.get_event_loop().run_until_complete(coro) -def test_agent_is_registered(): - agent = REGISTRY.get("cookie_policy") - assert agent is not None - assert agent.doc_type == "cookie" - - -def test_short_text_skipped(monkeypatch): - async def _no_cascade(*a, **kw): return None, [] +@pytest.fixture +def mock_v3_pipeline(monkeypatch): + """Mockt run_v3_pipeline für deterministische Tests offline.""" + async def _fake(text, scope): + results = [ + {"control_id": "COOKIE-MC-001", + "passed": True, "severity": "MEDIUM", + "label": "Cookie-Kategorien benannt", + "regulation": "TDDDG", "article": "§ 25", + "hint": "", "matched_text": "essentiell", "source": "kw"}, + {"control_id": "COOKIE-MC-002", + "passed": False, "severity": "HIGH", + "label": "Versionsdatum / Stand der Policy", + "regulation": "DSGVO", "article": "Art. 5", + "hint": "Bitte 'Stand: TT.MM.JJJJ' angeben", + "matched_text": "", "source": ""}, + ] + telemetry = { + "layer_0_field_hits": 4, + "layer_0_field_ids": ["categories_named", "purpose_described", + "retention_duration", "version_date"], + "layer_1_pass": 1, + "layer_0_boost_overrides": 0, + "total_mcs": 2, + } + return results, telemetry monkeypatch.setattr( - "compliance.services.specialist_agents.cookie_policy.agent.cascade", - _no_cascade, + "compliance.services.specialist_agents.cookie_policy.agent.run_v3_pipeline", + _fake, ) + async def _no_validator(*a, **kw): return {} + monkeypatch.setattr( + "compliance.services.specialist_agents.cookie_policy.agent.validate_present", + _no_validator, + ) + + +def test_agent_is_registered(): + a = REGISTRY.get("cookie_policy") + assert a is not None + assert a.doc_type == "cookie" + assert a.agent_version == "3.0" + + +def test_short_text_skipped(mock_v3_pipeline): agent = CookiePolicyAgent() out = _run(agent.evaluate(AgentInput(doc_type="cookie", text="x"))) - assert out.mc_total > 0 assert all(c.status == "skipped" for c in out.mc_coverage) + assert not out.findings -def test_full_policy_has_few_high_findings(monkeypatch): - async def _no_cascade(*a, **kw): return None, [] - monkeypatch.setattr( - "compliance.services.specialist_agents.cookie_policy.agent.cascade", - _no_cascade, - ) - agent = CookiePolicyAgent() - out = _run(agent.evaluate(AgentInput(doc_type="cookie", text=FULL_POLICY))) - high = [f for f in out.findings if f.severity == Severity.HIGH.value] - assert not high, f"unexpected HIGH findings: {[f.field_id for f in high]}" - - -def test_gappy_policy_triggers_high(monkeypatch): - async def _no_cascade(*a, **kw): return None, [] - monkeypatch.setattr( - "compliance.services.specialist_agents.cookie_policy.agent.cascade", - _no_cascade, - ) +def test_agent_uses_db_mcs(mock_v3_pipeline): agent = CookiePolicyAgent() out = _run(agent.evaluate(AgentInput(doc_type="cookie", - text=GAPPY_POLICY))) - field_ids = {f.field_id for f in out.findings} - # 4 Kategorien fehlen, Vendoren fehlen, Opt-Out fehlt, Tabelle fehlt - assert "categories_named" in field_ids - assert "vendor_recipients" in field_ids - assert "opt_out_mechanism" in field_ids + text=FULL_POLICY))) + db_findings = [f for f in out.findings + if f.check_id.startswith("DBMC-")] + assert len(db_findings) == 1 + assert db_findings[0].check_id == "DBMC-COOKIE-MC-002" + assert db_findings[0].severity == Severity.HIGH.value -def test_cmp_vendor_cross_check_emits_finding(monkeypatch): - async def _no_cascade(*a, **kw): return None, [] - monkeypatch.setattr( - "compliance.services.specialist_agents.cookie_policy.agent.cascade", - _no_cascade, - ) +def test_agent_emits_boost_coverage(mock_v3_pipeline): + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", + text=FULL_POLICY))) + # 2 DB-MCs + 12 Pattern-Boost-Slots = 14 coverage entries minimum + assert out.mc_total >= 14 + boost_ok = [c for c in out.mc_coverage + if c.mc_id.startswith("CP-MC-") and c.status == "ok"] + assert len(boost_ok) == 4 + + +def test_agent_notes_telemetry(mock_v3_pipeline): + agent = CookiePolicyAgent() + out = _run(agent.evaluate(AgentInput(doc_type="cookie", + text=FULL_POLICY))) + assert "v3-pipeline" in out.notes + assert "Pattern-Boosts" in out.notes + + +def test_cmp_vendor_cross_check_emits_finding(mock_v3_pipeline): + """KB-Layer: CMP-Vendoren-Cross-Check bleibt erhalten in v3.""" agent = CookiePolicyAgent() out = _run(agent.evaluate(AgentInput( doc_type="cookie", text=FULL_POLICY, context={"cmp_vendors": [ - {"name": "Hotjar"}, # NICHT in Policy - {"name": "Google LLC"}, # IN Policy + {"name": "Hotjar"}, # nicht in Policy + {"name": "Google LLC"}, # in Policy ]}, ))) field_ids = {f.field_id for f in out.findings} assert "vendor_consistency" in field_ids - cmp_f = next(f for f in out.findings - if f.field_id == "vendor_consistency") - assert "Hotjar" in cmp_f.evidence - assert "Google" not in cmp_f.evidence - - -def test_recommendations_are_built(): - agent = CookiePolicyAgent() - out = _run(agent.evaluate(AgentInput(doc_type="cookie", - text=GAPPY_POLICY))) - assert out.recommendations - # Jede Recommendation hat mind. ein related_finding - for r in out.recommendations: - assert r.related_finding_ids + f = next(f for f in out.findings + if f.field_id == "vendor_consistency") + assert "Hotjar" in f.evidence