Files
breakpilot-compliance/backend-compliance/compliance/services/control_suppression.py
T
Benjamin Admin 7aabfbe5b5 feat(controls): Mandanten-Suppression — per-tenant Applicability-Override
Geteilte Schicht für alle Surfaces (Workspace-Anwälte, Cyber-Risiko-Projekt,
Admin): ein Mandant markiert ein Control als "nicht anwendbar" → in seinen
Use-Case-Ansichten (und künftig Repo-Scans) ausgeblendet.

- Migration 156: compliance.control_suppressions (PK tenant_id+control_uuid),
  reversibel (active + reverted_*), auditierbar (actor/reason/created_at).
  [migration-approved]
- Service control_suppression: suppress/revert/list_suppressions +
  suppressed_control_uuids (geteilter Filter).
- Routes: GET/POST /v1/controls/suppressions + POST .../{uuid}/revert (X-Tenant-ID).
- controls_for_use_case: optionaler X-Tenant-ID + include_suppressed; suppressed
  per Default versteckt (nie gelöscht), suppressed_count, suppressed-Flag pro
  Control. Agenten/CRA ohne Tenant unberührt.
- Tests: Request-Validierung + import-safety (E2E-Zyklus gegen macmini bewiesen).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 16:35:38 +02:00

85 lines
3.5 KiB
Python

"""Per-tenant control suppression (Applicability-Override). A tenant marks a
control as 'not applicable' → it is hidden from that tenant's use-case views and
(via suppressed_control_uuids) future repo scans. Reversible (active flag +
reverted_*); auditable (actor + reason + created_at answer "why not checked?").
See migration 156. Writes commit explicitly (request-scoped session)."""
from typing import Any, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
def suppress(
    db: Session, tenant_id: str, control_uuid: str,
    reason: Optional[str] = None, actor: Optional[str] = None,
) -> dict[str, Any]:
    """Mark a control as not applicable for a tenant.

    Upserts the (tenant_id, control_uuid) row: a new suppression is inserted;
    an existing one (active or previously reverted) is set active again with
    the supplied reason/actor and its reverted_* columns cleared, which makes
    the call idempotent. created_at is only written on first insert, while
    updated_at is refreshed on every call.

    Args:
        db: Session used for the write; committed here on success.
        tenant_id: Tenant the suppression belongs to.
        control_uuid: UUID of the control being suppressed.
        reason: Optional justification stored with the suppression.
        actor: Optional identifier of who requested the suppression.

    Returns:
        A dict echoing tenant_id, control_uuid, reason and actor, with
        "active" set to True.

    Raises:
        Exception: anything raised by the execute/commit is re-raised
            unchanged after the session has been rolled back.
    """
    params = {"t": tenant_id, "c": control_uuid, "r": reason, "a": actor}
    upsert = text(
        "INSERT INTO compliance.control_suppressions "
        "(tenant_id, control_uuid, reason, actor, active, created_at, updated_at) "
        "VALUES (:t, :c, :r, :a, TRUE, now(), now()) "
        "ON CONFLICT (tenant_id, control_uuid) DO UPDATE SET "
        "active=TRUE, reason=EXCLUDED.reason, actor=EXCLUDED.actor, "
        "reverted_at=NULL, reverted_by=NULL, revert_reason=NULL, updated_at=now()"
    )
    try:
        db.execute(upsert, params)
        db.commit()
    except Exception:
        # This function owns the commit, so it also owns the rollback: a failed
        # statement must not leave the session stuck in an aborted transaction
        # for a caller that catches the error and keeps using the session.
        db.rollback()
        raise
    return {
        "tenant_id": tenant_id,
        "control_uuid": control_uuid,
        "active": True,
        "reason": reason,
        "actor": actor,
    }
