Files
breakpilot-lehrer/klausur-service/backend/tests/test_rbac.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

1237 lines
43 KiB
Python

"""
Unit Tests for RBAC/ABAC Policy System
Tests cover:
- Role Enum: All roles (Korrektur + Zeugnis)
- Action Enum: All actions
- ResourceType Enum: All resource types (Klausur + Zeugnis)
- VerfahrenType: is_exam_type(), is_certificate_type()
- PolicySet: Creation, serialization, type checks
- RoleAssignment: Creation, validity, revocation
- KeyShare: Creation, acceptance, revocation
- PolicyEngine:
  - register_policy_set()
  - get_policy_for_context()
  - assign_role()
  - revoke_role()
  - get_user_roles()
  - create_key_share()
  - accept_key_share()
  - revoke_key_share()
  - check_permission()
  - get_allowed_actions()
- DEFAULT_PERMISSIONS: Matrix completeness
- Zeugnis-Workflow: Role chain permissions
"""
import pytest
import uuid
from datetime import datetime, timezone, timedelta
import sys
sys.path.insert(0, '..')
from rbac import (
Role,
Action,
ResourceType,
ZKVisibilityMode,
EHVisibilityMode,
VerfahrenType,
PolicySet,
RoleAssignment,
KeyShare,
Tenant,
Namespace,
ExamPackage,
PolicyEngine,
DEFAULT_PERMISSIONS,
create_default_policy_sets,
get_policy_engine,
)
# =============================================
# FIXTURES
# =============================================
@pytest.fixture
def policy_engine():
    """Provide a newly constructed PolicyEngine to each requesting test."""
    engine = PolicyEngine()
    return engine
@pytest.fixture
def sample_policy():
    """Build the Abitur PolicySet shared by the exam-policy tests."""
    settings = {
        "id": "TEST-2025-ABITUR",
        "bundesland": "niedersachsen",
        "jahr": 2025,
        "fach": None,
        "verfahren": "abitur",
        "zk_visibility_mode": ZKVisibilityMode.FULL,
        "eh_visibility_mode": EHVisibilityMode.SHARED,
        "allow_teacher_uploaded_eh": True,
        "allow_land_uploaded_eh": True,
        "third_correction_threshold": 4,
        "final_signoff_role": "fachvorsitz",
    }
    return PolicySet(**settings)
@pytest.fixture
def sample_zeugnis_policy():
    """Build the Halbjahreszeugnis PolicySet shared by the certificate tests."""
    # Certificate-specific switches, all enabled for this sample.
    certificate_flags = {
        "require_klassenlehrer_approval": True,
        "require_schulleitung_signoff": True,
        "konferenz_protokoll_required": True,
        "kopfnoten_enabled": True,
        "versetzung_auto_calculate": True,
    }
    zeugnis_policy = PolicySet(
        id="TEST-2025-ZEUGNIS",
        bundesland="niedersachsen",
        jahr=2025,
        fach=None,
        verfahren="halbjahreszeugnis",
        **certificate_flags,
    )
    return zeugnis_policy
@pytest.fixture
def sample_tenant():
    """Build the school Tenant that the namespace and package fixtures hang off."""
    school = Tenant(
        id="school-001",
        name="Test Gymnasium",
        bundesland="niedersachsen",
        tenant_type="school",
        encryption_enabled=True,
    )
    return school
@pytest.fixture
def sample_namespace(sample_tenant):
    """Build a Namespace whose tenant_id is taken from the sample_tenant fixture."""
    owning_tenant_id = sample_tenant.id
    namespace = Namespace(
        id="namespace-001",
        tenant_id=owning_tenant_id,
        name="Abitur 2025 - Deutsch LK",
        jahr=2025,
        fach="deutsch",
        kurs="12a",
        pruefungsart="abitur",
    )
    return namespace
@pytest.fixture
def sample_exam_package(sample_namespace, sample_tenant):
    """Build a draft ExamPackage linked to the sample namespace and tenant."""
    # Both foreign keys come from the sibling fixtures so the IDs stay in sync.
    links = {
        "namespace_id": sample_namespace.id,
        "tenant_id": sample_tenant.id,
    }
    package = ExamPackage(
        id="package-001",
        name="Deutsch LK Q1 Klausur",
        status="draft",
        owner_id="teacher-001",
        **links,
    )
    return package
# =============================================
# ENUM TESTS
# =============================================
class TestRoleEnum:
    """Verifies that each Role member exists and maps to its expected value."""

    def test_all_korrektur_roles_exist(self):
        """The correction-chain roles carry their expected values."""
        expected = (
            (Role.ERSTKORREKTOR, "erstkorrektor"),
            (Role.ZWEITKORREKTOR, "zweitkorrektor"),
            (Role.DRITTKORREKTOR, "drittkorrektor"),
        )
        for member, value in expected:
            assert member.value == value

    def test_all_zeugnis_roles_exist(self):
        """The certificate-workflow roles carry their expected values."""
        expected = (
            (Role.KLASSENLEHRER, "klassenlehrer"),
            (Role.FACHLEHRER, "fachlehrer"),
            (Role.ZEUGNISBEAUFTRAGTER, "zeugnisbeauftragter"),
            (Role.SEKRETARIAT, "sekretariat"),
        )
        for member, value in expected:
            assert member.value == value

    def test_all_leadership_roles_exist(self):
        """The leadership roles carry their expected values."""
        expected = (
            (Role.FACHVORSITZ, "fachvorsitz"),
            (Role.PRUEFUNGSVORSITZ, "pruefungsvorsitz"),
            (Role.SCHULLEITUNG, "schulleitung"),
            (Role.STUFENLEITUNG, "stufenleitung"),
        )
        for member, value in expected:
            assert member.value == value

    def test_all_admin_roles_exist(self):
        """The administrative roles carry their expected values."""
        expected = (
            (Role.SCHUL_ADMIN, "schul_admin"),
            (Role.LAND_ADMIN, "land_admin"),
        )
        for member, value in expected:
            assert member.value == value

    def test_all_special_roles_exist(self):
        """The special-purpose roles carry their expected values."""
        expected = (
            (Role.AUDITOR, "auditor"),
            (Role.OPERATOR, "operator"),
            (Role.TEACHER_ASSISTANT, "teacher_assistant"),
            (Role.EXAM_AUTHOR, "exam_author"),
        )
        for member, value in expected:
            assert member.value == value

    def test_role_string_behavior(self):
        """str() gives the qualified member name; .value gives the raw string."""
        member = Role.ERSTKORREKTOR
        rendered = str(member)
        raw = member.value
        assert rendered == "Role.ERSTKORREKTOR"
        assert raw == "erstkorrektor"
        # Formatting the value (not the member) must also produce the raw string.
        assert f"{member.value}" == "erstkorrektor"
