Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
429 lines
12 KiB
Python
429 lines
12 KiB
Python
"""
|
|
RBAC Middleware for Session-Based Authentication
|
|
|
|
Provides user type checking (Employee vs. Customer) and
|
|
permission-based access control.
|
|
|
|
Employee roles: teacher roles, admin roles
|
|
Customer roles: parent, student, user, guardian
|
|
|
|
Usage:
|
|
@app.get("/api/protected/employee/grades")
|
|
async def get_grades(session: Session = Depends(require_employee)):
|
|
return {"grades": [...]}
|
|
|
|
@app.get("/api/protected/endpoint")
|
|
async def protected(session: Session = Depends(require_permission("grades:read"))):
|
|
return {"data": [...]}
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Callable
|
|
from functools import wraps
|
|
|
|
from fastapi import HTTPException, Depends
|
|
|
|
from .session_store import Session, UserType
|
|
from .session_middleware import get_current_session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================
|
|
# Permission Constants
|
|
# =============================================
|
|
|
|
# Permission strings granted to every employee-type user. Entries follow the
# "<resource>:<action>" naming scheme and are grouped by resource.
EMPLOYEE_PERMISSIONS = [
    "grades:read", "grades:write",                              # grades
    "attendance:read", "attendance:write",                      # attendance
    "students:read", "students:write",                          # student management
    "reports:generate",                                         # reports
    "consent:admin",                                            # consent
    "corrections:read", "corrections:write",                    # corrections
    "classes:read", "classes:write",                            # classes
    "timetable:read", "timetable:write",                        # timetable
    "substitutions:read", "substitutions:write",                # substitutions
    "parent_communication:read", "parent_communication:write",  # parent communication
]
|
|
|
|
# Permission strings granted to every customer-type user (the non-employee
# branch of get_permissions_for_roles).
CUSTOMER_PERMISSIONS = [
    "own_data:read", "own_grades:read", "own_attendance:read",        # own data access
    "consent:manage",                                                 # consent management
    "meetings:join", "messages:read", "messages:write",               # meetings & communication
    "children:read", "children:grades:read", "children:attendance:read",  # children (for parents)
]
|
|
|
|
# Extra permission strings added on top of the base set for administrative
# roles (see get_permissions_for_roles).
ADMIN_PERMISSIONS = [
    "users:read", "users:write", "users:manage",  # user management
    "rbac:read", "rbac:write",                    # RBAC management
    "audit:read",                                 # audit & logs
    "settings:read", "settings:write",            # system settings
    "dsr:read", "dsr:process",                    # DSR management
]
|
|
|
|
# Role names whose presence classifies a user as an employee
# (consumed by determine_user_type).
# NOTE(review): "super_admin" is referenced elsewhere in this module but is not
# listed here - confirm whether a super_admin-only user should count as employee.
EMPLOYEE_ROLES = {
    # administration
    "admin", "schul_admin", "schulleitung", "sekretariat",
    "data_protection_officer",
    # examination / correction
    "pruefungsvorsitz", "erstkorrektor", "zweitkorrektor", "drittkorrektor",
    # teaching staff
    "klassenlehrer", "fachlehrer", "teacher_assistant", "teacher", "lehrer",
}
|
|
|
|
# Role names whose presence classifies a user as a customer
# (consumed by determine_user_type).
CUSTOMER_ROLES = {"parent", "student", "user", "guardian"}
|
|
|
|
|
|
# =============================================
|
|
# User Type Dependencies
|
|
# =============================================
|
|
|
|
async def require_employee(session: Session = Depends(get_current_session)) -> Session:
    """
    Dependency that admits only employee sessions.

    Returns the session unchanged when ``session.is_employee()`` is truthy.

    Raises:
        HTTPException: 403 "Employee access required" otherwise.

    Usage:
        @app.get("/api/protected/employee/grades")
        async def employee_only(session: Session = Depends(require_employee)):
            return {"grades": [...]}
    """
    if session.is_employee():
        return session

    raise HTTPException(status_code=403, detail="Employee access required")
|
|
|
|
|
|
async def require_customer(session: Session = Depends(get_current_session)) -> Session:
    """
    Dependency that admits only customer sessions.

    Returns the session unchanged when ``session.is_customer()`` is truthy.

    Raises:
        HTTPException: 403 "Customer access required" otherwise.

    Usage:
        @app.get("/api/protected/customer/my-grades")
        async def customer_only(session: Session = Depends(require_customer)):
            return {"my_grades": [...]}
    """
    if session.is_customer():
        return session

    raise HTTPException(status_code=403, detail="Customer access required")
|
|
|
|
|
|
def require_user_type(user_type: UserType):
    """
    Build a dependency that admits only sessions of the given user type.

    Args:
        user_type: The ``UserType`` member the session's ``user_type`` must equal.

    Returns:
        An async dependency that returns the current session on a match and
        raises ``HTTPException`` (403) otherwise.

    Usage:
        @app.get("/api/protected/endpoint")
        async def endpoint(session: Session = Depends(require_user_type(UserType.EMPLOYEE))):
            return {"data": [...]}
    """
    async def user_type_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.user_type == user_type:
            return session

        raise HTTPException(
            status_code=403,
            detail=f"User type '{user_type.value}' required",
        )

    return user_type_checker
|
|
|
|
|
|
# =============================================
|
|
# Permission Dependencies
|
|
# =============================================
|
|
|
|
def require_permission(permission: str):
    """
    Build a dependency that admits only sessions holding ``permission``.

    Args:
        permission: Permission string passed to ``session.has_permission``.

    Returns:
        An async dependency that returns the current session when the
        permission is held; otherwise it logs a warning and raises
        ``HTTPException`` (403).

    Usage:
        @app.get("/api/protected/grades")
        async def get_grades(session: Session = Depends(require_permission("grades:read"))):
            return {"grades": [...]}
    """
    async def permission_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_permission(permission):
            return session

        # Record the denial before rejecting the request.
        logger.warning(
            f"Permission denied: user {session.user_id} lacks '{permission}'"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Permission '{permission}' required",
        )

    return permission_checker
|
|
|
|
|
|
def require_any_permission(permissions: List[str]):
    """
    Build a dependency that admits sessions holding at least one permission.

    Args:
        permissions: Candidate permission strings, passed as-is to
            ``session.has_any_permission``.

    Returns:
        An async dependency that returns the current session when the check
        passes and raises ``HTTPException`` (403) otherwise.

    Usage:
        @app.get("/api/protected/data")
        async def get_data(
            session: Session = Depends(require_any_permission(["data:read", "admin"]))
        ):
            return {"data": [...]}
    """
    async def any_permission_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_any_permission(permissions):
            return session

        raise HTTPException(
            status_code=403,
            detail=f"One of permissions {permissions} required",
        )

    return any_permission_checker
|
|
|
|
|
|
def require_all_permissions(permissions: List[str]):
    """
    Build a dependency that admits only sessions holding every permission.

    Args:
        permissions: Permission strings, passed as-is to
            ``session.has_all_permissions``.

    Returns:
        An async dependency that returns the current session when the check
        passes; otherwise it raises ``HTTPException`` (403) whose detail lists
        the permissions for which ``session.has_permission`` is falsy.

    Usage:
        @app.get("/api/protected/sensitive")
        async def sensitive(
            session: Session = Depends(require_all_permissions(["grades:read", "grades:write"]))
        ):
            return {"data": [...]}
    """
    async def all_permissions_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_all_permissions(permissions):
            return session

        # Only computed on the failure path, to build the error detail.
        missing = [perm for perm in permissions if not session.has_permission(perm)]
        raise HTTPException(
            status_code=403,
            detail=f"Missing permissions: {missing}",
        )

    return all_permissions_checker
|
|
|
|
|
|
def require_role(role: str):
    """
    Build a dependency that admits only sessions holding ``role``.

    Args:
        role: Role name passed to ``session.has_role``.

    Returns:
        An async dependency that returns the current session when the role is
        held and raises ``HTTPException`` (403) otherwise.

    Usage:
        @app.get("/api/admin/users")
        async def admin_users(session: Session = Depends(require_role("admin"))):
            return {"users": [...]}
    """
    async def role_checker(session: Session = Depends(get_current_session)) -> Session:
        if session.has_role(role):
            return session

        raise HTTPException(
            status_code=403,
            detail=f"Role '{role}' required",
        )

    return role_checker
|
|
|
|
|
|
def require_any_role(roles: List[str]):
    """
    Build a dependency that admits sessions holding at least one of ``roles``.

    Args:
        roles: Role names, each tested in order with ``session.has_role``.

    Returns:
        An async dependency that returns the current session on the first
        matching role and raises ``HTTPException`` (403) when none match.

    Usage:
        @app.get("/api/management/data")
        async def management(
            session: Session = Depends(require_any_role(["admin", "schulleitung"]))
        ):
            return {"data": [...]}
    """
    async def any_role_checker(session: Session = Depends(get_current_session)) -> Session:
        # Stop at the first role the session holds.
        for candidate in roles:
            if session.has_role(candidate):
                return session

        raise HTTPException(
            status_code=403,
            detail=f"One of roles {roles} required",
        )

    return any_role_checker
|
|
|
|
|
|
# =============================================
|
|
# Tenant Isolation
|
|
# =============================================
|
|
|
|
def require_same_tenant(tenant_id_param: str = "tenant_id"):
    """
    Build a dependency that restricts access to the caller's own tenant.

    The returned dependency looks up the parameter named ``tenant_id_param``
    in the request's path parameters (falling back to the query string) and
    compares it with ``session.tenant_id``. Sessions holding the
    "super_admin" role may access any tenant.

    Args:
        tenant_id_param: Name of the path/query parameter carrying the
            tenant identifier.

    Returns:
        An async dependency that returns the current session when access is
        allowed.

    Raises:
        HTTPException: 403 "Access denied to this tenant" when the requested
            tenant differs from the session's tenant and the session is not
            a super admin.

    Usage:
        @app.get("/api/protected/school/{tenant_id}/data")
        async def school_data(
            tenant_id: str,
            session: Session = Depends(require_same_tenant("tenant_id"))
        ):
            return {"data": [...]}
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi import Request

    async def tenant_checker(
        request: Request,
        session: Session = Depends(get_current_session),
    ) -> Session:
        # FastAPI does not forward route parameters into a dependency's
        # **kwargs, so the tenant has to be read from the request itself.
        request_tenant = request.path_params.get(tenant_id_param)
        if request_tenant is None:
            request_tenant = request.query_params.get(tenant_id_param)

        # As before, a request that carries no tenant parameter is let through.
        # NOTE(review): assumes session.tenant_id and the route parameter are
        # directly comparable (e.g. both str) - confirm against Session.
        if request_tenant and session.tenant_id != request_tenant:
            # Super admins may access all tenants.
            if not session.has_role("super_admin"):
                raise HTTPException(
                    status_code=403,
                    detail="Access denied to this tenant",
                )
        return session

    return tenant_checker
