Files
breakpilot-compliance/backend-compliance/compliance/api/whistleblower_routes.py
T
Benjamin Admin c89a68e59e feat: Whistleblower backend + Scanner banner-check (last 2 gaps)
Whistleblower (HinSchG):
- Migration 118: 3 tables (reports, messages, measures) with
  HinSchG deadlines (7d acknowledgment, 3mo feedback)
- whistleblower_routes.py: 14 endpoints (CRUD, acknowledge, close,
  messages, measures, public submit, anonymous status check)
- Frontend api-operations.ts rewired from Go SDK to compliance proxy
- Access key format XXXX-XXXX-XXXX for anonymous reporters

Scanner banner-check (TTDSG § 25):
- CMP Dashboard: green "Kein Cookie-Banner erforderlich" when no
  trackers detected + no banner configured
- Red warning "Cookie-Banner fehlt!" when trackers found but no banner
- Mandatory note: Impressum (DDG § 5) + DSE (DSGVO Art. 13) still required

[migration-approved]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 00:22:18 +02:00

311 lines
12 KiB
Python

"""
FastAPI routes for Whistleblower (HinSchG) — Hinweisgeberschutz.
Admin endpoints for managing reports + public endpoint for anonymous submissions.
Deadlines: 7 days acknowledgment (§ 17 Abs. 1), 3 months feedback (§ 17 Abs. 2).
Endpoints:
GET /whistleblower/reports — list with filters
GET /whistleblower/reports/stats — counts by status/category
POST /whistleblower/reports — create report (admin)
GET /whistleblower/reports/{id} — single report with messages
PUT /whistleblower/reports/{id} — update status/priority/assignment
POST /whistleblower/reports/{id}/acknowledge — send acknowledgment
POST /whistleblower/reports/{id}/close — close report
POST /whistleblower/reports/{id}/messages — add message
GET /whistleblower/reports/{id}/measures — list measures
POST /whistleblower/reports/{id}/measures — add measure
POST /whistleblower/submit — public anonymous submission
GET /whistleblower/check/{access_key} — reporter checks status
"""
import logging
import secrets
import string
from datetime import datetime, timedelta, timezone
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id as _get_tenant_id
from .db_utils import row_to_dict as _row_to_dict
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# All routes in this file are mounted under the /whistleblower prefix.
router = APIRouter(prefix="/whistleblower", tags=["whistleblower"])
# Category values known to this module for whistleblower reports.
VALID_CATEGORIES = {"corruption", "fraud", "data_protection", "discrimination",
                    "environment", "competition", "product_safety", "tax_evasion", "other"}
# Status values known to this module for the report lifecycle.
VALID_STATUSES = {"new", "acknowledged", "under_review", "investigation",
                  "measures_taken", "closed", "rejected"}
def _gen_ref(tenant_id: str, db: Session) -> str:
    """Build the next human-readable reference number for a tenant.

    The result has the form ``WB-<year>-<NNNNNN>``, where the six-digit
    sequence is the tenant's current report count plus one.

    Args:
        tenant_id: Tenant whose existing reports are counted.
        db: Open database session used for the count query.

    Returns:
        The reference number string.
    """
    # Use the UTC year so the reference agrees with timestamps that are
    # generated with datetime.now(timezone.utc), independent of the server's
    # local timezone.
    year = datetime.now(timezone.utc).year
    q = text("SELECT COUNT(*) FROM compliance_whistleblower_reports WHERE tenant_id = :tid")
    count = db.execute(q, {"tid": tenant_id}).scalar() or 0
    # NOTE(review): a count-based sequence can yield duplicates when two
    # reports are created concurrently or after rows are deleted; a database
    # sequence or a unique constraint with retry would be more robust.
    return f"WB-{year}-{count + 1:06d}"
def _gen_access_key() -> str:
    """Create a random access key in the form ``XXXX-XXXX-XXXX``.

    Each character is drawn with ``secrets.choice`` from the uppercase ASCII
    letters and the digits 0-9.

    Returns:
        Three groups of four characters joined by hyphens.
    """
    alphabet = string.ascii_uppercase + string.digits
    groups = []
    for _group_index in range(3):
        group = ""
        for _char_index in range(4):
            group += secrets.choice(alphabet)
        groups.append(group)
    return "-".join(groups)