class TestActionEnum:
    """Verifies that each Action member exists and maps to its expected value."""

    def test_crud_actions_exist(self):
        """Create/read/update/delete are present."""
        expected = (
            (Action.CREATE, "create"),
            (Action.READ, "read"),
            (Action.UPDATE, "update"),
            (Action.DELETE, "delete"),
        )
        for member, value in expected:
            assert member.value == value

    def test_role_actions_exist(self):
        """Role- and user-management actions are present."""
        expected = (
            (Action.ASSIGN_ROLE, "assign_role"),
            (Action.INVITE_USER, "invite_user"),
            (Action.REMOVE_USER, "remove_user"),
        )
        for member, value in expected:
            assert member.value == value

    def test_file_actions_exist(self):
        """Upload and download are present."""
        expected = (
            (Action.UPLOAD, "upload"),
            (Action.DOWNLOAD, "download"),
        )
        for member, value in expected:
            assert member.value == value

    def test_workflow_actions_exist(self):
        """Lock, unlock and sign-off are present."""
        expected = (
            (Action.LOCK, "lock"),
            (Action.UNLOCK, "unlock"),
            (Action.SIGN_OFF, "sign_off"),
        )
        for member, value in expected:
            assert member.value == value

    def test_special_actions_exist(self):
        """Key sharing, PII viewing, break-glass and official publishing are present."""
        expected = (
            (Action.SHARE_KEY, "share_key"),
            (Action.VIEW_PII, "view_pii"),
            (Action.BREAK_GLASS, "break_glass"),
            (Action.PUBLISH_OFFICIAL, "publish_official"),
        )
        for member, value in expected:
            assert member.value == value
class TestResourceTypeEnum:
    """Verifies that each ResourceType member exists with its expected value."""

    def test_tenant_resources_exist(self):
        """Tenant and namespace resource types are present."""
        expected = (
            (ResourceType.TENANT, "tenant"),
            (ResourceType.NAMESPACE, "namespace"),
        )
        for member, value in expected:
            assert member.value == value

    def test_klausur_resources_exist(self):
        """Exam (Klausur) resource types are present."""
        expected = (
            (ResourceType.EXAM_PACKAGE, "exam_package"),
            (ResourceType.STUDENT_WORK, "student_work"),
            (ResourceType.EH_DOCUMENT, "eh_document"),
            (ResourceType.RUBRIC, "rubric"),
            (ResourceType.ANNOTATION, "annotation"),
            (ResourceType.EVALUATION, "evaluation"),
            (ResourceType.REPORT, "report"),
            (ResourceType.GRADE_DECISION, "grade_decision"),
        )
        for member, value in expected:
            assert member.value == value

    def test_zeugnis_resources_exist(self):
        """Certificate (Zeugnis) resource types are present."""
        expected = (
            (ResourceType.ZEUGNIS, "zeugnis"),
            (ResourceType.ZEUGNIS_VORLAGE, "zeugnis_vorlage"),
            (ResourceType.ZEUGNIS_ENTWURF, "zeugnis_entwurf"),
            (ResourceType.SCHUELER_DATEN, "schueler_daten"),
            (ResourceType.FACHNOTE, "fachnote"),
            (ResourceType.KOPFNOTE, "kopfnote"),
            (ResourceType.FEHLZEITEN, "fehlzeiten"),
            (ResourceType.BEMERKUNG, "bemerkung"),
            (ResourceType.KONFERENZ_BESCHLUSS, "konferenz_beschluss"),
            (ResourceType.VERSETZUNG, "versetzung"),
        )
        for member, value in expected:
            assert member.value == value

    def test_general_resources_exist(self):
        """General-purpose resource types are present."""
        expected = (
            (ResourceType.DOCUMENT, "document"),
            (ResourceType.TEMPLATE, "template"),
            (ResourceType.EXPORT, "export"),
            (ResourceType.AUDIT_LOG, "audit_log"),
            (ResourceType.KEY_MATERIAL, "key_material"),
        )
        for member, value in expected:
            assert member.value == value
