"""
RBAC/ABAC Policy System for Klausur-Service

Implements:
- Role-Based Access Control (RBAC) with hierarchical roles
- Attribute-Based Access Control (ABAC) via policy sets
- Bundesland-specific policies
- Key sharing for exam packages
"""

import hmac
import json
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from functools import wraps
from typing import Optional, List, Dict, Set, Any

from fastapi import HTTPException, Request


# =============================================
# ENUMS: Roles, Actions, Resources
# =============================================

class Role(str, Enum):
    """Functional roles in the exam-correction and report-card (Zeugnis) chains."""

    # === Exam correction chain ===
    ERSTKORREKTOR = "erstkorrektor"  # EK - first corrector
    ZWEITKORREKTOR = "zweitkorrektor"  # ZK - second corrector
    DRITTKORREKTOR = "drittkorrektor"  # DK - third corrector

    # === Report-card workflow ===
    KLASSENLEHRER = "klassenlehrer"  # KL - creates report card, conduct grades, remarks
    FACHLEHRER = "fachlehrer"  # FL - enters subject grades
    ZEUGNISBEAUFTRAGTER = "zeugnisbeauftragter"  # ZB - quality control
    SEKRETARIAT = "sekretariat"  # SEK - printing, dispatch, archiving

    # === Management (exam + report card) ===
    FACHVORSITZ = "fachvorsitz"  # FVL - subject examination lead
    PRUEFUNGSVORSITZ = "pruefungsvorsitz"  # PV - school management / examination chair
    SCHULLEITUNG = "schulleitung"  # SL - final report-card approval
    STUFENLEITUNG = "stufenleitung"  # STL - grade-level coordination

    # === Administration ===
    SCHUL_ADMIN = "schul_admin"  # SA - school administrator
    LAND_ADMIN = "land_admin"  # LA - state authority

    # === Special ===
    AUDITOR = "auditor"  # data protection officer / auditor
    OPERATOR = "operator"  # OPS - support
    TEACHER_ASSISTANT = "teacher_assistant"  # TA - trainee teacher
    EXAM_AUTHOR = "exam_author"  # EA - Vorabitur only


class Action(str, Enum):
    """Operations that can be performed on resources."""

    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"

    ASSIGN_ROLE = "assign_role"
    INVITE_USER = "invite_user"
    REMOVE_USER = "remove_user"

    UPLOAD = "upload"
    DOWNLOAD = "download"

    LOCK = "lock"  # finalize
    UNLOCK = "unlock"  # only with special permission
    SIGN_OFF = "sign_off"  # approval

    SHARE_KEY = "share_key"  # create a key share
    VIEW_PII = "view_pii"  # if PII is present
    BREAK_GLASS = "break_glass"  # emergency access

    PUBLISH_OFFICIAL = "publish_official"  # distribute official EH


class ResourceType(str, Enum):
    """Resource types known to the system."""

    TENANT = "tenant"
    NAMESPACE = "namespace"

    # === Exam correction ===
    EXAM_PACKAGE = "exam_package"
    STUDENT_WORK = "student_work"
    EH_DOCUMENT = "eh_document"
    RUBRIC = "rubric"  # scoring grid
    ANNOTATION = "annotation"
    EVALUATION = "evaluation"  # criteria / points
    REPORT = "report"  # written assessment
    GRADE_DECISION = "grade_decision"

    # === Report-card generator ===
    ZEUGNIS = "zeugnis"  # report-card document
    ZEUGNIS_VORLAGE = "zeugnis_vorlage"  # report-card template
    ZEUGNIS_ENTWURF = "zeugnis_entwurf"  # report-card draft (before approval)
    SCHUELER_DATEN = "schueler_daten"  # student master data, grades
    FACHNOTE = "fachnote"  # single subject grade
    KOPFNOTE = "kopfnote"  # work / social behaviour grade
    FEHLZEITEN = "fehlzeiten"  # absences
    BEMERKUNG = "bemerkung"  # report-card remarks
    KONFERENZ_BESCHLUSS = "konferenz_beschluss"  # conference result
    VERSETZUNG = "versetzung"  # promotion decision

    # === General ===
    DOCUMENT = "document"  # generic document type (EH, templates, etc.)
    TEMPLATE = "template"  # generic templates
    EXPORT = "export"
    AUDIT_LOG = "audit_log"
    KEY_MATERIAL = "key_material"


