8937f105ea
- obligations/cra_updates.json: 9 (6 LEGAL_MINIMUM + 3 BEST_PRACTICE), Beziehungen. Pipeline 670->318 micro->15 review-units -> Opus-Synthese. Synthese gut kalibriert -> light review (KEINE Hart-Re-Tier, vs Auth/Remote-Access). out_of_scope M4/M7. 5 capability_candidate-Marker (signed/trusted/automatic/rollback/testing) fuer Phase-4-Capability-Pruefung. Anker approximativ (curation.anchor_quality). - obligation_join_keys.json: 84 -> 93 (updates 9). Alle 6 CRA-P1-Domaenen abgedeckt. - precluster.py: updates-Scope. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
88 lines
4.2 KiB
Python
88 lines
4.2 KiB
Python
"""Stufe 1 — Pre-Cluster: Controls (scope) → BGE-M3-Embedding (gecacht) → Mikro-Cluster.
|
|
Deterministisch. Im bp-compliance-backend-Container ausführen (PYTHONPATH=/app).
|
|
|
|
python3 scripts/obligation_discovery/precluster.py --scope sbom
|
|
python3 scripts/obligation_discovery/precluster.py --patterns '%sbom%,%software bill%' --micro-thr 0.78
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import pickle
|
|
|
|
from _core import greedy_cluster, parse_req
|
|
|
|
# Predefined title filters per scope. Each value is a list of SQL ILIKE
# patterns; a control is selected when its title matches ANY pattern of the
# chosen scope. Patterns within one scope are kept free of exact duplicates.
SCOPES: dict[str, list[str]] = {
    "sbom": ["%SBOM%", "%software bill%", "%stückliste%", "%komponentenliste%"],
    "vuln": [
        "%schwachstellenbehandl%",
        "%schwachstellenmanagement%",
        "%vulnerability handling%",
        "%coordinated vulnerab%",
        "%vulnerability disclosure%",
        "%cvd-konzept%",
    ],
    "auth": ["%authentisierung%", "%authentifizierung%", "%authentication%"],
    "logging": [
        "%logging%",
        "%protokollierung%",
        "%audit-log%",
        "%audit-trail%",
        "%ereignisprotokoll%",
        "%sicherheitsprotokoll%",
        "%audit-protokoll%",
        "%log-management%",
        "%sicherheitsereignis%protokoll%",
    ],
    "remote_access": [
        "%fernwartung%",
        "%fernzugriff%",
        "%fernzugang%",
        "%fernwartungs%",
        "%remote access%",
        "%remote maintenance%",
        "%remote management%",
        "%remote-wartung%",
        "%remote-zugriff%",
        "%remote-zugang%",
        "%sichere fernwartung%",
        "%fernsteuerung%",
    ],
    "updates": [
        "%sicherheitsupdate%",
        "%security update%",
        "%sicherheits-update%",
        "%security patch%",
        "%sicherheitspatch%",
        "%patch-management%",
        "%patchmanagement%",
        "%patch management%",
        "%firmware-update%",
        "%firmware update%",
        "%software-update%",
        "%software update%",
        "%automatische aktualisierung%",
        "%update-mechanismus%",
        "%update-bereitstellung%",
        "%bereitstellung von updates%",
        "%sichere aktualisierung%",
        "%signierte update%",
        "%update-paket%",
    ],
}
|
|
|
|
|
|
async def run(scope: str, patterns: list[str], micro_thr: float, outdir: str) -> None:
    """Select controls by title, embed them and write their micro-clusters.

    Steps:
      1. Query ``compliance.canonical_controls`` for every control whose title
         ILIKE-matches at least one entry of *patterns* (ordered by control_id).
      2. Build one embedding text per control: the title followed by the first
         two entries returned by ``parse_req`` for its requirements.
      3. Load the embeddings from ``<outdir>/<scope>_vecs.pkl`` if that cache
         fits the current selection, otherwise compute them with
         ``_embed_texts`` and (re)write the cache.
      4. Cluster with ``greedy_cluster(vecs, micro_thr)`` and dump the result to
         ``<outdir>/<scope>_micro_clusters.json``.

    Args:
        scope: Label used in log lines and in the cache/output file names.
        patterns: SQL ILIKE patterns matched against the control title.
        micro_thr: Threshold handed to ``greedy_cluster``.
        outdir: Directory for the embedding cache and the JSON output.

    Raises:
        ValueError: If *patterns* is empty (the WHERE clause would be invalid).
    """
    import asyncpg
    from compliance.services.mc_embedding_matcher import _embed_texts

    if not patterns:
        raise ValueError("at least one title pattern is required")

    # Only positional placeholders ($1, $2, ...) are interpolated into the SQL
    # text; the pattern values themselves are passed as bind parameters.
    where = " or ".join(f"title ilike ${n}" for n in range(1, len(patterns) + 1))
    query = (
        f"select control_id, title, requirements from compliance.canonical_controls "
        f"where {where} order by control_id"
    )

    dsn = os.getenv("DATABASE_URL") or os.getenv("COMPLIANCE_DATABASE_URL")
    conn = await asyncpg.connect(dsn)
    try:
        rows = await conn.fetch(query, *patterns)
    finally:
        # Release the connection even when the query fails.
        await conn.close()

    items = []
    for r in rows:
        title = r["title"] or ""
        # presumably parse_req returns a list of requirement strings — verify in _core
        leading_reqs = parse_req(r["requirements"])[:2]
        items.append({
            "control_id": r["control_id"],
            "title": title,
            "embed_text": title + ". " + " ".join(leading_reqs),
        })
    print(f"scope={scope}: {len(items)} controls")

    cache = os.path.join(outdir, f"{scope}_vecs.pkl")
    vecs = None
    if os.path.exists(cache):
        # NOTE(review): unpickling executes whatever the file contains; only
        # point outdir at a location that untrusted users cannot write to.
        with open(cache, "rb") as fh:
            vecs = pickle.load(fh)
        if len(vecs) == len(items):
            print(f"embeddings from cache ({len(vecs)})")
        else:
            # The cache is keyed by scope only. A different row count means it
            # was built for another selection, and using it would misalign the
            # member indices with `items`. This is a cheap heuristic: a changed
            # selection with the same row count is not detected.
            print(f"embedding cache stale ({len(vecs)} cached vs {len(items)} controls), re-embedding")
            vecs = None
    if vecs is None:
        vecs = await _embed_texts([it["embed_text"] for it in items])
        with open(cache, "wb") as fh:
            pickle.dump(vecs, fh)
        print(f"embeddings fresh+cached ({len(vecs)})")

    micro = greedy_cluster(vecs, micro_thr)
    print(f"pass-1 (micro-thr={micro_thr}): {len(items)} → {len(micro)} micro-clusters")

    out = []
    for micro_id, cluster in enumerate(micro):
        members = cluster["members"]
        out.append({
            "micro_id": micro_id,
            "size": len(members),
            "member_indices": members,
            "control_ids": [items[j]["control_id"] for j in members],
            # Only the first six titles are kept as a human-readable preview.
            "titles": [items[j]["title"] for j in members[:6]],
        })

    path = os.path.join(outdir, f"{scope}_micro_clusters.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(out, fh, ensure_ascii=False, indent=1)
    print(f"written: {path}")
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the pre-clustering once.

    ``--patterns`` (comma-separated ILIKE patterns) takes precedence; when it
    is empty the patterns are looked up in ``SCOPES`` by ``--scope``. With
    ``--patterns`` the scope value is only a label for log lines and file
    names, so it does not have to be a key of ``SCOPES``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--scope", default="sbom")
    ap.add_argument("--patterns", default="", help="comma-separated SQL ILIKE patterns (overrides --scope)")
    ap.add_argument("--micro-thr", type=float, default=0.78)
    ap.add_argument("--outdir", default="/tmp")
    a = ap.parse_args()

    # Strip whitespace around each entry so "a, b" is read as the patterns
    # "a" and "b" instead of "a" and " b".
    patterns = [p.strip() for p in a.patterns.split(",") if p.strip()]
    if not patterns:
        if a.scope not in SCOPES:
            # Exit with a usage error instead of a bare KeyError traceback.
            ap.error(
                f"unknown scope {a.scope!r}; choose one of "
                f"{', '.join(sorted(SCOPES))} or pass --patterns"
            )
        patterns = SCOPES[a.scope]

    asyncio.run(run(a.scope, patterns, a.micro_thr, a.outdir))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|