Files
breakpilot-compliance/backend-compliance/compliance/api/security_backlog_routes.py
Benjamin Admin 6509e64dd9
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 32s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 18s
feat(sdk): API-Referenz Frontend + Backend-Konsolidierung (Shared Utilities, CRUD Factory)
- API-Referenz Seite (/sdk/api-docs) mit ~690 Endpoints, Suche, Filter, Modul-Index
- Shared db_utils.py (row_to_dict) + tenant_utils Integration in 6 Route-Dateien
- CRUD Factory (crud_factory.py) fuer zukuenftige Module
- Version-Route Auto-Registration in versioning_utils.py
- 1338 Tests bestanden, -232 Zeilen Duplikat-Code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 17:07:43 +01:00

246 lines
8.2 KiB
Python

"""
FastAPI routes for Security Backlog Tracking.
Endpoints:
GET /security-backlog — list with filters (status, severity, type, search; limit/offset)
GET /security-backlog/stats — open, in_progress, resolved, accepted_risk, critical, high, overdue, total counts
POST /security-backlog — create finding
PUT /security-backlog/{id} — update finding
DELETE /security-backlog/{id} — delete finding (204)
"""
import logging
from datetime import datetime
from typing import Optional, Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id as _get_tenant_id
from .db_utils import row_to_dict as _row_to_dict
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router for this module; every route below is registered under the
# "/security-backlog" prefix and tagged "security-backlog".
router = APIRouter(prefix="/security-backlog", tags=["security-backlog"])
# =============================================================================
# Pydantic Schemas
# =============================================================================
class SecurityItemCreate(BaseModel):
    """Request body used by the POST handler to create a backlog item.

    Only ``title`` is required. ``type``, ``severity`` and ``status`` are
    plain strings at this layer: no set of allowed values is declared here.
    """

    title: str
    description: Optional[str] = None
    # Defaults to "vulnerability" when the client omits the field.
    type: str = "vulnerability"
    # The list query ranks 'critical', 'high' and 'medium' explicitly;
    # any other value sorts after them.
    severity: str = "medium"
    # The stats query counts 'open', 'in-progress', 'resolved' and
    # 'accepted-risk'.
    status: str = "open"
    source: Optional[str] = None
    # Presumably a CVE identifier; no format validation is declared here.
    cve: Optional[str] = None
    # Presumably a CVSS score; no range check is declared here.
    cvss: Optional[float] = None
    affected_asset: Optional[str] = None
    assigned_to: Optional[str] = None
    # Compared against NOW() by the stats query to count overdue items.
    due_date: Optional[datetime] = None
    remediation: Optional[str] = None
class SecurityItemUpdate(BaseModel):
    """Request body used by the PUT handler for partial updates.

    Every field is optional and defaults to ``None``. The update handler
    dumps this model with ``exclude_unset=True``, so only fields that were
    actually supplied are written; a field supplied explicitly as ``null``
    is still part of the dump and is therefore written as NULL.
    """

    title: Optional[str] = None
    description: Optional[str] = None
    type: Optional[str] = None
    severity: Optional[str] = None
    status: Optional[str] = None
    source: Optional[str] = None
    cve: Optional[str] = None
    cvss: Optional[float] = None
    affected_asset: Optional[str] = None
    assigned_to: Optional[str] = None
    due_date: Optional[datetime] = None
    remediation: Optional[str] = None
# =============================================================================
# Routes
# =============================================================================
# Page query for the list endpoint; ``{where_sql}`` is filled with clauses
# built from fixed column names only (values always travel as bind params).
_LIST_ITEMS_SQL_TEMPLATE = """
SELECT * FROM compliance_security_backlog
WHERE {where_sql}
ORDER BY
CASE severity
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
ELSE 3
END,
CASE status
WHEN 'open' THEN 0
WHEN 'in-progress' THEN 1
WHEN 'accepted-risk' THEN 2
ELSE 3
END,
created_at DESC
LIMIT :limit OFFSET :offset
"""


