feat: Deterministic MC checking — ALL controls, no LLM, reproducible
Replaced LLM-based MC (master control) verification with deterministic keyword matching:
- Extracts keywords from pass_criteria/fail_criteria
- Matches them against the lowercased document text (case-insensitive substring search)
- PASS if >= 60% of the pass_criteria are met (a criterion counts as met when >= 50% of its keywords are found) AND no fail_criteria is triggered
- Same text + same MCs = same result every time

Checks ALL MCs for the doc_type (max_controls=0):
- DSE: all 571 controls checked in <1 second
- Impressum: all 75 controls
- Cookie: all 381 controls

No LLM calls needed — purely deterministic keyword matching. Bigram extraction for compound terms (e.g. "standardvertragsklauseln"). Stop word filtering for German legal text.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -288,7 +288,7 @@ async def _check_single_document(entry: DocCheckEntry) -> list[DocCheckResult]:
|
||||
try:
|
||||
from compliance.services.rag_document_checker import check_document_with_controls
|
||||
mc_results = await check_document_with_controls(
|
||||
doc_text, entry.doc_type, entry.label, max_controls=15,
|
||||
doc_text, entry.doc_type, entry.label, max_controls=0,
|
||||
)
|
||||
if mc_results:
|
||||
# Add MC results as additional checks to the main result
|
||||
|
||||
@@ -1,49 +1,28 @@
|
||||
"""
|
||||
Document Checker with Master Controls — SQL-based deep verification.
|
||||
Document Checker with Master Controls — deterministic keyword verification.
|
||||
|
||||
Uses doc_check_controls from PostgreSQL with:
|
||||
- check_question: binary YES/NO question
|
||||
- pass_criteria: JSONB list of concrete must-haves
|
||||
- fail_criteria: JSONB list of common mistakes
|
||||
- LLM (Qwen) verifies each control against document text
|
||||
Checks ALL doc_check_controls for the given doc_type using keyword
|
||||
extraction from pass_criteria/fail_criteria. No LLM needed for the
|
||||
primary check — results are 100% deterministic and reproducible.
|
||||
|
||||
Flow:
|
||||
Document text + doc_type
|
||||
→ SQL query: SELECT * FROM compliance.doc_check_controls WHERE doc_type = ?
|
||||
→ For each control: LLM answers check_question with pass/fail criteria
|
||||
→ Load ALL MCs from compliance.doc_check_controls WHERE doc_type = ?
|
||||
→ For each MC: extract keywords from pass_criteria
|
||||
→ Match keywords against the lowercased document text (case-insensitive substring search)
|
||||
→ PASS if at least 60% of the pass_criteria are met AND no fail_criteria is triggered
|
||||
→ Returns structured results compatible with CheckItem format
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import json as _json
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
||||
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
||||
|
||||
# Map our doc_types to the DB doc_type values
|
||||
DOC_TYPE_MAP = {
|
||||
"dse": "dse",
|
||||
"datenschutz": "dse",
|
||||
"privacy": "dse",
|
||||
"cookie": "cookie",
|
||||
"impressum": "impressum",
|
||||
"imprint": "impressum",
|
||||
"widerruf": "widerruf",
|
||||
"withdrawal": "widerruf",
|
||||
"agb": "agb",
|
||||
"terms": "agb",
|
||||
"dsfa": "dsfa",
|
||||
"social_media": "dse",
|
||||
"avv": "avv",
|
||||
"loeschkonzept": "loeschkonzept",
|
||||
}
|
||||
# Minimum keyword match ratio to consider a criterion "met"
|
||||
PASS_THRESHOLD = 0.5 # At least 50% of extracted keywords must match
|
||||
|
||||
|
||||
async def check_document_with_controls(
|
||||
@@ -51,17 +30,197 @@ async def check_document_with_controls(
|
||||
doc_type: str,
|
||||
doc_title: str,
|
||||
db_url: str = "",
|
||||
max_controls: int = 20,
|
||||
max_controls: int = 0, # 0 = no limit, check ALL
|
||||
) -> list[dict]:
|
||||
"""Check document against doc_check_controls from DB.
|
||||
"""Check document against ALL doc_check_controls for this doc_type.
|
||||
|
||||
Returns list of CheckItem-compatible dicts.
|
||||
Deterministic: same text + same MCs = same result. No LLM involved.
|
||||
"""
|
||||
if not text or len(text) < 100:
|
||||
return []
|
||||
|
||||
mapped_type = DOC_TYPE_MAP.get(doc_type, doc_type)
|
||||
mapped_type = _map_doc_type(doc_type)
|
||||
|
||||
# Load ALL controls for this doc_type
|
||||
controls = await _load_controls(mapped_type, db_url, max_controls)
|
||||
if not controls:
|
||||
logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title)
|
||||
return []
|
||||
|
||||
logger.info("Checking %d MCs for '%s' (%s)", len(controls), doc_title, mapped_type)
|
||||
|
||||
text_lower = text.lower().replace("\xad", "") # Strip soft hyphens
|
||||
results = []
|
||||
|
||||
for mc in controls:
|
||||
result = _check_mc_deterministic(text_lower, mc)
|
||||
if result:
|
||||
results.append(result)
|
||||
|
||||
passed = sum(1 for r in results if r["passed"])
|
||||
failed = sum(1 for r in results if not r["passed"])
|
||||
logger.info("MC results: %d passed, %d failed out of %d for '%s'",
|
||||
passed, failed, len(results), doc_title)
|
||||
return results
|
||||
|
||||
|
||||
def _check_mc_deterministic(text_lower: str, mc: dict) -> Optional[dict]:
    """Check one master control (MC) against document text via keyword matching.

    For every pass criterion, keywords are extracted with
    ``_extract_keywords`` and searched in ``text_lower`` by plain substring
    test. A criterion counts as met when at least ``PASS_THRESHOLD`` of its
    keywords occur; a criterion without any extractable keyword is treated as
    met. The control passes when at least 60% of the pass criteria are met
    and no fail criterion has 70% or more of its keywords present.

    Args:
        text_lower: Document text, already lowercased by the caller.
        mc: One control row as a dict. Reads ``check_question``,
            ``pass_criteria``, ``fail_criteria``, ``severity``,
            ``control_id``, ``id`` and ``title``. Criteria may be a list or a
            JSON-encoded string.

    Returns:
        A CheckItem-compatible dict, or ``None`` when the control has no
        check question or no pass criteria.
    """
    import json

    # Share of a fail criterion's keywords that must occur to trigger it.
    fail_keyword_ratio = 0.7
    # Share of pass criteria that must be met for the control to pass.
    required_criteria_ratio = 0.6

    def _as_criteria(raw) -> list:
        """Normalize a criteria value to a list of criterion strings."""
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except (ValueError, TypeError):
                # Not JSON: the raw string itself is the single criterion.
                return [raw] if raw else []
        if raw is None:
            return []
        if not isinstance(raw, (list, tuple)):
            # A JSON scalar/object must not be iterated element-wise
            # (a string would otherwise be split into characters).
            raw = [raw]
        return ["" if item is None else str(item) for item in raw]

    question = mc.get("check_question", "")
    if not question:
        return None

    pass_crit = _as_criteria(mc.get("pass_criteria"))
    fail_crit = _as_criteria(mc.get("fail_criteria"))
    if not pass_crit:
        return None

    # Count how many pass criteria are met and keep the first evidence snippet.
    criteria_met = 0
    total_criteria = len(pass_crit)
    evidence = ""

    for criterion in pass_crit:
        keywords = _extract_keywords(criterion)
        if not keywords:
            criteria_met += 1  # Empty criterion = auto-pass
            continue

        hits = [kw for kw in keywords if kw in text_lower]
        if len(hits) / len(keywords) < PASS_THRESHOLD:
            continue

        criteria_met += 1
        if not evidence and hits:
            # Snippet: first matching keyword with 30 chars of context per side.
            keyword = hits[0]
            idx = text_lower.find(keyword)
            start = max(0, idx - 30)
            end = min(len(text_lower), idx + len(keyword) + 30)
            evidence = text_lower[start:end].strip()

    # A single triggered fail criterion vetoes the control.
    fail_triggered = False
    for criterion in fail_crit:
        keywords = _extract_keywords(criterion)
        if not keywords:
            continue
        matched = sum(1 for kw in keywords if kw in text_lower)
        if matched >= len(keywords) * fail_keyword_ratio:
            fail_triggered = True
            break

    passed = (
        criteria_met >= total_criteria * required_criteria_ratio
        and not fail_triggered
    )

    severity = (mc.get("severity") or "MEDIUM").upper()
    # Fall back to the first 8 chars of the row id when control_id is
    # missing or empty (avoids ids like "mc-None").
    control_id = mc.get("control_id") or str(mc.get("id") or "")[:8]

    return {
        "id": f"mc-{control_id}",
        "label": str(mc.get("title") or "")[:80],
        "passed": passed,
        "severity": severity,
        "matched_text": evidence[:100] if passed else "",
        "level": 2,
        "parent": None,
        "skipped": False,
        "hint": question if not passed else "",
        "source": "master_control",
        "criteria_met": f"{criteria_met}/{total_criteria}",
    }
