feat(sdk,iace): add Personalized Drafting Pipeline v2 and IACE engine
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 44s
CI / test-python-backend-compliance (push) Successful in 37s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 20s
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 44s
CI / test-python-backend-compliance (push) Successful in 37s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 20s
Drafting Engine: 7-module pipeline with narrative tags, allowed facts governance, PII sanitizer, prose validator with repair loop, hash-based cache, and terminology guide. v1 fallback via ?v=1 query param. IACE: Initial AI-Act Conformity Engine with risk classifier, completeness checker, hazard library, and PostgreSQL store for AI system assessments. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
415
ai-compliance-sdk/internal/iace/classifier.go
Normal file
415
ai-compliance-sdk/internal/iace/classifier.go
Normal file
@@ -0,0 +1,415 @@
|
||||
package iace
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ============================================================================
|
||||
// Classifier Types
|
||||
// ============================================================================
|
||||
|
||||
// ClassificationResult holds the output of a single regulatory classification.
|
||||
type ClassificationResult struct {
|
||||
Regulation RegulationType `json:"regulation"`
|
||||
ClassificationResult string `json:"classification_result"`
|
||||
RiskLevel string `json:"risk_level"`
|
||||
Confidence float64 `json:"confidence"`
|
||||
Reasoning string `json:"reasoning"`
|
||||
Requirements []string `json:"requirements"`
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Classifier
|
||||
// ============================================================================
|
||||
|
||||
// Classifier determines which EU regulations apply to a machine or product
|
||||
// based on project metadata and component analysis.
|
||||
type Classifier struct{}
|
||||
|
||||
// NewClassifier creates a new Classifier instance.
|
||||
func NewClassifier() *Classifier { return &Classifier{} }
|
||||
|
||||
// ============================================================================
|
||||
// Public Methods
|
||||
// ============================================================================
|
||||
|
||||
// ClassifyAll runs all four regulatory classifiers (AI Act, Machinery Regulation,
|
||||
// CRA, NIS2) and returns the combined results.
|
||||
func (c *Classifier) ClassifyAll(project *Project, components []Component) []ClassificationResult {
|
||||
return []ClassificationResult{
|
||||
c.ClassifyAIAct(project, components),
|
||||
c.ClassifyMachineryRegulation(project, components),
|
||||
c.ClassifyCRA(project, components),
|
||||
c.ClassifyNIS2(project, components),
|
||||
}
|
||||
}
|
||||
|
||||
// ClassifyAIAct determines the AI Act classification based on whether the system
|
||||
// contains AI components and their safety relevance.
|
||||
//
|
||||
// Classification logic:
|
||||
// - Has safety-relevant AI component: "high_risk"
|
||||
// - Has AI component (not safety-relevant): "limited_risk"
|
||||
// - No AI components: "not_applicable"
|
||||
func (c *Classifier) ClassifyAIAct(project *Project, components []Component) ClassificationResult {
|
||||
result := ClassificationResult{
|
||||
Regulation: RegulationAIAct,
|
||||
}
|
||||
|
||||
hasAI := false
|
||||
hasSafetyRelevantAI := false
|
||||
var aiComponentNames []string
|
||||
|
||||
for _, comp := range components {
|
||||
if comp.ComponentType == ComponentTypeAIModel {
|
||||
hasAI = true
|
||||
aiComponentNames = append(aiComponentNames, comp.Name)
|
||||
if comp.IsSafetyRelevant {
|
||||
hasSafetyRelevantAI = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case hasSafetyRelevantAI:
|
||||
result.ClassificationResult = "high_risk"
|
||||
result.RiskLevel = "high"
|
||||
result.Confidence = 0.9
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"System contains safety-relevant AI component(s): %s. "+
|
||||
"Under EU AI Act Art. 6, AI systems that are safety components of products "+
|
||||
"covered by Union harmonisation legislation are classified as high-risk.",
|
||||
strings.Join(aiComponentNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Risk management system (Art. 9)",
|
||||
"Data governance and management (Art. 10)",
|
||||
"Technical documentation (Art. 11)",
|
||||
"Record-keeping / logging (Art. 12)",
|
||||
"Transparency and information to deployers (Art. 13)",
|
||||
"Human oversight measures (Art. 14)",
|
||||
"Accuracy, robustness and cybersecurity (Art. 15)",
|
||||
"Quality management system (Art. 17)",
|
||||
"Conformity assessment before placing on market",
|
||||
}
|
||||
|
||||
case hasAI:
|
||||
result.ClassificationResult = "limited_risk"
|
||||
result.RiskLevel = "medium"
|
||||
result.Confidence = 0.85
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"System contains AI component(s): %s, but none are marked as safety-relevant. "+
|
||||
"Classified as limited-risk under EU AI Act with transparency obligations.",
|
||||
strings.Join(aiComponentNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Transparency obligations (Art. 52)",
|
||||
"AI literacy measures (Art. 4)",
|
||||
"Technical documentation recommended",
|
||||
}
|
||||
|
||||
default:
|
||||
result.ClassificationResult = "not_applicable"
|
||||
result.RiskLevel = "none"
|
||||
result.Confidence = 0.95
|
||||
result.Reasoning = "No AI model components found in the system. EU AI Act does not apply."
|
||||
result.Requirements = nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// ClassifyMachineryRegulation determines the Machinery Regulation (EU) 2023/1230
|
||||
// classification based on CE marking intent and component analysis.
|
||||
//
|
||||
// Classification logic:
|
||||
// - CE marking target set: "applicable" (standard machinery)
|
||||
// - Has safety-relevant software/firmware: "annex_iii" (high-risk, Annex III machinery)
|
||||
// - Otherwise: "standard"
|
||||
func (c *Classifier) ClassifyMachineryRegulation(project *Project, components []Component) ClassificationResult {
|
||||
result := ClassificationResult{
|
||||
Regulation: RegulationMachineryRegulation,
|
||||
}
|
||||
|
||||
hasCETarget := project.CEMarkingTarget != ""
|
||||
hasSafetyRelevantSoftware := false
|
||||
var safetySwNames []string
|
||||
|
||||
for _, comp := range components {
|
||||
if (comp.ComponentType == ComponentTypeSoftware || comp.ComponentType == ComponentTypeFirmware) && comp.IsSafetyRelevant {
|
||||
hasSafetyRelevantSoftware = true
|
||||
safetySwNames = append(safetySwNames, comp.Name)
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case hasSafetyRelevantSoftware:
|
||||
result.ClassificationResult = "annex_iii"
|
||||
result.RiskLevel = "high"
|
||||
result.Confidence = 0.9
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"Machine contains safety-relevant software/firmware component(s): %s. "+
|
||||
"Under Machinery Regulation (EU) 2023/1230 Annex III, machinery with safety-relevant "+
|
||||
"digital components requires third-party conformity assessment.",
|
||||
strings.Join(safetySwNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Third-party conformity assessment (Annex III)",
|
||||
"Essential health and safety requirements (Annex III EHSR)",
|
||||
"Technical documentation per Annex IV",
|
||||
"Risk assessment per ISO 12100",
|
||||
"Software validation (IEC 62443 / IEC 61508)",
|
||||
"EU Declaration of Conformity",
|
||||
"CE marking",
|
||||
}
|
||||
|
||||
case hasCETarget:
|
||||
result.ClassificationResult = "applicable"
|
||||
result.RiskLevel = "medium"
|
||||
result.Confidence = 0.85
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"CE marking target is set (%s). Machinery Regulation (EU) 2023/1230 applies. "+
|
||||
"No safety-relevant software/firmware detected; standard conformity assessment path.",
|
||||
project.CEMarkingTarget,
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Essential health and safety requirements (EHSR)",
|
||||
"Technical documentation per Annex IV",
|
||||
"Risk assessment per ISO 12100",
|
||||
"EU Declaration of Conformity",
|
||||
"CE marking",
|
||||
}
|
||||
|
||||
default:
|
||||
result.ClassificationResult = "standard"
|
||||
result.RiskLevel = "low"
|
||||
result.Confidence = 0.7
|
||||
result.Reasoning = "No CE marking target specified and no safety-relevant software/firmware detected. " +
|
||||
"Standard machinery regulation requirements may still apply depending on product placement."
|
||||
result.Requirements = []string{
|
||||
"Risk assessment recommended",
|
||||
"Technical documentation recommended",
|
||||
"Verify if CE marking is required for intended market",
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// ClassifyCRA determines the Cyber Resilience Act (CRA) classification based on
|
||||
// whether the system contains networked components and their criticality.
|
||||
//
|
||||
// Classification logic:
|
||||
// - Safety-relevant + networked: "class_ii" (highest CRA category)
|
||||
// - Networked with critical component types: "class_i"
|
||||
// - Networked (other): "default" (self-assessment)
|
||||
// - No networked components: "not_applicable"
|
||||
func (c *Classifier) ClassifyCRA(project *Project, components []Component) ClassificationResult {
|
||||
result := ClassificationResult{
|
||||
Regulation: RegulationCRA,
|
||||
}
|
||||
|
||||
hasNetworked := false
|
||||
hasSafetyRelevantNetworked := false
|
||||
hasCriticalType := false
|
||||
var networkedNames []string
|
||||
|
||||
// Component types considered critical under CRA
|
||||
criticalTypes := map[ComponentType]bool{
|
||||
ComponentTypeController: true,
|
||||
ComponentTypeNetwork: true,
|
||||
ComponentTypeSensor: true,
|
||||
}
|
||||
|
||||
for _, comp := range components {
|
||||
if comp.IsNetworked {
|
||||
hasNetworked = true
|
||||
networkedNames = append(networkedNames, comp.Name)
|
||||
|
||||
if comp.IsSafetyRelevant {
|
||||
hasSafetyRelevantNetworked = true
|
||||
}
|
||||
if criticalTypes[comp.ComponentType] {
|
||||
hasCriticalType = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case hasSafetyRelevantNetworked:
|
||||
result.ClassificationResult = "class_ii"
|
||||
result.RiskLevel = "high"
|
||||
result.Confidence = 0.9
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"System contains safety-relevant networked component(s): %s. "+
|
||||
"Under CRA, products with digital elements that are safety-relevant and networked "+
|
||||
"fall into Class II, requiring third-party conformity assessment.",
|
||||
strings.Join(networkedNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Third-party conformity assessment",
|
||||
"Vulnerability handling process",
|
||||
"Security updates for product lifetime (min. 5 years)",
|
||||
"SBOM (Software Bill of Materials)",
|
||||
"Incident reporting to ENISA within 24h",
|
||||
"Coordinated vulnerability disclosure",
|
||||
"Secure by default configuration",
|
||||
"Technical documentation with cybersecurity risk assessment",
|
||||
}
|
||||
|
||||
case hasCriticalType:
|
||||
result.ClassificationResult = "class_i"
|
||||
result.RiskLevel = "medium"
|
||||
result.Confidence = 0.85
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"System contains networked component(s) of critical type: %s. "+
|
||||
"Under CRA Class I, these products require conformity assessment via harmonised "+
|
||||
"standards or third-party assessment.",
|
||||
strings.Join(networkedNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Conformity assessment (self or third-party with harmonised standards)",
|
||||
"Vulnerability handling process",
|
||||
"Security updates for product lifetime (min. 5 years)",
|
||||
"SBOM (Software Bill of Materials)",
|
||||
"Incident reporting to ENISA within 24h",
|
||||
"Coordinated vulnerability disclosure",
|
||||
"Secure by default configuration",
|
||||
}
|
||||
|
||||
case hasNetworked:
|
||||
result.ClassificationResult = "default"
|
||||
result.RiskLevel = "low"
|
||||
result.Confidence = 0.85
|
||||
result.Reasoning = fmt.Sprintf(
|
||||
"System contains networked component(s): %s. "+
|
||||
"CRA default category applies; self-assessment is sufficient.",
|
||||
strings.Join(networkedNames, ", "),
|
||||
)
|
||||
result.Requirements = []string{
|
||||
"Self-assessment conformity",
|
||||
"Vulnerability handling process",
|
||||
"Security updates for product lifetime (min. 5 years)",
|
||||
"SBOM (Software Bill of Materials)",
|
||||
"Incident reporting to ENISA within 24h",
|
||||
}
|
||||
|
||||
default:
|
||||
result.ClassificationResult = "not_applicable"
|
||||
result.RiskLevel = "none"
|
||||
result.Confidence = 0.9
|
||||
result.Reasoning = "No networked components found. The Cyber Resilience Act applies to " +
|
||||
"products with digital elements that have a network connection. Currently not applicable."
|
||||
result.Requirements = nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// ClassifyNIS2 determines the NIS2 Directive classification based on project
|
||||
// metadata indicating whether the manufacturer supplies critical infrastructure sectors.
|
||||
//
|
||||
// Classification logic:
|
||||
// - Project metadata indicates KRITIS supplier: "indirect_obligation"
|
||||
// - Otherwise: "not_applicable"
|
||||
func (c *Classifier) ClassifyNIS2(project *Project, components []Component) ClassificationResult {
|
||||
result := ClassificationResult{
|
||||
Regulation: RegulationNIS2,
|
||||
}
|
||||
|
||||
isKRITISSupplier := c.isKRITISSupplier(project)
|
||||
|
||||
if isKRITISSupplier {
|
||||
result.ClassificationResult = "indirect_obligation"
|
||||
result.RiskLevel = "medium"
|
||||
result.Confidence = 0.8
|
||||
result.Reasoning = "Project metadata indicates this product/system is supplied to clients " +
|
||||
"in critical infrastructure sectors (KRITIS). Under NIS2, suppliers to essential and " +
|
||||
"important entities have indirect obligations for supply chain security."
|
||||
result.Requirements = []string{
|
||||
"Supply chain security measures",
|
||||
"Incident notification support for customers",
|
||||
"Cybersecurity risk management documentation",
|
||||
"Security-by-design evidence",
|
||||
"Contractual security requirements with KRITIS customers",
|
||||
"Regular security assessments and audits",
|
||||
}
|
||||
} else {
|
||||
result.ClassificationResult = "not_applicable"
|
||||
result.RiskLevel = "none"
|
||||
result.Confidence = 0.75
|
||||
result.Reasoning = "No indication in project metadata that this product is supplied to " +
|
||||
"critical infrastructure (KRITIS) sectors. NIS2 indirect obligations do not currently apply. " +
|
||||
"Re-evaluate if customer base changes."
|
||||
result.Requirements = nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Helper Methods
|
||||
// ============================================================================
|
||||
|
||||
// isKRITISSupplier checks project metadata for indicators that the manufacturer
|
||||
// supplies critical infrastructure sectors.
|
||||
func (c *Classifier) isKRITISSupplier(project *Project) bool {
|
||||
if project.Metadata == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var metadata map[string]interface{}
|
||||
if err := json.Unmarshal(project.Metadata, &metadata); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check for explicit KRITIS flag
|
||||
if kritis, ok := metadata["kritis_supplier"]; ok {
|
||||
if val, ok := kritis.(bool); ok && val {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Check for critical sector clients
|
||||
if sectors, ok := metadata["critical_sector_clients"]; ok {
|
||||
switch v := sectors.(type) {
|
||||
case []interface{}:
|
||||
return len(v) > 0
|
||||
case bool:
|
||||
return v
|
||||
}
|
||||
}
|
||||
|
||||
// Check for NIS2-relevant target sectors
|
||||
if targetSectors, ok := metadata["target_sectors"]; ok {
|
||||
kriticalSectors := map[string]bool{
|
||||
"energy": true,
|
||||
"transport": true,
|
||||
"banking": true,
|
||||
"health": true,
|
||||
"water": true,
|
||||
"digital_infra": true,
|
||||
"public_admin": true,
|
||||
"space": true,
|
||||
"food": true,
|
||||
"manufacturing": true,
|
||||
"waste_management": true,
|
||||
"postal": true,
|
||||
"chemicals": true,
|
||||
}
|
||||
|
||||
if sectorList, ok := targetSectors.([]interface{}); ok {
|
||||
for _, s := range sectorList {
|
||||
if str, ok := s.(string); ok {
|
||||
if kriticalSectors[strings.ToLower(str)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user