This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/rbac_api.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

820 lines
28 KiB
Python

"""
RBAC API - Teacher and Role Management Endpoints
Provides API endpoints for:
- Listing all teachers
- Listing all available roles
- Assigning/revoking roles to teachers
- Viewing role assignments per teacher
Architecture:
- Authentication: Keycloak (when configured) or local JWT
- Authorization: Custom rbac.py for fine-grained permissions
"""
import os
import asyncpg
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, Request
from pydantic import BaseModel
# Import hybrid auth module
try:
from auth import get_current_user, TokenExpiredError, TokenInvalidError
except ImportError:
# Fallback for standalone testing
from auth.keycloak_auth import get_current_user, TokenExpiredError, TokenInvalidError
# Configuration from environment - NO DEFAULT SECRETS
# Deployment environment name; falls back to "development" when ENVIRONMENT is unset.
ENVIRONMENT = os.environ.get("ENVIRONMENT", "development")
# Every endpoint declared in this module is registered under the /rbac prefix.
router = APIRouter(prefix="/rbac", tags=["rbac"])
# Connection pool
# Module-wide asyncpg pool: None until get_pool() creates it, reset to None by close_pool().
_pool: Optional[asyncpg.Pool] = None
def _get_database_url() -> str:
    """Return the PostgreSQL connection string from ``DATABASE_URL``.

    Raises:
        RuntimeError: if the variable is unset or empty. No fallback value
            is provided on purpose, so credentials never live in the code.
    """
    database_url = os.environ.get("DATABASE_URL")
    if database_url:
        return database_url
    raise RuntimeError("DATABASE_URL nicht konfiguriert - bitte via Vault oder Umgebungsvariable setzen")
async def get_pool() -> asyncpg.Pool:
    """Return the shared asyncpg pool, creating it on first use.

    The pool is stored in the module-level ``_pool`` variable and is built
    from ``_get_database_url()`` with between 2 and 10 connections.
    """
    global _pool
    if _pool is not None:
        return _pool
    _pool = await asyncpg.create_pool(_get_database_url(), min_size=2, max_size=10)
    return _pool
async def close_pool():
    """Close the shared pool, if one exists, and reset ``_pool`` to ``None``."""
    global _pool
    if not _pool:
        # Nothing was ever opened (or it was already closed).
        return
    await _pool.close()
    _pool = None
# Pydantic Models
class RoleAssignmentCreate(BaseModel):
    """Request body for ``POST /rbac/assignments`` (see ``assign_role``)."""
    # User who receives the role.
    user_id: str
    # Role key; validated by assign_role before anything is written.
    role: str
    # Kind of resource the role is scoped to; defaults to a tenant-level role.
    resource_type: str = "tenant"
    # Resource the role applies to; assign_role also stores it as tenant_id.
    resource_id: str
    # Optional expiry as an ISO 8601 string; parsed by assign_role.
    valid_to: Optional[str] = None
class RoleAssignmentRevoke(BaseModel):
    """Payload naming a role assignment to revoke.

    NOTE(review): no endpoint visible in this section uses this model;
    ``revoke_role`` takes the assignment id from the URL path instead.
    """
    # Identifier of the role_assignments row.
    assignment_id: str
class TeacherCreate(BaseModel):
    """Request body for ``POST /rbac/teachers`` (see ``create_teacher``)."""
    # Stored on the users row; create_teacher answers 409 if it is already taken.
    email: str
    first_name: str
    last_name: str
    teacher_code: Optional[str] = None
    title: Optional[str] = None
    # Role keys to assign on creation; create_teacher skips keys that are
    # neither built-in nor an active custom role.
    roles: List[str] = []
class TeacherUpdate(BaseModel):
    """Request body for ``PUT /rbac/teachers/{teacher_id}`` (see ``update_teacher``).

    Every field is optional; update_teacher only writes the fields supplied.
    """
    # email / first_name / last_name are applied only when non-empty.
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    # teacher_code / title / is_active are applied whenever they are not None.
    teacher_code: Optional[str] = None
    title: Optional[str] = None
    is_active: Optional[bool] = None
