This repository was archived on 2026-02-15. You can view its files and clone it, but you cannot open issues or pull requests or push commits.
Files
breakpilot-pwa/backend/tests/test_session_middleware.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

614 lines
21 KiB
Python

"""
Tests for Session Middleware
Tests the hybrid Valkey + PostgreSQL session storage and RBAC middleware.
Usage:
cd backend && pytest tests/test_session_middleware.py -v
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timezone, timedelta
from session.session_store import Session, SessionStore, UserType
from session.rbac_middleware import (
determine_user_type,
get_permissions_for_roles,
EMPLOYEE_PERMISSIONS,
CUSTOMER_PERMISSIONS,
ADMIN_PERMISSIONS,
EMPLOYEE_ROLES,
CUSTOMER_ROLES,
)
class TestSession:
    """Exercise the ``Session`` dataclass: construction, dict conversion and access checks."""

    @staticmethod
    def _minimal_session(user_type=UserType.EMPLOYEE, **extra):
        """Build a session with placeholder identity fields.

        Any keyword in ``extra`` (e.g. ``roles`` or ``permissions``) is passed
        straight through to the ``Session`` constructor.
        """
        return Session(
            session_id="test",
            user_id="test",
            email="test@test.com",
            user_type=user_type,
            **extra,
        )

    def test_session_creation(self):
        """Constructor arguments are readable back as attributes."""
        created = Session(
            session_id="test-session-id",
            user_id="test-user-id",
            email="test@example.com",
            user_type=UserType.EMPLOYEE,
            roles=["teacher", "klassenlehrer"],
            permissions=["grades:read", "grades:write"],
        )

        assert created.session_id == "test-session-id"
        assert created.user_id == "test-user-id"
        assert created.email == "test@example.com"
        assert created.user_type == UserType.EMPLOYEE
        assert "teacher" in created.roles
        assert "grades:read" in created.permissions

    def test_session_to_dict(self):
        """``to_dict`` exposes the id, the user type as a string, and the roles."""
        as_dict = Session(
            session_id="test-session-id",
            user_id="test-user-id",
            email="test@example.com",
            user_type=UserType.CUSTOMER,
            roles=["parent"],
            permissions=["children:read"],
        ).to_dict()

        assert as_dict["session_id"] == "test-session-id"
        assert as_dict["user_type"] == "customer"
        assert as_dict["roles"] == ["parent"]

    def test_session_from_dict(self):
        """``from_dict`` rebuilds a session from a plain dictionary."""
        payload = {
            "session_id": "test-session-id",
            "user_id": "test-user-id",
            "email": "test@example.com",
            "user_type": "employee",
            "roles": ["admin"],
            "permissions": ["users:manage"],
            "created_at": "2024-01-01T00:00:00+00:00",
        }

        rebuilt = Session.from_dict(payload)

        assert rebuilt.session_id == "test-session-id"
        assert rebuilt.user_type == UserType.EMPLOYEE
        assert rebuilt.roles == ["admin"]

    def test_has_permission(self):
        """``has_permission`` is True only for a permission the session holds."""
        subject = self._minimal_session(
            permissions=["grades:read", "grades:write", "attendance:read"],
        )

        assert subject.has_permission("grades:read") is True
        assert subject.has_permission("users:manage") is False

    def test_has_any_permission(self):
        """``has_any_permission`` needs at least one overlap with the query."""
        subject = self._minimal_session(permissions=["grades:read"])

        assert subject.has_any_permission(["grades:read", "grades:write"]) is True
        assert subject.has_any_permission(["users:manage", "audit:read"]) is False

    def test_has_all_permissions(self):
        """``has_all_permissions`` needs every queried permission to be held."""
        subject = self._minimal_session(
            permissions=["grades:read", "grades:write", "attendance:read"],
        )

        assert subject.has_all_permissions(["grades:read", "grades:write"]) is True
        assert subject.has_all_permissions(["grades:read", "users:manage"]) is False

    def test_has_role(self):
        """``has_role`` is True only for a role the session carries."""
        subject = self._minimal_session(roles=["teacher", "klassenlehrer"])

        assert subject.has_role("teacher") is True
        assert subject.has_role("admin") is False

    def test_is_employee(self):
        """``is_employee`` and ``is_customer`` are mutually exclusive per user type."""
        staff = self._minimal_session(user_type=UserType.EMPLOYEE)
        client = self._minimal_session(user_type=UserType.CUSTOMER)

        assert staff.is_employee() is True
        assert staff.is_customer() is False
        assert client.is_customer() is True
        assert client.is_employee() is False
class TestUserType:
    """Check how ``determine_user_type`` maps role lists onto ``UserType``."""

    def test_determine_employee_type(self):
        """Each staff role on its own resolves to EMPLOYEE."""
        for staff_role in ("teacher", "admin", "klassenlehrer", "schul_admin"):
            assert determine_user_type([staff_role]) == UserType.EMPLOYEE

    def test_determine_customer_type(self):
        """Each customer role on its own resolves to CUSTOMER."""
        for client_role in ("parent", "student", "user"):
            assert determine_user_type([client_role]) == UserType.CUSTOMER

    def test_employee_takes_precedence(self):
        """A mix of employee and customer roles resolves to EMPLOYEE."""
        mixed_roles = ["teacher", "parent"]
        assert determine_user_type(mixed_roles) == UserType.EMPLOYEE

    def test_unknown_role_defaults_to_customer(self):
        """An unrecognised role, or no role at all, resolves to CUSTOMER."""
        for role_list in (["unknown_role"], []):
            assert determine_user_type(role_list) == UserType.CUSTOMER
class TestPermissions:
    """Check the permissions ``get_permissions_for_roles`` hands out."""

    def test_employee_permissions(self):
        """A teacher receives grade/attendance permissions but not parent ones."""
        granted = get_permissions_for_roles(["teacher"], UserType.EMPLOYEE)

        for wanted in ("grades:read", "grades:write", "attendance:read"):
            assert wanted in granted
        # A customer-only permission must not be granted to employees.
        assert "children:read" not in granted

    def test_customer_permissions(self):
        """A parent receives customer permissions but cannot write grades."""
        granted = get_permissions_for_roles(["parent"], UserType.CUSTOMER)

        for wanted in ("children:read", "own_grades:read", "consent:manage"):
            assert wanted in granted
        # An employee permission must not be granted to customers.
        assert "grades:write" not in granted

    def test_admin_permissions(self):
        """The ``admin`` role receives user, audit and RBAC permissions."""
        granted = get_permissions_for_roles(["admin"], UserType.EMPLOYEE)

        for wanted in ("users:manage", "audit:read", "rbac:write"):
            assert wanted in granted

    def test_schul_admin_permissions(self):
        """The ``schul_admin`` role receives user and settings permissions."""
        granted = get_permissions_for_roles(["schul_admin"], UserType.EMPLOYEE)

        for wanted in ("users:manage", "settings:write"):
            assert wanted in granted
class TestEmployeeRoles:
    """Verify the contents of the ``EMPLOYEE_ROLES`` constant."""

    def test_employee_roles_defined(self):
        """Every role this suite expects for staff appears in ``EMPLOYEE_ROLES``."""
        expected_roles = [
            "admin", "schul_admin", "teacher", "klassenlehrer",
            "fachlehrer", "sekretariat", "data_protection_officer",
        ]
        for role in expected_roles:
            assert role in EMPLOYEE_ROLES, f"Missing employee role: {role}"

    def test_customer_roles_not_in_employee(self):
        """No role in ``CUSTOMER_ROLES`` may also be listed in ``EMPLOYEE_ROLES``."""
        for role in CUSTOMER_ROLES:
            # Name the overlapping role so a failure is actionable, in the
            # same way test_employee_roles_defined names the missing role.
            assert role not in EMPLOYEE_ROLES, (
                f"Customer role also listed as employee role: {role}"
            )
class TestCustomerRoles:
    """Verify the contents of the ``CUSTOMER_ROLES`` constant."""

    def test_customer_roles_defined(self):
        """Every role this suite expects for customers appears in ``CUSTOMER_ROLES``."""
        for required in ("parent", "student", "user"):
            assert required in CUSTOMER_ROLES, f"Missing customer role: {required}"
class TestPermissionConstants:
    """Sanity checks on the three permission constant collections."""

    def test_employee_permissions_not_empty(self):
        """``EMPLOYEE_PERMISSIONS`` holds at least one entry."""
        employee_count = len(EMPLOYEE_PERMISSIONS)
        assert employee_count > 0

    def test_customer_permissions_not_empty(self):
        """``CUSTOMER_PERMISSIONS`` holds at least one entry."""
        customer_count = len(CUSTOMER_PERMISSIONS)
        assert customer_count > 0

    def test_admin_permissions_not_empty(self):
        """``ADMIN_PERMISSIONS`` holds at least one entry."""
        admin_count = len(ADMIN_PERMISSIONS)
        assert admin_count > 0

    def test_no_duplicate_permissions(self):
        """No collection repeats an entry: de-duplicating must not shrink it."""
        for bucket in (EMPLOYEE_PERMISSIONS, CUSTOMER_PERMISSIONS, ADMIN_PERMISSIONS):
            assert len(bucket) == len(set(bucket))
class TestSessionStore:
    """Test ``SessionStore`` helpers that need no live backend connection."""

    @pytest.fixture
    def mock_store(self):
        """Provide a ``SessionStore`` with a 24-hour TTL and Valkey flagged off."""
        session_store = SessionStore(
            valkey_url="redis://localhost:6379",
            database_url=None,
            session_ttl_hours=24,
        )
        # Mark Valkey as unavailable so these tests do not rely on it.
        session_store._valkey_available = False
        return session_store

    def test_session_ttl_default(self, mock_store):
        """A 24-hour TTL is exposed both as a timedelta and in seconds."""
        one_day = timedelta(hours=24)
        assert mock_store.session_ttl == one_day
        assert mock_store.session_ttl_seconds == 24 * 3600

    def test_valkey_key_format(self, mock_store):
        """Valkey keys are the session id prefixed with ``session:``."""
        assert mock_store._get_valkey_key("test-session-id") == "session:test-session-id"

    def test_hash_token(self, mock_store):
        """Token hashing is deterministic, input-sensitive and 64 characters long."""
        first = mock_store._hash_token("token1")
        repeat = mock_store._hash_token("token1")
        other = mock_store._hash_token("token2")

        assert first == repeat  # identical input -> identical hash
        assert first != other  # different input -> different hash
        assert len(first) == 64  # length of a SHA-256 hex digest
class TestSessionExpiry:
    """Test the timestamps a freshly built ``Session`` carries."""

    def test_session_created_with_timestamps(self):
        """A new session has both timestamps set, with creation not in the future."""
        fresh = Session(
            session_id="test",
            user_id="test",
            email="test@test.com",
            user_type=UserType.EMPLOYEE,
        )

        assert fresh.created_at is not None
        assert fresh.last_activity_at is not None

        # Read the clock only after the session exists, so "now" can never
        # precede the creation timestamp.
        current_utc = datetime.now(timezone.utc)
        assert fresh.created_at <= current_utc
class TestFastAPIIntegration:
    """Test the request-facing helpers of the session middleware."""

    @staticmethod
    def _extract(request):
        """Run the middleware's session-id extraction on ``request``.

        The import stays inside the call so the middleware module is loaded
        only when a test actually needs it.
        """
        from session.session_middleware import _extract_session_id_from_request

        return _extract_session_id_from_request(request)

    @pytest.fixture
    def mock_request(self):
        """Provide a ``MagicMock`` request with empty headers and cookies."""
        fake_request = MagicMock()
        fake_request.headers = {}
        fake_request.cookies = {}
        fake_request.state = MagicMock()
        return fake_request

    def test_extract_session_from_bearer(self, mock_request):
        """The token of a ``Bearer`` authorization header is the session id."""
        mock_request.headers = {"authorization": "Bearer test-session-id"}
        mock_request.cookies = {}

        assert self._extract(mock_request) == "test-session-id"

    def test_extract_session_from_header(self, mock_request):
        """The ``x-session-id`` header value is the session id."""
        mock_request.headers = {"x-session-id": "test-session-id"}
        mock_request.cookies = {}

        assert self._extract(mock_request) == "test-session-id"

    def test_extract_session_from_cookie(self, mock_request):
        """The ``session_id`` cookie value is the session id."""
        mock_request.headers = {}
        # A plain dict is enough here: it already offers ``.get()``.
        mock_request.cookies = {"session_id": "test-session-id"}

        assert self._extract(mock_request) == "test-session-id"

    def test_bearer_takes_precedence(self, mock_request):
        """With both a Bearer token and a cookie present, the Bearer token wins."""
        mock_request.headers = {"authorization": "Bearer bearer-id"}
        mock_request.cookies = {"session_id": "cookie-id"}

        assert self._extract(mock_request) == "bearer-id"

    def test_demo_session_in_development(self):
        """``_get_demo_session`` returns the fixed demo admin identity."""
        from session.session_middleware import _get_demo_session

        demo_session = _get_demo_session()

        assert demo_session.session_id == "demo-session-id"
        assert demo_session.email == "demo@breakpilot.app"
        assert demo_session.user_type == UserType.EMPLOYEE
        assert "admin" in demo_session.roles
        assert "grades:read" in demo_session.permissions
class TestRBACMiddlewareFunctions:
    """Test ``check_resource_ownership`` from the RBAC middleware."""

    @staticmethod
    def _staff_session(user_id, email, role):
        """Build an employee session holding exactly one role."""
        return Session(
            session_id="test",
            user_id=user_id,
            email=email,
            user_type=UserType.EMPLOYEE,
            roles=[role],
        )

    def test_check_resource_ownership_owner(self):
        """A teacher owns their own resource but not another user's."""
        from session.rbac_middleware import check_resource_ownership

        teacher = self._staff_session("user-123", "test@test.com", "teacher")

        assert check_resource_ownership(teacher, "user-123") is True
        assert check_resource_ownership(teacher, "user-456") is False

    def test_check_resource_ownership_admin(self):
        """An admin reaches foreign resources only when ``allow_admin`` is set."""
        from session.rbac_middleware import check_resource_ownership

        admin = self._staff_session("admin-123", "admin@test.com", "admin")

        # With the override enabled, the admin may access another user's resource.
        assert check_resource_ownership(admin, "user-456", allow_admin=True) is True
        # With the override disabled, the admin is treated like any non-owner.
        assert check_resource_ownership(admin, "user-456", allow_admin=False) is False

    def test_check_resource_ownership_super_admin(self):
        """A super admin reaches foreign resources when ``allow_admin`` is set."""
        from session.rbac_middleware import check_resource_ownership

        super_admin = self._staff_session("super-123", "super@test.com", "super_admin")

        assert check_resource_ownership(super_admin, "user-456", allow_admin=True) is True
