Files
breakpilot-lehrer/backend-lehrer/alerts_agent/db/repository.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

993 lines
31 KiB
Python

"""
Repository für Alerts Agent - CRUD Operationen für Topics, Items, Rules und Profile.
Abstraktion der Datenbank-Operationen.
"""
import hashlib
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session as DBSession
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy import or_, and_, func
from .models import (
AlertTopicDB, AlertItemDB, AlertRuleDB, AlertProfileDB,
AlertSourceEnum, AlertStatusEnum, RelevanceDecisionEnum,
FeedTypeEnum, RuleActionEnum
)
# =============================================================================
# TOPIC REPOSITORY
# =============================================================================
class TopicRepository:
"""Repository für Alert Topics (Feed-Quellen)."""
def __init__(self, db: DBSession):
self.db = db
# ==================== CREATE ====================
def create(
self,
name: str,
feed_url: str = None,
feed_type: str = "rss",
user_id: str = None,
description: str = "",
fetch_interval_minutes: int = 60,
is_active: bool = True,
) -> AlertTopicDB:
"""Erstellt ein neues Topic."""
import uuid
topic = AlertTopicDB(
id=str(uuid.uuid4()),
user_id=user_id,
name=name,
description=description,
feed_url=feed_url,
feed_type=FeedTypeEnum(feed_type),
fetch_interval_minutes=fetch_interval_minutes,
is_active=is_active,
)
self.db.add(topic)
self.db.commit()
self.db.refresh(topic)
return topic
# ==================== READ ====================
def get_by_id(self, topic_id: str) -> Optional[AlertTopicDB]:
"""Holt ein Topic nach ID."""
return self.db.query(AlertTopicDB).filter(
AlertTopicDB.id == topic_id
).first()
def get_all(
self,
user_id: str = None,
is_active: bool = None,
limit: int = 100,
offset: int = 0,
) -> List[AlertTopicDB]:
"""Holt alle Topics mit optionalen Filtern."""
query = self.db.query(AlertTopicDB)
if user_id:
query = query.filter(AlertTopicDB.user_id == user_id)
if is_active is not None:
query = query.filter(AlertTopicDB.is_active == is_active)
return query.order_by(
AlertTopicDB.created_at.desc()
).offset(offset).limit(limit).all()
def get_active_for_fetch(self) -> List[AlertTopicDB]:
"""Holt alle aktiven Topics die gefetcht werden sollten."""
# Topics wo fetch_interval_minutes vergangen ist
return self.db.query(AlertTopicDB).filter(
AlertTopicDB.is_active == True,
AlertTopicDB.feed_url.isnot(None),
).all()
# ==================== UPDATE ====================
def update(
self,
topic_id: str,
name: str = None,
description: str = None,
feed_url: str = None,
feed_type: str = None,
is_active: bool = None,
fetch_interval_minutes: int = None,
) -> Optional[AlertTopicDB]:
"""Aktualisiert ein Topic."""
topic = self.get_by_id(topic_id)
if not topic:
return None
if name is not None:
topic.name = name
if description is not None:
topic.description = description
if feed_url is not None:
topic.feed_url = feed_url
if feed_type is not None:
topic.feed_type = FeedTypeEnum(feed_type)
if is_active is not None:
topic.is_active = is_active
if fetch_interval_minutes is not None:
topic.fetch_interval_minutes = fetch_interval_minutes
self.db.commit()
self.db.refresh(topic)
return topic
def update_fetch_status(
self,
topic_id: str,
last_fetch_error: str = None,
items_fetched: int = 0,
) -> Optional[AlertTopicDB]:
"""Aktualisiert den Fetch-Status eines Topics."""
topic = self.get_by_id(topic_id)
if not topic:
return None
topic.last_fetched_at = datetime.utcnow()
topic.last_fetch_error = last_fetch_error
topic.total_items_fetched += items_fetched
self.db.commit()
self.db.refresh(topic)
return topic
def increment_stats(
self,
topic_id: str,
kept: int = 0,
dropped: int = 0,
) -> Optional[AlertTopicDB]:
"""Erhöht die Statistiken eines Topics."""
topic = self.get_by_id(topic_id)
if not topic:
return None
topic.items_kept += kept
topic.items_dropped += dropped
self.db.commit()
self.db.refresh(topic)
return topic
# ==================== DELETE ====================
def delete(self, topic_id: str) -> bool:
"""Löscht ein Topic (und alle zugehörigen Items via CASCADE)."""
topic = self.get_by_id(topic_id)
if not topic:
return False
self.db.delete(topic)
self.db.commit()
return True
# ==================== CONVERSION ====================
def to_dict(self, topic: AlertTopicDB) -> Dict[str, Any]:
"""Konvertiert DB-Model zu Dictionary."""
return {
"id": topic.id,
"user_id": topic.user_id,
"name": topic.name,
"description": topic.description,
"feed_url": topic.feed_url,
"feed_type": topic.feed_type.value,
"is_active": topic.is_active,
"fetch_interval_minutes": topic.fetch_interval_minutes,
"last_fetched_at": topic.last_fetched_at.isoformat() if topic.last_fetched_at else None,
"last_fetch_error": topic.last_fetch_error,
"stats": {
"total_items_fetched": topic.total_items_fetched,
"items_kept": topic.items_kept,
"items_dropped": topic.items_dropped,
},
"created_at": topic.created_at.isoformat() if topic.created_at else None,
"updated_at": topic.updated_at.isoformat() if topic.updated_at else None,
}
# =============================================================================
# ALERT ITEM REPOSITORY
# =============================================================================
class AlertItemRepository:
"""Repository für Alert Items (einzelne Alerts/Artikel)."""
def __init__(self, db: DBSession):
self.db = db
# ==================== CREATE ====================
def create(
self,
topic_id: str,
title: str,
url: str,
snippet: str = "",
source: str = "google_alerts_rss",
published_at: datetime = None,
lang: str = "de",
) -> AlertItemDB:
"""Erstellt einen neuen Alert."""
import uuid
# URL-Hash berechnen
url_hash = self._compute_url_hash(url)
alert = AlertItemDB(
id=str(uuid.uuid4()),
topic_id=topic_id,
title=title,
url=url,
snippet=snippet,
source=AlertSourceEnum(source),
published_at=published_at,
lang=lang,
url_hash=url_hash,
canonical_url=self._normalize_url(url),
)
self.db.add(alert)
self.db.commit()
self.db.refresh(alert)
return alert
def create_if_not_exists(
self,
topic_id: str,
title: str,
url: str,
snippet: str = "",
source: str = "google_alerts_rss",
published_at: datetime = None,
) -> Optional[AlertItemDB]:
"""Erstellt einen Alert nur wenn URL noch nicht existiert."""
url_hash = self._compute_url_hash(url)
existing = self.db.query(AlertItemDB).filter(
AlertItemDB.url_hash == url_hash
).first()
if existing:
return None # Duplikat
return self.create(
topic_id=topic_id,
title=title,
url=url,
snippet=snippet,
source=source,
published_at=published_at,
)
# ==================== READ ====================
def get_by_id(self, alert_id: str) -> Optional[AlertItemDB]:
"""Holt einen Alert nach ID."""
return self.db.query(AlertItemDB).filter(
AlertItemDB.id == alert_id
).first()
def get_by_url_hash(self, url_hash: str) -> Optional[AlertItemDB]:
"""Holt einen Alert nach URL-Hash."""
return self.db.query(AlertItemDB).filter(
AlertItemDB.url_hash == url_hash
).first()
def get_inbox(
self,
user_id: str = None,
topic_id: str = None,
decision: str = None,
status: str = None,
limit: int = 50,
offset: int = 0,
) -> List[AlertItemDB]:
"""
Holt Inbox-Items mit Filtern.
Ohne decision werden KEEP und REVIEW angezeigt.
"""
query = self.db.query(AlertItemDB)
if topic_id:
query = query.filter(AlertItemDB.topic_id == topic_id)
if decision:
query = query.filter(
AlertItemDB.relevance_decision == RelevanceDecisionEnum(decision)
)
else:
# Default: KEEP und REVIEW
query = query.filter(
or_(
AlertItemDB.relevance_decision == RelevanceDecisionEnum.KEEP,
AlertItemDB.relevance_decision == RelevanceDecisionEnum.REVIEW,
AlertItemDB.relevance_decision.is_(None)
)
)
if status:
query = query.filter(AlertItemDB.status == AlertStatusEnum(status))
return query.order_by(
AlertItemDB.relevance_score.desc().nullslast(),
AlertItemDB.fetched_at.desc()
).offset(offset).limit(limit).all()
def get_unscored(
self,
topic_id: str = None,
limit: int = 100,
) -> List[AlertItemDB]:
"""Holt alle unbewerteten Alerts."""
query = self.db.query(AlertItemDB).filter(
AlertItemDB.status == AlertStatusEnum.NEW
)
if topic_id:
query = query.filter(AlertItemDB.topic_id == topic_id)
return query.order_by(AlertItemDB.fetched_at.desc()).limit(limit).all()
def get_by_topic(
self,
topic_id: str,
limit: int = 100,
offset: int = 0,
) -> List[AlertItemDB]:
"""Holt alle Alerts eines Topics."""
return self.db.query(AlertItemDB).filter(
AlertItemDB.topic_id == topic_id
).order_by(
AlertItemDB.fetched_at.desc()
).offset(offset).limit(limit).all()
def count_by_status(self, topic_id: str = None) -> Dict[str, int]:
"""Zählt Alerts nach Status."""
query = self.db.query(
AlertItemDB.status,
func.count(AlertItemDB.id).label('count')
)
if topic_id:
query = query.filter(AlertItemDB.topic_id == topic_id)
results = query.group_by(AlertItemDB.status).all()
return {r[0].value: r[1] for r in results}
def count_by_decision(self, topic_id: str = None) -> Dict[str, int]:
"""Zählt Alerts nach Relevanz-Entscheidung."""
query = self.db.query(
AlertItemDB.relevance_decision,
func.count(AlertItemDB.id).label('count')
)
if topic_id:
query = query.filter(AlertItemDB.topic_id == topic_id)
results = query.group_by(AlertItemDB.relevance_decision).all()
return {
(r[0].value if r[0] else "unscored"): r[1]
for r in results
}
# ==================== UPDATE ====================
def update_scoring(
self,
alert_id: str,
score: float,
decision: str,
reasons: List[str] = None,
summary: str = None,
model: str = None,
) -> Optional[AlertItemDB]:
"""Aktualisiert das Scoring eines Alerts."""
alert = self.get_by_id(alert_id)
if not alert:
return None
alert.relevance_score = score
alert.relevance_decision = RelevanceDecisionEnum(decision)
alert.relevance_reasons = reasons or []
alert.relevance_summary = summary
alert.scored_by_model = model
alert.scored_at = datetime.utcnow()
alert.status = AlertStatusEnum.SCORED
alert.processed_at = datetime.utcnow()
self.db.commit()
self.db.refresh(alert)
return alert
def update_status(
self,
alert_id: str,
status: str,
) -> Optional[AlertItemDB]:
"""Aktualisiert den Status eines Alerts."""
alert = self.get_by_id(alert_id)
if not alert:
return None
alert.status = AlertStatusEnum(status)
self.db.commit()
self.db.refresh(alert)
return alert
def mark_reviewed(
self,
alert_id: str,
is_relevant: bool,
notes: str = None,
tags: List[str] = None,
) -> Optional[AlertItemDB]:
"""Markiert einen Alert als reviewed mit Feedback."""
alert = self.get_by_id(alert_id)
if not alert:
return None
alert.status = AlertStatusEnum.REVIEWED
alert.user_marked_relevant = is_relevant
if notes:
alert.user_notes = notes
if tags:
alert.user_tags = tags
self.db.commit()
self.db.refresh(alert)
return alert
def archive(self, alert_id: str) -> Optional[AlertItemDB]:
"""Archiviert einen Alert."""
return self.update_status(alert_id, "archived")
# ==================== DELETE ====================
def delete(self, alert_id: str) -> bool:
"""Löscht einen Alert."""
alert = self.get_by_id(alert_id)
if not alert:
return False
self.db.delete(alert)
self.db.commit()
return True
def delete_old(self, days: int = 90, topic_id: str = None) -> int:
"""Löscht alte archivierte Alerts."""
from datetime import timedelta
cutoff = datetime.utcnow() - timedelta(days=days)
query = self.db.query(AlertItemDB).filter(
AlertItemDB.status == AlertStatusEnum.ARCHIVED,
AlertItemDB.fetched_at < cutoff,
)
if topic_id:
query = query.filter(AlertItemDB.topic_id == topic_id)
count = query.delete()
self.db.commit()
return count
# ==================== FOR RSS FETCHER ====================
def get_existing_urls(self, topic_id: str) -> set:
"""
Holt alle bekannten URL-Hashes für ein Topic.
Wird vom RSS-Fetcher verwendet um Duplikate zu vermeiden.
"""
results = self.db.query(AlertItemDB.url_hash).filter(
AlertItemDB.topic_id == topic_id
).all()
return {r[0] for r in results if r[0]}
def create_from_alert_item(self, alert_item, topic_id: str) -> AlertItemDB:
"""
Erstellt einen Alert aus einem AlertItem-Objekt vom RSS-Fetcher.
Args:
alert_item: AlertItem from rss_fetcher
topic_id: Topic ID to associate with
Returns:
Created AlertItemDB instance
"""
return self.create(
topic_id=topic_id,
title=alert_item.title,
url=alert_item.url,
snippet=alert_item.snippet or "",
source=alert_item.source.value if hasattr(alert_item.source, 'value') else str(alert_item.source),
published_at=alert_item.published_at,
)
# ==================== HELPER ====================
def _compute_url_hash(self, url: str) -> str:
"""Berechnet SHA256 Hash der normalisierten URL."""
normalized = self._normalize_url(url)
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def _normalize_url(self, url: str) -> str:
"""Normalisiert URL für Deduplizierung."""
import urllib.parse
parsed = urllib.parse.urlparse(url)
# Tracking-Parameter entfernen
tracking_params = {
"utm_source", "utm_medium", "utm_campaign", "utm_content", "utm_term",
"fbclid", "gclid", "ref", "source"
}
query_params = urllib.parse.parse_qs(parsed.query)
cleaned_params = {k: v for k, v in query_params.items()
if k.lower() not in tracking_params}
cleaned_query = urllib.parse.urlencode(cleaned_params, doseq=True)
# Rekonstruiere URL ohne Fragment
normalized = urllib.parse.urlunparse((
parsed.scheme,
parsed.netloc.lower(),
parsed.path.rstrip("/"),
parsed.params,
cleaned_query,
"" # No fragment
))
return normalized
# ==================== CONVERSION ====================
def to_dict(self, alert: AlertItemDB) -> Dict[str, Any]:
"""Konvertiert DB-Model zu Dictionary."""
return {
"id": alert.id,
"topic_id": alert.topic_id,
"title": alert.title,
"url": alert.url,
"snippet": alert.snippet,
"source": alert.source.value,
"lang": alert.lang,
"published_at": alert.published_at.isoformat() if alert.published_at else None,
"fetched_at": alert.fetched_at.isoformat() if alert.fetched_at else None,
"status": alert.status.value,
"relevance": {
"score": alert.relevance_score,
"decision": alert.relevance_decision.value if alert.relevance_decision else None,
"reasons": alert.relevance_reasons,
"summary": alert.relevance_summary,
"model": alert.scored_by_model,
"scored_at": alert.scored_at.isoformat() if alert.scored_at else None,
},
"user_feedback": {
"marked_relevant": alert.user_marked_relevant,
"tags": alert.user_tags,
"notes": alert.user_notes,
},
}
# =============================================================================
# ALERT RULE REPOSITORY
# =============================================================================
class RuleRepository:
"""Repository für Alert Rules (Filterregeln)."""
def __init__(self, db: DBSession):
self.db = db
# ==================== CREATE ====================
def create(
self,
name: str,
conditions: List[Dict],
action_type: str = "keep",
action_config: Dict = None,
topic_id: str = None,
user_id: str = None,
description: str = "",
priority: int = 0,
) -> AlertRuleDB:
"""Erstellt eine neue Regel."""
import uuid
rule = AlertRuleDB(
id=str(uuid.uuid4()),
topic_id=topic_id,
user_id=user_id,
name=name,
description=description,
conditions=conditions,
action_type=RuleActionEnum(action_type),
action_config=action_config or {},
priority=priority,
)
self.db.add(rule)
self.db.commit()
self.db.refresh(rule)
return rule
# ==================== READ ====================
def get_by_id(self, rule_id: str) -> Optional[AlertRuleDB]:
"""Holt eine Regel nach ID."""
return self.db.query(AlertRuleDB).filter(
AlertRuleDB.id == rule_id
).first()
def get_active(
self,
topic_id: str = None,
user_id: str = None,
) -> List[AlertRuleDB]:
"""Holt alle aktiven Regeln, sortiert nach Priorität."""
query = self.db.query(AlertRuleDB).filter(
AlertRuleDB.is_active == True
)
if topic_id:
# Topic-spezifische und globale Regeln
query = query.filter(
or_(
AlertRuleDB.topic_id == topic_id,
AlertRuleDB.topic_id.is_(None)
)
)
if user_id:
query = query.filter(
or_(
AlertRuleDB.user_id == user_id,
AlertRuleDB.user_id.is_(None)
)
)
return query.order_by(AlertRuleDB.priority.desc()).all()
def get_all(
self,
user_id: str = None,
topic_id: str = None,
is_active: bool = None,
) -> List[AlertRuleDB]:
"""Holt alle Regeln mit optionalen Filtern."""
query = self.db.query(AlertRuleDB)
if user_id:
query = query.filter(AlertRuleDB.user_id == user_id)
if topic_id:
query = query.filter(AlertRuleDB.topic_id == topic_id)
if is_active is not None:
query = query.filter(AlertRuleDB.is_active == is_active)
return query.order_by(AlertRuleDB.priority.desc()).all()
# ==================== UPDATE ====================
def update(
self,
rule_id: str,
name: str = None,
description: str = None,
conditions: List[Dict] = None,
action_type: str = None,
action_config: Dict = None,
priority: int = None,
is_active: bool = None,
) -> Optional[AlertRuleDB]:
"""Aktualisiert eine Regel."""
rule = self.get_by_id(rule_id)
if not rule:
return None
if name is not None:
rule.name = name
if description is not None:
rule.description = description
if conditions is not None:
rule.conditions = conditions
if action_type is not None:
rule.action_type = RuleActionEnum(action_type)
if action_config is not None:
rule.action_config = action_config
if priority is not None:
rule.priority = priority
if is_active is not None:
rule.is_active = is_active
self.db.commit()
self.db.refresh(rule)
return rule
def increment_match_count(self, rule_id: str) -> Optional[AlertRuleDB]:
"""Erhöht den Match-Counter einer Regel."""
rule = self.get_by_id(rule_id)
if not rule:
return None
rule.match_count += 1
rule.last_matched_at = datetime.utcnow()
self.db.commit()
self.db.refresh(rule)
return rule
# ==================== DELETE ====================
def delete(self, rule_id: str) -> bool:
"""Löscht eine Regel."""
rule = self.get_by_id(rule_id)
if not rule:
return False
self.db.delete(rule)
self.db.commit()
return True
# ==================== CONVERSION ====================
def to_dict(self, rule: AlertRuleDB) -> Dict[str, Any]:
"""Konvertiert DB-Model zu Dictionary."""
return {
"id": rule.id,
"topic_id": rule.topic_id,
"user_id": rule.user_id,
"name": rule.name,
"description": rule.description,
"conditions": rule.conditions,
"action_type": rule.action_type.value,
"action_config": rule.action_config,
"priority": rule.priority,
"is_active": rule.is_active,
"stats": {
"match_count": rule.match_count,
"last_matched_at": rule.last_matched_at.isoformat() if rule.last_matched_at else None,
},
"created_at": rule.created_at.isoformat() if rule.created_at else None,
"updated_at": rule.updated_at.isoformat() if rule.updated_at else None,
}
# =============================================================================
# ALERT PROFILE REPOSITORY
# =============================================================================
class ProfileRepository:
"""Repository für Alert Profiles (Nutzer-Profile für Relevanz-Scoring)."""
def __init__(self, db: DBSession):
self.db = db
# ==================== CREATE / GET-OR-CREATE ====================
def get_or_create(self, user_id: str = None) -> AlertProfileDB:
"""Holt oder erstellt ein Profil."""
profile = self.get_by_user_id(user_id)
if profile:
return profile
# Neues Profil erstellen
import uuid
profile = AlertProfileDB(
id=str(uuid.uuid4()),
user_id=user_id,
name="Default" if not user_id else f"Profile {user_id[:8]}",
)
self.db.add(profile)
self.db.commit()
self.db.refresh(profile)
return profile
def create_default_education_profile(self, user_id: str = None) -> AlertProfileDB:
"""Erstellt ein Standard-Profil für Bildungsthemen."""
import uuid
profile = AlertProfileDB(
id=str(uuid.uuid4()),
user_id=user_id,
name="Bildung Default",
priorities=[
{
"label": "Inklusion",
"weight": 0.9,
"keywords": ["inklusiv", "Förderbedarf", "Behinderung", "Barrierefreiheit"],
"description": "Inklusive Bildung, Förderschulen, Nachteilsausgleich"
},
{
"label": "Datenschutz Schule",
"weight": 0.85,
"keywords": ["DSGVO", "Schülerfotos", "Einwilligung", "personenbezogene Daten"],
"description": "DSGVO in Schulen, Datenschutz bei Klassenfotos"
},
{
"label": "Schulrecht Bayern",
"weight": 0.8,
"keywords": ["BayEUG", "Schulordnung", "Kultusministerium", "Bayern"],
"description": "Bayerisches Schulrecht, Verordnungen"
},
{
"label": "Digitalisierung Schule",
"weight": 0.7,
"keywords": ["DigitalPakt", "Tablet-Klasse", "Lernplattform"],
"description": "Digitale Medien im Unterricht"
},
],
exclusions=["Stellenanzeige", "Praktikum gesucht", "Werbung", "Pressemitteilung"],
policies={
"prefer_german_sources": True,
"max_age_days": 30,
"min_content_length": 100,
}
)
self.db.add(profile)
self.db.commit()
self.db.refresh(profile)
return profile
# ==================== READ ====================
def get_by_id(self, profile_id: str) -> Optional[AlertProfileDB]:
"""Holt ein Profil nach ID."""
return self.db.query(AlertProfileDB).filter(
AlertProfileDB.id == profile_id
).first()
def get_by_user_id(self, user_id: str) -> Optional[AlertProfileDB]:
"""Holt ein Profil nach User-ID."""
if not user_id:
# Default-Profil ohne User
return self.db.query(AlertProfileDB).filter(
AlertProfileDB.user_id.is_(None)
).first()
return self.db.query(AlertProfileDB).filter(
AlertProfileDB.user_id == user_id
).first()
# ==================== UPDATE ====================
def update_priorities(
self,
profile_id: str,
priorities: List[Dict],
) -> Optional[AlertProfileDB]:
"""Aktualisiert die Prioritäten eines Profils."""
profile = self.get_by_id(profile_id)
if not profile:
return None
profile.priorities = priorities
self.db.commit()
self.db.refresh(profile)
return profile
def update_exclusions(
self,
profile_id: str,
exclusions: List[str],
) -> Optional[AlertProfileDB]:
"""Aktualisiert die Ausschlüsse eines Profils."""
profile = self.get_by_id(profile_id)
if not profile:
return None
profile.exclusions = exclusions
self.db.commit()
self.db.refresh(profile)
return profile
def add_feedback(
self,
profile_id: str,
title: str,
url: str,
is_relevant: bool,
reason: str = "",
) -> Optional[AlertProfileDB]:
"""Fügt Feedback als Beispiel hinzu."""
profile = self.get_by_id(profile_id)
if not profile:
return None
example = {
"title": title,
"url": url,
"reason": reason,
"added_at": datetime.utcnow().isoformat(),
}
if is_relevant:
examples = list(profile.positive_examples or [])
examples.append(example)
profile.positive_examples = examples[-20:] # Max 20
profile.total_kept += 1
flag_modified(profile, "positive_examples")
else:
examples = list(profile.negative_examples or [])
examples.append(example)
profile.negative_examples = examples[-20:] # Max 20
profile.total_dropped += 1
flag_modified(profile, "negative_examples")
profile.total_scored += 1
self.db.commit()
self.db.refresh(profile)
return profile
def update_stats(
self,
profile_id: str,
kept: int = 0,
dropped: int = 0,
) -> Optional[AlertProfileDB]:
"""Aktualisiert die Statistiken eines Profils."""
profile = self.get_by_id(profile_id)
if not profile:
return None
profile.total_scored += kept + dropped
profile.total_kept += kept
profile.total_dropped += dropped
self.db.commit()
self.db.refresh(profile)
return profile
# ==================== DELETE ====================
def delete(self, profile_id: str) -> bool:
"""Löscht ein Profil."""
profile = self.get_by_id(profile_id)
if not profile:
return False
self.db.delete(profile)
self.db.commit()
return True
# ==================== CONVERSION ====================
def to_dict(self, profile: AlertProfileDB) -> Dict[str, Any]:
"""Konvertiert DB-Model zu Dictionary."""
return {
"id": profile.id,
"user_id": profile.user_id,
"name": profile.name,
"priorities": profile.priorities,
"exclusions": profile.exclusions,
"policies": profile.policies,
"examples": {
"positive": len(profile.positive_examples or []),
"negative": len(profile.negative_examples or []),
},
"stats": {
"total_scored": profile.total_scored,
"total_kept": profile.total_kept,
"total_dropped": profile.total_dropped,
"accuracy_estimate": profile.accuracy_estimate,
},
"created_at": profile.created_at.isoformat() if profile.created_at else None,
"updated_at": profile.updated_at.isoformat() if profile.updated_at else None,
}