diff --git a/backend-compliance/compliance/services/obligation_taxonomy.py b/backend-compliance/compliance/services/obligation_taxonomy.py new file mode 100644 index 00000000..dc003ec6 --- /dev/null +++ b/backend-compliance/compliance/services/obligation_taxonomy.py @@ -0,0 +1,26 @@ +"""Obligation-Taxonomie-Registry — versioniertes Artefakt bis zur DB-Owner-Tabelle +(Legal Obligation Layer v1, docs-src/development/legal_obligation_layer_v1.md). + +Hält Metadaten auf OBLIGATION-Ebene, die (noch) keine eigene DB-Tabelle haben. + +`decision_method_required`: Obligations, deren Erkennung Keyword/Embedding +NACHWEISLICH nicht zuverlässig leistet (kompakte/synonymreiche Offenlegung) und +die CONTENT/LLM brauchen. Empirisch belegt am TeamViewer-Recall-Defekt: 0/22 +recipients+international_transfer Controls trafen, obwohl die Pflicht erfüllt war +(„…außerhalb EU/EWR … Standardvertragsklauseln/Schutzmaßnahmen"); Embedding cos +0.49–0.57 < 0.62, teils falscher Chunk → kein Schwellen-Fix, sondern LLM-Klasse. + +Wirkung: der Shadow zählt ein FAILED solcher Obligations NICHT als „echte Lücke", +sondern als RECALL_LIMITED (Prüfer kann sie mit aktueller Methode nicht verifizieren). +""" +OBLIGATION_META: dict[str, dict] = { + "recipients_disclosed": {"decision_method_required": "LLM"}, + "third_country_transfer_disclosed": {"decision_method_required": "LLM"}, + "safeguards_disclosed": {"decision_method_required": "LLM"}, + "safeguards_accessible": {"decision_method_required": "LLM"}, +} + + +def requires_llm(obligation_id: str) -> bool: + """True, wenn diese Obligation CONTENT/LLM braucht (Keyword/Embedding-Recall belegt unzureichend).""" + return OBLIGATION_META.get(obligation_id, {}).get("decision_method_required") == "LLM" diff --git a/backend-compliance/compliance/services/specialist_agents/dse/_obligation_shadow.py b/backend-compliance/compliance/services/specialist_agents/dse/_obligation_shadow.py index 793d46a7..5f5b1caf 100644 --- a/backend-compliance/compliance/services/specialist_agents/dse/_obligation_shadow.py +++ b/backend-compliance/compliance/services/specialist_agents/dse/_obligation_shadow.py @@ -59,6 +59,7 @@ def compute_obligation_shadow(results: list[dict], text: str, FAILED, LM, NA, PARTIAL, CriterionEval, aggregate_obligations, ) from compliance.services.obligation_applicability import applicable + from compliance.services.obligation_taxonomy import requires_llm legacy = 0 evals: list[Any] = [] @@ -78,20 +79,34 @@ def compute_obligation_shadow(results: list[dict], text: str, return {"status": "no obligation markers on result controls"} obls = aggregate_obligations(evals, applicable_fn=applicable, doc_text=text) - findings = sum(1 for o in obls if o.status in (FAILED, PARTIAL)) - na = sum(1 for o in obls if o.status == NA) + # FAILED/PARTIAL ehrlich trennen: echte Lücke (failed_by_current_checker) vs + # RECALL_LIMITED (Obligation braucht LLM, aktueller Prüfer kann sie nicht verifizieren). + findings = failed_current = recall_limited = na = 0 + for o in obls: + if o.status == NA: + na += 1 + elif o.status in (FAILED, PARTIAL): + findings += 1 + if requires_llm(o.obligation_id): + recall_limited += 1 + else: + failed_current += 1 top = [] for o in obls: cs = contrib.get(o.obligation_id, []) fehlt = sum(1 for _, p in cs if not p) if fehlt >= 2: top.append({"obligation": o.obligation_id, "fehlt": fehlt, - "total": len(cs), "status": o.status}) + "total": len(cs), "status": o.status, + "recall_limited": bool(requires_llm(o.obligation_id) + and o.status in (FAILED, PARTIAL))}) top.sort(key=lambda x: -x["fehlt"]) return { "legacy_control_findings": legacy, "obligation_shadow_results": len(obls), "obligation_findings": findings, + "failed_by_current_checker": failed_current, + "recall_limited": recall_limited, "collapse_factor": round(legacy / findings, 2) if findings else None, "na_count": na, "met_failed_delta": legacy - findings, diff --git a/backend-compliance/tests/test_obligation_shadow.py b/backend-compliance/tests/test_obligation_shadow.py index ad6a7737..0a1bdb9b 100644 --- a/backend-compliance/tests/test_obligation_shadow.py +++ b/backend-compliance/tests/test_obligation_shadow.py @@ -3,6 +3,9 @@ from compliance.services.specialist_agents.dse._obligation_shadow import ( compute_obligation_shadow, ) +NON_LLM = "art20_right_exists_core" # nicht in der LLM_REQUIRED-Registry +LLM_REQ = "third_country_transfer_disclosed" # in der LLM_REQUIRED-Registry + def _markers(n, ob, cond=None): return {f"C{i}": {"obl": [ob], "cond": cond} for i in range(n)} @@ -11,42 +14,59 @@ def _markers(n, ob, cond=None): class TestComputeShadow: def test_collapse_and_delta(self): results = [{"control_id": f"C{i}", "passed": False} for i in range(5)] - s = compute_obligation_shadow(results, "x", _markers(5, "recipients_disclosed")) + s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM)) assert s["legacy_control_findings"] == 5 assert s["obligation_findings"] == 1 # 5 → 1 + assert s["failed_by_current_checker"] == 1 + assert s["recall_limited"] == 0 assert s["collapse_factor"] == 5.0 assert s["met_failed_delta"] == 4 top = s["top_collapsed_obligations"][0] - assert top["obligation"] == "recipients_disclosed" and top["fehlt"] == 5 + assert top["obligation"] == NON_LLM and top["fehlt"] == 5 + assert top["recall_limited"] is False def test_fp_correction_one_passed_collapses_to_met(self): results = [{"control_id": f"C{i}", "passed": i == 0} for i in range(5)] - s = compute_obligation_shadow(results, "x", _markers(5, "recipients_disclosed")) + s = compute_obligation_shadow(results, "x", _markers(5, NON_LLM)) assert s["legacy_control_findings"] == 4 assert s["obligation_findings"] == 0 # MET (anderswo erfüllt) assert s["met_failed_delta"] == 4 def test_na_when_predicate_false(self): results = [{"control_id": "C0", "passed": False}] - m = {"C0": {"obl": ["third_country_transfer_disclosed"], - "cond": "has_third_country_transfer"}} + m = {"C0": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}} s = compute_obligation_shadow(results, "nur innerhalb der eu", m) assert s["na_count"] == 1 assert s["obligation_findings"] == 0 # NA statt FEHLT - def test_na_predicate_true_keeps_finding(self): - results = [{"control_id": "C0", "passed": False}] - m = {"C0": {"obl": ["third_country_transfer_disclosed"], - "cond": "has_third_country_transfer"}} - s = compute_obligation_shadow(results, "übermittlung in ein drittland", m) - assert s["na_count"] == 0 - assert s["obligation_findings"] == 1 - def test_no_markers_returns_status(self): s = compute_obligation_shadow([{"control_id": "C0", "passed": False}], "x", {}) assert "no obligation" in s["status"] def test_does_not_mutate_results(self): results = [{"control_id": "C0", "passed": False}] - compute_obligation_shadow(results, "x", _markers(1, "recipients_disclosed")) - assert set(results[0].keys()) == {"control_id", "passed"} # unverändert + compute_obligation_shadow(results, "x", _markers(1, NON_LLM)) + assert set(results[0].keys()) == {"control_id", "passed"} + + +class TestRecallSegregation: + def test_llm_required_failed_is_recall_limited_not_real_gap(self): + # 5 verfehlte third_country-Controls, Transfer-Text vorhanden → FAILED, + # aber LLM_REQUIRED → RECALL_LIMITED, NICHT failed_by_current_checker. + results = [{"control_id": f"C{i}", "passed": False} for i in range(5)] + m = {f"C{i}": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"} + for i in range(5)} + s = compute_obligation_shadow(results, "übermittlung in ein drittland", m) + assert s["obligation_findings"] == 1 + assert s["recall_limited"] == 1 + assert s["failed_by_current_checker"] == 0 + assert s["top_collapsed_obligations"][0]["recall_limited"] is True + + def test_mixed_real_gap_and_recall_limited(self): + results = [{"control_id": "A", "passed": False}, {"control_id": "B", "passed": False}] + m = {"A": {"obl": [NON_LLM], "cond": None}, + "B": {"obl": [LLM_REQ], "cond": "has_third_country_transfer"}} + s = compute_obligation_shadow(results, "übermittlung in ein drittland", m) + assert s["obligation_findings"] == 2 + assert s["failed_by_current_checker"] == 1 + assert s["recall_limited"] == 1 diff --git a/backend-compliance/tests/test_obligation_taxonomy.py b/backend-compliance/tests/test_obligation_taxonomy.py new file mode 100644 index 00000000..98558771 --- /dev/null +++ b/backend-compliance/tests/test_obligation_taxonomy.py @@ -0,0 +1,20 @@ +"""Unit-Tests für die Obligation-Taxonomie-Registry (decision_method_required).""" +from compliance.services.obligation_taxonomy import OBLIGATION_META, requires_llm + + +class TestRequiresLlm: + def test_marked_obligations_require_llm(self): + for ob in ("recipients_disclosed", "third_country_transfer_disclosed", + "safeguards_disclosed", "safeguards_accessible"): + assert requires_llm(ob) is True + + def test_unmarked_obligation_does_not(self): + assert requires_llm("art20_right_exists_core") is False + assert requires_llm("objection_general_art21_1") is False + + def test_unknown_obligation_is_false(self): + assert requires_llm("does_not_exist") is False + + def test_registry_values_are_llm(self): + assert all(v.get("decision_method_required") == "LLM" + for v in OBLIGATION_META.values())