Files
breakpilot-compliance/ai-compliance-sdk/internal/iace/audit/reachability.go
T
Benjamin Admin f534b52817 feat(iace): pattern audit suite + library hygiene wave
Add cmd/iace-audit CLI with 5 deterministic methods that find engine
gaps without ground truth:

- A reachability: 1058 patterns vs achievable tag universe
- B consistency: components vs their declared hazard categories
- C vocabulary: limits-form tokens vs keyword dictionary
- D echo: limits-form sentences vs generated hazards (jaccard)
- E hierarchy: hazards vs ISO 12100 design/protection/info levels

Library fixes triggered by A+B+C findings:

- tag_resolver: synonym map for electrical/pneumatic/hydraulic aliases
- component_library: crush_point + EN03 (gravitational) on C014/C128
  (Hubwerk family) - fixes HP1014/1015/1017/1018 which were silently
  weakly_reachable. noise_source added on 7 components (C006/C011/
  C017/C020/C031/C041/C096). electrical_part on 8 drive components
  (C031/C032/C033/C034/C035/C036/C037/C038/C077/C092). cyber tag
  on 10 sensors (C081-C090) + 3 IT components (C111/C112/C116) +
  KI module C119 (ai_model added). pneumatic_part+hydraulic_part
  on valves C091/C093, hydraulic_part+chemical_risk on pump C097,
  moving_part on motion controller C075
- keyword_dictionary: EN03 added to aufzug/lift/hubwerk/hubgeraet
  (was wrongly EN04-only). New keyword entries for hub-action verbs:
  absenken/senken/anheben/heben + hubhoehe/hubweg/hubgeschwindig

Audit impact:
- A: weakly_reachable 409 -> 358 (-51 patterns now fully reachable)
- B: incomplete components 46 -> 30 (-16, -33%)
- HP1018 (Person unter absenkendem Maschinenteil eingeklemmt):
  weakly_reachable -> reachable

Why: methods A/B/C surfaced that the Kistenhubgeraet test project
generated 0 crush-under-load hazards despite OSHA 1910.212(a)(3) +
EN ISO 12100 6.3.5.5 explicitly requiring them. Three orthogonal
bugs (missing crush_point tag, wrong energy source mapping, missing
action verbs in dictionary) silently disabled the entire lift crush
pattern family.
2026-05-21 10:51:08 +02:00

299 lines
9.1 KiB
Go

