# Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service,
# School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 1133 lines, 39 KiB, Python
"""
RBAC/ABAC Policy System for Klausur-Service

Implements:
- Role-Based Access Control (RBAC) with hierarchical roles
- Attribute-Based Access Control (ABAC) via policy sets
- Bundesland-specific policies
- Key sharing for exam packages
"""
|
|
|
|
import hmac
import json
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Optional, List, Dict, Set, Any
|
|
|
|
|
|
# =============================================
# ENUMS: Roles, Actions, Resources
# =============================================
|
|
|
|
class Role(str, Enum):
    """Functional roles in the exam-correction and report-card (Zeugnis) chains.

    Every member is also a ``str``, so it compares equal to its raw value.
    """

    # --- Exam correction chain ---
    ERSTKORREKTOR = "erstkorrektor"  # EK
    ZWEITKORREKTOR = "zweitkorrektor"  # ZK
    DRITTKORREKTOR = "drittkorrektor"  # DK

    # --- Report-card (Zeugnis) workflow ---
    KLASSENLEHRER = "klassenlehrer"  # KL - creates report card, conduct grades, remarks
    FACHLEHRER = "fachlehrer"  # FL - enters subject grades
    ZEUGNISBEAUFTRAGTER = "zeugnisbeauftragter"  # ZB - quality control
    SEKRETARIAT = "sekretariat"  # SEK - printing, dispatch, archiving

    # --- Leadership (exam + report card) ---
    FACHVORSITZ = "fachvorsitz"  # FVL - subject examination lead
    PRUEFUNGSVORSITZ = "pruefungsvorsitz"  # PV - school management / examination chair
    SCHULLEITUNG = "schulleitung"  # SL - final report-card approval
    STUFENLEITUNG = "stufenleitung"  # STL - grade-level coordination

    # --- Administration ---
    SCHUL_ADMIN = "schul_admin"  # SA
    LAND_ADMIN = "land_admin"  # LA - state authority

    # --- Special ---
    AUDITOR = "auditor"  # DSB / auditor
    OPERATOR = "operator"  # OPS - support
    TEACHER_ASSISTANT = "teacher_assistant"  # TA - trainee teacher (Referendar)
    EXAM_AUTHOR = "exam_author"  # EA - pre-Abitur (Vorabi) only
|
|
|
|
|
|
class Action(str, Enum):
    """Operations that can be performed on a resource."""

    # Basic CRUD
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"

    # User and role management
    ASSIGN_ROLE = "assign_role"
    INVITE_USER = "invite_user"
    REMOVE_USER = "remove_user"

    # File transfer
    UPLOAD = "upload"
    DOWNLOAD = "download"

    # Workflow state changes
    LOCK = "lock"  # finalize
    UNLOCK = "unlock"  # only with special permission
    SIGN_OFF = "sign_off"  # approval

    # Sensitive operations
    SHARE_KEY = "share_key"  # create a key share
    VIEW_PII = "view_pii"  # if PII is present
    BREAK_GLASS = "break_glass"  # emergency access

    PUBLISH_OFFICIAL = "publish_official"  # distribute official EH
|
|
|
|
|
|
class ResourceType(str, Enum):
    """Kinds of resources known to the system."""

    TENANT = "tenant"
    NAMESPACE = "namespace"

    # --- Exam correction ---
    EXAM_PACKAGE = "exam_package"
    STUDENT_WORK = "student_work"
    EH_DOCUMENT = "eh_document"
    RUBRIC = "rubric"  # scoring grid
    ANNOTATION = "annotation"
    EVALUATION = "evaluation"  # criteria / points
    REPORT = "report"  # written assessment (Gutachten)
    GRADE_DECISION = "grade_decision"

    # --- Report-card generator ---
    ZEUGNIS = "zeugnis"  # report-card document
    ZEUGNIS_VORLAGE = "zeugnis_vorlage"  # report-card template
    ZEUGNIS_ENTWURF = "zeugnis_entwurf"  # report-card draft (before approval)
    SCHUELER_DATEN = "schueler_daten"  # student master data, grades
    FACHNOTE = "fachnote"  # single subject grade
    KOPFNOTE = "kopfnote"  # work / social behaviour grade
    FEHLZEITEN = "fehlzeiten"  # absences
    BEMERKUNG = "bemerkung"  # report-card remarks
    KONFERENZ_BESCHLUSS = "konferenz_beschluss"  # conference result
    VERSETZUNG = "versetzung"  # promotion decision

    # --- General ---
    DOCUMENT = "document"  # generic document type (EH, templates, etc.)
    TEMPLATE = "template"  # generic templates
    EXPORT = "export"
    AUDIT_LOG = "audit_log"
    KEY_MATERIAL = "key_material"
|
|
|
|
|
|
class ZKVisibilityMode(str, Enum):
    """Visibility mode for second correctors (Zweitkorrektor, ZK)."""

    BLIND = "blind"  # ZK does not see the EK grade / written assessment
    SEMI = "semi"  # ZK sees annotations but not the grade
    FULL = "full"  # ZK sees everything
