#!/usr/bin/env python3
"""
Converts parsed Rule Library entries (R051-R1550) into Go HazardPattern code.

Groups rules by (component, energy_source) to produce unique patterns,
maps rule fields to existing IACE IDs (component tags, M-IDs, E-IDs),
and outputs Go source code for hazard_patterns_extended.go.

Usage:
    script.py [INPUT_JSON [OUTPUT_GO]]

Both paths are optional; when omitted, DEFAULT_INPUT_PATH and
DEFAULT_OUTPUT_PATH are used.
"""

import json
import sys
from collections import defaultdict

# ============================================================================
# Default file locations (overridable via main() arguments / command line)
# ============================================================================

DEFAULT_INPUT_PATH = (
    "/Users/benjaminadmin/Projekte/breakpilot-compliance/scripts/parsed-rule-library.json"
)
DEFAULT_OUTPUT_PATH = (
    "/Users/benjaminadmin/Projekte/breakpilot-compliance/ai-compliance-sdk/"
    "internal/iace/hazard_patterns_extended.go"
)

# Generated patterns are numbered from HP045 onwards (HP001-HP044 already exist).
FIRST_PATTERN_NUMBER = 45

# ============================================================================
# Mapping: Rule component names → component library tags
# ============================================================================

COMPONENT_TO_TAGS = {
    "robot_arm": ["moving_part", "rotating_part", "high_force"],
    "robot_gripper": ["moving_part", "clamping_part", "pinch_point"],
    "conveyor_belt": ["moving_part", "rotating_part", "entanglement_risk"],
    "conveyor_system": ["moving_part", "rotating_part", "entanglement_risk"],
    "roller_conveyor": ["moving_part", "rotating_part", "entanglement_risk"],
    "rotary_table": ["rotating_part", "high_force"],
    "rotating_disc": ["rotating_part", "high_speed"],
    "rotating_spindle": ["rotating_part", "high_speed", "cutting_part"],
    "linear_axis": ["moving_part", "crush_point"],
    "gearbox": ["rotating_part", "pinch_point"],
    "coupling": ["rotating_part"],
    "tool_changer": ["moving_part", "pinch_point"],
    "palletizer": ["moving_part", "high_force"],
    "lifting_device": ["moving_part", "high_force", "gravity_risk"],
    "lifting_table": ["moving_part", "gravity_risk"],
    "platform": ["structural_part", "gravity_risk"],
    "machine_frame": ["structural_part"],
    # Hydraulic
    "hydraulic_pump": ["hydraulic_part", "high_pressure"],
    "hydraulic_cylinder": ["hydraulic_part", "moving_part", "high_force", "high_pressure"],
    "hydraulic_valve": ["hydraulic_part", "high_pressure"],
    "hydraulic_hose": ["hydraulic_part", "high_pressure"],
    "hydraulic_system": ["hydraulic_part", "high_pressure"],
    # Pneumatic
    "pneumatic_cylinder": ["pneumatic_part", "moving_part", "stored_energy"],
    "pneumatic_line": ["pneumatic_part"],
    "compressor": ["pneumatic_part", "high_pressure", "noise_source"],
    "compressed_air_line": ["pneumatic_part"],
    # Electrical
    "control_cabinet": ["high_voltage", "electrical_part"],
    "power_supply": ["high_voltage", "electrical_part"],
    "transformer": ["high_voltage", "electrical_part"],
    "cable_harness": ["electrical_part"],
    "cable_system": ["electrical_part"],
    # Control
    "controller": ["has_software", "programmable"],
    "robot_controller": ["has_software", "programmable"],
    "hmi": ["has_software", "user_interface"],
    "hmi_panel": ["has_software", "user_interface"],
    "control_interface": ["has_software", "user_interface"],
    "touch_interface": ["has_software", "user_interface"],
    "firmware": ["has_software", "programmable"],
    # Sensor
    "proximity_sensor": ["sensor_part"],
    "laser_scanner": ["sensor_part"],
    "camera_system": ["sensor_part"],
    "vision_camera": ["sensor_part"],
    # Actuator
    "actuator": ["actuator_part"],
    # Safety
    "diagnostic_module": ["has_software", "safety_device"],
    "monitoring_system": ["has_software", "safety_device"],
    # IT/Network
    "industrial_switch": ["networked", "security_device"],
    "firewall": ["networked", "security_device"],
    "router": ["networked", "security_device"],
    # AI
    "vision_ai": ["has_ai", "sensor_part"],
    "ml_model": ["has_ai", "has_software"],
    "ai_controller": ["has_ai", "has_software", "programmable"],
    # Thermal
    "heating_element": ["high_temperature"],
    "furnace": ["high_temperature"],
    "furnace_chamber": ["high_temperature"],
    "cooling_unit": ["high_temperature"],
    # System-level
    "system": ["has_software"],
}

