feat(legal-sources): add OSHA machinery safety standards + international norms mapping
OSHA 29 CFR 1910 Subpart O (1910.211-1910.219) — complete machine guarding requirements. US federal law, public domain. International norms mapping table: China GB/T, Korea KS, India BIS equivalents to ISO/EN standards. Unfortunately all countries protect ISO copyright even for identical national adoptions (IDT). Only OSHA provides truly free machinery safety content. EU Excel harmonised standards list included for reference. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,256 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
BAuA Regulatory Crawler — TRBS, TRGS, ASR
|
||||
|
||||
Crawls the BAuA website using Playwright (headless browser),
|
||||
extracts PDF links, downloads all documents.
|
||||
|
||||
Usage:
|
||||
python3 crawl_baua.py # download all
|
||||
python3 crawl_baua.py --category trbs # only TRBS
|
||||
python3 crawl_baua.py --dry-run # list PDFs without downloading
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
# Process-wide logging: timestamped, level-tagged lines at INFO and above.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger shared by every function in this script.
logger = logging.getLogger("baua-crawler")
|
||||
|
||||
# Site root; also the base against which scraped hrefs are resolved.
BASE_URL = "https://www.baua.de"

# Downloads and the JSON registry are stored next to this script.
_SCRIPT_DIR = Path(__file__).parent
OUTPUT_DIR = _SCRIPT_DIR / "pdfs"
REGISTRY_FILE = _SCRIPT_DIR / "source_registry.json"

# Index pages follow one scheme: .../Regelwerk/<KEY>/<KEY>.html
_INDEX_URL_TEMPLATE = BASE_URL + "/DE/Angebote/Regelwerk/{0}/{0}.html"

# Crawlable rule families, keyed by the value accepted for --category.
# Each entry carries the index URL, the German display name, the source type
# and the ordinance the rules are based on.
CATEGORIES = {
    key: {
        "url": _INDEX_URL_TEMPLATE.format(key.upper()),
        "name": name,
        "source_type": "technical_rule",
        "legal_basis": legal_basis,
    }
    for key, name, legal_basis in (
        ("trbs", "Technische Regeln für Betriebssicherheit", "BetrSichV"),
        ("trgs", "Technische Regeln für Gefahrstoffe", "GefStoffV"),
        ("asr", "Arbeitsstättenregeln", "ArbStättV"),
    )
}
|
||||
|
||||
|
||||
def crawl_index(page, category: str, config: dict) -> list[dict]:
    """Crawl a category index page and collect links to its detail pages.

    Args:
        page: Playwright page (sync API) used for navigation and DOM queries.
        category: Category key such as ``"trbs"``; its upper-cased form is the
            path segment a detail link must contain.
        config: Category configuration; only ``config["url"]`` (the index page
            URL) is read here.

    Returns:
        One dict per unique detail page, in the order the links were returned
        by the page, with the keys ``detail_url``, ``title``, ``filename`` and
        ``category``.
    """
    section = category.upper()
    index_url = config["url"]

    logger.info("Crawling %s index: %s", section, index_url)
    page.goto(index_url, wait_until="networkidle", timeout=30000)
    time.sleep(3)  # Wait for BunnyShield

    # Detail pages look like /DE/Angebote/Regelwerk/TRBS/TRBS-1111 (no .html!);
    # ASR uses ASR-A1-3 (not ASR-ASR-A1-3).
    base_pattern = f"/DE/Angebote/Regelwerk/{section}/"

    detail_urls = []
    seen = set()

    for link in page.query_selector_all("a[href]"):
        href = link.get_attribute("href") or ""

        # Cheap string filters first: wrong section, in-page anchors and
        # parameterised links are never detail pages.
        if base_pattern not in href or "#" in href or "?" in href:
            continue

        full_url = urljoin(BASE_URL, href)

        # Last path segment, ignoring a trailing slash so that ".../TRBS/" is
        # recognised as the section root instead of yielding an empty name.
        filename = href.rstrip("/").split("/")[-1]

        # Skip links back to the section root or to the index page itself
        # (e.g. ".../TRBS/TRBS.html"), which also live under base_pattern.
        if filename == section or full_url == index_url:
            continue

        # De-duplicate on the resolved URL so relative and absolute spellings
        # of the same page are only visited once.
        dedupe_key = full_url.rstrip("/")
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)

        # Only read the link text for links that are actually kept.
        text = (link.inner_text() or "").strip()

        detail_urls.append({
            "detail_url": full_url,
            "title": text[:200] if text else filename,
            "filename": filename,
            "category": category,
        })

    logger.info("Found %d detail pages for %s", len(detail_urls), section)
    return detail_urls
