feat(pipeline): F4 LLM synonym enrichment script

Uses Ollama (qwen3.5:35b-a3b, think:false) to generate additional
German synonyms for action types and object tokens. Results stored
with source='llm' in action_synonyms/object_synonyms tables.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-05 15:45:43 +02:00
parent 775d8b52f3
commit a20de0b52b
@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
F4: LLM-based Synonym Enrichment for Action Types and Object Tokens.
Uses Ollama (qwen3.5:35b-a3b) to generate additional German synonyms
for each canonical action type and object token. Results are stored
with source='llm' in the DB.
Usage:
# Dry run (print, no DB write):
python3 scripts/f4_llm_enrich_synonyms.py --dry-run
# Against Mac Mini:
python3 scripts/f4_llm_enrich_synonyms.py --db-host macmini --ollama-host macmini
# Only actions or only objects:
python3 scripts/f4_llm_enrich_synonyms.py --actions-only
python3 scripts/f4_llm_enrich_synonyms.py --objects-only
"""
import argparse
import json
import logging
import sys
import time
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("f4-enrich")
OLLAMA_MODEL = "qwen3.5:35b-a3b"
def call_ollama(prompt: str, ollama_url: str) -> str:
"""Call Ollama with think:false for direct answers."""
resp = httpx.post(
f"{ollama_url}/api/chat",
json={
"model": OLLAMA_MODEL,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"options": {"temperature": 0.3},
"think": False,
},
timeout=60.0,
)
resp.raise_for_status()
return resp.json().get("message", {}).get("content", "")
def enrich_action_types(db_url: str, ollama_url: str, dry_run: bool) -> dict:
"""Generate synonyms for each action type."""
engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"})
with engine.connect() as conn:
# Get existing action types and their current synonyms
types = conn.execute(text("SELECT canonical_name, phase FROM action_types")).fetchall()
existing = {}
for row in conn.execute(text("SELECT canonical_action, synonym FROM action_synonyms")).fetchall():
existing.setdefault(row[0], set()).add(row[1])
stats = {"types_processed": 0, "new_synonyms": 0, "skipped": 0}
all_new: list[dict] = []
for canonical, phase in types:
current_synonyms = existing.get(canonical, set())
prompt = f"""Du bist ein Compliance-Experte. Gib mir 5-8 deutsche Synonyme oder Umschreibungen fuer die Handlung "{canonical}" (Phase: {phase}) im Kontext von IT-Compliance und Datenschutz.
Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:10])}
Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...]
Keine Erklaerungen, nur die JSON-Liste."""
try:
response = call_ollama(prompt, ollama_url)
# Parse JSON from response
synonyms = _parse_json_list(response)
new_count = 0
for syn in synonyms:
syn_lower = syn.lower().strip()
if not syn_lower or len(syn_lower) < 3:
continue
if syn_lower in current_synonyms:
stats["skipped"] += 1
continue
all_new.append({
"canonical_action": canonical,
"synonym": syn_lower,
"language": "de",
"source": "llm",
"pattern_type": "alias",
})
current_synonyms.add(syn_lower)
new_count += 1
stats["types_processed"] += 1
stats["new_synonyms"] += new_count
logger.info("%s: +%d new synonyms", canonical, new_count)
except Exception as e:
logger.warning("Error for %s: %s", canonical, e)
time.sleep(1) # Rate limit
# Write to DB
if not dry_run and all_new:
with engine.begin() as conn:
for row in all_new:
conn.execute(
text("""
INSERT INTO action_synonyms (canonical_action, synonym, language, source, pattern_type)
VALUES (:canonical_action, :synonym, :language, :source, :pattern_type)
ON CONFLICT (synonym, language, pattern_type) DO NOTHING
"""),
row,
)
logger.info("Wrote %d new action synonyms to DB", len(all_new))
elif dry_run:
print("\n--- DRY RUN: Action Synonyms ---")
for row in all_new[:20]:
print(" %s%s" % (row["canonical_action"], row["synonym"]))
if len(all_new) > 20:
print(" ... and %d more" % (len(all_new) - 20))
return stats
def enrich_object_tokens(db_url: str, ollama_url: str, dry_run: bool) -> dict:
"""Generate synonyms for each object canonical token."""
engine = create_engine(db_url, connect_args={"options": "-c search_path=compliance,public"})
with engine.connect() as conn:
# Get unique canonical tokens
tokens = conn.execute(text(
"SELECT DISTINCT canonical_token FROM object_synonyms ORDER BY canonical_token"
)).fetchall()
existing = {}
for row in conn.execute(text("SELECT canonical_token, synonym FROM object_synonyms")).fetchall():
existing.setdefault(row[0], set()).add(row[1])
stats = {"tokens_processed": 0, "new_synonyms": 0, "skipped": 0}
all_new: list[dict] = []
for (token,) in tokens:
current_synonyms = existing.get(token, set())
prompt = f"""Du bist ein IT-Security-Experte. Gib mir 5-8 deutsche und englische Begriffe/Synonyme fuer das Konzept "{token}" im Kontext von IT-Sicherheit und Compliance.
Bestehende Synonyme (NICHT wiederholen): {', '.join(sorted(current_synonyms)[:8])}
Antworte NUR mit einer JSON-Liste von Strings, z.B.: ["synonym1", "synonym2", ...]
Keine Erklaerungen, nur die JSON-Liste."""
try:
response = call_ollama(prompt, ollama_url)
synonyms = _parse_json_list(response)
new_count = 0
for syn in synonyms:
syn_lower = syn.lower().strip()
if not syn_lower or len(syn_lower) < 2:
continue
if syn_lower in current_synonyms:
stats["skipped"] += 1
continue
# Detect language
lang = "de"
if all(c in "abcdefghijklmnopqrstuvwxyz0123456789 -_" for c in syn_lower):
lang = "en"
all_new.append({
"canonical_token": token,
"synonym": syn_lower,
"language": lang,
"source": "llm",
})
current_synonyms.add(syn_lower)
new_count += 1
stats["tokens_processed"] += 1
stats["new_synonyms"] += new_count
logger.info("%s: +%d new synonyms", token, new_count)
except Exception as e:
logger.warning("Error for %s: %s", token, e)
time.sleep(1)
# Write to DB
if not dry_run and all_new:
with engine.begin() as conn:
for row in all_new:
conn.execute(
text("""
INSERT INTO object_synonyms (canonical_token, synonym, language, source)
VALUES (:canonical_token, :synonym, :language, :source)
ON CONFLICT (synonym, language) DO NOTHING
"""),
row,
)
logger.info("Wrote %d new object synonyms to DB", len(all_new))
elif dry_run:
print("\n--- DRY RUN: Object Synonyms ---")
for row in all_new[:20]:
print(" %s%s (%s)" % (row["canonical_token"], row["synonym"], row["language"]))
if len(all_new) > 20:
print(" ... and %d more" % (len(all_new) - 20))
return stats
def _parse_json_list(text: str) -> list[str]:
"""Extract JSON list from LLM response."""
# Try to find JSON array in response
text = text.strip()
# Remove markdown code fences
if "```" in text:
text = text.split("```")[1] if text.count("```") >= 2 else text
text = text.strip()
if text.startswith("json"):
text = text[4:].strip()
# Find first [ and last ]
start = text.find("[")
end = text.rfind("]")
if start >= 0 and end > start:
try:
return json.loads(text[start:end + 1])
except json.JSONDecodeError:
pass
return []
def main():
parser = argparse.ArgumentParser(description="LLM Synonym Enrichment")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--db-host", default="localhost")
parser.add_argument("--ollama-host", default="localhost")
parser.add_argument("--actions-only", action="store_true")
parser.add_argument("--objects-only", action="store_true")
args = parser.parse_args()
db_url = f"postgresql://breakpilot:breakpilot123@{args.db_host}:5432/breakpilot_db"
ollama_url = f"http://{args.ollama_host}:11434"
if args.dry_run:
print("=== DRY RUN MODE ===\n")
if not args.objects_only:
print("=== Enriching Action Types ===")
action_stats = enrich_action_types(db_url, ollama_url, args.dry_run)
print("Actions: %d processed, %d new synonyms\n" % (
action_stats["types_processed"], action_stats["new_synonyms"]))
if not args.actions_only:
print("=== Enriching Object Tokens ===")
object_stats = enrich_object_tokens(db_url, ollama_url, args.dry_run)
print("Objects: %d processed, %d new synonyms\n" % (
object_stats["tokens_processed"], object_stats["new_synonyms"]))
if __name__ == "__main__":
main()