feat: LLM interpretation layer for failed MC checks
The deterministic pass/fail result stays unchanged. After keyword checking, ONE batched LLM call enriches the 10 highest-severity FAILs with context-specific recommendations based on the actual document. Example: if the document uses Google Analytics but lacks a transfer mechanism → the LLM generates: "Sie nutzen Google Analytics (USA). Ergaenzen Sie einen Verweis auf das EU-US Data Privacy Framework und pruefen Sie die DPF-Zertifizierung unter dataprivacyframework.gov." - Pass/fail: deterministic (keyword matching, reproducible) - Hint enrichment: LLM (contextual, one call for all fails) - Temperature 0.3 for consistency - Graceful fallback if Ollama is unavailable Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,8 +19,13 @@ import os
|
|||||||
import re
|
import re
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
||||||
|
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
||||||
|
|
||||||
# Minimum keyword match ratio to consider a criterion "met"
|
# Minimum keyword match ratio to consider a criterion "met"
|
||||||
PASS_THRESHOLD = 0.5 # At least 50% of extracted keywords must match
|
PASS_THRESHOLD = 0.5 # At least 50% of extracted keywords must match
|
||||||
|
|
||||||
@@ -58,9 +63,17 @@ async def check_document_with_controls(
|
|||||||
results.append(result)
|
results.append(result)
|
||||||
|
|
||||||
passed = sum(1 for r in results if r["passed"])
|
passed = sum(1 for r in results if r["passed"])
|
||||||
failed = sum(1 for r in results if not r["passed"])
|
failed_results = [r for r in results if not r["passed"]]
|
||||||
logger.info("MC results: %d passed, %d failed out of %d for '%s'",
|
logger.info("MC results: %d passed, %d failed out of %d for '%s'",
|
||||||
passed, failed, len(results), doc_title)
|
passed, len(failed_results), len(results), doc_title)
|
||||||
|
|
||||||
|
# LLM Interpretation: enrich FAILs with context-specific recommendations
|
||||||
|
if failed_results:
|
||||||
|
try:
|
||||||
|
await _enrich_fails_with_llm(text, failed_results, doc_title)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("LLM interpretation skipped: %s", e)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
@@ -248,3 +261,92 @@ async def _load_controls(doc_type: str, db_url: str, limit: int) -> list[dict]:
|
|||||||
return []
|
return []
|
||||||
finally:
|
finally:
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def _enrich_fails_with_llm(
    doc_text: str,
    failed_results: list[dict],
    doc_title: str,
) -> None:
    """Enrich failed MC results with LLM-generated, context-specific advice.

    The pass/fail outcome is never touched; only the ``hint`` entry of the
    selected result dicts is extended (in place) with a recommendation.
    One batched request is sent for at most 10 failures, ordered by
    severity. Any failure of the LLM call or of response parsing is logged
    and leaves the results unchanged.

    Args:
        doc_text: Full text of the checked document; only the first
            3000 characters are sent to the model.
        failed_results: Result dicts of failed checks. The keys
            ``severity``, ``label`` and ``hint`` are read; ``hint`` is
            rewritten for enriched entries.
        doc_title: Document title, used in the prompt and in log messages.
    """
    if not failed_results:
        return

    max_fails = 10
    max_excerpt_chars = 3000

    # Select top failures by severity (bounded to fit the context window).
    # sorted() is stable, so equal severities keep their original order.
    sev_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    top_fails = sorted(
        failed_results,
        key=lambda r: sev_order.get(r.get("severity", "MEDIUM"), 2),
    )[:max_fails]

    # Use the same tolerant lookups as the sort key so that a result that
    # lacks a key (or carries hint=None) cannot abort the whole enrichment.
    fail_list = "\n".join(
        f"{i}. [{r.get('severity', 'MEDIUM')}] {r.get('label', '')}"
        f" — {(r.get('hint') or '')[:100]}"
        for i, r in enumerate(top_fails, start=1)
    )

    # Truncate document for context
    excerpt = doc_text[:max_excerpt_chars]

    prompt = (
        "/no_think\n"
        f"Du bist ein Datenschutz-Experte. Analysiere das Dokument '{doc_title}' "
        f"und gib fuer JEDEN der folgenden fehlgeschlagenen Pruefpunkte eine "
        f"konkrete, umsetzbare Empfehlung (1-2 Saetze).\n\n"
        f"Beruecksichtige dabei den Inhalt des Dokuments — welche Dienste werden "
        f"genutzt? Welche Rechtsgrundlagen sind genannt? Was fehlt konkret?\n\n"
        f"FEHLGESCHLAGENE PRUEFPUNKTE:\n{fail_list}\n\n"
        f"DOKUMENT (Auszug):\n{excerpt}\n\n"
        f"Antworte als JSON-Array: [\n"
        f' {{"nr": 1, "empfehlung": "Konkreter Hinweis..."}},\n'
        f' {{"nr": 2, "empfehlung": "..."}}\n'
        f"]\n"
        f"Nur die Empfehlungen, kein anderer Text."
    )

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(f"{OLLAMA_URL}/api/generate", json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.3, "num_predict": 1500},
            })

        if resp.status_code != 200:
            logger.warning(
                "LLM enrichment skipped: Ollama returned HTTP %d",
                resp.status_code,
            )
            return

        raw = resp.json().get("response") or ""
        recommendations = _parse_llm_recommendations(raw)
        enriched = _apply_llm_recommendations(top_fails, recommendations)

        logger.info("LLM enriched %d/%d fails for '%s'",
                    enriched, len(top_fails), doc_title)

    except Exception as e:
        # Best-effort boundary: enrichment is optional, so network errors or
        # malformed responses must never break the deterministic check.
        logger.warning("LLM enrichment failed: %s", e)


def _parse_llm_recommendations(raw: str) -> list:
    """Extract the JSON array of recommendations from a raw LLM response.

    Returns an empty list when no parseable JSON array is present.
    """
    import json

    # Drop any <think>...</think> section the model may emit before the answer.
    cleaned = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

    arr_match = re.search(r"\[[\s\S]*\]", cleaned)
    if not arr_match:
        return []

    try:
        parsed = json.loads(arr_match.group())
    except json.JSONDecodeError:
        return []

    return parsed if isinstance(parsed, list) else []


def _apply_llm_recommendations(top_fails: list[dict], recommendations: list) -> int:
    """Append each valid recommendation to the matching result's hint.

    ``nr`` in a recommendation is the 1-based position in ``top_fails``.
    Malformed entries, out-of-range numbers and repeated numbers are
    skipped. Returns the number of results that were enriched.
    """
    applied: set[int] = set()

    for rec in recommendations:
        if not isinstance(rec, dict):
            continue

        nr = rec.get("nr", 0)
        # Tolerate a number that was returned as a digit string ("3").
        if isinstance(nr, str) and nr.strip().isdigit():
            nr = int(nr)
        if isinstance(nr, bool) or not isinstance(nr, int):
            continue

        advice = rec.get("empfehlung", "")
        if not isinstance(advice, str) or not advice.strip():
            continue

        if not 1 <= nr <= len(top_fails) or nr in applied:
            continue

        target = top_fails[nr - 1]
        existing_hint = target.get("hint") or ""
        # Append LLM advice after the deterministic hint
        target["hint"] = (
            f"{existing_hint}\n\nEmpfehlung: {advice}".strip()
            if existing_hint
            else advice
        )
        applied.add(nr)

    return len(applied)
|
||||||
|
|||||||
Reference in New Issue
Block a user