The monolithic compliance/db/models.py is decomposed into seven sibling
aggregate modules following the existing repo pattern (dsr_models.py,
vvt_models.py, tom_models.py, etc.):
regulation_models.py (134 LOC) — RegulationDB, RequirementDB
control_models.py (279 LOC) — ControlDB, ControlMappingDB, EvidenceDB, RiskDB
ai_system_models.py (141 LOC) — AISystemDB, AuditExportDB
service_module_models.py (176 LOC) — ServiceModuleDB, ModuleRegulationMappingDB, ModuleRiskDB
audit_session_models.py (177 LOC) — AuditSessionDB, AuditSignOffDB
isms_governance_models.py (323 LOC) — ISMSScope, Context, Policy, Objective, SoA
isms_audit_models.py (468 LOC) — AuditFinding, CAPA, ManagementReview, InternalAudit,
AuditTrail, ReadinessCheck
models.py becomes an 85-line re-export shim — every public symbol is
re-exported in dependency order so existing imports work unchanged:
from compliance.db.models import RegulationDB, ControlDB, AuditFindingDB # still works
New code SHOULD import from the aggregate module directly; the shim is
for backwards compatibility during the migration.
Schema freeze preserved:
- __tablename__ byte-identical
- Column names, types, indexes, constraints byte-identical
- relationship() string references and back_populates unchanged
- cascade directives unchanged
Verified:
- 173/173 pytest compliance/tests/ pass
- tests/contracts/test_openapi_baseline.py passes (360 paths,
484 operations — identical to baseline)
- All new sibling files under the 500-line hard cap
(largest: isms_audit_models.py at 468 LOC)
- No file in compliance/db/ now exceeds the hard cap
This is Phase 1 Step 2 from PHASE1_RUNBOOK.md. Phase 1 Step 3 (split
compliance/api/schemas.py, 1899 LOC) is the next target.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
280 lines · 11 KiB · Python
"""
Control, Evidence, and Risk models — extracted from compliance/db/models.py.

Covers the control framework (ControlDB), requirement↔control mappings,
evidence artifacts, and the risk register. Re-exported from
``compliance.db.models`` for backwards compatibility.

DO NOT change __tablename__, column names, or relationship strings.
"""
|
|
|
|
import uuid
import enum
from datetime import datetime, date, timezone

from sqlalchemy import (
    Column, String, Text, Integer, Boolean, DateTime, Date,
    ForeignKey, Enum, JSON, Index,
)
from sqlalchemy.orm import relationship

from classroom_engine.database import Base
|
|
|
|
|
|
# ============================================================================
# ENUMS
# ============================================================================
|
|
|
|
class ControlTypeEnum(str, enum.Enum):
    """Type of security control.

    Members are ``str`` subclasses, so each member compares equal to its
    plain string value (e.g. ``ControlTypeEnum.PREVENTIVE == "preventive"``).
    """

    PREVENTIVE = "preventive"  # Prevents incidents
    DETECTIVE = "detective"    # Detects incidents
    CORRECTIVE = "corrective"  # Corrects after incidents
|
|
|
|
|
|
class ControlDomainEnum(str, enum.Enum):
    """Domain/category of control.

    The values are short codes (e.g. ``"gov"``, ``"priv"``); members are
    ``str`` subclasses and compare equal to those codes.
    """

    GOVERNANCE = "gov"   # Governance & Organization
    PRIVACY = "priv"     # Privacy & Data Protection
    IAM = "iam"          # Identity & Access Management
    CRYPTO = "crypto"    # Cryptography & Key Management
    SDLC = "sdlc"        # Secure Development Lifecycle
    OPS = "ops"          # Operations & Monitoring
    AI = "ai"            # AI-specific controls
    CRA = "cra"          # CRA & Supply Chain
    AUDIT = "aud"        # Audit & Traceability
|
|
|
|
|
|
class ControlStatusEnum(str, enum.Enum):
    """Implementation status of a control.

    Members are ``str`` subclasses and compare equal to their string value.
    Note that ``NOT_APPLICABLE`` is stored as ``"n/a"``.
    """

    PASS = "pass"             # Fully implemented & passing
    PARTIAL = "partial"       # Partially implemented
    FAIL = "fail"             # Not passing
    NOT_APPLICABLE = "n/a"    # Not applicable
    PLANNED = "planned"       # Planned for implementation
