Files
breakpilot-compliance/ai-compliance-sdk/internal/gap/gap_engine.go
T
Benjamin Admin dabc2358ab feat(gap): Regulatory Gap Analysis Engine — Phase A Backend
Product Profile → Regulatory Classification → MC Gap Assessment → Priority List.

- 12 regulations supported (CRA, AI Act, NIS2, DSGVO, Data Act, MiCA, PSD2, AML, MDR, Machinery, TDDDG, LkSG)
- Scope signal extraction from product profile
- Priority scoring: Severity × Deadline × Dependency
- 5 industry templates (IoT, Exchange, Cobot, SaaS, Medical)
- 8 API endpoints under /sdk/v1/gap/
- DB migration for gap_projects table
- Full build passes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-10 23:11:30 +02:00

291 lines
7.7 KiB
Go

package gap
import (
"fmt"
"math"
"sort"
"time"
)
// Engine orchestrates the gap analysis pipeline. It combines a regulation
// classifier with a store from which the applicable master controls are
// fetched.
type Engine struct {
// classifier extracts scope signals and classifies regulations for a profile.
classifier *Classifier
// store is queried for the master controls that apply to a profile.
store *Store
}
// NewEngine builds a gap analysis engine that reads master controls from the
// given store and uses a freshly constructed classifier.
func NewEngine(store *Store) *Engine {
	eng := new(Engine)
	eng.store = store
	eng.classifier = NewClassifier()
	return eng
}
// Analyze runs the full gap analysis pipeline for a product profile.
//
// The pipeline extracts scope signals, classifies the applicable regulations,
// fetches the matching master controls (MCs) from the store, assesses each MC
// against the profile's existing certifications, scores and ranks the
// resulting items, and assembles the report.
//
// It returns an error if profile is nil or if the MCs cannot be fetched.
func (e *Engine) Analyze(profile *ProductProfile) (*GapReport, error) {
	if profile == nil {
		return nil, fmt.Errorf("analyze: nil product profile")
	}

	// Step 1: Extract scope signals
	signals := e.classifier.ExtractScopeSignals(profile)

	// Step 2: Classify regulations
	regulations := e.classifier.ClassifyAll(profile)

	// Step 3: Fetch applicable MCs from DB
	mcGroups, err := e.store.FetchApplicableMCs(signals, regulations)
	if err != nil {
		return nil, fmt.Errorf("fetch MCs: %w", err)
	}

	// Step 4: Assess gaps
	gaps := make([]GapItem, 0, len(mcGroups))
	for _, mc := range mcGroups {
		status := e.assessGapStatus(mc, profile.ExistingCertifications)
		item := GapItem{
			MCID:           mc.MasterControlID,
			MCName:         mc.CanonicalName,
			Regulation:     mc.Regulation,
			Status:         status,
			Title:          mc.Title,
			Description:    mc.Description,
			Severity:       mc.Severity,
			ControlCount:   mc.ControlCount,
			Recommendation: e.generateRecommendation(mc, status),
		}
		item.Priority = e.calculatePriority(item, regulations)
		gaps = append(gaps, item)
	}

	// Step 5: Sort by priority (highest first). Many items share the same
	// score, so a stable sort is used: ties keep the order in which the store
	// returned them and the assigned ranks are reproducible between runs.
	sort.SliceStable(gaps, func(i, j int) bool {
		return gaps[i].Priority.Score > gaps[j].Priority.Score
	})

	// Assign 1-based ranks in sorted order.
	for i := range gaps {
		gaps[i].Priority.Rank = i + 1
	}

	// Step 6: Build report
	report := &GapReport{
		ProfileID:   profile.ID,
		ProfileName: profile.Name,
		Regulations: regulations,
		Summary:     e.buildSummary(gaps, regulations),
		Gaps:        gaps,
		CreatedAt:   time.Now(),
	}
	return report, nil
}
// assessGapStatus determines how far a master control is already covered by
// the customer's existing certifications.
//
// Rules:
//   - "CE" on a machinery topic        -> GapFulfilled
//   - "ISO27001" or "SOC2" on a
//     security topic                   -> GapPartial (likely partially covered)
//   - anything else                    -> GapMissing (customer must verify)
//
// The strongest applicable status wins regardless of the order in which the
// certifications are listed: a partial match never hides a later certificate
// that would fulfil the control.
func (e *Engine) assessGapStatus(mc MCGroup, certs []string) GapStatus {
	var status GapStatus = GapMissing
	for _, cert := range certs {
		switch cert {
		case "CE":
			if isMachineryTopic(mc.CanonicalName) {
				// Fulfilled is the best possible outcome; stop looking.
				return GapFulfilled
			}
		case "ISO27001", "SOC2":
			if isSecurityTopic(mc.CanonicalName) {
				// Remember the partial coverage but keep scanning in case a
				// later certificate fulfils the control entirely.
				status = GapPartial
			}
		}
	}
	return status
}
// calculatePriority derives the priority of a gap item as
// severity factor × deadline factor × dependency factor.
//
// The deadline factor is raised to 3.0 when a matching regulation's deadline
// is less than 6 months away and to 2.0 when it is less than 12 months away
// (a month is approximated as 30 days). Foundational controls double the
// dependency factor. Fulfilled items score 0 and partially covered items
// score half.
func (e *Engine) calculatePriority(item GapItem, regs []ApplicableRegulation) Priority {
	deadline, dependency := 1.0, 1.0

	// Look up the deadline of the regulation this item belongs to.
	for idx := range regs {
		r := &regs[idx]
		if r.Deadline == nil || r.ID != item.Regulation {
			continue
		}
		months := time.Until(*r.Deadline).Hours() / (24 * 30)
		switch {
		case months < 6:
			deadline = 3.0
		case months < 12:
			deadline = 2.0
		}
	}

	// Foundational controls are prerequisites for others and rank higher.
	if isFoundational(item.MCName) {
		dependency = 2.0
	}

	severity := severityToFactor(item.Severity)
	score := severity * deadline * dependency

	// Only open gaps count: nothing for fulfilled, half for partial coverage.
	switch item.Status {
	case GapFulfilled:
		score = 0
	case GapPartial:
		score *= 0.5
	}

	return Priority{
		SeverityFactor:   severity,
		DeadlineFactor:   deadline,
		DependencyFactor: dependency,
		Score:            score,
	}
}
// generateRecommendation returns a (German) action recommendation for a
// master control. Fulfilled controls get a fixed "nothing to do" message;
// otherwise the first keyword found in the control's canonical name selects
// the text, and controls without a known keyword get a generic message that
// quotes the control title.
func (e *Engine) generateRecommendation(mc MCGroup, status GapStatus) string {
	if status == GapFulfilled {
		return "Bereits erfüllt — keine Maßnahme erforderlich."
	}

	// Ordered keyword → recommendation table; the first match wins.
	advice := []struct {
		keyword string
		text    string
	}{
		{"encryption", "Verschlüsselungsimplementierung prüfen und dokumentieren."},
		{"access_control", "Zugriffskontrollkonzept erstellen und Berechtigungen überprüfen."},
		{"incident", "Incident-Response-Plan erstellen und Meldeprozesse etablieren."},
		{"vulnerability", "Schwachstellenmanagement einführen (Scanning, CVE-Tracking, Patching)."},
		{"audit_logging", "Protokollierung implementieren und Audit-Trail sicherstellen."},
		{"data_retention", "Löschkonzept erstellen mit konkreten Fristen pro Datenkategorie."},
		{"consent", "Einwilligungsmanagement implementieren (Opt-In, Widerruf, Dokumentation)."},
		{"dpia", "Datenschutz-Folgenabschätzung durchführen und dokumentieren."},
		{"training", "Schulungsprogramm für Mitarbeiter etablieren."},
		{"risk_management", "Risikobewertung durchführen und Maßnahmenplan erstellen."},
	}
	for _, entry := range advice {
		if contains(mc.CanonicalName, entry.keyword) {
			return entry.text
		}
	}

	return fmt.Sprintf("Anforderung '%s' prüfen und Umsetzung planen.", mc.Title)
}
// buildSummary aggregates statistics over all assessed gap items.
//
// It counts items per status, severity and regulation, computes the overall
// compliance percentage (share of fulfilled items, rounded to one decimal)
// and a rough remediation effort estimate in weeks (rounded to one decimal).
// Effort is only accumulated for items that are not yet fulfilled, because
// fulfilled controls require no further work.
func (e *Engine) buildSummary(gaps []GapItem, regs []ApplicableRegulation) GapSummary {
	s := GapSummary{
		TotalApplicableRegulations: len(regs),
		TotalGaps:                  len(gaps),
		GapsByStatus:               map[string]int{},
		GapsBySeverity:             map[string]int{},
		GapsByRegulation:           map[string]int{},
	}

	fulfilled := 0
	for _, g := range gaps {
		s.GapsByStatus[string(g.Status)]++
		s.GapsBySeverity[g.Severity]++
		s.GapsByRegulation[string(g.Regulation)]++

		if g.Status == GapFulfilled {
			fulfilled++
			// Only count effort for non-fulfilled gaps.
			continue
		}

		// Rough effort estimate per open gap; unknown severities add nothing.
		switch g.Severity {
		case "CRITICAL":
			s.EstimatedEffortWeeks += 2
		case "HIGH":
			s.EstimatedEffortWeeks += 1
		case "MEDIUM":
			s.EstimatedEffortWeeks += 0.5
		case "LOW":
			s.EstimatedEffortWeeks += 0.25
		}
	}

	// Guard against division by zero when there are no items at all.
	if len(gaps) > 0 {
		s.OverallCompliancePercent = math.Round(float64(fulfilled)/float64(len(gaps))*1000) / 10
	}
	s.EstimatedEffortWeeks = math.Round(s.EstimatedEffortWeeks*10) / 10
	return s
}
// ── Helpers ──────────────────────────────────────────────────────────
// severityToFactor maps a severity label to its priority weight:
// "CRITICAL" → 4, "HIGH" → 3, "MEDIUM" → 2, "LOW" → 1. Any other label is
// weighted like "MEDIUM".
func severityToFactor(sev string) float64 {
	const mediumFactor = 2.0

	switch sev {
	case "LOW":
		return 1.0
	case "HIGH":
		return 3.0
	case "CRITICAL":
		return 4.0
	}
	// "MEDIUM" and every unrecognised label share the same weight.
	return mediumFactor
}
// isFoundational reports whether the control name contains one of the
// keywords that mark a foundational control (one that gets a higher
// dependency factor when priorities are calculated).
func isFoundational(name string) bool {
	switch {
	case contains(name, "risk_management"),
		contains(name, "policy"),
		contains(name, "asset_management"),
		contains(name, "access_control_rbac"),
		contains(name, "encryption_key"):
		return true
	}
	return false
}
// isSecurityTopic reports whether the control name contains any of the
// information-security keywords listed below.
func isSecurityTopic(name string) bool {
	keywords := [...]string{
		"encryption",
		"access_control",
		"vulnerability",
		"patch_management",
		"audit_logging",
		"monitoring",
		"firewall",
		"network_security",
		"session_management",
		"multi_factor_auth",
		"key_management",
		"backup",
		"disaster_recovery",
		"incident",
	}
	for idx := 0; idx < len(keywords); idx++ {
		if contains(name, keywords[idx]) {
			return true
		}
	}
	return false
}
// isMachineryTopic reports whether the control name contains
// "product_safety" or "certification".
func isMachineryTopic(name string) bool {
	for _, keyword := range []string{"product_safety", "certification"} {
		if contains(name, keyword) {
			return true
		}
	}
	return false
}
// contains reports whether substr occurs anywhere within s. An empty substr
// is contained in every string.
func contains(s, substr string) bool {
	// A needle longer than the haystack can never match.
	if len(substr) > len(s) {
		return false
	}
	// Cheap exact-match shortcut before scanning.
	if s == substr {
		return true
	}
	// The scan covers prefix, suffix and interior positions alike.
	return findSubstring(s, substr)
}
// findSubstring scans s from left to right and reports whether sub occurs at
// any byte offset. It returns false when sub is longer than s and true when
// sub is empty.
func findSubstring(s, sub string) bool {
	width := len(sub)
	// end is the exclusive end offset of the window currently compared.
	for end := width; end <= len(s); end++ {
		if s[end-width:end] == sub {
			return true
		}
	}
	return false
}
// MCGroup represents a Master Control with aggregated info for gap analysis.
// The engine turns each MCGroup into one GapItem of the report.
type MCGroup struct {
// MasterControlID identifies the master control; it is reported as GapItem.MCID.
MasterControlID string
// CanonicalName is the name that is searched for topic keywords (for example
// "encryption" or "access_control") when the status, priority and
// recommendation of the control are determined.
CanonicalName string
// Title is copied into the gap item and quoted in the fallback recommendation.
Title string
// Description is copied unchanged into the gap item.
Description string
// Regulation is the regulation the control belongs to; it is compared with
// ApplicableRegulation.ID to find the relevant deadline.
Regulation RegulationID
// Severity is one of "CRITICAL", "HIGH", "MEDIUM" or "LOW"; any other value
// falls back to the "MEDIUM" priority factor.
Severity string
// ControlCount is copied into the gap item. Presumably the number of
// individual controls aggregated under this master control — TODO confirm.
ControlCount int
}