This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/pipeline_checkpoints.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

277 lines
9.6 KiB
Python

"""
Pipeline Checkpoint System for Compliance Pipeline.
Provides checkpoint tracking, validation, and persistence for the compliance pipeline.
Checkpoints are saved to /tmp/pipeline_checkpoints.json and can be queried via API.
"""
import json
import os
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Any
from enum import Enum
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Path of the JSON file that the pipeline state is written to and read back from.
CHECKPOINT_FILE = "/tmp/pipeline_checkpoints.json"
class CheckpointStatus(str, Enum):
    """Lifecycle states a single pipeline checkpoint can be in.

    The ``str`` mixin makes every member compare equal to (and serialise as)
    its plain string value.
    """

    # Not started yet.
    PENDING = "pending"
    # Currently executing.
    RUNNING = "running"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
class ValidationStatus(str, Enum):
    """Possible outcomes of a single validation check.

    The ``str`` mixin makes every member compare equal to (and serialise as)
    its plain string value.
    """

    PASSED = "passed"
    WARNING = "warning"
    FAILED = "failed"
    # The check has not been executed.
    NOT_RUN = "not_run"
@dataclass
class ValidationResult:
    """Outcome of comparing one actual value against its expectation."""

    # Human-readable identifier of the check.
    name: str
    # Verdict of the check.
    status: ValidationStatus
    # The value the check was looking for.
    expected: Any
    # The value that was observed.
    actual: Any
    # Explanation of the verdict.
    message: str
@dataclass
class PipelineCheckpoint:
    """One named step of a pipeline phase, with its timing and results."""

    # --- identity and state (required) ---
    phase: str
    name: str
    status: CheckpointStatus

    # --- timing; all unset until filled in ---
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    duration_seconds: Optional[float] = None

    # --- collected results; each instance gets its own fresh container ---
    metrics: Dict[str, Any] = field(default_factory=dict)
    validations: List[Dict] = field(default_factory=list)

    # Error text, if any.
    error: Optional[str] = None
@dataclass
class PipelineState:
    """Aggregate state of one pipeline run."""

    # --- required ---
    pipeline_id: str
    status: str
    started_at: str

    # --- progress; unset by default ---
    completed_at: Optional[str] = None
    current_phase: Optional[str] = None

    # --- accumulated data; each instance gets its own fresh container ---
    checkpoints: List[PipelineCheckpoint] = field(default_factory=list)
    summary: Dict[str, Any] = field(default_factory=dict)
