Phase 1 — VVT Backend (localStorage → API): - migrations/006_vvt.sql: Neue Tabellen (vvt_organization, vvt_activities, vvt_audit_log) - compliance/db/vvt_models.py: SQLAlchemy-Models für alle VVT-Tabellen - compliance/api/vvt_routes.py: Vollständiger CRUD-Router (10 Endpoints) - compliance/api/__init__.py: VVT-Router registriert - compliance/api/schemas.py: VVT Pydantic-Schemas ergänzt - app/(sdk)/sdk/vvt/page.tsx: API-Client + camelCase↔snake_case Mapping, localStorage durch persistente DB-Calls ersetzt (POST/PUT/DELETE/GET) - tests/test_vvt_routes.py: 18 Tests (alle grün) Phase 3 — Document Generator PDF-Export: - document-generator/page.tsx: "Als PDF exportieren"-Button funktioniert jetzt via window.print() + Print-Window mit korrektem HTML - Fallback-Banner wenn Template-Service (breakpilot-core) nicht erreichbar Phase 4 — Source Policy erweiterte Filter: - SourcesTab.tsx: source_type-Filter (Rechtlich / Leitlinien / Vorlagen / etc.) - PIIRulesTab.tsx: category-Filter (E-Mail / Telefon / IBAN / etc.) - source_policy_router.py: Backend-Endpoints unterstützen jetzt source_type und category als Query-Parameter - requirements.txt: reportlab==4.2.5 ergänzt (fehlende Audit-PDF-Dependency) Phase 2 — Training (Migration-Skripte): - scripts/apply_training_migrations.sh: SSH-Skript für Mac Mini - scripts/apply_vvt_migration.sh: Vollständiges Deploy-Skript für VVT Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1992 lines
55 KiB
Python
1992 lines
55 KiB
Python
"""
|
|
Pydantic schemas for Compliance API.
|
|
"""
|
|
|
|
from datetime import datetime, date
|
|
from typing import Optional, List, Any, Dict
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
# ============================================================================
|
|
# Enums as strings for API
|
|
# ============================================================================
|
|
|
|
class RegulationType(str):
|
|
EU_REGULATION = "eu_regulation"
|
|
EU_DIRECTIVE = "eu_directive"
|
|
DE_LAW = "de_law"
|
|
BSI_STANDARD = "bsi_standard"
|
|
INDUSTRY_STANDARD = "industry_standard"
|
|
|
|
|
|
class ControlType(str):
|
|
PREVENTIVE = "preventive"
|
|
DETECTIVE = "detective"
|
|
CORRECTIVE = "corrective"
|
|
|
|
|
|
class ControlDomain(str):
|
|
GOVERNANCE = "gov"
|
|
PRIVACY = "priv"
|
|
IAM = "iam"
|
|
CRYPTO = "crypto"
|
|
SDLC = "sdlc"
|
|
OPS = "ops"
|
|
AI = "ai"
|
|
CRA = "cra"
|
|
AUDIT = "aud"
|
|
|
|
|
|
class ControlStatus(str):
|
|
PASS = "pass"
|
|
PARTIAL = "partial"
|
|
FAIL = "fail"
|
|
NOT_APPLICABLE = "n/a"
|
|
PLANNED = "planned"
|
|
|
|
|
|
class RiskLevel(str):
|
|
LOW = "low"
|
|
MEDIUM = "medium"
|
|
HIGH = "high"
|
|
CRITICAL = "critical"
|
|
|
|
|
|
class EvidenceStatus(str):
|
|
VALID = "valid"
|
|
EXPIRED = "expired"
|
|
PENDING = "pending"
|
|
FAILED = "failed"
|
|
|
|
|
|
# ============================================================================
|
|
# Regulation Schemas
|
|
# ============================================================================
|
|
|
|
class RegulationBase(BaseModel):
|
|
code: str
|
|
name: str
|
|
full_name: Optional[str] = None
|
|
regulation_type: str
|
|
source_url: Optional[str] = None
|
|
local_pdf_path: Optional[str] = None
|
|
effective_date: Optional[date] = None
|
|
description: Optional[str] = None
|
|
is_active: bool = True
|
|
|
|
|
|
class RegulationCreate(RegulationBase):
|
|
pass
|
|
|
|
|
|
class RegulationResponse(RegulationBase):
|
|
id: str
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
requirement_count: Optional[int] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class RegulationListResponse(BaseModel):
|
|
regulations: List[RegulationResponse]
|
|
total: int
|
|
|
|
|
|
# ============================================================================
|
|
# Pagination Schemas (defined here, completed after Response classes)
|
|
# ============================================================================
|
|
|
|
class PaginationMeta(BaseModel):
|
|
"""Pagination metadata for list responses."""
|
|
page: int
|
|
page_size: int
|
|
total: int
|
|
total_pages: int
|
|
has_next: bool
|
|
has_prev: bool
|
|
|
|
|
|
# ============================================================================
|
|
# Requirement Schemas
|
|
# ============================================================================
|
|
|
|
class RequirementBase(BaseModel):
|
|
article: str
|
|
paragraph: Optional[str] = None
|
|
title: str
|
|
description: Optional[str] = None
|
|
requirement_text: Optional[str] = None
|
|
breakpilot_interpretation: Optional[str] = None
|
|
is_applicable: bool = True
|
|
applicability_reason: Optional[str] = None
|
|
priority: int = 2
|
|
|
|
|
|
class RequirementCreate(RequirementBase):
|
|
regulation_id: str
|
|
|
|
|
|
class RequirementResponse(RequirementBase):
|
|
id: str
|
|
regulation_id: str
|
|
regulation_code: Optional[str] = None
|
|
|
|
# Implementation tracking
|
|
implementation_status: Optional[str] = "not_started"
|
|
implementation_details: Optional[str] = None
|
|
code_references: Optional[List[Dict[str, Any]]] = None
|
|
documentation_links: Optional[List[str]] = None
|
|
|
|
# Evidence for auditors
|
|
evidence_description: Optional[str] = None
|
|
evidence_artifacts: Optional[List[Dict[str, Any]]] = None
|
|
|
|
# Audit tracking
|
|
auditor_notes: Optional[str] = None
|
|
audit_status: Optional[str] = "pending"
|
|
last_audit_date: Optional[datetime] = None
|
|
last_auditor: Optional[str] = None
|
|
|
|
# Source reference
|
|
source_page: Optional[int] = None
|
|
source_section: Optional[str] = None
|
|
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class RequirementListResponse(BaseModel):
|
|
requirements: List[RequirementResponse]
|
|
total: int
|
|
|
|
|
|
class PaginatedRequirementResponse(BaseModel):
|
|
"""Paginated response for requirements - optimized for large datasets."""
|
|
data: List[RequirementResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
# ============================================================================
|
|
# Control Schemas
|
|
# ============================================================================
|
|
|
|
class ControlBase(BaseModel):
|
|
control_id: str
|
|
domain: str
|
|
control_type: str
|
|
title: str
|
|
description: Optional[str] = None
|
|
pass_criteria: str
|
|
implementation_guidance: Optional[str] = None
|
|
code_reference: Optional[str] = None
|
|
documentation_url: Optional[str] = None
|
|
is_automated: bool = False
|
|
automation_tool: Optional[str] = None
|
|
automation_config: Optional[Dict[str, Any]] = None
|
|
owner: Optional[str] = None
|
|
review_frequency_days: int = 90
|
|
|
|
|
|
class ControlCreate(ControlBase):
|
|
pass
|
|
|
|
|
|
class ControlUpdate(BaseModel):
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
pass_criteria: Optional[str] = None
|
|
implementation_guidance: Optional[str] = None
|
|
code_reference: Optional[str] = None
|
|
documentation_url: Optional[str] = None
|
|
is_automated: Optional[bool] = None
|
|
automation_tool: Optional[str] = None
|
|
automation_config: Optional[Dict[str, Any]] = None
|
|
owner: Optional[str] = None
|
|
status: Optional[str] = None
|
|
status_notes: Optional[str] = None
|
|
|
|
|
|
class ControlResponse(ControlBase):
|
|
id: str
|
|
status: str
|
|
status_notes: Optional[str] = None
|
|
last_reviewed_at: Optional[datetime] = None
|
|
next_review_at: Optional[datetime] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
evidence_count: Optional[int] = None
|
|
requirement_count: Optional[int] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ControlListResponse(BaseModel):
|
|
controls: List[ControlResponse]
|
|
total: int
|
|
|
|
|
|
class PaginatedControlResponse(BaseModel):
|
|
"""Paginated response for controls - optimized for large datasets."""
|
|
data: List[ControlResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
class ControlReviewRequest(BaseModel):
|
|
status: str
|
|
status_notes: Optional[str] = None
|
|
|
|
|
|
# ============================================================================
|
|
# Control Mapping Schemas
|
|
# ============================================================================
|
|
|
|
class MappingBase(BaseModel):
|
|
requirement_id: str
|
|
control_id: str
|
|
coverage_level: str = "full"
|
|
notes: Optional[str] = None
|
|
|
|
|
|
class MappingCreate(MappingBase):
|
|
pass
|
|
|
|
|
|
class MappingResponse(MappingBase):
|
|
id: str
|
|
requirement_article: Optional[str] = None
|
|
requirement_title: Optional[str] = None
|
|
control_control_id: Optional[str] = None
|
|
control_title: Optional[str] = None
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class MappingListResponse(BaseModel):
|
|
mappings: List[MappingResponse]
|
|
total: int
|
|
|
|
|
|
# ============================================================================
|
|
# Evidence Schemas
|
|
# ============================================================================
|
|
|
|
class EvidenceBase(BaseModel):
|
|
control_id: str
|
|
evidence_type: str
|
|
title: str
|
|
description: Optional[str] = None
|
|
artifact_url: Optional[str] = None
|
|
valid_from: Optional[datetime] = None
|
|
valid_until: Optional[datetime] = None
|
|
source: Optional[str] = None
|
|
ci_job_id: Optional[str] = None
|
|
|
|
|
|
class EvidenceCreate(EvidenceBase):
|
|
pass
|
|
|
|
|
|
class EvidenceResponse(EvidenceBase):
|
|
id: str
|
|
artifact_path: Optional[str] = None
|
|
artifact_hash: Optional[str] = None
|
|
file_size_bytes: Optional[int] = None
|
|
mime_type: Optional[str] = None
|
|
status: str
|
|
uploaded_by: Optional[str] = None
|
|
collected_at: datetime
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class EvidenceListResponse(BaseModel):
|
|
evidence: List[EvidenceResponse]
|
|
total: int
|
|
|
|
|
|
class EvidenceCollectRequest(BaseModel):
|
|
"""Request to auto-collect evidence from CI."""
|
|
control_id: str
|
|
evidence_type: str
|
|
title: str
|
|
ci_job_id: str
|
|
artifact_url: str
|
|
|
|
|
|
# ============================================================================
|
|
# Risk Schemas
|
|
# ============================================================================
|
|
|
|
class RiskBase(BaseModel):
|
|
risk_id: str
|
|
title: str
|
|
description: Optional[str] = None
|
|
category: str
|
|
likelihood: int = Field(ge=1, le=5)
|
|
impact: int = Field(ge=1, le=5)
|
|
mitigating_controls: Optional[List[str]] = None
|
|
owner: Optional[str] = None
|
|
treatment_plan: Optional[str] = None
|
|
|
|
|
|
class RiskCreate(RiskBase):
|
|
pass
|
|
|
|
|
|
class RiskUpdate(BaseModel):
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
category: Optional[str] = None
|
|
likelihood: Optional[int] = Field(default=None, ge=1, le=5)
|
|
impact: Optional[int] = Field(default=None, ge=1, le=5)
|
|
residual_likelihood: Optional[int] = Field(default=None, ge=1, le=5)
|
|
residual_impact: Optional[int] = Field(default=None, ge=1, le=5)
|
|
mitigating_controls: Optional[List[str]] = None
|
|
owner: Optional[str] = None
|
|
status: Optional[str] = None
|
|
treatment_plan: Optional[str] = None
|
|
|
|
|
|
class RiskResponse(RiskBase):
|
|
id: str
|
|
inherent_risk: str
|
|
residual_likelihood: Optional[int] = None
|
|
residual_impact: Optional[int] = None
|
|
residual_risk: Optional[str] = None
|
|
status: str
|
|
identified_date: Optional[date] = None
|
|
review_date: Optional[date] = None
|
|
last_assessed_at: Optional[datetime] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class RiskListResponse(BaseModel):
|
|
risks: List[RiskResponse]
|
|
total: int
|
|
|
|
|
|
class RiskMatrixResponse(BaseModel):
|
|
"""Risk matrix data for visualization."""
|
|
matrix: Dict[str, Dict[str, List[str]]] # likelihood -> impact -> risk_ids
|
|
risks: List[RiskResponse]
|
|
|
|
|
|
# ============================================================================
|
|
# AI System Schemas (AI Act Compliance)
|
|
# ============================================================================
|
|
|
|
class AISystemBase(BaseModel):
|
|
name: str
|
|
description: Optional[str] = None
|
|
purpose: Optional[str] = None
|
|
sector: Optional[str] = None
|
|
classification: str = "unclassified"
|
|
status: str = "draft"
|
|
obligations: Optional[List[str]] = None
|
|
|
|
|
|
class AISystemCreate(AISystemBase):
|
|
pass
|
|
|
|
|
|
class AISystemUpdate(BaseModel):
|
|
name: Optional[str] = None
|
|
description: Optional[str] = None
|
|
purpose: Optional[str] = None
|
|
sector: Optional[str] = None
|
|
classification: Optional[str] = None
|
|
status: Optional[str] = None
|
|
obligations: Optional[List[str]] = None
|
|
|
|
|
|
class AISystemResponse(AISystemBase):
|
|
id: str
|
|
assessment_date: Optional[datetime] = None
|
|
assessment_result: Optional[Dict[str, Any]] = None
|
|
risk_factors: Optional[List[Dict[str, Any]]] = None
|
|
recommendations: Optional[List[str]] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AISystemListResponse(BaseModel):
|
|
systems: List[AISystemResponse]
|
|
total: int
|
|
|
|
|
|
# ============================================================================
|
|
# Dashboard & Export Schemas
|
|
# ============================================================================
|
|
|
|
class DashboardResponse(BaseModel):
|
|
compliance_score: float
|
|
total_regulations: int
|
|
total_requirements: int
|
|
total_controls: int
|
|
controls_by_status: Dict[str, int]
|
|
controls_by_domain: Dict[str, Dict[str, int]]
|
|
total_evidence: int
|
|
evidence_by_status: Dict[str, int]
|
|
total_risks: int
|
|
risks_by_level: Dict[str, int]
|
|
recent_activity: List[Dict[str, Any]]
|
|
|
|
|
|
class ExportRequest(BaseModel):
|
|
export_type: str = "full" # "full", "controls_only", "evidence_only"
|
|
included_regulations: Optional[List[str]] = None
|
|
included_domains: Optional[List[str]] = None
|
|
date_range_start: Optional[date] = None
|
|
date_range_end: Optional[date] = None
|
|
|
|
|
|
class ExportResponse(BaseModel):
|
|
id: str
|
|
export_type: str
|
|
export_name: Optional[str] = None
|
|
status: str
|
|
requested_by: str
|
|
requested_at: datetime
|
|
completed_at: Optional[datetime] = None
|
|
file_path: Optional[str] = None
|
|
file_hash: Optional[str] = None
|
|
file_size_bytes: Optional[int] = None
|
|
total_controls: Optional[int] = None
|
|
total_evidence: Optional[int] = None
|
|
compliance_score: Optional[float] = None
|
|
error_message: Optional[str] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ExportListResponse(BaseModel):
|
|
exports: List[ExportResponse]
|
|
total: int
|
|
|
|
|
|
# ============================================================================
|
|
# Seeding Schemas
|
|
# ============================================================================
|
|
|
|
class SeedRequest(BaseModel):
|
|
force: bool = False
|
|
|
|
|
|
class SeedResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
counts: Dict[str, int]
|
|
|
|
|
|
# ============================================================================
|
|
# PDF Extraction Schemas
|
|
# ============================================================================
|
|
|
|
class BSIAspectResponse(BaseModel):
|
|
"""Response schema for an extracted BSI-TR Pruefaspekt."""
|
|
aspect_id: str
|
|
title: str
|
|
full_text: str
|
|
category: str
|
|
page_number: int
|
|
section: str
|
|
requirement_level: str
|
|
source_document: str
|
|
keywords: List[str] = []
|
|
related_aspects: List[str] = []
|
|
|
|
|
|
class PDFExtractionResponse(BaseModel):
|
|
"""Response for PDF extraction operation."""
|
|
success: bool
|
|
source_document: str
|
|
total_aspects: int
|
|
aspects: List[BSIAspectResponse]
|
|
statistics: Dict[str, Any]
|
|
requirements_created: int = 0
|
|
|
|
|
|
class PDFExtractionRequest(BaseModel):
|
|
"""Request to extract requirements from a PDF."""
|
|
document_code: str # e.g., "BSI-TR-03161-2"
|
|
save_to_db: bool = True
|
|
force: bool = False
|
|
|
|
|
|
# ============================================================================
|
|
# Paginated Response Schemas (after all Response classes are defined)
|
|
# ============================================================================
|
|
|
|
class PaginatedRequirementResponse(BaseModel):
|
|
"""Paginated response for requirements."""
|
|
data: List[RequirementResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
class PaginatedControlResponse(BaseModel):
|
|
"""Paginated response for controls."""
|
|
data: List[ControlResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
class PaginatedEvidenceResponse(BaseModel):
|
|
"""Paginated response for evidence."""
|
|
data: List[EvidenceResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
class PaginatedRiskResponse(BaseModel):
|
|
"""Paginated response for risks."""
|
|
data: List[RiskResponse]
|
|
pagination: PaginationMeta
|
|
|
|
|
|
# ============================================================================
|
|
# Service Module Schemas (Sprint 3)
|
|
# ============================================================================
|
|
|
|
class ServiceModuleBase(BaseModel):
|
|
"""Base schema for service modules."""
|
|
name: str
|
|
display_name: str
|
|
description: Optional[str] = None
|
|
service_type: str
|
|
port: Optional[int] = None
|
|
technology_stack: Optional[List[str]] = None
|
|
repository_path: Optional[str] = None
|
|
docker_image: Optional[str] = None
|
|
data_categories: Optional[List[str]] = None
|
|
processes_pii: bool = False
|
|
processes_health_data: bool = False
|
|
ai_components: bool = False
|
|
criticality: str = "medium"
|
|
owner_team: Optional[str] = None
|
|
owner_contact: Optional[str] = None
|
|
|
|
|
|
class ServiceModuleCreate(ServiceModuleBase):
|
|
"""Schema for creating a service module."""
|
|
pass
|
|
|
|
|
|
class ServiceModuleResponse(ServiceModuleBase):
|
|
"""Response schema for service module."""
|
|
id: str
|
|
is_active: bool
|
|
compliance_score: Optional[float] = None
|
|
last_compliance_check: Optional[datetime] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
regulation_count: Optional[int] = None
|
|
risk_count: Optional[int] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ServiceModuleListResponse(BaseModel):
|
|
"""List response for service modules."""
|
|
modules: List[ServiceModuleResponse]
|
|
total: int
|
|
|
|
|
|
class ServiceModuleDetailResponse(ServiceModuleResponse):
|
|
"""Detailed response including regulations and risks."""
|
|
regulations: Optional[List[Dict[str, Any]]] = None
|
|
risks: Optional[List[Dict[str, Any]]] = None
|
|
|
|
|
|
class ModuleRegulationMappingBase(BaseModel):
|
|
"""Base schema for module-regulation mapping."""
|
|
module_id: str
|
|
regulation_id: str
|
|
relevance_level: str = "medium"
|
|
notes: Optional[str] = None
|
|
applicable_articles: Optional[List[str]] = None
|
|
|
|
|
|
class ModuleRegulationMappingCreate(ModuleRegulationMappingBase):
|
|
"""Schema for creating a module-regulation mapping."""
|
|
pass
|
|
|
|
|
|
class ModuleRegulationMappingResponse(ModuleRegulationMappingBase):
|
|
"""Response schema for module-regulation mapping."""
|
|
id: str
|
|
module_name: Optional[str] = None
|
|
regulation_code: Optional[str] = None
|
|
regulation_name: Optional[str] = None
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ModuleSeedRequest(BaseModel):
|
|
"""Request to seed service modules."""
|
|
force: bool = False
|
|
|
|
|
|
class ModuleSeedResponse(BaseModel):
|
|
"""Response from seeding service modules."""
|
|
success: bool
|
|
message: str
|
|
modules_created: int
|
|
mappings_created: int
|
|
|
|
|
|
class ModuleComplianceOverview(BaseModel):
|
|
"""Overview of compliance status for all modules."""
|
|
total_modules: int
|
|
modules_by_type: Dict[str, int]
|
|
modules_by_criticality: Dict[str, int]
|
|
modules_processing_pii: int
|
|
modules_with_ai: int
|
|
average_compliance_score: Optional[float] = None
|
|
regulations_coverage: Dict[str, int] # regulation_code -> module_count
|
|
|
|
|
|
# ============================================================================
|
|
# AI Assistant Schemas (Sprint 4)
|
|
# ============================================================================
|
|
|
|
class AIInterpretationRequest(BaseModel):
|
|
"""Request for AI interpretation of a requirement."""
|
|
requirement_id: str
|
|
force_refresh: bool = False
|
|
|
|
|
|
class AIInterpretationResponse(BaseModel):
|
|
"""AI-generated interpretation of a requirement."""
|
|
requirement_id: str
|
|
summary: str
|
|
applicability: str
|
|
technical_measures: List[str]
|
|
affected_modules: List[str]
|
|
risk_level: str
|
|
implementation_hints: List[str]
|
|
confidence_score: float
|
|
error: Optional[str] = None
|
|
|
|
|
|
class AIBatchInterpretationRequest(BaseModel):
|
|
"""Request for batch interpretation of requirements."""
|
|
requirement_ids: List[str]
|
|
regulation_code: Optional[str] = None
|
|
rate_limit: float = 1.0
|
|
|
|
|
|
class AIBatchInterpretationResponse(BaseModel):
|
|
"""Response for batch interpretation."""
|
|
total: int
|
|
processed: int
|
|
interpretations: List[AIInterpretationResponse]
|
|
|
|
|
|
class AIControlSuggestionRequest(BaseModel):
|
|
"""Request for AI control suggestions."""
|
|
requirement_id: str
|
|
|
|
|
|
class AIControlSuggestionItem(BaseModel):
|
|
"""A single control suggestion from AI."""
|
|
control_id: str
|
|
domain: str
|
|
title: str
|
|
description: str
|
|
pass_criteria: str
|
|
implementation_guidance: str
|
|
is_automated: bool
|
|
automation_tool: Optional[str] = None
|
|
priority: str
|
|
confidence_score: float
|
|
|
|
|
|
class AIControlSuggestionResponse(BaseModel):
|
|
"""Response with AI control suggestions."""
|
|
requirement_id: str
|
|
suggestions: List[AIControlSuggestionItem]
|
|
|
|
|
|
class AIRiskAssessmentRequest(BaseModel):
|
|
"""Request for AI risk assessment of a module."""
|
|
module_id: str
|
|
|
|
|
|
class AIRiskFactor(BaseModel):
|
|
"""A risk factor in the assessment."""
|
|
factor: str
|
|
severity: str
|
|
likelihood: str
|
|
|
|
|
|
class AIRiskAssessmentResponse(BaseModel):
|
|
"""AI-generated risk assessment."""
|
|
module_name: str
|
|
overall_risk: str
|
|
risk_factors: List[AIRiskFactor]
|
|
recommendations: List[str]
|
|
compliance_gaps: List[str]
|
|
confidence_score: float
|
|
|
|
|
|
class AIGapAnalysisRequest(BaseModel):
|
|
"""Request for gap analysis."""
|
|
requirement_id: str
|
|
|
|
|
|
class AIGapAnalysisResponse(BaseModel):
|
|
"""AI-generated gap analysis."""
|
|
requirement_id: str
|
|
requirement_title: str
|
|
coverage_level: str
|
|
existing_controls: List[str]
|
|
missing_coverage: List[str]
|
|
suggested_actions: List[str]
|
|
|
|
|
|
class AIStatusResponse(BaseModel):
|
|
"""Status of the AI provider."""
|
|
provider: str
|
|
model: str
|
|
is_available: bool
|
|
is_mock: bool
|
|
error: Optional[str] = None
|
|
|
|
|
|
# ============================================================================
|
|
# Executive Dashboard Schemas (Phase 3 - Sprint 1)
|
|
# ============================================================================
|
|
|
|
class TrendDataPoint(BaseModel):
|
|
"""A single data point for trend charts."""
|
|
date: str # ISO date string
|
|
score: float
|
|
label: Optional[str] = None # Formatted date for display (e.g., "Jan 26")
|
|
|
|
|
|
class RiskSummary(BaseModel):
|
|
"""Summary of a risk for executive display."""
|
|
id: str
|
|
risk_id: str
|
|
title: str
|
|
risk_level: str # "low", "medium", "high", "critical"
|
|
owner: Optional[str] = None
|
|
status: str
|
|
category: str
|
|
impact: int
|
|
likelihood: int
|
|
|
|
|
|
class DeadlineItem(BaseModel):
|
|
"""An upcoming deadline for executive display."""
|
|
id: str
|
|
title: str
|
|
deadline: str # ISO date string
|
|
days_remaining: int
|
|
type: str # "control_review", "evidence_expiry", "audit"
|
|
status: str # "on_track", "at_risk", "overdue"
|
|
owner: Optional[str] = None
|
|
|
|
|
|
class TeamWorkloadItem(BaseModel):
|
|
"""Workload distribution for a team or person."""
|
|
name: str
|
|
pending_tasks: int
|
|
in_progress_tasks: int
|
|
completed_tasks: int
|
|
total_tasks: int
|
|
completion_rate: float
|
|
|
|
|
|
class ExecutiveDashboardResponse(BaseModel):
|
|
"""
|
|
Executive Dashboard Response
|
|
|
|
Provides a high-level overview for managers and executives:
|
|
- Traffic light status (green/yellow/red)
|
|
- Overall compliance score
|
|
- 12-month trend data
|
|
- Top 5 risks
|
|
- Upcoming deadlines
|
|
- Team workload distribution
|
|
"""
|
|
traffic_light_status: str # "green", "yellow", "red"
|
|
overall_score: float
|
|
score_trend: List[TrendDataPoint]
|
|
previous_score: Optional[float] = None
|
|
score_change: Optional[float] = None # Positive = improvement
|
|
|
|
# Counts
|
|
total_regulations: int
|
|
total_requirements: int
|
|
total_controls: int
|
|
open_risks: int
|
|
|
|
# Top items
|
|
top_risks: List[RiskSummary]
|
|
upcoming_deadlines: List[DeadlineItem]
|
|
|
|
# Workload
|
|
team_workload: List[TeamWorkloadItem]
|
|
|
|
# Last updated
|
|
last_updated: str
|
|
|
|
|
|
class ComplianceSnapshotCreate(BaseModel):
|
|
"""Request to create a compliance snapshot."""
|
|
notes: Optional[str] = None
|
|
|
|
|
|
class ComplianceSnapshotResponse(BaseModel):
|
|
"""Response for a compliance snapshot."""
|
|
id: str
|
|
snapshot_date: str
|
|
overall_score: float
|
|
scores_by_regulation: Dict[str, float]
|
|
scores_by_domain: Dict[str, float]
|
|
total_controls: int
|
|
passed_controls: int
|
|
failed_controls: int
|
|
notes: Optional[str] = None
|
|
created_at: str
|
|
|
|
|
|
# ============================================================================
|
|
# PDF Extraction Schemas
|
|
# ============================================================================
|
|
|
|
class BSIAspectResponse(BaseModel):
|
|
"""A single extracted BSI-TR Pruefaspekt (test aspect)."""
|
|
aspect_id: str
|
|
title: str
|
|
full_text: str
|
|
category: str
|
|
page_number: int
|
|
section: str
|
|
requirement_level: str
|
|
source_document: str
|
|
keywords: Optional[List[str]] = None
|
|
related_aspects: Optional[List[str]] = None
|
|
|
|
|
|
class PDFExtractionRequest(BaseModel):
|
|
"""Request for PDF extraction."""
|
|
document_code: str = Field(..., description="BSI-TR document code, e.g. BSI-TR-03161-2")
|
|
save_to_db: bool = Field(True, description="Whether to save extracted requirements to database")
|
|
force: bool = Field(False, description="Force re-extraction even if requirements exist")
|
|
|
|
|
|
class PDFExtractionResponse(BaseModel):
|
|
"""Response from PDF extraction endpoint."""
|
|
# Simple endpoint format (new /pdf/extract/{doc_code})
|
|
doc_code: Optional[str] = None
|
|
total_extracted: Optional[int] = None
|
|
saved_to_db: Optional[int] = None
|
|
aspects: Optional[List[BSIAspectResponse]] = None
|
|
# Legacy scraper endpoint format (/scraper/extract-pdf)
|
|
success: Optional[bool] = None
|
|
source_document: Optional[str] = None
|
|
total_aspects: Optional[int] = None
|
|
statistics: Optional[Dict[str, Any]] = None
|
|
requirements_created: Optional[int] = None
|
|
|
|
|
|
# ============================================================================
|
|
# Audit Session & Sign-off Schemas (Phase 3 - Sprint 3)
|
|
# ============================================================================
|
|
|
|
class AuditResult(str):
|
|
"""Audit result values for sign-off."""
|
|
COMPLIANT = "compliant"
|
|
COMPLIANT_WITH_NOTES = "compliant_notes"
|
|
NON_COMPLIANT = "non_compliant"
|
|
NOT_APPLICABLE = "not_applicable"
|
|
PENDING = "pending"
|
|
|
|
|
|
class AuditSessionStatus(str):
|
|
"""Audit session status values."""
|
|
DRAFT = "draft"
|
|
IN_PROGRESS = "in_progress"
|
|
COMPLETED = "completed"
|
|
ARCHIVED = "archived"
|
|
|
|
|
|
class CreateAuditSessionRequest(BaseModel):
|
|
"""Request to create a new audit session."""
|
|
name: str = Field(..., min_length=1, max_length=200)
|
|
description: Optional[str] = None
|
|
auditor_name: str = Field(..., min_length=1, max_length=100)
|
|
auditor_email: Optional[str] = None
|
|
auditor_organization: Optional[str] = None
|
|
regulation_codes: Optional[List[str]] = None # Filter by regulations
|
|
|
|
|
|
class UpdateAuditSessionRequest(BaseModel):
|
|
"""Request to update an audit session."""
|
|
name: Optional[str] = Field(None, min_length=1, max_length=200)
|
|
description: Optional[str] = None
|
|
status: Optional[str] = None
|
|
|
|
|
|
class AuditSessionSummary(BaseModel):
|
|
"""Summary of an audit session for list views."""
|
|
id: str
|
|
name: str
|
|
auditor_name: str
|
|
status: str
|
|
total_items: int
|
|
completed_items: int
|
|
completion_percentage: float
|
|
created_at: datetime
|
|
started_at: Optional[datetime] = None
|
|
completed_at: Optional[datetime] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AuditSessionResponse(AuditSessionSummary):
|
|
"""Full response for an audit session."""
|
|
description: Optional[str] = None
|
|
auditor_email: Optional[str] = None
|
|
auditor_organization: Optional[str] = None
|
|
regulation_ids: Optional[List[str]] = None
|
|
compliant_count: int = 0
|
|
non_compliant_count: int = 0
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AuditSessionListResponse(BaseModel):
|
|
"""List response for audit sessions."""
|
|
sessions: List[AuditSessionSummary]
|
|
total: int
|
|
|
|
|
|
class AuditSessionDetailResponse(AuditSessionResponse):
|
|
"""Detailed response including statistics breakdown."""
|
|
statistics: Optional["AuditStatistics"] = None
|
|
|
|
|
|
class SignOffRequest(BaseModel):
|
|
"""Request to sign off a single requirement."""
|
|
result: str = Field(..., description="Audit result: compliant, compliant_notes, non_compliant, not_applicable, pending")
|
|
notes: Optional[str] = None
|
|
sign: bool = Field(False, description="Whether to create digital signature")
|
|
|
|
|
|
class SignOffResponse(BaseModel):
|
|
"""Response for a sign-off operation."""
|
|
id: str
|
|
session_id: str
|
|
requirement_id: str
|
|
result: str
|
|
notes: Optional[str] = None
|
|
is_signed: bool
|
|
signature_hash: Optional[str] = None
|
|
signed_at: Optional[datetime] = None
|
|
signed_by: Optional[str] = None
|
|
created_at: datetime
|
|
updated_at: Optional[datetime] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AuditChecklistItem(BaseModel):
|
|
"""A single item in the audit checklist."""
|
|
requirement_id: str
|
|
regulation_code: str
|
|
article: str
|
|
paragraph: Optional[str] = None
|
|
title: str
|
|
description: Optional[str] = None
|
|
|
|
# Current audit state
|
|
current_result: str = "pending" # AuditResult
|
|
notes: Optional[str] = None
|
|
is_signed: bool = False
|
|
signed_at: Optional[datetime] = None
|
|
signed_by: Optional[str] = None
|
|
|
|
# Context info
|
|
evidence_count: int = 0
|
|
controls_mapped: int = 0
|
|
implementation_status: Optional[str] = None
|
|
|
|
# Priority
|
|
priority: int = 2
|
|
|
|
|
|
class AuditStatistics(BaseModel):
|
|
"""Statistics for an audit session."""
|
|
total: int
|
|
compliant: int
|
|
compliant_with_notes: int
|
|
non_compliant: int
|
|
not_applicable: int
|
|
pending: int
|
|
completion_percentage: float
|
|
|
|
|
|
class AuditChecklistResponse(BaseModel):
|
|
"""Response for audit checklist endpoint."""
|
|
session: AuditSessionSummary
|
|
items: List[AuditChecklistItem]
|
|
pagination: PaginationMeta
|
|
statistics: AuditStatistics
|
|
|
|
|
|
class AuditChecklistFilterRequest(BaseModel):
|
|
"""Filter options for audit checklist."""
|
|
regulation_code: Optional[str] = None
|
|
result_filter: Optional[str] = None # "pending", "compliant", "non_compliant", etc.
|
|
search: Optional[str] = None
|
|
signed_only: bool = False
|
|
|
|
|
|
# ============================================================================
|
|
# Report Generation Schemas (Phase 3 - Sprint 3)
|
|
# ============================================================================
|
|
|
|
class GenerateReportRequest(BaseModel):
|
|
"""Request to generate an audit report."""
|
|
session_id: str
|
|
report_type: str = "full" # "full", "summary", "non_compliant_only"
|
|
include_evidence: bool = True
|
|
include_signatures: bool = True
|
|
language: str = "de" # "de" or "en"
|
|
|
|
|
|
class ReportGenerationResponse(BaseModel):
|
|
"""Response for report generation."""
|
|
report_id: str
|
|
session_id: str
|
|
status: str # "pending", "generating", "completed", "failed"
|
|
report_type: str
|
|
file_path: Optional[str] = None
|
|
file_size_bytes: Optional[int] = None
|
|
created_at: datetime
|
|
completed_at: Optional[datetime] = None
|
|
error_message: Optional[str] = None
|
|
|
|
|
|
class ReportDownloadResponse(BaseModel):
|
|
"""Response for report download."""
|
|
report_id: str
|
|
filename: str
|
|
mime_type: str
|
|
file_size_bytes: int
|
|
download_url: str
|
|
|
|
|
|
# ============================================================================
|
|
# ISO 27001 ISMS Schemas (Kapitel 4-10)
|
|
# ============================================================================
|
|
|
|
# --- Enums ---
|
|
|
|
class ApprovalStatus(str):
|
|
DRAFT = "draft"
|
|
UNDER_REVIEW = "under_review"
|
|
APPROVED = "approved"
|
|
SUPERSEDED = "superseded"
|
|
|
|
|
|
class FindingType(str):
|
|
MAJOR = "major"
|
|
MINOR = "minor"
|
|
OFI = "ofi"
|
|
POSITIVE = "positive"
|
|
|
|
|
|
class FindingStatus(str):
|
|
OPEN = "open"
|
|
IN_PROGRESS = "in_progress"
|
|
CAPA_PENDING = "capa_pending"
|
|
VERIFICATION_PENDING = "verification_pending"
|
|
VERIFIED = "verified"
|
|
CLOSED = "closed"
|
|
|
|
|
|
class CAPAType(str):
|
|
CORRECTIVE = "corrective"
|
|
PREVENTIVE = "preventive"
|
|
BOTH = "both"
|
|
|
|
|
|
# --- ISMS Scope (ISO 27001 4.3) ---
|
|
|
|
class ISMSScopeBase(BaseModel):
|
|
"""Base schema for ISMS Scope."""
|
|
scope_statement: str
|
|
included_locations: Optional[List[str]] = None
|
|
included_processes: Optional[List[str]] = None
|
|
included_services: Optional[List[str]] = None
|
|
excluded_items: Optional[List[str]] = None
|
|
exclusion_justification: Optional[str] = None
|
|
organizational_boundary: Optional[str] = None
|
|
physical_boundary: Optional[str] = None
|
|
technical_boundary: Optional[str] = None
|
|
|
|
|
|
class ISMSScopeCreate(ISMSScopeBase):
|
|
"""Schema for creating ISMS Scope."""
|
|
pass
|
|
|
|
|
|
class ISMSScopeUpdate(BaseModel):
|
|
"""Schema for updating ISMS Scope."""
|
|
scope_statement: Optional[str] = None
|
|
included_locations: Optional[List[str]] = None
|
|
included_processes: Optional[List[str]] = None
|
|
included_services: Optional[List[str]] = None
|
|
excluded_items: Optional[List[str]] = None
|
|
exclusion_justification: Optional[str] = None
|
|
organizational_boundary: Optional[str] = None
|
|
physical_boundary: Optional[str] = None
|
|
technical_boundary: Optional[str] = None
|
|
|
|
|
|
class ISMSScopeResponse(ISMSScopeBase):
|
|
"""Response schema for ISMS Scope."""
|
|
id: str
|
|
version: str
|
|
status: str
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
effective_date: Optional[date] = None
|
|
review_date: Optional[date] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ISMSScopeApproveRequest(BaseModel):
|
|
"""Request to approve ISMS Scope."""
|
|
approved_by: str
|
|
effective_date: date
|
|
review_date: date
|
|
|
|
|
|
# --- ISMS Context (ISO 27001 4.1, 4.2) ---
|
|
|
|
class ContextIssue(BaseModel):
|
|
"""Single context issue."""
|
|
issue: str
|
|
impact: str
|
|
treatment: Optional[str] = None
|
|
|
|
|
|
class InterestedParty(BaseModel):
|
|
"""Single interested party."""
|
|
party: str
|
|
requirements: List[str]
|
|
relevance: str
|
|
|
|
|
|
class ISMSContextBase(BaseModel):
|
|
"""Base schema for ISMS Context."""
|
|
internal_issues: Optional[List[ContextIssue]] = None
|
|
external_issues: Optional[List[ContextIssue]] = None
|
|
interested_parties: Optional[List[InterestedParty]] = None
|
|
regulatory_requirements: Optional[List[str]] = None
|
|
contractual_requirements: Optional[List[str]] = None
|
|
swot_strengths: Optional[List[str]] = None
|
|
swot_weaknesses: Optional[List[str]] = None
|
|
swot_opportunities: Optional[List[str]] = None
|
|
swot_threats: Optional[List[str]] = None
|
|
|
|
|
|
class ISMSContextCreate(ISMSContextBase):
|
|
"""Schema for creating ISMS Context."""
|
|
pass
|
|
|
|
|
|
class ISMSContextResponse(ISMSContextBase):
|
|
"""Response schema for ISMS Context."""
|
|
id: str
|
|
version: str
|
|
status: str
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
last_reviewed_at: Optional[datetime] = None
|
|
next_review_date: Optional[date] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
# --- ISMS Policies (ISO 27001 5.2) ---
|
|
|
|
class ISMSPolicyBase(BaseModel):
|
|
"""Base schema for ISMS Policy."""
|
|
policy_id: str
|
|
title: str
|
|
policy_type: str # "master", "operational", "technical"
|
|
description: Optional[str] = None
|
|
policy_text: str
|
|
applies_to: Optional[List[str]] = None
|
|
review_frequency_months: int = 12
|
|
related_controls: Optional[List[str]] = None
|
|
|
|
|
|
class ISMSPolicyCreate(ISMSPolicyBase):
|
|
"""Schema for creating ISMS Policy."""
|
|
authored_by: str
|
|
|
|
|
|
class ISMSPolicyUpdate(BaseModel):
|
|
"""Schema for updating ISMS Policy."""
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
policy_text: Optional[str] = None
|
|
applies_to: Optional[List[str]] = None
|
|
review_frequency_months: Optional[int] = None
|
|
related_controls: Optional[List[str]] = None
|
|
|
|
|
|
class ISMSPolicyResponse(ISMSPolicyBase):
|
|
"""Response schema for ISMS Policy."""
|
|
id: str
|
|
version: str
|
|
status: str
|
|
authored_by: Optional[str] = None
|
|
reviewed_by: Optional[str] = None
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
effective_date: Optional[date] = None
|
|
next_review_date: Optional[date] = None
|
|
document_path: Optional[str] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ISMSPolicyListResponse(BaseModel):
|
|
"""List response for ISMS Policies."""
|
|
policies: List[ISMSPolicyResponse]
|
|
total: int
|
|
|
|
|
|
class ISMSPolicyApproveRequest(BaseModel):
|
|
"""Request to approve ISMS Policy."""
|
|
reviewed_by: str
|
|
approved_by: str
|
|
effective_date: date
|
|
|
|
|
|
# --- Security Objectives (ISO 27001 6.2) ---
|
|
|
|
class SecurityObjectiveBase(BaseModel):
|
|
"""Base schema for Security Objective."""
|
|
objective_id: str
|
|
title: str
|
|
description: Optional[str] = None
|
|
category: str # "availability", "confidentiality", "integrity", "compliance"
|
|
specific: Optional[str] = None
|
|
measurable: Optional[str] = None
|
|
achievable: Optional[str] = None
|
|
relevant: Optional[str] = None
|
|
time_bound: Optional[str] = None
|
|
kpi_name: Optional[str] = None
|
|
kpi_target: Optional[str] = None
|
|
kpi_unit: Optional[str] = None
|
|
measurement_frequency: Optional[str] = None
|
|
owner: Optional[str] = None
|
|
target_date: Optional[date] = None
|
|
related_controls: Optional[List[str]] = None
|
|
related_risks: Optional[List[str]] = None
|
|
|
|
|
|
class SecurityObjectiveCreate(SecurityObjectiveBase):
|
|
"""Schema for creating Security Objective."""
|
|
pass
|
|
|
|
|
|
class SecurityObjectiveUpdate(BaseModel):
|
|
"""Schema for updating Security Objective."""
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
kpi_current: Optional[str] = None
|
|
progress_percentage: Optional[int] = None
|
|
status: Optional[str] = None
|
|
|
|
|
|
class SecurityObjectiveResponse(SecurityObjectiveBase):
|
|
"""Response schema for Security Objective."""
|
|
id: str
|
|
kpi_current: Optional[str] = None
|
|
status: str
|
|
progress_percentage: int
|
|
achieved_date: Optional[date] = None
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class SecurityObjectiveListResponse(BaseModel):
|
|
"""List response for Security Objectives."""
|
|
objectives: List[SecurityObjectiveResponse]
|
|
total: int
|
|
|
|
|
|
# --- Statement of Applicability (SoA) ---
|
|
|
|
class SoAEntryBase(BaseModel):
|
|
"""Base schema for SoA Entry."""
|
|
annex_a_control: str # e.g., "A.5.1"
|
|
annex_a_title: str
|
|
annex_a_category: Optional[str] = None
|
|
is_applicable: bool
|
|
applicability_justification: str
|
|
implementation_status: str = "planned"
|
|
implementation_notes: Optional[str] = None
|
|
breakpilot_control_ids: Optional[List[str]] = None
|
|
coverage_level: str = "full"
|
|
evidence_description: Optional[str] = None
|
|
risk_assessment_notes: Optional[str] = None
|
|
compensating_controls: Optional[str] = None
|
|
|
|
|
|
class SoAEntryCreate(SoAEntryBase):
|
|
"""Schema for creating SoA Entry."""
|
|
pass
|
|
|
|
|
|
class SoAEntryUpdate(BaseModel):
|
|
"""Schema for updating SoA Entry."""
|
|
is_applicable: Optional[bool] = None
|
|
applicability_justification: Optional[str] = None
|
|
implementation_status: Optional[str] = None
|
|
implementation_notes: Optional[str] = None
|
|
breakpilot_control_ids: Optional[List[str]] = None
|
|
coverage_level: Optional[str] = None
|
|
evidence_description: Optional[str] = None
|
|
|
|
|
|
class SoAEntryResponse(SoAEntryBase):
|
|
"""Response schema for SoA Entry."""
|
|
id: str
|
|
evidence_ids: Optional[List[str]] = None
|
|
reviewed_by: Optional[str] = None
|
|
reviewed_at: Optional[datetime] = None
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
version: str
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class SoAListResponse(BaseModel):
|
|
"""List response for SoA."""
|
|
entries: List[SoAEntryResponse]
|
|
total: int
|
|
applicable_count: int
|
|
not_applicable_count: int
|
|
implemented_count: int
|
|
planned_count: int
|
|
|
|
|
|
class SoAApproveRequest(BaseModel):
|
|
"""Request to approve SoA entry."""
|
|
reviewed_by: str
|
|
approved_by: str
|
|
|
|
|
|
# --- Audit Findings (Major/Minor/OFI) ---
|
|
|
|
class AuditFindingBase(BaseModel):
|
|
"""Base schema for Audit Finding."""
|
|
finding_type: str # "major", "minor", "ofi", "positive"
|
|
iso_chapter: Optional[str] = None
|
|
annex_a_control: Optional[str] = None
|
|
title: str
|
|
description: str
|
|
objective_evidence: str
|
|
impact_description: Optional[str] = None
|
|
affected_processes: Optional[List[str]] = None
|
|
affected_assets: Optional[List[str]] = None
|
|
owner: Optional[str] = None
|
|
due_date: Optional[date] = None
|
|
|
|
|
|
class AuditFindingCreate(AuditFindingBase):
|
|
"""Schema for creating Audit Finding."""
|
|
audit_session_id: Optional[str] = None
|
|
internal_audit_id: Optional[str] = None
|
|
auditor: str
|
|
|
|
|
|
class AuditFindingUpdate(BaseModel):
|
|
"""Schema for updating Audit Finding."""
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
root_cause: Optional[str] = None
|
|
root_cause_method: Optional[str] = None
|
|
owner: Optional[str] = None
|
|
due_date: Optional[date] = None
|
|
status: Optional[str] = None
|
|
|
|
|
|
class AuditFindingResponse(AuditFindingBase):
|
|
"""Response schema for Audit Finding."""
|
|
id: str
|
|
finding_id: str
|
|
audit_session_id: Optional[str] = None
|
|
internal_audit_id: Optional[str] = None
|
|
root_cause: Optional[str] = None
|
|
root_cause_method: Optional[str] = None
|
|
status: str
|
|
auditor: Optional[str] = None
|
|
identified_date: date
|
|
closed_date: Optional[date] = None
|
|
verification_method: Optional[str] = None
|
|
verified_by: Optional[str] = None
|
|
verified_at: Optional[datetime] = None
|
|
closure_notes: Optional[str] = None
|
|
closed_by: Optional[str] = None
|
|
is_blocking: bool
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AuditFindingListResponse(BaseModel):
|
|
"""List response for Audit Findings."""
|
|
findings: List[AuditFindingResponse]
|
|
total: int
|
|
major_count: int
|
|
minor_count: int
|
|
ofi_count: int
|
|
open_count: int
|
|
|
|
|
|
class AuditFindingCloseRequest(BaseModel):
|
|
"""Request to close an Audit Finding."""
|
|
closure_notes: str
|
|
closed_by: str
|
|
verification_method: str
|
|
verification_evidence: str
|
|
|
|
|
|
# --- Corrective Actions (CAPA) ---
|
|
|
|
class CorrectiveActionBase(BaseModel):
|
|
"""Base schema for Corrective Action."""
|
|
capa_type: str # "corrective", "preventive", "both"
|
|
title: str
|
|
description: str
|
|
expected_outcome: Optional[str] = None
|
|
assigned_to: str
|
|
planned_completion: date
|
|
effectiveness_criteria: Optional[str] = None
|
|
estimated_effort_hours: Optional[int] = None
|
|
resources_required: Optional[str] = None
|
|
|
|
|
|
class CorrectiveActionCreate(CorrectiveActionBase):
|
|
"""Schema for creating Corrective Action."""
|
|
finding_id: str
|
|
planned_start: Optional[date] = None
|
|
|
|
|
|
class CorrectiveActionUpdate(BaseModel):
|
|
"""Schema for updating Corrective Action."""
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
assigned_to: Optional[str] = None
|
|
planned_completion: Optional[date] = None
|
|
status: Optional[str] = None
|
|
progress_percentage: Optional[int] = None
|
|
implementation_evidence: Optional[str] = None
|
|
|
|
|
|
class CorrectiveActionResponse(CorrectiveActionBase):
|
|
"""Response schema for Corrective Action."""
|
|
id: str
|
|
capa_id: str
|
|
finding_id: str
|
|
planned_start: Optional[date] = None
|
|
actual_completion: Optional[date] = None
|
|
status: str
|
|
progress_percentage: int
|
|
approved_by: Optional[str] = None
|
|
actual_effort_hours: Optional[int] = None
|
|
implementation_evidence: Optional[str] = None
|
|
evidence_ids: Optional[List[str]] = None
|
|
effectiveness_verified: bool
|
|
effectiveness_verification_date: Optional[date] = None
|
|
effectiveness_notes: Optional[str] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class CorrectiveActionListResponse(BaseModel):
|
|
"""List response for Corrective Actions."""
|
|
actions: List[CorrectiveActionResponse]
|
|
total: int
|
|
|
|
|
|
class CAPAVerifyRequest(BaseModel):
|
|
"""Request to verify CAPA effectiveness."""
|
|
verified_by: str
|
|
effectiveness_notes: str
|
|
is_effective: bool
|
|
|
|
|
|
# --- Management Review (ISO 27001 9.3) ---
|
|
|
|
class ReviewAttendee(BaseModel):
|
|
"""Single attendee in management review."""
|
|
name: str
|
|
role: str
|
|
|
|
|
|
class ReviewActionItem(BaseModel):
|
|
"""Single action item from management review."""
|
|
action: str
|
|
owner: str
|
|
due_date: date
|
|
|
|
|
|
class ManagementReviewBase(BaseModel):
|
|
"""Base schema for Management Review."""
|
|
title: str
|
|
review_date: date
|
|
review_period_start: Optional[date] = None
|
|
review_period_end: Optional[date] = None
|
|
chairperson: str
|
|
attendees: Optional[List[ReviewAttendee]] = None
|
|
|
|
|
|
class ManagementReviewCreate(ManagementReviewBase):
|
|
"""Schema for creating Management Review."""
|
|
pass
|
|
|
|
|
|
class ManagementReviewUpdate(BaseModel):
|
|
"""Schema for updating Management Review."""
|
|
# Inputs (9.3)
|
|
input_previous_actions: Optional[str] = None
|
|
input_isms_changes: Optional[str] = None
|
|
input_security_performance: Optional[str] = None
|
|
input_interested_party_feedback: Optional[str] = None
|
|
input_risk_assessment_results: Optional[str] = None
|
|
input_improvement_opportunities: Optional[str] = None
|
|
input_policy_effectiveness: Optional[str] = None
|
|
input_objective_achievement: Optional[str] = None
|
|
input_resource_adequacy: Optional[str] = None
|
|
# Outputs (9.3)
|
|
output_improvement_decisions: Optional[str] = None
|
|
output_isms_changes: Optional[str] = None
|
|
output_resource_needs: Optional[str] = None
|
|
# Action items
|
|
action_items: Optional[List[ReviewActionItem]] = None
|
|
# Assessment
|
|
isms_effectiveness_rating: Optional[str] = None
|
|
key_decisions: Optional[str] = None
|
|
status: Optional[str] = None
|
|
|
|
|
|
class ManagementReviewResponse(ManagementReviewBase):
|
|
"""Response schema for Management Review."""
|
|
id: str
|
|
review_id: str
|
|
input_previous_actions: Optional[str] = None
|
|
input_isms_changes: Optional[str] = None
|
|
input_security_performance: Optional[str] = None
|
|
input_interested_party_feedback: Optional[str] = None
|
|
input_risk_assessment_results: Optional[str] = None
|
|
input_improvement_opportunities: Optional[str] = None
|
|
input_policy_effectiveness: Optional[str] = None
|
|
input_objective_achievement: Optional[str] = None
|
|
input_resource_adequacy: Optional[str] = None
|
|
output_improvement_decisions: Optional[str] = None
|
|
output_isms_changes: Optional[str] = None
|
|
output_resource_needs: Optional[str] = None
|
|
action_items: Optional[List[ReviewActionItem]] = None
|
|
isms_effectiveness_rating: Optional[str] = None
|
|
key_decisions: Optional[str] = None
|
|
status: str
|
|
approved_by: Optional[str] = None
|
|
approved_at: Optional[datetime] = None
|
|
minutes_document_path: Optional[str] = None
|
|
next_review_date: Optional[date] = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ManagementReviewListResponse(BaseModel):
|
|
"""List response for Management Reviews."""
|
|
reviews: List[ManagementReviewResponse]
|
|
total: int
|
|
|
|
|
|
class ManagementReviewApproveRequest(BaseModel):
|
|
"""Request to approve Management Review."""
|
|
approved_by: str
|
|
next_review_date: date
|
|
minutes_document_path: Optional[str] = None
|
|
|
|
|
|
# --- Internal Audit (ISO 27001 9.2) ---
|
|
|
|
class InternalAuditBase(BaseModel):
|
|
"""Base schema for Internal Audit."""
|
|
title: str
|
|
audit_type: str # "scheduled", "surveillance", "special"
|
|
scope_description: str
|
|
iso_chapters_covered: Optional[List[str]] = None
|
|
annex_a_controls_covered: Optional[List[str]] = None
|
|
processes_covered: Optional[List[str]] = None
|
|
departments_covered: Optional[List[str]] = None
|
|
criteria: Optional[str] = None
|
|
planned_date: date
|
|
lead_auditor: str
|
|
audit_team: Optional[List[str]] = None
|
|
|
|
|
|
class InternalAuditCreate(InternalAuditBase):
|
|
"""Schema for creating Internal Audit."""
|
|
pass
|
|
|
|
|
|
class InternalAuditUpdate(BaseModel):
|
|
"""Schema for updating Internal Audit."""
|
|
title: Optional[str] = None
|
|
scope_description: Optional[str] = None
|
|
actual_start_date: Optional[date] = None
|
|
actual_end_date: Optional[date] = None
|
|
auditee_representatives: Optional[List[str]] = None
|
|
status: Optional[str] = None
|
|
audit_conclusion: Optional[str] = None
|
|
overall_assessment: Optional[str] = None
|
|
|
|
|
|
class InternalAuditResponse(InternalAuditBase):
|
|
"""Response schema for Internal Audit."""
|
|
id: str
|
|
audit_id: str
|
|
actual_start_date: Optional[date] = None
|
|
actual_end_date: Optional[date] = None
|
|
auditee_representatives: Optional[List[str]] = None
|
|
status: str
|
|
total_findings: int
|
|
major_findings: int
|
|
minor_findings: int
|
|
ofi_count: int
|
|
positive_observations: int
|
|
audit_conclusion: Optional[str] = None
|
|
overall_assessment: Optional[str] = None
|
|
report_date: Optional[date] = None
|
|
report_document_path: Optional[str] = None
|
|
report_approved_by: Optional[str] = None
|
|
report_approved_at: Optional[datetime] = None
|
|
follow_up_audit_required: bool
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class InternalAuditListResponse(BaseModel):
|
|
"""List response for Internal Audits."""
|
|
audits: List[InternalAuditResponse]
|
|
total: int
|
|
|
|
|
|
class InternalAuditCompleteRequest(BaseModel):
|
|
"""Request to complete Internal Audit."""
|
|
audit_conclusion: str
|
|
overall_assessment: str # "conforming", "minor_nc", "major_nc"
|
|
follow_up_audit_required: bool
|
|
|
|
|
|
# --- ISMS Readiness Check ---
|
|
|
|
class PotentialFinding(BaseModel):
|
|
"""Potential finding from readiness check."""
|
|
check: str
|
|
status: str # "pass", "fail", "warning"
|
|
recommendation: str
|
|
iso_reference: Optional[str] = None
|
|
|
|
|
|
class ISMSReadinessCheckResponse(BaseModel):
|
|
"""Response for ISMS Readiness Check."""
|
|
id: str
|
|
check_date: datetime
|
|
triggered_by: Optional[str] = None
|
|
overall_status: str # "ready", "at_risk", "not_ready"
|
|
certification_possible: bool
|
|
# Chapter statuses
|
|
chapter_4_status: Optional[str] = None
|
|
chapter_5_status: Optional[str] = None
|
|
chapter_6_status: Optional[str] = None
|
|
chapter_7_status: Optional[str] = None
|
|
chapter_8_status: Optional[str] = None
|
|
chapter_9_status: Optional[str] = None
|
|
chapter_10_status: Optional[str] = None
|
|
# Findings
|
|
potential_majors: List[PotentialFinding]
|
|
potential_minors: List[PotentialFinding]
|
|
improvement_opportunities: List[PotentialFinding]
|
|
# Scores
|
|
readiness_score: float
|
|
documentation_score: Optional[float] = None
|
|
implementation_score: Optional[float] = None
|
|
evidence_score: Optional[float] = None
|
|
# Priority actions
|
|
priority_actions: List[str]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class ISMSReadinessCheckRequest(BaseModel):
|
|
"""Request to run ISMS Readiness Check."""
|
|
triggered_by: str = "manual"
|
|
|
|
|
|
# --- Audit Trail ---
|
|
|
|
class AuditTrailEntry(BaseModel):
|
|
"""Single audit trail entry."""
|
|
id: str
|
|
entity_type: str
|
|
entity_id: str
|
|
entity_name: Optional[str] = None
|
|
action: str
|
|
field_changed: Optional[str] = None
|
|
old_value: Optional[str] = None
|
|
new_value: Optional[str] = None
|
|
change_summary: Optional[str] = None
|
|
performed_by: str
|
|
performed_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class AuditTrailResponse(BaseModel):
|
|
"""Response for Audit Trail query."""
|
|
entries: List[AuditTrailEntry]
|
|
total: int
|
|
pagination: PaginationMeta
|
|
|
|
|
|
# --- ISO 27001 Chapter Status Overview ---
|
|
|
|
class ISO27001ChapterStatus(BaseModel):
|
|
"""Status of a single ISO 27001 chapter."""
|
|
chapter: str
|
|
title: str
|
|
status: str # "compliant", "partial", "non_compliant", "not_started"
|
|
completion_percentage: float
|
|
open_findings: int
|
|
key_documents: List[str]
|
|
last_reviewed: Optional[datetime] = None
|
|
|
|
|
|
class ISO27001OverviewResponse(BaseModel):
|
|
"""Complete ISO 27001 status overview."""
|
|
overall_status: str # "ready", "at_risk", "not_ready"
|
|
certification_readiness: float # 0-100
|
|
chapters: List[ISO27001ChapterStatus]
|
|
scope_approved: bool
|
|
soa_approved: bool
|
|
last_management_review: Optional[datetime] = None
|
|
last_internal_audit: Optional[datetime] = None
|
|
open_major_findings: int
|
|
open_minor_findings: int
|
|
policies_count: int
|
|
policies_approved: int
|
|
objectives_count: int
|
|
objectives_achieved: int
|
|
|
|
|
|
# ============================================================================
|
|
# VVT Schemas — Verzeichnis von Verarbeitungstaetigkeiten (Art. 30 DSGVO)
|
|
# ============================================================================
|
|
|
|
class VVTOrganizationUpdate(BaseModel):
|
|
organization_name: Optional[str] = None
|
|
industry: Optional[str] = None
|
|
locations: Optional[List[str]] = None
|
|
employee_count: Optional[int] = None
|
|
dpo_name: Optional[str] = None
|
|
dpo_contact: Optional[str] = None
|
|
vvt_version: Optional[str] = None
|
|
last_review_date: Optional[date] = None
|
|
next_review_date: Optional[date] = None
|
|
review_interval: Optional[str] = None
|
|
|
|
|
|
class VVTOrganizationResponse(BaseModel):
|
|
id: str
|
|
organization_name: str
|
|
industry: Optional[str] = None
|
|
locations: List[Any] = []
|
|
employee_count: Optional[int] = None
|
|
dpo_name: Optional[str] = None
|
|
dpo_contact: Optional[str] = None
|
|
vvt_version: str = '1.0'
|
|
last_review_date: Optional[date] = None
|
|
next_review_date: Optional[date] = None
|
|
review_interval: str = 'annual'
|
|
created_at: datetime
|
|
updated_at: Optional[datetime] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class VVTActivityCreate(BaseModel):
|
|
vvt_id: str
|
|
name: str
|
|
description: Optional[str] = None
|
|
purposes: List[str] = []
|
|
legal_bases: List[str] = []
|
|
data_subject_categories: List[str] = []
|
|
personal_data_categories: List[str] = []
|
|
recipient_categories: List[str] = []
|
|
third_country_transfers: List[Any] = []
|
|
retention_period: Dict[str, Any] = {}
|
|
tom_description: Optional[str] = None
|
|
business_function: Optional[str] = None
|
|
systems: List[str] = []
|
|
deployment_model: Optional[str] = None
|
|
data_sources: List[Any] = []
|
|
data_flows: List[Any] = []
|
|
protection_level: str = 'MEDIUM'
|
|
dpia_required: bool = False
|
|
structured_toms: Dict[str, Any] = {}
|
|
status: str = 'DRAFT'
|
|
responsible: Optional[str] = None
|
|
owner: Optional[str] = None
|
|
|
|
|
|
class VVTActivityUpdate(BaseModel):
|
|
name: Optional[str] = None
|
|
description: Optional[str] = None
|
|
purposes: Optional[List[str]] = None
|
|
legal_bases: Optional[List[str]] = None
|
|
data_subject_categories: Optional[List[str]] = None
|
|
personal_data_categories: Optional[List[str]] = None
|
|
recipient_categories: Optional[List[str]] = None
|
|
third_country_transfers: Optional[List[Any]] = None
|
|
retention_period: Optional[Dict[str, Any]] = None
|
|
tom_description: Optional[str] = None
|
|
business_function: Optional[str] = None
|
|
systems: Optional[List[str]] = None
|
|
deployment_model: Optional[str] = None
|
|
data_sources: Optional[List[Any]] = None
|
|
data_flows: Optional[List[Any]] = None
|
|
protection_level: Optional[str] = None
|
|
dpia_required: Optional[bool] = None
|
|
structured_toms: Optional[Dict[str, Any]] = None
|
|
status: Optional[str] = None
|
|
responsible: Optional[str] = None
|
|
owner: Optional[str] = None
|
|
|
|
|
|
class VVTActivityResponse(BaseModel):
|
|
id: str
|
|
vvt_id: str
|
|
name: str
|
|
description: Optional[str] = None
|
|
purposes: List[Any] = []
|
|
legal_bases: List[Any] = []
|
|
data_subject_categories: List[Any] = []
|
|
personal_data_categories: List[Any] = []
|
|
recipient_categories: List[Any] = []
|
|
third_country_transfers: List[Any] = []
|
|
retention_period: Dict[str, Any] = {}
|
|
tom_description: Optional[str] = None
|
|
business_function: Optional[str] = None
|
|
systems: List[Any] = []
|
|
deployment_model: Optional[str] = None
|
|
data_sources: List[Any] = []
|
|
data_flows: List[Any] = []
|
|
protection_level: str = 'MEDIUM'
|
|
dpia_required: bool = False
|
|
structured_toms: Dict[str, Any] = {}
|
|
status: str = 'DRAFT'
|
|
responsible: Optional[str] = None
|
|
owner: Optional[str] = None
|
|
created_at: datetime
|
|
updated_at: Optional[datetime] = None
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class VVTStatsResponse(BaseModel):
|
|
total: int
|
|
by_status: Dict[str, int]
|
|
by_business_function: Dict[str, int]
|
|
dpia_required_count: int
|
|
third_country_count: int
|
|
draft_count: int
|
|
approved_count: int
|
|
|
|
|
|
class VVTAuditLogEntry(BaseModel):
|
|
id: str
|
|
action: str
|
|
entity_type: str
|
|
entity_id: Optional[str] = None
|
|
changed_by: Optional[str] = None
|
|
old_values: Optional[Dict[str, Any]] = None
|
|
new_values: Optional[Dict[str, Any]] = None
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|