#!/usr/bin/env python3
"""
BAuA Regulatory Crawler — TRBS, TRGS, ASR

Crawls the BAuA website using Playwright (headless browser),
extracts PDF links, downloads all documents.

Usage:
    python3 crawl_baua.py                    # download all
    python3 crawl_baua.py --category trbs    # only TRBS
    python3 crawl_baua.py --dry-run          # list PDFs without downloading
"""

import argparse
import hashlib
import json
import logging
import re
import time
from pathlib import Path
from urllib.parse import urljoin

from playwright.sync_api import sync_playwright

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("baua-crawler")

BASE_URL = "https://www.baua.de"
OUTPUT_DIR = Path(__file__).parent / "pdfs"
REGISTRY_FILE = Path(__file__).parent / "source_registry.json"

# One entry per regulation family: index page URL plus the metadata that is
# copied onto every registry record of that family.
CATEGORIES = {
    "trbs": {
        "url": f"{BASE_URL}/DE/Angebote/Regelwerk/TRBS/TRBS.html",
        "name": "Technische Regeln für Betriebssicherheit",
        "source_type": "technical_rule",
        "legal_basis": "BetrSichV",
    },
    "trgs": {
        "url": f"{BASE_URL}/DE/Angebote/Regelwerk/TRGS/TRGS.html",
        "name": "Technische Regeln für Gefahrstoffe",
        "source_type": "technical_rule",
        "legal_basis": "GefStoffV",
    },
    "asr": {
        "url": f"{BASE_URL}/DE/Angebote/Regelwerk/ASR/ASR.html",
        "name": "Arbeitsstättenregeln",
        "source_type": "technical_rule",
        "legal_basis": "ArbStättV",
    },
}


def crawl_index(page, category: str, config: dict) -> list[dict]:
    """Crawl a category index page and collect links to its detail pages.

    Args:
        page: Playwright page used for navigation.
        category: Category key (e.g. ``"trbs"``); its upper-cased form is the
            directory name expected in detail-page URLs.
        config: Category configuration; only ``config["url"]`` is read here.

    Returns:
        One dict per unique detail page with the keys ``detail_url``,
        ``title``, ``filename`` and ``category``.
    """
    logger.info("Crawling %s index: %s", category.upper(), config["url"])
    page.goto(config["url"], wait_until="networkidle", timeout=30000)
    time.sleep(3)  # Wait for BunnyShield

    # Match pattern: /DE/Angebote/Regelwerk/TRBS/TRBS-1111 (no .html!)
    # ASR uses ASR-A1-3 (not ASR-ASR-A1-3)
    category_dir = category.upper()
    base_pattern = f"/DE/Angebote/Regelwerk/{category_dir}/"

    detail_urls = []
    seen = set()
    for link in page.query_selector_all("a[href]"):
        href = link.get_attribute("href") or ""
        if base_pattern not in href or "#" in href or "?" in href:
            continue

        # Last path segment, ignoring a trailing slash so that ".../TRBS-1111/"
        # still yields a usable filename.
        slug = href.rstrip("/").rsplit("/", 1)[-1]
        # Skip the category directory itself and self-links to the index page
        # (".../TRBS/", ".../TRBS/TRBS", ".../TRBS/TRBS.html").
        if not slug or slug.removesuffix(".html") == category_dir:
            continue

        # De-duplicate on the resolved URL so relative/absolute spellings and
        # trailing-slash variants of the same page collapse into one entry.
        full_url = urljoin(BASE_URL, href)
        dedup_key = full_url.rstrip("/")
        if dedup_key in seen:
            continue
        seen.add(dedup_key)

        # Only query the link text for links we keep; each call is a
        # round-trip to the browser.
        text = (link.inner_text() or "").strip()
        detail_urls.append({
            "detail_url": full_url,
            "title": text[:200] if text else slug,
            "filename": slug,
            "category": category,
        })

    logger.info("Found %d detail pages for %s", len(detail_urls), category.upper())
    return detail_urls


def _find_pdf_href(page) -> "str | None":
    """Return the first PDF-looking link target on the current page, or None.

    Strategies are tried from most to least specific; the returned value is
    the raw attribute value (it may be relative).
    """
    # Strategy 1: Direct PDF link
    for link in page.query_selector_all('a[href$=".pdf"]'):
        href = link.get_attribute("href") or ""
        if href:
            return href

    # Strategy 2: Download button with data attribute
    for btn in page.query_selector_all("[data-download-url]"):
        url = btn.get_attribute("data-download-url") or ""
        if url and ".pdf" in url:
            return url

    # Strategy 3: Links containing "pdf" or "download".
    # Two passes: a href that mentions ".pdf" is preferred over a link that
    # merely has "download" in its text, regardless of DOM order.
    anchors = [
        (link.get_attribute("href") or "", link)
        for link in page.query_selector_all("a[href]")
    ]
    for href, _ in anchors:
        if ".pdf" in href:
            return href
    for href, link in anchors:
        if href and "download" in (link.inner_text() or "").lower():
            return href

    # Strategy 4: Check for blob/dynamic download
    download_links = page.query_selector_all(
        'a[href*="blob"], a[href*="download"], a[href*="__blob"]'
    )
    for link in download_links:
        href = link.get_attribute("href") or ""
        if href:
            return href

    return None


