feat(reconcile): B — Cross-Doc-Reconciliation (Pflicht in anderem Doc erfüllt)
Ein 'X fehlt'/'zu prüfen'-Finding wird unterdrückt, wenn die Pflicht in einem ANDEREN Snapshot-Dokument erfüllt ist (z.B. § 36 VSBG / OS-Link stehen bei BMW in AGB/'Rechtlicher Hinweis', nicht im Impressum → war False Positive). Konservative Allowlist (impressum: verbraucher_streitbeilegung, odr_link) gegen False-Reconciliation. Verdrahtet in _run_doc_agent (alle Doc-Checks). Frontend: 'In anderem Dokument abgedeckt'-Sektion. Greift voll nach Scan + Legal-Capture. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,109 @@
|
||||
"""B — Cross-Doc-Reconciliation.
|
||||
|
||||
Ein „X fehlt"/„zu prüfen"-Finding eines Doc-Agenten wird unterdrückt, wenn die
Pflicht in einem ANDEREN Dokument des Snapshots erfüllt ist (z.B. § 36 VSBG oder
OS-Link stehen bei BMW in AGB/„Rechtlicher Hinweis", nicht im Impressum → unser
Impressum-Finding war ein False Positive).
|
||||
|
||||
KONSERVATIV: nur eine kuratierte Allowlist wirklich cross-doc-fähiger Pflichten
wird abgeglichen — sonst würde z.B. eine E-Mail in der DSE fälschlich die
Impressum-Pflicht „abdecken". Pattern-Quelle = der Check selbst (mcs.py /
CHECKLIST), gegen die Texte der anderen Dokumente gesucht.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
# Allowlist of (agent_id, field_id) pairs whose obligation may legitimately be
# fulfilled in a DIFFERENT document than the one the agent inspected.
_RECONCILABLE: set[tuple[str, str]] = {
    # § 36 VSBG
    ("impressum", "verbraucher_streitbeilegung"),
    # OS/ODR link (524/2013)
    ("impressum", "odr_link"),
}

# Human-readable names for document types; used to label the document in which
# a finding was found to be covered.
_DOC_LABEL: dict[str, str] = {
    "agb": "AGB",
    "dse": "Datenschutzerklärung",
    "impressum": "Impressum",
    "legal_notice": "Rechtliche Hinweise",
    "widerruf": "Widerrufsbelehrung",
    "nutzungsbedingungen": "Nutzungsbedingungen",
    "cookie": "Cookie-Richtlinie",
}

# Finding statuses that are candidates for cross-document reconciliation.
_RECONCILED_STATES: set[str] = {"fail", "possibly_applicable"}
|
||||
|
||||
|
||||
def _field_patterns(agent_id: str) -> dict[str, list]:
    """Return the search patterns of every check of *agent_id*, keyed by field id.

    For the ``impressum`` agent the patterns are taken from the ``patterns``
    attribute of each ``MCS`` entry and copied into a fresh list (presumably
    already-compiled regexes, since callers invoke ``.search`` on them —
    verify against ``mcs.py``).

    For any other agent the ``CHECKLIST`` attribute of its ``REGISTRY`` entry
    is used. Each checklist item is read as a mapping with an ``id`` key and an
    optional ``patterns`` list of regex sources, which are compiled with
    ``re.IGNORECASE | re.MULTILINE``.

    An unknown agent, or one without a checklist, yields an empty dict.
    Pattern sources that fail to compile are skipped (best effort) and
    reported through a warning log entry instead of being dropped silently.
    """
    if agent_id == "impressum":
        from compliance.services.specialist_agents.impressum.mcs import MCS

        return {mc.field_id: list(mc.patterns) for mc in MCS}

    import logging

    from compliance.services.specialist_agents import REGISTRY

    logger = logging.getLogger(__name__)
    checklist = getattr(REGISTRY.get(agent_id), "CHECKLIST", None) or []
    compiled_by_field: dict[str, list] = {}
    for check in checklist:
        compiled = []
        # ``or ()`` also tolerates an explicit ``"patterns": None`` entry.
        for source in check.get("patterns") or ():
            try:
                compiled.append(re.compile(source, re.IGNORECASE | re.MULTILINE))
            except re.error as exc:
                # Keep going: one broken regex must not disable the whole
                # check, but it should be visible to whoever maintains it.
                logger.warning(
                    "Skipping invalid pattern %r for check %r of agent %r: %s",
                    source,
                    check.get("id"),
                    agent_id,
                    exc,
                )
        compiled_by_field[check["id"]] = compiled
    return compiled_by_field
|
||||
|
||||
|
||||
def _field_labels(agent_id: str) -> dict[str, str]:
    """Return the label of every check of *agent_id*, keyed by field id.

    The ``impressum`` agent reads ``field_id`` / ``label`` from its ``MCS``
    entries. Every other agent reads ``id`` / ``label`` from the ``CHECKLIST``
    attribute of its ``REGISTRY`` entry, with a missing label mapped to ``""``.
    An unknown agent, or one without a checklist, yields an empty dict.
    """
    label_by_field: dict[str, str] = {}

    if agent_id != "impressum":
        from compliance.services.specialist_agents import REGISTRY

        agent = REGISTRY.get(agent_id)
        checklist = getattr(agent, "CHECKLIST", None) or []
        for check in checklist:
            field_id = check["id"]
            label_by_field[field_id] = check.get("label", "")
        return label_by_field

    from compliance.services.specialist_agents.impressum.mcs import MCS

    for mc in MCS:
        field_id = mc.field_id
        label_by_field[field_id] = mc.label
    return label_by_field
|
||||
|
||||
|
||||
def _covered_in(patterns: list, other_docs: list) -> str | None:
    """Return the type of the first document in which any pattern matches.

    ``patterns`` holds objects with a ``search(text)`` method (e.g. compiled
    regexes); ``other_docs`` is a sequence of ``(doc_type, text)`` pairs.
    Documents are tried in the given order and documents with empty text are
    skipped. Returns ``None`` when no document matches.
    """
    searchable = ((doc_type, body) for doc_type, body in other_docs if body)
    return next(
        (
            doc_type
            for doc_type, body in searchable
            if any(rx.search(body) for rx in patterns)
        ),
        None,
    )
|
||||
|
||||
|
||||
def reconcile_doc_findings(result: dict, agent_id: str, other_docs: list) -> dict:
    """Move findings that another document already satisfies out of the active list.

    Mutates and returns ``result`` (the agent-output JSON). A finding is moved
    from ``result['findings']`` to ``result['reconciled']`` when its status is
    one of ``_RECONCILED_STATES``, its ``(agent_id, field_id)`` pair is in the
    ``_RECONCILABLE`` allowlist, and one of the check's own patterns matches
    the text of another document. Moved findings get ``reconciled_in`` (the
    document type) and ``reconciled_in_label`` (its display name).

    For every moved finding, ``mc_coverage`` rows whose label equals the
    check's label are set to ``ok``, and the severity counter (``mc_high`` /
    ``mc_medium`` / ``mc_low``) is shifted to ``mc_ok``.

    ``other_docs`` is ``[(doc_type, text), ...]`` for the OTHER documents.
    ``result`` is returned untouched when there are no findings, no other
    documents, or nothing could be reconciled.
    """
    all_findings = result.get("findings") or []
    if not (all_findings and other_docs):
        return result

    patterns_by_field = _field_patterns(agent_id)
    label_by_field = _field_labels(agent_id)

    # Split the findings into those that stay active and those covered elsewhere.
    still_active: list = []
    moved: list = []
    for finding in all_findings:
        field_id = finding.get("field_id")
        is_candidate = (
            finding.get("status") in _RECONCILED_STATES
            and (agent_id, field_id) in _RECONCILABLE
        )
        patterns = patterns_by_field.get(field_id) if is_candidate else None
        covering_doc = _covered_in(patterns, other_docs) if patterns else None
        if not covering_doc:
            still_active.append(finding)
            continue
        finding["reconciled_in"] = covering_doc
        finding["reconciled_in_label"] = _DOC_LABEL.get(covering_doc, covering_doc)
        moved.append(finding)

    if not moved:
        return result

    result["findings"] = still_active
    result["reconciled"] = (result.get("reconciled") or []) + moved

    coverage_rows = result.get("mc_coverage") or []
    counter_by_severity = {"high": "mc_high", "medium": "mc_medium", "low": "mc_low"}
    for finding in moved:
        doc_label = finding["reconciled_in_label"]

        # Coverage rows are joined to the finding via the check's label.
        row_label = label_by_field.get(finding.get("field_id"), "")
        if row_label:
            for row in coverage_rows:
                if row.get("label") != row_label:
                    continue
                row["status"] = "ok"
                row["reason"] = f"in {doc_label} abgedeckt"
                row["found"] = f"→ {doc_label}"

        severity = (finding.get("severity") or "").lower()
        counter = counter_by_severity.get(severity)
        # NOTE(review): mc_ok is bumped only together with a decrement of the
        # severity counter, so the counters stay balanced — confirm intended.
        if counter and result.get(counter, 0) > 0:
            result[counter] -= 1
            result["mc_ok"] = result.get("mc_ok", 0) + 1
    return result
|
||||
Reference in New Issue
Block a user