"""Datasheet -> IACE 'Grenzen' (ISO 12100 machine limits) extraction.

Hybrid approach:

* A deterministic pre-pass pulls high-confidence facts (interfaces, units)
  straight from the text.
* The local LLM (Ollama 35B via llm_cascade, local-first) does the semantic
  mapping into the IACE LimitsFormData keys.

The LLM must NOT invent values: any field not supported by the text stays
empty and becomes a follow-up question. Each filled field carries a source
snippet (auditability).

Pure + testable: detect_signals / parse_grenzen_json / compute_followups.
The async extract_grenzen() wraps the LLM call (llm_cascade, same as the
vendor extractor).
"""
import json
import logging
import os
import re

logger = logging.getLogger(__name__)

# Datasheet extraction uses the local 35B (same model as the Compliance
# Advisor): higher-quality semantic mapping than the default cascade model.
# Override with the CRA_DATASHEET_MODEL environment variable.
_DATASHEET_MODEL = os.getenv("CRA_DATASHEET_MODEL", "qwen3.5:35b-a3b")

# IACE Grenzen field keys (must match admin LimitsFormData): key, label, and
# whether the field is essential for a usable risk assessment (essential
# fields are asked as a follow-up question if they stay empty).
LIMIT_FIELDS = [
    # (LimitsFormData key, German UI label, essential for a risk assessment)
    ("machine_designation", "Maschinenbezeichnung", False),
    ("machine_type", "Maschinentyp", False),
    ("manufacturer", "Hersteller", False),
    ("year_of_construction", "Baujahr", False),
    ("general_description", "Allgemeine Beschreibung", True),
    ("intended_purpose", "Verwendungszweck", True),
    ("area_of_use", "Einsatzbereich", True),
    ("operating_modes", "Betriebsarten", True),
    ("variants", "Varianten", False),
    ("foreseeable_misuses", "Vorhersehbare Fehlanwendungen", True),
    ("spatial_limits", "Räumliche Grenzen", True),
    ("temporal_limits", "Zeitliche Grenzen", True),
    ("operating_conditions", "Betriebsbedingungen", True),
    ("energy_supply", "Energieversorgung", True),
    ("mechanical_interfaces", "Mechanische Schnittstellen", False),
    ("electrical_interfaces", "Elektrische Schnittstellen", False),
    ("software_interfaces", "Software-Schnittstellen", False),
    ("pneumatic_hydraulic_interfaces", "Pneumatische/Hydraulische Schnittstellen", False),
    ("person_groups", "Personengruppen", True),
    ("qualification_requirements", "Qualifikationsanforderungen", True),
]

# Lookups derived from LIMIT_FIELDS; _FIELD_KEYS keeps the declaration order.
_FIELD_KEYS = [f[0] for f in LIMIT_FIELDS]
_FIELD_LABEL = {f[0]: f[1] for f in LIMIT_FIELDS}
_ESSENTIAL = {f[0] for f in LIMIT_FIELDS if f[2]}

# Deterministic signal detection: high-confidence facts straight from the text.
_INTERFACE_TOKENS = [
    "Ethernet", "EtherCAT", "EtherNet/IP", "PROFINET", "Profinet", "PROFIBUS",
    "Modbus", "CANopen", "CAN", "IO-Link", "OPC UA", "OPC-UA", "Anybus",
    "RS232", "RS-232", "RS485", "RS-485", "USB", "Bluetooth", "WLAN", "WiFi",
    "Wi-Fi", "MQTT", "REST", "HTTP", "Sercos", "DeviceNet", "TCP/IP", "TLS",
]

# A plain case-insensitive substring test is wrong for tokens that double as
# ordinary words or show up in every URL: "scanner"/"can" would report CAN,
# "restart"/"Reststrom" would report REST, and any "https://..." footer link
# would report HTTP. These tokens therefore get a stricter pattern:
#   * CAN / REST: exact upper-case spelling, not preceded by a letter and not
#     followed by another upper-case letter (so "CAN-Bus", "CANopen",
#     "CAN2.0" and "RESTful" match; "SCAN", "CANCEL" and "RESTART" do not).
#   * HTTP: any spelling, unless it is the scheme of a URL.
_STRICT_INTERFACE_PATTERNS = {
    "CAN": re.compile(r"(?<![^\W\d_])CAN(?![A-Z])"),
    "REST": re.compile(r"(?<![^\W\d_])REST(?![A-Z])"),
    "HTTP": re.compile(r"http(?!s?://)", re.IGNORECASE),
}

# Characters dropped when normalising a token to its de-duplication key, so
# spelling variants such as "RS485" / "RS-485" count as one interface.
_KEY_STRIP = str.maketrans("", "", "-/.")

