7b8440191e
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
235 lines
8.6 KiB
Python
235 lines
8.6 KiB
Python
"""
|
|
RAG-based Document Checker — semantic verification against Control Library.
|
|
|
|
Instead of fixed regex patterns, this uses:
|
|
1. RAG search to find relevant controls for a document type
|
|
2. LLM (Qwen 3.5:35b) to verify if each control is fulfilled
|
|
3. Template Generator for corrections when controls are not met
|
|
|
|
Flow:
|
|
Document text + type
|
|
→ Filter controls by regulation (144K → ~500)
|
|
→ Semantic search for relevant controls (500 → 10-15)
|
|
→ LLM checks each control against text
|
|
→ Returns fulfilled/missing + evidence + correction
|
|
"""
|
|
|
|
import hashlib
import json
import logging
import os
import re
from typing import Optional

import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Service endpoints and model selection, all overridable via environment.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")  # Ollama LLM server
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")  # model used for control verification
SDK_URL = os.getenv("SDK_URL", "http://ai-compliance-sdk:8090")  # Go SDK exposing the RAG search endpoint
QDRANT_URL = os.getenv("QDRANT_INTERNAL_URL", "http://bp-core-qdrant:6333")  # direct Qdrant fallback
|
|
|
|
# Document type → regulation keywords for RAG filtering.
# The first keyword seeds the SDK search query; all keywords are matched
# case-insensitively against Qdrant control payloads in the fallback path.
# Unknown document types fall back to the "dse" entry.
DOC_TYPE_REGULATIONS = {
    # Privacy notice (GDPR Art. 13/14 information duties)
    "dse": ["DSGVO Art. 13", "DSGVO Art. 14", "Datenschutzinformation", "Informationspflicht"],
    # Cookie consent (TDDDG §25 / ePrivacy)
    "cookie": ["TDDDG §25", "ePrivacy", "Cookie", "Einwilligung Cookie"],
    # Site imprint / provider identification (TMG §5, MStV §18)
    "impressum": ["TMG §5", "MStV §18", "Impressum", "Anbieterkennzeichnung"],
    # Withdrawal notice (BGB §355, §312g)
    "widerruf": ["BGB §355", "BGB §312g", "Widerrufsrecht", "Widerrufsbelehrung"],
    # General terms and conditions (BGB §305 ff.)
    "agb": ["BGB §305", "BGB §307", "BGB §309", "AGB", "Allgemeine Geschaeftsbedingungen"],
    # Data protection impact assessment (GDPR Art. 35)
    "dsfa": ["DSGVO Art. 35", "Datenschutz-Folgenabschaetzung", "DSFA", "Risikoanalyse"],
    # Data processing agreement (GDPR Art. 28)
    "avv": ["DSGVO Art. 28", "Auftragsverarbeitung", "AVV"],
    # Deletion concept / retention schedule (GDPR Art. 5, DIN 66398)
    "loeschkonzept": ["DSGVO Art. 5", "DIN 66398", "Loeschkonzept", "Aufbewahrungsfrist"],
}
|
|
|
|
|
|
async def check_document_with_rag(
    text: str,
    doc_type: str,
    doc_title: str,
    doc_url: str,
    max_controls: int = 10,
) -> list[dict]:
    """Check a document against relevant controls from RAG + LLM verification.

    Retrieves up to *max_controls* candidate controls for the document type,
    then asks the LLM to verify each one against the document text.

    Returns list of check results with:
    - id, label, passed, severity, matched_text, control_text, correction
    """
    # Too little text to meaningfully verify anything.
    if not text or len(text) < 100:
        return []

    # Step 1: retrieve candidate controls via RAG (unknown types → "dse").
    regulations = DOC_TYPE_REGULATIONS.get(doc_type) or DOC_TYPE_REGULATIONS["dse"]
    controls = await _search_relevant_controls(text[:2000], regulations, max_controls)

    if not controls:
        logger.info("No RAG controls found for %s (%s)", doc_title, doc_type)
        return []

    logger.info("Found %d relevant controls for '%s' (%s)", len(controls), doc_title, doc_type)

    # Step 2: LLM verification, one control at a time; failed verifications
    # (None) are dropped from the result list.
    verdicts = [await _verify_control_with_llm(text, control, doc_title) for control in controls]
    return [v for v in verdicts if v]
|
|
|
|
|
|
async def _search_relevant_controls(
    text_excerpt: str,
    regulations: list[str],
    top_k: int = 10,
) -> list[dict]:
    """Search for relevant controls — tries Go SDK first, falls back to direct Qdrant.

    The SDK RAG endpoint is the preferred path; when it returns nothing
    (error or no hits), a direct Qdrant scroll on the local Mac Mini is used.
    """
    sdk_hits = await _search_via_sdk(regulations, top_k)
    if sdk_hits:
        return sdk_hits
    # Fallback path: direct Qdrant scroll with client-side filtering.
    return await _search_via_qdrant(regulations, top_k)
