This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/session/rbac_middleware.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

429 lines
12 KiB
Python

"""
RBAC Middleware for Session-Based Authentication

Provides user type checking (Employee vs. Customer) and
permission-based access control.

Employee roles: teacher roles, admin roles
Customer roles: parent, student, user, guardian

Usage:
    @app.get("/api/protected/employee/grades")
    async def get_grades(session: Session = Depends(require_employee)):
        return {"grades": [...]}

    @app.get("/api/protected/endpoint")
    async def protected(session: Session = Depends(require_permission("grades:read"))):
        return {"data": [...]}
"""
import logging
from functools import wraps
from typing import Callable, List

from fastapi import Depends, HTTPException, Request

from .session_middleware import get_current_session
from .session_store import Session, UserType
logger = logging.getLogger(__name__)
# =============================================
# Permission Constants
# =============================================
# Permission strings granted to every employee session
# (see get_permissions_for_roles). Order is preserved as declared.
EMPLOYEE_PERMISSIONS = [
    # Grades & attendance
    "grades:read", "grades:write",
    "attendance:read", "attendance:write",
    # Student management
    "students:read", "students:write",
    # Reports & consent
    "reports:generate", "consent:admin",
    # Corrections
    "corrections:read", "corrections:write",
    # Classes
    "classes:read", "classes:write",
    # Timetable
    "timetable:read", "timetable:write",
    # Substitutions
    "substitutions:read", "substitutions:write",
    # Parent communication
    "parent_communication:read", "parent_communication:write",
]
# Permission strings granted to every non-employee (customer) session
# (see get_permissions_for_roles). Order is preserved as declared.
CUSTOMER_PERMISSIONS = [
    # Access to the user's own data
    "own_data:read", "own_grades:read", "own_attendance:read",
    # Consent management
    "consent:manage",
    # Meetings & communication
    "meetings:join", "messages:read", "messages:write",
    # Children (for parents)
    "children:read", "children:grades:read", "children:attendance:read",
]
# Extra permission strings added on top of the base set when a session
# holds one of the admin roles (see get_permissions_for_roles).
ADMIN_PERMISSIONS = [
    # User management
    "users:read", "users:write", "users:manage",
    # RBAC management
    "rbac:read", "rbac:write",
    # Audit & logs
    "audit:read",
    # System settings
    "settings:read", "settings:write",
    # DSR management
    "dsr:read", "dsr:process",
]
# Role names whose presence classifies a user as an employee
# (consulted by determine_user_type).
EMPLOYEE_ROLES = {
    "admin", "schul_admin", "schulleitung", "pruefungsvorsitz",
    "klassenlehrer", "fachlehrer", "sekretariat",
    "erstkorrektor", "zweitkorrektor", "drittkorrektor",
    "teacher_assistant", "teacher", "lehrer",
    "data_protection_officer",
}
# Role names that classify a user as a customer (external user).
CUSTOMER_ROLES = {"parent", "student", "user", "guardian"}
# =============================================
# User Type Dependencies
# =============================================
async def require_employee(session: Session = Depends(get_current_session)) -> Session:
    """
    Dependency that admits only employee sessions.

    The current session is resolved through ``get_current_session`` and
    handed back unchanged when ``session.is_employee()`` is true.

    Raises:
        HTTPException: 403 ("Employee access required") when the session
            is not an employee session.

    Usage:
        @app.get("/api/protected/employee/grades")
        async def employee_only(session: Session = Depends(require_employee)):
            return {"grades": [...]}
    """
    if session.is_employee():
        return session
    raise HTTPException(status_code=403, detail="Employee access required")
async def require_customer(session: Session = Depends(get_current_session)) -> Session:
    """
    Dependency that admits only customer sessions.

    The current session is resolved through ``get_current_session`` and
    handed back unchanged when ``session.is_customer()`` is true.

    Raises:
        HTTPException: 403 ("Customer access required") when the session
            is not a customer session.

    Usage:
        @app.get("/api/protected/customer/my-grades")
        async def customer_only(session: Session = Depends(require_customer)):
            return {"my_grades": [...]}
    """
    if session.is_customer():
        return session
    raise HTTPException(status_code=403, detail="Customer access required")
def require_user_type(user_type: UserType):
    """
    Build a dependency that accepts only sessions of the given user type.

    Args:
        user_type: The ``UserType`` the session's ``user_type`` must equal.

    Returns:
        An async dependency that returns the current session, or raises
        HTTPException 403 when ``session.user_type`` differs.

    Usage:
        @app.get("/api/protected/endpoint")
        async def endpoint(session: Session = Depends(require_user_type(UserType.EMPLOYEE))):
            return {"data": [...]}
    """
    async def user_type_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.user_type == user_type:
            return session
        raise HTTPException(
            status_code=403,
            detail=f"User type '{user_type.value}' required",
        )

    return user_type_checker
