Files
breakpilot-core/control-pipeline/tests/test_ontology_registry.py
T
Benjamin Admin 652e3a65a3
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 36s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 31s
feat(pipeline): F2+F3 action/object ontology — DB-backed normalization
Migrates ACTION_TYPES (26+8 types), _NEGATIVE_PATTERNS (22), _ACTION_SYNONYMS
(65), and _OBJECT_SYNONYMS (75) from hardcoded dicts to DB tables.

- SQL migration: 003_action_object_ontology.sql (3 tables)
- Migration scripts: f2_migrate_actions.py (34 types, 145 synonyms), f3_migrate_objects.py (75 objects)
- OntologyRegistry cache: 5min TTL, raises RuntimeError if empty (safe fallback to dicts)
- control_ontology.classify_action/get_phase delegate to DB with dict fallback
- control_dedup.normalize_action/normalize_object delegate to DB with dict fallback
- 25 new tests, 446 total pass, 0 regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 23:47:53 +02:00

227 lines
8.4 KiB
Python

"""Tests for OntologyRegistry — DB-backed action/object normalization."""
import time
from unittest.mock import MagicMock, patch
import pytest
from services.ontology_registry import OntologyRegistry, _CACHE_TTL_SECONDS
# ── Mock DB data ──────────────────────────────────────────────────────
# Canned rows served for the action-types query.
# Each row pairs an action name with the phase the tests expect for it.
_MOCK_ACTION_TYPES = [
    ("implement", "implementation"),
    ("monitor", "monitoring"),
    ("prevent", "implementation"),
    ("exclude", "implementation"),
    ("test", "testing"),
    ("encrypt", "implementation"),
    ("document", "evidence"),
    ("train", "training"),
]
# Canned rows served for the action-synonyms query.
# Row layout: (canonical_action, synonym, pattern_type)
_MOCK_ACTION_SYNONYMS = [
    # Plain aliases
    ("implement", "implementieren", "alias"),
    ("implement", "umsetzen", "alias"),
    ("implement", "einführen", "alias"),
    ("monitor", "überwachen", "alias"),
    ("test", "testen", "alias"),
    ("encrypt", "verschlüsseln", "alias"),
    ("document", "dokumentieren", "alias"),
    ("train", "schulen", "alias"),
    # Negative patterns
    ("exclude", "dürfen nicht", "negative_pattern"),
    ("exclude", "darf nicht", "negative_pattern"),
    ("prevent", "verhindern", "negative_pattern"),
    ("prevent", "nicht gespeichert", "negative_pattern"),
]
# Canned rows served for the object-synonyms query.
# Each row pairs the normalized object name the tests expect with one synonym.
_MOCK_OBJECT_SYNONYMS = [
    ("multi_factor_auth", "mfa"),
    ("multi_factor_auth", "2fa"),
    ("password_policy", "passwort"),
    ("encryption", "verschlüsselung"),
    ("audit_logging", "audit-log"),
    ("firewall", "firewall"),
    ("personal_data", "personenbezogene daten"),
]
def _mock_execute(query):
    """Stand-in for ``session.execute`` that serves canned rows.

    The query is stringified and matched by table-name substring, checked in
    the order ``action_types``, ``action_synonyms``, ``object_synonyms``.
    The returned mock's ``fetchall()`` yields the matching module-level row
    list, or an empty list when no table name is recognised.
    """
    sql_text = str(query)

    # Pick the rows first; the check order matters if a query names several tables.
    if "action_types" in sql_text:
        rows = _MOCK_ACTION_TYPES
    elif "action_synonyms" in sql_text:
        rows = _MOCK_ACTION_SYNONYMS
    elif "object_synonyms" in sql_text:
        rows = _MOCK_OBJECT_SYNONYMS
    else:
        rows = []

    result = MagicMock()
    result.fetchall.return_value = rows
    return result
