Files
breakpilot-compliance/scripts/qa/poc_cra_article_assign.py
T
Benjamin Admin e1f89f6226 feat(cra): CRA/Cyber-Tab in 3 Zielgruppen-Ebenen + Brücke /sdk/cra
Frontend-Reorganisation (kein Datenmodell-Umbau):
- Ebene 1 (Management): CRA-Readiness, offene Risiken (Klartext Kritisch/Hoch/..),
  Handlungsaufwand nach Evidenz-Typ, betroffene Vorschriften, Top-Risiken, Fristen.
- Ebene 2 (Safety × Cyber): "Cyber öffnet CE-Gefährdung erneut" als Hero (USP).
- Ebene 3 (Technik): Befund-Tabelle einklappbar, interne IDs (CRA-AI-x/CWE/NIST/
  OWASP/ISO) nur im Detail, Maßnahmen-Namen statt M-IDs, größere Schrift.
- Brücke: IACE-CRA-Tab ↔ /sdk/cra (Readiness-Check) beidseitig verlinkt.
- CRACyberView in Unterkomponenten gesplittet (LOC < 300).

scripts/qa/poc_cra_article_assign.py: PoC Artikel/Absatz-Zuordnung (Pfad B2b,
zurückgestellt — nicht MVP).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-15 00:48:53 +02:00

142 lines
5.3 KiB
Python

"""PoC: Artikel/Absatz-Zuordnung für CRA-Controls (Pfad B2b).
Pro Control: semantische Suche (Go-SDK /rag/search, nomic-embed + Qdrant) holt
die besten artikel-getaggten CRA-Chunks; Haiku wählt die passende Fundstelle und
gibt {article, paragraph, confidence}. Schreibt NICHTS in die DB — nur Report zur
Validierung (50er-Stichprobe). Lauf:
ssh macmini 'docker exec -i -e POC_N=8 bp-compliance-backend python3 -' \
< scripts/qa/poc_cra_article_assign.py
"""
import json
import os
import re
import httpx
from sqlalchemy import create_engine, text
# --- Runtime configuration, all taken from the environment -------------------
# Number of controls to sample (POC_N, default 50).
N = int(os.getenv("POC_N", "50"))
# Database URL: COMPLIANCE_DATABASE_URL is preferred; otherwise DATABASE_URL
# must be set (a missing DATABASE_URL raises KeyError at import time).
DB = os.getenv("COMPLIANCE_DATABASE_URL") or os.environ["DATABASE_URL"]
# Base URL of the SDK service that exposes the RAG search endpoint.
SDK = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")
# Anthropic API key, whitespace-stripped; empty string when unset.
AKEY = os.getenv("ANTHROPIC_API_KEY", "").strip()
# Model name sent to the Anthropic Messages API.
MODEL = os.getenv("POC_HAIKU_MODEL", "claude-haiku-4-5-20251001")

# --- Text patterns ------------------------------------------------------------
# Leading "[...]" prefix of a chunk text; group 1 is the marker inside the brackets.
_MARKER = re.compile(r"^\[([^\]]+)\]")
# Lower-case substrings that make a marker count as binding (see is_binding).
_BINDING = ("artikel", "anhang", "annex", "art.", "teil")
# Greedy match from the first "{" to the last "}" in a reply, across newlines.
_JSON = re.compile(r"\{.*\}", re.S)
def is_binding(marker: str) -> bool:
    """Return True if *marker* contains any keyword listed in ``_BINDING``.

    The check is a case-insensitive substring test: the marker is lower-cased
    and each keyword is looked up anywhere inside it.
    """
    lowered = marker.lower()
    for keyword in _BINDING:
        if keyword in lowered:
            return True
    return False
