feat: cross-domain relationship discovery — Capability-Schicht-Entwurf (CRA P1)
Stufe 1+2 der Ontologie-Entdeckung (User-Schaerfung #54): nicht Aehnlichkeit sondern STRUKTURELLE Beziehung. 93 Obligations -> BGE-M3 -> 101 cross-family Paare -> Opus klassifiziert in 8 Kategorien (genau eine je Paar). - scripts/obligation_discovery/cross_domain_pairs.py (Stufe 1, key-frei) - scripts/obligation_discovery/classify_relationships.py (Stufe 2, Opus) - obligations/cross_domain_relationships.json: 16 SHARED_CAPABILITY -> 8 Capabilities (mfa/session/transport-tls/code_signing/anomaly_detection), 23 SUPPORTED_BY (Hubs: vuln_identification_inventory<-SBOM-Familie 5x, vuln_remediation_patching 5x), 1 SAME_OBLIGATION (vuln_remediation_patching == provide_security_updates, MERGE-Kandidat), 42 OVERLAP_ONLY sauber verworfen. Erstentwurf der Capability-Schicht (Phase 4). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,85 @@
|
|||||||
|
"""Cross-Domain Relationship Discovery — Stufe 2: Opus klassifiziert jede Kandidaten-Beziehung
|
||||||
|
in GENAU EINE Kategorie. Liefert das Rohmaterial der Compliance-Ontologie (insb. SHARED_CAPABILITY
|
||||||
|
= Capability-Schicht). ANTHROPIC_API_KEY aus ENV (nie hartcodiert). Streaming.
|
||||||
|
|
||||||
|
ANTHROPIC_API_KEY=… python3 classify_relationships.py --pairs /tmp/cd_pairs.json \
|
||||||
|
--only-cross-family --out /tmp/cd_classified.json
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
|
SYS = """Du bist Compliance-Ontologe. Gegeben Paare von Legal Obligations (CRA), bestimme fuer
|
||||||
|
JEDES Paar GENAU EINE Beziehung. Ziel ist NICHT Aehnlichkeit, sondern die STRUKTURELLE Beziehung.
|
||||||
|
|
||||||
|
Kategorien (genau EINE; bei Mehrdeutigkeit gilt diese Prioritaet):
|
||||||
|
1 SAME_OBLIGATION — dieselbe rechtliche Pflicht, nur pro Domaene anders formuliert -> MERGE-Kandidat.
|
||||||
|
2 SUPPORTED_BY — A ist domaenenspezifische Auspraegung/Teilfall von B ODER A traegt zur Erfuellung von B bei. RICHTUNG angeben.
|
||||||
|
3 SHARED_CAPABILITY — beide werden durch DIESELBE technische Faehigkeit erfuellt (z.B. MFA, TLS-Verschluesselung, digitale Signatur, Session-Management, Patch-Management, Logging-Pipeline). capability_name (snake_case) angeben.
|
||||||
|
4 SHARED_PROCEDURE — beide ueber denselben operativen Prozess erfuellt, ohne gemeinsames technisches Artefakt.
|
||||||
|
5 SHARED_EVIDENCE — beide erzeugen/nutzen denselben Nachweis (Audit-Log, SBOM, Release Notes). evidence_name angeben.
|
||||||
|
6 SHARED_GUIDANCE — beide berufen sich auf denselben externen Standard (NIST/OWASP/ISO), sonst distinkt.
|
||||||
|
7 OVERLAP_ONLY — nur oberflaechliche Wort-/Themenueberlappung, keine echte strukturelle Beziehung.
|
||||||
|
8 UNRELATED — Falsch-Positiv der Embedding-Naehe.
|
||||||
|
|
||||||
|
Gib AUSSCHLIESSLICH JSON aus:
|
||||||
|
{"results":[{"i":0,"relation":"SHARED_CAPABILITY","direction":"a->b|b->a|none","capability_name":"","evidence_name":"","reason":"max 18 Woerter"}]}
|
||||||
|
Regeln: relation = genau eine der 8 Strings. direction nur bei SUPPORTED_BY, sonst "none".
|
||||||
|
capability_name NUR bei SHARED_CAPABILITY (sonst ""), evidence_name NUR bei SHARED_EVIDENCE (sonst "").
|
||||||
|
Sei streng: SHARED_GUIDANCE/OVERLAP_ONLY/UNRELATED grosszuegig nutzen; SAME_OBLIGATION nur bei echter Deckungsgleichheit.
|
||||||
|
Gib fuer JEDES Paar (per Index i) genau ein Ergebnis."""
|
||||||
|
|
||||||
|
|
||||||
|
def build_user(pairs: list[dict]) -> str:
|
||||||
|
lines = []
|
||||||
|
for i, p in enumerate(pairs):
|
||||||
|
lines.append(f'[{i}] A={p["a"]} ({p["fa"]}/{p["ta"]}): {p["da"]}\n'
|
||||||
|
f' B={p["b"]} ({p["fb"]}/{p["tb"]}): {p["db"]} [sim={p["sim"]}]')
|
||||||
|
return "Paare:\n" + "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--pairs", required=True)
|
||||||
|
ap.add_argument("--only-cross-family", action="store_true")
|
||||||
|
ap.add_argument("--min-sim", type=float, default=0.0)
|
||||||
|
ap.add_argument("--model", default="claude-opus-4-8")
|
||||||
|
ap.add_argument("--out", required=True)
|
||||||
|
a = ap.parse_args()
|
||||||
|
d = json.load(open(a.pairs, encoding="utf-8"))
|
||||||
|
pairs = [p for p in d["pairs"]
|
||||||
|
if (not a.only_cross_family or p["cross_family"]) and p["sim"] >= a.min_sim]
|
||||||
|
|
||||||
|
import anthropic
|
||||||
|
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
|
||||||
|
with client.messages.stream(model=a.model, max_tokens=24000, system=SYS,
|
||||||
|
messages=[{"role": "user", "content": build_user(pairs)}]) as st:
|
||||||
|
msg = st.get_final_message()
|
||||||
|
txt = msg.content[0].text
|
||||||
|
m = re.search(r"\{.*\}", txt, re.DOTALL)
|
||||||
|
data = json.loads(m.group(0) if m else txt)
|
||||||
|
|
||||||
|
res = []
|
||||||
|
for r in data.get("results", []):
|
||||||
|
i = r.get("i")
|
||||||
|
if not isinstance(i, int) or i < 0 or i >= len(pairs):
|
||||||
|
continue
|
||||||
|
p = pairs[i]
|
||||||
|
res.append({"a": p["a"], "fa": p["fa"], "b": p["b"], "fb": p["fb"], "sim": p["sim"],
|
||||||
|
"relation": r.get("relation", "?"), "direction": r.get("direction", "none"),
|
||||||
|
"capability_name": r.get("capability_name", ""),
|
||||||
|
"evidence_name": r.get("evidence_name", ""), "reason": r.get("reason", "")})
|
||||||
|
dist = Counter(r["relation"] for r in res)
|
||||||
|
out = {"n_pairs": len(pairs), "n_classified": len(res), "distribution": dict(dist),
|
||||||
|
"model": a.model, "results": res}
|
||||||
|
json.dump(out, open(a.out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||||
|
print(f"classified {len(res)}/{len(pairs)} | {dict(dist)}")
|
||||||
|
print("written:", a.out)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,66 @@
|
|||||||
|
"""Cross-Domain Relationship Discovery — Stufe 1 (key-frei, im bp-compliance-backend-Container).
|
||||||
|
Alle Obligations mehrerer Registries -> BGE-M3-Embedding -> je Obligation Top-K Nachbarn ->
|
||||||
|
Kandidaten-Paare (cross- UND same-family) >= min-sim. KEIN Urteil hier — nur Kandidaten.
|
||||||
|
Stufe 2 (classify_relationships.py) klassifiziert die Beziehung per Opus.
|
||||||
|
|
||||||
|
python3 cross_domain_pairs.py /tmp/reg/cra.json /tmp/reg/cra_authentication.json ... \
|
||||||
|
--top-k 8 --min-sim 0.60 --out /tmp/cd_pairs.json
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
from _core import cosine
|
||||||
|
|
||||||
|
|
||||||
|
async def run(paths: list[str], top_k: int, min_sim: float, out: str) -> None:
|
||||||
|
from compliance.services.mc_embedding_matcher import _embed_texts
|
||||||
|
|
||||||
|
obls: list[dict] = []
|
||||||
|
for p in paths:
|
||||||
|
reg = json.load(open(p, encoding="utf-8"))
|
||||||
|
fam = reg.get("family", "")
|
||||||
|
for o in reg.get("obligations", []):
|
||||||
|
obls.append({"id": o["id"], "family": o.get("family", "") or fam,
|
||||||
|
"tier": o.get("tier", ""), "name": o.get("name", ""),
|
||||||
|
"desc": o.get("description", "")})
|
||||||
|
vecs = await _embed_texts([f'{o["name"]}. {o["desc"]}' for o in obls])
|
||||||
|
n = len(obls)
|
||||||
|
print(f"obligations={n}")
|
||||||
|
|
||||||
|
best: dict[tuple[int, int], float] = {}
|
||||||
|
for i in range(n):
|
||||||
|
nbrs = sorted(((cosine(vecs[i], vecs[j]), j) for j in range(n) if j != i), reverse=True)[:top_k]
|
||||||
|
for s, j in nbrs:
|
||||||
|
if s < min_sim:
|
||||||
|
continue
|
||||||
|
a, b = sorted((i, j))
|
||||||
|
if (a, b) not in best or s > best[(a, b)]:
|
||||||
|
best[(a, b)] = s
|
||||||
|
|
||||||
|
pairs = []
|
||||||
|
for (a, b), s in sorted(best.items(), key=lambda x: -x[1]):
|
||||||
|
pairs.append({
|
||||||
|
"a": obls[a]["id"], "fa": obls[a]["family"], "ta": obls[a]["tier"], "da": obls[a]["desc"][:220],
|
||||||
|
"b": obls[b]["id"], "fb": obls[b]["family"], "tb": obls[b]["tier"], "db": obls[b]["desc"][:220],
|
||||||
|
"sim": round(s, 3), "cross_family": obls[a]["family"] != obls[b]["family"]})
|
||||||
|
cf = sum(1 for p in pairs if p["cross_family"])
|
||||||
|
json.dump({"n_obligations": n, "n_pairs": len(pairs), "cross_family": cf, "pairs": pairs},
|
||||||
|
open(out, "w", encoding="utf-8"), ensure_ascii=False, indent=1)
|
||||||
|
print(f"pairs={len(pairs)} (cross-family={cf}, same-family={len(pairs) - cf}) written: {out}")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("registries", nargs="+")
|
||||||
|
ap.add_argument("--top-k", type=int, default=8)
|
||||||
|
ap.add_argument("--min-sim", type=float, default=0.60)
|
||||||
|
ap.add_argument("--out", default="/tmp/cd_pairs.json")
|
||||||
|
a = ap.parse_args()
|
||||||
|
asyncio.run(run(a.registries, a.top_k, a.min_sim, a.out))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user