Files
breakpilot-lehrer/klausur-service/backend/rbac.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

1133 lines
39 KiB
Python

"""
RBAC/ABAC Policy System for Klausur-Service
Implements:
- Role-Based Access Control (RBAC) with hierarchical roles
- Attribute-Based Access Control (ABAC) via policy sets
- Bundesland-specific policies
- Key sharing for exam packages
"""
import json
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict, Set, Any
from datetime import datetime, timezone
import uuid
# =============================================
# ENUMS: Roles, Actions, Resources
# =============================================
class Role(str, Enum):
    """Functional roles in the exam-correction and report-card (Zeugnis) chains."""

    # --- Exam correction chain ---
    ERSTKORREKTOR = "erstkorrektor"              # EK - first corrector
    ZWEITKORREKTOR = "zweitkorrektor"            # ZK - second corrector
    DRITTKORREKTOR = "drittkorrektor"            # DK - third corrector

    # --- Report-card workflow ---
    KLASSENLEHRER = "klassenlehrer"              # KL - creates report card, conduct grades, remarks
    FACHLEHRER = "fachlehrer"                    # FL - enters subject grades
    ZEUGNISBEAUFTRAGTER = "zeugnisbeauftragter"  # ZB - quality control
    SEKRETARIAT = "sekretariat"                  # SEK - printing, dispatch, archiving

    # --- Leadership (exam + report card) ---
    FACHVORSITZ = "fachvorsitz"                  # FVL - subject examination lead
    PRUEFUNGSVORSITZ = "pruefungsvorsitz"        # PV - school management / examination chair
    SCHULLEITUNG = "schulleitung"                # SL - final report-card release
    STUFENLEITUNG = "stufenleitung"              # STL - grade-level coordination

    # --- Administration ---
    SCHUL_ADMIN = "schul_admin"                  # SA - school administrator
    LAND_ADMIN = "land_admin"                    # LA - state authority

    # --- Special roles ---
    AUDITOR = "auditor"                          # data-protection officer / auditor
    OPERATOR = "operator"                        # OPS - support
    TEACHER_ASSISTANT = "teacher_assistant"      # TA - trainee teacher (Referendar)
    EXAM_AUTHOR = "exam_author"                  # EA - Vorabitur only
class Action(str, Enum):
    """Operations that can be performed on a resource."""

    # --- CRUD ---
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"

    # --- User and role management ---
    ASSIGN_ROLE = "assign_role"
    INVITE_USER = "invite_user"
    REMOVE_USER = "remove_user"

    # --- File transfer ---
    UPLOAD = "upload"
    DOWNLOAD = "download"

    # --- Workflow ---
    LOCK = "lock"                          # finalize
    UNLOCK = "unlock"                      # only with special privilege
    SIGN_OFF = "sign_off"                  # approval / release

    # --- Sensitive operations ---
    SHARE_KEY = "share_key"                # create a key share
    VIEW_PII = "view_pii"                  # if PII is present
    BREAK_GLASS = "break_glass"            # emergency access
    PUBLISH_OFFICIAL = "publish_official"  # distribute an official EH (Erwartungshorizont)
class ResourceType(str, Enum):
    """Kinds of resources known to the system."""

    TENANT = "tenant"
    NAMESPACE = "namespace"

    # --- Exam correction ---
    EXAM_PACKAGE = "exam_package"
    STUDENT_WORK = "student_work"
    EH_DOCUMENT = "eh_document"
    RUBRIC = "rubric"                            # scoring grid
    ANNOTATION = "annotation"
    EVALUATION = "evaluation"                    # criteria / points
    REPORT = "report"                            # written assessment (Gutachten)
    GRADE_DECISION = "grade_decision"

    # --- Report-card generator ---
    ZEUGNIS = "zeugnis"                          # report-card document
    ZEUGNIS_VORLAGE = "zeugnis_vorlage"          # report-card template
    ZEUGNIS_ENTWURF = "zeugnis_entwurf"          # report-card draft (before release)
    SCHUELER_DATEN = "schueler_daten"            # student master data, grades
    FACHNOTE = "fachnote"                        # single subject grade
    KOPFNOTE = "kopfnote"                        # work / social conduct grade
    FEHLZEITEN = "fehlzeiten"                    # absences
    BEMERKUNG = "bemerkung"                      # report-card remarks
    KONFERENZ_BESCHLUSS = "konferenz_beschluss"  # conference result
    VERSETZUNG = "versetzung"                    # promotion decision

    # --- General ---
    DOCUMENT = "document"                        # generic document type (EH, templates, etc.)
    TEMPLATE = "template"                        # generic templates
    EXPORT = "export"
    AUDIT_LOG = "audit_log"
    KEY_MATERIAL = "key_material"
