9437e029d0
Migrates REGULATION_LICENSE_MAP (135 entries) and SOURCE_REGULATION_CLASSIFICATION (58 entries) from hardcoded Python dicts to compliance.regulation_registry table. - SQL migration: 002_regulation_registry.sql (table + indexes + trigger) - Migration script: f1_migrate_regulation_registry.py (162 rows, --dry-run) - RegulationRegistry cache: 5min TTL, prefix fallback, graceful degradation - control_generator._classify_regulation() delegates to DB with dict fallback - source_type_classification.classify_source_regulation() delegates to DB - 34 new tests (lookup, cache, degradation, migration data consistency) - 421 total tests pass, 0 regressions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
286 lines
11 KiB
Python
286 lines
11 KiB
Python
"""Tests for RegulationRegistry — DB-backed lookup with cache and fallback."""
|
|
|
|
import time
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
from services.regulation_registry import (
|
|
RegulationRegistry,
|
|
_CACHE_TTL_SECONDS,
|
|
)
|
|
|
|
|
|
# ── Test data: simulates DB rows ──────────────────────────────────────────
|
|
|
|
_MOCK_DB_ROWS = [
|
|
# (regulation_id, regulation_name_de, license_rule, license_type,
|
|
# attribution, source_type, jurisdiction, status)
|
|
("eu_2016_679", "DSGVO (EU) 2016/679", 1, "EU_LAW",
|
|
None, "law", "EU", "active"),
|
|
("nist_sp_800_53", "NIST SP 800-53 Rev. 5", 1, "NIST_PUBLIC_DOMAIN",
|
|
None, "standard", "US", "active"),
|
|
("owasp_asvs", "OWASP ASVS 4.0", 2, "CC-BY-SA-4.0",
|
|
"OWASP Foundation, CC BY-SA 4.0", "standard", "INT", "active"),
|
|
("bdsg", "Bundesdatenschutzgesetz (BDSG)", 1, "DE_LAW",
|
|
None, "law", "DE", "active"),
|
|
("at_dsg", "Österreichisches Datenschutzgesetz (DSG)", 1, "AT_LAW",
|
|
None, "law", "AT", "active"),
|
|
]
|
|
|
|
|
|
def _mock_db_execute(query):
    """Stand in for a session's ``execute``: ignore *query*, serve the canned rows.

    Returns a ``MagicMock`` whose ``fetchall()`` yields ``_MOCK_DB_ROWS``.
    """
    # Dotted keyword configures the child mock in one step.
    return MagicMock(**{"fetchall.return_value": _MOCK_DB_ROWS})
|
|
|
|
|
|
@pytest.fixture
def registry():
    """Provide a ``RegulationRegistry`` already loaded from a mocked DB session."""
    fake_session = MagicMock()
    fake_session.execute = _mock_db_execute

    instance = RegulationRegistry()
    # Only the load itself runs under the patch; the returned registry then
    # serves lookups from whatever it cached.
    with patch(
        "services.regulation_registry.SessionLocal",
        return_value=fake_session,
    ):
        instance._load()
    return instance
