feat(control-pipeline): BSI QUAIDAL Clean-Room ingestion (AI Act Art. 10)
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 40s
CI / test-python-voice (push) Successful in 36s
CI / test-bqas (push) Successful in 33s

Clean-Room derivation of 195 controls from BSI QUAIDAL (10 criteria + 15
building blocks + 30 measures + 140 metrics) for EU AI Act Art. 10
training-data quality compliance.

- ingest_bsi_quaidal.py parses YAML frontmatter into a structural index
  (no protected prose stored on disk).
- derive_quaidal_mcs.py rewrites each entry via local LLM (qwen3.5:35b-a3b)
  with a hard 4-gram plagiarism gate < 20%; achieved mean overlap 0.5%.
- Migration 011 adds compliance.derived_controls table with full source
  provenance (framework, section, url, commit SHA, license note).
- apply_quaidal_to_db.py UPSERTs YAML into DB.
- Source repo (legal-sources/bsi-quaidal/) gitignored.

Same pattern as IACE module DIN-reference handling: name the norm and
section, never quote.

Backed by BSI license clarification 2026-05: § 5 UrhG anwendbar,
share:true im Frontmatter; Clean-Room derivation is the safe path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-19 13:02:49 +02:00
parent 9a1ad87acd
commit 7d721a6787
10 changed files with 8376 additions and 0 deletions
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""Upsert derived QUAIDAL controls from YAML into compliance.derived_controls.
Reads:
control-pipeline/data/quaidal/master_controls.yaml
control-pipeline/data/quaidal/atomic_controls.yaml
control-pipeline/data/quaidal/mitigations.yaml
control-pipeline/data/quaidal/metrics.yaml
Writes: compliance.derived_controls (idempotent UPSERT by derived_id)
Usage:
# Mac Mini direct:
python3 control-pipeline/scripts/apply_quaidal_to_db.py
# Via SSH (locally, against macmini DB):
DB_HOST=macmini python3 control-pipeline/scripts/apply_quaidal_to_db.py
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
try:
import psycopg
import yaml
except ImportError as e:
print(f"ERROR: missing dependency {e.name}. Install with: pip install psycopg[binary] pyyaml", file=sys.stderr)
sys.exit(2)
REPO_ROOT = Path(__file__).resolve().parents[2]
DATA_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"
KIND_FILES = {
"criterion": "master_controls.yaml",
"building_block": "atomic_controls.yaml",
"measure": "mitigations.yaml",
"metric": "metrics.yaml",
}
UPSERT_SQL = """
INSERT INTO compliance.derived_controls (
derived_id, kind, canonical_name, description, regulation_anchor,
related_quaidal_ids, external_refs,
source_framework, source_section, source_url, source_commit_sha,
source_title_original, source_license_note,
plagiarism_score_at_generation, generated_by_model, yaml_path
) VALUES (
%(derived_id)s, %(kind)s, %(canonical_name)s, %(description)s, %(regulation_anchor)s,
%(related_quaidal_ids)s::jsonb, %(external_refs)s::jsonb,
%(source_framework)s, %(source_section)s, %(source_url)s, %(source_commit_sha)s,
%(source_title_original)s, %(source_license_note)s,
%(plagiarism_score)s, %(generated_by_model)s, %(yaml_path)s
)
ON CONFLICT (derived_id) DO UPDATE SET
kind = EXCLUDED.kind,
canonical_name = EXCLUDED.canonical_name,
description = EXCLUDED.description,
regulation_anchor = EXCLUDED.regulation_anchor,
related_quaidal_ids = EXCLUDED.related_quaidal_ids,
external_refs = EXCLUDED.external_refs,
source_framework = EXCLUDED.source_framework,
source_section = EXCLUDED.source_section,
source_url = EXCLUDED.source_url,
source_commit_sha = EXCLUDED.source_commit_sha,
source_title_original = EXCLUDED.source_title_original,
source_license_note = EXCLUDED.source_license_note,
plagiarism_score_at_generation = EXCLUDED.plagiarism_score_at_generation,
generated_by_model = EXCLUDED.generated_by_model,
yaml_path = EXCLUDED.yaml_path
"""
def load_yaml_records(yaml_path: Path) -> tuple[list[dict], str | None, str | None]:
if not yaml_path.exists():
return [], None, None
data = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))
return data.get("controls", []), data.get("commit_sha"), data.get("generated_by_model")
def to_row(ctrl: dict, yaml_path: Path, default_model: str | None, default_commit: str | None) -> dict:
source = ctrl.get("source") or {}
return {
"derived_id": ctrl["id"],
"kind": ctrl["kind"],
"canonical_name": ctrl["canonical_name"],
"description": ctrl["description"],
"regulation_anchor": ctrl.get("regulation_anchor"),
"related_quaidal_ids": json.dumps(ctrl.get("related_quaidal_ids", []), ensure_ascii=False),
"external_refs": json.dumps(ctrl.get("external_refs", []), ensure_ascii=False),
"source_framework": source.get("framework", "BSI QUAIDAL"),
"source_section": source.get("section", ""),
"source_url": source.get("url"),
"source_commit_sha": source.get("commit_sha") or default_commit,
"source_title_original": source.get("title_original_de"),
"source_license_note": source.get("license_note"),
"plagiarism_score": ctrl.get("plagiarism_score_at_generation"),
"generated_by_model": default_model,
"yaml_path": str(yaml_path.relative_to(REPO_ROOT)),
}
def build_dsn(args: argparse.Namespace) -> str:
if args.dsn:
return args.dsn
return (
f"host={args.db_host} port={args.db_port} "
f"dbname={args.db_name} user={args.db_user} password={args.db_password}"
)
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--dsn", help="Full DSN; overrides individual flags")
ap.add_argument("--db-host", default=os.environ.get("DB_HOST", "localhost"))
ap.add_argument("--db-port", default=os.environ.get("DB_PORT", "5432"))
ap.add_argument("--db-name", default=os.environ.get("DB_NAME", "breakpilot_db"))
ap.add_argument("--db-user", default=os.environ.get("DB_USER", "breakpilot"))
ap.add_argument("--db-password", default=os.environ.get("DB_PASSWORD", "breakpilot"))
ap.add_argument("--dry-run", action="store_true")
args = ap.parse_args()
total = 0
rows: list[dict] = []
for kind, fname in KIND_FILES.items():
path = DATA_DIR / fname
records, commit, model = load_yaml_records(path)
for rec in records:
rows.append(to_row(rec, path, model, commit))
if records:
print(f" {fname}: {len(records)} entries", file=sys.stderr)
total += len(records)
if not rows:
print("ERROR: no YAML records found; run derive_quaidal_mcs.py first", file=sys.stderr)
return 2
print(f"Total rows: {total}", file=sys.stderr)
if args.dry_run:
print("Dry run — sample row:", file=sys.stderr)
print(json.dumps({k: (v[:200] if isinstance(v, str) else v) for k, v in rows[0].items()}, indent=2, ensure_ascii=False))
return 0
dsn = build_dsn(args)
print(f"Connecting to {args.db_host}:{args.db_port}/{args.db_name}", file=sys.stderr)
inserted = updated = 0
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
for row in rows:
cur.execute(
"SELECT 1 FROM compliance.derived_controls WHERE derived_id = %s",
(row["derived_id"],),
)
existed = cur.fetchone() is not None
cur.execute(UPSERT_SQL, row)
if existed:
updated += 1
else:
inserted += 1
conn.commit()
print(f"Inserted: {inserted}, Updated: {updated}", file=sys.stderr)
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,400 @@
#!/usr/bin/env python3
"""Clean-Room MC derivation from BSI QUAIDAL.
For each QUAIDAL entry in the parsed index, ask a local LLM to produce our own
wording for a Master Control / atomic control / mitigation / metric. Reject any
output whose 4-gram overlap with the BSI source text exceeds PLAGIARISM_LIMIT.
We never store the BSI prose; only our own derived wording plus structural
references (BSI section ID + URL + commit SHA).
Usage:
# Single entry, prints to stdout for review:
python3 control-pipeline/scripts/derive_quaidal_mcs.py --only QKB-01 --dry-run
# Full run, writes YAML:
python3 control-pipeline/scripts/derive_quaidal_mcs.py --ollama-host macmini
Output: control-pipeline/data/quaidal/{master_controls,atomic_controls,mitigations,metrics}.yaml
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
try:
import httpx
import yaml
except ImportError as e:
print(f"ERROR: missing dependency {e.name}. Install with: pip install httpx pyyaml", file=sys.stderr)
sys.exit(2)
REPO_ROOT = Path(__file__).resolve().parents[2]
SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal"
INDEX_FILE = REPO_ROOT / "control-pipeline" / "data" / "quaidal" / "quaidal_index.json"
OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"
PLAGIARISM_LIMIT = 0.20 # max share of 4-grams that may appear in BSI source
N_GRAM = 4
MAX_RETRIES = 3
DEFAULT_OLLAMA_URL = "http://macmini:11434"
OLLAMA_MODEL = "qwen3.5:35b-a3b"
QUAIDAL_REPO_URL = "https://github.com/BSI-Bund/QUAIDAL"
KIND_TO_PROMPT_ROLE = {
"criterion": "Master Control",
"building_block": "atomarer technischer Control",
"measure": "Schutzmaßnahme",
"metric": "messbarer Qualitäts-Indikator",
}
KIND_TO_OUTPUT_FILE = {
"criterion": "master_controls.yaml",
"building_block": "atomic_controls.yaml",
"measure": "mitigations.yaml",
"metric": "metrics.yaml",
}
# ---------------------------------------------------------------------------
# Source-side extraction (kept in memory, never written to disk)
# ---------------------------------------------------------------------------
FRONTMATTER_RE = re.compile(r"^---\s*\n.*?\n---\s*\n", re.DOTALL)
SECTION_RE = re.compile(r"^###?\s+(.+?)\s*$", re.MULTILINE)
def load_source_extract(rel_path: str) -> dict:
"""Load BSI source text for ONE entry. Used only for prompt + plagiarism check."""
path = SOURCE_ROOT / rel_path
text = path.read_text(encoding="utf-8")
# Strip frontmatter; capture shortdesc separately for the prompt.
fm_match = re.match(r"^---\s*\n(.*?)\n---\s*\n", text, re.DOTALL)
shortdesc = ""
if fm_match:
for line in fm_match.group(1).splitlines():
if line.lower().startswith("shortdesc:"):
shortdesc = line.split(":", 1)[1].strip()
break
body = FRONTMATTER_RE.sub("", text, count=1)
# Pull the first 1-2 paragraphs under "Beschreibung" (or whole body if none)
desc_match = re.search(r"###?\s+Beschreibung\s*\n+(.+?)(?:\n###?\s|\Z)", body, re.DOTALL)
description_excerpt = desc_match.group(1).strip() if desc_match else body[:1500].strip()
paragraphs = [p.strip() for p in description_excerpt.split("\n\n") if p.strip()]
description_excerpt = "\n\n".join(paragraphs[:2])
return {
"shortdesc": shortdesc,
"description_excerpt": description_excerpt,
"full_body": body,
}
# ---------------------------------------------------------------------------
# Plagiarism gate
# ---------------------------------------------------------------------------
WORD_RE = re.compile(r"\b[\wäöüÄÖÜß]+\b", re.UNICODE)
def _tokenize(text: str) -> list[str]:
return [w.lower() for w in WORD_RE.findall(text)]
def ngram_overlap(produced: str, source: str, n: int = N_GRAM) -> float:
"""Share of produced n-grams that also appear in source."""
p_tokens = _tokenize(produced)
s_tokens = _tokenize(source)
if len(p_tokens) < n:
return 0.0
s_grams = {tuple(s_tokens[i : i + n]) for i in range(len(s_tokens) - n + 1)}
if not s_grams:
return 0.0
p_grams = [tuple(p_tokens[i : i + n]) for i in range(len(p_tokens) - n + 1)]
hits = sum(1 for g in p_grams if g in s_grams)
return hits / len(p_grams)
# ---------------------------------------------------------------------------
# LLM prompt + call
# ---------------------------------------------------------------------------
PROMPT_TEMPLATE = """Du bist Compliance-Engineer bei BreakPilot. Schreibe eine eigenständige Anforderung im Stil einer technischen Kontroll-Spezifikation.
Quelle: BSI QUAIDAL Sektion {entry_id} ("{title_de}"). Die Quelle steht unter unklarer Lizenz (BSI-Veröffentlichung, § 5 UrhG anwendbar) — wir dürfen die Idee aufgreifen, aber NICHT abschreiben.
Aufgabe: Formuliere eine eigenständige Anforderung im Stil eines {role}. Anforderungen:
- Eigene Formulierung in deutscher Sprache. Kein Satz darf aus der Quelle übernommen werden, auch nicht teilweise. Synonyme verwenden, Satzbau ändern, Inhalt strukturell anders aufbauen.
- 2-4 Sätze (max 80 Wörter).
- Sprachstil: nüchtern, technisch, normativ ("muss", "ist sicherzustellen", "ist zu prüfen").
- Bezug auf KI-Trainingsdaten oder KI-Datenqualität, je nach Quelle.
- Nicht die wörtlichen BSI-Beispiele kopieren.
Quellauszug (NUR zur Orientierung, NICHT abschreiben):
---
shortdesc: {shortdesc}
{description_excerpt}
---
Antwort: Liefere AUSSCHLIESSLICH die fertige Beschreibung als reinen Text — kein JSON, keine Überschriften, keine Anführungszeichen, keine Quellenangabe."""
def call_ollama(prompt: str, ollama_url: str, model: str, retries: int = 2) -> str:
last_err = None
for attempt in range(retries + 1):
try:
resp = httpx.post(
f"{ollama_url}/api/chat",
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"options": {"temperature": 0.4},
"think": False,
},
timeout=180.0,
)
resp.raise_for_status()
return resp.json()["message"]["content"].strip()
except (httpx.HTTPError, KeyError, ValueError) as e:
last_err = e
if attempt < retries:
time.sleep(2 ** attempt)
raise RuntimeError(f"Ollama call failed after {retries+1} attempts: {last_err}")
def strip_llm_artifacts(text: str) -> str:
"""Clean leading/trailing markdown and quotes from LLM output."""
text = text.strip()
# Strip surrounding code fences
if text.startswith("```"):
text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text)
# Strip surrounding quotes
text = text.strip('""”„')
# Drop a leading "Beschreibung:" or similar label
text = re.sub(r"^(Beschreibung|Description|Anforderung|Control):\s*", "", text, flags=re.IGNORECASE)
return text.strip()
# ---------------------------------------------------------------------------
# Derivation
# ---------------------------------------------------------------------------
@dataclass
class DerivedControl:
derived_id: str
source_id: str
kind: str
canonical_name: str
description: str
plagiarism_score: float
related_quaidal_ids: list[str]
external_refs: list[dict]
source: dict
_ASCII_FOLD = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "Ä": "ae", "Ö": "oe", "Ü": "ue", "ß": "ss"})
def slug(text: str) -> str:
text = text.translate(_ASCII_FOLD).lower()
text = re.sub(r"[^a-z0-9]+", "-", text)
return text.strip("-")
def derived_id_for(entry: dict) -> str:
prefix = {
"criterion": "MC-AI-DATA",
"building_block": "AC-AI-DATA",
"measure": "MIT-AI-DATA",
"metric": "MET-AI-DATA",
}.get(entry["kind"], "X-AI-DATA")
title = entry["title_de"]
title = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", title)
return f"{prefix}-{entry['id']}-{slug(title)[:40]}".rstrip("-")
def derive_one(entry: dict, source_extract: dict, ollama_url: str, model: str, *, verbose: bool = False) -> DerivedControl:
role = KIND_TO_PROMPT_ROLE.get(entry["kind"], "Control")
prompt = PROMPT_TEMPLATE.format(
entry_id=entry["id"],
title_de=entry["title_de"],
role=role,
shortdesc=source_extract["shortdesc"] or "(keiner)",
description_excerpt=source_extract["description_excerpt"] or "(keine Beschreibung)",
)
source_corpus = "\n\n".join(filter(None, [source_extract["shortdesc"], source_extract["description_excerpt"]]))
best: tuple[str, float] | None = None
for attempt in range(1, MAX_RETRIES + 1):
output = call_ollama(prompt, ollama_url, model)
output = strip_llm_artifacts(output)
score = ngram_overlap(output, source_corpus)
if verbose:
print(f" attempt {attempt}: overlap={score:.2%} len={len(output)}", file=sys.stderr)
if score < PLAGIARISM_LIMIT:
best = (output, score)
break
if best is None or score < best[1]:
best = (output, score)
# Strengthen the next prompt by appending a reject notice
prompt += f"\n\n(Vorheriger Versuch hatte {score:.0%} Wortdeckung mit der Quelle. Verwende völlig andere Begriffe und Satzstruktur.)"
if best is None:
raise RuntimeError(f"Could not derive {entry['id']}: no output")
output, score = best
if score >= PLAGIARISM_LIMIT:
raise RuntimeError(
f"Plagiarism gate failed for {entry['id']}: best overlap {score:.2%} >= limit {PLAGIARISM_LIMIT:.0%}.\n"
f"Output:\n{output}"
)
title_de_clean = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", entry["title_de"]).strip()
return DerivedControl(
derived_id=derived_id_for(entry),
source_id=entry["id"],
kind=entry["kind"],
canonical_name=title_de_clean or entry["title_de"],
description=output,
plagiarism_score=round(score, 4),
related_quaidal_ids=entry["referenced_ids"],
external_refs=entry["external_refs"],
source={
"framework": "BSI QUAIDAL",
"section": entry["id"],
"title_original_de": entry["title_de"],
"url": f"{QUAIDAL_REPO_URL}/blob/main/{entry['source_path'].replace(' ', '%20')}",
"commit_sha": None, # filled in by main()
"license_note": "§ 5 UrhG anwendbar; share:true im Frontmatter; Clean-Room-Ableitung.",
},
)
# ---------------------------------------------------------------------------
# Output writers
# ---------------------------------------------------------------------------
def control_to_dict(c: DerivedControl) -> dict:
d = {
"id": c.derived_id,
"canonical_name": c.canonical_name,
"description": c.description,
"kind": c.kind,
"regulation_anchor": "EU AI Act Art. 10 (Datenqualität für Hochrisiko-KI)",
"related_quaidal_ids": c.related_quaidal_ids,
"external_refs": c.external_refs,
"source": c.source,
"plagiarism_score_at_generation": c.plagiarism_score,
}
return d
def write_yaml_per_kind(controls: list[DerivedControl], commit_sha: str | None) -> dict[str, Path]:
out: dict[str, list[dict]] = {}
for c in controls:
c.source["commit_sha"] = commit_sha
fname = KIND_TO_OUTPUT_FILE.get(c.kind, "other.yaml")
out.setdefault(fname, []).append(control_to_dict(c))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
written: dict[str, Path] = {}
for fname, items in out.items():
path = OUTPUT_DIR / fname
payload = {
"source": "Derived from BSI QUAIDAL (Clean-Room)",
"source_url": QUAIDAL_REPO_URL,
"commit_sha": commit_sha,
"plagiarism_limit_4gram": PLAGIARISM_LIMIT,
"generated_by_model": OLLAMA_MODEL,
"controls": items,
}
path.write_text(yaml.safe_dump(payload, allow_unicode=True, sort_keys=False), encoding="utf-8")
written[fname] = path
return written
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--only", help="Derive only this QUAIDAL ID (e.g. QKB-01)")
ap.add_argument("--kind", help="Derive only entries of this kind (criterion/building_block/measure/metric)")
ap.add_argument("--limit", type=int, help="Process at most N entries")
ap.add_argument("--dry-run", action="store_true", help="Print derived controls instead of writing YAML")
ap.add_argument("--ollama-host", default="macmini", help="Ollama host (default: macmini)")
ap.add_argument("--model", default=OLLAMA_MODEL)
ap.add_argument("--verbose", action="store_true")
args = ap.parse_args()
if not INDEX_FILE.exists():
print(f"ERROR: missing index. Run ingest_bsi_quaidal.py first ({INDEX_FILE})", file=sys.stderr)
return 2
index = json.loads(INDEX_FILE.read_text(encoding="utf-8"))
entries = index["entries"]
if args.only:
entries = [e for e in entries if e["id"].upper() == args.only.upper()]
if args.kind:
entries = [e for e in entries if e["kind"] == args.kind]
if args.limit:
entries = entries[: args.limit]
if not entries:
print("No entries match the filter.", file=sys.stderr)
return 1
ollama_url = args.ollama_host if "://" in args.ollama_host else f"http://{args.ollama_host}:11434"
print(f"Derivation: {len(entries)} entries, model={args.model}, ollama={ollama_url}, limit={PLAGIARISM_LIMIT:.0%}", file=sys.stderr)
derived: list[DerivedControl] = []
failed: list[tuple[str, str]] = []
for i, entry in enumerate(entries, 1):
if args.verbose:
print(f"[{i}/{len(entries)}] {entry['id']} ({entry['kind']}): {entry['title_de']}", file=sys.stderr)
try:
extract = load_source_extract(entry["source_path"])
ctrl = derive_one(entry, extract, ollama_url, args.model, verbose=args.verbose)
derived.append(ctrl)
except Exception as exc: # noqa: BLE001
failed.append((entry["id"], str(exc)))
print(f" FAILED {entry['id']}: {exc}", file=sys.stderr)
print(f"\nDerived: {len(derived)} | Failed: {len(failed)}", file=sys.stderr)
if args.dry_run:
for c in derived:
c.source["commit_sha"] = index.get("commit_sha")
print(yaml.safe_dump(control_to_dict(c), allow_unicode=True, sort_keys=False))
print("---")
return 0 if not failed else 1
written = write_yaml_per_kind(derived, index.get("commit_sha"))
for fname, path in written.items():
print(f"Wrote {path.relative_to(REPO_ROOT)} ({sum(1 for c in derived if KIND_TO_OUTPUT_FILE[c.kind] == fname)} entries)", file=sys.stderr)
if failed:
print("\nFailures:", file=sys.stderr)
for fid, msg in failed:
print(f" - {fid}: {msg.splitlines()[0]}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,242 @@
#!/usr/bin/env python3
"""Parse BSI QUAIDAL Markdown catalog into a structural index.
Clean-Room principle: this script does NOT persist any QUAIDAL prose to disk.
It only extracts non-protectable structural facts (IDs, type, file paths,
cross-references to other QUAIDAL entries, references to external norms).
The derivation step (derive_quaidal_mcs.py) reads the index plus the original
.md files from the gitignored clone and asks the LLM to produce our own
wordings, never copying the BSI prose into our own controls/database.
Input: legal-sources/bsi-quaidal/0000_Markdown/**/*.md (gitignored clone)
Output: control-pipeline/data/quaidal/quaidal_index.json (structural only)
Usage:
python3 control-pipeline/scripts/ingest_bsi_quaidal.py
python3 control-pipeline/scripts/ingest_bsi_quaidal.py --check # validate only
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML missing. Install with: pip install pyyaml", file=sys.stderr)
sys.exit(2)
REPO_ROOT = Path(__file__).resolve().parents[2]
SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal"
MARKDOWN_ROOT = SOURCE_ROOT / "0000_Markdown"
OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"
OUTPUT_FILE = OUTPUT_DIR / "quaidal_index.json"
# Map folder name -> our internal kind. Sub-folders inside the Methoden tree
# (e.g. "QM-10_Dimension Reduction") are treated as method variants of their
# parent QM.
KIND_BY_PARENT_DIR = {
"0000_Qualitätskriterien": "criterion", # QKB → Master Control candidates
"0001_Qualitätsbausteine": "building_block", # QB → atomic controls
"0002_Maßnahmen": "measure", # M → mitigations
"0003_Qualitätsmetriken_methoden": "metric", # QM → runtime check / metric
"0002_Referenz-Matrizen": "matrix", # cross-walk matrix
"9998_CustomTemplates": "template",
}
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
ID_RE = re.compile(r"\b((?:QKB|QB|MA|QM)-\d+[a-zA-Z]?)", re.IGNORECASE)
@dataclass
class IndexEntry:
id: str # Canonical ID: QKB-01, QB-03, M-12, QM-07
kind: str # criterion / building_block / measure / metric / matrix / template
title_de: str
title_en: str
source_path: str # relative to SOURCE_ROOT
referenced_ids: list[str] = field(default_factory=list) # other QUAIDAL IDs linked in this file
external_refs: list[dict] = field(default_factory=list) # {framework, citation, ref_id}
tags: list[str] = field(default_factory=list)
share: bool | None = None
def parse_frontmatter(text: str) -> dict:
m = FRONTMATTER_RE.match(text)
if not m:
return {}
try:
return yaml.safe_load(m.group(1)) or {}
except yaml.YAMLError:
return {}
def canonical_id(raw_id: str | list | None, filename: str) -> str | None:
"""QUAIDAL files sometimes list multiple IDs or odd casing — normalise."""
candidates: list[str] = []
if isinstance(raw_id, list):
candidates.extend(str(x) for x in raw_id)
elif isinstance(raw_id, str):
candidates.append(raw_id)
# Fallback: derive from filename
candidates.append(filename)
for c in candidates:
m = ID_RE.search(c)
if m:
return m.group(1).upper().replace(" ", "-")
return None
def determine_kind(path: Path) -> str:
for parent in path.parents:
if parent.name in KIND_BY_PARENT_DIR:
return KIND_BY_PARENT_DIR[parent.name]
return "unknown"
def collect_referenced_ids(body: str, own_id: str) -> list[str]:
found = {m.group(1).upper() for m in ID_RE.finditer(body)}
found.discard(own_id)
return sorted(found)
REF_FRAMEWORKS = [
("AI Act", ["AI-Act", "AI Act", "Verordnung (EU) 2024/1689", "KI-VO"]),
("EU GDPR", ["DSGVO", "Verordnung (EU) 2016/679", "GDPR"]),
("ISO/IEC 25012", ["ISO/IEC 25012", "ISO 25012"]),
("ISO/IEC 25024", ["ISO/IEC 25024", "ISO 25024"]),
("ISO/IEC 23894", ["ISO/IEC 23894", "ISO 23894"]),
("ISO/IEC 42001", ["ISO/IEC 42001", "ISO 42001"]),
("NIST AI RMF", ["NIST AI RMF", "AI Risk Management Framework"]),
("BSI Grundschutz", ["IT-Grundschutz", "Grundschutz"]),
("BSI AIC4", ["AIC4", "AI Cloud Service Compliance Criteria"]),
]
def detect_external_refs(body: str) -> list[dict]:
refs: list[dict] = []
seen: set[tuple[str, str]] = set()
# Section "Referenzen" tables — pick up first column ref-id and first
# textual hit of the framework. We do NOT store the BSI "Kurzbeschr."
# column to avoid copying their prose.
for line in body.splitlines():
for framework, patterns in REF_FRAMEWORKS:
for pat in patterns:
if pat.lower() in line.lower():
# Try to grab an article/section nearby (e.g. "Artikel 10")
art = re.search(r"(Artikel|Art\.?|Section|§)\s*([0-9]+[a-z]?)", line, re.IGNORECASE)
citation = f"{art.group(1)} {art.group(2)}" if art else None
key = (framework, citation or "")
if key in seen:
continue
seen.add(key)
refs.append({"framework": framework, "citation": citation})
break
return refs
def parse_file(path: Path) -> IndexEntry | None:
text = path.read_text(encoding="utf-8")
fm = parse_frontmatter(text)
body = text[text.find("---", 3) + 3 :] if text.startswith("---") else text
own_id = canonical_id(fm.get("ID"), path.stem)
if not own_id:
return None
title_de = str(fm.get("TitleGer") or fm.get("Title") or path.stem).strip()
title_en = str(fm.get("Title") or "").strip()
tags_raw = fm.get("tags") or []
if isinstance(tags_raw, str):
tags_raw = [tags_raw]
tags = [str(t).strip() for t in tags_raw if t]
share_val = fm.get("share")
share = bool(share_val) if share_val is not None else None
return IndexEntry(
id=own_id,
kind=determine_kind(path),
title_de=title_de,
title_en=title_en,
source_path=str(path.relative_to(SOURCE_ROOT)),
referenced_ids=collect_referenced_ids(body, own_id),
external_refs=detect_external_refs(body),
tags=tags,
share=share,
)
def get_commit_sha() -> str | None:
try:
out = subprocess.run(
["git", "-C", str(SOURCE_ROOT), "rev-parse", "HEAD"],
capture_output=True,
text=True,
check=True,
)
return out.stdout.strip()
except (subprocess.CalledProcessError, FileNotFoundError):
return None
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--check", action="store_true", help="Parse + validate, do not write output")
args = ap.parse_args()
if not MARKDOWN_ROOT.exists():
print(f"ERROR: clone not found at {SOURCE_ROOT}", file=sys.stderr)
print("Run: git clone --depth=1 https://github.com/BSI-Bund/QUAIDAL.git legal-sources/bsi-quaidal", file=sys.stderr)
return 2
entries: list[IndexEntry] = []
skipped: list[Path] = []
for path in sorted(MARKDOWN_ROOT.rglob("*.md")):
entry = parse_file(path)
if entry is None:
skipped.append(path)
continue
entries.append(entry)
by_kind: dict[str, int] = {}
for e in entries:
by_kind[e.kind] = by_kind.get(e.kind, 0) + 1
print(f"Parsed {len(entries)} entries (skipped {len(skipped)} without ID):")
for kind, count in sorted(by_kind.items()):
print(f" {kind:18s} {count}")
if args.check:
return 0
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
payload = {
"source": "BSI QUAIDAL",
"source_url": "https://github.com/BSI-Bund/QUAIDAL",
"commit_sha": get_commit_sha(),
"license_note": (
"BSI-Veroeffentlichung. Repo enthaelt keine SPDX-Lizenzdatei. "
"Frontmatter share:true. Veroeffentlichung durch Bundesbehoerde, "
"§ 5 UrhG (amtliche Werke) anwendbar. BSI hat 05/2026 die Annahme "
"CC-BY-SA-4.0 in unserer Anfrage nicht widersprochen, aber auch "
"nicht aktiv bestaetigt. Wir derivieren Clean-Room (eigene "
"Formulierungen, nur Referenz auf BSI QUAIDAL Sektion)."
),
"entries": [asdict(e) for e in entries],
}
OUTPUT_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"\nWrote index: {OUTPUT_FILE.relative_to(REPO_ROOT)}")
print(f"Commit SHA: {payload['commit_sha']}")
return 0
if __name__ == "__main__":
sys.exit(main())