diff --git a/backend-compliance/compliance/services/rag_document_checker.py b/backend-compliance/compliance/services/rag_document_checker.py index 4a876841..91d3c350 100644 --- a/backend-compliance/compliance/services/rag_document_checker.py +++ b/backend-compliance/compliance/services/rag_document_checker.py @@ -14,6 +14,11 @@ Flow: → Returns structured results compatible with CheckItem format """ +# Lazy annotations: dieses Modul nutzt PEP-604-Hints (z.B. `set[str] | None`) +# und muss auch auf Python 3.9 importierbar bleiben (lokale Tests / safe- +# import in compliance.api). Keine Pydantic-Modelle hier — daher unkritisch. +from __future__ import annotations + import logging import os import re diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py index d6d2073d..3606de7b 100644 --- a/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/agent.py @@ -44,6 +44,19 @@ _SEV_TO_ENUM = { } +def _build_measure(label: str, norm: str) -> str: + """Maßnahme (Imperativ) statt Pruef-Frage als action. Das Tool + definiert Maßnahmen im Frontend — es stellt keine Fragen.""" + base = (label or "").strip().rstrip(".") + if not base: + return ("Cookie-Angabe ergänzen und gegen die gesetzlichen " + "Vorgaben prüfen.") + msg = f"Cookie-Angabe ergänzen: {base}." + if norm: + msg += f" Rechtsgrundlage: {norm}." + return msg + + class CookiePolicyAgent(BaseSpecialistAgent): agent_id = "cookie_policy" agent_version = "3.0" @@ -77,6 +90,11 @@ class CookiePolicyAgent(BaseSpecialistAgent): f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · " f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides" ) + if telemetry.get("sector_dropped"): + notes_parts.append( + f"Scope-Filter: {telemetry['sector_dropped']} " + "Branchen-MCs entfernt" + ) seen: set[str] = set() for r in results: @@ -96,6 +114,9 @@ class CookiePolicyAgent(BaseSpecialistAgent): if passed: continue label = r.get("label") or r.get("hint") or "" + norm_str = str(r.get("regulation") or "") + if r.get("article"): + norm_str = (norm_str + f" Art. {r.get('article')}").strip() findings.append(Finding( check_id=f"DBMC-{mc_id}", agent=self.agent_id, @@ -104,12 +125,9 @@ class CookiePolicyAgent(BaseSpecialistAgent): severity=sev, severity_reason="db_mc_failed", title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", - norm=str(r.get("regulation") or "") + - (f" Art. {r.get('article')}" - if r.get("article") else ""), + norm=norm_str, evidence="", - action=str(r.get("hint") or "")[:400] - or "Bitte gegen die Cookie-Pflichten prüfen.", + action=_build_measure(str(label), norm_str)[:400], confidence=0.9, sources=[EvidenceSource( source_type=SourceType.MC, diff --git a/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py index 317502e9..d38c1b33 100644 --- a/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py +++ b/backend-compliance/compliance/services/specialist_agents/cookie_policy/v3_engine.py @@ -1,7 +1,10 @@ """Cookie-Policy v3-Pipeline — analog zu impressum/v3_engine.py. -Lädt 381 Cookie-MCs aus compliance.doc_check_controls (doc_type='cookie'), -ruft den deterministischen Keyword-Check + Embedding-Match + Boost-Override. +MC-Laden DELEGIERT an die Main-Tool-Engine (rag_document_checker._load_controls, +doc_type='cookie'): eine Quelle der Wahrheit inkl. P72-Scope, check_type='text' +und fits_doc_type/scope_requires. KEINE parallele Roh-Query mehr. Danach +deterministischer Keyword-Check + Embedding-Match + Boost-Override. +Zusaetzlich ein subtraktives Sektor-Gate (Branchen-Prefix) am Agent-Rand. """ from __future__ import annotations @@ -13,9 +16,17 @@ from .regex_boost import boost_matches_db_mc, compute_regex_boosts logger = logging.getLogger(__name__) +# Branchen-Prefix -> erwarteter Scope-Token (reuse aus dem Mail-V2-Filter). +try: + from compliance.services.mail_render_v2._scope_filter import ( + SECTOR_PREFIXES, + ) +except Exception: # pragma: no cover + SECTOR_PREFIXES = {} + async def run_v3_pipeline( - text: str, business_scope: set[str], + text: str, business_scope: set[str], db_url: str = "", ) -> tuple[list[dict[str, Any]], dict[str, Any]]: if not text or len(text) < 100: return [], {"reason": "text too short"} @@ -24,8 +35,16 @@ async def run_v3_pipeline( boosts = compute_regex_boosts(text) boost_field_ids = sorted(boosts) - # Layer 1: alle 381 Cookie-MCs aus DB laden - controls = await _load_cookie_mcs() + # Layer 1: MC-Laden DELEGIERT an die Main-Tool-Engine (Scope-Schutz + # inklusive). Danach subtraktives Sektor-Gate am Agent-Rand. + try: + from compliance.services.rag_document_checker import _load_controls + controls = await _load_controls("cookie", db_url, 0, business_scope) + except Exception as e: + logger.warning("cookie v3 load via main-tool engine failed: %s", e) + controls = [] + _normalize_criteria(controls) + controls, sector_dropped = _filter_sector(controls, business_scope) results: list[dict[str, Any]] = [] if controls: try: @@ -91,51 +110,50 @@ async def run_v3_pipeline( "layer_1_pass": layer_1_pass, "layer_0_boost_overrides": boost_overrides, "total_mcs": len(results), + "sector_dropped": sector_dropped, } return results, telemetry -async def _load_cookie_mcs() -> list[dict]: - """Lädt alle 381 Cookie-MCs aus compliance.doc_check_controls.""" - try: - import json - from classroom_engine.database import SessionLocal - from sqlalchemy import text as _sa_text - db = SessionLocal() - try: - rows = db.execute(_sa_text( - "SELECT id, control_id, control_uuid, title, regulation, " - " article, check_question, pass_criteria, " - " fail_criteria, severity " - "FROM compliance.doc_check_controls " - "WHERE doc_type='cookie' " - "ORDER BY severity DESC, title" - )).fetchall() - finally: - db.close() - out = [] - for r in rows: - def _parse(v): - if isinstance(v, list): return v - if isinstance(v, str): - try: - j = json.loads(v) - return j if isinstance(j, list) else [v] - except Exception: return [v] - return [] - out.append({ - "id": str(r[0]), - "control_id": r[1], - "control_uuid": str(r[2]) if r[2] else "", - "title": r[3] or "", - "regulation": r[4] or "", - "article": r[5] or "", - "check_question": r[6] or "", - "pass_criteria": _parse(r[7]), - "fail_criteria": _parse(r[8]), - "severity": r[9] or "MEDIUM", - }) - return out - except Exception as e: - logger.warning("_load_cookie_mcs failed: %s", e) - return [] +def _normalize_criteria(controls: list[dict[str, Any]]) -> None: + """asyncpg liefert JSONB-Spalten als Roh-String → zu Listen parsen.""" + import json + for c in controls: + for key in ("pass_criteria", "fail_criteria"): + v = c.get(key) + if isinstance(v, list): + continue + if isinstance(v, str): + try: + parsed = json.loads(v) + c[key] = parsed if isinstance(parsed, list) else [v] + except Exception: + c[key] = [v] if v else [] + else: + c[key] = [] + + +def _filter_sector( + controls: list[dict[str, Any]], + business_scope: set[str], +) -> tuple[list[dict[str, Any]], int]: + """Subtraktives Sektor-Gate: MCs deren control_id-Prefix eine Branche + bezeichnet (FIN/GOV/MED/INS/EDU/LEG/REL/POL), die NICHT im business_scope + liegt, werden verworfen — sonst tauchen z.B. GOV-MCs bei einem + OEM/Zulieferer als Finding auf. Reuse der SECTOR_PREFIXES.""" + scope_lc = {s.lower() for s in (business_scope or set())} + kept: list[dict[str, Any]] = [] + dropped = 0 + for c in controls: + cid = c.get("control_id") or "" + prefix = cid.split("-")[0].upper() if "-" in cid else "" + required = SECTOR_PREFIXES.get(prefix) + if required and not (scope_lc & required): + dropped += 1 + continue + kept.append(c) + if dropped: + logger.info("cookie v3 sector-filter: -%d Branchen-MCs", dropped) + return kept, dropped + + diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py index 3af8bb1d..9cade99c 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/agent.py @@ -54,6 +54,24 @@ _SEV_TO_ENUM = { } +def _build_measure(label: str, norm: str) -> str: + """Formuliert aus einer fehlenden Pflichtangabe eine konkrete Maßnahme + (Imperativ) statt die Pruef-Frage auszugeben. + + Das Tool definiert Maßnahmen im Frontend — es stellt keine Fragen. Der + check_question-Text ('Ist X angegeben?') wird daher NICHT mehr als + action durchgereicht (sonst zeigt die Finding-Card eine Frage unter + 'Pflicht-Maßnahme').""" + base = (label or "").strip().rstrip(".") + if not base: + return ("Pflichtangabe ergänzen und gegen die gesetzlichen " + "Vorgaben prüfen.") + msg = f"Pflichtangabe ergänzen: {base}." + if norm: + msg += f" Rechtsgrundlage: {norm}." + return msg + + class ImpressumAgent(BaseSpecialistAgent): agent_id = "impressum" agent_version = "3.0" @@ -96,6 +114,13 @@ class ImpressumAgent(BaseSpecialistAgent): f"{telemetry.get('layer_0_field_hits', 0)} Pattern-Boosts · " f"{telemetry.get('layer_0_boost_overrides', 0)} Boost-Overrides" ) + sec_drop = telemetry.get("sector_dropped", 0) + off_drop = telemetry.get("offtopic_dropped", 0) + if sec_drop or off_drop: + notes_parts.append( + f"Scope-Filter: {sec_drop} Branchen-MCs + " + f"{off_drop} themenfremde MCs entfernt" + ) # DB-MCs → Findings + Coverage seen_db_mcs: set[str] = set() @@ -116,6 +141,9 @@ class ImpressumAgent(BaseSpecialistAgent): if passed: continue label = r.get("label") or r.get("hint") or "" + norm_str = str(r.get("regulation") or "") + if r.get("article"): + norm_str = (norm_str + f" Art. {r.get('article')}").strip() findings.append(Finding( check_id=f"DBMC-{mc_id}", agent=self.agent_id, @@ -124,12 +152,9 @@ class ImpressumAgent(BaseSpecialistAgent): severity=sev, severity_reason="db_mc_failed", title=str(label)[:200] or f"DB-MC {mc_id} nicht erfüllt", - norm=str(r.get("regulation") or "") + - (f" Art. {r.get('article')}" - if r.get("article") else ""), + norm=norm_str, evidence="", - action=str(r.get("hint") or "")[:400] - or "Bitte gegen die Pflichtangaben prüfen.", + action=_build_measure(str(label), norm_str)[:400], confidence=0.9, sources=[EvidenceSource( source_type=SourceType.MC, diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py b/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py index f8b6cd99..db745eb7 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/mcs.py @@ -43,7 +43,8 @@ MCS: tuple[MC, ...] = ( # Label-free fallback: Firma (Rechtsform) + Adresse re.compile( r"\b[A-ZÄÖÜ][\w\-\& ]{1,80}?\s+" - r"(?:GmbH|AG|UG|KG|SE|GbR|OHG|Limited|Ltd|LLC)\s*" + r"(?:Aktiengesellschaft|GmbH|AG|UG|KG|SE|GbR|OHG|" + r"Limited|Ltd|LLC)\s*" r"[\s\S]{0,400}?" r"\b\d{5}\s+[A-ZÄÖÜ]", re.IGNORECASE, diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py index 1137fb35..31fa3199 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/regex_boost.py @@ -150,3 +150,44 @@ def boost_matches_db_mc( if best is None or match_count > best[0]: best = (match_count, field_id) return best[1] if best else None + + +def criteria_on_topic( + pass_criteria: list | None, + fail_criteria: list | None = None, + min_hits: int = 2, +) -> bool: + """Deterministischer Themen-Gate: gehoert eine DB-MC ueberhaupt ins + Impressum-Themenfeld? + + Prueft ob die kombinierten pass/fail_criteria mindestens `min_hits` + UNTERSCHIEDLICHE Schluesselwoerter aus IRGENDEINEM der 12 Impressum- + Felder (BOOST_KEYWORDS) enthalten. Fremd-MCs (z.B. 'Bestellbestaetigung', + 'behoerdliche Anzeige'), die faelschlich unter doc_type='impressum' + getaggt sind, haben keinen Themen-Ueberlapp und werden so aussortiert + — unabhaengig von DB-Sidecar/Klassifikation. + + Konservativ in beide Richtungen: + - ein einzelner inzidenteller Treffer (z.B. 'E-Mail' in einer + Bestellbestaetigung) reicht NICHT -> >=2 verschiedene Stichwoerter. + - leere Kriterien -> on-topic behalten (lieber ein FP als eine echte + Pflichtangabe verlieren). + """ + crit_parts: list[str] = [] + for c in (pass_criteria or []): + if c: + crit_parts.append(str(c).lower()) + for c in (fail_criteria or []): + if c: + crit_parts.append(str(c).lower()) + if not crit_parts: + return True + crit_text = " ".join(crit_parts) + hits: set[str] = set() + for kws in BOOST_KEYWORDS.values(): + for kw in kws: + if kw in crit_text: + hits.add(kw) + if len(hits) >= min_hits: + return True + return False diff --git a/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py index b9e75afd..85118799 100644 --- a/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py +++ b/backend-compliance/compliance/services/specialist_agents/impressum/v3_engine.py @@ -1,17 +1,20 @@ -"""Sprint-1.12 v3-Engine: läuft die volle 4-Layer-Pipeline auf einem -Impressum-Text: +"""v3-Engine: läuft die 4-Layer-Pipeline auf einem Impressum-Text. - Layer 0 — Regex-Boost (meine 12 Patterns aus mcs.py) - Layer 1 — Keyword-Match aus doc_check_controls.pass_criteria - (75 MCs in DB für Impressum) - Layer 2 — BGE-M3 Embedding-Match als Fallback (im - rag_document_checker integriert) - Layer 3 — Semantic-Validator (LLM) wenn auch Embedding nicht half - (im Agent angefasst, hier nur Ergebnisse durchgereicht) + Layer 0 — Regex-Boost (die 12 deterministischen Agent-Patterns) + Layer 1 — MC-Laden + Keyword-Match. Das LADEN delegiert an die + Main-Tool-Engine (rag_document_checker._load_controls): + eine Quelle der Wahrheit inkl. P72-Scope, check_type='text' + und fits_doc_type/scope_requires aus dem Sidecar. KEINE + parallele Roh-Query mehr. + Layer 2 — BGE-M3 Embedding-Match (mc_embedding_matcher, shared) + Layer 3 — Semantic-Validator (LLM) im Agent (hier nur durchgereicht) -Output: Liste Result-Dicts kompatibel mit rag_document_checker (passed, -severity, control_id, regulation, ...). Der Agent konvertiert sie dann -zu Finding-Objekten. +Zusätzlich am Agent-Rand: subtraktives Sektor-/Themen-Gate +(_filter_controls) — das Sektor-Gate (Branchen-Prefix) macht die Engine +nicht, es lebt sonst im Mail-V2-Report. + +Output: Liste Result-Dicts kompatibel mit rag_document_checker. Der Agent +konvertiert sie zu Finding-Objekten. """ from __future__ import annotations @@ -19,10 +22,25 @@ from __future__ import annotations import logging from typing import Any -from .regex_boost import boost_matches_db_mc, compute_regex_boosts +from .regex_boost import ( + boost_matches_db_mc, + compute_regex_boosts, + criteria_on_topic, +) logger = logging.getLogger(__name__) +# Branchen-Prefix -> erwarteter Scope-Token. Reuse aus dem Mail-V2- +# Scope-Filter, damit Agent-Pfad und Report-Pfad dieselbe Quelle nutzen +# (keine divergente Zweit-Logik). Import defensiv: faellt der Mail-Pfad +# weg, bleibt der Agent lauffaehig (ohne Sektor-Gate). +try: + from compliance.services.mail_render_v2._scope_filter import ( + SECTOR_PREFIXES, + ) +except Exception: # pragma: no cover - defensiver Fallback + SECTOR_PREFIXES = {} + async def run_v3_pipeline( text: str, @@ -43,11 +61,22 @@ async def run_v3_pipeline( logger.info("v3 Layer-0 boosts: %d hits — %s", len(boost_field_ids), boost_field_ids) - # Layer 1: lade ALLE 75 doc_check_controls für 'impressum' direkt - # aus DB. Sidecar-Klassifizierung wird bewusst übersprungen — der - # Agent soll auf der vollen MC-Liste arbeiten (Layer 3 LLM-Validator - # demoted Pattern-Misses zu LOW, sodass Breitenwirkung kein Risiko ist). - controls = await _load_impressum_mcs() + # Layer 1: MC-Laden DELEGIERT an die Main-Tool-Engine. Damit erbt der + # Agent automatisch deren Scope-Schutz (P72 canonical-scope, + # check_type='text', fits_doc_type/scope_requires) — genau die Filter, + # an denen die alte parallele Roh-Query vorbeilief. + try: + from compliance.services.rag_document_checker import _load_controls + controls = await _load_controls( + "impressum", db_url, 0, business_scope, + ) + except Exception as e: + logger.warning("v3 load via main-tool engine failed: %s", e) + controls = [] + _normalize_criteria(controls) + # Agent-Rand-Backstop (DB-unabhaengig): Sektor-Gate (Branchen-Prefix, + # macht die Engine nicht) + Themen-Gate (falls der Sidecar leer ist). + controls, drop_stats = _filter_controls(controls, business_scope) results: list[dict[str, Any]] = [] if controls: try: @@ -122,55 +151,73 @@ async def run_v3_pipeline( "layer_1_fail": layer_1_fail, "layer_0_boost_overrides": boost_overrides, "total_mcs": len(results), + "sector_dropped": drop_stats.get("sector_dropped", 0), + "offtopic_dropped": drop_stats.get("offtopic_dropped", 0), } logger.info("v3 telemetry: %s", telemetry) return results, telemetry -async def _load_impressum_mcs() -> list[dict]: - """Lädt alle Impressum-MCs aus compliance.doc_check_controls — ohne - Sidecar-Filter. v3_engine nimmt die volle Breite.""" - try: - import json - from classroom_engine.database import SessionLocal - from sqlalchemy import text as _sa_text - db = SessionLocal() - try: - rows = db.execute(_sa_text( - "SELECT id, control_id, control_uuid, title, regulation, " - " article, check_question, pass_criteria, " - " fail_criteria, severity " - "FROM compliance.doc_check_controls " - "WHERE doc_type='impressum' " - "ORDER BY severity DESC, title" - )).fetchall() - finally: - db.close() - out: list[dict] = [] - for r in rows: - def _parse(v): - if isinstance(v, list): - return v - if isinstance(v, str): - try: - j = json.loads(v) - return j if isinstance(j, list) else [v] - except Exception: - return [v] - return [] - out.append({ - "id": str(r[0]), - "control_id": r[1], - "control_uuid": str(r[2]) if r[2] else "", - "title": r[3] or "", - "regulation": r[4] or "", - "article": r[5] or "", - "check_question": r[6] or "", - "pass_criteria": _parse(r[7]), - "fail_criteria": _parse(r[8]), - "severity": r[9] or "MEDIUM", - }) - return out - except Exception as e: - logger.warning("_load_impressum_mcs failed: %s", e) - return [] +def _filter_controls( + controls: list[dict[str, Any]], + business_scope: set[str], +) -> tuple[list[dict[str, Any]], dict[str, int]]: + """Subtraktiver Scope-Filter VOR der Bewertung. + + 1. Sektor-Gate — MCs deren control_id-Prefix eine Branche bezeichnet + (FIN/GOV/MED/INS/EDU/LEG/REL/POL), die NICHT im business_scope + liegt, werden verworfen. Fuer einen OEM/Zulieferer/Maschinenbauer + (kein Behoerden-/Finanz-/Medizin-Scope) fallen GOV/FIN/MED-MCs so + heraus — derselbe Mechanismus wie im Mail-V2-Report. + 2. Themen-Gate — MCs ohne Impressum-Themenueberlapp werden verworfen + (faengt fremd-getaggte MCs wie 'Bestellbestaetigung'). + + Rein subtraktiv: entfernt nur falsch-positive Kandidaten, erzeugt nie + neue Findings. + """ + scope_lc = {s.lower() for s in (business_scope or set())} + kept: list[dict[str, Any]] = [] + sector_dropped = 0 + offtopic_dropped = 0 + for c in controls: + cid = c.get("control_id") or "" + prefix = cid.split("-")[0].upper() if "-" in cid else "" + required = SECTOR_PREFIXES.get(prefix) + if required and not (scope_lc & required): + sector_dropped += 1 + continue + if not criteria_on_topic(c.get("pass_criteria"), + c.get("fail_criteria")): + offtopic_dropped += 1 + continue + kept.append(c) + if sector_dropped or offtopic_dropped: + logger.info( + "v3 scope-filter: -%d Branchen-MCs, -%d themenfremde MCs " + "(scope=%s)", sector_dropped, offtopic_dropped, + sorted(scope_lc) or "leer", + ) + return kept, { + "sector_dropped": sector_dropped, + "offtopic_dropped": offtopic_dropped, + } + + +def _normalize_criteria(controls: list[dict[str, Any]]) -> None: + """asyncpg liefert JSONB-Spalten (pass_criteria/fail_criteria) als + Roh-String. In echte Listen parsen, damit Sektor-/Themen-Gate und + der Boost-Layer Element-weise (nicht Zeichen-weise) iterieren.""" + import json + for c in controls: + for key in ("pass_criteria", "fail_criteria"): + v = c.get(key) + if isinstance(v, list): + continue + if isinstance(v, str): + try: + parsed = json.loads(v) + c[key] = parsed if isinstance(parsed, list) else [v] + except Exception: + c[key] = [v] if v else [] + else: + c[key] = [] diff --git a/backend-compliance/tests/fixtures/impressum_groundtruth.py b/backend-compliance/tests/fixtures/impressum_groundtruth.py index 07b48459..3ae018c9 100644 --- a/backend-compliance/tests/fixtures/impressum_groundtruth.py +++ b/backend-compliance/tests/fixtures/impressum_groundtruth.py @@ -207,3 +207,47 @@ HECTRONIC = ImpressumGT( ALL_GROUND_TRUTH = (ETO, SAFETYKON, BMW, ELLI, HECTRONIC) + + +def make_mc( + control_id: str, + pass_criteria: list[str], + severity: str = "HIGH", + title: str | None = None, + regulation: str = "TMG", + article: str = "§ 5", + check_question: str | None = None, +) -> dict: + """Baut ein doc_check_controls-MC-Dict wie `_load_controls` es liefert — + fuer Tests die `_load_controls` mocken (Agent delegiert ans Main Tool).""" + return { + "id": control_id, + "control_id": control_id, + "title": title or control_id, + "regulation": regulation, + "article": article, + "severity": severity, + "check_question": check_question or f"Ist {control_id} vorhanden?", + "pass_criteria": list(pass_criteria), + "fail_criteria": [], + } + + +# Kern-Impressum-MCs die ein vollstaendiges Impressum erfuellt. Der +# Regex-Boost (Layer 0) hebt sie bei vorhandenen Pflichtangaben auf PASS, +# sodass ein sauberes Impressum 0 Findings produziert. Jede pass_criteria +# enthaelt >=2 Boost-Keywords damit boost_matches_db_mc greift. +CORE_IMPRESSUM_MCS: list[dict] = [ + make_mc("IMP-NAME-A1", ["Anbieter Anschrift Adresse Firma angegeben"]), + make_mc("IMP-MAIL-A1", ["E-Mail Kontaktmöglichkeit angegeben"]), + make_mc("IMP-TEL-A1", ["Telefon Telefonnummer Rufnummer angegeben"], + severity="MEDIUM"), + make_mc("IMP-HR-A1", ["Handelsregister Registernummer HRB angegeben"]), + make_mc("IMP-UST-A1", ["Umsatzsteuer USt-IdNr angegeben"], + severity="MEDIUM"), + make_mc("IMP-VTR-A1", + ["Geschäftsführer Vorstand Vertretungsberechtigt angegeben"]), + make_mc("IMP-VTRL-A1", + ["Geschäftsführer Vorstand deutsche Bezeichnung Rechtsform"], + severity="MEDIUM"), +] diff --git a/backend-compliance/tests/test_cookie_policy_v3.py b/backend-compliance/tests/test_cookie_policy_v3.py new file mode 100644 index 00000000..c40fe792 --- /dev/null +++ b/backend-compliance/tests/test_cookie_policy_v3.py @@ -0,0 +1,69 @@ +"""Tests für Cookie-Policy-Agent v3 — Delegation an die Main-Tool-Engine, +Sektor-Filter und Maßnahmen statt Fragen.""" + +from __future__ import annotations + +import asyncio + +from compliance.services.specialist_agents.cookie_policy import v3_engine +from compliance.services.specialist_agents.cookie_policy.agent import ( + _build_measure, +) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_build_measure_is_imperative_not_question(): + m = _build_measure("Speicherdauer der Cookies", "TDDDG § 25") + assert "?" not in m + assert "ergänzen" in m.lower() + assert "Rechtsgrundlage" in m + + +def test_build_measure_handles_empty_label(): + m = _build_measure("", "") + assert "?" not in m + assert m.strip() != "" + + +def test_cookie_v3_delegates_and_sector_filters(monkeypatch): + """run_v3_pipeline lädt über die Main-Tool-Engine (cookie) und das + Sektor-Gate entfernt GOV-MCs out-of-scope.""" + async def _fake_load(doc_type, db_url, limit, business_scope=None): + assert doc_type == "cookie" + return [ + {"control_id": "COOKIE-1-A1", "title": "Cookie-Kategorien", + "regulation": "TDDDG", "article": "§ 25", "severity": "HIGH", + "check_question": "Kategorien genannt?", + "pass_criteria": '["Cookie Kategorien essentiell"]', + "fail_criteria": "[]"}, + {"control_id": "GOV-9-A1", "title": "Behörden-Cookie", + "regulation": "X", "article": "", "severity": "HIGH", + "check_question": "Behörde?", + "pass_criteria": '["Behörde Aufsicht"]', + "fail_criteria": "[]"}, + ] + monkeypatch.setattr( + "compliance.services.rag_document_checker._load_controls", + _fake_load, + ) + + async def _no_match(*a, **kw): + return set() + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.embedding_match", + _no_match, raising=False, + ) + + text = ("Diese Website verwendet Cookies. Cookie-Kategorien: " + "essentiell, funktional. Speicherdauer und Zweck beschrieben. " + ) * 4 # > 100 Zeichen + results, telem = _run( + v3_engine.run_v3_pipeline(text, business_scope=set()), + ) + cids = {r["control_id"] for r in results} + assert "GOV-9-A1" not in cids # Sektor out-of-scope entfernt + assert "COOKIE-1-A1" in cids # cookie-MC bleibt + assert telem["sector_dropped"] == 1 diff --git a/backend-compliance/tests/test_impressum_groundtruth.py b/backend-compliance/tests/test_impressum_groundtruth.py index 595197ce..ebcaf43c 100644 --- a/backend-compliance/tests/test_impressum_groundtruth.py +++ b/backend-compliance/tests/test_impressum_groundtruth.py @@ -11,7 +11,10 @@ import asyncio import pytest from compliance.services.specialist_agents import AgentInput, ImpressumAgent -from tests.fixtures.impressum_groundtruth import ALL_GROUND_TRUTH +from tests.fixtures.impressum_groundtruth import ( + ALL_GROUND_TRUTH, + CORE_IMPRESSUM_MCS, +) def _run(coro): @@ -19,15 +22,40 @@ def _run(coro): @pytest.fixture(autouse=True) -def _no_llm(monkeypatch): - """Skip LLM-Eskalation in den GT-Tests — wir testen MC-Pattern, - nicht LLM-Halluzinationen.""" - async def _no_cascade(*a, **kw): return None, [] +def _agent_offline(monkeypatch): + """GT-Tests offline + deterministisch: LLM-Eskalation aus, MC-Laden ueber + die gemockte Main-Tool-Engine (CORE_IMPRESSUM_MCS), Embedding aus. Der + Agent delegiert jetzt ans Main Tool — daher `_load_controls` mocken.""" + import copy + + async def _no_cascade(*a, **kw): + return None, [] monkeypatch.setattr( - "compliance.services.specialist_agents.impressum.agent.cascade", + "compliance.services.specialist_agents._semantic_validator.cascade", _no_cascade, ) + async def _fake_load(doc_type, db_url, limit, business_scope=None): + return copy.deepcopy(CORE_IMPRESSUM_MCS) + monkeypatch.setattr( + "compliance.services.rag_document_checker._load_controls", + _fake_load, + ) + + async def _no_embed(*a, **kw): + return None + + async def _no_match(*a, **kw): + return set() + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.ensure_mc_embeddings", + _no_embed, raising=False, + ) + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.embedding_match", + _no_match, raising=False, + ) + @pytest.mark.parametrize("gt", ALL_GROUND_TRUTH, ids=lambda g: g.name) def test_no_false_positives_on_expected_clean(gt): diff --git a/backend-compliance/tests/test_impressum_v3.py b/backend-compliance/tests/test_impressum_v3.py index bffc354b..a2e420b1 100644 --- a/backend-compliance/tests/test_impressum_v3.py +++ b/backend-compliance/tests/test_impressum_v3.py @@ -15,10 +15,17 @@ from compliance.services.specialist_agents import ( ImpressumAgent, Severity, ) +from compliance.services.specialist_agents.impressum.agent import ( + _build_measure, +) from compliance.services.specialist_agents.impressum.regex_boost import ( BOOST_KEYWORDS, boost_matches_db_mc, compute_regex_boosts, + criteria_on_topic, +) +from compliance.services.specialist_agents.impressum.v3_engine import ( + _filter_controls, ) @@ -187,3 +194,152 @@ def test_short_text_skipped(): def test_agent_version_is_three(): agent = ImpressumAgent() assert agent.agent_version == "3.0" + + +# ── Themen-Gate: criteria_on_topic ────────────────────────────────── + + +def test_criteria_on_topic_keeps_genuine_telefon(): + assert criteria_on_topic([ + "Telefonnummer angeben", + "Erreichbar per Telefon", + ]) is True + + +def test_criteria_on_topic_keeps_genuine_address(): + assert criteria_on_topic([ + "Vollständige Postadresse (Straße, Hausnummer, PLZ, Ort)", + ]) is True + + +def test_criteria_on_topic_drops_bestellbestaetigung(): + # Fremd-MC: kein Impressum-Themenüberlapp → raus. + assert criteria_on_topic([ + "Bestellbestätigung wird nach Vertragsschluss versendet", + "Bestelleingang wird dokumentiert", + ]) is False + + +def test_criteria_on_topic_single_incidental_hit_dropped(): + # 'E-Mail' allein (1 Treffer) reicht nicht — braucht >=2. + assert criteria_on_topic([ + "Bestellbestätigung wird per E-Mail versendet", + ]) is False + + +def test_criteria_on_topic_drops_behoerdliche_anzeige(): + assert criteria_on_topic([ + "Behördliche Anzeige der Tätigkeit erfolgt", + "Gewerbeanmeldung liegt vor", + ]) is False + + +def test_criteria_on_topic_empty_kept(): + # Keine Kriterien = kein Signal → konservativ behalten. + assert criteria_on_topic([]) is True + + +# ── Scope-Filter: _filter_controls ────────────────────────────────── + + +def _mc(control_id, pass_criteria): + return {"control_id": control_id, "pass_criteria": pass_criteria, + "fail_criteria": []} + + +def test_filter_controls_drops_gov_when_out_of_scope(): + controls = [_mc("GOV-814-A03", ["Behörde meldet an Aufsichtsstelle"])] + kept, stats = _filter_controls(controls, business_scope=set()) + assert kept == [] + assert stats["sector_dropped"] == 1 + + +def test_filter_controls_keeps_gov_when_in_scope(): + controls = [_mc("GOV-814-A03", + ["Aufsichtsbehörde und Behörde benannt"])] + kept, stats = _filter_controls(controls, + business_scope={"government"}) + assert len(kept) == 1 + assert stats["sector_dropped"] == 0 + + +def test_filter_controls_keeps_genuine_impressum_mc(): + controls = [_mc("AUTH-1954-A07", + ["Vollständige Postadresse mit Straße und PLZ"])] + kept, stats = _filter_controls(controls, business_scope=set()) + assert len(kept) == 1 + assert stats["sector_dropped"] == 0 + assert stats["offtopic_dropped"] == 0 + + +def test_filter_controls_drops_offtopic_non_sector_mc(): + controls = [_mc("ECOM-1-A1", + ["Bestellbestätigung nach Vertragsschluss versenden"])] + kept, stats = _filter_controls(controls, business_scope=set()) + assert kept == [] + assert stats["offtopic_dropped"] == 1 + + +# ── Maßnahme statt Frage: _build_measure ──────────────────────────── + + +def test_build_measure_is_imperative_not_question(): + m = _build_measure("USt-IdNr", "§ 5 Abs. 1 Nr. 6 TMG") + assert "?" not in m + assert "ergänzen" in m.lower() + assert "Rechtsgrundlage" in m + + +def test_build_measure_handles_empty_label(): + m = _build_measure("", "") + assert "?" not in m + assert m.strip() != "" + + +# ── Delegation an Main-Tool-Engine + Filter (Integration) ─────────── + + +def test_run_v3_pipeline_delegates_and_filters(monkeypatch): + """run_v3_pipeline lädt über die Main-Tool-Engine (_load_controls + gemockt), normalisiert JSONB-Strings und das Sektor-/Themen-Gate + entfernt GOV (out-of-scope) + fremde MCs. Genuine MC bleibt.""" + from compliance.services.specialist_agents.impressum import v3_engine + + async def _fake_load(doc_type, db_url, limit, business_scope=None): + # pass_criteria absichtlich als JSON-STRING (wie asyncpg JSONB) + return [ + {"control_id": "AUTH-1954-A07", "title": "USt-IdNr", + "regulation": "TMG", "article": "§ 5", "severity": "HIGH", + "check_question": "Ist die USt-IdNr angegeben?", + "pass_criteria": '["USt-IdNr"]', + "fail_criteria": "[]"}, + {"control_id": "GOV-814-A03", "title": "Behördliche Anzeige", + "regulation": "X", "article": "", "severity": "HIGH", + "check_question": "Behörde informiert?", + "pass_criteria": '["Aufsichtsbehörde und Behörde benannt"]', + "fail_criteria": "[]"}, + {"control_id": "ECOM-1-A1", "title": "Bestellbestätigung", + "regulation": "X", "article": "", "severity": "HIGH", + "check_question": "Bestellbestätigung versandt?", + "pass_criteria": + '["Bestellbestätigung nach Vertragsschluss versenden"]', + "fail_criteria": "[]"}, + ] + monkeypatch.setattr( + "compliance.services.rag_document_checker._load_controls", + _fake_load, + ) + # AUTH-MC matched per Keyword → kein Layer-2-Embedding nötig; kein + # mc_embedding_matcher-Mock erforderlich. + + text = ("Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n" + "USt-IdNr: DE123456789\n") * 5 # >100 Zeichen + results, telem = _run( + v3_engine.run_v3_pipeline(text, business_scope=set()), + ) + cids = {r["control_id"] for r in results} + assert "GOV-814-A03" not in cids # Sektor out-of-scope + assert "ECOM-1-A1" not in cids # themenfremd + assert "AUTH-1954-A07" in cids # genuine MC bleibt + assert telem["sector_dropped"] == 1 + assert telem["offtopic_dropped"] == 1 diff --git a/backend-compliance/tests/test_semantic_validator.py b/backend-compliance/tests/test_semantic_validator.py index fdb4dc20..850bc0c2 100644 --- a/backend-compliance/tests/test_semantic_validator.py +++ b/backend-compliance/tests/test_semantic_validator.py @@ -13,6 +13,7 @@ from compliance.services.specialist_agents._semantic_validator import ( standard_label, validate_present, ) +from tests.fixtures.impressum_groundtruth import make_mc def _run(coro): @@ -91,10 +92,33 @@ def test_semantic_demotion_high_to_low(monkeypatch): "compliance.services.specialist_agents._semantic_validator.cascade", _fake_cascade, ) + + # Agent delegiert MC-Laden ans Main Tool → _load_controls mocken. + # control_id == field_id 'kontakt_telefon', damit der Semantic-Demote + # das Finding ueber die LLM-Antwort zuordnet. + async def _fake_load(doc_type, db_url, limit, business_scope=None): + return [make_mc("kontakt_telefon", + ["Telefon Telefonnummer Rufnummer"], + severity="HIGH")] monkeypatch.setattr( - "compliance.services.specialist_agents.impressum.agent.cascade", - _fake_cascade, + "compliance.services.rag_document_checker._load_controls", + _fake_load, ) + + async def _no_match(*a, **kw): + return set() + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.embedding_match", + _no_match, raising=False, + ) + + async def _no_embed(*a, **kw): + return None + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.ensure_mc_embeddings", + _no_embed, raising=False, + ) + # Text OHNE Telefon-Label → MC matched nicht → HIGH-Finding text = ( "Beispiel GmbH\nMusterstr. 1\n12345 Berlin\n" diff --git a/backend-compliance/tests/test_specialist_agent_routes.py b/backend-compliance/tests/test_specialist_agent_routes.py index 7af94aa4..0e152216 100644 --- a/backend-compliance/tests/test_specialist_agent_routes.py +++ b/backend-compliance/tests/test_specialist_agent_routes.py @@ -9,6 +9,8 @@ from unittest.mock import AsyncMock, patch import pytest from fastapi.testclient import TestClient +from tests.fixtures.impressum_groundtruth import make_mc + @pytest.fixture def app(tmp_path, monkeypatch): @@ -69,9 +71,37 @@ def test_run_result_after_text_input(client, monkeypatch): # Skip LLM async def _no_cascade(*a, **kw): return None, [] monkeypatch.setattr( - "compliance.services.specialist_agents.impressum.agent.cascade", + "compliance.services.specialist_agents._semantic_validator.cascade", _no_cascade, ) + # Agent delegiert MC-Laden ans Main Tool → _load_controls mocken. + # Tesla nennt 'Management' (engl.) statt deutschem GF-Label → das + # label_korrekt-MC schlaegt fehl → erwartetes Finding. + async def _fake_load(doc_type, db_url, limit, business_scope=None): + # pass_criteria nur mit label_korrekt-eigenen Keywords (deutsche/ + # Bezeichnung/Rechtsform) — NICHT 'Geschäftsführer/Vorstand', sonst + # boostet das aktive vertretungsberechtigte-Feld (Tesla: 'Management') + # das MC faelschlich auf PASS. + return [make_mc( + "vertretungsberechtigte_label_korrekt", + ["deutsche Bezeichnung Rechtsform Pflicht angeben"], + )] + monkeypatch.setattr( + "compliance.services.rag_document_checker._load_controls", + _fake_load, + ) + + async def _no_match(*a, **kw): return set() + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.embedding_match", + _no_match, raising=False, + ) + + async def _no_embed(*a, **kw): return None + monkeypatch.setattr( + "compliance.services.mc_embedding_matcher.ensure_mc_embeddings", + _no_embed, raising=False, + ) r = client.post("/api/v1/specialist-agent/test/start", json={"agent_id": "impressum", "raw_texts": [ @@ -103,7 +133,7 @@ def test_run_result_after_text_input(client, monkeypatch): def test_artifacts_listing(client, monkeypatch): async def _no_cascade(*a, **kw): return None, [] monkeypatch.setattr( - "compliance.services.specialist_agents.impressum.agent.cascade", + "compliance.services.specialist_agents._semantic_validator.cascade", _no_cascade, ) r = client.post("/api/v1/specialist-agent/test/start",