class TestVerfahrenType:
    """Verifies VerfahrenType members and its two classification helpers."""

    # Names fed to is_exam_type() / is_certificate_type() below.
    _EXAM_NAMES = ("abitur", "vorabitur", "klausur", "nachpruefung")
    _CERTIFICATE_NAMES = (
        "halbjahreszeugnis",
        "jahreszeugnis",
        "abschlusszeugnis",
        "abgangszeugnis",
    )
    _UNKNOWN_NAMES = ("invalid", "")

    def test_exam_types_exist(self):
        """Exam members carry their expected values."""
        expected = (
            (VerfahrenType.ABITUR, "abitur"),
            (VerfahrenType.VORABITUR, "vorabitur"),
            (VerfahrenType.KLAUSUR, "klausur"),
            (VerfahrenType.NACHPRUEFUNG, "nachpruefung"),
        )
        for member, value in expected:
            assert member.value == value

    def test_certificate_types_exist(self):
        """Certificate members carry their expected values."""
        expected = (
            (VerfahrenType.HALBJAHRESZEUGNIS, "halbjahreszeugnis"),
            (VerfahrenType.JAHRESZEUGNIS, "jahreszeugnis"),
            (VerfahrenType.ABSCHLUSSZEUGNIS, "abschlusszeugnis"),
            (VerfahrenType.ABGANGSZEUGNIS, "abgangszeugnis"),
        )
        for member, value in expected:
            assert member.value == value

    def test_is_exam_type_returns_true_for_exams(self):
        """is_exam_type() is exactly True for every exam name."""
        for name in self._EXAM_NAMES:
            assert VerfahrenType.is_exam_type(name) is True

    def test_is_exam_type_returns_false_for_certificates(self):
        """is_exam_type() is exactly False for every certificate name."""
        for name in self._CERTIFICATE_NAMES:
            assert VerfahrenType.is_exam_type(name) is False

    def test_is_exam_type_returns_false_for_invalid(self):
        """is_exam_type() is exactly False for an unknown or empty name."""
        for name in self._UNKNOWN_NAMES:
            assert VerfahrenType.is_exam_type(name) is False

    def test_is_certificate_type_returns_true_for_certificates(self):
        """is_certificate_type() is exactly True for every certificate name."""
        for name in self._CERTIFICATE_NAMES:
            assert VerfahrenType.is_certificate_type(name) is True

    def test_is_certificate_type_returns_false_for_exams(self):
        """is_certificate_type() is exactly False for every exam name."""
        for name in self._EXAM_NAMES:
            assert VerfahrenType.is_certificate_type(name) is False

    def test_is_certificate_type_returns_false_for_invalid(self):
        """is_certificate_type() is exactly False for an unknown or empty name."""
        for name in self._UNKNOWN_NAMES:
            assert VerfahrenType.is_certificate_type(name) is False
class TestZKVisibilityMode:
    """Verifies the members of the ZKVisibilityMode enum."""

    def test_all_modes_exist(self):
        """BLIND, SEMI and FULL carry their expected values."""
        expected = (
            (ZKVisibilityMode.BLIND, "blind"),
            (ZKVisibilityMode.SEMI, "semi"),
            (ZKVisibilityMode.FULL, "full"),
        )
        for mode, value in expected:
            assert mode.value == value
class TestEHVisibilityMode:
    """Verifies the members of the EHVisibilityMode enum."""

    def test_all_modes_exist(self):
        """BLIND and SHARED carry their expected values."""
        expected = (
            (EHVisibilityMode.BLIND, "blind"),
            (EHVisibilityMode.SHARED, "shared"),
        )
        for mode, value in expected:
            assert mode.value == value
# =============================================
# DATA STRUCTURE TESTS
# =============================================
class TestPolicySet:
    """Covers PolicySet construction, type predicates, serialization and defaults."""

    def test_create_exam_policy(self, sample_policy):
        """The exam fixture exposes the identifying fields it was built with."""
        observed = (
            sample_policy.id,
            sample_policy.bundesland,
            sample_policy.jahr,
            sample_policy.verfahren,
        )
        assert observed == ("TEST-2025-ABITUR", "niedersachsen", 2025, "abitur")

    def test_create_zeugnis_policy(self, sample_zeugnis_policy):
        """The certificate fixture exposes its verfahren and certificate flags."""
        zeugnis = sample_zeugnis_policy
        assert zeugnis.verfahren == "halbjahreszeugnis"
        assert zeugnis.require_klassenlehrer_approval is True
        assert zeugnis.kopfnoten_enabled is True

    def test_is_exam_policy(self, sample_policy):
        """An Abitur policy is an exam policy and not a certificate policy."""
        exam_flag = sample_policy.is_exam_policy()
        assert exam_flag is True
        certificate_flag = sample_policy.is_certificate_policy()
        assert certificate_flag is False

    def test_is_certificate_policy(self, sample_zeugnis_policy):
        """A Halbjahreszeugnis policy is a certificate policy and not an exam policy."""
        certificate_flag = sample_zeugnis_policy.is_certificate_policy()
        assert certificate_flag is True
        exam_flag = sample_zeugnis_policy.is_exam_policy()
        assert exam_flag is False

    def test_policy_to_dict(self, sample_policy):
        """to_dict() emits plain values for ids and visibility modes, plus created_at."""
        payload = sample_policy.to_dict()
        expected_entries = (
            ("id", "TEST-2025-ABITUR"),
            ("bundesland", "niedersachsen"),
            ("zk_visibility_mode", "full"),
            ("eh_visibility_mode", "shared"),
        )
        for key, value in expected_entries:
            assert payload[key] == value
        assert "created_at" in payload

    def test_default_values(self):
        """A PolicySet built from only the required fields gets the expected defaults."""
        required_only = {
            "id": "test",
            "bundesland": "test",
            "jahr": 2025,
            "fach": None,
            "verfahren": "abitur",
        }
        minimal = PolicySet(**required_only)
        assert minimal.zk_visibility_mode == ZKVisibilityMode.FULL
        assert minimal.eh_visibility_mode == EHVisibilityMode.SHARED
        assert minimal.allow_teacher_uploaded_eh is True
        assert minimal.third_correction_threshold == 4