class ZKVisibilityMode(str, Enum):
    """Visibility mode for second correctors (ZK)."""

    BLIND = "blind"  # ZK sees neither the EK grade nor the EK assessment
    SEMI = "semi"  # ZK sees annotations but not the grade
    FULL = "full"  # ZK sees everything


class EHVisibilityMode(str, Enum):
    """Visibility mode for expectation horizons (EH)."""

    BLIND = "blind"  # ZK does not see the EH (rare)
    SHARED = "shared"  # ZK sees the EH (default)


class VerfahrenType(str, Enum):
    """Procedure types for exams and report cards."""

    # === Exam procedures ===
    ABITUR = "abitur"
    VORABITUR = "vorabitur"
    KLAUSUR = "klausur"
    NACHPRUEFUNG = "nachpruefung"

    # === Report-card procedures ===
    HALBJAHRESZEUGNIS = "halbjahreszeugnis"
    JAHRESZEUGNIS = "jahreszeugnis"
    ABSCHLUSSZEUGNIS = "abschlusszeugnis"
    ABGANGSZEUGNIS = "abgangszeugnis"

    @classmethod
    def is_exam_type(cls, verfahren: str) -> bool:
        """Return True if ``verfahren`` names an exam procedure.

        Values that are not members of this enum yield False.
        """
        exam_types = {cls.ABITUR, cls.VORABITUR, cls.KLAUSUR, cls.NACHPRUEFUNG}
        try:
            return cls(verfahren) in exam_types
        except ValueError:
            return False

    @classmethod
    def is_certificate_type(cls, verfahren: str) -> bool:
        """Return True if ``verfahren`` names a report-card procedure.

        Values that are not members of this enum yield False.
        """
        cert_types = {
            cls.HALBJAHRESZEUGNIS,
            cls.JAHRESZEUGNIS,
            cls.ABSCHLUSSZEUGNIS,
            cls.ABGANGSZEUGNIS,
        }
        try:
            return cls(verfahren) in cert_types
        except ValueError:
            return False


# =============================================
# DATA STRUCTURES
# =============================================

