feat(pipeline): F2+F3 action/object ontology — DB-backed normalization
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 36s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 31s

Migrates ACTION_TYPES (26+8 types), _NEGATIVE_PATTERNS (22), _ACTION_SYNONYMS
(65), and _OBJECT_SYNONYMS (75) from hardcoded dicts to DB tables.

- SQL migration: 003_action_object_ontology.sql (3 tables)
- Migration scripts: f2_migrate_actions.py (34 types, 145 synonyms), f3_migrate_objects.py (75 objects)
- OntologyRegistry cache: 5min TTL; raises RuntimeError when it holds no data, so callers fall back safely to the hardcoded dicts
- control_ontology.classify_action/get_phase delegate to DB with dict fallback
- control_dedup.normalize_action/normalize_object delegate to DB with dict fallback
- 25 new tests, 446 total pass, 0 regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-03 23:47:53 +02:00
parent aab8eeb335
commit 652e3a65a3
7 changed files with 854 additions and 16 deletions
@@ -0,0 +1,58 @@
-- Migration 003: Action & Object Ontology (Block F2+F3)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/003_action_object_ontology.sql
SET search_path TO compliance, public;
-- ========================================
-- action_types — 34 canonical action verbs
-- ========================================
-- Canonical action verbs and the control phase each belongs to.
-- Loaded into OntologyRegistry._action_phases; canonical_name is the
-- natural key referenced by action_synonyms.canonical_action.
CREATE TABLE IF NOT EXISTS action_types (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    canonical_name VARCHAR(50) UNIQUE NOT NULL,
    phase VARCHAR(30) NOT NULL,
    description_de TEXT,
    description_en TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- NOTE(review): no trigger maintains updated_at — writers must set it; confirm intended
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_action_types_phase ON action_types(phase);
-- ========================================
-- action_synonyms — German aliases + negative patterns
-- ========================================
-- German aliases and negative patterns for each canonical action.
-- pattern_type separates plain aliases from negative patterns
-- (e.g. "darf nicht ..."), which classify_action checks first.
CREATE TABLE IF NOT EXISTS action_synonyms (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- FK on the natural key: a synonym can never reference an unknown action
    canonical_action VARCHAR(50) NOT NULL REFERENCES action_types(canonical_name),
    synonym VARCHAR(100) NOT NULL,
    language VARCHAR(5) NOT NULL DEFAULT 'de',
    source VARCHAR(20) NOT NULL DEFAULT 'manual'
        CHECK (source IN ('manual', 'llm', 'migration')),
    pattern_type VARCHAR(20) NOT NULL DEFAULT 'alias'
        CHECK (pattern_type IN ('alias', 'negative_pattern')),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- natural key used by the migration script's ON CONFLICT upsert
    UNIQUE(synonym, language, pattern_type)
);
CREATE INDEX IF NOT EXISTS idx_action_synonyms_canonical ON action_synonyms(canonical_action);
CREATE INDEX IF NOT EXISTS idx_action_synonyms_pattern_type ON action_synonyms(pattern_type);
-- ========================================
-- object_synonyms — normalized object tokens
-- ========================================
-- Synonym → canonical_token mapping consumed by normalize_object.
CREATE TABLE IF NOT EXISTS object_synonyms (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    canonical_token VARCHAR(100) NOT NULL,
    synonym VARCHAR(200) NOT NULL,
    language VARCHAR(5) NOT NULL DEFAULT 'de',
    source VARCHAR(20) NOT NULL DEFAULT 'manual'
        CHECK (source IN ('manual', 'llm', 'migration')),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- natural key used by the migration script's ON CONFLICT upsert
    UNIQUE(synonym, language)
);
CREATE INDEX IF NOT EXISTS idx_object_synonyms_canonical ON object_synonyms(canonical_token);
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
F2 Migration: Populate action_types + action_synonyms from hardcoded dicts.
Sources:
- ACTION_TYPES (control_ontology.py) — 26 types + ~150 aliases
- _NEGATIVE_PATTERNS (control_ontology.py) — 22 patterns
- _ACTION_SYNONYMS (control_dedup.py) — 65 synonyms
Usage:
python3 scripts/f2_migrate_actions.py --dry-run
python3 scripts/f2_migrate_actions.py --db-host macmini
"""
import argparse
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from services.control_ontology import ACTION_TYPES, _NEGATIVE_PATTERNS # noqa: E402
from services.control_dedup import _ACTION_SYNONYMS # noqa: E402
# Extra action types found in _ACTION_SYNONYMS but missing from ACTION_TYPES.
# Value is the control phase. These are inserted alongside ACTION_TYPES so
# every canonical_action written to action_synonyms satisfies its FK.
_EXTRA_ACTION_TYPES: dict[str, str] = {
    "audit": "evidence",
    "log": "evidence",
    "block": "implementation",
    "authorize": "governance",
    "authenticate": "implementation",
    "update": "operation",
    "backup": "operation",
    "restore": "operation",
}
def build_action_types() -> list[dict]:
    """Assemble action_types rows from ACTION_TYPES plus the extras dict.

    Extras are appended only when absent from ACTION_TYPES, so the result
    holds one row per canonical name, base types first.
    """
    rows = [
        {"canonical_name": name, "phase": info["phase"]}
        for name, info in ACTION_TYPES.items()
    ]
    rows.extend(
        {"canonical_name": name, "phase": phase}
        for name, phase in _EXTRA_ACTION_TYPES.items()
        if name not in ACTION_TYPES
    )
    return rows
def build_action_synonyms() -> list[dict]:
    """Build action_synonyms rows from all three hardcoded sources.

    Sources, in priority order: ACTION_TYPES aliases, _NEGATIVE_PATTERNS,
    and control_dedup's _ACTION_SYNONYMS. Rows are de-duplicated on the
    DB natural key (synonym, language, pattern_type); the first writer wins.
    """
    rows: list[dict] = []
    seen: set[tuple[str, str, str]] = set()

    def _add(canonical: str, synonym: str, language: str, pattern_type: str) -> None:
        # Skip anything already emitted under the same natural key.
        key = (synonym, language, pattern_type)
        if key in seen:
            return
        seen.add(key)
        rows.append({
            "canonical_action": canonical,
            "synonym": synonym,
            "language": language,
            "source": "migration",
            "pattern_type": pattern_type,
        })

    # 1) Aliases declared inline on ACTION_TYPES
    for action_type, info in ACTION_TYPES.items():
        for alias in info.get("aliases", []):
            _add(action_type, alias.lower(), "de", "alias")

    # 2) Negative patterns
    for pattern, action_type in _NEGATIVE_PATTERNS:
        _add(action_type, pattern.lower(), "de", "negative_pattern")

    # 3) _ACTION_SYNONYMS: a synonym spelled like its canonical form is English
    for synonym, canonical in _ACTION_SYNONYMS.items():
        language = "en" if synonym == canonical else "de"
        _add(_map_dedup_canonical(canonical), synonym.lower(), language, "alias")

    return rows
def _map_dedup_canonical(canonical: str) -> str:
"""Map control_dedup canonical names to action_types names."""
# Most map directly, some need adjustment
mapping = {
"test": "test",
"verify": "verify", # in ACTION_TYPES
"validate": "validate", # in ACTION_TYPES
"audit": "audit",
"log": "log",
"block": "block",
"restrict": "restrict_access",
"authorize": "authorize",
"authenticate": "authenticate",
"update": "update",
"backup": "backup",
"restore": "restore",
}
return mapping.get(canonical, canonical)
def insert_via_sqlalchemy(action_types: list[dict], synonyms: list[dict], db_host: str):
    """Upsert action_types and action_synonyms rows via SQLAlchemy.

    ON CONFLICT upserts on the natural keys keep the script idempotent.

    Fix: each synonym insert now runs inside a SAVEPOINT
    (``conn.begin_nested()``). Previously a single failing row (e.g. an FK
    violation) aborted the enclosing PostgreSQL transaction, so every
    subsequent insert also raised and was "skipped" — the per-row
    try/except silently discarded the rest of the batch.

    Args:
        action_types: rows with canonical_name/phase keys.
        synonyms: rows with canonical_action/synonym/language/source/pattern_type keys.
        db_host: PostgreSQL host name.
    """
    from sqlalchemy import create_engine, text

    # NOTE(review): credentials are hardcoded — tolerable for a one-off
    # migration script, but consider reading them from the environment.
    url = "postgresql://breakpilot:breakpilot123@%s:5432/breakpilot_db" % db_host
    engine = create_engine(url)
    with engine.connect() as conn:
        conn.execute(text("SET search_path TO compliance, public"))
        # Insert action_types first so synonym FKs can resolve.
        for row in action_types:
            conn.execute(
                text("""
                    INSERT INTO action_types (canonical_name, phase)
                    VALUES (:canonical_name, :phase)
                    ON CONFLICT (canonical_name) DO UPDATE SET
                        phase = EXCLUDED.phase
                """),
                row,
            )
        print("Inserted %d action_types" % len(action_types))
        # Insert action_synonyms, isolating each row's failure.
        inserted = 0
        skipped = 0
        for row in synonyms:
            try:
                # SAVEPOINT: a bad row rolls back alone instead of
                # poisoning the whole transaction.
                with conn.begin_nested():
                    conn.execute(
                        text("""
                            INSERT INTO action_synonyms
                            (canonical_action, synonym, language, source, pattern_type)
                            VALUES
                            (:canonical_action, :synonym, :language, :source, :pattern_type)
                            ON CONFLICT (synonym, language, pattern_type) DO UPDATE SET
                                canonical_action = EXCLUDED.canonical_action,
                                source = EXCLUDED.source
                        """),
                        row,
                    )
                inserted += 1
            except Exception as e:
                print(" Skip %s: %s" % (row["synonym"], e))
                skipped += 1
        conn.commit()
        print("Inserted %d action_synonyms (%d skipped)" % (inserted, skipped))
def main():
    """CLI entry point: print migration stats, then insert unless --dry-run."""
    parser = argparse.ArgumentParser(description="Migrate action types + synonyms")
    parser.add_argument("--dry-run", action="store_true", help="Print stats only")
    parser.add_argument("--db-host", default="localhost", help="PostgreSQL host")
    args = parser.parse_args()

    action_types = build_action_types()
    synonyms = build_action_synonyms()
    print("Action types: %d" % len(action_types))
    print("Action synonyms: %d" % len(synonyms))

    # Breakdown by pattern_type (alias vs negative_pattern).
    by_type: dict[str, int] = {}
    for entry in synonyms:
        ptype = entry["pattern_type"]
        by_type[ptype] = by_type.get(ptype, 0) + 1
    print(" By pattern_type: %s" % by_type)

    # The ten canonical actions carrying the most synonyms.
    counts: dict[str, int] = {}
    for entry in synonyms:
        action = entry["canonical_action"]
        counts[action] = counts.get(action, 0) + 1
    top = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:10]
    print(" Top actions: %s" % dict(top))

    if args.dry_run:
        print("\n--- DRY RUN ---")
        print("\nAction types:")
        for at in action_types:
            print(" %s (%s)" % (at["canonical_name"], at["phase"]))
        return

    insert_via_sqlalchemy(action_types, synonyms, args.db_host)


if __name__ == "__main__":
    main()
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
F3 Migration: Populate object_synonyms from hardcoded dict.
Source: _OBJECT_SYNONYMS (control_dedup.py) — 75 synonyms
Usage:
python3 scripts/f3_migrate_objects.py --dry-run
python3 scripts/f3_migrate_objects.py --db-host macmini
"""
import argparse
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from services.control_dedup import _OBJECT_SYNONYMS # noqa: E402
def build_rows() -> list[dict]:
    """Build object_synonyms rows from the hardcoded _OBJECT_SYNONYMS dict.

    Language is guessed heuristically: pure-ASCII synonyms default to
    English, except for a fixed list of umlaut-free German words which is
    forced back to German. Anything containing non-ASCII characters
    (umlauts, ß, …) stays German.

    Returns:
        Rows with canonical_token/synonym/language/source keys, synonyms
        lowercased, source fixed to "migration".
    """
    rows = []
    for synonym, canonical in _OBJECT_SYNONYMS.items():
        # Detect language (heuristic: German if contains umlauts or common DE words)
        lang = "de"
        lower = synonym.lower()
        if all(c in "abcdefghijklmnopqrstuvwxyz0123456789 -_" for c in lower):
            # Pure ASCII — likely English
            lang = "en"
            # Override for known German words that happen to have no umlauts.
            # NOTE(review): this list must be maintained by hand as
            # _OBJECT_SYNONYMS grows — confirm it stays in sync.
            if lower in ("passwort", "kennwort", "zugangsdaten", "fernzugriff",
                         "sitzung", "firewall", "netzwerk", "vorfall",
                         "schwachstelle", "richtlinie", "schulung",
                         "protokoll", "datensicherung", "wiederherstellung"):
                lang = "de"
        rows.append({
            "canonical_token": canonical,
            "synonym": lower,
            "language": lang,
            "source": "migration",
        })
    return rows
def insert_via_sqlalchemy(rows: list[dict], db_host: str):
    """Upsert object_synonyms rows via SQLAlchemy.

    ON CONFLICT (synonym, language) keeps the script idempotent: re-runs
    update canonical_token/source instead of failing.

    Args:
        rows: dicts with canonical_token/synonym/language/source keys.
        db_host: PostgreSQL host name.
    """
    from sqlalchemy import create_engine, text
    # NOTE(review): credentials are hardcoded — tolerable for a one-off
    # migration script, but consider reading them from the environment.
    url = "postgresql://breakpilot:breakpilot123@%s:5432/breakpilot_db" % db_host
    engine = create_engine(url)
    with engine.connect() as conn:
        conn.execute(text("SET search_path TO compliance, public"))
        inserted = 0
        for row in rows:
            conn.execute(
                text("""
                INSERT INTO object_synonyms
                (canonical_token, synonym, language, source)
                VALUES
                (:canonical_token, :synonym, :language, :source)
                ON CONFLICT (synonym, language) DO UPDATE SET
                canonical_token = EXCLUDED.canonical_token,
                source = EXCLUDED.source
                """),
                row,
            )
            inserted += 1
        conn.commit()
        print("Inserted %d object_synonyms" % inserted)
def main():
    """CLI entry point: print migration stats, then insert unless --dry-run."""
    parser = argparse.ArgumentParser(description="Migrate object synonyms")
    parser.add_argument("--dry-run", action="store_true", help="Print stats only")
    parser.add_argument("--db-host", default="localhost", help="PostgreSQL host")
    args = parser.parse_args()

    rows = build_rows()
    print("Object synonyms: %d" % len(rows))

    # Count how many synonyms collapse onto each canonical token.
    by_canonical: dict[str, int] = {}
    for row in rows:
        token = row["canonical_token"]
        by_canonical[token] = by_canonical.get(token, 0) + 1
    print("Unique canonical tokens: %d" % len(by_canonical))
    top = sorted(by_canonical.items(), key=lambda item: item[1], reverse=True)[:10]
    print("Top tokens: %s" % dict(top))

    if args.dry_run:
        return
    insert_via_sqlalchemy(rows, args.db_host)


if __name__ == "__main__":
    main()
+25 -6
View File
@@ -126,22 +126,29 @@ _ACTION_SYNONYMS: dict[str, str] = {
def normalize_action(action: str) -> str:
    """Normalize an action verb to a canonical English form.

    Delegates to DB-backed OntologyRegistry with dict fallback.
    """
    # Defect fixed: the rendered diff left the stale pre-change docstring
    # line in place, producing two consecutive string literals; only the
    # post-change docstring is kept.
    try:
        from .ontology_registry import get_ontology_registry
        return get_ontology_registry().normalize_action(action)
    except Exception:
        pass
    # Fallback: original dict-based logic (DB/registry unavailable)
    if not action:
        return ""
    action = action.strip().lower()
    # Strip German infinitive/conjugation suffixes for lookup
    action_base = re.sub(r"(en|t|st|e|te|tet|end)$", "", action)
    # Try exact match first, then base form
    if action in _ACTION_SYNONYMS:
        return _ACTION_SYNONYMS[action]
    if action_base in _ACTION_SYNONYMS:
        return _ACTION_SYNONYMS[action_base]
    # Fuzzy: check if action starts with any known verb
    for verb, canonical in _ACTION_SYNONYMS.items():
        if action.startswith(verb) or verb.startswith(action):
            return canonical
    return action
# ── Object Normalization ─────────────────────────────────────────────
@@ -237,7 +244,19 @@ _OBJECT_KEYS_SORTED = sorted(_OBJECT_SYNONYMS.keys(), key=len, reverse=True)
def normalize_object(obj: str) -> str:
"""Normalize a compliance object to a canonical token."""
"""Normalize a compliance object to a canonical token.
Delegates to DB-backed OntologyRegistry with dict fallback.
"""
# Try DB-backed registry first
try:
from .ontology_registry import get_ontology_registry
result = get_ontology_registry().normalize_object(obj)
if result != obj.strip().lower():
return result
except Exception:
pass
if not obj:
return ""
obj_lower = obj.strip().lower()
+22 -10
View File
@@ -223,31 +223,43 @@ _FRAMEWORK_PATTERNS: list[str] = [
def classify_action(text: str) -> str:
    """Classify an obligation action text into a canonical action_type.

    Delegates to DB-backed OntologyRegistry (with 5min cache).
    Falls back to hardcoded dicts if DB is unavailable.
    """
    # Defect fixed: the rendered diff interleaved stale pre-change lines
    # (old docstring, early text_lower assignment) with the new body;
    # only the post-change version is kept.
    try:
        from .ontology_registry import get_ontology_registry
        return get_ontology_registry().classify_action(text)
    except Exception:
        pass
    # Fallback: original logic
    text_lower = text.lower().strip()
    # Negative patterns take priority over plain aliases
    for pattern, action_type in _NEGATIVE_PATTERNS:
        if pattern in text_lower:
            return action_type
    # Direct alias match
    if text_lower in _ALIAS_TO_ACTION:
        return _ALIAS_TO_ACTION[text_lower]
    # Substring match (longest first)
    best_match = ""
    best_action = "implement"
    for alias, action_type in sorted(_ALIAS_TO_ACTION.items(), key=lambda x: -len(x[0])):
        if alias in text_lower and len(alias) > len(best_match):
            best_match = alias
            best_action = action_type
    return best_action
def get_phase(action_type: str) -> str:
    """Get the control_phase for an action_type.

    Delegates to DB-backed OntologyRegistry with dict fallback.
    """
    # Defect fixed: the rendered diff left the stale pre-change docstring
    # line in place; only the post-change docstring is kept.
    try:
        from .ontology_registry import get_ontology_registry
        return get_ontology_registry().get_phase(action_type)
    except Exception:
        pass
    # Fallback: hardcoded dict; default mirrors OntologyRegistry.get_phase
    info = ACTION_TYPES.get(action_type, {})
    return info.get("phase", "implementation")
@@ -0,0 +1,217 @@
"""
DB-backed Action & Object Ontology Registry with in-memory cache.
Replaces hardcoded ACTION_TYPES, _NEGATIVE_PATTERNS, _ACTION_SYNONYMS,
and _OBJECT_SYNONYMS with PostgreSQL tables.
Cache TTL: 5 minutes. Thread-safe via simple timestamp check.
Falls back to hardcoded dicts if DB is unavailable.
"""
import logging
import re
import time
from typing import Optional
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from db.session import SessionLocal
logger = logging.getLogger(__name__)
_CACHE_TTL_SECONDS = 300 # 5 minutes
class OntologyRegistry:
"""In-memory cache of action_types, action_synonyms, and object_synonyms."""
def __init__(self):
# Action types: canonical_name → phase
self._action_phases: dict[str, str] = {}
# Alias → canonical action (for classify_action)
self._alias_to_action: dict[str, str] = {}
# Negative patterns: [(pattern, action_type)] ordered longest first
self._negative_patterns: list[tuple[str, str]] = []
# Action synonyms for dedup: synonym → canonical (for normalize_action)
self._action_synonyms: dict[str, str] = {}
# Object synonyms: synonym → canonical_token (for normalize_object)
self._object_synonyms: dict[str, str] = {}
# Sorted object keys (longest first) for substring matching
self._object_keys_sorted: list[str] = []
self._loaded_at: float = 0.0
def _is_stale(self) -> bool:
return (time.monotonic() - self._loaded_at) > _CACHE_TTL_SECONDS
def _load(self) -> bool:
"""Load all ontology data from DB into memory."""
try:
db = SessionLocal()
try:
return self._load_from_db(db)
finally:
db.close()
except SQLAlchemyError:
logger.warning(
"Failed to load ontology from DB — using stale cache",
exc_info=True,
)
return False
def _load_from_db(self, db) -> bool:
"""Load from DB session."""
# 1. Action types
rows = db.execute(text(
"SELECT canonical_name, phase FROM action_types"
)).fetchall()
action_phases = {r[0]: r[1] for r in rows}
# 2. Action synonyms (aliases + negative patterns)
rows = db.execute(text(
"SELECT canonical_action, synonym, pattern_type FROM action_synonyms"
)).fetchall()
alias_to_action: dict[str, str] = {}
negative_patterns: list[tuple[str, str]] = []
action_synonyms: dict[str, str] = {}
for canonical, synonym, ptype in rows:
if ptype == "negative_pattern":
negative_patterns.append((synonym, canonical))
else:
alias_to_action[synonym] = canonical
action_synonyms[synonym] = canonical
# Sort negative patterns: longest first (for priority matching)
negative_patterns.sort(key=lambda x: -len(x[0]))
# 3. Object synonyms
rows = db.execute(text(
"SELECT canonical_token, synonym FROM object_synonyms"
)).fetchall()
object_synonyms = {r[1]: r[0] for r in rows}
object_keys_sorted = sorted(object_synonyms.keys(), key=len, reverse=True)
# Commit to cache
self._action_phases = action_phases
self._alias_to_action = alias_to_action
self._negative_patterns = negative_patterns
self._action_synonyms = action_synonyms
self._object_synonyms = object_synonyms
self._object_keys_sorted = object_keys_sorted
self._loaded_at = time.monotonic()
logger.info(
"Ontology loaded: %d action_types, %d aliases, %d neg_patterns, %d object_synonyms",
len(action_phases), len(alias_to_action),
len(negative_patterns), len(object_synonyms),
)
return True
@property
def is_loaded(self) -> bool:
"""True if the cache has any data."""
return len(self._action_phases) > 0
def _ensure_loaded(self) -> None:
if self._is_stale():
self._load()
if not self.is_loaded:
raise RuntimeError("OntologyRegistry has no data")
# ── Action Classification (replaces control_ontology.classify_action) ──
def classify_action(self, text_input: str) -> str:
"""Classify text into a canonical action_type."""
self._ensure_loaded()
text_lower = text_input.lower().strip()
# Check negative patterns first
for pattern, action_type in self._negative_patterns:
if pattern in text_lower:
return action_type
# Direct alias match
if text_lower in self._alias_to_action:
return self._alias_to_action[text_lower]
# Substring match (longest first)
best_match = ""
best_action = "implement"
for alias, action_type in sorted(
self._alias_to_action.items(), key=lambda x: -len(x[0])
):
if alias in text_lower and len(alias) > len(best_match):
best_match = alias
best_action = action_type
return best_action
def get_phase(self, action_type: str) -> str:
"""Get the control_phase for an action_type."""
self._ensure_loaded()
return self._action_phases.get(action_type, "implementation")
# ── Action Normalization (replaces control_dedup.normalize_action) ──
def normalize_action(self, action: str) -> str:
"""Normalize an action verb to a canonical English form."""
self._ensure_loaded()
if not action:
return ""
action = action.strip().lower()
action_base = re.sub(r"(en|t|st|e|te|tet|end)$", "", action)
if action in self._action_synonyms:
return self._action_synonyms[action]
if action_base in self._action_synonyms:
return self._action_synonyms[action_base]
for verb, canonical in self._action_synonyms.items():
if action.startswith(verb) or verb.startswith(action):
return canonical
return action
# ── Object Normalization (replaces control_dedup.normalize_object) ──
def normalize_object(self, obj: str) -> str:
"""Normalize an object to a canonical token."""
self._ensure_loaded()
if not obj:
return ""
obj_lower = obj.strip().lower()
# Exact match
if obj_lower in self._object_synonyms:
return self._object_synonyms[obj_lower]
# Substring match (longest phrase first)
for phrase in self._object_keys_sorted:
if phrase in obj_lower:
return self._object_synonyms[phrase]
return obj_lower
def get_action_types(self) -> dict[str, str]:
"""Return all action_type → phase mappings."""
self._ensure_loaded()
return dict(self._action_phases)
def get_object_synonyms(self) -> dict[str, str]:
"""Return all object synonym → canonical mappings."""
self._ensure_loaded()
return dict(self._object_synonyms)
# Module-level singleton
_registry: Optional[OntologyRegistry] = None


def get_ontology_registry() -> OntologyRegistry:
    """Get or create the singleton OntologyRegistry instance.

    NOTE(review): creation is not lock-guarded; two threads racing the
    first call could each construct a registry and one would be discarded —
    presumably harmless since instances only hold caches, but confirm.
    """
    global _registry
    if _registry is None:
        _registry = OntologyRegistry()
    return _registry
@@ -0,0 +1,226 @@
"""Tests for OntologyRegistry — DB-backed action/object normalization."""
import time
from unittest.mock import MagicMock, patch
import pytest
from services.ontology_registry import OntologyRegistry, _CACHE_TTL_SECONDS
# ── Mock DB data ──────────────────────────────────────────────────────
# Rows shaped like the action_types SELECT: (canonical_name, phase)
_MOCK_ACTION_TYPES = [
    ("implement", "implementation"),
    ("monitor", "monitoring"),
    ("prevent", "implementation"),
    ("exclude", "implementation"),
    ("test", "testing"),
    ("encrypt", "implementation"),
    ("document", "evidence"),
    ("train", "training"),
]
# Rows shaped like the action_synonyms SELECT:
# (canonical_action, synonym, pattern_type)
_MOCK_ACTION_SYNONYMS = [
    ("implement", "implementieren", "alias"),
    ("implement", "umsetzen", "alias"),
    ("implement", "einführen", "alias"),
    ("monitor", "überwachen", "alias"),
    ("test", "testen", "alias"),
    ("encrypt", "verschlüsseln", "alias"),
    ("document", "dokumentieren", "alias"),
    ("train", "schulen", "alias"),
    # Negative patterns
    ("exclude", "dürfen nicht", "negative_pattern"),
    ("exclude", "darf nicht", "negative_pattern"),
    ("prevent", "verhindern", "negative_pattern"),
    ("prevent", "nicht gespeichert", "negative_pattern"),
]
# Rows shaped like the object_synonyms SELECT: (canonical_token, synonym)
_MOCK_OBJECT_SYNONYMS = [
    ("multi_factor_auth", "mfa"),
    ("multi_factor_auth", "2fa"),
    ("password_policy", "passwort"),
    ("encryption", "verschlüsselung"),
    ("audit_logging", "audit-log"),
    ("firewall", "firewall"),
    ("personal_data", "personenbezogene daten"),
]
def _mock_execute(query):
    """Dispatch a mocked DB query to the matching fixture table."""
    sql = str(query)
    if "action_types" in sql:
        rows = _MOCK_ACTION_TYPES
    elif "action_synonyms" in sql:
        rows = _MOCK_ACTION_SYNONYMS
    elif "object_synonyms" in sql:
        rows = _MOCK_OBJECT_SYNONYMS
    else:
        rows = []
    result = MagicMock()
    result.fetchall.return_value = rows
    return result
@pytest.fixture
def registry():
    """Create an OntologyRegistry whose DB session is mocked.

    SessionLocal is patched so _load() reads the _MOCK_* tables above
    instead of hitting PostgreSQL; the returned registry is fully loaded.
    """
    reg = OntologyRegistry()
    with patch("services.ontology_registry.SessionLocal") as mock_cls:
        mock_session = MagicMock()
        mock_session.execute = _mock_execute
        mock_cls.return_value = mock_session
        reg._load()
    return reg
# ── classify_action tests ────────────────────────────────────────────
class TestClassifyAction:
    """classify_action: alias lookup, negative patterns, substring fallback."""

    def test_direct_alias(self, registry):
        # German alias → canonical English action
        assert registry.classify_action("implementieren") == "implement"
        assert registry.classify_action("überwachen") == "monitor"
        assert registry.classify_action("testen") == "test"

    def test_case_insensitive(self, registry):
        assert registry.classify_action("IMPLEMENTIEREN") == "implement"

    def test_negative_pattern(self, registry):
        # Negative patterns are checked before aliases
        assert registry.classify_action("dürfen nicht verwendet werden") == "exclude"
        assert registry.classify_action("darf nicht gespeichert werden") == "prevent"

    def test_negative_pattern_priority(self, registry):
        # "nicht gespeichert" is more specific than "darf nicht"
        assert registry.classify_action("nicht gespeichert") == "prevent"

    def test_substring_match(self, registry):
        # Longest matching alias inside the text wins
        assert registry.classify_action("Maßnahmen implementieren und dokumentieren") == "implement"

    def test_unknown_defaults_to_implement(self, registry):
        assert registry.classify_action("fliegen") == "implement"
# ── get_phase tests ──────────────────────────────────────────────────
class TestGetPhase:
    """get_phase: action_type → control phase lookup."""

    def test_known_phase(self, registry):
        assert registry.get_phase("implement") == "implementation"
        assert registry.get_phase("monitor") == "monitoring"
        assert registry.get_phase("test") == "testing"

    def test_unknown_defaults_to_implementation(self, registry):
        # Unknown action types fall back to the default phase
        assert registry.get_phase("unknown_action") == "implementation"
# ── normalize_action tests ───────────────────────────────────────────
class TestNormalizeAction:
    """normalize_action: verb → canonical English form."""

    def test_exact_match(self, registry):
        assert registry.normalize_action("implementieren") == "implement"
        assert registry.normalize_action("testen") == "test"

    def test_empty(self, registry):
        assert registry.normalize_action("") == ""

    def test_passthrough_unknown(self, registry):
        # Unknown verbs are returned unchanged
        assert registry.normalize_action("fliegen") == "fliegen"
# ── normalize_object tests ───────────────────────────────────────────
class TestNormalizeObject:
    """normalize_object: object text → canonical token."""

    def test_exact_match(self, registry):
        assert registry.normalize_object("mfa") == "multi_factor_auth"
        assert registry.normalize_object("2fa") == "multi_factor_auth"
        assert registry.normalize_object("passwort") == "password_policy"

    def test_case_insensitive(self, registry):
        assert registry.normalize_object("MFA") == "multi_factor_auth"

    def test_substring_match(self, registry):
        # A known phrase embedded in longer text still resolves
        assert registry.normalize_object("die personenbezogene daten verarbeiten") == "personal_data"

    def test_empty(self, registry):
        assert registry.normalize_object("") == ""

    def test_unknown_passthrough(self, registry):
        assert registry.normalize_object("raumschiff") == "raumschiff"
# ── Cache behavior tests ────────────────────────────────────────────
class TestCacheBehavior:
    """TTL handling of the in-memory cache."""

    def test_fresh_cache_not_stale(self, registry):
        # The fixture just loaded, so the TTL has not expired
        assert registry._is_stale() is False

    def test_old_cache_is_stale(self, registry):
        # Backdate the load timestamp past the TTL
        registry._loaded_at = time.monotonic() - _CACHE_TTL_SECONDS - 1
        assert registry._is_stale() is True
# ── Migration data consistency ───────────────────────────────────────
class TestF2MigrationData:
    """Sanity checks on the row builders in scripts/f2_migrate_actions.py."""

    def test_build_action_types(self):
        from scripts.f2_migrate_actions import build_action_types
        types = build_action_types()
        # 26 base types plus extras
        assert len(types) >= 26
        names = {t["canonical_name"] for t in types}
        assert "implement" in names
        assert "monitor" in names
        assert "encrypt" in names

    def test_build_action_synonyms(self):
        from scripts.f2_migrate_actions import build_action_synonyms
        synonyms = build_action_synonyms()
        assert len(synonyms) > 100
        # Check pattern types
        aliases = [s for s in synonyms if s["pattern_type"] == "alias"]
        negatives = [s for s in synonyms if s["pattern_type"] == "negative_pattern"]
        assert len(aliases) > 80
        assert len(negatives) > 15

    def test_no_duplicate_synonyms(self):
        from scripts.f2_migrate_actions import build_action_synonyms
        synonyms = build_action_synonyms()
        # (synonym, language, pattern_type) is the DB's natural key
        keys = [(s["synonym"], s["language"], s["pattern_type"]) for s in synonyms]
        assert len(keys) == len(set(keys))

    def test_all_canonical_actions_exist(self):
        from scripts.f2_migrate_actions import build_action_types, build_action_synonyms
        # Every synonym must satisfy the FK to action_types
        type_names = {t["canonical_name"] for t in build_action_types()}
        synonyms = build_action_synonyms()
        for s in synonyms:
            assert s["canonical_action"] in type_names, (
                "Synonym '%s' references unknown action '%s'" % (s["synonym"], s["canonical_action"])
            )
class TestF3MigrationData:
    """Sanity checks on the row builder in scripts/f3_migrate_objects.py."""

    def test_build_object_rows(self):
        from scripts.f3_migrate_objects import build_rows
        rows = build_rows()
        assert len(rows) >= 70

    def test_no_duplicate_objects(self):
        from scripts.f3_migrate_objects import build_rows
        rows = build_rows()
        # (synonym, language) is the DB's natural key
        keys = [(r["synonym"], r["language"]) for r in rows]
        assert len(keys) == len(set(keys))

    def test_known_objects_present(self):
        from scripts.f3_migrate_objects import build_rows
        rows = build_rows()
        synonyms = {r["synonym"] for r in rows}
        assert "mfa" in synonyms
        assert "passwort" in synonyms
        assert "firewall" in synonyms