_UNIT_RE = re.compile(r"\b\d+(?:[.,]\d+)?\s?(?:V|A|kW|bar|mm|cm|°C|Hz|kg|rpm|Achsen|axes|N|W)\b", re.IGNORECASE)

# Excerpts shorter than this are not sent to the LLM at all.
_MIN_LLM_CHARS = 200


def _is_blank(value) -> bool:
    """Return True when *value* is falsy or only whitespace once stringified."""
    return not str(value or "").strip()


def detect_signals(text: str) -> dict:
    """Collect deterministic facts from the datasheet text.

    Args:
        text: Raw datasheet text; ``None`` or ``""`` is treated as empty.

    Returns:
        ``{"interfaces": [...], "units": [...]}``. ``interfaces`` lists the
        matching entries of ``_INTERFACE_TOKENS`` in list order, one per
        spelling family. ``units`` is the sorted set of "<number> <unit>"
        strings matched by ``_UNIT_RE``.
    """
    t = text or ""
    low = t.lower()
    interfaces = []
    seen = set()
    for tok in _INTERFACE_TOKENS:
        strict = _STRICT_INTERFACE_PATTERNS.get(tok)
        if strict is not None:
            found = strict.search(t) is not None
        else:
            found = tok.lower() in low
        if not found:
            continue
        key = tok.lower().translate(_KEY_STRIP)
        if key not in seen:
            seen.add(key)
            interfaces.append(tok)
    units = sorted({m.group(0).strip() for m in _UNIT_RE.finditer(t)})
    return {"interfaces": interfaces, "units": units}


def _system_prompt() -> str:
    """Build the (German) system prompt listing the allowed field keys."""
    keys = ", ".join(_FIELD_KEYS)
    # NOTE(review): the JSON template below uses empty strings where a
    # key/value/quote placeholder would be expected; confirm this is intended.
    return (
        "Du bist ein Sicherheitsingenieur. Extrahiere aus einem Maschinen-/Produkt-"
        "Datenblatt die Maschinengrenzen nach ISO 12100. Gib NUR ein JSON-Objekt zurueck:\n"
        '{"fields": {"": {"value": "", "source": ""}}}\n\n'
        f"Erlaubte keys: {keys}\n\n"
        "Regeln:\n"
        "- Fuelle ein Feld NUR, wenn es im Datenblatt steht. Sonst value=\"\".\n"
        "- KEINE Werte erfinden, schaetzen oder annehmen.\n"
        "- 'source' ist ein woertliches Zitat aus dem Text, das den Wert belegt.\n"
        "- foreseeable_misuses / person_groups / qualification_requirements stehen "
        "fast nie im Datenblatt → meist leer lassen.\n"
        "- Nur reines JSON, keine Prosa, keine Code-Fences."
    )


def _load_json(raw):
    """Decode *raw* as JSON, tolerating code fences or prose around the object.

    Returns the decoded value, or ``None`` when nothing decodable is found.
    """
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        pass
    if not isinstance(raw, str):
        return None
    # Models often wrap the object in ```json fences or add a sentence around
    # it despite the prompt; retry on the outermost {...} span.
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(raw[start:end + 1])
    except ValueError:
        return None


def _as_text(value) -> str:
    """Render a JSON value as display text; lists become "a, b", falsy -> ""."""
    if not value:
        return ""
    if isinstance(value, list):
        return ", ".join(part for part in map(_as_text, value) if part)
    return str(value).strip()


def parse_grenzen_json(raw: str) -> dict:
    """Parse the LLM response into ``{key: {"value", "source"}}``.

    Only keys from ``_FIELD_KEYS`` are kept, and only when their value is
    non-empty. Accepts both ``{"fields": {...}}`` and a bare ``{...}`` mapping.
    An entry may be a ``{"value", "source"}`` dict, a plain string, or a list
    of strings (joined with ", "). Anything undecodable yields ``{}``.
    """
    data = _load_json(raw)
    if not isinstance(data, dict):
        return {}
    fields = data.get("fields")
    if not isinstance(fields, dict):
        fields = data
    out = {}
    for key in _FIELD_KEYS:
        entry = fields.get(key)
        if isinstance(entry, dict):
            val = _as_text(entry.get("value"))
            src = _as_text(entry.get("source"))
        elif isinstance(entry, (str, list)):
            val, src = _as_text(entry), ""
        else:
            continue
        if val:
            out[key] = {"value": val, "source": src}
    return out


