"""
RSS fetcher for Google Alerts.

Reads Google Alerts RSS feeds and converts their entries into AlertItems.

Google Alerts RSS feed format:
- Feed URL: https://google.com/alerts/feeds/<user_id>/<feed_id>
- Entries contain: title, link, published, content
"""

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from html import unescape
import re

import httpx

try:
    import feedparser
    FEEDPARSER_AVAILABLE = True
except ImportError:
    FEEDPARSER_AVAILABLE = False

from ..models.alert_item import AlertItem, AlertSource, AlertStatus

logger = logging.getLogger(__name__)


@dataclass
class FeedConfig:
    """Configuration for a single RSS feed."""

    url: str
    topic_label: str  # e.g. "Inklusion Bayern"
    enabled: bool = True
    fetch_interval_minutes: int = 60
    last_fetched: Optional[datetime] = None
    last_entry_id: Optional[str] = None  # Intended for duplicate detection


@dataclass
class FetchResult:
    """Outcome of fetching one feed."""

    feed_url: str
    success: bool
    items: list = field(default_factory=list)  # List[AlertItem]
    error: Optional[str] = None
    # Naive UTC timestamp, consistent with FeedConfig.last_fetched.
    fetched_at: datetime = field(default_factory=datetime.utcnow)
    new_items_count: int = 0
    skipped_count: int = 0  # Items that were already known


