A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
357 lines · 10 KiB · Python
"""
|
|
Email Parser für Google Alerts.
|
|
|
|
Parst Google Alert E-Mails und extrahiert Alert-Items.
|
|
|
|
Google Alert E-Mail Format:
|
|
- Subject: Google Alert - <Suchbegriff>
|
|
- Body enthält HTML mit Links zu Artikeln
|
|
- Jeder Artikel hat: Titel, URL, Snippet, Quelle
|
|
"""
|
|
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from email import message_from_bytes, message_from_string
from email.message import EmailMessage
from email.utils import parsedate_to_datetime
from html import unescape
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse, parse_qs, unquote

from bs4 import BeautifulSoup
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ParsedAlertEmail:
    """Result of parsing a single Google Alert email."""

    # Search term the alert was created for (taken from the Subject line).
    search_term: str
    # Extracted alert items; each dict has keys: title, url, snippet, source.
    items: List[Dict[str, Any]]
    # Receive timestamp (parse_email_message falls back to UTC "now").
    received_at: datetime
    # RFC 5322 Message-ID header, if present.
    message_id: Optional[str] = None
|
|
|
|
|
|
def extract_real_url(google_redirect_url: str) -> str:
    """
    Resolve a Google redirect link to its actual target URL.

    Google Alert links have the form:
        https://www.google.com/url?rct=j&sa=t&url=<ENCODED_URL>&...

    Args:
        google_redirect_url: A (possibly) Google-redirect URL.

    Returns:
        The decoded target URL, or the input unchanged when it is not a
        Google redirect link or carries no ``url`` query parameter.
    """
    # Anything that is not a redirect link passes through untouched.
    if "google.com/url" not in google_redirect_url:
        return google_redirect_url

    query = parse_qs(urlparse(google_redirect_url).query)
    target = query.get("url")
    if target:
        return unquote(target[0])

    return google_redirect_url
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize text: decode HTML entities and collapse runs of whitespace."""
    if not text:
        return ""
    # Decode entities first, then squash all whitespace to single spaces.
    collapsed = re.sub(r'\s+', ' ', unescape(text))
    return collapsed.strip()
|
|
|
|
|
|
def parse_google_alert_html(html_content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML body of a Google Alert email.

    Handles both known layouts: the older table-based format and the newer
    div-based format (tried only when the table scan finds nothing).
    Each target URL is emitted at most once.

    Args:
        html_content: HTML content of the email body.

    Returns:
        List of alert items, each a dict with keys
        ``title``, ``url``, ``snippet`` and ``source`` (target domain).
    """
    items: List[Dict[str, Any]] = []
    # BUGFIX: soup.find_all('table') also returns nested tables, so the same
    # link was previously visited (and appended) multiple times. Track URLs
    # already accepted and skip repeats in both formats.
    seen_urls = set()

    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Format 1: table-based (older format).
        for table in soup.find_all('table'):
            for link in table.find_all('a', href=True):
                href = link.get('href', '')

                # Only Google redirect links are real alert items.
                if 'google.com/url' not in href:
                    continue

                real_url = extract_real_url(href)
                if real_url in seen_urls:
                    continue

                # The link text is the article title.
                title = clean_text(link.get_text())
                if not title or len(title) < 5:
                    continue

                # Snippet: remaining text in the same cell, after the title.
                parent = link.find_parent('td') or link.find_parent('div')
                snippet = ""
                if parent:
                    full_text = clean_text(parent.get_text())
                    if title in full_text:
                        # Keep the first 300 characters as the snippet.
                        snippet = full_text.replace(title, '').strip()[:300]

                seen_urls.add(real_url)
                items.append({
                    "title": title,
                    "url": real_url,
                    "snippet": snippet,
                    "source": urlparse(real_url).netloc,
                })

        # Format 2: div-based (newer format); only when format 1 found nothing.
        if not items:
            for div in soup.find_all('div', class_=re.compile(r'.*')):
                for link in div.find_all('a', href=True):
                    href = link.get('href', '')

                    if 'google.com/url' not in href:
                        continue

                    real_url = extract_real_url(href)
                    title = clean_text(link.get_text())

                    if not title or len(title) < 5:
                        continue

                    # Avoid duplicates.
                    if real_url in seen_urls:
                        continue

                    seen_urls.add(real_url)
                    items.append({
                        "title": title,
                        "url": real_url,
                        "snippet": "",
                        "source": urlparse(real_url).netloc,
                    })

    except Exception as e:
        # Best effort: a malformed email yields whatever was parsed so far.
        logger.error("Error parsing Google Alert HTML: %s", e)

    return items