class TestRoleAssignment:
    """Covers RoleAssignment construction, is_active() and serialization."""

    @staticmethod
    def _make_assignment(**overrides):
        """Build the standard ERSTKORREKTOR assignment, applying any field overrides.

        A random UUID is used as the id unless the caller supplies one.
        """
        if "id" not in overrides:
            overrides["id"] = str(uuid.uuid4())
        fields = {
            "user_id": "user-001",
            "role": Role.ERSTKORREKTOR,
            "resource_type": ResourceType.EXAM_PACKAGE,
            "resource_id": "package-001",
            "granted_by": "admin-001",
        }
        fields.update(overrides)
        return RoleAssignment(**fields)

    def test_create_role_assignment(self):
        """A freshly built assignment keeps its fields and reports itself active."""
        created = self._make_assignment()
        assert created.user_id == "user-001"
        assert created.role == Role.ERSTKORREKTOR
        assert created.is_active() is True

    def test_is_active_returns_true_when_valid(self):
        """An assignment with no revocation or validity window is active."""
        unrestricted = self._make_assignment()
        assert unrestricted.is_active() is True

    def test_is_active_returns_false_when_revoked(self):
        """Setting revoked_at makes the assignment inactive."""
        revoked = self._make_assignment(revoked_at=datetime.now(timezone.utc))
        assert revoked.is_active() is False

    def test_is_active_returns_false_when_expired(self):
        """A valid_to one day in the past makes the assignment inactive."""
        yesterday = datetime.now(timezone.utc) - timedelta(days=1)
        expired = self._make_assignment(valid_to=yesterday)
        assert expired.is_active() is False

    def test_is_active_returns_false_when_not_yet_valid(self):
        """A valid_from one day in the future makes the assignment inactive."""
        tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
        pending = self._make_assignment(valid_from=tomorrow)
        assert pending.is_active() is False

    def test_role_assignment_to_dict(self):
        """to_dict() emits the id, the enum values as strings, and is_active."""
        payload = self._make_assignment(id="assign-001").to_dict()
        assert payload["id"] == "assign-001"
        assert payload["role"] == "erstkorrektor"
        assert payload["resource_type"] == "exam_package"
        assert payload["is_active"] is True
class TestKeyShare:
    """Covers KeyShare construction, is_active() and serialization."""

    @staticmethod
    def _make_share(permissions, **extra):
        """Build a KeyShare from user-001 to user-002 on package-001.

        A random UUID is used as the id unless the caller supplies one; any
        other keyword is forwarded to the KeyShare constructor unchanged.
        """
        if "id" not in extra:
            extra["id"] = str(uuid.uuid4())
        return KeyShare(
            user_id="user-002",
            package_id="package-001",
            permissions=permissions,
            granted_by="user-001",
            **extra,
        )

    def test_create_key_share(self):
        """A share built without an invite token is active and keeps its permissions."""
        created = self._make_share({"read_original", "read_eh"}, scope="full")
        assert created.user_id == "user-002"
        assert created.is_active() is True
        assert "read_original" in created.permissions

    def test_is_active_returns_true_when_accepted(self):
        """A share with an invite token and an accepted_at timestamp is active."""
        accepted = self._make_share(
            {"read_original"},
            invite_token="token-123",
            accepted_at=datetime.now(timezone.utc),
        )
        assert accepted.is_active() is True

    def test_is_active_returns_false_when_pending(self):
        """A share with an invite token but no accepted_at is inactive."""
        # No accepted_at is passed, so the invite is still outstanding.
        pending = self._make_share({"read_original"}, invite_token="token-123")
        assert pending.is_active() is False

    def test_is_active_returns_false_when_revoked(self):
        """Setting revoked_at makes the share inactive."""
        revoked = self._make_share(
            {"read_original"},
            revoked_at=datetime.now(timezone.utc),
        )
        assert revoked.is_active() is False

    def test_key_share_to_dict(self):
        """to_dict() emits the id, the permissions and is_active."""
        payload = self._make_share(
            {"read_original", "read_eh"},
            id="share-001",
            scope="full",
        ).to_dict()
        assert payload["id"] == "share-001"
        assert "read_original" in payload["permissions"]
        assert payload["is_active"] is True
class TestTenant:
    """Covers Tenant construction and serialization."""

    def test_create_tenant(self, sample_tenant):
        """The fixture exposes the id, bundesland and encryption flag it was built with."""
        school = sample_tenant
        assert (school.id, school.bundesland) == ("school-001", "niedersachsen")
        assert school.encryption_enabled is True

    def test_tenant_to_dict(self, sample_tenant):
        """to_dict() emits id, tenant_type and a created_at entry."""
        payload = sample_tenant.to_dict()
        for key, value in (("id", "school-001"), ("tenant_type", "school")):
            assert payload[key] == value
        assert "created_at" in payload
class TestNamespace:
    """Covers Namespace construction and serialization."""

    def test_create_namespace(self, sample_namespace):
        """The fixture exposes the id, fach and pruefungsart it was built with."""
        observed = (
            sample_namespace.id,
            sample_namespace.fach,
            sample_namespace.pruefungsart,
        )
        assert observed == ("namespace-001", "deutsch", "abitur")

    def test_namespace_to_dict(self, sample_namespace):
        """to_dict() emits id, jahr and fach."""
        payload = sample_namespace.to_dict()
        expected_entries = (
            ("id", "namespace-001"),
            ("jahr", 2025),
            ("fach", "deutsch"),
        )
        for key, value in expected_entries:
            assert payload[key] == value
class TestExamPackage:
    """Covers ExamPackage construction and serialization."""

    def test_create_exam_package(self, sample_exam_package):
        """The fixture exposes the id, status and owner it was built with."""
        observed = (
            sample_exam_package.id,
            sample_exam_package.status,
            sample_exam_package.owner_id,
        )
        assert observed == ("package-001", "draft", "teacher-001")

    def test_exam_package_to_dict(self, sample_exam_package):
        """to_dict() emits id, status and a created_at entry."""
        payload = sample_exam_package.to_dict()
        for key, value in (("id", "package-001"), ("status", "draft")):
            assert payload[key] == value
        assert "created_at" in payload