class CheckpointManager:
    """Track, validate and persist the checkpoints of one pipeline run.

    A checkpoint is opened with :meth:`start_checkpoint`, enriched through
    :meth:`add_metric` / :meth:`validate`, and closed with
    :meth:`complete_checkpoint` or :meth:`fail_checkpoint`.  After every
    mutation the pipeline state is written to ``CHECKPOINT_FILE`` as JSON.

    Only closed checkpoints are part of ``state.checkpoints``; the checkpoint
    that is currently open lives on the manager and does not appear in the
    persisted JSON until it is closed.
    """

    def __init__(self, pipeline_id: Optional[str] = None):
        """Create a manager with a fresh ``PipelineState``.

        Args:
            pipeline_id: Identifier of the run.  When omitted (or empty) a
                ``YYYYmmdd_HHMMSS`` timestamp is used instead.
        """
        # One timestamp for both the generated id and ``started_at`` so the
        # two can never disagree.
        now = datetime.now()
        self.pipeline_id = pipeline_id or now.strftime("%Y%m%d_%H%M%S")
        self.state = PipelineState(
            pipeline_id=self.pipeline_id,
            status="initializing",
            started_at=now.isoformat(),
            checkpoints=[],
        )
        self._current_checkpoint: Optional[PipelineCheckpoint] = None
        self._checkpoint_start_time: Optional[float] = None

    def start_checkpoint(self, phase: str, name: str) -> None:
        """Open a new checkpoint and mark the pipeline as running.

        A checkpoint that is still open when this is called is replaced
        without being added to ``state.checkpoints``.

        Args:
            phase: Pipeline phase the checkpoint belongs to.
            name: Name of the checkpoint within that phase.
        """
        import time

        self._checkpoint_start_time = time.time()
        self._current_checkpoint = PipelineCheckpoint(
            phase=phase,
            name=name,
            status=CheckpointStatus.RUNNING,
            started_at=datetime.now().isoformat(),
            metrics={},
            validations=[],
        )
        self.state.current_phase = phase
        self.state.status = "running"
        logger.info(f"[CHECKPOINT] Started: {phase} - {name}")
        self._save()

    def add_metric(self, name: str, value: Any) -> None:
        """Record ``value`` under ``name`` on the open checkpoint.

        Does nothing when no checkpoint is open.
        """
        if self._current_checkpoint:
            self._current_checkpoint.metrics[name] = value
            self._save()

    def validate(self, name: str, expected: Any, actual: Any,
                 tolerance: float = 0.0, min_value: Optional[Any] = None) -> ValidationResult:
        """Compare ``actual`` with ``expected`` and record the verdict.

        Three modes, checked in this order:

        * ``min_value`` given: FAILED below ``min_value``, WARNING between
          ``min_value`` and ``expected``, PASSED otherwise.  ``tolerance`` is
          ignored in this mode.
        * both values numeric: FAILED when ``actual`` is below the accepted
          range, WARNING when it is above, PASSED inside.  The range is
          ``expected`` +/- ``tolerance`` (or exactly ``expected`` when
          ``tolerance`` is not positive).
        * anything else: PASSED only on equality, FAILED otherwise.

        The result is appended to the open checkpoint (if any), logged, and
        returned.

        Args:
            name: Validation name.
            expected: Expected value.
            actual: Observed value.
            tolerance: Relative tolerance as a fraction of ``expected``
                (``0.1`` means +/- 10 %; ``0.0`` means exact match).
            min_value: If set, the lowest value of ``actual`` that does not
                count as a failure.

        Returns:
            The ``ValidationResult`` describing the verdict.
        """
        status = ValidationStatus.PASSED
        message = "OK"

        if min_value is not None:
            # Minimum-value mode.
            if actual < min_value:
                status = ValidationStatus.FAILED
                message = f"Below minimum: {actual} < {min_value}"
            elif actual < expected:
                status = ValidationStatus.WARNING
                message = f"Below expected but above minimum: {min_value} <= {actual} < {expected}"
            else:
                message = f"OK: {actual} >= {expected}"
        elif isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
            # Numeric mode.
            if tolerance > 0:
                # Sorting keeps lower <= upper for a negative ``expected``;
                # without it an exact match on a negative value would be
                # reported as "below tolerance".
                lower, upper = sorted(
                    (expected * (1 - tolerance), expected * (1 + tolerance))
                )
                if actual < lower:
                    status = ValidationStatus.FAILED
                    message = f"Below tolerance: {actual} < {lower:.0f} (expected {expected} +/- {tolerance*100:.0f}%)"
                elif actual > upper:
                    status = ValidationStatus.WARNING
                    message = f"Above expected: {actual} > {upper:.0f}"
                else:
                    message = f"Within tolerance: {lower:.0f} <= {actual} <= {upper:.0f}"
            elif actual < expected:
                status = ValidationStatus.FAILED
                message = f"Below expected: {actual} < {expected}"
            elif actual > expected:
                status = ValidationStatus.WARNING
                message = f"Above expected: {actual} > {expected}"
        elif actual != expected:
            # Exact-match mode for non-numeric values.
            status = ValidationStatus.FAILED
            message = f"Mismatch: {actual} != {expected}"

        result = ValidationResult(
            name=name,
            status=status,
            expected=expected,
            actual=actual,
            message=message,
        )
        if self._current_checkpoint:
            self._current_checkpoint.validations.append(asdict(result))
            self._save()

        if status == ValidationStatus.PASSED:
            log_level = logging.INFO
        elif status == ValidationStatus.WARNING:
            log_level = logging.WARNING
        else:
            log_level = logging.ERROR
        logger.log(log_level, f"[VALIDATION] {name}: {message}")
        return result

    def complete_checkpoint(self, success: bool = True, error: Optional[str] = None) -> None:
        """Close the open checkpoint and append it to the pipeline state.

        When no checkpoint is open only the state file is rewritten.

        Args:
            success: ``True`` marks the checkpoint COMPLETED, ``False`` FAILED.
            error: Optional error text stored on the checkpoint.
        """
        import time

        checkpoint = self._current_checkpoint
        if checkpoint:
            checkpoint.status = CheckpointStatus.COMPLETED if success else CheckpointStatus.FAILED
            checkpoint.completed_at = datetime.now().isoformat()
            # Compare against None explicitly so the check does not depend on
            # the numeric value of the timestamp.
            if self._checkpoint_start_time is not None:
                checkpoint.duration_seconds = time.time() - self._checkpoint_start_time
            if error:
                checkpoint.error = error
            self.state.checkpoints.append(checkpoint)
            # NOTE(review): both branches are empty strings here; status icons
            # were presumably intended -- confirm the desired values.
            status_icon = "" if success else ""
            logger.info(f"[CHECKPOINT] {status_icon} Completed: {checkpoint.phase} - {checkpoint.name}")
        self._current_checkpoint = None
        self._checkpoint_start_time = None
        self._save()

    def fail_checkpoint(self, error: str) -> None:
        """Close the open checkpoint as FAILED and mark the pipeline failed.

        Args:
            error: Error text stored on the checkpoint.
        """
        self.complete_checkpoint(success=False, error=error)
        self.state.status = "failed"
        # Second save: complete_checkpoint persisted before the status change.
        self._save()

    def complete_pipeline(self, summary: Dict[str, Any]) -> None:
        """Mark the whole pipeline as completed and store ``summary``."""
        self.state.status = "completed"
        self.state.completed_at = datetime.now().isoformat()
        self.state.current_phase = None
        self.state.summary = summary
        self._save()
        logger.info("[PIPELINE] Complete!")

    def get_state(self) -> Dict[str, Any]:
        """Return the current pipeline state as a plain ``dict``."""
        return asdict(self.state)

    def _save(self) -> None:
        """Write the pipeline state to ``CHECKPOINT_FILE`` (best effort).

        The JSON is first written to a temporary file next to the target and
        then moved into place with ``os.replace``.  A serialisation error
        (metric values may be arbitrary objects) or a reader running at the
        same time therefore never sees a truncated file; on failure the
        previously saved state is left untouched.  Errors are logged, never
        raised.
        """
        tmp_path = f"{CHECKPOINT_FILE}.{os.getpid()}.tmp"
        try:
            # Explicit UTF-8: ensure_ascii=False emits non-ASCII characters
            # verbatim, which must not depend on the locale's default encoding.
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(asdict(self.state), f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, CHECKPOINT_FILE)
        except Exception as e:
            logger.error(f"Failed to save checkpoint state: {e}")
            # Do not leave a half-written temporary file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    @staticmethod
    def load_state() -> Optional[Dict[str, Any]]:
        """Read the persisted pipeline state from ``CHECKPOINT_FILE``.

        Returns:
            The decoded JSON document, or ``None`` when the file does not
            exist or cannot be read/parsed (the latter is logged).
        """
        try:
            with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # No state has been saved yet; not an error.
            return None
        except Exception as e:
            logger.error(f"Failed to load checkpoint state: {e}")
            return None