class ZKVisibilityMode(str, Enum):
    """How much of the first corrector's (EK) output a second corrector (ZK) may see."""

    # ZK sees neither the EK grade nor the EK written assessment.
    BLIND = "blind"
    # ZK sees annotations but not the grade.
    SEMI = "semi"
    # ZK sees everything.
    FULL = "full"
class EHVisibilityMode(str, Enum):
    """Whether the second corrector (ZK) may see the Erwartungshorizont (EH)."""

    # ZK does not see the EH (rare).
    BLIND = "blind"
    # ZK sees the EH (the standard case).
    SHARED = "shared"
class VerfahrenType(str, Enum):
    """Procedure types for exams and for report cards (Zeugnisse)."""

    # --- Exam procedures ---
    ABITUR = "abitur"
    VORABITUR = "vorabitur"
    KLAUSUR = "klausur"
    NACHPRUEFUNG = "nachpruefung"

    # --- Report-card procedures ---
    HALBJAHRESZEUGNIS = "halbjahreszeugnis"
    JAHRESZEUGNIS = "jahreszeugnis"
    ABSCHLUSSZEUGNIS = "abschlusszeugnis"
    ABGANGSZEUGNIS = "abgangszeugnis"

    @classmethod
    def is_exam_type(cls, verfahren: str) -> bool:
        """Return True if *verfahren* is the value of one of the exam procedures.

        Values that do not correspond to any member yield False instead of
        raising.
        """
        try:
            member = cls(verfahren)
        except ValueError:
            return False
        return member in (cls.ABITUR, cls.VORABITUR, cls.KLAUSUR, cls.NACHPRUEFUNG)

    @classmethod
    def is_certificate_type(cls, verfahren: str) -> bool:
        """Return True if *verfahren* is the value of one of the report-card procedures.

        Values that do not correspond to any member yield False instead of
        raising.
        """
        try:
            member = cls(verfahren)
        except ValueError:
            return False
        return member in (
            cls.HALBJAHRESZEUGNIS,
            cls.JAHRESZEUGNIS,
            cls.ABSCHLUSSZEUGNIS,
            cls.ABGANGSZEUGNIS,
        )
# =============================================
# DATA STRUCTURES
# =============================================
@dataclass
class PolicySet:
    """
    Policy configuration per federal state (Bundesland), year and subject.

    Lets state-specific differences be expressed as data instead of being
    hard-coded.

    Supported procedure types (see VerfahrenType):
    - exams: abitur, vorabitur, klausur, nachpruefung
    - report cards: halbjahreszeugnis, jahreszeugnis, abschlusszeugnis,
      abgangszeugnis
    """

    id: str
    bundesland: str
    jahr: int
    fach: Optional[str]  # None = applies to all subjects
    verfahren: str  # a VerfahrenType value

    # Visibility rules (exam)
    zk_visibility_mode: ZKVisibilityMode = ZKVisibilityMode.FULL
    eh_visibility_mode: EHVisibilityMode = EHVisibilityMode.SHARED

    # EH sources (exam)
    allow_teacher_uploaded_eh: bool = True
    allow_land_uploaded_eh: bool = True
    require_rights_confirmation_on_upload: bool = True
    require_dual_control_for_official_eh_update: bool = False

    # Correction rules (exam)
    third_correction_threshold: int = 4  # deviation in grade points
    final_signoff_role: str = "fachvorsitz"

    # Report-card rules
    require_klassenlehrer_approval: bool = True
    require_schulleitung_signoff: bool = True
    allow_sekretariat_edit_after_approval: bool = False
    konferenz_protokoll_required: bool = True
    bemerkungen_require_review: bool = True
    fehlzeiten_auto_import: bool = True
    kopfnoten_enabled: bool = False
    versetzung_auto_calculate: bool = True

    # Export & display
    quote_verbatim_allowed: bool = False  # official texts in the UI
    export_template_id: str = "default"

    # Additional free-form flags
    flags: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def is_exam_policy(self) -> bool:
        """Return True if this policy's procedure is an exam type."""
        return VerfahrenType.is_exam_type(self.verfahren)

    def is_certificate_policy(self) -> bool:
        """Return True if this policy's procedure is a report-card type."""
        return VerfahrenType.is_certificate_type(self.verfahren)

    def to_dict(self):
        """Return all fields as a dict; enums as their values, created_at as ISO-8601 text."""
        payload = asdict(self)
        payload.update(
            zk_visibility_mode=self.zk_visibility_mode.value,
            eh_visibility_mode=self.eh_visibility_mode.value,
            created_at=self.created_at.isoformat(),
        )
        return payload
