Files
Benjamin Admin 8510af46eb feat(pipeline): MC Quality Overhaul — 74.5% → 92.8% accuracy, 5.3K → 13.6K MCs
Phase 0: Quality Audit script (Claude Sonnet, 1750 samples)
Phase 1: Object ontology expanded 31 → 74 tokens with descriptions + boundaries
Phase 2: 174K controls re-classified via Haiku (10 batches, $50)
  - Generic tokens removed (documentation, procedure, process)
  - L2 sub-topics added (108K + 64K controls)
  - Bad subtopics fixed (stakeholder_*, escalation fragments)
Phase 3: Re-clustering K=18704 (37K objects → 16.7K groups)
Phase 4: Direct MC generation from canonical tokens (gpre2_direct_mc.py)
Phase 5: Regulation-source split (gpre3, dry-run tested)

New features:
- Tenant-isolated document upload API (rag-service)
- BAuA crawler (Playwright, 131 PDFs downloaded)
- OSHA Technical Manual crawler (23 chapters)
- CE obligation extractor (6141 obligations from Qdrant)

RAG ingestion:
- 126 BAuA PDFs (TRBS/TRGS/ASR): 27,664 chunks
- OSHA Technical Manual: 7,241 chunks
- OSHA 1910 Subpart O (full): 745 chunks
- EuGH C-588/21 P: 216 chunks
- EU 2018/1725: 842 chunks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-10 15:08:15 +02:00

215 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
Extract CE-relevant obligations from TRBS/TRGS/ASR/OSHA chunks in Qdrant.

Scans the chunk texts for obligation wording (German and English), keeps the
chunks that also mention a CE-relevant keyword, and tags each hit with a
hazard category and an obligation type (MUSS or SOLL).

Output: JSON file with structured obligations for the CE session.

Usage:
    python3 /app/scripts/extract_ce_obligations.py
    python3 /app/scripts/extract_ce_obligations.py --output /tmp/ce_obligations.json
