7aabfbe5b5
Geteilte Schicht für alle Surfaces (Workspace-Anwälte, Cyber-Risiko-Projekt,
Admin): ein Mandant markiert ein Control als "nicht anwendbar" → in seinen
Use-Case-Ansichten (und künftig Repo-Scans) ausgeblendet.
- Migration 156: compliance.control_suppressions (PK tenant_id+control_uuid),
reversibel (active + reverted_*), auditierbar (actor/reason/created_at).
[migration-approved]
- Service control_suppression: suppress/revert/list_suppressions +
suppressed_control_uuids (geteilter Filter).
- Routes: GET/POST /v1/controls/suppressions + POST .../{uuid}/revert (X-Tenant-ID).
- controls_for_use_case: optionaler X-Tenant-ID + include_suppressed; suppressed
per Default versteckt (nie gelöscht), suppressed_count, suppressed-Flag pro
Control. Agenten/CRA ohne Tenant unberührt.
- Tests: Request-Validierung + import-safety (E2E-Zyklus gegen macmini bewiesen).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
85 lines
3.5 KiB
Python
85 lines
3.5 KiB
Python
"""Per-tenant control suppression (Applicability-Override). A tenant marks a
|
|
control as 'not applicable' → it is hidden from that tenant's use-case views and
|
|
(via suppressed_control_uuids) future repo scans. Reversible (active flag +
|
|
reverted_*); auditable (actor + reason + created_at answer "why not checked?").
|
|
See migration 156. Writes commit explicitly (request-scoped session)."""
|
|
|
|
from typing import Any, Optional
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
def suppress(
|
|
db: Session, tenant_id: str, control_uuid: str,
|
|
reason: Optional[str] = None, actor: Optional[str] = None,
|
|
) -> dict[str, Any]:
|
|
"""Mark a control not-applicable for a tenant (idempotent; reactivates a
|
|
previously reverted suppression)."""
|
|
db.execute(text(
|
|
"INSERT INTO compliance.control_suppressions "
|
|
"(tenant_id, control_uuid, reason, actor, active, created_at, updated_at) "
|
|
"VALUES (:t, :c, :r, :a, TRUE, now(), now()) "
|
|
"ON CONFLICT (tenant_id, control_uuid) DO UPDATE SET "
|
|
"active=TRUE, reason=EXCLUDED.reason, actor=EXCLUDED.actor, "
|
|
"reverted_at=NULL, reverted_by=NULL, revert_reason=NULL, updated_at=now()"
|
|
), {"t": tenant_id, "c": control_uuid, "r": reason, "a": actor})
|
|
db.commit()
|
|
return {"tenant_id": tenant_id, "control_uuid": control_uuid,
|
|
"active": True, "reason": reason, "actor": actor}
|
|
|
|
|
|
def revert(
|
|
db: Session, tenant_id: str, control_uuid: str,
|
|
actor: Optional[str] = None, reason: Optional[str] = None,
|
|
) -> bool:
|
|
"""Undo a suppression (keeps the row for audit). True if one was active."""
|
|
res = db.execute(text(
|
|
"UPDATE compliance.control_suppressions "
|
|
"SET active=FALSE, reverted_at=now(), reverted_by=:a, revert_reason=:r, "
|
|
"updated_at=now() "
|
|
"WHERE tenant_id=:t AND control_uuid=:c AND active=TRUE"
|
|
), {"t": tenant_id, "c": control_uuid, "a": actor, "r": reason})
|
|
db.commit()
|
|
return (res.rowcount or 0) > 0
|
|
|
|
|
|
def list_suppressions(
|
|
db: Session, tenant_id: str, include_reverted: bool = False,
|
|
) -> list[dict[str, Any]]:
|
|
"""A tenant's suppressions (active by default) — the audit list answering
|
|
'what did we exclude, by whom, when and why?'."""
|
|
rows = db.execute(text(
|
|
"SELECT cs.control_uuid::text, cs.reason, cs.actor, cs.active, "
|
|
"cs.reverted_at, cs.reverted_by, cs.created_at, "
|
|
"cc.control_id, cc.title "
|
|
"FROM compliance.control_suppressions cs "
|
|
"JOIN canonical_controls cc ON cc.id = cs.control_uuid "
|
|
"WHERE cs.tenant_id = :t AND (:incl = TRUE OR cs.active = TRUE) "
|
|
"ORDER BY cs.updated_at DESC"
|
|
), {"t": tenant_id, "incl": include_reverted}).fetchall()
|
|
return [
|
|
{
|
|
"control_uuid": r[0], "reason": r[1], "actor": r[2],
|
|
"active": bool(r[3]),
|
|
"reverted_at": r[4].isoformat() if r[4] else None,
|
|
"reverted_by": r[5],
|
|
"created_at": r[6].isoformat() if r[6] else None,
|
|
"control_id": r[7], "title": r[8],
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
|
|
def suppressed_control_uuids(db: Session, tenant_id: Optional[str]) -> set[str]:
|
|
"""Active-suppressed control UUIDs for a tenant — shared filter for the
|
|
use-case views AND repo scans. Empty when no tenant (agent/CRA path)."""
|
|
if not tenant_id:
|
|
return set()
|
|
return {
|
|
str(r[0]) for r in db.execute(text(
|
|
"SELECT control_uuid FROM compliance.control_suppressions "
|
|
"WHERE tenant_id = :t AND active = TRUE"
|
|
), {"t": tenant_id}).fetchall()
|
|
}
|