81db904b3e
OSHA 29 CFR 1910 Subpart O (1910.211-1910.219) — complete machine guarding requirements. US federal law, public domain. International norms mapping table: China GB/T, Korea KS, India BIS equivalents to ISO/EN standards. Unfortunately all countries protect ISO copyright even for identical national adoptions (IDT). Only OSHA provides truly free machinery safety content. EU Excel harmonised standards list included for reference. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
257 lines
8.9 KiB
Python
257 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BAuA Regulatory Crawler — TRBS, TRGS, ASR
|
|
|
|
Crawls the BAuA website using Playwright (headless browser),
|
|
extracts PDF links, downloads all documents.
|
|
|
|
Usage:
|
|
python3 crawl_baua.py # download all
|
|
python3 crawl_baua.py --category trbs # only TRBS
|
|
python3 crawl_baua.py --dry-run # list PDFs without downloading
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# One timestamped, level-tagged line per log record for the whole script.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("baua-crawler")

BASE_URL = "https://www.baua.de"

# Downloaded PDFs and the JSON registry are kept next to this script.
OUTPUT_DIR = Path(__file__).parent.joinpath("pdfs")
REGISTRY_FILE = Path(__file__).with_name("source_registry.json")

# Per-category crawl settings, keyed by the lower-case CLI name. The index
# URL repeats the upper-cased key as both directory and page name.
CATEGORIES = {
    slug: {
        "url": f"{BASE_URL}/DE/Angebote/Regelwerk/{slug.upper()}/{slug.upper()}.html",
        "name": title,
        "source_type": "technical_rule",
        "legal_basis": ordinance,
    }
    for slug, title, ordinance in (
        ("trbs", "Technische Regeln für Betriebssicherheit", "BetrSichV"),
        ("trgs", "Technische Regeln für Gefahrstoffe", "GefStoffV"),
        ("asr", "Arbeitsstättenregeln", "ArbStättV"),
    )
}
|
|
|
|
|
|
def crawl_index(page, category: str, config: dict) -> list[dict]:
    """Crawl a category index page and collect links to its detail pages.

    Args:
        page: Playwright page (sync API) used for navigation and queries.
        category: Category key such as ``"trbs"``; its upper-cased form is
            the path segment a detail link must contain.
        config: Category settings; only ``config["url"]`` is read here.

    Returns:
        One dict per unique detail link, in page order, with the keys
        ``detail_url``, ``title``, ``filename`` and ``category``.
    """
    logger.info("Crawling %s index: %s", category.upper(), config["url"])
    page.goto(config["url"], wait_until="networkidle", timeout=30000)
    time.sleep(3)  # Wait for BunnyShield

    # Match pattern: /DE/Angebote/Regelwerk/TRBS/TRBS-1111 (no .html!)
    # ASR uses ASR-A1-3 (not ASR-ASR-A1-3)
    section = category.upper()
    base_pattern = f"/DE/Angebote/Regelwerk/{section}/"

    detail_urls = []
    seen = set()

    for link in page.query_selector_all("a[href]"):
        href = link.get_attribute("href") or ""

        # Only plain links below the category directory qualify; fragments
        # and query strings are in-page anchors or filtered views.
        if base_pattern not in href or "#" in href or "?" in href:
            continue

        # Tolerate a trailing slash so the last path segment is never empty
        # (an empty name would later turn into a file called ".pdf").
        filename = href.rstrip("/").split("/")[-1]

        # Skip links that point back at the index itself, with or without
        # the ".html" suffix the index URL carries.
        if filename.removesuffix(".html") == section:
            continue

        # De-duplicate on the resolved URL so that relative and absolute
        # spellings of the same link are only crawled once.
        full_url = urljoin(BASE_URL, href)
        if full_url in seen:
            continue
        seen.add(full_url)

        # The link text is only needed for accepted links; fetching it last
        # avoids a browser round trip for every discarded anchor.
        text = (link.inner_text() or "").strip()
        detail_urls.append({
            "detail_url": full_url,
            "title": text[:200] if text else filename,
            "filename": filename,
            "category": category,
        })

    logger.info("Found %d detail pages for %s", len(detail_urls), section)
    return detail_urls