# ============================================================================
# Mapping: Rule energy_source → energy tags
# ============================================================================

ENERGY_TO_TAGS = {
    "mechanical": ["kinetic"],
    "electrical": ["electrical_energy"],
    "hydraulic": ["hydraulic_pressure"],
    "pneumatic": ["pneumatic_pressure"],
    "thermal": ["thermal"],
    "software": [],  # no energy tag, but component tags cover it
    "mixed": [],
}

# ============================================================================
# Mapping: Rule measure names → existing M-IDs
# ============================================================================

MEASURE_TO_IDS = {
    "guard": ["M051", "M054"],
    "safe_speed_monitoring": ["M003", "M106"],
    "lockout_tagout": ["M121", "M131"],
    "energy_isolation": ["M121"],
    "overload_protection": ["M004"],
    "safe_stop": ["M106"],
    "reduced_speed": ["M003", "M106"],
    "safety_coupling": ["M012"],
    "mechanical_lock": ["M051"],
    "tool_locking_system": ["M051"],
    "load_monitor": ["M004"],
    "load_monitoring": ["M004"],
    "load_securing": ["M005"],
    "handrail": ["M052"],
    "protective_cover": ["M051"],
    "protective_shield": ["M051"],
    "enclosure": ["M051"],
    "heat_shield": ["M015"],
    "thermal_insulation": ["M015"],
    "temperature_monitor": ["M014"],
    "temperature_monitoring": ["M014"],
    "pressure_relief_valve": ["M021"],
    "pressure_limit": ["M021"],
    "pressure_monitor": ["M022"],
    "pressure_monitoring": ["M022"],
    "pressure_switch": ["M022"],
    "pressure_release": ["M021"],
    "pressure_isolation": ["M021"],
    "hose_guard": ["M051"],
    "hose_protection": ["M051"],
    "flow_control": ["M022"],
    "flow_control_valve": ["M022"],
    "flow_regulator": ["M022"],
    "leak_detection": ["M022"],
    "circuit_breaker": ["M061"],
    "grounding": ["M063"],
    "insulation_check": ["M062"],
    "disconnect_switch": ["M061"],
    "cable_protection": ["M062"],
    "access_control": ["M101", "M113"],
    "authentication": ["M101", "M113"],
    "network_filtering": ["M116"],
    "network_redundancy": ["M116"],
    "firewall": ["M116"],
    "software_validation": ["M103"],
    "validation_logic": ["M103"],
    "signed_update": ["M104"],
    "restart_validation": ["M106"],
    "confidence_threshold": ["M103"],
    "drift_monitoring": ["M103"],
    "human_override": ["M103"],
    "self_test": ["M106"],
    "safety_validation": ["M106"],
    "sensor_redundancy": ["M082"],
    "redundancy": ["M082"],
    "calibration": ["M082"],
    "alarm_test": ["M106"],
    "connection_validation": ["M062"],
    "enabling_device": ["M051"],
    "safety_scanner": ["M051", "M106"],
    "laser_scanner": ["M082"],
    "grip_force_monitoring": ["M004"],
    "reinforcement": ["M005"],
    "confirmation_dialog": ["M131"],
}

