Files
breakpilot-compliance/scripts/obligation_discovery/precluster.py
T
Benjamin Admin 1584b8fb2f feat(bridge): remote-access obligation cut (CRA Annex I) — 18 obligations
- obligations/cra_remote_access.json: 18 (5 LEGAL_MINIMUM outcomes + 13 BEST_PRACTICE),
  15 Beziehungen. Two-stage clustering 445->209 micro->27 review-units -> Opus-Synthese.
  Synthese vergab 14 LM -> key-free re-tier nach Auth-Regel (Mechanismen MFA/Session/VPN/
  insecure-protocol/OT/Wartungs-Governance/temp/data-export/component -> BEST_PRACTICE +
  supports-Kante zur Eltern-LM). out_of_scope M5/M11 = physische Maschinen-Fernsteuerung
  (MaschinenVO 2023/1230). Anker approximativ (siehe curation.anchor_quality).
- obligation_join_keys.json: 66 -> 84 (remote_access 18).
- precluster.py: remote_access-Scope.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-25 18:37:10 +02:00

81 lines
3.6 KiB
Python

"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster.
Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app).
python3 scripts/obligation_discovery/precluster.py --scope sbom
python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import pickle
from _core import greedy_cluster, parse_req
# Named scopes: scope label -> SQL ILIKE patterns that are OR-ed together and
# matched against the control title. ILIKE is case-insensitive, so the casing
# used in the patterns does not matter.
# Some entries are deliberately explicit although a broader pattern in the same
# list already covers them (e.g. "%fernwartungs%" is implied by "%fernwartung%");
# they are kept as documentation of the intended vocabulary.
SCOPES: dict[str, list[str]] = {
    "sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"],
    "vuln": [
        "%schwachstellenbehandl%", "%schwachstellenmanagement%", "%vulnerability handling%",
        "%coordinated vulnerab%", "%vulnerability disclosure%", "%cvd-konzept%",
    ],
    "auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"],
    # "%audit-trail%" used to be listed twice; the exact duplicate only added a
    # redundant bound parameter to the query and has been dropped.
    "logging": [
        "%logging%", "%protokollierung%", "%audit-log%", "%audit-trail%",
        "%ereignisprotokoll%", "%sicherheitsprotokoll%", "%audit-protokoll%",
        "%log-management%", "%sicherheitsereignis%protokoll%",
    ],
    "remote_access": [
        "%fernwartung%", "%fernzugriff%", "%fernzugang%", "%fernwartungs%",
        "%remote access%", "%remote maintenance%", "%remote management%",
        "%remote-wartung%", "%remote-zugriff%", "%remote-zugang%",
        "%sichere fernwartung%", "%fernsteuerung%",
    ],
}
def _fingerprint(texts: list[str]) -> str:
    """Return a SHA-256 hex digest identifying the exact list of embed texts."""
    import hashlib  # function-scope import, like the other lazy imports in this file

    # json.dumps yields an unambiguous, ASCII-only serialisation of the list.
    return hashlib.sha256(json.dumps(texts).encode("ascii")).hexdigest()


def _load_cached_vecs(cache: str, stamp: str, fingerprint: str, expected: int):
    """Return the cached vectors, or None when the cache is missing or stale.

    The cache is only trusted when the fingerprint stored next to it equals
    *fingerprint* and it holds exactly *expected* vectors.
    """
    if not (os.path.exists(cache) and os.path.exists(stamp)):
        return None
    with open(stamp, encoding="utf-8", errors="replace") as fh:
        if fh.read().strip() != fingerprint:
            return None
    # NOTE(security): pickle.load executes arbitrary code embedded in the file.
    # Only point outdir at a directory that untrusted users cannot write to.
    with open(cache, "rb") as fh:
        vecs = pickle.load(fh)
    return vecs if len(vecs) == expected else None


