feat: RAG-based document verification against 144K Control Library

New module: rag_document_checker.py
- Searches RAG (Qdrant) for controls relevant to document type
- Filters by regulation (DSGVO Art.13, TDDDG §25, BGB §355 etc.)
- LLM (Qwen 3.5:35b) verifies each control against document text
- Returns fulfilled/missing with evidence text + severity
- Supports: DSI, Cookie, Impressum, Widerruf, AGB, DSFA, AVV, Loeschkonzept

Integration in doc-check endpoint:
- Regex checklist runs first (fast, deterministic)
- RAG checks run after (semantic, catches what regex misses)
- Both results combined in single response

LLM prompt returns JSON: {fulfilled, evidence, issue, severity}
Think-tags stripped, JSON extracted from response.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-06 13:19:15 +02:00
parent 13c5880f51
commit 090da0f71b
2 changed files with 196 additions and 0 deletions
@@ -198,6 +198,24 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
# Main document check (full text against primary type)
main_result = _run_checklist(doc_text, entry.doc_type, entry.label, entry.url, word_count)
# RAG-based deep check (semantic verification against Control Library)
try:
from compliance.services.rag_document_checker import check_document_with_rag
rag_checks = await check_document_with_rag(
doc_text, entry.doc_type, entry.label, entry.url,
)
if rag_checks:
for rc in rag_checks:
main_result.checks.append(CheckItem(
id=rc["id"], label=rc["label"], passed=rc["passed"],
severity=rc["severity"], matched_text=rc.get("matched_text", ""),
))
if not rc["passed"]:
main_result.findings_count += 1
except Exception as e:
logger.warning("RAG check failed for %s: %s", entry.label, e)
all_results.append(main_result)
# Sub-section checks (auto-detected from headings)
@@ -0,0 +1,178 @@
"""
RAG-based Document Checker — semantic verification against Control Library.
Instead of fixed regex patterns, this uses:
1. RAG search to find relevant controls for a document type
2. LLM (Qwen 3.5:35b) to verify if each control is fulfilled
3. Template Generator for corrections when controls are not met
Flow:
Document text + type
→ Filter controls by regulation (144K → ~500)
→ Semantic search for relevant controls (500 → 10-15)
→ LLM checks each control against text
→ Returns fulfilled/missing + evidence + correction
"""
import logging
import os
import re
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")
SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")
# Document type → Regulation keywords for RAG filtering
DOC_TYPE_REGULATIONS = {
"dse": ["DSGVO Art. 13", "DSGVO Art. 14", "Datenschutzinformation", "Informationspflicht"],
"cookie": ["TDDDG §25", "ePrivacy", "Cookie", "Einwilligung Cookie"],
"impressum": ["TMG §5", "MStV §18", "Impressum", "Anbieterkennzeichnung"],
"widerruf": ["BGB §355", "BGB §312g", "Widerrufsrecht", "Widerrufsbelehrung"],
"agb": ["BGB §305", "BGB §307", "BGB §309", "AGB", "Allgemeine Geschaeftsbedingungen"],
"dsfa": ["DSGVO Art. 35", "Datenschutz-Folgenabschaetzung", "DSFA", "Risikoanalyse"],
"avv": ["DSGVO Art. 28", "Auftragsverarbeitung", "AVV"],
"loeschkonzept": ["DSGVO Art. 5", "DIN 66398", "Loeschkonzept", "Aufbewahrungsfrist"],
}
async def check_document_with_rag(
text: str,
doc_type: str,
doc_title: str,
doc_url: str,
max_controls: int = 10,
) -> list[dict]:
"""Check document against relevant controls from RAG + LLM verification.
Returns list of check results with:
- id, label, passed, severity, matched_text, control_text, correction
"""
if not text or len(text) < 100:
return []
# Step 1: Find relevant controls via RAG
regulations = DOC_TYPE_REGULATIONS.get(doc_type, DOC_TYPE_REGULATIONS["dse"])
controls = await _search_relevant_controls(text[:2000], regulations, max_controls)
if not controls:
logger.info("No RAG controls found for %s (%s)", doc_title, doc_type)
return []
logger.info("Found %d relevant controls for '%s' (%s)", len(controls), doc_title, doc_type)
# Step 2: LLM verification for each control
results = []
for control in controls:
check_result = await _verify_control_with_llm(text, control, doc_title)
if check_result:
results.append(check_result)
return results
async def _search_relevant_controls(
text_excerpt: str,
regulations: list[str],
top_k: int = 10,
) -> list[dict]:
"""Search RAG for controls relevant to this document."""
try:
# Use the first regulation as primary query, rest as context
query = f"{regulations[0]} Anforderungen Pflichtangaben"
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(f"{SDK_URL}/sdk/v1/rag/search", json={
"query": query,
"collection": "bp_compliance_datenschutz",
"top_k": top_k,
})
if resp.status_code != 200:
logger.warning("RAG search returned %d", resp.status_code)
return []
data = resp.json()
controls = []
for r in data.get("results", []):
controls.append({
"text": r.get("text", ""),
"regulation": r.get("regulation_code", "") or r.get("regulation_short", ""),
"article": r.get("article", ""),
"score": r.get("score", 0.0),
})
return controls
except Exception as e:
logger.warning("RAG control search failed: %s", e)
return []
async def _verify_control_with_llm(
document_text: str,
control: dict,
doc_title: str,
) -> Optional[dict]:
"""Ask LLM if a specific control requirement is fulfilled in the document."""
control_text = control["text"]
regulation = control.get("regulation", "")
# Truncate document for LLM context (keep first + last portion)
if len(document_text) > 8000:
doc_excerpt = document_text[:5000] + "\n...\n" + document_text[-3000:]
else:
doc_excerpt = document_text
prompt = (
f"Pruefe ob der folgende Dokumenttext die Anforderung erfuellt.\n\n"
f"ANFORDERUNG ({regulation}):\n{control_text[:500]}\n\n"
f"DOKUMENTTEXT:\n{doc_excerpt}\n\n"
f"Antworte NUR mit JSON (kein anderer Text):\n"
f'{{"fulfilled": true/false, "evidence": "gefundene Textstelle (max 100 Zeichen)", '
f'"issue": "was fehlt oder falsch ist (leer wenn fulfilled)", '
f'"severity": "HIGH/MEDIUM/LOW"}}'
)
try:
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
"model": OLLAMA_MODEL,
"prompt": prompt,
"stream": False,
})
if resp.status_code != 200:
return None
raw = resp.json().get("response", "").strip()
# Strip think tags if present
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
# Parse JSON response
import json
# Find JSON in response
json_match = re.search(r"\{[^{}]+\}", raw)
if not json_match:
return None
result = json.loads(json_match.group())
return {
"id": f"rag-{hash(control_text) % 10000}",
"label": f"{regulation}: {control_text[:80]}...",
"passed": result.get("fulfilled", False),
"severity": result.get("severity", "MEDIUM"),
"matched_text": result.get("evidence", ""),
"issue": result.get("issue", ""),
"control_text": control_text[:200],
"regulation": regulation,
}
except Exception as e:
logger.warning("LLM verification failed: %s", e)
return None