|
||||
|
||||
|
||||
def extract_pdf_url(page, detail: dict) -> dict:
    """Open a detail page and record where its PDF can be fetched.

    Four heuristics are tried in order and the first usable link wins. The
    result is stored in ``detail["pdf_url"]`` as a URL joined onto
    ``BASE_URL``, or ``None`` when nothing matched or any error occurred.
    The same ``detail`` dict is returned.
    """

    def candidates():
        # Lazy on purpose: a later DOM query only runs when every earlier
        # heuristic came up empty.

        # Strategy 1: Direct PDF link
        for anchor in page.query_selector_all('a[href$=".pdf"]'):
            target = anchor.get_attribute("href") or ""
            if target:
                yield target

        # Strategy 2: Download button with data attribute
        for button in page.query_selector_all("[data-download-url]"):
            target = button.get_attribute("data-download-url") or ""
            if target and ".pdf" in target:
                yield target

        # Strategy 3: Links containing "pdf" or "download"
        for anchor in page.query_selector_all("a[href]"):
            target = anchor.get_attribute("href") or ""
            label = (anchor.inner_text() or "").lower()
            if target and (".pdf" in target or "download" in label):
                yield target

        # Strategy 4: Check for blob/dynamic download
        blob_selector = 'a[href*="blob"], a[href*="download"], a[href*="__blob"]'
        for anchor in page.query_selector_all(blob_selector):
            target = anchor.get_attribute("href") or ""
            if target:
                yield target

    try:
        page.goto(detail["detail_url"], wait_until="networkidle", timeout=30000)
        time.sleep(2)

        chosen = next(candidates(), None)
        if chosen is None:
            logger.warning("No PDF found for %s", detail["filename"])
            detail["pdf_url"] = None
        else:
            detail["pdf_url"] = urljoin(BASE_URL, chosen)
    except Exception as e:
        # Best effort per page: one broken detail page must not stop the crawl.
        logger.error("Error on %s: %s", detail["detail_url"], e)
        detail["pdf_url"] = None

    return detail
|
||||
|
||||
|
||||
def _record_file(detail: dict, pdf_path: Path, data: bytes) -> dict:
    """Store path, SHA-256 and size of an on-disk PDF in ``detail``."""
    detail["local_path"] = str(pdf_path)
    detail["sha256"] = hashlib.sha256(data).hexdigest()
    detail["size_bytes"] = len(data)
    return detail


def _fetch_pdf_to(page, pdf_url: str, target: Path, label: str) -> bool:
    """Save ``pdf_url`` to ``target``; return True when a file was written."""
    try:
        with page.expect_download(timeout=60000) as download_info:
            page.goto(pdf_url, timeout=30000)
        download_info.value.save_as(str(target))
        return True
    except Exception as browser_error:
        # Best effort: any failure of the browser-driven download falls
        # through to a plain HTTP request, but leaves a trace for debugging.
        logger.debug(" Browser download failed for %s: %s", label, browser_error)

    # Fallback: direct download via response
    try:
        response = page.request.get(pdf_url)
        if not response.ok:
            logger.error(" Download failed: %s (HTTP %d)", label, response.status)
            return False
        target.write_bytes(response.body())
        return True
    except Exception as e:
        logger.error(" Download failed: %s — %s", label, e)
        return False