# =============================================
# POLICY ENGINE TESTS
# =============================================
class TestPolicyEngineRegistration:
    """Covers registering policy sets and looking them up by context."""

    def test_register_policy_set(self, policy_engine, sample_policy):
        """A registered policy is stored in policy_sets under its id."""
        policy_engine.register_policy_set(sample_policy)
        registered_ids = policy_engine.policy_sets
        assert sample_policy.id in registered_ids

    def test_get_policy_for_context_exact_match(self, policy_engine, sample_policy):
        """A lookup matching bundesland, jahr and verfahren finds the registered policy.

        The registered policy has fach=None while the lookup passes
        fach="deutsch".
        """
        policy_engine.register_policy_set(sample_policy)
        context = {
            "bundesland": "niedersachsen",
            "jahr": 2025,
            "fach": "deutsch",
            "verfahren": "abitur",
        }
        match = policy_engine.get_policy_for_context(**context)
        assert match is not None
        assert match.id == sample_policy.id

    def test_get_policy_for_context_fallback_to_default(self, policy_engine):
        """An unregistered bundesland resolves to the policy registered as DEFAULT."""
        fallback = PolicySet(
            id="DEFAULT-2025",
            bundesland="DEFAULT",
            jahr=2025,
            fach=None,
            verfahren="abitur",
        )
        policy_engine.register_policy_set(fallback)
        context = {"bundesland": "unknown-land", "jahr": 2025, "verfahren": "abitur"}
        match = policy_engine.get_policy_for_context(**context)
        assert match is not None
        assert match.bundesland == "DEFAULT"

    def test_get_policy_for_context_returns_none_when_no_match(self, policy_engine):
        """With nothing registered, the lookup returns None."""
        context = {"bundesland": "unknown", "jahr": 2025, "verfahren": "abitur"}
        match = policy_engine.get_policy_for_context(**context)
        assert match is None
class TestPolicyEngineRoleAssignment:
    """Covers assign_role(), get_user_roles() and revoke_role() on the engine."""

    @staticmethod
    def _grant_erstkorrektor(engine):
        """Give user-001 the ERSTKORREKTOR role on exam package package-001."""
        return engine.assign_role(
            user_id="user-001",
            role=Role.ERSTKORREKTOR,
            resource_type=ResourceType.EXAM_PACKAGE,
            resource_id="package-001",
            granted_by="admin-001",
        )

    def test_assign_role_creates_assignment(self, policy_engine):
        """assign_role() returns the assignment and files it under the user id."""
        created = self._grant_erstkorrektor(policy_engine)
        assert created is not None
        assert created.role == Role.ERSTKORREKTOR
        assert "user-001" in policy_engine.role_assignments

    def test_get_user_roles_returns_active_roles(self, policy_engine):
        """An unfiltered query returns every role granted to the user."""
        self._grant_erstkorrektor(policy_engine)
        policy_engine.assign_role(
            user_id="user-001",
            role=Role.FACHVORSITZ,
            resource_type=ResourceType.NAMESPACE,
            resource_id="namespace-001",
            granted_by="admin-001",
        )
        held = policy_engine.get_user_roles("user-001")
        for role in (Role.ERSTKORREKTOR, Role.FACHVORSITZ):
            assert role in held

    def test_get_user_roles_filters_by_resource_type(self, policy_engine):
        """A role shows up only when the queried resource type matches the grant."""
        self._grant_erstkorrektor(policy_engine)

        on_packages = policy_engine.get_user_roles(
            "user-001",
            resource_type=ResourceType.EXAM_PACKAGE,
        )
        assert Role.ERSTKORREKTOR in on_packages

        on_namespaces = policy_engine.get_user_roles(
            "user-001",
            resource_type=ResourceType.NAMESPACE,
        )
        assert Role.ERSTKORREKTOR not in on_namespaces

    def test_get_user_roles_filters_by_resource_id(self, policy_engine):
        """A role shows up only when the queried resource id matches the grant."""
        self._grant_erstkorrektor(policy_engine)

        on_granted_package = policy_engine.get_user_roles(
            "user-001",
            resource_type=ResourceType.EXAM_PACKAGE,
            resource_id="package-001",
        )
        assert Role.ERSTKORREKTOR in on_granted_package

        on_other_package = policy_engine.get_user_roles(
            "user-001",
            resource_type=ResourceType.EXAM_PACKAGE,
            resource_id="package-other",
        )
        assert Role.ERSTKORREKTOR not in on_other_package

    def test_revoke_role_deactivates_assignment(self, policy_engine):
        """revoke_role() returns True and the role no longer appears for the user."""
        granted = self._grant_erstkorrektor(policy_engine)
        outcome = policy_engine.revoke_role(granted.id, "admin-001")
        assert outcome is True
        remaining = policy_engine.get_user_roles("user-001")
        assert Role.ERSTKORREKTOR not in remaining

    def test_revoke_role_returns_false_for_invalid_id(self, policy_engine):
        """revoke_role() returns False when the assignment id is unknown."""
        outcome = policy_engine.revoke_role("invalid-id", "admin-001")
        assert outcome is False