|
|
|
|
|
|
def extract_pdf_url(page, detail: dict) -> dict:
    """Open a detail page and store the first PDF link found in ``detail``.

    Four lookups run in order and the first non-empty hit wins:

    1. anchors whose ``href`` ends in ``.pdf``;
    2. elements with a ``data-download-url`` attribute containing ``.pdf``;
    3. any anchor with ``.pdf`` in its ``href`` or "download" in its text;
    4. anchors whose ``href`` mentions ``blob`` or ``download``.

    The hit is resolved against ``BASE_URL`` and written to
    ``detail["pdf_url"]``. If nothing matches, or any step raises,
    ``detail["pdf_url"]`` is set to ``None`` and the problem is logged.

    Returns:
        The same ``detail`` dict that was passed in, mutated in place.
    """

    def _resolved(raw: str) -> dict:
        # Record the absolute form of a matched link on the entry.
        detail["pdf_url"] = urljoin(BASE_URL, raw)
        return detail

    try:
        page.goto(detail["detail_url"], wait_until="networkidle", timeout=30000)
        time.sleep(2)

        # Strategy 1: Direct PDF link
        for anchor in page.query_selector_all('a[href$=".pdf"]'):
            target = anchor.get_attribute("href") or ""
            if target:
                return _resolved(target)

        # Strategy 2: Download button with data attribute
        for button in page.query_selector_all("[data-download-url]"):
            target = button.get_attribute("data-download-url") or ""
            if target and ".pdf" in target:
                return _resolved(target)

        # Strategy 3: Links containing "pdf" or "download"
        for anchor in page.query_selector_all("a[href]"):
            target = anchor.get_attribute("href") or ""
            label = (anchor.inner_text() or "").lower()
            if target and (".pdf" in target or "download" in label):
                return _resolved(target)

        # Strategy 4: Check for blob/dynamic download
        blob_selector = 'a[href*="blob"], a[href*="download"], a[href*="__blob"]'
        for anchor in page.query_selector_all(blob_selector):
            target = anchor.get_attribute("href") or ""
            if target:
                return _resolved(target)

        logger.warning("No PDF found for %s", detail["filename"])
        detail["pdf_url"] = None
        return detail

    except Exception as e:
        # Best effort: one broken detail page must not abort the crawl.
        logger.error("Error on %s: %s", detail["detail_url"], e)
        detail["pdf_url"] = None
        return detail
|
|
|
|
|
|
def download_pdf(page, detail: dict, output_dir: Path) -> dict:
    """Download the PDF for one registry entry and record path, hash and size.

    The file is stored as ``<output_dir>/<category>/<sanitised filename>.pdf``.
    A file that already exists is not fetched again but is still hashed, so
    cached and freshly downloaded entries carry the same metadata keys.

    Args:
        page: Playwright page (sync API) used for the download.
        detail: Registry entry; reads ``pdf_url``, ``category`` and
            ``filename``.
        output_dir: Root directory for downloaded PDFs.

    Returns:
        The same ``detail`` dict. On success ``local_path``, ``sha256`` and
        ``size_bytes`` are set; without a ``pdf_url`` or after a failed
        download the entry is returned without those keys.
    """
    pdf_url = detail.get("pdf_url")
    if not pdf_url:
        return detail

    cat = detail["category"]
    safe_name = re.sub(r"[^a-zA-Z0-9_\-]", "_", detail["filename"]).lower()
    pdf_path = output_dir / cat / f"{safe_name}.pdf"
    pdf_path.parent.mkdir(parents=True, exist_ok=True)

    already_cached = pdf_path.exists()
    if already_cached:
        logger.info(" Already exists: %s", pdf_path.name)
    else:
        fetched = False
        try:
            # NOTE(review): navigating straight to a download URL is reported
            # to make goto() raise in Playwright, in which case the request
            # fallback below does the actual work -- confirm against the docs.
            with page.expect_download(timeout=60000) as download_info:
                page.goto(pdf_url, timeout=30000)
            download_info.value.save_as(str(pdf_path))
            fetched = True
        except Exception as primary_error:
            logger.debug(" Browser download failed for %s (%s); trying direct request",
                         detail["filename"], primary_error)
            # Fallback: direct download via response
            try:
                response = page.request.get(pdf_url)
                if response.ok:
                    pdf_path.write_bytes(response.body())
                    fetched = True
                else:
                    logger.error(" Download failed: %s (HTTP %d)",
                                 detail["filename"], response.status)
            except Exception as e:
                logger.error(" Download failed: %s — %s", detail["filename"], e)

        if not fetched:
            # Remove any partial file from this attempt; otherwise the next
            # run would take it for a finished download and skip the fetch.
            try:
                pdf_path.unlink(missing_ok=True)
            except OSError as cleanup_error:
                logger.warning(" Could not remove partial file %s: %s",
                               pdf_path.name, cleanup_error)
            return detail

    # Read once; the same bytes give both the hash and the size.
    payload = pdf_path.read_bytes()
    detail["local_path"] = str(pdf_path)
    detail["sha256"] = hashlib.sha256(payload).hexdigest()
    detail["size_bytes"] = len(payload)
    if not already_cached:
        logger.info(" Downloaded: %s (%.1f KB)", pdf_path.name, len(payload) / 1024)
    return detail