|
|
|
|
|
|
# ── classify_regulation tests ─────────────────────────────────────────────
|
|
|
|
|
|
class TestClassifyRegulation:
    """``classify_regulation``: exact ids, prefix-based ids and the default."""

    def test_exact_match_eu_law(self, registry):
        """An id present in the loaded rows returns that row's fields."""
        entry = registry.classify_regulation("eu_2016_679")
        assert entry["rule"] == 1
        assert entry["license"] == "EU_LAW"
        assert entry["source_type"] == "law"
        assert entry["name"] == "DSGVO (EU) 2016/679"

    def test_exact_match_case_insensitive(self, registry):
        """An upper-cased id resolves to the same entry."""
        entry = registry.classify_regulation("EU_2016_679")
        assert entry["rule"] == 1
        assert entry["name"] == "DSGVO (EU) 2016/679"

    def test_exact_match_with_whitespace(self, registry):
        """Surrounding spaces do not prevent the match."""
        assert registry.classify_regulation(" eu_2016_679 ")["rule"] == 1

    def test_nist_standard(self, registry):
        """The NIST row is rule 1 with source type ``standard``."""
        entry = registry.classify_regulation("nist_sp_800_53")
        assert entry["rule"] == 1
        assert entry["source_type"] == "standard"

    def test_owasp_rule2(self, registry):
        """The OWASP row is rule 2 and carries its attribution text."""
        entry = registry.classify_regulation("owasp_asvs")
        assert entry["rule"] == 2
        assert entry["attribution"] == "OWASP Foundation, CC BY-SA 4.0"

    def test_german_law(self, registry):
        """The BDSG row is a rule-1 law with jurisdiction ``DE``."""
        entry = registry.classify_regulation("bdsg")
        assert entry["rule"] == 1
        assert entry["source_type"] == "law"
        assert entry["jurisdiction"] == "DE"

    def test_austrian_law(self, registry):
        """The Austrian DSG row is rule 1 with jurisdiction ``AT``."""
        entry = registry.classify_regulation("at_dsg")
        assert entry["rule"] == 1
        assert entry["jurisdiction"] == "AT"

    def test_prefix_enisa_rule2(self, registry):
        """An ``enisa_`` id with no exact row is rule 2 / standard, ENISA-attributed."""
        entry = registry.classify_regulation("enisa_supply_chain_2024")
        assert entry["rule"] == 2
        assert entry["source_type"] == "standard"
        assert "ENISA" in entry["attribution"]

    def test_prefix_bsi_rule3(self, registry):
        """A ``bsi_`` id with no exact row is rule 3 / restricted / INTERNAL_ONLY."""
        entry = registry.classify_regulation("bsi_tr_03161")
        assert entry["rule"] == 3
        assert entry["source_type"] == "restricted"
        assert entry["name"] == "INTERNAL_ONLY"

    def test_prefix_iso_rule3(self, registry):
        """An ``iso_`` id with no exact row is rule 3 / restricted."""
        entry = registry.classify_regulation("iso_27001")
        assert entry["rule"] == 3
        assert entry["source_type"] == "restricted"

    def test_prefix_etsi_rule3(self, registry):
        """An ``etsi_`` id with no exact row is rule 3."""
        assert registry.classify_regulation("etsi_en_303_645")["rule"] == 3

    def test_unknown_defaults_to_restricted(self, registry):
        """An unrecognised id gets rule 3, ``restricted`` and license ``UNKNOWN``."""
        entry = registry.classify_regulation("some_unknown_regulation")
        assert entry["rule"] == 3
        assert entry["source_type"] == "restricted"
        assert entry["license"] == "UNKNOWN"
|
|
|
|
|
|
# ── source_type_by_name tests ────────────────────────────────────────────
|
|
|
|
|
|
class TestSourceTypeByName:
    """``source_type_by_name``: exact display names, heuristics and the default."""

    def test_exact_match_law(self, registry):
        """A loaded law's display name maps to ``law``."""
        source_type = registry.source_type_by_name("DSGVO (EU) 2016/679")
        assert source_type == "law"

    def test_exact_match_standard(self, registry):
        """A loaded standard's display name maps to ``standard``."""
        source_type = registry.source_type_by_name("NIST SP 800-53 Rev. 5")
        assert source_type == "standard"

    def test_empty_returns_framework(self, registry):
        """Empty string and ``None`` both map to ``framework``."""
        for blank in ("", None):
            assert registry.source_type_by_name(blank) == "framework"

    def test_heuristic_law(self, registry):
        """Names not in the loaded rows that read like laws map to ``law``."""
        for title in ("Verordnung XYZ", "Some EU Directive"):
            assert registry.source_type_by_name(title) == "law"

    def test_heuristic_guideline(self, registry):
        """Names not in the loaded rows that read like guidelines map to ``guideline``."""
        for title in ("EDPB Leitlinie 99/2025", "BSI Standard 200-1"):
            assert registry.source_type_by_name(title) == "guideline"

    def test_heuristic_framework(self, registry):
        """ENISA / OWASP style names map to ``framework``."""
        # "ENISA Cloud Guidelines" would match "guideline" before "enisa" in
        # the heuristic order, hence "Report" in the ENISA sample.
        for title in ("ENISA Cloud Report", "OWASP Testing Guide"):
            assert registry.source_type_by_name(title) == "framework"

    def test_unknown_returns_framework(self, registry):
        """A name matching nothing maps to ``framework``."""
        source_type = registry.source_type_by_name("Completely Unknown Document")
        assert source_type == "framework"