@dataclass
class RoleAssignment:
    """
    Assignment of one role to one user for one specific resource.
    """

    id: str
    user_id: str
    role: Role
    resource_type: ResourceType
    resource_id: str

    # Optional scoping
    tenant_id: Optional[str] = None
    namespace_id: Optional[str] = None

    # Validity window
    valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    valid_to: Optional[datetime] = None

    # Metadata
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    revoked_at: Optional[datetime] = None

    def is_active(self) -> bool:
        """Return True if the assignment is not revoked and the current UTC time is inside its validity window."""
        if self.revoked_at:
            return False
        current = datetime.now(timezone.utc)
        if self.valid_to and current > self.valid_to:
            return False
        return self.valid_from <= current

    def to_dict(self):
        """Serialize the assignment; enums as values, datetimes as ISO-8601 text (None if unset)."""

        def iso_or_none(moment):
            return moment.isoformat() if moment else None

        return {
            'id': self.id,
            'user_id': self.user_id,
            'role': self.role.value,
            'resource_type': self.resource_type.value,
            'resource_id': self.resource_id,
            'tenant_id': self.tenant_id,
            'namespace_id': self.namespace_id,
            'valid_from': self.valid_from.isoformat(),
            'valid_to': iso_or_none(self.valid_to),
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'revoked_at': iso_or_none(self.revoked_at),
            'is_active': self.is_active(),
        }