"""
import argparse
import json
import logging
import os
import re
from collections import Counter
from pathlib import Path

import httpx
# Script-wide logging: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("ce-obligations")

# Qdrant endpoint (overridable through the QDRANT_URL environment variable)
# and the collection that is scanned for obligations.
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant:6333")
COLLECTION = "bp_compliance_ce"

# Ollama endpoint (overridable through OLLAMA_URL) and model name.
# NOTE(review): neither value is referenced by the code visible in this
# file section — confirm whether they are still needed.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = "qwen3.5:35b-a3b"
# Obligation patterns (DE + EN): wording that marks a chunk as containing a
# mandatory requirement.
#
# Every alternative is anchored at a word boundary.  Without the anchors the
# pattern fires inside unrelated words: "must" in "Baumusterprüfung"/"Muster",
# "shall" in "shallow", "hat " at the end of "that ".
# The single outer capturing group is kept, so ``match.group(1)`` still yields
# the matched phrase.
OBLIGATION_PATTERNS = re.compile(
    r"(\b(?:muss|müssen|ist\s+verpflichtet|sind\s+verpflichtet|darf\s+nicht|"
    r"shall\s+not|shall|must|required\s+to|is\s+required)\b|"
    # "hat ... zu <verb>" / "ist ... sicherzustellen": the words in between
    # may only be word characters and whitespace, so a match cannot span
    # punctuation.
    r"\bhat\s+[\w\s]*zu\s|"
    r"\bist\s+[\w\s]*sicherzustellen\b)",
    re.IGNORECASE,
)
# CE relevance keywords (DE + EN).  A chunk must mention at least one of
# these to be kept.
#
# Most entries are deliberately unanchored word stems so that compounds and
# inflections match ("Quetschstelle", "Schutzeinrichtungen").  The acronyms
# "psa"/"ppe" and the short token "cut" are anchored, because as bare
# substrings they match everyday words ("Gruppe", "Treppe", "upper",
# "execute") and let irrelevant chunks through.
# NOTE(review): other short stems ("scher", "fang", "druck", "stoß") can
# still match inside unrelated words such as "technischer" or "Umfang";
# tighten them if precision of this filter matters.
CE_KEYWORDS = re.compile(
    r"(maschine|schutzeinrichtung|gefährdung|quetsch|scher|stoß|"
    r"schneid|fang|einzug|absturz|druck|explosion|brand|"
    r"elektrisch|spannung|erdung|schutzleiter|not-halt|"
    r"betriebsanleitung|kennzeichnung|prüfung|prüfpflicht|"
    r"instandhaltung|wartung|sicherheitsabstand|"
    r"schutzmaßnahme|persönliche schutzausrüstung|\bpsa\b|"
    r"machine|guard|hazard|crush|shear|\bcut|entangle|"
    r"lockout|tagout|electrical|grounding|emergency stop|"
    r"safety distance|protective device|\bppe\b|inspection)",
    re.IGNORECASE,
)
HAZARD_CATEGORIES = {
"quetsch|crush|squeeze": "mechanical_crushing",
"schneid|cut": "mechanical_cutting",
"fang|einzug|entangle|draw": "mechanical_entanglement",
"absturz|fall": "fall_hazard",
"explosion|ex-bereich|atex": "explosion_hazard",
"brand|fire|feuer": "fire_hazard",
"elektrisch|electrical|spannung|voltage": "electrical_hazard",
"lärm|noise|schall": "noise_hazard",
"gefahrstoff|hazardous substance|chemical": "chemical_hazard",
"ergonomie|ergonomic|heben|lift": "ergonomic_hazard",
"temperatur|heat|hitze|kälte|cold": "thermal_hazard",
"strahlung|radiation|laser": "radiation_hazard",
"not-halt|emergency stop|e-stop": "emergency_stop",
"lockout|tagout|loto": "lockout_tagout",
"kennzeichnung|label|marking|sign": "safety_marking",
"prüfung|inspection|test": "inspection_requirement",
"instandhaltung|maintenance|wartung": "maintenance",
"schutzeinrichtung|guard|protective device": "protective_device",
"betriebsanleitung|instruction|manual": "operating_instructions",
"druck|pressure|behälter|vessel|kessel|boiler": "pressure_hazard",
}
# Source-based overrides: TRGS docs about chemicals/storage
# should never be classified as mechanical hazards
_CHEMICAL_SOURCES = re.compile(
r"trgs\s*(5[0-9]{2}|7[0-9]{2}|9[0-9]{2}|4[0-9]{2}|6[0-9]{2})",
re.IGNORECASE,
)
def _classify_hazard(text: str, source: str) -> str:
"""Classify hazard with source-aware overrides."""
# TRGS sources → chemical/pressure/explosion, never mechanical
if _CHEMICAL_SOURCES.search(source):
if re.search(r"explosion|ex-bereich|atex|zündfähig", text, re.IGNORECASE):
return "explosion_hazard"
if re.search(r"druck|pressure|behälter|vessel", text, re.IGNORECASE):
return "pressure_hazard"
if re.search(r"brand|fire|feuer", text, re.IGNORECASE):
return "fire_hazard"
return "chemical_hazard"
# Standard pattern matching (order matters — specific first)
for pattern, category in HAZARD_CATEGORIES.items():
if re.search(pattern, text, re.IGNORECASE):
return category
return "general"
# Number of points requested per Qdrant scroll page.
_SCROLL_BATCH = 100
# Emit a progress line each time this many points have been scanned.
_PROGRESS_EVERY = 5000
# Only chunks whose source name contains one of these markers are considered.
_RELEVANT_SOURCE_MARKERS = ("trbs", "trgs", "asr", "osha")
# Binding vs. recommended wording, anchored at word boundaries so that e.g.
# "must" inside "Baumuster" or "soll" inside "Sollwert" does not count.
_MUSS_WORDING = re.compile(
    r"\b(?:muss|müssen|shall|must|required)\b", re.IGNORECASE
)
_SOLL_WORDING = re.compile(
    r"\b(?:soll|sollen|sollte|sollten|should)\b", re.IGNORECASE
)


def _obligation_from_payload(payload: dict, source_needle: str = None):
    """Build one obligation record from a Qdrant payload, or return None.

    ``source_needle`` is an optional lower-cased substring the source name
    must contain.  Missing or null payload fields are treated as empty.
    """
    # Fall back to the file name when "source" is absent *or* empty.
    source = payload.get("source") or payload.get("filename") or ""
    text = payload.get("chunk_text") or ""

    # Filter for TRBS/TRGS/ASR/OSHA
    source_lower = source.lower()
    if not any(marker in source_lower for marker in _RELEVANT_SOURCE_MARKERS):
        return None
    if source_needle and source_needle not in source_lower:
        return None

    # The chunk must contain obligation wording and a CE-relevant keyword.
    if not OBLIGATION_PATTERNS.search(text):
        return None
    if not CE_KEYWORDS.search(text):
        return None

    # Classify hazard category (source-aware)
    hazard = _classify_hazard(text, source)

    # Binding wording wins; SOLL only when recommendation wording is the
    # only wording present; everything else defaults to MUSS.
    if not _MUSS_WORDING.search(text) and _SOLL_WORDING.search(text):
        obl_type = "SOLL"
    else:
        obl_type = "MUSS"

    return {
        "source": source,
        "section": payload.get("section", ""),
        "paragraph": payload.get("paragraph", ""),
        "obligation_text": text.strip()[:500],
        "hazard_category": hazard,
        "obligation_type": obl_type,
        "ce_relevance": "high" if hazard != "general" else "medium",
        "filename": payload.get("filename", ""),
    }


def scroll_chunks(source_filter: str = None) -> list[dict]:
    """Scroll through the Qdrant collection and collect CE obligations.

    Pages through every point of COLLECTION (payload only, no vectors) and
    keeps the chunks that come from a TRBS/TRGS/ASR/OSHA source, contain
    obligation wording and mention a CE-relevant keyword.

    Args:
        source_filter: Optional case-insensitive substring; when given, only
            chunks whose source name contains it are returned.  ``None``
            (the default) applies no additional filter.

    Returns:
        A list of obligation dicts with the keys ``source``, ``section``,
        ``paragraph``, ``obligation_text`` (at most 500 characters),
        ``hazard_category``, ``obligation_type``, ``ce_relevance`` and
        ``filename``.

    Raises:
        httpx.HTTPStatusError: If Qdrant answers with a 4xx/5xx status.
            Previously such a response was read as an empty page, ending the
            scan silently with partial or no results.
    """
    needle = source_filter.lower() if source_filter else None
    url = f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll"

    obligations = []
    scanned = 0
    next_report = _PROGRESS_EVERY
    offset = None

    # One client for the whole scan so the connection is reused across pages.
    with httpx.Client(timeout=30.0) as client:
        while True:
            scroll_body = {
                "limit": _SCROLL_BATCH,
                "with_payload": True,
                "with_vector": False,
            }
            if offset is not None:
                scroll_body["offset"] = offset

            resp = client.post(url, json=scroll_body)
            resp.raise_for_status()

            # "result" and "points" may be missing or null; treat as empty.
            result = resp.json().get("result") or {}
            points = result.get("points") or []

            for pt in points:
                record = _obligation_from_payload(pt.get("payload") or {}, needle)
                if record is not None:
                    obligations.append(record)

            # Progress is keyed on points scanned, not on obligations found:
            # the hit count can sit at 0 or jump past a threshold between
            # pages.
            scanned += len(points)
            if scanned >= next_report:
                logger.info(
                    "  Scanned %d points, %d obligations found so far",
                    scanned,
                    len(obligations),
                )
                next_report += _PROGRESS_EVERY

            offset = result.get("next_page_offset")
            if offset is None or not points:
                break

    return obligations
def main():
    """Run the extraction, log summary statistics and write the JSON file.

    Command-line options:
        --output  Path of the JSON file to write
                  (default: /tmp/ce_obligations.json).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", default="/tmp/ce_obligations.json")
    args = parser.parse_args()

    logger.info("Scanning %s for CE obligations...", COLLECTION)
    obligations = scroll_chunks()
    logger.info("Found %d CE-relevant obligations", len(obligations))

    # Stats — source names are cut to 30 characters to keep the table narrow.
    by_source = Counter(o["source"][:30] for o in obligations)
    by_hazard = Counter(o["hazard_category"] for o in obligations)

    logger.info("\nBy source:")
    for src, cnt in by_source.most_common(20):
        logger.info(" %4d %s", cnt, src)
    logger.info("\nBy hazard category:")
    for cat, cnt in by_hazard.most_common():
        logger.info(" %4d %s", cnt, cat)

    # Save.  The JSON is written with ensure_ascii=False, i.e. with raw
    # umlauts, so the file encoding is pinned to UTF-8 instead of depending
    # on the process locale.
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(obligations, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("\nSaved to %s", args.output)


if __name__ == "__main__":
    main()