Files
breakpilot-compliance/backend-compliance/compliance/services/cra_datasheet_extractor.py
T
Benjamin Admin b217429d39 feat(cra): Datenblatt-Extraktion auf lokales 35B + llm_status-Fix
llm_cascade additiv modell-faehig (optionaler model-Param, Cache-Key kennt
model_hint → keine Kollision; Default unveraendert für alle anderen Nutzer).
Datenblatt-Extraktor nutzt jetzt qwen3.5:35b-a3b (CRA_DATASHEET_MODEL, gleiches
Modell wie der Compliance Advisor) für bessere semantische Zuordnung. Plus
llm_status (ok|empty|unavailable) + Logging statt stillem except; Frontend zeigt
bei 'unavailable' einen Hinweis statt leerer Felder (wichtig auf prod ohne
lokales Ollama → Cascade-Fallback bzw. Hinweis).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-16 19:53:48 +02:00

191 lines
8.6 KiB
Python

"""Datasheet -> IACE 'Grenzen' (ISO 12100 machine limits) extraction.
Hybrid: a deterministic pre-pass pulls high-confidence facts (interfaces, units)
straight from the text; the local LLM (Ollama 35B via llm_cascade, local-first)
does the semantic mapping into the IACE LimitsFormData keys. The LLM must NOT
invent values — any field not supported by the text stays empty and becomes a
follow-up question. Each filled field carries a source snippet (auditability).
Pure + testable: detect_signals / parse_grenzen_json / compute_followups. The
async extract_grenzen() wraps the LLM call (llm_cascade, same as vendor extractor).
"""
import json
import logging
import os
import re
logger = logging.getLogger(__name__)
# Datasheet extraction uses the local 35B (same model as the Compliance Advisor) —
# higher-quality semantic mapping than the default cascade model. Env-overridable.
_DATASHEET_MODEL = os.getenv("CRA_DATASHEET_MODEL", "qwen3.5:35b-a3b")
# IACE Grenzen field keys (must match admin LimitsFormData). label + whether it
# is essential for a usable risk assessment (=> asked as follow-up if empty).
LIMIT_FIELDS = [
("machine_designation", "Maschinenbezeichnung", False),
("machine_type", "Maschinentyp", False),
("manufacturer", "Hersteller", False),
("year_of_construction", "Baujahr", False),
("general_description", "Allgemeine Beschreibung", True),
("intended_purpose", "Verwendungszweck", True),
("area_of_use", "Einsatzbereich", True),
("operating_modes", "Betriebsarten", True),
("variants", "Varianten", False),
("foreseeable_misuses", "Vorhersehbare Fehlanwendungen", True),
("spatial_limits", "Räumliche Grenzen", True),
("temporal_limits", "Zeitliche Grenzen", True),
("operating_conditions", "Betriebsbedingungen", True),
("energy_supply", "Energieversorgung", True),
("mechanical_interfaces", "Mechanische Schnittstellen", False),
("electrical_interfaces", "Elektrische Schnittstellen", False),
("software_interfaces", "Software-Schnittstellen", False),
("pneumatic_hydraulic_interfaces", "Pneumatische/Hydraulische Schnittstellen", False),
("person_groups", "Personengruppen", True),
("qualification_requirements", "Qualifikationsanforderungen", True),
]
_FIELD_KEYS = [f[0] for f in LIMIT_FIELDS]
_FIELD_LABEL = {f[0]: f[1] for f in LIMIT_FIELDS}
_ESSENTIAL = {f[0] for f in LIMIT_FIELDS if f[2]}
# Deterministic signal detection — high-confidence facts straight from the text.
_INTERFACE_TOKENS = [
"Ethernet", "EtherCAT", "EtherNet/IP", "PROFINET", "Profinet", "PROFIBUS", "Modbus",
"CANopen", "CAN", "IO-Link", "OPC UA", "OPC-UA", "Anybus", "RS232", "RS-232", "RS485",
"RS-485", "USB", "Bluetooth", "WLAN", "WiFi", "Wi-Fi", "MQTT", "REST", "HTTP",
"Sercos", "DeviceNet", "TCP/IP", "TLS",
]
_UNIT_RE = re.compile(r"\b\d+(?:[.,]\d+)?\s?(?:V|A|kW|bar|mm|cm|°C|Hz|kg|rpm|Achsen|axes|N|W)\b", re.IGNORECASE)
def detect_signals(text: str) -> dict:
"""Deterministic facts: interfaces present + technical units found."""
t = text or ""
low = t.lower()
interfaces = []
seen = set()
for tok in _INTERFACE_TOKENS:
if tok.lower() in low:
key = tok.lower().replace("-", "").replace("/", "").replace(".", "")
if key not in seen:
seen.add(key)
interfaces.append(tok)
units = sorted({m.group(0).strip() for m in _UNIT_RE.finditer(t)})
return {"interfaces": interfaces, "units": units}
def _system_prompt() -> str:
keys = ", ".join(_FIELD_KEYS)
return (
"Du bist ein Sicherheitsingenieur. Extrahiere aus einem Maschinen-/Produkt-"
"Datenblatt die Maschinengrenzen nach ISO 12100. Gib NUR ein JSON-Objekt zurueck:\n"
'{"fields": {"<key>": {"value": "<Text oder \\"\\">", "source": "<woertliches Zitat aus dem Datenblatt oder \\"\\">"}}}\n\n'
f"Erlaubte keys: {keys}\n\n"
"Regeln:\n"
"- Fuelle ein Feld NUR, wenn es im Datenblatt steht. Sonst value=\"\".\n"
"- KEINE Werte erfinden, schaetzen oder annehmen.\n"
"- 'source' ist ein woertliches Zitat aus dem Text, das den Wert belegt.\n"
"- foreseeable_misuses / person_groups / qualification_requirements stehen "
"fast nie im Datenblatt → meist leer lassen.\n"
"- Nur reines JSON, keine Prosa, keine Code-Fences."
)
def parse_grenzen_json(raw: str) -> dict:
"""Parse the LLM response into {key: {value, source}} for known keys only."""
try:
data = json.loads(raw)
except (json.JSONDecodeError, TypeError):
return {}
fields = data.get("fields") if isinstance(data, dict) else None
if not isinstance(fields, dict):
fields = data if isinstance(data, dict) else {}
out = {}
for key in _FIELD_KEYS:
entry = fields.get(key)
if isinstance(entry, dict):
val = str(entry.get("value") or "").strip()
src = str(entry.get("source") or "").strip()
elif isinstance(entry, str):
val, src = entry.strip(), ""
else:
continue
if val:
out[key] = {"value": val, "source": src}
return out
_QUESTION = {
"general_description": "Was tut das Produkt grundsätzlich? (kurze Beschreibung)",
"intended_purpose": "Wofür ist das Produkt bestimmungsgemäß vorgesehen?",
"area_of_use": "In welchem Umfeld / welcher Branche wird es eingesetzt?",
"operating_modes": "Welche Betriebsarten gibt es (Automatik, Einrichten, Wartung …)?",
"foreseeable_misuses": "Welche vernünftigerweise vorhersehbaren Fehlanwendungen gibt es?",
"spatial_limits": "Räumliche Grenzen (Abmessungen, Arbeits-/Zugangsbereich)?",
"temporal_limits": "Zeitliche Grenzen (Lebensdauer, Wartungsintervalle, Betriebsdauer)?",
"operating_conditions": "Betriebsbedingungen (Temperatur, Feuchte, Umgebung)?",
"energy_supply": "Energieversorgung (elektrisch, pneumatisch, hydraulisch)?",
"person_groups": "Welche Personengruppen interagieren mit dem Produkt?",
"qualification_requirements": "Welche Qualifikation brauchen Bediener/Wartung?",
}
def compute_followups(limits: dict) -> list:
"""Essential ISO-12100 fields still empty → targeted follow-up questions."""
out = []
for key in _FIELD_KEYS:
if key in _ESSENTIAL and not (limits.get(key) or "").strip():
out.append({"key": key, "label": _FIELD_LABEL[key],
"question": _QUESTION.get(key, f"Bitte ergänzen: {_FIELD_LABEL[key]}")})
return out
def _merge_detected(limits: dict, provenance: dict, signals: dict) -> None:
"""Backfill electrical/software interfaces from the deterministic detector
when the LLM left them empty (high-confidence facts shouldn't be lost)."""
ifaces = signals.get("interfaces") or []
if not ifaces:
return
net = [i for i in ifaces if i.lower() not in ("usb",)]
if net and not limits.get("electrical_interfaces"):
limits["electrical_interfaces"] = ", ".join(net)
provenance["electrical_interfaces"] = "deterministisch erkannt: " + ", ".join(net)
async def extract_grenzen(text: str, max_chars: int = 20000) -> dict:
"""Datasheet text -> {limits, provenance, detected, missing, followup}."""
signals = detect_signals(text or "")
limits: dict = {}
provenance: dict = {}
llm_status = "skipped" # skipped | ok | empty | unavailable
excerpt = (text or "")[:max_chars]
if len(excerpt) >= 200:
try:
from compliance.services.llm_cascade import call_with_cascade
res = await call_with_cascade(
system=_system_prompt(),
user=f"Datenblatt-Text:\n\n{excerpt}",
min_confidence=0.5, max_tokens=4000, model=_DATASHEET_MODEL,
)
parsed = parse_grenzen_json(res.get("text", "") if isinstance(res, dict) else "")
for key, entry in parsed.items():
limits[key] = entry["value"]
provenance[key] = entry.get("source", "")
llm_status = "ok" if parsed else "empty"
except Exception as e:
# best-effort: keep the deterministic facts, but surface the failure so
# a cold-start/timeout doesn't masquerade as "nothing on the datasheet".
logger.warning("datasheet LLM extraction failed: %s (%s)", e, type(e).__name__)
llm_status = "unavailable"
_merge_detected(limits, provenance, signals)
return {
"limits": limits,
"provenance": provenance,
"detected": signals,
"llm_status": llm_status,
"filled": sorted(limits.keys()),
"missing": [k for k in _FIELD_KEYS if not (limits.get(k) or "").strip()],
"followup": compute_followups(limits),
}