@pytest.fixture
def registry():
    """Return an ``OntologyRegistry`` loaded from the canned mock rows.

    ``services.ontology_registry.SessionLocal`` is patched only for the
    duration of ``_load()``; the session it hands out routes every
    ``execute`` call through ``_mock_execute``.
    """
    fake_session = MagicMock()
    fake_session.execute = _mock_execute

    instance = OntologyRegistry()
    with patch("services.ontology_registry.SessionLocal") as session_factory:
        session_factory.return_value = fake_session
        instance._load()
    return instance
# ── classify_action tests ────────────────────────────────────────────
class TestClassifyAction:
    """Checks for ``classify_action`` against the mocked ontology rows."""

    def test_direct_alias(self, registry):
        """A bare alias resolves to its canonical action."""
        assert registry.classify_action("implementieren") == "implement"
        assert registry.classify_action("überwachen") == "monitor"
        assert registry.classify_action("testen") == "test"

    def test_case_insensitive(self, registry):
        """Upper-case input resolves like its lower-case alias."""
        assert registry.classify_action("IMPLEMENTIEREN") == "implement"

    def test_negative_pattern(self, registry):
        """Input containing a single negative pattern maps to that pattern's action."""
        # Each input contains exactly one of the mocked negative patterns.
        assert registry.classify_action("dürfen nicht verwendet werden") == "exclude"
        assert registry.classify_action("nicht gespeichert") == "prevent"

    def test_negative_pattern_priority(self, registry):
        """Overlapping negative patterns resolve to the expected action."""
        # This input contains BOTH "darf nicht" (exclude) and
        # "nicht gespeichert" (prevent), so it exercises the precedence
        # between the two patterns; "prevent" is expected to win.
        assert registry.classify_action("darf nicht gespeichert werden") == "prevent"

    def test_substring_match(self, registry):
        """An alias embedded in a longer phrase is still recognised."""
        # The phrase contains two aliases (implementieren, dokumentieren);
        # the expected classification is "implement".
        assert registry.classify_action("Maßnahmen implementieren und dokumentieren") == "implement"

    def test_unknown_defaults_to_implement(self, registry):
        """Text matching no synonym falls back to "implement"."""
        assert registry.classify_action("fliegen") == "implement"
# ── get_phase tests ──────────────────────────────────────────────────
class TestGetPhase:
    """Checks for ``get_phase`` against the mocked action types."""

    def test_known_phase(self, registry):
        """Seeded actions report the phase stored alongside them."""
        expectations = (
            ("implement", "implementation"),
            ("monitor", "monitoring"),
            ("test", "testing"),
        )
        for action, expected_phase in expectations:
            assert registry.get_phase(action) == expected_phase

    def test_unknown_defaults_to_implementation(self, registry):
        """An action missing from the registry falls back to "implementation"."""
        phase = registry.get_phase("unknown_action")
        assert phase == "implementation"
# ── normalize_action tests ───────────────────────────────────────────
class TestNormalizeAction:
    """Checks for ``normalize_action`` against the mocked synonyms."""

    def test_exact_match(self, registry):
        """A known synonym is replaced by its canonical action."""
        for synonym, canonical in (("implementieren", "implement"), ("testen", "test")):
            assert registry.normalize_action(synonym) == canonical

    def test_empty(self, registry):
        """Empty input comes back as an empty string."""
        normalized = registry.normalize_action("")
        assert normalized == ""

    def test_passthrough_unknown(self, registry):
        """Text with no known synonym is returned unchanged."""
        normalized = registry.normalize_action("fliegen")
        assert normalized == "fliegen"