|
|
|
|
|
|
class EHVisibilityMode(str, Enum):
    """Visibility mode for expectation horizons (Erwartungshorizont, EH)."""

    BLIND = "blind"  # ZK does not see the EH (rare)
    SHARED = "shared"  # ZK sees the EH (default)
|
|
|
|
|
|
class VerfahrenType(str, Enum):
    """Procedure types for exams and report cards."""

    # --- Exam procedures ---
    ABITUR = "abitur"
    VORABITUR = "vorabitur"
    KLAUSUR = "klausur"
    NACHPRUEFUNG = "nachpruefung"

    # --- Report-card procedures ---
    HALBJAHRESZEUGNIS = "halbjahreszeugnis"
    JAHRESZEUGNIS = "jahreszeugnis"
    ABSCHLUSSZEUGNIS = "abschlusszeugnis"
    ABGANGSZEUGNIS = "abgangszeugnis"

    @classmethod
    def is_exam_type(cls, verfahren: str) -> bool:
        """Return True if ``verfahren`` names one of the exam procedures.

        Values that are not a valid ``VerfahrenType`` yield False.
        """
        try:
            member = cls(verfahren)
        except ValueError:
            return False
        return member in (cls.ABITUR, cls.VORABITUR, cls.KLAUSUR, cls.NACHPRUEFUNG)

    @classmethod
    def is_certificate_type(cls, verfahren: str) -> bool:
        """Return True if ``verfahren`` names one of the report-card procedures.

        Values that are not a valid ``VerfahrenType`` yield False.
        """
        try:
            member = cls(verfahren)
        except ValueError:
            return False
        return member in (
            cls.HALBJAHRESZEUGNIS,
            cls.JAHRESZEUGNIS,
            cls.ABSCHLUSSZEUGNIS,
            cls.ABGANGSZEUGNIS,
        )
|
|
|
|
|
|
# =============================================
# DATA STRUCTURES
# =============================================
|
|
|
|
@dataclass
class PolicySet:
    """Policy configuration for one Bundesland / year / subject combination.

    Keeps state-specific differences in data instead of hard-coding them
    in the source.

    Supported ``verfahren`` values:
    - exams: abitur, vorabitur, klausur, nachpruefung
    - report cards: halbjahreszeugnis, jahreszeugnis, abschlusszeugnis,
      abgangszeugnis
    """

    id: str
    bundesland: str
    jahr: int
    fach: Optional[str]  # None = applies to all subjects
    verfahren: str  # see VerfahrenType enum

    # Visibility rules (exam)
    zk_visibility_mode: ZKVisibilityMode = ZKVisibilityMode.FULL
    eh_visibility_mode: EHVisibilityMode = EHVisibilityMode.SHARED

    # EH sources (exam)
    allow_teacher_uploaded_eh: bool = True
    allow_land_uploaded_eh: bool = True
    require_rights_confirmation_on_upload: bool = True
    require_dual_control_for_official_eh_update: bool = False

    # Correction rules (exam)
    third_correction_threshold: int = 4  # deviation in grade points
    final_signoff_role: str = "fachvorsitz"

    # Report-card rules
    require_klassenlehrer_approval: bool = True
    require_schulleitung_signoff: bool = True
    allow_sekretariat_edit_after_approval: bool = False
    konferenz_protokoll_required: bool = True
    bemerkungen_require_review: bool = True
    fehlzeiten_auto_import: bool = True
    kopfnoten_enabled: bool = False
    versetzung_auto_calculate: bool = True

    # Export & display
    quote_verbatim_allowed: bool = False  # official texts in the UI
    export_template_id: str = "default"

    # Additional flags
    flags: Dict[str, Any] = field(default_factory=dict)

    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def is_exam_policy(self) -> bool:
        """Return True if ``verfahren`` is an exam procedure."""
        return VerfahrenType.is_exam_type(self.verfahren)

    def is_certificate_policy(self) -> bool:
        """Return True if ``verfahren`` is a report-card procedure."""
        return VerfahrenType.is_certificate_type(self.verfahren)

    def to_dict(self):
        """Return all fields as a dict, with enums as values and the timestamp as ISO text."""
        payload = asdict(self)
        # asdict() leaves enum members and datetimes as objects; flatten them.
        payload.update(
            zk_visibility_mode=self.zk_visibility_mode.value,
            eh_visibility_mode=self.eh_visibility_mode.value,
            created_at=self.created_at.isoformat(),
        )
        return payload
