#!/usr/bin/env python3
"""
Extract CE-relevant obligations from TRBS/TRGS/ASR/OSHA chunks in Qdrant.

Scans chunk texts for obligation phrasing (muss/müssen, hat ... zu,
darf nicht, shall/must, ...), keeps chunks that also mention CE-relevant
topics, and classifies each hit by hazard category and as MUSS or SOLL.
Output: JSON file with structured obligations for the CE session.

Usage:
    python3 /app/scripts/extract_ce_obligations.py
    python3 /app/scripts/extract_ce_obligations.py --output /tmp/ce_obligations.json
    python3 /app/scripts/extract_ce_obligations.py --source-filter "TRBS 2111"
"""

import argparse
import json
import logging
import os
import re
from collections import Counter
from pathlib import Path
from typing import Optional

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("ce-obligations")

QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
COLLECTION = "bp_compliance_ce"
# NOTE(review): OLLAMA_URL / LLM_MODEL are not referenced by this script.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = "qwen3.5:35b-a3b"

# Qdrant scroll tuning.
_SCROLL_BATCH_SIZE = 100
_HTTP_TIMEOUT_SECONDS = 30.0
_PROGRESS_EVERY_BATCHES = 10

# Only chunks whose source/filename contains one of these are considered.
_SOURCE_FAMILIES = ("trbs", "trgs", "asr", "osha")

# Obligation patterns (DE + EN). A chunk must match one of these to be kept.
OBLIGATION_PATTERNS = re.compile(
    r"(muss|müssen|hat\s+[\w\s]*zu\s|ist\s+[\w\s]*sicherzustellen|"
    r"ist\s+verpflichtet|sind\s+verpflichtet|darf\s+nicht|"
    r"shall|must|required\s+to|is\s+required|shall\s+not)",
    re.IGNORECASE,
)

# CE relevance keywords. Most German stems are matched as plain substrings on
# purpose, so compounds such as "Quetschstelle" or "Absturzsicherung" are
# found. Short tokens that occur inside unrelated words are anchored with \b:
# "ppe" (Gruppe, Treppe, happen), "cut" (execute), "fang" (Umfang, Anfang),
# and "psa".
CE_KEYWORDS = re.compile(
    r"(maschine|schutzeinrichtung|gefährdung|quetsch|scher|stoß|"
    r"schneid|\bfang|einzug|absturz|druck|explosion|brand|"
    r"elektrisch|spannung|erdung|schutzleiter|not-halt|"
    r"betriebsanleitung|kennzeichnung|prüfung|prüfpflicht|"
    r"instandhaltung|wartung|sicherheitsabstand|"
    r"schutzmaßnahme|persönliche schutzausrüstung|\bpsa\b|"
    r"machine|guard|hazard|crush|shear|\bcut|entangle|"
    r"lockout|tagout|electrical|grounding|emergency stop|"
    r"safety distance|protective device|\bppe\b|inspection)",
    re.IGNORECASE,
)

# Regex (case-insensitive) -> hazard category. Evaluated in insertion order;
# the first match wins, so more specific categories come first.
# \b anchors stop "cut" matching "execute", "fang" matching "Prüfumfang",
# "sign" matching "design" and "test" matching "latest".
# NOTE(review): "fall" also matches German "falls"/"Fall"/"Unfall";
# consider tightening.
HAZARD_CATEGORIES = {
    "quetsch|crush|squeeze": "mechanical_crushing",
    r"schneid|\bcut": "mechanical_cutting",
    r"\bfang|einzug|entangle|draw": "mechanical_entanglement",
    "absturz|fall": "fall_hazard",
    "explosion|ex-bereich|atex": "explosion_hazard",
    "brand|fire|feuer": "fire_hazard",
    "elektrisch|electrical|spannung|voltage": "electrical_hazard",
    "lärm|noise|schall": "noise_hazard",
    "gefahrstoff|hazardous substance|chemical": "chemical_hazard",
    "ergonomie|ergonomic|heben|lift": "ergonomic_hazard",
    "temperatur|heat|hitze|kälte|cold": "thermal_hazard",
    "strahlung|radiation|laser": "radiation_hazard",
    "not-halt|emergency stop|e-stop": "emergency_stop",
    "lockout|tagout|loto": "lockout_tagout",
    r"kennzeichnung|label|marking|\bsign": "safety_marking",
    r"prüfung|inspection|\btest": "inspection_requirement",
    "instandhaltung|maintenance|wartung": "maintenance",
    "schutzeinrichtung|guard|protective device": "protective_device",
    "betriebsanleitung|instruction|manual": "operating_instructions",
    "druck|pressure|behälter|vessel|kessel|boiler": "pressure_hazard",
}

# Compiled once so classification does not re-resolve patterns per chunk.
_HAZARD_PATTERNS = tuple(
    (re.compile(pattern, re.IGNORECASE), category)
    for pattern, category in HAZARD_CATEGORIES.items()
)

# Source-based overrides: TRGS 4xx/5xx/6xx/7xx/9xx documents (chemicals,
# storage, explosion protection) should never be classified as mechanical
# hazards. Other TRGS series fall through to the generic classification.
_CHEMICAL_SOURCES = re.compile(r"trgs\s*[4-79][0-9]{2}", re.IGNORECASE)

# Categories checked, in order, for chunks from _CHEMICAL_SOURCES.
_TRGS_OVERRIDES = (
    (re.compile(r"explosion|ex-bereich|atex|zündfähig", re.IGNORECASE),
     "explosion_hazard"),
    (re.compile(r"druck|pressure|behälter|vessel", re.IGNORECASE),
     "pressure_hazard"),
    (re.compile(r"brand|fire|feuer", re.IGNORECASE), "fire_hazard"),
)

