e1b270c36e
Sichert die validierte Obligation Discovery Pipeline aus /tmp als dauerhaftes, committetes Tooling (scripts/obligation_discovery/) — der eigentliche Vermögenswert. Stufen: precluster (Embedding-Cache + Mikro-Cluster) → meta_cluster (Review Units, Skalierungs-Fix) → synthesize_obligations (Opus, Key aus ENV, Streaming, harte Tier-Regel, Provenance) → validate_registry → merge_review_diff. Reine Helfer in _core.py, 16 Unit-Tests. Doku docs-src/development/obligation_discovery_pipeline_v1.md mit Meilensteinen (SBOM/Vuln reproduziert, Auth 4408→170 Review Units→54→kuriert 29) und der Architekturregel: Runtime deterministisch, Discovery LLM-gestützt. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
93 lines
3.5 KiB
Python
93 lines
3.5 KiB
Python
"""Unit-Tests für die reinen Helfer der Obligation Discovery Pipeline (scripts/obligation_discovery/_core.py)."""
|
|
import pathlib
|
|
import sys
|
|
|
|
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "scripts" / "obligation_discovery"))
|
|
|
|
from _core import ( # noqa: E402
|
|
centroid, cosine, greedy_cluster, merge_edges, parse_req, validate_registry,
|
|
)
|
|
|
|
|
|
class TestParseReq:
    """Expectations for parse_req across the input forms exercised below."""

    def test_list_passthrough(self):
        """A list argument is expected to come back as an equal list."""
        result = parse_req(["a", "b"])
        assert result == ["a", "b"]

    def test_python_repr_string(self):
        """A string holding a Python list repr is expected to be decoded into its items."""
        raw = "['x', 'y']"
        assert parse_req(raw) == ["x", "y"]

    def test_json_string(self):
        """A string holding a JSON array is expected to be decoded into its items."""
        raw = '["x", "y"]'
        assert parse_req(raw) == ["x", "y"]

    def test_plain_string(self):
        """A string that is not a list literal is expected to be wrapped in a one-item list."""
        raw = "just text"
        assert parse_req(raw) == ["just text"]
|
|
|
|
|
|
class TestCosine:
    """Expectations for the cosine similarity helper."""

    def test_identical(self):
        """Two equal vectors should score (almost) exactly 1."""
        left = [1.0, 2.0, 3.0]
        right = [1.0, 2.0, 3.0]
        similarity = cosine(left, right)
        assert similarity > 0.999

    def test_orthogonal(self):
        """Perpendicular unit vectors should score zero within float tolerance."""
        similarity = cosine([1.0, 0.0], [0.0, 1.0])
        assert abs(similarity) < 1e-9

    def test_empty(self):
        """An empty first vector should yield exactly 0.0."""
        similarity = cosine([], [1.0])
        assert similarity == 0.0
|
|
|
|
|
|
class TestGreedyCluster:
    """Expectations for greedy_cluster(vectors, threshold)."""

    def test_near_vectors_cluster_far_separate(self):
        """Two near-parallel vectors share a cluster; the perpendicular one gets its own."""
        vectors = [[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]]
        result = greedy_cluster(vectors, 0.9)
        assert len(result) == 2
        assert result[0]["members"] == [0, 1]
        assert result[1]["members"] == [2]

    def test_deterministic(self):
        """Calling twice with the same input must produce equal results."""
        vectors = [[1.0, 0.0], [0.5, 0.5], [0.99, 0.0]]
        first_run = greedy_cluster(vectors, 0.8)
        second_run = greedy_cluster(vectors, 0.8)
        assert first_run == second_run

    def test_none_vector_isolated(self):
        """A None entry should end up alone in a cluster whose seed is None."""
        result = greedy_cluster([[1.0, 0.0], None], 0.5)
        isolated = result[1]
        assert isolated["members"] == [1]
        assert isolated["seed"] is None
|
|
|
|
|
|
class TestCentroid:
    """Expectations for centroid(member_indices, vectors)."""

    def test_mean(self):
        """The centroid of both vectors should be their component-wise mean."""
        member_indices = [0, 1]
        vectors = [[0.0, 2.0], [2.0, 4.0]]
        assert centroid(member_indices, vectors) == [1.0, 3.0]
|
|
|
|
|
|
class TestValidateRegistry:
    """Expectations for the report returned by validate_registry."""

    def _reg(self, obls, rels=None):
        """Build a minimal registry dict; a falsy `rels` becomes an empty list."""
        relationships = rels if rels else []
        return {"obligations": obls, "relationships": relationships}

    def test_lm_without_legal_basis_fails(self):
        """A LEGAL_MINIMUM obligation with an empty legal_basis is reported and fails."""
        obligation = {
            "id": "x",
            "tier": "LEGAL_MINIMUM",
            "legal_basis": [],
            "member_controls": ["C1"],
        }
        report = validate_registry(self._reg([obligation]))
        assert report["lm_without_legal_basis"] == ["x"]
        assert report["passed"] is False

    def test_clean_passes(self):
        """An obligation with legal basis, member controls and provenance passes."""
        obligation = {
            "id": "x",
            "tier": "LEGAL_MINIMUM",
            "legal_basis": [{"source": "CRA"}],
            "member_controls": ["C1"],
            "provenance": {"source_meta_cluster": "M0"},
        }
        report = validate_registry(self._reg([obligation]))
        assert report["passed"] is True

    def test_over8_per_review_unit_flagged(self):
        """Nine obligations from the same meta cluster are reported with their count."""
        obligations = []
        for index in range(9):
            obligations.append({
                "id": f"o{index}",
                "tier": "BEST_PRACTICE",
                "member_controls": ["C"],
                "provenance": {"source_meta_cluster": "M0"},
            })
        report = validate_registry(self._reg(obligations))
        assert report["over8_per_review_unit"] == {"M0": 9}
        assert report["passed"] is False

    def test_empty_member_controls_flagged(self):
        """An obligation with no member controls is reported and fails."""
        obligation = {"id": "x", "tier": "BEST_PRACTICE", "member_controls": []}
        report = validate_registry(self._reg([obligation]))
        assert report["empty_member_controls"] == ["x"]
        assert report["passed"] is False
|
|
|
|
|
|
class TestMergeEdges:
    """Expectations for merge_edges(existing, proposed)."""

    def test_dedup_and_semantic_only(self):
        """Only the one new semantic edge is added; the duplicate and non-semantic entries are not."""
        known_edge = {"type": "supports", "from": "a", "to": "b"}
        duplicate_edge = {"type": "supports", "from": "a", "to": "b"}
        new_edge = {"type": "depends_on", "from": "c", "to": "d"}
        non_semantic = {"type": "out_of_scope", "clusters": [1]}

        existing = [known_edge]
        proposed = [duplicate_edge, new_edge, non_semantic]

        merged, added = merge_edges(existing, proposed)

        assert added == 1
        assert {"type": "depends_on", "from": "c", "to": "d"} in merged
|