Files
breakpilot-core/control-pipeline/scripts/ingest_bsi_quaidal.py
T
Benjamin Admin 7d721a6787
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 40s
CI / test-python-voice (push) Successful in 36s
CI / test-bqas (push) Successful in 33s
feat(control-pipeline): BSI QUAIDAL Clean-Room ingestion (AI Act Art. 10)
Clean-Room derivation of 195 controls from BSI QUAIDAL (10 criteria + 15
building blocks + 30 measures + 140 metrics) for EU AI Act Art. 10
training-data quality compliance.

- ingest_bsi_quaidal.py parses YAML frontmatter into a structural index
  (no protected prose stored on disk).
- derive_quaidal_mcs.py rewrites each entry via local LLM (qwen3.5:35b-a3b)
  with a hard 4-gram plagiarism gate < 20%; achieved mean overlap 0.5%.
- Migration 011 adds compliance.derived_controls table with full source
  provenance (framework, section, url, commit SHA, license note).
- apply_quaidal_to_db.py UPSERTs YAML into DB.
- Source repo (legal-sources/bsi-quaidal/) gitignored.

Same pattern as IACE module DIN-reference handling: name the norm and
section, never quote.

Backed by the BSI license clarification of 2026-05: § 5 UrhG (official works)
applies and the frontmatter sets share:true; Clean-Room derivation is the safe path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 13:02:49 +02:00

243 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""Parse BSI QUAIDAL Markdown catalog into a structural index.
Clean-Room principle: this script does NOT persist any QUAIDAL prose to disk.
It only extracts non-protectable structural facts (IDs, type, file paths,
cross-references to other QUAIDAL entries, references to external norms).
The derivation step (derive_quaidal_mcs.py) reads the index plus the original
.md files from the gitignored clone and asks the LLM to produce our own
wordings, never copying the BSI prose into our own controls/database.
Input: legal-sources/bsi-quaidal/0000_Markdown/**/*.md (gitignored clone)
Output: control-pipeline/data/quaidal/quaidal_index.json (structural only)
Usage:
python3 control-pipeline/scripts/ingest_bsi_quaidal.py
python3 control-pipeline/scripts/ingest_bsi_quaidal.py --check # validate only
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML missing. Install with: pip install pyyaml", file=sys.stderr)
sys.exit(2)
# Repository root: the third ancestor directory of this (resolved) script file.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Local clone of the upstream QUAIDAL repository (the input side).
SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal"
# Markdown catalog inside the clone; main() scans it recursively for *.md files.
MARKDOWN_ROOT = SOURCE_ROOT / "0000_Markdown"
# Directory and file that receive the generated structural index.
OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"
OUTPUT_FILE = OUTPUT_DIR / "quaidal_index.json"
# Map folder name -> our internal kind. Sub-folders inside the Methoden tree
# (e.g. "QM-10_Dimension Reduction") are treated as method variants of their
# parent QM.
# NOTE(review): the measure comment below says "M", while ID_RE only accepts
# the prefix "MA-" — confirm which prefix the upstream files actually use.
KIND_BY_PARENT_DIR = {
    "0000_Qualitätskriterien": "criterion", # QKB → Master Control candidates
    "0001_Qualitätsbausteine": "building_block", # QB → atomic controls
    "0002_Maßnahmen": "measure", # M → mitigations
    "0003_Qualitätsmetriken_methoden": "metric", # QM → runtime check / metric
    "0002_Referenz-Matrizen": "matrix", # cross-walk matrix
    "9998_CustomTemplates": "template",
}
# Leading YAML frontmatter: an opening "---" line at the very start of the
# text, the YAML payload (group 1, may span lines), and a closing "---" line
# that must be followed by a newline.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
# QUAIDAL entry ID: one of the prefixes QKB / QB / MA / QM, a hyphen, digits
# and an optional single trailing letter; matched case-insensitively, so
# callers upper-case the captured text themselves.
ID_RE = re.compile(r"\b((?:QKB|QB|MA|QM)-\d+[a-zA-Z]?)", re.IGNORECASE)
@dataclass
class IndexEntry:
    """Structural facts extracted from one QUAIDAL Markdown file.

    main() serialises instances with ``dataclasses.asdict`` into the
    ``entries`` list of the index JSON.
    """

    # Canonical ID: the upper-cased ID_RE match, e.g. QKB-01, QB-03, QM-07.
    # NOTE(review): ID_RE accepts the measure prefix "MA-", not a bare "M-".
    id: str
    # One of: criterion / building_block / measure / metric / matrix /
    # template, or "unknown" when no ancestor folder is mapped.
    kind: str
    # German title: frontmatter "TitleGer", else "Title", else the file stem.
    title_de: str
    # English title: frontmatter "Title", or "" when absent.
    title_en: str
    # Path of the Markdown file, relative to SOURCE_ROOT.
    source_path: str
    # Other QUAIDAL IDs mentioned in this file's body (sorted, own ID removed).
    referenced_ids: list[str] = field(default_factory=list)
    # Dicts with the keys "framework" and "citation" (citation may be None).
    external_refs: list[dict] = field(default_factory=list)
    # Frontmatter "tags", each converted to a stripped string.
    tags: list[str] = field(default_factory=list)
    # Frontmatter "share" flag; None when the key is missing.
    share: bool | None = None