@dataclass
class PolicySet:
    """
    Policy configuration per Bundesland / year / subject.

    Allows Bundesland-specific differences without hard-coding them
    in the source.

    Supported procedure types:
    - Exams: abitur, vorabitur, klausur, nachpruefung
    - Report cards: halbjahreszeugnis, jahreszeugnis, abschlusszeugnis,
      abgangszeugnis
    """

    id: str
    bundesland: str
    jahr: int
    fach: Optional[str]  # None = applies to all subjects
    verfahren: str  # see VerfahrenType enum

    # Visibility rules (exam)
    zk_visibility_mode: ZKVisibilityMode = ZKVisibilityMode.FULL
    eh_visibility_mode: EHVisibilityMode = EHVisibilityMode.SHARED

    # EH sources (exam)
    allow_teacher_uploaded_eh: bool = True
    allow_land_uploaded_eh: bool = True
    require_rights_confirmation_on_upload: bool = True
    require_dual_control_for_official_eh_update: bool = False

    # Correction rules (exam)
    third_correction_threshold: int = 4  # deviation in grade points
    final_signoff_role: str = "fachvorsitz"

    # Report-card rules
    require_klassenlehrer_approval: bool = True
    require_schulleitung_signoff: bool = True
    allow_sekretariat_edit_after_approval: bool = False
    konferenz_protokoll_required: bool = True
    bemerkungen_require_review: bool = True
    fehlzeiten_auto_import: bool = True
    kopfnoten_enabled: bool = False
    versetzung_auto_calculate: bool = True

    # Export & display
    quote_verbatim_allowed: bool = False  # official texts in the UI
    export_template_id: str = "default"

    # Additional flags
    flags: Dict[str, Any] = field(default_factory=dict)

    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def is_exam_policy(self) -> bool:
        """Return True if this policy's ``verfahren`` is an exam procedure."""
        return VerfahrenType.is_exam_type(self.verfahren)

    def is_certificate_policy(self) -> bool:
        """Return True if this policy's ``verfahren`` is a report-card procedure."""
        return VerfahrenType.is_certificate_type(self.verfahren)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict of all fields with enums as values and the timestamp as ISO string."""
        d = asdict(self)
        d['zk_visibility_mode'] = self.zk_visibility_mode.value
        d['eh_visibility_mode'] = self.eh_visibility_mode.value
        d['created_at'] = self.created_at.isoformat()
        return d


@dataclass
class RoleAssignment:
    """
    Assignment of a role to a user for one specific resource.
    """

    id: str
    user_id: str
    role: Role
    resource_type: ResourceType
    resource_id: str

    # Optional restrictions
    tenant_id: Optional[str] = None
    namespace_id: Optional[str] = None

    # Validity window
    valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    valid_to: Optional[datetime] = None

    # Metadata
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    revoked_at: Optional[datetime] = None
    # Who revoked the assignment; appended last so existing positional
    # construction keeps working.
    revoked_by: Optional[str] = None

    def is_active(self) -> bool:
        """Return True if the assignment is not revoked and ``now`` lies in its validity window."""
        now = datetime.now(timezone.utc)
        if self.revoked_at:
            return False
        if self.valid_to and now > self.valid_to:
            return False
        return now >= self.valid_from

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable dict, including the computed ``is_active`` flag."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'role': self.role.value,
            'resource_type': self.resource_type.value,
            'resource_id': self.resource_id,
            'tenant_id': self.tenant_id,
            'namespace_id': self.namespace_id,
            'valid_from': self.valid_from.isoformat(),
            'valid_to': self.valid_to.isoformat() if self.valid_to else None,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'revoked_at': self.revoked_at.isoformat() if self.revoked_at else None,
            'is_active': self.is_active()
        }


@dataclass
class KeyShare:
    """
    Authorization for a user to access encrypted content.

    A KeyShare is NOT a plaintext key but an authorization that is used
    in combination with a role assignment.
    """

    id: str
    user_id: str
    package_id: str

    # Scope of the authorization,
    # e.g. {"read_original", "read_eh", "read_ek_outputs", "write_annotations"}
    permissions: Set[str] = field(default_factory=set)

    # Optional restrictions
    scope: str = "full"  # "full", "original_only", "eh_only", "outputs_only"

    # Chain of custody
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    # Acceptance (for the invite flow)
    invite_token: Optional[str] = None
    accepted_at: Optional[datetime] = None

    # Revocation
    revoked_at: Optional[datetime] = None
    revoked_by: Optional[str] = None

    def is_active(self) -> bool:
        """Return True if not revoked and either no invite is pending or it was accepted."""
        return self.revoked_at is None and (
            self.invite_token is None or self.accepted_at is not None
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable dict, including the computed ``is_active`` flag."""
        return {
            'id': self.id,
            'user_id': self.user_id,
            'package_id': self.package_id,
            'permissions': list(self.permissions),
            'scope': self.scope,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'invite_token': self.invite_token,
            'accepted_at': self.accepted_at.isoformat() if self.accepted_at else None,
            'revoked_at': self.revoked_at.isoformat() if self.revoked_at else None,
            'is_active': self.is_active()
        }