|
|
|
|
|
|
# =============================================
|
|
# Utility Functions
|
|
# =============================================
|
|
|
|
def determine_user_type(roles: List[str]) -> UserType:
    """
    Classify a user as employee or customer from their role names.

    Any role listed in ``EMPLOYEE_ROLES`` makes the user an employee; this
    takes precedence over customer roles. Every other case, including
    unknown or empty role lists, yields ``UserType.CUSTOMER``.
    """
    held_employee_roles = set(roles) & EMPLOYEE_ROLES
    # Customer roles and the fallback both map to CUSTOMER, so a single
    # employee test decides the outcome.
    return UserType.EMPLOYEE if held_employee_roles else UserType.CUSTOMER
|
|
|
|
|
|
def get_permissions_for_roles(roles: List[str], user_type: UserType) -> List[str]:
    """
    Compute the permission strings for a user from user type and roles.

    Employees start from ``EMPLOYEE_PERMISSIONS``, everyone else from
    ``CUSTOMER_PERMISSIONS``. ``ADMIN_PERMISSIONS`` are added when any of the
    roles "admin", "schul_admin", "super_admin" or "data_protection_officer"
    is present.

    This is a static mapping; the original author notes that production
    should query the RBAC database for role-permission mappings instead.

    Returns:
        A list of unique permission strings, in no particular order.
    """
    if user_type == UserType.EMPLOYEE:
        base_permissions = EMPLOYEE_PERMISSIONS
    else:
        base_permissions = CUSTOMER_PERMISSIONS

    # A set removes duplicates across the base and admin lists.
    granted = set()
    granted.update(base_permissions)

    admin_roles = {"admin", "schul_admin", "super_admin", "data_protection_officer"}
    if not set(roles).isdisjoint(admin_roles):
        granted.update(ADMIN_PERMISSIONS)

    return list(granted)
