""" Rule Engine für Alerts Agent. Evaluiert Regeln gegen Alert-Items und führt Aktionen aus. Regel-Struktur: - Bedingungen: [{field, operator, value}, ...] (AND-verknüpft) - Aktion: keep, drop, tag, email, webhook, slack - Priorität: Höhere Priorität wird zuerst evaluiert """ import re import logging from dataclasses import dataclass from typing import List, Dict, Any, Optional, Callable from enum import Enum from alerts_agent.db.models import AlertItemDB, AlertRuleDB, RuleActionEnum logger = logging.getLogger(__name__) class ConditionOperator(str, Enum): """Operatoren für Regel-Bedingungen.""" CONTAINS = "contains" NOT_CONTAINS = "not_contains" EQUALS = "equals" NOT_EQUALS = "not_equals" STARTS_WITH = "starts_with" ENDS_WITH = "ends_with" REGEX = "regex" GREATER_THAN = "gt" LESS_THAN = "lt" GREATER_EQUAL = "gte" LESS_EQUAL = "lte" IN_LIST = "in" NOT_IN_LIST = "not_in" @dataclass class RuleCondition: """Eine einzelne Regel-Bedingung.""" field: str # "title", "snippet", "url", "source", "relevance_score" operator: ConditionOperator value: Any # str, float, list @classmethod def from_dict(cls, data: Dict) -> "RuleCondition": """Erstellt eine Bedingung aus einem Dict.""" return cls( field=data.get("field", ""), operator=ConditionOperator(data.get("operator", data.get("op", "contains"))), value=data.get("value", ""), ) @dataclass class RuleMatch: """Ergebnis einer Regel-Evaluierung.""" rule_id: str rule_name: str matched: bool action: RuleActionEnum action_config: Dict[str, Any] conditions_met: List[str] # Welche Bedingungen haben gematched def get_field_value(alert: AlertItemDB, field: str) -> Any: """ Extrahiert einen Feldwert aus einem Alert. Args: alert: Alert-Item field: Feldname Returns: Feldwert oder None """ field_map = { "title": alert.title, "snippet": alert.snippet, "url": alert.url, "source": alert.source.value if alert.source else "", "status": alert.status.value if alert.status else "", "relevance_score": alert.relevance_score, "relevance_decision": alert.relevance_decision.value if alert.relevance_decision else "", "lang": alert.lang, "topic_id": alert.topic_id, } return field_map.get(field) def evaluate_condition( alert: AlertItemDB, condition: RuleCondition, ) -> bool: """ Evaluiert eine einzelne Bedingung gegen einen Alert. Args: alert: Alert-Item condition: Zu evaluierende Bedingung Returns: True wenn Bedingung erfüllt """ field_value = get_field_value(alert, condition.field) if field_value is None: return False op = condition.operator target = condition.value try: # String-Operationen (case-insensitive) if isinstance(field_value, str): field_lower = field_value.lower() target_lower = str(target).lower() if isinstance(target, str) else target if op == ConditionOperator.CONTAINS: return target_lower in field_lower elif op == ConditionOperator.NOT_CONTAINS: return target_lower not in field_lower elif op == ConditionOperator.EQUALS: return field_lower == target_lower elif op == ConditionOperator.NOT_EQUALS: return field_lower != target_lower elif op == ConditionOperator.STARTS_WITH: return field_lower.startswith(target_lower) elif op == ConditionOperator.ENDS_WITH: return field_lower.endswith(target_lower) elif op == ConditionOperator.REGEX: try: return bool(re.search(str(target), field_value, re.IGNORECASE)) except re.error: logger.warning(f"Invalid regex pattern: {target}") return False elif op == ConditionOperator.IN_LIST: if isinstance(target, list): return any(t.lower() in field_lower for t in target if isinstance(t, str)) return False elif op == ConditionOperator.NOT_IN_LIST: if isinstance(target, list): return not any(t.lower() in field_lower for t in target if isinstance(t, str)) return True # Numerische Operationen elif isinstance(field_value, (int, float)): target_num = float(target) if target else 0 if op == ConditionOperator.EQUALS: return field_value == target_num elif op == ConditionOperator.NOT_EQUALS: return field_value != target_num elif op == ConditionOperator.GREATER_THAN: return field_value > target_num elif op == ConditionOperator.LESS_THAN: return field_value < target_num elif op == ConditionOperator.GREATER_EQUAL: return field_value >= target_num elif op == ConditionOperator.LESS_EQUAL: return field_value <= target_num except Exception as e: logger.error(f"Error evaluating condition: {e}") return False return False def evaluate_rule( alert: AlertItemDB, rule: AlertRuleDB, ) -> RuleMatch: """ Evaluiert eine Regel gegen einen Alert. Alle Bedingungen müssen erfüllt sein (AND-Verknüpfung). Args: alert: Alert-Item rule: Zu evaluierende Regel Returns: RuleMatch-Ergebnis """ conditions = rule.conditions or [] conditions_met = [] all_matched = True for cond_dict in conditions: condition = RuleCondition.from_dict(cond_dict) if evaluate_condition(alert, condition): conditions_met.append(f"{condition.field} {condition.operator.value} {condition.value}") else: all_matched = False # Wenn keine Bedingungen definiert sind, matcht die Regel immer if not conditions: all_matched = True return RuleMatch( rule_id=rule.id, rule_name=rule.name, matched=all_matched, action=rule.action_type, action_config=rule.action_config or {}, conditions_met=conditions_met, ) def evaluate_rules_for_alert( alert: AlertItemDB, rules: List[AlertRuleDB], ) -> Optional[RuleMatch]: """ Evaluiert alle Regeln gegen einen Alert und gibt den ersten Match zurück. Regeln werden nach Priorität (absteigend) evaluiert. Args: alert: Alert-Item rules: Liste von Regeln (sollte bereits nach Priorität sortiert sein) Returns: Erster RuleMatch oder None """ for rule in rules: if not rule.is_active: continue # Topic-Filter: Regel gilt nur für bestimmtes Topic if rule.topic_id and rule.topic_id != alert.topic_id: continue match = evaluate_rule(alert, rule) if match.matched: logger.debug( f"Rule '{rule.name}' matched alert '{alert.id[:8]}': " f"{match.conditions_met}" ) return match return None class RuleEngine: """ Rule Engine für Batch-Verarbeitung von Alerts. Verwendet für das Scoring von mehreren Alerts gleichzeitig. """ def __init__(self, db_session): """ Initialisiert die Rule Engine. Args: db_session: SQLAlchemy Session """ self.db = db_session self._rules_cache: Optional[List[AlertRuleDB]] = None def _get_active_rules(self) -> List[AlertRuleDB]: """Lädt aktive Regeln aus der Datenbank (cached).""" if self._rules_cache is None: from alerts_agent.db.repository import RuleRepository repo = RuleRepository(self.db) self._rules_cache = repo.get_active() return self._rules_cache def clear_cache(self) -> None: """Leert den Regel-Cache.""" self._rules_cache = None def process_alert( self, alert: AlertItemDB, ) -> Optional[RuleMatch]: """ Verarbeitet einen Alert mit allen aktiven Regeln. Args: alert: Alert-Item Returns: RuleMatch wenn eine Regel matcht, sonst None """ rules = self._get_active_rules() return evaluate_rules_for_alert(alert, rules) def process_alerts( self, alerts: List[AlertItemDB], ) -> Dict[str, RuleMatch]: """ Verarbeitet mehrere Alerts mit allen aktiven Regeln. Args: alerts: Liste von Alert-Items Returns: Dict von alert_id -> RuleMatch (nur für gematschte Alerts) """ rules = self._get_active_rules() results = {} for alert in alerts: match = evaluate_rules_for_alert(alert, rules) if match: results[alert.id] = match return results def apply_rule_actions( self, alert: AlertItemDB, match: RuleMatch, ) -> Dict[str, Any]: """ Wendet die Regel-Aktion auf einen Alert an. Args: alert: Alert-Item match: RuleMatch mit Aktionsinformationen Returns: Dict mit Ergebnis der Aktion """ from alerts_agent.db.repository import AlertItemRepository, RuleRepository alert_repo = AlertItemRepository(self.db) rule_repo = RuleRepository(self.db) action = match.action config = match.action_config result = {"action": action.value, "success": False} try: if action == RuleActionEnum.KEEP: # Alert als KEEP markieren alert_repo.update_scoring( alert_id=alert.id, score=1.0, decision="KEEP", reasons=["rule_match"], summary=f"Matched rule: {match.rule_name}", model="rule_engine", ) result["success"] = True elif action == RuleActionEnum.DROP: # Alert als DROP markieren alert_repo.update_scoring( alert_id=alert.id, score=0.0, decision="DROP", reasons=["rule_match"], summary=f"Dropped by rule: {match.rule_name}", model="rule_engine", ) result["success"] = True elif action == RuleActionEnum.TAG: # Tags hinzufügen tags = config.get("tags", []) if tags: existing_tags = alert.user_tags or [] new_tags = list(set(existing_tags + tags)) alert_repo.update(alert.id, user_tags=new_tags) result["tags_added"] = tags result["success"] = True elif action == RuleActionEnum.EMAIL: # E-Mail-Benachrichtigung senden # Wird von Actions-Modul behandelt result["email_config"] = config result["success"] = True result["deferred"] = True # Wird später gesendet elif action == RuleActionEnum.WEBHOOK: # Webhook aufrufen # Wird von Actions-Modul behandelt result["webhook_config"] = config result["success"] = True result["deferred"] = True elif action == RuleActionEnum.SLACK: # Slack-Nachricht senden # Wird von Actions-Modul behandelt result["slack_config"] = config result["success"] = True result["deferred"] = True # Match-Count erhöhen rule_repo.increment_match_count(match.rule_id) except Exception as e: logger.error(f"Error applying rule action: {e}") result["error"] = str(e) return result # Convenience-Funktionen für einfache Nutzung def create_keyword_rule( name: str, keywords: List[str], action: str = "keep", field: str = "title", ) -> Dict: """ Erstellt eine Keyword-basierte Regel. Args: name: Regelname keywords: Liste von Keywords (OR-verknüpft über IN_LIST) action: Aktion (keep, drop, tag) field: Feld zum Prüfen (title, snippet, url) Returns: Regel-Definition als Dict """ return { "name": name, "conditions": [ { "field": field, "operator": "in", "value": keywords, } ], "action_type": action, "action_config": {}, } def create_exclusion_rule( name: str, excluded_terms: List[str], field: str = "title", ) -> Dict: """ Erstellt eine Ausschluss-Regel. Args: name: Regelname excluded_terms: Liste von auszuschließenden Begriffen field: Feld zum Prüfen Returns: Regel-Definition als Dict """ return { "name": name, "conditions": [ { "field": field, "operator": "in", "value": excluded_terms, } ], "action_type": "drop", "action_config": {}, } def create_score_threshold_rule( name: str, min_score: float, action: str = "keep", ) -> Dict: """ Erstellt eine Score-basierte Regel. Args: name: Regelname min_score: Mindest-Score action: Aktion bei Erreichen des Scores Returns: Regel-Definition als Dict """ return { "name": name, "conditions": [ { "field": "relevance_score", "operator": "gte", "value": min_score, } ], "action_type": action, "action_config": {}, }