|
|
|
|
|
|
async def _search_via_sdk(regulations: list[str], top_k: int) -> list[dict]:
    """Search via Go SDK RAG endpoint.

    Builds the query from the first regulation keyword and normalizes the
    SDK response into control dicts (text, regulation, article, score).
    Returns [] on any failure so the caller can fall back to direct Qdrant.
    """
    # Guard: regulations[0] below would raise IndexError on an empty list
    # (previously masked by the broad except).
    if not regulations:
        return []
    try:
        query = f"{regulations[0]} Anforderungen Pflichtangaben"
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.post(f"{SDK_URL}/sdk/v1/rag/search", json={
                "query": query,
                "collection": "bp_compliance_datenschutz",
                "top_k": top_k,
            })
            if resp.status_code != 200:
                return []
            data = resp.json()
            return [{
                "text": r.get("text", ""),
                "regulation": r.get("regulation_code", "") or r.get("regulation_short", ""),
                "article": r.get("article", ""),
                "score": r.get("score", 0.0),
            } for r in data.get("results", [])]
    except Exception as e:
        # Best-effort: failures fall through to the Qdrant path, but log so
        # SDK outages are visible instead of silently swallowed.
        logger.debug("SDK RAG search failed: %s %s", type(e).__name__, e)
        return []
|
|
|
|
|
|
async def _search_via_qdrant(regulations: list[str], top_k: int) -> list[dict]:
    """Search directly in local Qdrant — scroll with client-side payload filter.

    Qdrant's scroll API returns raw points without similarity ranking, so we
    match payload text against regulation keyword variants ourselves and cap
    the result at top_k. Returns [] on any failure.
    """
    # Hoist keyword variants out of the per-point loop: each regulation is
    # matched as-is, with "Art. " → "Article ", and with "§" stripped.
    keywords = [
        kw.lower()
        for r in regulations
        for kw in (r, r.replace("Art. ", "Article "), r.replace("§", ""))
    ]
    collections = ["bp_compliance_datenschutz", "bp_compliance_gesetze"]
    all_results: list[dict] = []

    try:
        # One shared client for all collections (previously a new client was
        # constructed on every loop iteration).
        async with httpx.AsyncClient(timeout=10.0) as client:
            for collection in collections:
                resp = await client.post(f"{QDRANT_URL}/collections/{collection}/points/scroll", json={
                    "limit": 100,  # Fetch more, filter client-side
                    "with_payload": True,
                    "with_vector": False,
                })
                if resp.status_code != 200:
                    continue

                for point in resp.json().get("result", {}).get("points", []):
                    payload = point.get("payload", {})
                    chunk = payload.get("chunk_text", "")
                    if not chunk or len(chunk) < 50:
                        # Skip empty/trivial chunks.
                        continue

                    section = payload.get("section", "")
                    category = payload.get("category", "")
                    reg_id = payload.get("regulation_id", "")
                    section_title = payload.get("section_title", "")

                    # Match keywords against metadata + chunk prefix.
                    searchable = f"{section} {category} {reg_id} {section_title} {chunk[:200]}".lower()
                    if any(kw in searchable for kw in keywords):
                        all_results.append({
                            "text": chunk[:500],
                            "regulation": reg_id or section or category,
                            "article": section,
                            "score": 0.5,  # fixed score: scroll yields no similarity ranking
                        })

        logger.info("Qdrant direct search: found %d controls from %d collections",
                    len(all_results), len(collections))
        return all_results[:top_k]

    except Exception as e:
        logger.warning("Direct Qdrant search failed: %s", e)
        return []
|
|
|
|
|
|
async def _verify_control_with_llm(
    document_text: str,
    control: dict,
    doc_title: str,
) -> Optional[dict]:
    """Ask LLM if a specific control requirement is fulfilled in the document.

    Sends the control text plus a (possibly truncated) document excerpt to
    Ollama and parses the JSON verdict. Returns a check-result dict, or None
    on any transport/parse failure.
    """
    control_text = control["text"]
    regulation = control.get("regulation", "")

    # Truncate document for LLM context (keep first + last portion, where
    # mandatory disclosures typically appear).
    if len(document_text) > 8000:
        doc_excerpt = document_text[:5000] + "\n...\n" + document_text[-3000:]
    else:
        doc_excerpt = document_text

    prompt = (
        f"Pruefe ob der folgende Dokumenttext die Anforderung erfuellt.\n\n"
        f"ANFORDERUNG ({regulation}):\n{control_text[:500]}\n\n"
        f"DOKUMENTTEXT:\n{doc_excerpt}\n\n"
        f"Antworte NUR mit JSON (kein anderer Text):\n"
        f'{{"fulfilled": true/false, "evidence": "gefundene Textstelle (max 100 Zeichen)", '
        f'"issue": "was fehlt oder falsch ist (leer wenn fulfilled)", '
        f'"severity": "HIGH/MEDIUM/LOW"}}'
    )

    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {"num_predict": 200},  # Limit response length
            })

        if resp.status_code != 200:
            return None

        raw = resp.json().get("response", "").strip()
        # Strip <think>...</think> reasoning blocks if present.
        raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

        # Extract the first flat JSON object from the response; the prompt
        # requests a single non-nested object, so `[^{}]+` suffices.
        json_match = re.search(r"\{[^{}]+\}", raw)
        if not json_match:
            return None

        result = json.loads(json_match.group())

        # Derive a deterministic check ID: the builtin hash() is randomized
        # per process (PYTHONHASHSEED), which made IDs unstable across runs.
        digest = hashlib.sha1(control_text.encode("utf-8")).hexdigest()
        control_id = int(digest[:8], 16) % 10000

        return {
            "id": f"rag-{control_id}",
            "label": f"{regulation}: {control_text[:80]}...",
            "passed": result.get("fulfilled", False),
            "severity": result.get("severity", "MEDIUM"),
            "matched_text": result.get("evidence", ""),
            "issue": result.get("issue", ""),
            "control_text": control_text[:200],
            "regulation": regulation,
        }

    except Exception as e:
        logger.warning("LLM verification failed: %s %s", type(e).__name__, e)
        return None
|