# ============================================================================
# Mapping: Rule evidence names → existing E-IDs
# ============================================================================

EVIDENCE_TO_IDS = {
    "safety_function_test": ["E08", "E09"],
    "maintenance_protocol": ["E21"],
    "inspection": ["E20"],
    "functional_test": ["E08"],
    "pressure_test": ["E11"],
    "insulation_test": ["E10"],
    "grounding_test": ["E10"],
    "load_test": ["E08"],
    "temperature_test": ["E10"],
    "calibration_protocol": ["E20"],
    "measurement_protocol": ["E20"],
    "structural_calculation": ["E07"],
    "software_test": ["E14"],
    "system_test": ["E14"],
    "penetration_test": ["E16"],
    "security_audit": ["E16", "E17"],
    "model_validation": ["E15"],
    "model_test": ["E15"],
    "validation_report": ["E15"],
    "usability_test": ["E20"],
    "failover_test": ["E08"],
    "signature_verification": ["E18"],
}

# ============================================================================
# Mapping: Rule hazard names → hazard categories for patterns
# ============================================================================

HAZARD_TO_CATEGORIES = {
    "collision": "mechanical_hazard",
    "crushing": "mechanical_hazard",
    "drawing_in": "mechanical_hazard",
    "entanglement": "mechanical_hazard",
    "pinching": "mechanical_hazard",
    "contact_with_moving_parts": "mechanical_hazard",
    "unexpected_motion": "mechanical_hazard",
    "unexpected_rotation": "mechanical_hazard",
    "unexpected_start": "mechanical_hazard",
    "unsafe_restart": "mechanical_hazard",
    "sudden_motion": "mechanical_hazard",
    "sudden_release": "mechanical_hazard",
    "ejected_parts": "mechanical_hazard",
    "tool_release": "mechanical_hazard",
    "falling_tool": "mechanical_hazard",
    "gear_breakage": "mechanical_hazard",
    "gear_failure": "mechanical_hazard",
    "rotational_overload": "mechanical_hazard",
    "object_drop": "mechanical_hazard",
    "dropping_object": "mechanical_hazard",
    "falling_load": "mechanical_hazard",
    "load_instability": "mechanical_hazard",
    "structural_failure": "mechanical_hazard",
    "fall": "mechanical_hazard",
    "stored_energy": "mechanical_hazard",
    "electric_shock": "electrical_hazard",
    "overcurrent": "electrical_hazard",
    "insulation_damage": "electrical_hazard",
    "insulation_failure": "electrical_hazard",
    "incorrect_wiring": "electrical_hazard",
    "incorrect_connection": "electrical_hazard",
    "burn": "thermal_hazard",
    "overheating": "thermal_hazard",
    "heat_exposure": "thermal_hazard",
    "coolant_leak": "thermal_hazard",
    "overpressure": "pneumatic_hydraulic",
    "hose_burst": "pneumatic_hydraulic",
    "hose_whip": "pneumatic_hydraulic",
    "pressure_spike": "pneumatic_hydraulic",
    "pressure_release": "pneumatic_hydraulic",
    "logic_error": "software_fault",
    "incorrect_command": "software_fault",
    "faulty_update": "update_failure",
    "system_failure": "software_fault",
    "safety_function_failure": "software_fault",
    "network_failure": "communication_failure",
    "unauthorized_access": "unauthorized_access",
    "unauthorized_traffic": "unauthorized_access",
    "false_signal": "sensor_fault",
    "false_detection": "sensor_fault",
    "false_object_detection": "sensor_fault",
    "detection_failure": "sensor_fault",
    "missed_alarm": "sensor_fault",
    "model_drift": "model_drift",
    "misclassification": "ai_misclassification",
    "unsafe_decision": "ai_misclassification",
    "incorrect_diagnosis": "software_fault",
    "incorrect_input": "hmi_error",
    "operator_error": "hmi_error",
}