|
|
|
|
|
|
@dataclass
class RoleAssignment:
    """Assignment of a role to a user for one specific resource."""

    id: str
    user_id: str
    role: Role
    resource_type: ResourceType
    resource_id: str

    # Optional restrictions
    tenant_id: Optional[str] = None
    namespace_id: Optional[str] = None

    # Validity window
    valid_from: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    valid_to: Optional[datetime] = None

    # Metadata
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    revoked_at: Optional[datetime] = None

    def is_active(self) -> bool:
        """Return True if the assignment is not revoked and "now" (UTC) is inside its validity window."""
        now = datetime.now(timezone.utc)
        if self.revoked_at:
            return False
        if self.valid_to and self.valid_to < now:
            return False
        return self.valid_from <= now

    def to_dict(self):
        """Return a dict with enums as values, timestamps as ISO text, and the current ``is_active`` state."""

        def iso_or_none(moment):
            return moment.isoformat() if moment else None

        return {
            'id': self.id,
            'user_id': self.user_id,
            'role': self.role.value,
            'resource_type': self.resource_type.value,
            'resource_id': self.resource_id,
            'tenant_id': self.tenant_id,
            'namespace_id': self.namespace_id,
            'valid_from': self.valid_from.isoformat(),
            'valid_to': iso_or_none(self.valid_to),
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            'revoked_at': iso_or_none(self.revoked_at),
            'is_active': self.is_active(),
        }
|
|
|
|
|
|
@dataclass
class KeyShare:
    """Authorisation for a user to access encrypted content of a package.

    A KeyShare is NOT a plaintext key; it is a grant that works in
    combination with a role assignment.
    """

    id: str
    user_id: str
    package_id: str

    # Scope of the grant,
    # e.g. {"read_original", "read_eh", "read_ek_outputs", "write_annotations"}
    permissions: Set[str] = field(default_factory=set)

    # Optional restriction: "full", "original_only", "eh_only", "outputs_only"
    scope: str = "full"

    # Grant chain
    granted_by: str = ""
    granted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    # Acceptance (invite flow)
    invite_token: Optional[str] = None
    accepted_at: Optional[datetime] = None

    # Revocation
    revoked_at: Optional[datetime] = None
    revoked_by: Optional[str] = None

    def is_active(self) -> bool:
        """Return True if the share is usable.

        A share is active when it has not been revoked and, if it was
        issued with an invite token, it has been accepted.
        """
        if self.revoked_at is not None:
            return False
        return self.invite_token is None or self.accepted_at is not None

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict with timestamps as ISO text and the current ``is_active`` state.

        ``permissions`` is emitted sorted so the output is stable across
        calls, and ``revoked_by`` is included next to ``revoked_at`` so the
        revocation record is complete.
        """

        def iso_or_none(moment: Optional[datetime]) -> Optional[str]:
            return moment.isoformat() if moment else None

        return {
            'id': self.id,
            'user_id': self.user_id,
            'package_id': self.package_id,
            'permissions': sorted(self.permissions),
            'scope': self.scope,
            'granted_by': self.granted_by,
            'granted_at': self.granted_at.isoformat(),
            # NOTE(review): the invite token is serialized as-is; confirm this
            # dict is never returned to anyone but the invitee.
            'invite_token': self.invite_token,
            'accepted_at': iso_or_none(self.accepted_at),
            'revoked_at': iso_or_none(self.revoked_at),
            'revoked_by': self.revoked_by,
            'is_active': self.is_active(),
        }
|
|
|
|
|
|
@dataclass
class Tenant:
    """Top-level isolation unit - typically a school."""

    id: str
    name: str
    bundesland: str
    tenant_type: str = "school"  # "school", "pruefungszentrum", "behoerde"

    # Encryption
    encryption_enabled: bool = True

    # Metadata
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Return the tenant as a dict with ``created_at`` as ISO text (``deleted_at`` is not included)."""
        return dict(
            id=self.id,
            name=self.name,
            bundesland=self.bundesland,
            tenant_type=self.tenant_type,
            encryption_enabled=self.encryption_enabled,
            created_at=self.created_at.isoformat(),
        )
|
|
|
|
|
|
@dataclass
class Namespace:
    """Workspace inside a tenant.

    e.g. "Abitur 2026 - Deutsch LK - Kurs 12a"
    """

    id: str
    tenant_id: str
    name: str

    # Context
    jahr: int
    fach: str
    kurs: Optional[str] = None
    pruefungsart: str = "abitur"  # "abitur", "vorabitur"

    # Policy
    policy_set_id: Optional[str] = None

    # Metadata
    created_by: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    deleted_at: Optional[datetime] = None

    def to_dict(self):
        """Return the namespace as a dict with ``created_at`` as ISO text (``deleted_at`` is not included)."""
        return dict(
            id=self.id,
            tenant_id=self.tenant_id,
            name=self.name,
            jahr=self.jahr,
            fach=self.fach,
            kurs=self.kurs,
            pruefungsart=self.pruefungsart,
            policy_set_id=self.policy_set_id,
            created_by=self.created_by,
            created_at=self.created_at.isoformat(),
        )