def parse_frontmatter(text: str) -> dict:
    """Return the YAML frontmatter of *text* as a dict.

    An empty dict is returned when *text* has no leading frontmatter block,
    when the block is not valid YAML, or when it parses to something other
    than a mapping (for example a bare string or a list).
    """
    match = FRONTMATTER_RE.match(text)
    if match is None:
        return {}
    try:
        data = yaml.safe_load(match.group(1))
    except yaml.YAMLError:
        # Best effort: a malformed header is treated like a missing one.
        return {}
    # safe_load returns scalars or lists for non-mapping documents (and None
    # for an empty one); callers rely on dict.get(), so only hand back a
    # real mapping.
    return data if isinstance(data, dict) else {}
def canonical_id(raw_id: str | list | None, filename: str) -> str | None:
    """Pick the canonical QUAIDAL ID for a file.

    The frontmatter value *raw_id* (a string, or a list whose items are
    stringified) is searched first; *filename* is the last fallback. The
    first text containing an ID_RE match wins and the match is returned
    upper-cased. Returns None when nothing matches.
    """
    if isinstance(raw_id, list):
        pool = [str(item) for item in raw_id]
    elif isinstance(raw_id, str):
        pool = [raw_id]
    else:
        # None or any other type: rely on the filename alone.
        pool = []
    pool.append(filename)

    hits = (ID_RE.search(text) for text in pool)
    first = next((hit for hit in hits if hit), None)
    if first is None:
        return None
    return first.group(1).upper().replace(" ", "-")
def determine_kind(path: Path) -> str:
    """Return the internal kind for *path* based on its ancestor folders.

    Ancestors are checked from the closest outwards; the first one whose
    name is a key of KIND_BY_PARENT_DIR decides. Names are compared in
    Unicode NFC form, so a folder whose "ä" is stored decomposed (NFD, as
    some filesystems do) still matches the composed key. Returns
    "unknown" when no ancestor is mapped.
    """
    import unicodedata  # local import: only this function needs it

    kinds = {
        unicodedata.normalize("NFC", name): kind
        for name, kind in KIND_BY_PARENT_DIR.items()
    }
    for parent in path.parents:
        kind = kinds.get(unicodedata.normalize("NFC", parent.name))
        if kind is not None:
            return kind
    return "unknown"
def collect_referenced_ids(body: str, own_id: str) -> list[str]:
    """List the QUAIDAL IDs mentioned in *body*, excluding *own_id*.

    Every ID_RE match is upper-cased and de-duplicated; the result is
    sorted.
    """
    mentioned = set()
    for hit in ID_RE.finditer(body):
        mentioned.add(hit.group(1).upper())
    return sorted(mentioned - {own_id})
