"""
Document Checker with Master Controls — SQL-based deep verification.

Uses doc_check_controls from PostgreSQL with:
- check_question: binary YES/NO question
- pass_criteria: JSONB list of concrete must-haves
- fail_criteria: JSONB list of common mistakes
- LLM (Qwen) verifies each control against document text

Flow:
  Document text + doc_type
    → SQL query: SELECT ... FROM compliance.doc_check_controls WHERE doc_type = ?
    → For each control: LLM answers check_question with pass/fail criteria
    → Returns structured results compatible with CheckItem format
"""

import logging
import os
import re
import json as _json
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")

# Map our doc_types to the DB doc_type values
DOC_TYPE_MAP = {
    "dse": "dse",
    "datenschutz": "dse",
    "privacy": "dse",
    "cookie": "cookie",
    "impressum": "impressum",
    "imprint": "impressum",
    "widerruf": "widerruf",
    "withdrawal": "widerruf",
    "agb": "agb",
    "terms": "agb",
    "dsfa": "dsfa",
    "social_media": "dse",
    "avv": "avv",
    "loeschkonzept": "loeschkonzept",
}

# Reasoning blocks some models emit before the answer. The pattern is assembled
# from the tag name on purpose: sanitisers that strip such tags from source
# text would otherwise reduce it to ".*?", which silently matches nothing.
_THINK_TAG = "think"
_THINK_BLOCK_RE = re.compile(rf"<{_THINK_TAG}>.*?</{_THINK_TAG}>", re.DOTALL)
# First flat (non-nested) JSON object in the model output.
_JSON_OBJECT_RE = re.compile(r"\{[^{}]+\}")
# Unquoted object keys such as {passed: true}.
_BARE_KEY_RE = re.compile(r"(?<=[{,])\s*(\w+)\s*:")

_MAX_CRITERIA = 5            # criteria lines of each kind sent to the LLM
_MAX_DOC_CHARS = 5000        # hard budget for document text in the prompt
_LONG_DOC_THRESHOLD = 8000   # above this length, send head + tail
_HEAD_CHARS = 4000
_ELLIPSIS = "\n...\n"
_TRUTHY_STRINGS = frozenset({"true", "yes", "ja", "1"})


async def check_document_with_controls(
    text: str,
    doc_type: str,
    doc_title: str,
    db_url: str = "",
    max_controls: int = 20,
) -> list[dict]:
    """Check a document against the doc_check_controls stored in the DB.

    Args:
        text: Full document text. Texts shorter than 100 characters are
            not checked.
        doc_type: Caller-side document type; translated through
            ``DOC_TYPE_MAP`` (unknown values are passed through unchanged).
        doc_title: Title used in log messages only.
        db_url: PostgreSQL DSN. Falls back to ``DATABASE_URL`` and then to
            the built-in default when empty.
        max_controls: Maximum number of controls fetched and verified.

    Returns:
        A list of CheckItem-compatible dicts, one per control the LLM could
        be consulted for. Empty when the text is too short, the DB is
        unreachable, the query fails, or no controls exist for the type.
    """
    if not text or len(text) < 100:
        return []

    mapped_type = DOC_TYPE_MAP.get(doc_type, doc_type)

    try:
        # Imported lazily so the module stays importable without asyncpg.
        import asyncpg

        # NOTE(review): the fallback DSN embeds credentials — confirm this is
        # intended outside local/dev setups.
        db = db_url or os.getenv(
            "DATABASE_URL",
            "postgresql://breakpilot:breakpilot@bp-core-postgres:5432/breakpilot",
        )
        conn = await asyncpg.connect(db)
    except Exception as e:
        logger.warning("DB connection failed: %s", e)
        return []

    try:
        rows = await conn.fetch(
            """SELECT id, control_id, title, regulation, check_question,
                      pass_criteria, fail_criteria, severity
               FROM compliance.doc_check_controls
               WHERE doc_type = $1
               ORDER BY severity DESC, title
               LIMIT $2""",
            mapped_type,
            max_controls,
        )
    except Exception as e:
        logger.warning("MC query failed: %s", e)
        return []
    finally:
        # Single cleanup point for both the success and the failure path.
        await conn.close()

    if not rows:
        logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title)
        return []

    logger.info("Checking %d MCs for '%s' (%s)", len(rows), doc_title, mapped_type)

    results = []
    for row in rows:
        result = await _verify_mc(text, dict(row))
        if result:
            results.append(result)

    passed = sum(1 for r in results if r["passed"])
    logger.info("MC results: %d/%d passed for '%s'", passed, len(results), doc_title)
    return results