|
|
|
|
|
|
@dataclass
class ExamPackage:
    """Exam package - a complete set of student works with all artefacts."""

    id: str
    namespace_id: str
    tenant_id: str

    name: str
    beschreibung: Optional[str] = None

    # Workflow status: "draft", "in_progress", "locked", "signed_off"
    status: str = "draft"

    # Participants (roles are assigned separately); typically the EK
    owner_id: str = ""

    # Encryption
    encryption_key_id: Optional[str] = None

    # Timestamps
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    locked_at: Optional[datetime] = None
    signed_off_at: Optional[datetime] = None
    signed_off_by: Optional[str] = None

    def to_dict(self):
        """Return the package as a dict with timestamps as ISO text (``encryption_key_id`` is not included)."""

        def iso_or_none(moment):
            return moment.isoformat() if moment else None

        return {
            'id': self.id,
            'namespace_id': self.namespace_id,
            'tenant_id': self.tenant_id,
            'name': self.name,
            'beschreibung': self.beschreibung,
            'status': self.status,
            'owner_id': self.owner_id,
            'created_at': self.created_at.isoformat(),
            'locked_at': iso_or_none(self.locked_at),
            'signed_off_at': iso_or_none(self.signed_off_at),
            'signed_off_by': self.signed_off_by,
        }
|
|
|
|
|
|
# =============================================
# RBAC PERMISSION MATRIX
# =============================================

