This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

102 lines
3.1 KiB
Python

"""
Generation Job Model

Tracking for content generation jobs.
"""
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional
class JobStatus(str, Enum):
    """Lifecycle states a generation job can be in.

    Members also subclass ``str``, so each one compares equal to its raw
    string value and can be rebuilt from it with ``JobStatus(value)``.
    """

    # Job exists but no progress has been reported yet.
    PENDING = "pending"

    # Work is under way.
    PROCESSING = "processing"

    # Terminal state: finished successfully.
    COMPLETED = "completed"

    # Terminal state: finished with an error.
    FAILED = "failed"
class GenerationJob:
    """Track the state of a single content-generation job.

    A job is created as ``PENDING`` with progress 0, switches to
    ``PROCESSING`` on every progress update, and ends as ``COMPLETED``
    (carrying a result) or ``FAILED`` (carrying an error message).

    ``created_at`` and ``updated_at`` are naive ``datetime`` objects holding
    UTC wall-clock time.
    """

    def __init__(
        self,
        topic: str,
        description: Optional[str] = None,
        target_grade: Optional[str] = None,
        material_count: int = 0,
    ):
        """Create a new pending job with a freshly generated UUID as its id.

        Args:
            topic: Topic of the job; stored as given.
            description: Optional free-text description; stored as given.
            target_grade: Optional target grade; stored as given.
            material_count: Material count for the job; stored as given.
        """
        # Take one timestamp so a fresh job has created_at == updated_at
        # instead of two values a few microseconds apart.
        now = self._utcnow()

        self.job_id = str(uuid.uuid4())
        self.topic = topic
        self.description = description
        self.target_grade = target_grade
        self.material_count = material_count
        self.status = JobStatus.PENDING
        self.progress = 0
        self.message = "Job created"
        self.result: Optional[Dict[str, Any]] = None
        self.error: Optional[str] = None
        self.created_at = now
        self.updated_at = now

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # datetime.utcnow() is deprecated since Python 3.12. The tzinfo is
        # stripped on purpose: it keeps the ISO strings emitted by to_dict()
        # and comparisons with timestamps restored by from_dict() unchanged.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def __repr__(self) -> str:
        """Return a short debugging representation of the job."""
        return (
            f"{type(self).__name__}(job_id={self.job_id!r}, "
            f"topic={self.topic!r}, status={self.status!r}, "
            f"progress={self.progress!r})"
        )

    def update_progress(self, progress: int, message: str) -> None:
        """Record a progress update and mark the job as processing.

        Args:
            progress: Progress value; clamped to the range 0-100, matching
                the 0 set on creation and the 100 set by ``complete()``.
            message: Status message to store alongside the progress.
        """
        self.progress = max(0, min(100, progress))
        self.message = message
        self.status = JobStatus.PROCESSING
        self.updated_at = self._utcnow()

    def complete(self, result: Dict[str, Any]) -> None:
        """Mark the job as completed and store its result.

        Args:
            result: Result payload to attach to the job.
        """
        self.status = JobStatus.COMPLETED
        self.progress = 100
        self.message = "Content generation completed"
        self.result = result
        self.updated_at = self._utcnow()

    def fail(self, error: str) -> None:
        """Mark the job as failed and store the error text.

        The current progress value is left untouched.

        Args:
            error: Error description to attach to the job.
        """
        self.status = JobStatus.FAILED
        self.message = "Content generation failed"
        self.error = error
        self.updated_at = self._utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the job to a plain dict.

        The status is emitted as its string value and both timestamps as
        ISO 8601 strings; ``result`` is included by reference, not copied.
        """
        return {
            "job_id": self.job_id,
            "topic": self.topic,
            "description": self.description,
            "target_grade": self.target_grade,
            "material_count": self.material_count,
            "status": self.status.value,
            "progress": self.progress,
            "message": self.message,
            "result": self.result,
            "error": self.error,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "GenerationJob":
        """Rebuild a job from a dict shaped like the output of ``to_dict()``.

        Args:
            data: Mapping with the required keys ``topic``, ``job_id``,
                ``status``, ``progress``, ``message``, ``created_at`` and
                ``updated_at``. ``description``, ``target_grade``,
                ``material_count``, ``result`` and ``error`` are optional.

        Returns:
            A job carrying exactly the stored state.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``status`` is not a valid ``JobStatus`` value or
                a timestamp is not a valid ISO 8601 string.
        """
        job = cls(
            topic=data["topic"],
            description=data.get("description"),
            target_grade=data.get("target_grade"),
            material_count=data.get("material_count", 0),
        )
        # The constructor generated a new id and timestamps; replace them
        # and the remaining defaults with the persisted values.
        job.job_id = data["job_id"]
        job.status = JobStatus(data["status"])
        job.progress = data["progress"]
        job.message = data["message"]
        job.result = data.get("result")
        job.error = data.get("error")
        job.created_at = datetime.fromisoformat(data["created_at"])
        job.updated_at = datetime.fromisoformat(data["updated_at"])
        return job