From ba7d98be367af3032320be73da28815deeaaa7c3 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 11 Jun 2026 15:42:16 +0200 Subject: [PATCH] =?UTF-8?q?feat(reconcile):=20B=20=E2=80=94=20Cross-Doc-Re?= =?UTF-8?q?conciliation=20(Pflicht=20in=20anderem=20Doc=20erf=C3=BCllt)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ein 'X fehlt'/'zu prüfen'-Finding wird unterdrückt, wenn die Pflicht in einem ANDEREN Snapshot-Dokument erfüllt ist (z.B. § 36 VSBG / OS-Link stehen bei BMW in AGB/'Rechtlicher Hinweis', nicht im Impressum → war False Positive). Konservative Allowlist (impressum: verbraucher_streitbeilegung, odr_link) gegen False-Reconciliation. Verdrahtet in _run_doc_agent (alle Doc-Checks). Frontend: 'In anderem Dokument abgedeckt'-Sektion. Greift voll nach Scan + Legal-Capture. Co-Authored-By: Claude Opus 4.7 --- .../sdk/agent/_components/AgentResultView.tsx | 19 +++ .../compliance/api/snapshot_check_routes.py | 12 +- .../services/cross_doc_reconcile.py | 109 ++++++++++++++++++ .../tests/test_cross_doc_reconcile.py | 51 ++++++++ 4 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 backend-compliance/compliance/services/cross_doc_reconcile.py create mode 100644 backend-compliance/compliance/tests/test_cross_doc_reconcile.py diff --git a/admin-compliance/app/sdk/agent/_components/AgentResultView.tsx b/admin-compliance/app/sdk/agent/_components/AgentResultView.tsx index a1aa4d4a..45ba7611 100644 --- a/admin-compliance/app/sdk/agent/_components/AgentResultView.tsx +++ b/admin-compliance/app/sdk/agent/_components/AgentResultView.tsx @@ -25,8 +25,11 @@ const SEV_ORDER: Record = { const INITIAL_VISIBLE = 12 +type Reconciled = { title?: string; field_id?: string; norm?: string; reconciled_in_label?: string; reconciled_in?: string } + export function AgentResultView({ output }: { output: SlotOutput }) { const [showAll, setShowAll] = useState(false) + const reconciled = (output as { reconciled?: Reconciled[] 
}).reconciled || [] const sortedFindings = [...output.findings].sort( (a, b) => SEV_ORDER[a.severity] - SEV_ORDER[b.severity], ) @@ -92,6 +95,22 @@ export function AgentResultView({ output }: { output: SlotOutput }) { )} + {reconciled.length > 0 && ( +
+
+ In anderem Dokument abgedeckt ({reconciled.length}) +
+ {reconciled.map((f, i) => ( +
+ ✓ {f.title || f.field_id} + — gefunden in + {f.reconciled_in_label || f.reconciled_in} + {f.norm && · {f.norm}} +
+ ))} +
+ )} + {output.recommendations.length > 0 && (
+
diff --git a/backend-compliance/compliance/api/snapshot_check_routes.py b/backend-compliance/compliance/api/snapshot_check_routes.py
index f0f871c0..be7874f5 100644
--- a/backend-compliance/compliance/api/snapshot_check_routes.py
+++ b/backend-compliance/compliance/api/snapshot_check_routes.py
@@ -33,7 +33,17 @@ async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict
             return {"findings": [], "recommendations": [], "mc_coverage": [],
                     "notes": f"kein {doc_type}-Text im Snapshot", "confidence": 0.0}
         out = await REGISTRY.get(agent_id).evaluate(AgentInput(**agent_input))
-        return out.model_dump(mode="json")
+        result = out.model_dump(mode="json")
+        # B: cross-document reconciliation — obligations fulfilled in ANOTHER
+        # document (e.g. § 36 VSBG / ODR link in the AGB/legal page) are not
+        # shown as findings. Conservative allowlist in cross_doc_reconcile.
+        from compliance.services.cross_doc_reconcile import reconcile_doc_findings
+        other = [(e.get("doc_type"), e.get("text") or e.get("content") or "")
+                 for e in (snap.get("doc_entries") or [])
+                 if e.get("doc_type") != doc_type
+                 and (e.get("text") or e.get("content"))]
+        reconcile_doc_findings(result, agent_id, other)
+        return result
     finally:
         db.close()
 
diff --git a/backend-compliance/compliance/services/cross_doc_reconcile.py b/backend-compliance/compliance/services/cross_doc_reconcile.py
new file mode 100644
index 00000000..8f37feba
--- /dev/null
+++ b/backend-compliance/compliance/services/cross_doc_reconcile.py
@@ -0,0 +1,155 @@
+"""B — Cross-document reconciliation.
+
+A "missing"/"needs review" finding raised by a document agent is suppressed
+when the obligation is fulfilled in ANOTHER document of the same snapshot
+(e.g. BMW states § 36 VSBG and the ODR link in its terms / "Rechtlicher
+Hinweis" page, not in the Impressum, so the Impressum finding was a false
+positive).
+
+CONSERVATIVE: only a curated allowlist of obligations that can legitimately
+live in another document is reconciled — otherwise e.g. an e-mail address in
+the privacy policy would wrongly "cover" the Impressum obligation. The
+patterns come from the check itself (mcs.py / CHECKLIST) and are searched in
+the texts of the other documents.
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+# (agent_id, field_id) pairs that may legitimately be fulfilled in ANOTHER
+# document.
+_RECONCILABLE: set[tuple[str, str]] = {
+    ("impressum", "verbraucher_streitbeilegung"),  # § 36 VSBG
+    ("impressum", "odr_link"),  # OS/ODR link (524/2013)
+}
+
+# Display label per doc_type, stored in `reconciled_in_label`.
+_DOC_LABEL = {
+    "agb": "AGB", "dse": "Datenschutzerklärung", "impressum": "Impressum",
+    "legal_notice": "Rechtliche Hinweise", "widerruf": "Widerrufsbelehrung",
+    "nutzungsbedingungen": "Nutzungsbedingungen", "cookie": "Cookie-Richtlinie",
+}
+# Finding states that are eligible for reconciliation.
+_RECONCILED_STATES = {"fail", "possibly_applicable"}
+
+
+def _compile_patterns(raw) -> list:
+    """Return `raw` as a list of compiled regexes.
+
+    Objects that already offer `.search` are passed through; anything else is
+    compiled case-insensitively in multi-line mode. Invalid expressions are
+    logged and skipped so one broken pattern cannot abort reconciliation.
+    """
+    compiled = []
+    for pat in raw or []:
+        if hasattr(pat, "search"):
+            compiled.append(pat)
+            continue
+        try:
+            compiled.append(re.compile(pat, re.IGNORECASE | re.MULTILINE))
+        except (re.error, TypeError):
+            logger.warning("cross_doc_reconcile: skipping invalid pattern %r", pat)
+    return compiled
+
+
+def _field_patterns(agent_id: str) -> dict[str, list]:
+    """Map every check's field id of `agent_id` to its compiled patterns.
+
+    The Impressum agent keeps its checks in `MCS`; every other agent is read
+    from an optional `CHECKLIST` attribute of its registry entry.
+    """
+    # Function-scope imports: the agent packages are only loaded on demand.
+    if agent_id == "impressum":
+        from compliance.services.specialist_agents.impressum.mcs import MCS
+        return {mc.field_id: _compile_patterns(mc.patterns) for mc in MCS}
+    from compliance.services.specialist_agents import REGISTRY
+    checklist = getattr(REGISTRY.get(agent_id), "CHECKLIST", None) or []
+    return {c["id"]: _compile_patterns(c.get("patterns")) for c in checklist}
+
+
+def _field_labels(agent_id: str) -> dict[str, str]:
+    """Map every check's field id of `agent_id` to its coverage label."""
+    if agent_id == "impressum":
+        from compliance.services.specialist_agents.impressum.mcs import MCS
+        return {mc.field_id: mc.label for mc in MCS}
+    from compliance.services.specialist_agents import REGISTRY
+    checklist = getattr(REGISTRY.get(agent_id), "CHECKLIST", None) or []
+    return {c["id"]: c.get("label", "") for c in checklist}
+
+
+def _covered_in(patterns: list, other_docs: list) -> str | None:
+    """Return the doc_type of the first document matching any pattern.
+
+    `other_docs` is a list of `(doc_type, text)` pairs; documents without
+    text are skipped. Returns None when nothing matches.
+    """
+    for doc_type, text in other_docs:
+        if text and any(p.search(text) for p in patterns):
+            return doc_type
+    return None
+
+
+def reconcile_doc_findings(result: dict, agent_id: str, other_docs: list) -> dict:
+    """Move findings that another document fulfils out of the active findings.
+
+    Allowlisted findings in a reconcilable state whose patterns match another
+    document are moved from `result["findings"]` to `result["reconciled"]`;
+    the matching coverage row is set to "ok" and the counters are adjusted.
+
+    Args:
+        result: AgentOutput JSON dict; mutated in place.
+        agent_id: id of the agent that produced `result`.
+        other_docs: `[(doc_type, text), ...]` of the OTHER documents.
+
+    Returns:
+        The same `result` object.
+    """
+    findings = result.get("findings") or []
+    if not findings or not other_docs:
+        return result
+    # Nothing allowlisted for this agent: return before loading its checks
+    # (the route calls this for every doc agent).
+    if not any(aid == agent_id for aid, _ in _RECONCILABLE):
+        return result
+    patterns_by_field = _field_patterns(agent_id)
+    active: list = []
+    reconciled: list = []
+    for finding in findings:
+        field_id = finding.get("field_id")
+        hit = None
+        if (finding.get("status") in _RECONCILED_STATES
+                and (agent_id, field_id) in _RECONCILABLE
+                and patterns_by_field.get(field_id)):
+            hit = _covered_in(patterns_by_field[field_id], other_docs)
+        if not hit:
+            active.append(finding)
+            continue
+        finding["reconciled_in"] = hit
+        finding["reconciled_in_label"] = _DOC_LABEL.get(hit, hit)
+        reconciled.append(finding)
+    if not reconciled:
+        return result
+    result["findings"] = active
+    result["reconciled"] = (result.get("reconciled") or []) + reconciled
+    labels = _field_labels(agent_id)
+    coverage = result.get("mc_coverage") or []
+    sev_key = {"high": "mc_high", "medium": "mc_medium", "low": "mc_low"}
+    for finding in reconciled:
+        where = finding["reconciled_in_label"]
+        label = labels.get(finding.get("field_id"), "")
+        for row in coverage:
+            if label and row.get("label") == label:
+                row["status"] = "ok"
+                row["reason"] = f"in {where} abgedeckt"
+                row["found"] = f"→ {where}"
+        counter = sev_key.get((finding.get("severity") or "").lower())
+        # Move one unit from the severity bucket to "ok"; `or 0` tolerates
+        # counters that are missing or null in the JSON.
+        if counter and (result.get(counter) or 0) > 0:
+            result[counter] -= 1
+            result["mc_ok"] = (result.get("mc_ok") or 0) + 1
+    return result
diff --git a/backend-compliance/compliance/tests/test_cross_doc_reconcile.py b/backend-compliance/compliance/tests/test_cross_doc_reconcile.py
new file mode 100644
index 00000000..a22ba221
--- /dev/null
+++ b/backend-compliance/compliance/tests/test_cross_doc_reconcile.py
@@ -0,0 +1,51 @@
+"""B — Cross-document reconciliation: suppress cross-doc-capable findings that
+are fulfilled in another document; leave non-allowlisted fields untouched."""
+
+from __future__ import annotations
+
+from compliance.services.cross_doc_reconcile import reconcile_doc_findings
+
+
+def test_vsbg_reconciled_from_agb():
+    result = {
+        "findings": [
+            {"field_id": "verbraucher_streitbeilegung",
+             "status": "possibly_applicable", "severity": "LOW", "title": "VSBG"},
+            {"field_id": "name_anbieter", "status": "fail",
+             "severity": "HIGH", "title": "Name fehlt"},
+        ],
+        "mc_coverage": [{"label": "Verbraucher-Streitbeilegung-Hinweis",
+                         "status": "low"}],
+        "mc_low": 1, "mc_ok": 0, "mc_high": 1,
+    }
+    other = [("agb", "BMW wird nicht an einem Streitbeilegungsverfahren vor "
+                     "einer Verbraucherschlichtungsstelle im Sinne des VSBG "
+                     "teilnehmen und ist hierzu auch nicht verpflichtet.")]
+    reconcile_doc_findings(result, "impressum", other)
+    assert not any(f["field_id"] == "verbraucher_streitbeilegung"
+                   for f in result["findings"])
+    rec = result.get("reconciled") or []
+    assert any(f.get("reconciled_in") == "agb" for f in rec)
+    # the non-reconcilable name_anbieter stays active
+    assert any(f["field_id"] == "name_anbieter" for f in result["findings"])
+    # coverage row set to ok, counters adjusted
+    assert result["mc_coverage"][0]["status"] == "ok"
+    assert result["mc_ok"] == 1 and result["mc_low"] == 0
+
+
+def test_no_reconcile_when_absent_elsewhere():
+    result = {"findings": [{"field_id": "verbraucher_streitbeilegung",
+                            "status": "possibly_applicable", "severity": "LOW"}],
+              "mc_coverage": []}
+    reconcile_doc_findings(result, "impressum",
+                           [("agb", "Text ganz ohne dieses Thema.")])
+    assert result["findings"] and not result.get("reconciled")
+
+
+def test_non_allowlisted_field_not_reconciled():
+    # name_anbieter is NOT cross-doc-capable → stays a finding, even if in the AGB.
+    result = {"findings": [{"field_id": "name_anbieter", "status": "fail",
+                            "severity": "HIGH"}], "mc_coverage": []}
+    reconcile_doc_findings(result, "impressum",
+                           [("agb", "Bayerische Motoren Werke Aktiengesellschaft")])
+    assert result["findings"] and not result.get("reconciled")