feat(dse): kuratierter DSEAgent + Snapshot-Tab (Art. 13/14, kein Firehose)
DSEAgent wrappt die existierende ART13_CHECKLIST (33 kuratierte Pflichtangaben
L1 + Detailchecks L2) → strukturierter AgentOutput, NICHT der 90k-Library-
Firehose (eCall/Gesundheit/Telekom-Lärm). GET /snapshots/{id}/dse-check spiegelt
impressum-check; doc_input_from_snapshot generalisiert. Frontend: generischer
AgentModuleTab (lazy → AgentResultTab) für Impressum + DSE; DSE-Tab in der
Snapshot-Seite. Plus HRB-Pattern \d→\d+ (volle Registernummer als Beleg).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -53,15 +53,15 @@ def _derive_scope(profile_dict: dict) -> list[str]:
|
||||
return sorted(scope)
|
||||
|
||||
|
||||
def impressum_input_from_snapshot(snap: dict) -> dict | None:
|
||||
"""Baut den ImpressumAgent-Input aus einem gespeicherten Snapshot (kein
|
||||
Re-Crawl). Pure + testbar: zieht den Impressum-Text aus doc_entries, leitet
|
||||
den Scope aus scan_context + Profil ab (identisch zur Live-Auswertung) und
|
||||
nimmt site_label als company_name-Fallback. None, wenn kein Impressum-Text.
|
||||
def doc_input_from_snapshot(snap: dict, doc_type: str) -> dict | None:
|
||||
"""Baut den AgentInput für EINEN Doc-Type aus einem gespeicherten Snapshot
|
||||
(kein Re-Crawl). Pure + testbar: zieht den Text aus doc_entries, leitet den
|
||||
Scope aus scan_context + Profil ab (identisch zur Live-Auswertung) und nimmt
|
||||
site_label als company_name-Fallback. None, wenn kein/zu kurzer Text.
|
||||
"""
|
||||
docs = snap.get("doc_entries") or []
|
||||
text = next((e.get("text") or e.get("content") or ""
|
||||
for e in docs if e.get("doc_type") == "impressum"), "")
|
||||
for e in docs if e.get("doc_type") == doc_type), "")
|
||||
if len((text or "").strip()) < _MIN_TEXT:
|
||||
return None
|
||||
profile = snap.get("profile") or {}
|
||||
@@ -70,7 +70,7 @@ def impressum_input_from_snapshot(snap: dict) -> dict | None:
|
||||
| set(_derive_scope(profile))
|
||||
)
|
||||
return {
|
||||
"doc_type": "impressum",
|
||||
"doc_type": doc_type,
|
||||
"text": text,
|
||||
"business_scope": scope,
|
||||
"company_name": (profile.get("company_name") or snap.get("site_label") or ""),
|
||||
@@ -78,6 +78,11 @@ def impressum_input_from_snapshot(snap: dict) -> dict | None:
|
||||
}
|
||||
|
||||
|
||||
def impressum_input_from_snapshot(snap: dict) -> dict | None:
    """Backward-compatible alias used by the Impressum endpoint.

    Delegates to ``doc_input_from_snapshot`` with the doc type fixed to
    ``"impressum"`` and returns its result unchanged.
    """
    impressum_input = doc_input_from_snapshot(snap, "impressum")
    return impressum_input
|
||||
|
||||
|
||||
async def run_agent_outputs(state: dict) -> None:
|
||||
"""Für jedes Topic mit registriertem v3-Agent + ausreichend Text:
|
||||
Agent laufen lassen, AgentOutput ablegen + als SSE topic-Event
|
||||
|
||||
@@ -295,6 +295,33 @@ async def snapshot_impressum_check(snapshot_id: str):
|
||||
db.close()
|
||||
|
||||
|
||||
@router.get("/snapshots/{snapshot_id}/dse-check")
async def snapshot_dse_check(snapshot_id: str):
    """Run the curated DSE analysis on a stored snapshot (no re-crawl).

    Loads the snapshot, builds the agent input for doc type ``"dse"`` and
    evaluates it with the agent registered under ``"dse"`` (Art. 13/14,
    ART13_CHECKLIST; deliberately not the control-library firehose).

    Args:
        snapshot_id: Identifier of the stored snapshot.

    Returns:
        The agent output serialised via ``model_dump(mode="json")``, or a
        fixed empty result with ``confidence`` 0.0 when no agent input could
        be built from the snapshot.

    Raises:
        HTTPException: 404 when the snapshot does not exist; 500 when no
            agent is registered under ``"dse"``.
    """
    from fastapi import HTTPException
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.specialist_agents import REGISTRY, AgentInput
    from compliance.api.agent_check._agent_outputs import (
        doc_input_from_snapshot,
    )

    db = SessionLocal()
    try:
        snap = load_snapshot(db, snapshot_id)
        if not snap:
            raise HTTPException(status_code=404, detail="snapshot not found")
        agent_input = doc_input_from_snapshot(snap, "dse")
        if not agent_input:
            return {"findings": [], "recommendations": [], "mc_coverage": [],
                    "notes": "kein DSE-Text im Snapshot", "confidence": 0.0}
        agent = REGISTRY.get("dse")
        if agent is None:
            # Fail with a descriptive error instead of an AttributeError on
            # None when the agent registration is missing.
            raise HTTPException(status_code=500,
                                detail="dse agent not registered")
        out = await agent.evaluate(AgentInput(**agent_input))
        return out.model_dump(mode="json")
    finally:
        # Always release the session, including on the early returns/raises.
        db.close()
