Files
breakpilot-compliance/backend-compliance/compliance/services/mc_solution_generator.py
T
Benjamin Admin cf6005a47c
CI / guardrail-integrity (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 41s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
perf(audit): vendor_llm_extractor + mc_solution_generator nutzen P31 LLM-Cascade
Beide rufen jetzt llm_cascade.call_with_cascade() statt direkter Qwen/OVH-
Aufrufe. Damit:
* Cache-Hit auf identische Eingaben (Valkey, 7d TTL) → ~50ms statt
  4-6min beim Re-Run derselben Cookie-Doc.
* Tiered Cascade automatisch: Qwen → OVH 120B → Anthropic Claude Haiku,
  wenn das niedrigere Tier unter dem Confidence-Threshold bleibt.
* Confidence-Scoring (JSON-parse + items_per_input_size) entscheidet ob
  weiter delegiert wird.

Fallback auf alte _call_ollama/_call_ovh bleibt bestehen wenn der
Cascade-Aufruf scheitert.

Erwartete Wirkung beim 2. VW-Lauf: ~10min statt ~25min (Cache-Hit auf
identische Cookie-Doc + MC-Solutions).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 09:40:11 +02:00

273 lines
9.6 KiB
Python

"""
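P73 — MC solution generator.

For every failed MC this module generates a concrete insertion
recommendation with an anchor, e.g. "Please add the following paragraph
after the section 'Kontaktdaten DSB': ...".

LLM backends: generate_solution() first tries
compliance.services.llm_cascade.call_with_cascade and, if that raises or
yields nothing parseable, falls back to direct calls: Qwen via Ollama
(local), then OVH 120B.

Cache: process-local dict keyed by (mc_id, doc_type, MD5 prefix of the
document) so that re-runs for the same site return the same proposal.
It is bounded; the oldest entries (insertion order, not LRU) are dropped
in batches. A full DB-backed cache was planned for later (P31).
NOTE(review): the cascade call is commented as bringing its own Valkey
cache — confirm whether the "planned for later" remark is stale.

Integration: per the original author note, the result is shown in
build_critical_findings_html / the MC detail rendering under each HIGH
fail as a collapsible block (those callers are not part of this module).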
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
from functools import lru_cache
from typing import Iterable
import httpx
logger = logging.getLogger(__name__)
# System prompt shared by every LLM call in this module (_call_ollama,
# _call_ovh and the cascade call in generate_solution). The German text tells
# the model to act as a privacy-document editor and to answer with a bare JSON
# object carrying the keys "solution_text", "anchor_hint" and "effort_min" —
# the same keys that _parse() reads. It is a runtime string: changing its
# wording changes model behaviour.
_SYSTEM_PROMPT = (
"Du bist Datenschutz-Redakteur. Du formulierst kurze, einfueg-bereite "
"Absaetze fuer Datenschutz-Dokumente — sachlich, in deutscher "
"Rechtssprache, ohne Marketing-Floskeln.\n\n"
"Du bekommst:\n"
"- den FAIL-MC (was geprueft wurde, warum es nicht erfuellt ist)\n"
"- einen Auszug aus dem Ist-Dokument\n"
"- den Dokument-Typ\n\n"
"Du lieferst JSON:\n"
'{\n'
' "solution_text": "<3-6 Saetze Vorschlags-Absatz fuer das Dokument>",\n'
' "anchor_hint": "<wo einfuegen, z.B. \\"nach Abschnitt Kontaktdaten\\">",\n'
' "effort_min": "<gering|mittel|hoch>"\n'
'}\n\n'
"Regeln:\n"
"- KEINE Normtexte 1:1 zitieren — eigene Formulierung + Norm-Referenz.\n"
"- KEINE Annahmen ueber Konkretes (z.B. Firmennamen, Adressen) — "
"Platzhalter [Ihr Firmenname] / [Ihre Adresse] verwenden.\n"
"- Wenn schon eine schwache Variante im Dokument steht, anchor_hint "
"auf 'ersetzen' setzen statt einfuegen.\n"
"- Nur reines JSON, keine Prosa, keine Code-Fences."
)
def _doc_hash(doc_text: str) -> str:
return hashlib.md5(doc_text.encode("utf-8")).hexdigest()[:12]
_CACHE: dict[str, dict] = {}
_CACHE_MAX = 500
def _cache_get(key: str) -> dict | None:
return _CACHE.get(key)
def _cache_put(key: str, val: dict) -> None:
if len(_CACHE) >= _CACHE_MAX:
# Drop oldest 50 entries
for k in list(_CACHE.keys())[:50]:
_CACHE.pop(k, None)
_CACHE[key] = val
async def _call_ollama(prompt: str) -> str:
    """Send *prompt* to the Ollama chat endpoint and return the raw reply.

    Configuration is read from the environment on every call:
    ``OLLAMA_URL`` (default ``http://host.docker.internal:11434``) and the
    model name from ``MC_SOLUTION_MODEL``, then ``CMP_LLM_MODEL``, then
    ``qwen3:30b-a3b``.

    Args:
        prompt: User message; the module-level system prompt is prepended.

    Returns:
        The ``message.content`` string of the response, or ``""`` when the
        request fails, the response cannot be decoded, or no content is
        present. This function never raises; failures are logged as warnings.
    """
    base = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
    model = os.getenv("MC_SOLUTION_MODEL",
                      os.getenv("CMP_LLM_MODEL", "qwen3:30b-a3b"))
    payload = {
        "model": model, "stream": False, "format": "json",
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        "options": {"temperature": 0.1, "num_predict": 600},
    }
    try:
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(f"{base.rstrip('/')}/api/chat", json=payload)
            resp.raise_for_status()
            message = resp.json().get("message") or {}
            # "content" can be present but null; normalise to "" so the
            # declared str return type holds (same handling as _call_ovh).
            return message.get("content", "") or ""
    except Exception as e:
        # Deliberately best-effort: the caller falls back to another backend.
        logger.warning("Qwen MC-solution failed: %s", e)
        return ""
async def _call_ovh(prompt: str) -> str:
base = os.getenv("OVH_LLM_URL", "").strip()
key = os.getenv("OVH_LLM_KEY", "").strip()
model = os.getenv("OVH_LLM_MODEL", "").strip()
if not base or not model:
return ""
headers = {"Content-Type": "application/json"}
if key:
headers["Authorization"] = f"Bearer {key}"
payload = {
"model": model, "temperature": 0.1, "max_tokens": 600,
"messages": [
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
"response_format": {"type": "json_object"},
}
try:
async with httpx.AsyncClient(timeout=45.0) as client:
resp = await client.post(
f"{base.rstrip('/')}/v1/chat/completions",
json=payload, headers=headers,
)
resp.raise_for_status()
choice = (resp.json().get("choices") or [{}])[0]
return (choice.get("message") or {}).get("content", "") or ""
except Exception as e:
logger.warning("OVH MC-solution failed: %s", e)
return ""
def _parse(content: str) -> dict | None:
if not content:
return None
txt = content.strip()
if txt.startswith("```"):
txt = "\n".join(txt.split("\n")[1:-1])
a, b = txt.find("{"), txt.rfind("}")
if 0 <= a < b:
try:
obj = json.loads(txt[a:b + 1])
if isinstance(obj, dict) and obj.get("solution_text"):
return {
"solution_text": str(obj["solution_text"])[:1200],
"anchor_hint": str(obj.get("anchor_hint", ""))[:200],
"effort_min": str(obj.get("effort_min", "mittel"))[:20],
}
except Exception:
return None
return None
async def generate_solution(
mc: dict,
doc_text: str,
doc_type: str,
) -> dict | None:
"""Generates a solution dict for a single FAIL-MC.
mc must contain: label, hint, severity. Returns
{solution_text, anchor_hint, effort_min} or None.
"""
if not mc or not doc_text:
return None
mc_id = str(mc.get("id") or mc.get("label", ""))[:80]
cache_key = f"{mc_id}:{doc_type}:{_doc_hash(doc_text)}"
cached = _cache_get(cache_key)
if cached:
return cached
excerpt = doc_text[:3500]
prompt = (
f"FAIL-MC: {mc.get('label', '')}\n"
f"Severity: {mc.get('severity', 'MEDIUM')}\n"
f"Aktueller Hint: {mc.get('hint', '')[:300]}\n\n"
f"Dokument-Typ: {doc_type}\n"
f"Dokument-Auszug:\n---\n{excerpt}\n---\n\n"
"Liefere die Loesung als JSON."
)
# P31: tiered Cascade (Qwen → OVH → Anthropic) mit Valkey-Cache.
try:
from compliance.services.llm_cascade import call_with_cascade
res = await call_with_cascade(
system=_SYSTEM_PROMPT, user=prompt,
min_confidence=0.5, max_tokens=600,
)
parsed = _parse(res.get("text", ""))
if parsed:
_cache_put(cache_key, parsed)
return parsed
except Exception:
# fall through to legacy direct calls
pass
content = await _call_ollama(prompt)
parsed = _parse(content)
if not parsed:
content = await _call_ovh(prompt)
parsed = _parse(content)
if parsed:
_cache_put(cache_key, parsed)
return parsed
async def generate_solutions_for_fails(
    failed_mcs: Iterable[dict],
    doc_text: str,
    doc_type: str,
    limit: int = 5,
) -> list[dict]:
    """Generate proposals for the most severe failed MCs.

    Only fails with severity CRITICAL or HIGH (case-insensitive) are
    considered; MEDIUM/LOW are skipped to keep latency bounded. CRITICAL
    fails come first, the input order is kept within a severity, and at most
    *limit* fails are sent to the LLM — one after another, not concurrently.

    Args:
        failed_mcs: Failed checks; ``None`` or an empty iterable yields ``[]``.
        doc_text: Text of the document under review.
        doc_type: Document type passed through to ``generate_solution``.
        limit: Maximum number of fails to process.

    Returns:
        One dict per fail for which a solution was produced, with the keys
        ``mc_label``, ``severity``, ``solution_text``, ``anchor_hint`` and
        ``effort_min``. Fails without a solution are omitted.
    """
    sev_order = {"CRITICAL": 0, "HIGH": 1}

    def _severity(mc: dict) -> str:
        # Normalised severity; a missing or None value becomes "".
        return str(mc.get("severity") or "").upper()

    # sorted() is stable, so the input order is kept within each severity.
    high_fails = sorted(
        (m for m in (failed_mcs or []) if _severity(m) in sev_order),
        key=lambda m: sev_order[_severity(m)],
    )[:limit]
    out: list[dict] = []
    for mc in high_fails:
        sol = await generate_solution(mc, doc_text, doc_type)
        if not sol:
            continue
        out.append({
            # "label" may be present but None; slicing None would raise.
            "mc_label": str(mc.get("label") or "")[:200],
            "severity": mc.get("severity", "MEDIUM"),
            "solution_text": sol["solution_text"],
            "anchor_hint": sol["anchor_hint"],
            "effort_min": sol["effort_min"],
        })
    return out
def build_solutions_block_html(solutions: list[dict]) -> str:
    """Render the LLM-generated solutions as an HTML block for the mail.

    Every dynamic value (severity, MC label, solution text, anchor hint,
    effort) is HTML-escaped before interpolation: the texts come from an LLM
    that was fed document content, so they must not be able to inject markup
    into the mail.

    Args:
        solutions: Dicts with the keys ``severity``, ``mc_label``,
            ``solution_text``, ``anchor_hint`` and ``effort_min``, as
            produced by ``generate_solutions_for_fails``.

    Returns:
        The HTML fragment, or ``""`` when *solutions* is empty.
    """
    if not solutions:
        return ""
    # Function-scope import keeps this block self-contained.
    from html import escape

    items: list[str] = []
    for s in solutions:
        severity = str(s["severity"])
        # CRITICAL is shown in red, everything else in amber.
        sev_color = "#dc2626" if severity.upper() == "CRITICAL" else "#d97706"
        label = escape(str(s["mc_label"]))
        text = escape(str(s["solution_text"]))
        anchor = escape(str(s["anchor_hint"] or ""))
        effort = escape(str(s["effort_min"]))
        items.append(
            f'<li style="margin-bottom:12px;font-size:11px;line-height:1.5">'
            f'<div style="font-weight:600;color:{sev_color}">'
            f'[{escape(severity)}] {label}</div>'
            f'<div style="background:#fff;padding:8px 10px;border:1px solid '
            f'#cbd5e1;border-radius:4px;margin-top:4px;color:#1e293b;'
            f'white-space:pre-wrap">{text}</div>'
            f'<div style="font-size:10px;color:#64748b;margin-top:3px">'
            f'<strong>Anchor:</strong> {anchor} '
            f'&nbsp;·&nbsp; <strong>Aufwand:</strong> {effort}'
            f'</div></li>'
        )
    # German plural suffix for "Empfehlung(en)".
    plural = "en" if len(solutions) != 1 else ""
    # NOTE(review): the footer names only Qwen and OVH as generators, while
    # generate_solution tries an LLM cascade first — confirm the wording.
    return (
        '<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
        'max-width:760px;margin:0 auto 16px;padding:14px 18px;'
        'background:#f0f9ff;border:1px solid #bfdbfe;border-radius:8px">'
        '<div style="font-size:11px;color:#1e40af;text-transform:uppercase;'
        'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
        'Loesungs-Vorschlaege (KI-generiert)</div>'
        f'<h3 style="margin:0 0 6px;font-size:14px;color:#1e293b">'
        f'{len(solutions)} konkrete Einfuege-Empfehlung'
        f'{plural} '
        'fuer die kritischen Findings</h3>'
        '<p style="margin:0 0 10px;font-size:11px;color:#475569;line-height:1.5">'
        'Folgende Absaetze koennen Sie direkt uebernehmen — Platzhalter '
        '[Ihr Firmenname] / [Ihre Adresse] sind zu ersetzen. Inhaltliche '
        'Korrektheit ist mit DSB / Rechtsabteilung zu pruefen.</p>'
        '<ul style="margin:0 0 0 18px;padding:0">'
        + "".join(items) +
        '</ul>'
        '<p style="margin:8px 0 0;font-size:10px;color:#94a3b8;'
        'font-style:italic">Generiert via Qwen3-30b lokal (Fallback: '
        'OVH 120B). Vorschlaege sind kein Rechts-Beratung.</p>'
        '</div>'
    )