# Reference thresholds used when validating pipeline results.  These are
# tuning values and can be adjusted based on experience.
EXPECTED_VALUES = {
    # Ingestion phase: chunk counts.
    "ingestion": {
        # Expected minimum chunks (24 regulations according to the original
        # note; only 13 have per-regulation thresholds listed below).
        "total_chunks": 11000,
        # Absolute minimum acceptable.
        "min_chunks": 10000,
        "regulations": {
            "GDPR": {"min": 600, "expected": 700},
            "AIACT": {"min": 1000, "expected": 1200},
            "CRA": {"min": 600, "expected": 700},
            "NIS2": {"min": 400, "expected": 530},
            "TDDDG": {"min": 150, "expected": 187},
            "BSI-TR-03161-1": {"min": 180, "expected": 227},
            "BSI-TR-03161-2": {"min": 170, "expected": 214},
            "BSI-TR-03161-3": {"min": 160, "expected": 199},
            "DORA": {"min": 300, "expected": 500},
            "PSD2": {"min": 400, "expected": 600},
            "AMLR": {"min": 350, "expected": 550},
            "EHDS": {"min": 400, "expected": 600},
            "MiCA": {"min": 500, "expected": 800},
        },
    },
    # Extraction phase: checkpoint counts.
    "extraction": {
        "total_checkpoints": 4500,
        "min_checkpoints": 4000,
        "checkpoints_per_regulation": {
            "GDPR": {"min": 150, "expected": 250},
            "AIACT": {"min": 200, "expected": 350},
            "DORA": {"min": 100, "expected": 180},
        },
    },
    # Controls phase.
    "controls": {
        "total_controls": 1100,
        "min_controls": 1000,
    },
    # Measures phase.
    "measures": {
        "total_measures": 1100,
        "min_measures": 1000,
    },
}