@dataclass
class Tenant:
    """
    Highest isolation unit - typically a school.
    """

    id: str
    name: str
    bundesland: str
    tenant_type: str = "school"  # "school", "pruefungszentrum", "behoerde"

    # Encryption
    encryption_enabled: bool = True

    # Metadata
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable dict (``deleted_at`` is not included)."""
        return {
            'id': self.id,
            'name': self.name,
            'bundesland': self.bundesland,
            'tenant_type': self.tenant_type,
            'encryption_enabled': self.encryption_enabled,
            'created_at': self.created_at.isoformat()
        }


@dataclass
class Namespace:
    """
    Workspace inside a tenant.

    E.g. "Abitur 2026 - Deutsch LK - Kurs 12a"
    """

    id: str
    tenant_id: str
    name: str

    # Context
    jahr: int
    fach: str
    kurs: Optional[str] = None
    pruefungsart: str = "abitur"  # "abitur", "vorabitur"

    # Policy
    policy_set_id: Optional[str] = None

    # Metadata
    created_by: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable dict (``deleted_at`` is not included)."""
        return {
            'id': self.id,
            'tenant_id': self.tenant_id,
            'name': self.name,
            'jahr': self.jahr,
            'fach': self.fach,
            'kurs': self.kurs,
            'pruefungsart': self.pruefungsart,
            'policy_set_id': self.policy_set_id,
            'created_by': self.created_by,
            'created_at': self.created_at.isoformat()
        }