# Default permission matrix (can be overridden by policies)
|
|
DEFAULT_PERMISSIONS: Dict[Role, Dict[ResourceType, Set[Action]]] = {
    # ---------------------------------------------
    # EXAM CORRECTION ROLES
    # ---------------------------------------------

    # First corrector
    Role.ERSTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.UPDATE, Action.SHARE_KEY, Action.LOCK},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Second corrector (default: FULL visibility)
    Role.ZWEITKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Third corrector
    Role.DRITTKORREKTOR: {
        ResourceType.EXAM_PACKAGE: {Action.READ},
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.RUBRIC: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Subject examination lead
    Role.FACHVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.EXAM_PACKAGE: {
            Action.READ,
            Action.UPDATE,
            Action.LOCK,
            Action.UNLOCK,
            Action.SIGN_OFF,
        },
        ResourceType.STUDENT_WORK: {Action.READ, Action.UPDATE},
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.UPDATE},
        ResourceType.RUBRIC: {Action.READ, Action.UPDATE},
        ResourceType.ANNOTATION: {Action.READ, Action.UPDATE},
        ResourceType.EVALUATION: {Action.READ, Action.UPDATE},
        ResourceType.REPORT: {Action.READ, Action.UPDATE},
        ResourceType.GRADE_DECISION: {Action.READ, Action.UPDATE, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Examination chair
    Role.PRUEFUNGSVORSITZ: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.EXAM_PACKAGE: {Action.READ, Action.SIGN_OFF},
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.EH_DOCUMENT: {Action.READ},
        ResourceType.GRADE_DECISION: {Action.READ, Action.SIGN_OFF},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # School admin
    Role.SCHUL_ADMIN: {
        ResourceType.TENANT: {Action.READ, Action.UPDATE},
        ResourceType.NAMESPACE: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.EXAM_PACKAGE: {
            Action.CREATE,
            Action.READ,
            Action.DELETE,
            Action.ASSIGN_ROLE,
        },
        ResourceType.EH_DOCUMENT: {Action.READ, Action.UPLOAD, Action.DELETE},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # State admin (authority)
    Role.LAND_ADMIN: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.EH_DOCUMENT: {
            Action.READ,
            Action.UPLOAD,
            Action.UPDATE,
            Action.DELETE,
            Action.PUBLISH_OFFICIAL,
        },
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Auditor - no access to content!
    Role.AUDITOR: {
        ResourceType.AUDIT_LOG: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # metadata only
    },

    # Operator - break-glass is handled separately
    Role.OPERATOR: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.EXAM_PACKAGE: {Action.READ},  # metadata only
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Teacher assistant
    Role.TEACHER_ASSISTANT: {
        ResourceType.STUDENT_WORK: {Action.READ},
        ResourceType.ANNOTATION: {Action.CREATE, Action.READ},  # only certain types
        ResourceType.EH_DOCUMENT: {Action.READ},
    },

    # Exam author (pre-Abitur only)
    Role.EXAM_AUTHOR: {
        ResourceType.EH_DOCUMENT: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.RUBRIC: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
    },

    # ---------------------------------------------
    # REPORT-CARD (ZEUGNIS) WORKFLOW ROLES
    # ---------------------------------------------

    # Class teacher - creates report cards, conduct grades, remarks
    Role.KLASSENLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.ZEUGNIS: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {
            Action.CREATE,
            Action.READ,
            Action.UPDATE,
            Action.DELETE,
        },
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ, Action.UPDATE},
        ResourceType.FACHNOTE: {Action.READ},  # reads the subject teachers' grades
        ResourceType.KOPFNOTE: {Action.CREATE, Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ, Action.UPDATE},
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ, Action.UPDATE, Action.DELETE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Subject teacher - enters subject grades
    Role.FACHLEHRER: {
        ResourceType.NAMESPACE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # own students only
        ResourceType.FACHNOTE: {Action.CREATE, Action.READ, Action.UPDATE},  # own subject only
        ResourceType.BEMERKUNG: {Action.CREATE, Action.READ},  # subject-related remarks
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Report-card officer - quality control
    Role.ZEUGNISBEAUFTRAGTER: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE, Action.UPLOAD},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.VERSETZUNG: {Action.READ},
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Secretariat - printing, dispatch, archiving
    Role.SEKRETARIAT: {
        ResourceType.ZEUGNIS: {Action.READ, Action.DOWNLOAD},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ},
        ResourceType.SCHUELER_DATEN: {Action.READ},  # for address data
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # School management - final report-card approval
    Role.SCHULLEITUNG: {
        ResourceType.TENANT: {Action.READ},
        ResourceType.NAMESPACE: {Action.READ, Action.CREATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.SIGN_OFF, Action.LOCK},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_VORLAGE: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ, Action.UPDATE},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {
            Action.CREATE,
            Action.READ,
            Action.UPDATE,
            Action.SIGN_OFF,
        },
        ResourceType.VERSETZUNG: {
            Action.CREATE,
            Action.READ,
            Action.UPDATE,
            Action.SIGN_OFF,
        },
        ResourceType.EXPORT: {Action.CREATE, Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },

    # Grade-level lead - grade-level coordination (e.g. upper school)
    Role.STUFENLEITUNG: {
        ResourceType.NAMESPACE: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS: {Action.READ, Action.UPDATE},
        ResourceType.ZEUGNIS_ENTWURF: {Action.READ, Action.UPDATE},
        ResourceType.SCHUELER_DATEN: {Action.READ},
        ResourceType.FACHNOTE: {Action.READ},
        ResourceType.KOPFNOTE: {Action.READ},
        ResourceType.FEHLZEITEN: {Action.READ},
        ResourceType.BEMERKUNG: {Action.READ, Action.UPDATE},
        ResourceType.KONFERENZ_BESCHLUSS: {Action.READ},
        ResourceType.VERSETZUNG: {Action.READ, Action.UPDATE},
        ResourceType.EXPORT: {Action.READ, Action.DOWNLOAD},
        ResourceType.AUDIT_LOG: {Action.READ},
    },
}
|
|
|
|
|
|
# =============================================
# POLICY ENGINE
# =============================================
|
|
|
|
class PolicyEngine:
    """In-memory engine for RBAC/ABAC decisions.

    A permission check combines, in this order:
    1. base role permissions from ``DEFAULT_PERMISSIONS`` (RBAC),
    2. restrictions from an optional ``PolicySet`` (ABAC),
    3. an active ``KeyShare`` when a package id is supplied.

    All state is held in plain dicts on the instance.
    """

    def __init__(self):
        """Create an engine with no policies, role assignments or key shares."""
        self.policy_sets: Dict[str, PolicySet] = {}
        self.role_assignments: Dict[str, List[RoleAssignment]] = {}  # user_id -> assignments
        self.key_shares: Dict[str, List[KeyShare]] = {}  # user_id -> shares

    def register_policy_set(self, policy: PolicySet):
        """Register a policy set under its id, replacing any previous one with that id."""
        self.policy_sets[policy.id] = policy

    def get_policy_for_context(
        self,
        bundesland: str,
        jahr: int,
        fach: Optional[str] = None,
        verfahren: str = "abitur"
    ) -> Optional[PolicySet]:
        """Find the policy set that applies to a context.

        Lookup order among policies matching ``bundesland``, ``jahr`` and
        ``verfahren``:
        1. a policy whose ``fach`` equals the requested ``fach``,
        2. a policy with ``fach is None`` (applies to all subjects).

        If neither exists, the first registered policy whose ``bundesland``
        is ``"DEFAULT"`` is returned, or None when there is none.
        """
        generic_match: Optional[PolicySet] = None
        for policy in self.policy_sets.values():
            if (policy.bundesland != bundesland
                    or policy.jahr != jahr
                    or policy.verfahren != verfahren):
                continue
            if policy.fach is None:
                # Remember the first all-subjects policy, but keep looking:
                # a subject-specific policy registered later must win.
                if generic_match is None:
                    generic_match = policy
            elif policy.fach == fach:
                return policy

        if generic_match is not None:
            return generic_match

        # Fallback: default policy
        for policy in self.policy_sets.values():
            if policy.bundesland == "DEFAULT":
                return policy

        return None

    def assign_role(
        self,
        user_id: str,
        role: Role,
        resource_type: ResourceType,
        resource_id: str,
        granted_by: str,
        tenant_id: Optional[str] = None,
        namespace_id: Optional[str] = None,
        valid_to: Optional[datetime] = None
    ) -> RoleAssignment:
        """Assign a role to a user for one resource and return the new assignment."""
        assignment = RoleAssignment(
            id=str(uuid.uuid4()),
            user_id=user_id,
            role=role,
            resource_type=resource_type,
            resource_id=resource_id,
            tenant_id=tenant_id,
            namespace_id=namespace_id,
            granted_by=granted_by,
            valid_to=valid_to,
        )
        self.role_assignments.setdefault(user_id, []).append(assignment)
        return assignment

    def revoke_role(self, assignment_id: str, revoked_by: str) -> bool:
        """Revoke a role assignment by id.

        Returns True if the assignment was found, False otherwise.

        NOTE: ``revoked_by`` is accepted but not stored - ``RoleAssignment``
        has no field for it (unlike ``KeyShare``).
        """
        for user_assignments in self.role_assignments.values():
            for assignment in user_assignments:
                if assignment.id == assignment_id:
                    assignment.revoked_at = datetime.now(timezone.utc)
                    return True
        return False

    def get_user_roles(
        self,
        user_id: str,
        resource_type: Optional[ResourceType] = None,
        resource_id: Optional[str] = None
    ) -> List[Role]:
        """Return the distinct active roles of a user.

        ``resource_type`` and ``resource_id`` narrow the result when given;
        a falsy value means "do not filter". Roles are returned in the order
        of their first matching assignment.
        """
        matching = (
            assignment.role
            for assignment in self.role_assignments.get(user_id, [])
            if assignment.is_active()
            and not (resource_type and assignment.resource_type != resource_type)
            and not (resource_id and assignment.resource_id != resource_id)
        )
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(matching))

    def create_key_share(
        self,
        user_id: str,
        package_id: str,
        permissions: Set[str],
        granted_by: str,
        scope: str = "full",
        invite_token: Optional[str] = None
    ) -> KeyShare:
        """Create a key share for a user on a package and return it.

        The share keeps its own copy of ``permissions`` so later changes to
        the caller's set do not alter the grant.
        """
        share = KeyShare(
            id=str(uuid.uuid4()),
            user_id=user_id,
            package_id=package_id,
            permissions=set(permissions),
            scope=scope,
            granted_by=granted_by,
            invite_token=invite_token,
        )
        self.key_shares.setdefault(user_id, []).append(share)
        return share

    def accept_key_share(self, share_id: str, token: str) -> bool:
        """Accept a key share using its invite token.

        Returns True and stamps ``accepted_at`` only when the share exists,
        was issued with an invite token, has not been revoked, and ``token``
        matches. The comparison is constant-time.
        """
        if not isinstance(token, str) or not token:
            return False

        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id != share_id:
                    continue
                if share.invite_token is None or share.revoked_at is not None:
                    return False
                # Compare as bytes: compare_digest rejects non-ASCII str.
                if not hmac.compare_digest(
                    share.invite_token.encode("utf-8"), token.encode("utf-8")
                ):
                    return False
                share.accepted_at = datetime.now(timezone.utc)
                return True
        return False

    def revoke_key_share(self, share_id: str, revoked_by: str) -> bool:
        """Revoke a key share by id, recording when and by whom.

        Returns True if the share was found, False otherwise.
        """
        for user_shares in self.key_shares.values():
            for share in user_shares:
                if share.id == share_id:
                    share.revoked_at = datetime.now(timezone.utc)
                    share.revoked_by = revoked_by
                    return True
        return False

    @staticmethod
    def _zk_hidden_read_types(policy: Optional[PolicySet]) -> Set[ResourceType]:
        """Return the resource types a second corrector may not READ under ``policy``.

        The engine has no authorship metadata, so it cannot tell first-corrector
        outputs from the second corrector's own. It therefore hides the whole
        resource type.
        """
        if policy is None:
            return set()
        if policy.zk_visibility_mode == ZKVisibilityMode.BLIND:
            # Blind: ZK must not see EK outputs.
            return {
                ResourceType.EVALUATION,
                ResourceType.REPORT,
                ResourceType.GRADE_DECISION,
            }
        if policy.zk_visibility_mode == ZKVisibilityMode.SEMI:
            # Semi: ZK sees annotations but not the grade.
            return {ResourceType.GRADE_DECISION}
        return set()

    def check_permission(
        self,
        user_id: str,
        action: Action,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None,
        package_id: Optional[str] = None
    ) -> bool:
        """Check whether a user may perform an action on a resource.

        Checks, in order:
        1. base RBAC: an active role on this resource grants the action,
        2. policy restrictions: second-corrector visibility mode,
        3. key share: an active share for ``package_id``, if one is given.
        """
        # 1. Active roles for this exact resource
        roles = self.get_user_roles(user_id, resource_type, resource_id)
        if not roles:
            return False

        # 2. Base RBAC: at least one role must grant the action
        granted = any(
            action in DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set())
            for role in roles
        )
        if not granted:
            return False

        # 3. Policy restrictions (second-corrector visibility)
        if (
            action == Action.READ
            and Role.ZWEITKORREKTOR in roles
            and resource_type in self._zk_hidden_read_types(policy)
        ):
            return False

        # 4. Key share (package-based access)
        if package_id:
            has_key_share = any(
                share.package_id == package_id and share.is_active()
                for share in self.key_shares.get(user_id, [])
            )
            if not has_key_share:
                return False

        return True

    def get_allowed_actions(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        policy: Optional[PolicySet] = None
    ) -> Set[Action]:
        """Return all actions a user may perform on a resource.

        Applies the same role and policy rules as ``check_permission``.
        Key shares are not considered here.
        """
        roles = self.get_user_roles(user_id, resource_type, resource_id)

        # Build a fresh set so the shared permission matrix is never mutated.
        allowed: Set[Action] = set()
        for role in roles:
            allowed.update(DEFAULT_PERMISSIONS.get(role, {}).get(resource_type, set()))

        # Apply policy restrictions (second-corrector visibility)
        if (
            Role.ZWEITKORREKTOR in roles
            and resource_type in self._zk_hidden_read_types(policy)
        ):
            allowed.discard(Action.READ)

        return allowed