def extract_pdf_url(page, detail: dict) -> dict:
    """Visit a detail page and store its PDF download link in ``detail``.

    Args:
        page: Playwright page used for navigation.
        detail: Record produced by ``crawl_index``; ``detail_url`` and
            ``filename`` are read.

    Returns:
        The same ``detail`` dict with ``pdf_url`` set to an absolute URL, or
        to ``None`` when no link was found or the page could not be processed.
    """
    try:
        page.goto(detail["detail_url"], wait_until="networkidle", timeout=30000)
        time.sleep(2)

        href = _find_pdf_href(page)
        if href is None:
            logger.warning("No PDF found for %s", detail["filename"])
            detail["pdf_url"] = None
        else:
            # Resolve against the document URL so relative hrefs end up next
            # to the detail page rather than at the site root.
            detail["pdf_url"] = urljoin(page.url or BASE_URL, href)
        return detail
    except Exception as e:
        # Per-page boundary: one broken page must not abort the whole crawl.
        logger.error("Error on %s: %s", detail["detail_url"], e)
        detail["pdf_url"] = None
        return detail


def _record_local_file(detail: dict, pdf_path: Path) -> int:
    """Store path, SHA-256 and size of ``pdf_path`` in ``detail``; return the size."""
    data = pdf_path.read_bytes()
    detail["local_path"] = str(pdf_path)
    detail["sha256"] = hashlib.sha256(data).hexdigest()
    detail["size_bytes"] = len(data)
    return len(data)


def download_pdf(page, detail: dict, output_dir: Path) -> dict:
    """Download the PDF referenced by ``detail`` and record its hash.

    Args:
        page: Playwright page used for the download.
        detail: Record with ``pdf_url``, ``category`` and ``filename``.
        output_dir: Root directory; files go to ``output_dir/<category>/``.

    Returns:
        The same ``detail`` dict. On success (or when the file already
        exists) ``local_path``, ``sha256`` and ``size_bytes`` are set; on
        failure or a missing ``pdf_url`` it is returned without those keys.
    """
    if not detail.get("pdf_url"):
        return detail

    cat = detail["category"]
    safe_name = re.sub(r"[^a-zA-Z0-9_\-]", "_", detail["filename"]).lower()
    pdf_path = output_dir / cat / f"{safe_name}.pdf"
    pdf_path.parent.mkdir(parents=True, exist_ok=True)

    if pdf_path.exists():
        logger.info(" Already exists: %s", pdf_path.name)
        # Record the same fields as a fresh download so registry entries are
        # uniform across runs.
        _record_local_file(detail, pdf_path)
        return detail

    try:
        # NOTE(review): page.goto may itself raise when the navigation turns
        # into a download; in that case the fallback below fetches the file.
        with page.expect_download(timeout=60000) as download_info:
            page.goto(detail["pdf_url"], timeout=30000)
        download = download_info.value
        download.save_as(str(pdf_path))
    except Exception as exc:
        logger.debug(
            "Browser download failed for %s (%s); trying direct request",
            detail["filename"], exc,
        )
        # Fallback: direct download via response
        try:
            response = page.request.get(detail["pdf_url"])
            if response.ok:
                pdf_path.write_bytes(response.body())
            else:
                logger.error(" Download failed: %s (HTTP %d)",
                             detail["filename"], response.status)
                return detail
        except Exception as e:
            logger.error(" Download failed: %s — %s", detail["filename"], e)
            return detail

    size = _record_local_file(detail, pdf_path)
    logger.info(" Downloaded: %s (%.1f KB)", pdf_path.name, size / 1024)
    return detail


def main():
    """Command-line entry point: crawl, optionally download, write the registry."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--category", choices=list(CATEGORIES),
                        help="Only crawl one category")
    parser.add_argument("--dry-run", action="store_true",
                        help="List PDFs without downloading")
    # --headless is accepted for compatibility; headless is already the
    # default and only --no-headless changes it.
    parser.add_argument("--headless", action="store_true", default=True,
                        help="Run the browser headless (default)")
    parser.add_argument("--no-headless", action="store_true",
                        help="Show the browser window")
    args = parser.parse_args()

    headless = not args.no_headless
    categories = [args.category] if args.category else list(CATEGORIES)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    registry = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/120.0.0.0 Safari/537.36"
            )
            page = context.new_page()

            for cat in categories:
                config = CATEGORIES[cat]
                logger.info("\n=== %s ===", cat.upper())

                # Step 1: Crawl index
                details = crawl_index(page, cat, config)

                # Step 2: Extract PDF URLs
                for i, detail in enumerate(details, start=1):
                    logger.info("[%d/%d] %s", i, len(details), detail["filename"])
                    extract_pdf_url(page, detail)
                    time.sleep(1)  # Be polite

                # Step 3: Download PDFs
                if not args.dry_run:
                    for detail in details:
                        download_pdf(page, detail, OUTPUT_DIR)
                        time.sleep(0.5)

                # Add metadata
                for detail in details:
                    detail["source_type"] = config["source_type"]
                    detail["legal_basis"] = config["legal_basis"]
                    detail["license_rule"] = 1  # §5 UrhG: public domain ("gemeinfrei")
                    detail["jurisdiction"] = "DE"

                registry.extend(details)
        finally:
            # Close the browser even when a crawl step raises.
            browser.close()

    # Save registry. ensure_ascii=False emits non-ASCII characters verbatim,
    # so the encoding is pinned instead of relying on the platform default.
    REGISTRY_FILE.write_text(
        json.dumps(registry, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("\nRegistry saved: %s (%d entries)", REGISTRY_FILE, len(registry))

    # Summary
    total = len(registry)
    with_pdf = sum(1 for r in registry if r.get("pdf_url"))
    downloaded = sum(1 for r in registry if r.get("local_path"))
    logger.info("Total: %d | PDF found: %d | Downloaded: %d", total, with_pdf, downloaded)


if __name__ == "__main__":
    main()