Files
breakpilot-compliance/scripts/obligation_discovery/cross_domain_pairs.py
T
Benjamin Admin 01956ee690 feat: cross-domain relationship discovery — Capability-Schicht-Entwurf (CRA P1)
Stufe 1+2 der Ontologie-Entdeckung (User-Schaerfung #54): nicht Aehnlichkeit sondern
STRUKTURELLE Beziehung. 93 Obligations -> BGE-M3 -> 101 cross-family Paare -> Opus
klassifiziert in 8 Kategorien (genau eine je Paar).
- scripts/obligation_discovery/cross_domain_pairs.py (Stufe 1, key-frei)
- scripts/obligation_discovery/classify_relationships.py (Stufe 2, Opus)
- obligations/cross_domain_relationships.json: 16 SHARED_CAPABILITY -> 8 Capabilities
  (mfa/session/transport-tls/code_signing/anomaly_detection), 23 SUPPORTED_BY
  (Hubs: vuln_identification_inventory<-SBOM-Familie 5x, vuln_remediation_patching 5x),
  1 SAME_OBLIGATION (vuln_remediation_patching == provide_security_updates, MERGE-Kandidat),
  42 OVERLAP_ONLY sauber verworfen. Erstentwurf der Capability-Schicht (Phase 4).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 19:12:17 +02:00

67 lines
2.7 KiB
Python

"""Cross-Domain Relationship Discovery — Stufe 1 (key-frei, im bp-compliance-backend-Container).
Alle Obligations mehrerer Registries -> BGE-M3-Embedding -> je Obligation Top-K Nachbarn ->
Kandidaten-Paare (cross- UND same-family) >= min-sim. KEIN Urteil hier — nur Kandidaten.
Stufe 2 (classify_relationships.py) klassifiziert die Beziehung per Opus.
python3 cross_domain_pairs.py /tmp/reg/cra.json /tmp/reg/cra_authentication.json ... \
--top-k 8 --min-sim 0.60 --out /tmp/cd_pairs.json
"""
from __future__ import annotations
import argparse
import asyncio
import json
from _core import cosine
async def run(paths: list[str], top_k: int, min_sim: float, out: str) -> None:
from compliance.services.mc_embedding_matcher import _embed_texts
obls: list[dict] = []
for p in paths:
reg = json.load(open(p, encoding="utf-8"))
fam = reg.get("family", "")
for o in reg.get("obligations", []):
obls.append({"id": o["id"], "family": o.get("family", "") or fam,
"tier": o.get("tier", ""), "name": o.get("name", ""),
"desc": o.get("description", "")})
vecs = await _embed_texts([f'{o["name"]}. {o["desc"]}' for o in obls])
n = len(obls)
print(f"obligations={n}")
best: dict[tuple[int, int], float] = {}
for i in range(n):
nbrs = sorted(((cosine(vecs[i], vecs[j]), j) for j in range(n) if j != i), reverse=True)[:top_k]
for s, j in nbrs:
if s < min_sim:
continue
a, b = sorted((i, j))
if (a, b) not in best or s > best[(a, b)]:
best[(a, b)] = s
pairs = []
for (a, b), s in sorted(best.items(), key=lambda x: -x[1]):
pairs.append({
"a": obls[a]["id"], "fa": obls[a]["family"], "ta": obls[a]["tier"], "da": obls[a]["desc"][:220],
"b": obls[b]["id"], "fb": obls[b]["family"], "tb": obls[b]["tier"], "db": obls[b]["desc"][:220],
"sim": round(s, 3), "cross_family": obls[a]["family"] != obls[b]["family"]})
cf = sum(1 for p in pairs if p["cross_family"])
json.dump({"n_obligations": n, "n_pairs": len(pairs), "cross_family": cf, "pairs": pairs},
open(out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
print(f"pairs={len(pairs)} (cross-family={cf}, same-family={len(pairs) - cf}) written: {out}")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("registries", nargs="+")
ap.add_argument("--top-k", type=int, default=8)
ap.add_argument("--min-sim", type=float, default=0.60)
ap.add_argument("--out", default="/tmp/cd_pairs.json")
a = ap.parse_args()
asyncio.run(run(a.registries, a.top_k, a.min_sim, a.out))
if __name__ == "__main__":
main()