|
|
|
|
|
|
# =============================================
# DEFAULT POLICY SETS (all Bundeslaender)
# =============================================
|
|
|
|
def create_default_policy_sets() -> List[PolicySet]:
    """Create the default policy sets for all German federal states.

    The list starts with a generic ``DEFAULT`` fallback policy, followed by
    the hand-tuned policies for Niedersachsen, Bayern and Nordrhein-Westfalen,
    followed by one baseline policy for each remaining state. These can be
    refined per state later.

    Policy IDs have the form ``<CODE>-2025-ABITUR`` where ``<CODE>`` is the
    state's ISO 3166-2:DE code. Deriving the code from the first two letters
    of the state name is not unique (brandenburg/bremen -> "BR",
    saarland/sachsen/sachsen-anhalt -> "SA") and would yield duplicate IDs.

    Returns:
        The policy sets in the order described above.
    """
    # State name -> ISO 3166-2:DE subdivision code. Insertion order defines
    # the order of the generated baseline policies.
    state_codes = {
        "baden-wuerttemberg": "BW",
        "bayern": "BY",
        "berlin": "BE",
        "brandenburg": "BB",
        "bremen": "HB",
        "hamburg": "HH",
        "hessen": "HE",
        "mecklenburg-vorpommern": "MV",
        "niedersachsen": "NI",
        "nordrhein-westfalen": "NW",
        "rheinland-pfalz": "RP",
        "saarland": "SL",
        "sachsen": "SN",
        "sachsen-anhalt": "ST",
        "schleswig-holstein": "SH",
        "thueringen": "TH",
    }

    # State-specific deviations from the baseline settings. Their order is
    # the order in which these policies appear in the result.
    overrides = {
        # In NI the second corrector sees everything (baseline FULL mode).
        "niedersachsen": {
            "export_template_id": "niedersachsen-abitur",
        },
        # In BY the second corrector sees annotations but not the grade.
        "bayern": {
            "zk_visibility_mode": ZKVisibilityMode.SEMI,
            "export_template_id": "bayern-abitur",
        },
        "nordrhein-westfalen": {
            "export_template_id": "nrw-abitur",
        },
    }

    def build(policy_id: str, bundesland: str, **extra: Any) -> PolicySet:
        """Build one 2025 Abitur policy from the baseline plus overrides."""
        settings: Dict[str, Any] = {
            "jahr": 2025,
            "fach": None,
            "verfahren": "abitur",
            "zk_visibility_mode": ZKVisibilityMode.FULL,
            "eh_visibility_mode": EHVisibilityMode.SHARED,
            "allow_teacher_uploaded_eh": True,
            "allow_land_uploaded_eh": True,
            "require_rights_confirmation_on_upload": True,
            "third_correction_threshold": 4,
            "final_signoff_role": "fachvorsitz",
        }
        settings.update(extra)
        return PolicySet(id=policy_id, bundesland=bundesland, **settings)

    # Default policy (fallback).
    policies = [build("DEFAULT-2025", "DEFAULT")]

    # States with specific adjustments.
    for bundesland, extra in overrides.items():
        policies.append(
            build(f"{state_codes[bundesland]}-2025-ABITUR", bundesland, **extra)
        )

    # Baseline policies for all remaining states.
    for bundesland, code in state_codes.items():
        if bundesland not in overrides:
            policies.append(build(f"{code}-2025-ABITUR", bundesland))

    return policies
