From cfdc5fe2777ca3ccbd517bcff2bc846483485e50 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 16 Jun 2026 19:06:07 +0200 Subject: [PATCH] =?UTF-8?q?feat(cra):=20Datenblatt=E2=86=92Grenzen-Extrakt?= =?UTF-8?q?or=20(hybrid,=20lokales=2035B)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hybrid-Extraktion Datenblatt → IACE Grenzen (ISO 12100): deterministischer Detektor (Schnittstellen/Einheiten per Regex) + lokales 35B via llm_cascade (Qwen-lokal-first) fuer die semantische Zuordnung auf die echten LimitsFormData- Keys. Nichts erfinden: Feld nicht im Text → leer + Quellen-Zitat je Feld. Essenzielle ISO-12100-Felder, die leer bleiben → gezielte Rückfragen (foreseeable_misuses, person_groups, qualification, temporal_limits …). Endpoint POST /api/v1/cra/extract-datasheet. 13 Tests gruen (reine Teile). Co-Authored-By: Claude Opus 4.7 --- .../compliance/api/cra_assess_routes.py | 13 ++ .../services/cra_datasheet_extractor.py | 177 ++++++++++++++++++ .../tests/test_cra_datasheet_extractor.py | 78 ++++++++ 3 files changed, 268 insertions(+) create mode 100644 backend-compliance/compliance/services/cra_datasheet_extractor.py create mode 100644 backend-compliance/tests/test_cra_datasheet_extractor.py diff --git a/backend-compliance/compliance/api/cra_assess_routes.py b/backend-compliance/compliance/api/cra_assess_routes.py index f0832c7e..0bf5883c 100644 --- a/backend-compliance/compliance/api/cra_assess_routes.py +++ b/backend-compliance/compliance/api/cra_assess_routes.py @@ -18,6 +18,7 @@ from compliance.services.cra_finding_mapper import assess_findings_payload from compliance.services.cra_applicability import ( compute_verdict, compute_machinery_verdict, maturity as evidence_maturity, MACHINE_INTEGRATOR, ) +from compliance.services.cra_datasheet_extractor import extract_grenzen from compliance.services.scanner_mcp_client import fetch_findings from compliance.services.cra_snapshot_store import save_snapshot, 
list_snapshots, get_snapshot from compliance.services.cra_use_case_controls import enrich_findings_with_breadth @@ -137,6 +138,18 @@ async def assess_from_scanner(body: ScannerPullRequest): return result +class DatasheetRequest(BaseModel): + text: str = "" + + +@router.post("/extract-datasheet") +async def extract_datasheet(body: DatasheetRequest): + """Datasheet text -> IACE 'Grenzen' draft (limits + provenance) + the + essential ISO-12100 fields still missing as targeted follow-up questions. + Hybrid: deterministic interface/unit detector + local 35B (llm_cascade).""" + return await extract_grenzen(body.text) + + @router.get("/scanner-repos") async def scanner_repos(): """Distinct repo_ids the scanner has findings for, so the UI can pick which diff --git a/backend-compliance/compliance/services/cra_datasheet_extractor.py b/backend-compliance/compliance/services/cra_datasheet_extractor.py new file mode 100644 index 00000000..2af71a10 --- /dev/null +++ b/backend-compliance/compliance/services/cra_datasheet_extractor.py @@ -0,0 +1,177 @@ +"""Datasheet -> IACE 'Grenzen' (ISO 12100 machine limits) extraction. + +Hybrid: a deterministic pre-pass pulls high-confidence facts (interfaces, units) +straight from the text; the local LLM (Ollama 35B via llm_cascade, local-first) +does the semantic mapping into the IACE LimitsFormData keys. The LLM must NOT +invent values — any field not supported by the text stays empty and becomes a +follow-up question. Each filled field carries a source snippet (auditability). + +Pure + testable: detect_signals / parse_grenzen_json / compute_followups. The +async extract_grenzen() wraps the LLM call (llm_cascade, same as vendor extractor). +""" +import json +import re +from typing import Optional + +# IACE Grenzen field keys (must match admin LimitsFormData). label + whether it +# is essential for a usable risk assessment (=> asked as follow-up if empty). 
LIMIT_FIELDS = [
    # (key, German UI label, essential for a usable risk assessment?)
    ("machine_designation", "Maschinenbezeichnung", False),
    ("machine_type", "Maschinentyp", False),
    ("manufacturer", "Hersteller", False),
    ("year_of_construction", "Baujahr", False),
    ("general_description", "Allgemeine Beschreibung", True),
    ("intended_purpose", "Verwendungszweck", True),
    ("area_of_use", "Einsatzbereich", True),
    ("operating_modes", "Betriebsarten", True),
    ("variants", "Varianten", False),
    ("foreseeable_misuses", "Vorhersehbare Fehlanwendungen", True),
    ("spatial_limits", "Räumliche Grenzen", True),
    ("temporal_limits", "Zeitliche Grenzen", True),
    ("operating_conditions", "Betriebsbedingungen", True),
    ("energy_supply", "Energieversorgung", True),
    ("mechanical_interfaces", "Mechanische Schnittstellen", False),
    ("electrical_interfaces", "Elektrische Schnittstellen", False),
    ("software_interfaces", "Software-Schnittstellen", False),
    ("pneumatic_hydraulic_interfaces", "Pneumatische/Hydraulische Schnittstellen", False),
    ("person_groups", "Personengruppen", True),
    ("qualification_requirements", "Qualifikationsanforderungen", True),
]
# Lookups derived from LIMIT_FIELDS; the list and the dict keep its order.
_FIELD_KEYS = [key for key, _label, _essential in LIMIT_FIELDS]
_FIELD_LABEL = {key: label for key, label, _essential in LIMIT_FIELDS}
_ESSENTIAL = {key for key, _label, essential in LIMIT_FIELDS if essential}

# Deterministic signal detection — high-confidence facts straight from the text.
_INTERFACE_TOKENS = [
    "Ethernet", "EtherCAT", "EtherNet/IP", "PROFINET", "Profinet", "PROFIBUS", "Modbus",
    "CANopen", "CAN", "IO-Link", "OPC UA", "OPC-UA", "Anybus", "RS232", "RS-232", "RS485",
    "RS-485", "USB", "Bluetooth", "WLAN", "WiFi", "Wi-Fi", "MQTT", "REST", "HTTP",
    "Sercos", "DeviceNet", "TCP/IP", "TLS",
]
# Short acronyms that also occur inside ordinary words ("Scanner", "Restrisiko",
# "Ausbildung"). They are matched case-sensitively to avoid false positives; the
# trade-off is that lower-case spellings such as "usb" are no longer detected.
_AMBIGUOUS_TOKENS = frozenset({"CAN", "REST", "USB"})
# Separators removed when building the de-duplication key, so spelling variants
# such as "RS232"/"RS-232" or "OPC UA"/"OPC-UA" collapse to one entry.
_KEY_STRIP = str.maketrans("", "", "-/. ")
_UNIT_RE = re.compile(r"\b\d+(?:[.,]\d+)?\s?(?:V|A|kW|bar|mm|cm|°C|Hz|kg|rpm|Achsen|axes|N|W)\b", re.IGNORECASE)

# Excerpts shorter than this many characters are not sent to the LLM.
_MIN_LLM_CHARS = 200


def detect_signals(text: str) -> dict:
    """Return the interface tokens and technical quantities found in *text*.

    Args:
        text: Raw datasheet text; ``None`` or ``""`` yields empty results.

    Returns:
        ``{"interfaces": [...], "units": [...]}``. ``interfaces`` lists matching
        entries of ``_INTERFACE_TOKENS`` in that list's order, one per spelling
        family (the first listed variant wins). ``units`` is the sorted set of
        "<number> <unit>" matches of ``_UNIT_RE``.
    """
    raw = text or ""
    lowered = raw.lower()
    interfaces = []
    seen = set()
    for tok in _INTERFACE_TOKENS:
        if tok in _AMBIGUOUS_TOKENS:
            present = tok in raw  # exact case only, see _AMBIGUOUS_TOKENS
        else:
            present = tok.lower() in lowered
        if not present:
            continue
        key = tok.lower().translate(_KEY_STRIP)
        if key in seen:
            continue
        seen.add(key)
        interfaces.append(tok)
    units = sorted({m.group(0).strip() for m in _UNIT_RE.finditer(raw)})
    return {"interfaces": interfaces, "units": units}


def _system_prompt() -> str:
    """Build the (German) system prompt listing the allowed field keys."""
    keys = ", ".join(_FIELD_KEYS)
    # NOTE(review): the JSON schema example below has empty placeholders for the
    # key, value and source — confirm this is the intended prompt text.
    return (
        "Du bist ein Sicherheitsingenieur. Extrahiere aus einem Maschinen-/Produkt-"
        "Datenblatt die Maschinengrenzen nach ISO 12100. Gib NUR ein JSON-Objekt zurueck:\n"
        '{"fields": {"": {"value": "", "source": ""}}}\n\n'
        f"Erlaubte keys: {keys}\n\n"
        "Regeln:\n"
        "- Fuelle ein Feld NUR, wenn es im Datenblatt steht. Sonst value=\"\".\n"
        "- KEINE Werte erfinden, schaetzen oder annehmen.\n"
        "- 'source' ist ein woertliches Zitat aus dem Text, das den Wert belegt.\n"
        "- foreseeable_misuses / person_groups / qualification_requirements stehen "
        "fast nie im Datenblatt → meist leer lassen.\n"
        "- Nur reines JSON, keine Prosa, keine Code-Fences."
    )


def _load_json_lenient(raw):
    """Decode *raw* as JSON; on failure retry on the outermost ``{...}`` span.

    The retry recovers responses that wrap the object in code fences or prose
    although the prompt forbids it. Returns ``None`` when nothing decodes.
    """
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        pass
    if not isinstance(raw, str):
        return None
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return None


def _as_text(value) -> str:
    """Normalise a JSON value to stripped text; lists are joined with ", "."""
    if isinstance(value, (list, tuple)):
        parts = (str(item).strip() for item in value if item is not None)
        return ", ".join(part for part in parts if part)
    return str(value or "").strip()


def _is_blank(value) -> bool:
    """Return True for None, other falsy values and whitespace-only text."""
    return not str(value or "").strip()


def parse_grenzen_json(raw: str) -> dict:
    """Parse the LLM response into ``{key: {"value", "source"}}``.

    Only keys listed in ``LIMIT_FIELDS`` are kept and entries with an empty
    value are dropped. Accepted shapes: a top-level ``"fields"`` object or the
    fields object itself; each entry is either ``{"value", "source"}`` or a
    plain string (then ``source`` is ``""``). Undecodable input yields ``{}``.
    """
    data = _load_json_lenient(raw)
    fields = data.get("fields") if isinstance(data, dict) else None
    if not isinstance(fields, dict):
        fields = data if isinstance(data, dict) else {}
    out = {}
    for key in _FIELD_KEYS:
        entry = fields.get(key)
        if isinstance(entry, dict):
            val = _as_text(entry.get("value"))
            src = _as_text(entry.get("source"))
        elif isinstance(entry, str):
            val, src = entry.strip(), ""
        else:
            continue
        if val:
            out[key] = {"value": val, "source": src}
    return out


_QUESTION = {
    "general_description": "Was tut das Produkt grundsätzlich? (kurze Beschreibung)",
    "intended_purpose": "Wofür ist das Produkt bestimmungsgemäß vorgesehen?",
    "area_of_use": "In welchem Umfeld / welcher Branche wird es eingesetzt?",
    "operating_modes": "Welche Betriebsarten gibt es (Automatik, Einrichten, Wartung …)?",
    "foreseeable_misuses": "Welche vernünftigerweise vorhersehbaren Fehlanwendungen gibt es?",
    "spatial_limits": "Räumliche Grenzen (Abmessungen, Arbeits-/Zugangsbereich)?",
    "temporal_limits": "Zeitliche Grenzen (Lebensdauer, Wartungsintervalle, Betriebsdauer)?",
    "operating_conditions": "Betriebsbedingungen (Temperatur, Feuchte, Umgebung)?",
    "energy_supply": "Energieversorgung (elektrisch, pneumatisch, hydraulisch)?",
    "person_groups": "Welche Personengruppen interagieren mit dem Produkt?",
    "qualification_requirements": "Welche Qualifikation brauchen Bediener/Wartung?",
}


def compute_followups(limits: dict) -> list:
    """Return follow-up questions for essential fields that are still blank.

    Args:
        limits: Mapping of field key to its extracted value.

    Returns:
        One ``{"key", "label", "question"}`` dict per essential field whose
        value is missing or blank, in ``LIMIT_FIELDS`` order.
    """
    return [
        {
            "key": key,
            "label": _FIELD_LABEL[key],
            "question": _QUESTION.get(key, f"Bitte ergänzen: {_FIELD_LABEL[key]}"),
        }
        for key in _FIELD_KEYS
        if key in _ESSENTIAL and _is_blank(limits.get(key))
    ]


def _merge_detected(limits: dict, provenance: dict, signals: dict) -> None:
    """Backfill ``electrical_interfaces`` from the deterministic detector.

    Runs only when that field is still blank, so a value supplied by the LLM is
    never overwritten. "USB" is left out of the backfilled list. *limits* and
    *provenance* are updated in place.
    """
    ifaces = signals.get("interfaces") or []
    if not ifaces:
        return
    net = [i for i in ifaces if i.lower() not in ("usb",)]
    if net and _is_blank(limits.get("electrical_interfaces")):
        joined = ", ".join(net)
        limits["electrical_interfaces"] = joined
        provenance["electrical_interfaces"] = "deterministisch erkannt: " + joined


async def extract_grenzen(text: str, max_chars: int = 20000) -> dict:
    """Extract a 'Grenzen' draft from datasheet text.

    The deterministic detector always runs. The LLM is called only when the
    excerpt (first *max_chars* characters) has at least ``_MIN_LLM_CHARS``
    characters. Any LLM failure is logged and the result falls back to the
    detector output plus follow-up questions.

    Args:
        text: Raw datasheet text (may be empty or ``None``).
        max_chars: Maximum number of characters passed to the LLM.

    Returns:
        Dict with ``limits`` (key -> value), ``provenance`` (key -> source
        snippet), ``detected`` (detector output), ``filled`` (sorted filled
        keys), ``missing`` (blank keys in field order) and ``followup``
        (questions for blank essential fields).
    """
    signals = detect_signals(text or "")
    limits: dict = {}
    provenance: dict = {}
    excerpt = (text or "")[:max_chars]
    if len(excerpt) >= _MIN_LLM_CHARS:
        try:
            from compliance.services.llm_cascade import call_with_cascade
            res = await call_with_cascade(
                system=_system_prompt(),
                user=f"Datenblatt-Text:\n\n{excerpt}",
                min_confidence=0.5, max_tokens=4000,
            )
            parsed = parse_grenzen_json(res.get("text", "") if isinstance(res, dict) else "")
        except Exception:
            # Extraction is best-effort: keep going with the detector, but leave
            # a trace instead of failing silently.
            import logging
            logging.getLogger(__name__).warning(
                "Datasheet LLM extraction failed; falling back to deterministic detector",
                exc_info=True,
            )
        else:
            for key, entry in parsed.items():
                limits[key] = entry["value"]
                provenance[key] = entry.get("source", "")

    _merge_detected(limits, provenance, signals)
    return {
        "limits": limits,
        "provenance": provenance,
        "detected": signals,
        "filled": sorted(limits.keys()),
        "missing": [k for k in _FIELD_KEYS if _is_blank(limits.get(k))],
        "followup": compute_followups(limits),
    }


# ---------------------------------------------------------------------------
# Tests from backend-compliance/tests/test_cra_datasheet_extractor.py
# ("Datasheet -> Grenzen extraction (deterministic + parser parts)").
# The names under test are defined above in this block, so the import of
# compliance.services.cra_datasheet_extractor is not repeated here.
# ---------------------------------------------------------------------------

OWIS = (
    "PS 90+ Universelle Positioniersteuerung, bis 9 Achsen. Schnittstellen: Ethernet, "
    "USB, RS232, optional Anybus (Modbus/TCP). Versorgung 24 V. SDK fuer C/C++/C#/LabView."
)


class TestDetectSignals:
    def test_interfaces_detected(self):
        s = detect_signals(OWIS)
        for tok in ("Ethernet", "USB", "RS232", "Modbus", "Anybus"):
            assert tok in s["interfaces"], tok

    def test_units_detected(self):
        s = detect_signals(OWIS)
        assert any("24" in u and "V" in u for u in s["units"])

    def test_empty_text(self):
        s = detect_signals("")
        assert s["interfaces"] == [] and s["units"] == []

    def test_no_duplicate_rs232_variants(self):
        s = detect_signals("RS232 and RS-232 ports")
        rs = [i for i in s["interfaces"] if i.lower().startswith("rs")]
        assert len(rs) == 1

    def test_no_duplicate_opc_ua_variants(self):
        s = detect_signals("OPC UA und OPC-UA")
        assert s["interfaces"] == ["OPC UA"]

    def test_short_acronyms_not_matched_inside_words(self):
        s = detect_signals("Scanner neu starten, Restrisiko und Ausbildung beachten")
        assert s["interfaces"] == []


class TestParse:
    def test_parses_fields_wrapper(self):
        raw = '{"fields": {"machine_designation": {"value": "PS 90+", "source": "PS 90+ ..."}, "intended_purpose": {"value": "", "source": ""}}}'
        out = parse_grenzen_json(raw)
        assert out["machine_designation"]["value"] == "PS 90+"
        assert "intended_purpose" not in out  # empty dropped

    def test_unknown_keys_ignored(self):
        out = parse_grenzen_json('{"fields": {"nonsense": {"value": "x"}}}')
        assert out == {}

    def test_string_entry_tolerated(self):
        out = parse_grenzen_json('{"fields": {"manufacturer": "OWIS"}}')
        assert out["manufacturer"]["value"] == "OWIS"

    def test_bad_json(self):
        assert parse_grenzen_json("not json") == {}
        assert parse_grenzen_json("") == {}

    def test_code_fence_tolerated(self):
        out = parse_grenzen_json('```json\n{"fields": {"manufacturer": "OWIS"}}\n```')
        assert out["manufacturer"]["value"] == "OWIS"


class TestFollowups:
    def test_empty_limits_asks_all_essentials(self):
        fu = compute_followups({})
        assert {f["key"] for f in fu} == _ESSENTIAL
        assert all(f["question"] for f in fu)

    def test_filled_essential_not_asked(self):
        fu = compute_followups({"intended_purpose": "Positionieren"})
        assert "intended_purpose" not in {f["key"] for f in fu}

    def test_blank_string_still_asked(self):
        fu = compute_followups({"intended_purpose": " "})
        assert "intended_purpose" in {f["key"] for f in fu}


class TestMergeDetected:
    def test_backfills_electrical_interfaces_excluding_usb(self):
        limits, prov = {}, {}
        _merge_detected(limits, prov, {"interfaces": ["Ethernet", "Modbus", "USB"]})
        assert "Ethernet" in limits["electrical_interfaces"]
        assert "USB" not in limits["electrical_interfaces"]
        assert prov["electrical_interfaces"].startswith("deterministisch")

    def test_does_not_overwrite_llm_value(self):
        limits = {"electrical_interfaces": "PROFINET (vom LLM)"}
        _merge_detected(limits, {}, {"interfaces": ["Ethernet"]})
        assert limits["electrical_interfaces"] == "PROFINET (vom LLM)"