class TestPolicyEngineKeyShare:
    """Covers create_key_share(), accept_key_share() and revoke_key_share()."""

    @staticmethod
    def _invite_user_002(engine):
        """Create a read_original share for user-002 guarded by token-123."""
        return engine.create_key_share(
            user_id="user-002",
            package_id="package-001",
            permissions={"read_original"},
            granted_by="user-001",
            invite_token="token-123",
        )

    def test_create_key_share(self, policy_engine):
        """create_key_share() returns the share and files it under the user id."""
        created = policy_engine.create_key_share(
            user_id="user-002",
            package_id="package-001",
            permissions={"read_original", "read_eh"},
            granted_by="user-001",
        )
        assert created is not None
        assert created.user_id == "user-002"
        assert "user-002" in policy_engine.key_shares

    def test_create_key_share_with_invite_token(self, policy_engine):
        """A share created with an invite token keeps the token and starts inactive."""
        invited = self._invite_user_002(policy_engine)
        assert invited.invite_token == "token-123"
        assert invited.is_active() is False

    def test_accept_key_share(self, policy_engine):
        """Accepting with the matching token returns True and activates the share."""
        invited = self._invite_user_002(policy_engine)
        outcome = policy_engine.accept_key_share(invited.id, "token-123")
        assert outcome is True
        assert invited.is_active() is True

    def test_accept_key_share_wrong_token(self, policy_engine):
        """Accepting with a non-matching token returns False."""
        invited = self._invite_user_002(policy_engine)
        outcome = policy_engine.accept_key_share(invited.id, "wrong-token")
        assert outcome is False

    def test_revoke_key_share(self, policy_engine):
        """Revoking a share returns True and leaves the share inactive."""
        direct_share = policy_engine.create_key_share(
            user_id="user-002",
            package_id="package-001",
            permissions={"read_original"},
            granted_by="user-001",
        )
        outcome = policy_engine.revoke_key_share(direct_share.id, "user-001")
        assert outcome is True
        assert direct_share.is_active() is False
class TestPolicyEnginePermissionCheck:
    """Covers check_permission() and get_allowed_actions() on the engine."""

    @staticmethod
    def _grant_on_package(engine, role):
        """Give user-001 the given role on exam package package-001."""
        engine.assign_role(
            user_id="user-001",
            role=role,
            resource_type=ResourceType.EXAM_PACKAGE,
            resource_id="package-001",
            granted_by="admin-001",
        )

    def test_check_permission_returns_true_for_valid(self, policy_engine):
        """An ERSTKORREKTOR may READ the package the role was granted on."""
        self._grant_on_package(policy_engine, Role.ERSTKORREKTOR)
        request = {
            "user_id": "user-001",
            "action": Action.READ,
            "resource_type": ResourceType.EXAM_PACKAGE,
            "resource_id": "package-001",
        }
        verdict = policy_engine.check_permission(**request)
        assert verdict is True

    def test_check_permission_returns_false_without_role(self, policy_engine):
        """A user with no role assignment is denied."""
        request = {
            "user_id": "user-001",
            "action": Action.READ,
            "resource_type": ResourceType.EXAM_PACKAGE,
            "resource_id": "package-001",
        }
        verdict = policy_engine.check_permission(**request)
        assert verdict is False

    def test_check_permission_returns_false_for_disallowed_action(self, policy_engine):
        """A ZWEITKORREKTOR is denied SHARE_KEY on the package."""
        # ZWEITKORREKTOR is the role under test because it lacks SHARE_KEY.
        self._grant_on_package(policy_engine, Role.ZWEITKORREKTOR)
        request = {
            "user_id": "user-001",
            "action": Action.SHARE_KEY,
            "resource_type": ResourceType.EXAM_PACKAGE,
            "resource_id": "package-001",
        }
        verdict = policy_engine.check_permission(**request)
        assert verdict is False

    def test_check_permission_with_key_share_required(self, policy_engine):
        """Passing package_id makes the check depend on an active key share."""
        self._grant_on_package(policy_engine, Role.ERSTKORREKTOR)
        request = {
            "user_id": "user-001",
            "action": Action.READ,
            "resource_type": ResourceType.EXAM_PACKAGE,
            "resource_id": "package-001",
            "package_id": "package-001",
        }

        # Role alone is not enough while no key share exists.
        before_share = policy_engine.check_permission(**request)
        assert before_share is False

        # The identical request succeeds once a key share has been created.
        policy_engine.create_key_share(
            user_id="user-001",
            package_id="package-001",
            permissions={"read_original"},
            granted_by="admin-001",
        )
        after_share = policy_engine.check_permission(**request)
        assert after_share is True

    def test_check_permission_with_semi_visibility_policy(self, policy_engine):
        """Under a SEMI policy a ZWEITKORREKTOR is denied READ on a grade decision."""
        semi_policy = PolicySet(
            id="TEST-SEMI",
            bundesland="bayern",
            jahr=2025,
            fach=None,
            verfahren="abitur",
            zk_visibility_mode=ZKVisibilityMode.SEMI,
        )
        target = {
            "resource_type": ResourceType.GRADE_DECISION,
            "resource_id": "decision-001",
        }
        policy_engine.assign_role(
            user_id="user-002",
            role=Role.ZWEITKORREKTOR,
            granted_by="admin-001",
            **target,
        )
        verdict = policy_engine.check_permission(
            user_id="user-002",
            action=Action.READ,
            policy=semi_policy,
            **target,
        )
        # The SEMI policy is expected to block reading the grade decision.
        assert verdict is False

    def test_get_allowed_actions(self, policy_engine):
        """An ERSTKORREKTOR's allowed actions include READ, UPDATE, SHARE_KEY and LOCK."""
        self._grant_on_package(policy_engine, Role.ERSTKORREKTOR)
        permitted = policy_engine.get_allowed_actions(
            user_id="user-001",
            resource_type=ResourceType.EXAM_PACKAGE,
            resource_id="package-001",
        )
        for action in (Action.READ, Action.UPDATE, Action.SHARE_KEY, Action.LOCK):
            assert action in permitted