class TestSessionSerialization:
    """Test converting ``Session`` objects to and from plain data."""

    def test_round_trip_serialization(self):
        """``to_dict`` followed by ``from_dict`` preserves the compared fields."""
        source = Session(
            session_id="test-session-id",
            user_id="test-user-id",
            email="test@example.com",
            user_type=UserType.EMPLOYEE,
            roles=["teacher", "klassenlehrer"],
            permissions=["grades:read", "grades:write"],
            tenant_id="tenant-123",
            ip_address="192.168.1.1",
            user_agent="Mozilla/5.0",
        )

        # Serialize, then rebuild from the serialized form.
        copy = Session.from_dict(source.to_dict())

        compared_fields = (
            "session_id",
            "user_id",
            "email",
            "user_type",
            "roles",
            "permissions",
            "tenant_id",
            "ip_address",
            "user_agent",
        )
        for field_name in compared_fields:
            assert getattr(copy, field_name) == getattr(source, field_name)

    def test_json_serialization(self):
        """The dict form can be dumped with ``json`` and contains id and user type."""
        import json

        customer = Session(
            session_id="test-session-id",
            user_id="test-user-id",
            email="test@example.com",
            user_type=UserType.CUSTOMER,
            roles=["parent"],
            permissions=["children:read"],
        )

        # ``json.dumps`` raising here would itself fail the test.
        encoded = json.dumps(customer.to_dict())

        assert "test-session-id" in encoded
        assert "customer" in encoded

    def test_from_dict_with_missing_optional_fields(self):
        """Omitted optional keys yield empty lists and a ``None`` tenant."""
        minimal_payload = {
            "session_id": "test",
            "user_id": "user-123",
            "email": "test@test.com",
            "user_type": "employee",
        }

        rebuilt = Session.from_dict(minimal_payload)

        assert rebuilt.session_id == "test"
        assert rebuilt.roles == []
        assert rebuilt.permissions == []
        assert rebuilt.tenant_id is None
