Files
breakpilot-compliance/ai-compliance-sdk/internal/gap/classifier.go
T
Benjamin Admin dabc2358ab feat(gap): Regulatory Gap Analysis Engine — Phase A Backend
Product Profile → Regulatory Classification → MC Gap Assessment → Priority List.

- 12 regulations supported (CRA, AI Act, NIS2, DSGVO, Data Act, MiCA, PSD2, AML, MDR, Machinery, TDDDG, LkSG)
- Scope signal extraction from product profile
- Priority scoring: Severity × Deadline × Dependency
- 5 industry templates (IoT, Exchange, Cobot, SaaS, Medical)
- 8 API endpoints under /sdk/v1/gap/
- DB migration for gap_projects table
- Full build passes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-10 23:11:30 +02:00

443 lines
13 KiB
Go

package gap
import (
"strings"
"time"
)
// RegulationDeadlines maps regulation IDs to their enforcement deadlines.
var RegulationDeadlines = map[RegulationID]time.Time{
RegCRA: time.Date(2027, 6, 1, 0, 0, 0, 0, time.UTC),
RegAIAct: time.Date(2025, 8, 1, 0, 0, 0, 0, time.UTC),
RegDataAct: time.Date(2025, 9, 12, 0, 0, 0, 0, time.UTC),
RegNIS2: time.Date(2025, 10, 18, 0, 0, 0, 0, time.UTC),
RegDSGVO: time.Date(2018, 5, 25, 0, 0, 0, 0, time.UTC),
RegMiCA: time.Date(2024, 12, 30, 0, 0, 0, 0, time.UTC),
RegPSD2: time.Date(2018, 1, 13, 0, 0, 0, 0, time.UTC),
RegMDR: time.Date(2021, 5, 26, 0, 0, 0, 0, time.UTC),
RegMachinery: time.Date(2027, 1, 20, 0, 0, 0, 0, time.UTC),
RegEAA: time.Date(2025, 6, 28, 0, 0, 0, 0, time.UTC),
RegTDDDG: time.Date(2021, 12, 1, 0, 0, 0, 0, time.UTC),
RegLkSG: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC),
}
// RegulationNames maps IDs to human-readable names.
var RegulationNames = map[RegulationID]string{
RegCRA: "Cyber Resilience Act (CRA)",
RegAIAct: "KI-Verordnung (AI Act)",
RegNIS2: "NIS2-Richtlinie",
RegDSGVO: "DSGVO",
RegDataAct: "EU Data Act",
RegMiCA: "Markets in Crypto-Assets (MiCA)",
RegPSD2: "Zahlungsdiensterichtlinie (PSD2)",
RegAML: "Geldwäschegesetz (GwG/AML)",
RegMDR: "Medizinprodukteverordnung (MDR)",
RegMachinery: "Maschinenverordnung (EU) 2023/1230",
RegEAA: "European Accessibility Act",
RegTDDDG: "TDDDG (Telemedien-Datenschutz)",
RegLkSG: "Lieferkettensorgfaltspflichtengesetz",
}
// Classifier determines which regulations apply to a product profile.
type Classifier struct{}
// NewClassifier creates a Classifier.
func NewClassifier() *Classifier { return &Classifier{} }
// ClassifyAll evaluates all regulations against the product profile.
func (c *Classifier) ClassifyAll(p *ProductProfile) []ApplicableRegulation {
results := []ApplicableRegulation{
c.classifyCRA(p),
c.classifyAIAct(p),
c.classifyNIS2(p),
c.classifyDSGVO(p),
c.classifyDataAct(p),
c.classifyMiCA(p),
c.classifyPSD2(p),
c.classifyAML(p),
c.classifyMDR(p),
c.classifyMachinery(p),
c.classifyTDDDG(p),
c.classifyLkSG(p),
}
// Only return applicable ones
applicable := make([]ApplicableRegulation, 0, len(results))
for _, r := range results {
if r.Applicable {
applicable = append(applicable, r)
}
}
return applicable
}
// ExtractScopeSignals derives scope signals from the product profile.
func (c *Classifier) ExtractScopeSignals(p *ProductProfile) []string {
signals := []string{}
if p.ConnectedToInternet {
signals = append(signals, "networked_product")
}
if p.HasSoftwareUpdates {
signals = append(signals, "has_software_updates")
}
if p.UsesAI {
signals = append(signals, "uses_ai")
}
if p.ProcessesPersonalData {
signals = append(signals, "processes_personal_data")
}
if p.IsCriticalInfraSupplier {
signals = append(signals, "is_kritis_operator")
}
for _, tech := range p.Technologies {
switch strings.ToLower(tech) {
case "blockchain", "smart_contract":
signals = append(signals, "uses_blockchain")
case "encryption":
signals = append(signals, "uses_encryption")
case "api":
signals = append(signals, "has_public_api")
case "cloud":
signals = append(signals, "uses_cloud")
case "ota_updates":
signals = append(signals, "has_software_updates")
}
}
for _, dp := range p.DataProcessing {
switch strings.ToLower(dp) {
case "health_data":
signals = append(signals, "processes_health_data")
case "financial_data":
signals = append(signals, "processes_financial_data")
case "personal_data":
signals = append(signals, "processes_personal_data")
}
}
if p.ProductType == ProductTypeExchange {
signals = append(signals, "operates_payment_service",
"holds_client_funds", "uses_blockchain")
}
return dedupStrings(signals)
}
// ── Private classification methods ──────────────────────────────────
func (c *Classifier) classifyCRA(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegCRA)
hasDigitalElements := p.ConnectedToInternet || p.HasSoftwareUpdates ||
containsAny(p.Technologies, "api", "cloud", "ota_updates", "network")
if !hasDigitalElements {
r.Reasoning = "Produkt hat keine digitalen Elemente"
return r
}
r.Applicable = true
r.Confidence = 0.9
if containsAny(p.Technologies, "ota_updates") && p.ConnectedToInternet {
r.RiskLevel = "high"
r.Reasoning = "Vernetztes Produkt mit Software-Updates → CRA Class I/II"
r.Requirements = []string{
"Schwachstellenmanagement (Art. 10)",
"SBOM erstellen (Anhang I, Teil II)",
"Security Updates bereitstellen (Art. 13)",
"Meldepflicht bei Schwachstellen (Art. 11)",
"Konformitätsbewertung (Art. 24-28)",
}
} else {
r.RiskLevel = "medium"
r.Reasoning = "Produkt mit digitalen Elementen → CRA Default-Kategorie"
r.Requirements = []string{
"Cybersicherheitsanforderungen (Anhang I)",
"Technische Dokumentation (Anhang V)",
}
}
return r
}
func (c *Classifier) classifyAIAct(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegAIAct)
if !p.UsesAI && !containsAny(p.Technologies, "ai", "machine_learning", "neural_network") {
r.Reasoning = "Produkt verwendet keine KI"
return r
}
r.Applicable = true
isSafetyRelevant := p.ProductType == ProductTypeMachinery ||
p.ProductType == ProductTypeMedicalDevice ||
containsAny(p.DataProcessing, "health_data")
if isSafetyRelevant {
r.RiskLevel = "high"
r.Confidence = 0.9
r.Reasoning = "KI in sicherheitsrelevantem Produkt → Hochrisiko-KI"
r.Requirements = []string{
"Risikomanagement (Art. 9)",
"Datenqualität (Art. 10)",
"Technische Dokumentation (Art. 11)",
"Aufzeichnungspflichten (Art. 12)",
"Transparenz (Art. 13)",
"Menschliche Aufsicht (Art. 14)",
"Genauigkeit und Robustheit (Art. 15)",
"FRIA — Grundrechte-Folgenabschätzung",
}
} else {
r.RiskLevel = "medium"
r.Confidence = 0.8
r.Reasoning = "KI-System → Transparenzpflichten (Limited Risk)"
r.Requirements = []string{
"Transparenzpflicht (Art. 52)",
"KI-Kennzeichnung",
}
}
return r
}
func (c *Classifier) classifyNIS2(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegNIS2)
isDirectlyAffected := p.IsCriticalInfraSupplier ||
p.ProductType == ProductTypeExchange ||
containsAny(p.DataProcessing, "financial_data")
isSupplyChain := p.ConnectedToInternet && (p.ProductType == ProductTypeSaaS ||
p.ProductType == ProductTypeIoT)
if !isDirectlyAffected && !isSupplyChain {
r.Reasoning = "Kein KRITIS-Betreiber oder -Zulieferer"
return r
}
r.Applicable = true
if isDirectlyAffected {
r.RiskLevel = "high"
r.Confidence = 0.85
r.Reasoning = "Direkt betroffen als KRITIS-Betreiber/Finanzdienstleister"
} else {
r.RiskLevel = "medium"
r.Confidence = 0.7
r.Reasoning = "Indirekt betroffen als Zulieferer vernetzter Dienste"
}
r.Requirements = []string{
"Cybersicherheits-Risikomanagement (Art. 21)",
"Meldepflichten (Art. 23)",
"Supply-Chain-Sicherheit",
"Incident Response",
}
return r
}
func (c *Classifier) classifyDSGVO(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegDSGVO)
if !p.ProcessesPersonalData && !containsAny(p.DataProcessing, "personal_data", "health_data") {
r.Reasoning = "Keine Verarbeitung personenbezogener Daten"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.95
r.Reasoning = "Verarbeitung personenbezogener Daten → DSGVO anwendbar"
r.Requirements = []string{
"Rechtsgrundlage (Art. 6)",
"Informationspflichten (Art. 13/14)",
"Betroffenenrechte (Art. 15-22)",
"Verarbeitungsverzeichnis (Art. 30)",
"TOM (Art. 32)",
}
if containsAny(p.DataProcessing, "health_data") {
r.Requirements = append(r.Requirements, "DSFA (Art. 35)", "Besondere Kategorien (Art. 9)")
}
return r
}
func (c *Classifier) classifyDataAct(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegDataAct)
isConnectedProduct := p.ConnectedToInternet && (p.ProductType == ProductTypeIoT ||
p.ProductType == ProductTypeHardware || p.ProductType == ProductTypeMachinery)
if !isConnectedProduct {
r.Reasoning = "Kein vernetztes Produkt mit Datengenerierung"
return r
}
r.Applicable = true
r.RiskLevel = "medium"
r.Confidence = 0.8
r.Reasoning = "Vernetztes Produkt generiert Nutzungsdaten → Data Act anwendbar"
r.Requirements = []string{
"Nutzerdatenzugang (Art. 3-5)",
"Datenweitergabe an Dritte (Art. 5)",
"Beschränkung der Datennutzung (Art. 6)",
}
return r
}
func (c *Classifier) classifyMiCA(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegMiCA)
if p.ProductType != ProductTypeExchange &&
!containsAny(p.Technologies, "blockchain", "smart_contract", "crypto") {
r.Reasoning = "Kein Kryptowerte-Bezug"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.9
r.Reasoning = "Kryptowerte-Dienstleistung → MiCA anwendbar"
r.Requirements = []string{
"CASP-Zulassung (Art. 59-63)",
"Eigenmittelanforderungen (Art. 67)",
"Organisatorische Anforderungen (Art. 68)",
"Custody-Trennung (Art. 70)",
"Marktmissbrauchsvorschriften (Art. 86-92)",
}
return r
}
func (c *Classifier) classifyPSD2(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegPSD2)
if p.ProductType != ProductTypeExchange &&
!containsAny(p.Technologies, "payment", "fiat_gateway") &&
!containsAny(p.DataProcessing, "financial_data") {
r.Reasoning = "Kein Zahlungsdienstbezug"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.85
r.Reasoning = "Zahlungsdienste oder Fiat-Gateway → PSD2 anwendbar"
r.Requirements = []string{
"Starke Kundenauthentifizierung (Art. 97)",
"Sicherheit der Kommunikation (Art. 98)",
"Open Banking API (Art. 36)",
}
return r
}
func (c *Classifier) classifyAML(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegAML)
if p.ProductType != ProductTypeExchange &&
!containsAny(p.DataProcessing, "financial_data") {
r.Reasoning = "Kein Verpflichteter nach GwG"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.9
r.Reasoning = "Finanzdienstleistung → AML/GwG anwendbar"
r.Requirements = []string{
"KYC-Verfahren (§10 GwG)",
"Transaktionsmonitoring",
"Verdachtsmeldung (§43 GwG)",
"PEP-Prüfung",
}
return r
}
func (c *Classifier) classifyMDR(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegMDR)
if p.ProductType != ProductTypeMedicalDevice {
r.Reasoning = "Kein Medizinprodukt"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.9
r.Reasoning = "Medizinprodukt → MDR anwendbar"
r.Requirements = []string{
"Konformitätsbewertung",
"Klinische Bewertung (Art. 61)",
"Post-Market Surveillance (Art. 83-86)",
"UDI (Art. 27)",
}
return r
}
func (c *Classifier) classifyMachinery(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegMachinery)
if p.ProductType != ProductTypeMachinery && p.ProductType != ProductTypeHardware {
r.Reasoning = "Kein Maschinenprodukt"
return r
}
r.Applicable = true
r.RiskLevel = "high"
r.Confidence = 0.85
r.Reasoning = "Maschinenprodukt → Maschinenverordnung anwendbar"
r.Requirements = []string{
"CE-Konformitätsbewertung",
"Risikobeurteilung (Anhang III)",
"Betriebsanleitung",
"Technische Dokumentation",
}
return r
}
func (c *Classifier) classifyTDDDG(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegTDDDG)
hasTelemedien := p.ProductType == ProductTypeSaaS ||
(p.ConnectedToInternet && containsAny(p.Technologies, "api", "cloud"))
if !hasTelemedien {
r.Reasoning = "Kein Telemediendienst"
return r
}
r.Applicable = true
r.RiskLevel = "medium"
r.Confidence = 0.8
r.Reasoning = "Online-Dienst → TDDDG anwendbar"
r.Requirements = []string{
"Cookie-Einwilligung (§25 TDDDG)",
"Impressumspflicht",
}
return r
}
func (c *Classifier) classifyLkSG(p *ProductProfile) ApplicableRegulation {
r := c.newResult(RegLkSG)
// LkSG applies to companies with >1000 employees — we can't determine this
// from product profile alone. Flag as "unclear" for larger companies.
r.Reasoning = "LkSG-Anwendbarkeit hängt von Unternehmensgröße ab (>1000 MA)"
return r
}
// ── Helpers ─────────────────────────────────────────────────────────
func (c *Classifier) newResult(id RegulationID) ApplicableRegulation {
r := ApplicableRegulation{
ID: id,
Name: RegulationNames[id],
Applicable: false,
Confidence: 0,
}
if dl, ok := RegulationDeadlines[id]; ok {
r.Deadline = &dl
}
return r
}
func containsAny(slice []string, values ...string) bool {
for _, s := range slice {
for _, v := range values {
if strings.EqualFold(s, v) {
return true
}
}
}
return false
}
func dedupStrings(in []string) []string {
seen := make(map[string]bool, len(in))
out := make([]string, 0, len(in))
for _, s := range in {
if !seen[s] {
seen[s] = true
out = append(out, s)
}
}
return out
}