From 652e3a65a39810bba3cafa5242ea5fc4c9692d6a Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 3 May 2026 23:47:53 +0200 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20F2+F3=20action/object=20ontol?= =?UTF-8?q?ogy=20=E2=80=94=20DB-backed=20normalization?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrates ACTION_TYPES (26+8 types), _NEGATIVE_PATTERNS (22), _ACTION_SYNONYMS (65), and _OBJECT_SYNONYMS (75) from hardcoded dicts to DB tables. - SQL migration: 003_action_object_ontology.sql (3 tables) - Migration scripts: f2_migrate_actions.py (34 types, 145 synonyms), f3_migrate_objects.py (75 objects) - OntologyRegistry cache: 5min TTL, raises RuntimeError if empty (safe fallback to dicts) - control_ontology.classify_action/get_phase delegate to DB with dict fallback - control_dedup.normalize_action/normalize_object delegate to DB with dict fallback - 25 new tests, 446 total pass, 0 regressions Co-Authored-By: Claude Opus 4.6 (1M context) --- .../migrations/003_action_object_ontology.sql | 58 +++++ .../scripts/f2_migrate_actions.py | 206 ++++++++++++++++ .../scripts/f3_migrate_objects.py | 100 ++++++++ control-pipeline/services/control_dedup.py | 31 ++- control-pipeline/services/control_ontology.py | 32 ++- .../services/ontology_registry.py | 217 +++++++++++++++++ .../tests/test_ontology_registry.py | 226 ++++++++++++++++++ 7 files changed, 854 insertions(+), 16 deletions(-) create mode 100644 control-pipeline/migrations/003_action_object_ontology.sql create mode 100644 control-pipeline/scripts/f2_migrate_actions.py create mode 100644 control-pipeline/scripts/f3_migrate_objects.py create mode 100644 control-pipeline/services/ontology_registry.py create mode 100644 control-pipeline/tests/test_ontology_registry.py diff --git a/control-pipeline/migrations/003_action_object_ontology.sql b/control-pipeline/migrations/003_action_object_ontology.sql new file mode 100644 index 0000000..1f116b9 --- /dev/null +++ b/control-pipeline/migrations/003_action_object_ontology.sql @@ -0,0 +1,58 @@ +-- Migration 003: Action & Object Ontology (Block F2+F3) +-- Schema: compliance +-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/003_action_object_ontology.sql + +SET search_path TO compliance, public; + +-- ======================================== +-- action_types — 34 canonical action verbs +-- ======================================== + +CREATE TABLE IF NOT EXISTS action_types ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + canonical_name VARCHAR(50) UNIQUE NOT NULL, + phase VARCHAR(30) NOT NULL, + description_de TEXT, + description_en TEXT, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_action_types_phase ON action_types(phase); + +-- ======================================== +-- action_synonyms — German aliases + negative patterns +-- ======================================== + +CREATE TABLE IF NOT EXISTS action_synonyms ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + canonical_action VARCHAR(50) NOT NULL REFERENCES action_types(canonical_name), + synonym VARCHAR(100) NOT NULL, + language VARCHAR(5) NOT NULL DEFAULT 'de', + source VARCHAR(20) NOT NULL DEFAULT 'manual' + CHECK (source IN ('manual', 'llm', 'migration')), + pattern_type VARCHAR(20) NOT NULL DEFAULT 'alias' + CHECK (pattern_type IN ('alias', 'negative_pattern')), + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(synonym, language, pattern_type) +); + 
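+-- Illustrative lookup (a sketch, not part of the migration): how a synonym
+-- resolves to its canonical action and phase once these tables are populated.
+-- The sample value assumes the alias data seeded by f2_migrate_actions.py.
+--
+--   SELECT s.synonym, s.canonical_action, t.phase
+--   FROM action_synonyms s
+--   JOIN action_types t ON t.canonical_name = s.canonical_action
+--   WHERE s.synonym = 'verschlüsseln'
+--     AND s.pattern_type = 'alias'
+--     AND s.language = 'de';
+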
+CREATE INDEX IF NOT EXISTS idx_action_synonyms_canonical ON action_synonyms(canonical_action); +CREATE INDEX IF NOT EXISTS idx_action_synonyms_pattern_type ON action_synonyms(pattern_type); + +-- ======================================== +-- object_synonyms — normalized object tokens +-- ======================================== + +CREATE TABLE IF NOT EXISTS object_synonyms ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + canonical_token VARCHAR(100) NOT NULL, + synonym VARCHAR(200) NOT NULL, + language VARCHAR(5) NOT NULL DEFAULT 'de', + source VARCHAR(20) NOT NULL DEFAULT 'manual' + CHECK (source IN ('manual', 'llm', 'migration')), + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(synonym, language) +); + +CREATE INDEX IF NOT EXISTS idx_object_synonyms_canonical ON object_synonyms(canonical_token); diff --git a/control-pipeline/scripts/f2_migrate_actions.py b/control-pipeline/scripts/f2_migrate_actions.py new file mode 100644 index 0000000..a7caed2 --- /dev/null +++ b/control-pipeline/scripts/f2_migrate_actions.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +""" +F2 Migration: Populate action_types + action_synonyms from hardcoded dicts. + +Sources: + - ACTION_TYPES (control_ontology.py) — 26 types + ~150 aliases + - _NEGATIVE_PATTERNS (control_ontology.py) — 22 patterns + - _ACTION_SYNONYMS (control_dedup.py) — 65 synonyms + +Usage: + python3 scripts/f2_migrate_actions.py --dry-run + python3 scripts/f2_migrate_actions.py --db-host macmini +""" + +import argparse +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from services.control_ontology import ACTION_TYPES, _NEGATIVE_PATTERNS # noqa: E402 +from services.control_dedup import _ACTION_SYNONYMS # noqa: E402 + +# Extra action types found in _ACTION_SYNONYMS but missing from ACTION_TYPES +_EXTRA_ACTION_TYPES = { + "audit": "evidence", + "log": "evidence", + "block": "implementation", + "authorize": "governance", + "authenticate": "implementation", + "update": "operation", + "backup": "operation", + "restore": "operation", +} + + +def build_action_types() -> list[dict]: + """Build action_types rows from ACTION_TYPES + extras.""" + rows = [] + for name, info in ACTION_TYPES.items(): + rows.append({ + "canonical_name": name, + "phase": info["phase"], + }) + for name, phase in _EXTRA_ACTION_TYPES.items(): + if name not in ACTION_TYPES: + rows.append({ + "canonical_name": name, + "phase": phase, + }) + return rows + + +def build_action_synonyms() -> list[dict]: + """Build action_synonyms rows from all 3 sources.""" + rows = [] + seen: set[tuple[str, str, str]] = set() # (synonym, language, pattern_type) + + # 1) Aliases from ACTION_TYPES + for action_type, info in ACTION_TYPES.items(): + for alias in info.get("aliases", []): + key = (alias.lower(), "de", "alias") + if key not in seen: + seen.add(key) + rows.append({ + "canonical_action": action_type, + "synonym": alias.lower(), + "language": "de", + "source": "migration", + "pattern_type": "alias", + }) + + # 2) Negative patterns + for pattern, action_type in _NEGATIVE_PATTERNS: + key = (pattern.lower(), "de", "negative_pattern") + if key not in seen: + seen.add(key) + rows.append({ + "canonical_action": action_type, + "synonym": pattern.lower(), + "language": "de", + "source": "migration", + "pattern_type": "negative_pattern", + }) + + # 3) _ACTION_SYNONYMS (German → canonical English) + for synonym, canonical in _ACTION_SYNONYMS.items(): + # Determine language + lang = "en" if synonym == canonical else "de" + key = (synonym.lower(), 
lang, "alias") + if key not in seen: + seen.add(key) + # Map canonical to valid action_type + action = _map_dedup_canonical(canonical) + rows.append({ + "canonical_action": action, + "synonym": synonym.lower(), + "language": lang, + "source": "migration", + "pattern_type": "alias", + }) + + return rows + + +def _map_dedup_canonical(canonical: str) -> str: + """Map control_dedup canonical names to action_types names.""" + # Most map directly, some need adjustment + mapping = { + "test": "test", + "verify": "verify", # in ACTION_TYPES + "validate": "validate", # in ACTION_TYPES + "audit": "audit", + "log": "log", + "block": "block", + "restrict": "restrict_access", + "authorize": "authorize", + "authenticate": "authenticate", + "update": "update", + "backup": "backup", + "restore": "restore", + } + return mapping.get(canonical, canonical) + + +def insert_via_sqlalchemy(action_types: list[dict], synonyms: list[dict], db_host: str): + """Insert rows using SQLAlchemy.""" + from sqlalchemy import create_engine, text + + url = "postgresql://breakpilot:breakpilot123@%s:5432/breakpilot_db" % db_host + engine = create_engine(url) + + with engine.connect() as conn: + conn.execute(text("SET search_path TO compliance, public")) + + # Insert action_types + for row in action_types: + conn.execute( + text(""" + INSERT INTO action_types (canonical_name, phase) + VALUES (:canonical_name, :phase) + ON CONFLICT (canonical_name) DO UPDATE SET + phase = EXCLUDED.phase + """), + row, + ) + print("Inserted %d action_types" % len(action_types)) + + # Insert action_synonyms + inserted = 0 + skipped = 0 + for row in synonyms: + try: + conn.execute( + text(""" + INSERT INTO action_synonyms + (canonical_action, synonym, language, source, pattern_type) + VALUES + (:canonical_action, :synonym, :language, :source, :pattern_type) + ON CONFLICT (synonym, language, pattern_type) DO UPDATE SET + canonical_action = EXCLUDED.canonical_action, + source = EXCLUDED.source + """), + row, + ) + inserted += 1 + except Exception as e: + print(" Skip %s: %s" % (row["synonym"], e)) + skipped += 1 + + conn.commit() + print("Inserted %d action_synonyms (%d skipped)" % (inserted, skipped)) + + +def main(): + parser = argparse.ArgumentParser(description="Migrate action types + synonyms") + parser.add_argument("--dry-run", action="store_true", help="Print stats only") + parser.add_argument("--db-host", default="localhost", help="PostgreSQL host") + args = parser.parse_args() + + action_types = build_action_types() + synonyms = build_action_synonyms() + + print("Action types: %d" % len(action_types)) + print("Action synonyms: %d" % len(synonyms)) + by_type = {} + for s in synonyms: + by_type[s["pattern_type"]] = by_type.get(s["pattern_type"], 0) + 1 + print(" By pattern_type: %s" % by_type) + by_source = {} + for s in synonyms: + by_source[s["canonical_action"]] = by_source.get(s["canonical_action"], 0) + 1 + print(" Top actions: %s" % dict(sorted(by_source.items(), key=lambda x: -x[1])[:10])) + + if args.dry_run: + print("\n--- DRY RUN ---") + print("\nAction types:") + for at in action_types: + print(" %s (%s)" % (at["canonical_name"], at["phase"])) + return + + insert_via_sqlalchemy(action_types, synonyms, args.db_host) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/scripts/f3_migrate_objects.py b/control-pipeline/scripts/f3_migrate_objects.py new file mode 100644 index 0000000..284e38a --- /dev/null +++ b/control-pipeline/scripts/f3_migrate_objects.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +F3 Migration: 
Populate object_synonyms from hardcoded dict. + +Source: _OBJECT_SYNONYMS (control_dedup.py) — 75 synonyms + +Usage: + python3 scripts/f3_migrate_objects.py --dry-run + python3 scripts/f3_migrate_objects.py --db-host macmini +""" + +import argparse +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from services.control_dedup import _OBJECT_SYNONYMS # noqa: E402 + + +def build_rows() -> list[dict]: + """Build object_synonyms rows.""" + rows = [] + for synonym, canonical in _OBJECT_SYNONYMS.items(): + # Detect language (heuristic: German if contains umlauts or common DE words) + lang = "de" + lower = synonym.lower() + if all(c in "abcdefghijklmnopqrstuvwxyz0123456789 -_" for c in lower): + # Pure ASCII — likely English + lang = "en" + # Override for known German without umlauts + if lower in ("passwort", "kennwort", "zugangsdaten", "fernzugriff", + "sitzung", "firewall", "netzwerk", "vorfall", + "schwachstelle", "richtlinie", "schulung", + "protokoll", "datensicherung", "wiederherstellung"): + lang = "de" + + rows.append({ + "canonical_token": canonical, + "synonym": lower, + "language": lang, + "source": "migration", + }) + return rows + + +def insert_via_sqlalchemy(rows: list[dict], db_host: str): + """Insert rows using SQLAlchemy.""" + from sqlalchemy import create_engine, text + + url = "postgresql://breakpilot:breakpilot123@%s:5432/breakpilot_db" % db_host + engine = create_engine(url) + + with engine.connect() as conn: + conn.execute(text("SET search_path TO compliance, public")) + + inserted = 0 + for row in rows: + conn.execute( + text(""" + INSERT INTO object_synonyms + (canonical_token, synonym, language, source) + VALUES + (:canonical_token, :synonym, :language, :source) + ON CONFLICT (synonym, language) DO UPDATE SET + canonical_token = EXCLUDED.canonical_token, + source = EXCLUDED.source + """), + row, + ) + inserted += 1 + + conn.commit() + print("Inserted %d object_synonyms" % inserted) + + +def main(): + parser = argparse.ArgumentParser(description="Migrate object synonyms") + parser.add_argument("--dry-run", action="store_true", help="Print stats only") + parser.add_argument("--db-host", default="localhost", help="PostgreSQL host") + args = parser.parse_args() + + rows = build_rows() + print("Object synonyms: %d" % len(rows)) + + # Group by canonical + by_canonical = {} + for r in rows: + by_canonical[r["canonical_token"]] = by_canonical.get(r["canonical_token"], 0) + 1 + print("Unique canonical tokens: %d" % len(by_canonical)) + print("Top tokens: %s" % dict(sorted(by_canonical.items(), key=lambda x: -x[1])[:10])) + + if args.dry_run: + return + + insert_via_sqlalchemy(rows, args.db_host) + + +if __name__ == "__main__": + main() diff --git a/control-pipeline/services/control_dedup.py b/control-pipeline/services/control_dedup.py index 26a26f2..2ca0e37 100644 --- a/control-pipeline/services/control_dedup.py +++ b/control-pipeline/services/control_dedup.py @@ -126,22 +126,29 @@ _ACTION_SYNONYMS: dict[str, str] = { def normalize_action(action: str) -> str: - """Normalize an action verb to a canonical English form.""" + """Normalize an action verb to a canonical English form. + + Delegates to DB-backed OntologyRegistry with dict fallback. 
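+
+    Illustrative (assumes the migrated synonym data; unknown verbs pass
+    through unchanged):
+        normalize_action("implementieren")  -> "implement"
+        normalize_action("fliegen")         -> "fliegen"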
+ """ + try: + from .ontology_registry import get_ontology_registry + return get_ontology_registry().normalize_action(action) + except Exception: + pass + + # Fallback: original logic if not action: return "" action = action.strip().lower() - # Strip German infinitive/conjugation suffixes for lookup action_base = re.sub(r"(en|t|st|e|te|tet|end)$", "", action) - # Try exact match first, then base form if action in _ACTION_SYNONYMS: return _ACTION_SYNONYMS[action] if action_base in _ACTION_SYNONYMS: return _ACTION_SYNONYMS[action_base] - # Fuzzy: check if action starts with any known verb for verb, canonical in _ACTION_SYNONYMS.items(): if action.startswith(verb) or verb.startswith(action): return canonical - return action # fallback: return as-is + return action # ── Object Normalization ───────────────────────────────────────────── @@ -237,7 +244,19 @@ _OBJECT_KEYS_SORTED = sorted(_OBJECT_SYNONYMS.keys(), key=len, reverse=True) def normalize_object(obj: str) -> str: - """Normalize a compliance object to a canonical token.""" + """Normalize a compliance object to a canonical token. + + Delegates to DB-backed OntologyRegistry with dict fallback. + """ + # Try DB-backed registry first + try: + from .ontology_registry import get_ontology_registry + result = get_ontology_registry().normalize_object(obj) + if result != obj.strip().lower(): + return result + except Exception: + pass + if not obj: return "" obj_lower = obj.strip().lower() diff --git a/control-pipeline/services/control_ontology.py b/control-pipeline/services/control_ontology.py index 281c9e5..7ed8a4e 100644 --- a/control-pipeline/services/control_ontology.py +++ b/control-pipeline/services/control_ontology.py @@ -223,31 +223,43 @@ _FRAMEWORK_PATTERNS: list[str] = [ def classify_action(text: str) -> str: - """Classify an obligation action text into a canonical action_type.""" - text_lower = text.lower().strip() + """Classify an obligation action text into a canonical action_type. - # Check negative patterns first + Delegates to DB-backed OntologyRegistry (with 5min cache). + Falls back to hardcoded dicts if DB is unavailable. + """ + try: + from .ontology_registry import get_ontology_registry + return get_ontology_registry().classify_action(text) + except Exception: + pass + + # Fallback: original logic + text_lower = text.lower().strip() for pattern, action_type in _NEGATIVE_PATTERNS: if pattern in text_lower: return action_type - - # Direct alias match if text_lower in _ALIAS_TO_ACTION: return _ALIAS_TO_ACTION[text_lower] - - # Substring match (longest first) best_match = "" - best_action = "implement" # default fallback + best_action = "implement" for alias, action_type in sorted(_ALIAS_TO_ACTION.items(), key=lambda x: -len(x[0])): if alias in text_lower and len(alias) > len(best_match): best_match = alias best_action = action_type - return best_action def get_phase(action_type: str) -> str: - """Get the control_phase for an action_type.""" + """Get the control_phase for an action_type. + + Delegates to DB-backed OntologyRegistry with dict fallback. 
+ """ + try: + from .ontology_registry import get_ontology_registry + return get_ontology_registry().get_phase(action_type) + except Exception: + pass info = ACTION_TYPES.get(action_type, {}) return info.get("phase", "implementation") diff --git a/control-pipeline/services/ontology_registry.py b/control-pipeline/services/ontology_registry.py new file mode 100644 index 0000000..c1a63e0 --- /dev/null +++ b/control-pipeline/services/ontology_registry.py @@ -0,0 +1,217 @@ +""" +DB-backed Action & Object Ontology Registry with in-memory cache. + +Replaces hardcoded ACTION_TYPES, _NEGATIVE_PATTERNS, _ACTION_SYNONYMS, +and _OBJECT_SYNONYMS with PostgreSQL tables. + +Cache TTL: 5 minutes. Thread-safe via simple timestamp check. +Falls back to hardcoded dicts if DB is unavailable. +""" + +import logging +import re +import time +from typing import Optional + +from sqlalchemy import text +from sqlalchemy.exc import SQLAlchemyError + +from db.session import SessionLocal + +logger = logging.getLogger(__name__) + +_CACHE_TTL_SECONDS = 300 # 5 minutes + + +class OntologyRegistry: + """In-memory cache of action_types, action_synonyms, and object_synonyms.""" + + def __init__(self): + # Action types: canonical_name → phase + self._action_phases: dict[str, str] = {} + # Alias → canonical action (for classify_action) + self._alias_to_action: dict[str, str] = {} + # Negative patterns: [(pattern, action_type)] ordered longest first + self._negative_patterns: list[tuple[str, str]] = [] + # Action synonyms for dedup: synonym → canonical (for normalize_action) + self._action_synonyms: dict[str, str] = {} + # Object synonyms: synonym → canonical_token (for normalize_object) + self._object_synonyms: dict[str, str] = {} + # Sorted object keys (longest first) for substring matching + self._object_keys_sorted: list[str] = [] + self._loaded_at: float = 0.0 + + def _is_stale(self) -> bool: + return (time.monotonic() - self._loaded_at) > _CACHE_TTL_SECONDS + + def _load(self) -> bool: + """Load all ontology data from DB into memory.""" + try: + db = SessionLocal() + try: + return self._load_from_db(db) + finally: + db.close() + except SQLAlchemyError: + logger.warning( + "Failed to load ontology from DB — using stale cache", + exc_info=True, + ) + return False + + def _load_from_db(self, db) -> bool: + """Load from DB session.""" + # 1. Action types + rows = db.execute(text( + "SELECT canonical_name, phase FROM action_types" + )).fetchall() + action_phases = {r[0]: r[1] for r in rows} + + # 2. Action synonyms (aliases + negative patterns) + rows = db.execute(text( + "SELECT canonical_action, synonym, pattern_type FROM action_synonyms" + )).fetchall() + + alias_to_action: dict[str, str] = {} + negative_patterns: list[tuple[str, str]] = [] + action_synonyms: dict[str, str] = {} + + for canonical, synonym, ptype in rows: + if ptype == "negative_pattern": + negative_patterns.append((synonym, canonical)) + else: + alias_to_action[synonym] = canonical + action_synonyms[synonym] = canonical + + # Sort negative patterns: longest first (for priority matching) + negative_patterns.sort(key=lambda x: -len(x[0])) + + # 3. 
Object synonyms + rows = db.execute(text( + "SELECT canonical_token, synonym FROM object_synonyms" + )).fetchall() + object_synonyms = {r[1]: r[0] for r in rows} + object_keys_sorted = sorted(object_synonyms.keys(), key=len, reverse=True) + + # Commit to cache + self._action_phases = action_phases + self._alias_to_action = alias_to_action + self._negative_patterns = negative_patterns + self._action_synonyms = action_synonyms + self._object_synonyms = object_synonyms + self._object_keys_sorted = object_keys_sorted + self._loaded_at = time.monotonic() + + logger.info( + "Ontology loaded: %d action_types, %d aliases, %d neg_patterns, %d object_synonyms", + len(action_phases), len(alias_to_action), + len(negative_patterns), len(object_synonyms), + ) + return True + + @property + def is_loaded(self) -> bool: + """True if the cache has any data.""" + return len(self._action_phases) > 0 + + def _ensure_loaded(self) -> None: + if self._is_stale(): + self._load() + if not self.is_loaded: + raise RuntimeError("OntologyRegistry has no data") + + # ── Action Classification (replaces control_ontology.classify_action) ── + + def classify_action(self, text_input: str) -> str: + """Classify text into a canonical action_type.""" + self._ensure_loaded() + text_lower = text_input.lower().strip() + + # Check negative patterns first + for pattern, action_type in self._negative_patterns: + if pattern in text_lower: + return action_type + + # Direct alias match + if text_lower in self._alias_to_action: + return self._alias_to_action[text_lower] + + # Substring match (longest first) + best_match = "" + best_action = "implement" + for alias, action_type in sorted( + self._alias_to_action.items(), key=lambda x: -len(x[0]) + ): + if alias in text_lower and len(alias) > len(best_match): + best_match = alias + best_action = action_type + + return best_action + + def get_phase(self, action_type: str) -> str: + """Get the control_phase for an action_type.""" + self._ensure_loaded() + return self._action_phases.get(action_type, "implementation") + + # ── Action Normalization (replaces control_dedup.normalize_action) ── + + def normalize_action(self, action: str) -> str: + """Normalize an action verb to a canonical English form.""" + self._ensure_loaded() + if not action: + return "" + action = action.strip().lower() + action_base = re.sub(r"(en|t|st|e|te|tet|end)$", "", action) + + if action in self._action_synonyms: + return self._action_synonyms[action] + if action_base in self._action_synonyms: + return self._action_synonyms[action_base] + + for verb, canonical in self._action_synonyms.items(): + if action.startswith(verb) or verb.startswith(action): + return canonical + + return action + + # ── Object Normalization (replaces control_dedup.normalize_object) ── + + def normalize_object(self, obj: str) -> str: + """Normalize an object to a canonical token.""" + self._ensure_loaded() + if not obj: + return "" + obj_lower = obj.strip().lower() + + # Exact match + if obj_lower in self._object_synonyms: + return self._object_synonyms[obj_lower] + + # Substring match (longest phrase first) + for phrase in self._object_keys_sorted: + if phrase in obj_lower: + return self._object_synonyms[phrase] + + return obj_lower + + def get_action_types(self) -> dict[str, str]: + """Return all action_type → phase mappings.""" + self._ensure_loaded() + return dict(self._action_phases) + + def get_object_synonyms(self) -> dict[str, str]: + """Return all object synonym → canonical mappings.""" + self._ensure_loaded() + return 
dict(self._object_synonyms) + + +# Module-level singleton +_registry: Optional[OntologyRegistry] = None + + +def get_ontology_registry() -> OntologyRegistry: + """Get or create the singleton OntologyRegistry instance.""" + global _registry + if _registry is None: + _registry = OntologyRegistry() + return _registry diff --git a/control-pipeline/tests/test_ontology_registry.py b/control-pipeline/tests/test_ontology_registry.py new file mode 100644 index 0000000..c2706a4 --- /dev/null +++ b/control-pipeline/tests/test_ontology_registry.py @@ -0,0 +1,226 @@ +"""Tests for OntologyRegistry — DB-backed action/object normalization.""" + +import time +from unittest.mock import MagicMock, patch + +import pytest + +from services.ontology_registry import OntologyRegistry, _CACHE_TTL_SECONDS + + +# ── Mock DB data ────────────────────────────────────────────────────── + +_MOCK_ACTION_TYPES = [ + ("implement", "implementation"), + ("monitor", "monitoring"), + ("prevent", "implementation"), + ("exclude", "implementation"), + ("test", "testing"), + ("encrypt", "implementation"), + ("document", "evidence"), + ("train", "training"), +] + +_MOCK_ACTION_SYNONYMS = [ + # (canonical_action, synonym, pattern_type) + ("implement", "implementieren", "alias"), + ("implement", "umsetzen", "alias"), + ("implement", "einführen", "alias"), + ("monitor", "überwachen", "alias"), + ("test", "testen", "alias"), + ("encrypt", "verschlüsseln", "alias"), + ("document", "dokumentieren", "alias"), + ("train", "schulen", "alias"), + # Negative patterns + ("exclude", "dürfen nicht", "negative_pattern"), + ("exclude", "darf nicht", "negative_pattern"), + ("prevent", "verhindern", "negative_pattern"), + ("prevent", "nicht gespeichert", "negative_pattern"), +] + +_MOCK_OBJECT_SYNONYMS = [ + ("multi_factor_auth", "mfa"), + ("multi_factor_auth", "2fa"), + ("password_policy", "passwort"), + ("encryption", "verschlüsselung"), + ("audit_logging", "audit-log"), + ("firewall", "firewall"), + ("personal_data", "personenbezogene daten"), +] + + +def _mock_execute(query): + """Route mock queries to correct test data.""" + q = str(query) + mock_result = MagicMock() + if "action_types" in q: + mock_result.fetchall.return_value = _MOCK_ACTION_TYPES + elif "action_synonyms" in q: + mock_result.fetchall.return_value = _MOCK_ACTION_SYNONYMS + elif "object_synonyms" in q: + mock_result.fetchall.return_value = _MOCK_OBJECT_SYNONYMS + else: + mock_result.fetchall.return_value = [] + return mock_result + + +@pytest.fixture +def registry(): + """Create a registry with mocked DB.""" + reg = OntologyRegistry() + with patch("services.ontology_registry.SessionLocal") as mock_cls: + mock_session = MagicMock() + mock_session.execute = _mock_execute + mock_cls.return_value = mock_session + reg._load() + return reg + + +# ── classify_action tests ──────────────────────────────────────────── + + +class TestClassifyAction: + def test_direct_alias(self, registry): + assert registry.classify_action("implementieren") == "implement" + assert registry.classify_action("überwachen") == "monitor" + assert registry.classify_action("testen") == "test" + + def test_case_insensitive(self, registry): + assert registry.classify_action("IMPLEMENTIEREN") == "implement" + + def test_negative_pattern(self, registry): + assert registry.classify_action("dürfen nicht verwendet werden") == "exclude" + assert registry.classify_action("darf nicht gespeichert werden") == "prevent" + + def test_negative_pattern_priority(self, registry): + # "nicht gespeichert" is more specific than "darf 
nicht" + assert registry.classify_action("nicht gespeichert") == "prevent" + + def test_substring_match(self, registry): + assert registry.classify_action("Maßnahmen implementieren und dokumentieren") == "implement" + + def test_unknown_defaults_to_implement(self, registry): + assert registry.classify_action("fliegen") == "implement" + + +# ── get_phase tests ────────────────────────────────────────────────── + + +class TestGetPhase: + def test_known_phase(self, registry): + assert registry.get_phase("implement") == "implementation" + assert registry.get_phase("monitor") == "monitoring" + assert registry.get_phase("test") == "testing" + + def test_unknown_defaults_to_implementation(self, registry): + assert registry.get_phase("unknown_action") == "implementation" + + +# ── normalize_action tests ─────────────────────────────────────────── + + +class TestNormalizeAction: + def test_exact_match(self, registry): + assert registry.normalize_action("implementieren") == "implement" + assert registry.normalize_action("testen") == "test" + + def test_empty(self, registry): + assert registry.normalize_action("") == "" + + def test_passthrough_unknown(self, registry): + assert registry.normalize_action("fliegen") == "fliegen" + + +# ── normalize_object tests ─────────────────────────────────────────── + + +class TestNormalizeObject: + def test_exact_match(self, registry): + assert registry.normalize_object("mfa") == "multi_factor_auth" + assert registry.normalize_object("2fa") == "multi_factor_auth" + assert registry.normalize_object("passwort") == "password_policy" + + def test_case_insensitive(self, registry): + assert registry.normalize_object("MFA") == "multi_factor_auth" + + def test_substring_match(self, registry): + assert registry.normalize_object("die personenbezogene daten verarbeiten") == "personal_data" + + def test_empty(self, registry): + assert registry.normalize_object("") == "" + + def test_unknown_passthrough(self, registry): + assert registry.normalize_object("raumschiff") == "raumschiff" + + +# ── Cache behavior tests ──────────────────────────────────────────── + + +class TestCacheBehavior: + def test_fresh_cache_not_stale(self, registry): + assert registry._is_stale() is False + + def test_old_cache_is_stale(self, registry): + registry._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1 + assert registry._is_stale() is True + + +# ── Migration data consistency ─────────────────────────────────────── + + +class TestF2MigrationData: + def test_build_action_types(self): + from scripts.f2_migrate_actions import build_action_types + types = build_action_types() + assert len(types) >= 26 + names = {t["canonical_name"] for t in types} + assert "implement" in names + assert "monitor" in names + assert "encrypt" in names + + def test_build_action_synonyms(self): + from scripts.f2_migrate_actions import build_action_synonyms + synonyms = build_action_synonyms() + assert len(synonyms) > 100 + + # Check pattern types + aliases = [s for s in synonyms if s["pattern_type"] == "alias"] + negatives = [s for s in synonyms if s["pattern_type"] == "negative_pattern"] + assert len(aliases) > 80 + assert len(negatives) > 15 + + def test_no_duplicate_synonyms(self): + from scripts.f2_migrate_actions import build_action_synonyms + synonyms = build_action_synonyms() + keys = [(s["synonym"], s["language"], s["pattern_type"]) for s in synonyms] + assert len(keys) == len(set(keys)) + + def test_all_canonical_actions_exist(self): + from scripts.f2_migrate_actions import build_action_types, 
build_action_synonyms + type_names = {t["canonical_name"] for t in build_action_types()} + synonyms = build_action_synonyms() + for s in synonyms: + assert s["canonical_action"] in type_names, ( + "Synonym '%s' references unknown action '%s'" % (s["synonym"], s["canonical_action"]) + ) + + +class TestF3MigrationData: + def test_build_object_rows(self): + from scripts.f3_migrate_objects import build_rows + rows = build_rows() + assert len(rows) >= 70 + + def test_no_duplicate_objects(self): + from scripts.f3_migrate_objects import build_rows + rows = build_rows() + keys = [(r["synonym"], r["language"]) for r in rows] + assert len(keys) == len(set(keys)) + + def test_known_objects_present(self): + from scripts.f3_migrate_objects import build_rows + rows = build_rows() + synonyms = {r["synonym"] for r in rows} + assert "mfa" in synonyms + assert "passwort" in synonyms + assert "firewall" in synonyms
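+
+
+# ── Fallback contract (sketch) ──────────────────────────────────────
+# Minimal sketch, not exercised against a real DB: the commit relies on an
+# empty registry raising RuntimeError so that callers (normalize_action,
+# classify_action, get_phase, ...) catch it and fall back to the hardcoded
+# dicts. It assumes a failing _load() leaves the cache empty.
+
+
+class TestEmptyRegistryFallback:
+    def test_empty_registry_raises(self):
+        reg = OntologyRegistry()
+        # Simulate an unreachable DB: _load reports failure, cache stays empty
+        with patch.object(reg, "_load", return_value=False):
+            with pytest.raises(RuntimeError):
+                reg.classify_action("implementieren")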