# =============================================================================
# Schemas
# =============================================================================
class ReportCreate(BaseModel):
    """Request body for creating a whistleblower report.

    Used by both the admin create endpoint and the public submit endpoint.
    """
    # Report category; defaults to "other".
    category: str = "other"
    # Title of the report (required).
    title: str
    # Free-text description of the report (required).
    description: str
    # Whether the report is flagged as anonymous; defaults to True.
    is_anonymous: bool = True
    # Optional reporter contact details.
    reporter_name: Optional[str] = None
    reporter_email: Optional[str] = None
    reporter_phone: Optional[str] = None
    # Priority label; defaults to "normal".
    priority: str = "normal"
class ReportUpdate(BaseModel):
    """Request body for a partial report update.

    Every field is optional; the update endpoint only writes fields whose
    value is not None.
    """
    # New status of the report.
    status: Optional[str] = None
    # New priority label.
    priority: Optional[str] = None
    # Identifier of the person the report is assigned to.
    assigned_to: Optional[str] = None
    # New category of the report.
    category: Optional[str] = None
class MessageCreate(BaseModel):
    """Request body for adding a message to a report."""
    # Message text (required).
    message: str
    # Who wrote the message; defaults to "admin".
    sender_type: str = "admin"
    # Internal messages are excluded from the reporter's status check.
    is_internal: bool = False
class MeasureCreate(BaseModel):
    """Request body for adding a follow-up measure to a report."""
    # Title of the measure (required).
    title: str
    # Optional longer description.
    description: Optional[str] = None
    # Optional name of the responsible person or unit.
    responsible: Optional[str] = None
    # Optional due date, passed to the database as the given string;
    # its format is not validated here.
    due_date: Optional[str] = None