# =============================================
# DEFAULT PERMISSIONS TESTS
# =============================================
class TestDefaultPermissions:
    """Spot-checks entries of the DEFAULT_PERMISSIONS role/resource/action matrix."""

    @staticmethod
    def _assert_grants(role, resource, *actions):
        """Assert that every given action is listed for role on resource."""
        granted = DEFAULT_PERMISSIONS[role][resource]
        for action in actions:
            assert action in granted

    def test_erstkorrektor_has_exam_permissions(self):
        """ERSTKORREKTOR may read, update, share keys for and lock exam packages."""
        self._assert_grants(
            Role.ERSTKORREKTOR,
            ResourceType.EXAM_PACKAGE,
            Action.READ,
            Action.UPDATE,
            Action.SHARE_KEY,
            Action.LOCK,
        )

    def test_zweitkorrektor_has_limited_permissions(self):
        """ZWEITKORREKTOR may read exam packages but not share their keys."""
        on_package = DEFAULT_PERMISSIONS[Role.ZWEITKORREKTOR][ResourceType.EXAM_PACKAGE]
        assert Action.READ in on_package
        assert Action.SHARE_KEY not in on_package

    def test_klassenlehrer_has_zeugnis_permissions(self):
        """KLASSENLEHRER may create/read/update Zeugnis and create Kopfnote/Bemerkung."""
        self._assert_grants(
            Role.KLASSENLEHRER,
            ResourceType.ZEUGNIS,
            Action.CREATE,
            Action.READ,
            Action.UPDATE,
        )
        self._assert_grants(Role.KLASSENLEHRER, ResourceType.KOPFNOTE, Action.CREATE)
        self._assert_grants(Role.KLASSENLEHRER, ResourceType.BEMERKUNG, Action.CREATE)

    def test_fachlehrer_has_fachnote_permissions(self):
        """FACHLEHRER may create/read/update Fachnote and has no Kopfnote entry."""
        self._assert_grants(
            Role.FACHLEHRER,
            ResourceType.FACHNOTE,
            Action.CREATE,
            Action.READ,
            Action.UPDATE,
        )
        # Kopfnoten must be absent from the FACHLEHRER matrix altogether.
        assert ResourceType.KOPFNOTE not in DEFAULT_PERMISSIONS[Role.FACHLEHRER]

    def test_zeugnisbeauftragter_has_review_permissions(self):
        """ZEUGNISBEAUFTRAGTER may read/update Zeugnis and upload templates."""
        self._assert_grants(
            Role.ZEUGNISBEAUFTRAGTER,
            ResourceType.ZEUGNIS,
            Action.READ,
            Action.UPDATE,
        )
        self._assert_grants(
            Role.ZEUGNISBEAUFTRAGTER,
            ResourceType.ZEUGNIS_VORLAGE,
            Action.UPLOAD,
        )

    def test_sekretariat_has_print_permissions(self):
        """SEKRETARIAT may read/download Zeugnis and create exports."""
        self._assert_grants(
            Role.SEKRETARIAT,
            ResourceType.ZEUGNIS,
            Action.READ,
            Action.DOWNLOAD,
        )
        self._assert_grants(Role.SEKRETARIAT, ResourceType.EXPORT, Action.CREATE)

    def test_schulleitung_has_signoff_permissions(self):
        """SCHULLEITUNG may sign off Zeugnis, Versetzung and Konferenzbeschluss."""
        self._assert_grants(
            Role.SCHULLEITUNG,
            ResourceType.ZEUGNIS,
            Action.SIGN_OFF,
            Action.LOCK,
        )
        self._assert_grants(Role.SCHULLEITUNG, ResourceType.VERSETZUNG, Action.SIGN_OFF)
        self._assert_grants(
            Role.SCHULLEITUNG,
            ResourceType.KONFERENZ_BESCHLUSS,
            Action.SIGN_OFF,
        )

    def test_stufenleitung_has_coordination_permissions(self):
        """STUFENLEITUNG may read/update Zeugnis and update Versetzung."""
        self._assert_grants(
            Role.STUFENLEITUNG,
            ResourceType.ZEUGNIS,
            Action.READ,
            Action.UPDATE,
        )
        self._assert_grants(Role.STUFENLEITUNG, ResourceType.VERSETZUNG, Action.UPDATE)

    def test_land_admin_has_publish_permissions(self):
        """LAND_ADMIN may publish official EH documents."""
        self._assert_grants(
            Role.LAND_ADMIN,
            ResourceType.EH_DOCUMENT,
            Action.PUBLISH_OFFICIAL,
        )

    def test_auditor_has_limited_read_access(self):
        """AUDITOR may read audit logs and exam packages, with no student-work entry."""
        self._assert_grants(Role.AUDITOR, ResourceType.AUDIT_LOG, Action.READ)
        self._assert_grants(Role.AUDITOR, ResourceType.EXAM_PACKAGE, Action.READ)
        # Student work must be absent from the AUDITOR matrix altogether.
        assert ResourceType.STUDENT_WORK not in DEFAULT_PERMISSIONS[Role.AUDITOR]
