652e3a65a3
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 36s
CI / test-python-voice (push) Successful in 33s
CI / test-bqas (push) Successful in 31s
Migrates ACTION_TYPES (26+8 types), _NEGATIVE_PATTERNS (22), _ACTION_SYNONYMS (65), and _OBJECT_SYNONYMS (75) from hardcoded dicts to DB tables. - SQL migration: 003_action_object_ontology.sql (3 tables) - Migration scripts: f2_migrate_actions.py (34 types, 145 synonyms), f3_migrate_objects.py (75 objects) - OntologyRegistry cache: 5min TTL, raises RuntimeError if empty (safe fallback to dicts) - control_ontology.classify_action/get_phase delegate to DB with dict fallback - control_dedup.normalize_action/normalize_object delegate to DB with dict fallback - 25 new tests, 446 total pass, 0 regressions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
207 lines
6.8 KiB
Python
207 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
F2 Migration: Populate action_types + action_synonyms from hardcoded dicts.
|
|
|
|
Sources:
|
|
- ACTION_TYPES (control_ontology.py) — 26 types + ~150 aliases
|
|
- _NEGATIVE_PATTERNS (control_ontology.py) — 22 patterns
|
|
- _ACTION_SYNONYMS (control_dedup.py) — 65 synonyms
|
|
|
|
Usage:
|
|
python3 scripts/f2_migrate_actions.py --dry-run
|
|
python3 scripts/f2_migrate_actions.py --db-host macmini
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from services.control_ontology import ACTION_TYPES, _NEGATIVE_PATTERNS # noqa: E402
|
|
from services.control_dedup import _ACTION_SYNONYMS # noqa: E402
|
|
|
|
# Canonical action names that _ACTION_SYNONYMS refers to but ACTION_TYPES does
# not define.  Maps each name to the phase it is stored under, so that
# build_action_types() can emit a row for it as well.
_EXTRA_ACTION_TYPES = {
    # evidence
    "audit": "evidence",
    "log": "evidence",
    # implementation / governance
    "block": "implementation",
    "authorize": "governance",
    "authenticate": "implementation",
    # operation
    "update": "operation",
    "backup": "operation",
    "restore": "operation",
}
|
|
|
|
|
|
def build_action_types() -> list[dict]:
    """Assemble the rows destined for the action_types table.

    Returns:
        A list of ``{"canonical_name": ..., "phase": ...}`` dicts: one per
        entry of ACTION_TYPES (in its iteration order), followed by one per
        entry of _EXTRA_ACTION_TYPES whose name is not already a key of
        ACTION_TYPES.
    """
    primary = [
        {"canonical_name": type_name, "phase": spec["phase"]}
        for type_name, spec in ACTION_TYPES.items()
    ]
    # Extras only fill gaps; a name defined in ACTION_TYPES always wins.
    supplemental = [
        {"canonical_name": type_name, "phase": phase_name}
        for type_name, phase_name in _EXTRA_ACTION_TYPES.items()
        if type_name not in ACTION_TYPES
    ]
    return primary + supplemental
|
|
|
|
|
|
def build_action_synonyms() -> list[dict]:
    """Assemble the rows destined for the action_synonyms table.

    Candidates are gathered from three places, in this order:

    1. the ``"aliases"`` list of every ACTION_TYPES entry
       (language ``"de"``, pattern type ``"alias"``),
    2. the ``(pattern, action_type)`` pairs of _NEGATIVE_PATTERNS
       (language ``"de"``, pattern type ``"negative_pattern"``),
    3. the items of _ACTION_SYNONYMS (pattern type ``"alias"``; language is
       ``"en"`` when the synonym equals its canonical name, else ``"de"``),
       with the canonical name passed through _map_dedup_canonical().

    Synonyms are lower-cased.  A candidate is dropped when an earlier one
    already produced the same ``(synonym, language, pattern_type)`` triple,
    so the first source to claim a triple decides its canonical action.

    Returns:
        A list of dicts with the keys ``canonical_action``, ``synonym``,
        ``language``, ``source`` (always ``"migration"``) and
        ``pattern_type``.
    """

    def candidates():
        # Yields (canonical_action, raw_synonym, language, pattern_type).
        for type_name, spec in ACTION_TYPES.items():
            for alias in spec.get("aliases", []):
                yield type_name, alias, "de", "alias"

        for pattern, type_name in _NEGATIVE_PATTERNS:
            yield type_name, pattern, "de", "negative_pattern"

        for term, canonical in _ACTION_SYNONYMS.items():
            # The language test compares the synonym as written, before
            # lower-casing.
            language = "en" if term == canonical else "de"
            yield _map_dedup_canonical(canonical), term, language, "alias"

    result: list[dict] = []
    claimed: set[tuple[str, str, str]] = set()  # (synonym, language, pattern_type)

    for action, raw_synonym, language, pattern_type in candidates():
        synonym = raw_synonym.lower()
        triple = (synonym, language, pattern_type)
        if triple in claimed:
            continue
        claimed.add(triple)
        result.append({
            "canonical_action": action,
            "synonym": synonym,
            "language": language,
            "source": "migration",
            "pattern_type": pattern_type,
        })

    return result