def retrieve(query: str) -> list:
    """Return up to three binding (article/annex) CRA chunks for *query*.

    Posts *query* to ``{SDK}/sdk/v1/rag/search`` (collection
    ``bp_compliance_ce``, regulation ``cra_2024``, ``top_k`` 12) and keeps, in
    the order the service returned them, the first three hits whose text starts
    with a ``[marker]`` prefix accepted by ``is_binding``.

    Args:
        query: Control text used as the semantic search query.

    Returns:
        A list of ``{"marker", "text", "score"}`` dicts (``text`` truncated to
        400 characters). The list is empty when no hit qualifies. On any
        request, HTTP-status or decoding failure the result is the
        single-element list ``[{"_err": message}]`` with a non-empty message.
    """
    try:
        r = httpx.post(
            f"{SDK}/sdk/v1/rag/search",
            json={"query": query, "collection": "bp_compliance_ce",
                  "top_k": 12, "regulations": ["cra_2024"]},
            timeout=20.0,
        )
        # Surface 4xx/5xx as an error instead of letting an error body without
        # "results" be counted as "no candidates" by the caller.
        r.raise_for_status()
        # ``or []``: the key may be present with a JSON null (Go's JSON encoder
        # renders a nil slice that way), which ``.get(key, [])`` would pass on.
        res = r.json().get("results") or []
    except Exception as e:  # noqa: BLE001
        # Some exceptions stringify to ""; fall back to the class name so the
        # caller always sees a truthy error message.
        return [{"_err": str(e) or type(e).__name__}]

    cands = []
    for x in res:
        if not isinstance(x, dict):
            continue  # ignore malformed hits rather than crash the run
        t = x.get("text") or ""
        m = _MARKER.match(t)
        if m and is_binding(m.group(1)):
            cands.append({"marker": m.group(1).strip(),
                          "text": t[:400], "score": x.get("score", 0.0)})
            if len(cands) >= 3:
                break
    return cands
def haiku(control_text: str, cands: list) -> dict:
    """Ask the model to pick the best-fitting CRA reference for a control.

    Builds a German prompt containing the control text and the numbered
    candidates (each candidate text cut to 300 characters), sends it to the
    Anthropic Messages API (``MODEL``, ``max_tokens`` 200, 60 s timeout) and
    parses the first ``{...}`` span of the reply as JSON.

    Args:
        control_text: Text of the compliance obligation.
        cands: Candidate dicts carrying ``"marker"`` and ``"text"`` keys.

    Returns:
        The parsed JSON object from the reply (the prompt requests the keys
        ``article``, ``paragraph``, ``candidate`` and ``confidence``), or
        ``{"_err": message}`` for every failure: transport error, non-JSON
        HTTP body, API response without usable ``content``, reply without a
        JSON object, or malformed JSON. The function does not raise for these
        cases, so a single bad answer cannot abort a whole sample run.
    """
    block = "\n".join(
        f"[{i+1}] {c['marker']}: {c['text'][:300]}" for i, c in enumerate(cands)
    )
    prompt = (
        "Eine CRA-Compliance-Pflicht soll der korrekten Fundstelle im Cyber "
        "Resilience Act (Verordnung (EU) 2024/2847) zugeordnet werden.\n\n"
        f"PFLICHT:\n{control_text}\n\n"
        f"KANDIDATEN-FUNDSTELLEN (aus dem CRA-Volltext):\n{block}\n\n"
        "Wähle die Fundstelle, die die Pflicht am genauesten verankert. "
        "Antworte NUR mit JSON: "
        '{"article":"Artikel N|Anhang X","paragraph":"Absatz N|","candidate":N,'
        '"confidence":0.0}. Wenn keine passt: '
        '{"article":"","paragraph":"","candidate":0,"confidence":0.0}'
    )
    try:
        r = httpx.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": AKEY, "anthropic-version": "2023-06-01",
                     "content-type": "application/json"},
            json={"model": MODEL, "max_tokens": 200,
                  "messages": [{"role": "user", "content": prompt}]},
            timeout=60.0,
        )
        data = r.json()
    except (httpx.HTTPError, ValueError) as e:
        # httpx.HTTPError covers timeouts and connection failures; ValueError
        # covers a body that is not JSON. The class-name fallback keeps the
        # message truthy when the exception stringifies to "".
        return {"_err": str(e) or type(e).__name__}

    # API error payloads carry no "content"; report the body itself.
    if not isinstance(data, dict) or "content" not in data:
        return {"_err": str(data)[:200] or "empty response"}

    try:
        reply = data["content"][0]["text"]
    except (IndexError, KeyError, TypeError):
        return {"_err": "no text block in response"}
    if not isinstance(reply, str):
        return {"_err": "no text block in response"}

    m = _JSON.search(reply)
    if m is None:
        return {"_err": "no json"}
    try:
        return json.loads(m.group(0))
    except json.JSONDecodeError as e:
        # The greedy pattern can capture trailing prose with braces, and the
        # model can emit invalid JSON; both end up here.
        return {"_err": f"bad json: {e}"}
