BLOCK-Tests: AutomatedRejection, MinorsWithoutTeacher, MDRUnvalidated,
SafetyCriticalNoRedundancy, DeepfakeUnlabeled, ManufacturingUnvalidated,
ReviewManipulation
Positive Tests: HumanReview OK, TeacherReview OK, DeepfakeLabeled OK
Risk Tests: AGG visible, Triage high risk
Loader Tests: AGG + AI Act obligations count, applicability
Resolver Tests: HRContext, NilContext, HealthcareContext
Meta: TotalObligationsCount, DomainConstants
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
543 lines
16 KiB
Go
543 lines
16 KiB
Go
package ucca
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
)
|
|
|
|
// ============================================================================
|
|
// HR Domain Context Tests
|
|
// ============================================================================
|
|
|
|
func TestHRContext_AutomatedRejection_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI generiert und versendet Absagen automatisch",
|
|
Domain: DomainHR,
|
|
DataTypes: DataTypes{PersonalData: true, EmployeeData: true},
|
|
HRContext: &HRContext{
|
|
AutomatedScreening: true,
|
|
AutomatedRejection: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO feasibility for automated rejection, got %s", result.Feasibility)
|
|
}
|
|
if !result.Art22Risk {
|
|
t.Error("Expected Art22Risk=true for automated rejection")
|
|
}
|
|
}
|
|
|
|
func TestHRContext_ScreeningWithHumanReview_OK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI sortiert Bewerber vor, Mensch prueft jeden Vorschlag",
|
|
Domain: DomainHR,
|
|
DataTypes: DataTypes{PersonalData: true, EmployeeData: true},
|
|
HRContext: &HRContext{
|
|
AutomatedScreening: true,
|
|
AutomatedRejection: false,
|
|
HumanReviewEnforced: true,
|
|
BiasAuditsDone: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
// Should NOT block — human review is enforced
|
|
if result.Feasibility == FeasibilityNO {
|
|
t.Error("Expected feasibility != NO when human review is enforced")
|
|
}
|
|
}
|
|
|
|
func TestHRContext_AGGVisible_RiskIncrease(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intakeWithAGG := &UseCaseIntake{
|
|
UseCaseText: "CV-Screening mit Foto und Name sichtbar",
|
|
Domain: DomainHR,
|
|
DataTypes: DataTypes{PersonalData: true, EmployeeData: true},
|
|
HRContext: &HRContext{AGGCategoriesVisible: true},
|
|
}
|
|
intakeWithout := &UseCaseIntake{
|
|
UseCaseText: "CV-Screening anonymisiert",
|
|
Domain: DomainHR,
|
|
DataTypes: DataTypes{PersonalData: true, EmployeeData: true},
|
|
HRContext: &HRContext{AGGCategoriesVisible: false},
|
|
}
|
|
|
|
resultWith := engine.Evaluate(intakeWithAGG)
|
|
resultWithout := engine.Evaluate(intakeWithout)
|
|
|
|
if resultWith.RiskScore <= resultWithout.RiskScore {
|
|
t.Errorf("Expected higher risk with AGG visible (%d) vs without (%d)",
|
|
resultWith.RiskScore, resultWithout.RiskScore)
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Education Domain Context Tests
|
|
// ============================================================================
|
|
|
|
func TestEducationContext_MinorsWithoutTeacher_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI bewertet Schuelerarbeiten ohne Lehrkraft-Pruefung",
|
|
Domain: DomainEducation,
|
|
DataTypes: DataTypes{PersonalData: true, MinorData: true},
|
|
EducationContext: &EducationContext{
|
|
GradeInfluence: true,
|
|
MinorsInvolved: true,
|
|
TeacherReviewRequired: false,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO feasibility for minors without teacher review, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
func TestEducationContext_WithTeacherReview_Allowed(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI schlaegt Noten vor, Lehrkraft prueft und entscheidet",
|
|
Domain: DomainEducation,
|
|
DataTypes: DataTypes{PersonalData: true, MinorData: true},
|
|
EducationContext: &EducationContext{
|
|
GradeInfluence: true,
|
|
MinorsInvolved: true,
|
|
TeacherReviewRequired: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility == FeasibilityNO {
|
|
t.Error("Expected feasibility != NO when teacher review is required")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Healthcare Domain Context Tests
|
|
// ============================================================================
|
|
|
|
func TestHealthcareContext_MDRWithoutValidation_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI-Diagnosetool als Medizinprodukt ohne klinische Validierung",
|
|
Domain: DomainHealthcare,
|
|
DataTypes: DataTypes{PersonalData: true, Article9Data: true},
|
|
HealthcareContext: &HealthcareContext{
|
|
DiagnosisSupport: true,
|
|
MedicalDevice: true,
|
|
ClinicalValidation: false,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO for medical device without clinical validation, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
func TestHealthcareContext_Triage_HighRisk(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI priorisiert Patienten in der Notaufnahme",
|
|
Domain: DomainHealthcare,
|
|
DataTypes: DataTypes{PersonalData: true, Article9Data: true},
|
|
HealthcareContext: &HealthcareContext{
|
|
TriageDecision: true,
|
|
PatientDataProcessed: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.RiskScore < 40 {
|
|
t.Errorf("Expected high risk score for triage, got %d", result.RiskScore)
|
|
}
|
|
if !result.DSFARecommended {
|
|
t.Error("Expected DSFA recommended for triage")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Critical Infrastructure Tests
|
|
// ============================================================================
|
|
|
|
func TestCriticalInfra_SafetyCriticalNoRedundancy_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI steuert Stromnetz ohne Fallback",
|
|
Domain: DomainEnergy,
|
|
CriticalInfraContext: &CriticalInfraContext{
|
|
GridControl: true,
|
|
SafetyCritical: true,
|
|
RedundancyExists: false,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO for safety-critical without redundancy, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Marketing — Deepfake BLOCK Test
|
|
// ============================================================================
|
|
|
|
func TestMarketing_DeepfakeUnlabeled_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI generiert Werbevideos ohne Kennzeichnung",
|
|
Domain: DomainMarketing,
|
|
MarketingContext: &MarketingContext{
|
|
DeepfakeContent: true,
|
|
AIContentLabeled: false,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO for unlabeled deepfakes, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
func TestMarketing_DeepfakeLabeled_OK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI generiert Werbevideos mit Kennzeichnung",
|
|
Domain: DomainMarketing,
|
|
MarketingContext: &MarketingContext{
|
|
DeepfakeContent: true,
|
|
AIContentLabeled: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility == FeasibilityNO {
|
|
t.Error("Expected feasibility != NO when deepfakes are properly labeled")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Manufacturing — Safety BLOCK Test
|
|
// ============================================================================
|
|
|
|
func TestManufacturing_SafetyUnvalidated_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI in Maschinensicherheit ohne Validierung",
|
|
Domain: DomainMechanicalEngineering,
|
|
ManufacturingContext: &ManufacturingContext{
|
|
MachineSafety: true,
|
|
SafetyValidated: false,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO for unvalidated machine safety, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// AGG V2 Obligations Loading Test
|
|
// ============================================================================
|
|
|
|
func TestAGGV2_LoadsFromManifest(t *testing.T) {
|
|
regs, err := LoadAllV2Regulations()
|
|
if err != nil {
|
|
t.Fatalf("Failed to load v2 regulations: %v", err)
|
|
}
|
|
|
|
agg, ok := regs["agg"]
|
|
if !ok {
|
|
t.Fatal("agg not found in loaded regulations")
|
|
}
|
|
|
|
if len(agg.Obligations) < 8 {
|
|
t.Errorf("Expected at least 8 AGG obligations, got %d", len(agg.Obligations))
|
|
}
|
|
|
|
// Check first obligation
|
|
if agg.Obligations[0].ID != "AGG-OBL-001" {
|
|
t.Errorf("Expected first ID 'AGG-OBL-001', got '%s'", agg.Obligations[0].ID)
|
|
}
|
|
}
|
|
|
|
func TestAGGApplicability_Germany(t *testing.T) {
|
|
regs, err := LoadAllV2Regulations()
|
|
if err != nil {
|
|
t.Fatalf("Failed to load v2 regulations: %v", err)
|
|
}
|
|
|
|
module := NewJSONRegulationModule(regs["agg"])
|
|
|
|
factsDE := &UnifiedFacts{Organization: OrganizationFacts{Country: "DE"}}
|
|
if !module.IsApplicable(factsDE) {
|
|
t.Error("AGG should be applicable for German company")
|
|
}
|
|
|
|
factsUS := &UnifiedFacts{Organization: OrganizationFacts{Country: "US"}}
|
|
if module.IsApplicable(factsUS) {
|
|
t.Error("AGG should NOT be applicable for US company")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// AI Act V2 Extended Obligations Test
|
|
// ============================================================================
|
|
|
|
func TestAIActV2_ExtendedObligations(t *testing.T) {
|
|
regs, err := LoadAllV2Regulations()
|
|
if err != nil {
|
|
t.Fatalf("Failed to load v2 regulations: %v", err)
|
|
}
|
|
|
|
aiAct, ok := regs["ai_act"]
|
|
if !ok {
|
|
t.Fatal("ai_act not found in loaded regulations")
|
|
}
|
|
|
|
if len(aiAct.Obligations) < 75 {
|
|
t.Errorf("Expected at least 75 AI Act obligations (expanded), got %d", len(aiAct.Obligations))
|
|
}
|
|
|
|
// Check GPAI obligations exist (Art. 51-56)
|
|
hasGPAI := false
|
|
for _, obl := range aiAct.Obligations {
|
|
if obl.ID == "AIACT-OBL-078" { // GPAI classification
|
|
hasGPAI = true
|
|
break
|
|
}
|
|
}
|
|
if !hasGPAI {
|
|
t.Error("Expected GPAI obligation AIACT-OBL-078 in expanded AI Act")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Field Resolver Tests — Domain Contexts
|
|
// ============================================================================
|
|
|
|
func TestFieldResolver_HRContext(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
HRContext: &HRContext{AutomatedScreening: true},
|
|
}
|
|
|
|
val := engine.getFieldValue("hr_context.automated_screening", intake)
|
|
if val != true {
|
|
t.Errorf("Expected true for hr_context.automated_screening, got %v", val)
|
|
}
|
|
|
|
val2 := engine.getFieldValue("hr_context.automated_rejection", intake)
|
|
if val2 != false {
|
|
t.Errorf("Expected false for hr_context.automated_rejection, got %v", val2)
|
|
}
|
|
}
|
|
|
|
func TestFieldResolver_NilContext(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{} // No HR context
|
|
|
|
val := engine.getFieldValue("hr_context.automated_screening", intake)
|
|
if val != nil {
|
|
t.Errorf("Expected nil for nil HR context, got %v", val)
|
|
}
|
|
}
|
|
|
|
func TestFieldResolver_HealthcareContext(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
HealthcareContext: &HealthcareContext{
|
|
TriageDecision: true,
|
|
MedicalDevice: false,
|
|
},
|
|
}
|
|
|
|
val := engine.getFieldValue("healthcare_context.triage_decision", intake)
|
|
if val != true {
|
|
t.Errorf("Expected true, got %v", val)
|
|
}
|
|
|
|
val2 := engine.getFieldValue("healthcare_context.medical_device", intake)
|
|
if val2 != false {
|
|
t.Errorf("Expected false, got %v", val2)
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Hospitality — Review Manipulation BLOCK
|
|
// ============================================================================
|
|
|
|
func TestHospitality_ReviewManipulation_BLOCK(t *testing.T) {
|
|
root := getProjectRoot(t)
|
|
policyPath := filepath.Join(root, "policies", "ucca_policy_v1.yaml")
|
|
engine, err := NewPolicyEngineFromPath(policyPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create policy engine: %v", err)
|
|
}
|
|
|
|
intake := &UseCaseIntake{
|
|
UseCaseText: "KI generiert Fake-Bewertungen",
|
|
Domain: DomainHospitality,
|
|
HospitalityContext: &HospitalityContext{
|
|
ReviewManipulation: true,
|
|
},
|
|
}
|
|
|
|
result := engine.Evaluate(intake)
|
|
|
|
if result.Feasibility != FeasibilityNO {
|
|
t.Errorf("Expected NO for review manipulation, got %s", result.Feasibility)
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Total Obligations Count
|
|
// ============================================================================
|
|
|
|
func TestTotalObligationsCount(t *testing.T) {
|
|
regs, err := LoadAllV2Regulations()
|
|
if err != nil {
|
|
t.Fatalf("Failed to load v2 regulations: %v", err)
|
|
}
|
|
|
|
total := 0
|
|
for _, reg := range regs {
|
|
total += len(reg.Obligations)
|
|
}
|
|
|
|
// We expect at least 350 obligations across all regulations
|
|
if total < 350 {
|
|
t.Errorf("Expected at least 350 total obligations, got %d", total)
|
|
}
|
|
|
|
t.Logf("Total obligations across all regulations: %d", total)
|
|
for id, reg := range regs {
|
|
t.Logf(" %s: %d obligations", id, len(reg.Obligations))
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Domain constant existence checks
|
|
// ============================================================================
|
|
|
|
func TestDomainConstants_Exist(t *testing.T) {
|
|
domains := []Domain{
|
|
DomainHR, DomainEducation, DomainHealthcare,
|
|
DomainFinance, DomainBanking, DomainInsurance,
|
|
DomainEnergy, DomainUtilities,
|
|
DomainAutomotive, DomainAerospace,
|
|
DomainRetail, DomainEcommerce,
|
|
DomainMarketing, DomainMedia,
|
|
DomainLogistics, DomainConstruction,
|
|
DomainPublicSector, DomainDefense,
|
|
DomainMechanicalEngineering,
|
|
}
|
|
|
|
for _, d := range domains {
|
|
if d == "" {
|
|
t.Error("Empty domain constant found")
|
|
}
|
|
}
|
|
}
|