""" Document Checker with Master Controls — deterministic keyword verification. Checks ALL doc_check_controls for the given doc_type using keyword extraction from pass_criteria/fail_criteria. No LLM needed for the primary check — results are 100% deterministic and reproducible. Flow: Document text + doc_type → Load ALL MCs from compliance.doc_check_controls WHERE doc_type = ? → For each MC: extract keywords from pass_criteria → Match keywords against document text (regex, case-insensitive) → PASS if enough pass_criteria met AND no fail_criteria triggered → Returns structured results compatible with CheckItem format """ import logging import os import re from typing import Optional import httpx logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b") # Minimum keyword match ratio to consider a criterion "met" PASS_THRESHOLD = 0.5 # At least 50% of extracted keywords must match async def check_document_with_controls( text: str, doc_type: str, doc_title: str, db_url: str = "", max_controls: int = 0, # 0 = no limit, check ALL use_agent: bool = False, # Use LLM agent for intelligent evaluation ) -> list[dict]: """Check document against ALL doc_check_controls for this doc_type. Two modes: - use_agent=False (default): Deterministic keyword matching. Fast, reproducible. - use_agent=True: LLM agent with tool calling. Intelligent, contextual. """ if use_agent: try: from compliance.services.compliance_agent import run_compliance_check return await run_compliance_check(text, doc_type, doc_title) except Exception as e: logger.warning("Agent mode failed, falling back to regex: %s", e) if not text or len(text) < 100: return [] mapped_type = _map_doc_type(doc_type) # Load ALL controls for this doc_type controls = await _load_controls(mapped_type, db_url, max_controls) if not controls: logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title) return [] logger.info("Checking %d MCs for '%s' (%s)", len(controls), doc_title, mapped_type) text_lower = text.lower().replace("\xad", "") # Strip soft hyphens results = [] for mc in controls: result = _check_mc_deterministic(text_lower, mc) if result: results.append(result) passed = sum(1 for r in results if r["passed"]) failed_results = [r for r in results if not r["passed"]] logger.info("MC results: %d passed, %d failed out of %d for '%s'", passed, len(failed_results), len(results), doc_title) # LLM Interpretation: enrich FAILs with context-specific recommendations if failed_results: try: await _enrich_fails_with_llm(text, failed_results, doc_title) except Exception as e: logger.warning("LLM interpretation skipped: %s", e) return results def _check_mc_deterministic(text_lower: str, mc: dict) -> Optional[dict]: """Check one MC against document text using keyword matching. Deterministic: extracts keywords from pass_criteria, searches text. """ import json question = mc.get("check_question", "") if not question: return None pass_crit = mc.get("pass_criteria", []) fail_crit = mc.get("fail_criteria", []) # Parse JSON if needed if isinstance(pass_crit, str): try: pass_crit = json.loads(pass_crit) except Exception: pass_crit = [pass_crit] if pass_crit else [] if isinstance(fail_crit, str): try: fail_crit = json.loads(fail_crit) except Exception: fail_crit = [fail_crit] if fail_crit else [] if not pass_crit: return None # Check how many pass_criteria are met criteria_met = 0 total_criteria = len(pass_crit) evidence = "" for criterion in pass_crit: keywords = _extract_keywords(criterion) if not keywords: criteria_met += 1 # Empty criterion = auto-pass continue # Count how many keywords match matched = sum(1 for kw in keywords if kw in text_lower) ratio = matched / len(keywords) if keywords else 0 if ratio >= PASS_THRESHOLD: criteria_met += 1 # Find evidence if not evidence: for kw in keywords: idx = text_lower.find(kw) if idx >= 0: start = max(0, idx - 30) end = min(len(text_lower), idx + len(kw) + 30) evidence = text_lower[start:end].strip() break # Check fail_criteria (any match = penalty) fail_triggered = False for criterion in fail_crit: keywords = _extract_keywords(criterion) if not keywords: continue matched = sum(1 for kw in keywords if kw in text_lower) if matched >= len(keywords) * 0.7: # 70% of fail keywords match fail_triggered = True break # Decision: PASS if majority of criteria met and no fail triggered passed = (criteria_met >= total_criteria * 0.6) and not fail_triggered severity = (mc.get("severity") or "MEDIUM").upper() control_id = mc.get("control_id", str(mc.get("id", ""))[:8]) return { "id": f"mc-{control_id}", "label": mc.get("title", "")[:80], "passed": passed, "severity": severity, "matched_text": evidence[:100] if passed else "", "level": 2, "parent": None, "skipped": False, "hint": question if not passed else "", "source": "master_control", "criteria_met": f"{criteria_met}/{total_criteria}", } # Keywords shorter than this are too generic to be useful _MIN_KEYWORD_LEN = 4 # Common German stop words to skip _STOP_WORDS = { "oder", "und", "der", "die", "das", "ein", "eine", "einer", "eines", "von", "vom", "zur", "zum", "mit", "auf", "aus", "fuer", "für", "bei", "nach", "ueber", "über", "unter", "nicht", "kein", "keine", "wird", "werden", "kann", "muss", "soll", "ist", "sind", "hat", "dass", "wenn", "ohne", "nur", "auch", "noch", "alle", "alle", "wie", "was", "wer", "den", "dem", "des", "als", "bis", "vor", "sein", "sich", "durch", "damit", "davon", "dazu", "dies", "diese", "dieser", "dieses", "jede", "jeder", "jedes", "andere", "anderen", "solche", "solcher", "welche", "welcher", "etwa", "bereits", "sowie", "soweit", "sofern", "falls", "hierzu", "hierbei", "insbesondere", "beispielsweise", "gegebenenfalls", } def _extract_keywords(criterion: str) -> list[str]: """Extract meaningful keywords from a pass/fail criterion text.""" # Lowercase and clean text = criterion.lower() text = re.sub(r"[()'\"\[\],;:!?]", " ", text) text = re.sub(r"\s+", " ", text).strip() words = text.split() keywords = [] for word in words: # Skip short words and stop words if len(word) < _MIN_KEYWORD_LEN: continue if word in _STOP_WORDS: continue # Skip pure numbers if word.isdigit(): continue keywords.append(word) # Also extract compound terms (2-word bigrams) for specificity for i in range(len(words) - 1): bigram = f"{words[i]} {words[i+1]}" if len(bigram) >= 8 and words[i] not in _STOP_WORDS and words[i+1] not in _STOP_WORDS: keywords.append(bigram) return keywords[:15] # Cap at 15 keywords per criterion # Map doc_type aliases _DOC_TYPE_MAP = { "dse": "dse", "datenschutz": "dse", "privacy": "dse", "cookie": "cookie", "impressum": "impressum", "imprint": "impressum", "widerruf": "widerruf", "withdrawal": "widerruf", "agb": "agb", "terms": "agb", "dsfa": "dsfa", "social_media": "dse", "avv": "avv", "loeschkonzept": "loeschkonzept", } def _map_doc_type(doc_type: str) -> str: return _DOC_TYPE_MAP.get(doc_type, doc_type) async def _load_controls(doc_type: str, db_url: str, limit: int) -> list[dict]: """Load all doc_check_controls for a doc_type from PostgreSQL.""" try: import asyncpg db = db_url or os.getenv( "DATABASE_URL", "postgresql://breakpilot:breakpilot@bp-core-postgres:5432/breakpilot", ) conn = await asyncpg.connect(db) except Exception as e: logger.warning("DB connection failed: %s", e) return [] try: query = """SELECT id, control_id, title, regulation, check_question, pass_criteria, fail_criteria, severity FROM compliance.doc_check_controls WHERE doc_type = $1 ORDER BY severity DESC, title""" if limit > 0: query += f" LIMIT {limit}" rows = await conn.fetch(query, doc_type) return [dict(r) for r in rows] except Exception as e: logger.warning("MC query failed: %s", e) return [] finally: await conn.close() async def _enrich_fails_with_llm( doc_text: str, failed_results: list[dict], doc_title: str, ) -> None: """Enrich failed MC results with LLM-generated context-specific advice. Does NOT change pass/fail (deterministic result stays). Only adds a richer 'hint' with concrete recommendations based on the actual document content. Uses ONE batched LLM call for up to 10 top-severity FAILs. """ # Select top failures by severity (max 10 to fit context window) sev_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3} top_fails = sorted( failed_results, key=lambda r: sev_order.get(r.get("severity", "MEDIUM"), 2), )[:10] fail_list = "\n".join( f"{i+1}. [{r['severity']}] {r['label']} — {r.get('hint', '')[:100]}" for i, r in enumerate(top_fails) ) # Truncate document for context excerpt = doc_text[:4000] if len(doc_text) > 5000 else doc_text prompt = ( "/no_think\n" f"Du bist ein Datenschutz-Experte. Analysiere das Dokument '{doc_title}' " f"und gib fuer JEDEN der folgenden fehlgeschlagenen Pruefpunkte eine " f"konkrete, umsetzbare Empfehlung (1-2 Saetze).\n\n" f"Beruecksichtige dabei den Inhalt des Dokuments — welche Dienste werden " f"genutzt? Welche Rechtsgrundlagen sind genannt? Was fehlt konkret?\n\n" f"FEHLGESCHLAGENE PRUEFPUNKTE:\n{fail_list}\n\n" f"DOKUMENT (Auszug):\n{excerpt[:3000]}\n\n" f"Antworte als JSON-Array: [\n" f' {{"nr": 1, "empfehlung": "Konkreter Hinweis..."}},\n' f' {{"nr": 2, "empfehlung": "..."}}\n' f"]\n" f"Nur die Empfehlungen, kein anderer Text." ) try: async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post(f"{OLLAMA_URL}/api/generate", json={ "model": OLLAMA_MODEL, "prompt": prompt, "stream": False, "options": {"temperature": 0.3, "num_predict": 1500}, }) if resp.status_code != 200: return raw = resp.json().get("response", "") raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() # Parse JSON array import json arr_match = re.search(r"\[[\s\S]*\]", raw) if not arr_match: return try: recommendations = json.loads(arr_match.group()) except json.JSONDecodeError: return # Enrich the failed results with LLM recommendations for rec in recommendations: nr = rec.get("nr", 0) advice = rec.get("empfehlung", "") if 1 <= nr <= len(top_fails) and advice: existing_hint = top_fails[nr - 1].get("hint", "") # Append LLM advice after the deterministic hint top_fails[nr - 1]["hint"] = ( f"{existing_hint}\n\n" f"Empfehlung: {advice}" ).strip() if existing_hint else advice logger.info("LLM enriched %d/%d fails for '%s'", len(recommendations), len(top_fails), doc_title) except Exception as e: logger.warning("LLM enrichment failed: %s", e)