def revert(
    db: Session, tenant_id: str, control_uuid: str,
    actor: Optional[str] = None, reason: Optional[str] = None,
) -> bool:
    """Undo an active suppression while keeping its row.

    Only a row that is currently active is touched: it is flagged inactive and
    stamped with reverted_at, reverted_by and revert_reason. Nothing is
    deleted.

    Args:
        db: Session used for the write; committed here on success.
        tenant_id: Tenant the suppression belongs to.
        control_uuid: UUID of the control whose suppression is undone.
        actor: Optional identifier stored as reverted_by.
        reason: Optional justification stored as revert_reason.

    Returns:
        True if an active suppression was found and reverted, False if there
        was nothing active to revert.

    Raises:
        Exception: anything raised by the execute/commit is re-raised
            unchanged after the session has been rolled back.
    """
    params = {"t": tenant_id, "c": control_uuid, "a": actor, "r": reason}
    update = text(
        "UPDATE compliance.control_suppressions "
        "SET active=FALSE, reverted_at=now(), reverted_by=:a, revert_reason=:r, "
        "updated_at=now() "
        "WHERE tenant_id=:t AND control_uuid=:c AND active=TRUE"
    )
    try:
        res = db.execute(update, params)
        db.commit()
    except Exception:
        # Leave the session usable for callers that handle the error instead
        # of handing back an aborted transaction.
        db.rollback()
        raise
    # rowcount may be None on some result objects; treat that as "no row hit".
    return (res.rowcount or 0) > 0
def list_suppressions(
    db: Session, tenant_id: str, include_reverted: bool = False,
) -> list[dict[str, Any]]:
    """Return a tenant's suppressions as an audit list, most recently changed first.

    Answers 'what did we exclude, by whom, when and why?'. Only active
    suppressions are returned unless include_reverted is True. Rows are joined
    to canonical_controls (inner join) to attach control_id and title.

    Args:
        db: Session used for the read.
        tenant_id: Tenant whose suppressions are listed.
        include_reverted: When True, reverted (inactive) suppressions are
            included as well.

    Returns:
        One dict per suppression with the keys control_uuid, reason, actor,
        active, reverted_at, reverted_by, revert_reason, created_at,
        updated_at, control_id and title. Timestamps are ISO-8601 strings, or
        None when the column is empty.
    """
    query = text(
        "SELECT cs.control_uuid::text, cs.reason, cs.actor, cs.active, "
        "cs.reverted_at, cs.reverted_by, cs.created_at, "
        "cc.control_id, cc.title, cs.revert_reason, cs.updated_at "
        "FROM compliance.control_suppressions cs "
        "JOIN canonical_controls cc ON cc.id = cs.control_uuid "
        "WHERE cs.tenant_id = :t AND (:incl = TRUE OR cs.active = TRUE) "
        "ORDER BY cs.updated_at DESC"
    )
    rows = db.execute(query, {"t": tenant_id, "incl": include_reverted}).fetchall()

    suppressions: list[dict[str, Any]] = []
    for (
        control_uuid, reason, actor, active,
        reverted_at, reverted_by, created_at,
        control_id, title, revert_reason, updated_at,
    ) in rows:
        suppressions.append({
            "control_uuid": control_uuid,
            "reason": reason,
            "actor": actor,
            "active": bool(active),
            "reverted_at": reverted_at.isoformat() if reverted_at else None,
            "reverted_by": reverted_by,
            # The 'why' of a revert; without it a reverted entry shows who and
            # when but not the justification that was recorded.
            "revert_reason": revert_reason,
            "created_at": created_at.isoformat() if created_at else None,
            # created_at is not rewritten when a suppression is re-activated,
            # so updated_at (also the sort key) is the time of the last change.
            "updated_at": updated_at.isoformat() if updated_at else None,
            "control_id": control_id,
            "title": title,
        })
    return suppressions
def suppressed_control_uuids(db: Session, tenant_id: Optional[str]) -> set[str]:
    """Collect the UUIDs of all controls a tenant currently has suppressed.

    Serves as the shared filter for the use-case views and repo scans. A
    missing or empty tenant_id (the agent/CRA path) short-circuits to an empty
    set without querying the database.

    Args:
        db: Session used for the read.
        tenant_id: Tenant to look up, or None/empty for "no tenant".

    Returns:
        The active-suppressed control UUIDs, each converted with str().
    """
    # No tenant means nothing can be suppressed; skip the query entirely.
    if not tenant_id:
        return set()

    query = text(
        "SELECT control_uuid FROM compliance.control_suppressions "
        "WHERE tenant_id = :t AND active = TRUE"
    )
    result = db.execute(query, {"t": tenant_id})

    uuids: set[str] = set()
    for row in result.fetchall():
        uuids.add(str(row[0]))
    return uuids