"""F5 Validation: Verify DB-backed lookups match old hardcoded dicts.

Each test compares a legacy in-code table (``REGULATION_LICENSE_MAP``,
``ACTION_TYPES``, ``_NEGATIVE_PATTERNS``, ``_OBJECT_SYNONYMS``) against the
rows produced by the corresponding migration script builder.

Project modules are imported inside each test so that an import failure is
reported against that test only instead of breaking collection of the whole
module. Every test collects *all* offending entries before asserting, so a
single run shows the complete list of gaps.
"""

from unittest.mock import MagicMock, patch

import pytest


class TestRegulationRegistryConsistency:
    """Ensure all old REGULATION_LICENSE_MAP entries are in the DB."""

    def test_all_old_entries_in_db(self):
        """Every legacy regulation id appears in the migration rows."""
        from services.control_generator import REGULATION_LICENSE_MAP
        from scripts.f1_migrate_regulation_registry import build_rows

        db_ids = {r["regulation_id"] for r in build_rows()}
        missing = [reg_id for reg_id in REGULATION_LICENSE_MAP if reg_id not in db_ids]

        assert not missing, "Missing from DB:\n" + "\n".join(map(str, missing))

    def test_classify_regulation_matches_old(self):
        """DB-backed classify_regulation returns same rule as old dict."""
        from services.control_generator import REGULATION_LICENSE_MAP
        from services.regulation_registry import RegulationRegistry
        from scripts.f1_migrate_regulation_registry import build_rows

        # Build mock DB rows from the migration data. The tuple layout is what
        # the mocked ``fetchall()`` hands to ``RegulationRegistry._load``.
        mock_rows = [
            (
                r["regulation_id"],
                r["regulation_name_de"],
                r["license_rule"],
                r["license_type"],
                r.get("attribution"),
                r["source_type"],
                r["jurisdiction"],
                r["status"],
            )
            for r in build_rows()
        ]

        reg = RegulationRegistry()
        mismatches = []
        with patch("services.regulation_registry.SessionLocal") as mock_cls:
            mock_session = MagicMock()
            mock_result = MagicMock()
            mock_result.fetchall.return_value = mock_rows
            mock_session.execute.return_value = mock_result
            mock_cls.return_value = mock_session
            reg._load()

            # Compare every entry while the patch is still active, so that a
            # lookup which opens a session of its own cannot reach the real
            # SessionLocal.
            for reg_id, info in REGULATION_LICENSE_MAP.items():
                db_result = reg.classify_regulation(reg_id)
                if db_result["rule"] != info["rule"]:
                    mismatches.append(
                        f"{reg_id}: DB rule={db_result['rule']} vs dict rule={info['rule']}"
                    )

        assert not mismatches, "Rule mismatches:\n" + "\n".join(mismatches)


class TestActionOntologyConsistency:
    """Ensure all old ACTION_TYPES entries are in the DB."""

    def test_all_action_types_migrated(self):
        """Every legacy action type is a canonical name in the migration data."""
        from services.control_ontology import ACTION_TYPES
        from scripts.f2_migrate_actions import build_action_types

        db_names = {t["canonical_name"] for t in build_action_types()}
        missing = [action for action in ACTION_TYPES if action not in db_names]

        assert not missing, "Missing action_type:\n" + "\n".join(map(str, missing))

    def test_all_aliases_migrated(self):
        """Every legacy alias (lower-cased) is present as an 'alias' synonym."""
        from services.control_ontology import ACTION_TYPES
        from scripts.f2_migrate_actions import build_action_synonyms

        db_synonyms = {
            s["synonym"]
            for s in build_action_synonyms()
            if s["pattern_type"] == "alias"
        }
        missing = []
        for action, info in ACTION_TYPES.items():
            for alias in info.get("aliases", []):
                if alias.lower() not in db_synonyms:
                    missing.append(f"{action}: {alias}")

        assert not missing, "Missing aliases:\n" + "\n".join(missing)

    def test_all_negative_patterns_migrated(self):
        """Every legacy negative pattern (lower-cased) is present in the data."""
        from services.control_ontology import _NEGATIVE_PATTERNS
        from scripts.f2_migrate_actions import build_action_synonyms

        db_patterns = {
            s["synonym"]
            for s in build_action_synonyms()
            if s["pattern_type"] == "negative_pattern"
        }
        missing = [
            pattern
            for pattern, _ in _NEGATIVE_PATTERNS
            if pattern.lower() not in db_patterns
        ]

        assert not missing, "Missing negative pattern:\n" + "\n".join(missing)


class TestObjectSynonymsConsistency:
    """Ensure all old _OBJECT_SYNONYMS are in the DB."""

    def test_all_objects_migrated(self):
        """Every legacy object synonym (lower-cased) is in the migration rows."""
        from services.control_dedup import _OBJECT_SYNONYMS
        from scripts.f3_migrate_objects import build_rows

        db_synonyms = {r["synonym"] for r in build_rows()}
        missing = [syn for syn in _OBJECT_SYNONYMS if syn.lower() not in db_synonyms]

        assert not missing, "Missing object synonyms:\n" + "\n".join(missing)


class TestLLMEnrichmentQuality:
    """Basic quality checks on LLM-generated synonyms."""

    def test_no_empty_synonyms_in_db(self):
        """All synonyms have at least two characters after stripping whitespace."""
        from scripts.f2_migrate_actions import build_action_synonyms

        too_short = [
            s for s in build_action_synonyms() if len(s["synonym"].strip()) < 2
        ]

        assert not too_short, "Too short:\n" + "\n".join(map(str, too_short))

    def test_no_duplicate_canonical_in_actions(self):
        """Each synonym should map to exactly one canonical action."""
        from scripts.f2_migrate_actions import build_action_synonyms

        seen = {}
        conflicts = []
        for s in build_action_synonyms():
            key = (s["synonym"], s["language"], s["pattern_type"])
            # setdefault records the first canonical action seen for this key
            # and returns it on every later occurrence for comparison.
            first = seen.setdefault(key, s["canonical_action"])
            if first != s["canonical_action"]:
                conflicts.append(
                    f"Duplicate synonym '{s['synonym']}' maps to both "
                    f"'{first}' and '{s['canonical_action']}'"
                )

        assert not conflicts, "\n".join(conflicts)