|
|
|
|
|
|
def main():
    """Run the crawler from the command line.

    For each selected category the index page is crawled, every detail page
    is visited to find its PDF link, and (unless ``--dry-run`` is given) the
    PDFs are downloaded. All entries are tagged with category metadata and
    written to ``REGISTRY_FILE`` as JSON, followed by a summary log line.

    Flags: ``--category`` restricts the run to one category, ``--dry-run``
    skips downloading, ``--no-headless`` shows the browser window.
    """
    parser = argparse.ArgumentParser()
    # Derive the choices from CATEGORIES so the CLI cannot drift from the table.
    parser.add_argument("--category", choices=list(CATEGORIES),
                        help="Only crawl one category")
    parser.add_argument("--dry-run", action="store_true",
                        help="List PDFs without downloading")
    # --headless is still accepted but has no effect: headless is the default
    # and only --no-headless changes it.
    parser.add_argument("--headless", action="store_true", default=True)
    parser.add_argument("--no-headless", action="store_true")
    args = parser.parse_args()

    headless = not args.no_headless
    categories = [args.category] if args.category else list(CATEGORIES.keys())

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    registry = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/120.0.0.0 Safari/537.36"
            )
            page = context.new_page()

            for cat in categories:
                config = CATEGORIES[cat]
                logger.info("\n=== %s ===", cat.upper())

                # Step 1: Crawl index
                details = crawl_index(page, cat, config)

                # Step 2: Extract PDF URLs
                for i, detail in enumerate(details):
                    logger.info("[%d/%d] %s", i + 1, len(details), detail["filename"])
                    extract_pdf_url(page, detail)
                    time.sleep(1)  # Be polite

                # Step 3: Download PDFs
                if not args.dry_run:
                    for detail in details:
                        download_pdf(page, detail, OUTPUT_DIR)
                        time.sleep(0.5)

                # Add metadata
                for detail in details:
                    detail["source_type"] = config["source_type"]
                    detail["legal_basis"] = config["legal_basis"]
                    detail["license_rule"] = 1  # §5 UrhG, public domain
                    detail["jurisdiction"] = "DE"

                registry.extend(details)
        finally:
            # Close the browser even when a crawl step raises.
            browser.close()

    # Save registry. ensure_ascii=False keeps non-ASCII text verbatim, so the
    # file encoding is pinned to UTF-8 instead of the platform default.
    REGISTRY_FILE.write_text(
        json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    logger.info("\nRegistry saved: %s (%d entries)", REGISTRY_FILE, len(registry))

    # Summary
    total = len(registry)
    with_pdf = sum(1 for r in registry if r.get("pdf_url"))
    downloaded = sum(1 for r in registry if r.get("local_path"))
    logger.info("Total: %d | PDF found: %d | Downloaded: %d", total, with_pdf, downloaded)
|
|
|
|
|
|
# Call main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|