""" Digest Generator fuer Wochenzusammenfassungen. Generiert LLM-basierte Zusammenfassungen der wichtigsten Alerts: - Gruppierung nach Wichtigkeit (Kritisch, Dringend, Wichtig, etc.) - Kurze Zusammenfassung pro Kategorie - HTML-Ausgabe fuer E-Mail und UI - PDF-Export Verwendung: generator = DigestGenerator(db_session, llm_client) digest = await generator.generate_weekly_digest(user_id) """ import uuid from typing import List, Dict, Any, Optional from datetime import datetime, timedelta from dataclasses import dataclass import json import os from ..db.models import ( AlertItemDB, AlertDigestDB, UserAlertSubscriptionDB, ImportanceLevelEnum, DigestStatusEnum ) @dataclass class DigestSection: """Eine Sektion im Digest (z.B. Kritisch, Dringend).""" importance_level: ImportanceLevelEnum label_de: str color: str items: List[AlertItemDB] summary: str = "" @dataclass class DigestContent: """Vollstaendiger Digest-Inhalt.""" user_id: str period_start: datetime period_end: datetime sections: List[DigestSection] total_alerts: int critical_count: int urgent_count: int introduction: str = "" html: str = "" class DigestGenerator: """ Generiert Wochenzusammenfassungen fuer Alerts. Unterstuetzt: - Lokale Ollama-Modelle - OpenAI API - Anthropic API """ def __init__( self, db_session, llm_provider: str = "ollama", llm_model: str = "llama3.2:3b" ): """ Initialisiere den Digest Generator. Args: db_session: SQLAlchemy Session llm_provider: "ollama", "openai", oder "anthropic" llm_model: Modellname """ self.db = db_session self.llm_provider = llm_provider self.llm_model = llm_model async def generate_weekly_digest( self, user_id: str, weeks_back: int = 1 ) -> Optional[AlertDigestDB]: """ Generiere einen Wochendigest fuer einen User. Args: user_id: User-ID weeks_back: Wie viele Wochen zurueck (default: letzte Woche) Returns: AlertDigestDB oder None bei Fehler """ # Zeitraum berechnen now = datetime.utcnow() period_end = now - timedelta(days=now.weekday()) # Montag dieser Woche period_start = period_end - timedelta(weeks=weeks_back) # Alerts laden alerts = self._load_alerts_for_period(user_id, period_start, period_end) if not alerts: return None # Nach Wichtigkeit gruppieren sections = self._group_by_importance(alerts) # Digest-Content erstellen content = DigestContent( user_id=user_id, period_start=period_start, period_end=period_end, sections=sections, total_alerts=len(alerts), critical_count=len([a for a in alerts if a.importance_level == ImportanceLevelEnum.KRITISCH]), urgent_count=len([a for a in alerts if a.importance_level == ImportanceLevelEnum.DRINGEND]) ) # LLM-Zusammenfassungen generieren await self._generate_summaries(content) # HTML generieren content.html = self._generate_html(content) # In DB speichern digest = self._save_digest(content) return digest def _load_alerts_for_period( self, user_id: str, start: datetime, end: datetime ) -> List[AlertItemDB]: """Lade alle Alerts fuer einen Zeitraum.""" return self.db.query(AlertItemDB).filter( AlertItemDB.user_id == user_id, AlertItemDB.fetched_at >= start, AlertItemDB.fetched_at < end, AlertItemDB.status != "dropped" ).order_by(AlertItemDB.fetched_at.desc()).all() def _group_by_importance( self, alerts: List[AlertItemDB] ) -> List[DigestSection]: """Gruppiere Alerts nach Wichtigkeit.""" importance_config = [ (ImportanceLevelEnum.KRITISCH, "Kritisch", "#dc2626"), (ImportanceLevelEnum.DRINGEND, "Dringend", "#ea580c"), (ImportanceLevelEnum.WICHTIG, "Wichtig", "#d97706"), (ImportanceLevelEnum.PRUEFEN, "Zu pruefen", "#2563eb"), (ImportanceLevelEnum.INFO, "Info", "#64748b"), ] sections = [] for level, label, color in importance_config: items = [a for a in alerts if a.importance_level == level] if items: sections.append(DigestSection( importance_level=level, label_de=label, color=color, items=items[:5] # Max 5 pro Kategorie )) return sections async def _generate_summaries(self, content: DigestContent): """Generiere LLM-basierte Zusammenfassungen.""" # Einleitung generieren content.introduction = await self._generate_introduction(content) # Zusammenfassungen pro Sektion for section in content.sections: section.summary = await self._generate_section_summary(section) async def _generate_introduction(self, content: DigestContent) -> str: """Generiere eine einleitende Zusammenfassung.""" prompt = f"""Du bist ein Assistent fuer Schulleitungen und Lehrkraefte in Deutschland. Schreibe eine kurze Einleitung (2-3 Saetze) fuer einen Wochenbericht. Zeitraum: {content.period_start.strftime('%d.%m.%Y')} - {content.period_end.strftime('%d.%m.%Y')} Gesamt: {content.total_alerts} Meldungen Kritisch: {content.critical_count} Dringend: {content.urgent_count} Schreibe auf Deutsch in einfacher Sprache (B1/B2 Niveau). Beginne mit "Diese Woche..." oder "In der vergangenen Woche...".""" return await self._call_llm(prompt, max_tokens=150) async def _generate_section_summary(self, section: DigestSection) -> str: """Generiere Zusammenfassung fuer eine Sektion.""" if not section.items: return "" titles = "\n".join([f"- {item.title}" for item in section.items[:5]]) prompt = f"""Fasse diese {len(section.items)} Meldungen der Kategorie "{section.label_de}" in 1-2 Saetzen zusammen: {titles} Schreibe auf Deutsch in einfacher Sprache. Nenne die wichtigsten Handlungsbedarfe.""" return await self._call_llm(prompt, max_tokens=100) async def _call_llm(self, prompt: str, max_tokens: int = 200) -> str: """Rufe das LLM auf.""" try: if self.llm_provider == "ollama": return await self._call_ollama(prompt, max_tokens) elif self.llm_provider == "openai": return await self._call_openai(prompt, max_tokens) elif self.llm_provider == "anthropic": return await self._call_anthropic(prompt, max_tokens) else: return self._generate_fallback_summary(prompt) except Exception as e: print(f"LLM call failed: {e}") return self._generate_fallback_summary(prompt) async def _call_ollama(self, prompt: str, max_tokens: int) -> str: """Rufe lokales Ollama-Modell auf.""" import httpx try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "http://localhost:11434/api/generate", json={ "model": self.llm_model, "prompt": prompt, "stream": False, "options": { "num_predict": max_tokens, "temperature": 0.7 } } ) if response.status_code == 200: data = response.json() return data.get("response", "").strip() except Exception as e: print(f"Ollama error: {e}") return self._generate_fallback_summary(prompt) async def _call_openai(self, prompt: str, max_tokens: int) -> str: """Rufe OpenAI API auf.""" import httpx api_key = os.getenv("OPENAI_API_KEY") if not api_key: return self._generate_fallback_summary(prompt) try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json={ "model": self.llm_model or "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": 0.7 } ) if response.status_code == 200: data = response.json() return data["choices"][0]["message"]["content"].strip() except Exception as e: print(f"OpenAI error: {e}") return self._generate_fallback_summary(prompt) async def _call_anthropic(self, prompt: str, max_tokens: int) -> str: """Rufe Anthropic API auf.""" import httpx api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: return self._generate_fallback_summary(prompt) try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" }, json={ "model": self.llm_model or "claude-3-5-sonnet-latest", "max_tokens": max_tokens, "messages": [{"role": "user", "content": prompt}] } ) if response.status_code == 200: data = response.json() return data["content"][0]["text"].strip() except Exception as e: print(f"Anthropic error: {e}") return self._generate_fallback_summary(prompt) def _generate_fallback_summary(self, prompt: str) -> str: """Fallback ohne LLM.""" if "Einleitung" in prompt or "Wochenbericht" in prompt: return "Diese Woche haben Sie neue relevante Meldungen erhalten. Hier ist Ihre Zusammenfassung." return "Mehrere relevante Meldungen zu diesem Thema." def _generate_html(self, content: DigestContent) -> str: """Generiere HTML fuer den Digest.""" sections_html = "" for section in content.sections: items_html = "" for item in section.items: items_html += f"""
{item.title}
{item.source_name or 'Unbekannt'}
Oeffnen """ sections_html += f"""