|
|
|
|
|
|
# ── is_open_source tests ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestIsOpenSource:
    """``is_open_source`` is expected to be True for rules 1-2 and False otherwise."""

    def test_rule1_is_open(self, registry):
        """A rule-1 entry counts as open."""
        verdict = registry.is_open_source("eu_2016_679")
        assert verdict is True

    def test_rule2_is_open(self, registry):
        """A rule-2 entry counts as open."""
        verdict = registry.is_open_source("owasp_asvs")
        assert verdict is True

    def test_rule3_is_not_open(self, registry):
        """A rule-3 (prefix-classified) id is not open."""
        verdict = registry.is_open_source("bsi_tr_03161")
        assert verdict is False

    def test_unknown_is_not_open(self, registry):
        """An unrecognised id is not open."""
        verdict = registry.is_open_source("unknown_thing")
        assert verdict is False
|
|
|
|
|
|
# ── Cache behavior tests ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestCacheBehavior:
    """TTL handling of the registry cache (``_is_stale`` / ``_ensure_loaded``)."""

    def test_fresh_cache_not_stale(self, registry):
        """The registry handed out by the fixture is not reported as stale."""
        assert registry._is_stale() is False

    def test_old_cache_is_stale(self, registry):
        """Back-dating ``_loaded_at`` past the TTL marks the cache stale."""
        registry._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1
        assert registry._is_stale() is True

    def test_ensure_loaded_reloads_when_stale(self):
        """``_ensure_loaded`` calls ``_load`` when the cache is past its TTL."""
        reg = RegulationRegistry()
        reg._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 100  # force stale

        # Stub ``_load`` the same way the "fresh" test does, so the attribute
        # is restored on exit; ``return_value=None`` keeps the stub returning
        # nothing, like the hand-written tracker it replaces.
        with patch.object(reg, "_load", return_value=None) as mock_load:
            reg._ensure_loaded()

        assert mock_load.called, "_load should have been called when cache is stale"

    def test_ensure_loaded_skips_when_fresh(self, registry):
        """``_ensure_loaded`` does not call ``_load`` while the cache is fresh."""
        with patch.object(registry, "_load") as mock_load:
            registry._ensure_loaded()
            mock_load.assert_not_called()
