This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/agent-core/tests/test_task_router.py
BreakPilot Dev 19855efacc
Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
feat: BreakPilot PWA - Full codebase (clean push without large binaries)
All services: admin-v2, studio-v2, website, ai-compliance-sdk,
consent-service, klausur-service, voice-service, and infrastructure.
Large PDFs and compiled binaries excluded via .gitignore.
2026-02-11 13:25:58 +01:00

204 lines
5.7 KiB
Python

"""
Tests for Task Router
Tests cover:
- Intent-based routing
- Routing rules
- Fallback handling
- Routing statistics
"""
import pytest
import asyncio
from unittest.mock import MagicMock
import sys
sys.path.insert(0, str(__file__).rsplit('/tests/', 1)[0])
from orchestrator.task_router import (
TaskRouter,
RoutingRule,
RoutingResult,
RoutingStrategy,
)
class TestRoutingRule:
"""Tests for RoutingRule dataclass"""
def test_rule_creation(self):
"""Rule should be created correctly"""
rule = RoutingRule(
intent_pattern="learning_*",
agent_type="tutor-agent",
priority=10
)
assert rule.intent_pattern == "learning_*"
assert rule.agent_type == "tutor-agent"
assert rule.priority == 10
def test_rule_matches_exact(self):
"""Rule should match exact intent"""
rule = RoutingRule(
intent_pattern="grade_exam",
agent_type="grader-agent"
)
assert rule.matches("grade_exam", {}) is True
assert rule.matches("grade_quiz", {}) is False
def test_rule_matches_wildcard(self):
"""Rule should match wildcard patterns"""
rule = RoutingRule(
intent_pattern="learning_*",
agent_type="tutor-agent"
)
assert rule.matches("learning_math", {}) is True
assert rule.matches("learning_english", {}) is True
assert rule.matches("grading_math", {}) is False
def test_rule_matches_conditions(self):
"""Rule should check conditions"""
rule = RoutingRule(
intent_pattern="*",
agent_type="vip-agent",
conditions={"is_vip": True}
)
assert rule.matches("any_intent", {"is_vip": True}) is True
assert rule.matches("any_intent", {"is_vip": False}) is False
assert rule.matches("any_intent", {}) is False
class TestRoutingResult:
"""Tests for RoutingResult dataclass"""
def test_successful_result(self):
"""Should create successful routing result"""
result = RoutingResult(
success=True,
agent_id="tutor-1",
agent_type="tutor-agent",
reason="Primary agent selected"
)
assert result.success is True
assert result.agent_id == "tutor-1"
assert result.is_fallback is False
def test_fallback_result(self):
"""Should indicate fallback routing"""
result = RoutingResult(
success=True,
agent_id="backup-1",
agent_type="backup-agent",
is_fallback=True,
reason="Fallback used"
)
assert result.success is True
assert result.is_fallback is True
def test_failed_result(self):
"""Should create failed routing result"""
result = RoutingResult(
success=False,
reason="No agents available"
)
assert result.success is False
assert result.agent_id is None
class TestTaskRouter:
"""Tests for TaskRouter"""
@pytest.fixture
def router(self):
"""Create a task router without supervisor"""
return TaskRouter(supervisor=None)
def test_default_rules_exist(self, router):
"""Router should have default rules"""
rules = router.get_rules()
assert len(rules) > 0
def test_add_rule(self, router):
"""Should add new routing rule"""
original_count = len(router.rules)
router.add_rule(RoutingRule(
intent_pattern="custom_*",
agent_type="custom-agent",
priority=100
))
assert len(router.rules) == original_count + 1
def test_rules_sorted_by_priority(self, router):
"""Rules should be sorted by priority (high first)"""
router.add_rule(RoutingRule(
intent_pattern="low_*",
agent_type="low-agent",
priority=1
))
router.add_rule(RoutingRule(
intent_pattern="high_*",
agent_type="high-agent",
priority=100
))
# Highest priority should be first
assert router.rules[0].priority >= router.rules[-1].priority
def test_remove_rule(self, router):
"""Should remove routing rule"""
router.add_rule(RoutingRule(
intent_pattern="removable_*",
agent_type="temp-agent"
))
result = router.remove_rule("removable_*")
assert result is True
def test_find_matching_rules(self, router):
"""Should find rules matching intent"""
matching = router.find_matching_rules("learning_math")
assert len(matching) > 0
assert any(r["agent_type"] == "tutor-agent" for r in matching)
def test_get_routing_stats_empty(self, router):
"""Should return empty stats initially"""
stats = router.get_routing_stats()
assert stats["total_routes"] == 0
def test_set_default_route(self, router):
"""Should set default agent for type"""
router.set_default_route("tutor-agent", "tutor-primary")
assert router._default_routes["tutor-agent"] == "tutor-primary"
def test_clear_history(self, router):
"""Should clear routing history"""
# Add some history
router._routing_history.append({"test": "data"})
router.clear_history()
assert len(router._routing_history) == 0
class TestRoutingStrategy:
"""Tests for RoutingStrategy enum"""
def test_strategy_values(self):
"""Strategies should have expected values"""
assert RoutingStrategy.DIRECT.value == "direct"
assert RoutingStrategy.ROUND_ROBIN.value == "round_robin"
assert RoutingStrategy.LEAST_LOADED.value == "least_loaded"
assert RoutingStrategy.PRIORITY.value == "priority"