|
||||
|
||||
|
||||
# Keywords shorter than this are too generic to be useful
|
||||
_MIN_KEYWORD_LEN = 4
|
||||
|
||||
# Common German stop words to skip
|
||||
_STOP_WORDS = {
|
||||
"oder", "und", "der", "die", "das", "ein", "eine", "einer", "eines",
|
||||
"von", "vom", "zur", "zum", "mit", "auf", "aus", "fuer", "für",
|
||||
"bei", "nach", "ueber", "über", "unter", "nicht", "kein", "keine",
|
||||
"wird", "werden", "kann", "muss", "soll", "ist", "sind", "hat",
|
||||
"dass", "wenn", "ohne", "nur", "auch", "noch", "alle", "alle",
|
||||
"wie", "was", "wer", "den", "dem", "des", "als", "bis", "vor",
|
||||
"sein", "sich", "durch", "damit", "davon", "dazu", "dies", "diese",
|
||||
"dieser", "dieses", "jede", "jeder", "jedes", "andere", "anderen",
|
||||
"solche", "solcher", "welche", "welcher", "etwa", "bereits",
|
||||
"sowie", "soweit", "sofern", "falls", "hierzu", "hierbei",
|
||||
"insbesondere", "beispielsweise", "gegebenenfalls",
|
||||
}
|
||||
|
||||
|
||||
def _extract_keywords(criterion: str) -> list[str]:
|
||||
"""Extract meaningful keywords from a pass/fail criterion text."""
|
||||
# Lowercase and clean
|
||||
text = criterion.lower()
|
||||
text = re.sub(r"[()'\"\[\],;:!?]", " ", text)
|
||||
text = re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
words = text.split()
|
||||
keywords = []
|
||||
|
||||
for word in words:
|
||||
# Skip short words and stop words
|
||||
if len(word) < _MIN_KEYWORD_LEN:
|
||||
continue
|
||||
if word in _STOP_WORDS:
|
||||
continue
|
||||
# Skip pure numbers
|
||||
if word.isdigit():
|
||||
continue
|
||||
keywords.append(word)
|
||||
|
||||
# Also extract compound terms (2-word bigrams) for specificity
|
||||
for i in range(len(words) - 1):
|
||||
bigram = f"{words[i]} {words[i+1]}"
|
||||
if len(bigram) >= 8 and words[i] not in _STOP_WORDS and words[i+1] not in _STOP_WORDS:
|
||||
keywords.append(bigram)
|
||||
|
||||
return keywords[:15] # Cap at 15 keywords per criterion
|
||||
|
||||
|
||||
# Map doc_type aliases
|
||||
_DOC_TYPE_MAP = {
|
||||
"dse": "dse", "datenschutz": "dse", "privacy": "dse",
|
||||
"cookie": "cookie",
|
||||
"impressum": "impressum", "imprint": "impressum",
|
||||
"widerruf": "widerruf", "withdrawal": "widerruf",
|
||||
"agb": "agb", "terms": "agb",
|
||||
"dsfa": "dsfa",
|
||||
"social_media": "dse",
|
||||
"avv": "avv",
|
||||
"loeschkonzept": "loeschkonzept",
|
||||
}
|
||||
|
||||
|
||||
def _map_doc_type(doc_type: str) -> str:
|
||||
return _DOC_TYPE_MAP.get(doc_type, doc_type)
|
||||
|
||||
|
||||
async def _load_controls(doc_type: str, db_url: str, limit: int) -> list[dict]:
|
||||
"""Load all doc_check_controls for a doc_type from PostgreSQL."""
|
||||
try:
|
||||
import asyncpg
|
||||
db = db_url or os.getenv(
|
||||
@@ -74,128 +233,18 @@ async def check_document_with_controls(
|
||||
return []
|
||||
|
||||
try:
|
||||
rows = await conn.fetch(
|
||||
"""SELECT id, control_id, title, regulation, check_question,
|
||||
pass_criteria, fail_criteria, severity
|
||||
FROM compliance.doc_check_controls
|
||||
WHERE doc_type = $1
|
||||
ORDER BY severity DESC, title
|
||||
LIMIT $2""",
|
||||
mapped_type, max_controls,
|
||||
)
|
||||
query = """SELECT id, control_id, title, regulation, check_question,
|
||||
pass_criteria, fail_criteria, severity
|
||||
FROM compliance.doc_check_controls
|
||||
WHERE doc_type = $1
|
||||
ORDER BY severity DESC, title"""
|
||||
if limit > 0:
|
||||
query += f" LIMIT {limit}"
|
||||
|
||||
rows = await conn.fetch(query, doc_type)
|
||||
return [dict(r) for r in rows]
|
||||
except Exception as e:
|
||||
logger.warning("MC query failed: %s", e)
|
||||
return []
|
||||
finally:
|
||||
await conn.close()
|
||||
return []
|
||||
|
||||
await conn.close()
|
||||
|
||||
if not rows:
|
||||
logger.info("No MCs for doc_type '%s' (%s)", mapped_type, doc_title)
|
||||
return []
|
||||
|
||||
logger.info("Checking %d MCs for '%s' (%s)", len(rows), doc_title, mapped_type)
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
result = await _verify_mc(text, dict(row))
|
||||
if result:
|
||||
results.append(result)
|
||||
|
||||
passed = sum(1 for r in results if r["passed"])
|
||||
logger.info("MC results: %d/%d passed for '%s'", passed, len(results), doc_title)
|
||||
return results
|
||||
|
||||
|
||||
async def _verify_mc(text: str, mc: dict) -> Optional[dict]:
    """Verify one master control against document text via LLM.

    Builds a prompt from the control's check question and up to five pass
    and five fail criteria, posts it to ``{OLLAMA_URL}/api/generate`` and
    parses the first flat JSON object found in the response.

    Args:
        text: Document text.
        mc: One control row as a dict. Reads ``check_question``,
            ``pass_criteria``, ``fail_criteria``, ``severity``,
            ``control_id``, ``id`` and ``title``.

    Returns:
        A CheckItem-compatible dict, or ``None`` when the control has no
        check question, the HTTP status is not 200, or the request fails.
    """
    question = mc.get("check_question", "")
    if not question:
        return None

    def _parse_criteria(raw):
        """Decode a JSON string; fall back to a one-element list."""
        if isinstance(raw, str):
            try:
                return _json.loads(raw)
            except ValueError:
                return [raw]
        return raw

    pass_crit = _parse_criteria(mc.get("pass_criteria", []))
    fail_crit = _parse_criteria(mc.get("fail_criteria", []))

    pass_str = "\n".join(f" - {p}" for p in pass_crit[:5])
    fail_str = "\n".join(f" - {f}" for f in fail_crit[:5])

    # Truncate text: up to 8000 chars keep the first 6000, otherwise the
    # first 4000 plus the last 2000.
    # NOTE(review): the prompt below keeps only the first 5000 chars of this
    # excerpt, which drops most of the tail — confirm this is intended.
    doc_excerpt = text[:6000] if len(text) <= 8000 else text[:4000] + "\n...\n" + text[-2000:]

    prompt = (
        f"/no_think\n"
        f"FRAGE: {question}\n\n"
        f"PASS wenn ALLE zutreffen:\n{pass_str}\n\n"
        f"FAIL wenn EINES zutrifft:\n{fail_str}\n\n"
        f"DOKUMENT:\n{doc_excerpt[:5000]}\n\n"
        f'Antworte NUR mit JSON: {{"passed": true/false, "evidence": "Textstelle max 80 Zeichen oder leer"}}'
    )

    # Resolve id/title defensively: the fallback id is only computed from
    # values that exist, and a missing title must not raise.
    title = str(mc.get("title") or "")
    control_id = mc.get("control_id") or str(mc.get("id") or "")[:8]

    def _result(passed: bool, evidence: str) -> dict:
        """Build the CheckItem-compatible result dict."""
        return {
            "id": f"mc-{control_id}",
            "label": title[:80],
            "passed": passed,
            "severity": (mc.get("severity") or "MEDIUM").upper(),
            "matched_text": evidence,
            "level": 2,
            "parent": None,
            "skipped": False,
            "hint": question,
            "source": "master_control",
        }

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.0, "num_predict": 200},
            })

        if resp.status_code != 200:
            return None

        raw = resp.json().get("response", "")
        # Drop any <think>...</think> section before parsing.
        raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

        # Parse JSON
        json_match = re.search(r"\{[^{}]+\}", raw)
        if json_match:
            json_str = json_match.group()
            # Quote bare keys and normalize Python-style booleans.
            json_str = re.sub(r"(?<=[{,])\s*(\w+)\s*:", r' "\1":', json_str)
            json_str = json_str.replace("True", "true").replace("False", "false")
            try:
                result = _json.loads(json_str)
                return _result(
                    bool(result.get("passed", False)),
                    str(result.get("evidence", ""))[:100],
                )
            except _json.JSONDecodeError:
                pass

        # Fallback: look for a literal "passed": true in the raw response.
        lowered = raw.lower()
        passed = '"passed": true' in lowered or '"passed":true' in lowered
        return _result(passed, "")

    except Exception as e:
        logger.warning("MC verify failed for '%s': %s", title[:40], e)
        return None
|
||||
|
||||
Reference in New Issue
Block a user