|
||||
|
||||
|
||||
@router.get("/admin/benchmark")
|
||||
async def benchmark(
|
||||
industry: str = "",
|
||||
|
||||
@@ -30,17 +30,19 @@ from ._base import (
|
||||
from ._registry import REGISTRY
|
||||
from .cookie_policy import CookiePolicyAgent
|
||||
from .cross_placement import CrossPlacementAgent
|
||||
from .dse import DSEAgent
|
||||
from .impressum import ImpressumAgent
|
||||
|
||||
# Self-register all agents
|
||||
REGISTRY.register(ImpressumAgent())
|
||||
REGISTRY.register(CookiePolicyAgent())
|
||||
REGISTRY.register(CrossPlacementAgent())
|
||||
REGISTRY.register(DSEAgent())
|
||||
|
||||
__all__ = [
|
||||
"AgentInput", "AgentOutput", "BaseSpecialistAgent",
|
||||
"EscalationLog", "EvidenceSource", "Finding", "McCoverage",
|
||||
"Recommendation", "Severity", "SourceType",
|
||||
"REGISTRY", "ImpressumAgent", "CookiePolicyAgent",
|
||||
"CrossPlacementAgent",
|
||||
"CrossPlacementAgent", "DSEAgent",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
"""DSE-Agent — Datenschutzerklärung (Art. 13/14 DSGVO), kuratiert."""
|
||||
|
||||
from .agent import DSEAgent
|
||||
|
||||
__all__ = ["DSEAgent"]
|
||||
@@ -0,0 +1,167 @@
|
||||
"""DSEAgent — Datenschutzerklärung / Datenschutzinformation (Art. 13/14 DSGVO).
|
||||
|
||||
Kuratiert: läuft die ART13_CHECKLIST (Pflichtangaben L1 „erwähnt?" +
|
||||
Detailchecks L2 „vollständig?") deterministisch über den DSE-Text. BEWUSST
|
||||
KEIN Library-Firehose (eCall/Gesundheit/Telekom/Data-Act-Lärm aus der 90k-
|
||||
Control-Library) — nur die echten Art-13/14-Auskunftspflichten. Output =
|
||||
AgentOutput (mc_coverage + Findings + Maßnahmen), gerendert im AgentResultTab
|
||||
wie das Impressum-Modul.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from compliance.services.doc_checks.dse_checks import ART13_CHECKLIST
|
||||
|
||||
from .._base import (
|
||||
AgentInput,
|
||||
AgentOutput,
|
||||
BaseSpecialistAgent,
|
||||
CheckStatus,
|
||||
EvidenceSource,
|
||||
Finding,
|
||||
McCoverage,
|
||||
Severity,
|
||||
SourceType,
|
||||
lint_output,
|
||||
)
|
||||
from .._rollup import rollup
|
||||
|
||||
# Maps the severity strings used by checklist entries to the Severity enum.
_SEV = {"HIGH": Severity.HIGH, "MEDIUM": Severity.MEDIUM,
        "LOW": Severity.LOW, "INFO": Severity.INFO}
# Coverage status of a failed check mirrors the risk axis (severity) of the
# source entry; INFO is folded into "low".
_COV_FAIL = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low", "INFO": "low"}
# Captures a parenthesised reference starting with "Art." (the text between
# the parentheses is group 1).
_NORM_RE = re.compile(r"\((Art\.[^)]+)\)")
|
||||
|
||||
|
||||
def _match_value(text: str, start: int, end: int) -> str:
    """Return the exact matched value, not the surrounding passage.

    The slice ``text[start:end]`` has every whitespace run collapsed to a
    single space and is then capped at 120 characters.
    """
    words = text[start:end].split()
    collapsed = " ".join(words)
    return collapsed[:120]
|
||||
|
||||
|
||||
def _norm_of(label: str) -> str:
    """Extract the norm reference from a checklist label.

    Returns the first parenthesised "Art. ..." reference found in *label*
    (without the parentheses); falls back to "Art. 13/14 DSGVO" when the
    label is empty/None or contains no such reference.
    """
    hit = _NORM_RE.search(label or "")
    if hit is None:
        return "Art. 13/14 DSGVO"
    return hit.group(1)
|
||||
|
||||
|
||||
def _compiled(check: dict) -> list:
    """Compile the regex patterns of one checklist entry.

    Args:
        check: Checklist entry. Its optional ``"patterns"`` value is an
            iterable of regex source strings and/or pre-compiled patterns;
            a missing or ``None`` value yields an empty result.

    Returns:
        The usable patterns in their original order. Source strings are
        compiled with ``re.IGNORECASE | re.MULTILINE``; pre-compiled patterns
        are passed through unchanged. Entries that cannot be compiled are
        skipped (best effort) and reported via a log warning.
    """
    import logging

    out = []
    for p in check.get("patterns") or ():
        if isinstance(p, re.Pattern):
            # re.compile() raises ValueError when flags are combined with an
            # already compiled pattern, so keep it as it is.
            out.append(p)
            continue
        try:
            out.append(re.compile(p, re.IGNORECASE | re.MULTILINE))
        except (re.error, TypeError) as exc:
            # Keep the best-effort behaviour, but make the broken entry
            # visible instead of dropping it silently.
            logging.getLogger(__name__).warning(
                "skipping invalid pattern %r of check %r: %s",
                p, check.get("id"), exc)
    return out
|
||||
|
||||
|
||||
def _search(patterns: list, text: str):
    """Return the match of the first pattern that hits *text*.

    Patterns are tried in list order; the result is that pattern's match
    object, or None when no pattern matches (or the list is empty).
    """
    hits = (rx.search(text) for rx in patterns)
    return next((hit for hit in hits if hit), None)
|
||||
|
||||
|
||||
class DSEAgent(BaseSpecialistAgent):
    """Specialist agent for privacy notices (Art. 13/14 GDPR).

    Runs every ART13_CHECKLIST entry as a regex check over the document
    text. Level-1 entries ask whether a mandatory item is mentioned at all;
    level-2 entries are detail checks and are only evaluated when their
    parent level-1 entry matched (or when they declare no parent).
    """

    agent_id = "dse"
    agent_version = "1.0"
    doc_type = "dse"
    owned_mc_ids = tuple(entry["id"] for entry in ART13_CHECKLIST)

    async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
        """Evaluate the checklist against ``agent_input.text``.

        Texts shorter than 100 characters (after stripping) are not checked:
        every entry is reported as "skipped" and the confidence is 0.0.
        """
        started = datetime.now(timezone.utc)
        body = (agent_input.text or "").strip()

        if len(body) < 100:
            skipped = [
                McCoverage(mc_id=item["id"], status="skipped",
                           label=item["label"], reason="Text zu kurz")
                for item in ART13_CHECKLIST
            ]
            return self._finalize(started, [], skipped, 0.0,
                                  "DSE-Text zu kurz oder leer.")

        coverage: list[McCoverage] = []
        findings: list[Finding] = []
        level_one = [i for i in ART13_CHECKLIST if i.get("level", 1) == 1]
        level_two = [i for i in ART13_CHECKLIST if i.get("level", 1) == 2]

        # Level 1 (mandatory item mentioned?) runs first because its result
        # decides which level-2 checks are evaluated.
        mentioned: dict[str, bool] = {}
        for item in level_one:
            hit = _search(_compiled(item), body)
            mentioned[item["id"]] = hit is not None
            coverage.append(self._cov(item, hit, body))
            if hit is None:
                findings.append(self._finding(item, present=False))

        # Level 2 (complete/correct?) only when the parent level-1 item is
        # present; otherwise the same gap would be reported twice.
        for item in level_two:
            parent_id = item.get("parent")
            if parent_id and not mentioned.get(parent_id, False):
                coverage.append(McCoverage(
                    mc_id=item["id"], status="na", label=item["label"],
                    reason="übergeordnete Pflichtangabe fehlt"))
                continue
            hit = _search(_compiled(item), body)
            coverage.append(self._cov(item, hit, body))
            if hit is None:
                findings.append(self._finding(item, present=True))

        return self._finalize(started, findings, coverage, 0.7, "")

    def _cov(self, c: dict, m, text: str) -> McCoverage:
        """Build the coverage row for entry *c* from match *m* (or None)."""
        if m is None:
            # No hit: the status follows the entry's severity, the reason
            # depends on whether this is a level-1 or a detail check.
            is_level_one = c.get("level", 1) == 1
            return McCoverage(
                mc_id=c["id"],
                status=_COV_FAIL.get(c.get("severity", "MEDIUM"), "medium"),
                label=c["label"],
                reason="fehlt" if is_level_one else "Detail unvollständig")
        first, last = m.start(), m.end()
        return McCoverage(
            mc_id=c["id"], status="ok", label=c["label"],
            reason="Pattern-Treffer",
            found=_match_value(text, first, last))

    def _finding(self, c: dict, present: bool) -> Finding:
        """Build the FAIL finding for entry *c*.

        *present* is True for a failed detail check (the parent item is
        mentioned) and False when the mandatory item itself is missing.
        """
        label = c["label"]
        if present:
            title = f"{label}: Detail unvollständig"
            why = "detail_incomplete"
        else:
            title = f"{label} fehlt"
            why = "pflichtangabe_missing"
        evidence = EvidenceSource(
            source_type=SourceType.REGEX, source_id=c["id"],
            detail="kein Pattern-Treffer", confidence=0.7)
        return Finding(
            check_id=f"DSE-{c['id']}",
            agent=self.agent_id, agent_version=self.agent_version,
            field_id=c["id"], status=CheckStatus.FAIL,
            severity=_SEV.get(c.get("severity", "MEDIUM"), Severity.MEDIUM),
            severity_reason=why,
            title=title, norm=_norm_of(label),
            action=c.get("hint", ""), confidence=0.7,
            sources=[evidence],
        )

    def _finalize(self, start, findings, coverage, confidence, notes):
        """Assemble the AgentOutput (timing, roll-up, counters) and lint it."""
        end = datetime.now(timezone.utc)
        failed = [item for item in findings
                  if item.status == CheckStatus.FAIL.value]
        recs = rollup(failed)

        def count(status):
            # Number of coverage rows carrying the given status.
            return sum(1 for row in coverage if row.status == status)

        elapsed_ms = int((end - start).total_seconds() * 1000)
        out = AgentOutput(
            agent=self.agent_id, agent_version=self.agent_version,
            started_at=start, finished_at=end,
            duration_ms=elapsed_ms,
            findings=findings, recommendations=recs, mc_coverage=coverage,
            confidence=confidence, notes=notes,
            mc_total=len(coverage),
            mc_ok=count("ok"),
            mc_na=count("na"),
            mc_high=count("high"),
            mc_medium=count("medium"),
            mc_low=count("low"),
            mc_insufficient=count("insufficient_evidence"),
            mc_possibly=count("possibly_applicable"),
        )
        return lint_output(out)
|
||||
@@ -99,7 +99,7 @@ MCS: tuple[MC, ...] = (
|
||||
excludes_scope=("kein_handelsregister",),
|
||||
legal_form_dependent=True,
|
||||
patterns=(
|
||||
re.compile(r"\bHR[BA]\s+\d", re.IGNORECASE),
|
||||
re.compile(r"\bHR[BA]\s+\d+", re.IGNORECASE),
|
||||
re.compile(r"Handelsregister", re.IGNORECASE),
|
||||
),
|
||||
),
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""DSEAgent — kuratierte Art-13/14-Checkliste (kein Library-Firehose)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from compliance.services.specialist_agents import REGISTRY, AgentInput
|
||||
|
||||
|
||||
def _run(text: str):
    """Evaluate *text* synchronously with the agent registered as "dse"."""
    agent = REGISTRY.get("dse")
    payload = AgentInput(doc_type="dse", text=text)
    return asyncio.run(agent.evaluate(payload))
|
||||
|
||||
|
||||
def test_dse_agent_registered():
    """The registry must expose an agent under the id "dse"."""
    agent = REGISTRY.get("dse")
    assert agent is not None
|
||||
|
||||
|
||||
def test_dse_detects_core_obligations():
    """A notice naming the core items yields "ok" coverage rows for them."""
    paragraph = (
        "Datenschutzerklaerung. Verantwortlich im Sinne der DSGVO ist die "
        "Muster GmbH, Musterstrasse 1, 12345 Berlin. E-Mail: info@muster.de. "
        "Datenschutzbeauftragter: dsb@muster.de. Zwecke der Verarbeitung und "
        "Rechtsgrundlage Art. 6 Abs. 1. Empfaenger Ihrer Daten. Speicherdauer "
        "der Daten. Ihre Rechte: Auskunft, Loeschung, Widerspruch, Beschwerde "
        "bei der Aufsichtsbehoerde. ")
    out = _run(paragraph * 3)
    assert out.agent == "dse"
    assert out.mc_total == 33  # the complete ART13_CHECKLIST
    ok_labels = [row.label for row in out.mc_coverage if row.status == "ok"]
    for needle in ("Verantwortlich", "Rechtsgrundlage"):
        assert any(needle in label for label in ok_labels)
|
||||
|
||||
|
||||
def test_dse_missing_obligations_are_findings():
    """Filler text without any mandatory item must produce HIGH findings."""
    filler = "Lorem ipsum dolor sit amet consectetur adipiscing elit. " * 6
    out = _run(filler)
    assert out.findings
    severities = [finding.severity for finding in out.findings]
    assert any(level == "HIGH" for level in severities)
|
||||
|
||||
|
||||
def test_dse_short_text_skips():
    """Too-short input is not checked: confidence 0.0, all rows skipped."""
    out = _run("zu kurz")
    assert out.confidence == 0.0
    statuses = [row.status for row in out.mc_coverage]
    assert all(status == "skipped" for status in statuses)
|
||||
Reference in New Issue
Block a user