feat: Rollenkonzept backend + SOP template (Phase 1-3)

- Migration 111: 3 new tables (org_roles, document_reviews, document_role_mapping)
  with seed data mapping all 71 doc types to 7 compliance roles
- org_role_routes.py: CRUD for roles, seed defaults, test email, mapping API
- document_review_routes.py: Review lifecycle (create→send→approve/reject)
  with approval notification to all affected roles
- Migration 112: SOP template (ISO 9001 structure, 21 placeholders)
- Added standard_operating_procedure to TemplateType, doc-labels, presets

[migration-approved]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-03 13:03:38 +02:00
parent ce52dd153e
commit 9b4be663f7
8 changed files with 892 additions and 1 deletions
@@ -63,6 +63,8 @@ _ROUTER_MODULES = [
"tom_mapping_routes",
"llm_audit_routes",
"assertion_routes",
"org_role_routes",
"document_review_routes",
]
_loaded_count = 0
@@ -0,0 +1,312 @@
"""
FastAPI routes for Document Review Workflow.
Tracks which compliance documents have been sent for review, their status,
and handles email notifications to reviewers.
Endpoints:
GET /document-reviews — list reviews with filters
GET /document-reviews/stats — counts by status
POST /document-reviews — create review (auto-assign from mapping)
GET /document-reviews/{id} — single review
POST /document-reviews/{id}/send — send notification email
POST /document-reviews/{id}/approve — mark as approved
POST /document-reviews/{id}/reject — mark as rejected
GET /document-reviews/for-document — reviews for a specific doc type
"""
import hashlib
import logging
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id as _get_tenant_id
from .db_utils import row_to_dict as _row_to_dict
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/document-reviews", tags=["document-reviews"])
# =============================================================================
# Schemas
# =============================================================================
class ReviewCreate(BaseModel):
document_type: str
document_title: str
document_content: Optional[str] = None
project_id: Optional[str] = None
submitted_by: Optional[str] = None
review_link: Optional[str] = None
class ReviewReject(BaseModel):
comment: str
# =============================================================================
# Routes
# =============================================================================
@router.get("")
def list_reviews(
project_id: Optional[str] = Query(None),
status: Optional[str] = Query(None),
document_type: Optional[str] = Query(None),
reviewer_role_key: Optional[str] = Query(None),
limit: int = Query(50, le=200),
offset: int = Query(0),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
where = ["tenant_id = :tid"]
params = {"tid": tenant_id, "lim": limit, "off": offset}
if project_id:
where.append("project_id = :pid")
params["pid"] = project_id
if status:
where.append("status = :status")
params["status"] = status
if document_type:
where.append("document_type = :dt")
params["dt"] = document_type
if reviewer_role_key:
where.append("reviewer_role_key = :rrk")
params["rrk"] = reviewer_role_key
q = text(f"""
SELECT * FROM compliance_document_reviews
WHERE {' AND '.join(where)}
ORDER BY created_at DESC LIMIT :lim OFFSET :off
""")
rows = db.execute(q, params).fetchall()
return [_row_to_dict(r) for r in rows]
@router.get("/stats")
def review_stats(
project_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
where = "tenant_id = :tid"
params = {"tid": tenant_id}
if project_id:
where += " AND project_id = :pid"
params["pid"] = project_id
q = text(f"SELECT status, COUNT(*) as count FROM compliance_document_reviews WHERE {where} GROUP BY status")
rows = db.execute(q, params).fetchall()
return {r.status: r.count for r in rows}
@router.get("/for-document")
def reviews_for_document(
document_type: str = Query(...),
project_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
where = "tenant_id = :tid AND document_type = :dt"
params = {"tid": tenant_id, "dt": document_type}
if project_id:
where += " AND project_id = :pid"
params["pid"] = project_id
q = text(f"SELECT * FROM compliance_document_reviews WHERE {where} ORDER BY created_at DESC LIMIT 10")
rows = db.execute(q, params).fetchall()
return [_row_to_dict(r) for r in rows]
@router.post("")
def create_review(
body: ReviewCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
# Find reviewer(s) from mapping + org_roles
q = text("""
SELECT m.role_key, m.is_primary, r.person_name, r.person_email, r.role_label
FROM compliance_document_role_mapping m
LEFT JOIN compliance_org_roles r
ON r.tenant_id = m.tenant_id AND r.role_key = m.role_key
AND (r.project_id = :pid OR r.project_id IS NULL)
WHERE m.tenant_id = :tid AND m.document_type = :dt
ORDER BY m.is_primary DESC
""")
mappings = db.execute(q, {"tid": tenant_id, "dt": body.document_type, "pid": body.project_id}).fetchall()
if not mappings:
raise HTTPException(404, f"No reviewer mapping found for document type '{body.document_type}'")
content_hash = hashlib.sha256(body.document_content.encode()).hexdigest() if body.document_content else None
created = []
for m in mappings:
m_dict = _row_to_dict(m)
ins = text("""
INSERT INTO compliance_document_reviews
(tenant_id, project_id, document_type, document_title, document_content_hash,
reviewer_role_key, reviewer_name, reviewer_email, submitted_by, review_link, submitted_at)
VALUES (:tid, :pid, :dt, :title, :hash, :rrk, :rn, :re, :sb, :rl, NOW())
RETURNING *
""")
row = db.execute(ins, {
"tid": tenant_id, "pid": body.project_id, "dt": body.document_type,
"title": body.document_title, "hash": content_hash,
"rrk": m_dict["role_key"], "rn": m_dict.get("person_name"),
"re": m_dict.get("person_email"), "sb": body.submitted_by,
"rl": body.review_link,
}).fetchone()
created.append(_row_to_dict(row))
db.commit()
return created
@router.get("/{review_id}")
def get_review(
review_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("SELECT * FROM compliance_document_reviews WHERE id = :rid AND tenant_id = :tid")
row = db.execute(q, {"rid": review_id, "tid": tenant_id}).fetchone()
if not row:
raise HTTPException(404, "Review not found")
return _row_to_dict(row)
@router.post("/{review_id}/send")
def send_notification(
review_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("SELECT * FROM compliance_document_reviews WHERE id = :rid AND tenant_id = :tid")
row = db.execute(q, {"rid": review_id, "tid": tenant_id}).fetchone()
if not row:
raise HTTPException(404, "Review not found")
review = _row_to_dict(row)
if not review.get("reviewer_email"):
raise HTTPException(400, "No email for reviewer — assign a person to this role first")
try:
from compliance.services.smtp_sender import send_email
result = send_email(
recipient=review["reviewer_email"],
subject=f"[BreakPilot] Dokument zur Pruefung: {review['document_title']}",
body_html=f"""
<h2>Dokument zur Pruefung</h2>
<p>Sehr geehrte/r <strong>{review.get('reviewer_name') or 'Pruefer/in'}</strong>,</p>
<p>das folgende Dokument wurde Ihnen zur inhaltlichen Pruefung zugewiesen:</p>
<table style="border-collapse:collapse;margin:16px 0;">
<tr><td style="padding:4px 12px 4px 0;font-weight:bold;">Dokument:</td>
<td>{review['document_title']}</td></tr>
<tr><td style="padding:4px 12px 4px 0;font-weight:bold;">Typ:</td>
<td>{review['document_type']}</td></tr>
<tr><td style="padding:4px 12px 4px 0;font-weight:bold;">Eingereicht von:</td>
<td>{review.get('submitted_by') or 'System'}</td></tr>
</table>
<p>Bitte pruefen Sie das Dokument auf <strong>inhaltliche Richtigkeit</strong>,
<strong>Vollstaendigkeit</strong> und <strong>Umsetzbarkeit</strong>.</p>
{f'<p><a href="{review["review_link"]}" style="background:#7c3aed;color:white;padding:10px 20px;border-radius:6px;text-decoration:none;">Dokument oeffnen</a></p>' if review.get("review_link") else ''}
<p style="color:#888;font-size:12px;">BreakPilot Compliance SDK</p>
""",
)
# Update review status
db.execute(text("""
UPDATE compliance_document_reviews
SET status = 'in_review', email_sent = TRUE, email_sent_at = NOW(), updated_at = NOW()
WHERE id = :rid
"""), {"rid": review_id})
db.commit()
return {"sent": True, "email": review["reviewer_email"], "result": result}
except Exception as e:
logger.error("Failed to send review email: %s", e)
raise HTTPException(500, f"Email sending failed: {e}")
@router.post("/{review_id}/approve")
def approve_review(
review_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("""
UPDATE compliance_document_reviews
SET status = 'approved', reviewed_at = NOW(), updated_at = NOW()
WHERE id = :rid AND tenant_id = :tid
RETURNING *
""")
row = db.execute(q, {"rid": review_id, "tid": tenant_id}).fetchone()
if not row:
raise HTTPException(404, "Review not found")
db.commit()
review = _row_to_dict(row)
# Notify all OTHER roles mapped to this document type about the approval
_notify_approval(db, tenant_id, review)
return review
def _notify_approval(db: Session, tenant_id: str, review: dict):
"""Send approval notification to all other roles mapped to this document type."""
try:
from compliance.services.smtp_sender import send_email
q = text("""
SELECT DISTINCT r.person_name, r.person_email, r.role_label
FROM compliance_document_role_mapping m
JOIN compliance_org_roles r
ON r.tenant_id = m.tenant_id AND r.role_key = m.role_key
AND (r.project_id = :pid OR r.project_id IS NULL)
WHERE m.tenant_id = :tid AND m.document_type = :dt
AND m.role_key != :reviewer_key AND r.person_email IS NOT NULL
""")
others = db.execute(q, {
"tid": tenant_id, "dt": review["document_type"],
"pid": review.get("project_id"), "reviewer_key": review["reviewer_role_key"],
}).fetchall()
for other in others:
o = _row_to_dict(other)
send_email(
recipient=o["person_email"],
subject=f"[BreakPilot] Freigabe: {review['document_title']}",
body_html=f"""
<h2>Dokument freigegeben</h2>
<p>Sehr geehrte/r <strong>{o.get('person_name') or o['role_label']}</strong>,</p>
<p>das Dokument <strong>{review['document_title']}</strong> wurde von
{review.get('reviewer_name') or review['reviewer_role_key']} freigegeben.</p>
<p>Bitte pruefen Sie, ob fuer Ihren Verantwortungsbereich Handlungsbedarf besteht
(z.B. Schulungsbedarf, Prozessanpassungen).</p>
<p style="color:#888;font-size:12px;">BreakPilot Compliance SDK</p>
""",
)
logger.info("Notified %d other roles about approval of %s", len(others), review["document_title"])
except Exception as e:
logger.warning("Approval notification failed (non-blocking): %s", e)
@router.post("/{review_id}/reject")
def reject_review(
review_id: str,
body: ReviewReject,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("""
UPDATE compliance_document_reviews
SET status = 'rejected', reviewed_at = NOW(), review_comment = :comment, updated_at = NOW()
WHERE id = :rid AND tenant_id = :tid
RETURNING *
""")
row = db.execute(q, {"rid": review_id, "tid": tenant_id, "comment": body.comment}).fetchone()
if not row:
raise HTTPException(404, "Review not found")
db.commit()
return _row_to_dict(row)
@@ -0,0 +1,255 @@
"""
FastAPI routes for Organizational Compliance Roles.
Manages the 7 standard compliance roles (DSB, GF, IT-Leiter, etc.)
and the document-to-role mapping that determines who reviews which documents.
Endpoints:
GET /org-roles — list roles for tenant/project
POST /org-roles — create/upsert a role
PUT /org-roles/{id} — update role details
DELETE /org-roles/{id} — remove a role
GET /org-roles/defaults — 7 standard role definitions
POST /org-roles/seed — seed default roles for a project
POST /org-roles/{id}/send-test — send test email to role
GET /org-roles/mapping — document-to-role mapping
PUT /org-roles/mapping — update mapping
"""
import logging
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from .tenant_utils import get_tenant_id as _get_tenant_id
from .db_utils import row_to_dict as _row_to_dict
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/org-roles", tags=["org-roles"])
# =============================================================================
# Standard role definitions
# =============================================================================
DEFAULT_ROLES = [
{"role_key": "dsb", "role_label": "Datenschutzbeauftragter (DSB)"},
{"role_key": "gf", "role_label": "Geschaeftsfuehrung"},
{"role_key": "it_leiter", "role_label": "IT-Leiter / CISO"},
{"role_key": "hr_leitung", "role_label": "HR-Leitung"},
{"role_key": "marketing_leitung", "role_label": "Marketing-Leitung"},
{"role_key": "compliance_beauftragter", "role_label": "Compliance-Beauftragter"},
{"role_key": "einkauf", "role_label": "Einkauf / Vendor Management"},
]
# =============================================================================
# Schemas
# =============================================================================
class OrgRoleCreate(BaseModel):
role_key: str
role_label: str
person_name: Optional[str] = None
person_email: Optional[str] = None
department: Optional[str] = None
project_id: Optional[str] = None
class OrgRoleUpdate(BaseModel):
role_label: Optional[str] = None
person_name: Optional[str] = None
person_email: Optional[str] = None
department: Optional[str] = None
is_active: Optional[bool] = None
class MappingEntry(BaseModel):
document_type: str
role_key: str
is_primary: bool = True
class MappingUpdate(BaseModel):
entries: List[MappingEntry]
# =============================================================================
# Routes
# =============================================================================
@router.get("")
def list_roles(
project_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("""
SELECT * FROM compliance_org_roles
WHERE tenant_id = :tid AND (project_id = :pid OR (:pid IS NULL AND project_id IS NULL))
ORDER BY role_key
""")
rows = db.execute(q, {"tid": tenant_id, "pid": project_id}).fetchall()
return [_row_to_dict(r) for r in rows]
@router.get("/defaults")
def get_defaults():
return DEFAULT_ROLES
@router.post("")
def create_role(
body: OrgRoleCreate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("""
INSERT INTO compliance_org_roles (tenant_id, project_id, role_key, role_label, person_name, person_email, department)
VALUES (:tid, :pid, :rk, :rl, :pn, :pe, :dept)
ON CONFLICT (tenant_id, project_id, role_key) DO UPDATE
SET role_label = EXCLUDED.role_label,
person_name = COALESCE(EXCLUDED.person_name, compliance_org_roles.person_name),
person_email = COALESCE(EXCLUDED.person_email, compliance_org_roles.person_email),
department = COALESCE(EXCLUDED.department, compliance_org_roles.department),
updated_at = NOW()
RETURNING *
""")
row = db.execute(q, {
"tid": tenant_id, "pid": body.project_id, "rk": body.role_key,
"rl": body.role_label, "pn": body.person_name, "pe": body.person_email,
"dept": body.department,
}).fetchone()
db.commit()
return _row_to_dict(row)
@router.put("/{role_id}")
def update_role(
role_id: str,
body: OrgRoleUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
sets, params = [], {"rid": role_id, "tid": tenant_id}
for field in ["role_label", "person_name", "person_email", "department", "is_active"]:
val = getattr(body, field, None)
if val is not None:
sets.append(f"{field} = :{field}")
params[field] = val
if not sets:
raise HTTPException(400, "No fields to update")
sets.append("updated_at = NOW()")
q = text(f"UPDATE compliance_org_roles SET {', '.join(sets)} WHERE id = :rid AND tenant_id = :tid RETURNING *")
row = db.execute(q, params).fetchone()
if not row:
raise HTTPException(404, "Role not found")
db.commit()
return _row_to_dict(row)
@router.delete("/{role_id}")
def delete_role(
role_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("DELETE FROM compliance_org_roles WHERE id = :rid AND tenant_id = :tid")
result = db.execute(q, {"rid": role_id, "tid": tenant_id})
db.commit()
if result.rowcount == 0:
raise HTTPException(404, "Role not found")
return {"deleted": True}
@router.post("/seed")
def seed_roles(
project_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
created = 0
for role in DEFAULT_ROLES:
q = text("""
INSERT INTO compliance_org_roles (tenant_id, project_id, role_key, role_label)
VALUES (:tid, :pid, :rk, :rl)
ON CONFLICT (tenant_id, project_id, role_key) DO NOTHING
""")
result = db.execute(q, {"tid": tenant_id, "pid": project_id, "rk": role["role_key"], "rl": role["role_label"]})
created += result.rowcount
db.commit()
return {"seeded": created, "total": len(DEFAULT_ROLES)}
@router.post("/{role_id}/send-test")
def send_test_email(
role_id: str,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("SELECT * FROM compliance_org_roles WHERE id = :rid AND tenant_id = :tid")
role = db.execute(q, {"rid": role_id, "tid": tenant_id}).fetchone()
if not role:
raise HTTPException(404, "Role not found")
role_dict = _row_to_dict(role)
if not role_dict.get("person_email"):
raise HTTPException(400, "No email configured for this role")
try:
from compliance.services.smtp_sender import send_email
result = send_email(
recipient=role_dict["person_email"],
subject=f"[BreakPilot] Test-E-Mail fuer {role_dict['role_label']}",
body_html=f"""
<h2>Test-E-Mail</h2>
<p>Diese E-Mail bestaetigt, dass die Zustellung an die Rolle
<strong>{role_dict['role_label']}</strong> funktioniert.</p>
<p>Empfaenger: {role_dict['person_name'] or 'N/A'} ({role_dict['person_email']})</p>
<p style="color:#888;font-size:12px;">Gesendet von BreakPilot Compliance SDK</p>
""",
)
return {"sent": True, "email": role_dict["person_email"], "result": result}
except Exception as e:
logger.error("Failed to send test email: %s", e)
raise HTTPException(500, f"Email sending failed: {e}")
# =============================================================================
# Document-to-Role Mapping
# =============================================================================
@router.get("/mapping")
def get_mapping(
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
q = text("""
SELECT * FROM compliance_document_role_mapping
WHERE tenant_id = :tid
ORDER BY document_type, role_key
""")
rows = db.execute(q, {"tid": tenant_id}).fetchall()
return [_row_to_dict(r) for r in rows]
@router.put("/mapping")
def update_mapping(
body: MappingUpdate,
db: Session = Depends(get_db),
tenant_id: str = Depends(_get_tenant_id),
):
for entry in body.entries:
q = text("""
INSERT INTO compliance_document_role_mapping (tenant_id, document_type, role_key, is_primary)
VALUES (:tid, :dt, :rk, :ip)
ON CONFLICT (tenant_id, document_type, role_key) DO UPDATE
SET is_primary = EXCLUDED.is_primary
""")
db.execute(q, {"tid": tenant_id, "dt": entry.document_type, "rk": entry.role_key, "ip": entry.is_primary})
db.commit()
return {"updated": len(body.entries)}