|
|
|
|
|
|
def check_resource_ownership(
    session: Session,
    resource_user_id: str,
    allow_admin: bool = True
) -> bool:
    """
    Tell whether the session's user owns a resource or may bypass ownership.

    Args:
        session: Session of the requesting user.
        resource_user_id: User ID recorded as owner of the resource.
        allow_admin: When truthy, sessions with the "admin" or "super_admin"
            role are granted access regardless of ownership.

    Returns:
        True if ``session.user_id`` equals ``resource_user_id`` or the admin
        bypass applies, otherwise False.

    Usage:
        if not check_resource_ownership(session, grade.student_id):
            raise HTTPException(403, "Access denied")
    """
    # The owner always has access.
    if session.user_id == resource_user_id:
        return True

    # Without the admin bypass, ownership is the only way in.
    if not allow_admin:
        return False

    return any(session.has_role(name) for name in ("admin", "super_admin"))
|
|
|
|
|
|
def check_parent_child_access(
    session: Session,
    student_id: str,
    parent_student_ids: List[str]
) -> bool:
    """
    Tell whether the session may access a given student's data.

    Access is granted when any of the following holds, tested in this order:
    the session's user is the student; ``student_id`` is contained in
    ``parent_student_ids``; the session is an employee with the
    "students:read" permission.

    Args:
        session: Session of the requesting user.
        student_id: ID of the student whose data is requested.
        parent_student_ids: Student IDs supplied by the caller as the
            requesting user's children; they are not verified here.

    Returns:
        True when access is granted, otherwise False.

    Usage:
        parent_children = get_parent_children(session.user_id)
        if not check_parent_child_access(session, student_id, parent_children):
            raise HTTPException(403, "Access denied")
    """
    is_student_self = session.user_id == student_id
    if is_student_self or student_id in parent_student_ids:
        return True

    # Staff need both the employee user type and the read permission.
    return bool(session.is_employee() and session.has_permission("students:read"))
|