"""PoC: Artikel/Absatz-Zuordnung für CRA-Controls (Pfad B2b). Pro Control: semantische Suche (Go-SDK /rag/search, nomic-embed + Qdrant) holt die besten artikel-getaggten CRA-Chunks; Haiku wählt die passende Fundstelle und gibt {article, paragraph, confidence}. Schreibt NICHTS in die DB — nur Report zur Validierung (50er-Stichprobe). Lauf: ssh macmini 'docker exec -i -e POC_N=8 bp-compliance-backend python3 -' \ < scripts/qa/poc_cra_article_assign.py """ import json import os import re import httpx from sqlalchemy import create_engine, text N = int(os.environ.get("POC_N", "50")) DB = os.environ.get("COMPLIANCE_DATABASE_URL") or os.environ["DATABASE_URL"] SDK = os.environ.get("SDK_URL", "http://ai-compliance-sdk:8090") AKEY = (os.environ.get("ANTHROPIC_API_KEY") or "").strip() MODEL = os.environ.get("POC_HAIKU_MODEL", "claude-haiku-4-5-20251001") _MARKER = re.compile(r"^\[([^\]]+)\]") _BINDING = ("artikel", "anhang", "annex", "art.", "teil") _JSON = re.compile(r"\{.*\}", re.DOTALL) def is_binding(marker: str) -> bool: m = marker.lower() return any(k in m for k in _BINDING) def retrieve(query: str) -> list: """Top binding (article/annex) CRA chunks for the control text.""" try: r = httpx.post( f"{SDK}/sdk/v1/rag/search", json={"query": query, "collection": "bp_compliance_ce", "top_k": 12, "regulations": ["cra_2024"]}, timeout=20.0, ) res = r.json().get("results", []) except Exception as e: # noqa: BLE001 return [{"_err": str(e)}] cands = [] for x in res: t = x.get("text") or "" m = _MARKER.match(t) if m and is_binding(m.group(1)): cands.append({"marker": m.group(1).strip(), "text": t[:400], "score": x.get("score", 0.0)}) if len(cands) >= 3: break return cands def haiku(control_text: str, cands: list) -> dict: block = "\n".join( f"[{i+1}] {c['marker']}: {c['text'][:300]}" for i, c in enumerate(cands) ) prompt = ( "Eine CRA-Compliance-Pflicht soll der korrekten Fundstelle im Cyber " "Resilience Act (Verordnung (EU) 2024/2847) zugeordnet werden.\n\n" f"PFLICHT:\n{control_text}\n\n" f"KANDIDATEN-FUNDSTELLEN (aus dem CRA-Volltext):\n{block}\n\n" "Wähle die Fundstelle, die die Pflicht am genauesten verankert. " "Antworte NUR mit JSON: " '{"article":"Artikel N|Anhang X","paragraph":"Absatz N|","candidate":N,' '"confidence":0.0}. Wenn keine passt: ' '{"article":"","paragraph":"","candidate":0,"confidence":0.0}' ) r = httpx.post( "https://api.anthropic.com/v1/messages", headers={"x-api-key": AKEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}, json={"model": MODEL, "max_tokens": 200, "messages": [{"role": "user", "content": prompt}]}, timeout=60.0, ) data = r.json() if "content" not in data: return {"_err": str(data)[:200]} m = _JSON.search(data["content"][0]["text"]) return json.loads(m.group(0)) if m else {"_err": "no json"} def main() -> None: eng = create_engine(DB) with eng.connect() as c: c.execute(text("SET search_path TO compliance, core, public")) rows = c.execute(text(""" SELECT cc.id::text uid, cc.control_id, trim(coalesce(cc.title,'') || '. ' || coalesce(cc.objective,'')) ctext, cpl.source_article existing FROM atom_classification ac JOIN canonical_controls cc ON cc.id = ac.control_uuid JOIN control_parent_links cpl ON cpl.control_uuid = ac.control_uuid WHERE ac.use_case = 'cra' AND ac.relevant = true ORDER BY md5(cc.control_id) LIMIT :n """), {"n": N}).fetchall() print(f"PoC CRA Artikel-Zuordnung — {len(rows)} Controls, Modell {MODEL}\n") n_assigned = n_conf = n_changed = n_nocand = 0 for row in rows: cands = retrieve(row.ctext or row.control_id) if cands and cands[0].get("_err"): print(f"[{row.control_id}] RAG-ERR {cands[0]['_err'][:80]}") continue if not cands: n_nocand += 1 print(f"[{row.control_id}] alt={row.existing!r:30} → KEINE Artikel-Kandidaten") continue v = haiku(row.ctext, cands) if v.get("_err"): print(f"[{row.control_id}] HAIKU-ERR {v['_err'][:80]}") continue art = v.get("article", "") para = v.get("paragraph", "") conf = v.get("confidence", 0.0) if art: n_assigned += 1 if conf >= 0.7: n_conf += 1 if art and art.lower().replace("artikel", "art").strip() not in (row.existing or "").lower(): n_changed += 1 newref = f"{art}{(' ' + para) if para else ''}" print(f"[{row.control_id}] conf={conf:.2f} NEU={newref!r:24} ALT={row.existing!r:30} " f"| top-cand={cands[0]['marker'][:18]!r}") print(f" pflicht: {(row.ctext or '')[:95]}") print(f"\n--- Summe ({len(rows)}) ---") print(f" Artikel zugeordnet : {n_assigned}") print(f" confidence >= 0.70 : {n_conf}") print(f" abweichend von ALT : {n_changed}") print(f" keine Kandidaten : {n_nocand}") if __name__ == "__main__": main()