def download_pdf(page, detail: dict, output_dir: Path) -> dict:
    """Download the PDF named by ``detail["pdf_url"]`` and record its hash.

    Args:
        page: Playwright page (sync API) used to trigger the download.
        detail: Registry entry; reads ``pdf_url``, ``category`` and
            ``filename``.
        output_dir: Root directory; the file is stored as
            ``<output_dir>/<category>/<sanitised filename>.pdf``.

    Returns:
        The same ``detail`` dict. When a file is available (already on disk or
        freshly downloaded), ``local_path``, ``sha256`` and ``size_bytes`` are
        set. Without a ``pdf_url``, or when the download fails, ``detail`` is
        returned without those keys.
    """
    pdf_url = detail.get("pdf_url")
    if not pdf_url:
        return detail

    cat = detail["category"]
    safe_name = re.sub(r"[^a-zA-Z0-9_\-]", "_", detail["filename"]).lower()
    pdf_path = output_dir / cat / f"{safe_name}.pdf"
    pdf_path.parent.mkdir(parents=True, exist_ok=True)

    if pdf_path.exists():
        logger.info(" Already exists: %s", pdf_path.name)
        # Same bookkeeping as a fresh download, so registry entries have the
        # same keys whether or not the file had to be fetched.
        return _record_file(detail, pdf_path, pdf_path.read_bytes())

    # Download into a side file and only move it into place once it has been
    # checked. Otherwise a partial or non-PDF payload would sit at the final
    # path and be trusted as "Already exists" on the next run.
    part_path = pdf_path.with_name(pdf_path.name + ".part")
    try:
        if not _fetch_pdf_to(page, pdf_url, part_path, detail["filename"]):
            return detail

        data = part_path.read_bytes()
        # A PDF carries its "%PDF-" marker at (or very near) the start of the
        # file; anything else is e.g. an HTML page served instead of the PDF.
        if b"%PDF-" not in data[:1024]:
            logger.error(" Download failed: %s — response is not a PDF",
                         detail["filename"])
            return detail

        part_path.replace(pdf_path)
    finally:
        # No-op after a successful move; removes leftovers on every failure.
        part_path.unlink(missing_ok=True)

    _record_file(detail, pdf_path, data)
    logger.info(" Downloaded: %s (%.1f KB)", pdf_path.name, len(data) / 1024)
    return detail
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: crawl the selected categories.

    For each category the index page is crawled, every detail page is visited
    to find its PDF link and, unless ``--dry-run`` is given, the PDFs are
    downloaded. All entries are then written to ``REGISTRY_FILE`` as JSON and
    a summary line is logged.

    Note that the registry file is rewritten on every run, including
    ``--dry-run`` and single-category runs, and then contains only the
    entries of that run.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--category", choices=list(CATEGORIES),
                        help="Only crawl one category")
    parser.add_argument("--dry-run", action="store_true",
                        help="List PDFs without downloading")
    # --headless is accepted for compatibility but is already the default;
    # only --no-headless changes how the browser is launched.
    parser.add_argument("--headless", action="store_true", default=True)
    parser.add_argument("--no-headless", action="store_true")
    args = parser.parse_args()

    headless = not args.no_headless
    categories = [args.category] if args.category else list(CATEGORIES)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    registry = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/120.0.0.0 Safari/537.36"
            )
            page = context.new_page()

            for cat in categories:
                config = CATEGORIES[cat]
                logger.info("\n=== %s ===", cat.upper())

                # Step 1: Crawl index
                details = crawl_index(page, cat, config)

                # Step 2: Extract PDF URLs
                for i, detail in enumerate(details):
                    logger.info("[%d/%d] %s", i + 1, len(details), detail["filename"])
                    extract_pdf_url(page, detail)
                    time.sleep(1)  # Be polite

                # Step 3: Download PDFs
                if not args.dry_run:
                    for detail in details:
                        download_pdf(page, detail, OUTPUT_DIR)
                        time.sleep(0.5)

                # Add metadata
                for detail in details:
                    detail["source_type"] = config["source_type"]
                    detail["legal_basis"] = config["legal_basis"]
                    detail["license_rule"] = 1  # §5 UrhG, public domain
                    detail["jurisdiction"] = "DE"

                registry.extend(details)
        finally:
            # Close the browser even when a crawl step raises.
            browser.close()

    # Save registry. The JSON keeps non-ASCII characters (ensure_ascii=False),
    # so the file encoding is pinned to UTF-8 instead of the locale default.
    REGISTRY_FILE.write_text(
        json.dumps(registry, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("\nRegistry saved: %s (%d entries)", REGISTRY_FILE, len(registry))

    # Summary
    total = len(registry)
    with_pdf = sum(1 for r in registry if r.get("pdf_url"))
    downloaded = sum(1 for r in registry if r.get("local_path"))
    logger.info("Total: %d | PDF found: %d | Downloaded: %d", total, with_pdf, downloaded)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user