class CustomRoleCreate(BaseModel):
    """Request body for ``POST /rbac/custom-roles`` (see ``create_custom_role``)."""
    # Unique key of the role; must not collide with a key in AVAILABLE_ROLES.
    role_key: str
    display_name: str
    description: str
    category: str
class CustomRoleUpdate(BaseModel):
    """Request body for ``PUT /rbac/custom-roles/{role_key}``.

    update_custom_role keeps the stored value for any field that is left
    unset or empty.
    """
    display_name: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
class TeacherResponse(BaseModel):
    """Teacher record returned by the teacher endpoints, including role keys."""
    # Primary key of the teachers row, as a string.
    id: str
    # Primary key of the linked users row, as a string.
    user_id: str
    email: str
    # Display name taken from the users row; the list endpoints fall back to
    # "<first_name> <last_name>" when it is empty.
    name: str
    teacher_code: Optional[str]
    title: Optional[str]
    first_name: str
    last_name: str
    is_active: bool
    # Role keys reported for the teacher by the endpoint that built the response.
    roles: List[str]
class RoleInfo(BaseModel):
    """Description of one role, built-in (AVAILABLE_ROLES) or custom."""
    # Role key, e.g. a key of AVAILABLE_ROLES or custom_roles.role_key.
    role: str
    display_name: str
    description: str
    category: str
class RoleAssignmentResponse(BaseModel):
    """One row of ``role_assignments`` as returned to API clients."""
    id: str
    user_id: str
    role: str
    resource_type: str
    resource_id: str
    # Timestamps are serialised by the endpoints with datetime.isoformat().
    valid_from: str
    valid_to: Optional[str]
    granted_at: str
    # Computed by the endpoint, not stored: True when the assignment is
    # neither revoked nor past its valid_to.
    is_active: bool
# Role definitions with German display names
# Built-in role table: role key -> display_name / description / category.
# Each tuple below is (role_key, display_name, description, category); the
# display strings are German because they are shown to end users.
AVAILABLE_ROLES = {
    role_key: {
        "display_name": display_name,
        "description": description,
        "category": category,
    }
    for role_key, display_name, description, category in (
        # Exam correction chain
        ("erstkorrektor", "Erstkorrektor",
         "Fuehrt die erste Korrektur der Klausur durch", "klausur"),
        ("zweitkorrektor", "Zweitkorrektor",
         "Fuehrt die zweite Korrektur der Klausur durch", "klausur"),
        ("drittkorrektor", "Drittkorrektor",
         "Fuehrt die dritte Korrektur bei Notenabweichung durch", "klausur"),
        # Report-card workflow
        ("klassenlehrer", "Klassenlehrer/in",
         "Erstellt Zeugnisse, traegt Kopfnoten und Bemerkungen ein", "zeugnis"),
        ("fachlehrer", "Fachlehrer/in",
         "Traegt Fachnoten ein", "zeugnis"),
        ("zeugnisbeauftragter", "Zeugnisbeauftragte/r",
         "Qualitaetskontrolle und Freigabe von Zeugnissen", "zeugnis"),
        # Office / administration staff
        ("sekretariat", "Sekretariat",
         "Druck, Versand und Archivierung von Dokumenten", "verwaltung"),
        # Leadership
        ("fachvorsitz", "Fachvorsitz",
         "Fachpruefungsleitung und Qualitaetssicherung", "leitung"),
        ("pruefungsvorsitz", "Pruefungsvorsitz",
         "Pruefungsleitung und finale Freigabe", "leitung"),
        ("schulleitung", "Schulleitung",
         "Finale Freigabe und Unterschrift", "leitung"),
        ("stufenleitung", "Stufenleitung",
         "Koordination einer Jahrgangsstufe", "leitung"),
        # Technical administration
        ("schul_admin", "Schul-Administrator",
         "Technische Administration der Schule", "admin"),
        # Other
        ("teacher_assistant", "Referendar/in",
         "Lehrkraft in Ausbildung mit eingeschraenkten Rechten", "other"),
    )
}
# Note: get_user_from_token is replaced by the imported get_current_user dependency
# from auth module which supports both Keycloak and local JWT authentication
# API Endpoints
@router.get("/roles")
async def list_available_roles() -> List[RoleInfo]:
    """Return every built-in role from ``AVAILABLE_ROLES`` as ``RoleInfo``.

    This endpoint declares no authentication dependency and does not touch
    the database; custom roles are not included.
    """
    infos: List[RoleInfo] = []
    for key, meta in AVAILABLE_ROLES.items():
        infos.append(
            RoleInfo(
                role=key,
                display_name=meta["display_name"],
                description=meta["description"],
                category=meta["category"],
            )
        )
    return infos
