diff --git a/backend-compliance/compliance/api/agent_doc_check_routes.py b/backend-compliance/compliance/api/agent_doc_check_routes.py index 2adb83d..c503df6 100644 --- a/backend-compliance/compliance/api/agent_doc_check_routes.py +++ b/backend-compliance/compliance/api/agent_doc_check_routes.py @@ -283,10 +283,23 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]: # Main document check (full text against primary type) main_result = await _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count) - # Control Library deep check — DISABLED until doc-check-specific - # Master Controls with binary pass/fail criteria are available. - # See: zeroclaw/INSTRUCTION-master-controls-for-doc-check.md - # Code: compliance/services/rag_document_checker.py (ready to re-enable) + # Master Control deep check — 1.874 doc_check_controls with + # binary pass/fail criteria verified by LLM (Qwen) + try: + from compliance.services.rag_document_checker import check_document_with_controls + mc_results = await check_document_with_controls( + doc_text, entry.doc_type, entry.label, max_controls=15, + ) + if mc_results: + # Add MC results as additional checks to the main result + for mc in mc_results: + main_result.checks.append(CheckItem(**mc)) + # Recompute correctness with MC results + l2 = [c for c in main_result.checks if c.level == 2 and not c.skipped] + l2_passed = sum(1 for c in l2 if c.passed) + main_result.correctness_pct = round(l2_passed / len(l2) * 100) if l2 else 0 + except Exception as e: + logger.warning("MC check skipped: %s", e) all_results.append(main_result) diff --git a/backend-compliance/compliance/services/rag_document_checker.py b/backend-compliance/compliance/services/rag_document_checker.py index 05b50a9..0b0312f 100644 --- a/backend-compliance/compliance/services/rag_document_checker.py +++ b/backend-compliance/compliance/services/rag_document_checker.py @@ -1,16 +1,17 @@ """ -Document Checker with Canonical Controls — 
SQL-based verification. +Document Checker with Master Controls — SQL-based deep verification. -Uses canonical_controls from PostgreSQL (not Qdrant) with: -- test_procedure: specific check instructions -- pass_criteria / evidence: what to look for -- Regex pre-check (fast) + LLM verification (semantic, for regex misses) +Uses doc_check_controls from PostgreSQL with: +- check_question: binary YES/NO question +- pass_criteria: JSONB list of concrete must-haves +- fail_criteria: JSONB list of common mistakes +- LLM (Qwen) verifies each control against document text Flow: - Document text + type - → SQL query for relevant controls by category + title keywords - → For each control: check test_procedure against document text - → LLM verifies if control requirements are met + Document text + doc_type + → SQL query: SELECT * FROM compliance.doc_check_controls WHERE doc_type = ? + → For each control: LLM answers check_question with pass/fail criteria + → Returns structured results compatible with CheckItem format """ import logging @@ -26,33 +27,22 @@ logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b") -# Document type → SQL filter keywords for canonical_controls -DOC_TYPE_FILTERS = { - "dse": { - "category": "data_protection", - "keywords": ["informationspflicht"], - "test_proc_must_contain": ["datenschutzerkl", "informationspflicht", "art. 13", "art. 
# Map caller-facing doc_type aliases to the doc_type values stored in
# compliance.doc_check_controls.
DOC_TYPE_MAP = {
    "dse": "dse",
    "datenschutz": "dse",
    "privacy": "dse",
    "cookie": "cookie",
    "impressum": "impressum",
    "imprint": "impressum",
    "widerruf": "widerruf",
    "withdrawal": "widerruf",
    "agb": "agb",
    "terms": "agb",
    "dsfa": "dsfa",
    "social_media": "dse",
    "avv": "avv",
    "loeschkonzept": "loeschkonzept",
}


async def check_document_with_controls(
    text: str,
    doc_type: str,
    doc_title: str,
    db_url: str = "",
    max_controls: int = 20,
) -> list[dict]:
    """Check a document against the doc_check_controls stored in PostgreSQL.

    Args:
        text: Full document text. Texts shorter than 100 characters are not
            checked.
        doc_type: Caller-side document type; translated through DOC_TYPE_MAP
            and passed through unchanged when no alias is known.
        doc_title: Human-readable title, used for log messages only.
        db_url: PostgreSQL DSN. When empty, the DATABASE_URL environment
            variable (or the built-in default) is used.
        max_controls: Upper bound on the number of controls fetched and
            verified.

    Returns:
        One CheckItem-compatible dict per control that could be verified.
        An empty list when the text is too short, the database is unreachable,
        the query fails, or no controls exist for the document type.
    """
    if not text or len(text) < 100:
        return []

    # Exact lookup first so existing callers behave exactly as before; only
    # then tolerate differences in case / surrounding whitespace.
    normalized_type = (doc_type or "").strip().lower()
    mapped_type = DOC_TYPE_MAP.get(doc_type) or DOC_TYPE_MAP.get(normalized_type, doc_type)

    try:
        # Imported lazily so the module stays importable without asyncpg.
        import asyncpg

        # NOTE(review): the fallback DSN embeds credentials; consider requiring
        # DATABASE_URL instead of shipping a default.
        dsn = db_url or os.getenv(
            "DATABASE_URL",
            "postgresql://breakpilot:breakpilot@bp-core-postgres:5432/breakpilot",
        )
        conn = await asyncpg.connect(dsn)
    except Exception as e:
        logger.warning("DB connection failed: %s", e)
        return []

    # NOTE(review): if `severity` is a plain text column, DESC sorts
    # alphabetically (MEDIUM > LOW > HIGH > CRITICAL) — confirm the column
    # type or switch to an explicit CASE ranking.
    try:
        rows = await conn.fetch(
            """SELECT id, control_id, title, regulation, check_question,
                      pass_criteria, fail_criteria, severity
               FROM compliance.doc_check_controls
               WHERE doc_type = $1
               ORDER BY severity DESC, title
               LIMIT $2""",
            mapped_type, max_controls,
        )
    except Exception as e:
        logger.warning("MC query failed: %s", e)
        return []
    finally:
        # `finally` also covers task cancellation (CancelledError is not an
        # Exception), which previously leaked the connection.
        try:
            await conn.close()
        except Exception as e:
            # A failed close must not discard rows that were already fetched.
            logger.warning("MC connection close failed: %s", e)

    if not rows:
        logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title)
        return []

    logger.info("Checking %d MCs for '%s' (%s)", len(rows), doc_title, mapped_type)

    # Controls are verified one after another; each verification is one LLM
    # round trip.
    results = []
    for row in rows:
        result = await _verify_mc(text, dict(row))
        if result:
            results.append(result)

    passed = sum(1 for r in results if r["passed"])
    logger.info("MC results: %d/%d passed for '%s'", passed, len(results), doc_title)
    return results
def _as_criteria_list(raw) -> list[str]:
    """Normalise a pass/fail criteria value (JSON text, list or None) to a list of strings."""
    if isinstance(raw, str):
        if not raw.strip():
            return []
        try:
            raw = _json.loads(raw)
        except ValueError:
            # Not JSON: treat the whole string as a single criterion.
            return [raw]
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        return [str(item) for item in raw]
    return [str(raw)]


def _build_excerpt(text: str, budget: int = 5000) -> str:
    """Return at most `budget` characters of `text`, keeping both its head and its tail."""
    if len(text) <= budget:
        return text
    separator = "\n...\n"
    tail_len = budget // 3
    head_len = budget - tail_len - len(separator)
    return text[:head_len] + separator + text[-tail_len:]


def _coerce_passed(value) -> bool:
    """Interpret the model's `passed` value; the string "false" must not count as True."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "yes", "ja", "1"}
    return bool(value)


