diff --git a/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts b/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts
new file mode 100644
index 00000000..b5b97bc0
--- /dev/null
+++ b/admin-compliance/app/api/sdk/v1/agent/snapshots/[snapshotId]/agb-check/route.ts
@@ -0,0 +1,34 @@
+/**
+ * AGB-Analyse-Proxy
+ * GET /api/sdk/v1/agent/snapshots/{snapshotId}/agb-check
+ * → backend /api/compliance/agent/snapshots/{snapshotId}/agb-check
+ *
+ * Laeuft den kuratierten AGBAgent (§§ 305 ff. BGB) auf dem gespeicherten
+ * AGB-Text (kein Re-Crawl).
+ */
+
+import { NextRequest, NextResponse } from 'next/server'
+
+// Base URL of the compliance backend. BACKEND_API_URL takes precedence, then
+// BACKEND_URL, then the hard-coded default below. `||` (rather than `??`) means
+// an empty-string env value also falls through to the next candidate.
+const BACKEND_URL =
+  process.env.BACKEND_API_URL || process.env.BACKEND_URL ||
+  'http://backend-compliance:8002'
+
+/**
+ * Proxies the AGB check for one stored snapshot to the compliance backend.
+ *
+ * The backend's JSON body and HTTP status are passed through unchanged. Any
+ * failure inside the try block (network error, the 120 s timeout, a backend
+ * body that is not valid JSON) is logged and answered with status 503 and an
+ * empty `findings` list.
+ *
+ * @param _request - Incoming request (unused).
+ * @param params - Route params; `snapshotId` identifies the snapshot.
+ * @returns The proxied backend response, or the 503 fallback payload.
+ */
+export async function GET(
+  _request: NextRequest,
+  { params }: { params: Promise<{ snapshotId: string }> },
+) {
+  const { snapshotId } = await params
+  try {
+    const response = await fetch(
+      `${BACKEND_URL}/api/compliance/agent/snapshots/${snapshotId}/agb-check`,
+      { signal: AbortSignal.timeout(120_000) },
+    )
+    const data: unknown = await response.json()
+    return NextResponse.json(data, { status: response.status })
+  } catch (error: unknown) {
+    // Log instead of swallowing silently, so timeouts and malformed backend
+    // bodies remain diagnosable; the client-facing payload is unchanged.
+    console.error(`[agb-check] proxy failed for snapshot ${snapshotId}:`, error)
+    return NextResponse.json(
+      { error: 'AGB-Analyse fehlgeschlagen', findings: [] },
+      { status: 503 },
+    )
+  }
+}
diff --git a/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx b/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx
index 97a9c610..b4b94be7 100644
--- a/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx
+++ b/admin-compliance/app/sdk/agent/snapshots/[snapshotId]/page.tsx
@@ -57,6 +57,7 @@ export default function SnapshotDetail(
...(hasCookies ? [{ key: 'cookie', label: 'Cookies & Tracking' }] : []),
...(hasDoc('impressum') ? [{ key: 'impressum', label: 'Impressum' }] : []),
...(hasDoc('dse') ? [{ key: 'dse', label: 'Datenschutzerklärung' }] : []),
+ ...(hasDoc('agb') ? [{ key: 'agb', label: 'AGB' }] : []),
// eslint-disable-next-line react-hooks/exhaustive-deps
], [snap])
@@ -104,6 +105,10 @@ export default function SnapshotDetail(
{tab === 'dse' && (
)}
+
+ {tab === 'agb' && (
+
+ )}
>
)}
diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py
index ac270c6f..6cdeacf5 100644
--- a/backend-compliance/compliance/api/agent_compliance_check_routes.py
+++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py
@@ -223,105 +223,6 @@ async def get_snapshot(snapshot_id: str):
db.close()
-@router.get("/snapshots/{snapshot_id}/cookie-check")
-async def snapshot_cookie_check(snapshot_id: str):
- """Pro-Cookie-Abgleich der Snapshot-Vendors gegen cookie_knowledge_db."""
- from fastapi import HTTPException
- from database import SessionLocal
- from compliance.services.check_snapshot import load_snapshot
- from compliance.services.cookie_library_check import (
- analyze_cookies, load_big_library,
- )
- from compliance.services.cookie_storage_inventory import (
- build_storage_inventory, storage_transparency_finding,
- )
- from compliance.services.cookie_compliance_audit import (
- audit_cookie_compliance,
- )
- db = SessionLocal()
- try:
- snap = load_snapshot(db, snapshot_id)
- if not snap:
- raise HTTPException(status_code=404, detail="snapshot not found")
- vendors = snap.get("cmp_vendors") or []
- names = [c.get("name", "")
- for v in vendors for c in (v.get("cookies") or [])]
- big = load_big_library(db, names)
- out = analyze_cookies(vendors, big)
- inv = build_storage_inventory(vendors)
- tf = storage_transparency_finding(inv)
- if tf:
- out["findings"].insert(0, tf)
- out["summary"]["findings"] = len(out["findings"])
- out["storage_inventory"] = inv
- # ② Documentation Drift: Cookie-Richtlinie (Text) vs. Browser-Realität.
- docs = snap.get("doc_entries") or []
- cookie_text = next(
- (e.get("text") or e.get("content") or "" for e in docs
- if e.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies")),
- "",
- )
- out["drift"] = audit_cookie_compliance(
- db, cookie_text, snap.get("banner_result"))
- return out
- finally:
- db.close()
-
-
-@router.get("/snapshots/{snapshot_id}/impressum-check")
-async def snapshot_impressum_check(snapshot_id: str):
- """Impressum-Analyse aus dem Snapshot (kein Re-Crawl): laeuft den v3
- ImpressumAgent auf dem gespeicherten Impressum-Text + Profil/Scope und
- liefert den AgentOutput (Findings/Massnahmen/MC-Coverage) fuer den Tab."""
- from fastapi import HTTPException
- from database import SessionLocal
- from compliance.services.check_snapshot import load_snapshot
- from compliance.services.specialist_agents import REGISTRY, AgentInput
- from compliance.api.agent_check._agent_outputs import (
- impressum_input_from_snapshot,
- )
- db = SessionLocal()
- try:
- snap = load_snapshot(db, snapshot_id)
- if not snap:
- raise HTTPException(status_code=404, detail="snapshot not found")
- agent_input = impressum_input_from_snapshot(snap)
- if not agent_input:
- return {"findings": [], "recommendations": [], "mc_coverage": [],
- "notes": "kein Impressum-Text im Snapshot", "confidence": 0.0}
- out = await REGISTRY.get("impressum").evaluate(AgentInput(**agent_input))
- return out.model_dump(mode="json")
- finally:
- db.close()
-
-
-@router.get("/snapshots/{snapshot_id}/dse-check")
-async def snapshot_dse_check(snapshot_id: str):
- """DSE-Analyse aus dem Snapshot (kein Re-Crawl): laeuft den kuratierten
- DSEAgent (Art. 13/14, ART13_CHECKLIST — KEIN Library-Firehose) auf dem
- gespeicherten DSE-Text und liefert den AgentOutput fuer den Tab."""
- from fastapi import HTTPException
- from database import SessionLocal
- from compliance.services.check_snapshot import load_snapshot
- from compliance.services.specialist_agents import REGISTRY, AgentInput
- from compliance.api.agent_check._agent_outputs import (
- doc_input_from_snapshot,
- )
- db = SessionLocal()
- try:
- snap = load_snapshot(db, snapshot_id)
- if not snap:
- raise HTTPException(status_code=404, detail="snapshot not found")
- agent_input = doc_input_from_snapshot(snap, "dse")
- if not agent_input:
- return {"findings": [], "recommendations": [], "mc_coverage": [],
- "notes": "kein DSE-Text im Snapshot", "confidence": 0.0}
- out = await REGISTRY.get("dse").evaluate(AgentInput(**agent_input))
- return out.model_dump(mode="json")
- finally:
- db.close()
-
-
@router.get("/admin/benchmark")
async def benchmark(
industry: str = "",
diff --git a/backend-compliance/compliance/api/snapshot_check_routes.py b/backend-compliance/compliance/api/snapshot_check_routes.py
new file mode 100644
index 00000000..f0f871c0
--- /dev/null
+++ b/backend-compliance/compliance/api/snapshot_check_routes.py
@@ -0,0 +1,100 @@
+"""Snapshot-getriebene Doc-Check-Endpoints (kein Re-Crawl).
+
+Cookie-Library-Abgleich + v3-Doc-Agenten (Impressum/DSE/AGB …) laufen auf den
+gespeicherten Snapshot-Texten. Ausgelagert aus agent_compliance_check_routes.py
+(LOC-Budget). Gleicher Router-Prefix → identische Pfade, keine Contract-Änderung.
+"""
+
+from __future__ import annotations
+
+import logging
+
+from fastapi import APIRouter, HTTPException
+
+# Module-level logger (not referenced by the handlers visible in this hunk).
+logger = logging.getLogger(__name__)
+
+# Router for the snapshot check endpoints; per the module docstring the prefix
+# matches the router these endpoints were moved out of, so paths stay the same.
+router = APIRouter(prefix="/compliance/agent", tags=["agent-snapshots"])
+
+
+async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict:
+    """Run a registered v3 doc agent on a stored snapshot text (no re-crawl).
+
+    Shared by the impressum/dse/agb endpoints.
+
+    Args:
+        snapshot_id: ID of the snapshot to load.
+        doc_type: Document type whose stored text is analysed
+            ("impressum", "dse", "agb").
+        agent_id: Key under which the agent is looked up in REGISTRY.
+
+    Returns:
+        The agent output dumped as a JSON-ready dict, or an empty result with
+        an explanatory ``notes`` entry when the snapshot yields no agent input
+        for ``doc_type``.
+
+    Raises:
+        HTTPException: 404 if the snapshot does not exist; 500 if REGISTRY
+            returns no agent for ``agent_id``.
+    """
+    from database import SessionLocal
+    from compliance.services.check_snapshot import load_snapshot
+    from compliance.services.specialist_agents import REGISTRY, AgentInput
+    from compliance.api.agent_check._agent_outputs import (
+        doc_input_from_snapshot, impressum_input_from_snapshot,
+    )
+    db = SessionLocal()
+    try:
+        snap = load_snapshot(db, snapshot_id)
+        if not snap:
+            raise HTTPException(status_code=404, detail="snapshot not found")
+        # Before the move, the impressum endpoint used its dedicated input
+        # builder; keep doing so, so that the relocation does not change what
+        # the impressum agent receives.
+        if doc_type == "impressum":
+            agent_input = impressum_input_from_snapshot(snap)
+        else:
+            agent_input = doc_input_from_snapshot(snap, doc_type)
+        if not agent_input:
+            # Same wording as the pre-move endpoints ("Impressum", "DSE").
+            label = {"impressum": "Impressum", "dse": "DSE",
+                     "agb": "AGB"}.get(doc_type, doc_type)
+            return {"findings": [], "recommendations": [], "mc_coverage": [],
+                    "notes": f"kein {label}-Text im Snapshot", "confidence": 0.0}
+        agent = REGISTRY.get(agent_id)
+        if agent is None:
+            # Fail with a clear message instead of an AttributeError on None.
+            raise HTTPException(
+                status_code=500, detail=f"agent '{agent_id}' not registered")
+        out = await agent.evaluate(AgentInput(**agent_input))
+        return out.model_dump(mode="json")
+    finally:
+        db.close()
+
+
+@router.get("/snapshots/{snapshot_id}/cookie-check")
+async def snapshot_cookie_check(snapshot_id: str):
+    """Per-cookie comparison of the snapshot's vendors against the cookie library.
+
+    Loads the snapshot, analyses its CMP vendors, attaches the storage
+    inventory (plus a transparency finding when one is produced) and a drift
+    audit of the stored cookie-policy text against the banner result.
+
+    Raises:
+        HTTPException: 404 if the snapshot does not exist.
+    """
+    from database import SessionLocal
+    from compliance.services.check_snapshot import load_snapshot
+    from compliance.services.cookie_library_check import (
+        analyze_cookies, load_big_library,
+    )
+    from compliance.services.cookie_storage_inventory import (
+        build_storage_inventory, storage_transparency_finding,
+    )
+    from compliance.services.cookie_compliance_audit import (
+        audit_cookie_compliance,
+    )
+
+    session = SessionLocal()
+    try:
+        snapshot = load_snapshot(session, snapshot_id)
+        if not snapshot:
+            raise HTTPException(status_code=404, detail="snapshot not found")
+
+        vendor_list = snapshot.get("cmp_vendors") or []
+        cookie_names = []
+        for vendor in vendor_list:
+            for cookie in vendor.get("cookies") or []:
+                cookie_names.append(cookie.get("name", ""))
+
+        library = load_big_library(session, cookie_names)
+        result = analyze_cookies(vendor_list, library)
+
+        inventory = build_storage_inventory(vendor_list)
+        transparency = storage_transparency_finding(inventory)
+        if transparency:
+            # Put the transparency finding first and refresh the summary count.
+            result["findings"].insert(0, transparency)
+            result["summary"]["findings"] = len(result["findings"])
+        result["storage_inventory"] = inventory
+
+        # Documentation drift: cookie policy (text) vs. what the browser saw.
+        # The first doc entry of a cookie type supplies the text, even if empty.
+        policy_text = ""
+        for entry in snapshot.get("doc_entries") or []:
+            if entry.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies"):
+                policy_text = entry.get("text") or entry.get("content") or ""
+                break
+        result["drift"] = audit_cookie_compliance(
+            session, policy_text, snapshot.get("banner_result"))
+        return result
+    finally:
+        session.close()
+
+
+@router.get("/snapshots/{snapshot_id}/impressum-check")
+async def snapshot_impressum_check(snapshot_id: str):
+    """Impressum analysis: runs the "impressum" agent on the stored snapshot text."""
+    result = await _run_doc_agent(
+        snapshot_id=snapshot_id, doc_type="impressum", agent_id="impressum")
+    return result
+
+
+@router.get("/snapshots/{snapshot_id}/dse-check")
+async def snapshot_dse_check(snapshot_id: str):
+    """Privacy-policy (DSE) analysis: runs the "dse" agent on the stored snapshot text."""
+    result = await _run_doc_agent(
+        snapshot_id=snapshot_id, doc_type="dse", agent_id="dse")
+    return result
+
+
+@router.get("/snapshots/{snapshot_id}/agb-check")
+async def snapshot_agb_check(snapshot_id: str):
+    """Terms-and-conditions (AGB) analysis: runs the "agb" agent on the stored snapshot text."""
+    result = await _run_doc_agent(
+        snapshot_id=snapshot_id, doc_type="agb", agent_id="agb")
+    return result
diff --git a/backend-compliance/compliance/services/specialist_agents/__init__.py b/backend-compliance/compliance/services/specialist_agents/__init__.py
index d47b00cf..5d99b1db 100644
--- a/backend-compliance/compliance/services/specialist_agents/__init__.py
+++ b/backend-compliance/compliance/services/specialist_agents/__init__.py
@@ -28,6 +28,7 @@ from ._base import (
SourceType,
)
from ._registry import REGISTRY
+from .agb import AGBAgent
from .cookie_policy import CookiePolicyAgent
from .cross_placement import CrossPlacementAgent
from .dse import DSEAgent
@@ -38,11 +39,12 @@ REGISTRY.register(ImpressumAgent())
REGISTRY.register(CookiePolicyAgent())
REGISTRY.register(CrossPlacementAgent())
REGISTRY.register(DSEAgent())
+REGISTRY.register(AGBAgent())
__all__ = [
"AgentInput", "AgentOutput", "BaseSpecialistAgent",
"EscalationLog", "EvidenceSource", "Finding", "McCoverage",
"Recommendation", "Severity", "SourceType",
"REGISTRY", "ImpressumAgent", "CookiePolicyAgent",
- "CrossPlacementAgent", "DSEAgent",
+ "CrossPlacementAgent", "DSEAgent", "AGBAgent",
]
diff --git a/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py b/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py
new file mode 100644
index 00000000..c3234b6e
--- /dev/null
+++ b/backend-compliance/compliance/services/specialist_agents/_checklist_agent.py
@@ -0,0 +1,175 @@
+"""ChecklistAgent — generischer Doc-Agent über eine kuratierte Pflichtangaben-
+Checkliste (L1 „erwähnt?" + L2 „vollständig?").
+
+Basis für DSE/AGB/Widerruf/… : läuft die Checkliste deterministisch über den
+Text → strukturierter AgentOutput (mc_coverage + Findings + Maßnahmen). BEWUSST
+KEIN Library-Firehose. Subklassen setzen nur CHECKLIST/agent_id/doc_type und
+können per _severity_override() die Severity kontextabhängig anheben.
+"""
+
+from __future__ import annotations
+
+import re
+from datetime import datetime, timezone
+
+from ._base import (
+ AgentInput,
+ AgentOutput,
+ BaseSpecialistAgent,
+ CheckStatus,
+ EvidenceSource,
+ Finding,
+ McCoverage,
+ Severity,
+ SourceType,
+ lint_output,
+)
+from ._rollup import rollup
+
+# Maps the checklist's severity strings to the Severity enum used on findings.
+_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM,
+        "LOW": Severity.LOW, "INFO": Severity.INFO}
+# Coverage status of a failed check mirrors the risk axis (severity) of the source.
+_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"}
+# Pulls the norm reference out of a label: '(Art. 13(1)(a))' or '(§305 BGB)'.
+# One level of nested parentheses is allowed inside the reference, so
+# '(Art. 13(1)(a))' yields 'Art. 13(1)(a)' rather than being cut off at the
+# first ')' (which produced 'Art. 13(1').
+_NORM_RE = re.compile(
+    r"\((Art\.(?:[^()]|\([^()]*\))+|§\s*\d+(?:[^()]|\([^()]*\))*)\)")
+
+
+def _match_value(text: str, start: int, end: int) -> str:
+    """Return the matched slice itself (not the surrounding passage), with
+    whitespace runs collapsed to single spaces and capped at 120 characters."""
+    matched = text[start:end]
+    collapsed = " ".join(matched.split())
+    return collapsed[:120]
+
+
+def _norm_of(label: str) -> str:
+    """Return the norm reference found in ``label`` via _NORM_RE, stripped,
+    or an empty string when the label is empty or contains none."""
+    hit = _NORM_RE.search(label or "")
+    if hit is None:
+        return ""
+    return hit.group(1).strip()
+
+
+def _compiled(check: dict) -> list:
+    """Compile the regex patterns of one checklist entry.
+
+    Args:
+        check: Checklist entry; its optional "patterns" list holds regex sources.
+
+    Returns:
+        The compiled patterns (case-insensitive, multiline) in checklist order.
+        Patterns that fail to compile are left out and reported as a warning.
+    """
+    import logging
+
+    flags = re.IGNORECASE | re.MULTILINE
+    out = []
+    for p in check.get("patterns", []):
+        try:
+            out.append(re.compile(p, flags))
+        except re.error as exc:
+            # A pattern that cannot compile can never match, which would turn
+            # into a "missing" finding; keep skipping it, but make it visible.
+            logging.getLogger(__name__).warning(
+                "checklist %r: invalid pattern %r skipped (%s)",
+                check.get("id"), p, exc)
+    return out
+
+
+def _search(patterns: list, text: str):
+    """Try the patterns in order and return the first match on ``text``,
+    or None when no pattern matches."""
+    hits = (pattern.search(text) for pattern in patterns)
+    return next((hit for hit in hits if hit), None)
+
+
+class ChecklistAgent(BaseSpecialistAgent):
+    """Generic doc agent that runs a curated checklist over a document text.
+
+    Subclasses set CHECKLIST, agent_id and doc_type and may override
+    _severity_override() to raise a check's severity depending on context.
+    """
+
+    # Checklist entries (dicts). This class reads the keys "id", "label",
+    # "patterns", "level", "parent", "severity" and "hint".
+    CHECKLIST: list[dict] = []
+    agent_id = ""
+    agent_version = "1.0"
+    doc_type = ""
+
+    def _severity_override(self, c: dict, agent_input: AgentInput):
+        """Hook: a subclass may raise the checklist severity depending on context
+        (e.g. DSE: third country → HIGH when a transfer is documented).
+        Returning None means no override."""
+        return None
+
+    def _eff_sev(self, c: dict, agent_input: AgentInput) -> str:
+        """Effective severity of a check: the override if it is truthy, else
+        the entry's "severity" (default "MEDIUM")."""
+        return self._severity_override(c, agent_input) or c.get("severity", "MEDIUM")
+
+    async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
+        """Run the checklist over ``agent_input.text`` and build the AgentOutput.
+
+        Level-1 checks run first; a level-2 check that names a "parent" only
+        runs when that parent matched. Every check without a pattern match
+        produces a coverage entry with a fail status and a Finding.
+        """
+        start = datetime.now(timezone.utc)
+        text = (agent_input.text or "").strip()
+        coverage: list[McCoverage] = []
+        findings: list[Finding] = []
+
+        # Fewer than 100 characters after stripping: mark every check as
+        # skipped and return with confidence 0.0 and no findings.
+        if len(text) < 100:
+            for c in self.CHECKLIST:
+                coverage.append(McCoverage(
+                    mc_id=c["id"], status="skipped",
+                    label=c["label"], reason="Text zu kurz"))
+            return self._finalize(start, findings, coverage, 0.0,
+                                  f"{self.doc_type}-Text zu kurz oder leer.")
+
+        # L1 (is the mandatory item mentioned?) first — its result gates L2.
+        l1_present: dict[str, bool] = {}
+        for c in self.CHECKLIST:
+            if c.get("level", 1) != 1:
+                continue
+            m = _search(_compiled(c), text)
+            l1_present[c["id"]] = m is not None
+            coverage.append(self._cov(c, m, text, agent_input))
+            if m is None:
+                findings.append(self._finding(c, False, agent_input))
+
+        # L2 (complete?) — only when the parent L1 is present. If the L1 is
+        # missing, its finding already covers the gap → NO misleading 'na'
+        # (not applicable) entry for the detail check.
+        for c in self.CHECKLIST:
+            if c.get("level", 1) != 2:
+                continue
+            parent = c.get("parent")
+            # A parent id that has no L1 result counts as "not present" here.
+            if parent and not l1_present.get(parent, False):
+                continue
+            m = _search(_compiled(c), text)
+            coverage.append(self._cov(c, m, text, agent_input))
+            if m is None:
+                findings.append(self._finding(c, True, agent_input))
+
+        return self._finalize(start, findings, coverage, 0.7, "")
+
+    def _cov(self, c: dict, m, text: str, ai: AgentInput) -> McCoverage:
+        """Build the coverage entry for one check: "ok" with the matched value
+        when ``m`` is a match, otherwise a fail status derived from the
+        effective severity."""
+        if m is not None:
+            return McCoverage(
+                mc_id=c["id"], status="ok", label=c["label"],
+                reason="Pattern-Treffer",
+                found=_match_value(text, m.start(), m.end()))
+        sev = self._eff_sev(c, ai)
+        return McCoverage(
+            mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"),
+            label=c["label"],
+            reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig")
+
+    def _finding(self, c: dict, present: bool, ai: AgentInput) -> Finding:
+        """Build the FAIL finding for a check without a pattern match.
+
+        ``present`` is True for a level-2 detail check (parent item exists but
+        the detail is incomplete) and False for a missing level-1 item.
+        """
+        sev = self._eff_sev(c, ai)
+        # Title + action are deliberately SHORT (they drive the recommendation
+        # title); the detailed reasoning goes on the finding card as evidence.
+        title = (f"{c['label']}: Detail unvollständig" if present
+                 else f"{c['label']} fehlt")
+        action = (f"{c['label']} präzisieren." if present
+                  else f"{c['label']} ergänzen.")
+        return Finding(
+            check_id=f"{self.agent_id.upper()}-{c['id']}",
+            agent=self.agent_id, agent_version=self.agent_version,
+            field_id=c["id"], status=CheckStatus.FAIL,
+            severity=_SEV.get(sev, Severity.MEDIUM),
+            severity_reason=("detail_incomplete" if present
+                             else "pflichtangabe_missing"),
+            title=title, norm=_norm_of(c["label"]),
+            action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7,
+            sources=[EvidenceSource(
+                source_type=SourceType.REGEX, source_id=c["id"],
+                detail="kein Pattern-Treffer", confidence=0.7)],
+        )
+
+    def _finalize(self, start, findings, coverage, confidence, notes):
+        """Assemble the AgentOutput: roll failed findings up into
+        recommendations, count coverage entries per status, and pass the
+        result through lint_output()."""
+        end = datetime.now(timezone.utc)
+        # NOTE(review): compares f.status against the enum's .value — presumably
+        # the Finding model stores the plain value; confirm against _base.
+        recs = rollup([f for f in findings
+                       if f.status == CheckStatus.FAIL.value])
+        out = AgentOutput(
+            agent=self.agent_id, agent_version=self.agent_version,
+            started_at=start, finished_at=end,
+            duration_ms=int((end - start).total_seconds() * 1000),
+            findings=findings, recommendations=recs, mc_coverage=coverage,
+            confidence=confidence, notes=notes,
+            mc_total=len(coverage),
+            mc_ok=sum(1 for c in coverage if c.status == "ok"),
+            mc_na=sum(1 for c in coverage if c.status == "na"),
+            mc_high=sum(1 for c in coverage if c.status == "high"),
+            mc_medium=sum(1 for c in coverage if c.status == "medium"),
+            mc_low=sum(1 for c in coverage if c.status == "low"),
+            mc_insufficient=sum(
+                1 for c in coverage if c.status == "insufficient_evidence"),
+            mc_possibly=sum(
+                1 for c in coverage if c.status == "possibly_applicable"),
+        )
+        return lint_output(out)
diff --git a/backend-compliance/compliance/services/specialist_agents/agb/__init__.py b/backend-compliance/compliance/services/specialist_agents/agb/__init__.py
new file mode 100644
index 00000000..46374b9f
--- /dev/null
+++ b/backend-compliance/compliance/services/specialist_agents/agb/__init__.py
@@ -0,0 +1,5 @@
+"""AGB-Agent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB), kuratiert."""
+
+from .agent import AGBAgent
+
+__all__ = ["AGBAgent"]
diff --git a/backend-compliance/compliance/services/specialist_agents/agb/agent.py b/backend-compliance/compliance/services/specialist_agents/agb/agent.py
new file mode 100644
index 00000000..a6cd5f20
--- /dev/null
+++ b/backend-compliance/compliance/services/specialist_agents/agb/agent.py
@@ -0,0 +1,19 @@
+"""AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB).
+
+Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1
+Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose.
+"""
+
+from __future__ import annotations
+
+from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST
+
+from .._checklist_agent import ChecklistAgent
+
+
+class AGBAgent(ChecklistAgent):
+    """Checklist agent for general terms and conditions (AGB), driven by
+    AGB_CHECKLIST; all evaluation logic is inherited from ChecklistAgent."""
+
+    CHECKLIST = AGB_CHECKLIST
+    agent_id = "agb"
+    agent_version = "1.0"
+    doc_type = "agb"
+    # IDs of all entries in AGB_CHECKLIST.
+    owned_mc_ids = tuple(c["id"] for c in AGB_CHECKLIST)
diff --git a/backend-compliance/compliance/services/specialist_agents/dse/agent.py b/backend-compliance/compliance/services/specialist_agents/dse/agent.py
index 98de8bcc..babd9f2d 100644
--- a/backend-compliance/compliance/services/specialist_agents/dse/agent.py
+++ b/backend-compliance/compliance/services/specialist_agents/dse/agent.py
@@ -1,180 +1,29 @@
"""DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO).
-Kuratiert: läuft die ART13_CHECKLIST (Pflichtangaben L1 „erwähnt?" +
-Detailchecks L2 „vollständig?") deterministisch über den DSE-Text. BEWUSST
-KEIN Library-Firehose (eCall/Gesundheit/Telekom/Data-Act-Lärm aus der 90k-
-Control-Library) — nur die echten Art-13/14-Auskunftspflichten. Output =
-AgentOutput (mc_coverage + Findings + Maßnahmen), gerendert im AgentResultTab
-wie das Impressum-Modul.
+Thin-Subclass von ChecklistAgent über die kuratierte ART13_CHECKLIST (KEIN
+90k-Library-Firehose). Einzige Spezialität: Drittland wird bei dokumentiertem
+Drittlandtransfer (Scan-Kontext) zu HIGH angehoben.
"""
from __future__ import annotations
-import re
-from datetime import datetime, timezone
-
from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST
-from .._base import (
- AgentInput,
- AgentOutput,
- BaseSpecialistAgent,
- CheckStatus,
- EvidenceSource,
- Finding,
- McCoverage,
- Severity,
- SourceType,
- lint_output,
-)
-from .._rollup import rollup
-
-_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM,
- "LOW": Severity.LOW, "INFO": Severity.INFO}
-# Coverage-Status bei FAIL spiegelt die Risiko-Achse (severity) der Quelle.
-_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"}
-_NORM_RE = re.compile(r"\((Art\.[^)]+)\)")
+from .._base import AgentInput
+from .._checklist_agent import ChecklistAgent
-def _match_value(text: str, start: int, end: int) -> str:
- """Exakter Treffer-Wert (nicht die umgebende Passage), normalisiert + gekappt."""
- return " ".join(text[start:end].split())[:120]
-
-
-def _norm_of(label: str) -> str:
- m = _NORM_RE.search(label or "")
- return m.group(1) if m else "Art. 13/14 DSGVO"
-
-
-def _compiled(check: dict) -> list:
- out = []
- for p in check.get("patterns", []):
- try:
- out.append(re.compile(p, re.IGNORECASE | re.MULTILINE))
- except re.error:
- continue
- return out
-
-
-def _search(patterns: list, text: str):
- for p in patterns:
- m = p.search(text)
- if m:
- return m
- return None
-
-
-class DSEAgent(BaseSpecialistAgent):
+class DSEAgent(ChecklistAgent):
+ CHECKLIST = ART13_CHECKLIST
agent_id = "dse"
agent_version = "1.0"
doc_type = "dse"
owned_mc_ids = tuple(c["id"] for c in ART13_CHECKLIST)
- async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
- start = datetime.now(timezone.utc)
- text = (agent_input.text or "").strip()
+ def _severity_override(self, c: dict, agent_input: AgentInput):
sc = (agent_input.context or {}).get("scan_context") or {}
- tc_applies = str(sc.get("third_country_transfer", "")).lower() in (
+ tc = str(sc.get("third_country_transfer", "")).lower() in (
"yes", "true", "1", "ja")
- coverage: list[McCoverage] = []
- findings: list[Finding] = []
-
- if len(text) < 100:
- for c in ART13_CHECKLIST:
- coverage.append(McCoverage(
- mc_id=c["id"], status="skipped",
- label=c["label"], reason="Text zu kurz"))
- return self._finalize(start, findings, coverage, 0.0,
- "DSE-Text zu kurz oder leer.")
-
- # L1 (Pflichtangabe erwähnt?) zuerst — Ergebnis steuert L2.
- l1_present: dict[str, bool] = {}
- for c in ART13_CHECKLIST:
- if c.get("level", 1) != 1:
- continue
- m = _search(_compiled(c), text)
- l1_present[c["id"]] = m is not None
- coverage.append(self._cov(c, m, text, tc_applies))
- if m is None:
- findings.append(self._finding(c, False, tc_applies))
-
- # L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 da ist. Fehlt
- # die L1, deckt deren Finding die Lücke ab → KEIN irreführendes 'na'
- # (nicht anwendbar) für das Detail (z.B. Transfermechanismus bei BMW).
- for c in ART13_CHECKLIST:
- if c.get("level", 1) != 2:
- continue
- parent = c.get("parent")
- if parent and not l1_present.get(parent, False):
- continue
- m = _search(_compiled(c), text)
- coverage.append(self._cov(c, m, text, tc_applies))
- if m is None:
- findings.append(self._finding(c, True, tc_applies))
-
- return self._finalize(start, findings, coverage, 0.7, "")
-
- @staticmethod
- def _eff_sev(c: dict, tc_applies: bool) -> str:
- """Drittland ist bei dokumentiertem Drittlandtransfer (Scan-Kontext)
- keine weiche MEDIUM-Empfehlung mehr, sondern HIGH (Konzern/US-Provider)."""
- if tc_applies and c["id"] in ("third_country", "third_country_mechanism"):
+ if tc and c["id"] in ("third_country", "third_country_mechanism"):
return "HIGH"
- return c.get("severity", "MEDIUM")
-
- def _cov(self, c: dict, m, text: str, tc_applies: bool) -> McCoverage:
- if m is not None:
- return McCoverage(
- mc_id=c["id"], status="ok", label=c["label"],
- reason="Pattern-Treffer",
- found=_match_value(text, m.start(), m.end()))
- sev = self._eff_sev(c, tc_applies)
- return McCoverage(
- mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"),
- label=c["label"],
- reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig")
-
- def _finding(self, c: dict, present: bool, tc_applies: bool) -> Finding:
- sev = self._eff_sev(c, tc_applies)
- # Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die
- # ausführliche Begründung steht als evidence auf der Finding-Karte.
- title = (f"{c['label']}: Detail unvollständig" if present
- else f"{c['label']} fehlt")
- action = (f"{c['label']} präzisieren." if present
- else f"{c['label']} in der Datenschutzerklärung ergänzen.")
- return Finding(
- check_id=f"DSE-{c['id']}",
- agent=self.agent_id, agent_version=self.agent_version,
- field_id=c["id"], status=CheckStatus.FAIL,
- severity=_SEV.get(sev, Severity.MEDIUM),
- severity_reason=("detail_incomplete" if present
- else "pflichtangabe_missing"),
- title=title, norm=_norm_of(c["label"]),
- action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7,
- sources=[EvidenceSource(
- source_type=SourceType.REGEX, source_id=c["id"],
- detail="kein Pattern-Treffer", confidence=0.7)],
- )
-
- def _finalize(self, start, findings, coverage, confidence, notes):
- end = datetime.now(timezone.utc)
- recs = rollup([f for f in findings
- if f.status == CheckStatus.FAIL.value])
- out = AgentOutput(
- agent=self.agent_id, agent_version=self.agent_version,
- started_at=start, finished_at=end,
- duration_ms=int((end - start).total_seconds() * 1000),
- findings=findings, recommendations=recs, mc_coverage=coverage,
- confidence=confidence, notes=notes,
- mc_total=len(coverage),
- mc_ok=sum(1 for c in coverage if c.status == "ok"),
- mc_na=sum(1 for c in coverage if c.status == "na"),
- mc_high=sum(1 for c in coverage if c.status == "high"),
- mc_medium=sum(1 for c in coverage if c.status == "medium"),
- mc_low=sum(1 for c in coverage if c.status == "low"),
- mc_insufficient=sum(
- 1 for c in coverage if c.status == "insufficient_evidence"),
- mc_possibly=sum(
- 1 for c in coverage if c.status == "possibly_applicable"),
- )
- return lint_output(out)
+ return None
diff --git a/backend-compliance/compliance/tests/test_agb_agent.py b/backend-compliance/compliance/tests/test_agb_agent.py
new file mode 100644
index 00000000..ba218b6b
--- /dev/null
+++ b/backend-compliance/compliance/tests/test_agb_agent.py
@@ -0,0 +1,37 @@
+"""AGBAgent — kuratierte §§-305-ff-BGB-Checkliste (ChecklistAgent-Subclass)."""
+
+from __future__ import annotations
+
+import asyncio
+
+from compliance.services.specialist_agents import REGISTRY, AgentInput
+
+
+def _run(text: str):
+    """Evaluate the registered "agb" agent synchronously on ``text``."""
+    agent = REGISTRY.get("agb")
+    agent_input = AgentInput(doc_type="agb", text=text)
+    return asyncio.run(agent.evaluate(agent_input))
+
+
+def test_agb_agent_registered():
+    """An agent is available in the registry under the id "agb"."""
+    agent = REGISTRY.get("agb")
+    assert agent is not None
+
+
+def test_agb_detects_core_clauses():
+    """A text naming the core AGB clauses gets an "ok" coverage entry for the
+    scope clause, and every finding keeps a short action text."""
+    paragraph = (
+        "Allgemeine Geschaeftsbedingungen. Geltungsbereich: Diese AGB gelten "
+        "fuer alle Vertraege. Vertragsschluss durch Bestellung. Preise inkl. "
+        "MwSt. Lieferung. Zahlung. Widerrufsrecht. Gewaehrleistung. Haftung. "
+        "Gerichtsstand Muenchen. ")
+    out = _run(paragraph * 4)
+    assert out.agent == "agb"
+    assert out.mc_total >= 1
+    ok_labels = [entry.label for entry in out.mc_coverage
+                 if entry.status == "ok"]
+    assert any("Geltungsbereich" in label for label in ok_labels)
+    # Title/action stay short (ChecklistAgent contract).
+    for finding in out.findings:
+        assert len(finding.action) < 110
+
+
+def test_agb_short_text_skips():
+    """A too-short text yields confidence 0.0 and only "skipped" coverage entries."""
+    out = _run("zu kurz")
+    assert out.confidence == 0.0
+    for entry in out.mc_coverage:
+        assert entry.status == "skipped"
diff --git a/backend-compliance/main.py b/backend-compliance/main.py
index 03e859cb..6f0a0577 100644
--- a/backend-compliance/main.py
+++ b/backend-compliance/main.py
@@ -50,6 +50,7 @@ from compliance.api.agent_recurring_routes import router as agent_recurring_rout
from compliance.api.agent_compare_routes import router as agent_compare_router
from compliance.api.agent_doc_check_routes import router as agent_doc_check_router
from compliance.api.agent_compliance_check_routes import router as agent_compliance_check_router
+from compliance.api.snapshot_check_routes import router as snapshot_check_router
from compliance.api.agent_findings_routes import router as agent_findings_router
from compliance.api.saving_scan_routes import router as saving_scan_router
from compliance.api.agent_migration_routes import router as agent_migration_router
@@ -160,6 +161,7 @@ app.include_router(agent_recurring_router, prefix="/api")
app.include_router(agent_compare_router, prefix="/api")
app.include_router(agent_doc_check_router, prefix="/api")
app.include_router(agent_compliance_check_router, prefix="/api")
+app.include_router(snapshot_check_router, prefix="/api")
app.include_router(agent_findings_router, prefix="/api")
app.include_router(saving_scan_router, prefix="/api")
app.include_router(agent_migration_router, prefix="/api")