fix(dse+linter): Drittland-Applicability, kein na-Detail, kurze Titel, Linter-Wortgrenzen

- Linter: FORBIDDEN_OUTPUT_TERMS per Wortgrenze → 'Schutzgarantien'/'geeignete
  Garantien' (Art. 46) passieren, 'garantiert'-Claims bleiben geblockt.
- DSE: L2-Detail wird übersprungen statt 'na', wenn die L1-Pflichtangabe fehlt
  (kein irreführendes 'nicht anwendbar' für z.B. Transfermechanismus).
- DSE: Drittland → HIGH bei dokumentiertem Drittlandtransfer (scan_context via
  AgentInput.context) — BMW (Konzern, US-Provider) ist kein weiches MEDIUM.
- DSE: Titel/Maßnahme kurz (treibt den Recommendation-Titel); ausführliche
  Begründung als evidence — behebt 120-Zeichen-abgeschnittene Überschriften.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-11 13:43:24 +02:00
parent 6b41eec176
commit 3c6deac1c5
5 changed files with 95 additions and 19 deletions
@@ -75,6 +75,7 @@ def doc_input_from_snapshot(snap: dict, doc_type: str) -> dict | None:
"business_scope": scope, "business_scope": scope,
"company_name": (profile.get("company_name") or snap.get("site_label") or ""), "company_name": (profile.get("company_name") or snap.get("site_label") or ""),
"origin_domain": snap.get("site_domain", ""), "origin_domain": snap.get("site_domain", ""),
"context": {"scan_context": snap.get("scan_context") or {}},
} }
@@ -192,7 +192,7 @@ def lint_output(output: AgentOutput) -> AgentOutput:
for field_name in ("title", "evidence", "action"): for field_name in ("title", "evidence", "action"):
v = getattr(f, field_name) or "" v = getattr(f, field_name) or ""
for term in FORBIDDEN_OUTPUT_TERMS: for term in FORBIDDEN_OUTPUT_TERMS:
if term in v.lower(): if _has_term(v, term):
issues.append(f"Finding {f.check_id}.{field_name}: '{term}'") issues.append(f"Finding {f.check_id}.{field_name}: '{term}'")
v = _scrub(v, term) v = _scrub(v, term)
setattr(f, field_name, v) setattr(f, field_name, v)
@@ -200,7 +200,7 @@ def lint_output(output: AgentOutput) -> AgentOutput:
for field_name in ("title", "body"): for field_name in ("title", "body"):
v = getattr(r, field_name) or "" v = getattr(r, field_name) or ""
for term in FORBIDDEN_OUTPUT_TERMS: for term in FORBIDDEN_OUTPUT_TERMS:
if term in v.lower(): if _has_term(v, term):
issues.append(f"Rec {r.recommendation_id}.{field_name}: '{term}'") issues.append(f"Rec {r.recommendation_id}.{field_name}: '{term}'")
v = _scrub(v, term) v = _scrub(v, term)
setattr(r, field_name, v) setattr(r, field_name, v)
@@ -210,11 +210,20 @@ def lint_output(output: AgentOutput) -> AgentOutput:
return output return output
def _term_pattern(term: str) -> str:
    """Build the regex source that matches *term* as a standalone word.

    A ``\\b`` anchor is only added on an edge of the term that is itself a
    word character. An unconditional ``\\b`` can never match next to a
    non-word edge followed by whitespace (e.g. the ``%`` in ``"100% sicher"``),
    which would silently disable such terms.

    The caller must pass a non-empty term.
    """
    import re as _re
    head = r"\b" if _re.match(r"\w", term[0]) else ""
    tail = r"\b" if _re.match(r"\w", term[-1]) else ""
    return head + _re.escape(term) + tail


def _has_term(text: str, term: str) -> bool:
    """Return True if the forbidden *term* occurs in *text* as a whole word.

    Matching is case-insensitive and respects word boundaries instead of
    plain substring search: claims such as 'garantiert'/'garantie' are
    caught, while the legal terms 'Schutzgarantien' / 'geeignete Garantien'
    (Art. 46 GDPR) pass.

    An empty term never matches (it would otherwise hit every word boundary).
    """
    import re as _re
    if not term:
        return False
    return _re.search(_term_pattern(term), text, _re.IGNORECASE) is not None


def _scrub(text: str, term: str) -> str:
    """Replace every whole-word, case-insensitive occurrence of *term*.

    Each hit is substituted with a fixed marker; occurrences embedded in a
    longer word are left untouched. An empty term leaves *text* unchanged.
    """
    import re as _re
    if not term:
        return text
    return _re.sub(
        _term_pattern(term), "[→ neutraler Wortlaut]",
        text, flags=_re.IGNORECASE,
    )
