7d721a6787
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 40s
CI / test-python-voice (push) Successful in 36s
CI / test-bqas (push) Successful in 33s
Clean-Room derivation of 195 controls from BSI QUAIDAL (10 criteria + 15 building blocks + 30 measures + 140 metrics) for EU AI Act Art. 10 training-data quality compliance. - ingest_bsi_quaidal.py parses YAML frontmatter into a structural index (no protected prose stored on disk). - derive_quaidal_mcs.py rewrites each entry via local LLM (qwen3.5:35b-a3b) with a hard 4-gram plagiarism gate < 20%; achieved mean overlap 0.5%. - Migration 011 adds compliance.derived_controls table with full source provenance (framework, section, url, commit SHA, license note). - apply_quaidal_to_db.py UPSERTs YAML into DB. - Source repo (legal-sources/bsi-quaidal/) gitignored. Same pattern as IACE module DIN-reference handling: name the norm and section, never quote. Backed by BSI license clarification 2026-05: § 5 UrhG anwendbar, share:true im Frontmatter; Clean-Room derivation is the safe path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
243 lines
8.7 KiB
Python
243 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Parse BSI QUAIDAL Markdown catalog into a structural index.
|
|
|
|
Clean-Room principle: this script does NOT persist any QUAIDAL prose to disk.
|
|
It only extracts non-protectable structural facts (IDs, type, file paths,
|
|
cross-references to other QUAIDAL entries, references to external norms).
|
|
|
|
The derivation step (derive_quaidal_mcs.py) reads the index plus the original
|
|
.md files from the gitignored clone and asks the LLM to produce our own
|
|
wordings, never copying the BSI prose into our own controls/database.
|
|
|
|
Input: legal-sources/bsi-quaidal/0000_Markdown/**/*.md (gitignored clone)
|
|
Output: control-pipeline/data/quaidal/quaidal_index.json (structural only)
|
|
|
|
Usage:
|
|
python3 control-pipeline/scripts/ingest_bsi_quaidal.py
|
|
python3 control-pipeline/scripts/ingest_bsi_quaidal.py --check # validate only
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import asdict, dataclass, field
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
print("ERROR: PyYAML missing. Install with: pip install pyyaml", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
# The repository root is the third ancestor of this file
# (<repo>/control-pipeline/scripts/<this script>).
REPO_ROOT = Path(__file__).resolve().parents[2]

# Local clone of the upstream catalog and the Markdown tree inside it.
SOURCE_ROOT = REPO_ROOT.joinpath("legal-sources", "bsi-quaidal")
MARKDOWN_ROOT = SOURCE_ROOT.joinpath("0000_Markdown")

# Where the generated structural index is written.
OUTPUT_DIR = REPO_ROOT.joinpath("control-pipeline", "data", "quaidal")
OUTPUT_FILE = OUTPUT_DIR.joinpath("quaidal_index.json")
|
|
|
|
# Catalog folder name -> internal entry kind.
#
# The lookup is applied to every ancestor directory of a file, so files in
# nested sub-folders of the methods tree (e.g. "QM-10_Dimension Reduction")
# are treated as method variants of their parent QM.
KIND_BY_PARENT_DIR = {
    # Quality criteria (QKB) -> Master Control candidates
    "0000_Qualitätskriterien": "criterion",
    # Quality building blocks (QB) -> atomic controls
    "0001_Qualitätsbausteine": "building_block",
    # Measures -> mitigations
    "0002_Maßnahmen": "measure",
    # Quality metrics / methods (QM) -> runtime check / metric
    "0003_Qualitätsmetriken_methoden": "metric",
    # Cross-walk matrix
    "0002_Referenz-Matrizen": "matrix",
    "9998_CustomTemplates": "template",
}
|
|
|
|
# YAML frontmatter: an opening '---' line, the YAML payload (group 1), and a
# closing '---' line. The closing delimiter may be the last line of the file
# with or without a trailing newline, so frontmatter-only files still match.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*(?:\n|\Z)", re.DOTALL)

# Catalog entry ID: one of the prefixes QKB / QB / MA / QM, a hyphen, digits
# and an optional single-letter suffix. Matching is case-insensitive; callers
# upper-case the captured group.
ID_RE = re.compile(r"\b((?:QKB|QB|MA|QM)-\d+[a-zA-Z]?)", re.IGNORECASE)
|
|
|
|
|
|
@dataclass
class IndexEntry:
    """Structural record describing one catalog Markdown file.

    Fields:
        id: Canonical, upper-cased entry ID (prefix QKB / QB / MA / QM as
            matched by ``ID_RE``), e.g. ``QKB-01``, ``QB-03``, ``QM-07``.
        kind: One of criterion / building_block / measure / metric /
            matrix / template, or "unknown" when no folder matched.
        title_de: German title taken from the frontmatter, falling back to
            the generic title and then to the file stem.
        title_en: Generic frontmatter title (empty string when absent).
        source_path: File location relative to ``SOURCE_ROOT``.
        referenced_ids: Other catalog IDs mentioned in the file body.
        external_refs: Dicts with the keys ``framework`` and ``citation``
            for external norms mentioned in the body.
        tags: Frontmatter tags as stripped strings.
        share: Frontmatter ``share`` flag; ``None`` when the key is absent.
    """

    id: str
    kind: str
    title_de: str
    title_en: str
    source_path: str
    referenced_ids: list[str] = field(default_factory=list)
    external_refs: list[dict] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    share: bool | None = None
|
|
|
|
|
|
def parse_frontmatter(text: str) -> dict:
    """Return the YAML frontmatter of *text* as a dict.

    An empty dict is returned when *text* has no frontmatter block, when the
    block is not valid YAML, or when it parses to something other than a
    mapping (for example a bare scalar or a list). Callers can therefore
    always use ``.get`` on the result.
    """
    match = FRONTMATTER_RE.match(text)
    if match is None:
        return {}
    try:
        data = yaml.safe_load(match.group(1))
    except yaml.YAMLError:
        # Deliberate best-effort: a file with broken frontmatter is treated
        # like a file without frontmatter instead of aborting the whole run.
        return {}
    # safe_load yields None for an empty block and may yield a scalar or a
    # list; only a mapping is usable as frontmatter.
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def canonical_id(raw_id: str | list | None, filename: str) -> str | None:
    """Pick the canonical catalog ID for a file.

    The frontmatter value *raw_id* (a string, or a list whose items are
    stringified) is searched first; *filename* is the last fallback. The
    first candidate containing an ``ID_RE`` match wins and is returned
    upper-cased. ``None`` is returned when no candidate contains an ID.
    """
    if isinstance(raw_id, list):
        pool = [str(item) for item in raw_id]
    elif isinstance(raw_id, str):
        pool = [raw_id]
    else:
        pool = []
    # The file name is always tried last.
    pool.append(filename)

    hit = next(filter(None, map(ID_RE.search, pool)), None)
    if hit is None:
        return None
    return hit.group(1).upper().replace(" ", "-")
|
|
|
|
|
|
def determine_kind(path: Path) -> str:
    """Map *path* to an entry kind via its nearest known ancestor folder.

    Ancestors are checked from the innermost outwards against
    ``KIND_BY_PARENT_DIR``; "unknown" is returned when none matches.

    Folder names are compared in Unicode NFC form. Several catalog folders
    contain umlauts or "ß", and some filesystems and checkouts hand back the
    decomposed (NFD) spelling, which would otherwise miss the lookup and
    silently classify every file below as "unknown".
    """
    import unicodedata  # local import: only this function needs it

    kinds = {
        unicodedata.normalize("NFC", folder): kind
        for folder, kind in KIND_BY_PARENT_DIR.items()
    }
    for parent in path.parents:
        kind = kinds.get(unicodedata.normalize("NFC", parent.name))
        if kind is not None:
            return kind
    return "unknown"
|
|
|
|
|
|
def collect_referenced_ids(body: str, own_id: str) -> list[str]:
    """Return the sorted, de-duplicated catalog IDs mentioned in *body*.

    Matches are upper-cased before comparison; *own_id* is left out so an
    entry never lists itself as a reference.
    """
    others: set[str] = set()
    for hit in ID_RE.finditer(body):
        ref = hit.group(1).upper()
        if ref != own_id:
            others.add(ref)
    return sorted(others)
|
|
|
|
|
|
# External frameworks to look for in entry bodies.
# Each item is (canonical framework name, spellings that identify it).
# Spellings are compared case-insensitively as plain substrings.
REF_FRAMEWORKS = [
    (
        "AI Act",
        ["AI-Act", "AI Act", "Verordnung (EU) 2024/1689", "KI-VO"],
    ),
    (
        "EU GDPR",
        ["DSGVO", "Verordnung (EU) 2016/679", "GDPR"],
    ),
    (
        "ISO/IEC 25012",
        ["ISO/IEC 25012", "ISO 25012"],
    ),
    (
        "ISO/IEC 25024",
        ["ISO/IEC 25024", "ISO 25024"],
    ),
    (
        "ISO/IEC 23894",
        ["ISO/IEC 23894", "ISO 23894"],
    ),
    (
        "ISO/IEC 42001",
        ["ISO/IEC 42001", "ISO 42001"],
    ),
    (
        "NIST AI RMF",
        ["NIST AI RMF", "AI Risk Management Framework"],
    ),
    (
        "BSI Grundschutz",
        ["IT-Grundschutz", "Grundschutz"],
    ),
    (
        "BSI AIC4",
        ["AIC4", "AI Cloud Service Compliance Criteria"],
    ),
]
|
|
|
|
|
|
# Article/section marker followed by its number, e.g. "Artikel 10", "Art. 5",
# "Section 3", "§ 3a". The lookbehind stops "Art" from matching in the middle
# of a longer word ("Start 5", "Smart 10").
_CITATION_RE = re.compile(
    r"(?<!\w)(Artikel|Art\.?|Section|§)\s*([0-9]+[a-z]?)",
    re.IGNORECASE,
)


def detect_external_refs(body: str) -> list[dict]:
    """Find mentions of external frameworks in *body*.

    The text is scanned line by line. A line that contains any spelling
    listed in ``REF_FRAMEWORKS`` (case-insensitive substring match) yields
    one ``{"framework": ..., "citation": ...}`` dict for that framework.
    ``citation`` is the first article/section marker found anywhere on the
    same line (e.g. "Artikel 10"), or ``None`` when the line has none.

    Each (framework, citation) pair is reported once, in order of first
    appearance. Only the framework name and the citation are kept; no other
    text from the line is stored, so no source prose ends up in the index.
    """
    refs: list[dict] = []
    seen: set[tuple[str, str]] = set()
    for line in body.splitlines():
        lowered = line.lower()
        hits = [
            framework
            for framework, patterns in REF_FRAMEWORKS
            if any(pat.lower() in lowered for pat in patterns)
        ]
        if not hits:
            continue
        # The citation depends only on the line, so look it up once.
        art = _CITATION_RE.search(line)
        citation = f"{art.group(1)} {art.group(2)}" if art else None
        for framework in hits:
            key = (framework, citation or "")
            if key in seen:
                continue
            seen.add(key)
            refs.append({"framework": framework, "citation": citation})
    return refs
|
|
|
|
|
|
def _coerce_share(value: object) -> bool | None:
    """Interpret the frontmatter ``share`` flag; ``None`` when it is absent."""
    if value is None:
        return None
    if isinstance(value, str):
        # A quoted YAML value arrives as str, and bool("false") would be True.
        return value.strip().lower() in {"true", "yes", "on", "1"}
    return bool(value)


def parse_file(path: Path) -> IndexEntry | None:
    """Build the structural index entry for one Markdown file.

    Returns ``None`` when no catalog ID can be derived from the frontmatter
    ``ID`` field or from the file name. Only structural data is extracted:
    ID, kind, titles, tags, the share flag, the path relative to
    ``SOURCE_ROOT``, referenced catalog IDs and external framework mentions.

    Raises whatever ``Path.read_text`` raises for unreadable or non-UTF-8
    files, and ``ValueError`` when *path* is not located under
    ``SOURCE_ROOT``.
    """
    text = path.read_text(encoding="utf-8")

    fm = parse_frontmatter(text)
    if not isinstance(fm, dict):
        # Frontmatter that is not a mapping carries no usable fields.
        fm = {}

    # The body is whatever follows the frontmatter block. Slicing at the
    # regex match end keeps this in step with parse_frontmatter, and avoids
    # cutting at an unrelated '---' (or at a bogus offset when there is no
    # closing delimiter at all).
    fm_match = FRONTMATTER_RE.match(text)
    body = text[fm_match.end():] if fm_match else text

    own_id = canonical_id(fm.get("ID"), path.stem)
    if not own_id:
        return None

    title_de = str(fm.get("TitleGer") or fm.get("Title") or path.stem).strip()
    title_en = str(fm.get("Title") or "").strip()

    tags_raw = fm.get("tags") or []
    if isinstance(tags_raw, str) or not hasattr(tags_raw, "__iter__"):
        # A single tag given as a scalar rather than as a list.
        tags_raw = [tags_raw]
    tags = [str(tag).strip() for tag in tags_raw if tag]

    return IndexEntry(
        id=own_id,
        kind=determine_kind(path),
        title_de=title_de,
        title_en=title_en,
        # POSIX separators keep the index identical across platforms.
        source_path=path.relative_to(SOURCE_ROOT).as_posix(),
        referenced_ids=collect_referenced_ids(body, own_id),
        external_refs=detect_external_refs(body),
        tags=tags,
        share=_coerce_share(fm.get("share")),
    )
|
|
|
|
|
|
def get_commit_sha() -> str | None:
    """Return the HEAD commit SHA of the clone at ``SOURCE_ROOT``.

    ``None`` is returned when the directory is not itself a git checkout,
    when git is unavailable or fails, or when git prints nothing.
    """
    # Without its own .git entry, `git -C <dir>` walks upwards and would
    # report the HEAD of an enclosing repository, i.e. the wrong provenance.
    if not (SOURCE_ROOT / ".git").exists():
        return None
    try:
        out = subprocess.run(
            ["git", "-C", str(SOURCE_ROOT), "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable git binary.
        return None
    return out.stdout.strip() or None
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: parse the catalog and write the index.

    Returns the process exit code: 0 on success, 2 when the Markdown tree
    of the local clone is missing. With ``--check`` the files are parsed
    and summarised, the files skipped for lack of an ID are listed on
    stderr, and nothing is written.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks (usage examples) intact.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--check", action="store_true", help="Parse + validate, do not write output")
    args = parser.parse_args()

    if not MARKDOWN_ROOT.exists():
        print(f"ERROR: clone not found at {SOURCE_ROOT}", file=sys.stderr)
        print("Run: git clone --depth=1 https://github.com/BSI-Bund/QUAIDAL.git legal-sources/bsi-quaidal", file=sys.stderr)
        return 2

    entries: list[IndexEntry] = []
    skipped: list[Path] = []
    # Sorted traversal keeps the generated index stable between runs.
    for path in sorted(MARKDOWN_ROOT.rglob("*.md")):
        entry = parse_file(path)
        if entry is None:
            skipped.append(path)
        else:
            entries.append(entry)

    by_kind: dict[str, int] = {}
    for entry in entries:
        by_kind[entry.kind] = by_kind.get(entry.kind, 0) + 1

    print(f"Parsed {len(entries)} entries (skipped {len(skipped)} without ID):")
    for kind, count in sorted(by_kind.items()):
        print(f"  {kind:18s} {count}")

    if args.check:
        # Validation mode: name the skipped files so they can be inspected.
        for path in skipped:
            print(f"  skipped (no ID): {path.relative_to(SOURCE_ROOT)}", file=sys.stderr)
        return 0

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    payload = {
        "source": "BSI QUAIDAL",
        "source_url": "https://github.com/BSI-Bund/QUAIDAL",
        "commit_sha": get_commit_sha(),
        "license_note": (
            "BSI-Veroeffentlichung. Repo enthaelt keine SPDX-Lizenzdatei. "
            "Frontmatter share:true. Veroeffentlichung durch Bundesbehoerde, "
            "§ 5 UrhG (amtliche Werke) anwendbar. BSI hat 05/2026 die Annahme "
            "CC-BY-SA-4.0 in unserer Anfrage nicht widersprochen, aber auch "
            "nicht aktiv bestaetigt. Wir derivieren Clean-Room (eigene "
            "Formulierungen, nur Referenz auf BSI QUAIDAL Sektion)."
        ),
        "entries": [asdict(entry) for entry in entries],
    }
    OUTPUT_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\nWrote index: {OUTPUT_FILE.relative_to(REPO_ROOT)}")
    print(f"Commit SHA: {payload['commit_sha']}")
    return 0
|
|
|
|
|
|
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|