""" LLM verification for regex check results. When a regex check FAILs, the LLM re-checks the original text to confirm or overturn the finding. This eliminates false positives caused by regex limitations (unusual formatting, synonyms, etc.). Uses the self-hosted Ollama endpoint (Qwen) for fast local inference. """ import logging import os import httpx logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("OLLAMA_VERIFY_MODEL", "qwen3.5:35b-a3b") TIMEOUT = 30.0 async def verify_failed_checks( text: str, failed_checks: list[dict], doc_title: str, ) -> dict[str, dict]: """Verify regex FAIL results using LLM — single batched call. Sends ALL failed checks in one LLM prompt instead of one call per check. Returns a dict mapping check_id -> {"overturned": bool, "evidence": str}. """ results: dict[str, dict] = {} checks_with_hints = [c for c in failed_checks if c.get("hint")] if not checks_with_hints: return results # Truncate text to fit context window text_excerpt = text[:8000] try: batch_results = await _ask_llm_batch( text_excerpt, checks_with_hints, doc_title, ) for check_id, answer in batch_results.items(): overturned = answer.get("found", False) results[check_id] = { "overturned": overturned, "evidence": answer.get("evidence", ""), } if overturned: logger.info( "LLM overturned regex FAIL for '%s' in '%s': %s", check_id, doc_title, answer.get("evidence", "")[:80], ) except Exception as e: logger.warning("LLM batch verify failed for '%s': %s", doc_title, e) return results async def _ask_llm_batch( text: str, checks: list[dict], doc_title: str, ) -> dict[str, dict]: """Ask the LLM to verify ALL failed checks in a single call.""" checklist_lines = [] for i, c in enumerate(checks, 1): checklist_lines.append( f'{i}. ID="{c["id"]}" | {c["label"]} | {c.get("hint", "")[:120]}' ) checklist_str = "\n".join(checklist_lines) prompt = f"""/no_think Pruefe ob der Dokumenttext die folgenden Anforderungen erfuellt. DOKUMENT: "{doc_title}" ANFORDERUNGEN: {checklist_str} TEXT: {text} Antworte NUR mit einem JSON-Array (keine Erklaerung). Fuer jede Anforderung: [{{"id": "check-id", "found": true/false, "evidence": "Kurzes Zitat (max 80 Zeichen) oder leer"}}] """ async with httpx.AsyncClient(timeout=90.0) as client: resp = await client.post( f"{OLLAMA_URL}/api/generate", json={ "model": OLLAMA_MODEL, "prompt": prompt, "stream": False, "options": {"temperature": 0.0, "num_predict": 2000}, }, ) resp.raise_for_status() raw = resp.json().get("response", "") return _parse_batch_response(raw, checks) def _parse_batch_response(raw: str, checks: list[dict]) -> dict[str, dict]: """Parse batch LLM JSON array response.""" import json import re results: dict[str, dict] = {} raw = raw.strip() # Extract JSON array from markdown code blocks m = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", raw, re.DOTALL) if m: raw = m.group(1) else: m = re.search(r"\[.*\]", raw, re.DOTALL) if m: raw = m.group(0) try: items = json.loads(raw) if isinstance(items, list): for item in items: cid = item.get("id", "") if cid: results[cid] = { "found": bool(item.get("found", False)), "evidence": str(item.get("evidence", ""))[:150], } except (json.JSONDecodeError, ValueError): # Fallback: extract individual JSON objects for m in re.finditer(r'\{[^}]*"id"\s*:\s*"([^"]+)"[^}]*"found"\s*:\s*(true|false)[^}]*\}', raw, re.DOTALL): cid = m.group(1) found = m.group(2) == "true" results[cid] = {"found": found, "evidence": ""} logger.info("LLM batch: %d/%d checks parsed", len(results), len(checks)) return results