# ============================================================================
# Pattern priority per generated hazard category
# ============================================================================

# Categories not listed here fall back to DEFAULT_PRIORITY.
DEFAULT_PRIORITY = 70
CATEGORY_PRIORITIES = {
    "mechanical_hazard": 80,
    "electrical_hazard": 80,
    "ai_misclassification": 75,
    "model_drift": 75,
    "unauthorized_access": 85,
}

# ============================================================================
# German names for component patterns
# ============================================================================

COMPONENT_NAMES_DE = {
    "robot_arm": "Roboterarm",
    "robot_gripper": "Greifer",
    "conveyor_belt": "Foerderband",
    "conveyor_system": "Foerdersystem",
    "roller_conveyor": "Rollenfoerderer",
    "rotary_table": "Drehtisch",
    "rotating_disc": "Drehscheibe",
    "rotating_spindle": "Spindel",
    "linear_axis": "Linearachse",
    "gearbox": "Getriebe",
    "coupling": "Kupplung",
    "tool_changer": "Werkzeugwechsler",
    "palletizer": "Palettierer",
    "lifting_device": "Hubwerk",
    "lifting_table": "Hubtisch",
    "platform": "Plattform",
    "machine_frame": "Maschinenrahmen",
    "hydraulic_pump": "Hydraulikpumpe",
    "hydraulic_cylinder": "Hydraulikzylinder",
    "hydraulic_valve": "Hydraulikventil",
    "hydraulic_hose": "Hydraulikschlauch",
    "hydraulic_system": "Hydrauliksystem",
    "pneumatic_cylinder": "Pneumatikzylinder",
    "pneumatic_line": "Pneumatikleitung",
    "compressor": "Kompressor",
    "compressed_air_line": "Druckluftleitung",
    "control_cabinet": "Schaltschrank",
    "power_supply": "Stromversorgung",
    "transformer": "Transformator",
    "cable_harness": "Kabelbaum",
    "cable_system": "Kabelsystem",
    "controller": "Steuerung",
    "robot_controller": "Robotersteuerung",
    "hmi": "HMI-Bedienterminal",
    "hmi_panel": "HMI-Panel",
    "control_interface": "Steuerungsschnittstelle",
    "touch_interface": "Touch-Bedienfeld",
    "firmware": "Firmware",
    "proximity_sensor": "Naeherungssensor",
    "laser_scanner": "Laserscanner",
    "camera_system": "Kamerasystem",
    "vision_camera": "Vision-Kamera",
    "actuator": "Aktor",
    "diagnostic_module": "Diagnosemodul",
    "monitoring_system": "Ueberwachungssystem",
    "industrial_switch": "Industrie-Switch",
    "firewall": "Firewall",
    "router": "Router",
    "vision_ai": "KI-Bilderkennung",
    "ml_model": "ML-Modell",
    "ai_controller": "KI-Steuerung",
    "heating_element": "Heizelement",
    "furnace": "Ofen",
    "furnace_chamber": "Ofenkammer",
    "cooling_unit": "Kuehlgeraet",
    "system": "Gesamtsystem",
}

ENERGY_NAMES_DE = {
    "mechanical": "mechanisch",
    "electrical": "elektrisch",
    "hydraulic": "hydraulisch",
    "pneumatic": "pneumatisch",
    "thermal": "thermisch",
    "software": "Software",
    "mixed": "gemischt",
}


def _go_string(value):
    """Return *value* as a double-quoted Go string literal.

    JSON string escaping (``\\"``, ``\\\\``, ``\\n``, ``\\uXXXX`` ...) is also
    valid inside a Go interpreted string literal, so ``json.dumps`` is used to
    escape quotes, backslashes and control characters. Non-ASCII characters
    are emitted verbatim (the output file is written as UTF-8).
    """
    return json.dumps(str(value), ensure_ascii=False)


