Files
Benjamin Admin c7651796c9
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 36s
CI/CD / test-python-backend-compliance (push) Successful in 36s
CI/CD / test-python-document-crawler (push) Successful in 22s
CI/CD / test-python-dsms-gateway (push) Successful in 18s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / Deploy (push) Successful in 2s
feat(iace): integrate ISO 12100 machine risk model with 4-factor assessment
Add dual-mode risk engine: legacy S×E×P (avoidance=0) and ISO mode S×F×P×A
(avoidance>=1) with new thresholds (low/medium/high/very_high/not_acceptable).

- 150+ hazard library entries across 28 categories incl. physical hazards
  (mechanical, electrical, thermal, pneumatic/hydraulic, noise/vibration,
  ergonomic, material/environmental)
- 160-entry protective measures library with 3-step hierarchy validation
  (design → protective → information)
- 25 lifecycle phases, 20 affected person roles, 50 evidence types
- 10 verification methods (expanded from 7)
- New API endpoints: lifecycle-phases, roles, evidence-types,
  protective-measures-library, validate-mitigation-hierarchy
- DB migrations 018+019 for extended schema
- Frontend: 4-slider risk assessment, hierarchy warnings, measures library modal
- MkDocs wiki updated with ISO mode docs and legal notice (no norm text)

All content uses original wording — norms referenced as methodology only.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 23:13:41 +01:00

287 lines
9.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package iace
import (
"fmt"
"math"
)
// RiskLevel, AssessRiskRequest, and RiskAssessment types are defined in models.go.
// This file contains the calculation methods together with the engine's own input/output types.
// RiskComputeInput carries the parameters that RiskEngine.ComputeRisk passes on
// to the individual calculation methods.
//
// ComputeRisk returns an error when Severity, Exposure, or Probability is below 1.
// Other out-of-range values are not rejected; the calculation methods clamp them
// to the ranges noted on each field.
type RiskComputeInput struct {
Severity int `json:"severity"` // 1-5
Exposure int `json:"exposure"` // 1-5 (used as the frequency/exposure factor F in ISO mode)
Probability int `json:"probability"` // 1-5
Avoidance int `json:"avoidance"` // <=0 selects the legacy S×E×P formula; 1-5 selects ISO mode and is multiplied in directly
ControlMaturity int `json:"control_maturity"` // 0-4
ControlCoverage float64 `json:"control_coverage"` // 0-1
TestEvidence float64 `json:"test_evidence"` // 0-1
HasJustification bool `json:"has_justification"` // passed through to IsAcceptable
}
// RiskComputeResult holds the values produced by RiskEngine.ComputeRisk.
// The field comments describe how ComputeRisk fills them.
type RiskComputeResult struct {
InherentRisk float64 `json:"inherent_risk"` // S×E×P, or S×F×P×A when the input avoidance is >= 1
ControlEffectiveness float64 `json:"control_effectiveness"` // result of CalculateControlEffectiveness
ResidualRisk float64 `json:"residual_risk"` // result of CalculateResidualRisk; the avoidance factor is not applied here
RiskLevel RiskLevel `json:"risk_level"` // derived from InherentRisk (ISO thresholds) when avoidance >= 1, otherwise from ResidualRisk
IsAcceptable bool `json:"is_acceptable"` // first return value of IsAcceptable for ResidualRisk
AcceptanceReason string `json:"acceptance_reason"` // second return value of IsAcceptable (German text)
}
// ============================================================================
// RiskEngine
// ============================================================================
// RiskEngine groups the risk calculation methods of the IACE
// (Inherent-risk Adjusted Control Effectiveness) model.
// The type has no fields, so an engine value carries no state of its own.
type RiskEngine struct{}

