From 3c6deac1c5ca2583e553478cfba7b826084dd794 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 11 Jun 2026 13:43:24 +0200 Subject: [PATCH] fix(dse+linter): Drittland-Applicability, kein na-Detail, kurze Titel, Linter-Wortgrenzen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Linter: FORBIDDEN_OUTPUT_TERMS per Wortgrenze → 'Schutzgarantien'/'geeignete Garantien' (Art. 46) passieren, 'garantiert'-Claims bleiben geblockt. - DSE: L2-Detail wird übersprungen statt 'na', wenn die L1-Pflichtangabe fehlt (kein irreführendes 'nicht anwendbar' für z.B. Transfermechanismus). - DSE: Drittland → HIGH bei dokumentiertem Drittlandtransfer (scan_context via AgentInput.context) — BMW (Konzern, US-Provider) ist kein weiches MEDIUM. - DSE: Titel/Maßnahme kurz (treibt den Recommendation-Titel); ausführliche Begründung als evidence — behebt 120-Zeichen-abgeschnittene Überschriften. Co-Authored-By: Claude Opus 4.7 --- .../api/agent_check/_agent_outputs.py | 1 + .../services/specialist_agents/_base.py | 17 ++++++-- .../services/specialist_agents/dse/agent.py | 41 ++++++++++++------- .../compliance/tests/test_dse_agent.py | 23 ++++++++++- .../tests/test_linter_word_boundary.py | 32 +++++++++++++++ 5 files changed, 95 insertions(+), 19 deletions(-) create mode 100644 backend-compliance/compliance/tests/test_linter_word_boundary.py diff --git a/backend-compliance/compliance/api/agent_check/_agent_outputs.py b/backend-compliance/compliance/api/agent_check/_agent_outputs.py index 723f83c1..e8db550b 100644 --- a/backend-compliance/compliance/api/agent_check/_agent_outputs.py +++ b/backend-compliance/compliance/api/agent_check/_agent_outputs.py @@ -75,6 +75,7 @@ def doc_input_from_snapshot(snap: dict, doc_type: str) -> dict | None: "business_scope": scope, "company_name": (profile.get("company_name") or snap.get("site_label") or ""), "origin_domain": snap.get("site_domain", ""), + "context": {"scan_context": snap.get("scan_context") or {}}, } diff --git a/backend-compliance/compliance/services/specialist_agents/_base.py b/backend-compliance/compliance/services/specialist_agents/_base.py index 113e6b92..ee762d29 100644 --- a/backend-compliance/compliance/services/specialist_agents/_base.py +++ b/backend-compliance/compliance/services/specialist_agents/_base.py @@ -192,7 +192,7 @@ def lint_output(output: AgentOutput) -> AgentOutput: for field_name in ("title", "evidence", "action"): v = getattr(f, field_name) or "" for term in FORBIDDEN_OUTPUT_TERMS: - if term in v.lower(): + if _has_term(v, term): issues.append(f"Finding {f.check_id}.{field_name}: '{term}'") v = _scrub(v, term) setattr(f, field_name, v) @@ -200,7 +200,7 @@ def lint_output(output: AgentOutput) -> AgentOutput: for field_name in ("title", "body"): v = getattr(r, field_name) or "" for term in FORBIDDEN_OUTPUT_TERMS: - if term in v.lower(): + if _has_term(v, term): issues.append(f"Rec {r.recommendation_id}.{field_name}: '{term}'") v = _scrub(v, term) setattr(r, field_name, v) @@ -210,11 +210,20 @@ def lint_output(output: AgentOutput) -> AgentOutput: return output +def _has_term(text: str, term: str) -> bool: + """Verbotenes Wort an WORT-GRENZE (nicht Substring) — blockt 'garantiert'/ + 'garantie', lässt aber den Rechtsbegriff 'Schutzgarantien'/'geeignete + Garantien' (Art. 46 DSGVO) passieren.""" + import re as _re + return bool(_re.search( + r"\b" + _re.escape(term) + r"\b", text, _re.IGNORECASE)) + + def _scrub(text: str, term: str) -> str: - """Case-insensitive replace mit Marker.""" + """Case-insensitive replace mit Marker — nur das eigenständige Wort.""" import re as _re return _re.sub( - _re.escape(term), "[→ neutraler Wortlaut]", + r"\b" + _re.escape(term) + r"\b", "[→ neutraler Wortlaut]", text, flags=_re.IGNORECASE, ) diff --git a/backend-compliance/compliance/services/specialist_agents/dse/agent.py b/backend-compliance/compliance/services/specialist_agents/dse/agent.py index 1f85bced..98de8bcc 100644 --- a/backend-compliance/compliance/services/specialist_agents/dse/agent.py +++ b/backend-compliance/compliance/services/specialist_agents/dse/agent.py @@ -73,6 +73,9 @@ class DSEAgent(BaseSpecialistAgent): async def evaluate(self, agent_input: AgentInput) -> AgentOutput: start = datetime.now(timezone.utc) text = (agent_input.text or "").strip() + sc = (agent_input.context or {}).get("scan_context") or {} + tc_applies = str(sc.get("third_country_transfer", "")).lower() in ( + "yes", "true", "1", "ja") coverage: list[McCoverage] = [] findings: list[Finding] = [] @@ -91,44 +94,54 @@ class DSEAgent(BaseSpecialistAgent): continue m = _search(_compiled(c), text) l1_present[c["id"]] = m is not None - coverage.append(self._cov(c, m, text)) + coverage.append(self._cov(c, m, text, tc_applies)) if m is None: - findings.append(self._finding(c, present=False)) + findings.append(self._finding(c, False, tc_applies)) - # L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 vorhanden ist - # (sonst kein Doppel-Finding zum selben Mangel). + # L2 (vollständig/korrekt?) — nur wenn die übergeordnete L1 da ist. Fehlt + # die L1, deckt deren Finding die Lücke ab → KEIN irreführendes 'na' + # (nicht anwendbar) für das Detail (z.B. Transfermechanismus bei BMW). for c in ART13_CHECKLIST: if c.get("level", 1) != 2: continue parent = c.get("parent") if parent and not l1_present.get(parent, False): - coverage.append(McCoverage( - mc_id=c["id"], status="na", label=c["label"], - reason="übergeordnete Pflichtangabe fehlt")) continue m = _search(_compiled(c), text) - coverage.append(self._cov(c, m, text)) + coverage.append(self._cov(c, m, text, tc_applies)) if m is None: - findings.append(self._finding(c, present=True)) + findings.append(self._finding(c, True, tc_applies)) return self._finalize(start, findings, coverage, 0.7, "") - def _cov(self, c: dict, m, text: str) -> McCoverage: + @staticmethod + def _eff_sev(c: dict, tc_applies: bool) -> str: + """Drittland ist bei dokumentiertem Drittlandtransfer (Scan-Kontext) + keine weiche MEDIUM-Empfehlung mehr, sondern HIGH (Konzern/US-Provider).""" + if tc_applies and c["id"] in ("third_country", "third_country_mechanism"): + return "HIGH" + return c.get("severity", "MEDIUM") + + def _cov(self, c: dict, m, text: str, tc_applies: bool) -> McCoverage: if m is not None: return McCoverage( mc_id=c["id"], status="ok", label=c["label"], reason="Pattern-Treffer", found=_match_value(text, m.start(), m.end())) - sev = c.get("severity", "MEDIUM") + sev = self._eff_sev(c, tc_applies) return McCoverage( mc_id=c["id"], status=_COV_FAIL.get(sev, "medium"), label=c["label"], reason="fehlt" if c.get("level", 1) == 1 else "Detail unvollständig") - def _finding(self, c: dict, present: bool) -> Finding: - sev = c.get("severity", "MEDIUM") + def _finding(self, c: dict, present: bool, tc_applies: bool) -> Finding: + sev = self._eff_sev(c, tc_applies) + # Titel + Maßnahme bewusst KURZ (treibt den Recommendation-Titel); die + # ausführliche Begründung steht als evidence auf der Finding-Karte. title = (f"{c['label']}: Detail unvollständig" if present else f"{c['label']} fehlt") + action = (f"{c['label']} präzisieren." if present + else f"{c['label']} in der Datenschutzerklärung ergänzen.") return Finding( check_id=f"DSE-{c['id']}", agent=self.agent_id, agent_version=self.agent_version, @@ -137,7 +150,7 @@ class DSEAgent(BaseSpecialistAgent): severity_reason=("detail_incomplete" if present else "pflichtangabe_missing"), title=title, norm=_norm_of(c["label"]), - action=c.get("hint", ""), confidence=0.7, + action=action, evidence=(c.get("hint") or "")[:280], confidence=0.7, sources=[EvidenceSource( source_type=SourceType.REGEX, source_id=c["id"], detail="kein Pattern-Treffer", confidence=0.7)], diff --git a/backend-compliance/compliance/tests/test_dse_agent.py b/backend-compliance/compliance/tests/test_dse_agent.py index 2c6a685d..d5f2acdc 100644 --- a/backend-compliance/compliance/tests/test_dse_agent.py +++ b/backend-compliance/compliance/tests/test_dse_agent.py @@ -26,7 +26,9 @@ def test_dse_detects_core_obligations(): "bei der Aufsichtsbehoerde. ") * 3 out = _run(text) assert out.agent == "dse" - assert out.mc_total == 33 # ART13_CHECKLIST komplett + # 10 L1-Pflichtangaben immer + L2-Details deren Parent vorhanden ist + # (fehlende Parents → L2 übersprungen, kein 'na'-Rauschen). + assert 10 <= out.mc_total <= 33 ok = [c.label for c in out.mc_coverage if c.status == "ok"] assert any("Verantwortlich" in lbl for lbl in ok) assert any("Rechtsgrundlage" in lbl for lbl in ok) @@ -42,3 +44,22 @@ def test_dse_short_text_skips(): out = _run("zu kurz") assert out.confidence == 0.0 assert all(c.status == "skipped" for c in out.mc_coverage) + + +def test_third_country_high_when_applicable_no_na_detail_short_action(): + # Text ohne Drittland-Abschnitt + Scan-Kontext drittland=ja: + # - third_country (L1) fehlt → HIGH (nicht weiches MEDIUM) + # - Transfermechanismus (L2) → KEIN 'na' (übersprungen, Parent deckt ab) + # - Titel/Maßnahme kurz (kein 280-Zeichen-Hint als Recommendation-Titel) + text = ("Datenschutz. Verantwortlich ist die Muster GmbH, info@muster.de. " + "Zwecke und Rechtsgrundlage Art. 6. Speicherdauer. Ihre Rechte. ") * 4 + out = asyncio.run(REGISTRY.get("dse").evaluate(AgentInput( + doc_type="dse", text=text, + context={"scan_context": {"third_country_transfer": "yes"}}))) + tc = [f for f in out.findings if "Drittland" in f.title] + assert tc and tc[0].severity == "HIGH" + assert not any(c.status == "na" and "Transfermechanismus" in c.label + for c in out.mc_coverage) + assert all(len(f.action) < 110 for f in out.findings) + # Detail-Begründung bleibt als evidence erhalten + assert any(f.evidence for f in out.findings) diff --git a/backend-compliance/compliance/tests/test_linter_word_boundary.py b/backend-compliance/compliance/tests/test_linter_word_boundary.py new file mode 100644 index 00000000..75812100 --- /dev/null +++ b/backend-compliance/compliance/tests/test_linter_word_boundary.py @@ -0,0 +1,32 @@ +"""Disclaimer-Linter: Wort-Grenzen — Rechtsbegriffe passieren, Claims geblockt.""" + +from __future__ import annotations + +from datetime import datetime, timezone + +from compliance.services.specialist_agents._base import ( + AgentOutput, + Finding, + Severity, + lint_output, +) + + +def _out(action: str) -> AgentOutput: + now = datetime.now(timezone.utc) + f = Finding(check_id="X", agent="t", agent_version="1", + severity=Severity.MEDIUM, title="Titel", action=action) + return AgentOutput(agent="t", agent_version="1", started_at=now, + finished_at=now, duration_ms=0, findings=[f]) + + +def test_schutzgarantien_not_scrubbed(): + out = lint_output(_out("Geeignete Schutzgarantien nach Art. 46 angeben.")) + assert "Schutzgarantien" in out.findings[0].action + assert "neutraler Wortlaut" not in out.findings[0].action + + +def test_garantiert_claim_still_blocked(): + out = lint_output(_out("Dies ist garantiert konform.")) + assert "garantiert" not in out.findings[0].action.lower() + assert "neutraler Wortlaut" in out.findings[0].action