# (canonical framework name, spellings that identify it in a line of text).
# Spellings are compared as case-insensitive substrings of each line.
REF_FRAMEWORKS = [
    ("AI Act", ["AI-Act", "AI Act", "Verordnung (EU) 2024/1689", "KI-VO"]),
    ("EU GDPR", ["DSGVO", "Verordnung (EU) 2016/679", "GDPR"]),
    ("ISO/IEC 25012", ["ISO/IEC 25012", "ISO 25012"]),
    ("ISO/IEC 25024", ["ISO/IEC 25024", "ISO 25024"]),
    ("ISO/IEC 23894", ["ISO/IEC 23894", "ISO 23894"]),
    ("ISO/IEC 42001", ["ISO/IEC 42001", "ISO 42001"]),
    ("NIST AI RMF", ["NIST AI RMF", "AI Risk Management Framework"]),
    ("BSI Grundschutz", ["IT-Grundschutz", "Grundschutz"]),
    ("BSI AIC4", ["AIC4", "AI Cloud Service Compliance Criteria"]),
]

# Article/section marker followed by a number, e.g. "Artikel 10", "Art. 5",
# "§ 3a". The look-behind keeps the marker from matching inside a longer
# word, so "Part 3" is not read as the citation "art 3".
_CITATION_RE = re.compile(
    r"(?<!\w)(Artikel|Art\.?|Section|§)\s*([0-9]+[a-z]?)", re.IGNORECASE
)


def detect_external_refs(body: str) -> list[dict]:
    """Find references to external norms in *body*.

    Each line is checked against every framework in REF_FRAMEWORKS. When a
    line names a framework, the first article/section marker on that line
    (if any) becomes the citation. Only the framework name and that marker
    are recorded; no other text from the line is kept, so no source prose
    ends up in the result.

    Returns a list of ``{"framework": str, "citation": str | None}`` dicts
    in order of first appearance, de-duplicated on (framework, citation).
    """
    refs: list[dict] = []
    seen: set[tuple[str, str]] = set()
    for line in body.splitlines():
        lowered = line.lower()
        hits = [
            framework
            for framework, patterns in REF_FRAMEWORKS
            if any(pat.lower() in lowered for pat in patterns)
        ]
        if not hits:
            continue
        # The citation depends only on the line, so compute it once.
        art = _CITATION_RE.search(line)
        citation = f"{art.group(1)} {art.group(2)}" if art else None
        for framework in hits:
            key = (framework, citation or "")
            if key not in seen:
                seen.add(key)
                refs.append({"framework": framework, "citation": citation})
    return refs
def parse_file(path: Path) -> IndexEntry | None:
    """Build the IndexEntry for one Markdown file.

    Reads *path* as UTF-8, parses its YAML frontmatter and scans the text
    after the frontmatter for QUAIDAL IDs and external norm references.

    Returns None when no ID can be derived from the frontmatter "ID" value
    or from the file stem.
    """
    text = path.read_text(encoding="utf-8")
    fm = parse_frontmatter(text)
    if not isinstance(fm, dict):
        # A frontmatter that is valid YAML but not a mapping carries no keys.
        fm = {}

    # Cut the body at the same delimiter the frontmatter parser uses. A plain
    # search for the next "---" could stop inside a frontmatter value, and
    # would mis-slice the text when no closing delimiter exists.
    header = FRONTMATTER_RE.match(text)
    body = text[header.end():] if header else text

    own_id = canonical_id(fm.get("ID"), path.stem)
    if not own_id:
        return None

    title_de = str(fm.get("TitleGer") or fm.get("Title") or path.stem).strip()
    title_en = str(fm.get("Title") or "").strip()

    tags_raw = fm.get("tags") or []
    if not isinstance(tags_raw, (list, tuple)):
        # A single scalar tag (string, number, ...) becomes a one-item list.
        tags_raw = [tags_raw]
    tags = [str(t).strip() for t in tags_raw if t]

    share_val = fm.get("share")
    if share_val is None:
        share = None
    elif isinstance(share_val, str):
        # A quoted value such as "false" is a non-empty string and would be
        # truthy under bool(); interpret the text instead.
        share = share_val.strip().lower() in {"true", "yes", "on", "1"}
    else:
        share = bool(share_val)

    return IndexEntry(
        id=own_id,
        kind=determine_kind(path),
        title_de=title_de,
        title_en=title_en,
        source_path=str(path.relative_to(SOURCE_ROOT)),
        referenced_ids=collect_referenced_ids(body, own_id),
        external_refs=detect_external_refs(body),
        tags=tags,
        share=share,
    )
