diff --git a/backend-compliance/compliance/api/schemas.py b/backend-compliance/compliance/api/schemas.py index 69b9ec2..a8145e2 100644 --- a/backend-compliance/compliance/api/schemas.py +++ b/backend-compliance/compliance/api/schemas.py @@ -1,1899 +1,39 @@ """ -Pydantic schemas for Compliance API. +compliance.api.schemas — backwards-compatibility re-export shim. + +Phase 1 Step 3 split the monolithic 1899-line schemas module into per-domain +sibling modules under ``compliance.schemas``. Every public symbol is +re-exported here so existing imports +(``from compliance.api.schemas import RegulationResponse, ...``) continue +to work unchanged. + +New code SHOULD import directly from the domain module: + + from compliance.schemas.regulation import RegulationResponse + from compliance.schemas.control import ControlResponse + from compliance.schemas.dsr import ... # (future) + +During the split, every ``class Config:`` block was converted to the +Pydantic V2 ``model_config = ConfigDict(...)`` idiom (28 conversions). + +DO NOT add new classes to this file. Add them to the appropriate domain +module under ``compliance.schemas/`` and re-export here if you need +backwards compatibility. """ -from datetime import datetime, date -from typing import Optional, List, Any, Dict -from pydantic import BaseModel, Field - - -# ============================================================================ -# Enums as strings for API -# ============================================================================ - -class RegulationType(str): - EU_REGULATION = "eu_regulation" - EU_DIRECTIVE = "eu_directive" - DE_LAW = "de_law" - BSI_STANDARD = "bsi_standard" - INDUSTRY_STANDARD = "industry_standard" - - -class ControlType(str): - PREVENTIVE = "preventive" - DETECTIVE = "detective" - CORRECTIVE = "corrective" - - -class ControlDomain(str): - GOVERNANCE = "gov" - PRIVACY = "priv" - IAM = "iam" - CRYPTO = "crypto" - SDLC = "sdlc" - OPS = "ops" - AI = "ai" - CRA = "cra" - AUDIT = "aud" - - -class ControlStatus(str): - PASS = "pass" - PARTIAL = "partial" - FAIL = "fail" - NOT_APPLICABLE = "n/a" - PLANNED = "planned" - - -class RiskLevel(str): - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - CRITICAL = "critical" - - -class EvidenceStatus(str): - VALID = "valid" - EXPIRED = "expired" - PENDING = "pending" - FAILED = "failed" - - -# ============================================================================ -# Regulation Schemas -# ============================================================================ - -class RegulationBase(BaseModel): - code: str - name: str - full_name: Optional[str] = None - regulation_type: str - source_url: Optional[str] = None - local_pdf_path: Optional[str] = None - effective_date: Optional[date] = None - description: Optional[str] = None - is_active: bool = True - - -class RegulationCreate(RegulationBase): - pass - - -class RegulationResponse(RegulationBase): - id: str - created_at: datetime - updated_at: datetime - requirement_count: Optional[int] = None - - class Config: - from_attributes = True - - -class RegulationListResponse(BaseModel): - regulations: List[RegulationResponse] - total: int - - -# ============================================================================ -# Pagination Schemas (defined here, completed after Response classes) -# ============================================================================ - -class PaginationMeta(BaseModel): - """Pagination metadata for list responses.""" - page: int - page_size: int - total: int - total_pages: int - has_next: bool - has_prev: bool - - -# ============================================================================ -# Requirement Schemas -# ============================================================================ - -class RequirementBase(BaseModel): - article: str - paragraph: Optional[str] = None - title: str - description: Optional[str] = None - requirement_text: Optional[str] = None - breakpilot_interpretation: Optional[str] = None - is_applicable: bool = True - applicability_reason: Optional[str] = None - priority: int = 2 - - -class RequirementCreate(RequirementBase): - regulation_id: str - - -class RequirementResponse(RequirementBase): - id: str - regulation_id: str - regulation_code: Optional[str] = None - - # Implementation tracking - implementation_status: Optional[str] = "not_started" - implementation_details: Optional[str] = None - code_references: Optional[List[Dict[str, Any]]] = None - documentation_links: Optional[List[str]] = None - - # Evidence for auditors - evidence_description: Optional[str] = None - evidence_artifacts: Optional[List[Dict[str, Any]]] = None - - # Audit tracking - auditor_notes: Optional[str] = None - audit_status: Optional[str] = "pending" - last_audit_date: Optional[datetime] = None - last_auditor: Optional[str] = None - - # Source reference - source_page: Optional[int] = None - source_section: Optional[str] = None - - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class RequirementListResponse(BaseModel): - requirements: List[RequirementResponse] - total: int - - -class PaginatedRequirementResponse(BaseModel): - """Paginated response for requirements - optimized for large datasets.""" - data: List[RequirementResponse] - pagination: PaginationMeta - - -# ============================================================================ -# Control Schemas -# ============================================================================ - -class ControlBase(BaseModel): - control_id: str - domain: str - control_type: str - title: str - description: Optional[str] = None - pass_criteria: str - implementation_guidance: Optional[str] = None - code_reference: Optional[str] = None - documentation_url: Optional[str] = None - is_automated: bool = False - automation_tool: Optional[str] = None - automation_config: Optional[Dict[str, Any]] = None - owner: Optional[str] = None - review_frequency_days: int = 90 - - -class ControlCreate(ControlBase): - pass - - -class ControlUpdate(BaseModel): - title: Optional[str] = None - description: Optional[str] = None - pass_criteria: Optional[str] = None - implementation_guidance: Optional[str] = None - code_reference: Optional[str] = None - documentation_url: Optional[str] = None - is_automated: Optional[bool] = None - automation_tool: Optional[str] = None - automation_config: Optional[Dict[str, Any]] = None - owner: Optional[str] = None - status: Optional[str] = None - status_notes: Optional[str] = None - - -class ControlResponse(ControlBase): - id: str - status: str - status_notes: Optional[str] = None - last_reviewed_at: Optional[datetime] = None - next_review_at: Optional[datetime] = None - created_at: datetime - updated_at: datetime - evidence_count: Optional[int] = None - requirement_count: Optional[int] = None - - class Config: - from_attributes = True - - -class ControlListResponse(BaseModel): - controls: List[ControlResponse] - total: int - - -class PaginatedControlResponse(BaseModel): - """Paginated response for controls - optimized for large datasets.""" - data: List[ControlResponse] - pagination: PaginationMeta - - -class ControlReviewRequest(BaseModel): - status: str - status_notes: Optional[str] = None - - -# ============================================================================ -# Control Mapping Schemas -# ============================================================================ - -class MappingBase(BaseModel): - requirement_id: str - control_id: str - coverage_level: str = "full" - notes: Optional[str] = None - - -class MappingCreate(MappingBase): - pass - - -class MappingResponse(MappingBase): - id: str - requirement_article: Optional[str] = None - requirement_title: Optional[str] = None - control_control_id: Optional[str] = None - control_title: Optional[str] = None - created_at: datetime - - class Config: - from_attributes = True - - -class MappingListResponse(BaseModel): - mappings: List[MappingResponse] - total: int - - -# ============================================================================ -# Evidence Schemas -# ============================================================================ - -class EvidenceBase(BaseModel): - control_id: str - evidence_type: str - title: str - description: Optional[str] = None - artifact_url: Optional[str] = None - valid_from: Optional[datetime] = None - valid_until: Optional[datetime] = None - source: Optional[str] = None - ci_job_id: Optional[str] = None - - -class EvidenceCreate(EvidenceBase): - pass - - -class EvidenceResponse(EvidenceBase): - id: str - artifact_path: Optional[str] = None - artifact_hash: Optional[str] = None - file_size_bytes: Optional[int] = None - mime_type: Optional[str] = None - status: str - uploaded_by: Optional[str] = None - collected_at: datetime - created_at: datetime - - class Config: - from_attributes = True - - -class EvidenceListResponse(BaseModel): - evidence: List[EvidenceResponse] - total: int - - -class EvidenceCollectRequest(BaseModel): - """Request to auto-collect evidence from CI.""" - control_id: str - evidence_type: str - title: str - ci_job_id: str - artifact_url: str - - -# ============================================================================ -# Risk Schemas -# ============================================================================ - -class RiskBase(BaseModel): - risk_id: str - title: str - description: Optional[str] = None - category: str - likelihood: int = Field(ge=1, le=5) - impact: int = Field(ge=1, le=5) - mitigating_controls: Optional[List[str]] = None - owner: Optional[str] = None - treatment_plan: Optional[str] = None - - -class RiskCreate(RiskBase): - pass - - -class RiskUpdate(BaseModel): - title: Optional[str] = None - description: Optional[str] = None - category: Optional[str] = None - likelihood: Optional[int] = Field(default=None, ge=1, le=5) - impact: Optional[int] = Field(default=None, ge=1, le=5) - residual_likelihood: Optional[int] = Field(default=None, ge=1, le=5) - residual_impact: Optional[int] = Field(default=None, ge=1, le=5) - mitigating_controls: Optional[List[str]] = None - owner: Optional[str] = None - status: Optional[str] = None - treatment_plan: Optional[str] = None - - -class RiskResponse(RiskBase): - id: str - inherent_risk: str - residual_likelihood: Optional[int] = None - residual_impact: Optional[int] = None - residual_risk: Optional[str] = None - status: str - identified_date: Optional[date] = None - review_date: Optional[date] = None - last_assessed_at: Optional[datetime] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class RiskListResponse(BaseModel): - risks: List[RiskResponse] - total: int - - -class RiskMatrixResponse(BaseModel): - """Risk matrix data for visualization.""" - matrix: Dict[str, Dict[str, List[str]]] # likelihood -> impact -> risk_ids - risks: List[RiskResponse] - - -# ============================================================================ -# AI System Schemas (AI Act Compliance) -# ============================================================================ - -class AISystemBase(BaseModel): - name: str - description: Optional[str] = None - purpose: Optional[str] = None - sector: Optional[str] = None - classification: str = "unclassified" - status: str = "draft" - obligations: Optional[List[str]] = None - - -class AISystemCreate(AISystemBase): - pass - - -class AISystemUpdate(BaseModel): - name: Optional[str] = None - description: Optional[str] = None - purpose: Optional[str] = None - sector: Optional[str] = None - classification: Optional[str] = None - status: Optional[str] = None - obligations: Optional[List[str]] = None - - -class AISystemResponse(AISystemBase): - id: str - assessment_date: Optional[datetime] = None - assessment_result: Optional[Dict[str, Any]] = None - risk_factors: Optional[List[Dict[str, Any]]] = None - recommendations: Optional[List[str]] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class AISystemListResponse(BaseModel): - systems: List[AISystemResponse] - total: int - - -# ============================================================================ -# Dashboard & Export Schemas -# ============================================================================ - -class DashboardResponse(BaseModel): - compliance_score: float - total_regulations: int - total_requirements: int - total_controls: int - controls_by_status: Dict[str, int] - controls_by_domain: Dict[str, Dict[str, int]] - total_evidence: int - evidence_by_status: Dict[str, int] - total_risks: int - risks_by_level: Dict[str, int] - recent_activity: List[Dict[str, Any]] - - -class ExportRequest(BaseModel): - export_type: str = "full" # "full", "controls_only", "evidence_only" - included_regulations: Optional[List[str]] = None - included_domains: Optional[List[str]] = None - date_range_start: Optional[date] = None - date_range_end: Optional[date] = None - - -class ExportResponse(BaseModel): - id: str - export_type: str - export_name: Optional[str] = None - status: str - requested_by: str - requested_at: datetime - completed_at: Optional[datetime] = None - file_path: Optional[str] = None - file_hash: Optional[str] = None - file_size_bytes: Optional[int] = None - total_controls: Optional[int] = None - total_evidence: Optional[int] = None - compliance_score: Optional[float] = None - error_message: Optional[str] = None - - class Config: - from_attributes = True - - -class ExportListResponse(BaseModel): - exports: List[ExportResponse] - total: int - - -# ============================================================================ -# Seeding Schemas -# ============================================================================ - -class SeedRequest(BaseModel): - force: bool = False - - -class SeedResponse(BaseModel): - success: bool - message: str - counts: Dict[str, int] - - -class PaginatedEvidenceResponse(BaseModel): - """Paginated response for evidence.""" - data: List[EvidenceResponse] - pagination: PaginationMeta - - -class PaginatedRiskResponse(BaseModel): - """Paginated response for risks.""" - data: List[RiskResponse] - pagination: PaginationMeta - - -# ============================================================================ -# Service Module Schemas (Sprint 3) -# ============================================================================ - -class ServiceModuleBase(BaseModel): - """Base schema for service modules.""" - name: str - display_name: str - description: Optional[str] = None - service_type: str - port: Optional[int] = None - technology_stack: Optional[List[str]] = None - repository_path: Optional[str] = None - docker_image: Optional[str] = None - data_categories: Optional[List[str]] = None - processes_pii: bool = False - processes_health_data: bool = False - ai_components: bool = False - criticality: str = "medium" - owner_team: Optional[str] = None - owner_contact: Optional[str] = None - - -class ServiceModuleCreate(ServiceModuleBase): - """Schema for creating a service module.""" - pass - - -class ServiceModuleResponse(ServiceModuleBase): - """Response schema for service module.""" - id: str - is_active: bool - compliance_score: Optional[float] = None - last_compliance_check: Optional[datetime] = None - created_at: datetime - updated_at: datetime - regulation_count: Optional[int] = None - risk_count: Optional[int] = None - - class Config: - from_attributes = True - - -class ServiceModuleListResponse(BaseModel): - """List response for service modules.""" - modules: List[ServiceModuleResponse] - total: int - - -class ServiceModuleDetailResponse(ServiceModuleResponse): - """Detailed response including regulations and risks.""" - regulations: Optional[List[Dict[str, Any]]] = None - risks: Optional[List[Dict[str, Any]]] = None - - -class ModuleRegulationMappingBase(BaseModel): - """Base schema for module-regulation mapping.""" - module_id: str - regulation_id: str - relevance_level: str = "medium" - notes: Optional[str] = None - applicable_articles: Optional[List[str]] = None - - -class ModuleRegulationMappingCreate(ModuleRegulationMappingBase): - """Schema for creating a module-regulation mapping.""" - pass - - -class ModuleRegulationMappingResponse(ModuleRegulationMappingBase): - """Response schema for module-regulation mapping.""" - id: str - module_name: Optional[str] = None - regulation_code: Optional[str] = None - regulation_name: Optional[str] = None - created_at: datetime - - class Config: - from_attributes = True - - -class ModuleSeedRequest(BaseModel): - """Request to seed service modules.""" - force: bool = False - - -class ModuleSeedResponse(BaseModel): - """Response from seeding service modules.""" - success: bool - message: str - modules_created: int - mappings_created: int - - -class ModuleComplianceOverview(BaseModel): - """Overview of compliance status for all modules.""" - total_modules: int - modules_by_type: Dict[str, int] - modules_by_criticality: Dict[str, int] - modules_processing_pii: int - modules_with_ai: int - average_compliance_score: Optional[float] = None - regulations_coverage: Dict[str, int] # regulation_code -> module_count - - -# ============================================================================ -# Executive Dashboard Schemas (Phase 3 - Sprint 1) -# ============================================================================ - -class TrendDataPoint(BaseModel): - """A single data point for trend charts.""" - date: str # ISO date string - score: float - label: Optional[str] = None # Formatted date for display (e.g., "Jan 26") - - -class RiskSummary(BaseModel): - """Summary of a risk for executive display.""" - id: str - risk_id: str - title: str - risk_level: str # "low", "medium", "high", "critical" - owner: Optional[str] = None - status: str - category: str - impact: int - likelihood: int - - -class DeadlineItem(BaseModel): - """An upcoming deadline for executive display.""" - id: str - title: str - deadline: str # ISO date string - days_remaining: int - type: str # "control_review", "evidence_expiry", "audit" - status: str # "on_track", "at_risk", "overdue" - owner: Optional[str] = None - - -class TeamWorkloadItem(BaseModel): - """Workload distribution for a team or person.""" - name: str - pending_tasks: int - in_progress_tasks: int - completed_tasks: int - total_tasks: int - completion_rate: float - - -class ExecutiveDashboardResponse(BaseModel): - """ - Executive Dashboard Response - - Provides a high-level overview for managers and executives: - - Traffic light status (green/yellow/red) - - Overall compliance score - - 12-month trend data - - Top 5 risks - - Upcoming deadlines - - Team workload distribution - """ - traffic_light_status: str # "green", "yellow", "red" - overall_score: float - score_trend: List[TrendDataPoint] - previous_score: Optional[float] = None - score_change: Optional[float] = None # Positive = improvement - - # Counts - total_regulations: int - total_requirements: int - total_controls: int - open_risks: int - - # Top items - top_risks: List[RiskSummary] - upcoming_deadlines: List[DeadlineItem] - - # Workload - team_workload: List[TeamWorkloadItem] - - # Last updated - last_updated: str - - -class ComplianceSnapshotCreate(BaseModel): - """Request to create a compliance snapshot.""" - notes: Optional[str] = None - - -class ComplianceSnapshotResponse(BaseModel): - """Response for a compliance snapshot.""" - id: str - snapshot_date: str - overall_score: float - scores_by_regulation: Dict[str, float] - scores_by_domain: Dict[str, float] - total_controls: int - passed_controls: int - failed_controls: int - notes: Optional[str] = None - created_at: str - - -# ============================================================================ -# PDF Extraction Schemas -# ============================================================================ - -class BSIAspectResponse(BaseModel): - """A single extracted BSI-TR Pruefaspekt (test aspect).""" - aspect_id: str - title: str - full_text: str - category: str - page_number: int - section: str - requirement_level: str - source_document: str - keywords: Optional[List[str]] = None - related_aspects: Optional[List[str]] = None - - -class PDFExtractionRequest(BaseModel): - """Request for PDF extraction.""" - document_code: str = Field(..., description="BSI-TR document code, e.g. BSI-TR-03161-2") - save_to_db: bool = Field(True, description="Whether to save extracted requirements to database") - force: bool = Field(False, description="Force re-extraction even if requirements exist") - - -class PDFExtractionResponse(BaseModel): - """Response from PDF extraction endpoint.""" - # Simple endpoint format (new /pdf/extract/{doc_code}) - doc_code: Optional[str] = None - total_extracted: Optional[int] = None - saved_to_db: Optional[int] = None - aspects: Optional[List[BSIAspectResponse]] = None - # Legacy scraper endpoint format (/scraper/extract-pdf) - success: Optional[bool] = None - source_document: Optional[str] = None - total_aspects: Optional[int] = None - statistics: Optional[Dict[str, Any]] = None - requirements_created: Optional[int] = None - - -# ============================================================================ -# Audit Session & Sign-off Schemas (Phase 3 - Sprint 3) -# ============================================================================ - -class AuditResult(str): - """Audit result values for sign-off.""" - COMPLIANT = "compliant" - COMPLIANT_WITH_NOTES = "compliant_notes" - NON_COMPLIANT = "non_compliant" - NOT_APPLICABLE = "not_applicable" - PENDING = "pending" - - -class AuditSessionStatus(str): - """Audit session status values.""" - DRAFT = "draft" - IN_PROGRESS = "in_progress" - COMPLETED = "completed" - ARCHIVED = "archived" - - -class CreateAuditSessionRequest(BaseModel): - """Request to create a new audit session.""" - name: str = Field(..., min_length=1, max_length=200) - description: Optional[str] = None - auditor_name: str = Field(..., min_length=1, max_length=100) - auditor_email: Optional[str] = None - auditor_organization: Optional[str] = None - regulation_codes: Optional[List[str]] = None # Filter by regulations - - -class UpdateAuditSessionRequest(BaseModel): - """Request to update an audit session.""" - name: Optional[str] = Field(None, min_length=1, max_length=200) - description: Optional[str] = None - status: Optional[str] = None - - -class AuditSessionSummary(BaseModel): - """Summary of an audit session for list views.""" - id: str - name: str - auditor_name: str - status: str - total_items: int - completed_items: int - completion_percentage: float - created_at: datetime - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class AuditSessionResponse(AuditSessionSummary): - """Full response for an audit session.""" - description: Optional[str] = None - auditor_email: Optional[str] = None - auditor_organization: Optional[str] = None - regulation_ids: Optional[List[str]] = None - compliant_count: int = 0 - non_compliant_count: int = 0 - updated_at: datetime - - class Config: - from_attributes = True - - -class AuditSessionListResponse(BaseModel): - """List response for audit sessions.""" - sessions: List[AuditSessionSummary] - total: int - - -class AuditSessionDetailResponse(AuditSessionResponse): - """Detailed response including statistics breakdown.""" - statistics: Optional["AuditStatistics"] = None - - -class SignOffRequest(BaseModel): - """Request to sign off a single requirement.""" - result: str = Field(..., description="Audit result: compliant, compliant_notes, non_compliant, not_applicable, pending") - notes: Optional[str] = None - sign: bool = Field(False, description="Whether to create digital signature") - - -class SignOffResponse(BaseModel): - """Response for a sign-off operation.""" - id: str - session_id: str - requirement_id: str - result: str - notes: Optional[str] = None - is_signed: bool - signature_hash: Optional[str] = None - signed_at: Optional[datetime] = None - signed_by: Optional[str] = None - created_at: datetime - updated_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class AuditChecklistItem(BaseModel): - """A single item in the audit checklist.""" - requirement_id: str - regulation_code: str - article: str - paragraph: Optional[str] = None - title: str - description: Optional[str] = None - - # Current audit state - current_result: str = "pending" # AuditResult - notes: Optional[str] = None - is_signed: bool = False - signed_at: Optional[datetime] = None - signed_by: Optional[str] = None - - # Context info - evidence_count: int = 0 - controls_mapped: int = 0 - implementation_status: Optional[str] = None - - # Priority - priority: int = 2 - - -class AuditStatistics(BaseModel): - """Statistics for an audit session.""" - total: int - compliant: int - compliant_with_notes: int - non_compliant: int - not_applicable: int - pending: int - completion_percentage: float - - -class AuditChecklistResponse(BaseModel): - """Response for audit checklist endpoint.""" - session: AuditSessionSummary - items: List[AuditChecklistItem] - pagination: PaginationMeta - statistics: AuditStatistics - - -class AuditChecklistFilterRequest(BaseModel): - """Filter options for audit checklist.""" - regulation_code: Optional[str] = None - result_filter: Optional[str] = None # "pending", "compliant", "non_compliant", etc. - search: Optional[str] = None - signed_only: bool = False - - -# ============================================================================ -# Report Generation Schemas (Phase 3 - Sprint 3) -# ============================================================================ - -class GenerateReportRequest(BaseModel): - """Request to generate an audit report.""" - session_id: str - report_type: str = "full" # "full", "summary", "non_compliant_only" - include_evidence: bool = True - include_signatures: bool = True - language: str = "de" # "de" or "en" - - -class ReportGenerationResponse(BaseModel): - """Response for report generation.""" - report_id: str - session_id: str - status: str # "pending", "generating", "completed", "failed" - report_type: str - file_path: Optional[str] = None - file_size_bytes: Optional[int] = None - created_at: datetime - completed_at: Optional[datetime] = None - error_message: Optional[str] = None - - -class ReportDownloadResponse(BaseModel): - """Response for report download.""" - report_id: str - filename: str - mime_type: str - file_size_bytes: int - download_url: str - - -# ============================================================================ -# ISO 27001 ISMS Schemas (Kapitel 4-10) -# ============================================================================ - -# --- Enums --- - -class ApprovalStatus(str): - DRAFT = "draft" - UNDER_REVIEW = "under_review" - APPROVED = "approved" - SUPERSEDED = "superseded" - - -class FindingType(str): - MAJOR = "major" - MINOR = "minor" - OFI = "ofi" - POSITIVE = "positive" - - -class FindingStatus(str): - OPEN = "open" - IN_PROGRESS = "in_progress" - CAPA_PENDING = "capa_pending" - VERIFICATION_PENDING = "verification_pending" - VERIFIED = "verified" - CLOSED = "closed" - - -class CAPAType(str): - CORRECTIVE = "corrective" - PREVENTIVE = "preventive" - BOTH = "both" - - -# --- ISMS Scope (ISO 27001 4.3) --- - -class ISMSScopeBase(BaseModel): - """Base schema for ISMS Scope.""" - scope_statement: str - included_locations: Optional[List[str]] = None - included_processes: Optional[List[str]] = None - included_services: Optional[List[str]] = None - excluded_items: Optional[List[str]] = None - exclusion_justification: Optional[str] = None - organizational_boundary: Optional[str] = None - physical_boundary: Optional[str] = None - technical_boundary: Optional[str] = None - - -class ISMSScopeCreate(ISMSScopeBase): - """Schema for creating ISMS Scope.""" - pass - - -class ISMSScopeUpdate(BaseModel): - """Schema for updating ISMS Scope.""" - scope_statement: Optional[str] = None - included_locations: Optional[List[str]] = None - included_processes: Optional[List[str]] = None - included_services: Optional[List[str]] = None - excluded_items: Optional[List[str]] = None - exclusion_justification: Optional[str] = None - organizational_boundary: Optional[str] = None - physical_boundary: Optional[str] = None - technical_boundary: Optional[str] = None - - -class ISMSScopeResponse(ISMSScopeBase): - """Response schema for ISMS Scope.""" - id: str - version: str - status: str - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - effective_date: Optional[date] = None - review_date: Optional[date] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class ISMSScopeApproveRequest(BaseModel): - """Request to approve ISMS Scope.""" - approved_by: str - effective_date: date - review_date: date - - -# --- ISMS Context (ISO 27001 4.1, 4.2) --- - -class ContextIssue(BaseModel): - """Single context issue.""" - issue: str - impact: str - treatment: Optional[str] = None - - -class InterestedParty(BaseModel): - """Single interested party.""" - party: str - requirements: List[str] - relevance: str - - -class ISMSContextBase(BaseModel): - """Base schema for ISMS Context.""" - internal_issues: Optional[List[ContextIssue]] = None - external_issues: Optional[List[ContextIssue]] = None - interested_parties: Optional[List[InterestedParty]] = None - regulatory_requirements: Optional[List[str]] = None - contractual_requirements: Optional[List[str]] = None - swot_strengths: Optional[List[str]] = None - swot_weaknesses: Optional[List[str]] = None - swot_opportunities: Optional[List[str]] = None - swot_threats: Optional[List[str]] = None - - -class ISMSContextCreate(ISMSContextBase): - """Schema for creating ISMS Context.""" - pass - - -class ISMSContextResponse(ISMSContextBase): - """Response schema for ISMS Context.""" - id: str - version: str - status: str - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - last_reviewed_at: Optional[datetime] = None - next_review_date: Optional[date] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -# --- ISMS Policies (ISO 27001 5.2) --- - -class ISMSPolicyBase(BaseModel): - """Base schema for ISMS Policy.""" - policy_id: str - title: str - policy_type: str # "master", "operational", "technical" - description: Optional[str] = None - policy_text: str - applies_to: Optional[List[str]] = None - review_frequency_months: int = 12 - related_controls: Optional[List[str]] = None - - -class ISMSPolicyCreate(ISMSPolicyBase): - """Schema for creating ISMS Policy.""" - authored_by: str - - -class ISMSPolicyUpdate(BaseModel): - """Schema for updating ISMS Policy.""" - title: Optional[str] = None - description: Optional[str] = None - policy_text: Optional[str] = None - applies_to: Optional[List[str]] = None - review_frequency_months: Optional[int] = None - related_controls: Optional[List[str]] = None - - -class ISMSPolicyResponse(ISMSPolicyBase): - """Response schema for ISMS Policy.""" - id: str - version: str - status: str - authored_by: Optional[str] = None - reviewed_by: Optional[str] = None - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - effective_date: Optional[date] = None - next_review_date: Optional[date] = None - document_path: Optional[str] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class ISMSPolicyListResponse(BaseModel): - """List response for ISMS Policies.""" - policies: List[ISMSPolicyResponse] - total: int - - -class ISMSPolicyApproveRequest(BaseModel): - """Request to approve ISMS Policy.""" - reviewed_by: str - approved_by: str - effective_date: date - - -# --- Security Objectives (ISO 27001 6.2) --- - -class SecurityObjectiveBase(BaseModel): - """Base schema for Security Objective.""" - objective_id: str - title: str - description: Optional[str] = None - category: str # "availability", "confidentiality", "integrity", "compliance" - specific: Optional[str] = None - measurable: Optional[str] = None - achievable: Optional[str] = None - relevant: Optional[str] = None - time_bound: Optional[str] = None - kpi_name: Optional[str] = None - kpi_target: Optional[str] = None - kpi_unit: Optional[str] = None - measurement_frequency: Optional[str] = None - owner: Optional[str] = None - target_date: Optional[date] = None - related_controls: Optional[List[str]] = None - related_risks: Optional[List[str]] = None - - -class SecurityObjectiveCreate(SecurityObjectiveBase): - """Schema for creating Security Objective.""" - pass - - -class SecurityObjectiveUpdate(BaseModel): - """Schema for updating Security Objective.""" - title: Optional[str] = None - description: Optional[str] = None - kpi_current: Optional[str] = None - progress_percentage: Optional[int] = None - status: Optional[str] = None - - -class SecurityObjectiveResponse(SecurityObjectiveBase): - """Response schema for Security Objective.""" - id: str - kpi_current: Optional[str] = None - status: str - progress_percentage: int - achieved_date: Optional[date] = None - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class SecurityObjectiveListResponse(BaseModel): - """List response for Security Objectives.""" - objectives: List[SecurityObjectiveResponse] - total: int - - -# --- Statement of Applicability (SoA) --- - -class SoAEntryBase(BaseModel): - """Base schema for SoA Entry.""" - annex_a_control: str # e.g., "A.5.1" - annex_a_title: str - annex_a_category: Optional[str] = None - is_applicable: bool - applicability_justification: str - implementation_status: str = "planned" - implementation_notes: Optional[str] = None - breakpilot_control_ids: Optional[List[str]] = None - coverage_level: str = "full" - evidence_description: Optional[str] = None - risk_assessment_notes: Optional[str] = None - compensating_controls: Optional[str] = None - - -class SoAEntryCreate(SoAEntryBase): - """Schema for creating SoA Entry.""" - pass - - -class SoAEntryUpdate(BaseModel): - """Schema for updating SoA Entry.""" - is_applicable: Optional[bool] = None - applicability_justification: Optional[str] = None - implementation_status: Optional[str] = None - implementation_notes: Optional[str] = None - breakpilot_control_ids: Optional[List[str]] = None - coverage_level: Optional[str] = None - evidence_description: Optional[str] = None - - -class SoAEntryResponse(SoAEntryBase): - """Response schema for SoA Entry.""" - id: str - evidence_ids: Optional[List[str]] = None - reviewed_by: Optional[str] = None - reviewed_at: Optional[datetime] = None - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - version: str - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class SoAListResponse(BaseModel): - """List response for SoA.""" - entries: List[SoAEntryResponse] - total: int - applicable_count: int - not_applicable_count: int - implemented_count: int - planned_count: int - - -class SoAApproveRequest(BaseModel): - """Request to approve SoA entry.""" - reviewed_by: str - approved_by: str - - -# --- Audit Findings (Major/Minor/OFI) --- - -class AuditFindingBase(BaseModel): - """Base schema for Audit Finding.""" - finding_type: str # "major", "minor", "ofi", "positive" - iso_chapter: Optional[str] = None - annex_a_control: Optional[str] = None - title: str - description: str - objective_evidence: str - impact_description: Optional[str] = None - affected_processes: Optional[List[str]] = None - affected_assets: Optional[List[str]] = None - owner: Optional[str] = None - due_date: Optional[date] = None - - -class AuditFindingCreate(AuditFindingBase): - """Schema for creating Audit Finding.""" - audit_session_id: Optional[str] = None - internal_audit_id: Optional[str] = None - auditor: str - - -class AuditFindingUpdate(BaseModel): - """Schema for updating Audit Finding.""" - title: Optional[str] = None - description: Optional[str] = None - root_cause: Optional[str] = None - root_cause_method: Optional[str] = None - owner: Optional[str] = None - due_date: Optional[date] = None - status: Optional[str] = None - - -class AuditFindingResponse(AuditFindingBase): - """Response schema for Audit Finding.""" - id: str - finding_id: str - audit_session_id: Optional[str] = None - internal_audit_id: Optional[str] = None - root_cause: Optional[str] = None - root_cause_method: Optional[str] = None - status: str - auditor: Optional[str] = None - identified_date: date - closed_date: Optional[date] = None - verification_method: Optional[str] = None - verified_by: Optional[str] = None - verified_at: Optional[datetime] = None - closure_notes: Optional[str] = None - closed_by: Optional[str] = None - is_blocking: bool - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class AuditFindingListResponse(BaseModel): - """List response for Audit Findings.""" - findings: List[AuditFindingResponse] - total: int - major_count: int - minor_count: int - ofi_count: int - open_count: int - - -class AuditFindingCloseRequest(BaseModel): - """Request to close an Audit Finding.""" - closure_notes: str - closed_by: str - verification_method: str - verification_evidence: str - - -# --- Corrective Actions (CAPA) --- - -class CorrectiveActionBase(BaseModel): - """Base schema for Corrective Action.""" - capa_type: str # "corrective", "preventive", "both" - title: str - description: str - expected_outcome: Optional[str] = None - assigned_to: str - planned_completion: date - effectiveness_criteria: Optional[str] = None - estimated_effort_hours: Optional[int] = None - resources_required: Optional[str] = None - - -class CorrectiveActionCreate(CorrectiveActionBase): - """Schema for creating Corrective Action.""" - finding_id: str - planned_start: Optional[date] = None - - -class CorrectiveActionUpdate(BaseModel): - """Schema for updating Corrective Action.""" - title: Optional[str] = None - description: Optional[str] = None - assigned_to: Optional[str] = None - planned_completion: Optional[date] = None - status: Optional[str] = None - progress_percentage: Optional[int] = None - implementation_evidence: Optional[str] = None - - -class CorrectiveActionResponse(CorrectiveActionBase): - """Response schema for Corrective Action.""" - id: str - capa_id: str - finding_id: str - planned_start: Optional[date] = None - actual_completion: Optional[date] = None - status: str - progress_percentage: int - approved_by: Optional[str] = None - actual_effort_hours: Optional[int] = None - implementation_evidence: Optional[str] = None - evidence_ids: Optional[List[str]] = None - effectiveness_verified: bool - effectiveness_verification_date: Optional[date] = None - effectiveness_notes: Optional[str] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class CorrectiveActionListResponse(BaseModel): - """List response for Corrective Actions.""" - actions: List[CorrectiveActionResponse] - total: int - - -class CAPAVerifyRequest(BaseModel): - """Request to verify CAPA effectiveness.""" - verified_by: str - effectiveness_notes: str - is_effective: bool - - -# --- Management Review (ISO 27001 9.3) --- - -class ReviewAttendee(BaseModel): - """Single attendee in management review.""" - name: str - role: str - - -class ReviewActionItem(BaseModel): - """Single action item from management review.""" - action: str - owner: str - due_date: date - - -class ManagementReviewBase(BaseModel): - """Base schema for Management Review.""" - title: str - review_date: date - review_period_start: Optional[date] = None - review_period_end: Optional[date] = None - chairperson: str - attendees: Optional[List[ReviewAttendee]] = None - - -class ManagementReviewCreate(ManagementReviewBase): - """Schema for creating Management Review.""" - pass - - -class ManagementReviewUpdate(BaseModel): - """Schema for updating Management Review.""" - # Inputs (9.3) - input_previous_actions: Optional[str] = None - input_isms_changes: Optional[str] = None - input_security_performance: Optional[str] = None - input_interested_party_feedback: Optional[str] = None - input_risk_assessment_results: Optional[str] = None - input_improvement_opportunities: Optional[str] = None - input_policy_effectiveness: Optional[str] = None - input_objective_achievement: Optional[str] = None - input_resource_adequacy: Optional[str] = None - # Outputs (9.3) - output_improvement_decisions: Optional[str] = None - output_isms_changes: Optional[str] = None - output_resource_needs: Optional[str] = None - # Action items - action_items: Optional[List[ReviewActionItem]] = None - # Assessment - isms_effectiveness_rating: Optional[str] = None - key_decisions: Optional[str] = None - status: Optional[str] = None - - -class ManagementReviewResponse(ManagementReviewBase): - """Response schema for Management Review.""" - id: str - review_id: str - input_previous_actions: Optional[str] = None - input_isms_changes: Optional[str] = None - input_security_performance: Optional[str] = None - input_interested_party_feedback: Optional[str] = None - input_risk_assessment_results: Optional[str] = None - input_improvement_opportunities: Optional[str] = None - input_policy_effectiveness: Optional[str] = None - input_objective_achievement: Optional[str] = None - input_resource_adequacy: Optional[str] = None - output_improvement_decisions: Optional[str] = None - output_isms_changes: Optional[str] = None - output_resource_needs: Optional[str] = None - action_items: Optional[List[ReviewActionItem]] = None - isms_effectiveness_rating: Optional[str] = None - key_decisions: Optional[str] = None - status: str - approved_by: Optional[str] = None - approved_at: Optional[datetime] = None - minutes_document_path: Optional[str] = None - next_review_date: Optional[date] = None - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class ManagementReviewListResponse(BaseModel): - """List response for Management Reviews.""" - reviews: List[ManagementReviewResponse] - total: int - - -class ManagementReviewApproveRequest(BaseModel): - """Request to approve Management Review.""" - approved_by: str - next_review_date: date - minutes_document_path: Optional[str] = None - - -# --- Internal Audit (ISO 27001 9.2) --- - -class InternalAuditBase(BaseModel): - """Base schema for Internal Audit.""" - title: str - audit_type: str # "scheduled", "surveillance", "special" - scope_description: str - iso_chapters_covered: Optional[List[str]] = None - annex_a_controls_covered: Optional[List[str]] = None - processes_covered: Optional[List[str]] = None - departments_covered: Optional[List[str]] = None - criteria: Optional[str] = None - planned_date: date - lead_auditor: str - audit_team: Optional[List[str]] = None - - -class InternalAuditCreate(InternalAuditBase): - """Schema for creating Internal Audit.""" - pass - - -class InternalAuditUpdate(BaseModel): - """Schema for updating Internal Audit.""" - title: Optional[str] = None - scope_description: Optional[str] = None - actual_start_date: Optional[date] = None - actual_end_date: Optional[date] = None - auditee_representatives: Optional[List[str]] = None - status: Optional[str] = None - audit_conclusion: Optional[str] = None - overall_assessment: Optional[str] = None - - -class InternalAuditResponse(InternalAuditBase): - """Response schema for Internal Audit.""" - id: str - audit_id: str - actual_start_date: Optional[date] = None - actual_end_date: Optional[date] = None - auditee_representatives: Optional[List[str]] = None - status: str - total_findings: int - major_findings: int - minor_findings: int - ofi_count: int - positive_observations: int - audit_conclusion: Optional[str] = None - overall_assessment: Optional[str] = None - report_date: Optional[date] = None - report_document_path: Optional[str] = None - report_approved_by: Optional[str] = None - report_approved_at: Optional[datetime] = None - follow_up_audit_required: bool - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class InternalAuditListResponse(BaseModel): - """List response for Internal Audits.""" - audits: List[InternalAuditResponse] - total: int - - -class InternalAuditCompleteRequest(BaseModel): - """Request to complete Internal Audit.""" - audit_conclusion: str - overall_assessment: str # "conforming", "minor_nc", "major_nc" - follow_up_audit_required: bool - - -# --- ISMS Readiness Check --- - -class PotentialFinding(BaseModel): - """Potential finding from readiness check.""" - check: str - status: str # "pass", "fail", "warning" - recommendation: str - iso_reference: Optional[str] = None - - -class ISMSReadinessCheckResponse(BaseModel): - """Response for ISMS Readiness Check.""" - id: str - check_date: datetime - triggered_by: Optional[str] = None - overall_status: str # "ready", "at_risk", "not_ready" - certification_possible: bool - # Chapter statuses - chapter_4_status: Optional[str] = None - chapter_5_status: Optional[str] = None - chapter_6_status: Optional[str] = None - chapter_7_status: Optional[str] = None - chapter_8_status: Optional[str] = None - chapter_9_status: Optional[str] = None - chapter_10_status: Optional[str] = None - # Findings - potential_majors: List[PotentialFinding] - potential_minors: List[PotentialFinding] - improvement_opportunities: List[PotentialFinding] - # Scores - readiness_score: float - documentation_score: Optional[float] = None - implementation_score: Optional[float] = None - evidence_score: Optional[float] = None - # Priority actions - priority_actions: List[str] - - class Config: - from_attributes = True - - -class ISMSReadinessCheckRequest(BaseModel): - """Request to run ISMS Readiness Check.""" - triggered_by: str = "manual" - - -# --- Audit Trail --- - -class AuditTrailEntry(BaseModel): - """Single audit trail entry.""" - id: str - entity_type: str - entity_id: str - entity_name: Optional[str] = None - action: str - field_changed: Optional[str] = None - old_value: Optional[str] = None - new_value: Optional[str] = None - change_summary: Optional[str] = None - performed_by: str - performed_at: datetime - - class Config: - from_attributes = True - - -class AuditTrailResponse(BaseModel): - """Response for Audit Trail query.""" - entries: List[AuditTrailEntry] - total: int - pagination: PaginationMeta - - -# --- ISO 27001 Chapter Status Overview --- - -class ISO27001ChapterStatus(BaseModel): - """Status of a single ISO 27001 chapter.""" - chapter: str - title: str - status: str # "compliant", "partial", "non_compliant", "not_started" - completion_percentage: float - open_findings: int - key_documents: List[str] - last_reviewed: Optional[datetime] = None - - -class ISO27001OverviewResponse(BaseModel): - """Complete ISO 27001 status overview.""" - overall_status: str # "ready", "at_risk", "not_ready", "not_started" - certification_readiness: float # 0-100 - chapters: List[ISO27001ChapterStatus] - scope_approved: bool - soa_approved: bool - last_management_review: Optional[datetime] = None - last_internal_audit: Optional[datetime] = None - open_major_findings: int - open_minor_findings: int - policies_count: int - policies_approved: int - objectives_count: int - objectives_achieved: int - - -# ============================================================================ -# VVT Schemas — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO) -# ============================================================================ - -class VVTOrganizationUpdate(BaseModel): - organization_name: Optional[str] = None - industry: Optional[str] = None - locations: Optional[List[str]] = None - employee_count: Optional[int] = None - dpo_name: Optional[str] = None - dpo_contact: Optional[str] = None - vvt_version: Optional[str] = None - last_review_date: Optional[date] = None - next_review_date: Optional[date] = None - review_interval: Optional[str] = None - - -class VVTOrganizationResponse(BaseModel): - id: str - organization_name: str - industry: Optional[str] = None - locations: List[Any] = [] - employee_count: Optional[int] = None - dpo_name: Optional[str] = None - dpo_contact: Optional[str] = None - vvt_version: str = '1.0' - last_review_date: Optional[date] = None - next_review_date: Optional[date] = None - review_interval: str = 'annual' - created_at: datetime - updated_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class VVTActivityCreate(BaseModel): - vvt_id: str - name: str - description: Optional[str] = None - purposes: List[str] = [] - legal_bases: List[str] = [] - data_subject_categories: List[str] = [] - personal_data_categories: List[str] = [] - recipient_categories: List[str] = [] - third_country_transfers: List[Any] = [] - retention_period: Dict[str, Any] = {} - tom_description: Optional[str] = None - business_function: Optional[str] = None - systems: List[str] = [] - deployment_model: Optional[str] = None - data_sources: List[Any] = [] - data_flows: List[Any] = [] - protection_level: str = 'MEDIUM' - dpia_required: bool = False - structured_toms: Dict[str, Any] = {} - status: str = 'DRAFT' - responsible: Optional[str] = None - owner: Optional[str] = None - last_reviewed_at: Optional[datetime] = None - next_review_at: Optional[datetime] = None - created_by: Optional[str] = None - dsfa_id: Optional[str] = None - - -class VVTActivityUpdate(BaseModel): - name: Optional[str] = None - description: Optional[str] = None - purposes: Optional[List[str]] = None - legal_bases: Optional[List[str]] = None - data_subject_categories: Optional[List[str]] = None - personal_data_categories: Optional[List[str]] = None - recipient_categories: Optional[List[str]] = None - third_country_transfers: Optional[List[Any]] = None - retention_period: Optional[Dict[str, Any]] = None - tom_description: Optional[str] = None - business_function: Optional[str] = None - systems: Optional[List[str]] = None - deployment_model: Optional[str] = None - data_sources: Optional[List[Any]] = None - data_flows: Optional[List[Any]] = None - protection_level: Optional[str] = None - dpia_required: Optional[bool] = None - structured_toms: Optional[Dict[str, Any]] = None - status: Optional[str] = None - responsible: Optional[str] = None - owner: Optional[str] = None - last_reviewed_at: Optional[datetime] = None - next_review_at: Optional[datetime] = None - created_by: Optional[str] = None - dsfa_id: Optional[str] = None - - -class VVTActivityResponse(BaseModel): - id: str - vvt_id: str - name: str - description: Optional[str] = None - purposes: List[Any] = [] - legal_bases: List[Any] = [] - data_subject_categories: List[Any] = [] - personal_data_categories: List[Any] = [] - recipient_categories: List[Any] = [] - third_country_transfers: List[Any] = [] - retention_period: Dict[str, Any] = {} - tom_description: Optional[str] = None - business_function: Optional[str] = None - systems: List[Any] = [] - deployment_model: Optional[str] = None - data_sources: List[Any] = [] - data_flows: List[Any] = [] - protection_level: str = 'MEDIUM' - dpia_required: bool = False - structured_toms: Dict[str, Any] = {} - status: str = 'DRAFT' - responsible: Optional[str] = None - owner: Optional[str] = None - last_reviewed_at: Optional[datetime] = None - next_review_at: Optional[datetime] = None - created_by: Optional[str] = None - dsfa_id: Optional[str] = None - created_at: datetime - updated_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class VVTStatsResponse(BaseModel): - total: int - by_status: Dict[str, int] - by_business_function: Dict[str, int] - dpia_required_count: int - third_country_count: int - draft_count: int - approved_count: int - overdue_review_count: int = 0 - - -class VVTAuditLogEntry(BaseModel): - id: str - action: str - entity_type: str - entity_id: Optional[str] = None - changed_by: Optional[str] = None - old_values: Optional[Dict[str, Any]] = None - new_values: Optional[Dict[str, Any]] = None - created_at: datetime - - class Config: - from_attributes = True - - -# ============================================================================ -# TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO) -# ============================================================================ - -class TOMStateResponse(BaseModel): - tenant_id: str - state: Dict[str, Any] = {} - version: int = 0 - last_modified: Optional[datetime] = None - is_new: bool = False - - -class TOMMeasureResponse(BaseModel): - id: str - tenant_id: str - control_id: str - name: str - description: Optional[str] = None - category: str - type: str - applicability: str = "REQUIRED" - applicability_reason: Optional[str] = None - implementation_status: str = "NOT_IMPLEMENTED" - responsible_person: Optional[str] = None - responsible_department: Optional[str] = None - implementation_date: Optional[datetime] = None - review_date: Optional[datetime] = None - review_frequency: Optional[str] = None - priority: Optional[str] = None - complexity: Optional[str] = None - linked_evidence: List[Any] = [] - evidence_gaps: List[Any] = [] - related_controls: Dict[str, Any] = {} - verified_at: Optional[datetime] = None - verified_by: Optional[str] = None - effectiveness_rating: Optional[str] = None - created_by: Optional[str] = None - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class TOMStatsResponse(BaseModel): - total: int = 0 - by_status: Dict[str, int] = {} - by_category: Dict[str, int] = {} - overdue_review_count: int = 0 - implemented: int = 0 - partial: int = 0 - not_implemented: int = 0 +from compliance.schemas.common import * # noqa: F401,F403 +from compliance.schemas.regulation import * # noqa: F401,F403 +from compliance.schemas.requirement import * # noqa: F401,F403 +from compliance.schemas.control import * # noqa: F401,F403 +from compliance.schemas.evidence import * # noqa: F401,F403 +from compliance.schemas.risk import * # noqa: F401,F403 +from compliance.schemas.ai_system import * # noqa: F401,F403 +from compliance.schemas.dashboard import * # noqa: F401,F403 +from compliance.schemas.service_module import * # noqa: F401,F403 +from compliance.schemas.bsi import * # noqa: F401,F403 +from compliance.schemas.audit_session import * # noqa: F401,F403 +from compliance.schemas.report import * # noqa: F401,F403 +from compliance.schemas.isms_governance import * # noqa: F401,F403 +from compliance.schemas.isms_audit import * # noqa: F401,F403 +from compliance.schemas.vvt import * # noqa: F401,F403 +from compliance.schemas.tom import * # noqa: F401,F403 diff --git a/backend-compliance/compliance/schemas/ai_system.py b/backend-compliance/compliance/schemas/ai_system.py new file mode 100644 index 0000000..14b893a --- /dev/null +++ b/backend-compliance/compliance/schemas/ai_system.py @@ -0,0 +1,63 @@ +""" +AI System (AI Act) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# AI System Schemas (AI Act Compliance) +# ============================================================================ + +class AISystemBase(BaseModel): + name: str + description: Optional[str] = None + purpose: Optional[str] = None + sector: Optional[str] = None + classification: str = "unclassified" + status: str = "draft" + obligations: Optional[List[str]] = None + + +class AISystemCreate(AISystemBase): + pass + + +class AISystemUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + purpose: Optional[str] = None + sector: Optional[str] = None + classification: Optional[str] = None + status: Optional[str] = None + obligations: Optional[List[str]] = None + + +class AISystemResponse(AISystemBase): + id: str + assessment_date: Optional[datetime] = None + assessment_result: Optional[Dict[str, Any]] = None + risk_factors: Optional[List[Dict[str, Any]]] = None + recommendations: Optional[List[str]] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class AISystemListResponse(BaseModel): + systems: List[AISystemResponse] + total: int + diff --git a/backend-compliance/compliance/schemas/audit_session.py b/backend-compliance/compliance/schemas/audit_session.py new file mode 100644 index 0000000..b3c0e5c --- /dev/null +++ b/backend-compliance/compliance/schemas/audit_session.py @@ -0,0 +1,172 @@ +""" +Audit Session Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Audit Session & Sign-off Schemas (Phase 3 - Sprint 3) +# ============================================================================ + +class AuditResult(str): + """Audit result values for sign-off.""" + COMPLIANT = "compliant" + COMPLIANT_WITH_NOTES = "compliant_notes" + NON_COMPLIANT = "non_compliant" + NOT_APPLICABLE = "not_applicable" + PENDING = "pending" + + +class AuditSessionStatus(str): + """Audit session status values.""" + DRAFT = "draft" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + ARCHIVED = "archived" + + +class CreateAuditSessionRequest(BaseModel): + """Request to create a new audit session.""" + name: str = Field(..., min_length=1, max_length=200) + description: Optional[str] = None + auditor_name: str = Field(..., min_length=1, max_length=100) + auditor_email: Optional[str] = None + auditor_organization: Optional[str] = None + regulation_codes: Optional[List[str]] = None # Filter by regulations + + +class UpdateAuditSessionRequest(BaseModel): + """Request to update an audit session.""" + name: Optional[str] = Field(None, min_length=1, max_length=200) + description: Optional[str] = None + status: Optional[str] = None + + +class AuditSessionSummary(BaseModel): + """Summary of an audit session for list views.""" + id: str + name: str + auditor_name: str + status: str + total_items: int + completed_items: int + completion_percentage: float + created_at: datetime + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + + +class AuditSessionResponse(AuditSessionSummary): + """Full response for an audit session.""" + description: Optional[str] = None + auditor_email: Optional[str] = None + auditor_organization: Optional[str] = None + regulation_ids: Optional[List[str]] = None + compliant_count: int = 0 + non_compliant_count: int = 0 + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class AuditSessionListResponse(BaseModel): + """List response for audit sessions.""" + sessions: List[AuditSessionSummary] + total: int + + +class AuditSessionDetailResponse(AuditSessionResponse): + """Detailed response including statistics breakdown.""" + statistics: Optional["AuditStatistics"] = None + + +class SignOffRequest(BaseModel): + """Request to sign off a single requirement.""" + result: str = Field(..., description="Audit result: compliant, compliant_notes, non_compliant, not_applicable, pending") + notes: Optional[str] = None + sign: bool = Field(False, description="Whether to create digital signature") + + +class SignOffResponse(BaseModel): + """Response for a sign-off operation.""" + id: str + session_id: str + requirement_id: str + result: str + notes: Optional[str] = None + is_signed: bool + signature_hash: Optional[str] = None + signed_at: Optional[datetime] = None + signed_by: Optional[str] = None + created_at: datetime + updated_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + + +class AuditChecklistItem(BaseModel): + """A single item in the audit checklist.""" + requirement_id: str + regulation_code: str + article: str + paragraph: Optional[str] = None + title: str + description: Optional[str] = None + + # Current audit state + current_result: str = "pending" # AuditResult + notes: Optional[str] = None + is_signed: bool = False + signed_at: Optional[datetime] = None + signed_by: Optional[str] = None + + # Context info + evidence_count: int = 0 + controls_mapped: int = 0 + implementation_status: Optional[str] = None + + # Priority + priority: int = 2 + + +class AuditStatistics(BaseModel): + """Statistics for an audit session.""" + total: int + compliant: int + compliant_with_notes: int + non_compliant: int + not_applicable: int + pending: int + completion_percentage: float + + +class AuditChecklistResponse(BaseModel): + """Response for audit checklist endpoint.""" + session: AuditSessionSummary + items: List[AuditChecklistItem] + pagination: PaginationMeta + statistics: AuditStatistics + + +class AuditChecklistFilterRequest(BaseModel): + """Filter options for audit checklist.""" + regulation_code: Optional[str] = None + result_filter: Optional[str] = None # "pending", "compliant", "non_compliant", etc. + search: Optional[str] = None + signed_only: bool = False + diff --git a/backend-compliance/compliance/schemas/bsi.py b/backend-compliance/compliance/schemas/bsi.py new file mode 100644 index 0000000..66618d1 --- /dev/null +++ b/backend-compliance/compliance/schemas/bsi.py @@ -0,0 +1,58 @@ +""" +BSI / PDF Extraction Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# PDF Extraction Schemas +# ============================================================================ + +class BSIAspectResponse(BaseModel): + """A single extracted BSI-TR Pruefaspekt (test aspect).""" + aspect_id: str + title: str + full_text: str + category: str + page_number: int + section: str + requirement_level: str + source_document: str + keywords: Optional[List[str]] = None + related_aspects: Optional[List[str]] = None + + +class PDFExtractionRequest(BaseModel): + """Request for PDF extraction.""" + document_code: str = Field(..., description="BSI-TR document code, e.g. BSI-TR-03161-2") + save_to_db: bool = Field(True, description="Whether to save extracted requirements to database") + force: bool = Field(False, description="Force re-extraction even if requirements exist") + + +class PDFExtractionResponse(BaseModel): + """Response from PDF extraction endpoint.""" + # Simple endpoint format (new /pdf/extract/{doc_code}) + doc_code: Optional[str] = None + total_extracted: Optional[int] = None + saved_to_db: Optional[int] = None + aspects: Optional[List[BSIAspectResponse]] = None + # Legacy scraper endpoint format (/scraper/extract-pdf) + success: Optional[bool] = None + source_document: Optional[str] = None + total_aspects: Optional[int] = None + statistics: Optional[Dict[str, Any]] = None + requirements_created: Optional[int] = None + diff --git a/backend-compliance/compliance/schemas/common.py b/backend-compliance/compliance/schemas/common.py new file mode 100644 index 0000000..558cb9e --- /dev/null +++ b/backend-compliance/compliance/schemas/common.py @@ -0,0 +1,79 @@ +""" +Common (shared enums and pagination) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + + +# ============================================================================ +# Enums as strings for API +# ============================================================================ + +class RegulationType(str): + EU_REGULATION = "eu_regulation" + EU_DIRECTIVE = "eu_directive" + DE_LAW = "de_law" + BSI_STANDARD = "bsi_standard" + INDUSTRY_STANDARD = "industry_standard" + + +class ControlType(str): + PREVENTIVE = "preventive" + DETECTIVE = "detective" + CORRECTIVE = "corrective" + + +class ControlDomain(str): + GOVERNANCE = "gov" + PRIVACY = "priv" + IAM = "iam" + CRYPTO = "crypto" + SDLC = "sdlc" + OPS = "ops" + AI = "ai" + CRA = "cra" + AUDIT = "aud" + + +class ControlStatus(str): + PASS = "pass" + PARTIAL = "partial" + FAIL = "fail" + NOT_APPLICABLE = "n/a" + PLANNED = "planned" + + +class RiskLevel(str): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class EvidenceStatus(str): + VALID = "valid" + EXPIRED = "expired" + PENDING = "pending" + FAILED = "failed" + + +# ============================================================================ +# Pagination Schemas (defined here, completed after Response classes) +# ============================================================================ + +class PaginationMeta(BaseModel): + """Pagination metadata for list responses.""" + page: int + page_size: int + total: int + total_pages: int + has_next: bool + has_prev: bool + diff --git a/backend-compliance/compliance/schemas/control.py b/backend-compliance/compliance/schemas/control.py new file mode 100644 index 0000000..eba1233 --- /dev/null +++ b/backend-compliance/compliance/schemas/control.py @@ -0,0 +1,119 @@ +""" +Control and ControlMapping Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Control Schemas +# ============================================================================ + +class ControlBase(BaseModel): + control_id: str + domain: str + control_type: str + title: str + description: Optional[str] = None + pass_criteria: str + implementation_guidance: Optional[str] = None + code_reference: Optional[str] = None + documentation_url: Optional[str] = None + is_automated: bool = False + automation_tool: Optional[str] = None + automation_config: Optional[Dict[str, Any]] = None + owner: Optional[str] = None + review_frequency_days: int = 90 + + +class ControlCreate(ControlBase): + pass + + +class ControlUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + pass_criteria: Optional[str] = None + implementation_guidance: Optional[str] = None + code_reference: Optional[str] = None + documentation_url: Optional[str] = None + is_automated: Optional[bool] = None + automation_tool: Optional[str] = None + automation_config: Optional[Dict[str, Any]] = None + owner: Optional[str] = None + status: Optional[str] = None + status_notes: Optional[str] = None + + +class ControlResponse(ControlBase): + id: str + status: str + status_notes: Optional[str] = None + last_reviewed_at: Optional[datetime] = None + next_review_at: Optional[datetime] = None + created_at: datetime + updated_at: datetime + evidence_count: Optional[int] = None + requirement_count: Optional[int] = None + + model_config = ConfigDict(from_attributes=True) + + +class ControlListResponse(BaseModel): + controls: List[ControlResponse] + total: int + + +class PaginatedControlResponse(BaseModel): + """Paginated response for controls - optimized for large datasets.""" + data: List[ControlResponse] + pagination: PaginationMeta + + +class ControlReviewRequest(BaseModel): + status: str + status_notes: Optional[str] = None + + +# ============================================================================ +# Control Mapping Schemas +# ============================================================================ + +class MappingBase(BaseModel): + requirement_id: str + control_id: str + coverage_level: str = "full" + notes: Optional[str] = None + + +class MappingCreate(MappingBase): + pass + + +class MappingResponse(MappingBase): + id: str + requirement_article: Optional[str] = None + requirement_title: Optional[str] = None + control_control_id: Optional[str] = None + control_title: Optional[str] = None + created_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class MappingListResponse(BaseModel): + mappings: List[MappingResponse] + total: int + diff --git a/backend-compliance/compliance/schemas/dashboard.py b/backend-compliance/compliance/schemas/dashboard.py new file mode 100644 index 0000000..4ef4cd1 --- /dev/null +++ b/backend-compliance/compliance/schemas/dashboard.py @@ -0,0 +1,195 @@ +""" +Dashboard, Export, Executive Dashboard Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) +from compliance.schemas.evidence import EvidenceResponse +from compliance.schemas.risk import RiskResponse + + +# ============================================================================ +# Dashboard & Export Schemas +# ============================================================================ + +class DashboardResponse(BaseModel): + compliance_score: float + total_regulations: int + total_requirements: int + total_controls: int + controls_by_status: Dict[str, int] + controls_by_domain: Dict[str, Dict[str, int]] + total_evidence: int + evidence_by_status: Dict[str, int] + total_risks: int + risks_by_level: Dict[str, int] + recent_activity: List[Dict[str, Any]] + + +class ExportRequest(BaseModel): + export_type: str = "full" # "full", "controls_only", "evidence_only" + included_regulations: Optional[List[str]] = None + included_domains: Optional[List[str]] = None + date_range_start: Optional[date] = None + date_range_end: Optional[date] = None + + +class ExportResponse(BaseModel): + id: str + export_type: str + export_name: Optional[str] = None + status: str + requested_by: str + requested_at: datetime + completed_at: Optional[datetime] = None + file_path: Optional[str] = None + file_hash: Optional[str] = None + file_size_bytes: Optional[int] = None + total_controls: Optional[int] = None + total_evidence: Optional[int] = None + compliance_score: Optional[float] = None + error_message: Optional[str] = None + + model_config = ConfigDict(from_attributes=True) + + +class ExportListResponse(BaseModel): + exports: List[ExportResponse] + total: int + + +# ============================================================================ +# Seeding Schemas +# ============================================================================ + +class SeedRequest(BaseModel): + force: bool = False + + +class SeedResponse(BaseModel): + success: bool + message: str + counts: Dict[str, int] + + +class PaginatedEvidenceResponse(BaseModel): + """Paginated response for evidence.""" + data: List[EvidenceResponse] + pagination: PaginationMeta + + +class PaginatedRiskResponse(BaseModel): + """Paginated response for risks.""" + data: List[RiskResponse] + pagination: PaginationMeta + + +# ============================================================================ +# Executive Dashboard Schemas (Phase 3 - Sprint 1) +# ============================================================================ + +class TrendDataPoint(BaseModel): + """A single data point for trend charts.""" + date: str # ISO date string + score: float + label: Optional[str] = None # Formatted date for display (e.g., "Jan 26") + + +class RiskSummary(BaseModel): + """Summary of a risk for executive display.""" + id: str + risk_id: str + title: str + risk_level: str # "low", "medium", "high", "critical" + owner: Optional[str] = None + status: str + category: str + impact: int + likelihood: int + + +class DeadlineItem(BaseModel): + """An upcoming deadline for executive display.""" + id: str + title: str + deadline: str # ISO date string + days_remaining: int + type: str # "control_review", "evidence_expiry", "audit" + status: str # "on_track", "at_risk", "overdue" + owner: Optional[str] = None + + +class TeamWorkloadItem(BaseModel): + """Workload distribution for a team or person.""" + name: str + pending_tasks: int + in_progress_tasks: int + completed_tasks: int + total_tasks: int + completion_rate: float + + +class ExecutiveDashboardResponse(BaseModel): + """ + Executive Dashboard Response + + Provides a high-level overview for managers and executives: + - Traffic light status (green/yellow/red) + - Overall compliance score + - 12-month trend data + - Top 5 risks + - Upcoming deadlines + - Team workload distribution + """ + traffic_light_status: str # "green", "yellow", "red" + overall_score: float + score_trend: List[TrendDataPoint] + previous_score: Optional[float] = None + score_change: Optional[float] = None # Positive = improvement + + # Counts + total_regulations: int + total_requirements: int + total_controls: int + open_risks: int + + # Top items + top_risks: List[RiskSummary] + upcoming_deadlines: List[DeadlineItem] + + # Workload + team_workload: List[TeamWorkloadItem] + + # Last updated + last_updated: str + + +class ComplianceSnapshotCreate(BaseModel): + """Request to create a compliance snapshot.""" + notes: Optional[str] = None + + +class ComplianceSnapshotResponse(BaseModel): + """Response for a compliance snapshot.""" + id: str + snapshot_date: str + overall_score: float + scores_by_regulation: Dict[str, float] + scores_by_domain: Dict[str, float] + total_controls: int + passed_controls: int + failed_controls: int + notes: Optional[str] = None + created_at: str + diff --git a/backend-compliance/compliance/schemas/evidence.py b/backend-compliance/compliance/schemas/evidence.py new file mode 100644 index 0000000..ef63d3f --- /dev/null +++ b/backend-compliance/compliance/schemas/evidence.py @@ -0,0 +1,66 @@ +""" +Evidence Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Evidence Schemas +# ============================================================================ + +class EvidenceBase(BaseModel): + control_id: str + evidence_type: str + title: str + description: Optional[str] = None + artifact_url: Optional[str] = None + valid_from: Optional[datetime] = None + valid_until: Optional[datetime] = None + source: Optional[str] = None + ci_job_id: Optional[str] = None + + +class EvidenceCreate(EvidenceBase): + pass + + +class EvidenceResponse(EvidenceBase): + id: str + artifact_path: Optional[str] = None + artifact_hash: Optional[str] = None + file_size_bytes: Optional[int] = None + mime_type: Optional[str] = None + status: str + uploaded_by: Optional[str] = None + collected_at: datetime + created_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class EvidenceListResponse(BaseModel): + evidence: List[EvidenceResponse] + total: int + + +class EvidenceCollectRequest(BaseModel): + """Request to auto-collect evidence from CI.""" + control_id: str + evidence_type: str + title: str + ci_job_id: str + artifact_url: str + diff --git a/backend-compliance/compliance/schemas/isms_audit.py b/backend-compliance/compliance/schemas/isms_audit.py new file mode 100644 index 0000000..755d67c --- /dev/null +++ b/backend-compliance/compliance/schemas/isms_audit.py @@ -0,0 +1,431 @@ +""" +ISMS Audit Execution (Findings, CAPA, Reviews, Internal Audit, Readiness) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +class AuditFindingBase(BaseModel): + """Base schema for Audit Finding.""" + finding_type: str # "major", "minor", "ofi", "positive" + iso_chapter: Optional[str] = None + annex_a_control: Optional[str] = None + title: str + description: str + objective_evidence: str + impact_description: Optional[str] = None + affected_processes: Optional[List[str]] = None + affected_assets: Optional[List[str]] = None + owner: Optional[str] = None + due_date: Optional[date] = None + + +class AuditFindingCreate(AuditFindingBase): + """Schema for creating Audit Finding.""" + audit_session_id: Optional[str] = None + internal_audit_id: Optional[str] = None + auditor: str + + +class AuditFindingUpdate(BaseModel): + """Schema for updating Audit Finding.""" + title: Optional[str] = None + description: Optional[str] = None + root_cause: Optional[str] = None + root_cause_method: Optional[str] = None + owner: Optional[str] = None + due_date: Optional[date] = None + status: Optional[str] = None + + +class AuditFindingResponse(AuditFindingBase): + """Response schema for Audit Finding.""" + id: str + finding_id: str + audit_session_id: Optional[str] = None + internal_audit_id: Optional[str] = None + root_cause: Optional[str] = None + root_cause_method: Optional[str] = None + status: str + auditor: Optional[str] = None + identified_date: date + closed_date: Optional[date] = None + verification_method: Optional[str] = None + verified_by: Optional[str] = None + verified_at: Optional[datetime] = None + closure_notes: Optional[str] = None + closed_by: Optional[str] = None + is_blocking: bool + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class AuditFindingListResponse(BaseModel): + """List response for Audit Findings.""" + findings: List[AuditFindingResponse] + total: int + major_count: int + minor_count: int + ofi_count: int + open_count: int + + +class AuditFindingCloseRequest(BaseModel): + """Request to close an Audit Finding.""" + closure_notes: str + closed_by: str + verification_method: str + verification_evidence: str + + +# --- Corrective Actions (CAPA) --- + +class CorrectiveActionBase(BaseModel): + """Base schema for Corrective Action.""" + capa_type: str # "corrective", "preventive", "both" + title: str + description: str + expected_outcome: Optional[str] = None + assigned_to: str + planned_completion: date + effectiveness_criteria: Optional[str] = None + estimated_effort_hours: Optional[int] = None + resources_required: Optional[str] = None + + +class CorrectiveActionCreate(CorrectiveActionBase): + """Schema for creating Corrective Action.""" + finding_id: str + planned_start: Optional[date] = None + + +class CorrectiveActionUpdate(BaseModel): + """Schema for updating Corrective Action.""" + title: Optional[str] = None + description: Optional[str] = None + assigned_to: Optional[str] = None + planned_completion: Optional[date] = None + status: Optional[str] = None + progress_percentage: Optional[int] = None + implementation_evidence: Optional[str] = None + + +class CorrectiveActionResponse(CorrectiveActionBase): + """Response schema for Corrective Action.""" + id: str + capa_id: str + finding_id: str + planned_start: Optional[date] = None + actual_completion: Optional[date] = None + status: str + progress_percentage: int + approved_by: Optional[str] = None + actual_effort_hours: Optional[int] = None + implementation_evidence: Optional[str] = None + evidence_ids: Optional[List[str]] = None + effectiveness_verified: bool + effectiveness_verification_date: Optional[date] = None + effectiveness_notes: Optional[str] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class CorrectiveActionListResponse(BaseModel): + """List response for Corrective Actions.""" + actions: List[CorrectiveActionResponse] + total: int + + +class CAPAVerifyRequest(BaseModel): + """Request to verify CAPA effectiveness.""" + verified_by: str + effectiveness_notes: str + is_effective: bool + + +# --- Management Review (ISO 27001 9.3) --- + +class ReviewAttendee(BaseModel): + """Single attendee in management review.""" + name: str + role: str + + +class ReviewActionItem(BaseModel): + """Single action item from management review.""" + action: str + owner: str + due_date: date + + +class ManagementReviewBase(BaseModel): + """Base schema for Management Review.""" + title: str + review_date: date + review_period_start: Optional[date] = None + review_period_end: Optional[date] = None + chairperson: str + attendees: Optional[List[ReviewAttendee]] = None + + +class ManagementReviewCreate(ManagementReviewBase): + """Schema for creating Management Review.""" + pass + + +class ManagementReviewUpdate(BaseModel): + """Schema for updating Management Review.""" + # Inputs (9.3) + input_previous_actions: Optional[str] = None + input_isms_changes: Optional[str] = None + input_security_performance: Optional[str] = None + input_interested_party_feedback: Optional[str] = None + input_risk_assessment_results: Optional[str] = None + input_improvement_opportunities: Optional[str] = None + input_policy_effectiveness: Optional[str] = None + input_objective_achievement: Optional[str] = None + input_resource_adequacy: Optional[str] = None + # Outputs (9.3) + output_improvement_decisions: Optional[str] = None + output_isms_changes: Optional[str] = None + output_resource_needs: Optional[str] = None + # Action items + action_items: Optional[List[ReviewActionItem]] = None + # Assessment + isms_effectiveness_rating: Optional[str] = None + key_decisions: Optional[str] = None + status: Optional[str] = None + + +class ManagementReviewResponse(ManagementReviewBase): + """Response schema for Management Review.""" + id: str + review_id: str + input_previous_actions: Optional[str] = None + input_isms_changes: Optional[str] = None + input_security_performance: Optional[str] = None + input_interested_party_feedback: Optional[str] = None + input_risk_assessment_results: Optional[str] = None + input_improvement_opportunities: Optional[str] = None + input_policy_effectiveness: Optional[str] = None + input_objective_achievement: Optional[str] = None + input_resource_adequacy: Optional[str] = None + output_improvement_decisions: Optional[str] = None + output_isms_changes: Optional[str] = None + output_resource_needs: Optional[str] = None + action_items: Optional[List[ReviewActionItem]] = None + isms_effectiveness_rating: Optional[str] = None + key_decisions: Optional[str] = None + status: str + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + minutes_document_path: Optional[str] = None + next_review_date: Optional[date] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class ManagementReviewListResponse(BaseModel): + """List response for Management Reviews.""" + reviews: List[ManagementReviewResponse] + total: int + + +class ManagementReviewApproveRequest(BaseModel): + """Request to approve Management Review.""" + approved_by: str + next_review_date: date + minutes_document_path: Optional[str] = None + + +# --- Internal Audit (ISO 27001 9.2) --- + +class InternalAuditBase(BaseModel): + """Base schema for Internal Audit.""" + title: str + audit_type: str # "scheduled", "surveillance", "special" + scope_description: str + iso_chapters_covered: Optional[List[str]] = None + annex_a_controls_covered: Optional[List[str]] = None + processes_covered: Optional[List[str]] = None + departments_covered: Optional[List[str]] = None + criteria: Optional[str] = None + planned_date: date + lead_auditor: str + audit_team: Optional[List[str]] = None + + +class InternalAuditCreate(InternalAuditBase): + """Schema for creating Internal Audit.""" + pass + + +class InternalAuditUpdate(BaseModel): + """Schema for updating Internal Audit.""" + title: Optional[str] = None + scope_description: Optional[str] = None + actual_start_date: Optional[date] = None + actual_end_date: Optional[date] = None + auditee_representatives: Optional[List[str]] = None + status: Optional[str] = None + audit_conclusion: Optional[str] = None + overall_assessment: Optional[str] = None + + +class InternalAuditResponse(InternalAuditBase): + """Response schema for Internal Audit.""" + id: str + audit_id: str + actual_start_date: Optional[date] = None + actual_end_date: Optional[date] = None + auditee_representatives: Optional[List[str]] = None + status: str + total_findings: int + major_findings: int + minor_findings: int + ofi_count: int + positive_observations: int + audit_conclusion: Optional[str] = None + overall_assessment: Optional[str] = None + report_date: Optional[date] = None + report_document_path: Optional[str] = None + report_approved_by: Optional[str] = None + report_approved_at: Optional[datetime] = None + follow_up_audit_required: bool + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class InternalAuditListResponse(BaseModel): + """List response for Internal Audits.""" + audits: List[InternalAuditResponse] + total: int + + +class InternalAuditCompleteRequest(BaseModel): + """Request to complete Internal Audit.""" + audit_conclusion: str + overall_assessment: str # "conforming", "minor_nc", "major_nc" + follow_up_audit_required: bool + + +# --- ISMS Readiness Check --- + +class PotentialFinding(BaseModel): + """Potential finding from readiness check.""" + check: str + status: str # "pass", "fail", "warning" + recommendation: str + iso_reference: Optional[str] = None + + +class ISMSReadinessCheckResponse(BaseModel): + """Response for ISMS Readiness Check.""" + id: str + check_date: datetime + triggered_by: Optional[str] = None + overall_status: str # "ready", "at_risk", "not_ready" + certification_possible: bool + # Chapter statuses + chapter_4_status: Optional[str] = None + chapter_5_status: Optional[str] = None + chapter_6_status: Optional[str] = None + chapter_7_status: Optional[str] = None + chapter_8_status: Optional[str] = None + chapter_9_status: Optional[str] = None + chapter_10_status: Optional[str] = None + # Findings + potential_majors: List[PotentialFinding] + potential_minors: List[PotentialFinding] + improvement_opportunities: List[PotentialFinding] + # Scores + readiness_score: float + documentation_score: Optional[float] = None + implementation_score: Optional[float] = None + evidence_score: Optional[float] = None + # Priority actions + priority_actions: List[str] + + model_config = ConfigDict(from_attributes=True) + + +class ISMSReadinessCheckRequest(BaseModel): + """Request to run ISMS Readiness Check.""" + triggered_by: str = "manual" + + +# --- Audit Trail --- + +class AuditTrailEntry(BaseModel): + """Single audit trail entry.""" + id: str + entity_type: str + entity_id: str + entity_name: Optional[str] = None + action: str + field_changed: Optional[str] = None + old_value: Optional[str] = None + new_value: Optional[str] = None + change_summary: Optional[str] = None + performed_by: str + performed_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class AuditTrailResponse(BaseModel): + """Response for Audit Trail query.""" + entries: List[AuditTrailEntry] + total: int + pagination: PaginationMeta + + +# --- ISO 27001 Chapter Status Overview --- + +class ISO27001ChapterStatus(BaseModel): + """Status of a single ISO 27001 chapter.""" + chapter: str + title: str + status: str # "compliant", "partial", "non_compliant", "not_started" + completion_percentage: float + open_findings: int + key_documents: List[str] + last_reviewed: Optional[datetime] = None + + +class ISO27001OverviewResponse(BaseModel): + """Complete ISO 27001 status overview.""" + overall_status: str # "ready", "at_risk", "not_ready", "not_started" + certification_readiness: float # 0-100 + chapters: List[ISO27001ChapterStatus] + scope_approved: bool + soa_approved: bool + last_management_review: Optional[datetime] = None + last_internal_audit: Optional[datetime] = None + open_major_findings: int + open_minor_findings: int + policies_count: int + policies_approved: int + objectives_count: int + objectives_achieved: int + diff --git a/backend-compliance/compliance/schemas/isms_governance.py b/backend-compliance/compliance/schemas/isms_governance.py new file mode 100644 index 0000000..6959a22 --- /dev/null +++ b/backend-compliance/compliance/schemas/isms_governance.py @@ -0,0 +1,343 @@ +""" +ISMS Governance (Scope, Context, Policy, Objective, SoA) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# ISO 27001 ISMS Schemas (Kapitel 4-10) +# ============================================================================ + +# --- Enums --- + +class ApprovalStatus(str): + DRAFT = "draft" + UNDER_REVIEW = "under_review" + APPROVED = "approved" + SUPERSEDED = "superseded" + + +class FindingType(str): + MAJOR = "major" + MINOR = "minor" + OFI = "ofi" + POSITIVE = "positive" + + +class FindingStatus(str): + OPEN = "open" + IN_PROGRESS = "in_progress" + CAPA_PENDING = "capa_pending" + VERIFICATION_PENDING = "verification_pending" + VERIFIED = "verified" + CLOSED = "closed" + + +class CAPAType(str): + CORRECTIVE = "corrective" + PREVENTIVE = "preventive" + BOTH = "both" + + +# --- ISMS Scope (ISO 27001 4.3) --- + +class ISMSScopeBase(BaseModel): + """Base schema for ISMS Scope.""" + scope_statement: str + included_locations: Optional[List[str]] = None + included_processes: Optional[List[str]] = None + included_services: Optional[List[str]] = None + excluded_items: Optional[List[str]] = None + exclusion_justification: Optional[str] = None + organizational_boundary: Optional[str] = None + physical_boundary: Optional[str] = None + technical_boundary: Optional[str] = None + + +class ISMSScopeCreate(ISMSScopeBase): + """Schema for creating ISMS Scope.""" + pass + + +class ISMSScopeUpdate(BaseModel): + """Schema for updating ISMS Scope.""" + scope_statement: Optional[str] = None + included_locations: Optional[List[str]] = None + included_processes: Optional[List[str]] = None + included_services: Optional[List[str]] = None + excluded_items: Optional[List[str]] = None + exclusion_justification: Optional[str] = None + organizational_boundary: Optional[str] = None + physical_boundary: Optional[str] = None + technical_boundary: Optional[str] = None + + +class ISMSScopeResponse(ISMSScopeBase): + """Response schema for ISMS Scope.""" + id: str + version: str + status: str + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + effective_date: Optional[date] = None + review_date: Optional[date] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class ISMSScopeApproveRequest(BaseModel): + """Request to approve ISMS Scope.""" + approved_by: str + effective_date: date + review_date: date + + +# --- ISMS Context (ISO 27001 4.1, 4.2) --- + +class ContextIssue(BaseModel): + """Single context issue.""" + issue: str + impact: str + treatment: Optional[str] = None + + +class InterestedParty(BaseModel): + """Single interested party.""" + party: str + requirements: List[str] + relevance: str + + +class ISMSContextBase(BaseModel): + """Base schema for ISMS Context.""" + internal_issues: Optional[List[ContextIssue]] = None + external_issues: Optional[List[ContextIssue]] = None + interested_parties: Optional[List[InterestedParty]] = None + regulatory_requirements: Optional[List[str]] = None + contractual_requirements: Optional[List[str]] = None + swot_strengths: Optional[List[str]] = None + swot_weaknesses: Optional[List[str]] = None + swot_opportunities: Optional[List[str]] = None + swot_threats: Optional[List[str]] = None + + +class ISMSContextCreate(ISMSContextBase): + """Schema for creating ISMS Context.""" + pass + + +class ISMSContextResponse(ISMSContextBase): + """Response schema for ISMS Context.""" + id: str + version: str + status: str + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + last_reviewed_at: Optional[datetime] = None + next_review_date: Optional[date] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +# --- ISMS Policies (ISO 27001 5.2) --- + +class ISMSPolicyBase(BaseModel): + """Base schema for ISMS Policy.""" + policy_id: str + title: str + policy_type: str # "master", "operational", "technical" + description: Optional[str] = None + policy_text: str + applies_to: Optional[List[str]] = None + review_frequency_months: int = 12 + related_controls: Optional[List[str]] = None + + +class ISMSPolicyCreate(ISMSPolicyBase): + """Schema for creating ISMS Policy.""" + authored_by: str + + +class ISMSPolicyUpdate(BaseModel): + """Schema for updating ISMS Policy.""" + title: Optional[str] = None + description: Optional[str] = None + policy_text: Optional[str] = None + applies_to: Optional[List[str]] = None + review_frequency_months: Optional[int] = None + related_controls: Optional[List[str]] = None + + +class ISMSPolicyResponse(ISMSPolicyBase): + """Response schema for ISMS Policy.""" + id: str + version: str + status: str + authored_by: Optional[str] = None + reviewed_by: Optional[str] = None + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + effective_date: Optional[date] = None + next_review_date: Optional[date] = None + document_path: Optional[str] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class ISMSPolicyListResponse(BaseModel): + """List response for ISMS Policies.""" + policies: List[ISMSPolicyResponse] + total: int + + +class ISMSPolicyApproveRequest(BaseModel): + """Request to approve ISMS Policy.""" + reviewed_by: str + approved_by: str + effective_date: date + + +# --- Security Objectives (ISO 27001 6.2) --- + +class SecurityObjectiveBase(BaseModel): + """Base schema for Security Objective.""" + objective_id: str + title: str + description: Optional[str] = None + category: str # "availability", "confidentiality", "integrity", "compliance" + specific: Optional[str] = None + measurable: Optional[str] = None + achievable: Optional[str] = None + relevant: Optional[str] = None + time_bound: Optional[str] = None + kpi_name: Optional[str] = None + kpi_target: Optional[str] = None + kpi_unit: Optional[str] = None + measurement_frequency: Optional[str] = None + owner: Optional[str] = None + target_date: Optional[date] = None + related_controls: Optional[List[str]] = None + related_risks: Optional[List[str]] = None + + +class SecurityObjectiveCreate(SecurityObjectiveBase): + """Schema for creating Security Objective.""" + pass + + +class SecurityObjectiveUpdate(BaseModel): + """Schema for updating Security Objective.""" + title: Optional[str] = None + description: Optional[str] = None + kpi_current: Optional[str] = None + progress_percentage: Optional[int] = None + status: Optional[str] = None + + +class SecurityObjectiveResponse(SecurityObjectiveBase): + """Response schema for Security Objective.""" + id: str + kpi_current: Optional[str] = None + status: str + progress_percentage: int + achieved_date: Optional[date] = None + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class SecurityObjectiveListResponse(BaseModel): + """List response for Security Objectives.""" + objectives: List[SecurityObjectiveResponse] + total: int + + +# --- Statement of Applicability (SoA) --- + +class SoAEntryBase(BaseModel): + """Base schema for SoA Entry.""" + annex_a_control: str # e.g., "A.5.1" + annex_a_title: str + annex_a_category: Optional[str] = None + is_applicable: bool + applicability_justification: str + implementation_status: str = "planned" + implementation_notes: Optional[str] = None + breakpilot_control_ids: Optional[List[str]] = None + coverage_level: str = "full" + evidence_description: Optional[str] = None + risk_assessment_notes: Optional[str] = None + compensating_controls: Optional[str] = None + + +class SoAEntryCreate(SoAEntryBase): + """Schema for creating SoA Entry.""" + pass + + +class SoAEntryUpdate(BaseModel): + """Schema for updating SoA Entry.""" + is_applicable: Optional[bool] = None + applicability_justification: Optional[str] = None + implementation_status: Optional[str] = None + implementation_notes: Optional[str] = None + breakpilot_control_ids: Optional[List[str]] = None + coverage_level: Optional[str] = None + evidence_description: Optional[str] = None + + +class SoAEntryResponse(SoAEntryBase): + """Response schema for SoA Entry.""" + id: str + evidence_ids: Optional[List[str]] = None + reviewed_by: Optional[str] = None + reviewed_at: Optional[datetime] = None + approved_by: Optional[str] = None + approved_at: Optional[datetime] = None + version: str + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class SoAListResponse(BaseModel): + """List response for SoA.""" + entries: List[SoAEntryResponse] + total: int + applicable_count: int + not_applicable_count: int + implemented_count: int + planned_count: int + + +class SoAApproveRequest(BaseModel): + """Request to approve SoA entry.""" + reviewed_by: str + approved_by: str + + +# --- Audit Findings (Major/Minor/OFI) --- + diff --git a/backend-compliance/compliance/schemas/regulation.py b/backend-compliance/compliance/schemas/regulation.py new file mode 100644 index 0000000..3529764 --- /dev/null +++ b/backend-compliance/compliance/schemas/regulation.py @@ -0,0 +1,52 @@ +""" +Regulation Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Regulation Schemas +# ============================================================================ + +class RegulationBase(BaseModel): + code: str + name: str + full_name: Optional[str] = None + regulation_type: str + source_url: Optional[str] = None + local_pdf_path: Optional[str] = None + effective_date: Optional[date] = None + description: Optional[str] = None + is_active: bool = True + + +class RegulationCreate(RegulationBase): + pass + + +class RegulationResponse(RegulationBase): + id: str + created_at: datetime + updated_at: datetime + requirement_count: Optional[int] = None + + model_config = ConfigDict(from_attributes=True) + + +class RegulationListResponse(BaseModel): + regulations: List[RegulationResponse] + total: int + diff --git a/backend-compliance/compliance/schemas/report.py b/backend-compliance/compliance/schemas/report.py new file mode 100644 index 0000000..9933261 --- /dev/null +++ b/backend-compliance/compliance/schemas/report.py @@ -0,0 +1,53 @@ +""" +Report generation Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Report Generation Schemas (Phase 3 - Sprint 3) +# ============================================================================ + +class GenerateReportRequest(BaseModel): + """Request to generate an audit report.""" + session_id: str + report_type: str = "full" # "full", "summary", "non_compliant_only" + include_evidence: bool = True + include_signatures: bool = True + language: str = "de" # "de" or "en" + + +class ReportGenerationResponse(BaseModel): + """Response for report generation.""" + report_id: str + session_id: str + status: str # "pending", "generating", "completed", "failed" + report_type: str + file_path: Optional[str] = None + file_size_bytes: Optional[int] = None + created_at: datetime + completed_at: Optional[datetime] = None + error_message: Optional[str] = None + + +class ReportDownloadResponse(BaseModel): + """Response for report download.""" + report_id: str + filename: str + mime_type: str + file_size_bytes: int + download_url: str + diff --git a/backend-compliance/compliance/schemas/requirement.py b/backend-compliance/compliance/schemas/requirement.py new file mode 100644 index 0000000..beab892 --- /dev/null +++ b/backend-compliance/compliance/schemas/requirement.py @@ -0,0 +1,80 @@ +""" +Requirement Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Requirement Schemas +# ============================================================================ + +class RequirementBase(BaseModel): + article: str + paragraph: Optional[str] = None + title: str + description: Optional[str] = None + requirement_text: Optional[str] = None + breakpilot_interpretation: Optional[str] = None + is_applicable: bool = True + applicability_reason: Optional[str] = None + priority: int = 2 + + +class RequirementCreate(RequirementBase): + regulation_id: str + + +class RequirementResponse(RequirementBase): + id: str + regulation_id: str + regulation_code: Optional[str] = None + + # Implementation tracking + implementation_status: Optional[str] = "not_started" + implementation_details: Optional[str] = None + code_references: Optional[List[Dict[str, Any]]] = None + documentation_links: Optional[List[str]] = None + + # Evidence for auditors + evidence_description: Optional[str] = None + evidence_artifacts: Optional[List[Dict[str, Any]]] = None + + # Audit tracking + auditor_notes: Optional[str] = None + audit_status: Optional[str] = "pending" + last_audit_date: Optional[datetime] = None + last_auditor: Optional[str] = None + + # Source reference + source_page: Optional[int] = None + source_section: Optional[str] = None + + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class RequirementListResponse(BaseModel): + requirements: List[RequirementResponse] + total: int + + +class PaginatedRequirementResponse(BaseModel): + """Paginated response for requirements - optimized for large datasets.""" + data: List[RequirementResponse] + pagination: PaginationMeta + diff --git a/backend-compliance/compliance/schemas/risk.py b/backend-compliance/compliance/schemas/risk.py new file mode 100644 index 0000000..51503b5 --- /dev/null +++ b/backend-compliance/compliance/schemas/risk.py @@ -0,0 +1,79 @@ +""" +Risk Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Risk Schemas +# ============================================================================ + +class RiskBase(BaseModel): + risk_id: str + title: str + description: Optional[str] = None + category: str + likelihood: int = Field(ge=1, le=5) + impact: int = Field(ge=1, le=5) + mitigating_controls: Optional[List[str]] = None + owner: Optional[str] = None + treatment_plan: Optional[str] = None + + +class RiskCreate(RiskBase): + pass + + +class RiskUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + likelihood: Optional[int] = Field(default=None, ge=1, le=5) + impact: Optional[int] = Field(default=None, ge=1, le=5) + residual_likelihood: Optional[int] = Field(default=None, ge=1, le=5) + residual_impact: Optional[int] = Field(default=None, ge=1, le=5) + mitigating_controls: Optional[List[str]] = None + owner: Optional[str] = None + status: Optional[str] = None + treatment_plan: Optional[str] = None + + +class RiskResponse(RiskBase): + id: str + inherent_risk: str + residual_likelihood: Optional[int] = None + residual_impact: Optional[int] = None + residual_risk: Optional[str] = None + status: str + identified_date: Optional[date] = None + review_date: Optional[date] = None + last_assessed_at: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class RiskListResponse(BaseModel): + risks: List[RiskResponse] + total: int + + +class RiskMatrixResponse(BaseModel): + """Risk matrix data for visualization.""" + matrix: Dict[str, Dict[str, List[str]]] # likelihood -> impact -> risk_ids + risks: List[RiskResponse] + diff --git a/backend-compliance/compliance/schemas/service_module.py b/backend-compliance/compliance/schemas/service_module.py new file mode 100644 index 0000000..21516ae --- /dev/null +++ b/backend-compliance/compliance/schemas/service_module.py @@ -0,0 +1,121 @@ +""" +Service Module Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# Service Module Schemas (Sprint 3) +# ============================================================================ + +class ServiceModuleBase(BaseModel): + """Base schema for service modules.""" + name: str + display_name: str + description: Optional[str] = None + service_type: str + port: Optional[int] = None + technology_stack: Optional[List[str]] = None + repository_path: Optional[str] = None + docker_image: Optional[str] = None + data_categories: Optional[List[str]] = None + processes_pii: bool = False + processes_health_data: bool = False + ai_components: bool = False + criticality: str = "medium" + owner_team: Optional[str] = None + owner_contact: Optional[str] = None + + +class ServiceModuleCreate(ServiceModuleBase): + """Schema for creating a service module.""" + pass + + +class ServiceModuleResponse(ServiceModuleBase): + """Response schema for service module.""" + id: str + is_active: bool + compliance_score: Optional[float] = None + last_compliance_check: Optional[datetime] = None + created_at: datetime + updated_at: datetime + regulation_count: Optional[int] = None + risk_count: Optional[int] = None + + model_config = ConfigDict(from_attributes=True) + + +class ServiceModuleListResponse(BaseModel): + """List response for service modules.""" + modules: List[ServiceModuleResponse] + total: int + + +class ServiceModuleDetailResponse(ServiceModuleResponse): + """Detailed response including regulations and risks.""" + regulations: Optional[List[Dict[str, Any]]] = None + risks: Optional[List[Dict[str, Any]]] = None + + +class ModuleRegulationMappingBase(BaseModel): + """Base schema for module-regulation mapping.""" + module_id: str + regulation_id: str + relevance_level: str = "medium" + notes: Optional[str] = None + applicable_articles: Optional[List[str]] = None + + +class ModuleRegulationMappingCreate(ModuleRegulationMappingBase): + """Schema for creating a module-regulation mapping.""" + pass + + +class ModuleRegulationMappingResponse(ModuleRegulationMappingBase): + """Response schema for module-regulation mapping.""" + id: str + module_name: Optional[str] = None + regulation_code: Optional[str] = None + regulation_name: Optional[str] = None + created_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class ModuleSeedRequest(BaseModel): + """Request to seed service modules.""" + force: bool = False + + +class ModuleSeedResponse(BaseModel): + """Response from seeding service modules.""" + success: bool + message: str + modules_created: int + mappings_created: int + + +class ModuleComplianceOverview(BaseModel): + """Overview of compliance status for all modules.""" + total_modules: int + modules_by_type: Dict[str, int] + modules_by_criticality: Dict[str, int] + modules_processing_pii: int + modules_with_ai: int + average_compliance_score: Optional[float] = None + regulations_coverage: Dict[str, int] # regulation_code -> module_count + diff --git a/backend-compliance/compliance/schemas/tom.py b/backend-compliance/compliance/schemas/tom.py new file mode 100644 index 0000000..488b26f --- /dev/null +++ b/backend-compliance/compliance/schemas/tom.py @@ -0,0 +1,71 @@ +""" +TOM (Technisch-Organisatorische Maßnahmen) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# TOM — Technisch-Organisatorische Massnahmen (Art. 32 DSGVO) +# ============================================================================ + +class TOMStateResponse(BaseModel): + tenant_id: str + state: Dict[str, Any] = {} + version: int = 0 + last_modified: Optional[datetime] = None + is_new: bool = False + + +class TOMMeasureResponse(BaseModel): + id: str + tenant_id: str + control_id: str + name: str + description: Optional[str] = None + category: str + type: str + applicability: str = "REQUIRED" + applicability_reason: Optional[str] = None + implementation_status: str = "NOT_IMPLEMENTED" + responsible_person: Optional[str] = None + responsible_department: Optional[str] = None + implementation_date: Optional[datetime] = None + review_date: Optional[datetime] = None + review_frequency: Optional[str] = None + priority: Optional[str] = None + complexity: Optional[str] = None + linked_evidence: List[Any] = [] + evidence_gaps: List[Any] = [] + related_controls: Dict[str, Any] = {} + verified_at: Optional[datetime] = None + verified_by: Optional[str] = None + effectiveness_rating: Optional[str] = None + created_by: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + + +class TOMStatsResponse(BaseModel): + total: int = 0 + by_status: Dict[str, int] = {} + by_category: Dict[str, int] = {} + overdue_review_count: int = 0 + implemented: int = 0 + partial: int = 0 + not_implemented: int = 0 + diff --git a/backend-compliance/compliance/schemas/vvt.py b/backend-compliance/compliance/schemas/vvt.py new file mode 100644 index 0000000..70743a8 --- /dev/null +++ b/backend-compliance/compliance/schemas/vvt.py @@ -0,0 +1,168 @@ +""" +VVT (Verzeichnis von Verarbeitungstätigkeiten) Pydantic schemas — extracted from compliance/api/schemas.py. + +Phase 1 Step 3: the monolithic ``compliance.api.schemas`` module is being +split per domain under ``compliance.schemas``. This module is re-exported +from ``compliance.api.schemas`` for backwards compatibility. +""" + +from datetime import datetime, date +from typing import Optional, List, Any, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from compliance.schemas.common import ( + PaginationMeta, RegulationType, ControlType, ControlDomain, + ControlStatus, RiskLevel, EvidenceStatus, +) + + +# ============================================================================ +# VVT Schemas — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO) +# ============================================================================ + +class VVTOrganizationUpdate(BaseModel): + organization_name: Optional[str] = None + industry: Optional[str] = None + locations: Optional[List[str]] = None + employee_count: Optional[int] = None + dpo_name: Optional[str] = None + dpo_contact: Optional[str] = None + vvt_version: Optional[str] = None + last_review_date: Optional[date] = None + next_review_date: Optional[date] = None + review_interval: Optional[str] = None + + +class VVTOrganizationResponse(BaseModel): + id: str + organization_name: str + industry: Optional[str] = None + locations: List[Any] = [] + employee_count: Optional[int] = None + dpo_name: Optional[str] = None + dpo_contact: Optional[str] = None + vvt_version: str = '1.0' + last_review_date: Optional[date] = None + next_review_date: Optional[date] = None + review_interval: str = 'annual' + created_at: datetime + updated_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + + +class VVTActivityCreate(BaseModel): + vvt_id: str + name: str + description: Optional[str] = None + purposes: List[str] = [] + legal_bases: List[str] = [] + data_subject_categories: List[str] = [] + personal_data_categories: List[str] = [] + recipient_categories: List[str] = [] + third_country_transfers: List[Any] = [] + retention_period: Dict[str, Any] = {} + tom_description: Optional[str] = None + business_function: Optional[str] = None + systems: List[str] = [] + deployment_model: Optional[str] = None + data_sources: List[Any] = [] + data_flows: List[Any] = [] + protection_level: str = 'MEDIUM' + dpia_required: bool = False + structured_toms: Dict[str, Any] = {} + status: str = 'DRAFT' + responsible: Optional[str] = None + owner: Optional[str] = None + last_reviewed_at: Optional[datetime] = None + next_review_at: Optional[datetime] = None + created_by: Optional[str] = None + dsfa_id: Optional[str] = None + + +class VVTActivityUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + purposes: Optional[List[str]] = None + legal_bases: Optional[List[str]] = None + data_subject_categories: Optional[List[str]] = None + personal_data_categories: Optional[List[str]] = None + recipient_categories: Optional[List[str]] = None + third_country_transfers: Optional[List[Any]] = None + retention_period: Optional[Dict[str, Any]] = None + tom_description: Optional[str] = None + business_function: Optional[str] = None + systems: Optional[List[str]] = None + deployment_model: Optional[str] = None + data_sources: Optional[List[Any]] = None + data_flows: Optional[List[Any]] = None + protection_level: Optional[str] = None + dpia_required: Optional[bool] = None + structured_toms: Optional[Dict[str, Any]] = None + status: Optional[str] = None + responsible: Optional[str] = None + owner: Optional[str] = None + last_reviewed_at: Optional[datetime] = None + next_review_at: Optional[datetime] = None + created_by: Optional[str] = None + dsfa_id: Optional[str] = None + + +class VVTActivityResponse(BaseModel): + id: str + vvt_id: str + name: str + description: Optional[str] = None + purposes: List[Any] = [] + legal_bases: List[Any] = [] + data_subject_categories: List[Any] = [] + personal_data_categories: List[Any] = [] + recipient_categories: List[Any] = [] + third_country_transfers: List[Any] = [] + retention_period: Dict[str, Any] = {} + tom_description: Optional[str] = None + business_function: Optional[str] = None + systems: List[Any] = [] + deployment_model: Optional[str] = None + data_sources: List[Any] = [] + data_flows: List[Any] = [] + protection_level: str = 'MEDIUM' + dpia_required: bool = False + structured_toms: Dict[str, Any] = {} + status: str = 'DRAFT' + responsible: Optional[str] = None + owner: Optional[str] = None + last_reviewed_at: Optional[datetime] = None + next_review_at: Optional[datetime] = None + created_by: Optional[str] = None + dsfa_id: Optional[str] = None + created_at: datetime + updated_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + + +class VVTStatsResponse(BaseModel): + total: int + by_status: Dict[str, int] + by_business_function: Dict[str, int] + dpia_required_count: int + third_country_count: int + draft_count: int + approved_count: int + overdue_review_count: int = 0 + + +class VVTAuditLogEntry(BaseModel): + id: str + action: str + entity_type: str + entity_id: Optional[str] = None + changed_by: Optional[str] = None + old_values: Optional[Dict[str, Any]] = None + new_values: Optional[Dict[str, Any]] = None + created_at: datetime + + model_config = ConfigDict(from_attributes=True) +