diff --git a/backend-compliance/compliance/api/agent_doc_check_routes.py b/backend-compliance/compliance/api/agent_doc_check_routes.py index a372f87..85473e2 100644 --- a/backend-compliance/compliance/api/agent_doc_check_routes.py +++ b/backend-compliance/compliance/api/agent_doc_check_routes.py @@ -199,17 +199,28 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]: # Main document check (full text against primary type) main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count) - # RAG-based deep check — DISABLED until Master Controls (G1 Decision Trace) are ready. - # The current 144K controls are general legal texts, not specific check criteria. - # Enable via rag_check=true in request when Master Controls are available. - # try: - # from compliance.services.rag_document_checker import check_document_with_rag - # rag_checks = await check_document_with_rag(doc_text, entry.doc_type, entry.label, entry.url) - # if rag_checks: - # for rc in rag_checks: - # main_result.checks.append(CheckItem(...)) - # except Exception as e: - # logger.warning("RAG check failed: %s", e) + # Control Library deep check — verifies against canonical_controls (SQL) + try: + from compliance.services.rag_document_checker import check_document_with_controls + from classroom_engine.database import SessionLocal + db = SessionLocal() + try: + ctrl_checks = await check_document_with_controls( + doc_text, entry.doc_type, entry.label, db, + ) + logger.info("Control check: %d results for '%s'", len(ctrl_checks) if ctrl_checks else 0, entry.label) + if ctrl_checks: + for rc in ctrl_checks: + main_result.checks.append(CheckItem( + id=rc["id"], label=rc["label"], passed=rc["passed"], + severity=rc["severity"], matched_text=rc.get("matched_text", ""), + )) + if not rc["passed"]: + main_result.findings_count += 1 + finally: + db.close() + except Exception as e: + logger.warning("Control check failed for %s: %s %s", entry.label, 
type(e).__name__, e) all_results.append(main_result) diff --git a/backend-compliance/compliance/services/rag_document_checker.py b/backend-compliance/compliance/services/rag_document_checker.py index 76ae521..2f007c2 100644 --- a/backend-compliance/compliance/services/rag_document_checker.py +++ b/backend-compliance/compliance/services/rag_document_checker.py @@ -1,22 +1,22 @@ """ -RAG-based Document Checker — semantic verification against Control Library. +Document Checker with Canonical Controls — SQL-based verification. -Instead of fixed regex patterns, this uses: -1. RAG search to find relevant controls for a document type -2. LLM (Qwen 3.5:35b) to verify if each control is fulfilled -3. Template Generator for corrections when controls are not met +Uses canonical_controls from PostgreSQL (not Qdrant) with: +- test_procedure: specific check instructions +- pass_criteria / evidence: what to look for +- Regex pre-check (fast) + LLM verification (semantic, for regex misses) Flow: Document text + type - → Filter controls by regulation (144K → ~500) - → Semantic search for relevant controls (500 → 10-15) - → LLM checks each control against text - → Returns fulfilled/missing + evidence + correction + → SQL query for relevant controls by category + title keywords + → For each control: check test_procedure against document text + → LLM verifies if control requirements are met """ import logging import os import re +import json as _json from typing import Optional import httpx @@ -25,179 +25,174 @@ logger = logging.getLogger(__name__) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b") -SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090") -QDRANT_URL = os.getenv("QDRANT_INTERNAL_URL", "http://bp-core-qdrant:6333") -# Document type → Regulation keywords for RAG filtering -DOC_TYPE_REGULATIONS = { - "dse": ["DSGVO Art. 13", "DSGVO Art. 
14", "Datenschutzinformation", "Informationspflicht"], - "cookie": ["TDDDG §25", "ePrivacy", "Cookie", "Einwilligung Cookie"], - "impressum": ["TMG §5", "MStV §18", "Impressum", "Anbieterkennzeichnung"], - "widerruf": ["BGB §355", "BGB §312g", "Widerrufsrecht", "Widerrufsbelehrung"], - "agb": ["BGB §305", "BGB §307", "BGB §309", "AGB", "Allgemeine Geschaeftsbedingungen"], - "dsfa": ["DSGVO Art. 35", "Datenschutz-Folgenabschaetzung", "DSFA", "Risikoanalyse"], - "avv": ["DSGVO Art. 28", "Auftragsverarbeitung", "AVV"], - "loeschkonzept": ["DSGVO Art. 5", "DIN 66398", "Loeschkonzept", "Aufbewahrungsfrist"], +# Document type → SQL filter keywords for canonical_controls +DOC_TYPE_FILTERS = { + "dse": { + "category": "data_protection", + "keywords": ["informationspflicht", "datenschutzerkl", "art. 13", "art. 14", + "betroffenenrecht", "verantwortlich", "datenschutzbeauftrag"], + }, + "cookie": { + "category": "data_protection", + "keywords": ["cookie", "einwilligung", "tracking", "consent"], + }, + "impressum": { + "category": "compliance", + "keywords": ["impressum", "anbieterkennzeichnung", "telemedien", "tmg"], + }, + "widerruf": { + "category": "compliance", + "keywords": ["widerruf", "verbraucher", "fernabsatz"], + }, + "agb": { + "category": "compliance", + "keywords": ["geschäftsbedingung", "agb", "vertragsklausel"], + }, } -async def check_document_with_rag( +async def check_document_with_controls( text: str, doc_type: str, doc_title: str, - doc_url: str, + db_session, max_controls: int = 10, ) -> list[dict]: - """Check document against relevant controls from RAG + LLM verification. 
- - Returns list of check results with: - - id, label, passed, severity, matched_text, control_text, correction - """ + """Check document against relevant canonical controls from DB.""" if not text or len(text) < 100: return [] - # Step 1: Find relevant controls via RAG - regulations = DOC_TYPE_REGULATIONS.get(doc_type, DOC_TYPE_REGULATIONS["dse"]) - controls = await _search_relevant_controls(text[:2000], regulations, max_controls) + filters = DOC_TYPE_FILTERS.get(doc_type, DOC_TYPE_FILTERS.get("dse", {})) + category = filters.get("category", "data_protection") + keywords = filters.get("keywords", []) + # Query relevant controls from DB + controls = _query_controls(db_session, category, keywords, max_controls) if not controls: - logger.info("No RAG controls found for %s (%s)", doc_title, doc_type) + logger.info("No canonical controls found for '%s' (%s)", doc_title, doc_type) return [] - logger.info("Found %d relevant controls for '%s' (%s)", len(controls), doc_title, doc_type) + logger.info("Found %d canonical controls for '%s' (%s)", len(controls), doc_title, doc_type) - # Step 2: LLM verification for each control + # Verify each control against document text results = [] for control in controls: - check_result = await _verify_control_with_llm(text, control, doc_title) + check_result = await _verify_control(text, control) if check_result: results.append(check_result) return results -async def _search_relevant_controls( - text_excerpt: str, - regulations: list[str], - top_k: int = 10, -) -> list[dict]: - """Search for relevant controls — tries Go SDK first, falls back to direct Qdrant.""" - # Try Go SDK RAG endpoint first - controls = await _search_via_sdk(regulations, top_k) - if controls: - return controls +def _query_controls(db_session, category: str, keywords: list[str], limit: int) -> list[dict]: + """Query canonical_controls by category + title keywords.""" + from sqlalchemy import text - # Fallback: search directly in Qdrant (local Mac Mini) - controls = 
def _query_controls(db_session, category: str, keywords: list[str], limit: int) -> list[dict]:
    """Query canonical_controls by category and title keywords.

    Args:
        db_session: SQLAlchemy session bound to the compliance database.
        category: Control category to filter on (e.g. "data_protection").
        keywords: Case-insensitive substrings matched against the control
            title. With an empty list, controls are filtered by category only.
        limit: Maximum number of controls to return.

    Returns:
        List of control dicts (id, title, objective, test_procedure,
        severity, category); empty list when the query fails.
    """
    from sqlalchemy import text

    # Parameterized ILIKE clauses — keyword values are bound, never inlined,
    # so the f-string below cannot inject data into the SQL.
    params = {f"kw{i}": f"%{kw}%" for i, kw in enumerate(keywords)}
    params["cat"] = category
    params["limit"] = limit

    # BUG FIX: an empty keyword list previously produced "AND ()" — invalid
    # SQL that always failed; omit the keyword filter entirely in that case.
    if keywords:
        keyword_clauses = " OR ".join(f"title ILIKE :kw{i}" for i in range(len(keywords)))
        keyword_filter = f"AND ({keyword_clauses})"
    else:
        keyword_filter = ""

    query = text(f"""
        SELECT id, title, objective, test_procedure, severity, category
        FROM compliance.canonical_controls
        WHERE category = :cat
          AND release_state != 'deleted'
          {keyword_filter}
        ORDER BY risk_score DESC NULLS LAST
        LIMIT :limit
    """)

    try:
        result = db_session.execute(query, params)
        return [
            {
                "id": str(row[0]),
                "title": row[1],
                "objective": row[2],
                "test_procedure": row[3],
                "severity": row[4],
                "category": row[5],
            }
            for row in result
        ]
    except Exception as e:
        # Best-effort: a failed query degrades to "no controls", not a crash.
        logger.warning("Control query failed: %s", e)
        return []
document_text[-3000:] + # Parse test_procedure JSON + try: + procedures = _json.loads(test_proc) if isinstance(test_proc, str) else test_proc + except Exception: + procedures = [test_proc] if test_proc else [] + + if not procedures: + return None + + # Quick regex pre-check — extract keywords from test procedure + proc_text = " ".join(str(p) for p in procedures).lower() + doc_lower = text.lower() + + # Extract key terms from procedure + key_terms = re.findall(r'\b(?:prüf|überprüf|kontroll|verifiz|feststell|validier)\w*\s+(?:ob|dass|der|die|das)\s+(\w+(?:\s+\w+){0,3})', proc_text) + + # If we can find key terms via regex, skip LLM + regex_found = False + evidence = "" + for term in key_terms: + if term in doc_lower: + idx = doc_lower.find(term) + evidence = doc_lower[max(0, idx-20):idx+len(term)+20] + regex_found = True + break + + if regex_found: + return { + "id": f"ctrl-{control['id'][:8]}", + "label": title[:80], + "passed": True, + "severity": control.get("severity", "medium").upper(), + "matched_text": evidence[:100], + "control_text": title, + "regulation": control.get("category", ""), + } + + # LLM verification for cases regex can't handle + return await _llm_verify(text, title, procedures, control) + + +async def _llm_verify(text: str, title: str, procedures: list, control: dict) -> Optional[dict]: + """Ask LLM if control requirements are met.""" + proc_str = "\n".join(f"- {p}" for p in procedures[:5]) + + # Truncate document + if len(text) > 6000: + doc_excerpt = text[:4000] + "\n...\n" + text[-2000:] else: - doc_excerpt = document_text + doc_excerpt = text prompt = ( - f"Pruefe ob der folgende Dokumenttext die Anforderung erfuellt.\n\n" - f"ANFORDERUNG ({regulation}):\n{control_text[:500]}\n\n" - f"DOKUMENTTEXT:\n{doc_excerpt}\n\n" - f"Antworte NUR mit JSON (kein anderer Text):\n" - f'{{"fulfilled": true/false, "evidence": "gefundene Textstelle (max 100 Zeichen)", ' - f'"issue": "was fehlt oder falsch ist (leer wenn fulfilled)", ' - f'"severity": 
"HIGH/MEDIUM/LOW"}}' + f"/no_think\n" + f"Pruefe ob das Dokument die folgenden Anforderungen erfuellt.\n\n" + f"CONTROL: {title}\n" + f"PRUEFSCHRITTE:\n{proc_str}\n\n" + f"DOKUMENT (Auszug):\n{doc_excerpt[:3000]}\n\n" + f'Antworte NUR mit JSON: {{"fulfilled": true/false, "evidence": "textstelle max 80 zeichen"}}' ) try: - async with httpx.AsyncClient(timeout=120.0) as client: + async with httpx.AsyncClient(timeout=90.0) as client: resp = await client.post(f"{OLLAMA_URL}/api/generate", json={ "model": OLLAMA_MODEL, - "prompt": "/no_think\n" + prompt, # Disable thinking mode + "prompt": prompt, "stream": False, "options": {"num_predict": 300}, }) @@ -206,62 +201,41 @@ async def _verify_control_with_llm( return None data = resp.json() - # Qwen 3.5 may return content in 'response' or 'thinking' field - raw = data.get("response", "").strip() - if not raw: - raw = data.get("thinking", "").strip() - # Strip think tags if present + raw = data.get("response", "") or data.get("thinking", "") raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() - # Parse JSON response — handle LLM quirks - import json - # Find JSON in response + # Parse JSON json_match = re.search(r"\{[^{}]+\}", raw) - if not json_match: - # Fallback: try to extract fulfilled/evidence from raw text - fulfilled = "true" in raw.lower()[:100] or "yes" in raw.lower()[:100] or "erfüllt" in raw.lower()[:100] - return { - "id": f"rag-{hash(control_text) % 10000}", - "label": f"{regulation}: {control_text[:80]}...", - "passed": fulfilled, - "severity": "LOW" if fulfilled else "MEDIUM", - "matched_text": raw[:100] if fulfilled else "", - "issue": "" if fulfilled else raw[:100], - "control_text": control_text[:200], - "regulation": regulation, - } - - json_str = json_match.group() - # Fix common LLM JSON issues - json_str = re.sub(r'(?<=[{,])\s*(\w+)\s*:', r' "\1":', json_str) # Unquoted keys - json_str = json_str.replace("True", "true").replace("False", "false") - try: - result = json.loads(json_str) - except 
json.JSONDecodeError: - # Last resort: extract boolean from raw text - fulfilled = "true" in json_str.lower() or "fulfilled" in raw.lower()[:200] - return { - "id": f"rag-{hash(control_text) % 10000}", - "label": f"{regulation}: {control_text[:80]}...", - "passed": fulfilled, - "severity": "LOW" if fulfilled else "MEDIUM", - "matched_text": "", - "issue": "", - "control_text": control_text[:200], - "regulation": regulation, - } + if json_match: + json_str = json_match.group() + json_str = re.sub(r'(?<=[{,])\s*(\w+)\s*:', r' "\1":', json_str) + json_str = json_str.replace("True", "true").replace("False", "false") + try: + result = _json.loads(json_str) + return { + "id": f"ctrl-{control['id'][:8]}", + "label": title[:80], + "passed": result.get("fulfilled", False), + "severity": control.get("severity", "medium").upper(), + "matched_text": result.get("evidence", "")[:100], + "control_text": title, + "regulation": control.get("category", ""), + } + except _json.JSONDecodeError: + pass + # Fallback + fulfilled = "true" in raw.lower()[:200] or "fulfilled" in raw.lower()[:200] return { - "id": f"rag-{hash(control_text) % 10000}", - "label": f"{regulation}: {control_text[:80]}...", - "passed": result.get("fulfilled", False), - "severity": result.get("severity", "MEDIUM"), - "matched_text": result.get("evidence", ""), - "issue": result.get("issue", ""), - "control_text": control_text[:200], - "regulation": regulation, + "id": f"ctrl-{control['id'][:8]}", + "label": title[:80], + "passed": fulfilled, + "severity": control.get("severity", "medium").upper(), + "matched_text": "", + "control_text": title, + "regulation": control.get("category", ""), } except Exception as e: - logger.warning("LLM verification failed: %s %s", type(e).__name__, e) + logger.warning("LLM control verify failed: %s %s", type(e).__name__, e) return None