|
|
|
|
|
|
# ── Graceful degradation tests ───────────────────────────────────────────
|
|
|
|
|
|
class TestGracefulDegradation:
    """Behaviour when the DB is unreachable or nothing has been loaded."""

    def test_db_failure_uses_stale_cache(self):
        """If DB fails, stale cache entries are still usable."""
        from sqlalchemy.exc import OperationalError

        session_target = "services.regulation_registry.SessionLocal"
        reg = RegulationRegistry()

        # Step 1: first load succeeds against a mocked session.
        with patch(session_target) as session_factory:
            working_session = MagicMock()
            working_session.execute = _mock_db_execute
            session_factory.return_value = working_session
            reg._load()

        # Step 2: back-date the load timestamp so the cache counts as stale.
        reg._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1

        # Step 3: the reload attempt hits a DB error.
        with patch(session_target) as session_factory:
            session_factory.side_effect = OperationalError(
                "connection refused", None, None
            )
            reg._ensure_loaded()

            # Looked up while SessionLocal is still patched to fail, so this
            # cannot reach a real session: the cached data must answer.
            cached = reg.classify_regulation("eu_2016_679")
            assert cached["rule"] == 1

    def test_empty_registry_returns_unknown(self):
        """Unloaded registry returns safe defaults."""
        reg = RegulationRegistry()
        reg._loaded_at = time.monotonic()  # pretend fresh but empty

        fallback = reg.classify_regulation("eu_2016_679")
        assert fallback["rule"] == 3  # safe default
        assert fallback["license"] == "UNKNOWN"
|
|
|
|
|
|
# ── Migration data consistency tests ─────────────────────────────────────
|
|
|
|
|
|
class TestMigrationDataConsistency:
    """Verify that the migration script produces valid data.

    ``build_rows`` is imported inside each test rather than at module level,
    so importing this test module does not itself import the script.
    """

    def test_build_rows_produces_data(self):
        """``build_rows`` returns more than 100 rows."""
        from scripts.f1_migrate_regulation_registry import build_rows

        rows = build_rows()
        assert len(rows) > 100  # more than 100 entries

    def test_all_rows_have_required_fields(self):
        """Every row has non-empty ids/names and values from the allowed sets."""
        from scripts.f1_migrate_regulation_registry import build_rows

        allowed_source_types = (
            "law", "guideline", "standard", "framework", "restricted"
        )
        allowed_statuses = ("active", "needs_review", "deprecated")

        for row in build_rows():
            assert row["regulation_id"], f"Missing regulation_id: {row}"
            assert row["regulation_name_de"], f"Missing name: {row}"
            assert row["license_rule"] in (1, 2, 3), f"Bad rule: {row}"
            assert row["source_type"] in allowed_source_types, (
                f"Bad source_type: {row}"
            )
            assert row["jurisdiction"], f"Missing jurisdiction: {row}"
            # Message added so a failure names the offending row, like the
            # other checks in this loop.
            assert row["status"] in allowed_statuses, f"Bad status: {row}"

    def test_no_duplicate_regulation_ids(self):
        """No ``regulation_id`` occurs more than once."""
        from collections import Counter

        from scripts.f1_migrate_regulation_registry import build_rows

        id_counts = Counter(row["regulation_id"] for row in build_rows())
        # One pass over the counts; each duplicated id is reported once.
        duplicates = [reg_id for reg_id, seen in id_counts.items() if seen > 1]
        assert not duplicates, f"Duplicates: {duplicates}"

    def test_known_regulations_present(self):
        """A handful of well-known ids are part of the migration data."""
        from scripts.f1_migrate_regulation_registry import build_rows

        ids = {row["regulation_id"] for row in build_rows()}
        expected = {
            "eu_2016_679",     # DSGVO
            "bdsg",            # BDSG
            "nist_sp_800_53",  # NIST
            "owasp_asvs",      # OWASP
        }
        # Set difference reports every missing id at once instead of stopping
        # at the first one.
        missing = expected - ids
        assert not missing, f"Missing known regulations: {sorted(missing)}"

    def test_owasp_has_attribution(self):
        """The ``owasp_asvs`` row is rule 2 and carries an OWASP attribution."""
        from scripts.f1_migrate_regulation_registry import build_rows

        owasp = next(
            (row for row in build_rows() if row["regulation_id"] == "owasp_asvs"),
            None,
        )
        # Explicit check so a missing row fails as an assertion, not IndexError.
        assert owasp is not None, "owasp_asvs missing from build_rows()"
        assert owasp["attribution"] is not None
        assert "OWASP" in owasp["attribution"]
        assert owasp["license_rule"] == 2
|