Files
breakpilot-compliance/ai-compliance-sdk/internal/iace/engine.go
Benjamin Boenisch 06711bad1c
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 44s
CI / test-python-backend-compliance (push) Successful in 37s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 20s
feat(sdk,iace): add Personalized Drafting Pipeline v2 and IACE engine
Drafting Engine: 7-module pipeline with narrative tags, allowed facts governance,
PII sanitizer, prose validator with repair loop, hash-based cache, and terminology
guide. v1 fallback via ?v=1 query param.

IACE: Initial AI-Act Conformity Engine with risk classifier, completeness checker,
hazard library, and PostgreSQL store for AI system assessments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:27:06 +01:00

203 lines
7.0 KiB
Go

package iace
import (
"fmt"
"math"
)
// RiskLevel, AssessRiskRequest, and RiskAssessment types are defined in models.go.
// This file only contains calculation methods.
// RiskComputeInput contains the input parameters for the engine's risk computation.
type RiskComputeInput struct {
Severity int `json:"severity"` // 1-5
Exposure int `json:"exposure"` // 1-5
Probability int `json:"probability"` // 1-5
ControlMaturity int `json:"control_maturity"` // 0-4
ControlCoverage float64 `json:"control_coverage"` // 0-1
TestEvidence float64 `json:"test_evidence"` // 0-1
HasJustification bool `json:"has_justification"`
}
// RiskComputeResult contains the output of the engine's risk computation.
type RiskComputeResult struct {
InherentRisk float64 `json:"inherent_risk"`
ControlEffectiveness float64 `json:"control_effectiveness"`
ResidualRisk float64 `json:"residual_risk"`
RiskLevel RiskLevel `json:"risk_level"`
IsAcceptable bool `json:"is_acceptable"`
AcceptanceReason string `json:"acceptance_reason"`
}
// ============================================================================
// RiskEngine
// ============================================================================
// RiskEngine provides methods for mathematical risk calculations
// according to the IACE (Inherent-risk Adjusted Control Effectiveness) model.
type RiskEngine struct{}
// NewRiskEngine creates a new RiskEngine instance.
func NewRiskEngine() *RiskEngine {
return &RiskEngine{}
}
// ============================================================================
// Calculations
// ============================================================================
// clamp restricts v to the range [lo, hi].
func clamp(v, lo, hi int) int {
if v < lo {
return lo
}
if v > hi {
return hi
}
return v
}
// clampFloat restricts v to the range [lo, hi].
func clampFloat(v, lo, hi float64) float64 {
if v < lo {
return lo
}
if v > hi {
return hi
}
return v
}
// CalculateInherentRisk computes the inherent risk score as S * E * P.
// Each factor is expected in the range 1-5 and will be clamped if out of range.
func (e *RiskEngine) CalculateInherentRisk(severity, exposure, probability int) float64 {
s := clamp(severity, 1, 5)
ex := clamp(exposure, 1, 5)
p := clamp(probability, 1, 5)
return float64(s) * float64(ex) * float64(p)
}
// CalculateControlEffectiveness computes the control effectiveness score.
//
// Formula: C_eff = min(1, 0.2*(maturity/4.0) + 0.5*coverage + 0.3*testEvidence)
//
// Parameters:
// - maturity: 0-4, clamped if out of range
// - coverage: 0-1, clamped if out of range
// - testEvidence: 0-1, clamped if out of range
//
// Returns a value between 0 and 1.
func (e *RiskEngine) CalculateControlEffectiveness(maturity int, coverage, testEvidence float64) float64 {
m := clamp(maturity, 0, 4)
cov := clampFloat(coverage, 0, 1)
te := clampFloat(testEvidence, 0, 1)
cEff := 0.2*(float64(m)/4.0) + 0.5*cov + 0.3*te
return math.Min(1, cEff)
}
// CalculateResidualRisk computes the residual risk after applying controls.
//
// Formula: R_residual = S * E * P * (1 - cEff)
//
// Parameters:
// - severity, exposure, probability: 1-5, clamped if out of range
// - cEff: control effectiveness, 0-1
func (e *RiskEngine) CalculateResidualRisk(severity, exposure, probability int, cEff float64) float64 {
inherent := e.CalculateInherentRisk(severity, exposure, probability)
return inherent * (1 - cEff)
}
// DetermineRiskLevel classifies the residual risk into a RiskLevel category.
//
// Thresholds:
// - >= 75: critical
// - >= 40: high
// - >= 15: medium
// - >= 5: low
// - < 5: negligible
func (e *RiskEngine) DetermineRiskLevel(residualRisk float64) RiskLevel {
switch {
case residualRisk >= 75:
return RiskLevelCritical
case residualRisk >= 40:
return RiskLevelHigh
case residualRisk >= 15:
return RiskLevelMedium
case residualRisk >= 5:
return RiskLevelLow
default:
return RiskLevelNegligible
}
}
// IsAcceptable determines whether the residual risk is acceptable based on
// the ALARP (As Low As Reasonably Practicable) principle and EU AI Act thresholds.
//
// Decision logic:
// - residualRisk < 15: acceptable ("Restrisiko unter Schwellwert")
// - residualRisk < 40 AND allReductionStepsApplied AND hasJustification:
// acceptable under ALARP ("ALARP-Prinzip: Restrisiko akzeptabel mit vollstaendiger Risikominderung")
// - residualRisk >= 40: not acceptable ("Restrisiko zu hoch - blockiert CE-Export")
func (e *RiskEngine) IsAcceptable(residualRisk float64, allReductionStepsApplied bool, hasJustification bool) (bool, string) {
if residualRisk < 15 {
return true, "Restrisiko unter Schwellwert"
}
if residualRisk < 40 && allReductionStepsApplied && hasJustification {
return true, "ALARP-Prinzip: Restrisiko akzeptabel mit vollstaendiger Risikominderung"
}
return false, "Restrisiko zu hoch - blockiert CE-Export"
}
// CalculateCompletenessScore computes a weighted completeness score (0-100).
//
// Formula:
//
// score = (passedRequired/totalRequired)*80
// + (passedRecommended/totalRecommended)*15
// + (passedOptional/totalOptional)*5
//
// If any totalX is 0, that component contributes 0 to the score.
func (e *RiskEngine) CalculateCompletenessScore(passedRequired, totalRequired, passedRecommended, totalRecommended, passedOptional, totalOptional int) float64 {
var score float64
if totalRequired > 0 {
score += (float64(passedRequired) / float64(totalRequired)) * 80
}
if totalRecommended > 0 {
score += (float64(passedRecommended) / float64(totalRecommended)) * 15
}
if totalOptional > 0 {
score += (float64(passedOptional) / float64(totalOptional)) * 5
}
return score
}
// ComputeRisk performs a complete risk computation using all calculation methods.
// It returns a RiskComputeResult with inherent risk, control effectiveness, residual risk,
// risk level, and acceptability.
//
// The allReductionStepsApplied parameter for IsAcceptable is set to false;
// the caller is responsible for updating acceptance status after reduction steps are applied.
func (e *RiskEngine) ComputeRisk(req RiskComputeInput) (*RiskComputeResult, error) {
if req.Severity < 1 || req.Exposure < 1 || req.Probability < 1 {
return nil, fmt.Errorf("severity, exposure, and probability must be >= 1")
}
inherentRisk := e.CalculateInherentRisk(req.Severity, req.Exposure, req.Probability)
controlEff := e.CalculateControlEffectiveness(req.ControlMaturity, req.ControlCoverage, req.TestEvidence)
residualRisk := e.CalculateResidualRisk(req.Severity, req.Exposure, req.Probability, controlEff)
riskLevel := e.DetermineRiskLevel(residualRisk)
acceptable, reason := e.IsAcceptable(residualRisk, false, req.HasJustification)
return &RiskComputeResult{
InherentRisk: inherentRisk,
ControlEffectiveness: controlEff,
ResidualRisk: residualRisk,
RiskLevel: riskLevel,
IsAcceptable: acceptable,
AcceptanceReason: reason,
}, nil
}