""" Rule Engine - Models, Condition Evaluation, and Convenience Functions. Datenmodelle und Evaluierungs-Logik fuer Alert-Regeln. """ import re import logging from dataclasses import dataclass from typing import List, Dict, Any, Optional from enum import Enum from alerts_agent.db.models import AlertItemDB, AlertRuleDB, RuleActionEnum logger = logging.getLogger(__name__) class ConditionOperator(str, Enum): """Operatoren für Regel-Bedingungen.""" CONTAINS = "contains" NOT_CONTAINS = "not_contains" EQUALS = "equals" NOT_EQUALS = "not_equals" STARTS_WITH = "starts_with" ENDS_WITH = "ends_with" REGEX = "regex" GREATER_THAN = "gt" LESS_THAN = "lt" GREATER_EQUAL = "gte" LESS_EQUAL = "lte" IN_LIST = "in" NOT_IN_LIST = "not_in" @dataclass class RuleCondition: """Eine einzelne Regel-Bedingung.""" field: str operator: ConditionOperator value: Any @classmethod def from_dict(cls, data: Dict) -> "RuleCondition": return cls( field=data.get("field", ""), operator=ConditionOperator(data.get("operator", data.get("op", "contains"))), value=data.get("value", ""), ) @dataclass class RuleMatch: """Ergebnis einer Regel-Evaluierung.""" rule_id: str rule_name: str matched: bool action: RuleActionEnum action_config: Dict[str, Any] conditions_met: List[str] def get_field_value(alert: AlertItemDB, field: str) -> Any: """Extrahiert einen Feldwert aus einem Alert.""" field_map = { "title": alert.title, "snippet": alert.snippet, "url": alert.url, "source": alert.source.value if alert.source else "", "status": alert.status.value if alert.status else "", "relevance_score": alert.relevance_score, "relevance_decision": alert.relevance_decision.value if alert.relevance_decision else "", "lang": alert.lang, "topic_id": alert.topic_id, } return field_map.get(field) def evaluate_condition(alert: AlertItemDB, condition: RuleCondition) -> bool: """Evaluiert eine einzelne Bedingung gegen einen Alert.""" field_value = get_field_value(alert, condition.field) if field_value is None: return False op = condition.operator target = condition.value try: if isinstance(field_value, str): field_lower = field_value.lower() target_lower = str(target).lower() if isinstance(target, str) else target if op == ConditionOperator.CONTAINS: return target_lower in field_lower elif op == ConditionOperator.NOT_CONTAINS: return target_lower not in field_lower elif op == ConditionOperator.EQUALS: return field_lower == target_lower elif op == ConditionOperator.NOT_EQUALS: return field_lower != target_lower elif op == ConditionOperator.STARTS_WITH: return field_lower.startswith(target_lower) elif op == ConditionOperator.ENDS_WITH: return field_lower.endswith(target_lower) elif op == ConditionOperator.REGEX: try: return bool(re.search(str(target), field_value, re.IGNORECASE)) except re.error: logger.warning(f"Invalid regex pattern: {target}") return False elif op == ConditionOperator.IN_LIST: if isinstance(target, list): return any(t.lower() in field_lower for t in target if isinstance(t, str)) return False elif op == ConditionOperator.NOT_IN_LIST: if isinstance(target, list): return not any(t.lower() in field_lower for t in target if isinstance(t, str)) return True elif isinstance(field_value, (int, float)): target_num = float(target) if target else 0 if op == ConditionOperator.EQUALS: return field_value == target_num elif op == ConditionOperator.NOT_EQUALS: return field_value != target_num elif op == ConditionOperator.GREATER_THAN: return field_value > target_num elif op == ConditionOperator.LESS_THAN: return field_value < target_num elif op == ConditionOperator.GREATER_EQUAL: return field_value >= target_num elif op == ConditionOperator.LESS_EQUAL: return field_value <= target_num except Exception as e: logger.error(f"Error evaluating condition: {e}") return False return False def evaluate_rule(alert: AlertItemDB, rule: AlertRuleDB) -> RuleMatch: """Evaluiert eine Regel gegen einen Alert (AND-Verknüpfung).""" conditions = rule.conditions or [] conditions_met = [] all_matched = True for cond_dict in conditions: condition = RuleCondition.from_dict(cond_dict) if evaluate_condition(alert, condition): conditions_met.append(f"{condition.field} {condition.operator.value} {condition.value}") else: all_matched = False if not conditions: all_matched = True return RuleMatch( rule_id=rule.id, rule_name=rule.name, matched=all_matched, action=rule.action_type, action_config=rule.action_config or {}, conditions_met=conditions_met, ) def evaluate_rules_for_alert(alert: AlertItemDB, rules: List[AlertRuleDB]) -> Optional[RuleMatch]: """Evaluiert alle Regeln gegen einen Alert und gibt den ersten Match zurück.""" for rule in rules: if not rule.is_active: continue if rule.topic_id and rule.topic_id != alert.topic_id: continue match = evaluate_rule(alert, rule) if match.matched: logger.debug(f"Rule '{rule.name}' matched alert '{alert.id[:8]}': {match.conditions_met}") return match return None # Convenience-Funktionen def create_keyword_rule(name: str, keywords: List[str], action: str = "keep", field: str = "title") -> Dict: """Erstellt eine Keyword-basierte Regel.""" return { "name": name, "conditions": [{"field": field, "operator": "in", "value": keywords}], "action_type": action, "action_config": {}, } def create_exclusion_rule(name: str, excluded_terms: List[str], field: str = "title") -> Dict: """Erstellt eine Ausschluss-Regel.""" return { "name": name, "conditions": [{"field": field, "operator": "in", "value": excluded_terms}], "action_type": "drop", "action_config": {}, } def create_score_threshold_rule(name: str, min_score: float, action: str = "keep") -> Dict: """Erstellt eine Score-basierte Regel.""" return { "name": name, "conditions": [{"field": "relevance_score", "operator": "gte", "value": min_score}], "action_type": action, "action_config": {}, }