""" LLM-tagged regulation backfill v2. The regex pass (backfill_mc_regulation.py) covers ~13% of MCs that quote a norm inline ('Art. X DSGVO', '§Y TDDDG'). The remaining 1636 are abstract security/process controls that don't cite a norm by name but DO map to one (e.g. 'Pseudonymisierung' -> DSGVO Art. 32). This script asks Qwen (Ollama) in batches of 25 to classify each MC. Output is constrained: one of DSGVO / TDDDG / BGB / HGB / AO / TMG / BDSG / MStV / UWG / VSBG / NIS2 / ISO27001 / BSI-GS / NIST / OTHER / NONE — plus an optional article hint. Run inside the backend container: docker exec bp-compliance-backend python3 \\ /app/scripts/backfill_mc_regulation_llm.py [--dry-run] [--limit N] """ from __future__ import annotations import argparse import asyncio import json import logging import os import re import sys import asyncpg import httpx logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") MODEL = os.getenv("CMP_LLM_MODEL", "qwen3:30b-a3b") BATCH = 25 _ALLOWED = { "DSGVO", "TDDDG", "BGB", "HGB", "AO", "TMG", "BDSG", "MStV", "UWG", "VSBG", "PAngV", "GwG", "NIS2", "ISO27001", "BSI-GS", "NIST", "TKG", "EU-VO", "OTHER", "NONE", } _SYSTEM = ( "Du klassifizierst Compliance Master-Controls nach der wichtigsten " "rechtlichen oder normativen Grundlage. Antworte AUSSCHLIESSLICH " 'mit JSON: {"items": [{"id":"", "regulation":"", "article":""}]}. ' "Eine MC -> EIN Eintrag. Wenn keine Norm passt: NONE. " "Keine Erklaerung, kein Markdown." ) def _build_user_prompt(batch: list[dict]) -> str: lines = ["Klassifiziere diese Master-Controls:"] for r in batch: title = (r["title"] or "")[:120] q = (r["check_question"] or "")[:200] lines.append(f'- id={r["id_short"]} | {title}') if q: lines.append(f" -> {q}") return "\n".join(lines) async def _ask_llm(batch: list[dict]) -> dict[str, dict]: payload = { "model": MODEL, "messages": [ {"role": "system", "content": _SYSTEM}, {"role": "user", "content": _build_user_prompt(batch)}, ], "stream": False, "format": "json", "options": {"temperature": 0.0, "num_predict": 2500}, } try: async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post( f"{OLLAMA_URL.rstrip('/')}/api/chat", json=payload, ) resp.raise_for_status() content = (resp.json().get("message") or {}).get("content", "") except Exception as e: logger.warning("LLM call failed: %s", e) return {} if not content: return {} # Strip qwen thinking content = re.sub(r".*?", "", content, flags=re.DOTALL).strip() try: obj = json.loads(content) except Exception: return {} items = obj.get("items") if isinstance(obj, dict) else None if not isinstance(items, list): return {} out: dict[str, dict] = {} for it in items: if not isinstance(it, dict): continue sid = str(it.get("id", "")).strip() reg = str(it.get("regulation", "")).strip().upper() art = str(it.get("article", "")).strip() if not sid or reg not in _ALLOWED: continue if reg == "NONE": continue out[sid] = {"regulation": reg, "article": art[:120]} return out async def main(dry_run: bool, limit: int) -> None: db = os.getenv("DATABASE_URL") if not db: print("DATABASE_URL not set", file=sys.stderr) sys.exit(1) conn = await asyncpg.connect(db) sql = ("SELECT id, title, check_question FROM compliance.doc_check_controls " "WHERE regulation IS NULL ORDER BY id") if limit > 0: sql += f" LIMIT {limit}" rows = await conn.fetch(sql) print(f"{len(rows)} MCs without regulation — calling LLM in batches of {BATCH}") by_short: dict[str, str] = {} batches: list[list[dict]] = [] cur: list[dict] = [] for r in rows: sid = str(r["id"])[:8] by_short[sid] = str(r["id"]) cur.append({"id_short": sid, "title": r["title"], "check_question": r["check_question"]}) if len(cur) >= BATCH: batches.append(cur); cur = [] if cur: batches.append(cur) updates: list[tuple[str, str, str]] = [] # (regulation, article, uuid) hits: dict[str, int] = {} for i, batch in enumerate(batches, 1): logger.info("batch %d/%d (%d items)", i, len(batches), len(batch)) res = await _ask_llm(batch) for short, m in res.items(): uuid = by_short.get(short) if not uuid: continue updates.append((m["regulation"], m["article"], uuid)) hits[m["regulation"]] = hits.get(m["regulation"], 0) + 1 print(f"\nLLM classified: {sum(hits.values())} / {len(rows)}") for k, v in sorted(hits.items(), key=lambda x: -x[1]): print(f" {k:10s} {v:>5}") if dry_run: print("\nDRY RUN — no UPDATE issued.") await conn.close() return for i in range(0, len(updates), 200): chunk = updates[i:i + 200] await conn.executemany( "UPDATE compliance.doc_check_controls " "SET regulation = $1, article = $2 WHERE id = $3::uuid", chunk, ) print(f"\nApplied {len(updates)} updates.") await conn.close() if __name__ == "__main__": ap = argparse.ArgumentParser() ap.add_argument("--dry-run", action="store_true") ap.add_argument("--limit", type=int, default=0, help="process only N MCs (0 = all)") args = ap.parse_args() asyncio.run(main(args.dry_run, args.limit))