def _escape_like(term: str) -> str:
    """Escape LIKE/ILIKE metacharacters so *term* is matched literally.

    Backslash is escaped first so the escapes added for ``%`` and ``_`` are
    not themselves doubled. Relies on backslash being the pattern escape
    character, which is PostgreSQL's default when no ESCAPE clause is given.
    """
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


@router.get("")
async def list_security_items(
    status: Optional[str] = Query(None),
    severity: Optional[str] = Query(None),
    type: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """List security backlog items with optional filters.

    Args:
        status: Exact-match filter on the ``status`` column (ignored if empty).
        severity: Exact-match filter on the ``severity`` column (ignored if empty).
        type: Exact-match filter on the ``type`` column (ignored if empty).
            The name shadows the builtin but is the public query parameter.
        search: Case-insensitive substring matched against ``title`` and
            ``description``. ``%``, ``_`` and ``\\`` in the term are matched
            literally rather than acting as pattern wildcards.
        limit: Page size (1-500, default 100).
        offset: Number of rows to skip (>= 0).
        db: Database session dependency.
        tenant_id: Tenant whose rows are visible; always part of the WHERE.

    Returns:
        ``{"items": [...], "total": n}`` where ``items`` is the requested page
        ordered by severity rank, then status rank, then newest first, and
        ``total`` is the number of rows matching the filters before paging.
    """
    where_clauses = ["tenant_id = :tenant_id"]
    params: Dict[str, Any] = {"tenant_id": tenant_id, "limit": limit, "offset": offset}

    # Exact-match filters: column names are fixed literals, values are bound.
    for column, value in (("status", status), ("severity", severity), ("type", type)):
        if value:
            where_clauses.append(f"{column} = :{column}")
            params[column] = value

    if search:
        where_clauses.append("(title ILIKE :search OR description ILIKE :search)")
        # Escape pattern metacharacters so the user's text is a literal
        # substring; only the surrounding % are real wildcards.
        params["search"] = f"%{_escape_like(search)}%"

    where_sql = " AND ".join(where_clauses)

    # The COUNT statement does not reference :limit / :offset; the same
    # parameter dict is shared with the page query below.
    total_row = db.execute(
        text(f"SELECT COUNT(*) FROM compliance_security_backlog WHERE {where_sql}"),
        params,
    ).fetchone()
    total = total_row[0] if total_row else 0

    rows = db.execute(
        text(_LIST_ITEMS_SQL_TEMPLATE.format(where_sql=where_sql)),
        params,
    ).fetchall()

    return {
        "items": [_row_to_dict(r) for r in rows],
        "total": total,
    }
# Single-row aggregate over one tenant's backlog.
_SECURITY_STATS_SQL = """
SELECT
COUNT(*) FILTER (WHERE status = 'open') AS open,
COUNT(*) FILTER (WHERE status = 'in-progress') AS in_progress,
COUNT(*) FILTER (WHERE status = 'resolved') AS resolved,
COUNT(*) FILTER (WHERE status = 'accepted-risk') AS accepted_risk,
COUNT(*) FILTER (WHERE severity = 'critical' AND status != 'resolved') AS critical,
COUNT(*) FILTER (WHERE severity = 'high' AND status != 'resolved') AS high,
COUNT(*) FILTER (
WHERE due_date IS NOT NULL
AND due_date < NOW()
AND status NOT IN ('resolved', 'accepted-risk')
) AS overdue,
COUNT(*) AS total
FROM compliance_security_backlog
WHERE tenant_id = :tenant_id
"""


@router.get("/stats")
async def get_security_stats(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Return aggregate counters for the tenant's security backlog.

    The result has the keys ``open``, ``in_progress``, ``resolved``,
    ``accepted_risk``, ``critical``, ``high``, ``overdue`` and ``total``.
    ``critical`` and ``high`` leave out resolved items; ``overdue`` counts
    items whose due date has passed and that are neither resolved nor an
    accepted risk.
    """
    stats_row = db.execute(
        text(_SECURITY_STATS_SQL),
        {"tenant_id": tenant_id},
    ).fetchone()

    if not stats_row:
        # No row came back: report every counter as zero.
        return dict.fromkeys(
            (
                "open",
                "in_progress",
                "resolved",
                "accepted_risk",
                "critical",
                "high",
                "overdue",
                "total",
            ),
            0,
        )

    counters = dict(stats_row._mapping)
    # Normalise any falsy value (e.g. None) to 0.
    return {name: (count or 0) for name, count in counters.items()}
_INSERT_ITEM_SQL = """
INSERT INTO compliance_security_backlog
(tenant_id, title, description, type, severity, status,
source, cve, cvss, affected_asset, assigned_to, due_date, remediation)
VALUES
(:tenant_id, :title, :description, :type, :severity, :status,
:source, :cve, :cvss, :affected_asset, :assigned_to, :due_date, :remediation)
RETURNING *
"""

# Payload attributes copied one-to-one into the INSERT's bind parameters.
_INSERT_PAYLOAD_FIELDS = (
    "title",
    "description",
    "type",
    "severity",
    "status",
    "source",
    "cve",
    "cvss",
    "affected_asset",
    "assigned_to",
    "due_date",
    "remediation",
)


@router.post("", status_code=201)
async def create_security_item(
    payload: SecurityItemCreate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Insert a new security backlog item for the tenant and return it.

    The row is written with the tenant id from the dependency plus every
    field of ``payload``; the inserted row (``RETURNING *``) is converted
    with ``_row_to_dict`` after the transaction is committed.
    """
    bind_values = {"tenant_id": tenant_id}
    bind_values.update(
        (field_name, getattr(payload, field_name))
        for field_name in _INSERT_PAYLOAD_FIELDS
    )

    inserted = db.execute(text(_INSERT_ITEM_SQL), bind_values).fetchone()
    db.commit()
    return _row_to_dict(inserted)
# ``{assignments}`` receives "column = :column" pairs built from the keys of
# the payload dump; values are always passed as bind parameters.
_UPDATE_ITEM_SQL_TEMPLATE = """
UPDATE compliance_security_backlog
SET {assignments}
WHERE id = :id AND tenant_id = :tenant_id
RETURNING *
"""


@router.put("/{item_id}")
async def update_security_item(
    item_id: str,
    payload: SecurityItemUpdate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Apply a partial update to one security backlog item.

    Only fields present in the request body are written; ``updated_at`` is
    always refreshed alongside them.

    Raises:
        HTTPException: 400 when the body sets no field at all, 404 when no
            row with ``item_id`` exists for this tenant.
    """
    changes = payload.model_dump(exclude_unset=True)
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    assignments = ["updated_at = :updated_at"]
    assignments.extend(f"{column} = :{column}" for column in changes)

    bind_values: Dict[str, Any] = {
        "id": item_id,
        "tenant_id": tenant_id,
        "updated_at": datetime.utcnow(),
        **changes,
    }

    statement = text(
        _UPDATE_ITEM_SQL_TEMPLATE.format(assignments=", ".join(assignments))
    )
    updated = db.execute(statement, bind_values).fetchone()
    db.commit()

    if updated:
        return _row_to_dict(updated)
    raise HTTPException(status_code=404, detail="Security item not found")
_DELETE_ITEM_SQL = """
DELETE FROM compliance_security_backlog
WHERE id = :id AND tenant_id = :tenant_id
"""


@router.delete("/{item_id}", status_code=204)
async def delete_security_item(
    item_id: str,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Delete one security backlog item belonging to the tenant.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when the DELETE affected no row, i.e. no item
            with ``item_id`` exists for this tenant.
    """
    bind_values = {"id": item_id, "tenant_id": tenant_id}
    outcome = db.execute(text(_DELETE_ITEM_SQL), bind_values)
    db.commit()

    if outcome.rowcount != 0:
        return
    raise HTTPException(status_code=404, detail="Security item not found")