Mark recall-limited obligations in DSE shadow telemetry
Trennt im Shadow drei Kategorien statt eines pauschalen FAILED:
- echte Lücke (failed_by_current_checker)
- redundanter Control-FP (kollabiert per OR zu MET)
- Prüfer-Reichweitenproblem (recall_limited)

obligation_taxonomy.py: decision_method_required=LLM für recipients_disclosed, third_country_transfer_disclosed, safeguards_disclosed, safeguards_accessible (versioniertes Registry-Artefakt bis zur DB-Tabelle, v1-Spec).
Empirisch: TeamViewer 0/22 Keyword+Embedding-Treffer trotz erfüllter Pflicht (cos 0.49-0.57) → CONTENT/LLM-Klasse, kein Schwellen-Fix.

compute_obligation_shadow segregiert FAILED/PARTIAL über requires_llm(): TeamViewer 5 Findings → 2 echte + 3 recall_limited.
9 neue Unit-Tests (41 gesamt grün).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,26 @@
|
|||||||
|
"""Obligation-Taxonomie-Registry — versioniertes Artefakt bis zur DB-Owner-Tabelle
|
||||||
|
(Legal Obligation Layer v1, docs-src/development/legal_obligation_layer_v1.md).
|
||||||
|
|
||||||
|
Hält Metadaten auf OBLIGATION-Ebene, die (noch) keine eigene DB-Tabelle haben.
|
||||||
|
|
||||||
|
`decision_method_required`: Obligations, deren Erkennung Keyword/Embedding
|
||||||
|
NACHWEISLICH nicht zuverlässig leistet (kompakte/synonymreiche Offenlegung) und
|
||||||
|
die CONTENT/LLM brauchen. Empirisch belegt am TeamViewer-Recall-Defekt: 0/22
|
||||||
|
recipients+international_transfer Controls trafen, obwohl die Pflicht erfüllt war
|
||||||
|
(„…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"); Embedding cos
|
||||||
|
0.49–0.57 < 0.62, teils falscher Chunk → kein Schwellen-Fix, sondern LLM-Klasse.
|
||||||
|
|
||||||
|
Wirkung: der Shadow zählt ein FAILED solcher Obligations NICHT als „echte Lücke",
|
||||||
|
sondern als RECALL_LIMITED (Prüfer kann sie mit aktueller Methode nicht verifizieren).
|
||||||
|
"""
|
||||||
|
# Obligation-level metadata registry, keyed by obligation ID.
# An entry with ``"decision_method_required": "LLM"`` marks an obligation whose
# fulfilment keyword/embedding matching cannot reliably detect; `requires_llm`
# reads exactly this flag. Obligations without an entry are not LLM-required.
OBLIGATION_META: dict[str, dict] = {
    "recipients_disclosed": {"decision_method_required": "LLM"},
    "third_country_transfer_disclosed": {"decision_method_required": "LLM"},
    "safeguards_disclosed": {"decision_method_required": "LLM"},
    "safeguards_accessible": {"decision_method_required": "LLM"},
}
|
||||||
|
|
||||||
|
|
||||||
|
def requires_llm(obligation_id: str) -> bool:
    """Return whether an obligation is registered as needing the LLM method.

    Args:
        obligation_id: Obligation identifier to look up in ``OBLIGATION_META``.

    Returns:
        ``True`` if the registry entry's ``decision_method_required`` equals
        ``"LLM"``; ``False`` otherwise, including for identifiers that have no
        registry entry at all.
    """
    # Unknown IDs fall back to an empty mapping so the lookup never raises.
    meta = OBLIGATION_META.get(obligation_id, {})
    return meta.get("decision_method_required") == "LLM"