// Package audit provides static and runtime audits of the IACE pattern
// engine — finding pattern reachability, library consistency, and
// limits-form coverage gaps without a ground-truth reference.
package audit
import (
"sort"
"github.com/breakpilot/ai-compliance-sdk/internal/iace"
)
// ReachabilityResult is the verdict for a single pattern in Method A.
type ReachabilityResult struct {
PatternID string `json:"pattern_id"`
Name string `json:"name_de"`
Priority int `json:"priority"`
RequiredAllTags []string `json:"required_tags"`
UnreachableTags []string `json:"unreachable_tags,omitempty"`
Status string `json:"status"` // "reachable" | "weakly_reachable" | "unreachable"
ReachableSources []string `json:"reachable_sources,omitempty"`
FixSuggestions []string `json:"fix_suggestions,omitempty"`
}
// ReachabilityReport is the full Method A output.
type ReachabilityReport struct {
TotalPatterns int `json:"total_patterns"`
Reachable int `json:"reachable"`
WeaklyReachable int `json:"weakly_reachable"`
Unreachable int `json:"unreachable"`
UniverseTags []string `json:"universe_tags"`
UnreachablePatterns []ReachabilityResult `json:"unreachable_patterns"`
WeakPatterns []ReachabilityResult `json:"weak_patterns"`
}
// RunReachability evaluates every pattern against the achievable tag universe.
//
// A pattern is:
// - "unreachable" if at least one required tag is not produced by any
// component, energy source, or keyword-dictionary entry.
// - "weakly_reachable" if all required tags exist in the universe but
// no single source (one Component or one EnergySource or one Keyword
// entry) supplies all of them at once — i.e., it relies on multiple
// parser hits to combine.
// - "reachable" if some single source covers all required tags.
//
// The classification ignores ExcludedComponentTags and runtime filters
// (lifecycle/op-state/machine-type), because those are project-level
// concerns. The audit answers "could this pattern EVER fire", not
// "does it fire for project X".
func RunReachability() ReachabilityReport {
patterns := iace.AllPatterns()
comps := iace.GetComponentLibrary()
energies := iace.GetEnergySources()
keywords := iace.GetKeywordDictionary()
// Tag universe: union of every tag emitted anywhere
universe := map[string][]string{} // tag → list of source IDs that emit it
for _, c := range comps {
for _, t := range c.Tags {
universe[t] = appendUnique(universe[t], "component:"+c.ID)
}
}
for _, e := range energies {
for _, t := range e.Tags {
universe[t] = appendUnique(universe[t], "energy:"+e.ID)
}
}
for i, kw := range keywords {
for _, t := range kw.ExtraTags {
universe[t] = appendUnique(universe[t], keywordLabel(kw, i))
}
// Keyword entries can also reference components/energies, which
// transitively add their tags to the keyword's effective tag set.
for _, cID := range kw.ComponentIDs {
for _, c := range comps {
if c.ID != cID {
continue
}
for _, t := range c.Tags {
universe[t] = appendUnique(universe[t], keywordLabel(kw, i))
}
}
}
for _, eID := range kw.EnergyIDs {
for _, e := range energies {
if e.ID != eID {
continue
}
for _, t := range e.Tags {
universe[t] = appendUnique(universe[t], keywordLabel(kw, i))
}
}
}
}
// Single-source coverage map: tag → covering sources, but also
// per-source tag set so we can check "is there ONE source covering
// all required tags".
sourceTags := map[string]map[string]bool{}
for _, c := range comps {
key := "component:" + c.ID
sourceTags[key] = toBoolSet(c.Tags)
}
for _, e := range energies {
key := "energy:" + e.ID
sourceTags[key] = toBoolSet(e.Tags)
}
for i, kw := range keywords {
key := keywordLabel(kw, i)
set := toBoolSet(kw.ExtraTags)
for _, cID := range kw.ComponentIDs {
for _, c := range comps {
if c.ID == cID {
for _, t := range c.Tags {
set[t] = true
}
}
}
}
for _, eID := range kw.EnergyIDs {
for _, e := range energies {
if e.ID == eID {
for _, t := range e.Tags {
set[t] = true
}
}
}
}
sourceTags[key] = set
}
report := ReachabilityReport{TotalPatterns: len(patterns)}
// Universe tag list (sorted) for the report header
for t := range universe {
report.UniverseTags = append(report.UniverseTags, t)
}
sort.Strings(report.UniverseTags)
for _, p := range patterns {
all := dedup(append(append([]string{}, p.RequiredComponentTags...), p.RequiredEnergyTags...))
if len(all) == 0 {
// Pattern with no tag requirements relies on lifecycle/machine_type
// filters only — count as reachable by default.
report.Reachable++
continue
}
var missing []string
for _, t := range all {
if _, ok := universe[t]; !ok {
missing = append(missing, t)
}
}
res := ReachabilityResult{
PatternID: p.ID,
Name: p.NameDE,
Priority: p.Priority,
RequiredAllTags: all,
}
if len(missing) > 0 {
res.Status = "unreachable"
res.UnreachableTags = missing
res.FixSuggestions = suggestFixes(p, missing, comps, sourceTags)
report.Unreachable++
report.UnreachablePatterns = append(report.UnreachablePatterns, res)
continue
}
// All tags in universe — check single-source coverage
single := findSingleSourceCovers(all, sourceTags)
if len(single) > 0 {
res.Status = "reachable"
res.ReachableSources = single
report.Reachable++
continue
}
res.Status = "weakly_reachable"
res.FixSuggestions = suggestSingleSourceFixes(p, all, comps, sourceTags)
report.WeaklyReachable++
report.WeakPatterns = append(report.WeakPatterns, res)
}
sort.Slice(report.UnreachablePatterns, func(i, j int) bool {
return report.UnreachablePatterns[i].Priority > report.UnreachablePatterns[j].Priority
})
sort.Slice(report.WeakPatterns, func(i, j int) bool {
return report.WeakPatterns[i].Priority > report.WeakPatterns[j].Priority
})
return report
}
func findSingleSourceCovers(required []string, sourceTags map[string]map[string]bool) []string {
var hits []string
for src, tags := range sourceTags {
ok := true
for _, t := range required {
if !tags[t] {
ok = false
break
}
}
if ok {
hits = append(hits, src)
}
}
sort.Strings(hits)
return hits
}
// suggestFixes proposes concrete library edits for unreachable patterns:
// "Add tag X to Component C014 (Hubwerk)" type suggestions.
func suggestFixes(p iace.HazardPattern, missing []string, comps []iace.ComponentLibraryEntry, sourceTags map[string]map[string]bool) []string {
var out []string
// For each missing tag, find candidates: components/energies that
// would semantically own that tag based on existing tags overlap.
for _, tag := range missing {
candidates := nearComponents(p, tag, comps, sourceTags)
if len(candidates) > 0 {
out = append(out, "Add tag '"+tag+"' to one of: "+joinFirst(candidates, 3))
} else {
out = append(out, "Tag '"+tag+"' is undefined anywhere — needs a new component or energy source carrying it")
}
}
return out
}
func suggestSingleSourceFixes(p iace.HazardPattern, all []string, comps []iace.ComponentLibraryEntry, sourceTags map[string]map[string]bool) []string {
// Find components that match the most required tags, then suggest
// adding the residual ones.
best := ""
bestCover := 0
var bestMissing []string
for src, tags := range sourceTags {
hit := 0
var miss []string
for _, t := range all {
if tags[t] {
hit++
} else {
miss = append(miss, t)
}
}
if hit > bestCover {
best, bestCover, bestMissing = src, hit, miss
}
}
if best == "" || bestCover == 0 {
return []string{"No single source covers any required tags — pattern needs a new dedicated component"}
}
if len(bestMissing) == 0 {
return nil
}
return []string{"Closest single source '" + best + "' covers " + itoa(bestCover) + "/" + itoa(len(all)) + " tags. Add missing tags to it: " + joinFirst(bestMissing, 5)}
}
// nearComponents finds components whose tags overlap most with the pattern's
// requirements — these are good candidates to receive the missing tag.
func nearComponents(p iace.HazardPattern, missing string, comps []iace.ComponentLibraryEntry, sourceTags map[string]map[string]bool) []string {
required := dedup(append(append([]string{}, p.RequiredComponentTags...), p.RequiredEnergyTags...))
required = removeOne(required, missing)
if len(required) == 0 {
return nil
}
type scored struct {
id string
score int
}
var scoredList []scored
for _, c := range comps {
tagSet := toBoolSet(c.Tags)
s := 0
for _, t := range required {
if tagSet[t] {
s++
}
}
if s > 0 {
scoredList = append(scoredList, scored{id: c.ID + " (" + c.NameDE + ")", score: s})
}
}
sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score })
var out []string
for _, s := range scoredList {
out = append(out, s.id)
}
return out
}
func keywordLabel(kw iace.KeywordEntry, idx int) string {
if len(kw.Keywords) > 0 {
return "keyword:" + kw.Keywords[0]
}
return "keyword:" + itoa(idx)
}