@dataclass
class KeyShare:
    """
    Authorization for a user to access encrypted content of a package.

    A KeyShare is NOT a plaintext key; it is a permission record that is
    meant to be used in combination with a role assignment.
    """

    id: str
    user_id: str
    package_id: str

    # Scope of the authorization,
    # e.g. {"read_original", "read_eh", "read_ek_outputs", "write_annotations"}
    permissions: Set[str] = field(default_factory=set)

    # Optional restriction: "full", "original_only", "eh_only", "outputs_only"
    scope: str = "full"

    # Provenance
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    # Acceptance (invite flow)
    invite_token: Optional[str] = None
    accepted_at: Optional[datetime] = None

    # Revocation
    revoked_at: Optional[datetime] = None
    revoked_by: Optional[str] = None

    def is_active(self) -> bool:
        """Return True if the share is not revoked and, when it was issued with an invite token, has been accepted."""
        if self.revoked_at is not None:
            return False
        return self.invite_token is None or self.accepted_at is not None

    def to_dict(self):
        """Serialize the share to a plain dict.

        Permissions are emitted in sorted order so the output is deterministic
        (a set has no stable iteration order), and ``revoked_by`` is included
        so the revocation record is complete. Datetimes become ISO-8601 text,
        or None if unset.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'package_id': self.package_id,
            'permissions': sorted(self.permissions),
            'scope': self.scope,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'invite_token': self.invite_token,
            'accepted_at': self.accepted_at.isoformat() if self.accepted_at else None,
            'revoked_at': self.revoked_at.isoformat() if self.revoked_at else None,
            'revoked_by': self.revoked_by,
            'is_active': self.is_active(),
        }
@dataclass
class Tenant:
    """
    Top-level isolation unit, typically a single school.
    """

    id: str
    name: str
    bundesland: str
    tenant_type: str = "school"  # "school", "pruefungszentrum", "behoerde"

    # Encryption
    encryption_enabled: bool = True

    # Metadata
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Serialize the tenant; created_at as ISO-8601 text. deleted_at is not part of the output."""
        plain_fields = ('id', 'name', 'bundesland', 'tenant_type', 'encryption_enabled')
        payload = {key: getattr(self, key) for key in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        return payload
@dataclass
class Namespace:
    """
    Workspace inside a tenant,
    e.g. "Abitur 2026 - Deutsch LK - Kurs 12a".
    """

    id: str
    tenant_id: str
    name: str

    # Context
    jahr: int
    fach: str
    kurs: Optional[str] = None
    pruefungsart: str = "abitur"  # "abitur", "vorabitur"

    # Policy
    policy_set_id: Optional[str] = None

    # Metadata
    created_by: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Serialize the namespace; created_at as ISO-8601 text. deleted_at is not part of the output."""
        plain_fields = (
            'id', 'tenant_id', 'name', 'jahr', 'fach', 'kurs',
            'pruefungsart', 'policy_set_id', 'created_by',
        )
        payload = {key: getattr(self, key) for key in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        return payload
@dataclass
class ExamPackage:
    """
    Exam package: a complete set of student works with all artifacts.
    """

    id: str
    namespace_id: str
    tenant_id: str
    name: str
    beschreibung: Optional[str] = None

    # Workflow status: "draft", "in_progress", "locked", "signed_off"
    status: str = "draft"

    # Participants (roles are assigned separately)
    owner_id: str = ""  # typically the first corrector (EK)

    # Encryption
    encryption_key_id: Optional[str] = None

    # Timestamps
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    locked_at: Optional[datetime] = None
    signed_off_at: Optional[datetime] = None
    signed_off_by: Optional[str] = None

    def to_dict(self):
        """Serialize the package; datetimes as ISO-8601 text (None if unset).

        encryption_key_id is not part of the output.
        """
        plain_fields = (
            'id', 'namespace_id', 'tenant_id', 'name',
            'beschreibung', 'status', 'owner_id',
        )
        payload = {key: getattr(self, key) for key in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        for stamp_name in ('locked_at', 'signed_off_at'):
            stamp = getattr(self, stamp_name)
            payload[stamp_name] = stamp.isoformat() if stamp else None
        payload['signed_off_by'] = self.signed_off_by
        return payload
# =============================================
# RBAC PERMISSION MATRIX
# =============================================
# Default permission matrix: role -> resource type -> set of allowed actions.
# A (role, resource type) pair that is missing here grants no action.
# (Original note: the matrix can be overridden by policies.)
DEFAULT_PERMISSIONS: Dict[Role, Dict[ResourceType, Set[Action]]] = {
    # First corrector (EK)
    Role.ERSTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.SHARE_KEY, Action.LOCK},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Second corrector (ZK); these defaults correspond to FULL visibility
    Role.ZWEITKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Third corrector (DK)
    Role.DRITTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Subject examination lead (Fachvorsitz)
    Role.FACHVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.LOCK, Action.UNLOCK, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Examination chair (Pruefungsvorsitz)
    Role.PRUEFUNGSVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.GRADE_DECISION: {Action.READ, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # School administrator
    Role.SCHUL_ADMIN: {
        ResourceType.TENANT: {Action.READ, Action.UPDATE},
        ResourceType.NAMESPACE: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EXAM_PACKAGE: {Action.CREATE, Action.READ, Action.DELETE, Action.ASSIGN_ROLE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.DELETE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # State administrator (authority)
    Role.LAND_ADMIN: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE, Action.DELETE, Action.PUBLISH_OFFICIAL},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Auditor
    Role.AUDITOR: {
        ResourceType.AUDIT_LOG: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # intended: metadata only
        # No access to content resources!
    },
    # Operator
    Role.OPERATOR: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # intended: metadata only
        ResourceType.AUDIT_LOG: {Action.READ},
        # Break-glass is handled separately (not part of this matrix)
    },
    # Teacher assistant
    Role.TEACHER_ASSISTANT: {
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ},  # intended: certain annotation types only
        ResourceType.EH_DOCUMENT: {Action.READ},
    },
    # Exam author (Vorabitur only)
    Role.EXAM_AUTHOR: {
        ResourceType.EH_DOCUMENT: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.RUBRIC: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
    },
    # =============================================
    # REPORT-CARD (ZEUGNIS) WORKFLOW ROLES
    # =============================================
    # Class teacher - creates report cards, conduct grades, remarks
    Role.KLASSENLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.ZEUGNIS: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ, Action.UPDATE},
        ResourceType.FACHNOTE: {Action.READ},  # reads the subject grades entered by subject teachers
        ResourceType.KOPFNOTE: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ, Action.UPDATE},
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Subject teacher - enters subject grades
    Role.FACHLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # intended: own students only
        ResourceType.FACHNOTE: {Action.CREATE, Action.READ, Action.UPDATE},  # intended: own subject only
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ},  # subject-related remarks
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Report-card officer - quality control
    Role.ZEUGNISBEAUFTRAGTER: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE, Action.UPLOAD},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Secretariat - printing, dispatch, archiving
    Role.SEKRETARIAT: {
        ResourceType.ZEUGNIS: {Action.READ, Action.DOWNLOAD},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # for address data
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # School management - final report-card release
    Role.SCHULLEITUNG: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.SIGN_OFF, Action.LOCK},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.VERSETZUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
    # Grade-level lead - grade-level coordination (e.g. upper school)
    Role.STUFENLEITUNG: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.READ},
        ResourceType.VERSETZUNG: {Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
}
# =============================================
# POLICY ENGINE
# =============================================
class PolicyEngine:
    """
    In-memory engine for RBAC/ABAC decisions.

    A permission check combines:
    1. the base role permission from DEFAULT_PERMISSIONS (RBAC),
    2. restrictions derived from an optional PolicySet (ABAC),
    3. an active key share, when a package id is supplied.

    All state (policy sets, role assignments, key shares) lives in plain
    dictionaries on the instance.
    """

    def __init__(self):
        self.policy_sets: Dict[str, PolicySet] = {}
        self.role_assignments: Dict[str, List[RoleAssignment]] = {}  # user_id -> assignments
        self.key_shares: Dict[str, List[KeyShare]] = {}  # user_id -> shares

    def register_policy_set(self, policy: PolicySet):
        """Register a policy set under its id; an existing entry with the same id is replaced."""
        self.policy_sets[policy.id] = policy

    def get_policy_for_context(
        self,
        bundesland: str,
        jahr: int,
        fach: Optional[str] = None,
        verfahren: str = "abitur"
    ) -> Optional[PolicySet]:
        """Find the policy set that applies to a context.

        Resolution order:
        1. a policy matching bundesland/jahr/verfahren whose ``fach`` equals
           the requested subject,
        2. a policy matching bundesland/jahr/verfahren with ``fach=None``
           (applies to all subjects),
        3. the first registered policy whose bundesland is ``"DEFAULT"``.

        Returns None if nothing matches.
        """
        generic_match: Optional[PolicySet] = None
        for policy in self.policy_sets.values():
            if (policy.bundesland != bundesland
                    or policy.jahr != jahr
                    or policy.verfahren != verfahren):
                continue
            if policy.fach is None:
                # Remember the catch-all policy, but keep looking: a
                # subject-specific policy registered later must win.
                if generic_match is None:
                    generic_match = policy
            elif policy.fach == fach:
                return policy
        if generic_match is not None:
            return generic_match

        # Fallback: default policy
        for policy in self.policy_sets.values():
            if policy.bundesland == "DEFAULT":
                return policy
        return None

    def assign_role(
        self,
        user_id: str,
        role: Role,
        resource_type: ResourceType,
        resource_id: str,
        granted_by: str,
        tenant_id: Optional[str] = None,
        namespace_id: Optional[str] = None,
        valid_to: Optional[datetime] = None
    ) -> RoleAssignment:
        """Create a role assignment with a fresh UUID, store it for the user and return it."""
        assignment = RoleAssignment(
            id=str(uuid.uuid4()),
            user_id=user_id,
            role=role,
            resource_type=resource_type,
            resource_id=resource_id,
            tenant_id=tenant_id,
            namespace_id=namespace_id,
            granted_by=granted_by,
            valid_to=valid_to
        )
        self.role_assignments.setdefault(user_id, []).append(assignment)
        return assignment

    def revoke_role(self, assignment_id: str, revoked_by: str) -> bool:
        """Mark the assignment with the given id as revoked (timestamp: now, UTC).

        Returns True if an assignment was found, False otherwise.

        NOTE(review): ``revoked_by`` is accepted but not stored, because
        RoleAssignment has no field for it. Record the actor in the audit log
        at the call site, or extend RoleAssignment.
        """
        for user_assignments in self.role_assignments.values():
            for assignment in user_assignments:
                if assignment.id == assignment_id:
                    assignment.revoked_at = datetime.now(timezone.utc)
                    return True
        return False

    def get_user_roles(
        self,
        user_id: str,
        resource_type: Optional[ResourceType] = None,
        resource_id: Optional[str] = None
    ) -> List[Role]:
        """Return the distinct roles of a user's active assignments.

        ``resource_type`` and ``resource_id`` narrow the result when given.
        A falsy value (None or empty string) disables the respective filter,
        so the result then covers all resources.
        """
        roles = []
        for assignment in self.role_assignments.get(user_id, []):
            if not assignment.is_active():
                continue
            if resource_type and assignment.resource_type != resource_type:
                continue
            if resource_id and assignment.resource_id != resource_id:
                continue
            roles.append(assignment.role)
        # De-duplicate while keeping assignment order, so the result is deterministic.
        return list(dict.fromkeys(roles))

    def create_key_share(
        self,
        user_id: str,
        package_id: str,
        permissions: Set[str],
        granted_by: str,
        scope: str = "full",
        invite_token: Optional[str] = None
    ) -> KeyShare:
        """Create a key share with a fresh UUID, store it for the user and return it.

        The permission set is copied, so later changes to the caller's set do
        not silently alter the stored share.
        """
        share = KeyShare(
            id=str(uuid.uuid4()),
            user_id=user_id,
            package_id=package_id,
            permissions=set(permissions),
            scope=scope,
            granted_by=granted_by,
            invite_token=invite_token
        )
        self.key_shares.setdefault(user_id, []).append(share)
        return share

    def accept_key_share(self, share_id: str, token: str) -> bool:
        """Accept a key share by presenting its invite token.

        Returns True if a share with this id exists, is not revoked, was
        issued with an invite token, and the token matches. Everything else
        returns False. In particular a missing/empty token never matches a
        share that was created without an invite token.

        The first acceptance timestamp is kept if the share is accepted again.
        """
        import hmac  # local import: only this method needs it

        if not isinstance(token, str) or not token:
            return False
        presented = token.encode("utf-8")
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id != share_id:
                    continue
                if share.revoked_at is not None or not share.invite_token:
                    continue
                # Constant-time comparison: avoid leaking how much of the token matched.
                if not hmac.compare_digest(share.invite_token.encode("utf-8"), presented):
                    continue
                if share.accepted_at is None:
                    share.accepted_at = datetime.now(timezone.utc)
                return True
        return False

    def revoke_key_share(self, share_id: str, revoked_by: str) -> bool:
        """Mark the key share with the given id as revoked and record who revoked it.

        Returns True if a share was found, False otherwise.
        """
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id == share_id:
                    share.revoked_at = datetime.now(timezone.utc)
                    share.revoked_by = revoked_by
                    return True
        return False

    @staticmethod
    def _zk_read_hidden(policy: PolicySet, resource_type: ResourceType) -> bool:
        """Return True if the policy hides READ on ``resource_type`` from a second corrector.

        SEMI hides the grade decision. BLIND must be at least as strict as
        SEMI, so it hides the grade decision as well.

        TODO: BLIND is also meant to hide the first corrector's evaluations
        and reports. That requires knowing who authored a given resource,
        which this engine has no access to, so it must be enforced where that
        metadata is available.
        """
        if policy.zk_visibility_mode in (ZKVisibilityMode.BLIND, ZKVisibilityMode.SEMI):
            return resource_type == ResourceType.GRADE_DECISION
        return False

    def check_permission(
        self,
        user_id: str,
        action: Action,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None,
        package_id: Optional[str] = None
    ) -> bool:
        """
        Decide whether a user may perform an action on a resource.

        Checks, in order:
        1. the user has at least one active role for the resource,
        2. one of these roles grants the action in DEFAULT_PERMISSIONS,
        3. the policy (if given) does not hide the resource from a second
           corrector,
        4. the user holds an active key share for ``package_id`` (if given).

        Returns True only if every applicable check passes.
        """
        # 1. Active roles
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        if not roles:
            return False

        # 2. Base RBAC
        granted = any(
            action in DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set())
            for role in roles
        )
        if not granted:
            return False

        # 3. Policy restrictions (second-corrector visibility)
        if (policy is not None
                and action == Action.READ
                and Role.ZWEITKORREKTOR in roles
                and self._zk_read_hidden(policy, resource_type)):
            return False

        # 4. Key share (package-based access)
        if package_id:
            has_key_share = any(
                share.package_id == package_id and share.is_active()
                for share in self.key_shares.get(user_id, [])
            )
            if not has_key_share:
                return False
        return True

    def get_allowed_actions(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None
    ) -> Set[Action]:
        """Return every action the user's active roles grant on a resource.

        The same second-corrector visibility restriction as in
        ``check_permission`` is applied, so both methods agree. Key shares
        are not considered here.
        """
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        allowed: Set[Action] = set()
        for role in roles:
            allowed.update(DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set()))

        # Apply policy restrictions
        if (policy is not None
                and Role.ZWEITKORREKTOR in roles
                and self._zk_read_hidden(policy, resource_type)):
            allowed.discard(Action.READ)
        return allowed
# =============================================
# DEFAULT POLICY SETS (alle Bundeslaender)
# =============================================
def create_default_policy_sets() -> List[PolicySet]:
    """
    Create the default policy sets: one fallback plus one per federal state.

    The result contains, in this order: the ``DEFAULT`` fallback policy, the
    explicitly tuned policies for Niedersachsen, Bayern and
    Nordrhein-Westfalen, and a baseline policy for every other state. All
    are for year 2025 and procedure "abitur".

    Policy ids have the form ``<CODE>-2025-ABITUR``, where ``<CODE>`` is the
    state's two-letter ISO 3166-2:DE code. Deriving the code from the first
    two letters of the state name produced duplicate ids (brandenburg/bremen,
    saarland/sachsen/sachsen-anhalt), so that policies overwrote each other
    when registered by id.
    """
    # State name -> ISO 3166-2:DE subdivision code (without the "DE-" prefix).
    bundesland_codes = {
        "baden-wuerttemberg": "BW",
        "bayern": "BY",
        "berlin": "BE",
        "brandenburg": "BB",
        "bremen": "HB",
        "hamburg": "HH",
        "hessen": "HE",
        "mecklenburg-vorpommern": "MV",
        "niedersachsen": "NI",
        "nordrhein-westfalen": "NW",
        "rheinland-pfalz": "RP",
        "saarland": "SL",
        "sachsen": "SN",
        "sachsen-anhalt": "ST",
        "schleswig-holstein": "SH",
        "thueringen": "TH",
    }
    # States that get an explicitly tuned policy below.
    explicitly_configured = {"niedersachsen", "bayern", "nordrhein-westfalen"}

    policies = []

    # Default policy (fallback)
    policies.append(PolicySet(
        id="DEFAULT-2025",
        bundesland="DEFAULT",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz"
    ))

    # Niedersachsen (example with specific adjustments)
    policies.append(PolicySet(
        id="NI-2025-ABITUR",
        bundesland="niedersachsen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,  # in NI the second corrector sees everything
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="niedersachsen-abitur"
    ))

    # Bayern (example with SEMI visibility)
    policies.append(PolicySet(
        id="BY-2025-ABITUR",
        bundesland="bayern",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.SEMI,  # second corrector sees annotations, not the grade
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="bayern-abitur"
    ))

    # Nordrhein-Westfalen (example)
    policies.append(PolicySet(
        id="NW-2025-ABITUR",
        bundesland="nordrhein-westfalen",
        jahr=2025,
        fach=None,
        verfahren="abitur",
        zk_visibility_mode=ZKVisibilityMode.FULL,
        eh_visibility_mode=EHVisibilityMode.SHARED,
        allow_teacher_uploaded_eh=True,
        allow_land_uploaded_eh=True,
        require_rights_confirmation_on_upload=True,
        third_correction_threshold=4,
        final_signoff_role="fachvorsitz",
        export_template_id="nrw-abitur"
    ))

    # Baseline policies for all remaining states
    for bundesland, code in bundesland_codes.items():
        if bundesland in explicitly_configured:
            continue
        policies.append(PolicySet(
            id=f"{code}-2025-ABITUR",
            bundesland=bundesland,
            jahr=2025,
            fach=None,
            verfahren="abitur",
            zk_visibility_mode=ZKVisibilityMode.FULL,
            eh_visibility_mode=EHVisibilityMode.SHARED,
            allow_teacher_uploaded_eh=True,
            allow_land_uploaded_eh=True,
            require_rights_confirmation_on_upload=True,
            third_correction_threshold=4,
            final_signoff_role="fachvorsitz"
        ))
    return policies
# =============================================
# GLOBAL POLICY ENGINE INSTANCE
# =============================================
# Singleton Policy Engine
_policy_engine: Optional[PolicyEngine] = None


def get_policy_engine() -> PolicyEngine:
    """Return the module-wide PolicyEngine, creating it on first use.

    On creation the engine is populated with the policy sets returned by
    create_default_policy_sets().
    """
    global _policy_engine
    if _policy_engine is not None:
        return _policy_engine

    _policy_engine = PolicyEngine()
    for default_policy in create_default_policy_sets():
        _policy_engine.register_policy_set(default_policy)
    return _policy_engine
# =============================================
# API GUARDS (Decorators fuer FastAPI)
# =============================================
from functools import wraps
from fastapi import HTTPException, Request
def require_permission(
    action: Action,
    resource_type: ResourceType,
    resource_id_param: str = "resource_id",
    policy_jahr: int = 2025,
    policy_verfahren: str = "abitur"
):
    """
    Decorator for async FastAPI endpoints that enforces a permission check.

    The wrapped endpoint must receive the ``Request`` (keyword ``request`` or
    any positional argument). The user is read from ``request.state.user``
    and is expected to be a dict with ``user_id`` and, optionally,
    ``bundesland``.

    Args:
        action: Action the caller wants to perform.
        resource_type: Type of the resource being accessed.
        resource_id_param: Name of the endpoint keyword argument that holds
            the resource id.
        policy_jahr: Year used to look up the state policy. Defaults to 2025,
            the year that was previously hard-coded.
        policy_verfahren: Procedure used to look up the state policy.

    Raises (from the wrapper):
        HTTPException 500: no Request object was passed to the endpoint.
        HTTPException 401: ``request.state.user`` is missing or empty.
        HTTPException 403: the policy engine denies the action.

    Usage:
        @app.get("/api/v1/packages/{package_id}")
        @require_permission(Action.READ, ResourceType.EXAM_PACKAGE, "package_id")
        async def get_package(package_id: str, request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Locate the Request: keyword first, then positional arguments.
            request = kwargs.get('request')
            if request is None:
                request = next((arg for arg in args if isinstance(arg, Request)), None)
            if request is None:
                raise HTTPException(status_code=500, detail="Request not found")

            # User as stored on the request state
            user = getattr(request.state, 'user', None)
            if not user:
                raise HTTPException(status_code=401, detail="Not authenticated")
            user_id = user.get('user_id')

            # NOTE(review): if the endpoint has no keyword argument named
            # ``resource_id_param`` this is None, and PolicyEngine.get_user_roles
            # then does not filter by resource id. Confirm that every decorated
            # endpoint passes the id as a keyword argument.
            resource_id = kwargs.get(resource_id_param)

            engine = get_policy_engine()

            # Optional: load the state policy for the user's context
            policy = None
            bundesland = user.get('bundesland')
            if bundesland:
                policy = engine.get_policy_for_context(
                    bundesland, policy_jahr, verfahren=policy_verfahren
                )

            if not engine.check_permission(
                user_id=user_id,
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                policy=policy
            ):
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {action.value} on {resource_type.value}"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
def require_role(role: Role):
    """
    Decorator for async endpoints that requires the user to hold a given role.

    The role is looked up across all of the user's active assignments,
    without filtering by resource.

    Usage:
        @app.post("/api/v1/eh/publish")
        @require_role(Role.LAND_ADMIN)
        async def publish_eh(request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Keyword argument first, otherwise the first positional Request.
            request = kwargs.get('request') or next(
                (candidate for candidate in args if isinstance(candidate, Request)),
                None,
            )
            if not request:
                raise HTTPException(status_code=500, detail="Request not found")

            current_user = getattr(request.state, 'user', None)
            if not current_user:
                raise HTTPException(status_code=401, detail="Not authenticated")

            held_roles = get_policy_engine().get_user_roles(current_user.get('user_id'))
            if role in held_roles:
                return await func(*args, **kwargs)
            raise HTTPException(
                status_code=403,
                detail=f"Role required: {role.value}"
            )
        return wrapper
    return decorator