def get_commit_sha() -> str | None:
    """Return the HEAD commit SHA of the QUAIDAL clone at SOURCE_ROOT.

    Returns None when SOURCE_ROOT has no ``.git`` entry of its own, when git
    is not installed, when the git command fails, or when it prints nothing.
    """
    # SOURCE_ROOT sits inside REPO_ROOT. Without its own .git entry, git
    # would walk up to an enclosing repository and report that HEAD, which
    # would be recorded as the source commit although it is not one.
    if not (SOURCE_ROOT / ".git").exists():
        return None
    try:
        result = subprocess.run(
            ["git", "-C", str(SOURCE_ROOT), "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
    return result.stdout.strip() or None
def main() -> int:
    """Command-line entry point: build the structural index.

    Parses every ``*.md`` file below MARKDOWN_ROOT, prints a per-kind
    summary and, unless ``--check`` is given, writes the index JSON to
    OUTPUT_FILE.

    Returns the process exit code: 0 on success, 2 when the clone is
    missing.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--check", action="store_true", help="Parse + validate, do not write output")
    check_only = parser.parse_args().check

    if not MARKDOWN_ROOT.exists():
        print(f"ERROR: clone not found at {SOURCE_ROOT}", file=sys.stderr)
        print("Run: git clone --depth=1 https://github.com/BSI-Bund/QUAIDAL.git legal-sources/bsi-quaidal", file=sys.stderr)
        return 2

    # Sorted traversal keeps the entry order stable between runs.
    entries: list[IndexEntry] = []
    skipped: list[Path] = []
    for md_path in sorted(MARKDOWN_ROOT.rglob("*.md")):
        parsed = parse_file(md_path)
        if parsed is not None:
            entries.append(parsed)
        else:
            skipped.append(md_path)

    # Tally entries per kind for the summary.
    by_kind: dict[str, int] = {}
    for item in entries:
        by_kind.setdefault(item.kind, 0)
        by_kind[item.kind] += 1

    print(f"Parsed {len(entries)} entries (skipped {len(skipped)} without ID):")
    for kind, count in sorted(by_kind.items()):
        print(f" {kind:18s} {count}")

    if check_only:
        return 0

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    commit_sha = get_commit_sha()
    license_note = (
        "BSI-Veroeffentlichung. Repo enthaelt keine SPDX-Lizenzdatei. "
        "Frontmatter share:true. Veroeffentlichung durch Bundesbehoerde, "
        "§ 5 UrhG (amtliche Werke) anwendbar. BSI hat 05/2026 die Annahme "
        "CC-BY-SA-4.0 in unserer Anfrage nicht widersprochen, aber auch "
        "nicht aktiv bestaetigt. Wir derivieren Clean-Room (eigene "
        "Formulierungen, nur Referenz auf BSI QUAIDAL Sektion)."
    )
    payload = {
        "source": "BSI QUAIDAL",
        "source_url": "https://github.com/BSI-Bund/QUAIDAL",
        "commit_sha": commit_sha,
        "license_note": license_note,
        "entries": [asdict(entry) for entry in entries],
    }
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    OUTPUT_FILE.write_text(serialized, encoding="utf-8")

    print(f"\nWrote index: {OUTPUT_FILE.relative_to(REPO_ROOT)}")
    print(f"Commit SHA: {payload['commit_sha']}")
    return 0
# Run main() only when executed as a script; its return value becomes the
# process exit code.
if __name__ == "__main__":
    sys.exit(main())