|
||||||
@@ -59,6 +59,7 @@ def compute_obligation_shadow(results: list[dict], text: str,
|
|||||||
FAILED, LM, NA, PARTIAL, CriterionEval, aggregate_obligations,
|
FAILED, LM, NA, PARTIAL, CriterionEval, aggregate_obligations,
|
||||||
)
|
)
|
||||||
from compliance.services.obligation_applicability import applicable
|
from compliance.services.obligation_applicability import applicable
|
||||||
|
from compliance.services.obligation_taxonomy import requires_llm
|
||||||
|
|
||||||
legacy = 0
|
legacy = 0
|
||||||
evals: list[Any] = []
|
evals: list[Any] = []
|
||||||
@@ -78,20 +79,34 @@ def compute_obligation_shadow(results: list[dict], text: str,
|
|||||||
return {"status": "no obligation markers on result controls"}
|
return {"status": "no obligation markers on result controls"}
|
||||||
|
|
||||||
obls = aggregate_obligations(evals, applicable_fn=applicable, doc_text=text)
|
obls = aggregate_obligations(evals, applicable_fn=applicable, doc_text=text)
|
||||||
findings = sum(1 for o in obls if o.status in (FAILED, PARTIAL))
|
# FAILED/PARTIAL ehrlich trennen: echte Lücke (failed_by_current_checker) vs
|
||||||
na = sum(1 for o in obls if o.status == NA)
|
# RECALL_LIMITED (Obligation braucht LLM, aktueller Prüfer kann sie nicht verifizieren).
|
||||||
|
findings = failed_current = recall_limited = na = 0
|
||||||
|
for o in obls:
|
||||||
|
if o.status == NA:
|
||||||
|
na += 1
|
||||||
|
elif o.status in (FAILED, PARTIAL):
|
||||||
|
findings += 1
|
||||||
|
if requires_llm(o.obligation_id):
|
||||||
|
recall_limited += 1
|
||||||
|
else:
|
||||||
|
failed_current += 1
|
||||||
top = []
|
top = []
|
||||||
for o in obls:
|
for o in obls:
|
||||||
cs = contrib.get(o.obligation_id, [])
|
cs = contrib.get(o.obligation_id, [])
|
||||||
fehlt = sum(1 for _, p in cs if not p)
|
fehlt = sum(1 for _, p in cs if not p)
|
||||||
if fehlt >= 2:
|
if fehlt >= 2:
|
||||||
top.append({"obligation": o.obligation_id, "fehlt": fehlt,
|
top.append({"obligation": o.obligation_id, "fehlt": fehlt,
|
||||||
"total": len(cs), "status": o.status})
|
"total": len(cs), "status": o.status,
|
||||||
|
"recall_limited": bool(requires_llm(o.obligation_id)
|
||||||
|
and o.status in (FAILED, PARTIAL))})
|
||||||
top.sort(key=lambda x: -x["fehlt"])
|
top.sort(key=lambda x: -x["fehlt"])
|
||||||
return {
|
return {
|
||||||
"legacy_control_findings": legacy,
|
"legacy_control_findings": legacy,
|
||||||
"obligation_shadow_results": len(obls),
|
"obligation_shadow_results": len(obls),
|
||||||
"obligation_findings": findings,
|
"obligation_findings": findings,
|
||||||
|
"failed_by_current_checker": failed_current,
|
||||||
|
"recall_limited": recall_limited,
|
||||||
"collapse_factor": round(legacy / findings, 2) if findings else None,
|
"collapse_factor": round(legacy / findings, 2) if findings else None,
|
||||||
"na_count": na,
|
"na_count": na,
|
||||||
"met_failed_delta": legacy - findings,
|
"met_failed_delta": legacy - findings,
|
||||||
|
|||||||
@@ -3,6 +3,9 @@ from compliance.services.specialist_agents.dse._obligation_shadow import (
|
|||||||
compute_obligation_shadow,
|
compute_obligation_shadow,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
NON_LLM = "art20_right_exists_core" # nicht in der LLM_REQUIRED-Registry
|
||||||
|
LLM_REQ = "third_country_transfer_disclosed" # in der LLM_REQUIRED-Registry
|
||||||
|
|
||||||
|
|
||||||
def _markers(n, ob, cond=None):
|
def _markers(n, ob, cond=None):
|
||||||
return {f"C{i}": {"obl": [ob], "cond": cond} for i in range(n)}
|
return {f"C{i}": {"obl": [ob], "cond": cond} for i in range(n)}
|
||||||
@@ -11,42 +14,59 @@ def _markers(n, ob, cond=None):
|
|||||||
class TestComputeShadow:
|
class TestComputeShadow:
|
||||||
def test_collapse_and_delta(self):
|
def test_collapse_and_delta(self):
|
||||||
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
|
results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
|
||||||
s = compute_obligation_shadow(results, "x", _markers(5, "recipients_disclosed"))
|
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
|
||||||
assert s["legacy_control_findings"] == 5
|
assert s["legacy_control_findings"] == 5
|
||||||
assert s["obligation_findings"] == 1 # 5 → 1
|
assert s["obligation_findings"] == 1 # 5 → 1
|
||||||
|
assert s["failed_by_current_checker"] == 1
|
||||||
|
assert s["recall_limited"] == 0
|
||||||
assert s["collapse_factor"] == 5.0
|
assert s["collapse_factor"] == 5.0
|
||||||
assert s["met_failed_delta"] == 4
|
assert s["met_failed_delta"] == 4
|
||||||
top = s["top_collapsed_obligations"][0]
|
top = s["top_collapsed_obligations"][0]
|
||||||
assert top["obligation"] == "recipients_disclosed" and top["fehlt"] == 5
|
assert top["obligation"] == NON_LLM and top["fehlt"] == 5
|
||||||
|
assert top["recall_limited"] is False
|
||||||
|
|
||||||
def test_fp_correction_one_passed_collapses_to_met(self):
|
def test_fp_correction_one_passed_collapses_to_met(self):
|
||||||
results = [{"control_id": f"C{i}", "passed": i == 0} for i in range(5)]
|
results = [{"control_id": f"C{i}", "passed": i == 0} for i in range(5)]
|
||||||
s = compute_obligation_shadow(results, "x", _markers(5, "recipients_disclosed"))
|
s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM))
|
||||||
assert s["legacy_control_findings"] == 4
|
assert s["legacy_control_findings"] == 4
|
||||||
assert s["obligation_findings"] == 0 # MET (anderswo erfüllt)
|
assert s["obligation_findings"] == 0 # MET (anderswo erfüllt)
|
||||||
assert s["met_failed_delta"] == 4
|
assert s["met_failed_delta"] == 4
|
||||||
|
|
||||||
def test_na_when_predicate_false(self):
|
def test_na_when_predicate_false(self):
|
||||||
results = [{"control_id": "C0", "passed": False}]
|
results = [{"control_id": "C0", "passed": False}]
|
||||||
m = {"C0": {"obl": ["third_country_transfer_disclosed"],
|
m = {"C0": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
|
||||||
"cond": "has_third_country_transfer"}}
|
|
||||||
s = compute_obligation_shadow(results, "nur innerhalb der eu", m)
|
s = compute_obligation_shadow(results, "nur innerhalb der eu", m)
|
||||||
assert s["na_count"] == 1
|
assert s["na_count"] == 1
|
||||||
assert s["obligation_findings"] == 0 # NA statt FEHLT
|
assert s["obligation_findings"] == 0 # NA statt FEHLT
|
||||||
|
|
||||||
def test_na_predicate_true_keeps_finding(self):
|
|
||||||
results = [{"control_id": "C0", "passed": False}]
|
|
||||||
m = {"C0": {"obl": ["third_country_transfer_disclosed"],
|
|
||||||
"cond": "has_third_country_transfer"}}
|
|
||||||
s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
|
|
||||||
assert s["na_count"] == 0
|
|
||||||
assert s["obligation_findings"] == 1
|
|
||||||
|
|
||||||
def test_no_markers_returns_status(self):
|
def test_no_markers_returns_status(self):
|
||||||
s = compute_obligation_shadow([{"control_id": "C0", "passed": False}], "x", {})
|
s = compute_obligation_shadow([{"control_id": "C0", "passed": False}], "x", {})
|
||||||
assert "no obligation" in s["status"]
|
assert "no obligation" in s["status"]
|
||||||
|
|
||||||
def test_does_not_mutate_results(self):
|
def test_does_not_mutate_results(self):
|
||||||
results = [{"control_id": "C0", "passed": False}]
|
results = [{"control_id": "C0", "passed": False}]
|
||||||
compute_obligation_shadow(results, "x", _markers(1, "recipients_disclosed"))
|
compute_obligation_shadow(results, "x", _markers(1, NON_LLM))
|
||||||
assert set(results[0].keys()) == {"control_id", "passed"} # unverändert
|
assert set(results[0].keys()) == {"control_id", "passed"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestRecallSegregation:
    """FAILED/PARTIAL obligations are split into real gaps vs. recall-limited."""

    def test_llm_required_failed_is_recall_limited_not_real_gap(self):
        # Five failing third-country controls while the text mentions a
        # transfer -> the obligation is a finding, but because it is registered
        # as LLM-required it must be counted as recall_limited and NOT as
        # failed_by_current_checker.
        results = [{"control_id": f"C{i}", "passed": False} for i in range(5)]
        m = _markers(5, LLM_REQ, "has_third_country_transfer")
        s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
        # Predicate is true for this text, so the obligation must not be NA.
        assert s["na_count"] == 0
        assert s["obligation_findings"] == 1
        assert s["recall_limited"] == 1
        assert s["failed_by_current_checker"] == 0
        assert s["top_collapsed_obligations"][0]["recall_limited"] is True

    def test_mixed_real_gap_and_recall_limited(self):
        # One failing control per obligation: one obligation outside the
        # LLM-required registry (real gap) and one inside it (recall-limited).
        results = [{"control_id": "A", "passed": False},
                   {"control_id": "B", "passed": False}]
        m = {"A": {"obl": [NON_LLM], "cond": None},
             "B": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}}
        s = compute_obligation_shadow(results, "übermittlung in ein drittland", m)
        assert s["obligation_findings"] == 2
        assert s["failed_by_current_checker"] == 1
        assert s["recall_limited"] == 1
|
||||||
|
|||||||
@@ -0,0 +1,20 @@
|
|||||||
|
"""Unit-Tests für die Obligation-Taxonomie-Registry (decision_method_required)."""
|
||||||
|
from compliance.services.obligation_taxonomy import OBLIGATION_META, requires_llm
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequiresLlm:
    """Checks `requires_llm` against the `OBLIGATION_META` registry."""

    def test_marked_obligations_require_llm(self):
        for ob in ("recipients_disclosed", "third_country_transfer_disclosed",
                   "safeguards_disclosed", "safeguards_accessible"):
            # The message names the offending ID if one entry regresses.
            assert requires_llm(ob) is True, ob

    def test_unmarked_obligation_does_not(self):
        assert requires_llm("art20_right_exists_core") is False
        assert requires_llm("objection_general_art21_1") is False

    def test_unknown_obligation_is_false(self):
        assert requires_llm("does_not_exist") is False

    def test_registry_values_are_llm(self):
        # Every registry entry currently carries the "LLM" decision method.
        assert all(v.get("decision_method_required") == "LLM"
                   for v in OBLIGATION_META.values())
|
||||||
Reference in New Issue
Block a user