refactor+feat: Snapshot-Router-Split + generischer ChecklistAgent + AGB-Modul

- Item 2: Snapshot-Doc-Checks (cookie/impressum/dse/agb) in snapshot_check_routes.py
  (agent_compliance_check_routes.py 464→365 Z.); gleiche Pfade, in main.py registriert.
- ChecklistAgent-Basis: DSE-Logik generalisiert (L1/L2, kurze Titel,
  _severity_override-Hook). DSEAgent + AGBAgent sind jetzt Thin-Subclasses → künftige
  Doc-Agenten (widerruf/avv/…) trivial.
- Item 4: AGBAgent (§§ 305 ff. BGB, AGB_CHECKLIST) + agb-check + AGB-Tab via
  AgentModuleTab. Kein Library-Firehose.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-11 14:23:29 +02:00
parent b40edd6d33
commit 7258744107
11 changed files with 391 additions and 262 deletions
@@ -28,6 +28,7 @@ from ._base import (
SourceType,
)
from ._registry import REGISTRY
from .agb import AGBAgent
from .cookie_policy import CookiePolicyAgent
from .cross_placement import CrossPlacementAgent
from .dse import DSEAgent
@@ -38,11 +39,12 @@ REGISTRY.register(ImpressumAgent())
REGISTRY.register(CookiePolicyAgent())
REGISTRY.register(CrossPlacementAgent())
REGISTRY.register(DSEAgent())
REGISTRY.register(AGBAgent())
__all__ = [
"AgentInput", "AgentOutput", "BaseSpecialistAgent",
"EscalationLog", "EvidenceSource", "Finding", "McCoverage",
"Recommendation", "Severity", "SourceType",
"REGISTRY", "ImpressumAgent", "CookiePolicyAgent",
"CrossPlacementAgent", "DSEAgent",
"CrossPlacementAgent", "DSEAgent", "AGBAgent",
]
@@ -0,0 +1,175 @@
"""ChecklistAgent — generischer Doc-Agent über eine kuratierte Pflichtangaben-
Checkliste (L1 „erwähnt?" + L2 „vollständig?").
Basis für DSE/AGB/Widerruf/… : läuft die Checkliste deterministisch über den
Text → strukturierter AgentOutput (mc_coverage + Findings + Maßnahmen). BEWUSST
KEIN Library-Firehose. Subklassen setzen nur CHECKLIST/agent_id/doc_type und
können per _severity_override() die Severity kontextabhängig anheben.
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from ._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
CheckStatus,
EvidenceSource,
Finding,
McCoverage,
Severity,
SourceType,
lint_output,
)
from ._rollup import rollup
# Maps the textual severity of a checklist entry to the Severity enum member
# that is stored on the resulting Finding.
_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM,
"LOW": Severity.LOW, "INFO": Severity.INFO}
# Coverage status for a failed check mirrors the risk axis (severity) of the
# source entry. Note that INFO is reported as "low".
_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"}
# Pull the norm reference out of a label's parenthesised suffix, e.g.
# '(Art. 13(1)(a))' or '(§305 BGB)'. One level of nested parentheses is
# accepted inside the reference: a plain `[^)]+` would stop at the first ')'
# and truncate 'Art. 13(1)(a)' to 'Art. 13(1'.
_NORM_RE = re.compile(
    r"\((Art\.(?:[^()]|\([^()]*\))+|§\s*\d+(?:[^()]|\([^()]*\))*)\)"
)
def _match_value(text: str, start: int, end: int) -> str:
    """Return the exact matched span of ``text`` (not the surrounding passage).

    Whitespace runs inside the span are collapsed to single spaces and the
    result is capped at 120 characters.
    """
    hit = text[start:end]
    collapsed = " ".join(hit.split())
    return collapsed[:120]
def _norm_of(label: str) -> str:
    """Return the norm reference embedded in ``label``, or ``""`` if there is none.

    A falsy label (``None`` or empty) is searched as an empty string.
    """
    hit = _NORM_RE.search(label or "")
    if hit is None:
        return ""
    return hit.group(1).strip()
def _compiled(check: dict) -> list:
    """Compile the regex patterns of one checklist entry.

    Args:
        check: Checklist entry. Its optional ``"patterns"`` value is an
            iterable of regex source strings; a missing or ``None`` value
            yields an empty result.

    Returns:
        The compiled patterns (case-insensitive, multi-line) in checklist
        order. Patterns that fail to compile are left out.
    """
    import logging  # local import: keeps the new dependency inside this block

    compiled = []
    for source in check.get("patterns") or ():
        try:
            compiled.append(re.compile(source, re.IGNORECASE | re.MULTILINE))
        except re.error as exc:
            # Stay best-effort, but make the broken pattern visible: an entry
            # whose patterns are all invalid can never match and would be
            # reported as missing without any hint at the real cause.
            logging.getLogger(__name__).warning(
                "Skipping invalid pattern %r in check %r: %s",
                source, check.get("id"), exc)
    return compiled
def _search(patterns: list, text: str):
    """Return the first match produced by ``patterns`` (tried in order), else ``None``."""
    hits = (rx.search(text) for rx in patterns)
    return next((hit for hit in hits if hit), None)
class ChecklistAgent(BaseSpecialistAgent):
    """Generic document agent driven by a curated mandatory-items checklist.

    The checklist is applied deterministically to the document text in two
    passes: level 1 ("is the item mentioned?") and level 2 ("is it
    complete?"). Subclasses normally only set ``CHECKLIST``, ``agent_id`` and
    ``doc_type``, and may override ``_severity_override``.
    """

    # Checklist entries as dicts. Keys read here and by the module helpers:
    # "id", "label", "patterns", and optionally "level" (default 1), "parent",
    # "severity" (default "MEDIUM") and "hint".
    CHECKLIST: list[dict] = []
    agent_id = ""
    agent_version = "1.0"
    doc_type = ""
    # Stripped texts shorter than this are not evaluated; every check is then
    # reported as "skipped". Subclasses may override the threshold.
    min_text_len = 100

    def _severity_override(self, c: dict, agent_input: AgentInput):
        """Hook: return a context-dependent severity for ``c``, or ``None``.

        Subclasses can raise the checklist severity depending on the input
        (e.g. DSE: the third-country item becomes HIGH when a transfer is
        documented). The base implementation never overrides.
        """
        return None

    def _eff_sev(self, c: dict, agent_input: AgentInput) -> str:
        """Return the override if it is truthy, else the entry's own severity
        (``"MEDIUM"`` when the entry has no "severity" key)."""
        return self._severity_override(c, agent_input) or c.get("severity", "MEDIUM")

    async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
        """Run the checklist against ``agent_input.text``.

        Returns:
            The output produced by ``_finalize``. If the stripped text is
            shorter than ``min_text_len``, every check is reported as
            "skipped", there are no findings and the confidence is 0.0;
            otherwise the confidence is 0.7.
        """
        start = datetime.now(timezone.utc)
        text = (agent_input.text or "").strip()
        coverage: list[McCoverage] = []
        findings: list[Finding] = []

        if len(text) < self.min_text_len:
            coverage.extend(
                McCoverage(mc_id=c["id"], status="skipped",
                           label=c["label"], reason="Text zu kurz")
                for c in self.CHECKLIST)
            return self._finalize(start, findings, coverage, 0.0,
                                  f"{self.doc_type}-Text zu kurz oder leer.")

        # L1 (mandatory item mentioned?) first: its result steers L2.
        l1_present: dict[str, bool] = {}
        for c in self.CHECKLIST:
            if c.get("level", 1) != 1:
                continue
            l1_present[c["id"]] = self._run_check(
                c, False, text, agent_input, coverage, findings)

        # L2 (complete?) only when the parent L1 item is present. If the L1
        # item is missing, its own finding already covers the gap, so the
        # detail check is left out entirely instead of being reported with a
        # misleading 'na' (not applicable).
        for c in self.CHECKLIST:
            if c.get("level", 1) != 2:
                continue
            parent = c.get("parent")
            if parent and not l1_present.get(parent, False):
                continue
            self._run_check(c, True, text, agent_input, coverage, findings)

        return self._finalize(start, findings, coverage, 0.7, "")

    def _run_check(self, c: dict, is_detail: bool, text: str,
                   agent_input: AgentInput, coverage: list,
                   findings: list) -> bool:
        """Evaluate one entry: append its coverage row and, on a miss, a finding.

        ``is_detail`` is forwarded to ``_finding`` (True for L2 checks).
        Returns whether any of the entry's patterns matched.
        """
        m = _search(_compiled(c), text)
        coverage.append(self._cov(c, m, text, agent_input))
        if m is None:
            findings.append(self._finding(c, is_detail, agent_input))
        return m is not None

    def _cov(self, c: dict, m, text: str, ai: AgentInput) -> McCoverage:
        """Build the coverage row for ``c``.

        A match yields status "ok" together with the matched value; a miss
        yields the status derived from the effective severity ("medium" for
        unknown severities).
        """
        if m is not None:
            return McCoverage(
                mc_id=c["id"], status="ok", label=c["label"],
                reason="Pattern-Treffer",
                found=_match_value(text, m.start(), m.end()))
        sev = self._eff_sev(c, ai)
        return McCoverage(
            mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"),
            label=c["label"],
            reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig")

    def _finding(self, c: dict, present: bool, ai: AgentInput) -> Finding:
        """Build the FAIL finding for a check without a pattern match.

        ``present`` is True when the item itself was found and only a detail
        is incomplete (L2), False when the mandatory item is missing (L1).
        """
        sev = self._eff_sev(c, ai)
        # Title and action are deliberately SHORT (they drive the
        # recommendation title); the detailed rationale goes into `evidence`
        # on the finding card.
        title = (f"{c['label']}: Detail unvollständig" if present
                 else f"{c['label']} fehlt")
        action = (f"{c['label']} präzisieren." if present
                  else f"{c['label']} ergänzen.")
        return Finding(
            check_id=f"{self.agent_id.upper()}-{c['id']}",
            agent=self.agent_id, agent_version=self.agent_version,
            field_id=c["id"], status=CheckStatus.FAIL,
            severity=_SEV.get(sev, Severity.MEDIUM),
            severity_reason=("detail_incomplete" if present
                             else "pflichtangabe_missing"),
            title=title, norm=_norm_of(c["label"]),
            action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7,
            sources=[EvidenceSource(
                source_type=SourceType.REGEX, source_id=c["id"],
                detail="kein Pattern-Treffer", confidence=0.7)],
        )

    def _finalize(self, start, findings, coverage, confidence, notes):
        """Assemble the AgentOutput and pass it through ``lint_output``.

        Recommendations are rolled up from the findings whose status equals
        ``CheckStatus.FAIL.value``; the ``mc_*`` counters are tallied from the
        coverage rows by status.
        """
        end = datetime.now(timezone.utc)

        def count(status: str) -> int:
            # Same equality test as a per-status sum, written once.
            return sum(1 for entry in coverage if entry.status == status)

        recs = rollup([f for f in findings
                       if f.status == CheckStatus.FAIL.value])
        out = AgentOutput(
            agent=self.agent_id, agent_version=self.agent_version,
            started_at=start, finished_at=end,
            duration_ms=int((end - start).total_seconds() * 1000),
            findings=findings, recommendations=recs, mc_coverage=coverage,
            confidence=confidence, notes=notes,
            mc_total=len(coverage),
            mc_ok=count("ok"),
            mc_na=count("na"),
            mc_high=count("high"),
            mc_medium=count("medium"),
            mc_low=count("low"),
            mc_insufficient=count("insufficient_evidence"),
            mc_possibly=count("possibly_applicable"),
        )
        return lint_output(out)
@@ -0,0 +1,5 @@
"""AGB-Agent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB), kuratiert."""
from .agent import AGBAgent
__all__ = ["AGBAgent"]
@@ -0,0 +1,19 @@
"""AGBAgent — Allgemeine Geschäftsbedingungen (§§ 305 ff. BGB).
Thin-Subclass von ChecklistAgent über die kuratierte AGB_CHECKLIST (L1
Pflichtangaben + L2 Detailchecks). KEIN Library-Firehose.
"""
from __future__ import annotations
from compliance.services.doc_checks.agb_checks import AGB_CHECKLIST
from .._checklist_agent import ChecklistAgent
class AGBAgent(ChecklistAgent):
    """Terms-and-conditions agent (AGB, §§ 305 ff. BGB).

    Thin subclass of ChecklistAgent: it only plugs in the curated
    AGB_CHECKLIST and the identifiers and overrides no methods.
    """

    agent_id = "agb"
    agent_version = "1.0"
    doc_type = "agb"
    CHECKLIST = AGB_CHECKLIST
    # Ids of all checklist entries, in checklist order.
    owned_mc_ids = tuple(entry["id"] for entry in CHECKLIST)
@@ -1,180 +1,29 @@
"""DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO).
Kuratiert: läuft die ART13_CHECKLIST (Pflichtangaben L1 „erwähnt?" +
Detailchecks L2 „vollständig?") deterministisch über den DSE-Text. BEWUSST
KEIN Library-Firehose (eCall/Gesundheit/Telekom/Data-Act-Lärm aus der 90k-
Control-Library) — nur die echten Art-13/14-Auskunftspflichten. Output =
AgentOutput (mc_coverage + Findings + Maßnahmen), gerendert im AgentResultTab
wie das Impressum-Modul.
Thin-Subclass von ChecklistAgent über die kuratierte ART13_CHECKLIST (KEIN
90k-Library-Firehose). Einzige Spezialität: Drittland wird bei dokumentiertem
Drittlandtransfer (Scan-Kontext) zu HIGH angehoben.
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST
from .._base import (
AgentInput,
AgentOutput,
BaseSpecialistAgent,
CheckStatus,
EvidenceSource,
Finding,
McCoverage,
Severity,
SourceType,
lint_output,
)
from .._rollup import rollup
_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM,
"LOW": Severity.LOW, "INFO": Severity.INFO}
# Coverage-Status bei FAIL spiegelt die Risiko-Achse (severity) der Quelle.
_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"}
_NORM_RE = re.compile(r"\((Art\.[^)]+)\)")
from .._base import AgentInput
from .._checklist_agent import ChecklistAgent
def _match_value(text: str, start: int, end: int) -> str:
"""Exakter Treffer-Wert (nicht die umgebende Passage), normalisiert + gekappt."""
return " ".join(text[start:end].split())[:120]
def _norm_of(label: str) -> str:
m = _NORM_RE.search(label or "")
return m.group(1) if m else "Art. 13/14 DSGVO"
def _compiled(check: dict) -> list:
out = []
for p in check.get("patterns", []):
try:
out.append(re.compile(p, re.IGNORECASE | re.MULTILINE))
except re.error:
continue
return out
def _search(patterns: list, text: str):
for p in patterns:
m = p.search(text)
if m:
return m
return None
class DSEAgent(BaseSpecialistAgent):
class DSEAgent(ChecklistAgent):
CHECKLIST = ART13_CHECKLIST
agent_id = "dse"
agent_version = "1.0"
doc_type = "dse"
owned_mc_ids = tuple(c["id"] for c in ART13_CHECKLIST)
async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
start = datetime.now(timezone.utc)
text = (agent_input.text or "").strip()
def _severity_override(self, c: dict, agent_input: AgentInput):
sc = (agent_input.context or {}).get("scan_context") or {}
tc_applies = str(sc.get("third_country_transfer", "")).lower() in (
tc = str(sc.get("third_country_transfer", "")).lower() in (
"yes", "true", "1", "ja")
coverage: list[McCoverage] = []
findings: list[Finding] = []
if len(text) < 100:
for c in ART13_CHECKLIST:
coverage.append(McCoverage(
mc_id=c["id"], status="skipped",
label=c["label"], reason="Text zu kurz"))
return self._finalize(start, findings, coverage, 0.0,
"DSE-Text zu kurz oder leer.")
# L1 (Pflichtangabe erwähnt?) zuerst — Ergebnis steuert L2.
l1_present: dict[str, bool] = {}
for c in ART13_CHECKLIST:
if c.get("level", 1) != 1:
continue
m = _search(_compiled(c), text)
l1_present[c["id"]] = m is not None
coverage.append(self._cov(c, m, text, tc_applies))
if m is None:
findings.append(self._finding(c, False, tc_applies))
# L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 da ist. Fehlt
# die L1, deckt deren Finding die Lücke ab → KEIN irreführendes 'na'
# (nicht anwendbar) für das Detail (z.B. Transfermechanismus bei BMW).
for c in ART13_CHECKLIST:
if c.get("level", 1) != 2:
continue
parent = c.get("parent")
if parent and not l1_present.get(parent, False):
continue
m = _search(_compiled(c), text)
coverage.append(self._cov(c, m, text, tc_applies))
if m is None:
findings.append(self._finding(c, True, tc_applies))
return self._finalize(start, findings, coverage, 0.7, "")
@staticmethod
def _eff_sev(c: dict, tc_applies: bool) -> str:
"""Drittland ist bei dokumentiertem Drittlandtransfer (Scan-Kontext)
keine weiche MEDIUM-Empfehlung mehr, sondern HIGH (Konzern/US-Provider)."""
if tc_applies and c["id"] in ("third_country", "third_country_mechanism"):
if tc and c["id"] in ("third_country", "third_country_mechanism"):
return "HIGH"
return c.get("severity", "MEDIUM")
def _cov(self, c: dict, m, text: str, tc_applies: bool) -> McCoverage:
if m is not None:
return McCoverage(
mc_id=c["id"], status="ok", label=c["label"],
reason="Pattern-Treffer",
found=_match_value(text, m.start(), m.end()))
sev = self._eff_sev(c, tc_applies)
return McCoverage(
mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"),
label=c["label"],
reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig")
def _finding(self, c: dict, present: bool, tc_applies: bool) -> Finding:
sev = self._eff_sev(c, tc_applies)
# Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die
# ausführliche Begründung steht als evidence auf der Finding-Karte.
title = (f"{c['label']}: Detail unvollständig" if present
else f"{c['label']} fehlt")
action = (f"{c['label']} präzisieren." if present
else f"{c['label']} in der Datenschutzerklärung ergänzen.")
return Finding(
check_id=f"DSE-{c['id']}",
agent=self.agent_id, agent_version=self.agent_version,
field_id=c["id"], status=CheckStatus.FAIL,
severity=_SEV.get(sev, Severity.MEDIUM),
severity_reason=("detail_incomplete" if present
else "pflichtangabe_missing"),
title=title, norm=_norm_of(c["label"]),
action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7,
sources=[EvidenceSource(
source_type=SourceType.REGEX, source_id=c["id"],
detail="kein Pattern-Treffer", confidence=0.7)],
)
def _finalize(self, start, findings, coverage, confidence, notes):
end = datetime.now(timezone.utc)
recs = rollup([f for f in findings
if f.status == CheckStatus.FAIL.value])
out = AgentOutput(
agent=self.agent_id, agent_version=self.agent_version,
started_at=start, finished_at=end,
duration_ms=int((end - start).total_seconds() * 1000),
findings=findings, recommendations=recs, mc_coverage=coverage,
confidence=confidence, notes=notes,
mc_total=len(coverage),
mc_ok=sum(1 for c in coverage if c.status == "ok"),
mc_na=sum(1 for c in coverage if c.status == "na"),
mc_high=sum(1 for c in coverage if c.status == "high"),
mc_medium=sum(1 for c in coverage if c.status == "medium"),
mc_low=sum(1 for c in coverage if c.status == "low"),
mc_insufficient=sum(
1 for c in coverage if c.status == "insufficient_evidence"),
mc_possibly=sum(
1 for c in coverage if c.status == "possibly_applicable"),
)
return lint_output(out)
return None