def _parse_verdict(raw: str) -> tuple[bool, str]:
    """Extract (passed, evidence) from the raw model reply."""
    # Drop a reasoning block if the model emitted one despite /no_think.
    cleaned = re.sub(r"<think>.*?</think>", "", raw or "", flags=re.DOTALL).strip()

    found = re.search(r"\{[^{}]+\}", cleaned)
    if found:
        candidate = found.group()
        # Repairs for sloppy output (unquoted keys, Python-style booleans).
        # Tried only after strict parsing fails, because they can otherwise
        # rewrite text inside a valid evidence string.
        repaired = re.sub(r"(?<=[{,])\s*(\w+)\s*:", r' "\1":', candidate)
        repaired = repaired.replace("True", "true").replace("False", "false")
        for attempt in (candidate, repaired):
            try:
                parsed = _json.loads(attempt)
            except ValueError:
                continue
            if isinstance(parsed, dict):
                evidence = parsed.get("evidence")
                evidence_text = "" if evidence is None else str(evidence)
                return _coerce_passed(parsed.get("passed", False)), evidence_text[:100]

    # Last resort: look for a literal `"passed": true` anywhere in the reply.
    return bool(re.search(r'"passed"\s*:\s*true', cleaned.lower())), ""


def _mc_result(mc: dict, question: str, passed: bool, evidence: str) -> dict:
    """Build the CheckItem-compatible result dict for one master control."""
    # `id` may be a UUID or an int, so stringify it before slicing.
    control_ref = mc.get("control_id") or str(mc.get("id", ""))[:8]
    return {
        "id": f"mc-{control_ref}",
        "label": str(mc.get("title") or "")[:80],
        "passed": passed,
        "severity": str(mc.get("severity") or "MEDIUM").upper(),
        "matched_text": evidence,
        "level": 2,
        "parent": None,
        "skipped": False,
        "hint": question,
        "source": "master_control",
    }