@router.get("/teachers")
async def list_teachers(user: Dict[str, Any] = Depends(get_current_user)) -> List[TeacherResponse]:
    """Return all teachers of the fixed school, each with its active role keys.

    A role counts as active when it is not revoked and ``valid_to`` is either
    NULL or in the future. Teachers are ordered by last name, then first name.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Teachers joined with their user account.
        teacher_rows = await conn.fetch("""
            SELECT
            t.id, t.user_id, t.teacher_code, t.title,
            t.first_name, t.last_name, t.is_active,
            u.email, u.name
            FROM teachers t
            JOIN users u ON t.user_id = u.id
            WHERE t.school_id = 'a0000000-0000-0000-0000-000000000001'
            ORDER BY t.last_name, t.first_name
        """)
        # All active assignments of the tenant, fetched once for every teacher.
        assignment_rows = await conn.fetch("""
            SELECT user_id, role
            FROM role_assignments
            WHERE tenant_id = 'a0000000-0000-0000-0000-000000000001'
            AND revoked_at IS NULL
            AND (valid_to IS NULL OR valid_to > NOW())
        """)
        # Group role keys by stringified user id.
        roles_by_user: Dict[str, List[str]] = {}
        for row in assignment_rows:
            roles_by_user.setdefault(str(row["user_id"]), []).append(row["role"])
        return [
            TeacherResponse(
                id=str(row["id"]),
                user_id=str(row["user_id"]),
                email=row["email"],
                # Fall back to "first last" when the account has no display name.
                name=row["name"] or f"{row['first_name']} {row['last_name']}",
                teacher_code=row["teacher_code"],
                title=row["title"],
                first_name=row["first_name"],
                last_name=row["last_name"],
                is_active=row["is_active"],
                roles=roles_by_user.get(str(row["user_id"]), []),
            )
            for row in teacher_rows
        ]
@router.get("/teachers/{teacher_id}/roles")
async def get_teacher_roles(teacher_id: str, user: Dict[str, Any] = Depends(get_current_user)) -> List[RoleAssignmentResponse]:
    """Return every role assignment of one teacher, newest grant first.

    Revoked and expired assignments are included; ``is_active`` on each entry
    tells them apart.

    Raises:
        HTTPException: 404 when no teacher has the given id.
    """
    def _iso(moment):
        # Timestamps go out as ISO strings; a missing value stays None.
        return moment.isoformat() if moment else None

    pool = await get_pool()
    async with pool.acquire() as conn:
        # Assignments are keyed by user id, so resolve the teacher first.
        teacher_row = await conn.fetchrow(
            "SELECT user_id FROM teachers WHERE id = $1",
            teacher_id
        )
        if teacher_row is None:
            raise HTTPException(status_code=404, detail="Teacher not found")
        rows = await conn.fetch("""
            SELECT id, user_id, role, resource_type, resource_id,
            valid_from, valid_to, granted_at, revoked_at
            FROM role_assignments
            WHERE user_id = $1
            ORDER BY granted_at DESC
        """, teacher_row["user_id"])
        responses = []
        for row in rows:
            expires = row["valid_to"]
            # Active = not revoked and not yet expired (no expiry counts as valid).
            active = row["revoked_at"] is None and (
                expires is None or expires > datetime.now(timezone.utc)
            )
            responses.append(RoleAssignmentResponse(
                id=str(row["id"]),
                user_id=str(row["user_id"]),
                role=row["role"],
                resource_type=row["resource_type"],
                resource_id=str(row["resource_id"]),
                valid_from=_iso(row["valid_from"]),
                valid_to=_iso(expires),
                granted_at=_iso(row["granted_at"]),
                is_active=active,
            ))
        return responses
@router.get("/roles/{role}/teachers")
async def get_teachers_by_role(role: str, user: Dict[str, Any] = Depends(get_current_user)) -> List[TeacherResponse]:
    """Return the teachers of the fixed school who actively hold ``role``.

    Each returned teacher carries all of its active role keys, not only the
    requested one.

    Raises:
        HTTPException: 400 when ``role`` is not a key of ``AVAILABLE_ROLES``.
    """
    if role not in AVAILABLE_ROLES:
        raise HTTPException(status_code=400, detail=f"Unknown role: {role}")
    pool = await get_pool()
    async with pool.acquire() as conn:
        matching = await conn.fetch("""
            SELECT DISTINCT
            t.id, t.user_id, t.teacher_code, t.title,
            t.first_name, t.last_name, t.is_active,
            u.email, u.name
            FROM teachers t
            JOIN users u ON t.user_id = u.id
            JOIN role_assignments ra ON t.user_id = ra.user_id
            WHERE ra.role = $1
            AND ra.revoked_at IS NULL
            AND (ra.valid_to IS NULL OR ra.valid_to > NOW())
            AND t.school_id = 'a0000000-0000-0000-0000-000000000001'
            ORDER BY t.last_name, t.first_name
        """, role)
        roles_by_user: Dict[str, List[str]] = {}
        if matching:
            # Second query: the complete active role set of the matched teachers.
            wanted_ids = [row["user_id"] for row in matching]
            active_rows = await conn.fetch("""
                SELECT user_id, role
                FROM role_assignments
                WHERE user_id = ANY($1)
                AND revoked_at IS NULL
                AND (valid_to IS NULL OR valid_to > NOW())
            """, wanted_ids)
            for row in active_rows:
                roles_by_user.setdefault(str(row["user_id"]), []).append(row["role"])
        teachers_out = []
        for row in matching:
            key = str(row["user_id"])
            teachers_out.append(TeacherResponse(
                id=str(row["id"]),
                user_id=key,
                email=row["email"],
                name=row["name"] or f"{row['first_name']} {row['last_name']}",
                teacher_code=row["teacher_code"],
                title=row["title"],
                first_name=row["first_name"],
                last_name=row["last_name"],
                is_active=row["is_active"],
                roles=roles_by_user.get(key, []),
            ))
        return teachers_out
@router.post("/assignments")
async def assign_role(assignment: RoleAssignmentCreate, user: Dict[str, Any] = Depends(get_current_user)) -> RoleAssignmentResponse:
    """Assign a role to a user.

    The role may be a built-in key from ``AVAILABLE_ROLES`` or an active
    custom role, matching what ``create_teacher`` accepts for initial roles.

    Args:
        assignment: Target user, role key, resource and optional expiry.
        user: Authenticated caller; its ``user_id`` is stored as ``granted_by``.

    Returns:
        The newly created assignment, flagged ``is_active=True``.

    Raises:
        HTTPException: 400 for an unparsable ``valid_to`` or an unknown role,
            409 when a non-revoked assignment of the same role on the same
            resource already exists for the user.
    """
    # Validate the expiry before touching the database, so malformed client
    # input becomes a 400 instead of an unhandled ValueError (HTTP 500).
    valid_to = None
    if assignment.valid_to:
        raw_valid_to = assignment.valid_to
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
        # on; normalise it so UTC timestamps from browsers parse everywhere.
        if raw_valid_to.endswith(("Z", "z")):
            raw_valid_to = raw_valid_to[:-1] + "+00:00"
        try:
            valid_to = datetime.fromisoformat(raw_valid_to)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid valid_to timestamp: {assignment.valid_to}"
            ) from exc
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Built-in roles need no lookup; anything else must be an active
        # custom role (same rule create_teacher applies).
        if assignment.role not in AVAILABLE_ROLES:
            custom_role = await conn.fetchrow(
                "SELECT 1 FROM custom_roles WHERE role_key = $1 AND is_active = true",
                assignment.role
            )
            if not custom_role:
                raise HTTPException(status_code=400, detail=f"Unknown role: {assignment.role}")
        # Check if assignment already exists
        existing = await conn.fetchrow("""
            SELECT id FROM role_assignments
            WHERE user_id = $1 AND role = $2 AND resource_id = $3
            AND revoked_at IS NULL
        """, assignment.user_id, assignment.role, assignment.resource_id)
        if existing:
            raise HTTPException(
                status_code=409,
                detail="Role assignment already exists"
            )
        # Create assignment
        result = await conn.fetchrow("""
            INSERT INTO role_assignments
            (user_id, role, resource_type, resource_id, tenant_id, valid_to, granted_by)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            RETURNING id, user_id, role, resource_type, resource_id, valid_from, valid_to, granted_at
        """,
            assignment.user_id,
            assignment.role,
            assignment.resource_type,
            assignment.resource_id,
            assignment.resource_id,  # tenant_id same as resource_id for tenant-level roles
            valid_to,
            user.get("user_id")
        )
        return RoleAssignmentResponse(
            id=str(result["id"]),
            user_id=str(result["user_id"]),
            role=result["role"],
            resource_type=result["resource_type"],
            resource_id=str(result["resource_id"]),
            valid_from=result["valid_from"].isoformat(),
            valid_to=result["valid_to"].isoformat() if result["valid_to"] else None,
            granted_at=result["granted_at"].isoformat(),
            is_active=True
        )
@router.delete("/assignments/{assignment_id}")
async def revoke_role(assignment_id: str, user: Dict[str, Any] = Depends(get_current_user)):
    """Mark a role assignment as revoked by stamping ``revoked_at``.

    Raises:
        HTTPException: 404 when the id matches no assignment that is still
            un-revoked.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        command_status = await conn.execute("""
            UPDATE role_assignments
            SET revoked_at = NOW()
            WHERE id = $1 AND revoked_at IS NULL
        """, assignment_id)
        # The status string carries the affected-row count; zero means no match.
        nothing_changed = command_status == "UPDATE 0"
        if nothing_changed:
            raise HTTPException(status_code=404, detail="Assignment not found or already revoked")
        return {"status": "revoked", "assignment_id": assignment_id}
