From 3702f7075463c7ed383e607b64be8d0af82ec369 Mon Sep 17 00:00:00 2001 From: Sharang Parnerkar <30073382+mighty840@users.noreply.github.com> Date: Mon, 20 Apr 2026 07:50:29 +0200 Subject: [PATCH] fix: strip duplicate inline class definitions from db shim files models.py and repository.py are backwards-compat re-export shims from Phase 1. Both files still contained the original 1466/1547 line class definitions below the re-export block. These inline definitions shadowed the correctly-imported sub-module versions and failed at import time because Column, AuditResultEnum, etc. were no longer in scope. Fix: - models.py: remove all duplicate Base-subclass definitions (lines 209- 1581). Retain EvidenceConfidenceEnum and EvidenceTruthStatusEnum (unique to this shim, not yet extracted to a sub-module) and the two models that have no sub-module yet: LLMGenerationAuditDB and AssertionDB. Add back the SQLAlchemy column-type imports those two models need. - repository.py: remove all duplicate Repository class definitions (lines 40-1692). All classes are now fully provided by the sub-repositories. Result: 172 pytest tests pass, import OK. --- backend-compliance/compliance/db/models.py | 1377 +------------- .../compliance/db/repository.py | 1654 ----------------- 2 files changed, 9 insertions(+), 3022 deletions(-) diff --git a/backend-compliance/compliance/db/models.py b/backend-compliance/compliance/db/models.py index b2dc19e..e8edf4e 100644 --- a/backend-compliance/compliance/db/models.py +++ b/backend-compliance/compliance/db/models.py @@ -26,6 +26,14 @@ module and re-export here. """ import enum # noqa: F401 — used by inline enum classes below +import uuid +from datetime import datetime + +from sqlalchemy import ( # noqa: F401 — used by inline model classes below + Boolean, Column, Date, DateTime, Enum, Float, + ForeignKey, Index, Integer, JSON, String, Text, +) +from sqlalchemy.orm import backref # noqa: F401 # Order matters: later modules reference classes defined in earlier ones via # SQLAlchemy string relationships. Keep foundational aggregates first. @@ -198,1379 +206,12 @@ class RelevanceLevelEnum(str, enum.Enum): LOW = "low" # Minor risk -# ============================================================================ -# MODELS -# ============================================================================ - -class RegulationDB(Base): - """ - Represents a regulation, directive, or standard. - - Examples: GDPR, AI Act, CRA, BSI-TR-03161 - """ - __tablename__ = 'compliance_regulations' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - code = Column(String(20), unique=True, nullable=False, index=True) # e.g., "GDPR", "AIACT" - name = Column(String(200), nullable=False) # Short name - full_name = Column(Text) # Full official name - regulation_type = Column(Enum(RegulationTypeEnum), nullable=False) - source_url = Column(String(500)) # EUR-Lex URL or similar - local_pdf_path = Column(String(500)) # Local PDF if available - effective_date = Column(Date) # When it came into force - description = Column(Text) # Brief description - is_active = Column(Boolean, default=True) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - requirements = relationship("RequirementDB", back_populates="regulation", cascade="all, delete-orphan") - - def __repr__(self): - return f"" - - -class RequirementDB(Base): - """ - Individual requirement from a regulation. - - Examples: GDPR Art. 32(1)(a), AI Act Art. 9, BSI-TR O.Auth_1 - """ - __tablename__ = 'compliance_requirements' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) - - # Requirement identification - article = Column(String(50), nullable=False) # e.g., "Art. 32", "O.Auth_1" - paragraph = Column(String(20)) # e.g., "(1)(a)" - requirement_id_external = Column(String(50)) # External ID (e.g., BSI ID) - title = Column(String(300), nullable=False) # Requirement title - description = Column(Text) # Brief description - requirement_text = Column(Text) # Original text from regulation - - # Breakpilot-specific interpretation and implementation - breakpilot_interpretation = Column(Text) # How Breakpilot interprets this - implementation_status = Column(String(30), default="not_started") # not_started, in_progress, implemented, verified - implementation_details = Column(Text) # How we implemented it - code_references = Column(JSON) # List of {"file": "...", "line": ..., "description": "..."} - documentation_links = Column(JSON) # List of internal doc links - - # Evidence for auditors - evidence_description = Column(Text) # What evidence proves compliance - evidence_artifacts = Column(JSON) # List of {"type": "...", "path": "...", "description": "..."} - - # Audit-specific fields - auditor_notes = Column(Text) # Notes from auditor review - audit_status = Column(String(30), default="pending") # pending, in_review, approved, rejected - last_audit_date = Column(DateTime) - last_auditor = Column(String(100)) - - is_applicable = Column(Boolean, default=True) # Applicable to Breakpilot? - applicability_reason = Column(Text) # Why/why not applicable - - priority = Column(Integer, default=2) # 1=Critical, 2=High, 3=Medium - - # Source document reference - source_page = Column(Integer) # Page number in source document - source_section = Column(String(100)) # Section in source document - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - regulation = relationship("RegulationDB", back_populates="requirements") - control_mappings = relationship("ControlMappingDB", back_populates="requirement", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_requirement_regulation_article', 'regulation_id', 'article'), - Index('ix_requirement_audit_status', 'audit_status'), - Index('ix_requirement_impl_status', 'implementation_status'), - ) - - def __repr__(self): - return f"" - - -class ControlDB(Base): - """ - Technical or organizational security control. - - Examples: PRIV-001 (Verarbeitungsverzeichnis), SDLC-001 (SAST Scanning) - """ - __tablename__ = 'compliance_controls' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - control_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "PRIV-001" - - domain = Column(Enum(ControlDomainEnum), nullable=False, index=True) - control_type = Column(Enum(ControlTypeEnum), nullable=False) - - title = Column(String(300), nullable=False) - description = Column(Text) - pass_criteria = Column(Text, nullable=False) # Measurable pass criteria - implementation_guidance = Column(Text) # How to implement - - # Code/Evidence references - code_reference = Column(String(500)) # e.g., "backend/middleware/pii_redactor.py:45" - documentation_url = Column(String(500)) # Link to internal docs - - # Automation - is_automated = Column(Boolean, default=False) - automation_tool = Column(String(100)) # e.g., "Semgrep", "Trivy" - automation_config = Column(JSON) # Tool-specific config - - # Status - status = Column(Enum(ControlStatusEnum), default=ControlStatusEnum.PLANNED) - status_notes = Column(Text) - status_justification = Column(Text) # Required for n/a transitions - - # Ownership & Review - owner = Column(String(100)) # Responsible person/team - review_frequency_days = Column(Integer, default=90) - last_reviewed_at = Column(DateTime) - next_review_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - mappings = relationship("ControlMappingDB", back_populates="control", cascade="all, delete-orphan") - evidence = relationship("EvidenceDB", back_populates="control", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_control_domain_status', 'domain', 'status'), - ) - - def __repr__(self): - return f"" - - -class ControlMappingDB(Base): - """ - Maps requirements to controls (many-to-many with metadata). - """ - __tablename__ = 'compliance_control_mappings' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) - control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) - - coverage_level = Column(String(20), default="full") # "full", "partial", "planned" - notes = Column(Text) # Explanation of coverage - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - requirement = relationship("RequirementDB", back_populates="control_mappings") - control = relationship("ControlDB", back_populates="mappings") - - __table_args__ = ( - Index('ix_mapping_req_ctrl', 'requirement_id', 'control_id', unique=True), - ) - - -class EvidenceDB(Base): - """ - Audit evidence for controls. - - Types: scan_report, policy_document, config_snapshot, test_result, - manual_upload, screenshot, external_link - """ - __tablename__ = 'compliance_evidence' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - control_id = Column(String(36), ForeignKey('compliance_controls.id'), nullable=False, index=True) - - evidence_type = Column(String(50), nullable=False) # Type of evidence - title = Column(String(300), nullable=False) - description = Column(Text) - - # File/Link storage - artifact_path = Column(String(500)) # Local file path - artifact_url = Column(String(500)) # External URL - artifact_hash = Column(String(64)) # SHA-256 hash - file_size_bytes = Column(Integer) - mime_type = Column(String(100)) - - # Validity period - valid_from = Column(DateTime, nullable=False, default=datetime.utcnow) - valid_until = Column(DateTime) # NULL = no expiry - status = Column(Enum(EvidenceStatusEnum), default=EvidenceStatusEnum.VALID) - - # Source tracking - source = Column(String(100)) # "ci_pipeline", "manual", "api" - ci_job_id = Column(String(100)) # CI/CD job reference - uploaded_by = Column(String(100)) # User who uploaded - - # Anti-Fake-Evidence: Confidence & Truth tracking - confidence_level = Column(Enum(EvidenceConfidenceEnum), default=EvidenceConfidenceEnum.E1) - truth_status = Column(Enum(EvidenceTruthStatusEnum), default=EvidenceTruthStatusEnum.UPLOADED) - generation_mode = Column(String(100)) # e.g. "draft_assistance", "auto_generation" - may_be_used_as_evidence = Column(Boolean, default=True) - reviewed_by = Column(String(200)) - reviewed_at = Column(DateTime) - - # Anti-Fake-Evidence Phase 2: Four-Eyes review - approval_status = Column(String(30), default="none") - first_reviewer = Column(String(200)) - first_reviewed_at = Column(DateTime) - second_reviewer = Column(String(200)) - second_reviewed_at = Column(DateTime) - requires_four_eyes = Column(Boolean, default=False) - - # Timestamps - collected_at = Column(DateTime, default=datetime.utcnow) - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - control = relationship("ControlDB", back_populates="evidence") - - __table_args__ = ( - Index('ix_evidence_control_type', 'control_id', 'evidence_type'), - Index('ix_evidence_status', 'status'), - Index('ix_evidence_approval_status', 'approval_status'), - ) - - def __repr__(self): - return f"" - - -class RiskDB(Base): - """ - Risk register entry with likelihood x impact scoring. - """ - __tablename__ = 'compliance_risks' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - risk_id = Column(String(20), unique=True, nullable=False, index=True) # e.g., "RISK-001" - - title = Column(String(300), nullable=False) - description = Column(Text) - category = Column(String(50), nullable=False) # "data_breach", "compliance_gap", etc. - - # Inherent risk (before controls) - likelihood = Column(Integer, nullable=False) # 1-5 - impact = Column(Integer, nullable=False) # 1-5 - inherent_risk = Column(Enum(RiskLevelEnum), nullable=False) - - # Mitigating controls - mitigating_controls = Column(JSON) # List of control_ids - - # Residual risk (after controls) - residual_likelihood = Column(Integer) - residual_impact = Column(Integer) - residual_risk = Column(Enum(RiskLevelEnum)) - - # Management - owner = Column(String(100)) - status = Column(String(20), default="open") # "open", "mitigated", "accepted", "transferred" - treatment_plan = Column(Text) - - # Review - identified_date = Column(Date, default=date.today) - review_date = Column(Date) - last_assessed_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_risk_category_status', 'category', 'status'), - Index('ix_risk_inherent', 'inherent_risk'), - ) - - def __repr__(self): - return f"" - - @staticmethod - def calculate_risk_level(likelihood: int, impact: int) -> RiskLevelEnum: - """Calculate risk level from likelihood x impact matrix.""" - score = likelihood * impact - if score >= 20: - return RiskLevelEnum.CRITICAL - elif score >= 12: - return RiskLevelEnum.HIGH - elif score >= 6: - return RiskLevelEnum.MEDIUM - else: - return RiskLevelEnum.LOW - - -class AIClassificationEnum(str, enum.Enum): - """AI Act risk classification.""" - PROHIBITED = "prohibited" - HIGH_RISK = "high-risk" - LIMITED_RISK = "limited-risk" - MINIMAL_RISK = "minimal-risk" - UNCLASSIFIED = "unclassified" - - -class AISystemStatusEnum(str, enum.Enum): - """Status of an AI system in compliance tracking.""" - DRAFT = "draft" - CLASSIFIED = "classified" - COMPLIANT = "compliant" - NON_COMPLIANT = "non-compliant" - - -class AISystemDB(Base): - """ - AI System registry for AI Act compliance. - Tracks AI systems, their risk classification, and compliance status. - """ - __tablename__ = 'compliance_ai_systems' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(300), nullable=False) - description = Column(Text) - purpose = Column(String(500)) - sector = Column(String(100)) - - # AI Act classification - classification = Column(Enum(AIClassificationEnum), default=AIClassificationEnum.UNCLASSIFIED) - status = Column(Enum(AISystemStatusEnum), default=AISystemStatusEnum.DRAFT) - - # Assessment - assessment_date = Column(DateTime) - assessment_result = Column(JSON) # Full assessment result - obligations = Column(JSON) # List of AI Act obligations - risk_factors = Column(JSON) # Risk factors from assessment - recommendations = Column(JSON) # Recommendations from assessment - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_ai_system_classification', 'classification'), - Index('ix_ai_system_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class AuditExportDB(Base): - """ - Tracks audit export packages generated for external auditors. - """ - __tablename__ = 'compliance_audit_exports' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - export_type = Column(String(50), nullable=False) # "full", "controls_only", "evidence_only" - export_name = Column(String(200)) # User-friendly name - - # Scope - included_regulations = Column(JSON) # List of regulation codes - included_domains = Column(JSON) # List of control domains - date_range_start = Column(Date) - date_range_end = Column(Date) - - # Generation - requested_by = Column(String(100), nullable=False) - requested_at = Column(DateTime, nullable=False, default=datetime.utcnow) - completed_at = Column(DateTime) - - # Output - file_path = Column(String(500)) - file_hash = Column(String(64)) # SHA-256 of ZIP - file_size_bytes = Column(Integer) - - status = Column(Enum(ExportStatusEnum), default=ExportStatusEnum.PENDING) - error_message = Column(Text) - - # Statistics - total_controls = Column(Integer) - total_evidence = Column(Integer) - compliance_score = Column(Float) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - def __repr__(self): - return f"" # ============================================================================ -# SERVICE MODULE REGISTRY (Sprint 3) +# MODELS — unique to this shim (not yet extracted to a sub-module) # ============================================================================ -class ServiceModuleDB(Base): - """ - Registry of all Breakpilot services/modules for compliance mapping. - - Tracks which regulations apply to which services, enabling: - - Service-specific compliance views - - Aggregated risk per service - - Gap analysis by module - """ - __tablename__ = 'compliance_service_modules' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(100), unique=True, nullable=False, index=True) # e.g., "consent-service" - display_name = Column(String(200), nullable=False) # e.g., "Go Consent Service" - description = Column(Text) - - # Technical details - service_type = Column(Enum(ServiceTypeEnum), nullable=False) - port = Column(Integer) # Primary port (if applicable) - technology_stack = Column(JSON) # e.g., ["Go", "Gin", "PostgreSQL"] - repository_path = Column(String(500)) # e.g., "/consent-service" - docker_image = Column(String(200)) # e.g., "breakpilot-pwa-consent-service" - - # Data categories handled - data_categories = Column(JSON) # e.g., ["personal_data", "consent_records"] - processes_pii = Column(Boolean, default=False) # Handles personally identifiable info? - processes_health_data = Column(Boolean, default=False) # Handles special category health data? - ai_components = Column(Boolean, default=False) # Contains AI/ML components? - - # Status - is_active = Column(Boolean, default=True) - criticality = Column(String(20), default="medium") # "critical", "high", "medium", "low" - - # Compliance aggregation - compliance_score = Column(Float) # Calculated score 0-100 - last_compliance_check = Column(DateTime) - - # Owner - owner_team = Column(String(100)) # e.g., "Backend Team" - owner_contact = Column(String(200)) # e.g., "backend@breakpilot.app" - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - regulation_mappings = relationship("ModuleRegulationMappingDB", back_populates="module", cascade="all, delete-orphan") - module_risks = relationship("ModuleRiskDB", back_populates="module", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_module_type_active', 'service_type', 'is_active'), - ) - - def __repr__(self): - return f"" - - -class ModuleRegulationMappingDB(Base): - """ - Maps services to applicable regulations with relevance level. - - Enables filtering: "Show all GDPR requirements for consent-service" - """ - __tablename__ = 'compliance_module_regulations' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) - regulation_id = Column(String(36), ForeignKey('compliance_regulations.id'), nullable=False, index=True) - - relevance_level = Column(Enum(RelevanceLevelEnum), nullable=False, default=RelevanceLevelEnum.MEDIUM) - notes = Column(Text) # Why this regulation applies - applicable_articles = Column(JSON) # List of specific articles that apply - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - module = relationship("ServiceModuleDB", back_populates="regulation_mappings") - regulation = relationship("RegulationDB") - - __table_args__ = ( - Index('ix_module_regulation', 'module_id', 'regulation_id', unique=True), - ) - - -class ModuleRiskDB(Base): - """ - Service-specific risks aggregated from requirements and controls. - """ - __tablename__ = 'compliance_module_risks' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - module_id = Column(String(36), ForeignKey('compliance_service_modules.id'), nullable=False, index=True) - risk_id = Column(String(36), ForeignKey('compliance_risks.id'), nullable=False, index=True) - - # Module-specific assessment - module_likelihood = Column(Integer) # 1-5, may differ from global - module_impact = Column(Integer) # 1-5, may differ from global - module_risk_level = Column(Enum(RiskLevelEnum)) - - assessment_notes = Column(Text) # Module-specific notes - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - module = relationship("ServiceModuleDB", back_populates="module_risks") - risk = relationship("RiskDB") - - __table_args__ = ( - Index('ix_module_risk', 'module_id', 'risk_id', unique=True), - ) - - -# ============================================================================ -# AUDIT SESSION & SIGN-OFF (Sprint 3 - Phase 3) -# ============================================================================ - -class AuditResultEnum(str, enum.Enum): - """Result of an audit sign-off for a requirement.""" - COMPLIANT = "compliant" # Fully compliant - COMPLIANT_WITH_NOTES = "compliant_notes" # Compliant with observations - NON_COMPLIANT = "non_compliant" # Not compliant - remediation required - NOT_APPLICABLE = "not_applicable" # Not applicable to this audit - PENDING = "pending" # Not yet reviewed - - -class AuditSessionStatusEnum(str, enum.Enum): - """Status of an audit session.""" - DRAFT = "draft" # Session created, not started - IN_PROGRESS = "in_progress" # Audit in progress - COMPLETED = "completed" # All items reviewed - ARCHIVED = "archived" # Historical record - - -class AuditSessionDB(Base): - """ - Audit session for structured compliance reviews. - - Enables auditors to: - - Create named audit sessions (e.g., "Q1 2026 GDPR Audit") - - Track progress through requirements - - Sign off individual items with digital signatures - - Generate audit reports - """ - __tablename__ = 'compliance_audit_sessions' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - name = Column(String(200), nullable=False) # e.g., "Q1 2026 Compliance Audit" - description = Column(Text) - - # Auditor information - auditor_name = Column(String(100), nullable=False) # e.g., "Dr. Thomas Müller" - auditor_email = Column(String(200)) - auditor_organization = Column(String(200)) # External auditor company - - # Session scope - status = Column(Enum(AuditSessionStatusEnum), default=AuditSessionStatusEnum.DRAFT) - regulation_ids = Column(JSON) # Filter: ["GDPR", "AIACT"] or null for all - - # Progress tracking - total_items = Column(Integer, default=0) - completed_items = Column(Integer, default=0) - compliant_count = Column(Integer, default=0) - non_compliant_count = Column(Integer, default=0) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - started_at = Column(DateTime) # When audit began - completed_at = Column(DateTime) # When audit finished - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - signoffs = relationship("AuditSignOffDB", back_populates="session", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_audit_session_status', 'status'), - Index('ix_audit_session_auditor', 'auditor_name'), - ) - - def __repr__(self): - return f"" - - @property - def completion_percentage(self) -> float: - """Calculate completion percentage.""" - if self.total_items == 0: - return 0.0 - return round((self.completed_items / self.total_items) * 100, 1) - - -class AuditSignOffDB(Base): - """ - Individual sign-off for a requirement within an audit session. - - Features: - - Records audit result (compliant, non-compliant, etc.) - - Stores auditor notes and observations - - Creates digital signature (SHA-256 hash) for tamper evidence - """ - __tablename__ = 'compliance_audit_signoffs' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), nullable=False, index=True) - requirement_id = Column(String(36), ForeignKey('compliance_requirements.id'), nullable=False, index=True) - - # Audit result - result = Column(Enum(AuditResultEnum), default=AuditResultEnum.PENDING) - notes = Column(Text) # Auditor observations - - # Evidence references for this sign-off - evidence_ids = Column(JSON) # List of evidence IDs reviewed - - # Digital signature (SHA-256 hash of result + auditor + timestamp) - signature_hash = Column(String(64)) # SHA-256 hex string - signed_at = Column(DateTime) - signed_by = Column(String(100)) # Auditor name at time of signing - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - session = relationship("AuditSessionDB", back_populates="signoffs") - requirement = relationship("RequirementDB") - - __table_args__ = ( - Index('ix_signoff_session_requirement', 'session_id', 'requirement_id', unique=True), - Index('ix_signoff_result', 'result'), - ) - - def __repr__(self): - return f"" - - def create_signature(self, auditor_name: str) -> str: - """ - Create a digital signature for this sign-off. - - Returns SHA-256 hash of: result + requirement_id + auditor_name + timestamp - """ - import hashlib - from datetime import datetime - - timestamp = datetime.utcnow().isoformat() - data = f"{self.result.value}|{self.requirement_id}|{auditor_name}|{timestamp}" - signature = hashlib.sha256(data.encode()).hexdigest() - - self.signature_hash = signature - self.signed_at = datetime.utcnow() - self.signed_by = auditor_name - - return signature - - -# ============================================================================ -# ISO 27001 ISMS MODELS (Kapitel 4-10) -# ============================================================================ - -class ApprovalStatusEnum(str, enum.Enum): - """Approval status for ISMS documents.""" - DRAFT = "draft" - UNDER_REVIEW = "under_review" - APPROVED = "approved" - SUPERSEDED = "superseded" - - -class FindingTypeEnum(str, enum.Enum): - """ISO 27001 audit finding classification.""" - MAJOR = "major" # Major nonconformity - blocks certification - MINOR = "minor" # Minor nonconformity - requires CAPA - OFI = "ofi" # Opportunity for Improvement - POSITIVE = "positive" # Positive observation - - -class FindingStatusEnum(str, enum.Enum): - """Status of an audit finding.""" - OPEN = "open" - IN_PROGRESS = "in_progress" - CORRECTIVE_ACTION_PENDING = "capa_pending" - VERIFICATION_PENDING = "verification_pending" - VERIFIED = "verified" - CLOSED = "closed" - - -class CAPATypeEnum(str, enum.Enum): - """Type of corrective/preventive action.""" - CORRECTIVE = "corrective" # Fix the nonconformity - PREVENTIVE = "preventive" # Prevent recurrence - BOTH = "both" - - -class ISMSScopeDB(Base): - """ - ISMS Scope Definition (ISO 27001 Kapitel 4.3) - - Defines the boundaries and applicability of the ISMS. - This is MANDATORY for certification. - """ - __tablename__ = 'compliance_isms_scope' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - version = Column(String(20), nullable=False, default="1.0") - - # Scope definition - scope_statement = Column(Text, nullable=False) # Main scope text - included_locations = Column(JSON) # List of locations - included_processes = Column(JSON) # List of processes - included_services = Column(JSON) # List of services/products - excluded_items = Column(JSON) # Explicitly excluded items - exclusion_justification = Column(Text) # Why items are excluded - - # Boundaries - organizational_boundary = Column(Text) # Legal entity, departments - physical_boundary = Column(Text) # Locations, networks - technical_boundary = Column(Text) # Systems, applications - - # Approval - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - approval_signature = Column(String(64)) # SHA-256 hash - - # Validity - effective_date = Column(Date) - review_date = Column(Date) # Next mandatory review - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - created_by = Column(String(100)) - updated_by = Column(String(100)) - - __table_args__ = ( - Index('ix_isms_scope_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class ISMSContextDB(Base): - """ - ISMS Context (ISO 27001 Kapitel 4.1, 4.2) - - Documents internal/external issues and interested parties. - """ - __tablename__ = 'compliance_isms_context' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - version = Column(String(20), nullable=False, default="1.0") - - # 4.1 Internal issues - internal_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} - - # 4.1 External issues - external_issues = Column(JSON) # List of {"issue": "", "impact": "", "treatment": ""} - - # 4.2 Interested parties - interested_parties = Column(JSON) # List of {"party": "", "requirements": [], "relevance": ""} - - # Legal/regulatory requirements - regulatory_requirements = Column(JSON) # DSGVO, AI Act, etc. - contractual_requirements = Column(JSON) # Customer contracts - - # Analysis - swot_strengths = Column(JSON) - swot_weaknesses = Column(JSON) - swot_opportunities = Column(JSON) - swot_threats = Column(JSON) - - # Approval - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Review - last_reviewed_at = Column(DateTime) - next_review_date = Column(Date) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - def __repr__(self): - return f"" - - -class ISMSPolicyDB(Base): - """ - ISMS Policies (ISO 27001 Kapitel 5.2) - - Information security policy and sub-policies. - """ - __tablename__ = 'compliance_isms_policies' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - policy_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "POL-ISMS-001" - - # Policy details - title = Column(String(200), nullable=False) - policy_type = Column(String(50), nullable=False) # "master", "operational", "technical" - description = Column(Text) - policy_text = Column(Text, nullable=False) # Full policy content - - # Scope - applies_to = Column(JSON) # Roles, departments, systems - - # Document control - version = Column(String(20), nullable=False, default="1.0") - status = Column(Enum(ApprovalStatusEnum), default=ApprovalStatusEnum.DRAFT) - - # Approval chain - authored_by = Column(String(100)) - reviewed_by = Column(String(100)) - approved_by = Column(String(100)) # Must be top management - approved_at = Column(DateTime) - approval_signature = Column(String(64)) - - # Validity - effective_date = Column(Date) - review_frequency_months = Column(Integer, default=12) - next_review_date = Column(Date) - - # References - parent_policy_id = Column(String(36), ForeignKey('compliance_isms_policies.id')) - related_controls = Column(JSON) # List of control_ids - - # Document path - document_path = Column(String(500)) # Link to full document - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_policy_type_status', 'policy_type', 'status'), - ) - - def __repr__(self): - return f"" - - -class SecurityObjectiveDB(Base): - """ - Security Objectives (ISO 27001 Kapitel 6.2) - - Measurable information security objectives. - """ - __tablename__ = 'compliance_security_objectives' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - objective_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "OBJ-001" - - # Objective definition - title = Column(String(200), nullable=False) - description = Column(Text) - category = Column(String(50)) # "availability", "confidentiality", "integrity", "compliance" - - # SMART criteria - specific = Column(Text) # What exactly - measurable = Column(Text) # How measured - achievable = Column(Text) # Is it realistic - relevant = Column(Text) # Why important - time_bound = Column(Text) # Deadline - - # Metrics - kpi_name = Column(String(100)) - kpi_target = Column(String(100)) # Target value - kpi_current = Column(String(100)) # Current value - kpi_unit = Column(String(50)) # %, count, score - measurement_frequency = Column(String(50)) # monthly, quarterly - - # Responsibility - owner = Column(String(100)) - accountable = Column(String(100)) # RACI: Accountable - - # Status - status = Column(String(30), default="active") # active, achieved, not_achieved, cancelled - progress_percentage = Column(Integer, default=0) - - # Timeline - target_date = Column(Date) - achieved_date = Column(Date) - - # Linked items - related_controls = Column(JSON) - related_risks = Column(JSON) - - # Approval - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_objective_status', 'status'), - Index('ix_objective_category', 'category'), - ) - - def __repr__(self): - return f"" - - -class StatementOfApplicabilityDB(Base): - """ - Statement of Applicability (SoA) - ISO 27001 Anhang A Mapping - - Documents which Annex A controls are applicable and why. - This is MANDATORY for certification. - """ - __tablename__ = 'compliance_soa' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # ISO 27001:2022 Annex A reference - annex_a_control = Column(String(20), nullable=False, index=True) # e.g., "A.5.1" - annex_a_title = Column(String(300), nullable=False) - annex_a_category = Column(String(100)) # "Organizational", "People", "Physical", "Technological" - - # Applicability decision - is_applicable = Column(Boolean, nullable=False) - applicability_justification = Column(Text, nullable=False) # MUST be documented - - # Implementation status - implementation_status = Column(String(30), default="planned") # planned, partial, implemented, not_implemented - implementation_notes = Column(Text) - - # Mapping to our controls - breakpilot_control_ids = Column(JSON) # List of our control_ids that address this - coverage_level = Column(String(20), default="full") # full, partial, planned - - # Evidence - evidence_description = Column(Text) - evidence_ids = Column(JSON) # Links to EvidenceDB - - # Risk-based justification (for exclusions) - risk_assessment_notes = Column(Text) # If not applicable, explain why - compensating_controls = Column(Text) # If partial, explain compensating measures - - # Approval - reviewed_by = Column(String(100)) - reviewed_at = Column(DateTime) - approved_by = Column(String(100)) - approved_at = Column(DateTime) - - # Version tracking - version = Column(String(20), default="1.0") - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_soa_annex_control', 'annex_a_control', unique=True), - Index('ix_soa_applicable', 'is_applicable'), - Index('ix_soa_status', 'implementation_status'), - ) - - def __repr__(self): - return f"" - - -class AuditFindingDB(Base): - """ - Audit Finding with ISO 27001 Classification (Major/Minor/OFI) - - Tracks findings from internal and external audits with proper - classification and CAPA workflow. - """ - __tablename__ = 'compliance_audit_findings' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - finding_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "FIND-2026-001" - - # Source - audit_session_id = Column(String(36), ForeignKey('compliance_audit_sessions.id'), index=True) - internal_audit_id = Column(String(36), ForeignKey('compliance_internal_audits.id'), index=True) - - # Classification (CRITICAL for ISO 27001!) - finding_type = Column(Enum(FindingTypeEnum), nullable=False) - - # ISO reference - iso_chapter = Column(String(20)) # e.g., "6.1.2", "9.2" - annex_a_control = Column(String(20)) # e.g., "A.8.2" - - # Finding details - title = Column(String(300), nullable=False) - description = Column(Text, nullable=False) - objective_evidence = Column(Text, nullable=False) # What the auditor observed - - # Root cause analysis - root_cause = Column(Text) - root_cause_method = Column(String(50)) # "5-why", "fishbone", "pareto" - - # Impact assessment - impact_description = Column(Text) - affected_processes = Column(JSON) - affected_assets = Column(JSON) - - # Status tracking - status = Column(Enum(FindingStatusEnum), default=FindingStatusEnum.OPEN) - - # Responsibility - owner = Column(String(100)) # Person responsible for closure - auditor = Column(String(100)) # Auditor who raised finding - - # Dates - identified_date = Column(Date, nullable=False, default=date.today) - due_date = Column(Date) # Deadline for closure - closed_date = Column(Date) - - # Verification - verification_method = Column(Text) - verified_by = Column(String(100)) - verified_at = Column(DateTime) - verification_evidence = Column(Text) - - # Closure - closure_notes = Column(Text) - closed_by = Column(String(100)) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - corrective_actions = relationship("CorrectiveActionDB", back_populates="finding", cascade="all, delete-orphan") - - __table_args__ = ( - Index('ix_finding_type_status', 'finding_type', 'status'), - Index('ix_finding_due_date', 'due_date'), - ) - - def __repr__(self): - return f"" - - @property - def is_blocking(self) -> bool: - """Major findings block certification.""" - return self.finding_type == FindingTypeEnum.MAJOR and self.status != FindingStatusEnum.CLOSED - - -class CorrectiveActionDB(Base): - """ - Corrective & Preventive Actions (CAPA) - ISO 27001 10.1 - - Tracks actions taken to address nonconformities. - """ - __tablename__ = 'compliance_corrective_actions' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - capa_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "CAPA-2026-001" - - # Link to finding - finding_id = Column(String(36), ForeignKey('compliance_audit_findings.id'), nullable=False, index=True) - - # Type - capa_type = Column(Enum(CAPATypeEnum), nullable=False) - - # Action details - title = Column(String(300), nullable=False) - description = Column(Text, nullable=False) - expected_outcome = Column(Text) - - # Responsibility - assigned_to = Column(String(100), nullable=False) - approved_by = Column(String(100)) - - # Timeline - planned_start = Column(Date) - planned_completion = Column(Date, nullable=False) - actual_completion = Column(Date) - - # Status - status = Column(String(30), default="planned") # planned, in_progress, completed, verified, cancelled - progress_percentage = Column(Integer, default=0) - - # Resources - estimated_effort_hours = Column(Integer) - actual_effort_hours = Column(Integer) - resources_required = Column(Text) - - # Evidence of implementation - implementation_evidence = Column(Text) - evidence_ids = Column(JSON) - - # Effectiveness review - effectiveness_criteria = Column(Text) - effectiveness_verified = Column(Boolean, default=False) - effectiveness_verification_date = Column(Date) - effectiveness_notes = Column(Text) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - finding = relationship("AuditFindingDB", back_populates="corrective_actions") - - __table_args__ = ( - Index('ix_capa_status', 'status'), - Index('ix_capa_due', 'planned_completion'), - ) - - def __repr__(self): - return f"" - - -class ManagementReviewDB(Base): - """ - Management Review (ISO 27001 Kapitel 9.3) - - Records mandatory management reviews of the ISMS. - """ - __tablename__ = 'compliance_management_reviews' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - review_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "MR-2026-Q1" - - # Review details - title = Column(String(200), nullable=False) - review_date = Column(Date, nullable=False) - review_period_start = Column(Date) # Period being reviewed - review_period_end = Column(Date) - - # Participants - chairperson = Column(String(100), nullable=False) # Usually top management - attendees = Column(JSON) # List of {"name": "", "role": ""} - - # 9.3 Review Inputs (mandatory!) - input_previous_actions = Column(Text) # Status of previous review actions - input_isms_changes = Column(Text) # Changes in internal/external issues - input_security_performance = Column(Text) # Nonconformities, monitoring, audit results - input_interested_party_feedback = Column(Text) - input_risk_assessment_results = Column(Text) - input_improvement_opportunities = Column(Text) - - # Additional inputs - input_policy_effectiveness = Column(Text) - input_objective_achievement = Column(Text) - input_resource_adequacy = Column(Text) - - # 9.3 Review Outputs (mandatory!) - output_improvement_decisions = Column(Text) # Decisions for improvement - output_isms_changes = Column(Text) # Changes needed to ISMS - output_resource_needs = Column(Text) # Resource requirements - - # Action items - action_items = Column(JSON) # List of {"action": "", "owner": "", "due_date": ""} - - # Overall assessment - isms_effectiveness_rating = Column(String(20)) # "effective", "partially_effective", "not_effective" - key_decisions = Column(Text) - - # Approval - status = Column(String(30), default="draft") # draft, conducted, approved - approved_by = Column(String(100)) - approved_at = Column(DateTime) - minutes_document_path = Column(String(500)) # Link to meeting minutes - - # Next review - next_review_date = Column(Date) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - __table_args__ = ( - Index('ix_mgmt_review_date', 'review_date'), - Index('ix_mgmt_review_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class InternalAuditDB(Base): - """ - Internal Audit (ISO 27001 Kapitel 9.2) - - Tracks internal audit program and individual audits. - """ - __tablename__ = 'compliance_internal_audits' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - audit_id = Column(String(30), unique=True, nullable=False, index=True) # e.g., "IA-2026-001" - - # Audit details - title = Column(String(200), nullable=False) - audit_type = Column(String(50), nullable=False) # "scheduled", "surveillance", "special" - - # Scope - scope_description = Column(Text, nullable=False) - iso_chapters_covered = Column(JSON) # e.g., ["4", "5", "6.1"] - annex_a_controls_covered = Column(JSON) # e.g., ["A.5", "A.6"] - processes_covered = Column(JSON) - departments_covered = Column(JSON) - - # Audit criteria - criteria = Column(Text) # Standards, policies being audited against - - # Timeline - planned_date = Column(Date, nullable=False) - actual_start_date = Column(Date) - actual_end_date = Column(Date) - - # Audit team - lead_auditor = Column(String(100), nullable=False) - audit_team = Column(JSON) # List of auditor names - auditee_representatives = Column(JSON) # Who was interviewed - - # Status - status = Column(String(30), default="planned") # planned, in_progress, completed, cancelled - - # Results summary - total_findings = Column(Integer, default=0) - major_findings = Column(Integer, default=0) - minor_findings = Column(Integer, default=0) - ofi_count = Column(Integer, default=0) - positive_observations = Column(Integer, default=0) - - # Conclusion - audit_conclusion = Column(Text) - overall_assessment = Column(String(30)) # "conforming", "minor_nc", "major_nc" - - # Report - report_date = Column(Date) - report_document_path = Column(String(500)) - - # Sign-off - report_approved_by = Column(String(100)) - report_approved_at = Column(DateTime) - - # Follow-up - follow_up_audit_required = Column(Boolean, default=False) - follow_up_audit_id = Column(String(36)) - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - # Relationships - findings = relationship("AuditFindingDB", backref="internal_audit", foreign_keys=[AuditFindingDB.internal_audit_id]) - - __table_args__ = ( - Index('ix_internal_audit_date', 'planned_date'), - Index('ix_internal_audit_status', 'status'), - ) - - def __repr__(self): - return f"" - - -class AuditTrailDB(Base): - """ - Comprehensive Audit Trail for ISMS Changes - - Tracks all changes to compliance-relevant data for - accountability and forensic analysis. - """ - __tablename__ = 'compliance_audit_trail' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # What changed - entity_type = Column(String(50), nullable=False, index=True) # "control", "risk", "policy", etc. - entity_id = Column(String(36), nullable=False, index=True) - entity_name = Column(String(200)) # Human-readable identifier - - # Action - action = Column(String(20), nullable=False) # "create", "update", "delete", "approve", "sign" - - # Change details - field_changed = Column(String(100)) # Which field (for updates) - old_value = Column(Text) - new_value = Column(Text) - change_summary = Column(Text) # Human-readable summary - - # Who & When - performed_by = Column(String(100), nullable=False) - performed_at = Column(DateTime, nullable=False, default=datetime.utcnow) - - # Context - ip_address = Column(String(45)) - user_agent = Column(String(500)) - session_id = Column(String(100)) - - # Integrity - checksum = Column(String(64)) # SHA-256 of the change - - # Timestamps (immutable after creation) - created_at = Column(DateTime, nullable=False, default=datetime.utcnow) - - __table_args__ = ( - Index('ix_audit_trail_entity', 'entity_type', 'entity_id'), - Index('ix_audit_trail_time', 'performed_at'), - Index('ix_audit_trail_user', 'performed_by'), - ) - - def __repr__(self): - return f"" - - -class ISMSReadinessCheckDB(Base): - """ - ISMS Readiness Check Results - - Stores automated pre-audit checks to identify potential - Major findings before external audit. - """ - __tablename__ = 'compliance_isms_readiness' - - id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) - - # Check run - check_date = Column(DateTime, nullable=False, default=datetime.utcnow) - triggered_by = Column(String(100)) # "scheduled", "manual", "pre-audit" - - # Overall status - overall_status = Column(String(20), nullable=False) # "ready", "at_risk", "not_ready" - certification_possible = Column(Boolean, nullable=False) - - # Chapter-by-chapter status (ISO 27001) - chapter_4_status = Column(String(20)) # Context - chapter_5_status = Column(String(20)) # Leadership - chapter_6_status = Column(String(20)) # Planning - chapter_7_status = Column(String(20)) # Support - chapter_8_status = Column(String(20)) # Operation - chapter_9_status = Column(String(20)) # Performance - chapter_10_status = Column(String(20)) # Improvement - - # Potential Major findings - potential_majors = Column(JSON) # List of {"check": "", "status": "", "recommendation": ""} - - # Potential Minor findings - potential_minors = Column(JSON) - - # Improvement opportunities - improvement_opportunities = Column(JSON) - - # Scores - readiness_score = Column(Float) # 0-100 - documentation_score = Column(Float) - implementation_score = Column(Float) - evidence_score = Column(Float) - - # Recommendations - priority_actions = Column(JSON) # List of recommended actions before audit - - # Timestamps - created_at = Column(DateTime, default=datetime.utcnow) - - __table_args__ = ( - Index('ix_readiness_date', 'check_date'), - Index('ix_readiness_status', 'overall_status'), - ) - - def __repr__(self): - return f"" - - class LLMGenerationAuditDB(Base): """ Audit trail for LLM-generated content. diff --git a/backend-compliance/compliance/db/repository.py b/backend-compliance/compliance/db/repository.py index 3d8511d..2881e53 100644 --- a/backend-compliance/compliance/db/repository.py +++ b/backend-compliance/compliance/db/repository.py @@ -36,1657 +36,3 @@ from compliance.db.audit_session_repository import ( # noqa: F401 AuditSignOffRepository, ) - -class RegulationRepository: - """Repository for regulations/standards.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - code: str, - name: str, - regulation_type: RegulationTypeEnum, - full_name: Optional[str] = None, - source_url: Optional[str] = None, - local_pdf_path: Optional[str] = None, - effective_date: Optional[date] = None, - description: Optional[str] = None, - ) -> RegulationDB: - """Create a new regulation.""" - regulation = RegulationDB( - id=str(uuid.uuid4()), - code=code, - name=name, - full_name=full_name, - regulation_type=regulation_type, - source_url=source_url, - local_pdf_path=local_pdf_path, - effective_date=effective_date, - description=description, - ) - self.db.add(regulation) - self.db.commit() - self.db.refresh(regulation) - return regulation - - def get_by_id(self, regulation_id: str) -> Optional[RegulationDB]: - """Get regulation by ID.""" - return self.db.query(RegulationDB).filter(RegulationDB.id == regulation_id).first() - - def get_by_code(self, code: str) -> Optional[RegulationDB]: - """Get regulation by code (e.g., 'GDPR').""" - return self.db.query(RegulationDB).filter(RegulationDB.code == code).first() - - def get_all( - self, - regulation_type: Optional[RegulationTypeEnum] = None, - is_active: Optional[bool] = True - ) -> List[RegulationDB]: - """Get all regulations with optional filters.""" - query = self.db.query(RegulationDB) - if regulation_type: - query = query.filter(RegulationDB.regulation_type == regulation_type) - if is_active is not None: - query = query.filter(RegulationDB.is_active == is_active) - return query.order_by(RegulationDB.code).all() - - def update(self, regulation_id: str, **kwargs) -> Optional[RegulationDB]: - """Update a regulation.""" - regulation = self.get_by_id(regulation_id) - if not regulation: - return None - for key, value in kwargs.items(): - if hasattr(regulation, key): - setattr(regulation, key, value) - regulation.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(regulation) - return regulation - - def delete(self, regulation_id: str) -> bool: - """Delete a regulation.""" - regulation = self.get_by_id(regulation_id) - if not regulation: - return False - self.db.delete(regulation) - self.db.commit() - return True - - def get_active(self) -> List[RegulationDB]: - """Get all active regulations.""" - return self.get_all(is_active=True) - - def count(self) -> int: - """Count all regulations.""" - return self.db.query(func.count(RegulationDB.id)).scalar() or 0 - - -class RequirementRepository: - """Repository for requirements.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - regulation_id: str, - article: str, - title: str, - paragraph: Optional[str] = None, - description: Optional[str] = None, - requirement_text: Optional[str] = None, - breakpilot_interpretation: Optional[str] = None, - is_applicable: bool = True, - priority: int = 2, - ) -> RequirementDB: - """Create a new requirement.""" - requirement = RequirementDB( - id=str(uuid.uuid4()), - regulation_id=regulation_id, - article=article, - paragraph=paragraph, - title=title, - description=description, - requirement_text=requirement_text, - breakpilot_interpretation=breakpilot_interpretation, - is_applicable=is_applicable, - priority=priority, - ) - self.db.add(requirement) - self.db.commit() - self.db.refresh(requirement) - return requirement - - def get_by_id(self, requirement_id: str) -> Optional[RequirementDB]: - """Get requirement by ID with eager-loaded relationships.""" - return ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .filter(RequirementDB.id == requirement_id) - .first() - ) - - def get_by_regulation( - self, - regulation_id: str, - is_applicable: Optional[bool] = None - ) -> List[RequirementDB]: - """Get all requirements for a regulation with eager-loaded controls.""" - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .filter(RequirementDB.regulation_id == regulation_id) - ) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() - - def get_by_regulation_code(self, code: str) -> List[RequirementDB]: - """Get requirements by regulation code with eager-loaded relationships.""" - return ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .join(RegulationDB) - .filter(RegulationDB.code == code) - .order_by(RequirementDB.article, RequirementDB.paragraph) - .all() - ) - - def get_all(self, is_applicable: Optional[bool] = None) -> List[RequirementDB]: - """Get all requirements with optional filter and eager-loading.""" - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - ) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() - - def get_paginated( - self, - page: int = 1, - page_size: int = 50, - regulation_code: Optional[str] = None, - status: Optional[str] = None, - is_applicable: Optional[bool] = None, - search: Optional[str] = None, - ) -> Tuple[List[RequirementDB], int]: - """ - Get paginated requirements with eager-loaded relationships. - Returns tuple of (items, total_count). - """ - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - ) - - # Filters - if regulation_code: - query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) - if status: - query = query.filter(RequirementDB.implementation_status == status) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - RequirementDB.title.ilike(search_term), - RequirementDB.description.ilike(search_term), - RequirementDB.article.ilike(search_term), - ) - ) - - # Count before pagination - total = query.count() - - # Apply pagination and ordering - items = ( - query - .order_by(RequirementDB.priority.desc(), RequirementDB.article, RequirementDB.paragraph) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - return items, total - - def delete(self, requirement_id: str) -> bool: - """Delete a requirement.""" - requirement = self.db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first() - if not requirement: - return False - self.db.delete(requirement) - self.db.commit() - return True - - def count(self) -> int: - """Count all requirements.""" - return self.db.query(func.count(RequirementDB.id)).scalar() or 0 - - -class ControlRepository: - """Repository for controls.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - control_id: str, - domain: ControlDomainEnum, - control_type: str, - title: str, - pass_criteria: str, - description: Optional[str] = None, - implementation_guidance: Optional[str] = None, - code_reference: Optional[str] = None, - is_automated: bool = False, - automation_tool: Optional[str] = None, - owner: Optional[str] = None, - review_frequency_days: int = 90, - ) -> ControlDB: - """Create a new control.""" - control = ControlDB( - id=str(uuid.uuid4()), - control_id=control_id, - domain=domain, - control_type=control_type, - title=title, - description=description, - pass_criteria=pass_criteria, - implementation_guidance=implementation_guidance, - code_reference=code_reference, - is_automated=is_automated, - automation_tool=automation_tool, - owner=owner, - review_frequency_days=review_frequency_days, - ) - self.db.add(control) - self.db.commit() - self.db.refresh(control) - return control - - def get_by_id(self, control_uuid: str) -> Optional[ControlDB]: - """Get control by UUID with eager-loaded relationships.""" - return ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), - selectinload(ControlDB.evidence) - ) - .filter(ControlDB.id == control_uuid) - .first() - ) - - def get_by_control_id(self, control_id: str) -> Optional[ControlDB]: - """Get control by control_id (e.g., 'PRIV-001') with eager-loaded relationships.""" - return ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), - selectinload(ControlDB.evidence) - ) - .filter(ControlDB.control_id == control_id) - .first() - ) - - def get_all( - self, - domain: Optional[ControlDomainEnum] = None, - status: Optional[ControlStatusEnum] = None, - is_automated: Optional[bool] = None, - ) -> List[ControlDB]: - """Get all controls with optional filters and eager-loading.""" - query = ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings), - selectinload(ControlDB.evidence) - ) - ) - if domain: - query = query.filter(ControlDB.domain == domain) - if status: - query = query.filter(ControlDB.status == status) - if is_automated is not None: - query = query.filter(ControlDB.is_automated == is_automated) - return query.order_by(ControlDB.control_id).all() - - def get_paginated( - self, - page: int = 1, - page_size: int = 50, - domain: Optional[ControlDomainEnum] = None, - status: Optional[ControlStatusEnum] = None, - is_automated: Optional[bool] = None, - search: Optional[str] = None, - ) -> Tuple[List[ControlDB], int]: - """ - Get paginated controls with eager-loaded relationships. - Returns tuple of (items, total_count). - """ - query = ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings), - selectinload(ControlDB.evidence) - ) - ) - - if domain: - query = query.filter(ControlDB.domain == domain) - if status: - query = query.filter(ControlDB.status == status) - if is_automated is not None: - query = query.filter(ControlDB.is_automated == is_automated) - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - ControlDB.title.ilike(search_term), - ControlDB.description.ilike(search_term), - ControlDB.control_id.ilike(search_term), - ) - ) - - total = query.count() - items = ( - query - .order_by(ControlDB.control_id) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - return items, total - - def get_by_domain(self, domain: ControlDomainEnum) -> List[ControlDB]: - """Get all controls in a domain.""" - return self.get_all(domain=domain) - - def get_by_status(self, status: ControlStatusEnum) -> List[ControlDB]: - """Get all controls with a specific status.""" - return self.get_all(status=status) - - def update_status( - self, - control_id: str, - status: ControlStatusEnum, - status_notes: Optional[str] = None - ) -> Optional[ControlDB]: - """Update control status.""" - control = self.get_by_control_id(control_id) - if not control: - return None - control.status = status - if status_notes: - control.status_notes = status_notes - control.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(control) - return control - - def mark_reviewed(self, control_id: str) -> Optional[ControlDB]: - """Mark control as reviewed.""" - control = self.get_by_control_id(control_id) - if not control: - return None - control.last_reviewed_at = datetime.utcnow() - from datetime import timedelta - control.next_review_at = datetime.utcnow() + timedelta(days=control.review_frequency_days) - control.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(control) - return control - - def get_due_for_review(self) -> List[ControlDB]: - """Get controls due for review.""" - return ( - self.db.query(ControlDB) - .filter( - or_( - ControlDB.next_review_at is None, - ControlDB.next_review_at <= datetime.utcnow() - ) - ) - .order_by(ControlDB.next_review_at) - .all() - ) - - def get_statistics(self) -> Dict[str, Any]: - """Get control statistics by status and domain.""" - total = self.db.query(func.count(ControlDB.id)).scalar() - - by_status = dict( - self.db.query(ControlDB.status, func.count(ControlDB.id)) - .group_by(ControlDB.status) - .all() - ) - - by_domain = dict( - self.db.query(ControlDB.domain, func.count(ControlDB.id)) - .group_by(ControlDB.domain) - .all() - ) - - passed = by_status.get(ControlStatusEnum.PASS, 0) - partial = by_status.get(ControlStatusEnum.PARTIAL, 0) - - score = 0.0 - if total > 0: - score = ((passed + (partial * 0.5)) / total) * 100 - - return { - "total": total, - "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, - "by_domain": {str(k.value) if k else "none": v for k, v in by_domain.items()}, - "compliance_score": round(score, 1), - } - - def get_multi_dimensional_score(self) -> Dict[str, Any]: - """ - Calculate multi-dimensional compliance score (Anti-Fake-Evidence). - - Returns 6 dimensions + hard_blocks + overall_readiness. - """ - from .models import ( - EvidenceDB, RequirementDB, ControlMappingDB, - EvidenceConfidenceEnum, EvidenceTruthStatusEnum, - ) - - # Weight map for confidence levels - conf_weights = {"E0": 0.0, "E1": 0.25, "E2": 0.5, "E3": 0.75, "E4": 1.0} - validated_statuses = {"validated_internal", "accepted_by_auditor", "provided_to_auditor"} - - controls = self.get_all() - total_controls = len(controls) - - if total_controls == 0: - return { - "requirement_coverage": 0.0, - "evidence_strength": 0.0, - "validation_quality": 0.0, - "evidence_freshness": 0.0, - "control_effectiveness": 0.0, - "overall_readiness": 0.0, - "hard_blocks": ["Keine Controls vorhanden"], - } - - # 1. requirement_coverage: % requirements linked to at least one control - total_reqs = self.db.query(func.count(RequirementDB.id)).scalar() or 0 - linked_reqs = ( - self.db.query(func.count(func.distinct(ControlMappingDB.requirement_id))) - .scalar() or 0 - ) - requirement_coverage = (linked_reqs / total_reqs * 100) if total_reqs > 0 else 0.0 - - # 2. evidence_strength: weighted average of evidence confidence - all_evidence = self.db.query(EvidenceDB).all() - if all_evidence: - total_weight = 0.0 - for e in all_evidence: - conf_val = e.confidence_level.value if e.confidence_level else "E1" - total_weight += conf_weights.get(conf_val, 0.25) - evidence_strength = (total_weight / len(all_evidence)) * 100 - else: - evidence_strength = 0.0 - - # 3. validation_quality: % evidence with truth_status >= validated_internal - if all_evidence: - validated_count = sum( - 1 for e in all_evidence - if (e.truth_status.value if e.truth_status else "uploaded") in validated_statuses - ) - validation_quality = (validated_count / len(all_evidence)) * 100 - else: - validation_quality = 0.0 - - # 4. evidence_freshness: % evidence not expired and reviewed < 90 days - now = datetime.now() - if all_evidence: - fresh_count = 0 - for e in all_evidence: - is_expired = e.valid_until and e.valid_until < now - is_stale = e.reviewed_at and (now - e.reviewed_at).days > 90 if hasattr(e, 'reviewed_at') and e.reviewed_at else False - if not is_expired and not is_stale: - fresh_count += 1 - evidence_freshness = (fresh_count / len(all_evidence)) * 100 - else: - evidence_freshness = 0.0 - - # 5. control_effectiveness: existing formula - passed = sum(1 for c in controls if c.status == ControlStatusEnum.PASS) - partial = sum(1 for c in controls if c.status == ControlStatusEnum.PARTIAL) - control_effectiveness = ((passed + partial * 0.5) / total_controls) * 100 - - # 6. overall_readiness: weighted composite - overall_readiness = ( - 0.20 * requirement_coverage + - 0.25 * evidence_strength + - 0.20 * validation_quality + - 0.10 * evidence_freshness + - 0.25 * control_effectiveness - ) - - # Hard blocks - hard_blocks = [] - - # Critical controls without any evidence - critical_no_evidence = [] - for c in controls: - if c.status in (ControlStatusEnum.PASS, ControlStatusEnum.PARTIAL): - evidence_for_ctrl = [e for e in all_evidence if e.control_id == c.id] - if not evidence_for_ctrl: - critical_no_evidence.append(c.control_id) - if critical_no_evidence: - hard_blocks.append( - f"{len(critical_no_evidence)} Controls mit Status pass/partial haben keine Evidence: " - f"{', '.join(critical_no_evidence[:5])}" - ) - - # Controls with only E0/E1 evidence claiming pass - weak_evidence_pass = [] - for c in controls: - if c.status == ControlStatusEnum.PASS: - evidence_for_ctrl = [e for e in all_evidence if e.control_id == c.id] - if evidence_for_ctrl: - max_conf = max( - conf_weights.get( - e.confidence_level.value if e.confidence_level else "E1", 0.25 - ) - for e in evidence_for_ctrl - ) - if max_conf < 0.5: # Only E0 or E1 - weak_evidence_pass.append(c.control_id) - if weak_evidence_pass: - hard_blocks.append( - f"{len(weak_evidence_pass)} Controls auf 'pass' haben nur E0/E1-Evidence: " - f"{', '.join(weak_evidence_pass[:5])}" - ) - - return { - "requirement_coverage": round(requirement_coverage, 1), - "evidence_strength": round(evidence_strength, 1), - "validation_quality": round(validation_quality, 1), - "evidence_freshness": round(evidence_freshness, 1), - "control_effectiveness": round(control_effectiveness, 1), - "overall_readiness": round(overall_readiness, 1), - "hard_blocks": hard_blocks, - } - - -class ControlMappingRepository: - """Repository for requirement-control mappings.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - requirement_id: str, - control_id: str, - coverage_level: str = "full", - notes: Optional[str] = None, - ) -> ControlMappingDB: - """Create a mapping.""" - # Get the control UUID from control_id - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - raise ValueError(f"Control {control_id} not found") - - mapping = ControlMappingDB( - id=str(uuid.uuid4()), - requirement_id=requirement_id, - control_id=control.id, - coverage_level=coverage_level, - notes=notes, - ) - self.db.add(mapping) - self.db.commit() - self.db.refresh(mapping) - return mapping - - def get_by_requirement(self, requirement_id: str) -> List[ControlMappingDB]: - """Get all mappings for a requirement.""" - return ( - self.db.query(ControlMappingDB) - .filter(ControlMappingDB.requirement_id == requirement_id) - .all() - ) - - def get_by_control(self, control_uuid: str) -> List[ControlMappingDB]: - """Get all mappings for a control.""" - return ( - self.db.query(ControlMappingDB) - .filter(ControlMappingDB.control_id == control_uuid) - .all() - ) - - -class EvidenceRepository: - """Repository for evidence.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - control_id: str, - evidence_type: str, - title: str, - description: Optional[str] = None, - artifact_path: Optional[str] = None, - artifact_url: Optional[str] = None, - artifact_hash: Optional[str] = None, - file_size_bytes: Optional[int] = None, - mime_type: Optional[str] = None, - valid_until: Optional[datetime] = None, - source: str = "manual", - ci_job_id: Optional[str] = None, - uploaded_by: Optional[str] = None, - ) -> EvidenceDB: - """Create evidence record.""" - # Get control UUID - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - raise ValueError(f"Control {control_id} not found") - - evidence = EvidenceDB( - id=str(uuid.uuid4()), - control_id=control.id, - evidence_type=evidence_type, - title=title, - description=description, - artifact_path=artifact_path, - artifact_url=artifact_url, - artifact_hash=artifact_hash, - file_size_bytes=file_size_bytes, - mime_type=mime_type, - valid_until=valid_until, - source=source, - ci_job_id=ci_job_id, - uploaded_by=uploaded_by, - ) - self.db.add(evidence) - self.db.commit() - self.db.refresh(evidence) - return evidence - - def get_by_id(self, evidence_id: str) -> Optional[EvidenceDB]: - """Get evidence by ID.""" - return self.db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first() - - def get_by_control( - self, - control_id: str, - status: Optional[EvidenceStatusEnum] = None - ) -> List[EvidenceDB]: - """Get all evidence for a control.""" - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - return [] - - query = self.db.query(EvidenceDB).filter(EvidenceDB.control_id == control.id) - if status: - query = query.filter(EvidenceDB.status == status) - return query.order_by(EvidenceDB.collected_at.desc()).all() - - def get_all( - self, - evidence_type: Optional[str] = None, - status: Optional[EvidenceStatusEnum] = None, - limit: int = 100, - ) -> List[EvidenceDB]: - """Get all evidence with filters.""" - query = self.db.query(EvidenceDB) - if evidence_type: - query = query.filter(EvidenceDB.evidence_type == evidence_type) - if status: - query = query.filter(EvidenceDB.status == status) - return query.order_by(EvidenceDB.collected_at.desc()).limit(limit).all() - - def update_status(self, evidence_id: str, status: EvidenceStatusEnum) -> Optional[EvidenceDB]: - """Update evidence status.""" - evidence = self.get_by_id(evidence_id) - if not evidence: - return None - evidence.status = status - evidence.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(evidence) - return evidence - - def get_statistics(self) -> Dict[str, Any]: - """Get evidence statistics.""" - total = self.db.query(func.count(EvidenceDB.id)).scalar() - - by_type = dict( - self.db.query(EvidenceDB.evidence_type, func.count(EvidenceDB.id)) - .group_by(EvidenceDB.evidence_type) - .all() - ) - - by_status = dict( - self.db.query(EvidenceDB.status, func.count(EvidenceDB.id)) - .group_by(EvidenceDB.status) - .all() - ) - - valid = by_status.get(EvidenceStatusEnum.VALID, 0) - coverage = (valid / total * 100) if total > 0 else 0 - - return { - "total": total, - "by_type": by_type, - "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, - "coverage_percent": round(coverage, 1), - } - - -class RiskRepository: - """Repository for risks.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - risk_id: str, - title: str, - category: str, - likelihood: int, - impact: int, - description: Optional[str] = None, - mitigating_controls: Optional[List[str]] = None, - owner: Optional[str] = None, - treatment_plan: Optional[str] = None, - ) -> RiskDB: - """Create a risk.""" - inherent_risk = RiskDB.calculate_risk_level(likelihood, impact) - - risk = RiskDB( - id=str(uuid.uuid4()), - risk_id=risk_id, - title=title, - description=description, - category=category, - likelihood=likelihood, - impact=impact, - inherent_risk=inherent_risk, - mitigating_controls=mitigating_controls or [], - owner=owner, - treatment_plan=treatment_plan, - ) - self.db.add(risk) - self.db.commit() - self.db.refresh(risk) - return risk - - def get_by_id(self, risk_uuid: str) -> Optional[RiskDB]: - """Get risk by UUID.""" - return self.db.query(RiskDB).filter(RiskDB.id == risk_uuid).first() - - def get_by_risk_id(self, risk_id: str) -> Optional[RiskDB]: - """Get risk by risk_id (e.g., 'RISK-001').""" - return self.db.query(RiskDB).filter(RiskDB.risk_id == risk_id).first() - - def get_all( - self, - category: Optional[str] = None, - status: Optional[str] = None, - min_risk_level: Optional[RiskLevelEnum] = None, - ) -> List[RiskDB]: - """Get all risks with filters.""" - query = self.db.query(RiskDB) - if category: - query = query.filter(RiskDB.category == category) - if status: - query = query.filter(RiskDB.status == status) - if min_risk_level: - risk_order = { - RiskLevelEnum.LOW: 1, - RiskLevelEnum.MEDIUM: 2, - RiskLevelEnum.HIGH: 3, - RiskLevelEnum.CRITICAL: 4, - } - min_order = risk_order.get(min_risk_level, 1) - query = query.filter( - RiskDB.inherent_risk.in_( - [k for k, v in risk_order.items() if v >= min_order] - ) - ) - return query.order_by(RiskDB.risk_id).all() - - def update(self, risk_id: str, **kwargs) -> Optional[RiskDB]: - """Update a risk.""" - risk = self.get_by_risk_id(risk_id) - if not risk: - return None - - for key, value in kwargs.items(): - if hasattr(risk, key): - setattr(risk, key, value) - - # Recalculate risk levels if likelihood/impact changed - if 'likelihood' in kwargs or 'impact' in kwargs: - risk.inherent_risk = RiskDB.calculate_risk_level(risk.likelihood, risk.impact) - if 'residual_likelihood' in kwargs or 'residual_impact' in kwargs: - if risk.residual_likelihood and risk.residual_impact: - risk.residual_risk = RiskDB.calculate_risk_level( - risk.residual_likelihood, risk.residual_impact - ) - - risk.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(risk) - return risk - - def get_matrix_data(self) -> Dict[str, Any]: - """Get data for risk matrix visualization.""" - risks = self.get_all() - - matrix = {} - for risk in risks: - key = f"{risk.likelihood}_{risk.impact}" - if key not in matrix: - matrix[key] = [] - matrix[key].append({ - "risk_id": risk.risk_id, - "title": risk.title, - "inherent_risk": risk.inherent_risk.value if risk.inherent_risk else None, - }) - - return { - "matrix": matrix, - "total_risks": len(risks), - "by_level": { - "critical": len([r for r in risks if r.inherent_risk == RiskLevelEnum.CRITICAL]), - "high": len([r for r in risks if r.inherent_risk == RiskLevelEnum.HIGH]), - "medium": len([r for r in risks if r.inherent_risk == RiskLevelEnum.MEDIUM]), - "low": len([r for r in risks if r.inherent_risk == RiskLevelEnum.LOW]), - } - } - - -class AuditExportRepository: - """Repository for audit exports.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - export_type: str, - requested_by: str, - export_name: Optional[str] = None, - included_regulations: Optional[List[str]] = None, - included_domains: Optional[List[str]] = None, - date_range_start: Optional[date] = None, - date_range_end: Optional[date] = None, - ) -> AuditExportDB: - """Create an export request.""" - export = AuditExportDB( - id=str(uuid.uuid4()), - export_type=export_type, - export_name=export_name or f"audit_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - requested_by=requested_by, - included_regulations=included_regulations, - included_domains=included_domains, - date_range_start=date_range_start, - date_range_end=date_range_end, - ) - self.db.add(export) - self.db.commit() - self.db.refresh(export) - return export - - def get_by_id(self, export_id: str) -> Optional[AuditExportDB]: - """Get export by ID.""" - return self.db.query(AuditExportDB).filter(AuditExportDB.id == export_id).first() - - def get_all(self, limit: int = 50) -> List[AuditExportDB]: - """Get all exports.""" - return ( - self.db.query(AuditExportDB) - .order_by(AuditExportDB.requested_at.desc()) - .limit(limit) - .all() - ) - - def update_status( - self, - export_id: str, - status: ExportStatusEnum, - file_path: Optional[str] = None, - file_hash: Optional[str] = None, - file_size_bytes: Optional[int] = None, - error_message: Optional[str] = None, - total_controls: Optional[int] = None, - total_evidence: Optional[int] = None, - compliance_score: Optional[float] = None, - ) -> Optional[AuditExportDB]: - """Update export status.""" - export = self.get_by_id(export_id) - if not export: - return None - - export.status = status - if file_path: - export.file_path = file_path - if file_hash: - export.file_hash = file_hash - if file_size_bytes: - export.file_size_bytes = file_size_bytes - if error_message: - export.error_message = error_message - if total_controls is not None: - export.total_controls = total_controls - if total_evidence is not None: - export.total_evidence = total_evidence - if compliance_score is not None: - export.compliance_score = compliance_score - - if status == ExportStatusEnum.COMPLETED: - export.completed_at = datetime.utcnow() - - export.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(export) - return export - - -class ServiceModuleRepository: - """Repository for service modules (Sprint 3).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - name: str, - display_name: str, - service_type: str, - description: Optional[str] = None, - port: Optional[int] = None, - technology_stack: Optional[List[str]] = None, - repository_path: Optional[str] = None, - docker_image: Optional[str] = None, - data_categories: Optional[List[str]] = None, - processes_pii: bool = False, - processes_health_data: bool = False, - ai_components: bool = False, - criticality: str = "medium", - owner_team: Optional[str] = None, - owner_contact: Optional[str] = None, - ) -> "ServiceModuleDB": - """Create a service module.""" - from .models import ServiceModuleDB, ServiceTypeEnum - - module = ServiceModuleDB( - id=str(uuid.uuid4()), - name=name, - display_name=display_name, - description=description, - service_type=ServiceTypeEnum(service_type), - port=port, - technology_stack=technology_stack or [], - repository_path=repository_path, - docker_image=docker_image, - data_categories=data_categories or [], - processes_pii=processes_pii, - processes_health_data=processes_health_data, - ai_components=ai_components, - criticality=criticality, - owner_team=owner_team, - owner_contact=owner_contact, - ) - self.db.add(module) - self.db.commit() - self.db.refresh(module) - return module - - def get_by_id(self, module_id: str) -> Optional["ServiceModuleDB"]: - """Get module by ID.""" - from .models import ServiceModuleDB - return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.id == module_id).first() - - def get_by_name(self, name: str) -> Optional["ServiceModuleDB"]: - """Get module by name.""" - from .models import ServiceModuleDB - return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.name == name).first() - - def get_all( - self, - service_type: Optional[str] = None, - criticality: Optional[str] = None, - processes_pii: Optional[bool] = None, - ai_components: Optional[bool] = None, - ) -> List["ServiceModuleDB"]: - """Get all modules with filters.""" - from .models import ServiceModuleDB, ServiceTypeEnum - - query = self.db.query(ServiceModuleDB).filter(ServiceModuleDB.is_active) - - if service_type: - query = query.filter(ServiceModuleDB.service_type == ServiceTypeEnum(service_type)) - if criticality: - query = query.filter(ServiceModuleDB.criticality == criticality) - if processes_pii is not None: - query = query.filter(ServiceModuleDB.processes_pii == processes_pii) - if ai_components is not None: - query = query.filter(ServiceModuleDB.ai_components == ai_components) - - return query.order_by(ServiceModuleDB.name).all() - - def get_with_regulations(self, module_id: str) -> Optional["ServiceModuleDB"]: - """Get module with regulation mappings loaded.""" - from .models import ServiceModuleDB, ModuleRegulationMappingDB - from sqlalchemy.orm import selectinload - - return ( - self.db.query(ServiceModuleDB) - .options( - selectinload(ServiceModuleDB.regulation_mappings) - .selectinload(ModuleRegulationMappingDB.regulation) - ) - .filter(ServiceModuleDB.id == module_id) - .first() - ) - - def add_regulation_mapping( - self, - module_id: str, - regulation_id: str, - relevance_level: str = "medium", - notes: Optional[str] = None, - applicable_articles: Optional[List[str]] = None, - ) -> "ModuleRegulationMappingDB": - """Add a regulation mapping to a module.""" - from .models import ModuleRegulationMappingDB, RelevanceLevelEnum - - mapping = ModuleRegulationMappingDB( - id=str(uuid.uuid4()), - module_id=module_id, - regulation_id=regulation_id, - relevance_level=RelevanceLevelEnum(relevance_level), - notes=notes, - applicable_articles=applicable_articles, - ) - self.db.add(mapping) - self.db.commit() - self.db.refresh(mapping) - return mapping - - def get_overview(self) -> Dict[str, Any]: - """Get overview statistics for all modules.""" - from .models import ModuleRegulationMappingDB - - modules = self.get_all() - total = len(modules) - - by_type = {} - by_criticality = {} - pii_count = 0 - ai_count = 0 - - for m in modules: - type_key = m.service_type.value if m.service_type else "unknown" - by_type[type_key] = by_type.get(type_key, 0) + 1 - by_criticality[m.criticality] = by_criticality.get(m.criticality, 0) + 1 - if m.processes_pii: - pii_count += 1 - if m.ai_components: - ai_count += 1 - - # Get regulation coverage - regulation_coverage = {} - mappings = self.db.query(ModuleRegulationMappingDB).all() - for mapping in mappings: - reg = mapping.regulation - if reg: - code = reg.code - regulation_coverage[code] = regulation_coverage.get(code, 0) + 1 - - # Calculate average compliance score - scores = [m.compliance_score for m in modules if m.compliance_score is not None] - avg_score = sum(scores) / len(scores) if scores else None - - return { - "total_modules": total, - "modules_by_type": by_type, - "modules_by_criticality": by_criticality, - "modules_processing_pii": pii_count, - "modules_with_ai": ai_count, - "average_compliance_score": round(avg_score, 1) if avg_score else None, - "regulations_coverage": regulation_coverage, - } - - def seed_from_data(self, services_data: List[Dict[str, Any]], force: bool = False) -> Dict[str, int]: - """Seed modules from service_modules.py data.""" - - modules_created = 0 - mappings_created = 0 - - for svc in services_data: - # Check if module exists - existing = self.get_by_name(svc["name"]) - if existing and not force: - continue - - if existing and force: - # Delete existing module (cascades to mappings) - self.db.delete(existing) - self.db.commit() - - # Create module - module = self.create( - name=svc["name"], - display_name=svc["display_name"], - description=svc.get("description"), - service_type=svc["service_type"], - port=svc.get("port"), - technology_stack=svc.get("technology_stack"), - repository_path=svc.get("repository_path"), - docker_image=svc.get("docker_image"), - data_categories=svc.get("data_categories"), - processes_pii=svc.get("processes_pii", False), - processes_health_data=svc.get("processes_health_data", False), - ai_components=svc.get("ai_components", False), - criticality=svc.get("criticality", "medium"), - owner_team=svc.get("owner_team"), - ) - modules_created += 1 - - # Create regulation mappings - for reg_data in svc.get("regulations", []): - # Find regulation by code - reg = self.db.query(RegulationDB).filter( - RegulationDB.code == reg_data["code"] - ).first() - - if reg: - self.add_regulation_mapping( - module_id=module.id, - regulation_id=reg.id, - relevance_level=reg_data.get("relevance", "medium"), - notes=reg_data.get("notes"), - ) - mappings_created += 1 - - return { - "modules_created": modules_created, - "mappings_created": mappings_created, - } - - -class AuditSessionRepository: - """Repository for audit sessions (Sprint 3: Auditor-Verbesserungen).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - name: str, - auditor_name: str, - description: Optional[str] = None, - auditor_email: Optional[str] = None, - regulation_ids: Optional[List[str]] = None, - ) -> AuditSessionDB: - """Create a new audit session.""" - session = AuditSessionDB( - id=str(uuid.uuid4()), - name=name, - description=description, - auditor_name=auditor_name, - auditor_email=auditor_email, - regulation_ids=regulation_ids, - status=AuditSessionStatusEnum.DRAFT, - ) - self.db.add(session) - self.db.commit() - self.db.refresh(session) - return session - - def get_by_id(self, session_id: str) -> Optional[AuditSessionDB]: - """Get audit session by ID with eager-loaded signoffs.""" - return ( - self.db.query(AuditSessionDB) - .options( - selectinload(AuditSessionDB.signoffs) - .selectinload(AuditSignOffDB.requirement) - ) - .filter(AuditSessionDB.id == session_id) - .first() - ) - - def get_all( - self, - status: Optional[AuditSessionStatusEnum] = None, - limit: int = 50, - ) -> List[AuditSessionDB]: - """Get all audit sessions with optional status filter.""" - query = self.db.query(AuditSessionDB) - if status: - query = query.filter(AuditSessionDB.status == status) - return query.order_by(AuditSessionDB.created_at.desc()).limit(limit).all() - - def update_status( - self, - session_id: str, - status: AuditSessionStatusEnum, - ) -> Optional[AuditSessionDB]: - """Update session status and set appropriate timestamps.""" - session = self.get_by_id(session_id) - if not session: - return None - - session.status = status - if status == AuditSessionStatusEnum.IN_PROGRESS and not session.started_at: - session.started_at = datetime.utcnow() - elif status == AuditSessionStatusEnum.COMPLETED: - session.completed_at = datetime.utcnow() - - session.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(session) - return session - - def update_progress( - self, - session_id: str, - total_items: Optional[int] = None, - completed_items: Optional[int] = None, - ) -> Optional[AuditSessionDB]: - """Update session progress counters.""" - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return None - - if total_items is not None: - session.total_items = total_items - if completed_items is not None: - session.completed_items = completed_items - - session.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(session) - return session - - def start_session(self, session_id: str) -> Optional[AuditSessionDB]: - """ - Start an audit session: - - Set status to IN_PROGRESS - - Initialize total_items based on requirements count - """ - session = self.get_by_id(session_id) - if not session: - return None - - # Count requirements for this session - query = self.db.query(func.count(RequirementDB.id)) - if session.regulation_ids: - query = query.join(RegulationDB).filter( - RegulationDB.id.in_(session.regulation_ids) - ) - total_requirements = query.scalar() or 0 - - session.status = AuditSessionStatusEnum.IN_PROGRESS - session.started_at = datetime.utcnow() - session.total_items = total_requirements - session.updated_at = datetime.utcnow() - - self.db.commit() - self.db.refresh(session) - return session - - def delete(self, session_id: str) -> bool: - """Delete an audit session (cascades to signoffs).""" - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return False - - self.db.delete(session) - self.db.commit() - return True - - def get_statistics(self, session_id: str) -> Dict[str, Any]: - """Get detailed statistics for an audit session.""" - session = self.get_by_id(session_id) - if not session: - return {} - - signoffs = session.signoffs or [] - - stats = { - "total": session.total_items or 0, - "completed": len([s for s in signoffs if s.result != AuditResultEnum.PENDING]), - "compliant": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT]), - "compliant_with_notes": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES]), - "non_compliant": len([s for s in signoffs if s.result == AuditResultEnum.NON_COMPLIANT]), - "not_applicable": len([s for s in signoffs if s.result == AuditResultEnum.NOT_APPLICABLE]), - "pending": len([s for s in signoffs if s.result == AuditResultEnum.PENDING]), - "signed": len([s for s in signoffs if s.signature_hash]), - } - - total = stats["total"] if stats["total"] > 0 else 1 - stats["completion_percentage"] = round( - (stats["completed"] / total) * 100, 1 - ) - - return stats - - -class AuditSignOffRepository: - """Repository for audit sign-offs (Sprint 3: Auditor-Verbesserungen).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - session_id: str, - requirement_id: str, - result: AuditResultEnum = AuditResultEnum.PENDING, - notes: Optional[str] = None, - ) -> AuditSignOffDB: - """Create a new sign-off for a requirement.""" - signoff = AuditSignOffDB( - id=str(uuid.uuid4()), - session_id=session_id, - requirement_id=requirement_id, - result=result, - notes=notes, - ) - self.db.add(signoff) - self.db.commit() - self.db.refresh(signoff) - return signoff - - def get_by_id(self, signoff_id: str) -> Optional[AuditSignOffDB]: - """Get sign-off by ID.""" - return ( - self.db.query(AuditSignOffDB) - .options(joinedload(AuditSignOffDB.requirement)) - .filter(AuditSignOffDB.id == signoff_id) - .first() - ) - - def get_by_session_and_requirement( - self, - session_id: str, - requirement_id: str, - ) -> Optional[AuditSignOffDB]: - """Get sign-off by session and requirement ID.""" - return ( - self.db.query(AuditSignOffDB) - .filter( - and_( - AuditSignOffDB.session_id == session_id, - AuditSignOffDB.requirement_id == requirement_id, - ) - ) - .first() - ) - - def get_by_session( - self, - session_id: str, - result_filter: Optional[AuditResultEnum] = None, - ) -> List[AuditSignOffDB]: - """Get all sign-offs for a session.""" - query = ( - self.db.query(AuditSignOffDB) - .options(joinedload(AuditSignOffDB.requirement)) - .filter(AuditSignOffDB.session_id == session_id) - ) - if result_filter: - query = query.filter(AuditSignOffDB.result == result_filter) - return query.order_by(AuditSignOffDB.created_at).all() - - def update( - self, - signoff_id: str, - result: Optional[AuditResultEnum] = None, - notes: Optional[str] = None, - sign: bool = False, - signed_by: Optional[str] = None, - ) -> Optional[AuditSignOffDB]: - """Update a sign-off with optional digital signature.""" - signoff = self.db.query(AuditSignOffDB).filter( - AuditSignOffDB.id == signoff_id - ).first() - if not signoff: - return None - - if result is not None: - signoff.result = result - if notes is not None: - signoff.notes = notes - - if sign and signed_by: - signoff.create_signature(signed_by) - - signoff.updated_at = datetime.utcnow() - self.db.commit() - self.db.refresh(signoff) - - # Update session progress - self._update_session_progress(signoff.session_id) - - return signoff - - def sign_off( - self, - session_id: str, - requirement_id: str, - result: AuditResultEnum, - notes: Optional[str] = None, - sign: bool = False, - signed_by: Optional[str] = None, - ) -> AuditSignOffDB: - """ - Create or update a sign-off for a requirement. - This is the main method for auditors to record their findings. - """ - # Check if sign-off already exists - signoff = self.get_by_session_and_requirement(session_id, requirement_id) - - if signoff: - # Update existing - signoff.result = result - if notes is not None: - signoff.notes = notes - if sign and signed_by: - signoff.create_signature(signed_by) - signoff.updated_at = datetime.utcnow() - else: - # Create new - signoff = AuditSignOffDB( - id=str(uuid.uuid4()), - session_id=session_id, - requirement_id=requirement_id, - result=result, - notes=notes, - ) - if sign and signed_by: - signoff.create_signature(signed_by) - self.db.add(signoff) - - self.db.commit() - self.db.refresh(signoff) - - # Update session progress - self._update_session_progress(session_id) - - return signoff - - def _update_session_progress(self, session_id: str) -> None: - """Update the session's completed_items count.""" - completed = ( - self.db.query(func.count(AuditSignOffDB.id)) - .filter( - and_( - AuditSignOffDB.session_id == session_id, - AuditSignOffDB.result != AuditResultEnum.PENDING, - ) - ) - .scalar() - ) or 0 - - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if session: - session.completed_items = completed - session.updated_at = datetime.utcnow() - self.db.commit() - - def get_checklist( - self, - session_id: str, - page: int = 1, - page_size: int = 50, - result_filter: Optional[AuditResultEnum] = None, - regulation_code: Optional[str] = None, - search: Optional[str] = None, - ) -> Tuple[List[Dict[str, Any]], int]: - """ - Get audit checklist items for a session with pagination. - Returns requirements with their sign-off status. - """ - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return [], 0 - - # Base query for requirements - query = ( - self.db.query(RequirementDB) - .options( - joinedload(RequirementDB.regulation), - selectinload(RequirementDB.control_mappings), - ) - ) - - # Filter by session's regulation_ids if set - if session.regulation_ids: - query = query.filter(RequirementDB.regulation_id.in_(session.regulation_ids)) - - # Filter by regulation code - if regulation_code: - query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) - - # Search - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - RequirementDB.title.ilike(search_term), - RequirementDB.article.ilike(search_term), - ) - ) - - # Get existing sign-offs for this session - signoffs_map = {} - signoffs = ( - self.db.query(AuditSignOffDB) - .filter(AuditSignOffDB.session_id == session_id) - .all() - ) - for s in signoffs: - signoffs_map[s.requirement_id] = s - - # Filter by result if specified - if result_filter: - if result_filter == AuditResultEnum.PENDING: - # Requirements without sign-off or with pending status - signed_req_ids = [ - s.requirement_id for s in signoffs - if s.result != AuditResultEnum.PENDING - ] - if signed_req_ids: - query = query.filter(~RequirementDB.id.in_(signed_req_ids)) - else: - # Requirements with specific result - matching_req_ids = [ - s.requirement_id for s in signoffs - if s.result == result_filter - ] - if matching_req_ids: - query = query.filter(RequirementDB.id.in_(matching_req_ids)) - else: - return [], 0 - - # Count and paginate - total = query.count() - requirements = ( - query - .order_by(RequirementDB.article, RequirementDB.paragraph) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - # Build checklist items - items = [] - for req in requirements: - signoff = signoffs_map.get(req.id) - items.append({ - "requirement_id": req.id, - "regulation_code": req.regulation.code if req.regulation else None, - "regulation_name": req.regulation.name if req.regulation else None, - "article": req.article, - "paragraph": req.paragraph, - "title": req.title, - "description": req.description, - "current_result": signoff.result.value if signoff else AuditResultEnum.PENDING.value, - "notes": signoff.notes if signoff else None, - "is_signed": bool(signoff.signature_hash) if signoff else False, - "signed_at": signoff.signed_at if signoff else None, - "signed_by": signoff.signed_by if signoff else None, - "evidence_count": len(req.control_mappings) if req.control_mappings else 0, - "controls_mapped": len(req.control_mappings) if req.control_mappings else 0, - }) - - return items, total - - def delete(self, signoff_id: str) -> bool: - """Delete a sign-off.""" - signoff = self.db.query(AuditSignOffDB).filter( - AuditSignOffDB.id == signoff_id - ).first() - if not signoff: - return False - - session_id = signoff.session_id - self.db.delete(signoff) - self.db.commit() - - # Update session progress - self._update_session_progress(session_id) - - return True