"""Tests for RegulationRegistry — DB-backed lookup with cache and fallback.""" import time from unittest.mock import patch, MagicMock import pytest from services.regulation_registry import ( RegulationRegistry, _CACHE_TTL_SECONDS, ) # ── Test data: simulates DB rows ────────────────────────────────────────── _MOCK_DB_ROWS = [ # (regulation_id, regulation_name_de, license_rule, license_type, # attribution, source_type, jurisdiction, status) ("eu_2016_679", "DSGVO (EU) 2016/679", 1, "EU_LAW", None, "law", "EU", "active"), ("nist_sp_800_53", "NIST SP 800-53 Rev. 5", 1, "NIST_PUBLIC_DOMAIN", None, "standard", "US", "active"), ("owasp_asvs", "OWASP ASVS 4.0", 2, "CC-BY-SA-4.0", "OWASP Foundation, CC BY-SA 4.0", "standard", "INT", "active"), ("bdsg", "Bundesdatenschutzgesetz (BDSG)", 1, "DE_LAW", None, "law", "DE", "active"), ("at_dsg", "Österreichisches Datenschutzgesetz (DSG)", 1, "AT_LAW", None, "law", "AT", "active"), ] def _mock_db_execute(query): """Mock that returns our test rows.""" mock_result = MagicMock() mock_result.fetchall.return_value = _MOCK_DB_ROWS return mock_result @pytest.fixture def registry(): """Create a registry with mocked DB.""" reg = RegulationRegistry() with patch("services.regulation_registry.SessionLocal") as mock_session_cls: mock_session = MagicMock() mock_session.execute = _mock_db_execute mock_session_cls.return_value = mock_session reg._load() return reg # ── classify_regulation tests ───────────────────────────────────────────── class TestClassifyRegulation: def test_exact_match_eu_law(self, registry): result = registry.classify_regulation("eu_2016_679") assert result["rule"] == 1 assert result["license"] == "EU_LAW" assert result["source_type"] == "law" assert result["name"] == "DSGVO (EU) 2016/679" def test_exact_match_case_insensitive(self, registry): result = registry.classify_regulation("EU_2016_679") assert result["rule"] == 1 assert result["name"] == "DSGVO (EU) 2016/679" def test_exact_match_with_whitespace(self, registry): result = registry.classify_regulation(" eu_2016_679 ") assert result["rule"] == 1 def test_nist_standard(self, registry): result = registry.classify_regulation("nist_sp_800_53") assert result["rule"] == 1 assert result["source_type"] == "standard" def test_owasp_rule2(self, registry): result = registry.classify_regulation("owasp_asvs") assert result["rule"] == 2 assert result["attribution"] == "OWASP Foundation, CC BY-SA 4.0" def test_german_law(self, registry): result = registry.classify_regulation("bdsg") assert result["rule"] == 1 assert result["source_type"] == "law" assert result["jurisdiction"] == "DE" def test_austrian_law(self, registry): result = registry.classify_regulation("at_dsg") assert result["rule"] == 1 assert result["jurisdiction"] == "AT" def test_prefix_enisa_rule2(self, registry): result = registry.classify_regulation("enisa_supply_chain_2024") assert result["rule"] == 2 assert result["source_type"] == "standard" assert "ENISA" in result["attribution"] def test_prefix_bsi_rule3(self, registry): result = registry.classify_regulation("bsi_tr_03161") assert result["rule"] == 3 assert result["source_type"] == "restricted" assert result["name"] == "INTERNAL_ONLY" def test_prefix_iso_rule3(self, registry): result = registry.classify_regulation("iso_27001") assert result["rule"] == 3 assert result["source_type"] == "restricted" def test_prefix_etsi_rule3(self, registry): result = registry.classify_regulation("etsi_en_303_645") assert result["rule"] == 3 def test_unknown_defaults_to_restricted(self, registry): result = registry.classify_regulation("some_unknown_regulation") assert result["rule"] == 3 assert result["source_type"] == "restricted" assert result["license"] == "UNKNOWN" # ── source_type_by_name tests ──────────────────────────────────────────── class TestSourceTypeByName: def test_exact_match_law(self, registry): result = registry.source_type_by_name("DSGVO (EU) 2016/679") assert result == "law" def test_exact_match_standard(self, registry): result = registry.source_type_by_name("NIST SP 800-53 Rev. 5") assert result == "standard" def test_empty_returns_framework(self, registry): assert registry.source_type_by_name("") == "framework" assert registry.source_type_by_name(None) == "framework" def test_heuristic_law(self, registry): assert registry.source_type_by_name("Verordnung XYZ") == "law" assert registry.source_type_by_name("Some EU Directive") == "law" def test_heuristic_guideline(self, registry): assert registry.source_type_by_name("EDPB Leitlinie 99/2025") == "guideline" assert registry.source_type_by_name("BSI Standard 200-1") == "guideline" def test_heuristic_framework(self, registry): # "ENISA Cloud Guidelines" matches "guideline" before "enisa" in heuristic order assert registry.source_type_by_name("ENISA Cloud Report") == "framework" assert registry.source_type_by_name("OWASP Testing Guide") == "framework" def test_unknown_returns_framework(self, registry): assert registry.source_type_by_name("Completely Unknown Document") == "framework" # ── is_open_source tests ─────────────��─────────────────────────────────── class TestIsOpenSource: def test_rule1_is_open(self, registry): assert registry.is_open_source("eu_2016_679") is True def test_rule2_is_open(self, registry): assert registry.is_open_source("owasp_asvs") is True def test_rule3_is_not_open(self, registry): assert registry.is_open_source("bsi_tr_03161") is False def test_unknown_is_not_open(self, registry): assert registry.is_open_source("unknown_thing") is False # ── Cache behavior tests ──────��────────────────────────────────────────── class TestCacheBehavior: def test_fresh_cache_not_stale(self, registry): assert registry._is_stale() is False def test_old_cache_is_stale(self, registry): registry._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1 assert registry._is_stale() is True def test_ensure_loaded_reloads_when_stale(self): reg = RegulationRegistry() reg._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 100 # force stale load_called = False original_load = reg._load def tracking_load(): nonlocal load_called load_called = True reg._load = tracking_load reg._ensure_loaded() assert load_called, "_load should have been called when cache is stale" def test_ensure_loaded_skips_when_fresh(self, registry): with patch.object(registry, "_load") as mock_load: registry._ensure_loaded() mock_load.assert_not_called() # ── Graceful degradation tests ──────��──────────────────────────────────── class TestGracefulDegradation: def test_db_failure_uses_stale_cache(self): """If DB fails, stale cache entries are still usable.""" reg = RegulationRegistry() # First load succeeds with patch("services.regulation_registry.SessionLocal") as mock_cls: mock_session = MagicMock() mock_session.execute = _mock_db_execute mock_cls.return_value = mock_session reg._load() # Force stale reg._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1 # Second load fails — DB error from sqlalchemy.exc import OperationalError with patch("services.regulation_registry.SessionLocal") as mock_cls: mock_cls.side_effect = OperationalError("connection refused", None, None) reg._ensure_loaded() # Should still have cached data result = reg.classify_regulation("eu_2016_679") assert result["rule"] == 1 def test_empty_registry_returns_unknown(self): """Unloaded registry returns safe defaults.""" reg = RegulationRegistry() reg._loaded_at = time.monotonic() # pretend fresh but empty result = reg.classify_regulation("eu_2016_679") assert result["rule"] == 3 # safe default assert result["license"] == "UNKNOWN" # ── Migration data consistency tests ───────��───────────────────────────── class TestMigrationDataConsistency: """Verify that the migration script produces valid data.""" def test_build_rows_produces_data(self): from scripts.f1_migrate_regulation_registry import build_rows rows = build_rows() assert len(rows) > 100 # at least 100 entries def test_all_rows_have_required_fields(self): from scripts.f1_migrate_regulation_registry import build_rows rows = build_rows() for row in rows: assert row["regulation_id"], f"Missing regulation_id: {row}" assert row["regulation_name_de"], f"Missing name: {row}" assert row["license_rule"] in (1, 2, 3), f"Bad rule: {row}" assert row["source_type"] in ( "law", "guideline", "standard", "framework", "restricted" ), f"Bad source_type: {row}" assert row["jurisdiction"], f"Missing jurisdiction: {row}" assert row["status"] in ("active", "needs_review", "deprecated") def test_no_duplicate_regulation_ids(self): from scripts.f1_migrate_regulation_registry import build_rows rows = build_rows() ids = [r["regulation_id"] for r in rows] assert len(ids) == len(set(ids)), f"Duplicates: {[x for x in ids if ids.count(x) > 1]}" def test_known_regulations_present(self): from scripts.f1_migrate_regulation_registry import build_rows rows = build_rows() ids = {r["regulation_id"] for r in rows} assert "eu_2016_679" in ids # DSGVO assert "bdsg" in ids # BDSG assert "nist_sp_800_53" in ids # NIST assert "owasp_asvs" in ids # OWASP def test_owasp_has_attribution(self): from scripts.f1_migrate_regulation_registry import build_rows rows = build_rows() owasp = [r for r in rows if r["regulation_id"] == "owasp_asvs"][0] assert owasp["attribution"] is not None assert "OWASP" in owasp["attribution"] assert owasp["license_rule"] == 2