def go_string_slice(items):
    """Render *items* as a Go ``[]string{...}`` composite literal.

    An empty or falsy *items* yields ``[]string{}``. Each element is escaped
    so that embedded quotes or backslashes cannot break the generated source.
    """
    if not items:
        return "[]string{}"
    quoted = ", ".join(_go_string(x) for x in items)
    return f"[]string{{{quoted}}}"


def _group_rules(rules):
    """Group rule dicts by their ``(component, energy_source)`` pair."""
    groups = defaultdict(list)
    for rule in rules:
        groups[(rule["component"], rule["energy_source"])].append(rule)
    return groups


def _mapped_ids(group_rules, field, mapping):
    """Collect the IDs that *mapping* assigns to the names in ``rule[field]``.

    Names without an entry in *mapping* are ignored.
    """
    ids = set()
    for rule in group_rules:
        for name in rule[field]:
            ids.update(mapping.get(name, []))
    return ids


def _priority_for(hazard_cats):
    """Return the highest priority among *hazard_cats*.

    Using the maximum ensures that a lower-priority category (e.g. an AI
    category at 75) never masks a higher one (e.g. mechanical at 80) that is
    present in the same group.
    """
    return max(
        (CATEGORY_PRIORITIES.get(cat, DEFAULT_PRIORITY) for cat in hazard_cats),
        default=DEFAULT_PRIORITY,
    )


def _build_patterns(groups, first_number=FIRST_PATTERN_NUMBER):
    """Turn grouped rules into a list of pattern dicts.

    Groups whose component has no tag mapping, or whose hazards map to no
    known category, are skipped with a warning on stdout. All other groups are
    kept, since the Rule Library adds lifecycle-phase specificity and more
    detailed measure/evidence mappings. IDs are assigned consecutively from
    ``HP{first_number}`` in sorted key order.
    """
    patterns = []
    for component, energy in sorted(groups):
        group_rules = groups[(component, energy)]

        comp_tags = COMPONENT_TO_TAGS.get(component, [])
        if not comp_tags:
            print(
                f"  WARN: No tag mapping for component '{component}', "
                f"skipping {len(group_rules)} rules"
            )
            continue

        hazard_cats = {
            HAZARD_TO_CATEGORIES[hazard]
            for rule in group_rules
            for hazard in rule["hazards"]
            if hazard in HAZARD_TO_CATEGORIES
        }
        if not hazard_cats:
            print(f"  WARN: No hazard categories for ({component}, {energy}), skipping")
            continue

        lifecycles = {rule["lifecycle_phase"] for rule in group_rules}
        comp_name_de = COMPONENT_NAMES_DE.get(component, component)
        energy_name_de = ENERGY_NAMES_DE.get(energy, energy)

        patterns.append({
            "id": f"HP{first_number + len(patterns):03d}",
            "name_de": f"{comp_name_de} — {energy_name_de}",
            "name_en": f"{component.replace('_', ' ').title()} — {energy}",
            "required_component_tags": sorted(comp_tags),
            "required_energy_tags": sorted(ENERGY_TO_TAGS.get(energy, [])),
            # More than three phases is treated as "applies to any phase".
            "required_lifecycle_phases": sorted(lifecycles) if len(lifecycles) <= 3 else [],
            "excluded_component_tags": [],
            "generated_hazard_cats": sorted(hazard_cats),
            "suggested_measure_ids": sorted(
                _mapped_ids(group_rules, "recommended_measures", MEASURE_TO_IDS)
            ),
            "suggested_evidence_ids": sorted(
                _mapped_ids(group_rules, "required_evidence", EVIDENCE_TO_IDS)
            ),
            "priority": _priority_for(hazard_cats),
            "source_rules": [rule["rule_id"] for rule in group_rules],
        })
    return patterns


