From 7258744107de39c500d905f12794c700a6ef4082 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 11 Jun 2026 14:23:29 +0200 Subject: [PATCH] refactor+feat: Snapshot-Router-Split + generischer ChecklistAgent + AGB-Modul MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Item 2: Snapshot-Doc-Checks (cookie/impressum/dse/agb) in snapshot_check_routes.py (agent_compliance_check_routes.py 464→365 Z.); gleiche Pfade, in main.py registriert. - ChecklistAgent-Basis: DSE-Logik generalisiert (L1/L2, kurze Titel, _severity_ override-Hook). DSEAgent + AGBAgent sind jetzt Thin-Subclasses → künftige Doc-Agenten (widerruf/avv/…) trivial. - Item 4: AGBAgent (§§ 305 ff. BGB, AGB_CHECKLIST) + agb-check + AGB-Tab via AgentModuleTab. Kein Library-Firehose. Co-Authored-By: Claude Opus 4.7 --- .../snapshots/[snapshotId]/agb-check/route.ts | 34 ++++ .../sdk/agent/snapshots/[snapshotId]/page.tsx | 5 + .../api/agent_compliance_check_routes.py | 99 ---------- .../compliance/api/snapshot_check_routes.py | 100 ++++++++++ .../services/specialist_agents/__init__.py | 4 +- .../specialist_agents/_checklist_agent.py | 175 ++++++++++++++++++ .../specialist_agents/agb/__init__.py | 5 + .../services/specialist_agents/agb/agent.py | 19 ++ .../services/specialist_agents/dse/agent.py | 173 ++--------------- .../compliance/tests/test_agb_agent.py | 37 ++++ backend-compliance/main.py | 2 + 11 files changed, 391 insertions(+), 262 deletions(-) create mode 100644 admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts create mode 100644 backend-compliance/compliance/api/snapshot_check_routes.py create mode 100644 backend-compliance/compliance/services/specialist_agents/_checklist_agent.py create mode 100644 backend-compliance/compliance/services/specialist_agents/agb/__init__.py create mode 100644 backend-compliance/compliance/services/specialist_agents/agb/agent.py create mode 100644 backend-compliance/compliance/tests/test_agb_agent.py diff --git a/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts b/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts new file mode 100644 index 00000000..b5b97bc0 --- /dev/null +++ b/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts @@ -0,0 +1,34 @@ +/** + * AGB-Analyse-Proxy + * GET /api/sdk/v1/agent/snapshots/{snapshotId}/agb-check + * → backend /api/compliance/agent/snapshots/{snapshotId}/agb-check + * + * Laeuft den kuratierten AGBAgent (§§ 305 ff. BGB) auf dem gespeicherten + * AGB-Text (kein Re-Crawl). + */ + +import { NextRequest, NextResponse } from 'next/server' + +const BACKEND_URL = + process.env.BACKEND_API_URL || process.env.BACKEND_URL || + 'http://backend-compliance:8002' + +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ snapshotId: string }> }, +) { + const { snapshotId } = await params + try { + const response = await fetch( + `${BACKEND_URL}/api/compliance/agent/snapshots/${snapshotId}/agb-check`, + { signal: AbortSignal.timeout(120_000) }, + ) + const data = await response.json() + return NextResponse.json(data, { status: response.status }) + } catch { + return NextResponse.json( + { error: 'AGB-Analyse fehlgeschlagen', findings: [] }, + { status: 503 }, + ) + } +} diff --git a/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx b/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx index 97a9c610..b4b94be7 100644 --- a/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx +++ b/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx @@ -57,6 +57,7 @@ export default function SnapshotDetail( ...(hasCookies ? [{ key: 'cookie', label: 'Cookies & Tracking' }] : []), ...(hasDoc('impressum') ? [{ key: 'impressum', label: 'Impressum' }] : []), ...(hasDoc('dse') ? [{ key: 'dse', label: 'Datenschutzerklärung' }] : []), + ...(hasDoc('agb') ? [{ key: 'agb', label: 'AGB' }] : []), // eslint-disable-next-line react-hooks/exhaustive-deps ], [snap]) @@ -104,6 +105,10 @@ export default function SnapshotDetail( {tab === 'dse' && ( )} + + {tab === 'agb' && ( + + )} )} diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index ac270c6f..6cdeacf5 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -223,105 +223,6 @@ async def get_snapshot(snapshot_id: str): db.close() -@router.get("/snapshots/{snapshot_id}/cookie-check") -async def snapshot_cookie_check(snapshot_id: str): - """Pro-Cookie-Abgleich der Snapshot-Vendors gegen cookie_knowledge_db.""" - from fastapi import HTTPException - from database import SessionLocal - from compliance.services.check_snapshot import load_snapshot - from compliance.services.cookie_library_check import ( - analyze_cookies, load_big_library, - ) - from compliance.services.cookie_storage_inventory import ( - build_storage_inventory, storage_transparency_finding, - ) - from compliance.services.cookie_compliance_audit import ( - audit_cookie_compliance, - ) - db = SessionLocal() - try: - snap = load_snapshot(db, snapshot_id) - if not snap: - raise HTTPException(status_code=404, detail="snapshot not found") - vendors = snap.get("cmp_vendors") or [] - names = [c.get("name", "") - for v in vendors for c in (v.get("cookies") or [])] - big = load_big_library(db, names) - out = analyze_cookies(vendors, big) - inv = build_storage_inventory(vendors) - tf = storage_transparency_finding(inv) - if tf: - out["findings"].insert(0, tf) - out["summary"]["findings"] = len(out["findings"]) - out["storage_inventory"] = inv - # ② Documentation Drift: Cookie-Richtlinie (Text) vs. Browser-Realität. - docs = snap.get("doc_entries") or [] - cookie_text = next( - (e.get("text") or e.get("content") or "" for e in docs - if e.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies")), - "", - ) - out["drift"] = audit_cookie_compliance( - db, cookie_text, snap.get("banner_result")) - return out - finally: - db.close() - - -@router.get("/snapshots/{snapshot_id}/impressum-check") -async def snapshot_impressum_check(snapshot_id: str): - """Impressum-Analyse aus dem Snapshot (kein Re-Crawl): laeuft den v3 - ImpressumAgent auf dem gespeicherten Impressum-Text + Profil/Scope und - liefert den AgentOutput (Findings/Massnahmen/MC-Coverage) fuer den Tab.""" - from fastapi import HTTPException - from database import SessionLocal - from compliance.services.check_snapshot import load_snapshot - from compliance.services.specialist_agents import REGISTRY, AgentInput - from compliance.api.agent_check._agent_outputs import ( - impressum_input_from_snapshot, - ) - db = SessionLocal() - try: - snap = load_snapshot(db, snapshot_id) - if not snap: - raise HTTPException(status_code=404, detail="snapshot not found") - agent_input = impressum_input_from_snapshot(snap) - if not agent_input: - return {"findings": [], "recommendations": [], "mc_coverage": [], - "notes": "kein Impressum-Text im Snapshot", "confidence": 0.0} - out = await REGISTRY.get("impressum").evaluate(AgentInput(**agent_input)) - return out.model_dump(mode="json") - finally: - db.close() - - -@router.get("/snapshots/{snapshot_id}/dse-check") -async def snapshot_dse_check(snapshot_id: str): - """DSE-Analyse aus dem Snapshot (kein Re-Crawl): laeuft den kuratierten - DSEAgent (Art. 13/14, ART13_CHECKLIST — KEIN Library-Firehose) auf dem - gespeicherten DSE-Text und liefert den AgentOutput fuer den Tab.""" - from fastapi import HTTPException - from database import SessionLocal - from compliance.services.check_snapshot import load_snapshot - from compliance.services.specialist_agents import REGISTRY, AgentInput - from compliance.api.agent_check._agent_outputs import ( - doc_input_from_snapshot, - ) - db = SessionLocal() - try: - snap = load_snapshot(db, snapshot_id) - if not snap: - raise HTTPException(status_code=404, detail="snapshot not found") - agent_input = doc_input_from_snapshot(snap, "dse") - if not agent_input: - return {"findings": [], "recommendations": [], "mc_coverage": [], - "notes": "kein DSE-Text im Snapshot", "confidence": 0.0} - out = await REGISTRY.get("dse").evaluate(AgentInput(**agent_input)) - return out.model_dump(mode="json") - finally: - db.close() - - @router.get("/admin/benchmark") async def benchmark( industry: str = "", diff --git a/backend-compliance/compliance/api/snapshot_check_routes.py b/backend-compliance/compliance/api/snapshot_check_routes.py new file mode 100644 index 00000000..f0f871c0 --- /dev/null +++ b/backend-compliance/compliance/api/snapshot_check_routes.py @@ -0,0 +1,100 @@ +"""Snapshot-getriebene Doc-Check-Endpoints (kein Re-Crawl). + +Cookie-Library-Abgleich + v3-Doc-Agenten (Impressum/DSE/AGB …) laufen auf den +gespeicherten Snapshot-Texten. Ausgelagert aus agent_compliance_check_routes.py +(LOC-Budget). Gleicher Router-Prefix → identische Pfade, keine Contract-Änderung. +""" + +from __future__ import annotations + +import logging + +from fastapi import APIRouter, HTTPException + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/compliance/agent", tags=["agent-snapshots"]) + + +async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict: + """Lädt den Snapshot, baut den AgentInput für doc_type und läuft den + registrierten v3-Doc-Agenten. Geteilt von impressum/dse/agb (kein Re-Crawl).""" + from database import SessionLocal + from compliance.services.check_snapshot import load_snapshot + from compliance.services.specialist_agents import REGISTRY, AgentInput + from compliance.api.agent_check._agent_outputs import doc_input_from_snapshot + db = SessionLocal() + try: + snap = load_snapshot(db, snapshot_id) + if not snap: + raise HTTPException(status_code=404, detail="snapshot not found") + agent_input = doc_input_from_snapshot(snap, doc_type) + if not agent_input: + return {"findings": [], "recommendations": [], "mc_coverage": [], + "notes": f"kein {doc_type}-Text im Snapshot", "confidence": 0.0} + out = await REGISTRY.get(agent_id).evaluate(AgentInput(**agent_input)) + return out.model_dump(mode="json") + finally: + db.close() + + +@router.get("/snapshots/{snapshot_id}/cookie-check") +async def snapshot_cookie_check(snapshot_id: str): + """Pro-Cookie-Abgleich der Snapshot-Vendors gegen cookie_knowledge_db.""" + from database import SessionLocal + from compliance.services.check_snapshot import load_snapshot + from compliance.services.cookie_library_check import ( + analyze_cookies, load_big_library, + ) + from compliance.services.cookie_storage_inventory import ( + build_storage_inventory, storage_transparency_finding, + ) + from compliance.services.cookie_compliance_audit import ( + audit_cookie_compliance, + ) + db = SessionLocal() + try: + snap = load_snapshot(db, snapshot_id) + if not snap: + raise HTTPException(status_code=404, detail="snapshot not found") + vendors = snap.get("cmp_vendors") or [] + names = [c.get("name", "") + for v in vendors for c in (v.get("cookies") or [])] + big = load_big_library(db, names) + out = analyze_cookies(vendors, big) + inv = build_storage_inventory(vendors) + tf = storage_transparency_finding(inv) + if tf: + out["findings"].insert(0, tf) + out["summary"]["findings"] = len(out["findings"]) + out["storage_inventory"] = inv + # ② Documentation Drift: Cookie-Richtlinie (Text) vs. Browser-Realität. + docs = snap.get("doc_entries") or [] + cookie_text = next( + (e.get("text") or e.get("content") or "" for e in docs + if e.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies")), + "", + ) + out["drift"] = audit_cookie_compliance( + db, cookie_text, snap.get("banner_result")) + return out + finally: + db.close() + + +@router.get("/snapshots/{snapshot_id}/impressum-check") +async def snapshot_impressum_check(snapshot_id: str): + """Impressum-Analyse (v3 ImpressumAgent) auf dem gespeicherten Text.""" + return await _run_doc_agent(snapshot_id, "impressum", "impressum") + + +@router.get("/snapshots/{snapshot_id}/dse-check") +async def snapshot_dse_check(snapshot_id: str): + """DSE-Analyse (kuratierter DSEAgent, Art. 13/14) auf dem gespeicherten Text.""" + return await _run_doc_agent(snapshot_id, "dse", "dse") + + +@router.get("/snapshots/{snapshot_id}/agb-check") +async def snapshot_agb_check(snapshot_id: str): + """AGB-Analyse (kuratierter AGBAgent, §§ 305 ff. BGB) auf dem gespeicherten Text.""" + return await _run_doc_agent(snapshot_id, "agb", "agb") diff --git a/backend-compliance/compliance/services/specialist_agents/__init__.py b/backend-compliance/compliance/services/specialist_agents/__init__.py index d47b00cf..5d99b1db 100644 --- a/backend-compliance/compliance/services/specialist_agents/__init__.py +++ b/backend-compliance/compliance/services/specialist_agents/__init__.py @@ -28,6 +28,7 @@ from ._base import ( SourceType, ) from ._registry import REGISTRY +from .agb import AGBAgent from .cookie_policy import CookiePolicyAgent from .cross_placement import CrossPlacementAgent from .dse import DSEAgent @@ -38,11 +39,12 @@ REGISTRY.register(ImpressumAgent()) REGISTRY.register(CookiePolicyAgent()) REGISTRY.register(CrossPlacementAgent()) REGISTRY.register(DSEAgent()) +REGISTRY.register(AGBAgent()) __all__ = [ "AgentInput", "AgentOutput", "BaseSpecialistAgent", "EscalationLog", "EvidenceSource", "Finding", "McCoverage", "Recommendation", "Severity", "SourceType", "REGISTRY", "ImpressumAgent", "CookiePolicyAgent", - "CrossPlacementAgent", "DSEAgent", + "CrossPlacementAgent", "DSEAgent", "AGBAgent", ] diff --git a/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py b/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py new file mode 100644 index 00000000..c3234b6e --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py @@ -0,0 +1,175 @@ +"""ChecklistAgent — generischer Doc-Agent über eine kuratierte Pflichtangaben- +Checkliste (L1 „erwähnt?" + L2 „vollständig?"). + +Basis für DSE/AGB/Widerruf/… : läuft die Checkliste deterministisch über den +Text → strukturierter AgentOutput (mc_coverage + Findings + Maßnahmen). BEWUSST +KEIN Library-Firehose. Subklassen setzen nur CHECKLIST/agent_id/doc_type und +können per _severity_override() die Severity kontextabhängig anheben. +""" + +from __future__ import annotations + +import re +from datetime import datetime, timezone + +from ._base import ( + AgentInput, + AgentOutput, + BaseSpecialistAgent, + CheckStatus, + EvidenceSource, + Finding, + McCoverage, + Severity, + SourceType, + lint_output, +) +from ._rollup import rollup + +_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM, + "LOW": Severity.LOW, "INFO": Severity.INFO} +# Coverage-Status bei FAIL spiegelt die Risiko-Achse (severity) der Quelle. +_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"} +# Norm-Referenz aus dem Label ziehen: '(Art. 13(1)(a))' oder '(§305 BGB)'. +_NORM_RE = re.compile(r"\((Art\.[^)]+|§\s*\d+[^)]*)\)") + + +def _match_value(text: str, start: int, end: int) -> str: + """Exakter Treffer-Wert (nicht die umgebende Passage), normalisiert + gekappt.""" + return " ".join(text[start:end].split())[:120] + + +def _norm_of(label: str) -> str: + m = _NORM_RE.search(label or "") + return m.group(1).strip() if m else "" + + +def _compiled(check: dict) -> list: + out = [] + for p in check.get("patterns", []): + try: + out.append(re.compile(p, re.IGNORECASE | re.MULTILINE)) + except re.error: + continue + return out + + +def _search(patterns: list, text: str): + for p in patterns: + m = p.search(text) + if m: + return m + return None + + +class ChecklistAgent(BaseSpecialistAgent): + CHECKLIST: list[dict] = [] + agent_id = "" + agent_version = "1.0" + doc_type = "" + + def _severity_override(self, c: dict, agent_input: AgentInput): + """Hook: Subklasse kann die Checklist-Severity kontextabhängig anheben + (z.B. DSE: Drittland → HIGH bei dokumentiertem Transfer). None = keine.""" + return None + + def _eff_sev(self, c: dict, agent_input: AgentInput) -> str: + return self._severity_override(c, agent_input) or c.get("severity", "MEDIUM") + + async def evaluate(self, agent_input: AgentInput) -> AgentOutput: + start = datetime.now(timezone.utc) + text = (agent_input.text or "").strip() + coverage: list[McCoverage] = [] + findings: list[Finding] = [] + + if len(text) < 100: + for c in self.CHECKLIST: + coverage.append(McCoverage( + mc_id=c["id"], status="skipped", + label=c["label"], reason="Text zu kurz")) + return self._finalize(start, findings, coverage, 0.0, + f"{self.doc_type}-Text zu kurz oder leer.") + + # L1 (Pflichtangabe erwähnt?) zuerst — Ergebnis steuert L2. + l1_present: dict[str, bool] = {} + for c in self.CHECKLIST: + if c.get("level", 1) != 1: + continue + m = _search(_compiled(c), text) + l1_present[c["id"]] = m is not None + coverage.append(self._cov(c, m, text, agent_input)) + if m is None: + findings.append(self._finding(c, False, agent_input)) + + # L2 (vollständig?) — nur wenn die übergeordnete L1 da ist. Fehlt die L1, + # deckt deren Finding die Lücke ab → KEIN irreführendes 'na' (nicht + # anwendbar) für das Detail. + for c in self.CHECKLIST: + if c.get("level", 1) != 2: + continue + parent = c.get("parent") + if parent and not l1_present.get(parent, False): + continue + m = _search(_compiled(c), text) + coverage.append(self._cov(c, m, text, agent_input)) + if m is None: + findings.append(self._finding(c, True, agent_input)) + + return self._finalize(start, findings, coverage, 0.7, "") + + def _cov(self, c: dict, m, text: str, ai: AgentInput) -> McCoverage: + if m is not None: + return McCoverage( + mc_id=c["id"], status="ok", label=c["label"], + reason="Pattern-Treffer", + found=_match_value(text, m.start(), m.end())) + sev = self._eff_sev(c, ai) + return McCoverage( + mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"), + label=c["label"], + reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig") + + def _finding(self, c: dict, present: bool, ai: AgentInput) -> Finding: + sev = self._eff_sev(c, ai) + # Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die + # ausführliche Begründung steht als evidence auf der Finding-Karte. + title = (f"{c['label']}: Detail unvollständig" if present + else f"{c['label']} fehlt") + action = (f"{c['label']} präzisieren." if present + else f"{c['label']} ergänzen.") + return Finding( + check_id=f"{self.agent_id.upper()}-{c['id']}", + agent=self.agent_id, agent_version=self.agent_version, + field_id=c["id"], status=CheckStatus.FAIL, + severity=_SEV.get(sev, Severity.MEDIUM), + severity_reason=("detail_incomplete" if present + else "pflichtangabe_missing"), + title=title, norm=_norm_of(c["label"]), + action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7, + sources=[EvidenceSource( + source_type=SourceType.REGEX, source_id=c["id"], + detail="kein Pattern-Treffer", confidence=0.7)], + ) + + def _finalize(self, start, findings, coverage, confidence, notes): + end = datetime.now(timezone.utc) + recs = rollup([f for f in findings + if f.status == CheckStatus.FAIL.value]) + out = AgentOutput( + agent=self.agent_id, agent_version=self.agent_version, + started_at=start, finished_at=end, + duration_ms=int((end - start).total_seconds() * 1000), + findings=findings, recommendations=recs, mc_coverage=coverage, + confidence=confidence, notes=notes, + mc_total=len(coverage), + mc_ok=sum(1 for c in coverage if c.status == "ok"), + mc_na=sum(1 for c in coverage if c.status == "na"), + mc_high=sum(1 for c in coverage if c.status == "high"), + mc_medium=sum(1 for c in coverage if c.status == "medium"), + mc_low=sum(1 for c in coverage if c.status == "low"), + mc_insufficient=sum( + 1 for c in coverage if c.status == "insufficient_evidence"), + mc_possibly=sum( + 1 for c in coverage if c.status == "possibly_applicable"), + ) + return lint_output(out) diff --git a/backend-compliance/compliance/services/specialist_agents/agb/__init__.py b/backend-compliance/compliance/services/specialist_agents/agb/__init__.py new file mode 100644 index 00000000..46374b9f --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/agb/__init__.py @@ -0,0 +1,5 @@ +"""AGB-Agent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB), kuratiert.""" + +from .agent import AGBAgent + +__all__ = ["AGBAgent"] diff --git a/backend-compliance/compliance/services/specialist_agents/agb/agent.py b/backend-compliance/compliance/services/specialist_agents/agb/agent.py new file mode 100644 index 00000000..a6cd5f20 --- /dev/null +++ b/backend-compliance/compliance/services/specialist_agents/agb/agent.py @@ -0,0 +1,19 @@ +"""AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB). + +Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1 +Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose. +""" + +from __future__ import annotations + +from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST + +from .._checklist_agent import ChecklistAgent + + +class AGBAgent(ChecklistAgent): + CHECKLIST = AGB_CHECKLIST + agent_id = "agb" + agent_version = "1.0" + doc_type = "agb" + owned_mc_ids = tuple(c["id"] for c in AGB_CHECKLIST) diff --git a/backend-compliance/compliance/services/specialist_agents/dse/agent.py b/backend-compliance/compliance/services/specialist_agents/dse/agent.py index 98de8bcc..babd9f2d 100644 --- a/backend-compliance/compliance/services/specialist_agents/dse/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/dse/agent.py @@ -1,180 +1,29 @@ """DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO). -Kuratiert: läuft die ART13_CHECKLIST (Pflichtangaben L1 „erwähnt?" + -Detailchecks L2 „vollständig?") deterministisch über den DSE-Text. BEWUSST -KEIN Library-Firehose (eCall/Gesundheit/Telekom/Data-Act-Lärm aus der 90k- -Control-Library) — nur die echten Art-13/14-Auskunftspflichten. Output = -AgentOutput (mc_coverage + Findings + Maßnahmen), gerendert im AgentResultTab -wie das Impressum-Modul. +Thin-Subclass von ChecklistAgent über die kuratierte ART13_CHECKLIST (KEIN +90k-Library-Firehose). Einzige Spezialität: Drittland wird bei dokumentiertem +Drittlandtransfer (Scan-Kontext) zu HIGH angehoben. """ from __future__ import annotations -import re -from datetime import datetime, timezone - from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST -from .._base import ( - AgentInput, - AgentOutput, - BaseSpecialistAgent, - CheckStatus, - EvidenceSource, - Finding, - McCoverage, - Severity, - SourceType, - lint_output, -) -from .._rollup import rollup - -_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM, - "LOW": Severity.LOW, "INFO": Severity.INFO} -# Coverage-Status bei FAIL spiegelt die Risiko-Achse (severity) der Quelle. -_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"} -_NORM_RE = re.compile(r"\((Art\.[^)]+)\)") +from .._base import AgentInput +from .._checklist_agent import ChecklistAgent -def _match_value(text: str, start: int, end: int) -> str: - """Exakter Treffer-Wert (nicht die umgebende Passage), normalisiert + gekappt.""" - return " ".join(text[start:end].split())[:120] - - -def _norm_of(label: str) -> str: - m = _NORM_RE.search(label or "") - return m.group(1) if m else "Art. 13/14 DSGVO" - - -def _compiled(check: dict) -> list: - out = [] - for p in check.get("patterns", []): - try: - out.append(re.compile(p, re.IGNORECASE | re.MULTILINE)) - except re.error: - continue - return out - - -def _search(patterns: list, text: str): - for p in patterns: - m = p.search(text) - if m: - return m - return None - - -class DSEAgent(BaseSpecialistAgent): +class DSEAgent(ChecklistAgent): + CHECKLIST = ART13_CHECKLIST agent_id = "dse" agent_version = "1.0" doc_type = "dse" owned_mc_ids = tuple(c["id"] for c in ART13_CHECKLIST) - async def evaluate(self, agent_input: AgentInput) -> AgentOutput: - start = datetime.now(timezone.utc) - text = (agent_input.text or "").strip() + def _severity_override(self, c: dict, agent_input: AgentInput): sc = (agent_input.context or {}).get("scan_context") or {} - tc_applies = str(sc.get("third_country_transfer", "")).lower() in ( + tc = str(sc.get("third_country_transfer", "")).lower() in ( "yes", "true", "1", "ja") - coverage: list[McCoverage] = [] - findings: list[Finding] = [] - - if len(text) < 100: - for c in ART13_CHECKLIST: - coverage.append(McCoverage( - mc_id=c["id"], status="skipped", - label=c["label"], reason="Text zu kurz")) - return self._finalize(start, findings, coverage, 0.0, - "DSE-Text zu kurz oder leer.") - - # L1 (Pflichtangabe erwähnt?) zuerst — Ergebnis steuert L2. - l1_present: dict[str, bool] = {} - for c in ART13_CHECKLIST: - if c.get("level", 1) != 1: - continue - m = _search(_compiled(c), text) - l1_present[c["id"]] = m is not None - coverage.append(self._cov(c, m, text, tc_applies)) - if m is None: - findings.append(self._finding(c, False, tc_applies)) - - # L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 da ist. Fehlt - # die L1, deckt deren Finding die Lücke ab → KEIN irreführendes 'na' - # (nicht anwendbar) für das Detail (z.B. Transfermechanismus bei BMW). - for c in ART13_CHECKLIST: - if c.get("level", 1) != 2: - continue - parent = c.get("parent") - if parent and not l1_present.get(parent, False): - continue - m = _search(_compiled(c), text) - coverage.append(self._cov(c, m, text, tc_applies)) - if m is None: - findings.append(self._finding(c, True, tc_applies)) - - return self._finalize(start, findings, coverage, 0.7, "") - - @staticmethod - def _eff_sev(c: dict, tc_applies: bool) -> str: - """Drittland ist bei dokumentiertem Drittlandtransfer (Scan-Kontext) - keine weiche MEDIUM-Empfehlung mehr, sondern HIGH (Konzern/US-Provider).""" - if tc_applies and c["id"] in ("third_country", "third_country_mechanism"): + if tc and c["id"] in ("third_country", "third_country_mechanism"): return "HIGH" - return c.get("severity", "MEDIUM") - - def _cov(self, c: dict, m, text: str, tc_applies: bool) -> McCoverage: - if m is not None: - return McCoverage( - mc_id=c["id"], status="ok", label=c["label"], - reason="Pattern-Treffer", - found=_match_value(text, m.start(), m.end())) - sev = self._eff_sev(c, tc_applies) - return McCoverage( - mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"), - label=c["label"], - reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig") - - def _finding(self, c: dict, present: bool, tc_applies: bool) -> Finding: - sev = self._eff_sev(c, tc_applies) - # Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die - # ausführliche Begründung steht als evidence auf der Finding-Karte. - title = (f"{c['label']}: Detail unvollständig" if present - else f"{c['label']} fehlt") - action = (f"{c['label']} präzisieren." if present - else f"{c['label']} in der Datenschutzerklärung ergänzen.") - return Finding( - check_id=f"DSE-{c['id']}", - agent=self.agent_id, agent_version=self.agent_version, - field_id=c["id"], status=CheckStatus.FAIL, - severity=_SEV.get(sev, Severity.MEDIUM), - severity_reason=("detail_incomplete" if present - else "pflichtangabe_missing"), - title=title, norm=_norm_of(c["label"]), - action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7, - sources=[EvidenceSource( - source_type=SourceType.REGEX, source_id=c["id"], - detail="kein Pattern-Treffer", confidence=0.7)], - ) - - def _finalize(self, start, findings, coverage, confidence, notes): - end = datetime.now(timezone.utc) - recs = rollup([f for f in findings - if f.status == CheckStatus.FAIL.value]) - out = AgentOutput( - agent=self.agent_id, agent_version=self.agent_version, - started_at=start, finished_at=end, - duration_ms=int((end - start).total_seconds() * 1000), - findings=findings, recommendations=recs, mc_coverage=coverage, - confidence=confidence, notes=notes, - mc_total=len(coverage), - mc_ok=sum(1 for c in coverage if c.status == "ok"), - mc_na=sum(1 for c in coverage if c.status == "na"), - mc_high=sum(1 for c in coverage if c.status == "high"), - mc_medium=sum(1 for c in coverage if c.status == "medium"), - mc_low=sum(1 for c in coverage if c.status == "low"), - mc_insufficient=sum( - 1 for c in coverage if c.status == "insufficient_evidence"), - mc_possibly=sum( - 1 for c in coverage if c.status == "possibly_applicable"), - ) - return lint_output(out) + return None diff --git a/backend-compliance/compliance/tests/test_agb_agent.py b/backend-compliance/compliance/tests/test_agb_agent.py new file mode 100644 index 00000000..ba218b6b --- /dev/null +++ b/backend-compliance/compliance/tests/test_agb_agent.py @@ -0,0 +1,37 @@ +"""AGBAgent — kuratierte §§-305-ff-BGB-Checkliste (ChecklistAgent-Subclass).""" + +from __future__ import annotations + +import asyncio + +from compliance.services.specialist_agents import REGISTRY, AgentInput + + +def _run(text: str): + return asyncio.run( + REGISTRY.get("agb").evaluate(AgentInput(doc_type="agb", text=text))) + + +def test_agb_agent_registered(): + assert REGISTRY.get("agb") is not None + + +def test_agb_detects_core_clauses(): + text = ( + "Allgemeine Geschaeftsbedingungen. Geltungsbereich: Diese AGB gelten " + "fuer alle Vertraege. Vertragsschluss durch Bestellung. Preise inkl. " + "MwSt. Lieferung. Zahlung. Widerrufsrecht. Gewaehrleistung. Haftung. " + "Gerichtsstand Muenchen. ") * 4 + out = _run(text) + assert out.agent == "agb" + assert out.mc_total >= 1 + ok = [c.label for c in out.mc_coverage if c.status == "ok"] + assert any("Geltungsbereich" in lbl for lbl in ok) + # Titel/Maßnahme kurz (ChecklistAgent-Vertrag) + assert all(len(f.action) < 110 for f in out.findings) + + +def test_agb_short_text_skips(): + out = _run("zu kurz") + assert out.confidence == 0.0 + assert all(c.status == "skipped" for c in out.mc_coverage) diff --git a/backend-compliance/main.py b/backend-compliance/main.py index 03e859cb..6f0a0577 100644 --- a/backend-compliance/main.py +++ b/backend-compliance/main.py @@ -50,6 +50,7 @@ from compliance.api.agent_recurring_routes import router as agent_recurring_rout from compliance.api.agent_compare_routes import router as agent_compare_router from compliance.api.agent_doc_check_routes import router as agent_doc_check_router from compliance.api.agent_compliance_check_routes import router as agent_compliance_check_router +from compliance.api.snapshot_check_routes import router as snapshot_check_router from compliance.api.agent_findings_routes import router as agent_findings_router from compliance.api.saving_scan_routes import router as saving_scan_router from compliance.api.agent_migration_routes import router as agent_migration_router @@ -160,6 +161,7 @@ app.include_router(agent_recurring_router, prefix="/api") app.include_router(agent_compare_router, prefix="/api") app.include_router(agent_doc_check_router, prefix="/api") app.include_router(agent_compliance_check_router, prefix="/api") +app.include_router(snapshot_check_router, prefix="/api") app.include_router(agent_findings_router, prefix="/api") app.include_router(saving_scan_router, prefix="/api") app.include_router(agent_migration_router, prefix="/api")