diff --git a/ai-compliance-sdk/internal/ucca/domain_context_test.go b/ai-compliance-sdk/internal/ucca/domain_context_test.go new file mode 100644 index 0000000..60d2ba5 --- /dev/null +++ b/ai-compliance-sdk/internal/ucca/domain_context_test.go @@ -0,0 +1,542 @@ +package ucca + +import ( + "os" + "path/filepath" + "testing" +) + +// ============================================================================ +// HR Domain Context Tests +// ============================================================================ + +func TestHRContext_AutomatedRejection_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI generiert und versendet Absagen automatisch", + Domain: DomainHR, + DataTypes: DataTypes{PersonalData: true, EmployeeData: true}, + HRContext: &HRContext{ + AutomatedScreening: true, + AutomatedRejection: true, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO feasibility for automated rejection, got %s", result.Feasibility) + } + if !result.Art22Risk { + t.Error("Expected Art22Risk=true for automated rejection") + } +} + +func TestHRContext_ScreeningWithHumanReview_OK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI sortiert Bewerber vor, Mensch prueft jeden Vorschlag", + Domain: DomainHR, + DataTypes: DataTypes{PersonalData: true, EmployeeData: true}, + HRContext: &HRContext{ + AutomatedScreening: true, + AutomatedRejection: false, + HumanReviewEnforced: true, + BiasAuditsDone: true, + }, + } + + result := engine.Evaluate(intake) + + // Should NOT block — human review is enforced + if result.Feasibility == FeasibilityNO { + t.Error("Expected feasibility != NO when human review is enforced") + } +} + +func TestHRContext_AGGVisible_RiskIncrease(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intakeWithAGG := &UseCaseIntake{ + UseCaseText: "CV-Screening mit Foto und Name sichtbar", + Domain: DomainHR, + DataTypes: DataTypes{PersonalData: true, EmployeeData: true}, + HRContext: &HRContext{AGGCategoriesVisible: true}, + } + intakeWithout := &UseCaseIntake{ + UseCaseText: "CV-Screening anonymisiert", + Domain: DomainHR, + DataTypes: DataTypes{PersonalData: true, EmployeeData: true}, + HRContext: &HRContext{AGGCategoriesVisible: false}, + } + + resultWith := engine.Evaluate(intakeWithAGG) + resultWithout := engine.Evaluate(intakeWithout) + + if resultWith.RiskScore <= resultWithout.RiskScore { + t.Errorf("Expected higher risk with AGG visible (%d) vs without (%d)", + resultWith.RiskScore, resultWithout.RiskScore) + } +} + +// ============================================================================ +// Education Domain Context Tests +// ============================================================================ + +func TestEducationContext_MinorsWithoutTeacher_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI bewertet Schuelerarbeiten ohne Lehrkraft-Pruefung", + Domain: DomainEducation, + DataTypes: DataTypes{PersonalData: true, MinorData: true}, + EducationContext: &EducationContext{ + GradeInfluence: true, + MinorsInvolved: true, + TeacherReviewRequired: false, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO feasibility for minors without teacher review, got %s", result.Feasibility) + } +} + +func TestEducationContext_WithTeacherReview_Allowed(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI schlaegt Noten vor, Lehrkraft prueft und entscheidet", + Domain: DomainEducation, + DataTypes: DataTypes{PersonalData: true, MinorData: true}, + EducationContext: &EducationContext{ + GradeInfluence: true, + MinorsInvolved: true, + TeacherReviewRequired: true, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility == FeasibilityNO { + t.Error("Expected feasibility != NO when teacher review is required") + } +} + +// ============================================================================ +// Healthcare Domain Context Tests +// ============================================================================ + +func TestHealthcareContext_MDRWithoutValidation_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI-Diagnosetool als Medizinprodukt ohne klinische Validierung", + Domain: DomainHealthcare, + DataTypes: DataTypes{PersonalData: true, Article9Data: true}, + HealthcareContext: &HealthcareContext{ + DiagnosisSupport: true, + MedicalDevice: true, + ClinicalValidation: false, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO for medical device without clinical validation, got %s", result.Feasibility) + } +} + +func TestHealthcareContext_Triage_HighRisk(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI priorisiert Patienten in der Notaufnahme", + Domain: DomainHealthcare, + DataTypes: DataTypes{PersonalData: true, Article9Data: true}, + HealthcareContext: &HealthcareContext{ + TriageDecision: true, + PatientDataProcessed: true, + }, + } + + result := engine.Evaluate(intake) + + if result.RiskScore < 40 { + t.Errorf("Expected high risk score for triage, got %d", result.RiskScore) + } + if !result.DSFARecommended { + t.Error("Expected DSFA recommended for triage") + } +} + +// ============================================================================ +// Critical Infrastructure Tests +// ============================================================================ + +func TestCriticalInfra_SafetyCriticalNoRedundancy_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI steuert Stromnetz ohne Fallback", + Domain: DomainEnergy, + CriticalInfraContext: &CriticalInfraContext{ + GridControl: true, + SafetyCritical: true, + RedundancyExists: false, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO for safety-critical without redundancy, got %s", result.Feasibility) + } +} + +// ============================================================================ +// Marketing — Deepfake BLOCK Test +// ============================================================================ + +func TestMarketing_DeepfakeUnlabeled_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI generiert Werbevideos ohne Kennzeichnung", + Domain: DomainMarketing, + MarketingContext: &MarketingContext{ + DeepfakeContent: true, + AIContentLabeled: false, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO for unlabeled deepfakes, got %s", result.Feasibility) + } +} + +func TestMarketing_DeepfakeLabeled_OK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI generiert Werbevideos mit Kennzeichnung", + Domain: DomainMarketing, + MarketingContext: &MarketingContext{ + DeepfakeContent: true, + AIContentLabeled: true, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility == FeasibilityNO { + t.Error("Expected feasibility != NO when deepfakes are properly labeled") + } +} + +// ============================================================================ +// Manufacturing — Safety BLOCK Test +// ============================================================================ + +func TestManufacturing_SafetyUnvalidated_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI in Maschinensicherheit ohne Validierung", + Domain: DomainMechanicalEngineering, + ManufacturingContext: &ManufacturingContext{ + MachineSafety: true, + SafetyValidated: false, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO for unvalidated machine safety, got %s", result.Feasibility) + } +} + +// ============================================================================ +// AGG V2 Obligations Loading Test +// ============================================================================ + +func TestAGGV2_LoadsFromManifest(t *testing.T) { + regs, err := LoadAllV2Regulations() + if err != nil { + t.Fatalf("Failed to load v2 regulations: %v", err) + } + + agg, ok := regs["agg"] + if !ok { + t.Fatal("agg not found in loaded regulations") + } + + if len(agg.Obligations) < 8 { + t.Errorf("Expected at least 8 AGG obligations, got %d", len(agg.Obligations)) + } + + // Check first obligation + if agg.Obligations[0].ID != "AGG-OBL-001" { + t.Errorf("Expected first ID 'AGG-OBL-001', got '%s'", agg.Obligations[0].ID) + } +} + +func TestAGGApplicability_Germany(t *testing.T) { + regs, err := LoadAllV2Regulations() + if err != nil { + t.Fatalf("Failed to load v2 regulations: %v", err) + } + + module := NewJSONRegulationModule(regs["agg"]) + + factsDE := &UnifiedFacts{Organization: OrganizationFacts{Country: "DE"}} + if !module.IsApplicable(factsDE) { + t.Error("AGG should be applicable for German company") + } + + factsUS := &UnifiedFacts{Organization: OrganizationFacts{Country: "US"}} + if module.IsApplicable(factsUS) { + t.Error("AGG should NOT be applicable for US company") + } +} + +// ============================================================================ +// AI Act V2 Extended Obligations Test +// ============================================================================ + +func TestAIActV2_ExtendedObligations(t *testing.T) { + regs, err := LoadAllV2Regulations() + if err != nil { + t.Fatalf("Failed to load v2 regulations: %v", err) + } + + aiAct, ok := regs["ai_act"] + if !ok { + t.Fatal("ai_act not found in loaded regulations") + } + + if len(aiAct.Obligations) < 75 { + t.Errorf("Expected at least 75 AI Act obligations (expanded), got %d", len(aiAct.Obligations)) + } + + // Check GPAI obligations exist (Art. 51-56) + hasGPAI := false + for _, obl := range aiAct.Obligations { + if obl.ID == "AIACT-OBL-078" { // GPAI classification + hasGPAI = true + break + } + } + if !hasGPAI { + t.Error("Expected GPAI obligation AIACT-OBL-078 in expanded AI Act") + } +} + +// ============================================================================ +// Field Resolver Tests — Domain Contexts +// ============================================================================ + +func TestFieldResolver_HRContext(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + HRContext: &HRContext{AutomatedScreening: true}, + } + + val := engine.getFieldValue("hr_context.automated_screening", intake) + if val != true { + t.Errorf("Expected true for hr_context.automated_screening, got %v", val) + } + + val2 := engine.getFieldValue("hr_context.automated_rejection", intake) + if val2 != false { + t.Errorf("Expected false for hr_context.automated_rejection, got %v", val2) + } +} + +func TestFieldResolver_NilContext(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{} // No HR context + + val := engine.getFieldValue("hr_context.automated_screening", intake) + if val != nil { + t.Errorf("Expected nil for nil HR context, got %v", val) + } +} + +func TestFieldResolver_HealthcareContext(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + HealthcareContext: &HealthcareContext{ + TriageDecision: true, + MedicalDevice: false, + }, + } + + val := engine.getFieldValue("healthcare_context.triage_decision", intake) + if val != true { + t.Errorf("Expected true, got %v", val) + } + + val2 := engine.getFieldValue("healthcare_context.medical_device", intake) + if val2 != false { + t.Errorf("Expected false, got %v", val2) + } +} + +// ============================================================================ +// Hospitality — Review Manipulation BLOCK +// ============================================================================ + +func TestHospitality_ReviewManipulation_BLOCK(t *testing.T) { + root := getProjectRoot(t) + policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml") + engine, err := NewPolicyEngineFromPath(policyPath) + if err != nil { + t.Fatalf("Failed to create policy engine: %v", err) + } + + intake := &UseCaseIntake{ + UseCaseText: "KI generiert Fake-Bewertungen", + Domain: DomainHospitality, + HospitalityContext: &HospitalityContext{ + ReviewManipulation: true, + }, + } + + result := engine.Evaluate(intake) + + if result.Feasibility != FeasibilityNO { + t.Errorf("Expected NO for review manipulation, got %s", result.Feasibility) + } +} + +// ============================================================================ +// Total Obligations Count +// ============================================================================ + +func TestTotalObligationsCount(t *testing.T) { + regs, err := LoadAllV2Regulations() + if err != nil { + t.Fatalf("Failed to load v2 regulations: %v", err) + } + + total := 0 + for _, reg := range regs { + total += len(reg.Obligations) + } + + // We expect at least 350 obligations across all regulations + if total < 350 { + t.Errorf("Expected at least 350 total obligations, got %d", total) + } + + t.Logf("Total obligations across all regulations: %d", total) + for id, reg := range regs { + t.Logf(" %s: %d obligations", id, len(reg.Obligations)) + } +} + +// ============================================================================ +// Domain constant existence checks +// ============================================================================ + +func TestDomainConstants_Exist(t *testing.T) { + domains := []Domain{ + DomainHR, DomainEducation, DomainHealthcare, + DomainFinance, DomainBanking, DomainInsurance, + DomainEnergy, DomainUtilities, + DomainAutomotive, DomainAerospace, + DomainRetail, DomainEcommerce, + DomainMarketing, DomainMedia, + DomainLogistics, DomainConstruction, + DomainPublicSector, DomainDefense, + DomainMechanicalEngineering, + } + + for _, d := range domains { + if d == "" { + t.Error("Empty domain constant found") + } + } +}