#!/usr/bin/env python3 """Parse BSI QUAIDAL Markdown catalog into a structural index. Clean-Room principle: this script does NOT persist any QUAIDAL prose to disk. It only extracts non-protectable structural facts (IDs, type, file paths, cross-references to other QUAIDAL entries, references to external norms). The derivation step (derive_quaidal_mcs.py) reads the index plus the original .md files from the gitignored clone and asks the LLM to produce our own wordings, never copying the BSI prose into our own controls/database. Input: legal-sources/bsi-quaidal/0000_Markdown/**/*.md (gitignored clone) Output: control-pipeline/data/quaidal/quaidal_index.json (structural only) Usage: python3 control-pipeline/scripts/ingest_bsi_quaidal.py python3 control-pipeline/scripts/ingest_bsi_quaidal.py --check # validate only """ from __future__ import annotations import argparse import json import re import subprocess import sys from dataclasses import asdict, dataclass, field from pathlib import Path try: import yaml except ImportError: print("ERROR: PyYAML missing. Install with: pip install pyyaml", file=sys.stderr) sys.exit(2) REPO_ROOT = Path(__file__).resolve().parents[2] SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal" MARKDOWN_ROOT = SOURCE_ROOT / "0000_Markdown" OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal" OUTPUT_FILE = OUTPUT_DIR / "quaidal_index.json" # Map folder name -> our internal kind. Sub-folders inside the Methoden tree # (e.g. "QM-10_Dimension Reduction") are treated as method variants of their # parent QM. KIND_BY_PARENT_DIR = { "0000_Qualitätskriterien": "criterion", # QKB → Master Control candidates "0001_Qualitätsbausteine": "building_block", # QB → atomic controls "0002_Maßnahmen": "measure", # M → mitigations "0003_Qualitätsmetriken_methoden": "metric", # QM → runtime check / metric "0002_Referenz-Matrizen": "matrix", # cross-walk matrix "9998_CustomTemplates": "template", } FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) ID_RE = re.compile(r"\b((?:QKB|QB|MA|QM)-\d+[a-zA-Z]?)", re.IGNORECASE) @dataclass class IndexEntry: id: str # Canonical ID: QKB-01, QB-03, M-12, QM-07 kind: str # criterion / building_block / measure / metric / matrix / template title_de: str title_en: str source_path: str # relative to SOURCE_ROOT referenced_ids: list[str] = field(default_factory=list) # other QUAIDAL IDs linked in this file external_refs: list[dict] = field(default_factory=list) # {framework, citation, ref_id} tags: list[str] = field(default_factory=list) share: bool | None = None def parse_frontmatter(text: str) -> dict: m = FRONTMATTER_RE.match(text) if not m: return {} try: return yaml.safe_load(m.group(1)) or {} except yaml.YAMLError: return {} def canonical_id(raw_id: str | list | None, filename: str) -> str | None: """QUAIDAL files sometimes list multiple IDs or odd casing — normalise.""" candidates: list[str] = [] if isinstance(raw_id, list): candidates.extend(str(x) for x in raw_id) elif isinstance(raw_id, str): candidates.append(raw_id) # Fallback: derive from filename candidates.append(filename) for c in candidates: m = ID_RE.search(c) if m: return m.group(1).upper().replace(" ", "-") return None def determine_kind(path: Path) -> str: for parent in path.parents: if parent.name in KIND_BY_PARENT_DIR: return KIND_BY_PARENT_DIR[parent.name] return "unknown" def collect_referenced_ids(body: str, own_id: str) -> list[str]: found = {m.group(1).upper() for m in ID_RE.finditer(body)} found.discard(own_id) return sorted(found) REF_FRAMEWORKS = [ ("AI Act", ["AI-Act", "AI Act", "Verordnung (EU) 2024/1689", "KI-VO"]), ("EU GDPR", ["DSGVO", "Verordnung (EU) 2016/679", "GDPR"]), ("ISO/IEC 25012", ["ISO/IEC 25012", "ISO 25012"]), ("ISO/IEC 25024", ["ISO/IEC 25024", "ISO 25024"]), ("ISO/IEC 23894", ["ISO/IEC 23894", "ISO 23894"]), ("ISO/IEC 42001", ["ISO/IEC 42001", "ISO 42001"]), ("NIST AI RMF", ["NIST AI RMF", "AI Risk Management Framework"]), ("BSI Grundschutz", ["IT-Grundschutz", "Grundschutz"]), ("BSI AIC4", ["AIC4", "AI Cloud Service Compliance Criteria"]), ] def detect_external_refs(body: str) -> list[dict]: refs: list[dict] = [] seen: set[tuple[str, str]] = set() # Section "Referenzen" tables — pick up first column ref-id and first # textual hit of the framework. We do NOT store the BSI "Kurzbeschr." # column to avoid copying their prose. for line in body.splitlines(): for framework, patterns in REF_FRAMEWORKS: for pat in patterns: if pat.lower() in line.lower(): # Try to grab an article/section nearby (e.g. "Artikel 10") art = re.search(r"(Artikel|Art\.?|Section|§)\s*([0-9]+[a-z]?)", line, re.IGNORECASE) citation = f"{art.group(1)} {art.group(2)}" if art else None key = (framework, citation or "") if key in seen: continue seen.add(key) refs.append({"framework": framework, "citation": citation}) break return refs def parse_file(path: Path) -> IndexEntry | None: text = path.read_text(encoding="utf-8") fm = parse_frontmatter(text) body = text[text.find("---", 3) + 3 :] if text.startswith("---") else text own_id = canonical_id(fm.get("ID"), path.stem) if not own_id: return None title_de = str(fm.get("TitleGer") or fm.get("Title") or path.stem).strip() title_en = str(fm.get("Title") or "").strip() tags_raw = fm.get("tags") or [] if isinstance(tags_raw, str): tags_raw = [tags_raw] tags = [str(t).strip() for t in tags_raw if t] share_val = fm.get("share") share = bool(share_val) if share_val is not None else None return IndexEntry( id=own_id, kind=determine_kind(path), title_de=title_de, title_en=title_en, source_path=str(path.relative_to(SOURCE_ROOT)), referenced_ids=collect_referenced_ids(body, own_id), external_refs=detect_external_refs(body), tags=tags, share=share, ) def get_commit_sha() -> str | None: try: out = subprocess.run( ["git", "-C", str(SOURCE_ROOT), "rev-parse", "HEAD"], capture_output=True, text=True, check=True, ) return out.stdout.strip() except (subprocess.CalledProcessError, FileNotFoundError): return None def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--check", action="store_true", help="Parse + validate, do not write output") args = ap.parse_args() if not MARKDOWN_ROOT.exists(): print(f"ERROR: clone not found at {SOURCE_ROOT}", file=sys.stderr) print("Run: git clone --depth=1 https://github.com/BSI-Bund/QUAIDAL.git legal-sources/bsi-quaidal", file=sys.stderr) return 2 entries: list[IndexEntry] = [] skipped: list[Path] = [] for path in sorted(MARKDOWN_ROOT.rglob("*.md")): entry = parse_file(path) if entry is None: skipped.append(path) continue entries.append(entry) by_kind: dict[str, int] = {} for e in entries: by_kind[e.kind] = by_kind.get(e.kind, 0) + 1 print(f"Parsed {len(entries)} entries (skipped {len(skipped)} without ID):") for kind, count in sorted(by_kind.items()): print(f" {kind:18s} {count}") if args.check: return 0 OUTPUT_DIR.mkdir(parents=True, exist_ok=True) payload = { "source": "BSI QUAIDAL", "source_url": "https://github.com/BSI-Bund/QUAIDAL", "commit_sha": get_commit_sha(), "license_note": ( "BSI-Veroeffentlichung. Repo enthaelt keine SPDX-Lizenzdatei. " "Frontmatter share:true. Veroeffentlichung durch Bundesbehoerde, " "§ 5 UrhG (amtliche Werke) anwendbar. BSI hat 05/2026 die Annahme " "CC-BY-SA-4.0 in unserer Anfrage nicht widersprochen, aber auch " "nicht aktiv bestaetigt. Wir derivieren Clean-Room (eigene " "Formulierungen, nur Referenz auf BSI QUAIDAL Sektion)." ), "entries": [asdict(e) for e in entries], } OUTPUT_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") print(f"\nWrote index: {OUTPUT_FILE.relative_to(REPO_ROOT)}") print(f"Commit SHA: {payload['commit_sha']}") return 0 if __name__ == "__main__": sys.exit(main())