|
|
|
|
|
|
def parse_email_message(
    email_bytes: Optional[bytes] = None,
    email_string: Optional[str] = None,
) -> Optional[ParsedAlertEmail]:
    """
    Parse an email message in the Google Alert format.

    Exactly one of ``email_bytes`` / ``email_string`` should be given;
    ``email_bytes`` wins when both are set.

    Args:
        email_bytes: Raw email as bytes.
        email_string: Email as a string.

    Returns:
        ParsedAlertEmail, or None when no input was given, the message is
        not a Google Alert, it has no HTML part, or parsing failed.
    """
    try:
        if email_bytes:
            msg = message_from_bytes(email_bytes)
        elif email_string:
            msg = message_from_string(email_string)
        else:
            return None

        # Only accept Google Alert emails.
        subject = msg.get('Subject', '')
        if 'Google Alert' not in subject:
            logger.debug(f"Not a Google Alert email: {subject}")
            return None

        # Search term from the subject: "Google Alert - <search term>"
        search_term = ""
        if ' - ' in subject:
            search_term = subject.split(' - ', 1)[1].strip()

        message_id = msg.get('Message-ID', '')

        # Receive timestamp: prefer the Date header; fall back to "now".
        # BUGFIX: the Date header was previously read but never parsed.
        # NOTE(review): parsedate_to_datetime returns a timezone-aware value
        # when the header carries an offset, while the fallback is naive UTC.
        received_at = datetime.utcnow()
        date_str = msg.get('Date', '')
        if date_str:
            try:
                received_at = parsedate_to_datetime(date_str)
            except (TypeError, ValueError):
                pass  # malformed Date header -> keep the fallback

        # Extract the HTML body (first text/html part wins).
        html_content = _extract_html_body(msg)

        if not html_content:
            logger.warning(f"No HTML content in Google Alert email: {subject}")
            return None

        items = parse_google_alert_html(html_content)

        return ParsedAlertEmail(
            search_term=search_term,
            items=items,
            received_at=received_at,
            message_id=message_id,
        )

    except Exception as e:
        logger.error(f"Error parsing email message: {e}")
        return None


def _extract_html_body(msg) -> str:
    """Return the decoded text/html body of *msg*, or "" if none exists."""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == 'text/html':
                payload = part.get_payload(decode=True)
                if payload:
                    charset = part.get_content_charset() or 'utf-8'
                    return payload.decode(charset, errors='replace')
        return ""
    if msg.get_content_type() == 'text/html':
        payload = msg.get_payload(decode=True)
        if payload:
            charset = msg.get_content_charset() or 'utf-8'
            return payload.decode(charset, errors='replace')
    return ""
|
|
|
|
|
|
async def process_alert_emails(
    emails: List[bytes],
    topic_id: str,
    db,
) -> Dict[str, int]:
    """
    Process a batch of Google Alert emails and persist their items to the DB.

    Args:
        emails: Raw emails as bytes.
        topic_id: ID of the topic these alerts belong to.
        db: SQLAlchemy session.

    Returns:
        Dict with ``new_items`` and ``duplicates_skipped`` counts.
    """
    from alerts_agent.db.repository import AlertItemRepository
    from alerts_agent.db.models import AlertSourceEnum

    repo = AlertItemRepository(db)
    created = 0
    skipped = 0

    for raw in emails:
        parsed = parse_email_message(email_bytes=raw)
        if parsed is None:
            continue

        for entry in parsed.items:
            stored = repo.create_if_not_exists(
                topic_id=topic_id,
                title=entry["title"],
                url=entry["url"],
                snippet=entry.get("snippet", ""),
                source=AlertSourceEnum.GOOGLE_ALERTS_EMAIL,
            )
            if stored:
                created += 1
            else:
                skipped += 1

    return {
        "new_items": created,
        "duplicates_skipped": skipped,
    }
|
|
|
|
|
|
# IMAP integration for automatic email fetching
async def fetch_emails_from_imap(
    host: str,
    username: str,
    password: str,
    folder: str = "INBOX",
    search_criteria: str = 'FROM "googlealerts-noreply@google.com" UNSEEN',
    limit: int = 100,
) -> List[bytes]:
    """
    Fetch emails from an IMAP server.

    Args:
        host: IMAP server hostname.
        username: IMAP user name.
        password: IMAP password.
        folder: IMAP folder (default: INBOX).
        search_criteria: IMAP search criteria (default: unseen Google Alert mail).
        limit: Maximum number of emails to fetch.

    Returns:
        List of raw emails as bytes (RFC822 payloads); empty list when the
        dependency is missing or the search fails.
    """
    try:
        import aioimaplib
    except ImportError:
        logger.error("aioimaplib not installed. Run: pip install aioimaplib")
        return []

    emails = []

    try:
        # Open an SSL IMAP connection and wait for the server greeting.
        client = aioimaplib.IMAP4_SSL(host)
        await client.wait_hello_from_server()

        # Authenticate.
        await client.login(username, password)

        # Select the mailbox folder.
        await client.select(folder)

        # Search for matching messages.
        result, data = await client.search(search_criteria)

        if result != 'OK':
            logger.error(f"IMAP search failed: {result}")
            return []

        # Extract message ids; keep only the most recent `limit` messages.
        message_ids = data[0].split()[-limit:]

        # Fetch each message body.
        for msg_id in message_ids:
            result, data = await client.fetch(msg_id, '(RFC822)')
            if result == 'OK' and data:
                # data is a list of tuples; the payload is the second element.
                for item in data:
                    if isinstance(item, tuple) and len(item) >= 2:
                        emails.append(item[1])

        # Close the session cleanly.
        await client.logout()

    except Exception as e:
        # NOTE(review): errors are swallowed deliberately (best-effort fetch);
        # partial results gathered before the failure are still returned.
        logger.error(f"IMAP fetch error: {e}")

    return emails
|