class TestUserTypeEnum:
    """Test the values and string behaviour of the ``UserType`` enum."""

    def test_employee_value(self):
        """EMPLOYEE has the value ``"employee"`` and compares equal to that string."""
        member = UserType.EMPLOYEE
        assert member.value == "employee"
        assert member == "employee"

    def test_customer_value(self):
        """CUSTOMER has the value ``"customer"`` and compares equal to that string."""
        member = UserType.CUSTOMER
        assert member.value == "customer"
        assert member == "customer"

    def test_from_string(self):
        """Calling ``UserType`` with a value string returns the matching member."""
        from_employee = UserType("employee")
        assert from_employee == UserType.EMPLOYEE

        from_customer = UserType("customer")
        assert from_customer == UserType.CUSTOMER
class TestSessionStoreConfiguration:
    """Test how ``SessionStore`` reflects its constructor configuration."""

    def test_default_ttl(self):
        """Without an explicit TTL the store uses 24 hours."""
        unconfigured = SessionStore(valkey_url=None, database_url=None)

        assert unconfigured.session_ttl == timedelta(hours=24)

    def test_custom_ttl(self):
        """An explicit ``session_ttl_hours`` sets both TTL representations."""
        half_day_store = SessionStore(
            valkey_url=None,
            database_url=None,
            session_ttl_hours=12,
        )

        assert half_day_store.session_ttl == timedelta(hours=12)
        assert half_day_store.session_ttl_seconds == 12 * 3600

    def test_valkey_key_generation(self):
        """Distinct session ids map to distinct ``session:``-prefixed keys."""
        default_store = SessionStore()

        first_key = default_store._get_valkey_key("session-123")
        second_key = default_store._get_valkey_key("session-456")

        assert first_key == "session:session-123"
        assert second_key == "session:session-456"
        assert first_key != second_key