@router.get("/summary")
async def get_role_summary(user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
    """Return the active-teacher total and per-role active assignment counts.

    The ``roles`` list holds every built-in role first (in ``AVAILABLE_ROLES``
    order) followed by the tenant's active custom roles; roles without any
    active assignment are reported with a count of 0.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        count_rows = await conn.fetch("""
            SELECT role, COUNT(*) as count
            FROM role_assignments
            WHERE tenant_id = 'a0000000-0000-0000-0000-000000000001'
            AND revoked_at IS NULL
            AND (valid_to IS NULL OR valid_to > NOW())
            GROUP BY role
            ORDER BY role
        """)
        active_teacher_total = await conn.fetchval("""
            SELECT COUNT(*) FROM teachers
            WHERE school_id = 'a0000000-0000-0000-0000-000000000001'
            AND is_active = true
        """)
        count_by_role = {}
        for row in count_rows:
            count_by_role[row["role"]] = row["count"]
        # Also include custom roles from database
        custom_rows = await conn.fetch("""
            SELECT role_key, display_name, category
            FROM custom_roles
            WHERE tenant_id = 'a0000000-0000-0000-0000-000000000001'
            AND is_active = true
        """)
        summary = []
        for key, meta in AVAILABLE_ROLES.items():
            summary.append({
                "role": key,
                "display_name": meta["display_name"],
                "category": meta["category"],
                "count": count_by_role.get(key, 0),
                "is_custom": False,
            })
        summary.extend(
            {
                "role": row["role_key"],
                "display_name": row["display_name"],
                "category": row["category"],
                "count": count_by_role.get(row["role_key"], 0),
                "is_custom": True,
            }
            for row in custom_rows
        )
        return {"total_teachers": active_teacher_total, "roles": summary}
# ==========================================
# TEACHER MANAGEMENT ENDPOINTS
# ==========================================
@router.post("/teachers")
async def create_teacher(teacher: TeacherCreate, user: Dict[str, Any] = Depends(get_current_user)) -> TeacherResponse:
    """Create a new teacher with optional initial roles.

    Inserts a ``users`` row, a ``teachers`` row and one ``role_assignments``
    row per accepted role. All writes run in a single transaction, so a
    failure part-way through leaves no orphaned user or teacher behind.

    Args:
        teacher: Account data plus the role keys to assign initially.
        user: Authenticated caller; its ``user_id`` is stored as ``granted_by``.

    Returns:
        The created teacher with the roles that were actually assigned.
        Role keys that are neither built-in nor an active custom role are
        skipped silently.

    Raises:
        HTTPException: 409 when the email is already used by another user.
    """
    pool = await get_pool()
    import uuid
    async with pool.acquire() as conn:
        # One transaction for the whole creation: an exception anywhere below
        # (including the 409) rolls back everything written so far.
        async with conn.transaction():
            # Check if email already exists
            existing = await conn.fetchrow(
                "SELECT id FROM users WHERE email = $1",
                teacher.email
            )
            if existing:
                raise HTTPException(status_code=409, detail="Email already exists")
            # Generate UUIDs
            user_id = str(uuid.uuid4())
            teacher_id = str(uuid.uuid4())
            full_name = f"{teacher.first_name} {teacher.last_name}"
            # Create user first; the account is created with an empty password hash.
            await conn.execute("""
                INSERT INTO users (id, email, name, password_hash, role, is_active)
                VALUES ($1, $2, $3, '', 'teacher', true)
            """, user_id, teacher.email, full_name)
            # Create teacher record
            await conn.execute("""
                INSERT INTO teachers (id, user_id, school_id, first_name, last_name, teacher_code, title, is_active)
                VALUES ($1, $2, 'a0000000-0000-0000-0000-000000000001', $3, $4, $5, $6, true)
            """, teacher_id, user_id, teacher.first_name, teacher.last_name,
                teacher.teacher_code, teacher.title)
            # Assign initial roles if provided
            assigned_roles = []
            for role in teacher.roles:
                # Built-in roles need no lookup; others must be active custom roles.
                if role in AVAILABLE_ROLES or await conn.fetchrow(
                    "SELECT 1 FROM custom_roles WHERE role_key = $1 AND is_active = true", role
                ):
                    await conn.execute("""
                        INSERT INTO role_assignments (user_id, role, resource_type, resource_id, tenant_id, granted_by)
                        VALUES ($1, $2, 'tenant', 'a0000000-0000-0000-0000-000000000001',
                        'a0000000-0000-0000-0000-000000000001', $3)
                    """, user_id, role, user.get("user_id"))
                    assigned_roles.append(role)
        return TeacherResponse(
            id=teacher_id,
            user_id=user_id,
            email=teacher.email,
            name=full_name,
            teacher_code=teacher.teacher_code,
            title=teacher.title,
            first_name=teacher.first_name,
            last_name=teacher.last_name,
            is_active=True,
            roles=assigned_roles
        )
@router.put("/teachers/{teacher_id}")
async def update_teacher(teacher_id: str, updates: TeacherUpdate, user: Dict[str, Any] = Depends(get_current_user)) -> TeacherResponse:
    """Update teacher information.

    Only supplied fields are written: ``email``, ``first_name`` and
    ``last_name`` when non-empty; ``teacher_code``, ``title`` and
    ``is_active`` whenever they are not ``None``. The display name on the
    ``users`` row is rebuilt when either name part changes. All writes run
    in one transaction so the two tables cannot end up half-updated.

    Args:
        teacher_id: Primary key of the teacher to update.
        updates: Fields to change.
        user: Authenticated caller (only required for authentication).

    Returns:
        The teacher as stored after the update, with its active role keys.

    Raises:
        HTTPException: 404 when the teacher does not exist, 409 when the new
            email already belongs to a different user.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Get current teacher data
            teacher = await conn.fetchrow("""
                SELECT t.id, t.user_id, t.teacher_code, t.title, t.first_name, t.last_name, t.is_active,
                u.email, u.name
                FROM teachers t
                JOIN users u ON t.user_id = u.id
                WHERE t.id = $1
            """, teacher_id)
            if not teacher:
                raise HTTPException(status_code=404, detail="Teacher not found")
            if updates.email and updates.email != teacher["email"]:
                # Same conflict rule as create_teacher: answer 409 rather than
                # letting a duplicate email fail deeper in the database.
                clash = await conn.fetchrow(
                    "SELECT id FROM users WHERE email = $1 AND id <> $2",
                    updates.email, teacher["user_id"]
                )
                if clash:
                    raise HTTPException(status_code=409, detail="Email already exists")
                await conn.execute("UPDATE users SET email = $1 WHERE id = $2",
                                   updates.email, teacher["user_id"])
            # Collect (column, value) pairs for the teachers row. Column names
            # are fixed literals; only the values come from the request and
            # they are always passed as bind parameters.
            column_changes = []
            if updates.first_name:
                column_changes.append(("first_name", updates.first_name))
            if updates.last_name:
                column_changes.append(("last_name", updates.last_name))
            if updates.teacher_code is not None:
                column_changes.append(("teacher_code", updates.teacher_code))
            if updates.title is not None:
                column_changes.append(("title", updates.title))
            if updates.is_active is not None:
                column_changes.append(("is_active", updates.is_active))
            if column_changes:
                set_clause = ", ".join(
                    f"{column} = ${position}"
                    for position, (column, _) in enumerate(column_changes, start=1)
                )
                values = [value for _, value in column_changes]
                # The teacher id takes the placeholder right after the last column.
                await conn.execute(
                    f"UPDATE teachers SET {set_clause} WHERE id = ${len(column_changes) + 1}",
                    *values, teacher_id
                )
            # Update user name if first/last name changed
            if updates.first_name or updates.last_name:
                new_first = updates.first_name or teacher["first_name"]
                new_last = updates.last_name or teacher["last_name"]
                await conn.execute("UPDATE users SET name = $1 WHERE id = $2",
                                   f"{new_first} {new_last}", teacher["user_id"])
            # Fetch updated data
            updated = await conn.fetchrow("""
                SELECT t.id, t.user_id, t.teacher_code, t.title, t.first_name, t.last_name, t.is_active,
                u.email, u.name
                FROM teachers t
                JOIN users u ON t.user_id = u.id
                WHERE t.id = $1
            """, teacher_id)
            # Get roles
            roles = await conn.fetch("""
                SELECT role FROM role_assignments
                WHERE user_id = $1 AND revoked_at IS NULL
                AND (valid_to IS NULL OR valid_to > NOW())
            """, updated["user_id"])
        return TeacherResponse(
            id=str(updated["id"]),
            user_id=str(updated["user_id"]),
            email=updated["email"],
            name=updated["name"],
            teacher_code=updated["teacher_code"],
            title=updated["title"],
            first_name=updated["first_name"],
            last_name=updated["last_name"],
            is_active=updated["is_active"],
            roles=[r["role"] for r in roles]
        )
@router.delete("/teachers/{teacher_id}")
async def deactivate_teacher(teacher_id: str, user: Dict[str, Any] = Depends(get_current_user)):
    """Soft-delete a teacher by setting ``is_active`` to false.

    The row itself is kept; role assignments are not touched here.

    Raises:
        HTTPException: 404 when no teacher has the given id.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        command_status = await conn.execute("""
            UPDATE teachers SET is_active = false WHERE id = $1
        """, teacher_id)
        # Zero affected rows means the id matched nothing.
        no_match = command_status == "UPDATE 0"
        if no_match:
            raise HTTPException(status_code=404, detail="Teacher not found")
        return {"status": "deactivated", "teacher_id": teacher_id}
# ==========================================
# CUSTOM ROLE MANAGEMENT ENDPOINTS
# ==========================================
@router.get("/custom-roles")
async def list_custom_roles(user: Dict[str, Any] = Depends(get_current_user)) -> List[RoleInfo]:
    """Return the tenant's active custom roles, ordered by category and name."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT role_key, display_name, description, category
            FROM custom_roles
            WHERE tenant_id = 'a0000000-0000-0000-0000-000000000001'
            AND is_active = true
            ORDER BY category, display_name
        """)
        infos: List[RoleInfo] = []
        for row in rows:
            infos.append(RoleInfo(
                role=row["role_key"],
                display_name=row["display_name"],
                description=row["description"],
                category=row["category"],
            ))
        return infos
@router.post("/custom-roles")
async def create_custom_role(role: CustomRoleCreate, user: Dict[str, Any] = Depends(get_current_user)) -> RoleInfo:
    """Create a custom role for the fixed tenant and echo it back.

    Raises:
        HTTPException: 409 when the key collides with a built-in role or
            with a custom role that already exists for the tenant (active
            or not).
    """
    pool = await get_pool()
    # Built-in keys are reserved.
    key_is_builtin = role.role_key in AVAILABLE_ROLES
    if key_is_builtin:
        raise HTTPException(status_code=409, detail="Role key conflicts with built-in role")
    async with pool.acquire() as conn:
        # The lookup ignores is_active, so soft-deleted keys also count as taken.
        duplicate = await conn.fetchrow("""
            SELECT id FROM custom_roles
            WHERE role_key = $1 AND tenant_id = 'a0000000-0000-0000-0000-000000000001'
        """, role.role_key)
        if duplicate:
            raise HTTPException(status_code=409, detail="Custom role already exists")
        creator_id = user.get("user_id")
        await conn.execute("""
            INSERT INTO custom_roles (role_key, display_name, description, category, tenant_id, created_by)
            VALUES ($1, $2, $3, $4, 'a0000000-0000-0000-0000-000000000001', $5)
        """, role.role_key, role.display_name, role.description, role.category, creator_id)
        created = RoleInfo(
            role=role.role_key,
            display_name=role.display_name,
            description=role.description,
            category=role.category,
        )
        return created
@router.put("/custom-roles/{role_key}")
async def update_custom_role(role_key: str, updates: CustomRoleUpdate, user: Dict[str, Any] = Depends(get_current_user)) -> RoleInfo:
    """Change display name, description and/or category of a custom role.

    Fields that are unset or empty in ``updates`` keep their stored value.

    Raises:
        HTTPException: 400 for a built-in role key, 404 when no active
            custom role with that key exists for the tenant.
    """
    if role_key in AVAILABLE_ROLES:
        raise HTTPException(status_code=400, detail="Cannot modify built-in roles")
    pool = await get_pool()
    async with pool.acquire() as conn:
        stored = await conn.fetchrow("""
            SELECT role_key, display_name, description, category
            FROM custom_roles
            WHERE role_key = $1 AND tenant_id = 'a0000000-0000-0000-0000-000000000001'
            AND is_active = true
        """, role_key)
        if not stored:
            raise HTTPException(status_code=404, detail="Custom role not found")
        # Merge: a falsy incoming value means "keep what is stored".
        merged_display = updates.display_name if updates.display_name else stored["display_name"]
        merged_description = updates.description if updates.description else stored["description"]
        merged_category = updates.category if updates.category else stored["category"]
        await conn.execute("""
            UPDATE custom_roles
            SET display_name = $1, description = $2, category = $3
            WHERE role_key = $4 AND tenant_id = 'a0000000-0000-0000-0000-000000000001'
        """, merged_display, merged_description, merged_category, role_key)
        return RoleInfo(
            role=role_key,
            display_name=merged_display,
            description=merged_description,
            category=merged_category,
        )
@router.delete("/custom-roles/{role_key}")
async def delete_custom_role(role_key: str, user: Dict[str, Any] = Depends(get_current_user)):
    """Delete a custom role (soft delete) and revoke its assignments.

    The role is flagged inactive and every un-revoked assignment of that
    role within the tenant gets ``revoked_at`` set. Both statements run in
    one transaction, so a role can never end up deleted while its
    assignments remain active.

    Args:
        role_key: Key of the custom role to delete.
        user: Authenticated caller (only required for authentication).

    Returns:
        ``{"status": "deleted", "role_key": role_key}``.

    Raises:
        HTTPException: 400 for a built-in role key, 404 when the tenant has
            no custom role with that key.
    """
    if role_key in AVAILABLE_ROLES:
        raise HTTPException(status_code=400, detail="Cannot delete built-in roles")
    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Soft delete the role
            result = await conn.execute("""
                UPDATE custom_roles SET is_active = false
                WHERE role_key = $1 AND tenant_id = 'a0000000-0000-0000-0000-000000000001'
            """, role_key)
            # Zero affected rows means the tenant has no such custom role.
            if result == "UPDATE 0":
                raise HTTPException(status_code=404, detail="Custom role not found")
            # Also revoke all assignments with this role
            await conn.execute("""
                UPDATE role_assignments SET revoked_at = NOW()
                WHERE role = $1 AND tenant_id = 'a0000000-0000-0000-0000-000000000001'
                AND revoked_at IS NULL
            """, role_key)
        return {"status": "deleted", "role_key": role_key}