|
|
|
|
|
|
def _map_dedup_canonical(canonical: str) -> str:
    """Translate a control_dedup canonical name into an action_types name.

    Args:
        canonical: Canonical action name as used by _ACTION_SYNONYMS.

    Returns:
        The corresponding action_types name.  Every name is returned
        unchanged except those listed in the rename table below.
    """
    # Only names that differ between the two vocabularies need an entry;
    # test, verify, validate, audit, log, block, authorize, authenticate,
    # update, backup and restore (and anything unknown) pass through as-is.
    renames = {
        "restrict": "restrict_access",
    }
    return renames.get(canonical, canonical)
|
|
|
|
|
|
def insert_via_sqlalchemy(action_types: list[dict], synonyms: list[dict], db_host: str):
    """Upsert action types and synonyms into PostgreSQL using SQLAlchemy.

    All statements run on one connection and are committed together at the
    end.  Action types are upserted first; a failure there propagates to the
    caller.  Each synonym is upserted inside its own SAVEPOINT so that a
    failing row can be reported and skipped without aborting the surrounding
    transaction.

    Args:
        action_types: Rows with the keys ``canonical_name`` and ``phase``.
        synonyms: Rows with the keys ``canonical_action``, ``synonym``,
            ``language``, ``source`` and ``pattern_type``.
        db_host: Host name of the PostgreSQL server (port 5432).
    """
    from sqlalchemy import create_engine, text
    from sqlalchemy.exc import SQLAlchemyError

    # NOTE(review): user name and password are hard-coded in this URL;
    # consider sourcing them from the environment or a secrets store.
    url = "postgresql://breakpilot:breakpilot123@%s:5432/breakpilot_db" % db_host
    engine = create_engine(url)

    # Build the statements once instead of once per row.
    upsert_action_type = text("""
        INSERT INTO action_types (canonical_name, phase)
        VALUES (:canonical_name, :phase)
        ON CONFLICT (canonical_name) DO UPDATE SET
            phase = EXCLUDED.phase
    """)
    upsert_synonym = text("""
        INSERT INTO action_synonyms
            (canonical_action, synonym, language, source, pattern_type)
        VALUES
            (:canonical_action, :synonym, :language, :source, :pattern_type)
        ON CONFLICT (synonym, language, pattern_type) DO UPDATE SET
            canonical_action = EXCLUDED.canonical_action,
            source = EXCLUDED.source
    """)

    with engine.connect() as conn:
        conn.execute(text("SET search_path TO compliance, public"))

        # Insert action_types
        for row in action_types:
            conn.execute(upsert_action_type, row)
        print("Inserted %d action_types" % len(action_types))

        # Insert action_synonyms
        inserted = 0
        skipped = 0
        for row in synonyms:
            try:
                # PostgreSQL marks the whole transaction as aborted after any
                # failed statement.  Without a savepoint, one bad row would
                # make every later statement fail and the final commit would
                # persist nothing.  begin_nested() confines the failure to
                # this row.
                with conn.begin_nested():
                    conn.execute(upsert_synonym, row)
            except SQLAlchemyError as e:
                print(" Skip %s: %s" % (row["synonym"], e))
                skipped += 1
            else:
                inserted += 1

        conn.commit()
        print("Inserted %d action_synonyms (%d skipped)" % (inserted, skipped))
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: build the rows, print stats, then insert.

    Prints the number of action types and synonyms, the synonym count per
    pattern type, and the ten canonical actions with the most synonyms.
    With ``--dry-run`` it then lists every action type and returns without
    touching the database; otherwise the rows are written via
    insert_via_sqlalchemy().

    Args:
        argv: Optional list of command-line arguments (without the program
            name).  When None, argparse reads ``sys.argv[1:]``.
    """
    from collections import Counter

    parser = argparse.ArgumentParser(description="Migrate action types + synonyms")
    parser.add_argument("--dry-run", action="store_true", help="Print stats only")
    parser.add_argument("--db-host", default="localhost", help="PostgreSQL host")
    args = parser.parse_args(argv)

    action_types = build_action_types()
    synonyms = build_action_synonyms()

    print("Action types: %d" % len(action_types))
    print("Action synonyms: %d" % len(synonyms))

    # Converted to plain dicts before printing so the output format stays
    # that of a dict rather than "Counter({...})".
    by_pattern_type = Counter(s["pattern_type"] for s in synonyms)
    print(" By pattern_type: %s" % dict(by_pattern_type))

    # most_common() orders by descending count and keeps first-seen order
    # among equal counts.
    by_action = Counter(s["canonical_action"] for s in synonyms)
    print(" Top actions: %s" % dict(by_action.most_common(10)))

    if args.dry_run:
        print("\n--- DRY RUN ---")
        print("\nAction types:")
        for at in action_types:
            print(" %s (%s)" % (at["canonical_name"], at["phase"]))
        return

    insert_via_sqlalchemy(action_types, synonyms, args.db_host)


if __name__ == "__main__":
    main()
|