"""Per-tenant control suppression (Applicability-Override). A tenant marks a control as 'not applicable' → it is hidden from that tenant's use-case views and (via suppressed_control_uuids) future repo scans. Reversible (active flag + reverted_*); auditable (actor + reason + created_at answer "why not checked?"). See migration 156. Writes commit explicitly (request-scoped session).""" from typing import Any, Optional from sqlalchemy import text from sqlalchemy.orm import Session def suppress( db: Session, tenant_id: str, control_uuid: str, reason: Optional[str] = None, actor: Optional[str] = None, ) -> dict[str, Any]: """Mark a control not-applicable for a tenant (idempotent; reactivates a previously reverted suppression).""" db.execute(text( "INSERT INTO compliance.control_suppressions " "(tenant_id, control_uuid, reason, actor, active, created_at, updated_at) " "VALUES (:t, :c, :r, :a, TRUE, now(), now()) " "ON CONFLICT (tenant_id, control_uuid) DO UPDATE SET " "active=TRUE, reason=EXCLUDED.reason, actor=EXCLUDED.actor, " "reverted_at=NULL, reverted_by=NULL, revert_reason=NULL, updated_at=now()" ), {"t": tenant_id, "c": control_uuid, "r": reason, "a": actor}) db.commit() return {"tenant_id": tenant_id, "control_uuid": control_uuid, "active": True, "reason": reason, "actor": actor} def revert( db: Session, tenant_id: str, control_uuid: str, actor: Optional[str] = None, reason: Optional[str] = None, ) -> bool: """Undo a suppression (keeps the row for audit). True if one was active.""" res = db.execute(text( "UPDATE compliance.control_suppressions " "SET active=FALSE, reverted_at=now(), reverted_by=:a, revert_reason=:r, " "updated_at=now() " "WHERE tenant_id=:t AND control_uuid=:c AND active=TRUE" ), {"t": tenant_id, "c": control_uuid, "a": actor, "r": reason}) db.commit() return (res.rowcount or 0) > 0 def list_suppressions( db: Session, tenant_id: str, include_reverted: bool = False, ) -> list[dict[str, Any]]: """A tenant's suppressions (active by default) — the audit list answering 'what did we exclude, by whom, when and why?'.""" rows = db.execute(text( "SELECT cs.control_uuid::text, cs.reason, cs.actor, cs.active, " "cs.reverted_at, cs.reverted_by, cs.created_at, " "cc.control_id, cc.title " "FROM compliance.control_suppressions cs " "JOIN canonical_controls cc ON cc.id = cs.control_uuid " "WHERE cs.tenant_id = :t AND (:incl = TRUE OR cs.active = TRUE) " "ORDER BY cs.updated_at DESC" ), {"t": tenant_id, "incl": include_reverted}).fetchall() return [ { "control_uuid": r[0], "reason": r[1], "actor": r[2], "active": bool(r[3]), "reverted_at": r[4].isoformat() if r[4] else None, "reverted_by": r[5], "created_at": r[6].isoformat() if r[6] else None, "control_id": r[7], "title": r[8], } for r in rows ] def suppressed_control_uuids(db: Session, tenant_id: Optional[str]) -> set[str]: """Active-suppressed control UUIDs for a tenant — shared filter for the use-case views AND repo scans. Empty when no tenant (agent/CRA path).""" if not tenant_id: return set() return { str(r[0]) for r in db.execute(text( "SELECT control_uuid FROM compliance.control_suppressions " "WHERE tenant_id = :t AND active = TRUE" ), {"t": tenant_id}).fetchall() }