def _as_criteria(value) -> list:
    """Normalise a criteria column (list, JSON text, or NULL) to a list."""
    if isinstance(value, str):
        try:
            value = _json.loads(value)
        except ValueError:
            # Not JSON: treat the raw text as a single criterion.
            return [value]
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _excerpt(text: str) -> str:
    """Return at most ``_MAX_DOC_CHARS`` characters of *text* for the prompt.

    Long documents contribute their head plus their real ending, so closing
    sections stay visible to the LLM.
    """
    if len(text) <= _LONG_DOC_THRESHOLD:
        return text[:_MAX_DOC_CHARS]
    tail_chars = _MAX_DOC_CHARS - _HEAD_CHARS - len(_ELLIPSIS)
    return text[:_HEAD_CHARS] + _ELLIPSIS + text[-tail_chars:]


def _build_prompt(question: str, pass_crit: list, fail_crit: list, text: str) -> str:
    """Assemble the LLM prompt for one control."""
    pass_str = "\n".join(f"  - {p}" for p in pass_crit[:_MAX_CRITERIA])
    fail_str = "\n".join(f"  - {f}" for f in fail_crit[:_MAX_CRITERIA])
    return (
        f"/no_think\n"
        f"FRAGE: {question}\n\n"
        f"PASS wenn ALLE zutreffen:\n{pass_str}\n\n"
        f"FAIL wenn EINES zutrifft:\n{fail_str}\n\n"
        f"DOKUMENT:\n{_excerpt(text)}\n\n"
        f'Antworte NUR mit JSON: {{"passed": true/false, "evidence": "Textstelle max 80 Zeichen oder leer"}}'
    )


def _coerce_passed(value) -> bool:
    """Interpret the model's ``passed`` value; a string "false" must be False."""
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY_STRINGS
    return bool(value)


def _parse_verdict(raw: str) -> tuple[bool, str]:
    """Extract ``(passed, evidence)`` from raw LLM output.

    Strict JSON is tried first; only if that fails are bare keys quoted and
    Python-style booleans lowered, so valid answers are never rewritten.
    When no JSON object can be parsed, a substring check decides ``passed``
    and the evidence is empty.
    """
    cleaned = _THINK_BLOCK_RE.sub("", raw).strip()

    match = _JSON_OBJECT_RE.search(cleaned)
    if match:
        candidate = match.group()
        repaired = _BARE_KEY_RE.sub(r' "\1":', candidate)
        repaired = repaired.replace("True", "true").replace("False", "false")
        for attempt in (candidate, repaired):
            try:
                parsed = _json.loads(attempt)
            except ValueError:
                continue
            evidence = str(parsed.get("evidence") or "")[:100]
            return _coerce_passed(parsed.get("passed", False)), evidence

    lowered = cleaned.lower()
    return ('"passed": true' in lowered or '"passed":true' in lowered), ""


def _result_id(mc: dict) -> str:
    """Build the result id from control_id, else from the row id's prefix."""
    control_id = mc.get("control_id")
    if control_id:
        return f"mc-{control_id}"
    # str() because the row id may not be a string (e.g. UUID or integer).
    return f"mc-{str(mc.get('id', ''))[:8]}"


async def _verify_mc(text: str, mc: dict) -> Optional[dict]:
    """Verify one master control against document text via LLM.

    Args:
        text: Full document text; it is shortened before being sent.
        mc: One control row as a dict (keys as selected in
            ``check_document_with_controls``).

    Returns:
        A CheckItem-compatible dict, or ``None`` when the control has no
        check question or the LLM call fails.
    """
    question = mc.get("check_question") or ""
    if not question:
        return None

    title = str(mc.get("title") or "")
    prompt = _build_prompt(
        question,
        _as_criteria(mc.get("pass_criteria")),
        _as_criteria(mc.get("fail_criteria")),
        text,
    )

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.0, "num_predict": 200},
            })
        if resp.status_code != 200:
            logger.warning(
                "MC verify got HTTP %s for '%s'", resp.status_code, title[:40]
            )
            return None
        raw = str(resp.json().get("response") or "")
    except Exception as e:
        # Deliberate best-effort boundary: one failing control must not
        # abort the remaining checks.
        logger.warning("MC verify failed for '%s': %s", title[:40], e)
        return None

    passed, evidence = _parse_verdict(raw)
    return {
        "id": _result_id(mc),
        "label": title[:80],
        "passed": passed,
        "severity": str(mc.get("severity") or "MEDIUM").upper(),
        "matched_text": evidence,
        "level": 2,
        "parent": None,
        "skipped": False,
        "hint": question,
        "source": "master_control",
    }