async def _ask_ollama(prompt: str) -> Optional[str]:
    """Send `prompt` to the configured Ollama model; return its reply or None on a non-200 status."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.0, "num_predict": 200},
        })
    if resp.status_code != 200:
        logger.warning("MC verify: LLM returned HTTP %s", resp.status_code)
        return None
    return resp.json().get("response", "")


async def _verify_mc(text: str, mc: dict, ask_llm=None) -> Optional[dict]:
    """Verify one master control against the document text via the LLM.

    Args:
        text: Full document text; an excerpt of at most 5000 characters
            (head plus tail) is sent to the model.
        mc: Control row as a dict. Reads check_question, pass_criteria,
            fail_criteria, title, severity, control_id and id.
        ask_llm: Optional async callable `prompt -> reply text | None` that
            replaces the default Ollama HTTP call.

    Returns:
        A CheckItem-compatible dict, or None when the control has no
        check_question or the LLM call fails.
    """
    question = mc.get("check_question") or ""
    if not question:
        return None

    # Only the first five criteria of each kind go into the prompt.
    pass_str = "\n".join(f" - {p}" for p in _as_criteria_list(mc.get("pass_criteria"))[:5])
    fail_str = "\n".join(f" - {f}" for f in _as_criteria_list(mc.get("fail_criteria"))[:5])
    doc_excerpt = _build_excerpt(text, 5000)

    prompt = (
        f"/no_think\n"
        f"FRAGE: {question}\n\n"
        f"PASS wenn ALLE zutreffen:\n{pass_str}\n\n"
        f"FAIL wenn EINES zutrifft:\n{fail_str}\n\n"
        f"DOKUMENT:\n{doc_excerpt}\n\n"
        f'Antworte NUR mit JSON: {{"passed": true/false, "evidence": "Textstelle max 80 Zeichen oder leer"}}'
    )

    try:
        raw = await (ask_llm or _ask_ollama)(prompt)
        if raw is None:
            return None
        passed, evidence = _parse_verdict(raw)
        return _mc_result(mc, question, passed, evidence)
    except Exception as e:
        # Best effort: a failing control is skipped instead of aborting the
        # whole document check. The title is stringified so the handler itself
        # cannot raise on a missing title.
        logger.warning("MC verify failed for '%s': %s", str(mc.get("title") or "")[:40], e)
        return None