# =============================================================================
# Admin Routes
# =============================================================================
@router.get("/reports")
def list_reports(
    status: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    # ge=0 rejects negative limits at the API boundary instead of letting
    # them reach the SQL LIMIT clause and surface as a server error.
    limit: int = Query(50, ge=0, le=200),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """List the tenant's reports, newest first.

    Args:
        status: Optional exact-match filter on the report status.
        category: Optional exact-match filter on the report category.
        limit: Maximum number of rows to return (0-200, default 50).
        db: Database session.
        tenant_id: Tenant whose reports are listed.

    Returns:
        A list of report rows converted to dicts.
    """
    # The WHERE clause is assembled only from fixed fragments; all
    # caller-supplied values are passed as bound parameters.
    where = ["tenant_id = :tid"]
    params = {"tid": tenant_id, "lim": limit}
    if status:
        where.append("status = :st")
        params["st"] = status
    if category:
        where.append("category = :cat")
        params["cat"] = category
    q = text(f"SELECT * FROM compliance_whistleblower_reports WHERE {' AND '.join(where)} ORDER BY received_at DESC LIMIT :lim")
    return [_row_to_dict(r) for r in db.execute(q, params).fetchall()]
@router.get("/reports/stats")
def report_stats(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Return aggregate report counters for the tenant.

    Returns:
        A dict with the keys ``total``, ``by_status``, ``by_category``,
        ``overdue_acknowledgment`` (status 'new', not acknowledged, and past
        the acknowledgment deadline) and ``overdue_feedback`` (past the
        feedback deadline and neither closed nor rejected).
    """
    now = datetime.now(timezone.utc)

    # Per-status counts.
    status_sql = text("SELECT status, COUNT(*) as cnt FROM compliance_whistleblower_reports WHERE tenant_id = :tid GROUP BY status")
    by_status = {}
    for rec in db.execute(status_sql, {"tid": tenant_id}).fetchall():
        by_status[rec.status] = rec.cnt

    # Per-category counts.
    category_sql = text("SELECT category, COUNT(*) as cnt FROM compliance_whistleblower_reports WHERE tenant_id = :tid GROUP BY category")
    by_category = {}
    for rec in db.execute(category_sql, {"tid": tenant_id}).fetchall():
        by_category[rec.category] = rec.cnt

    # Reports whose acknowledgment deadline has passed without acknowledgment.
    ack_sql = text("SELECT COUNT(*) FROM compliance_whistleblower_reports WHERE tenant_id = :tid AND deadline_acknowledgment < :now AND acknowledged_at IS NULL AND status = 'new'")
    overdue_ack = db.execute(ack_sql, {"tid": tenant_id, "now": now}).scalar() or 0

    # Still-open reports whose feedback deadline has passed.
    feedback_sql = text("SELECT COUNT(*) FROM compliance_whistleblower_reports WHERE tenant_id = :tid AND deadline_feedback < :now AND status NOT IN ('closed', 'rejected')")
    overdue_fb = db.execute(feedback_sql, {"tid": tenant_id, "now": now}).scalar() or 0

    return {
        "total": sum(by_status.values()),
        "by_status": by_status,
        "by_category": by_category,
        "overdue_acknowledgment": overdue_ack,
        "overdue_feedback": overdue_fb,
    }
@router.post("/reports")
def create_report(
    body: ReportCreate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Create a report for the tenant and return the stored row.

    A reference number and an access key are generated, and the deadlines
    are set relative to the time of receipt: acknowledgment after 7 days,
    feedback after 90 days.

    Args:
        body: Report data.
        db: Database session.
        tenant_id: Tenant that owns the new report.

    Returns:
        The inserted row as a dict.

    Raises:
        HTTPException: 400 if ``body.category`` is not in VALID_CATEGORIES.
    """
    # Reject unknown categories up front so that category filters and
    # per-category statistics only ever see values from VALID_CATEGORIES.
    if body.category not in VALID_CATEGORIES:
        raise HTTPException(400, f"Invalid category: {body.category}")
    now = datetime.now(timezone.utc)
    ref = _gen_ref(tenant_id, db)
    ak = _gen_access_key()
    q = text("""
        INSERT INTO compliance_whistleblower_reports
        (tenant_id, reference_number, access_key, category, title, description,
        is_anonymous, reporter_name, reporter_email, reporter_phone, priority,
        received_at, deadline_acknowledgment, deadline_feedback)
        VALUES (:tid, :ref, :ak, :cat, :title, :desc,
        :anon, :rn, :re, :rp, :pri,
        :now, :dl_ack, :dl_fb)
        RETURNING *
    """)
    row = db.execute(q, {
        "tid": tenant_id, "ref": ref, "ak": ak,
        "cat": body.category, "title": body.title, "desc": body.description,
        "anon": body.is_anonymous, "rn": body.reporter_name,
        "re": body.reporter_email, "rp": body.reporter_phone,
        "pri": body.priority, "now": now,
        "dl_ack": now + timedelta(days=7),
        "dl_fb": now + timedelta(days=90),
    }).fetchone()
    db.commit()
    return _row_to_dict(row)
@router.get("/reports/{report_id}")
def get_report(
    report_id: str,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Return one report of the tenant with its messages and measures.

    Args:
        report_id: Identifier of the report.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The report row as a dict with two extra keys, ``messages`` and
        ``measures``, each a list of row dicts ordered by creation time.

    Raises:
        HTTPException: 404 if no such report exists for the tenant.
    """
    report_sql = text("SELECT * FROM compliance_whistleblower_reports WHERE id = :rid AND tenant_id = :tid")
    report_row = db.execute(report_sql, {"rid": report_id, "tid": tenant_id}).fetchone()
    if not report_row:
        raise HTTPException(404, "Report not found")

    payload = _row_to_dict(report_row)

    # Child rows are looked up by report id only; tenant ownership of the
    # report itself was established by the query above.
    message_sql = text("SELECT * FROM compliance_whistleblower_messages WHERE report_id = :rid ORDER BY created_at")
    message_rows = db.execute(message_sql, {"rid": report_id}).fetchall()
    payload["messages"] = list(map(_row_to_dict, message_rows))

    measure_sql = text("SELECT * FROM compliance_whistleblower_measures WHERE report_id = :rid ORDER BY created_at")
    measure_rows = db.execute(measure_sql, {"rid": report_id}).fetchall()
    payload["measures"] = list(map(_row_to_dict, measure_rows))

    return payload
@router.put("/reports/{report_id}")
def update_report(
    report_id: str,
    body: ReportUpdate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Partially update a report of the tenant.

    Only the fields of ``body`` that are not None are written;
    ``updated_at`` is always refreshed.

    Args:
        report_id: Identifier of the report.
        body: Fields to change.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The updated row as a dict.

    Raises:
        HTTPException: 400 if no field is supplied, or if ``status`` /
            ``category`` is not one of the known values; 404 if no such
            report exists for the tenant.
    """
    # Validate enumerated fields before touching the database: the overdue
    # statistics compare against literal status values, so an arbitrary
    # status string would silently distort them.
    if body.status is not None and body.status not in VALID_STATUSES:
        raise HTTPException(400, f"Invalid status: {body.status}")
    if body.category is not None and body.category not in VALID_CATEGORIES:
        raise HTTPException(400, f"Invalid category: {body.category}")

    sets, params = [], {"rid": report_id, "tid": tenant_id}
    # Column names come from this fixed list, never from the request, so
    # interpolating them into the statement is safe; values are bound.
    for field in ["status", "priority", "assigned_to", "category"]:
        val = getattr(body, field, None)
        if val is not None:
            sets.append(f"{field} = :{field}")
            params[field] = val
    if not sets:
        raise HTTPException(400, "No fields to update")
    sets.append("updated_at = NOW()")
    q = text(f"UPDATE compliance_whistleblower_reports SET {', '.join(sets)} WHERE id = :rid AND tenant_id = :tid RETURNING *")
    row = db.execute(q, params).fetchone()
    if not row:
        raise HTTPException(404, "Report not found")
    db.commit()
    return _row_to_dict(row)
@router.post("/reports/{report_id}/acknowledge")
def acknowledge_report(
    report_id: str,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Record the acknowledgment of receipt for a report.

    The operation is idempotent: the first acknowledgment timestamp is kept
    on repeated calls, and the status only moves to 'acknowledged' when the
    report is still 'new'. A report that has already progressed (for example
    to 'closed') is not reset.

    Args:
        report_id: Identifier of the report.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The updated row as a dict.

    Raises:
        HTTPException: 404 if no such report exists for the tenant.
    """
    q = text("""
        UPDATE compliance_whistleblower_reports
        SET status = CASE WHEN status = 'new' THEN 'acknowledged' ELSE status END,
            acknowledged_at = COALESCE(acknowledged_at, NOW()),
            updated_at = NOW()
        WHERE id = :rid AND tenant_id = :tid RETURNING *
    """)
    row = db.execute(q, {"rid": report_id, "tid": tenant_id}).fetchone()
    if not row:
        raise HTTPException(404, "Report not found")
    db.commit()
    return _row_to_dict(row)
@router.post("/reports/{report_id}/close")
def close_report(
    report_id: str,
    reason: str = Query(""),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Close a report of the tenant and store the closure reason.

    Args:
        report_id: Identifier of the report.
        reason: Closure reason taken from the query string; defaults to "".
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The updated row as a dict.

    Raises:
        HTTPException: 404 if no such report exists for the tenant.
    """
    bind_values = {"rid": report_id, "tid": tenant_id, "reason": reason}
    close_sql = text("""
        UPDATE compliance_whistleblower_reports
        SET status = 'closed', closed_at = NOW(), closure_reason = :reason, updated_at = NOW()
        WHERE id = :rid AND tenant_id = :tid RETURNING *
    """)
    closed_row = db.execute(close_sql, bind_values).fetchone()
    if closed_row:
        db.commit()
        return _row_to_dict(closed_row)
    raise HTTPException(404, "Report not found")
@router.post("/reports/{report_id}/messages")
def add_message(
    report_id: str,
    body: MessageCreate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Attach a message to a report of the tenant.

    Args:
        report_id: Identifier of the report.
        body: Message text, sender type and internal flag.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The inserted message row as a dict.

    Raises:
        HTTPException: 404 if no such report exists for the tenant.
    """
    # Verify ownership first; without this check a caller could write
    # messages into another tenant's report by guessing its id.
    owned = db.execute(
        text("SELECT 1 FROM compliance_whistleblower_reports WHERE id = :rid AND tenant_id = :tid"),
        {"rid": report_id, "tid": tenant_id},
    ).fetchone()
    if owned is None:
        raise HTTPException(404, "Report not found")
    q = text("""
        INSERT INTO compliance_whistleblower_messages (report_id, sender_type, message, is_internal)
        VALUES (:rid, :st, :msg, :internal) RETURNING *
    """)
    row = db.execute(q, {"rid": report_id, "st": body.sender_type, "msg": body.message, "internal": body.is_internal}).fetchone()
    db.commit()
    return _row_to_dict(row)
@router.get("/reports/{report_id}/measures")
def list_measures(
    report_id: str,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """List the measures of a report, oldest first, scoped to the tenant.

    Args:
        report_id: Identifier of the report.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        A list of measure rows as dicts; empty when the report has no
        measures or does not belong to the tenant.
    """
    # Join against the reports table so measures are only returned when the
    # parent report belongs to the calling tenant.
    q = text("""
        SELECT m.* FROM compliance_whistleblower_measures m
        JOIN compliance_whistleblower_reports r ON r.id = m.report_id
        WHERE m.report_id = :rid AND r.tenant_id = :tid
        ORDER BY m.created_at
    """)
    rows = db.execute(q, {"rid": report_id, "tid": tenant_id}).fetchall()
    return [_row_to_dict(r) for r in rows]
@router.post("/reports/{report_id}/measures")
def add_measure(
    report_id: str, body: MeasureCreate,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Add a follow-up measure to a report of the tenant.

    Args:
        report_id: Identifier of the report.
        body: Measure title plus optional description, responsible and
            due date.
        db: Database session.
        tenant_id: Tenant the report must belong to.

    Returns:
        The inserted measure row as a dict.

    Raises:
        HTTPException: 404 if no such report exists for the tenant.
    """
    # Verify ownership first; without this check a caller could attach
    # measures to another tenant's report by guessing its id.
    owned = db.execute(
        text("SELECT 1 FROM compliance_whistleblower_reports WHERE id = :rid AND tenant_id = :tid"),
        {"rid": report_id, "tid": tenant_id},
    ).fetchone()
    if owned is None:
        raise HTTPException(404, "Report not found")
    q = text("""
        INSERT INTO compliance_whistleblower_measures (report_id, title, description, responsible, due_date)
        VALUES (:rid, :title, :desc, :resp, :due) RETURNING *
    """)
    row = db.execute(q, {"rid": report_id, "title": body.title, "desc": body.description,
                         "resp": body.responsible, "due": body.due_date}).fetchone()
    db.commit()
    return _row_to_dict(row)
# =============================================================================
# Public Routes (Anonymous)
# =============================================================================
@router.post("/submit")
def submit_report(body: ReportCreate, db: Session = Depends(get_db), tenant_id: str = Depends(_get_tenant_id)):
    """Public submission endpoint for anonymous reports.

    Forces ``is_anonymous`` to True, delegates to ``create_report`` and
    returns only the access key, the reference number and a confirmation
    message instead of the full stored row.
    """
    # Public submissions are always flagged anonymous, whatever the client sent.
    body.is_anonymous = True
    created = create_report(body, db, tenant_id)
    response = {}
    response["access_key"] = created["access_key"]
    response["reference_number"] = created["reference_number"]
    response["message"] = "Ihre Meldung wurde erfolgreich eingereicht. Nutzen Sie den Zugangscode um den Status zu pruefen."
    return response
@router.get("/check/{access_key}")
def check_status(access_key: str, db: Session = Depends(get_db), tenant_id: str = Depends(_get_tenant_id)):
    """Let a reporter look up the status of a report by its access key.

    Returns a reduced view of the report (id, reference number, status,
    category, received and acknowledged timestamps) plus the messages that
    are not marked internal, ordered by creation time.

    Raises:
        HTTPException: 404 if no report matches the access key for the tenant.
    """
    report_sql = text(
        "SELECT id, reference_number, status, category, received_at, acknowledged_at FROM compliance_whistleblower_reports WHERE access_key = :ak AND tenant_id = :tid"
    )
    report_row = db.execute(report_sql, {"ak": access_key, "tid": tenant_id}).fetchone()
    if not report_row:
        raise HTTPException(404, "Meldung nicht gefunden")

    status_view = _row_to_dict(report_row)

    # Only messages visible to the reporter: internal notes are filtered out.
    message_sql = text(
        "SELECT message, sender_type, created_at FROM compliance_whistleblower_messages WHERE report_id = :rid AND is_internal = FALSE ORDER BY created_at"
    )
    visible_rows = db.execute(message_sql, {"rid": status_view["id"]}).fetchall()
    status_view["messages"] = list(map(_row_to_dict, visible_rows))
    return status_view