@dataclass
class ExamPackage:
    """
    Exam package - a complete set of student works with all artifacts.
    """

    id: str
    namespace_id: str
    tenant_id: str

    name: str
    beschreibung: Optional[str] = None

    # Workflow status
    status: str = "draft"  # "draft", "in_progress", "locked", "signed_off"

    # Participants (roles are assigned separately)
    owner_id: str = ""  # typically the EK

    # Encryption
    encryption_key_id: Optional[str] = None

    # Timestamps
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    locked_at: Optional[datetime] = None
    signed_off_at: Optional[datetime] = None
    signed_off_by: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable dict (``encryption_key_id`` is not included)."""
        return {
            'id': self.id,
            'namespace_id': self.namespace_id,
            'tenant_id': self.tenant_id,
            'name': self.name,
            'beschreibung': self.beschreibung,
            'status': self.status,
            'owner_id': self.owner_id,
            'created_at': self.created_at.isoformat(),
            'locked_at': self.locked_at.isoformat() if self.locked_at else None,
            'signed_off_at': self.signed_off_at.isoformat() if self.signed_off_at else None,
            'signed_off_by': self.signed_off_by
        }


# =============================================
# RBAC PERMISSION MATRIX
# =============================================

# Default permission matrix: role -> resource type -> allowed actions.
# PolicyEngine applies policy-set restrictions on top of this matrix.
DEFAULT_PERMISSIONS: Dict[Role, Dict[ResourceType, Set[Action]]] = {
    # First corrector
    Role.ERSTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.SHARE_KEY, Action.LOCK},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Second corrector (default: FULL visibility)
    Role.ZWEITKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Third corrector
    Role.DRITTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Subject examination lead
    Role.FACHVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.EXAM_PACKAGE: {
            Action.READ, Action.UPDATE, Action.LOCK, Action.UNLOCK, Action.SIGN_OFF
        },
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Examination chair
    Role.PRUEFUNGSVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.GRADE_DECISION: {Action.READ, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # School administrator
    Role.SCHUL_ADMIN: {
        ResourceType.TENANT: {Action.READ, Action.UPDATE},
        ResourceType.NAMESPACE: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EXAM_PACKAGE: {
            Action.CREATE, Action.READ, Action.DELETE, Action.ASSIGN_ROLE
        },
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.DELETE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # State administrator (authority)
    Role.LAND_ADMIN: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.EH_DOCUMENT: {
            Action.READ, Action.UPLOAD, Action.UPDATE, Action.DELETE, Action.PUBLISH_OFFICIAL
        },
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Auditor
    Role.AUDITOR: {
        ResourceType.AUDIT_LOG: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # metadata only
        # No access to content!
    },

    # Operator
    Role.OPERATOR: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # metadata only
        ResourceType.AUDIT_LOG: {Action.READ},
        # Break-glass is handled separately
    },

    # Teacher assistant
    Role.TEACHER_ASSISTANT: {
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ},  # only certain types
        ResourceType.EH_DOCUMENT: {Action.READ},
    },

    # Exam author (Vorabitur only)
    Role.EXAM_AUTHOR: {
        ResourceType.EH_DOCUMENT: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.RUBRIC: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
    },

    # =============================================
    # REPORT-CARD WORKFLOW ROLES
    # =============================================

    # Class teacher - creates report cards, conduct grades, remarks
    Role.KLASSENLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.ZEUGNIS: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {
            Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE
        },
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ, Action.UPDATE},
        ResourceType.FACHNOTE: {Action.READ},  # reads subject grades entered by subject teachers
        ResourceType.KOPFNOTE: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ, Action.UPDATE},
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Subject teacher - enters subject grades
    Role.FACHLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # own students only
        ResourceType.FACHNOTE: {Action.CREATE, Action.READ, Action.UPDATE},  # own subject only
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ},  # subject-related remarks
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Report-card officer - quality control
    Role.ZEUGNISBEAUFTRAGTER: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE, Action.UPLOAD},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Secretariat - printing, dispatch, archiving
    Role.SEKRETARIAT: {
        ResourceType.ZEUGNIS: {Action.READ, Action.DOWNLOAD},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # for address data
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # School management - final report-card approval
    Role.SCHULLEITUNG: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.SIGN_OFF, Action.LOCK},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {
            Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF
        },
        ResourceType.VERSETZUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Grade-level lead - grade-level coordination (e.g. upper school)
    Role.STUFENLEITUNG: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.READ},
        ResourceType.VERSETZUNG: {Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
}


# =============================================
# POLICY ENGINE
# =============================================

class PolicyEngine:
    """
    Engine for RBAC/ABAC decisions.

    Checks:
    1. Base role permission (RBAC) from DEFAULT_PERMISSIONS
    2. Policy restrictions (ABAC) from an optional PolicySet
    3. Key share authorizations

    All state (policy sets, role assignments, key shares) is held in
    in-memory dicts on the instance.
    """

    def __init__(self):
        self.policy_sets: Dict[str, PolicySet] = {}
        self.role_assignments: Dict[str, List[RoleAssignment]] = {}  # user_id -> assignments
        self.key_shares: Dict[str, List[KeyShare]] = {}  # user_id -> shares

    def register_policy_set(self, policy: PolicySet):
        """Register a policy set under its id, replacing any set with the same id."""
        self.policy_sets[policy.id] = policy

    def get_policy_for_context(
        self,
        bundesland: str,
        jahr: int,
        fach: Optional[str] = None,
        verfahren: str = "abitur"
    ) -> Optional[PolicySet]:
        """Find the policy set that applies to a context.

        Among policies matching ``bundesland``, ``jahr`` and ``verfahren``
        the lookup order is:

        1. a policy whose ``fach`` equals the requested ``fach``
        2. a subject-generic policy (``fach is None``)
        3. the first registered policy with ``bundesland == "DEFAULT"``

        Returns None if none of these exists.
        """
        generic_match: Optional[PolicySet] = None
        default_policy: Optional[PolicySet] = None

        for policy in self.policy_sets.values():
            if (policy.bundesland == bundesland
                    and policy.jahr == jahr
                    and policy.verfahren == verfahren):
                if policy.fach is not None:
                    # Subject-specific policy: most specific match, wins
                    # immediately regardless of registration order.
                    if policy.fach == fach:
                        return policy
                elif generic_match is None:
                    generic_match = policy

            if default_policy is None and policy.bundesland == "DEFAULT":
                default_policy = policy

        if generic_match is not None:
            return generic_match

        # Fallback: default policy (may be None)
        return default_policy

    def assign_role(
        self,
        user_id: str,
        role: Role,
        resource_type: ResourceType,
        resource_id: str,
        granted_by: str,
        tenant_id: Optional[str] = None,
        namespace_id: Optional[str] = None,
        valid_to: Optional[datetime] = None
    ) -> RoleAssignment:
        """Assign a role to a user for a resource and return the new assignment.

        The assignment gets a fresh UUID4 id and is appended to the
        user's assignment list.
        """
        assignment = RoleAssignment(
            id=str(uuid.uuid4()),
            user_id=user_id,
            role=role,
            resource_type=resource_type,
            resource_id=resource_id,
            tenant_id=tenant_id,
            namespace_id=namespace_id,
            granted_by=granted_by,
            valid_to=valid_to
        )

        self.role_assignments.setdefault(user_id, []).append(assignment)
        return assignment

    def revoke_role(self, assignment_id: str, revoked_by: str) -> bool:
        """Revoke a role assignment.

        Records the revocation time and who revoked it. Returns True if
        an assignment with ``assignment_id`` was found, otherwise False.
        """
        for user_assignments in self.role_assignments.values():
            for assignment in user_assignments:
                if assignment.id == assignment_id:
                    assignment.revoked_at = datetime.now(timezone.utc)
                    assignment.revoked_by = revoked_by
                    return True
        return False

    def get_user_roles(
        self,
        user_id: str,
        resource_type: Optional[ResourceType] = None,
        resource_id: Optional[str] = None
    ) -> List[Role]:
        """Return the distinct active roles of a user.

        ``resource_type`` and ``resource_id`` act as filters only when
        they are truthy; a falsy value disables the respective filter.
        Roles are returned in the order of their first matching
        assignment.
        """
        roles = []
        for assignment in self.role_assignments.get(user_id, []):
            if not assignment.is_active():
                continue
            if resource_type and assignment.resource_type != resource_type:
                continue
            if resource_id and assignment.resource_id != resource_id:
                continue
            roles.append(assignment.role)

        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(roles))

    def create_key_share(
        self,
        user_id: str,
        package_id: str,
        permissions: Set[str],
        granted_by: str,
        scope: str = "full",
        invite_token: Optional[str] = None
    ) -> KeyShare:
        """Create a key share for a user and return it.

        The share stores its own copy of ``permissions`` so later
        mutation of the caller's set cannot change what was granted.
        """
        share = KeyShare(
            id=str(uuid.uuid4()),
            user_id=user_id,
            package_id=package_id,
            permissions=set(permissions),
            scope=scope,
            granted_by=granted_by,
            invite_token=invite_token
        )

        self.key_shares.setdefault(user_id, []).append(share)
        return share

    def accept_key_share(self, share_id: str, token: str) -> bool:
        """Accept a key share via its invite token.

        Returns True and stamps ``accepted_at`` if the share exists, has
        an invite token, and ``token`` matches it. Shares without an
        invite token need no acceptance and are never matched.
        """
        if not isinstance(token, str):
            return False

        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id != share_id:
                    continue
                expected = share.invite_token
                if expected is None:
                    return False
                # Constant-time comparison; encode first because
                # compare_digest rejects non-ASCII str arguments.
                if hmac.compare_digest(expected.encode("utf-8"), token.encode("utf-8")):
                    share.accepted_at = datetime.now(timezone.utc)
                    return True
                return False
        return False

    def revoke_key_share(self, share_id: str, revoked_by: str) -> bool:
        """Revoke a key share.

        Records the revocation time and who revoked it. Returns True if
        a share with ``share_id`` was found, otherwise False.
        """
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id == share_id:
                    share.revoked_at = datetime.now(timezone.utc)
                    share.revoked_by = revoked_by
                    return True
        return False

    def check_permission(
        self,
        user_id: str,
        action: Action,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None,
        package_id: Optional[str] = None
    ) -> bool:
        """
        Check whether a user may perform an action on a resource.

        Checks, in order:
        1. the user has at least one active role for the resource
        2. base RBAC: one of those roles grants ``action`` in
           DEFAULT_PERMISSIONS
        3. policy restrictions (only if ``policy`` is given)
        4. an active key share for ``package_id`` (only if given)

        Returns True only if every applicable check passes.
        """
        # 1. Fetch active roles
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        if not roles:
            return False

        # 2. Base RBAC
        has_permission = any(
            action in DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set())
            for role in roles
        )
        if not has_permission:
            return False

        # 3. Policy restrictions
        if policy and Role.ZWEITKORREKTOR in roles:
            # NOTE(review): ZKVisibilityMode.BLIND is not enforced here.
            # Hiding EK outputs (EVALUATION / REPORT / GRADE_DECISION)
            # requires authorship metadata this engine does not have.
            if policy.zk_visibility_mode == ZKVisibilityMode.SEMI:
                # Semi: ZK sees annotations but not the grade
                if resource_type == ResourceType.GRADE_DECISION and action == Action.READ:
                    return False

        # 4. Key share (if package-based)
        if package_id:
            user_shares = self.key_shares.get(user_id, [])
            has_key_share = any(
                share.package_id == package_id and share.is_active()
                for share in user_shares
            )
            if not has_key_share:
                return False

        return True

    def get_allowed_actions(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None
    ) -> Set[Action]:
        """Return all actions a user may perform on a resource.

        The result is the union of the DEFAULT_PERMISSIONS entries of
        the user's active roles, reduced by the same policy
        restrictions that check_permission applies. Key shares are not
        considered.
        """
        roles = self.get_user_roles(user_id, resource_type, resource_id)

        allowed: Set[Action] = set()
        for role in roles:
            allowed.update(DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set()))

        # Apply policy restrictions
        if policy and Role.ZWEITKORREKTOR in roles:
            # NOTE(review): ZKVisibilityMode.BLIND is not enforced here
            # either; see check_permission.
            if (policy.zk_visibility_mode == ZKVisibilityMode.SEMI
                    and resource_type == ResourceType.GRADE_DECISION):
                # Keep in step with check_permission, which denies this READ.
                allowed.discard(Action.READ)

        return allowed


# =============================================
# DEFAULT POLICY SETS (all Bundeslaender)
# =============================================

# Bundesland name -> two-letter code used as policy id prefix. The first
# two letters of the name are not unique (brandenburg/bremen,
# saarland/sachsen/sachsen-anhalt), so the ISO 3166-2:DE subdivision
# codes are used instead.
_BUNDESLAND_CODES: Dict[str, str] = {
    "baden-wuerttemberg": "BW",
    "bayern": "BY",
    "berlin": "BE",
    "brandenburg": "BB",
    "bremen": "HB",
    "hamburg": "HH",
    "hessen": "HE",
    "mecklenburg-vorpommern": "MV",
    "niedersachsen": "NI",
    "nordrhein-westfalen": "NW",
    "rheinland-pfalz": "RP",
    "saarland": "SL",
    "sachsen": "SN",
    "sachsen-anhalt": "ST",
    "schleswig-holstein": "SH",
    "thueringen": "TH",
}


def create_default_policy_sets() -> List[PolicySet]:
    """
    Create the default policy sets for all Bundeslaender.

    Returns the DEFAULT fallback policy, the hand-written policies for
    Niedersachsen, Bayern and Nordrhein-Westfalen, and one generated
    base policy for every other Bundesland, all for Abitur 2025. Every
    policy has a unique id. These can be refined per state later.
    """
    policies = []

    # Default policy (fallback)
    policies.append(PolicySet(
        id="DEFAULT-2025",
        bundesland="DEFAULT",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz"
    ))

    # Niedersachsen (example with specific adjustments)
    policies.append(PolicySet(
        id="NI-2025-ABITUR",
        bundesland="niedersachsen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,  # in NI the ZK sees everything
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="niedersachsen-abitur"
    ))

    # Bayern (example with SEMI visibility)
    policies.append(PolicySet(
        id="BY-2025-ABITUR",
        bundesland="bayern",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.SEMI,  # ZK sees annotations, not the grade
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="bayern-abitur"
    ))

    # NRW (example)
    policies.append(PolicySet(
        id="NW-2025-ABITUR",
        bundesland="nordrhein-westfalen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="nrw-abitur"
    ))

    # Generate base policies for all other Bundeslaender
    hand_written = {"niedersachsen", "bayern", "nordrhein-westfalen"}
    for bl, code in _BUNDESLAND_CODES.items():
        if bl in hand_written:
            continue
        policies.append(PolicySet(
            id=f"{code}-2025-ABITUR",
            bundesland=bl,
            jahr=2025,
            fach=None,
            verfahren="abitur",
            zk_visibility_mode=ZKVisibilityMode.FULL,
            eh_visibility_mode=EHVisibilityMode.SHARED,
            allow_teacher_uploaded_eh=True,
            allow_land_uploaded_eh=True,
            require_rights_confirmation_on_upload=True,
            third_correction_threshold=4,
            final_signoff_role="fachvorsitz"
        ))

    return policies


# =============================================
# GLOBAL POLICY ENGINE INSTANCE
# =============================================

# Singleton policy engine
_policy_engine: Optional[PolicyEngine] = None


def get_policy_engine() -> PolicyEngine:
    """Return the global policy engine, creating it on first use.

    On creation the engine is populated with the default policy sets.
    """
    global _policy_engine
    if _policy_engine is None:
        engine = PolicyEngine()
        # Register the default policies before publishing the instance so
        # a failure here cannot leave a half-initialised singleton behind.
        for policy in create_default_policy_sets():
            engine.register_policy_set(policy)
        _policy_engine = engine
    return _policy_engine


# =============================================
# API GUARDS (decorators for FastAPI)
# =============================================

def _get_authenticated_user(args, kwargs):
    """Locate the Request among an endpoint's arguments and return ``request.state.user``.

    Raises HTTPException 500 if no request is found and 401 if the
    request carries no user.
    """
    request = kwargs.get('request')
    if not request:
        request = next((arg for arg in args if isinstance(arg, Request)), None)

    if not request:
        raise HTTPException(status_code=500, detail="Request not found")

    # Read the user placed on request.state (set outside this module)
    user = getattr(request.state, 'user', None)
    if not user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    return user


def require_permission(
    action: Action,
    resource_type: ResourceType,
    resource_id_param: str = "resource_id",
    *,
    jahr: int = 2025,
    verfahren: str = "abitur"
):
    """
    Decorator for FastAPI endpoints.

    Checks whether the current user holds the given permission and
    raises HTTPException 403 otherwise (401 if unauthenticated, 500 if
    the endpoint received no Request).

    Args:
        action: Action the endpoint performs.
        resource_type: Type of the resource the endpoint works on.
        resource_id_param: Name of the keyword argument that carries
            the resource id.
        jahr: Year used to look up the policy set for the user's
            Bundesland.
        verfahren: Procedure type used for the policy set lookup.

    Usage:
        @app.get("/api/v1/packages/{package_id}")
        @require_permission(Action.READ, ResourceType.EXAM_PACKAGE, "package_id")
        async def get_package(package_id: str, request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            user = _get_authenticated_user(args, kwargs)

            user_id = user.get('user_id')
            # NOTE(review): if the endpoint has no keyword argument named
            # resource_id_param this is None, and the role lookup then
            # matches roles on ANY resource of this type - confirm that
            # every decorated endpoint passes the id by keyword.
            resource_id = kwargs.get(resource_id_param)

            engine = get_policy_engine()

            # Optional: load the policy for the user's context
            policy = None
            bundesland = user.get('bundesland')
            if bundesland:
                policy = engine.get_policy_for_context(
                    bundesland, jahr, verfahren=verfahren
                )

            if not engine.check_permission(
                user_id=user_id,
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                policy=policy
            ):
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {action.value} on {resource_type.value}"
                )

            return await func(*args, **kwargs)

        return wrapper
    return decorator


def require_role(role: Role):
    """
    Decorator that checks whether the user holds a given role.

    The role counts if the user has an active assignment of it on any
    resource. Raises HTTPException 403 otherwise (401 if
    unauthenticated, 500 if the endpoint received no Request).

    Usage:
        @app.post("/api/v1/eh/publish")
        @require_role(Role.LAND_ADMIN)
        async def publish_eh(request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            user = _get_authenticated_user(args, kwargs)
            user_id = user.get('user_id')

            engine = get_policy_engine()
            user_roles = engine.get_user_roles(user_id)

            if role not in user_roles:
                raise HTTPException(
                    status_code=403,
                    detail=f"Role required: {role.value}"
                )

            return await func(*args, **kwargs)

        return wrapper
    return decorator