async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None:
    """Select controls by title pattern, embed them and write their micro-clusters.

    Steps: fetch every control whose title matches any of *patterns*, build one
    embedding text per control (title plus the first two parsed requirements),
    load the embedding vectors from the cache or compute them, cluster them with
    ``greedy_cluster`` and dump the clusters as JSON.

    Args:
        scope: Label used in the log output and in the names of the files
            written to *outdir* (``{scope}_vecs.pkl``, ``{scope}_vecs.sha256``
            and ``{scope}_micro_clusters.json``).
        patterns: SQL ILIKE patterns matched against the control title; a
            control is selected when any pattern matches. Must not be empty.
        micro_thr: Threshold handed to ``greedy_cluster``.
        outdir: Directory for the cache and output files; it is not created here.

    Raises:
        ValueError: If *patterns* is empty.
    """
    if not patterns:
        # Without this guard the query would read "where  order by ..." and fail
        # only inside the database with a syntax error.
        raise ValueError("patterns must contain at least one ILIKE pattern")

    import asyncpg
    from compliance.services.mc_embedding_matcher import _embed_texts

    dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
    conn = await asyncpg.connect(dsn)
    try:
        # One positional placeholder ($1, $2, ...) per pattern; the patterns
        # themselves are passed as bound parameters, never interpolated.
        where = " or ".join(f"title ilike ${i}" for i in range(1, len(patterns) + 1))
        rows = await conn.fetch(
            f"select control_id, title, requirements from compliance.canonical_controls "
            f"where {where} order by control_id", *patterns)
    finally:
        # Close the connection even when the query fails.
        await conn.close()

    items = []
    for row in rows:
        title = row["title"] or ""
        items.append({
            "control_id": row["control_id"],
            "title": title,
            "embed_text": title + ". " + " ".join(parse_req(row["requirements"])[:2]),
        })
    print(f"scope={scope}: {len(items)} controls")

    texts = [it["embed_text"] for it in items]
    fingerprint = _fingerprint(texts)
    cache = os.path.join(outdir, f"{scope}_vecs.pkl")
    # Sidecar file holding the fingerprint of the texts the cache was built from.
    # The cache name depends only on the scope label, so without this check a run
    # with other patterns (or after the table changed) would reuse vectors that
    # no longer line up with `items`.
    stamp = os.path.join(outdir, f"{scope}_vecs.sha256")

    vecs = _load_cached_vecs(cache, stamp, fingerprint, len(items))
    if vecs is not None:
        print(f"embeddings from cache ({len(vecs)})")
    else:
        if os.path.exists(cache):
            print("embedding cache does not match the selected controls; re-embedding")
        vecs = await _embed_texts(texts)
        # Drop the old stamp first so that a crash between the two writes can
        # never leave a stamp vouching for the wrong vectors.
        if os.path.exists(stamp):
            os.remove(stamp)
        with open(cache, "wb") as fh:
            pickle.dump(vecs, fh)
        with open(stamp, "w", encoding="utf-8") as fh:
            fh.write(fingerprint)
        print(f"embeddings fresh+cached ({len(vecs)})")

    micro = greedy_cluster(vecs, micro_thr)
    print(f"pass-1 (micro-thr={micro_thr}): {len(items)} -> {len(micro)} micro-clusters")

    # Each cluster's "members" are indices into `items`; only the first six
    # titles are stored per cluster.
    out = [{"micro_id": i, "size": len(c["members"]), "member_indices": c["members"],
            "control_ids": [items[j]["control_id"] for j in c["members"]],
            "titles": [items[j]["title"] for j in c["members"][:6]]}
           for i, c in enumerate(micro)]
    path = os.path.join(outdir, f"{scope}_micro_clusters.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(out, fh, ensure_ascii=False, indent=1)
    print(f"written: {path}")
def main() -> None:
    """Parse the command line and run the pre-cluster stage.

    ``--patterns`` (comma-separated SQL ILIKE patterns) takes precedence. When it
    is absent or empty, the patterns registered in ``SCOPES`` for ``--scope`` are
    used. ``--scope`` is also passed on to ``run`` as the label for its output.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--scope", default="sbom")
    ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)")
    ap.add_argument("--micro-thr", type=float, default=0.78)
    ap.add_argument("--outdir", default="/tmp")
    a = ap.parse_args()

    # Strip surrounding whitespace so "%a%, %b%" does not produce a pattern with
    # a leading blank that would silently match nothing.
    patterns = [p.strip() for p in a.patterns.split(",") if p.strip()]
    if not patterns:
        # --scope is a free-form label whenever --patterns is given, so it cannot
        # be restricted with choices=; validate it only when it is looked up.
        if a.scope not in SCOPES:
            ap.error(
                f"unknown scope {a.scope!r}; choose one of "
                f"{', '.join(sorted(SCOPES))} or pass --patterns"
            )
        patterns = SCOPES[a.scope]
    asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir))


if __name__ == "__main__":
    main()