@@ -73,6 +73,9 @@ class DSEAgent(BaseSpecialistAgent):
async def evaluate(self, agent_input: AgentInput) -> AgentOutput: async def evaluate(self, agent_input: AgentInput) -> AgentOutput:
start = datetime.now(timezone.utc) start = datetime.now(timezone.utc)
text = (agent_input.text or "").strip() text = (agent_input.text or "").strip()
sc = (agent_input.context or {}).get("scan_context") or {}
tc_applies = str(sc.get("third_country_transfer", "")).lower() in (
"yes", "true", "1", "ja")
coverage: list[McCoverage] = [] coverage: list[McCoverage] = []
findings: list[Finding] = [] findings: list[Finding] = []
@@ -91,44 +94,54 @@ class DSEAgent(BaseSpecialistAgent):
continue continue
m = _search(_compiled(c), text) m = _search(_compiled(c), text)
l1_present[c["id"]] = m is not None l1_present[c["id"]] = m is not None
coverage.append(self._cov(c, m, text)) coverage.append(self._cov(c, m, text, tc_applies))
if m is None: if m is None:
findings.append(self._finding(c, present=False)) findings.append(self._finding(c, False, tc_applies))
# L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 vorhanden ist # L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 da ist. Fehlt
# (sonst kein Doppel-Finding zum selben Mangel). # die L1, deckt deren Finding die Lücke ab → KEIN irreführendes 'na'
# (nicht anwendbar) für das Detail (z.B. Transfermechanismus bei BMW).
for c in ART13_CHECKLIST: for c in ART13_CHECKLIST:
if c.get("level", 1) != 2: if c.get("level", 1) != 2:
continue continue
parent = c.get("parent") parent = c.get("parent")
if parent and not l1_present.get(parent, False): if parent and not l1_present.get(parent, False):
coverage.append(McCoverage(
mc_id=c["id"], status="na", label=c["label"],
reason="übergeordnete Pflichtangabe fehlt"))
continue continue
m = _search(_compiled(c), text) m = _search(_compiled(c), text)
coverage.append(self._cov(c, m, text)) coverage.append(self._cov(c, m, text, tc_applies))
if m is None: if m is None:
findings.append(self._finding(c, present=True)) findings.append(self._finding(c, True, tc_applies))
return self._finalize(start, findings, coverage, 0.7, "") return self._finalize(start, findings, coverage, 0.7, "")
def _cov(self, c: dict, m, text: str) -> McCoverage: @staticmethod
def _eff_sev(c: dict, tc_applies: bool) -> str:
    """Return the effective severity for checklist item *c*.

    When the scan context documents a third-country transfer
    (*tc_applies*), the third-country items are escalated to HIGH instead
    of their configured (softer) severity. Every other item keeps its own
    ``severity`` entry, falling back to "MEDIUM" when none is set.
    """
    escalated_ids = ("third_country", "third_country_mechanism")
    # Short-circuit keeps c["id"] untouched when no transfer is documented.
    escalate = tc_applies and c["id"] in escalated_ids
    return "HIGH" if escalate else c.get("severity", "MEDIUM")
def _cov(self, c: dict, m, text: str, tc_applies: bool) -> McCoverage:
    """Build the coverage entry for checklist item *c*.

    *m* is the pattern match for the item (or None). A match yields an
    "ok" entry carrying the matched value; a miss yields a failure entry
    whose status is looked up from the effective severity (see
    ``_eff_sev``), defaulting to "medium" for unmapped severities.
    """
    if m is None:
        severity = self._eff_sev(c, tc_applies)
        # Level-1 items are missing outright; level-2 items are details
        # of a mandatory disclosure and count as incomplete.
        is_top_level = c.get("level", 1) == 1
        return McCoverage(
            mc_id=c["id"],
            status=_COV_FAIL.get(severity, "medium"),
            label=c["label"],
            reason="fehlt" if is_top_level else "Detail unvollständig",
        )
    matched_value = _match_value(text, m.start(), m.end())
    return McCoverage(
        mc_id=c["id"],
        status="ok",
        label=c["label"],
        reason="Pattern-Treffer",
        found=matched_value,
    )
