Files
breakpilot-compliance/scripts/generate-rule-patterns.py
Benjamin Admin 5adb1c5f16
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 39s
CI/CD / test-python-backend-compliance (push) Successful in 38s
CI/CD / test-python-document-crawler (push) Successful in 25s
CI/CD / test-python-dsms-gateway (push) Successful in 20s
CI/CD / validate-canonical-controls (push) Successful in 14s
CI/CD / Deploy (push) Successful in 2s
feat(iace): integrate Rule Library as 58 extended hazard patterns (HP045-HP102)
Parsed 171 explicit rules from 4 Rule Library Word documents (R051-R1550),
deduplicated into 58 unique (component, energy_source) patterns, and mapped
to existing IACE IDs (component tags, M-IDs, E-IDs).

Changes:
- hazard_patterns_extended.go: 58 new patterns derived from Rule Library
- pattern_engine.go: combines builtin (44) + extended (58) = 102 total patterns
- iace_handler.go: ListHazardPatterns returns all 102 patterns
- iace.md: updated documentation for 102 patterns
- scripts/generate-rule-patterns.py: mapping + Go code generator
- scripts/parsed-rule-library.json: extracted rule data

Tests: 132 passing (9 new extended pattern tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 11:24:07 +01:00

493 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Converts parsed Rule Library entries (R051-R1550) into Go HazardPattern code.
Groups rules by (component, energy_source) to produce unique patterns,
maps rule fields to existing IACE IDs (component tags, M-IDs, E-IDs),
and outputs Go source code for hazard_patterns_extended.go.
"""
import json
import sys
from collections import defaultdict
# ============================================================================
# Mapping: Rule component names → component library tags
# ============================================================================
COMPONENT_TO_TAGS = {
"robot_arm": ["moving_part", "rotating_part", "high_force"],
"robot_gripper": ["moving_part", "clamping_part", "pinch_point"],
"conveyor_belt": ["moving_part", "rotating_part", "entanglement_risk"],
"conveyor_system": ["moving_part", "rotating_part", "entanglement_risk"],
"roller_conveyor": ["moving_part", "rotating_part", "entanglement_risk"],
"rotary_table": ["rotating_part", "high_force"],
"rotating_disc": ["rotating_part", "high_speed"],
"rotating_spindle": ["rotating_part", "high_speed", "cutting_part"],
"linear_axis": ["moving_part", "crush_point"],
"gearbox": ["rotating_part", "pinch_point"],
"coupling": ["rotating_part"],
"tool_changer": ["moving_part", "pinch_point"],
"palletizer": ["moving_part", "high_force"],
"lifting_device": ["moving_part", "high_force", "gravity_risk"],
"lifting_table": ["moving_part", "gravity_risk"],
"platform": ["structural_part", "gravity_risk"],
"machine_frame": ["structural_part"],
# Hydraulic
"hydraulic_pump": ["hydraulic_part", "high_pressure"],
"hydraulic_cylinder":["hydraulic_part", "moving_part", "high_force", "high_pressure"],
"hydraulic_valve": ["hydraulic_part", "high_pressure"],
"hydraulic_hose": ["hydraulic_part", "high_pressure"],
"hydraulic_system": ["hydraulic_part", "high_pressure"],
# Pneumatic
"pneumatic_cylinder":["pneumatic_part", "moving_part", "stored_energy"],
"pneumatic_line": ["pneumatic_part"],
"compressor": ["pneumatic_part", "high_pressure", "noise_source"],
"compressed_air_line": ["pneumatic_part"],
# Electrical
"control_cabinet": ["high_voltage", "electrical_part"],
"power_supply": ["high_voltage", "electrical_part"],
"transformer": ["high_voltage", "electrical_part"],
"cable_harness": ["electrical_part"],
"cable_system": ["electrical_part"],
# Control
"controller": ["has_software", "programmable"],
"robot_controller": ["has_software", "programmable"],
"hmi": ["has_software", "user_interface"],
"hmi_panel": ["has_software", "user_interface"],
"control_interface": ["has_software", "user_interface"],
"touch_interface": ["has_software", "user_interface"],
"firmware": ["has_software", "programmable"],
# Sensor
"proximity_sensor": ["sensor_part"],
"laser_scanner": ["sensor_part"],
"camera_system": ["sensor_part"],
"vision_camera": ["sensor_part"],
# Actuator
"actuator": ["actuator_part"],
# Safety
"diagnostic_module": ["has_software", "safety_device"],
"monitoring_system": ["has_software", "safety_device"],
# IT/Network
"industrial_switch": ["networked", "security_device"],
"firewall": ["networked", "security_device"],
"router": ["networked", "security_device"],
# AI
"vision_ai": ["has_ai", "sensor_part"],
"ml_model": ["has_ai", "has_software"],
"ai_controller": ["has_ai", "has_software", "programmable"],
# Thermal
"heating_element": ["high_temperature"],
"furnace": ["high_temperature"],
"furnace_chamber": ["high_temperature"],
"cooling_unit": ["high_temperature"],
# System-level
"system": ["has_software"],
}
# ============================================================================
# Mapping: Rule energy_source → energy tags
# ============================================================================
ENERGY_TO_TAGS = {
"mechanical": ["kinetic"],
"electrical": ["electrical_energy"],
"hydraulic": ["hydraulic_pressure"],
"pneumatic": ["pneumatic_pressure"],
"thermal": ["thermal"],
"software": [], # no energy tag, but component tags cover it
"mixed": [],
}
# ============================================================================
# Mapping: Rule measure names → existing M-IDs
# ============================================================================
MEASURE_TO_IDS = {
"guard": ["M051", "M054"],
"safe_speed_monitoring": ["M003", "M106"],
"lockout_tagout": ["M121", "M131"],
"energy_isolation": ["M121"],
"overload_protection": ["M004"],
"safe_stop": ["M106"],
"reduced_speed": ["M003", "M106"],
"safety_coupling": ["M012"],
"mechanical_lock": ["M051"],
"tool_locking_system": ["M051"],
"load_monitor": ["M004"],
"load_monitoring": ["M004"],
"load_securing": ["M005"],
"handrail": ["M052"],
"protective_cover": ["M051"],
"protective_shield": ["M051"],
"enclosure": ["M051"],
"heat_shield": ["M015"],
"thermal_insulation": ["M015"],
"temperature_monitor": ["M014"],
"temperature_monitoring": ["M014"],
"pressure_relief_valve": ["M021"],
"pressure_limit": ["M021"],
"pressure_monitor": ["M022"],
"pressure_monitoring": ["M022"],
"pressure_switch": ["M022"],
"pressure_release": ["M021"],
"pressure_isolation": ["M021"],
"hose_guard": ["M051"],
"hose_protection": ["M051"],
"flow_control": ["M022"],
"flow_control_valve": ["M022"],
"flow_regulator": ["M022"],
"leak_detection": ["M022"],
"circuit_breaker": ["M061"],
"grounding": ["M063"],
"insulation_check": ["M062"],
"disconnect_switch": ["M061"],
"cable_protection": ["M062"],
"access_control": ["M101", "M113"],
"authentication": ["M101", "M113"],
"network_filtering": ["M116"],
"network_redundancy": ["M116"],
"firewall": ["M116"],
"software_validation": ["M103"],
"validation_logic": ["M103"],
"signed_update": ["M104"],
"restart_validation": ["M106"],
"confidence_threshold": ["M103"],
"drift_monitoring": ["M103"],
"human_override": ["M103"],
"self_test": ["M106"],
"safety_validation": ["M106"],
"sensor_redundancy": ["M082"],
"redundancy": ["M082"],
"calibration": ["M082"],
"alarm_test": ["M106"],
"connection_validation": ["M062"],
"enabling_device": ["M051"],
"safety_scanner": ["M051", "M106"],
"laser_scanner": ["M082"],
"grip_force_monitoring": ["M004"],
"reinforcement": ["M005"],
"confirmation_dialog": ["M131"],
}
# ============================================================================
# Mapping: Rule evidence names → existing E-IDs
# ============================================================================
EVIDENCE_TO_IDS = {
"safety_function_test": ["E08", "E09"],
"maintenance_protocol": ["E21"],
"inspection": ["E20"],
"functional_test": ["E08"],
"pressure_test": ["E11"],
"insulation_test": ["E10"],
"grounding_test": ["E10"],
"load_test": ["E08"],
"temperature_test": ["E10"],
"calibration_protocol": ["E20"],
"measurement_protocol": ["E20"],
"structural_calculation": ["E07"],
"software_test": ["E14"],
"system_test": ["E14"],
"penetration_test": ["E16"],
"security_audit": ["E16", "E17"],
"model_validation": ["E15"],
"model_test": ["E15"],
"validation_report": ["E15"],
"usability_test": ["E20"],
"failover_test": ["E08"],
"signature_verification": ["E18"],
}
# ============================================================================
# Mapping: Rule hazard names → hazard categories for patterns
# ============================================================================
HAZARD_TO_CATEGORIES = {
"collision": "mechanical_hazard",
"crushing": "mechanical_hazard",
"drawing_in": "mechanical_hazard",
"entanglement": "mechanical_hazard",
"pinching": "mechanical_hazard",
"contact_with_moving_parts": "mechanical_hazard",
"unexpected_motion": "mechanical_hazard",
"unexpected_rotation": "mechanical_hazard",
"unexpected_start": "mechanical_hazard",
"unsafe_restart": "mechanical_hazard",
"sudden_motion": "mechanical_hazard",
"sudden_release": "mechanical_hazard",
"ejected_parts": "mechanical_hazard",
"tool_release": "mechanical_hazard",
"falling_tool": "mechanical_hazard",
"gear_breakage": "mechanical_hazard",
"gear_failure": "mechanical_hazard",
"rotational_overload": "mechanical_hazard",
"object_drop": "mechanical_hazard",
"dropping_object": "mechanical_hazard",
"falling_load": "mechanical_hazard",
"load_instability": "mechanical_hazard",
"structural_failure": "mechanical_hazard",
"fall": "mechanical_hazard",
"stored_energy": "mechanical_hazard",
"electric_shock": "electrical_hazard",
"overcurrent": "electrical_hazard",
"insulation_damage": "electrical_hazard",
"insulation_failure": "electrical_hazard",
"incorrect_wiring": "electrical_hazard",
"incorrect_connection": "electrical_hazard",
"burn": "thermal_hazard",
"overheating": "thermal_hazard",
"heat_exposure": "thermal_hazard",
"coolant_leak": "thermal_hazard",
"overpressure": "pneumatic_hydraulic",
"hose_burst": "pneumatic_hydraulic",
"hose_whip": "pneumatic_hydraulic",
"pressure_spike": "pneumatic_hydraulic",
"pressure_release": "pneumatic_hydraulic",
"logic_error": "software_fault",
"incorrect_command": "software_fault",
"faulty_update": "update_failure",
"system_failure": "software_fault",
"safety_function_failure": "software_fault",
"network_failure": "communication_failure",
"unauthorized_access": "unauthorized_access",
"unauthorized_traffic": "unauthorized_access",
"false_signal": "sensor_fault",
"false_detection": "sensor_fault",
"false_object_detection": "sensor_fault",
"detection_failure": "sensor_fault",
"missed_alarm": "sensor_fault",
"model_drift": "model_drift",
"misclassification": "ai_misclassification",
"unsafe_decision": "ai_misclassification",
"incorrect_diagnosis": "software_fault",
"incorrect_input": "hmi_error",
"operator_error": "hmi_error",
}
# ============================================================================
# German names for component patterns
# ============================================================================
COMPONENT_NAMES_DE = {
"robot_arm": "Roboterarm",
"robot_gripper": "Greifer",
"conveyor_belt": "Foerderband",
"conveyor_system": "Foerdersystem",
"roller_conveyor": "Rollenfoerderer",
"rotary_table": "Drehtisch",
"rotating_disc": "Drehscheibe",
"rotating_spindle": "Spindel",
"linear_axis": "Linearachse",
"gearbox": "Getriebe",
"coupling": "Kupplung",
"tool_changer": "Werkzeugwechsler",
"palletizer": "Palettierer",
"lifting_device": "Hubwerk",
"lifting_table": "Hubtisch",
"platform": "Plattform",
"machine_frame": "Maschinenrahmen",
"hydraulic_pump": "Hydraulikpumpe",
"hydraulic_cylinder": "Hydraulikzylinder",
"hydraulic_valve": "Hydraulikventil",
"hydraulic_hose": "Hydraulikschlauch",
"hydraulic_system": "Hydrauliksystem",
"pneumatic_cylinder": "Pneumatikzylinder",
"pneumatic_line": "Pneumatikleitung",
"compressor": "Kompressor",
"compressed_air_line": "Druckluftleitung",
"control_cabinet": "Schaltschrank",
"power_supply": "Stromversorgung",
"transformer": "Transformator",
"cable_harness": "Kabelbaum",
"cable_system": "Kabelsystem",
"controller": "Steuerung",
"robot_controller": "Robotersteuerung",
"hmi": "HMI-Bedienterminal",
"hmi_panel": "HMI-Panel",
"control_interface": "Steuerungsschnittstelle",
"touch_interface": "Touch-Bedienfeld",
"firmware": "Firmware",
"proximity_sensor": "Naeherungssensor",
"laser_scanner": "Laserscanner",
"camera_system": "Kamerasystem",
"vision_camera": "Vision-Kamera",
"actuator": "Aktor",
"diagnostic_module": "Diagnosemodul",
"monitoring_system": "Ueberwachungssystem",
"industrial_switch": "Industrie-Switch",
"firewall": "Firewall",
"router": "Router",
"vision_ai": "KI-Bilderkennung",
"ml_model": "ML-Modell",
"ai_controller": "KI-Steuerung",
"heating_element": "Heizelement",
"furnace": "Ofen",
"furnace_chamber": "Ofenkammer",
"cooling_unit": "Kuehlgeraet",
"system": "Gesamtsystem",
}
ENERGY_NAMES_DE = {
"mechanical": "mechanisch",
"electrical": "elektrisch",
"hydraulic": "hydraulisch",
"pneumatic": "pneumatisch",
"thermal": "thermisch",
"software": "Software",
"mixed": "gemischt",
}
def main():
with open("/Users/benjaminadmin/Projekte/breakpilot-compliance/scripts/parsed-rule-library.json") as f:
data = json.load(f)
rules = data["rules"]
print(f"Loaded {len(rules)} rules")
# Group rules by (component, energy_source) to create unique patterns
groups = defaultdict(list)
for rule in rules:
key = (rule["component"], rule["energy_source"])
groups[key].append(rule)
print(f"Grouped into {len(groups)} unique (component, energy_source) combinations")
# Filter out groups whose component tags are already covered by existing HP001-HP044
# We keep all groups since the Rule Library adds lifecycle-phase specificity
# and more detailed measure/evidence mappings
patterns = []
pattern_id = 45 # Start at HP045
for (component, energy), group_rules in sorted(groups.items()):
comp_tags = COMPONENT_TO_TAGS.get(component, [])
if not comp_tags:
print(f" WARN: No tag mapping for component '{component}', skipping {len(group_rules)} rules")
continue
energy_tags = ENERGY_TO_TAGS.get(energy, [])
# Collect all hazard categories from group
hazard_cats = set()
for r in group_rules:
for h in r["hazards"]:
cat = HAZARD_TO_CATEGORIES.get(h)
if cat:
hazard_cats.add(cat)
if not hazard_cats:
print(f" WARN: No hazard categories for ({component}, {energy}), skipping")
continue
# Collect all measure IDs
measure_ids = set()
for r in group_rules:
for m in r["recommended_measures"]:
ids = MEASURE_TO_IDS.get(m, [])
measure_ids.update(ids)
# Collect all evidence IDs
evidence_ids = set()
for r in group_rules:
for e in r["required_evidence"]:
ids = EVIDENCE_TO_IDS.get(e, [])
evidence_ids.update(ids)
# Collect lifecycle phases
lifecycles = set()
for r in group_rules:
lifecycles.add(r["lifecycle_phase"])
# Determine priority based on hazard severity
priority = 70
if "mechanical_hazard" in hazard_cats:
priority = 80
if "electrical_hazard" in hazard_cats:
priority = 80
if any(c in hazard_cats for c in ["ai_misclassification", "model_drift"]):
priority = 75
if any(c in hazard_cats for c in ["unauthorized_access"]):
priority = 85
comp_name_de = COMPONENT_NAMES_DE.get(component, component)
energy_name_de = ENERGY_NAMES_DE.get(energy, energy)
name_de = f"{comp_name_de}{energy_name_de}"
name_en = f"{component.replace('_', ' ').title()}{energy}"
pattern = {
"id": f"HP{pattern_id:03d}",
"name_de": name_de,
"name_en": name_en,
"required_component_tags": sorted(comp_tags),
"required_energy_tags": sorted(energy_tags),
"required_lifecycle_phases": sorted(lifecycles) if len(lifecycles) <= 3 else [],
"excluded_component_tags": [],
"generated_hazard_cats": sorted(hazard_cats),
"suggested_measure_ids": sorted(measure_ids),
"suggested_evidence_ids": sorted(evidence_ids),
"priority": priority,
"source_rules": [r["rule_id"] for r in group_rules],
}
patterns.append(pattern)
pattern_id += 1
print(f"\nGenerated {len(patterns)} new HazardPatterns (HP045-HP{pattern_id-1:03d})")
# Generate Go code
go_lines = []
go_lines.append("package iace")
go_lines.append("")
go_lines.append(f"// GetExtendedHazardPatterns returns {len(patterns)} additional patterns")
go_lines.append("// derived from the Rule Library documents (R051-R1550).")
go_lines.append("// These supplement the 44 built-in patterns in hazard_patterns.go.")
go_lines.append("func GetExtendedHazardPatterns() []HazardPattern {")
go_lines.append("\treturn []HazardPattern{")
for p in patterns:
go_lines.append(f"\t\t{{")
go_lines.append(f'\t\t\tID: "{p["id"]}", NameDE: "{p["name_de"]}", NameEN: "{p["name_en"]}",')
go_lines.append(f'\t\t\tRequiredComponentTags: {go_string_slice(p["required_component_tags"])},')
go_lines.append(f'\t\t\tRequiredEnergyTags: {go_string_slice(p["required_energy_tags"])},')
if p["required_lifecycle_phases"]:
go_lines.append(f'\t\t\tRequiredLifecycles: {go_string_slice(p["required_lifecycle_phases"])},')
go_lines.append(f'\t\t\tGeneratedHazardCats: {go_string_slice(p["generated_hazard_cats"])},')
go_lines.append(f'\t\t\tSuggestedMeasureIDs: {go_string_slice(p["suggested_measure_ids"])},')
go_lines.append(f'\t\t\tSuggestedEvidenceIDs: {go_string_slice(p["suggested_evidence_ids"])},')
go_lines.append(f'\t\t\tPriority: {p["priority"]},')
go_lines.append(f'\t\t\t// Source: {", ".join(p["source_rules"])}')
go_lines.append(f"\t\t}},")
go_lines.append("\t}")
go_lines.append("}")
go_lines.append("")
go_code = "\n".join(go_lines)
output_path = "/Users/benjaminadmin/Projekte/breakpilot-compliance/ai-compliance-sdk/internal/iace/hazard_patterns_extended.go"
with open(output_path, "w") as f:
f.write(go_code)
print(f"\nGo code written to: {output_path}")
print(f"Patterns: {len(patterns)}")
# Stats
all_measure_ids = set()
all_evidence_ids = set()
all_hazard_cats = set()
for p in patterns:
all_measure_ids.update(p["suggested_measure_ids"])
all_evidence_ids.update(p["suggested_evidence_ids"])
all_hazard_cats.update(p["generated_hazard_cats"])
print(f"Unique hazard categories: {len(all_hazard_cats)}: {sorted(all_hazard_cats)}")
print(f"Unique measure IDs referenced: {len(all_measure_ids)}: {sorted(all_measure_ids)}")
print(f"Unique evidence IDs referenced: {len(all_evidence_ids)}: {sorted(all_evidence_ids)}")
def go_string_slice(items):
if not items:
return "[]string{}"
quoted = ", ".join(f'"{x}"' for x in items)
return f"[]string{{{quoted}}}"
if __name__ == "__main__":
main()