# =============================================
# Permission Dependencies
# =============================================
def require_permission(permission: str):
    """
    Build a dependency that demands a single permission.

    Args:
        permission: Permission string passed to ``session.has_permission``.

    Returns:
        An async dependency that returns the current session when the
        permission is held. Otherwise it logs a warning naming the user id
        and the permission, then raises HTTPException 403.

    Usage:
        @app.get("/api/protected/grades")
        async def get_grades(session: Session = Depends(require_permission("grades:read"))):
            return {"grades": [...]}
    """
    async def permission_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_permission(permission):
            return session

        # Record the denial before rejecting the request.
        logger.warning(
            f"Permission denied: user {session.user_id} lacks '{permission}'"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Permission '{permission}' required",
        )

    return permission_checker
def require_any_permission(permissions: List[str]):
    """
    Build a dependency satisfied by any one of several permissions.

    Args:
        permissions: Candidate permission strings, passed as-is to
            ``session.has_any_permission``.

    Returns:
        An async dependency that returns the current session, or raises
        HTTPException 403 listing the candidates when none is held.

    Usage:
        @app.get("/api/protected/data")
        async def get_data(
            session: Session = Depends(require_any_permission(["data:read", "admin"]))
        ):
            return {"data": [...]}
    """
    async def any_permission_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_any_permission(permissions):
            return session
        raise HTTPException(
            status_code=403,
            detail=f"One of permissions {permissions} required",
        )

    return any_permission_checker
def require_all_permissions(permissions: List[str]):
    """
    Build a dependency that demands every listed permission.

    Args:
        permissions: Permission strings, passed as-is to
            ``session.has_all_permissions``.

    Returns:
        An async dependency that returns the current session, or raises
        HTTPException 403 whose detail lists the permissions for which
        ``session.has_permission`` returned false.

    Usage:
        @app.get("/api/protected/sensitive")
        async def sensitive(
            session: Session = Depends(require_all_permissions(["grades:read", "grades:write"]))
        ):
            return {"data": [...]}
    """
    async def all_permissions_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_all_permissions(permissions):
            return session

        # Re-check one by one so the error can name exactly what is absent.
        missing = [perm for perm in permissions if not session.has_permission(perm)]
        raise HTTPException(
            status_code=403,
            detail=f"Missing permissions: {missing}",
        )

    return all_permissions_checker
def require_role(role: str):
    """
    Build a dependency that demands a specific role.

    Args:
        role: Role name passed to ``session.has_role``.

    Returns:
        An async dependency that returns the current session, or raises
        HTTPException 403 when the role is not held.

    Usage:
        @app.get("/api/admin/users")
        async def admin_users(session: Session = Depends(require_role("admin"))):
            return {"users": [...]}
    """
    async def role_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_role(role):
            return session
        raise HTTPException(status_code=403, detail=f"Role '{role}' required")

    return role_checker
def require_any_role(roles: List[str]):
    """
    Build a dependency satisfied by any one of several roles.

    Args:
        roles: Candidate role names; each is tried with
            ``session.has_role`` until one matches.

    Returns:
        An async dependency that returns the current session, or raises
        HTTPException 403 listing the candidates when none is held.

    Usage:
        @app.get("/api/management/data")
        async def management(
            session: Session = Depends(require_any_role(["admin", "schulleitung"]))
        ):
            return {"data": [...]}
    """
    async def any_role_checker(session: Session = Depends(get_current_session)) -> Session:
        # Stop at the first role the session holds.
        for candidate in roles:
            if session.has_role(candidate):
                return session
        raise HTTPException(
            status_code=403,
            detail=f"One of roles {roles} required",
        )

    return any_role_checker