# Follow-up question shown to the user for each essential field left empty.
_QUESTION = {
    "general_description": "Was tut das Produkt grundsätzlich? (kurze Beschreibung)",
    "intended_purpose": "Wofür ist das Produkt bestimmungsgemäß vorgesehen?",
    "area_of_use": "In welchem Umfeld / welcher Branche wird es eingesetzt?",
    "operating_modes": "Welche Betriebsarten gibt es (Automatik, Einrichten, Wartung …)?",
    "foreseeable_misuses": "Welche vernünftigerweise vorhersehbaren Fehlanwendungen gibt es?",
    "spatial_limits": "Räumliche Grenzen (Abmessungen, Arbeits-/Zugangsbereich)?",
    "temporal_limits": "Zeitliche Grenzen (Lebensdauer, Wartungsintervalle, Betriebsdauer)?",
    "operating_conditions": "Betriebsbedingungen (Temperatur, Feuchte, Umgebung)?",
    "energy_supply": "Energieversorgung (elektrisch, pneumatisch, hydraulisch)?",
    "person_groups": "Welche Personengruppen interagieren mit dem Produkt?",
    "qualification_requirements": "Welche Qualifikation brauchen Bediener/Wartung?",
}


def compute_followups(limits: dict) -> list:
    """Return follow-up questions for essential fields that are still empty.

    Args:
        limits: Mapping of field key -> value; ``None`` is treated as empty.
            Values need not be strings.

    Returns:
        A list of ``{"key", "label", "question"}`` dicts in ``_FIELD_KEYS``
        order, one per essential field whose value is blank.
    """
    limits = limits or {}
    out = []
    for key in _FIELD_KEYS:
        if key in _ESSENTIAL and _is_blank(limits.get(key)):
            out.append({
                "key": key,
                "label": _FIELD_LABEL[key],
                "question": _QUESTION.get(key, f"Bitte ergänzen: {_FIELD_LABEL[key]}"),
            })
    return out


def _merge_detected(limits: dict, provenance: dict, signals: dict) -> None:
    """Backfill ``electrical_interfaces`` from the deterministic detector.

    Applies only when the LLM left the field empty, so high-confidence facts
    are not lost. USB is excluded from the backfill. Mutates *limits* and
    *provenance* in place.

    NOTE(review): software-level protocols (MQTT, REST, ...) also land in
    ``electrical_interfaces``; ``software_interfaces`` is never backfilled.
    Confirm that this is intended.
    """
    ifaces = signals.get("interfaces") or []
    if not ifaces:
        return
    net = [i for i in ifaces if i.lower() not in ("usb",)]
    if net and not limits.get("electrical_interfaces"):
        limits["electrical_interfaces"] = ", ".join(net)
        provenance["electrical_interfaces"] = "deterministisch erkannt: " + ", ".join(net)


async def extract_grenzen(text: str, max_chars: int = 20000) -> dict:
    """Datasheet text -> limits, provenance and follow-up questions.

    Args:
        text: Raw datasheet text; ``None`` is treated as empty.
        max_chars: Only the first *max_chars* characters go to the LLM. The
            deterministic detector always sees the full text.

    Returns:
        A dict with the keys ``limits`` (key -> value), ``provenance``
        (key -> source snippet), ``detected`` (output of ``detect_signals``),
        ``llm_status``, ``filled`` (sorted filled keys), ``missing`` (blank
        keys in ``_FIELD_KEYS`` order) and ``followup`` (output of
        ``compute_followups``). ``llm_status`` is ``"skipped"`` when the
        excerpt is too short to be worth a call, ``"ok"`` / ``"empty"``
        depending on whether any field was parsed, or ``"unavailable"`` when
        the call raised.
    """
    signals = detect_signals(text or "")
    limits: dict = {}
    provenance: dict = {}
    llm_status = "skipped"  # skipped | ok | empty | unavailable
    excerpt = (text or "")[:max_chars]
    if len(excerpt) >= _MIN_LLM_CHARS:
        try:
            # Imported lazily inside the try so that a missing or broken
            # cascade module degrades to "unavailable" instead of failing.
            from compliance.services.llm_cascade import call_with_cascade
            res = await call_with_cascade(
                system=_system_prompt(),
                user=f"Datenblatt-Text:\n\n{excerpt}",
                min_confidence=0.5,
                max_tokens=4000,
                model=_DATASHEET_MODEL,
                think=False,
            )
            parsed = parse_grenzen_json(res.get("text", "") if isinstance(res, dict) else "")
            for key, entry in parsed.items():
                limits[key] = entry["value"]
                provenance[key] = entry.get("source", "")
            llm_status = "ok" if parsed else "empty"
        except Exception as e:
            # best-effort: keep the deterministic facts, but surface the failure so
            # a cold-start/timeout doesn't masquerade as "nothing on the datasheet".
            logger.warning("datasheet LLM extraction failed: %s (%s)", e, type(e).__name__)
            llm_status = "unavailable"
    _merge_detected(limits, provenance, signals)
    return {
        "limits": limits,
        "provenance": provenance,
        "detected": signals,
        "llm_status": llm_status,
        "filled": sorted(limits.keys()),
        "missing": [k for k in _FIELD_KEYS if _is_blank(limits.get(k))],
        "followup": compute_followups(limits),
    }