// NewRiskEngine returns a pointer to a freshly allocated RiskEngine.
func NewRiskEngine() *RiskEngine {
	return new(RiskEngine)
}
// ============================================================================
// Calculations
// ============================================================================
// clamp limits v to the closed interval [lo, hi]: values below lo yield lo,
// values above hi yield hi, anything else is returned unchanged.
// The lower bound is tested first.
func clamp(v, lo, hi int) int {
	switch {
	case v < lo:
		return lo
	case v > hi:
		return hi
	default:
		return v
	}
}
// clampFloat limits v to the closed interval [lo, hi]: values below lo yield lo,
// values above hi yield hi, anything else is returned unchanged.
//
// NaN is mapped to lo. A NaN compares false against every bound, so without
// this case it would slip through unclamped and turn every score computed
// from it into NaN as well.
func clampFloat(v, lo, hi float64) float64 {
	switch {
	case math.IsNaN(v), v < lo:
		return lo
	case v > hi:
		return hi
	default:
		return v
	}
}
// CalculateInherentRisk returns the inherent risk score as a product of the
// risk factors.
//
// Two modes exist so that older callers keep their results:
//   - avoidance <= 0: legacy mode, S × E × P (range 1-125)
//   - avoidance >= 1: ISO 12100 mode, S × F × P × A (range 1-625)
//
// severity, exposure (the F factor in ISO mode), and probability are clamped
// to 1-5. In ISO mode avoidance is clamped to 1-5 as well and multiplied in
// directly, without any normalization.
func (e *RiskEngine) CalculateInherentRisk(severity, exposure, probability, avoidance int) float64 {
	risk := float64(clamp(severity, 1, 5)) *
		float64(clamp(exposure, 1, 5)) *
		float64(clamp(probability, 1, 5))

	// A positive avoidance value switches on ISO mode and adds the fourth factor.
	if avoidance >= 1 {
		risk *= float64(clamp(avoidance, 1, 5))
	}
	return risk
}
// CalculateControlEffectiveness returns the control effectiveness C_eff as a
// weighted sum of three inputs:
//
//	C_eff = min(1, 0.2*(maturity/4) + 0.5*coverage + 0.3*testEvidence)
//
// Inputs are clamped before use: maturity to 0-4, coverage and testEvidence
// to 0-1. The weights add up to 1, so the final min only guards the upper
// bound of the result.
func (e *RiskEngine) CalculateControlEffectiveness(maturity int, coverage, testEvidence float64) float64 {
	maturityShare := float64(clamp(maturity, 0, 4)) / 4.0
	coverageShare := clampFloat(coverage, 0, 1)
	evidenceShare := clampFloat(testEvidence, 0, 1)

	weighted := 0.2*maturityShare + 0.5*coverageShare + 0.3*evidenceShare
	return math.Min(1, weighted)
}
// CalculateResidualRisk returns the risk that remains after controls are applied.
//
// Formula: R_residual = S * E * P * (1 - cEff)
//
// The inherent part is always computed in legacy mode (avoidance is passed as 0),
// so the result lies on the S×E×P scale with a maximum of 125.
//
// Parameters:
//   - severity, exposure, probability: 1-5, clamped if out of range
//   - cEff: control effectiveness, 0-1, clamped if out of range so the result
//     can neither drop below zero nor exceed the inherent risk
func (e *RiskEngine) CalculateResidualRisk(severity, exposure, probability int, cEff float64) float64 {
	inherent := e.CalculateInherentRisk(severity, exposure, probability, 0)
	return inherent * (1 - clampFloat(cEff, 0, 1))
}
// DetermineRiskLevel maps a residual risk score onto the legacy RiskLevel bands.
//
// Each band is identified by its inclusive lower bound:
//   - 75 and above: critical
//   - 40 and above: high
//   - 15 and above: medium
//   - 5 and above: low
//   - anything else: negligible
func (e *RiskEngine) DetermineRiskLevel(residualRisk float64) RiskLevel {
	// Ordered from the highest band down; the first floor reached wins.
	bands := [...]struct {
		floor float64
		level RiskLevel
	}{
		{75, RiskLevelCritical},
		{40, RiskLevelHigh},
		{15, RiskLevelMedium},
		{5, RiskLevelLow},
	}
	for _, band := range bands {
		if residualRisk >= band.floor {
			return band.level
		}
	}
	return RiskLevelNegligible
}
// DetermineRiskLevelISO maps an ISO-mode risk score (S×F×P×A, at most 625)
// onto the ISO 12100 style RiskLevel bands.
//
// Bands:
//   - above 300: not_acceptable
//   - 151 to 300: very_high
//   - 61 up to (not including) 151: high
//   - 21 up to (not including) 61: medium
//   - below 21: low
func (e *RiskEngine) DetermineRiskLevelISO(risk float64) RiskLevel {
	// Only the top band uses a strict comparison; 300 itself is still very_high.
	if risk > 300 {
		return RiskLevelNotAcceptable
	}
	if risk >= 151 {
		return RiskLevelVeryHigh
	}
	if risk >= 61 {
		return RiskLevelHigh
	}
	if risk >= 21 {
		return RiskLevelMedium
	}
	return RiskLevelLow
}
// ValidateProtectiveMeasureHierarchy checks whether a measure of the given
// reduction type respects the design → protective → information order.
//
// Only measures of type ReductionTypeInformation are checked; every other type
// yields no warnings. For an information measure the existing mitigations are
// scanned, ignoring those with status MitigationStatusRejected:
//   - neither a design nor a protective measure exists: one warning that
//     information measures must not be the sole primary measure
//   - a protective but no design measure exists: one warning that design
//     measures are missing
//   - a design measure exists: no warning
//
// The returned slice is nil when there is nothing to report.
func (e *RiskEngine) ValidateProtectiveMeasureHierarchy(reductionType ReductionType, existingMitigations []Mitigation) []string {
	if reductionType != ReductionTypeInformation {
		return nil
	}

	// Record which higher-ranked measure types are already present.
	var designFound, protectiveFound bool
	for _, mitigation := range existingMitigations {
		if mitigation.Status == MitigationStatusRejected {
			continue
		}
		if mitigation.ReductionType == ReductionTypeDesign {
			designFound = true
		} else if mitigation.ReductionType == ReductionTypeProtective {
			protectiveFound = true
		}
	}

	switch {
	case designFound:
		return nil
	case protectiveFound:
		return []string{
			"Es liegen keine konstruktiven Massnahmen (Typ 1) vor. " +
				"Pruefen Sie, ob eine inhaerent sichere Konstruktion die Gefaehrdung beseitigen kann.",
		}
	default:
		return []string{
			"Hinweismassnahmen (Typ 3) duerfen NICHT als alleinige Primaermassnahme verwendet werden. " +
				"Pruefen Sie zuerst, ob konstruktive (Typ 1) oder technische Schutzmassnahmen (Typ 2) moeglich sind.",
		}
	}
}
// IsAcceptable decides whether a residual risk score is acceptable and returns
// the decision together with a German reason text.
//
// Decision order:
//   - residualRisk < 15: acceptable ("Restrisiko unter Schwellwert")
//   - residualRisk < 40 with allReductionStepsApplied and hasJustification both
//     true: acceptable under the ALARP principle
//   - every other case: not acceptable ("Restrisiko zu hoch - blockiert CE-Export")
//
// The last case also covers scores from 15 up to 40 for which one of the two
// flags is false.
func (e *RiskEngine) IsAcceptable(residualRisk float64, allReductionStepsApplied bool, hasJustification bool) (bool, string) {
	switch {
	case residualRisk < 15:
		return true, "Restrisiko unter Schwellwert"
	case residualRisk < 40 && allReductionStepsApplied && hasJustification:
		return true, "ALARP-Prinzip: Restrisiko akzeptabel mit vollstaendiger Risikominderung"
	default:
		return false, "Restrisiko zu hoch - blockiert CE-Export"
	}
}
// CalculateCompletenessScore computes a weighted completeness score (0-100).
//
// Formula:
//
//	score = (passedRequired/totalRequired)*80
//	      + (passedRecommended/totalRecommended)*15
//	      + (passedOptional/totalOptional)*5
//
// A component whose total is 0 (or negative) contributes 0 to the score.
// Each passed count is clamped to the range 0..total before the ratio is
// formed, so inconsistent counts cannot push the score outside 0-100.
func (e *RiskEngine) CalculateCompletenessScore(passedRequired, totalRequired, passedRecommended, totalRecommended, passedOptional, totalOptional int) float64 {
	// share returns the weighted contribution of one requirement class.
	share := func(passed, total int, weight float64) float64 {
		if total <= 0 {
			return 0
		}
		return (float64(clamp(passed, 0, total)) / float64(total)) * weight
	}

	return share(passedRequired, totalRequired, 80) +
		share(passedRecommended, totalRecommended, 15) +
		share(passedOptional, totalOptional, 5)
}
// ComputeRisk runs the full calculation chain for one input and returns the
// inherent risk, control effectiveness, residual risk, risk level, and
// acceptability in a RiskComputeResult.
//
// An error (and a nil result) is returned when severity, exposure, or
// probability is below 1.
//
// The risk level depends on the mode: with avoidance >= 1 it is taken from the
// inherent risk using the ISO thresholds, otherwise from the residual risk
// using the legacy thresholds.
//
// IsAcceptable is always called with allReductionStepsApplied set to false;
// the caller is responsible for updating the acceptance status once the
// reduction steps have been applied.
func (e *RiskEngine) ComputeRisk(req RiskComputeInput) (*RiskComputeResult, error) {
	if req.Severity < 1 || req.Exposure < 1 || req.Probability < 1 {
		return nil, fmt.Errorf("severity, exposure, and probability must be >= 1")
	}

	out := &RiskComputeResult{
		InherentRisk:         e.CalculateInherentRisk(req.Severity, req.Exposure, req.Probability, req.Avoidance),
		ControlEffectiveness: e.CalculateControlEffectiveness(req.ControlMaturity, req.ControlCoverage, req.TestEvidence),
	}
	out.ResidualRisk = e.CalculateResidualRisk(req.Severity, req.Exposure, req.Probability, out.ControlEffectiveness)

	// ISO 12100 mode classifies the inherent risk; legacy mode classifies the residual risk.
	if isoMode := req.Avoidance >= 1; isoMode {
		out.RiskLevel = e.DetermineRiskLevelISO(out.InherentRisk)
	} else {
		out.RiskLevel = e.DetermineRiskLevel(out.ResidualRisk)
	}

	out.IsAcceptable, out.AcceptanceReason = e.IsAcceptable(out.ResidualRisk, false, req.HasJustification)
	return out, nil
}