""" Digest Generator fuer Wochenzusammenfassungen. Generiert LLM-basierte Zusammenfassungen der wichtigsten Alerts: - Gruppierung nach Wichtigkeit (Kritisch, Dringend, Wichtig, etc.) - Kurze Zusammenfassung pro Kategorie - HTML-Ausgabe fuer E-Mail und UI - PDF-Export Verwendung: generator = DigestGenerator(db_session, llm_client) digest = await generator.generate_weekly_digest(user_id) """ import uuid from typing import List, Dict, Any, Optional from datetime import datetime, timedelta from dataclasses import dataclass import json import os from ..db.models import ( AlertItemDB, AlertDigestDB, UserAlertSubscriptionDB, ImportanceLevelEnum, DigestStatusEnum ) @dataclass class DigestSection: """Eine Sektion im Digest (z.B. Kritisch, Dringend).""" importance_level: ImportanceLevelEnum label_de: str color: str items: List[AlertItemDB] summary: str = "" @dataclass class DigestContent: """Vollstaendiger Digest-Inhalt.""" user_id: str period_start: datetime period_end: datetime sections: List[DigestSection] total_alerts: int critical_count: int urgent_count: int introduction: str = "" html: str = "" class DigestGenerator: """ Generiert Wochenzusammenfassungen fuer Alerts. Unterstuetzt: - Lokale Ollama-Modelle - OpenAI API - Anthropic API """ def __init__( self, db_session, llm_provider: str = "ollama", llm_model: str = "llama3.2:3b" ): """ Initialisiere den Digest Generator. Args: db_session: SQLAlchemy Session llm_provider: "ollama", "openai", oder "anthropic" llm_model: Modellname """ self.db = db_session self.llm_provider = llm_provider self.llm_model = llm_model async def generate_weekly_digest( self, user_id: str, weeks_back: int = 1 ) -> Optional[AlertDigestDB]: """ Generiere einen Wochendigest fuer einen User. Args: user_id: User-ID weeks_back: Wie viele Wochen zurueck (default: letzte Woche) Returns: AlertDigestDB oder None bei Fehler """ # Zeitraum berechnen now = datetime.utcnow() period_end = now - timedelta(days=now.weekday()) # Montag dieser Woche period_start = period_end - timedelta(weeks=weeks_back) # Alerts laden alerts = self._load_alerts_for_period(user_id, period_start, period_end) if not alerts: return None # Nach Wichtigkeit gruppieren sections = self._group_by_importance(alerts) # Digest-Content erstellen content = DigestContent( user_id=user_id, period_start=period_start, period_end=period_end, sections=sections, total_alerts=len(alerts), critical_count=len([a for a in alerts if a.importance_level == ImportanceLevelEnum.KRITISCH]), urgent_count=len([a for a in alerts if a.importance_level == ImportanceLevelEnum.DRINGEND]) ) # LLM-Zusammenfassungen generieren await self._generate_summaries(content) # HTML generieren content.html = self._generate_html(content) # In DB speichern digest = self._save_digest(content) return digest def _load_alerts_for_period( self, user_id: str, start: datetime, end: datetime ) -> List[AlertItemDB]: """Lade alle Alerts fuer einen Zeitraum.""" return self.db.query(AlertItemDB).filter( AlertItemDB.user_id == user_id, AlertItemDB.fetched_at >= start, AlertItemDB.fetched_at < end, AlertItemDB.status != "dropped" ).order_by(AlertItemDB.fetched_at.desc()).all() def _group_by_importance( self, alerts: List[AlertItemDB] ) -> List[DigestSection]: """Gruppiere Alerts nach Wichtigkeit.""" importance_config = [ (ImportanceLevelEnum.KRITISCH, "Kritisch", "#dc2626"), (ImportanceLevelEnum.DRINGEND, "Dringend", "#ea580c"), (ImportanceLevelEnum.WICHTIG, "Wichtig", "#d97706"), (ImportanceLevelEnum.PRUEFEN, "Zu pruefen", "#2563eb"), (ImportanceLevelEnum.INFO, "Info", "#64748b"), ] sections = [] for level, label, color in importance_config: items = [a for a in alerts if a.importance_level == level] if items: sections.append(DigestSection( importance_level=level, label_de=label, color=color, items=items[:5] # Max 5 pro Kategorie )) return sections async def _generate_summaries(self, content: DigestContent): """Generiere LLM-basierte Zusammenfassungen.""" # Einleitung generieren content.introduction = await self._generate_introduction(content) # Zusammenfassungen pro Sektion for section in content.sections: section.summary = await self._generate_section_summary(section) async def _generate_introduction(self, content: DigestContent) -> str: """Generiere eine einleitende Zusammenfassung.""" prompt = f"""Du bist ein Assistent fuer Schulleitungen und Lehrkraefte in Deutschland. Schreibe eine kurze Einleitung (2-3 Saetze) fuer einen Wochenbericht. Zeitraum: {content.period_start.strftime('%d.%m.%Y')} - {content.period_end.strftime('%d.%m.%Y')} Gesamt: {content.total_alerts} Meldungen Kritisch: {content.critical_count} Dringend: {content.urgent_count} Schreibe auf Deutsch in einfacher Sprache (B1/B2 Niveau). Beginne mit "Diese Woche..." oder "In der vergangenen Woche...".""" return await self._call_llm(prompt, max_tokens=150) async def _generate_section_summary(self, section: DigestSection) -> str: """Generiere Zusammenfassung fuer eine Sektion.""" if not section.items: return "" titles = "\n".join([f"- {item.title}" for item in section.items[:5]]) prompt = f"""Fasse diese {len(section.items)} Meldungen der Kategorie "{section.label_de}" in 1-2 Saetzen zusammen: {titles} Schreibe auf Deutsch in einfacher Sprache. Nenne die wichtigsten Handlungsbedarfe.""" return await self._call_llm(prompt, max_tokens=100) async def _call_llm(self, prompt: str, max_tokens: int = 200) -> str: """Rufe das LLM auf.""" try: if self.llm_provider == "ollama": return await self._call_ollama(prompt, max_tokens) elif self.llm_provider == "openai": return await self._call_openai(prompt, max_tokens) elif self.llm_provider == "anthropic": return await self._call_anthropic(prompt, max_tokens) else: return self._generate_fallback_summary(prompt) except Exception as e: print(f"LLM call failed: {e}") return self._generate_fallback_summary(prompt) async def _call_ollama(self, prompt: str, max_tokens: int) -> str: """Rufe lokales Ollama-Modell auf.""" import httpx try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "http://localhost:11434/api/generate", json={ "model": self.llm_model, "prompt": prompt, "stream": False, "options": { "num_predict": max_tokens, "temperature": 0.7 } } ) if response.status_code == 200: data = response.json() return data.get("response", "").strip() except Exception as e: print(f"Ollama error: {e}") return self._generate_fallback_summary(prompt) async def _call_openai(self, prompt: str, max_tokens: int) -> str: """Rufe OpenAI API auf.""" import httpx api_key = os.getenv("OPENAI_API_KEY") if not api_key: return self._generate_fallback_summary(prompt) try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json={ "model": self.llm_model or "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": 0.7 } ) if response.status_code == 200: data = response.json() return data["choices"][0]["message"]["content"].strip() except Exception as e: print(f"OpenAI error: {e}") return self._generate_fallback_summary(prompt) async def _call_anthropic(self, prompt: str, max_tokens: int) -> str: """Rufe Anthropic API auf.""" import httpx api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: return self._generate_fallback_summary(prompt) try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" }, json={ "model": self.llm_model or "claude-3-5-sonnet-latest", "max_tokens": max_tokens, "messages": [{"role": "user", "content": prompt}] } ) if response.status_code == 200: data = response.json() return data["content"][0]["text"].strip() except Exception as e: print(f"Anthropic error: {e}") return self._generate_fallback_summary(prompt) def _generate_fallback_summary(self, prompt: str) -> str: """Fallback ohne LLM.""" if "Einleitung" in prompt or "Wochenbericht" in prompt: return "Diese Woche haben Sie neue relevante Meldungen erhalten. Hier ist Ihre Zusammenfassung." return "Mehrere relevante Meldungen zu diesem Thema." def _generate_html(self, content: DigestContent) -> str: """Generiere HTML fuer den Digest.""" sections_html = "" for section in content.sections: items_html = "" for item in section.items: items_html += f"""
{section.summary}
' if section.summary else ''}{content.period_start.strftime('%d.%m.%Y')} - {content.period_end.strftime('%d.%m.%Y')}
{content.introduction}
' if content.introduction else ''} {sections_html}Dieser Bericht wurde automatisch von BreakPilot Alerts erstellt.