# =============================================
# Tenant Isolation
# =============================================
def require_same_tenant(tenant_id_param: str = "tenant_id"):
    """
    Build a dependency that confines a user to their own tenant.

    The returned dependency looks up ``tenant_id_param`` in the request's
    path parameters (falling back to the query string) and compares it
    with ``session.tenant_id``. A mismatch is rejected with HTTP 403
    unless the session holds the ``super_admin`` role.

    The tenant value is read from the ``Request`` object instead of a
    ``**kwargs`` catch-all: FastAPI fills a dependency's arguments from
    its declared parameters, so ``**kwargs`` is never populated with the
    route's path parameters and the tenant comparison would be skipped.

    Args:
        tenant_id_param: Name of the path/query parameter that carries
            the tenant id of the requested resource.

    Returns:
        An async dependency that returns the current session when access
        is allowed.

    Raises:
        HTTPException: 403 ("Access denied to this tenant") from the
            returned dependency on a tenant mismatch without super admin.

    Usage:
        @app.get("/api/protected/school/{tenant_id}/data")
        async def school_data(
            tenant_id: str,
            session: Session = Depends(require_same_tenant("tenant_id"))
        ):
            return {"data": [...]}
    """
    async def tenant_checker(
        request: Request,
        session: Session = Depends(get_current_session),
    ) -> Session:
        # Prefer the path parameter; fall back to a query parameter of
        # the same name for routes that pass the tenant that way.
        request_tenant = request.path_params.get(tenant_id_param)
        if request_tenant is None:
            request_tenant = request.query_params.get(tenant_id_param)

        # No tenant named in the request: nothing to compare against.
        # This keeps the original contract, which only enforced the check
        # when a tenant value was present.
        if not request_tenant:
            return session

        if session.tenant_id == request_tenant:
            return session

        # Super admins may cross tenant boundaries.
        if session.has_role("super_admin"):
            return session

        raise HTTPException(
            status_code=403,
            detail="Access denied to this tenant",
        )

    return tenant_checker
# =============================================
# Utility Functions
# =============================================
def determine_user_type(roles: List[str]) -> UserType:
    """
    Classify a user as employee or customer from their role names.

    Any overlap with ``EMPLOYEE_ROLES`` yields ``UserType.EMPLOYEE``, so
    employee roles win over customer roles. Everything else, whether a
    known customer role, an unknown role or no role at all, yields
    ``UserType.CUSTOMER``.

    Args:
        roles: Role names held by the user.

    Returns:
        The resolved ``UserType``.
    """
    held = set(roles)
    if held.intersection(EMPLOYEE_ROLES):
        return UserType.EMPLOYEE
    # Customer roles and the fallback default resolve to the same type.
    return UserType.CUSTOMER
def get_permissions_for_roles(roles: List[str], user_type: UserType) -> List[str]:
    """
    Compute the permission strings for a user.

    The base set is ``EMPLOYEE_PERMISSIONS`` for ``UserType.EMPLOYEE`` and
    ``CUSTOMER_PERMISSIONS`` for any other user type. ``ADMIN_PERMISSIONS``
    is added when the user holds at least one of ``admin``,
    ``schul_admin``, ``super_admin`` or ``data_protection_officer``.

    This is a basic static mapping; in production the role-permission
    mapping would be queried from the RBAC database.

    Args:
        roles: Role names held by the user.
        user_type: The user's resolved type.

    Returns:
        The de-duplicated permissions as a list (built from a set, so the
        order is not defined).
    """
    if user_type == UserType.EMPLOYEE:
        base = EMPLOYEE_PERMISSIONS
    else:
        base = CUSTOMER_PERMISSIONS
    granted = set(base)

    # Roles that unlock the administrative permissions on top of the base.
    elevated = {"admin", "schul_admin", "super_admin", "data_protection_officer"}
    if set(roles) & elevated:
        granted.update(ADMIN_PERMISSIONS)

    return list(granted)
def check_resource_ownership(
    session: Session,
    resource_user_id: str,
    allow_admin: bool = True
) -> bool:
    """
    Tell whether the session's user owns a resource or may bypass ownership.

    Args:
        session: Current session; ``user_id`` and ``has_role`` are used.
        resource_user_id: User id recorded as the resource's owner.
        allow_admin: When true, holders of the ``admin`` or ``super_admin``
            role are granted access even if they are not the owner.

    Returns:
        True when access is allowed, False otherwise.

    Usage:
        if not check_resource_ownership(session, grade.student_id):
            raise HTTPException(403, "Access denied")
    """
    # The owner always has access.
    if session.user_id == resource_user_id:
        return True

    # Non-owners get in only through the admin override, if enabled.
    if not allow_admin:
        return False
    return bool(session.has_role("admin") or session.has_role("super_admin"))
def check_parent_child_access(
    session: Session,
    student_id: str,
    parent_student_ids: List[str]
) -> bool:
    """
    Tell whether the session's user may access a student's data.

    Access is granted when any of the following holds:
    the session's ``user_id`` equals ``student_id``; ``student_id`` is in
    ``parent_student_ids``; or the session is an employee session that
    has the ``students:read`` permission.

    Args:
        session: Current session.
        student_id: Id of the student whose data is requested.
        parent_student_ids: Student ids the caller has resolved as the
            user's children.

    Returns:
        True when access is allowed, False otherwise.

    Usage:
        parent_children = get_parent_children(session.user_id)
        if not check_parent_child_access(session, student_id, parent_children):
            raise HTTPException(403, "Access denied")
    """
    # The student themself, or a parent of that student.
    if session.user_id == student_id or student_id in parent_student_ids:
        return True

    # Otherwise only staff with read access to student records.
    return bool(session.is_employee() and session.has_permission("students:read"))