# =============================================
# ZEUGNIS WORKFLOW TESTS
# =============================================
class TestZeugnisWorkflow:
    """Role-chain tests for the certificate (Zeugnis) workflow.

    Each test assigns a single role scoped to one resource and then verifies
    that ``check_permission`` returns ``True`` for one action on that same
    resource.
    """

    def _assert_granted(self, engine, user_id, role, resource_type, resource_id, action):
        """Assign ``role`` to ``user_id`` on the resource and assert ``action`` is permitted.

        The leading underscore keeps pytest from collecting this helper as a test.
        """
        engine.assign_role(
            user_id=user_id,
            role=role,
            resource_type=resource_type,
            resource_id=resource_id,
            granted_by="admin-001",
        )
        allowed = engine.check_permission(
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
        )
        assert allowed is True

    def test_fachlehrer_can_enter_grades(self, policy_engine):
        """FACHLEHRER assigned on a FACHNOTE resource may CREATE on it."""
        self._assert_granted(
            policy_engine,
            user_id="lehrer-mathe",
            role=Role.FACHLEHRER,
            resource_type=ResourceType.FACHNOTE,
            resource_id="klasse-7a-mathe",
            action=Action.CREATE,
        )

    def test_klassenlehrer_can_create_zeugnis(self, policy_engine):
        """KLASSENLEHRER assigned on a ZEUGNIS resource may CREATE on it."""
        self._assert_granted(
            policy_engine,
            user_id="klassenlehrer-7a",
            role=Role.KLASSENLEHRER,
            resource_type=ResourceType.ZEUGNIS,
            resource_id="klasse-7a",
            action=Action.CREATE,
        )

    def test_klassenlehrer_can_add_kopfnoten(self, policy_engine):
        """KLASSENLEHRER assigned on a KOPFNOTE resource may CREATE on it."""
        self._assert_granted(
            policy_engine,
            user_id="klassenlehrer-7a",
            role=Role.KLASSENLEHRER,
            resource_type=ResourceType.KOPFNOTE,
            resource_id="klasse-7a",
            action=Action.CREATE,
        )

    def test_zeugnisbeauftragter_can_review(self, policy_engine):
        """ZEUGNISBEAUFTRAGTER assigned on a ZEUGNIS resource may UPDATE it."""
        self._assert_granted(
            policy_engine,
            user_id="zb-001",
            role=Role.ZEUGNISBEAUFTRAGTER,
            resource_type=ResourceType.ZEUGNIS,
            resource_id="klasse-7a",
            action=Action.UPDATE,
        )

    def test_schulleitung_can_sign_off(self, policy_engine):
        """SCHULLEITUNG assigned on a ZEUGNIS resource may SIGN_OFF on it."""
        self._assert_granted(
            policy_engine,
            user_id="schulleiter",
            role=Role.SCHULLEITUNG,
            resource_type=ResourceType.ZEUGNIS,
            resource_id="klasse-7a",
            action=Action.SIGN_OFF,
        )

    def test_sekretariat_can_export(self, policy_engine):
        """SEKRETARIAT assigned on an EXPORT resource may CREATE on it."""
        self._assert_granted(
            policy_engine,
            user_id="sekretariat-001",
            role=Role.SEKRETARIAT,
            resource_type=ResourceType.EXPORT,
            resource_id="klasse-7a",
            action=Action.CREATE,
        )
# =============================================
# CREATE DEFAULT POLICY SETS TESTS
# =============================================
class TestCreateDefaultPolicySets:
    """Tests for create_default_policy_sets()."""

    @staticmethod
    def _first_policy_for(bundesland):
        """Return the first generated policy whose ``bundesland`` matches, else ``None``."""
        for candidate in create_default_policy_sets():
            if candidate.bundesland == bundesland:
                return candidate
        return None

    def test_creates_default_policy(self):
        """A policy with bundesland "DEFAULT" exists and is for the year 2025."""
        fallback = self._first_policy_for("DEFAULT")
        assert fallback is not None
        assert fallback.jahr == 2025

    def test_creates_niedersachsen_policy(self):
        """The "niedersachsen" policy exists and uses ZKVisibilityMode.FULL."""
        niedersachsen = self._first_policy_for("niedersachsen")
        assert niedersachsen is not None
        assert niedersachsen.zk_visibility_mode == ZKVisibilityMode.FULL

    def test_creates_bayern_policy_with_semi_visibility(self):
        """The "bayern" policy exists and uses ZKVisibilityMode.SEMI."""
        bayern = self._first_policy_for("bayern")
        assert bayern is not None
        assert bayern.zk_visibility_mode == ZKVisibilityMode.SEMI

    def test_creates_policies_for_all_bundeslaender(self):
        """The generated set covers exactly "DEFAULT" plus the 16 Bundeslaender."""
        federal_states = (
            "baden-wuerttemberg",
            "bayern",
            "berlin",
            "brandenburg",
            "bremen",
            "hamburg",
            "hessen",
            "mecklenburg-vorpommern",
            "niedersachsen",
            "nordrhein-westfalen",
            "rheinland-pfalz",
            "saarland",
            "sachsen",
            "sachsen-anhalt",
            "schleswig-holstein",
            "thueringen",
        )
        expected = {"DEFAULT", *federal_states}

        covered = set()
        for policy in create_default_policy_sets():
            covered.add(policy.bundesland)

        assert covered == expected
# =============================================
# GET POLICY ENGINE SINGLETON TESTS
# =============================================
class TestGetPolicyEngine:
    """Tests for the get_policy_engine() accessor."""

    def test_returns_engine_instance(self):
        """The accessor hands back a PolicyEngine."""
        assert isinstance(get_policy_engine(), PolicyEngine)

    def test_returns_same_instance(self):
        """Two consecutive calls yield the identical object."""
        first = get_policy_engine()
        second = get_policy_engine()
        assert first is second

    def test_has_default_policies_registered(self):
        """The shared engine holds at least one policy set, including "DEFAULT-2025"."""
        shared = get_policy_engine()
        assert len(shared.policy_sets) > 0
        assert "DEFAULT-2025" in shared.policy_sets
if __name__ == "__main__":
    # Propagate pytest's exit status so that running this file directly
    # exits non-zero when a test fails; previously the return value of
    # pytest.main() was discarded and the process always exited with 0.
    sys.exit(pytest.main([__file__, "-v"]))