Files
Benjamin Admin 95fcba34cd
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 30s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s
fix(quality): Ruff/CVE/TS-Fixes, 104 neue Tests, Complexity-Refactoring
- Ruff: 144 auto-fixes (unused imports, == None → is None), F821/F811/F841 manuell
- CVEs: python-multipart>=0.0.22, weasyprint>=68.0, pillow>=12.1.1, npm audit fix (0 vulns)
- TS: 5 tote Drafting-Engine-Dateien entfernt, allowed-facts/sanitizer/StepHeader/context fixes
- Tests: +104 (ISMS 58, Evidence 18, VVT 14, Generation 14) → 1449 passed
- Refactoring: collect_ci_evidence (F→A), row_to_response (E→A), extract_requirements (E→A)
- Dead Code: pca-platform, 7 Go-Handler, dsr_api.py, duplicate Schemas entfernt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 19:00:33 +01:00

217 lines
8.1 KiB
Python

"""
Generic CRUD Router Factory for Compliance API.

Creates standardized CRUD endpoints (list, create, get, update, delete)
for simple resource tables that follow the tenant-isolated pattern:

  - Table has `id`, `tenant_id`, `created_at`, `updated_at` columns
  - All queries are filtered by tenant_id

Usage:

    router = create_crud_router(
        prefix="/security-backlog",
        table_name="compliance_security_backlog",
        tag="security-backlog",
        columns=["title", "description", "type", "severity", "status", ...],
        search_columns=["title", "description"],
        filter_columns=["status", "severity", "type"],
        order_by="created_at DESC",
        resource_name="Security item",
    )
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id
from .db_utils import row_to_dict
logger = logging.getLogger(__name__)
def create_crud_router(
    prefix: str,
    table_name: str,
    tag: str,
    columns: List[str],
    search_columns: Optional[List[str]] = None,
    filter_columns: Optional[List[str]] = None,
    order_by: str = "created_at DESC",
    resource_name: str = "Item",
    stats_query: Optional[str] = None,
    stats_defaults: Optional[Dict[str, int]] = None,
) -> APIRouter:
    """Create a CRUD router with list, create, get/{id}, update/{id}, delete/{id}.

    Every generated query is restricted to the tenant returned by the
    ``get_tenant_id`` dependency.

    NOTE: ``table_name``, ``columns``, ``search_columns``, ``filter_columns``,
    ``order_by`` and ``stats_query`` are interpolated into SQL text verbatim.
    They must come from trusted application code, never from request data.
    Request-supplied *values* are always passed as bound parameters.

    Args:
        prefix: URL prefix (e.g. "/security-backlog")
        table_name: PostgreSQL table name
        tag: OpenAPI tag
        columns: Writable column names (excluding id, tenant_id, created_at, updated_at)
        search_columns: Columns to ILIKE-search (default: ["title", "description"])
        filter_columns: Columns to filter by exact match via query params
            (e.g. ``?status=open``); missing or empty values are ignored
        order_by: SQL ORDER BY clause
        resource_name: Human-readable name for error messages
        stats_query: Optional custom SQL for /stats endpoint (must accept :tenant_id param)
        stats_defaults: Default dict for stats when no rows found

    Returns:
        The configured ``APIRouter``.
    """
    # Imported locally so this function is self-contained; Request is only
    # needed to read the dynamic filter query parameters in list_items.
    from fastapi import Request

    router = APIRouter(prefix=prefix, tags=[tag])
    _search_cols = search_columns or ["title", "description"]
    _filter_cols = filter_columns or []
    # Set for O(1) membership checks when validating update payload keys.
    _writable_cols = set(columns)

    # ── LIST ──────────────────────────────────────────────────────────────
    @router.get("")
    async def list_items(
        request: Request,
        search: Optional[str] = Query(None),
        limit: int = Query(100, ge=1, le=500),
        offset: int = Query(0, ge=0),
        db: Session = Depends(get_db),
        tenant_id: str = Depends(get_tenant_id),
    ):
        """Return one page of the tenant's rows plus the total match count."""
        where = ["tenant_id = :tenant_id"]
        params: Dict[str, Any] = {"tenant_id": tenant_id, "limit": limit, "offset": offset}

        # FastAPI derives request parameters from the signature and does not
        # expand **kwargs into arbitrary query parameters, so the configured
        # filter columns are read from the raw query string instead.
        for col in _filter_cols:
            value = request.query_params.get(col)
            if value:
                # Prefixed bind name cannot collide with tenant_id/search/limit/offset.
                bind_name = f"filter_{col}"
                where.append(f"{col} = :{bind_name}")
                params[bind_name] = value

        if search and _search_cols:
            clauses = [f"{c} ILIKE :search" for c in _search_cols]
            where.append(f"({' OR '.join(clauses)})")
            params["search"] = f"%{search}%"

        where_sql = " AND ".join(where)

        # The count uses the same WHERE clause so "total" matches the filters.
        total_row = db.execute(
            text(f"SELECT COUNT(*) FROM {table_name} WHERE {where_sql}"),
            params,
        ).fetchone()
        total = total_row[0] if total_row else 0

        rows = db.execute(
            text(f"""
                SELECT * FROM {table_name}
                WHERE {where_sql}
                ORDER BY {order_by}
                LIMIT :limit OFFSET :offset
            """),
            params,
        ).fetchall()
        return {"items": [row_to_dict(r) for r in rows], "total": total}

    # ── STATS (optional) ─────────────────────────────────────────────────
    # Registered before "/{item_id}" so "/stats" is not captured as an item id.
    if stats_query:
        @router.get("/stats")
        async def get_stats(
            db: Session = Depends(get_db),
            tenant_id: str = Depends(get_tenant_id),
        ):
            """Run the custom stats query; falsy values (e.g. NULL) become 0."""
            row = db.execute(text(stats_query), {"tenant_id": tenant_id}).fetchone()
            if row:
                d = dict(row._mapping)
                return {k: (v or 0) for k, v in d.items()}
            return stats_defaults or {}

    # ── CREATE ────────────────────────────────────────────────────────────
    @router.post("", status_code=201)
    async def create_item(
        # The default dict is only read, never mutated, in this handler.
        payload: dict = {},
        db: Session = Depends(get_db),
        tenant_id: str = Depends(get_tenant_id),
    ):
        """Insert a row from the whitelisted payload keys and return it."""
        col_names = ["tenant_id"]
        col_params = [":tenant_id"]
        values: Dict[str, Any] = {"tenant_id": tenant_id}
        # Iterate the configured columns (not the payload) so only whitelisted
        # names can ever reach the SQL text.
        for col in columns:
            if col in payload:
                col_names.append(col)
                col_params.append(f":{col}")
                values[col] = payload[col]

        row = db.execute(
            text(f"""
                INSERT INTO {table_name} ({', '.join(col_names)})
                VALUES ({', '.join(col_params)})
                RETURNING *
            """),
            values,
        ).fetchone()
        db.commit()
        return row_to_dict(row)

    # ── GET BY ID ─────────────────────────────────────────────────────────
    @router.get("/{item_id}")
    async def get_item(
        item_id: str,
        db: Session = Depends(get_db),
        tenant_id: str = Depends(get_tenant_id),
    ):
        """Return a single row of the tenant, or 404 if it does not exist."""
        row = db.execute(
            text(f"SELECT * FROM {table_name} WHERE id = :id AND tenant_id = :tenant_id"),
            {"id": item_id, "tenant_id": tenant_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"{resource_name} not found")
        return row_to_dict(row)

    # ── UPDATE ────────────────────────────────────────────────────────────
    @router.put("/{item_id}")
    async def update_item(
        item_id: str,
        # The default dict is only read, never mutated, in this handler.
        payload: dict = {},
        db: Session = Depends(get_db),
        tenant_id: str = Depends(get_tenant_id),
    ):
        """Update whitelisted fields; 400 if none were given, 404 if no row matched."""
        updates: Dict[str, Any] = {
            "id": item_id,
            "tenant_id": tenant_id,
            # Naive UTC timestamp. NOTE(review): utcnow() is deprecated since
            # Python 3.12; switching to an aware datetime depends on the
            # column type, so it is left unchanged here.
            "updated_at": datetime.utcnow(),
        }
        set_clauses = ["updated_at = :updated_at"]
        for field, value in payload.items():
            # Unknown keys are silently ignored; only whitelisted names are
            # interpolated into the SQL text.
            if field in _writable_cols:
                updates[field] = value
                set_clauses.append(f"{field} = :{field}")

        # Only the automatic updated_at clause is present: nothing to change.
        if len(set_clauses) == 1:
            raise HTTPException(status_code=400, detail="No fields to update")

        row = db.execute(
            text(f"""
                UPDATE {table_name}
                SET {', '.join(set_clauses)}
                WHERE id = :id AND tenant_id = :tenant_id
                RETURNING *
            """),
            updates,
        ).fetchone()
        db.commit()
        if not row:
            raise HTTPException(status_code=404, detail=f"{resource_name} not found")
        return row_to_dict(row)

    # ── DELETE ────────────────────────────────────────────────────────────
    @router.delete("/{item_id}", status_code=204)
    async def delete_item(
        item_id: str,
        db: Session = Depends(get_db),
        tenant_id: str = Depends(get_tenant_id),
    ):
        """Delete a row of the tenant; 404 if no row matched."""
        result = db.execute(
            text(f"DELETE FROM {table_name} WHERE id = :id AND tenant_id = :tenant_id"),
            {"id": item_id, "tenant_id": tenant_id},
        )
        db.commit()
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail=f"{resource_name} not found")

    return router