|
|
|
|
|
|
# =============================================
|
|
# GLOBAL POLICY ENGINE INSTANCE
|
|
# =============================================
|
|
|
|
# Module-wide PolicyEngine singleton; None until get_policy_engine() builds it.
_policy_engine: Optional[PolicyEngine] = None


def get_policy_engine() -> PolicyEngine:
    """Return the global policy engine, creating it on first use.

    On the first call a PolicyEngine is created and every policy set from
    create_default_policy_sets() is registered on it. The engine is stored in
    the module-level singleton only after registration has completed, so an
    exception during setup cannot leave a partially initialised engine behind
    for later callers; the next call simply retries.

    Returns:
        The shared PolicyEngine instance.
    """
    global _policy_engine
    if _policy_engine is None:
        engine = PolicyEngine()
        # Register the default policies before publishing the instance.
        for policy in create_default_policy_sets():
            engine.register_policy_set(policy)
        _policy_engine = engine
    return _policy_engine
|
|
|
|
|
|
# =============================================
|
|
# API GUARDS (Decorators fuer FastAPI)
|
|
# =============================================
|
|
|
|
from functools import wraps
|
|
from fastapi import HTTPException, Request
|
|
|
|
|
|
def require_permission(
    action: Action,
    resource_type: ResourceType,
    resource_id_param: str = "resource_id"
):
    """
    Decorator for FastAPI endpoints that enforces a permission check.

    Checks whether the current user (``request.state.user``) may perform
    ``action`` on the resource of type ``resource_type`` whose ID is passed
    to the endpoint as keyword argument ``resource_id_param``.

    Args:
        action: The action the endpoint performs.
        resource_type: The type of resource the endpoint operates on.
        resource_id_param: Name of the endpoint keyword argument that holds
            the resource ID.

    Raises (from the wrapped endpoint):
        HTTPException 500: no Request object was passed to the endpoint.
        HTTPException 401: ``request.state.user`` is missing or empty.
        HTTPException 403: the policy engine denies the permission.

    Usage:
        @app.get("/api/v1/packages/{package_id}")
        @require_permission(Action.READ, ResourceType.EXAM_PACKAGE, "package_id")
        async def get_package(package_id: str, request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Locate the Request by type. FastAPI passes endpoint parameters
            # as keyword arguments, so the parameter is not necessarily named
            # "request" and is usually not positional.
            request = kwargs.get('request')
            if not isinstance(request, Request):
                request = next(
                    (
                        candidate
                        for candidate in (*args, *kwargs.values())
                        if isinstance(candidate, Request)
                    ),
                    None,
                )

            if request is None:
                raise HTTPException(status_code=500, detail="Request not found")

            # The user is read from request.state; something upstream of this
            # decorator must have stored it there.
            user = getattr(request.state, 'user', None)
            if not user:
                raise HTTPException(status_code=401, detail="Not authenticated")

            user_id = user.get('user_id')
            resource_id = kwargs.get(resource_id_param)

            engine = get_policy_engine()

            # Optional: load the policy for the user's federal state.
            # NOTE(review): the year is hard-coded to 2025, matching the IDs of
            # the default policy sets; revisit when other years are added.
            policy = None
            bundesland = user.get('bundesland')
            if bundesland:
                policy = engine.get_policy_for_context(bundesland, 2025)

            permitted = engine.check_permission(
                user_id=user_id,
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                policy=policy
            )
            if not permitted:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {action.value} on {resource_type.value}"
                )

            return await func(*args, **kwargs)

        return wrapper
    return decorator
|
|
|
|
|
|
def require_role(role: Role):
    """
    Decorator for FastAPI endpoints that requires the user to hold a role.

    Looks up the roles of the current user (``request.state.user``) in the
    global policy engine and rejects the call unless ``role`` is among them.

    Args:
        role: The role the calling user must hold.

    Raises (from the wrapped endpoint):
        HTTPException 500: no Request object was passed to the endpoint.
        HTTPException 401: ``request.state.user`` is missing or empty.
        HTTPException 403: the user does not hold ``role``.

    Usage:
        @app.post("/api/v1/eh/publish")
        @require_role(Role.LAND_ADMIN)
        async def publish_eh(request: Request):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Locate the Request by type. FastAPI passes endpoint parameters
            # as keyword arguments, so the parameter is not necessarily named
            # "request" and is usually not positional.
            request = kwargs.get('request')
            if not isinstance(request, Request):
                request = next(
                    (
                        candidate
                        for candidate in (*args, *kwargs.values())
                        if isinstance(candidate, Request)
                    ),
                    None,
                )

            if request is None:
                raise HTTPException(status_code=500, detail="Request not found")

            user = getattr(request.state, 'user', None)
            if not user:
                raise HTTPException(status_code=401, detail="Not authenticated")

            user_id = user.get('user_id')
            engine = get_policy_engine()

            # Roles are looked up without a resource context.
            user_roles = engine.get_user_roles(user_id)
            if role not in user_roles:
                raise HTTPException(
                    status_code=403,
                    detail=f"Role required: {role.value}"
                )

            return await func(*args, **kwargs)

        return wrapper
    return decorator
|