Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
102 lines
3.1 KiB
Python
102 lines
3.1 KiB
Python
"""
|
|
Generation Job Model
|
|
Tracking für Content-Generierungs-Jobs
|
|
"""
|
|
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any
|
|
import uuid
|
|
|
|
|
|
class JobStatus(str, Enum):
|
|
"""Job Status"""
|
|
PENDING = "pending"
|
|
PROCESSING = "processing"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
|
|
|
|
class GenerationJob:
|
|
"""Content-Generierungs-Job"""
|
|
|
|
def __init__(
|
|
self,
|
|
topic: str,
|
|
description: Optional[str] = None,
|
|
target_grade: Optional[str] = None,
|
|
material_count: int = 0
|
|
):
|
|
self.job_id = str(uuid.uuid4())
|
|
self.topic = topic
|
|
self.description = description
|
|
self.target_grade = target_grade
|
|
self.material_count = material_count
|
|
|
|
self.status = JobStatus.PENDING
|
|
self.progress = 0
|
|
self.message = "Job created"
|
|
self.result: Optional[Dict[str, Any]] = None
|
|
self.error: Optional[str] = None
|
|
|
|
self.created_at = datetime.utcnow()
|
|
self.updated_at = datetime.utcnow()
|
|
|
|
def update_progress(self, progress: int, message: str):
|
|
"""Update Job Progress"""
|
|
self.progress = progress
|
|
self.message = message
|
|
self.status = JobStatus.PROCESSING
|
|
self.updated_at = datetime.utcnow()
|
|
|
|
def complete(self, result: Dict[str, Any]):
|
|
"""Mark Job as Completed"""
|
|
self.status = JobStatus.COMPLETED
|
|
self.progress = 100
|
|
self.message = "Content generation completed"
|
|
self.result = result
|
|
self.updated_at = datetime.utcnow()
|
|
|
|
def fail(self, error: str):
|
|
"""Mark Job as Failed"""
|
|
self.status = JobStatus.FAILED
|
|
self.message = "Content generation failed"
|
|
self.error = error
|
|
self.updated_at = datetime.utcnow()
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dict"""
|
|
return {
|
|
"job_id": self.job_id,
|
|
"topic": self.topic,
|
|
"description": self.description,
|
|
"target_grade": self.target_grade,
|
|
"material_count": self.material_count,
|
|
"status": self.status.value,
|
|
"progress": self.progress,
|
|
"message": self.message,
|
|
"result": self.result,
|
|
"error": self.error,
|
|
"created_at": self.created_at.isoformat(),
|
|
"updated_at": self.updated_at.isoformat()
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "GenerationJob":
|
|
"""Create from dict"""
|
|
job = cls(
|
|
topic=data["topic"],
|
|
description=data.get("description"),
|
|
target_grade=data.get("target_grade"),
|
|
material_count=data.get("material_count", 0)
|
|
)
|
|
job.job_id = data["job_id"]
|
|
job.status = JobStatus(data["status"])
|
|
job.progress = data["progress"]
|
|
job.message = data["message"]
|
|
job.result = data.get("result")
|
|
job.error = data.get("error")
|
|
job.created_at = datetime.fromisoformat(data["created_at"])
|
|
job.updated_at = datetime.fromisoformat(data["updated_at"])
|
|
return job
|