diff --git a/control-pipeline/scripts/f4_llm_enrich_synonyms.py b/control-pipeline/scripts/f4_llm_enrich_synonyms.py new file mode 100644 index 0000000..5f7eb61 --- /dev/null +++ b/control-pipeline/scripts/f4_llm_enrich_synonyms.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +""" +F4: LLM-based Synonym Enrichment for Action Types and Object Tokens. + +Uses Ollama (qwen3.5:35b-a3b) to generate additional German synonyms +for each canonical action type and object token. Results are stored +with source='llm' in the DB. + +Usage: + # Dry run (print, no DB write): + python3 scripts/f4_llm_enrich_synonyms.py --dry-run + + # Against Mac Mini: + python3 scripts/f4_llm_enrich_synonyms.py --db-host macmini --ollama-host macmini + + # Only actions or only objects: + python3 scripts/f4_llm_enrich_synonyms.py --actions-only + python3 scripts/f4_llm_enrich_synonyms.py --objects-only +""" + +import argparse +import json +import logging +import sys +import time +from pathlib import Path + +import httpx +from sqlalchemy import create_engine, text + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("f4-enrich") + +OLLAMA_MODEL = "qwen3.5:35b-a3b" + + +def call_ollama(prompt: str, ollama_url: str) -> str: + """Call Ollama with think:false for direct answers.""" + resp = httpx.post( + f"{ollama_url}/api/chat", + json={ + "model": OLLAMA_MODEL, + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "options": {"temperature": 0.3}, + "think": False, + }, + timeout=60.0, + ) + resp.raise_for_status() + return resp.json().get("message", {}).get("content", "") + + +def enrich_action_types(db_url: str, ollama_url: str, dry_run: bool) -> dict: + """Generate synonyms for each action type.""" + engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"}) + + with engine.connect() as conn: + # Get existing action types and their current synonyms + types = conn.execute(text("SELECT canonical_name, phase FROM action_types")).fetchall() + existing = {} + for row in conn.execute(text("SELECT canonical_action, synonym FROM action_synonyms")).fetchall(): + existing.setdefault(row[0], set()).add(row[1]) + + stats = {"types_processed": 0, "new_synonyms": 0, "skipped": 0} + all_new: list[dict] = [] + + for canonical, phase in types: + current_synonyms = existing.get(canonical, set()) + + prompt = f"""Du bist ein Compliance-Experte. Gib mir 5-8 deutsche Synonyme oder Umschreibungen fuer die Handlung "{canonical}" (Phase: {phase}) im Kontext von IT-Compliance und Datenschutz. + +Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:10])} + +Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...] +Keine Erklaerungen, nur die JSON-Liste.""" + + try: + response = call_ollama(prompt, ollama_url) + # Parse JSON from response + synonyms = _parse_json_list(response) + + new_count = 0 + for syn in synonyms: + syn_lower = syn.lower().strip() + if not syn_lower or len(syn_lower) < 3: + continue + if syn_lower in current_synonyms: + stats["skipped"] += 1 + continue + all_new.append({ + "canonical_action": canonical, + "synonym": syn_lower, + "language": "de", + "source": "llm", + "pattern_type": "alias", + }) + current_synonyms.add(syn_lower) + new_count += 1 + + stats["types_processed"] += 1 + stats["new_synonyms"] += new_count + logger.info("%s: +%d new synonyms", canonical, new_count) + + except Exception as e: + logger.warning("Error for %s: %s", canonical, e) + + time.sleep(1) # Rate limit + + # Write to DB + if not dry_run and all_new: + with engine.begin() as conn: + for row in all_new: + conn.execute( + text(""" + INSERT INTO action_synonyms (canonical_action, synonym, language, source, pattern_type) + VALUES (:canonical_action, :synonym, :language, :source, :pattern_type) + ON CONFLICT (synonym, language, pattern_type) DO NOTHING + """), + row, + ) + logger.info("Wrote %d new action synonyms to DB", len(all_new)) + elif dry_run: + print("\n--- DRY RUN: Action Synonyms ---") + for row in all_new[:20]: + print(" %s → %s" % (row["canonical_action"], row["synonym"])) + if len(all_new) > 20: + print(" ... and %d more" % (len(all_new) - 20)) + + return stats + + +def enrich_object_tokens(db_url: str, ollama_url: str, dry_run: bool) -> dict: + """Generate synonyms for each object canonical token.""" + engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"}) + + with engine.connect() as conn: + # Get unique canonical tokens + tokens = conn.execute(text( + "SELECT DISTINCT canonical_token FROM object_synonyms ORDER BY canonical_token" + )).fetchall() + existing = {} + for row in conn.execute(text("SELECT canonical_token, synonym FROM object_synonyms")).fetchall(): + existing.setdefault(row[0], set()).add(row[1]) + + stats = {"tokens_processed": 0, "new_synonyms": 0, "skipped": 0} + all_new: list[dict] = [] + + for (token,) in tokens: + current_synonyms = existing.get(token, set()) + + prompt = f"""Du bist ein IT-Security-Experte. Gib mir 5-8 deutsche und englische Begriffe/Synonyme fuer das Konzept "{token}" im Kontext von IT-Sicherheit und Compliance. + +Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:8])} + +Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...] +Keine Erklaerungen, nur die JSON-Liste.""" + + try: + response = call_ollama(prompt, ollama_url) + synonyms = _parse_json_list(response) + + new_count = 0 + for syn in synonyms: + syn_lower = syn.lower().strip() + if not syn_lower or len(syn_lower) < 2: + continue + if syn_lower in current_synonyms: + stats["skipped"] += 1 + continue + # Detect language + lang = "de" + if all(c in "abcdefghijklmnopqrstuvwxyz0123456789 -_" for c in syn_lower): + lang = "en" + all_new.append({ + "canonical_token": token, + "synonym": syn_lower, + "language": lang, + "source": "llm", + }) + current_synonyms.add(syn_lower) + new_count += 1 + + stats["tokens_processed"] += 1 + stats["new_synonyms"] += new_count + logger.info("%s: +%d new synonyms", token, new_count) + + except Exception as e: + logger.warning("Error for %s: %s", token, e) + + time.sleep(1) + + # Write to DB + if not dry_run and all_new: + with engine.begin() as conn: + for row in all_new: + conn.execute( + text(""" + INSERT INTO object_synonyms (canonical_token, synonym, language, source) + VALUES (:canonical_token, :synonym, :language, :source) + ON CONFLICT (synonym, language) DO NOTHING + """), + row, + ) + logger.info("Wrote %d new object synonyms to DB", len(all_new)) + elif dry_run: + print("\n--- DRY RUN: Object Synonyms ---") + for row in all_new[:20]: + print(" %s → %s (%s)" % (row["canonical_token"], row["synonym"], row["language"])) + if len(all_new) > 20: + print(" ... and %d more" % (len(all_new) - 20)) + + return stats + + +def _parse_json_list(text: str) -> list[str]: + """Extract JSON list from LLM response.""" + # Try to find JSON array in response + text = text.strip() + # Remove markdown code fences + if "```" in text: + text = text.split("```")[1] if text.count("```") >= 2 else text + text = text.strip() + if text.startswith("json"): + text = text[4:].strip() + + # Find first [ and last ] + start = text.find("[") + end = text.rfind("]") + if start >= 0 and end > start: + try: + return json.loads(text[start:end + 1]) + except json.JSONDecodeError: + pass + return [] + + +def main(): + parser = argparse.ArgumentParser(description="LLM Synonym Enrichment") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--db-host", default="localhost") + parser.add_argument("--ollama-host", default="localhost") + parser.add_argument("--actions-only", action="store_true") + parser.add_argument("--objects-only", action="store_true") + args = parser.parse_args() + + db_url = f"postgresql://breakpilot:breakpilot123@{args.db_host}:5432/breakpilot_db" + ollama_url = f"http://{args.ollama_host}:11434" + + if args.dry_run: + print("=== DRY RUN MODE ===\n") + + if not args.objects_only: + print("=== Enriching Action Types ===") + action_stats = enrich_action_types(db_url, ollama_url, args.dry_run) + print("Actions: %d processed, %d new synonyms\n" % ( + action_stats["types_processed"], action_stats["new_synonyms"])) + + if not args.actions_only: + print("=== Enriching Object Tokens ===") + object_stats = enrich_object_tokens(db_url, ollama_url, args.dry_run) + print("Objects: %d processed, %d new synonyms\n" % ( + object_stats["tokens_processed"], object_stats["new_synonyms"])) + + +if __name__ == "__main__": + main()