{section.label_de}

({len(section.items)} Meldungen)
{f'

{section.summary}

' if section.summary else ''} {items_html}
""" return f""" Wochenbericht - BreakPilot Alerts

Wochenbericht

{content.period_start.strftime('%d.%m.%Y')} - {content.period_end.strftime('%d.%m.%Y')}

{content.total_alerts}
Gesamt
{content.critical_count}
Kritisch
{content.urgent_count}
Dringend
{f'

{content.introduction}

' if content.introduction else ''} {sections_html}

Dieser Bericht wurde automatisch von BreakPilot Alerts erstellt.

Einstellungen anpassen | Abmelden

""" def _save_digest(self, content: DigestContent) -> AlertDigestDB: """Speichere Digest in der Datenbank.""" # Finde Subscription fuer User subscription = self.db.query(UserAlertSubscriptionDB).filter( UserAlertSubscriptionDB.user_id == content.user_id, UserAlertSubscriptionDB.is_active == True ).first() digest = AlertDigestDB( id=str(uuid.uuid4()), subscription_id=subscription.id if subscription else None, user_id=content.user_id, period_start=content.period_start, period_end=content.period_end, summary_html=content.html, total_alerts=content.total_alerts, critical_count=content.critical_count, urgent_count=content.urgent_count, important_count=sum(len(s.items) for s in content.sections if s.importance_level == ImportanceLevelEnum.WICHTIG), review_count=sum(len(s.items) for s in content.sections if s.importance_level == ImportanceLevelEnum.PRUEFEN), info_count=sum(len(s.items) for s in content.sections if s.importance_level == ImportanceLevelEnum.INFO), status=DigestStatusEnum.PENDING ) self.db.add(digest) self.db.commit() self.db.refresh(digest) return digest async def generate_digest_for_all_users(db_session) -> int: """ Generiere Digests fuer alle aktiven Subscriptions. Wird vom Scheduler (z.B. Celery, APScheduler) aufgerufen. Returns: Anzahl generierter Digests """ # Finde alle aktiven Subscriptions mit Digest aktiviert subscriptions = db_session.query(UserAlertSubscriptionDB).filter( UserAlertSubscriptionDB.is_active == True, UserAlertSubscriptionDB.digest_enabled == True ).all() generator = DigestGenerator(db_session) count = 0 for sub in subscriptions: try: digest = await generator.generate_weekly_digest(sub.user_id) if digest: count += 1 except Exception as e: print(f"Error generating digest for user {sub.user_id}: {e}") return count