def _finding(self, c: dict, present: bool) -> Finding: def _finding(self, c: dict, present: bool, tc_applies: bool) -> Finding:
sev = c.get("severity", "MEDIUM") sev = self._eff_sev(c, tc_applies)
# Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die
# ausführliche Begründung steht als evidence auf der Finding-Karte.
title = (f"{c['label']}: Detail unvollständig" if present title = (f"{c['label']}: Detail unvollständig" if present
else f"{c['label']} fehlt") else f"{c['label']} fehlt")
action = (f"{c['label']} präzisieren." if present
else f"{c['label']} in der Datenschutzerklärung ergänzen.")
return Finding( return Finding(
check_id=f"DSE-{c['id']}", check_id=f"DSE-{c['id']}",
agent=self.agent_id, agent_version=self.agent_version, agent=self.agent_id, agent_version=self.agent_version,
@@ -137,7 +150,7 @@ class DSEAgent(BaseSpecialistAgent):
severity_reason=("detail_incomplete" if present severity_reason=("detail_incomplete" if present
else "pflichtangabe_missing"), else "pflichtangabe_missing"),
title=title, norm=_norm_of(c["label"]), title=title, norm=_norm_of(c["label"]),
action=c.get("hint", ""), confidence=0.7, action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7,
sources=[EvidenceSource( sources=[EvidenceSource(
source_type=SourceType.REGEX, source_id=c["id"], source_type=SourceType.REGEX, source_id=c["id"],
detail="kein Pattern-Treffer", confidence=0.7)], detail="kein Pattern-Treffer", confidence=0.7)],
@@ -26,7 +26,9 @@ def test_dse_detects_core_obligations():
"bei der Aufsichtsbehoerde. ") * 3 "bei der Aufsichtsbehoerde. ") * 3
out = _run(text) out = _run(text)
assert out.agent == "dse" assert out.agent == "dse"
assert out.mc_total == 33 # ART13_CHECKLIST komplett # 10 L1-Pflichtangaben immer + L2-Details deren Parent vorhanden ist
# (fehlende Parents → L2 übersprungen, kein 'na'-Rauschen).
assert 10 <= out.mc_total <= 33
ok = [c.label for c in out.mc_coverage if c.status == "ok"] ok = [c.label for c in out.mc_coverage if c.status == "ok"]
assert any("Verantwortlich" in lbl for lbl in ok) assert any("Verantwortlich" in lbl for lbl in ok)
assert any("Rechtsgrundlage" in lbl for lbl in ok) assert any("Rechtsgrundlage" in lbl for lbl in ok)
@@ -42,3 +44,22 @@ def test_dse_short_text_skips():
out = _run("zu kurz") out = _run("zu kurz")
assert out.confidence == 0.0 assert out.confidence == 0.0
assert all(c.status == "skipped" for c in out.mc_coverage) assert all(c.status == "skipped" for c in out.mc_coverage)
def test_third_country_high_when_applicable_no_na_detail_short_action():
    """Text without a third-country section, scan context says transfer=yes.

    Expectations:
    - the missing third-country disclosure (L1) is HIGH, not a soft MEDIUM
    - the transfer-mechanism detail (L2) gets no 'na' coverage entry
      (it is skipped because the parent finding covers the gap)
    - actions stay short (no long hint text driving the recommendation title)
    - the detailed rationale is still available as evidence
    """
    sentence = ("Datenschutz. Verantwortlich ist die Muster GmbH, info@muster.de. "
                "Zwecke und Rechtsgrundlage Art. 6. Speicherdauer. Ihre Rechte. ")
    agent_input = AgentInput(
        doc_type="dse",
        text=sentence * 4,
        context={"scan_context": {"third_country_transfer": "yes"}},
    )
    out = asyncio.run(REGISTRY.get("dse").evaluate(agent_input))

    third_country = [f for f in out.findings if "Drittland" in f.title]
    assert third_country
    assert third_country[0].severity == "HIGH"

    na_mechanism = [
        c for c in out.mc_coverage
        if c.status == "na" and "Transfermechanismus" in c.label
    ]
    assert not na_mechanism

    assert all(len(f.action) < 110 for f in out.findings)
    # The detailed rationale is kept as evidence.
    assert any(f.evidence for f in out.findings)
@@ -0,0 +1,32 @@
"""Disclaimer-Linter: Wort-Grenzen — Rechtsbegriffe passieren, Claims geblockt."""
from __future__ import annotations
from datetime import datetime, timezone
from compliance.services.specialist_agents._base import (
AgentOutput,
Finding,
Severity,
lint_output,
)
def _out(action: str) -> AgentOutput:
    """Wrap *action* in a minimal single-finding AgentOutput for linter tests."""
    timestamp = datetime.now(timezone.utc)
    finding = Finding(
        check_id="X",
        agent="t",
        agent_version="1",
        severity=Severity.MEDIUM,
        title="Titel",
        action=action,
    )
    return AgentOutput(
        agent="t",
        agent_version="1",
        started_at=timestamp,
        finished_at=timestamp,
        duration_ms=0,
        findings=[finding],
    )
def test_schutzgarantien_not_scrubbed():
    """The legal term 'Schutzgarantien' must pass the linter unmodified."""
    linted = lint_output(_out("Geeignete Schutzgarantien nach Art. 46 angeben."))
    action = linted.findings[0].action
    assert "Schutzgarantien" in action
    assert "neutraler Wortlaut" not in action
def test_garantiert_claim_still_blocked():
    """A standalone 'garantiert' claim is still replaced by the marker."""
    linted = lint_output(_out("Dies ist garantiert konform."))
    action = linted.findings[0].action
    assert "garantiert" not in action.lower()
    assert "neutraler Wortlaut" in action