# ── normalize_object tests ───────────────────────────────────────────
class TestNormalizeObject:
    """Checks for ``normalize_object`` against the mocked object synonyms."""

    def test_exact_match(self, registry):
        """A known synonym is replaced by its canonical object name."""
        expectations = (
            ("mfa", "multi_factor_auth"),
            ("2fa", "multi_factor_auth"),
            ("passwort", "password_policy"),
        )
        for synonym, canonical in expectations:
            assert registry.normalize_object(synonym) == canonical

    def test_case_insensitive(self, registry):
        """Upper-case input resolves like its lower-case synonym."""
        normalized = registry.normalize_object("MFA")
        assert normalized == "multi_factor_auth"

    def test_substring_match(self, registry):
        """A synonym embedded in a longer phrase is still recognised."""
        phrase = "die personenbezogene daten verarbeiten"
        assert registry.normalize_object(phrase) == "personal_data"

    def test_empty(self, registry):
        """Empty input comes back as an empty string."""
        normalized = registry.normalize_object("")
        assert normalized == ""

    def test_unknown_passthrough(self, registry):
        """Text with no known synonym is returned unchanged."""
        normalized = registry.normalize_object("raumschiff")
        assert normalized == "raumschiff"
# ── Cache behavior tests ────────────────────────────────────────────
class TestCacheBehavior:
    """Checks for the registry's cache staleness flag."""

    def test_fresh_cache_not_stale(self, registry):
        """A registry that was just loaded is not stale."""
        stale = registry._is_stale()
        assert stale is False

    def test_old_cache_is_stale(self, registry):
        """Back-dating the load time past the TTL marks the cache stale."""
        now = time.monotonic()
        # One second beyond the TTL window.
        registry._loaded_at = now - _CACHE_TTL_SECONDS - 1
        stale = registry._is_stale()
        assert stale is True
# ── Migration data consistency ───────────────────────────────────────
class TestF2MigrationData:
    """Consistency checks on the rows built by ``scripts.f2_migrate_actions``."""

    def test_build_action_types(self):
        """At least 26 action types are built, including a few known names."""
        from scripts.f2_migrate_actions import build_action_types

        action_types = build_action_types()
        assert len(action_types) >= 26

        canonical_names = {row["canonical_name"] for row in action_types}
        for required in ("implement", "monitor", "encrypt"):
            assert required in canonical_names

    def test_build_action_synonyms(self):
        """Synonym rows meet minimum counts overall and per pattern type."""
        from scripts.f2_migrate_actions import build_action_synonyms

        rows = build_action_synonyms()
        assert len(rows) > 100

        # Tally rows per pattern type.
        alias_count = sum(1 for row in rows if row["pattern_type"] == "alias")
        negative_count = sum(
            1 for row in rows if row["pattern_type"] == "negative_pattern"
        )
        assert alias_count > 80
        assert negative_count > 15

    def test_no_duplicate_synonyms(self):
        """No two rows share (synonym, language, pattern_type)."""
        from scripts.f2_migrate_actions import build_action_synonyms

        identity_keys = [
            (row["synonym"], row["language"], row["pattern_type"])
            for row in build_action_synonyms()
        ]
        unique_keys = set(identity_keys)
        assert len(identity_keys) == len(unique_keys)

    def test_all_canonical_actions_exist(self):
        """Every synonym row points at an action type that is actually built."""
        from scripts.f2_migrate_actions import build_action_types, build_action_synonyms

        known_actions = {row["canonical_name"] for row in build_action_types()}
        for row in build_action_synonyms():
            assert row["canonical_action"] in known_actions, (
                "Synonym '%s' references unknown action '%s'" % (row["synonym"], row["canonical_action"])
            )
class TestF3MigrationData:
    """Consistency checks on the rows built by ``scripts.f3_migrate_objects``."""

    def test_build_object_rows(self):
        """At least 70 object rows are built."""
        from scripts.f3_migrate_objects import build_rows

        row_count = len(build_rows())
        assert row_count >= 70

    def test_no_duplicate_objects(self):
        """No two rows share (synonym, language)."""
        from scripts.f3_migrate_objects import build_rows

        identity_keys = [(row["synonym"], row["language"]) for row in build_rows()]
        unique_keys = set(identity_keys)
        assert len(identity_keys) == len(unique_keys)

    def test_known_objects_present(self):
        """A few known synonyms appear among the built rows."""
        from scripts.f3_migrate_objects import build_rows

        built_synonyms = {row["synonym"] for row in build_rows()}
        for required in ("mfa", "passwort", "firewall"):
            assert required in built_synonyms