diff --git a/backend-compliance/compliance/db/ai_system_models.py b/backend-compliance/compliance/db/ai_system_models.py new file mode 100644 index 0000000..d6ca744 --- /dev/null +++ b/backend-compliance/compliance/db/ai_system_models.py @@ -0,0 +1,141 @@ +""" +AI System & Audit Export models — extracted from compliance/db/models.py. + +Covers AI Act system registration/classification and the audit export package +tracker. Re-exported from ``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings. +""" + +import uuid +import enum +from datetime import datetime, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, DateTime, Date, + Enum, JSON, Index, Float, +) + +from classroom_engine.database import Base + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class AIClassificationEnum(str, enum.Enum): + """AI Act risk classification.""" + PROHIBITED = "prohibited" + HIGH_RISK = "high-risk" + LIMITED_RISK = "limited-risk" + MINIMAL_RISK = "minimal-risk" + UNCLASSIFIED = "unclassified" + + +class AISystemStatusEnum(str, enum.Enum): + """Status of an AI system in compliance tracking.""" + DRAFT = "draft" + CLASSIFIED = "classified" + COMPLIANT = "compliant" + NON_COMPLIANT = "non-compliant" + + +class ExportStatusEnum(str, enum.Enum): + """Status of audit export.""" + PENDING = "pending" + GENERATING = "generating" + COMPLETED = "completed" + FAILED = "failed" + + +# ============================================================================ +# MODELS +# ============================================================================ + +class AISystemDB(Base): + """ + AI System registry for AI Act compliance. + Tracks AI systems, their risk classification, and compliance status. + """ + __tablename__ = 'compliance_ai_systems' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + name = Column(String(300), nullable=False) + description = Column(Text) + purpose = Column(String(500)) + sector = Column(String(100)) + + # AI Act classification + classification = Column(Enum(AIClassificationEnum), default=AIClassificationEnum.UNCLASSIFIED) + status = Column(Enum(AISystemStatusEnum), default=AISystemStatusEnum.DRAFT) + + # Assessment + assessment_date = Column(DateTime) + assessment_result = Column(JSON) # Full assessment result + obligations = Column(JSON) # List of AI Act obligations + risk_factors = Column(JSON) # Risk factors from assessment + recommendations = Column(JSON) # Recommendations from assessment + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_ai_system_classification', 'classification'), + Index('ix_ai_system_status', 'status'), + ) + + def __repr__(self): + return f"" + + +class AuditExportDB(Base): + """ + Tracks audit export packages generated for external auditors. + """ + __tablename__ = 'compliance_audit_exports' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + + export_type = Column(String(50), nullable=False) # "full", "controls_only", "evidence_only" + export_name = Column(String(200)) # User-friendly name + + # Scope + included_regulations = Column(JSON) # List of regulation codes + included_domains = Column(JSON) # List of control domains + date_range_start = Column(Date) + date_range_end = Column(Date) + + # Generation + requested_by = Column(String(100), nullable=False) + requested_at = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) + completed_at = Column(DateTime) + + # Output + file_path = Column(String(500)) + file_hash = Column(String(64)) # SHA-256 of ZIP + file_size_bytes = Column(Integer) + + status = Column(Enum(ExportStatusEnum), default=ExportStatusEnum.PENDING) + error_message = Column(Text) + + # Statistics + total_controls = Column(Integer) + total_evidence = Column(Integer) + compliance_score = Column(Float) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + def __repr__(self): + return f"" + + +__all__ = [ + "AIClassificationEnum", + "AISystemStatusEnum", + "ExportStatusEnum", + "AISystemDB", + "AuditExportDB", +] diff --git a/backend-compliance/compliance/db/audit_session_models.py b/backend-compliance/compliance/db/audit_session_models.py new file mode 100644 index 0000000..4ad2003 --- /dev/null +++ b/backend-compliance/compliance/db/audit_session_models.py @@ -0,0 +1,177 @@ +""" +Audit Session & Sign-Off models — Sprint 3 Phase 3. + +Extracted from compliance/db/models.py as the first worked example of the +Phase 1 model split. The classes are re-exported from compliance.db.models +for backwards compatibility, so existing imports continue to work unchanged. + +Tables: +- compliance_audit_sessions: Structured compliance audit sessions +- compliance_audit_signoffs: Per-requirement sign-offs with digital signatures + +DO NOT change __tablename__, column names, or relationship strings — the +database schema is frozen. +""" + +import uuid +import enum +from datetime import datetime, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, DateTime, + ForeignKey, Enum, JSON, Index, +) +from sqlalchemy.orm import relationship + +from classroom_engine.database import Base + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class AuditResultEnum(str, enum.Enum): + """Result of an audit sign-off for a requirement.""" + COMPLIANT = "compliant" # Fully compliant + COMPLIANT_WITH_NOTES = "compliant_notes" # Compliant with observations + NON_COMPLIANT = "non_compliant" # Not compliant - remediation required + NOT_APPLICABLE = "not_applicable" # Not applicable to this audit + PENDING = "pending" # Not yet reviewed + + +class AuditSessionStatusEnum(str, enum.Enum): + """Status of an audit session.""" + DRAFT = "draft" # Session created, not started + IN_PROGRESS = "in_progress" # Audit in progress + COMPLETED = "completed" # All items reviewed + ARCHIVED = "archived" # Historical record + + +# ============================================================================ +# MODELS +# ============================================================================ + +class AuditSessionDB(Base): + """ + Audit session for structured compliance reviews. + + Enables auditors to: + - Create named audit sessions (e.g., "Q1 2026 GDPR Audit") + - Track progress through requirements + - Sign off individual items with digital signatures + - Generate audit reports + """ + __tablename__ = 'compliance_audit_sessions' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + name = Column(String(200), nullable=False) # e.g., "Q1 2026 Compliance Audit" + description = Column(Text) + + # Auditor information + auditor_name = Column(String(100), nullable=False) # e.g., "Dr. Thomas Müller" + auditor_email = Column(String(200)) + auditor_organization = Column(String(200)) # External auditor company + + # Session scope + status = Column(Enum(AuditSessionStatusEnum), default=AuditSessionStatusEnum.DRAFT) + regulation_ids = Column(JSON) # Filter: ["GDPR", "AIACT"] or null for all + + # Progress tracking + total_items = Column(Integer, default=0) + completed_items = Column(Integer, default=0) + compliant_count = Column(Integer, default=0) + non_compliant_count = Column(Integer, default=0) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + started_at = Column(DateTime) # When audit began + completed_at = Column(DateTime) # When audit finished + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + signoffs = relationship("AuditSignOffDB", back_populates="session", cascade="all, delete-orphan") + + __table_args__ = ( + Index('ix_audit_session_status', 'status'), + Index('ix_audit_session_auditor', 'auditor_name'), + ) + + def __repr__(self): + return f"" + + @property + def completion_percentage(self) -> float: + """Calculate completion percentage.""" + if self.total_items == 0: + return 0.0 + return round((self.completed_items / self.total_items) * 100, 1) + + +class AuditSignOffDB(Base): + """ + Individual sign-off for a requirement within an audit session. + + Features: + - Records audit result (compliant, non-compliant, etc.) + - Stores auditor notes and observations + - Creates digital signature (SHA-256 hash) for tamper evidence + """ + __tablename__ = 'compliance_audit_signoffs' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), nullable=False, index=True) + requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) + + # Audit result + result = Column(Enum(AuditResultEnum), default=AuditResultEnum.PENDING) + notes = Column(Text) # Auditor observations + + # Evidence references for this sign-off + evidence_ids = Column(JSON) # List of evidence IDs reviewed + + # Digital signature (SHA-256 hash of result + auditor + timestamp) + signature_hash = Column(String(64)) # SHA-256 hex string + signed_at = Column(DateTime) + signed_by = Column(String(100)) # Auditor name at time of signing + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + session = relationship("AuditSessionDB", back_populates="signoffs") + requirement = relationship("RequirementDB") + + __table_args__ = ( + Index('ix_signoff_session_requirement', 'session_id', 'requirement_id', unique=True), + Index('ix_signoff_result', 'result'), + ) + + def __repr__(self): + return f"" + + def create_signature(self, auditor_name: str) -> str: + """ + Create a digital signature for this sign-off. + + Returns SHA-256 hash of: result + requirement_id + auditor_name + timestamp + """ + import hashlib + + timestamp = datetime.now(timezone.utc).isoformat() + data = f"{self.result.value}|{self.requirement_id}|{auditor_name}|{timestamp}" + signature = hashlib.sha256(data.encode()).hexdigest() + + self.signature_hash = signature + self.signed_at = datetime.now(timezone.utc) + self.signed_by = auditor_name + + return signature + + +__all__ = [ + "AuditResultEnum", + "AuditSessionStatusEnum", + "AuditSessionDB", + "AuditSignOffDB", +] diff --git a/backend-compliance/compliance/db/control_models.py b/backend-compliance/compliance/db/control_models.py new file mode 100644 index 0000000..ffd8a14 --- /dev/null +++ b/backend-compliance/compliance/db/control_models.py @@ -0,0 +1,279 @@ +""" +Control, Evidence, and Risk models — extracted from compliance/db/models.py. + +Covers the control framework (ControlDB), requirement↔control mappings, +evidence artifacts, and the risk register. Re-exported from +``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings. +""" + +import uuid +import enum +from datetime import datetime, date, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, Boolean, DateTime, Date, + ForeignKey, Enum, JSON, Index, +) +from sqlalchemy.orm import relationship + +from classroom_engine.database import Base + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class ControlTypeEnum(str, enum.Enum): + """Type of security control.""" + PREVENTIVE = "preventive" # Prevents incidents + DETECTIVE = "detective" # Detects incidents + CORRECTIVE = "corrective" # Corrects after incidents + + +class ControlDomainEnum(str, enum.Enum): + """Domain/category of control.""" + GOVERNANCE = "gov" # Governance & Organization + PRIVACY = "priv" # Privacy & Data Protection + IAM = "iam" # Identity & Access Management + CRYPTO = "crypto" # Cryptography & Key Management + SDLC = "sdlc" # Secure Development Lifecycle + OPS = "ops" # Operations & Monitoring + AI = "ai" # AI-specific controls + CRA = "cra" # CRA & Supply Chain + AUDIT = "aud" # Audit & Traceability + + +class ControlStatusEnum(str, enum.Enum): + """Implementation status of a control.""" + PASS = "pass" # Fully implemented & passing + PARTIAL = "partial" # Partially implemented + FAIL = "fail" # Not passing + NOT_APPLICABLE = "n/a" # Not applicable + PLANNED = "planned" # Planned for implementation + + +class RiskLevelEnum(str, enum.Enum): + """Risk severity level.""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class EvidenceStatusEnum(str, enum.Enum): + """Status of evidence artifact.""" + VALID = "valid" # Currently valid + EXPIRED = "expired" # Past validity date + PENDING = "pending" # Awaiting validation + FAILED = "failed" # Failed validation + + +# ============================================================================ +# MODELS +# ============================================================================ + +class ControlDB(Base): + """ + Technical or organizational security control. + + Examples: PRIV-001 (Verarbeitungsverzeichnis), SDLC-001 (SAST Scanning) + """ + __tablename__ = 'compliance_controls' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + control_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "PRIV-001" + + domain = Column(Enum(ControlDomainEnum), nullable=False, index=True) + control_type = Column(Enum(ControlTypeEnum), nullable=False) + + title = Column(String(300), nullable=False) + description = Column(Text) + pass_criteria = Column(Text, nullable=False) # Measurable pass criteria + implementation_guidance = Column(Text) # How to implement + + # Code/Evidence references + code_reference = Column(String(500)) # e.g., "backend/middleware/pii_redactor.py:45" + documentation_url = Column(String(500)) # Link to internal docs + + # Automation + is_automated = Column(Boolean, default=False) + automation_tool = Column(String(100)) # e.g., "Semgrep", "Trivy" + automation_config = Column(JSON) # Tool-specific config + + # Status + status = Column(Enum(ControlStatusEnum), default=ControlStatusEnum.PLANNED) + status_notes = Column(Text) + + # Ownership & Review + owner = Column(String(100)) # Responsible person/team + review_frequency_days = Column(Integer, default=90) + last_reviewed_at = Column(DateTime) + next_review_at = Column(DateTime) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + mappings = relationship("ControlMappingDB", back_populates="control", cascade="all, delete-orphan") + evidence = relationship("EvidenceDB", back_populates="control", cascade="all, delete-orphan") + + __table_args__ = ( + Index('ix_control_domain_status', 'domain', 'status'), + ) + + def __repr__(self): + return f"" + + +class ControlMappingDB(Base): + """ + Maps requirements to controls (many-to-many with metadata). + """ + __tablename__ = 'compliance_control_mappings' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) + control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) + + coverage_level = Column(String(20), default="full") # "full", "partial", "planned" + notes = Column(Text) # Explanation of coverage + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + requirement = relationship("RequirementDB", back_populates="control_mappings") + control = relationship("ControlDB", back_populates="mappings") + + __table_args__ = ( + Index('ix_mapping_req_ctrl', 'requirement_id', 'control_id', unique=True), + ) + + +class EvidenceDB(Base): + """ + Audit evidence for controls. + + Types: scan_report, policy_document, config_snapshot, test_result, + manual_upload, screenshot, external_link + """ + __tablename__ = 'compliance_evidence' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) + + evidence_type = Column(String(50), nullable=False) # Type of evidence + title = Column(String(300), nullable=False) + description = Column(Text) + + # File/Link storage + artifact_path = Column(String(500)) # Local file path + artifact_url = Column(String(500)) # External URL + artifact_hash = Column(String(64)) # SHA-256 hash + file_size_bytes = Column(Integer) + mime_type = Column(String(100)) + + # Validity period + valid_from = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) + valid_until = Column(DateTime) # NULL = no expiry + status = Column(Enum(EvidenceStatusEnum), default=EvidenceStatusEnum.VALID) + + # Source tracking + source = Column(String(100)) # "ci_pipeline", "manual", "api" + ci_job_id = Column(String(100)) # CI/CD job reference + uploaded_by = Column(String(100)) # User who uploaded + + # Timestamps + collected_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + control = relationship("ControlDB", back_populates="evidence") + + __table_args__ = ( + Index('ix_evidence_control_type', 'control_id', 'evidence_type'), + Index('ix_evidence_status', 'status'), + ) + + def __repr__(self): + return f"" + + +class RiskDB(Base): + """ + Risk register entry with likelihood x impact scoring. + """ + __tablename__ = 'compliance_risks' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + risk_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "RISK-001" + + title = Column(String(300), nullable=False) + description = Column(Text) + category = Column(String(50), nullable=False) # "data_breach", "compliance_gap", etc. + + # Inherent risk (before controls) + likelihood = Column(Integer, nullable=False) # 1-5 + impact = Column(Integer, nullable=False) # 1-5 + inherent_risk = Column(Enum(RiskLevelEnum), nullable=False) + + # Mitigating controls + mitigating_controls = Column(JSON) # List of control_ids + + # Residual risk (after controls) + residual_likelihood = Column(Integer) + residual_impact = Column(Integer) + residual_risk = Column(Enum(RiskLevelEnum)) + + # Management + owner = Column(String(100)) + status = Column(String(20), default="open") # "open", "mitigated", "accepted", "transferred" + treatment_plan = Column(Text) + + # Review + identified_date = Column(Date, default=date.today) + review_date = Column(Date) + last_assessed_at = Column(DateTime) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_risk_category_status', 'category', 'status'), + Index('ix_risk_inherent', 'inherent_risk'), + ) + + def __repr__(self): + return f"" + + @staticmethod + def calculate_risk_level(likelihood: int, impact: int) -> RiskLevelEnum: + """Calculate risk level from likelihood x impact matrix.""" + score = likelihood * impact + if score >= 20: + return RiskLevelEnum.CRITICAL + elif score >= 12: + return RiskLevelEnum.HIGH + elif score >= 6: + return RiskLevelEnum.MEDIUM + else: + return RiskLevelEnum.LOW + + +__all__ = [ + "ControlTypeEnum", + "ControlDomainEnum", + "ControlStatusEnum", + "RiskLevelEnum", + "EvidenceStatusEnum", + "ControlDB", + "ControlMappingDB", + "EvidenceDB", + "RiskDB", +] diff --git a/backend-compliance/compliance/db/isms_audit_models.py b/backend-compliance/compliance/db/isms_audit_models.py new file mode 100644 index 0000000..1bdb4c5 --- /dev/null +++ b/backend-compliance/compliance/db/isms_audit_models.py @@ -0,0 +1,468 @@ +""" +ISMS Audit Execution models (ISO 27001 Kapitel 9-10) — extracted from +compliance/db/models.py. + +Covers findings, corrective actions (CAPA), management reviews, internal +audits, audit trail, and readiness checks. The governance side (scope, +context, policies, objectives, SoA) lives in ``isms_governance_models.py``. + +Re-exported from ``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings. +""" + +import uuid +import enum +from datetime import datetime, date, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, Boolean, DateTime, Date, + ForeignKey, Enum, JSON, Index, Float, +) +from sqlalchemy.orm import relationship + +from classroom_engine.database import Base + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class FindingTypeEnum(str, enum.Enum): + """ISO 27001 audit finding classification.""" + MAJOR = "major" # Major nonconformity - blocks certification + MINOR = "minor" # Minor nonconformity - requires CAPA + OFI = "ofi" # Opportunity for Improvement + POSITIVE = "positive" # Positive observation + + +class FindingStatusEnum(str, enum.Enum): + """Status of an audit finding.""" + OPEN = "open" + IN_PROGRESS = "in_progress" + CORRECTIVE_ACTION_PENDING = "capa_pending" + VERIFICATION_PENDING = "verification_pending" + VERIFIED = "verified" + CLOSED = "closed" + + +class CAPATypeEnum(str, enum.Enum): + """Type of corrective/preventive action.""" + CORRECTIVE = "corrective" # Fix the nonconformity + PREVENTIVE = "preventive" # Prevent recurrence + BOTH = "both" + + +# ============================================================================ +# MODELS +# ============================================================================ + +class AuditFindingDB(Base): + """ + Audit Finding with ISO 27001 Classification (Major/Minor/OFI) + + Tracks findings from internal and external audits with proper + classification and CAPA workflow. + """ + __tablename__ = 'compliance_audit_findings' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + finding_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "FIND-2026-001" + + # Source + audit_session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), index=True) + internal_audit_id = Column(String(36), ForeignKey('compliance_internal_audits.id'), index=True) + + # Classification (CRITICAL for ISO 27001!) + finding_type = Column(Enum(FindingTypeEnum), nullable=False) + + # ISO reference + iso_chapter = Column(String(20)) # e.g., "6.1.2", "9.2" + annex_a_control = Column(String(20)) # e.g., "A.8.2" + + # Finding details + title = Column(String(300), nullable=False) + description = Column(Text, nullable=False) + objective_evidence = Column(Text, nullable=False) # What the auditor observed + + # Root cause analysis + root_cause = Column(Text) + root_cause_method = Column(String(50)) # "5-why", "fishbone", "pareto" + + # Impact assessment + impact_description = Column(Text) + affected_processes = Column(JSON) + affected_assets = Column(JSON) + + # Status tracking + status = Column(Enum(FindingStatusEnum), default=FindingStatusEnum.OPEN) + + # Responsibility + owner = Column(String(100)) # Person responsible for closure + auditor = Column(String(100)) # Auditor who raised finding + + # Dates + identified_date = Column(Date, nullable=False, default=date.today) + due_date = Column(Date) # Deadline for closure + closed_date = Column(Date) + + # Verification + verification_method = Column(Text) + verified_by = Column(String(100)) + verified_at = Column(DateTime) + verification_evidence = Column(Text) + + # Closure + closure_notes = Column(Text) + closed_by = Column(String(100)) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + corrective_actions = relationship("CorrectiveActionDB", back_populates="finding", cascade="all, delete-orphan") + + __table_args__ = ( + Index('ix_finding_type_status', 'finding_type', 'status'), + Index('ix_finding_due_date', 'due_date'), + ) + + def __repr__(self): + return f"" + + @property + def is_blocking(self) -> bool: + """Major findings block certification.""" + return self.finding_type == FindingTypeEnum.MAJOR and self.status != FindingStatusEnum.CLOSED + + +class CorrectiveActionDB(Base): + """ + Corrective & Preventive Actions (CAPA) - ISO 27001 10.1 + + Tracks actions taken to address nonconformities. + """ + __tablename__ = 'compliance_corrective_actions' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + capa_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "CAPA-2026-001" + + # Link to finding + finding_id = Column(String(36), ForeignKey('compliance_audit_findings.id'), nullable=False, index=True) + + # Type + capa_type = Column(Enum(CAPATypeEnum), nullable=False) + + # Action details + title = Column(String(300), nullable=False) + description = Column(Text, nullable=False) + expected_outcome = Column(Text) + + # Responsibility + assigned_to = Column(String(100), nullable=False) + approved_by = Column(String(100)) + + # Timeline + planned_start = Column(Date) + planned_completion = Column(Date, nullable=False) + actual_completion = Column(Date) + + # Status + status = Column(String(30), default="planned") # planned, in_progress, completed, verified, cancelled + progress_percentage = Column(Integer, default=0) + + # Resources + estimated_effort_hours = Column(Integer) + actual_effort_hours = Column(Integer) + resources_required = Column(Text) + + # Evidence of implementation + implementation_evidence = Column(Text) + evidence_ids = Column(JSON) + + # Effectiveness review + effectiveness_criteria = Column(Text) + effectiveness_verified = Column(Boolean, default=False) + effectiveness_verification_date = Column(Date) + effectiveness_notes = Column(Text) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + finding = relationship("AuditFindingDB", back_populates="corrective_actions") + + __table_args__ = ( + Index('ix_capa_status', 'status'), + Index('ix_capa_due', 'planned_completion'), + ) + + def __repr__(self): + return f"" + + +class ManagementReviewDB(Base): + """ + Management Review (ISO 27001 Kapitel 9.3) + + Records mandatory management reviews of the ISMS. + """ + __tablename__ = 'compliance_management_reviews' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + review_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "MR-2026-Q1" + + # Review details + title = Column(String(200), nullable=False) + review_date = Column(Date, nullable=False) + review_period_start = Column(Date) # Period being reviewed + review_period_end = Column(Date) + + # Participants + chairperson = Column(String(100), nullable=False) # Usually top management + attendees = Column(JSON) # List of {"name": "", "role": ""} + + # 9.3 Review Inputs (mandatory!) + input_previous_actions = Column(Text) # Status of previous review actions + input_isms_changes = Column(Text) # Changes in internal/external issues + input_security_performance = Column(Text) # Nonconformities, monitoring, audit results + input_interested_party_feedback = Column(Text) + input_risk_assessment_results = Column(Text) + input_improvement_opportunities = Column(Text) + + # Additional inputs + input_policy_effectiveness = Column(Text) + input_objective_achievement = Column(Text) + input_resource_adequacy = Column(Text) + + # 9.3 Review Outputs (mandatory!) + output_improvement_decisions = Column(Text) # Decisions for improvement + output_isms_changes = Column(Text) # Changes needed to ISMS + output_resource_needs = Column(Text) # Resource requirements + + # Action items + action_items = Column(JSON) # List of {"action": "", "owner": "", "due_date": ""} + + # Overall assessment + isms_effectiveness_rating = Column(String(20)) # "effective", "partially_effective", "not_effective" + key_decisions = Column(Text) + + # Approval + status = Column(String(30), default="draft") # draft, conducted, approved + approved_by = Column(String(100)) + approved_at = Column(DateTime) + minutes_document_path = Column(String(500)) # Link to meeting minutes + + # Next review + next_review_date = Column(Date) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_mgmt_review_date', 'review_date'), + Index('ix_mgmt_review_status', 'status'), + ) + + def __repr__(self): + return f"" + + +class InternalAuditDB(Base): + """ + Internal Audit (ISO 27001 Kapitel 9.2) + + Tracks internal audit program and individual audits. + """ + __tablename__ = 'compliance_internal_audits' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + audit_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "IA-2026-001" + + # Audit details + title = Column(String(200), nullable=False) + audit_type = Column(String(50), nullable=False) # "scheduled", "surveillance", "special" + + # Scope + scope_description = Column(Text, nullable=False) + iso_chapters_covered = Column(JSON) # e.g., ["4", "5", "6.1"] + annex_a_controls_covered = Column(JSON) # e.g., ["A.5", "A.6"] + processes_covered = Column(JSON) + departments_covered = Column(JSON) + + # Audit criteria + criteria = Column(Text) # Standards, policies being audited against + + # Timeline + planned_date = Column(Date, nullable=False) + actual_start_date = Column(Date) + actual_end_date = Column(Date) + + # Audit team + lead_auditor = Column(String(100), nullable=False) + audit_team = Column(JSON) # List of auditor names + auditee_representatives = Column(JSON) # Who was interviewed + + # Status + status = Column(String(30), default="planned") # planned, in_progress, completed, cancelled + + # Results summary + total_findings = Column(Integer, default=0) + major_findings = Column(Integer, default=0) + minor_findings = Column(Integer, default=0) + ofi_count = Column(Integer, default=0) + positive_observations = Column(Integer, default=0) + + # Conclusion + audit_conclusion = Column(Text) + overall_assessment = Column(String(30)) # "conforming", "minor_nc", "major_nc" + + # Report + report_date = Column(Date) + report_document_path = Column(String(500)) + + # Sign-off + report_approved_by = Column(String(100)) + report_approved_at = Column(DateTime) + + # Follow-up + follow_up_audit_required = Column(Boolean, default=False) + follow_up_audit_id = Column(String(36)) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + findings = relationship("AuditFindingDB", backref="internal_audit", foreign_keys=[AuditFindingDB.internal_audit_id]) + + __table_args__ = ( + Index('ix_internal_audit_date', 'planned_date'), + Index('ix_internal_audit_status', 'status'), + ) + + def __repr__(self): + return f"" + + +class AuditTrailDB(Base): + """ + Comprehensive Audit Trail for ISMS Changes + + Tracks all changes to compliance-relevant data for + accountability and forensic analysis. + """ + __tablename__ = 'compliance_audit_trail' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + + # What changed + entity_type = Column(String(50), nullable=False, index=True) # "control", "risk", "policy", etc. + entity_id = Column(String(36), nullable=False, index=True) + entity_name = Column(String(200)) # Human-readable identifier + + # Action + action = Column(String(20), nullable=False) # "create", "update", "delete", "approve", "sign" + + # Change details + field_changed = Column(String(100)) # Which field (for updates) + old_value = Column(Text) + new_value = Column(Text) + change_summary = Column(Text) # Human-readable summary + + # Who & When + performed_by = Column(String(100), nullable=False) + performed_at = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) + + # Context + ip_address = Column(String(45)) + user_agent = Column(String(500)) + session_id = Column(String(100)) + + # Integrity + checksum = Column(String(64)) # SHA-256 of the change + + # Timestamps (immutable after creation) + created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_audit_trail_entity', 'entity_type', 'entity_id'), + Index('ix_audit_trail_time', 'performed_at'), + Index('ix_audit_trail_user', 'performed_by'), + ) + + def __repr__(self): + return f"" + + +class ISMSReadinessCheckDB(Base): + """ + ISMS Readiness Check Results + + Stores automated pre-audit checks to identify potential + Major findings before external audit. + """ + __tablename__ = 'compliance_isms_readiness' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + + # Check run + check_date = Column(DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)) + triggered_by = Column(String(100)) # "scheduled", "manual", "pre-audit" + + # Overall status + overall_status = Column(String(20), nullable=False) # "ready", "at_risk", "not_ready" + certification_possible = Column(Boolean, nullable=False) + + # Chapter-by-chapter status (ISO 27001) + chapter_4_status = Column(String(20)) # Context + chapter_5_status = Column(String(20)) # Leadership + chapter_6_status = Column(String(20)) # Planning + chapter_7_status = Column(String(20)) # Support + chapter_8_status = Column(String(20)) # Operation + chapter_9_status = Column(String(20)) # Performance + chapter_10_status = Column(String(20)) # Improvement + + # Potential Major findings + potential_majors = Column(JSON) # List of {"check": "", "status": "", "recommendation": ""} + + # Potential Minor findings + potential_minors = Column(JSON) + + # Improvement opportunities + improvement_opportunities = Column(JSON) + + # Scores + readiness_score = Column(Float) # 0-100 + documentation_score = Column(Float) + implementation_score = Column(Float) + evidence_score = Column(Float) + + # Recommendations + priority_actions = Column(JSON) # List of recommended actions before audit + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_readiness_date', 'check_date'), + Index('ix_readiness_status', 'overall_status'), + ) + + def __repr__(self): + return f"" + + +__all__ = [ + "FindingTypeEnum", + "FindingStatusEnum", + "CAPATypeEnum", + "AuditFindingDB", + "CorrectiveActionDB", + "ManagementReviewDB", + "InternalAuditDB", + "AuditTrailDB", + "ISMSReadinessCheckDB", +] diff --git a/backend-compliance/compliance/db/isms_governance_models.py b/backend-compliance/compliance/db/isms_governance_models.py new file mode 100644 index 0000000..ce8d3f9 --- /dev/null +++ b/backend-compliance/compliance/db/isms_governance_models.py @@ -0,0 +1,323 @@ +""" +ISMS Governance models (ISO 27001 Kapitel 4-6) — extracted from compliance/db/models.py. + +Covers the documentation and planning side of the ISMS: scope, context, +policies, security objectives, and the Statement of Applicability. The audit +execution side (findings, CAPA, management reviews, internal audits, audit +trail, readiness checks) lives in ``isms_audit_models.py``. + +Re-exported from ``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings — the +database schema is frozen. +""" + +import uuid +import enum +from datetime import datetime, date, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, Boolean, DateTime, Date, + ForeignKey, Enum, JSON, Index, +) + +from classroom_engine.database import Base + + +# ============================================================================ +# SHARED GOVERNANCE ENUMS +# ============================================================================ + +class ApprovalStatusEnum(str, enum.Enum): + """Approval status for ISMS documents.""" + DRAFT = "draft" + UNDER_REVIEW = "under_review" + APPROVED = "approved" + SUPERSEDED = "superseded" + + +# ============================================================================ +# MODELS +# ============================================================================ + +class ISMSScopeDB(Base): + """ + ISMS Scope Definition (ISO 27001 Kapitel 4.3) + + Defines the boundaries and applicability of the ISMS. + This is MANDATORY for certification. + """ + __tablename__ = 'compliance_isms_scope' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + version = Column(String(20), nullable=False, default="1.0") + + # Scope definition + scope_statement = Column(Text, nullable=False) # Main scope text + included_locations = Column(JSON) # List of locations + included_processes = Column(JSON) # List of processes + included_services = Column(JSON) # List of services/products + excluded_items = Column(JSON) # Explicitly excluded items + exclusion_justification = Column(Text) # Why items are excluded + + # Boundaries + organizational_boundary = Column(Text) # Legal entity, departments + physical_boundary = Column(Text) # Locations, networks + technical_boundary = Column(Text) # Systems, applications + + # Approval + status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) + approved_by = Column(String(100)) + approved_at = Column(DateTime) + approval_signature = Column(String(64)) # SHA-256 hash + + # Validity + effective_date = Column(Date) + review_date = Column(Date) # Next mandatory review + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + created_by = Column(String(100)) + updated_by = Column(String(100)) + + __table_args__ = ( + Index('ix_isms_scope_status', 'status'), + ) + + def __repr__(self): + return f"" + + +class ISMSContextDB(Base): + """ + ISMS Context (ISO 27001 Kapitel 4.1, 4.2) + + Documents internal/external issues and interested parties. + """ + __tablename__ = 'compliance_isms_context' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + version = Column(String(20), nullable=False, default="1.0") + + # 4.1 Internal issues + internal_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} + + # 4.1 External issues + external_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} + + # 4.2 Interested parties + interested_parties = Column(JSON) # List of {"party": "", "requirements": [], "relevance": ""} + + # Legal/regulatory requirements + regulatory_requirements = Column(JSON) # DSGVO, AI Act, etc. + contractual_requirements = Column(JSON) # Customer contracts + + # Analysis + swot_strengths = Column(JSON) + swot_weaknesses = Column(JSON) + swot_opportunities = Column(JSON) + swot_threats = Column(JSON) + + # Approval + status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) + approved_by = Column(String(100)) + approved_at = Column(DateTime) + + # Review + last_reviewed_at = Column(DateTime) + next_review_date = Column(Date) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + def __repr__(self): + return f"" + + +class ISMSPolicyDB(Base): + """ + ISMS Policies (ISO 27001 Kapitel 5.2) + + Information security policy and sub-policies. + """ + __tablename__ = 'compliance_isms_policies' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + policy_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "POL-ISMS-001" + + # Policy details + title = Column(String(200), nullable=False) + policy_type = Column(String(50), nullable=False) # "master", "operational", "technical" + description = Column(Text) + policy_text = Column(Text, nullable=False) # Full policy content + + # Scope + applies_to = Column(JSON) # Roles, departments, systems + + # Document control + version = Column(String(20), nullable=False, default="1.0") + status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) + + # Approval chain + authored_by = Column(String(100)) + reviewed_by = Column(String(100)) + approved_by = Column(String(100)) # Must be top management + approved_at = Column(DateTime) + approval_signature = Column(String(64)) + + # Validity + effective_date = Column(Date) + review_frequency_months = Column(Integer, default=12) + next_review_date = Column(Date) + + # References + parent_policy_id = Column(String(36), ForeignKey('compliance_isms_policies.id')) + related_controls = Column(JSON) # List of control_ids + + # Document path + document_path = Column(String(500)) # Link to full document + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_policy_type_status', 'policy_type', 'status'), + ) + + def __repr__(self): + return f"" + + +class SecurityObjectiveDB(Base): + """ + Security Objectives (ISO 27001 Kapitel 6.2) + + Measurable information security objectives. + """ + __tablename__ = 'compliance_security_objectives' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + objective_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "OBJ-001" + + # Objective definition + title = Column(String(200), nullable=False) + description = Column(Text) + category = Column(String(50)) # "availability", "confidentiality", "integrity", "compliance" + + # SMART criteria + specific = Column(Text) # What exactly + measurable = Column(Text) # How measured + achievable = Column(Text) # Is it realistic + relevant = Column(Text) # Why important + time_bound = Column(Text) # Deadline + + # Metrics + kpi_name = Column(String(100)) + kpi_target = Column(String(100)) # Target value + kpi_current = Column(String(100)) # Current value + kpi_unit = Column(String(50)) # %, count, score + measurement_frequency = Column(String(50)) # monthly, quarterly + + # Responsibility + owner = Column(String(100)) + accountable = Column(String(100)) # RACI: Accountable + + # Status + status = Column(String(30), default="active") # active, achieved, not_achieved, cancelled + progress_percentage = Column(Integer, default=0) + + # Timeline + target_date = Column(Date) + achieved_date = Column(Date) + + # Linked items + related_controls = Column(JSON) + related_risks = Column(JSON) + + # Approval + approved_by = Column(String(100)) + approved_at = Column(DateTime) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_objective_status', 'status'), + Index('ix_objective_category', 'category'), + ) + + def __repr__(self): + return f"" + + +class StatementOfApplicabilityDB(Base): + """ + Statement of Applicability (SoA) - ISO 27001 Anhang A Mapping + + Documents which Annex A controls are applicable and why. + This is MANDATORY for certification. + """ + __tablename__ = 'compliance_soa' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + + # ISO 27001:2022 Annex A reference + annex_a_control = Column(String(20), nullable=False, index=True) # e.g., "A.5.1" + annex_a_title = Column(String(300), nullable=False) + annex_a_category = Column(String(100)) # "Organizational", "People", "Physical", "Technological" + + # Applicability decision + is_applicable = Column(Boolean, nullable=False) + applicability_justification = Column(Text, nullable=False) # MUST be documented + + # Implementation status + implementation_status = Column(String(30), default="planned") # planned, partial, implemented, not_implemented + implementation_notes = Column(Text) + + # Mapping to our controls + breakpilot_control_ids = Column(JSON) # List of our control_ids that address this + coverage_level = Column(String(20), default="full") # full, partial, planned + + # Evidence + evidence_description = Column(Text) + evidence_ids = Column(JSON) # Links to EvidenceDB + + # Risk-based justification (for exclusions) + risk_assessment_notes = Column(Text) # If not applicable, explain why + compensating_controls = Column(Text) # If partial, explain compensating measures + + # Approval + reviewed_by = Column(String(100)) + reviewed_at = Column(DateTime) + approved_by = Column(String(100)) + approved_at = Column(DateTime) + + # Version tracking + version = Column(String(20), default="1.0") + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('ix_soa_annex_control', 'annex_a_control', unique=True), + Index('ix_soa_applicable', 'is_applicable'), + Index('ix_soa_status', 'implementation_status'), + ) + + def __repr__(self): + return f"" + + +__all__ = [ + "ApprovalStatusEnum", + "ISMSScopeDB", + "ISMSContextDB", + "ISMSPolicyDB", + "SecurityObjectiveDB", + "StatementOfApplicabilityDB", +] diff --git a/backend-compliance/compliance/db/models.py b/backend-compliance/compliance/db/models.py index 84aa79d..a89f047 100644 --- a/backend-compliance/compliance/db/models.py +++ b/backend-compliance/compliance/db/models.py @@ -1,1466 +1,85 @@ """ -SQLAlchemy models for Compliance & Audit Framework. +compliance.db.models — backwards-compatibility re-export shim. -Tables: -- compliance_regulations: EU regulations, directives, BSI standards -- compliance_requirements: Individual requirements from regulations -- compliance_controls: Technical & organizational controls -- compliance_control_mappings: Requirement <-> Control mappings -- compliance_evidence: Audit evidence (files, reports, configs) -- compliance_risks: Risk register with likelihood x impact -- compliance_audit_exports: Export history for auditors +Phase 1 refactor split the monolithic 1466-line models module into per-aggregate +sibling modules. Every public symbol is re-exported here so existing imports +(``from compliance.db.models import RegulationDB, ...``) continue to work +unchanged. + +New code SHOULD import directly from the aggregate module: + + from compliance.db.regulation_models import RegulationDB, RequirementDB + from compliance.db.control_models import ControlDB, RiskDB + from compliance.db.ai_system_models import AISystemDB + from compliance.db.service_module_models import ServiceModuleDB + from compliance.db.audit_session_models import AuditSessionDB + from compliance.db.isms_governance_models import ISMSScopeDB + from compliance.db.isms_audit_models import AuditFindingDB + +Import order here also matters for SQLAlchemy mapper configuration: aggregates +that are referenced by name-string relationships must be imported before their +referrers. Regulation/Control/Risk come first, then Service Module, then the +audit sessions and ISMS layers. + +DO NOT add new classes to this file. Add them to the appropriate aggregate +module and re-export here. """ -import enum -import uuid -from datetime import datetime, date +# Order matters: later modules reference classes defined in earlier ones via +# SQLAlchemy string relationships. Keep foundational aggregates first. -from sqlalchemy import ( - Column, String, Text, Integer, Boolean, DateTime, Date, - ForeignKey, Enum, JSON, Index, Float +from compliance.db.regulation_models import ( # noqa: F401 + RegulationTypeEnum, + RegulationDB, + RequirementDB, +) +from compliance.db.control_models import ( # noqa: F401 + ControlTypeEnum, + ControlDomainEnum, + ControlStatusEnum, + RiskLevelEnum, + EvidenceStatusEnum, + ControlDB, + ControlMappingDB, + EvidenceDB, + RiskDB, +) +from compliance.db.ai_system_models import ( # noqa: F401 + AIClassificationEnum, + AISystemStatusEnum, + ExportStatusEnum, + AISystemDB, + AuditExportDB, +) +from compliance.db.service_module_models import ( # noqa: F401 + ServiceTypeEnum, + RelevanceLevelEnum, + ServiceModuleDB, + ModuleRegulationMappingDB, + ModuleRiskDB, +) +from compliance.db.audit_session_models import ( # noqa: F401 + AuditResultEnum, + AuditSessionStatusEnum, + AuditSessionDB, + AuditSignOffDB, +) +from compliance.db.isms_governance_models import ( # noqa: F401 + ApprovalStatusEnum, + ISMSScopeDB, + ISMSContextDB, + ISMSPolicyDB, + SecurityObjectiveDB, + StatementOfApplicabilityDB, +) +from compliance.db.isms_audit_models import ( # noqa: F401 + FindingTypeEnum, + FindingStatusEnum, + CAPATypeEnum, + AuditFindingDB, + CorrectiveActionDB, + ManagementReviewDB, + InternalAuditDB, + AuditTrailDB, + ISMSReadinessCheckDB, ) -from sqlalchemy.orm import relationship - -# Import shared Base from classroom_engine -from classroom_engine.database import Base - - -# ============================================================================ -# ENUMS -# ============================================================================ - -class RegulationTypeEnum(str, enum.Enum): - """Type of regulation/standard.""" - EU_REGULATION = "eu_regulation" # Directly applicable EU law - EU_DIRECTIVE = "eu_directive" # Requires national implementation - DE_LAW = "de_law" # German national law - BSI_STANDARD = "bsi_standard" # BSI technical guidelines - INDUSTRY_STANDARD = "industry_standard" # ISO, OWASP, etc. - - -class ControlTypeEnum(str, enum.Enum): - """Type of security control.""" - PREVENTIVE = "preventive" # Prevents incidents - DETECTIVE = "detective" # Detects incidents - CORRECTIVE = "corrective" # Corrects after incidents - - -class ControlDomainEnum(str, enum.Enum): - """Domain/category of control.""" - GOVERNANCE = "gov" # Governance & Organization - PRIVACY = "priv" # Privacy & Data Protection - IAM = "iam" # Identity & Access Management - CRYPTO = "crypto" # Cryptography & Key Management - SDLC = "sdlc" # Secure Development Lifecycle - OPS = "ops" # Operations & Monitoring - AI = "ai" # AI-specific controls - CRA = "cra" # CRA & Supply Chain - AUDIT = "aud" # Audit & Traceability - - -class ControlStatusEnum(str, enum.Enum): - """Implementation status of a control.""" - PASS = "pass" # Fully implemented & passing - PARTIAL = "partial" # Partially implemented - FAIL = "fail" # Not passing - NOT_APPLICABLE = "n/a" # Not applicable - PLANNED = "planned" # Planned for implementation - - -class RiskLevelEnum(str, enum.Enum): - """Risk severity level.""" - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - CRITICAL = "critical" - - -class EvidenceStatusEnum(str, enum.Enum): - """Status of evidence artifact.""" - VALID = "valid" # Currently valid - EXPIRED = "expired" # Past validity date - PENDING = "pending" # Awaiting validation - FAILED = "failed" # Failed validation - - -class ExportStatusEnum(str, enum.Enum): - """Status of audit export.""" - PENDING = "pending" - GENERATING = "generating" - COMPLETED = "completed" - FAILED = "failed" - - -class ServiceTypeEnum(str, enum.Enum): - """Type of Breakpilot service/module.""" - BACKEND = "backend" # API/Backend services - DATABASE = "database" # Data storage - AI = "ai" # AI/ML services - COMMUNICATION = "communication" # Chat/Video/Messaging - STORAGE = "storage" # File/Object storage - INFRASTRUCTURE = "infrastructure" # Load balancer, reverse proxy - MONITORING = "monitoring" # Logging, metrics - SECURITY = "security" # Auth, encryption, secrets - - -class RelevanceLevelEnum(str, enum.Enum): - """Relevance level of a regulation to a service.""" - CRITICAL = "critical" # Non-compliance = shutdown - HIGH = "high" # Major risk - MEDIUM = "medium" # Moderate risk - LOW = "low" # Minor risk - - -# ============================================================================ -# MODELS -# ============================================================================ - -class RegulationDB(Base): - """ - Represents a regulation, directive, or standard. - - Examples: GDPR, AI Act, CRA, BSI-TR-03161 - """ - __tablename__ = 'compliance_regulations' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - code = Column(String(20), unique=True, nullable=False, index=True) # e.g., "GDPR", "AIACT" - name = Column(String(200), nullable=False) # Short name - full_name = Column(Text) # Full official name - regulation_type = Column(Enum(RegulationTypeEnum), nullable=False) - source_url = Column(String(500)) # EUR-Lex URL or similar - local_pdf_path = Column(String(500)) # Local PDF if available - effective_date = Column(Date) # When it came into force - description = Column(Text) # Brief description - is_active = Column(Boolean, default=True) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - requirements = relationship("RequirementDB", back_populates="regulation", cascade="all, delete-orphan") - - def __repr__(self): - return f"" - - -class RequirementDB(Base): - """ - Individual requirement from a regulation. - - Examples: GDPR Art. 32(1)(a), AI Act Art. 9, BSI-TR O.Auth_1 - """ - __tablename__ = 'compliance_requirements' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) - - # Requirement identification - article = Column(String(50), nullable=False) # e.g., "Art. 32", "O.Auth_1" - paragraph = Column(String(20)) # e.g., "(1)(a)" - requirement_id_external = Column(String(50)) # External ID (e.g., BSI ID) - title = Column(String(300), nullable=False) # Requirement title - description = Column(Text) # Brief description - requirement_text = Column(Text) # Original text from regulation - - # Breakpilot-specific interpretation and implementation - breakpilot_interpretation = Column(Text) # How Breakpilot interprets this - implementation_status = Column(String(30), default="not_started") # not_started, in_progress, implemented, verified - implementation_details = Column(Text) # How we implemented it - code_references = Column(JSON) # List of {"file": "...", "line": ..., "description": "..."} - documentation_links = Column(JSON) # List of internal doc links - - # Evidence for auditors - evidence_description = Column(Text) # What evidence proves compliance - evidence_artifacts = Column(JSON) # List of {"type": "...", "path": "...", "description": "..."} - - # Audit-specific fields - auditor_notes = Column(Text) # Notes from auditor review - audit_status = Column(String(30), default="pending") # pending, in_review, approved, rejected - last_audit_date = Column(DateTime) - last_auditor = Column(String(100)) - - is_applicable = Column(Boolean, default=True) # Applicable to Breakpilot? - applicability_reason = Column(Text) # Why/why not applicable - - priority = Column(Integer, default=2) # 1=Critical, 2=High, 3=Medium - - # Source document reference - source_page = Column(Integer) # Page number in source document - source_section = Column(String(100)) # Section in source document - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - regulation = relationship("RegulationDB", back_populates="requirements") - control_mappings = relationship("ControlMappingDB", back_populates="requirement", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_requirement_regulation_article', 'regulation_id', 'article'), - Index('ix_requirement_audit_status', 'audit_status'), - Index('ix_requirement_impl_status', 'implementation_status'), - ) - - def __repr__(self): - return f"" - - -class ControlDB(Base): - """ - Technical or organizational security control. - - Examples: PRIV-001 (Verarbeitungsverzeichnis), SDLC-001 (SAST Scanning) - """ - __tablename__ = 'compliance_controls' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - control_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "PRIV-001" - - domain = Column(Enum(ControlDomainEnum), nullable=False, index=True) - control_type = Column(Enum(ControlTypeEnum), nullable=False) - - title = Column(String(300), nullable=False) - description = Column(Text) - pass_criteria = Column(Text, nullable=False) # Measurable pass criteria - implementation_guidance = Column(Text) # How to implement - - # Code/Evidence references - code_reference = Column(String(500)) # e.g., "backend/middleware/pii_redactor.py:45" - documentation_url = Column(String(500)) # Link to internal docs - - # Automation - is_automated = Column(Boolean, default=False) - automation_tool = Column(String(100)) # e.g., "Semgrep", "Trivy" - automation_config = Column(JSON) # Tool-specific config - - # Status - status = Column(Enum(ControlStatusEnum), default=ControlStatusEnum.PLANNED) - status_notes = Column(Text) - - # Ownership & Review - owner = Column(String(100)) # Responsible person/team - review_frequency_days = Column(Integer, default=90) - last_reviewed_at = Column(DateTime) - next_review_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - mappings = relationship("ControlMappingDB", back_populates="control", cascade="all, delete-orphan") - evidence = relationship("EvidenceDB", back_populates="control", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_control_domain_status', 'domain', 'status'), - ) - - def __repr__(self): - return f"" - - -class ControlMappingDB(Base): - """ - Maps requirements to controls (many-to-many with metadata). - """ - __tablename__ = 'compliance_control_mappings' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) - control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) - - coverage_level = Column(String(20), default="full") # "full", "partial", "planned" - notes = Column(Text) # Explanation of coverage - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - requirement = relationship("RequirementDB", back_populates="control_mappings") - control = relationship("ControlDB", back_populates="mappings") - - __table_args__ = ( - Index('ix_mapping_req_ctrl', 'requirement_id', 'control_id', unique=True), - ) - - -class EvidenceDB(Base): - """ - Audit evidence for controls. - - Types: scan_report, policy_document, config_snapshot, test_result, - manual_upload, screenshot, external_link - """ - __tablename__ = 'compliance_evidence' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) - - evidence_type = Column(String(50), nullable=False) # Type of evidence - title = Column(String(300), nullable=False) - description = Column(Text) - - # File/Link storage - artifact_path = Column(String(500)) # Local file path - artifact_url = Column(String(500)) # External URL - artifact_hash = Column(String(64)) # SHA-256 hash - file_size_bytes = Column(Integer) - mime_type = Column(String(100)) - - # Validity period - valid_from = Column(DateTime, nullable=False, default=datetime.utcnow) - valid_until = Column(DateTime) # NULL = no expiry - status = Column(Enum(EvidenceStatusEnum), default=EvidenceStatusEnum.VALID) - - # Source tracking - source = Column(String(100)) # "ci_pipeline", "manual", "api" - ci_job_id = Column(String(100)) # CI/CD job reference - uploaded_by = Column(String(100)) # User who uploaded - - # Timestamps - collected_at = Column(DateTime, default=datetime.utcnow) - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - control = relationship("ControlDB", back_populates="evidence") - - __table_args__ = ( - Index('ix_evidence_control_type', 'control_id', 'evidence_type'), - Index('ix_evidence_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class RiskDB(Base): - """ - Risk register entry with likelihood x impact scoring. - """ - __tablename__ = 'compliance_risks' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - risk_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "RISK-001" - - title = Column(String(300), nullable=False) - description = Column(Text) - category = Column(String(50), nullable=False) # "data_breach", "compliance_gap", etc. - - # Inherent risk (before controls) - likelihood = Column(Integer, nullable=False) # 1-5 - impact = Column(Integer, nullable=False) # 1-5 - inherent_risk = Column(Enum(RiskLevelEnum), nullable=False) - - # Mitigating controls - mitigating_controls = Column(JSON) # List of control_ids - - # Residual risk (after controls) - residual_likelihood = Column(Integer) - residual_impact = Column(Integer) - residual_risk = Column(Enum(RiskLevelEnum)) - - # Management - owner = Column(String(100)) - status = Column(String(20), default="open") # "open", "mitigated", "accepted", "transferred" - treatment_plan = Column(Text) - - # Review - identified_date = Column(Date, default=date.today) - review_date = Column(Date) - last_assessed_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_risk_category_status', 'category', 'status'), - Index('ix_risk_inherent', 'inherent_risk'), - ) - - def __repr__(self): - return f"" - - @staticmethod - def calculate_risk_level(likelihood: int, impact: int) -> RiskLevelEnum: - """Calculate risk level from likelihood x impact matrix.""" - score = likelihood * impact - if score >= 20: - return RiskLevelEnum.CRITICAL - elif score >= 12: - return RiskLevelEnum.HIGH - elif score >= 6: - return RiskLevelEnum.MEDIUM - else: - return RiskLevelEnum.LOW - - -class AIClassificationEnum(str, enum.Enum): - """AI Act risk classification.""" - PROHIBITED = "prohibited" - HIGH_RISK = "high-risk" - LIMITED_RISK = "limited-risk" - MINIMAL_RISK = "minimal-risk" - UNCLASSIFIED = "unclassified" - - -class AISystemStatusEnum(str, enum.Enum): - """Status of an AI system in compliance tracking.""" - DRAFT = "draft" - CLASSIFIED = "classified" - COMPLIANT = "compliant" - NON_COMPLIANT = "non-compliant" - - -class AISystemDB(Base): - """ - AI System registry for AI Act compliance. - Tracks AI systems, their risk classification, and compliance status. - """ - __tablename__ = 'compliance_ai_systems' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(300), nullable=False) - description = Column(Text) - purpose = Column(String(500)) - sector = Column(String(100)) - - # AI Act classification - classification = Column(Enum(AIClassificationEnum), default=AIClassificationEnum.UNCLASSIFIED) - status = Column(Enum(AISystemStatusEnum), default=AISystemStatusEnum.DRAFT) - - # Assessment - assessment_date = Column(DateTime) - assessment_result = Column(JSON) # Full assessment result - obligations = Column(JSON) # List of AI Act obligations - risk_factors = Column(JSON) # Risk factors from assessment - recommendations = Column(JSON) # Recommendations from assessment - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_ai_system_classification', 'classification'), - Index('ix_ai_system_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class AuditExportDB(Base): - """ - Tracks audit export packages generated for external auditors. - """ - __tablename__ = 'compliance_audit_exports' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - export_type = Column(String(50), nullable=False) # "full", "controls_only", "evidence_only" - export_name = Column(String(200)) # User-friendly name - - # Scope - included_regulations = Column(JSON) # List of regulation codes - included_domains = Column(JSON) # List of control domains - date_range_start = Column(Date) - date_range_end = Column(Date) - - # Generation - requested_by = Column(String(100), nullable=False) - requested_at = Column(DateTime, nullable=False, default=datetime.utcnow) - completed_at = Column(DateTime) - - # Output - file_path = Column(String(500)) - file_hash = Column(String(64)) # SHA-256 of ZIP - file_size_bytes = Column(Integer) - - status = Column(Enum(ExportStatusEnum), default=ExportStatusEnum.PENDING) - error_message = Column(Text) - - # Statistics - total_controls = Column(Integer) - total_evidence = Column(Integer) - compliance_score = Column(Float) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - def __repr__(self): - return f"" - - -# ============================================================================ -# SERVICE MODULE REGISTRY (Sprint 3) -# ============================================================================ - -class ServiceModuleDB(Base): - """ - Registry of all Breakpilot services/modules for compliance mapping. - - Tracks which regulations apply to which services, enabling: - - Service-specific compliance views - - Aggregated risk per service - - Gap analysis by module - """ - __tablename__ = 'compliance_service_modules' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(100), unique=True, nullable=False, index=True) # e.g., "consent-service" - display_name = Column(String(200), nullable=False) # e.g., "Go Consent Service" - description = Column(Text) - - # Technical details - service_type = Column(Enum(ServiceTypeEnum), nullable=False) - port = Column(Integer) # Primary port (if applicable) - technology_stack = Column(JSON) # e.g., ["Go", "Gin", "PostgreSQL"] - repository_path = Column(String(500)) # e.g., "/consent-service" - docker_image = Column(String(200)) # e.g., "breakpilot-pwa-consent-service" - - # Data categories handled - data_categories = Column(JSON) # e.g., ["personal_data", "consent_records"] - processes_pii = Column(Boolean, default=False) # Handles personally identifiable info? - processes_health_data = Column(Boolean, default=False) # Handles special category health data? - ai_components = Column(Boolean, default=False) # Contains AI/ML components? - - # Status - is_active = Column(Boolean, default=True) - criticality = Column(String(20), default="medium") # "critical", "high", "medium", "low" - - # Compliance aggregation - compliance_score = Column(Float) # Calculated score 0-100 - last_compliance_check = Column(DateTime) - - # Owner - owner_team = Column(String(100)) # e.g., "Backend Team" - owner_contact = Column(String(200)) # e.g., "backend@breakpilot.app" - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - regulation_mappings = relationship("ModuleRegulationMappingDB", back_populates="module", cascade="all, delete-orphan") - module_risks = relationship("ModuleRiskDB", back_populates="module", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_module_type_active', 'service_type', 'is_active'), - ) - - def __repr__(self): - return f"" - - -class ModuleRegulationMappingDB(Base): - """ - Maps services to applicable regulations with relevance level. - - Enables filtering: "Show all GDPR requirements for consent-service" - """ - __tablename__ = 'compliance_module_regulations' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) - regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) - - relevance_level = Column(Enum(RelevanceLevelEnum), nullable=False, default=RelevanceLevelEnum.MEDIUM) - notes = Column(Text) # Why this regulation applies - applicable_articles = Column(JSON) # List of specific articles that apply - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - module = relationship("ServiceModuleDB", back_populates="regulation_mappings") - regulation = relationship("RegulationDB") - - __table_args__ = ( - Index('ix_module_regulation', 'module_id', 'regulation_id', unique=True), - ) - - -class ModuleRiskDB(Base): - """ - Service-specific risks aggregated from requirements and controls. - """ - __tablename__ = 'compliance_module_risks' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) - risk_id = Column(String(36), ForeignKey('compliance_risks.id'), nullable=False, index=True) - - # Module-specific assessment - module_likelihood = Column(Integer) # 1-5, may differ from global - module_impact = Column(Integer) # 1-5, may differ from global - module_risk_level = Column(Enum(RiskLevelEnum)) - - assessment_notes = Column(Text) # Module-specific notes - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - module = relationship("ServiceModuleDB", back_populates="module_risks") - risk = relationship("RiskDB") - - __table_args__ = ( - Index('ix_module_risk', 'module_id', 'risk_id', unique=True), - ) - - -# ============================================================================ -# AUDIT SESSION & SIGN-OFF (Sprint 3 - Phase 3) -# ============================================================================ - -class AuditResultEnum(str, enum.Enum): - """Result of an audit sign-off for a requirement.""" - COMPLIANT = "compliant" # Fully compliant - COMPLIANT_WITH_NOTES = "compliant_notes" # Compliant with observations - NON_COMPLIANT = "non_compliant" # Not compliant - remediation required - NOT_APPLICABLE = "not_applicable" # Not applicable to this audit - PENDING = "pending" # Not yet reviewed - - -class AuditSessionStatusEnum(str, enum.Enum): - """Status of an audit session.""" - DRAFT = "draft" # Session created, not started - IN_PROGRESS = "in_progress" # Audit in progress - COMPLETED = "completed" # All items reviewed - ARCHIVED = "archived" # Historical record - - -class AuditSessionDB(Base): - """ - Audit session for structured compliance reviews. - - Enables auditors to: - - Create named audit sessions (e.g., "Q1 2026 GDPR Audit") - - Track progress through requirements - - Sign off individual items with digital signatures - - Generate audit reports - """ - __tablename__ = 'compliance_audit_sessions' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(200), nullable=False) # e.g., "Q1 2026 Compliance Audit" - description = Column(Text) - - # Auditor information - auditor_name = Column(String(100), nullable=False) # e.g., "Dr. Thomas Müller" - auditor_email = Column(String(200)) - auditor_organization = Column(String(200)) # External auditor company - - # Session scope - status = Column(Enum(AuditSessionStatusEnum), default=AuditSessionStatusEnum.DRAFT) - regulation_ids = Column(JSON) # Filter: ["GDPR", "AIACT"] or null for all - - # Progress tracking - total_items = Column(Integer, default=0) - completed_items = Column(Integer, default=0) - compliant_count = Column(Integer, default=0) - non_compliant_count = Column(Integer, default=0) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - started_at = Column(DateTime) # When audit began - completed_at = Column(DateTime) # When audit finished - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - signoffs = relationship("AuditSignOffDB", back_populates="session", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_audit_session_status', 'status'), - Index('ix_audit_session_auditor', 'auditor_name'), - ) - - def __repr__(self): - return f"" - - @property - def completion_percentage(self) -> float: - """Calculate completion percentage.""" - if self.total_items == 0: - return 0.0 - return round((self.completed_items / self.total_items) * 100, 1) - - -class AuditSignOffDB(Base): - """ - Individual sign-off for a requirement within an audit session. - - Features: - - Records audit result (compliant, non-compliant, etc.) - - Stores auditor notes and observations - - Creates digital signature (SHA-256 hash) for tamper evidence - """ - __tablename__ = 'compliance_audit_signoffs' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), nullable=False, index=True) - requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) - - # Audit result - result = Column(Enum(AuditResultEnum), default=AuditResultEnum.PENDING) - notes = Column(Text) # Auditor observations - - # Evidence references for this sign-off - evidence_ids = Column(JSON) # List of evidence IDs reviewed - - # Digital signature (SHA-256 hash of result + auditor + timestamp) - signature_hash = Column(String(64)) # SHA-256 hex string - signed_at = Column(DateTime) - signed_by = Column(String(100)) # Auditor name at time of signing - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - session = relationship("AuditSessionDB", back_populates="signoffs") - requirement = relationship("RequirementDB") - - __table_args__ = ( - Index('ix_signoff_session_requirement', 'session_id', 'requirement_id', unique=True), - Index('ix_signoff_result', 'result'), - ) - - def __repr__(self): - return f"" - - def create_signature(self, auditor_name: str) -> str: - """ - Create a digital signature for this sign-off. - - Returns SHA-256 hash of: result + requirement_id + auditor_name + timestamp - """ - import hashlib - from datetime import datetime - - timestamp = datetime.utcnow().isoformat() - data = f"{self.result.value}|{self.requirement_id}|{auditor_name}|{timestamp}" - signature = hashlib.sha256(data.encode()).hexdigest() - - self.signature_hash = signature - self.signed_at = datetime.utcnow() - self.signed_by = auditor_name - - return signature - - -# ============================================================================ -# ISO 27001 ISMS MODELS (Kapitel 4-10) -# ============================================================================ - -class ApprovalStatusEnum(str, enum.Enum): - """Approval status for ISMS documents.""" - DRAFT = "draft" - UNDER_REVIEW = "under_review" - APPROVED = "approved" - SUPERSEDED = "superseded" - - -class FindingTypeEnum(str, enum.Enum): - """ISO 27001 audit finding classification.""" - MAJOR = "major" # Major nonconformity - blocks certification - MINOR = "minor" # Minor nonconformity - requires CAPA - OFI = "ofi" # Opportunity for Improvement - POSITIVE = "positive" # Positive observation - - -class FindingStatusEnum(str, enum.Enum): - """Status of an audit finding.""" - OPEN = "open" - IN_PROGRESS = "in_progress" - CORRECTIVE_ACTION_PENDING = "capa_pending" - VERIFICATION_PENDING = "verification_pending" - VERIFIED = "verified" - CLOSED = "closed" - - -class CAPATypeEnum(str, enum.Enum): - """Type of corrective/preventive action.""" - CORRECTIVE = "corrective" # Fix the nonconformity - PREVENTIVE = "preventive" # Prevent recurrence - BOTH = "both" - - -class ISMSScopeDB(Base): - """ - ISMS Scope Definition (ISO 27001 Kapitel 4.3) - - Defines the boundaries and applicability of the ISMS. - This is MANDATORY for certification. - """ - __tablename__ = 'compliance_isms_scope' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - version = Column(String(20), nullable=False, default="1.0") - - # Scope definition - scope_statement = Column(Text, nullable=False) # Main scope text - included_locations = Column(JSON) # List of locations - included_processes = Column(JSON) # List of processes - included_services = Column(JSON) # List of services/products - excluded_items = Column(JSON) # Explicitly excluded items - exclusion_justification = Column(Text) # Why items are excluded - - # Boundaries - organizational_boundary = Column(Text) # Legal entity, departments - physical_boundary = Column(Text) # Locations, networks - technical_boundary = Column(Text) # Systems, applications - - # Approval - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - approval_signature = Column(String(64)) # SHA-256 hash - - # Validity - effective_date = Column(Date) - review_date = Column(Date) # Next mandatory review - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - created_by = Column(String(100)) - updated_by = Column(String(100)) - - __table_args__ = ( - Index('ix_isms_scope_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class ISMSContextDB(Base): - """ - ISMS Context (ISO 27001 Kapitel 4.1, 4.2) - - Documents internal/external issues and interested parties. - """ - __tablename__ = 'compliance_isms_context' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - version = Column(String(20), nullable=False, default="1.0") - - # 4.1 Internal issues - internal_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} - - # 4.1 External issues - external_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} - - # 4.2 Interested parties - interested_parties = Column(JSON) # List of {"party": "", "requirements": [], "relevance": ""} - - # Legal/regulatory requirements - regulatory_requirements = Column(JSON) # DSGVO, AI Act, etc. - contractual_requirements = Column(JSON) # Customer contracts - - # Analysis - swot_strengths = Column(JSON) - swot_weaknesses = Column(JSON) - swot_opportunities = Column(JSON) - swot_threats = Column(JSON) - - # Approval - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Review - last_reviewed_at = Column(DateTime) - next_review_date = Column(Date) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - def __repr__(self): - return f"" - - -class ISMSPolicyDB(Base): - """ - ISMS Policies (ISO 27001 Kapitel 5.2) - - Information security policy and sub-policies. - """ - __tablename__ = 'compliance_isms_policies' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - policy_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "POL-ISMS-001" - - # Policy details - title = Column(String(200), nullable=False) - policy_type = Column(String(50), nullable=False) # "master", "operational", "technical" - description = Column(Text) - policy_text = Column(Text, nullable=False) # Full policy content - - # Scope - applies_to = Column(JSON) # Roles, departments, systems - - # Document control - version = Column(String(20), nullable=False, default="1.0") - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - - # Approval chain - authored_by = Column(String(100)) - reviewed_by = Column(String(100)) - approved_by = Column(String(100)) # Must be top management - approved_at = Column(DateTime) - approval_signature = Column(String(64)) - - # Validity - effective_date = Column(Date) - review_frequency_months = Column(Integer, default=12) - next_review_date = Column(Date) - - # References - parent_policy_id = Column(String(36), ForeignKey('compliance_isms_policies.id')) - related_controls = Column(JSON) # List of control_ids - - # Document path - document_path = Column(String(500)) # Link to full document - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_policy_type_status', 'policy_type', 'status'), - ) - - def __repr__(self): - return f"" - - -class SecurityObjectiveDB(Base): - """ - Security Objectives (ISO 27001 Kapitel 6.2) - - Measurable information security objectives. - """ - __tablename__ = 'compliance_security_objectives' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - objective_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "OBJ-001" - - # Objective definition - title = Column(String(200), nullable=False) - description = Column(Text) - category = Column(String(50)) # "availability", "confidentiality", "integrity", "compliance" - - # SMART criteria - specific = Column(Text) # What exactly - measurable = Column(Text) # How measured - achievable = Column(Text) # Is it realistic - relevant = Column(Text) # Why important - time_bound = Column(Text) # Deadline - - # Metrics - kpi_name = Column(String(100)) - kpi_target = Column(String(100)) # Target value - kpi_current = Column(String(100)) # Current value - kpi_unit = Column(String(50)) # %, count, score - measurement_frequency = Column(String(50)) # monthly, quarterly - - # Responsibility - owner = Column(String(100)) - accountable = Column(String(100)) # RACI: Accountable - - # Status - status = Column(String(30), default="active") # active, achieved, not_achieved, cancelled - progress_percentage = Column(Integer, default=0) - - # Timeline - target_date = Column(Date) - achieved_date = Column(Date) - - # Linked items - related_controls = Column(JSON) - related_risks = Column(JSON) - - # Approval - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_objective_status', 'status'), - Index('ix_objective_category', 'category'), - ) - - def __repr__(self): - return f"" - - -class StatementOfApplicabilityDB(Base): - """ - Statement of Applicability (SoA) - ISO 27001 Anhang A Mapping - - Documents which Annex A controls are applicable and why. - This is MANDATORY for certification. - """ - __tablename__ = 'compliance_soa' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # ISO 27001:2022 Annex A reference - annex_a_control = Column(String(20), nullable=False, index=True) # e.g., "A.5.1" - annex_a_title = Column(String(300), nullable=False) - annex_a_category = Column(String(100)) # "Organizational", "People", "Physical", "Technological" - - # Applicability decision - is_applicable = Column(Boolean, nullable=False) - applicability_justification = Column(Text, nullable=False) # MUST be documented - - # Implementation status - implementation_status = Column(String(30), default="planned") # planned, partial, implemented, not_implemented - implementation_notes = Column(Text) - - # Mapping to our controls - breakpilot_control_ids = Column(JSON) # List of our control_ids that address this - coverage_level = Column(String(20), default="full") # full, partial, planned - - # Evidence - evidence_description = Column(Text) - evidence_ids = Column(JSON) # Links to EvidenceDB - - # Risk-based justification (for exclusions) - risk_assessment_notes = Column(Text) # If not applicable, explain why - compensating_controls = Column(Text) # If partial, explain compensating measures - - # Approval - reviewed_by = Column(String(100)) - reviewed_at = Column(DateTime) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Version tracking - version = Column(String(20), default="1.0") - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_soa_annex_control', 'annex_a_control', unique=True), - Index('ix_soa_applicable', 'is_applicable'), - Index('ix_soa_status', 'implementation_status'), - ) - - def __repr__(self): - return f"" - - -class AuditFindingDB(Base): - """ - Audit Finding with ISO 27001 Classification (Major/Minor/OFI) - - Tracks findings from internal and external audits with proper - classification and CAPA workflow. - """ - __tablename__ = 'compliance_audit_findings' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - finding_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "FIND-2026-001" - - # Source - audit_session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), index=True) - internal_audit_id = Column(String(36), ForeignKey('compliance_internal_audits.id'), index=True) - - # Classification (CRITICAL for ISO 27001!) - finding_type = Column(Enum(FindingTypeEnum), nullable=False) - - # ISO reference - iso_chapter = Column(String(20)) # e.g., "6.1.2", "9.2" - annex_a_control = Column(String(20)) # e.g., "A.8.2" - - # Finding details - title = Column(String(300), nullable=False) - description = Column(Text, nullable=False) - objective_evidence = Column(Text, nullable=False) # What the auditor observed - - # Root cause analysis - root_cause = Column(Text) - root_cause_method = Column(String(50)) # "5-why", "fishbone", "pareto" - - # Impact assessment - impact_description = Column(Text) - affected_processes = Column(JSON) - affected_assets = Column(JSON) - - # Status tracking - status = Column(Enum(FindingStatusEnum), default=FindingStatusEnum.OPEN) - - # Responsibility - owner = Column(String(100)) # Person responsible for closure - auditor = Column(String(100)) # Auditor who raised finding - - # Dates - identified_date = Column(Date, nullable=False, default=date.today) - due_date = Column(Date) # Deadline for closure - closed_date = Column(Date) - - # Verification - verification_method = Column(Text) - verified_by = Column(String(100)) - verified_at = Column(DateTime) - verification_evidence = Column(Text) - - # Closure - closure_notes = Column(Text) - closed_by = Column(String(100)) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - corrective_actions = relationship("CorrectiveActionDB", back_populates="finding", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_finding_type_status', 'finding_type', 'status'), - Index('ix_finding_due_date', 'due_date'), - ) - - def __repr__(self): - return f"" - - @property - def is_blocking(self) -> bool: - """Major findings block certification.""" - return self.finding_type == FindingTypeEnum.MAJOR and self.status != FindingStatusEnum.CLOSED - - -class CorrectiveActionDB(Base): - """ - Corrective & Preventive Actions (CAPA) - ISO 27001 10.1 - - Tracks actions taken to address nonconformities. - """ - __tablename__ = 'compliance_corrective_actions' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - capa_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "CAPA-2026-001" - - # Link to finding - finding_id = Column(String(36), ForeignKey('compliance_audit_findings.id'), nullable=False, index=True) - - # Type - capa_type = Column(Enum(CAPATypeEnum), nullable=False) - - # Action details - title = Column(String(300), nullable=False) - description = Column(Text, nullable=False) - expected_outcome = Column(Text) - - # Responsibility - assigned_to = Column(String(100), nullable=False) - approved_by = Column(String(100)) - - # Timeline - planned_start = Column(Date) - planned_completion = Column(Date, nullable=False) - actual_completion = Column(Date) - - # Status - status = Column(String(30), default="planned") # planned, in_progress, completed, verified, cancelled - progress_percentage = Column(Integer, default=0) - - # Resources - estimated_effort_hours = Column(Integer) - actual_effort_hours = Column(Integer) - resources_required = Column(Text) - - # Evidence of implementation - implementation_evidence = Column(Text) - evidence_ids = Column(JSON) - - # Effectiveness review - effectiveness_criteria = Column(Text) - effectiveness_verified = Column(Boolean, default=False) - effectiveness_verification_date = Column(Date) - effectiveness_notes = Column(Text) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - finding = relationship("AuditFindingDB", back_populates="corrective_actions") - - __table_args__ = ( - Index('ix_capa_status', 'status'), - Index('ix_capa_due', 'planned_completion'), - ) - - def __repr__(self): - return f"" - - -class ManagementReviewDB(Base): - """ - Management Review (ISO 27001 Kapitel 9.3) - - Records mandatory management reviews of the ISMS. - """ - __tablename__ = 'compliance_management_reviews' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - review_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "MR-2026-Q1" - - # Review details - title = Column(String(200), nullable=False) - review_date = Column(Date, nullable=False) - review_period_start = Column(Date) # Period being reviewed - review_period_end = Column(Date) - - # Participants - chairperson = Column(String(100), nullable=False) # Usually top management - attendees = Column(JSON) # List of {"name": "", "role": ""} - - # 9.3 Review Inputs (mandatory!) - input_previous_actions = Column(Text) # Status of previous review actions - input_isms_changes = Column(Text) # Changes in internal/external issues - input_security_performance = Column(Text) # Nonconformities, monitoring, audit results - input_interested_party_feedback = Column(Text) - input_risk_assessment_results = Column(Text) - input_improvement_opportunities = Column(Text) - - # Additional inputs - input_policy_effectiveness = Column(Text) - input_objective_achievement = Column(Text) - input_resource_adequacy = Column(Text) - - # 9.3 Review Outputs (mandatory!) - output_improvement_decisions = Column(Text) # Decisions for improvement - output_isms_changes = Column(Text) # Changes needed to ISMS - output_resource_needs = Column(Text) # Resource requirements - - # Action items - action_items = Column(JSON) # List of {"action": "", "owner": "", "due_date": ""} - - # Overall assessment - isms_effectiveness_rating = Column(String(20)) # "effective", "partially_effective", "not_effective" - key_decisions = Column(Text) - - # Approval - status = Column(String(30), default="draft") # draft, conducted, approved - approved_by = Column(String(100)) - approved_at = Column(DateTime) - minutes_document_path = Column(String(500)) # Link to meeting minutes - - # Next review - next_review_date = Column(Date) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_mgmt_review_date', 'review_date'), - Index('ix_mgmt_review_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class InternalAuditDB(Base): - """ - Internal Audit (ISO 27001 Kapitel 9.2) - - Tracks internal audit program and individual audits. - """ - __tablename__ = 'compliance_internal_audits' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - audit_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "IA-2026-001" - - # Audit details - title = Column(String(200), nullable=False) - audit_type = Column(String(50), nullable=False) # "scheduled", "surveillance", "special" - - # Scope - scope_description = Column(Text, nullable=False) - iso_chapters_covered = Column(JSON) # e.g., ["4", "5", "6.1"] - annex_a_controls_covered = Column(JSON) # e.g., ["A.5", "A.6"] - processes_covered = Column(JSON) - departments_covered = Column(JSON) - - # Audit criteria - criteria = Column(Text) # Standards, policies being audited against - - # Timeline - planned_date = Column(Date, nullable=False) - actual_start_date = Column(Date) - actual_end_date = Column(Date) - - # Audit team - lead_auditor = Column(String(100), nullable=False) - audit_team = Column(JSON) # List of auditor names - auditee_representatives = Column(JSON) # Who was interviewed - - # Status - status = Column(String(30), default="planned") # planned, in_progress, completed, cancelled - - # Results summary - total_findings = Column(Integer, default=0) - major_findings = Column(Integer, default=0) - minor_findings = Column(Integer, default=0) - ofi_count = Column(Integer, default=0) - positive_observations = Column(Integer, default=0) - - # Conclusion - audit_conclusion = Column(Text) - overall_assessment = Column(String(30)) # "conforming", "minor_nc", "major_nc" - - # Report - report_date = Column(Date) - report_document_path = Column(String(500)) - - # Sign-off - report_approved_by = Column(String(100)) - report_approved_at = Column(DateTime) - - # Follow-up - follow_up_audit_required = Column(Boolean, default=False) - follow_up_audit_id = Column(String(36)) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - findings = relationship("AuditFindingDB", backref="internal_audit", foreign_keys=[AuditFindingDB.internal_audit_id]) - - __table_args__ = ( - Index('ix_internal_audit_date', 'planned_date'), - Index('ix_internal_audit_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class AuditTrailDB(Base): - """ - Comprehensive Audit Trail for ISMS Changes - - Tracks all changes to compliance-relevant data for - accountability and forensic analysis. - """ - __tablename__ = 'compliance_audit_trail' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # What changed - entity_type = Column(String(50), nullable=False, index=True) # "control", "risk", "policy", etc. - entity_id = Column(String(36), nullable=False, index=True) - entity_name = Column(String(200)) # Human-readable identifier - - # Action - action = Column(String(20), nullable=False) # "create", "update", "delete", "approve", "sign" - - # Change details - field_changed = Column(String(100)) # Which field (for updates) - old_value = Column(Text) - new_value = Column(Text) - change_summary = Column(Text) # Human-readable summary - - # Who & When - performed_by = Column(String(100), nullable=False) - performed_at = Column(DateTime, nullable=False, default=datetime.utcnow) - - # Context - ip_address = Column(String(45)) - user_agent = Column(String(500)) - session_id = Column(String(100)) - - # Integrity - checksum = Column(String(64)) # SHA-256 of the change - - # Timestamps (immutable after creation) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) - - __table_args__ = ( - Index('ix_audit_trail_entity', 'entity_type', 'entity_id'), - Index('ix_audit_trail_time', 'performed_at'), - Index('ix_audit_trail_user', 'performed_by'), - ) - - def __repr__(self): - return f"" - - -class ISMSReadinessCheckDB(Base): - """ - ISMS Readiness Check Results - - Stores automated pre-audit checks to identify potential - Major findings before external audit. - """ - __tablename__ = 'compliance_isms_readiness' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # Check run - check_date = Column(DateTime, nullable=False, default=datetime.utcnow) - triggered_by = Column(String(100)) # "scheduled", "manual", "pre-audit" - - # Overall status - overall_status = Column(String(20), nullable=False) # "ready", "at_risk", "not_ready" - certification_possible = Column(Boolean, nullable=False) - - # Chapter-by-chapter status (ISO 27001) - chapter_4_status = Column(String(20)) # Context - chapter_5_status = Column(String(20)) # Leadership - chapter_6_status = Column(String(20)) # Planning - chapter_7_status = Column(String(20)) # Support - chapter_8_status = Column(String(20)) # Operation - chapter_9_status = Column(String(20)) # Performance - chapter_10_status = Column(String(20)) # Improvement - - # Potential Major findings - potential_majors = Column(JSON) # List of {"check": "", "status": "", "recommendation": ""} - - # Potential Minor findings - potential_minors = Column(JSON) - - # Improvement opportunities - improvement_opportunities = Column(JSON) - - # Scores - readiness_score = Column(Float) # 0-100 - documentation_score = Column(Float) - implementation_score = Column(Float) - evidence_score = Column(Float) - - # Recommendations - priority_actions = Column(JSON) # List of recommended actions before audit - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - - __table_args__ = ( - Index('ix_readiness_date', 'check_date'), - Index('ix_readiness_status', 'overall_status'), - ) - - def __repr__(self): - return f"" diff --git a/backend-compliance/compliance/db/regulation_models.py b/backend-compliance/compliance/db/regulation_models.py new file mode 100644 index 0000000..38c60bd --- /dev/null +++ b/backend-compliance/compliance/db/regulation_models.py @@ -0,0 +1,134 @@ +""" +Regulation & Requirement models — extracted from compliance/db/models.py. + +The foundational compliance aggregate: regulations (GDPR, AI Act, CRA, ...) and +the individual requirements they contain. Re-exported from +``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings. +""" + +import uuid +import enum +from datetime import datetime, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, Boolean, DateTime, Date, + ForeignKey, Enum, JSON, Index, +) +from sqlalchemy.orm import relationship + +from classroom_engine.database import Base + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class RegulationTypeEnum(str, enum.Enum): + """Type of regulation/standard.""" + EU_REGULATION = "eu_regulation" # Directly applicable EU law + EU_DIRECTIVE = "eu_directive" # Requires national implementation + DE_LAW = "de_law" # German national law + BSI_STANDARD = "bsi_standard" # BSI technical guidelines + INDUSTRY_STANDARD = "industry_standard" # ISO, OWASP, etc. + + +# ============================================================================ +# MODELS +# ============================================================================ + +class RegulationDB(Base): + """ + Represents a regulation, directive, or standard. + + Examples: GDPR, AI Act, CRA, BSI-TR-03161 + """ + __tablename__ = 'compliance_regulations' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + code = Column(String(20), unique=True, nullable=False, index=True) # e.g., "GDPR", "AIACT" + name = Column(String(200), nullable=False) # Short name + full_name = Column(Text) # Full official name + regulation_type = Column(Enum(RegulationTypeEnum), nullable=False) + source_url = Column(String(500)) # EUR-Lex URL or similar + local_pdf_path = Column(String(500)) # Local PDF if available + effective_date = Column(Date) # When it came into force + description = Column(Text) # Brief description + is_active = Column(Boolean, default=True) + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + requirements = relationship("RequirementDB", back_populates="regulation", cascade="all, delete-orphan") + + def __repr__(self): + return f"" + + +class RequirementDB(Base): + """ + Individual requirement from a regulation. + + Examples: GDPR Art. 32(1)(a), AI Act Art. 9, BSI-TR O.Auth_1 + """ + __tablename__ = 'compliance_requirements' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) + + # Requirement identification + article = Column(String(50), nullable=False) # e.g., "Art. 32", "O.Auth_1" + paragraph = Column(String(20)) # e.g., "(1)(a)" + requirement_id_external = Column(String(50)) # External ID (e.g., BSI ID) + title = Column(String(300), nullable=False) # Requirement title + description = Column(Text) # Brief description + requirement_text = Column(Text) # Original text from regulation + + # Breakpilot-specific interpretation and implementation + breakpilot_interpretation = Column(Text) # How Breakpilot interprets this + implementation_status = Column(String(30), default="not_started") # not_started, in_progress, implemented, verified + implementation_details = Column(Text) # How we implemented it + code_references = Column(JSON) # List of {"file": "...", "line": ..., "description": "..."} + documentation_links = Column(JSON) # List of internal doc links + + # Evidence for auditors + evidence_description = Column(Text) # What evidence proves compliance + evidence_artifacts = Column(JSON) # List of {"type": "...", "path": "...", "description": "..."} + + # Audit-specific fields + auditor_notes = Column(Text) # Notes from auditor review + audit_status = Column(String(30), default="pending") # pending, in_review, approved, rejected + last_audit_date = Column(DateTime) + last_auditor = Column(String(100)) + + is_applicable = Column(Boolean, default=True) # Applicable to Breakpilot? + applicability_reason = Column(Text) # Why/why not applicable + + priority = Column(Integer, default=2) # 1=Critical, 2=High, 3=Medium + + # Source document reference + source_page = Column(Integer) # Page number in source document + source_section = Column(String(100)) # Section in source document + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + regulation = relationship("RegulationDB", back_populates="requirements") + control_mappings = relationship("ControlMappingDB", back_populates="requirement", cascade="all, delete-orphan") + + __table_args__ = ( + Index('ix_requirement_regulation_article', 'regulation_id', 'article'), + Index('ix_requirement_audit_status', 'audit_status'), + Index('ix_requirement_impl_status', 'implementation_status'), + ) + + def __repr__(self): + return f"" + + +__all__ = ["RegulationTypeEnum", "RegulationDB", "RequirementDB"] diff --git a/backend-compliance/compliance/db/service_module_models.py b/backend-compliance/compliance/db/service_module_models.py new file mode 100644 index 0000000..475c82a --- /dev/null +++ b/backend-compliance/compliance/db/service_module_models.py @@ -0,0 +1,176 @@ +""" +Service Module Registry models — extracted from compliance/db/models.py. + +Sprint 3: registry of all Breakpilot services/modules for compliance mapping, +per-module regulation applicability, and per-module risk aggregation. +Re-exported from ``compliance.db.models`` for backwards compatibility. + +DO NOT change __tablename__, column names, or relationship strings. +""" + +import uuid +import enum +from datetime import datetime, timezone + +from sqlalchemy import ( + Column, String, Text, Integer, Boolean, DateTime, + ForeignKey, Enum, JSON, Index, Float, +) +from sqlalchemy.orm import relationship + +from classroom_engine.database import Base +# RiskLevelEnum is re-used across aggregates; sourced here from control_models. +from compliance.db.control_models import RiskLevelEnum # noqa: F401 + + +# ============================================================================ +# ENUMS +# ============================================================================ + +class ServiceTypeEnum(str, enum.Enum): + """Type of Breakpilot service/module.""" + BACKEND = "backend" # API/Backend services + DATABASE = "database" # Data storage + AI = "ai" # AI/ML services + COMMUNICATION = "communication" # Chat/Video/Messaging + STORAGE = "storage" # File/Object storage + INFRASTRUCTURE = "infrastructure" # Load balancer, reverse proxy + MONITORING = "monitoring" # Logging, metrics + SECURITY = "security" # Auth, encryption, secrets + + +class RelevanceLevelEnum(str, enum.Enum): + """Relevance level of a regulation to a service.""" + CRITICAL = "critical" # Non-compliance = shutdown + HIGH = "high" # Major risk + MEDIUM = "medium" # Moderate risk + LOW = "low" # Minor risk + + +# ============================================================================ +# MODELS +# ============================================================================ + +class ServiceModuleDB(Base): + """ + Registry of all Breakpilot services/modules for compliance mapping. + + Tracks which regulations apply to which services, enabling: + - Service-specific compliance views + - Aggregated risk per service + - Gap analysis by module + """ + __tablename__ = 'compliance_service_modules' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + name = Column(String(100), unique=True, nullable=False, index=True) # e.g., "consent-service" + display_name = Column(String(200), nullable=False) # e.g., "Go Consent Service" + description = Column(Text) + + # Technical details + service_type = Column(Enum(ServiceTypeEnum), nullable=False) + port = Column(Integer) # Primary port (if applicable) + technology_stack = Column(JSON) # e.g., ["Go", "Gin", "PostgreSQL"] + repository_path = Column(String(500)) # e.g., "/consent-service" + docker_image = Column(String(200)) # e.g., "breakpilot-pwa-consent-service" + + # Data categories handled + data_categories = Column(JSON) # e.g., ["personal_data", "consent_records"] + processes_pii = Column(Boolean, default=False) # Handles personally identifiable info? + processes_health_data = Column(Boolean, default=False) # Handles special category health data? + ai_components = Column(Boolean, default=False) # Contains AI/ML components? + + # Status + is_active = Column(Boolean, default=True) + criticality = Column(String(20), default="medium") # "critical", "high", "medium", "low" + + # Compliance aggregation + compliance_score = Column(Float) # Calculated score 0-100 + last_compliance_check = Column(DateTime) + + # Owner + owner_team = Column(String(100)) # e.g., "Backend Team" + owner_contact = Column(String(200)) # e.g., "backend@breakpilot.app" + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + regulation_mappings = relationship("ModuleRegulationMappingDB", back_populates="module", cascade="all, delete-orphan") + module_risks = relationship("ModuleRiskDB", back_populates="module", cascade="all, delete-orphan") + + __table_args__ = ( + Index('ix_module_type_active', 'service_type', 'is_active'), + ) + + def __repr__(self): + return f"" + + +class ModuleRegulationMappingDB(Base): + """ + Maps services to applicable regulations with relevance level. + + Enables filtering: "Show all GDPR requirements for consent-service" + """ + __tablename__ = 'compliance_module_regulations' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) + regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) + + relevance_level = Column(Enum(RelevanceLevelEnum), nullable=False, default=RelevanceLevelEnum.MEDIUM) + notes = Column(Text) # Why this regulation applies + applicable_articles = Column(JSON) # List of specific articles that apply + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + module = relationship("ServiceModuleDB", back_populates="regulation_mappings") + regulation = relationship("RegulationDB") + + __table_args__ = ( + Index('ix_module_regulation', 'module_id', 'regulation_id', unique=True), + ) + + +class ModuleRiskDB(Base): + """ + Service-specific risks aggregated from requirements and controls. + """ + __tablename__ = 'compliance_module_risks' + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) + risk_id = Column(String(36), ForeignKey('compliance_risks.id'), nullable=False, index=True) + + # Module-specific assessment + module_likelihood = Column(Integer) # 1-5, may differ from global + module_impact = Column(Integer) # 1-5, may differ from global + module_risk_level = Column(Enum(RiskLevelEnum)) + + assessment_notes = Column(Text) # Module-specific notes + + # Timestamps + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) + + # Relationships + module = relationship("ServiceModuleDB", back_populates="module_risks") + risk = relationship("RiskDB") + + __table_args__ = ( + Index('ix_module_risk', 'module_id', 'risk_id', unique=True), + ) + + +__all__ = [ + "ServiceTypeEnum", + "RelevanceLevelEnum", + "ServiceModuleDB", + "ModuleRegulationMappingDB", + "ModuleRiskDB", +]