"""LLM verification for regex check results.

When a regex check FAILs, the LLM re-checks the original text to confirm
or overturn the finding. This eliminates false positives caused by regex
limitations (unusual formatting, synonyms, etc.).

Uses the self-hosted Ollama endpoint (Qwen) for fast local inference.
"""

import json
import logging
import os
import re

import httpx

logger = logging.getLogger(__name__)

OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_VERIFY_MODEL", "qwen3.5:35b-a3b")
TIMEOUT = 30.0

# Number of leading characters of the document that are sent to the model.
_MAX_TEXT_CHARS = 8000
# Maximum length of the evidence quote kept from a parsed answer.
_MAX_EVIDENCE_CHARS = 150

# <think>...</think> sections are removed before parsing the answer.
_THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
# Last-resort detection of an affirmative answer in unparseable output.
# The negative lookahead rejects an echo of the prompt's own
# '"found": true/false' template, which is not an answer.
_FOUND_TRUE_RE = re.compile(
    r"""["']found["']\s*:\s*["']?true\b(?!\s*/)""",
    re.IGNORECASE,
)
# String values of "found" that count as an affirmative answer.
_AFFIRMATIVE_STRINGS = frozenset({"true", "yes", "ja", "1"})
_JSON_DECODER = json.JSONDecoder()


async def verify_failed_checks(
    text: str,
    failed_checks: list[dict],
    doc_title: str,
) -> dict[str, dict]:
    """Verify regex FAIL results using the LLM.

    For each failed check, asks the LLM a binary YES/NO question. Only
    checks with a non-empty "hint" field are verified (hints contain the
    natural-language question the LLM can answer). Checks are processed
    one after another; a check whose LLM call raises is logged and left
    out of the result, so one failure never aborts the remaining checks.

    Args:
        text: Full document text; only the first ``_MAX_TEXT_CHARS``
            characters are sent to the model.
        failed_checks: Check dicts; the keys "id", "label" and "hint" are
            read, each defaulting to "" when absent.
        doc_title: Document title, used in the prompt and in log messages.

    Returns:
        A dict mapping check_id -> {"overturned": bool, "evidence": str}.
        Checks without an "id" all map to the key "".
    """
    results: dict[str, dict] = {}

    if not failed_checks:
        return results

    # Truncate text to fit the context window.
    text_excerpt = text[:_MAX_TEXT_CHARS]

    for check in failed_checks:
        hint = check.get("hint", "")
        if not hint:
            continue
        check_id = check.get("id", "")
        label = check.get("label", "")

        try:
            answer = await _ask_llm(text_excerpt, label, hint, doc_title)
            overturned = answer.get("found", False)
            evidence = answer.get("evidence", "")
            results[check_id] = {
                "overturned": overturned,
                "evidence": evidence,
            }
            if overturned:
                logger.info(
                    "LLM overturned regex FAIL for '%s' in '%s': %s",
                    label,
                    doc_title,
                    evidence[:80],
                )
        except Exception as e:
            # Deliberate best-effort boundary: verification is optional, so
            # any failure is logged and the regex result stands. The
            # exception type is included because str(e) can be empty.
            logger.warning(
                "LLM verify failed for '%s': %s: %s",
                label,
                type(e).__name__,
                e,
            )

    return results


async def _ask_llm(
    text: str,
    check_label: str,
    hint: str,
    doc_title: str,
) -> dict:
    """Ask the LLM a binary verification question.

    Posts a single non-streaming request to ``{OLLAMA_URL}/api/generate``
    with temperature 0.0 and at most 200 predicted tokens, then parses the
    "response" field of the reply.

    Args:
        text: Document excerpt to check.
        check_label: Requirement label inserted into the prompt.
        hint: Detailed description of the requirement.
        doc_title: Document title inserted into the prompt.

    Returns:
        {"found": bool, "evidence": str} as produced by
        ``_parse_llm_response``.

    Raises:
        httpx.HTTPError: On transport errors, timeouts, or a non-2xx
            status (via ``raise_for_status``).
    """
    prompt = f"""/no_think
Pruefe ob der folgende Dokumenttext die Anforderung erfuellt.

ANFORDERUNG: {check_label}
DETAILS: {hint}
DOKUMENT: "{doc_title}"

TEXT:
{text}

Antworte NUR mit einem JSON-Objekt (keine Erklaerung):
{{"found": true/false, "evidence": "Zitat aus dem Text das die Anforderung belegt (max 100 Zeichen), oder leer wenn nicht gefunden"}}
"""

    async with httpx.AsyncClient(timeout=TIMEOUT) as client:
        resp = await client.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.0, "num_predict": 200},
            },
        )
        resp.raise_for_status()
        raw = resp.json().get("response", "")

    return _parse_llm_response(raw)


def _iter_json_objects(raw: str):
    """Yield every JSON object that can be decoded starting at a '{' in *raw*."""
    pos = raw.find("{")
    while pos != -1:
        try:
            obj, end = _JSON_DECODER.raw_decode(raw, pos)
        except ValueError:
            # Not a valid object at this brace; try the next one.
            pos = raw.find("{", pos + 1)
            continue
        yield obj
        pos = raw.find("{", end)


def _coerce_found(value: object) -> bool:
    """Interpret the "found" value of an answer as a strict boolean.

    Strings are compared against a small set of affirmative words instead
    of relying on truthiness, because bool("false") is True.
    """
    if isinstance(value, str):
        return value.strip().lower() in _AFFIRMATIVE_STRINGS
    return bool(value)


def _parse_llm_response(raw: str) -> dict:
    """Parse the LLM JSON response with fallback extraction.

    The first decodable JSON object that has a "found" key is used,
    wherever it appears in the output (bare, inside a markdown code fence,
    or surrounded by extra text). If no such object exists (for example
    because the output was cut off), the text is searched for an
    affirmative '"found": true' fragment instead.

    Args:
        raw: Raw text returned by the model.

    Returns:
        {"found": bool, "evidence": str}; evidence is capped at
        ``_MAX_EVIDENCE_CHARS`` characters and is "" when the fallback
        path is taken. Non-string input yields found=False.
    """
    if not isinstance(raw, str):
        return {"found": False, "evidence": ""}

    # Drop any <think>...</think> section so JSON-looking text inside it
    # is not mistaken for the answer.
    cleaned = _THINK_RE.sub("", raw).strip()

    for obj in _iter_json_objects(cleaned):
        if "found" not in obj:
            continue
        evidence = obj.get("evidence")
        # JSON null must become "", not the literal string "None".
        evidence_text = "" if evidence is None else str(evidence)
        return {
            "found": _coerce_found(obj["found"]),
            "evidence": evidence_text[:_MAX_EVIDENCE_CHARS],
        }

    # Fallback: look for an affirmative "found": true fragment.
    return {
        "found": _FOUND_TRUE_RE.search(cleaned) is not None,
        "evidence": "",
    }