# Obligation strength: binding wording is checked before advisory wording.
_MUSS_PATTERN = re.compile(r"muss|müssen|shall|must|required", re.IGNORECASE)
_SOLL_PATTERN = re.compile(r"soll|sollte|should", re.IGNORECASE)


def _classify_hazard(text: str, source: str) -> str:
    """Return the hazard category of *text*, taking its *source* into account.

    Chunks from a TRGS series matched by ``_CHEMICAL_SOURCES`` are never
    classified as mechanical. They map to explosion, pressure or fire hazard
    when the text mentions one of those (checked in that order), and to
    ``"chemical_hazard"`` otherwise. All other chunks get the first matching
    ``HAZARD_CATEGORIES`` entry, or ``"general"`` when none matches.
    """
    if _CHEMICAL_SOURCES.search(source):
        for pattern, category in _TRGS_OVERRIDES:
            if pattern.search(text):
                return category
        return "chemical_hazard"

    for pattern, category in _HAZARD_PATTERNS:
        if pattern.search(text):
            return category
    return "general"


def _extract_obligation(
    payload: dict, source_filter: Optional[str] = None
) -> Optional[dict]:
    """Build an obligation record from one Qdrant point payload.

    Returns None when the chunk is filtered out. That happens when:
    - the source is not TRBS/TRGS/ASR/OSHA;
    - the source does not contain *source_filter* (case-insensitive, if given);
    - the text has no obligation wording or no CE keyword.
    """
    # `or` fallbacks also cover keys that are present but null/empty.
    source = payload.get("source") or payload.get("filename") or ""
    text = payload.get("chunk_text") or ""

    source_lower = source.lower()
    if not any(family in source_lower for family in _SOURCE_FAMILIES):
        return None
    if source_filter and source_filter.lower() not in source_lower:
        return None
    if not OBLIGATION_PATTERNS.search(text):
        return None
    if not CE_KEYWORDS.search(text):
        return None

    hazard = _classify_hazard(text, source)

    if _MUSS_PATTERN.search(text):
        obl_type = "MUSS"
    elif _SOLL_PATTERN.search(text):
        obl_type = "SOLL"
    else:
        # Other triggers ("darf nicht", "ist verpflichtet", "hat ... zu", ...)
        # are classified as MUSS.
        obl_type = "MUSS"

    return {
        "source": source,
        "section": payload.get("section", ""),
        "paragraph": payload.get("paragraph", ""),
        "obligation_text": text.strip()[:500],
        "hazard_category": hazard,
        "obligation_type": obl_type,
        "ce_relevance": "high" if hazard != "general" else "medium",
        "filename": payload.get("filename", ""),
    }


def scroll_chunks(source_filter: Optional[str] = None) -> list[dict]:
    """Scroll the whole Qdrant collection and collect CE-relevant obligations.

    Args:
        source_filter: optional case-insensitive substring. When given, only
            chunks whose source contains it are kept, in addition to the
            TRBS/TRGS/ASR/OSHA restriction.

    Returns:
        One record per matching chunk (see ``_extract_obligation``).

    Raises:
        httpx.HTTPStatusError: if Qdrant answers with an error status. Failing
            loudly avoids mistaking an error for an empty collection.
    """
    # Imported lazily so the pure classification helpers above can be used
    # (e.g. in unit tests) without the HTTP client installed.
    import httpx

    chunks: list[dict] = []
    offset = None
    scanned = 0
    batches = 0
    url = f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll"

    # One client for all pages so the connection is reused.
    with httpx.Client(timeout=_HTTP_TIMEOUT_SECONDS) as client:
        while True:
            scroll_body = {
                "limit": _SCROLL_BATCH_SIZE,
                "with_payload": True,
                "with_vector": False,
            }
            if offset is not None:
                scroll_body["offset"] = offset

            resp = client.post(url, json=scroll_body)
            resp.raise_for_status()
            result = resp.json().get("result") or {}
            points = result.get("points") or []
            next_offset = result.get("next_page_offset")

            for pt in points:
                record = _extract_obligation(pt.get("payload") or {}, source_filter)
                if record is not None:
                    chunks.append(record)

            scanned += len(points)
            batches += 1
            if batches % _PROGRESS_EVERY_BATCHES == 0:
                logger.info(
                    "  Scanned %d points, %d obligations found so far",
                    scanned,
                    len(chunks),
                )

            if next_offset is None or not points:
                break
            offset = next_offset

    return chunks


def main():
    """CLI entry point: scan the collection, log summary stats and write JSON."""
    parser = argparse.ArgumentParser(
        description="Extract CE-relevant obligations from Qdrant chunks."
    )
    parser.add_argument("--output", default="/tmp/ce_obligations.json")
    parser.add_argument(
        "--source-filter",
        default=None,
        help="Only keep chunks whose source contains this text (case-insensitive).",
    )
    args = parser.parse_args()

    logger.info("Scanning %s for CE obligations...", COLLECTION)
    obligations = scroll_chunks(args.source_filter)
    logger.info("Found %d CE-relevant obligations", len(obligations))

    # Stats. Sources are grouped by their first 30 characters, as before.
    by_source = Counter(o["source"][:30] for o in obligations)
    by_hazard = Counter(o["hazard_category"] for o in obligations)

    logger.info("\nBy source:")
    for src, cnt in by_source.most_common(20):
        logger.info("  %4d  %s", cnt, src)

    logger.info("\nBy hazard category:")
    for cat, cnt in by_hazard.most_common():
        logger.info("  %4d  %s", cnt, cat)

    # Save as UTF-8 explicitly: ensure_ascii=False keeps umlauts verbatim,
    # which a non-UTF-8 locale encoding could fail to write.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(obligations, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    logger.info("\nSaved to %s", args.output)


if __name__ == "__main__":
    main()