def _render_go(patterns):
    """Render *patterns* as the Go source of ``GetExtendedHazardPatterns``."""
    lines = [
        "package iace",
        "",
        f"// GetExtendedHazardPatterns returns {len(patterns)} additional patterns",
        "// derived from the Rule Library documents (R051-R1550).",
        "// These supplement the 44 built-in patterns in hazard_patterns.go.",
        "func GetExtendedHazardPatterns() []HazardPattern {",
        "\treturn []HazardPattern{",
    ]
    for p in patterns:
        source_rules = ", ".join(p["source_rules"])
        lines.append("\t\t{")
        lines.append(
            f'\t\t\tID: {_go_string(p["id"])}, NameDE: {_go_string(p["name_de"])}, '
            f'NameEN: {_go_string(p["name_en"])},'
        )
        lines.append(
            f'\t\t\tRequiredComponentTags: {go_string_slice(p["required_component_tags"])},'
        )
        lines.append(f'\t\t\tRequiredEnergyTags: {go_string_slice(p["required_energy_tags"])},')
        if p["required_lifecycle_phases"]:
            lines.append(
                f'\t\t\tRequiredLifecycles: {go_string_slice(p["required_lifecycle_phases"])},'
            )
        lines.append(f'\t\t\tGeneratedHazardCats: {go_string_slice(p["generated_hazard_cats"])},')
        lines.append(f'\t\t\tSuggestedMeasureIDs: {go_string_slice(p["suggested_measure_ids"])},')
        lines.append(
            f'\t\t\tSuggestedEvidenceIDs: {go_string_slice(p["suggested_evidence_ids"])},'
        )
        lines.append(f'\t\t\tPriority: {p["priority"]},')
        lines.append(f"\t\t\t// Source: {source_rules}")
        lines.append("\t\t},")
    lines.extend(["\t}", "}", ""])
    return "\n".join(lines)


def _print_stats(patterns):
    """Print the distinct hazard categories, measure IDs and evidence IDs used."""
    all_measure_ids = set()
    all_evidence_ids = set()
    all_hazard_cats = set()
    for p in patterns:
        all_measure_ids.update(p["suggested_measure_ids"])
        all_evidence_ids.update(p["suggested_evidence_ids"])
        all_hazard_cats.update(p["generated_hazard_cats"])

    print(f"Unique hazard categories: {len(all_hazard_cats)}: {sorted(all_hazard_cats)}")
    print(f"Unique measure IDs referenced: {len(all_measure_ids)}: {sorted(all_measure_ids)}")
    print(f"Unique evidence IDs referenced: {len(all_evidence_ids)}: {sorted(all_evidence_ids)}")


def main(input_path=DEFAULT_INPUT_PATH, output_path=DEFAULT_OUTPUT_PATH):
    """Generate the extended hazard-pattern Go file from the parsed rule library.

    Args:
        input_path: JSON file containing a top-level ``"rules"`` list.
        output_path: Destination of the generated Go source file.

    Raises:
        OSError: If either file cannot be opened.
        KeyError: If the JSON lacks ``"rules"`` or a rule lacks a required field.
    """
    # Explicit UTF-8: the generated names contain "—" and Go source must be
    # UTF-8 regardless of the platform's default locale encoding.
    with open(input_path, encoding="utf-8") as f:
        data = json.load(f)

    rules = data["rules"]
    print(f"Loaded {len(rules)} rules")

    groups = _group_rules(rules)
    print(f"Grouped into {len(groups)} unique (component, energy_source) combinations")

    patterns = _build_patterns(groups)
    last_number = FIRST_PATTERN_NUMBER + len(patterns) - 1
    print(f"\nGenerated {len(patterns)} new HazardPatterns (HP045-HP{last_number:03d})")

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(_render_go(patterns))

    print(f"\nGo code written to: {output_path}")
    print(f"Patterns: {len(patterns)}")

    _print_stats(patterns)


if __name__ == "__main__":
    # Optional CLI overrides: script.py [INPUT_JSON [OUTPUT_GO]]
    main(*sys.argv[1:3])