|
|
|
|
|
|
class RiskLevelEnum(str, enum.Enum):
    """Risk severity level.

    Declared in ascending order of severity. Members are ``str`` subclasses
    and compare equal to their lowercase string value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
|
|
|
|
|
|
class EvidenceStatusEnum(str, enum.Enum):
    """Status of evidence artifact.

    Members are ``str`` subclasses and compare equal to their string value.
    """

    VALID = "valid"       # Currently valid
    EXPIRED = "expired"   # Past validity date
    PENDING = "pending"   # Awaiting validation
    FAILED = "failed"     # Failed validation
|
|
|
|
|
|
# ============================================================================
# MODELS
# ============================================================================
|
|
|
|
class ControlDB(Base):
    """
    Technical or organizational security control.

    Examples: PRIV-001 (Verarbeitungsverzeichnis, i.e. record of processing
    activities), SDLC-001 (SAST Scanning)

    Mapped to the ``compliance_controls`` table. The ``mappings`` and
    ``evidence`` relationships both cascade ``all, delete-orphan``, so
    ORM-level deletes of a control propagate to its mapping and evidence rows.

    DO NOT change __tablename__, column names, or relationship strings.
    """
    __tablename__ = 'compliance_controls'

    # Surrogate primary key: a UUID4 rendered as a 36-character string.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Human-readable, unique business identifier.
    control_id = Column(String(20), unique=True, nullable=False, index=True)  # e.g., "PRIV-001"

    domain = Column(Enum(ControlDomainEnum), nullable=False, index=True)
    control_type = Column(Enum(ControlTypeEnum), nullable=False)

    title = Column(String(300), nullable=False)
    description = Column(Text)
    pass_criteria = Column(Text, nullable=False)  # Measurable pass criteria
    implementation_guidance = Column(Text)  # How to implement

    # Code/Evidence references
    code_reference = Column(String(500))  # e.g., "backend/middleware/pii_redactor.py:45"
    documentation_url = Column(String(500))  # Link to internal docs

    # Automation
    is_automated = Column(Boolean, default=False)
    automation_tool = Column(String(100))  # e.g., "Semgrep", "Trivy"
    automation_config = Column(JSON)  # Tool-specific config

    # Status
    status = Column(Enum(ControlStatusEnum), default=ControlStatusEnum.PLANNED)
    status_notes = Column(Text)

    # Ownership & Review
    owner = Column(String(100))  # Responsible person/team
    review_frequency_days = Column(Integer, default=90)
    last_reviewed_at = Column(DateTime)
    next_review_at = Column(DateTime)

    # Timestamps
    # NOTE(review): the defaults produce timezone-aware UTC datetimes while the
    # column type is DateTime without timezone=True — confirm how the backend
    # stores/returns them before comparing against naive values.
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

    # Relationships
    mappings = relationship("ControlMappingDB", back_populates="control", cascade="all, delete-orphan")
    evidence = relationship("EvidenceDB", back_populates="control", cascade="all, delete-orphan")

    __table_args__ = (
        # Composite index over (domain, status).
        Index('ix_control_domain_status', 'domain', 'status'),
    )

    def __repr__(self):
        """Return a short debug representation with control_id and title."""
        return f"<Control {self.control_id}: {self.title}>"
|
|
|
|
|
|
class ControlMappingDB(Base):
    """
    Maps requirements to controls (many-to-many with metadata).

    Mapped to the ``compliance_control_mappings`` table. Each row links one
    row of ``compliance_requirements`` to one row of ``compliance_controls``;
    the unique index ``ix_mapping_req_ctrl`` allows at most one mapping per
    (requirement_id, control_id) pair.

    DO NOT change __tablename__, column names, or relationship strings.
    """
    __tablename__ = 'compliance_control_mappings'

    # Surrogate primary key: a UUID4 rendered as a 36-character string.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True)
    # Foreign key to compliance_controls.id (the surrogate key, not the
    # human-readable ControlDB.control_id).
    control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True)

    coverage_level = Column(String(20), default="full")  # "full", "partial", "planned"
    notes = Column(Text)  # Explanation of coverage

    # Timestamps
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

    # Relationships
    # "RequirementDB" is resolved by name; it is not defined in this module.
    requirement = relationship("RequirementDB", back_populates="control_mappings")
    control = relationship("ControlDB", back_populates="mappings")

    __table_args__ = (
        # One mapping per requirement/control pair.
        Index('ix_mapping_req_ctrl', 'requirement_id', 'control_id', unique=True),
    )
|
|
|
|
|
|
class EvidenceDB(Base):
    """
    Audit evidence for controls.

    Types: scan_report, policy_document, config_snapshot, test_result,
    manual_upload, screenshot, external_link

    Mapped to the ``compliance_evidence`` table. Each row belongs to exactly
    one control via ``control_id``. ``evidence_type`` is a free-form
    ``String(50)`` column; the types listed above are not enforced here.

    DO NOT change __tablename__, column names, or relationship strings.
    """
    __tablename__ = 'compliance_evidence'

    # Surrogate primary key: a UUID4 rendered as a 36-character string.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Foreign key to compliance_controls.id (the surrogate key).
    control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True)

    evidence_type = Column(String(50), nullable=False)  # Type of evidence
    title = Column(String(300), nullable=False)
    description = Column(Text)

    # File/Link storage
    artifact_path = Column(String(500))  # Local file path
    artifact_url = Column(String(500))  # External URL
    artifact_hash = Column(String(64))  # SHA-256 hash
    file_size_bytes = Column(Integer)
    mime_type = Column(String(100))

    # Validity period
    valid_from = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc))
    valid_until = Column(DateTime)  # NULL = no expiry
    status = Column(Enum(EvidenceStatusEnum), default=EvidenceStatusEnum.VALID)

    # Source tracking
    source = Column(String(100))  # "ci_pipeline", "manual", "api"
    ci_job_id = Column(String(100))  # CI/CD job reference
    uploaded_by = Column(String(100))  # User who uploaded

    # Timestamps
    collected_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

    # Relationships
    control = relationship("ControlDB", back_populates="evidence")

    __table_args__ = (
        Index('ix_evidence_control_type', 'control_id', 'evidence_type'),
        Index('ix_evidence_status', 'status'),
    )

    def __repr__(self):
        """Return a short debug representation with evidence_type and title."""
        return f"<Evidence {self.evidence_type}: {self.title}>"
|
|
|
|
|
|
class RiskDB(Base):
    """
    Risk register entry with likelihood x impact scoring.

    Mapped to the ``compliance_risks`` table. Stores both the inherent risk
    (before controls) and the optional residual risk (after controls).
    ``calculate_risk_level`` converts a likelihood/impact pair into a
    ``RiskLevelEnum``; nothing in this class calls it automatically.

    DO NOT change __tablename__, column names, or relationship strings.
    """
    __tablename__ = 'compliance_risks'

    # Surrogate primary key: a UUID4 rendered as a 36-character string.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Human-readable, unique business identifier.
    risk_id = Column(String(20), unique=True, nullable=False, index=True)  # e.g., "RISK-001"

    title = Column(String(300), nullable=False)
    description = Column(Text)
    category = Column(String(50), nullable=False)  # "data_breach", "compliance_gap", etc.

    # Inherent risk (before controls)
    likelihood = Column(Integer, nullable=False)  # 1-5
    impact = Column(Integer, nullable=False)  # 1-5
    inherent_risk = Column(Enum(RiskLevelEnum), nullable=False)

    # Mitigating controls
    # Stored as JSON, not as a foreign-key relationship.
    mitigating_controls = Column(JSON)  # List of control_ids

    # Residual risk (after controls)
    residual_likelihood = Column(Integer)
    residual_impact = Column(Integer)
    residual_risk = Column(Enum(RiskLevelEnum))

    # Management
    owner = Column(String(100))
    status = Column(String(20), default="open")  # "open", "mitigated", "accepted", "transferred"
    treatment_plan = Column(Text)

    # Review
    # NOTE(review): date.today uses the process-local date, whereas the
    # timestamp defaults below use UTC — confirm this difference is intended.
    identified_date = Column(Date, default=date.today)
    review_date = Column(Date)
    last_assessed_at = Column(DateTime)

    # Timestamps
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

    __table_args__ = (
        Index('ix_risk_category_status', 'category', 'status'),
        Index('ix_risk_inherent', 'inherent_risk'),
    )

    def __repr__(self):
        """Return a short debug representation with risk_id and title."""
        return f"<Risk {self.risk_id}: {self.title}>"

    @staticmethod
    def calculate_risk_level(likelihood: int, impact: int) -> RiskLevelEnum:
        """Calculate risk level from likelihood x impact matrix.

        The score is ``likelihood * impact`` and is bucketed as follows:
        ``>= 20`` CRITICAL, ``>= 12`` HIGH, ``>= 6`` MEDIUM, otherwise LOW.

        Args:
            likelihood: Likelihood rating (the columns document a 1-5 scale).
            impact: Impact rating (the columns document a 1-5 scale).

        Returns:
            The ``RiskLevelEnum`` member for the computed score. Inputs are
            not range-checked; out-of-scale values are simply multiplied.
        """
        score = likelihood * impact
        if score >= 20:
            return RiskLevelEnum.CRITICAL
        if score >= 12:
            return RiskLevelEnum.HIGH
        if score >= 6:
            return RiskLevelEnum.MEDIUM
        return RiskLevelEnum.LOW
|
|
|
|
|
|
# Public API of this module; these are the names re-exported by the
# ``compliance.db.models`` backwards-compatibility shim.
__all__ = [
    "ControlTypeEnum",
    "ControlDomainEnum",
    "ControlStatusEnum",
    "RiskLevelEnum",
    "EvidenceStatusEnum",
    "ControlDB",
    "ControlMappingDB",
    "EvidenceDB",
    "RiskDB",
]
|