"""Tests for OntologyRegistry — DB-backed action/object normalization.""" import time from unittest.mock import MagicMock, patch import pytest from services.ontology_registry import OntologyRegistry, _CACHE_TTL_SECONDS # ── Mock DB data ────────────────────────────────────────────────────── _MOCK_ACTION_TYPES = [ ("implement", "implementation"), ("monitor", "monitoring"), ("prevent", "implementation"), ("exclude", "implementation"), ("test", "testing"), ("encrypt", "implementation"), ("document", "evidence"), ("train", "training"), ] _MOCK_ACTION_SYNONYMS = [ # (canonical_action, synonym, pattern_type) ("implement", "implementieren", "alias"), ("implement", "umsetzen", "alias"), ("implement", "einführen", "alias"), ("monitor", "überwachen", "alias"), ("test", "testen", "alias"), ("encrypt", "verschlüsseln", "alias"), ("document", "dokumentieren", "alias"), ("train", "schulen", "alias"), # Negative patterns ("exclude", "dürfen nicht", "negative_pattern"), ("exclude", "darf nicht", "negative_pattern"), ("prevent", "verhindern", "negative_pattern"), ("prevent", "nicht gespeichert", "negative_pattern"), ] _MOCK_OBJECT_SYNONYMS = [ ("multi_factor_auth", "mfa"), ("multi_factor_auth", "2fa"), ("password_policy", "passwort"), ("encryption", "verschlüsselung"), ("audit_logging", "audit-log"), ("firewall", "firewall"), ("personal_data", "personenbezogene daten"), ] def _mock_execute(query): """Route mock queries to correct test data.""" q = str(query) mock_result = MagicMock() if "action_types" in q: mock_result.fetchall.return_value = _MOCK_ACTION_TYPES elif "action_synonyms" in q: mock_result.fetchall.return_value = _MOCK_ACTION_SYNONYMS elif "object_synonyms" in q: mock_result.fetchall.return_value = _MOCK_OBJECT_SYNONYMS else: mock_result.fetchall.return_value = [] return mock_result @pytest.fixture def registry(): """Create a registry with mocked DB.""" reg = OntologyRegistry() with patch("services.ontology_registry.SessionLocal") as mock_cls: mock_session = MagicMock() mock_session.execute = _mock_execute mock_cls.return_value = mock_session reg._load() return reg # ── classify_action tests ──────────────────────────────────────────── class TestClassifyAction: def test_direct_alias(self, registry): assert registry.classify_action("implementieren") == "implement" assert registry.classify_action("überwachen") == "monitor" assert registry.classify_action("testen") == "test" def test_case_insensitive(self, registry): assert registry.classify_action("IMPLEMENTIEREN") == "implement" def test_negative_pattern(self, registry): assert registry.classify_action("dürfen nicht verwendet werden") == "exclude" assert registry.classify_action("darf nicht gespeichert werden") == "prevent" def test_negative_pattern_priority(self, registry): # "nicht gespeichert" is more specific than "darf nicht" assert registry.classify_action("nicht gespeichert") == "prevent" def test_substring_match(self, registry): assert registry.classify_action("Maßnahmen implementieren und dokumentieren") == "implement" def test_unknown_defaults_to_implement(self, registry): assert registry.classify_action("fliegen") == "implement" # ── get_phase tests ────────────────────────────────────────────────── class TestGetPhase: def test_known_phase(self, registry): assert registry.get_phase("implement") == "implementation" assert registry.get_phase("monitor") == "monitoring" assert registry.get_phase("test") == "testing" def test_unknown_defaults_to_implementation(self, registry): assert registry.get_phase("unknown_action") == "implementation" # ── normalize_action tests ─────────────────────────────────────────── class TestNormalizeAction: def test_exact_match(self, registry): assert registry.normalize_action("implementieren") == "implement" assert registry.normalize_action("testen") == "test" def test_empty(self, registry): assert registry.normalize_action("") == "" def test_passthrough_unknown(self, registry): assert registry.normalize_action("fliegen") == "fliegen" # ── normalize_object tests ─────────────────────────────────────────── class TestNormalizeObject: def test_exact_match(self, registry): assert registry.normalize_object("mfa") == "multi_factor_auth" assert registry.normalize_object("2fa") == "multi_factor_auth" assert registry.normalize_object("passwort") == "password_policy" def test_case_insensitive(self, registry): assert registry.normalize_object("MFA") == "multi_factor_auth" def test_substring_match(self, registry): assert registry.normalize_object("die personenbezogene daten verarbeiten") == "personal_data" def test_empty(self, registry): assert registry.normalize_object("") == "" def test_unknown_passthrough(self, registry): assert registry.normalize_object("raumschiff") == "raumschiff" # ── Cache behavior tests ──────────────────────────────────────────── class TestCacheBehavior: def test_fresh_cache_not_stale(self, registry): assert registry._is_stale() is False def test_old_cache_is_stale(self, registry): registry._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1 assert registry._is_stale() is True # ── Migration data consistency ─────────────────────────────────────── class TestF2MigrationData: def test_build_action_types(self): from scripts.f2_migrate_actions import build_action_types types = build_action_types() assert len(types) >= 26 names = {t["canonical_name"] for t in types} assert "implement" in names assert "monitor" in names assert "encrypt" in names def test_build_action_synonyms(self): from scripts.f2_migrate_actions import build_action_synonyms synonyms = build_action_synonyms() assert len(synonyms) > 100 # Check pattern types aliases = [s for s in synonyms if s["pattern_type"] == "alias"] negatives = [s for s in synonyms if s["pattern_type"] == "negative_pattern"] assert len(aliases) > 80 assert len(negatives) > 15 def test_no_duplicate_synonyms(self): from scripts.f2_migrate_actions import build_action_synonyms synonyms = build_action_synonyms() keys = [(s["synonym"], s["language"], s["pattern_type"]) for s in synonyms] assert len(keys) == len(set(keys)) def test_all_canonical_actions_exist(self): from scripts.f2_migrate_actions import build_action_types, build_action_synonyms type_names = {t["canonical_name"] for t in build_action_types()} synonyms = build_action_synonyms() for s in synonyms: assert s["canonical_action"] in type_names, ( "Synonym '%s' references unknown action '%s'" % (s["synonym"], s["canonical_action"]) ) class TestF3MigrationData: def test_build_object_rows(self): from scripts.f3_migrate_objects import build_rows rows = build_rows() assert len(rows) >= 70 def test_no_duplicate_objects(self): from scripts.f3_migrate_objects import build_rows rows = build_rows() keys = [(r["synonym"], r["language"]) for r in rows] assert len(keys) == len(set(keys)) def test_known_objects_present(self): from scripts.f3_migrate_objects import build_rows rows = build_rows() synonyms = {r["synonym"] for r in rows} assert "mfa" in synonyms assert "passwort" in synonyms assert "firewall" in synonyms