""" Rules API Routes für Alerts Agent. CRUD-Operationen für Alert-Regeln. """ from typing import List, Optional, Dict, Any from datetime import datetime from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from sqlalchemy.orm import Session as DBSession from alerts_agent.db import get_db from alerts_agent.db.repository import RuleRepository from alerts_agent.db.models import RuleActionEnum router = APIRouter(prefix="/rules", tags=["alerts"]) # ============================================================================= # PYDANTIC MODELS # ============================================================================= class RuleConditionModel(BaseModel): """Model für eine Regel-Bedingung.""" field: str = Field(..., description="Feld zum Prüfen (title, snippet, url, source, relevance_score)") operator: str = Field(..., alias="op", description="Operator (contains, not_contains, equals, regex, gt, lt, in)") value: Any = Field(..., description="Vergleichswert (String, Zahl, oder Liste)") class Config: populate_by_name = True class RuleCreate(BaseModel): """Request-Model für Regel-Erstellung.""" name: str = Field(..., min_length=1, max_length=255) description: str = Field(default="", max_length=2000) conditions: List[RuleConditionModel] = Field(default_factory=list) action_type: str = Field(default="keep", description="Aktion: keep, drop, tag, email, webhook, slack") action_config: Dict[str, Any] = Field(default_factory=dict) topic_id: Optional[str] = Field(default=None, description="Optional: Nur für bestimmtes Topic") priority: int = Field(default=0, ge=0, le=1000, description="Priorität (höher = wird zuerst evaluiert)") is_active: bool = Field(default=True) class RuleUpdate(BaseModel): """Request-Model für Regel-Update.""" name: Optional[str] = Field(default=None, min_length=1, max_length=255) description: Optional[str] = Field(default=None, max_length=2000) conditions: Optional[List[RuleConditionModel]] = None action_type: Optional[str] = None action_config: Optional[Dict[str, Any]] = None topic_id: Optional[str] = None priority: Optional[int] = Field(default=None, ge=0, le=1000) is_active: Optional[bool] = None class RuleResponse(BaseModel): """Response-Model für Regel.""" id: str name: str description: str conditions: List[Dict[str, Any]] action_type: str action_config: Dict[str, Any] topic_id: Optional[str] priority: int is_active: bool match_count: int last_matched_at: Optional[datetime] created_at: datetime updated_at: datetime class Config: from_attributes = True class RuleListResponse(BaseModel): """Response-Model für Regel-Liste.""" rules: List[RuleResponse] total: int class RuleTestRequest(BaseModel): """Request-Model für Regel-Test.""" title: str = Field(default="Test Title") snippet: str = Field(default="Test snippet content") url: str = Field(default="https://example.com/test") source: str = Field(default="rss_feed") relevance_score: Optional[float] = Field(default=None) class RuleTestResponse(BaseModel): """Response-Model für Regel-Test.""" rule_id: str rule_name: str matched: bool action: str conditions_met: List[str] # ============================================================================= # API ENDPOINTS # ============================================================================= @router.post("", response_model=RuleResponse, status_code=201) async def create_rule( rule: RuleCreate, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Erstellt eine neue Regel. Regeln werden nach Priorität evaluiert. Höhere Priorität = wird zuerst geprüft. """ repo = RuleRepository(db) # Conditions zu Dict konvertieren conditions = [ {"field": c.field, "op": c.operator, "value": c.value} for c in rule.conditions ] created = repo.create( name=rule.name, description=rule.description, conditions=conditions, action_type=rule.action_type, action_config=rule.action_config, topic_id=rule.topic_id, priority=rule.priority, ) if not rule.is_active: repo.update(created.id, is_active=False) created = repo.get_by_id(created.id) return _to_rule_response(created) @router.get("", response_model=RuleListResponse) async def list_rules( is_active: Optional[bool] = None, topic_id: Optional[str] = None, db: DBSession = Depends(get_db), ) -> RuleListResponse: """ Listet alle Regeln auf. Regeln sind nach Priorität sortiert (höchste zuerst). """ repo = RuleRepository(db) if is_active is True: rules = repo.get_active() else: rules = repo.get_all() # Topic-Filter if topic_id: rules = [r for r in rules if r.topic_id == topic_id or r.topic_id is None] return RuleListResponse( rules=[_to_rule_response(r) for r in rules], total=len(rules), ) @router.get("/{rule_id}", response_model=RuleResponse) async def get_rule( rule_id: str, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Ruft eine Regel nach ID ab. """ repo = RuleRepository(db) rule = repo.get_by_id(rule_id) if not rule: raise HTTPException(status_code=404, detail="Regel nicht gefunden") return _to_rule_response(rule) @router.put("/{rule_id}", response_model=RuleResponse) async def update_rule( rule_id: str, updates: RuleUpdate, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Aktualisiert eine Regel. """ repo = RuleRepository(db) # Nur übergebene Werte updaten update_dict = {} if updates.name is not None: update_dict["name"] = updates.name if updates.description is not None: update_dict["description"] = updates.description if updates.conditions is not None: update_dict["conditions"] = [ {"field": c.field, "op": c.operator, "value": c.value} for c in updates.conditions ] if updates.action_type is not None: update_dict["action_type"] = updates.action_type if updates.action_config is not None: update_dict["action_config"] = updates.action_config if updates.topic_id is not None: update_dict["topic_id"] = updates.topic_id if updates.priority is not None: update_dict["priority"] = updates.priority if updates.is_active is not None: update_dict["is_active"] = updates.is_active if not update_dict: raise HTTPException(status_code=400, detail="Keine Updates angegeben") updated = repo.update(rule_id, **update_dict) if not updated: raise HTTPException(status_code=404, detail="Regel nicht gefunden") return _to_rule_response(updated) @router.delete("/{rule_id}", status_code=204) async def delete_rule( rule_id: str, db: DBSession = Depends(get_db), ): """ Löscht eine Regel. """ repo = RuleRepository(db) success = repo.delete(rule_id) if not success: raise HTTPException(status_code=404, detail="Regel nicht gefunden") return None @router.post("/{rule_id}/activate", response_model=RuleResponse) async def activate_rule( rule_id: str, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Aktiviert eine Regel. """ repo = RuleRepository(db) updated = repo.update(rule_id, is_active=True) if not updated: raise HTTPException(status_code=404, detail="Regel nicht gefunden") return _to_rule_response(updated) @router.post("/{rule_id}/deactivate", response_model=RuleResponse) async def deactivate_rule( rule_id: str, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Deaktiviert eine Regel. """ repo = RuleRepository(db) updated = repo.update(rule_id, is_active=False) if not updated: raise HTTPException(status_code=404, detail="Regel nicht gefunden") return _to_rule_response(updated) @router.post("/{rule_id}/test", response_model=RuleTestResponse) async def test_rule( rule_id: str, test_data: RuleTestRequest, db: DBSession = Depends(get_db), ) -> RuleTestResponse: """ Testet eine Regel gegen Testdaten. Nützlich um Regeln vor der Aktivierung zu testen. """ from alerts_agent.processing.rule_engine import evaluate_rule from alerts_agent.db.models import AlertItemDB, AlertSourceEnum, AlertStatusEnum repo = RuleRepository(db) rule = repo.get_by_id(rule_id) if not rule: raise HTTPException(status_code=404, detail="Regel nicht gefunden") # Mock-Alert für Test erstellen mock_alert = AlertItemDB( id="test-alert", topic_id="test-topic", title=test_data.title, snippet=test_data.snippet, url=test_data.url, url_hash="test-hash", source=AlertSourceEnum(test_data.source) if test_data.source else AlertSourceEnum.RSS_FEED, status=AlertStatusEnum.NEW, relevance_score=test_data.relevance_score, ) # Regel evaluieren match = evaluate_rule(mock_alert, rule) return RuleTestResponse( rule_id=match.rule_id, rule_name=match.rule_name, matched=match.matched, action=match.action.value, conditions_met=match.conditions_met, ) @router.post("/test-all", response_model=List[RuleTestResponse]) async def test_all_rules( test_data: RuleTestRequest, db: DBSession = Depends(get_db), ) -> List[RuleTestResponse]: """ Testet alle aktiven Regeln gegen Testdaten. Zeigt welche Regeln matchen würden. """ from alerts_agent.processing.rule_engine import evaluate_rules_for_alert, evaluate_rule from alerts_agent.db.models import AlertItemDB, AlertSourceEnum, AlertStatusEnum repo = RuleRepository(db) rules = repo.get_active() # Mock-Alert für Test erstellen mock_alert = AlertItemDB( id="test-alert", topic_id="test-topic", title=test_data.title, snippet=test_data.snippet, url=test_data.url, url_hash="test-hash", source=AlertSourceEnum(test_data.source) if test_data.source else AlertSourceEnum.RSS_FEED, status=AlertStatusEnum.NEW, relevance_score=test_data.relevance_score, ) results = [] for rule in rules: match = evaluate_rule(mock_alert, rule) results.append(RuleTestResponse( rule_id=match.rule_id, rule_name=match.rule_name, matched=match.matched, action=match.action.value, conditions_met=match.conditions_met, )) return results # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def _to_rule_response(rule) -> RuleResponse: """Konvertiert ein Rule-DB-Objekt zu RuleResponse.""" return RuleResponse( id=rule.id, name=rule.name, description=rule.description or "", conditions=rule.conditions or [], action_type=rule.action_type.value if rule.action_type else "keep", action_config=rule.action_config or {}, topic_id=rule.topic_id, priority=rule.priority, is_active=rule.is_active, match_count=rule.match_count, last_matched_at=rule.last_matched_at, created_at=rule.created_at, updated_at=rule.updated_at, ) # ============================================================================= # PRESET RULES # ============================================================================= PRESET_RULES = { "exclude_jobs": { "name": "Stellenanzeigen ausschließen", "description": "Filtert Stellenanzeigen und Job-Postings", "conditions": [ {"field": "title", "op": "in", "value": ["Stellenanzeige", "Job", "Karriere", "Praktikum", "Werkstudent", "Ausbildung", "Referendariat"]} ], "action_type": "drop", "priority": 100, }, "exclude_ads": { "name": "Werbung ausschließen", "description": "Filtert Werbung und Pressemitteilungen", "conditions": [ {"field": "title", "op": "in", "value": ["Werbung", "Anzeige", "Pressemitteilung", "PR:", "Sponsored"]} ], "action_type": "drop", "priority": 100, }, "keep_inklusion": { "name": "Inklusion behalten", "description": "Behält Artikel zum Thema Inklusion", "conditions": [ {"field": "title", "op": "in", "value": ["Inklusion", "inklusiv", "Förderbedarf", "Förderschule", "Nachteilsausgleich"]} ], "action_type": "keep", "priority": 50, }, "keep_datenschutz": { "name": "Datenschutz behalten", "description": "Behält Artikel zum Thema Datenschutz in Schulen", "conditions": [ {"field": "title", "op": "in", "value": ["DSGVO", "Datenschutz", "Schülerfotos", "personenbezogen"]} ], "action_type": "keep", "priority": 50, }, } @router.get("/presets/list") async def list_preset_rules() -> Dict[str, Any]: """ Listet verfügbare Regel-Vorlagen auf. """ return { "presets": [ {"id": key, **value} for key, value in PRESET_RULES.items() ] } @router.post("/presets/{preset_id}/apply", response_model=RuleResponse) async def apply_preset_rule( preset_id: str, db: DBSession = Depends(get_db), ) -> RuleResponse: """ Wendet eine Regel-Vorlage an (erstellt die Regel). """ if preset_id not in PRESET_RULES: raise HTTPException(status_code=404, detail="Preset nicht gefunden") preset = PRESET_RULES[preset_id] repo = RuleRepository(db) created = repo.create( name=preset["name"], description=preset.get("description", ""), conditions=preset["conditions"], action_type=preset["action_type"], priority=preset.get("priority", 0), ) return _to_rule_response(created)