def _as_float(value: object, default: float = 0.0) -> float:
    """Coerce a model-supplied confidence to float, or return *default*."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _norm_article_ref(ref: object) -> str:
    """Lower-case *ref*, fold "Artikel"/"Art." to "art", collapse whitespace."""
    folded = re.sub(r"\bart(?:ikel|\.)?(?=\s|\d|$)", "art ", str(ref or "").lower())
    return " ".join(folded.split())


def _same_article(new_ref: object, existing: object) -> bool:
    """Return True if the normalised *new_ref* occurs as a whole reference in *existing*."""
    new = _norm_article_ref(new_ref)
    if not new:
        return False
    # Word boundaries keep "art 1" from matching inside "art 13" and
    # "anhang i" from matching inside "anhang ii".
    pattern = r"(?<!\w)" + re.escape(new) + r"(?!\w)"
    return re.search(pattern, _norm_article_ref(existing)) is not None


def main() -> None:
    """Print an article-assignment report for a sample of CRA controls.

    Selects up to ``N`` controls classified as relevant for use case ``cra``
    (ordered by the md5 of their control id, so the sample is stable between
    runs), retrieves candidate references with ``retrieve``, lets ``haiku``
    choose one, and prints one result line per control followed by summary
    counters. Only SELECT and ``SET search_path`` statements are issued.

    Controls whose retrieval or model call failed are reported on their own
    line and are not included in any counter.
    """
    # Adjacent literals keep the SQL text identical while allowing indentation.
    sql = (
        "\n"
        "SELECT cc.id::text uid, cc.control_id,\n"
        "trim(coalesce(cc.title,'') || '. ' || coalesce(cc.objective,'')) ctext,\n"
        "cpl.source_article existing\n"
        "FROM atom_classification ac\n"
        "JOIN canonical_controls cc ON cc.id = ac.control_uuid\n"
        "JOIN control_parent_links cpl ON cpl.control_uuid = ac.control_uuid\n"
        "WHERE ac.use_case = 'cra' AND ac.relevant = true\n"
        "ORDER BY md5(cc.control_id)\n"
        "LIMIT :n\n"
    )
    eng = create_engine(DB)
    with eng.connect() as c:
        c.execute(text("SET search_path TO compliance, core, public"))
        rows = c.execute(text(sql), {"n": N}).fetchall()

    print(f"PoC CRA Artikel-Zuordnung — {len(rows)} Controls, Modell {MODEL}\n")
    n_assigned = n_conf = n_changed = n_nocand = 0
    for row in rows:
        # Same text for retrieval and for the model prompt; falls back to the
        # control id when the control has no usable text.
        ctext = row.ctext or row.control_id
        cands = retrieve(ctext)
        # Key presence, not truthiness: an empty error message is still an error.
        if cands and "_err" in cands[0]:
            print(f"[{row.control_id}] RAG-ERR {str(cands[0]['_err'])[:80]}")
            continue
        if not cands:
            n_nocand += 1
            print(f"[{row.control_id}] alt={row.existing!r:30} → KEINE Artikel-Kandidaten")
            continue
        v = haiku(ctext, cands)
        if "_err" in v:
            print(f"[{row.control_id}] HAIKU-ERR {str(v['_err'])[:80]}")
            continue
        # The values come from model output: coerce them so a null, a number
        # or a quoted confidence cannot crash the comparison or formatting.
        art = str(v.get("article") or "")
        para = str(v.get("paragraph") or "")
        conf = _as_float(v.get("confidence"))
        if art:
            n_assigned += 1
        # Only actual assignments count toward the confidence tally.
        if art and conf >= 0.7:
            n_conf += 1
        # Both sides are normalised, so "Artikel 13", "Art. 13" and "art 13"
        # are treated as the same reference.
        if art and not _same_article(art, row.existing):
            n_changed += 1
        newref = f"{art}{(' ' + para) if para else ''}"
        print(f"[{row.control_id}] conf={conf:.2f} NEU={newref!r:24} ALT={row.existing!r:30} "
              f"| top-cand={cands[0]['marker'][:18]!r}")
        print(f" pflicht: {(row.ctext or '')[:95]}")

    print(f"\n--- Summe ({len(rows)}) ---")
    print(f" Artikel zugeordnet : {n_assigned}")
    print(f" confidence >= 0.70 : {n_conf}")
    print(f" abweichend von ALT : {n_changed}")
    print(f" keine Kandidaten : {n_nocand}")
# Entry point: run the report only when executed as a script (the documented
# invocation pipes this file into ``python3 -``), not when imported.
if __name__ == "__main__":
    main()