26b222d53d
Rewritten rag_document_checker.py to use doc_check_controls table instead of generic canonical_controls. Each MC has: - check_question: binary YES/NO for LLM - pass_criteria: JSONB list of concrete requirements - fail_criteria: JSONB list of common mistakes Flow: Regex checks (fast) → LLM verify FAILs → MC deep check (15 per doc) MC results appear as additional L2 checks in the report. Coverage: 571 DSE, 381 Cookie, 309 Loeschkonzept, 153 Widerruf, 147 DSFA, 125 AVV, 113 AGB, 75 Impressum = 1.874 total. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
202 lines
6.2 KiB
Python
202 lines
6.2 KiB
Python
"""
|
|
Document Checker with Master Controls — SQL-based deep verification.
|
|
|
|
Uses doc_check_controls from PostgreSQL with:
|
|
- check_question: binary YES/NO question
|
|
- pass_criteria: JSONB list of concrete must-haves
|
|
- fail_criteria: JSONB list of common mistakes
|
|
- LLM (Qwen) verifies each control against document text
|
|
|
|
Flow:
|
|
Document text + doc_type
|
|
→ SQL query: SELECT * FROM compliance.doc_check_controls WHERE doc_type = ?
|
|
→ For each control: LLM answers check_question with pass/fail criteria
|
|
→ Returns structured results compatible with CheckItem format
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import json as _json
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
|
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
|
|
|
# Map our doc_types to the DB doc_type values
|
|
DOC_TYPE_MAP = {
|
|
"dse": "dse",
|
|
"datenschutz": "dse",
|
|
"privacy": "dse",
|
|
"cookie": "cookie",
|
|
"impressum": "impressum",
|
|
"imprint": "impressum",
|
|
"widerruf": "widerruf",
|
|
"withdrawal": "widerruf",
|
|
"agb": "agb",
|
|
"terms": "agb",
|
|
"dsfa": "dsfa",
|
|
"social_media": "dse",
|
|
"avv": "avv",
|
|
"loeschkonzept": "loeschkonzept",
|
|
}
|
|
|
|
|
|
async def check_document_with_controls(
|
|
text: str,
|
|
doc_type: str,
|
|
doc_title: str,
|
|
db_url: str = "",
|
|
max_controls: int = 20,
|
|
) -> list[dict]:
|
|
"""Check document against doc_check_controls from DB.
|
|
|
|
Returns list of CheckItem-compatible dicts.
|
|
"""
|
|
if not text or len(text) < 100:
|
|
return []
|
|
|
|
mapped_type = DOC_TYPE_MAP.get(doc_type, doc_type)
|
|
|
|
try:
|
|
import asyncpg
|
|
db = db_url or os.getenv(
|
|
"DATABASE_URL",
|
|
"postgresql://breakpilot:breakpilot@bp-core-postgres:5432/breakpilot",
|
|
)
|
|
conn = await asyncpg.connect(db)
|
|
except Exception as e:
|
|
logger.warning("DB connection failed: %s", e)
|
|
return []
|
|
|
|
try:
|
|
rows = await conn.fetch(
|
|
"""SELECT id, control_id, title, regulation, check_question,
|
|
pass_criteria, fail_criteria, severity
|
|
FROM compliance.doc_check_controls
|
|
WHERE doc_type = $1
|
|
ORDER BY severity DESC, title
|
|
LIMIT $2""",
|
|
mapped_type, max_controls,
|
|
)
|
|
except Exception as e:
|
|
logger.warning("MC query failed: %s", e)
|
|
await conn.close()
|
|
return []
|
|
|
|
await conn.close()
|
|
|
|
if not rows:
|
|
logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title)
|
|
return []
|
|
|
|
logger.info("Checking %d MCs for '%s' (%s)", len(rows), doc_title, mapped_type)
|
|
|
|
results = []
|
|
for row in rows:
|
|
result = await _verify_mc(text, dict(row))
|
|
if result:
|
|
results.append(result)
|
|
|
|
passed = sum(1 for r in results if r["passed"])
|
|
logger.info("MC results: %d/%d passed for '%s'", passed, len(results), doc_title)
|
|
return results
|
|
|
|
|
|
async def _verify_mc(text: str, mc: dict) -> Optional[dict]:
|
|
"""Verify one master control against document text via LLM."""
|
|
question = mc.get("check_question", "")
|
|
if not question:
|
|
return None
|
|
|
|
pass_crit = mc.get("pass_criteria", [])
|
|
fail_crit = mc.get("fail_criteria", [])
|
|
|
|
# Parse JSON if string
|
|
if isinstance(pass_crit, str):
|
|
try:
|
|
pass_crit = _json.loads(pass_crit)
|
|
except Exception:
|
|
pass_crit = [pass_crit]
|
|
if isinstance(fail_crit, str):
|
|
try:
|
|
fail_crit = _json.loads(fail_crit)
|
|
except Exception:
|
|
fail_crit = [fail_crit]
|
|
|
|
pass_str = "\n".join(f" - {p}" for p in pass_crit[:5])
|
|
fail_str = "\n".join(f" - {f}" for f in fail_crit[:5])
|
|
|
|
# Truncate text
|
|
doc_excerpt = text[:6000] if len(text) <= 8000 else text[:4000] + "\n...\n" + text[-2000:]
|
|
|
|
prompt = (
|
|
f"/no_think\n"
|
|
f"FRAGE: {question}\n\n"
|
|
f"PASS wenn ALLE zutreffen:\n{pass_str}\n\n"
|
|
f"FAIL wenn EINES zutrifft:\n{fail_str}\n\n"
|
|
f"DOKUMENT:\n{doc_excerpt[:5000]}\n\n"
|
|
f'Antworte NUR mit JSON: {{"passed": true/false, "evidence": "Textstelle max 80 Zeichen oder leer"}}'
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
|
|
"model": OLLAMA_MODEL,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"options": {"temperature": 0.0, "num_predict": 200},
|
|
})
|
|
|
|
if resp.status_code != 200:
|
|
return None
|
|
|
|
raw = resp.json().get("response", "")
|
|
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
|
|
|
# Parse JSON
|
|
json_match = re.search(r"\{[^{}]+\}", raw)
|
|
if json_match:
|
|
json_str = json_match.group()
|
|
json_str = re.sub(r"(?<=[{,])\s*(\w+)\s*:", r' "\1":', json_str)
|
|
json_str = json_str.replace("True", "true").replace("False", "false")
|
|
try:
|
|
result = _json.loads(json_str)
|
|
return {
|
|
"id": f"mc-{mc.get('control_id', mc['id'][:8])}",
|
|
"label": mc["title"][:80],
|
|
"passed": bool(result.get("passed", False)),
|
|
"severity": (mc.get("severity") or "MEDIUM").upper(),
|
|
"matched_text": str(result.get("evidence", ""))[:100],
|
|
"level": 2,
|
|
"parent": None,
|
|
"skipped": False,
|
|
"hint": question,
|
|
"source": "master_control",
|
|
}
|
|
except _json.JSONDecodeError:
|
|
pass
|
|
|
|
# Fallback
|
|
passed = '"passed": true' in raw.lower() or '"passed":true' in raw.lower()
|
|
return {
|
|
"id": f"mc-{mc.get('control_id', mc['id'][:8])}",
|
|
"label": mc["title"][:80],
|
|
"passed": passed,
|
|
"severity": (mc.get("severity") or "MEDIUM").upper(),
|
|
"matched_text": "",
|
|
"level": 2,
|
|
"parent": None,
|
|
"skipped": False,
|
|
"hint": question,
|
|
"source": "master_control",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning("MC verify failed for '%s': %s", mc["title"][:40], e)
|
|
return None
|