class RSSFetcher:
    """
    Fetcher for Google Alerts RSS feeds.

    Usage:
        fetcher = RSSFetcher()
        fetcher.add_feed("https://google.com/alerts/feeds/...", "Inklusion")
        results = await fetcher.fetch_all()
    """

    def __init__(self, timeout: int = 30, user_agent: str = "BreakPilot-AlertAgent/0.1"):
        """
        Initialise the fetcher.

        Args:
            timeout: HTTP timeout in seconds.
            user_agent: Value sent in the User-Agent header.

        Raises:
            ImportError: If the optional ``feedparser`` package is missing.
        """
        if not FEEDPARSER_AVAILABLE:
            raise ImportError(
                "feedparser ist nicht installiert. "
                "Installiere mit: pip install feedparser"
            )

        self.feeds: list[FeedConfig] = []
        self.timeout = timeout
        self.user_agent = user_agent
        # Created lazily by _get_client() so that constructing a fetcher
        # does not open any connection.
        self._client: Optional[httpx.AsyncClient] = None

    def add_feed(self, url: str, topic_label: str, **kwargs) -> None:
        """
        Register a feed.

        Args:
            url: Feed URL.
            topic_label: Label attached to every item from this feed.
            **kwargs: Forwarded to ``FeedConfig`` (e.g. ``enabled``).
        """
        config = FeedConfig(url=url, topic_label=topic_label, **kwargs)
        self.feeds.append(config)
        logger.info(f"Feed hinzugefügt: {topic_label} ({url[:50]}...)")

    def remove_feed(self, url: str) -> bool:
        """
        Remove every feed registered under ``url``.

        Returns:
            True if at least one feed was removed, False otherwise.
        """
        before = len(self.feeds)
        self.feeds = [f for f in self.feeds if f.url != url]
        return len(self.feeds) < before

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if absent or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=self.timeout,
                headers={"User-Agent": self.user_agent},
                follow_redirects=True,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client if one is held; safe to call repeatedly."""
        if self._client:
            await self._client.aclose()
            self._client = None

    async def fetch_feed(
        self,
        config: FeedConfig,
        known_entry_ids: Optional[set] = None,
    ) -> FetchResult:
        """
        Fetch a single feed.

        Errors are not raised; they are recorded in ``FetchResult.error``
        with ``success=False``.

        Args:
            config: Feed configuration. ``last_fetched`` is updated on
                every successful fetch.
            known_entry_ids: Optional collection of already-known
                identifiers. An entry is skipped when either its entry ID
                or its link is contained in it, so callers may pass entry
                IDs, stored article URLs, or a mix of both.

        Returns:
            FetchResult holding the new AlertItems.
        """
        result = FetchResult(feed_url=config.url, success=False)
        known_ids = known_entry_ids or set()

        try:
            client = await self._get_client()
            response = await client.get(config.url)
            response.raise_for_status()

            feed = feedparser.parse(response.text)

            if feed.bozo and feed.bozo_exception:
                # The feed had parsing problems but may still be usable.
                logger.warning(f"Feed {config.topic_label}: Parsing-Warnung: {feed.bozo_exception}")

            if not feed.entries:
                logger.info(f"Feed {config.topic_label}: Keine Einträge")
                result.success = True
                # An empty feed is still a successful fetch.
                config.last_fetched = datetime.utcnow()
                return result

            items = []
            for entry in feed.entries:
                entry_id = entry.get("id") or entry.get("link") or entry.get("title")
                entry_link = entry.get("link")

                # Match on the link as well as the ID: callers such as
                # fetch_and_store_feed pass stored URLs, which never equal
                # the feed's own entry IDs.
                if entry_id in known_ids or (entry_link and entry_link in known_ids):
                    result.skipped_count += 1
                    continue

                alert = self._entry_to_alert(entry, config)
                if alert:
                    items.append(alert)
                    result.new_items_count += 1

            result.items = items
            result.success = True
            config.last_fetched = datetime.utcnow()

            logger.info(
                f"Feed {config.topic_label}: {result.new_items_count} neue, "
                f"{result.skipped_count} übersprungen"
            )

        except httpx.HTTPStatusError as e:
            result.error = f"HTTP {e.response.status_code}: {e.response.reason_phrase}"
            logger.error(f"Feed {config.topic_label}: {result.error}")

        except httpx.RequestError as e:
            result.error = f"Request failed: {str(e)}"
            logger.error(f"Feed {config.topic_label}: {result.error}")

        except Exception as e:
            # Boundary handler: one broken feed must not abort the others.
            result.error = f"Unexpected error: {str(e)}"
            logger.exception(f"Feed {config.topic_label}: Unerwarteter Fehler")

        return result

    def _entry_to_alert(self, entry: dict, config: FeedConfig) -> Optional[AlertItem]:
        """
        Convert a feedparser entry into an AlertItem.

        Fields read from the entry:
        - title: title, possibly containing HTML entities
        - link: URL of the article
        - published_parsed: publication date as struct_time
        - content[0].value: HTML content with the snippet
          (falls back to ``summary``)

        Returns:
            The AlertItem, or None if the entry has no link or the
            conversion fails.
        """
        try:
            title = unescape(entry.get("title", ""))
            title = self._clean_html(title)

            url = entry.get("link", "")
            if not url:
                return None

            snippet = ""
            if "content" in entry and entry["content"]:
                content_html = entry["content"][0].get("value", "")
                snippet = self._clean_html(content_html)
            elif "summary" in entry:
                snippet = self._clean_html(entry["summary"])

            published_at = None
            if "published_parsed" in entry and entry["published_parsed"]:
                try:
                    # First six struct_time fields: year .. second.
                    published_at = datetime(*entry["published_parsed"][:6])
                except (TypeError, ValueError):
                    # An unparseable date is not fatal; keep the item undated.
                    pass

            alert = AlertItem(
                source=AlertSource.GOOGLE_ALERTS_RSS,
                topic_label=config.topic_label,
                feed_url=config.url,
                title=title,
                url=url,
                snippet=snippet[:2000],  # Limit snippet length
                published_at=published_at,
                status=AlertStatus.NEW,
            )

            return alert

        except Exception as e:
            # Best effort: a single malformed entry is logged and dropped.
            logger.warning(f"Entry konnte nicht konvertiert werden: {e}")
            return None

    def _clean_html(self, html: str) -> str:
        """Decode HTML entities, strip tags and collapse whitespace."""
        if not html:
            return ""

        text = unescape(html)
        # Replace tags with a space so adjacent words do not run together.
        text = re.sub(r"<[^>]+>", " ", text)
        text = re.sub(r"\s+", " ", text)

        return text.strip()

    async def fetch_all(
        self,
        known_entry_ids: Optional[set] = None,
        parallel: bool = True,
    ) -> list[FetchResult]:
        """
        Fetch every enabled feed.

        Args:
            known_entry_ids: Known identifiers, shared across all feeds.
            parallel: If True, fetch the feeds concurrently; otherwise
                one after another.

        Returns:
            One FetchResult per enabled feed, in feed order. Empty if no
            feed is enabled.
        """
        active_feeds = [f for f in self.feeds if f.enabled]

        if not active_feeds:
            logger.warning("Keine aktiven Feeds konfiguriert")
            return []

        logger.info(f"Fetche {len(active_feeds)} Feeds...")

        if parallel:
            tasks = [
                self.fetch_feed(config, known_entry_ids)
                for config in active_feeds
            ]
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)

            # gather() preserves input order, so each outcome can be paired
            # with its feed; stray exceptions become failed FetchResults.
            processed = []
            for config, outcome in zip(active_feeds, outcomes):
                if isinstance(outcome, Exception):
                    processed.append(FetchResult(
                        feed_url=config.url,
                        success=False,
                        error=str(outcome),
                    ))
                else:
                    processed.append(outcome)
            return processed

        results = []
        for config in active_feeds:
            results.append(await self.fetch_feed(config, known_entry_ids))
        return results

    def get_all_items(self, results: list[FetchResult]) -> list[AlertItem]:
        """Collect the AlertItems of all successful FetchResults."""
        items = []
        for result in results:
            if result.success:
                items.extend(result.items)
        return items

    def get_stats(self, results: list[FetchResult]) -> dict:
        """
        Summarise a list of FetchResults.

        Returns:
            dict with the keys ``feeds_total``, ``feeds_successful``,
            ``feeds_failed``, ``items_new``, ``items_skipped`` and
            ``errors`` (the non-empty error messages).
        """
        total_new = sum(r.new_items_count for r in results)
        total_skipped = sum(r.skipped_count for r in results)
        successful = sum(1 for r in results if r.success)
        failed = sum(1 for r in results if not r.success)

        return {
            "feeds_total": len(results),
            "feeds_successful": successful,
            "feeds_failed": failed,
            "items_new": total_new,
            "items_skipped": total_skipped,
            "errors": [r.error for r in results if r.error],
        }


async def fetch_and_store_feed(
    topic_id: str,
    feed_url: str,
    db,
) -> dict:
    """
    Convenience function to fetch a single feed and store results.

    This is the function used by the API to trigger manual fetches.

    Args:
        topic_id: The topic ID to associate with fetched items
        feed_url: The RSS feed URL to fetch
        db: Database session for storing results

    Returns:
        dict with new_items and duplicates_skipped counts

    Raises:
        ImportError: If ``feedparser`` is not installed.
        Exception: If the feed could not be fetched.
    """
    # Imported lazily, as in the original (presumably to avoid an import
    # cycle with the db package -- TODO confirm).
    from ..db.repository import AlertItemRepository, TopicRepository

    if not FEEDPARSER_AVAILABLE:
        raise ImportError("feedparser ist nicht installiert")

    fetcher = RSSFetcher()
    fetcher.add_feed(feed_url, topic_label=topic_id)

    # URLs already stored for this topic; fetch_feed skips entries whose
    # ID or link is among them.
    alert_repo = AlertItemRepository(db)
    existing_urls = alert_repo.get_existing_urls(topic_id)

    try:
        results = await fetcher.fetch_all(known_entry_ids=existing_urls)
    finally:
        # Release the HTTP client even if the fetch is cancelled or raises.
        await fetcher.close()

    if not results:
        return {"new_items": 0, "duplicates_skipped": 0}

    result = results[0]

    if not result.success:
        raise Exception(result.error or "Feed fetch failed")

    new_count = 0
    for item in result.items:
        alert_repo.create_from_alert_item(item, topic_id)
        new_count += 1

    topic_repo = TopicRepository(db)
    topic_repo.update_fetch_status(
        topic_id,
        last_fetch_error=None,
        items_fetched=new_count,
    )

    return {
        "new_items": new_count,
        "duplicates_skipped": result.skipped_count,
    }