class TestPermissionMatching:
    """Test permission lookups on sessions with varying permission lists."""

    @staticmethod
    def _session_with(user_type, permissions):
        """Build a placeholder session holding exactly ``permissions``."""
        return Session(
            session_id="test",
            user_id="test",
            email="test@test.com",
            user_type=user_type,
            permissions=permissions,
        )

    def test_exact_permission_match(self):
        """Holding ``grades:read`` does not imply ``grades:write``."""
        reader = self._session_with(UserType.EMPLOYEE, ["grades:read"])

        assert reader.has_permission("grades:read") is True
        assert reader.has_permission("grades:write") is False

    def test_multiple_permissions(self):
        """Each held permission is found; one that is not held is rejected."""
        teacher = self._session_with(
            UserType.EMPLOYEE,
            [
                "grades:read", "grades:write",
                "attendance:read", "attendance:write",
                "students:read",
            ],
        )

        assert teacher.has_permission("grades:read") is True
        assert teacher.has_permission("attendance:write") is True
        assert teacher.has_permission("users:manage") is False

    def test_empty_permissions(self):
        """A session without permissions fails every non-empty check."""
        bare = self._session_with(UserType.CUSTOMER, [])

        assert bare.has_permission("any:permission") is False
        assert bare.has_any_permission(["grades:read"]) is False
        # Requiring nothing is vacuously satisfied.
        assert bare.has_all_permissions([]) is True
# Allow running this file directly: hand it to pytest in verbose mode.
if __name__ == "__main__":
    cli_args = [__file__, "-v"]
    pytest.main(cli_args)