Files
breakpilot-core/control-pipeline/scripts/derive_quaidal_mcs.py
T
Benjamin Admin 7d721a6787
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 40s
CI / test-python-voice (push) Successful in 36s
CI / test-bqas (push) Successful in 33s
feat(control-pipeline): BSI QUAIDAL Clean-Room ingestion (AI Act Art. 10)
Clean-Room derivation of 195 controls from BSI QUAIDAL (10 criteria + 15
building blocks + 30 measures + 140 metrics) for EU AI Act Art. 10
training-data quality compliance.

- ingest_bsi_quaidal.py parses YAML frontmatter into a structural index
  (no protected prose stored on disk).
- derive_quaidal_mcs.py rewrites each entry via local LLM (qwen3.5:35b-a3b)
  with a hard 4-gram plagiarism gate < 20%; achieved mean overlap 0.5%.
- Migration 011 adds compliance.derived_controls table with full source
  provenance (framework, section, url, commit SHA, license note).
- apply_quaidal_to_db.py UPSERTs YAML into DB.
- Source repo (legal-sources/bsi-quaidal/) gitignored.

Same pattern as IACE module DIN-reference handling: name the norm and
section, never quote.

Backed by BSI license clarification 2026-05: § 5 UrhG anwendbar,
share:true im Frontmatter; Clean-Room derivation is the safe path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 13:02:49 +02:00

401 lines
15 KiB
Python

#!/usr/bin/env python3
"""Clean-Room MC derivation from BSI QUAIDAL.
For each QUAIDAL entry in the parsed index, ask a local LLM to produce our own
wording for a Master Control / atomic control / mitigation / metric. Reject any
output whose 4-gram overlap with the BSI source text exceeds PLAGIARISM_LIMIT.
We never store the BSI prose; only our own derived wording plus structural
references (BSI section ID + URL + commit SHA).
Usage:
# Single entry, prints to stdout for review:
python3 control-pipeline/scripts/derive_quaidal_mcs.py --only QKB-01 --dry-run
# Full run, writes YAML:
python3 control-pipeline/scripts/derive_quaidal_mcs.py --ollama-host macmini
Output: control-pipeline/data/quaidal/{master_controls,atomic_controls,mitigations,metrics}.yaml
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
try:
import httpx
import yaml
except ImportError as e:
print(f"ERROR: missing dependency {e.name}. Install with: pip install httpx pyyaml", file=sys.stderr)
sys.exit(2)
# Repository root: the third ancestor of this file's resolved path
# (parents[0] is the directory containing this script).
REPO_ROOT = Path(__file__).resolve().parents[2]
# Directory that load_source_extract() reads the BSI source files from.
SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal"
# Parsed index consumed by main(); main() tells the user to run
# ingest_bsi_quaidal.py first when this file is missing.
INDEX_FILE = REPO_ROOT / "control-pipeline" / "data" / "quaidal" / "quaidal_index.json"
# Directory the per-kind YAML files are written to.
OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"
PLAGIARISM_LIMIT = 0.20  # max share of 4-grams that may appear in BSI source
N_GRAM = 4  # default n-gram size used by ngram_overlap()
MAX_RETRIES = 3  # LLM attempts per entry in derive_one()
# Not referenced by the code visible in this file; main() builds its URL from
# the --ollama-host argument instead.
DEFAULT_OLLAMA_URL = "http://macmini:11434"
OLLAMA_MODEL = "qwen3.5:35b-a3b"  # default for --model; also written into the YAML header
QUAIDAL_REPO_URL = "https://github.com/BSI-Bund/QUAIDAL"  # base of the per-entry source URLs
# Maps an index entry's "kind" to the role name inserted into the LLM prompt.
KIND_TO_PROMPT_ROLE = {
    "criterion": "Master Control",
    "building_block": "atomarer technischer Control",
    "measure": "Schutzmaßnahme",
    "metric": "messbarer Qualitäts-Indikator",
}
# Maps an index entry's "kind" to the YAML file its derived controls are written to.
KIND_TO_OUTPUT_FILE = {
    "criterion": "master_controls.yaml",
    "building_block": "atomic_controls.yaml",
    "measure": "mitigations.yaml",
    "metric": "metrics.yaml",
}
# ---------------------------------------------------------------------------
# Source-side extraction (kept in memory, never written to disk)
# ---------------------------------------------------------------------------
# Leading "---" ... "---" frontmatter block at the very start of a file.
FRONTMATTER_RE = re.compile(r"^---\s*\n.*?\n---\s*\n", re.DOTALL)
# A level-2 or level-3 markdown heading; group 1 is the heading text.
SECTION_RE = re.compile(r"^###?\s+(.+?)\s*$", re.MULTILINE)


def load_source_extract(rel_path: str) -> dict:
    """Read one BSI source file and return the pieces needed for derivation.

    The result is used only to build the LLM prompt and to run the
    plagiarism check.

    Args:
        rel_path: Path of the entry's file, relative to ``SOURCE_ROOT``.

    Returns:
        A dict with three keys:
        ``shortdesc`` - value of the first ``shortdesc:`` frontmatter line
        (empty string when there is none),
        ``description_excerpt`` - at most the first two paragraphs of the
        "Beschreibung" section, or of the first 1500 body characters when
        that section is absent,
        ``full_body`` - the file content with the frontmatter removed.
    """
    raw = (SOURCE_ROOT / rel_path).read_text(encoding="utf-8")

    # The frontmatter is scanned line by line; only the first
    # "shortdesc:" line (matched case-insensitively) is used.
    header = re.match(r"^---\s*\n(.*?)\n---\s*\n", raw, re.DOTALL)
    header_lines = header.group(1).splitlines() if header else []
    shortdesc = next(
        (
            header_line.split(":", 1)[1].strip()
            for header_line in header_lines
            if header_line.lower().startswith("shortdesc:")
        ),
        "",
    )

    body = FRONTMATTER_RE.sub("", raw, count=1)

    # Prefer the text under a "Beschreibung" heading, up to the next heading.
    section = re.search(r"###?\s+Beschreibung\s*\n+(.+?)(?:\n###?\s|\Z)", body, re.DOTALL)
    if section:
        excerpt_source = section.group(1).strip()
    else:
        excerpt_source = body[:1500].strip()

    # Keep only the first two non-empty paragraphs.
    leading_paragraphs = [
        chunk.strip() for chunk in excerpt_source.split("\n\n") if chunk.strip()
    ][:2]

    return {
        "shortdesc": shortdesc,
        "description_excerpt": "\n\n".join(leading_paragraphs),
        "full_body": body,
    }
# ---------------------------------------------------------------------------
# Plagiarism gate
# ---------------------------------------------------------------------------
WORD_RE = re.compile(r"\b[\wäöüÄÖÜß]+\b", re.UNICODE)
def _tokenize(text: str) -> list[str]:
return [w.lower() for w in WORD_RE.findall(text)]
def ngram_overlap(produced: str, source: str, n: int = N_GRAM) -> float:
    """Return the fraction of n-grams in *produced* that also occur in *source*.

    Both texts are tokenized with ``_tokenize``. Every n-gram occurrence in
    *produced* counts, including repeats.

    Args:
        produced: The text to score.
        source: The reference text.
        n: Number of tokens per n-gram.

    Returns:
        A value between 0.0 and 1.0. The result is 0.0 when *produced* has
        fewer than *n* tokens or when *source* yields no n-grams.
    """
    produced_tokens = _tokenize(produced)
    source_tokens = _tokenize(source)
    if len(produced_tokens) < n:
        return 0.0

    def windows(tokens):
        # Every run of n consecutive tokens, as a hashable tuple.
        return (tuple(tokens[start : start + n]) for start in range(len(tokens) - n + 1))

    reference = set(windows(source_tokens))
    if not reference:
        return 0.0

    candidates = list(windows(produced_tokens))
    matched = sum(gram in reference for gram in candidates)
    return matched / len(candidates)
# ---------------------------------------------------------------------------
# LLM prompt + call
# ---------------------------------------------------------------------------
# German-language prompt sent to the LLM for every entry. derive_one() fills
# the placeholders via str.format(): {entry_id}, {title_de}, {role},
# {shortdesc} and {description_excerpt}. The text is runtime data - any
# edit changes what the model is asked to do.
PROMPT_TEMPLATE = """Du bist Compliance-Engineer bei BreakPilot. Schreibe eine eigenständige Anforderung im Stil einer technischen Kontroll-Spezifikation.
Quelle: BSI QUAIDAL Sektion {entry_id} ("{title_de}"). Die Quelle steht unter unklarer Lizenz (BSI-Veröffentlichung, § 5 UrhG anwendbar) — wir dürfen die Idee aufgreifen, aber NICHT abschreiben.
Aufgabe: Formuliere eine eigenständige Anforderung im Stil eines {role}. Anforderungen:
- Eigene Formulierung in deutscher Sprache. Kein Satz darf aus der Quelle übernommen werden, auch nicht teilweise. Synonyme verwenden, Satzbau ändern, Inhalt strukturell anders aufbauen.
- 2-4 Sätze (max 80 Wörter).
- Sprachstil: nüchtern, technisch, normativ ("muss", "ist sicherzustellen", "ist zu prüfen").
- Bezug auf KI-Trainingsdaten oder KI-Datenqualität, je nach Quelle.
- Nicht die wörtlichen BSI-Beispiele kopieren.
Quellauszug (NUR zur Orientierung, NICHT abschreiben):
---
shortdesc: {shortdesc}
{description_excerpt}
---
Antwort: Liefere AUSSCHLIESSLICH die fertige Beschreibung als reinen Text — kein JSON, keine Überschriften, keine Anführungszeichen, keine Quellenangabe."""
def call_ollama(prompt: str, ollama_url: str, model: str, retries: int = 2) -> str:
    """Send *prompt* as a single user message to Ollama's chat endpoint.

    The request is non-streaming, uses temperature 0.4 and sets
    ``"think": False``. Failed attempts are retried with exponential
    back-off (1 s, 2 s, ...).

    Args:
        prompt: Text of the user message.
        ollama_url: Base URL of the Ollama server; a trailing slash is
            tolerated.
        model: Name of the model to query.
        retries: Number of additional attempts after the first one.

    Returns:
        The assistant message content with surrounding whitespace removed.

    Raises:
        RuntimeError: When every attempt failed. The last underlying error
            is attached as ``__cause__``.
    """
    # Avoid "//api/chat" when the caller passes a base URL ending in "/".
    endpoint = f"{ollama_url.rstrip('/')}/api/chat"
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.4},
        "think": False,
    }
    last_err: Exception | None = None
    for attempt in range(retries + 1):
        try:
            resp = httpx.post(endpoint, json=payload, timeout=180.0)
            resp.raise_for_status()
            return resp.json()["message"]["content"].strip()
        # TypeError/AttributeError cover a malformed payload such as
        # {"message": null} or a null "content"; they are retried like any
        # other bad response instead of escaping the retry loop.
        except (httpx.HTTPError, KeyError, TypeError, AttributeError, ValueError) as e:
            last_err = e
            if attempt < retries:
                time.sleep(2 ** attempt)
    raise RuntimeError(f"Ollama call failed after {retries+1} attempts: {last_err}") from last_err
# Double-quote characters an LLM may wrap its answer in: ASCII ", the
# typographic quotes U+201C / U+201D / U+201E and the guillemets U+00AB / U+00BB.
# U+201C matters for German text, where it is the CLOSING quotation mark.
_QUOTE_CHARS = "\"\u201c\u201d\u201e\u00ab\u00bb"
# A leading label such as "Beschreibung:" that the model sometimes prepends.
_LABEL_RE = re.compile(r"^(Beschreibung|Description|Anforderung|Control):\s*", re.IGNORECASE)


def strip_llm_artifacts(text: str) -> str:
    """Remove wrapper noise from LLM output and return the bare description.

    Removed, in this order: surrounding whitespace, a surrounding markdown
    code fence, surrounding double-quote characters, one leading label
    (``Beschreibung:``, ``Description:``, ``Anforderung:`` or ``Control:``),
    and surrounding double-quote characters once more so that quotes placed
    after a label are dropped too.

    Args:
        text: Raw model output.

    Returns:
        The cleaned text; an empty string when nothing else remains.
    """
    text = text.strip()
    # Strip a surrounding code fence, including an optional language tag.
    if text.startswith("```"):
        text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
        text = re.sub(r"\n?```\s*$", "", text)
    # Whitespace is stripped on both sides of the quote removal so that
    # quotes separated from the edge by blanks/newlines are still found.
    text = text.strip().strip(_QUOTE_CHARS).strip()
    text = _LABEL_RE.sub("", text, count=1)
    return text.strip().strip(_QUOTE_CHARS).strip()
# ---------------------------------------------------------------------------
# Derivation
# ---------------------------------------------------------------------------
@dataclass
class DerivedControl:
    """One derived control as assembled by derive_one().

    Holds the LLM-written wording together with structural references to
    the QUAIDAL entry it was derived from.
    """

    # Identifier built by derived_id_for(): kind prefix, source ID, title slug.
    derived_id: str
    # The "id" of the index entry this control was derived from.
    source_id: str
    # The "kind" of the index entry; selects the output file.
    kind: str
    # The entry's German title with a leading QUAIDAL ID prefix removed.
    canonical_name: str
    # Cleaned LLM output.
    description: str
    # n-gram overlap measured at generation time, rounded to 4 decimals.
    plagiarism_score: float
    # Copied from the index entry's "referenced_ids".
    related_quaidal_ids: list[str]
    # Copied from the index entry's "external_refs".
    external_refs: list[dict]
    # Provenance dict (framework, section, title_original_de, url,
    # commit_sha, license_note); commit_sha is filled in after derivation.
    source: dict
# Transliteration of German umlauts and sharp s, applied before lower-casing.
_ASCII_FOLD = str.maketrans(
    {
        "ä": "ae",
        "ö": "oe",
        "ü": "ue",
        "Ä": "ae",
        "Ö": "oe",
        "Ü": "ue",
        "ß": "ss",
    }
)


def slug(text: str) -> str:
    """Return a lower-case, hyphen-separated ASCII slug of *text*.

    Umlauts and sharp s are transliterated first; every remaining run of
    characters outside ``a-z0-9`` becomes a single hyphen, and no hyphen is
    left at either end.
    """
    folded = text.translate(_ASCII_FOLD).lower()
    # Splitting on the separator runs leaves empty strings at the edges
    # when the text starts or ends with a separator; those are dropped.
    pieces = re.split(r"[^a-z0-9]+", folded)
    return "-".join(piece for piece in pieces if piece)
def derived_id_for(entry: dict) -> str:
    """Build the identifier of the control derived from *entry*.

    The identifier has the form ``<prefix>-<entry id>-<title slug>``. The
    prefix depends on ``entry["kind"]`` (``X-AI-DATA`` for unknown kinds).
    The slug is taken from ``entry["title_de"]`` after a leading QUAIDAL ID
    such as ``QKB-01`` is removed, and is cut to 40 characters. Trailing
    hyphens are removed from the result.
    """
    kind_prefixes = {
        "criterion": "MC-AI-DATA",
        "building_block": "AC-AI-DATA",
        "measure": "MIT-AI-DATA",
        "metric": "MET-AI-DATA",
    }
    prefix = kind_prefixes.get(entry["kind"], "X-AI-DATA")
    bare_title = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", entry["title_de"])
    candidate = f"{prefix}-{entry['id']}-{slug(bare_title)[:40]}"
    # Truncation (or an empty slug) can leave a dangling hyphen.
    return candidate.rstrip("-")
def derive_one(entry: dict, source_extract: dict, ollama_url: str, model: str, *, verbose: bool = False) -> DerivedControl:
    """Derive one control from a QUAIDAL index entry via the LLM.

    The model is asked up to ``MAX_RETRIES`` times. Each answer is cleaned
    with ``strip_llm_artifacts`` and scored with ``ngram_overlap`` against
    the shortdesc and description excerpt that were shown to the model.
    The first answer below ``PLAGIARISM_LIMIT`` wins; after a rejected
    answer a notice is appended to the prompt for the next attempt.

    Args:
        entry: Index entry; the keys ``id``, ``kind``, ``title_de``,
            ``referenced_ids``, ``external_refs`` and ``source_path`` are read.
        source_extract: Result of ``load_source_extract`` for this entry.
        ollama_url: Base URL of the Ollama server.
        model: Name of the model to query.
        verbose: Print one progress line per attempt to stderr.

    Returns:
        The derived control. ``source["commit_sha"]`` is ``None`` and is
        filled in by the caller.

    Raises:
        RuntimeError: When no attempt produced a scorable answer, or when
            the best answer still reaches ``PLAGIARISM_LIMIT``. Errors from
            ``call_ollama`` propagate unchanged.
    """
    from urllib.parse import quote

    role = KIND_TO_PROMPT_ROLE.get(entry["kind"], "Control")
    prompt = PROMPT_TEMPLATE.format(
        entry_id=entry["id"],
        title_de=entry["title_de"],
        role=role,
        shortdesc=source_extract["shortdesc"] or "(keiner)",
        description_excerpt=source_extract["description_excerpt"] or "(keine Beschreibung)",
    )
    source_corpus = "\n\n".join(filter(None, [source_extract["shortdesc"], source_extract["description_excerpt"]]))
    best: tuple[str, float] | None = None
    for attempt in range(1, MAX_RETRIES + 1):
        output = strip_llm_artifacts(call_ollama(prompt, ollama_url, model))
        # An answer with fewer than N_GRAM tokens has no n-grams, so
        # ngram_overlap() would report 0.0 and an empty description would
        # pass the gate. Treat it as a failed attempt instead.
        if len(_tokenize(output)) < N_GRAM:
            if verbose:
                print(f" attempt {attempt}: output too short to score, len={len(output)}", file=sys.stderr)
            prompt += "\n\n(Vorheriger Versuch war leer oder zu kurz. Liefere 2-4 vollständige Sätze.)"
            continue
        score = ngram_overlap(output, source_corpus)
        if verbose:
            print(f" attempt {attempt}: overlap={score:.2%} len={len(output)}", file=sys.stderr)
        if best is None or score < best[1]:
            best = (output, score)
        if score < PLAGIARISM_LIMIT:
            break
        # Strengthen the next prompt by appending a reject notice
        prompt += f"\n\n(Vorheriger Versuch hatte {score:.0%} Wortdeckung mit der Quelle. Verwende völlig andere Begriffe und Satzstruktur.)"
    if best is None:
        raise RuntimeError(f"Could not derive {entry['id']}: no output")
    output, score = best
    if score >= PLAGIARISM_LIMIT:
        raise RuntimeError(
            f"Plagiarism gate failed for {entry['id']}: best overlap {score:.2%} >= limit {PLAGIARISM_LIMIT:.0%}.\n"
            f"Output:\n{output}"
        )
    title_de_clean = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", entry["title_de"]).strip()
    return DerivedControl(
        derived_id=derived_id_for(entry),
        source_id=entry["id"],
        kind=entry["kind"],
        canonical_name=title_de_clean or entry["title_de"],
        description=output,
        plagiarism_score=round(score, 4),
        related_quaidal_ids=entry["referenced_ids"],
        external_refs=entry["external_refs"],
        source={
            "framework": "BSI QUAIDAL",
            "section": entry["id"],
            "title_original_de": entry["title_de"],
            # quote() keeps "/" and percent-encodes spaces, non-ASCII
            # characters and reserved characters such as "#" or "?".
            "url": f"{QUAIDAL_REPO_URL}/blob/main/{quote(entry['source_path'])}",
            "commit_sha": None,  # filled in by main()
            "license_note": "§ 5 UrhG anwendbar; share:true im Frontmatter; Clean-Room-Ableitung.",
        },
    )
# ---------------------------------------------------------------------------
# Output writers
# ---------------------------------------------------------------------------
def control_to_dict(c: DerivedControl) -> dict:
    """Return the YAML-ready mapping for one derived control.

    The key order is the order in which the keys appear in the output.
    ``regulation_anchor`` is the same fixed string for every control.
    """
    ordered_fields = (
        ("id", c.derived_id),
        ("canonical_name", c.canonical_name),
        ("description", c.description),
        ("kind", c.kind),
        ("regulation_anchor", "EU AI Act Art. 10 (Datenqualität für Hochrisiko-KI)"),
        ("related_quaidal_ids", c.related_quaidal_ids),
        ("external_refs", c.external_refs),
        ("source", c.source),
        ("plagiarism_score_at_generation", c.plagiarism_score),
    )
    return dict(ordered_fields)
# File that collects controls whose kind has no entry in KIND_TO_OUTPUT_FILE.
_FALLBACK_OUTPUT_FILE = "other.yaml"


def _output_file_for(kind: str) -> str:
    """Return the YAML file name that controls of *kind* are written to."""
    return KIND_TO_OUTPUT_FILE.get(kind, _FALLBACK_OUTPUT_FILE)


def write_yaml_per_kind(
    controls: list[DerivedControl],
    commit_sha: str | None,
    model: str = OLLAMA_MODEL,
) -> dict[str, Path]:
    """Write the derived controls to one YAML file per kind in ``OUTPUT_DIR``.

    Every target file is rewritten from scratch and contains only the
    controls passed in; files for kinds that do not occur in *controls*
    are left untouched. ``source["commit_sha"]`` of every control is set to
    *commit_sha* as a side effect.

    Args:
        controls: Controls to write.
        commit_sha: Commit of the source repository, recorded in the file
            header and in every control's provenance.
        model: Model name recorded as ``generated_by_model``. Defaults to
            ``OLLAMA_MODEL``; pass the model that was actually used.

    Returns:
        Mapping of file name to the path that was written.
    """
    grouped: dict[str, list[dict]] = {}
    for c in controls:
        c.source["commit_sha"] = commit_sha
        grouped.setdefault(_output_file_for(c.kind), []).append(control_to_dict(c))
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    written: dict[str, Path] = {}
    for fname, items in grouped.items():
        path = OUTPUT_DIR / fname
        payload = {
            "source": "Derived from BSI QUAIDAL (Clean-Room)",
            "source_url": QUAIDAL_REPO_URL,
            "commit_sha": commit_sha,
            "plagiarism_limit_4gram": PLAGIARISM_LIMIT,
            "generated_by_model": model,
            "controls": items,
        }
        path.write_text(yaml.safe_dump(payload, allow_unicode=True, sort_keys=False), encoding="utf-8")
        written[fname] = path
    return written
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> int:
    """Command-line entry point.

    Loads the index, applies the ``--only`` / ``--kind`` / ``--limit``
    filters, derives every selected entry and either prints the results
    (``--dry-run``) or writes the per-kind YAML files.

    Returns:
        0 when every entry was derived, 1 when no entry matched the filter
        or at least one derivation failed, 2 when the index file is missing.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring contains pre-formatted usage examples.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--only", help="Derive only this QUAIDAL ID (e.g. QKB-01)")
    ap.add_argument("--kind", help="Derive only entries of this kind (criterion/building_block/measure/metric)")
    ap.add_argument("--limit", type=int, help="Process at most N entries")
    ap.add_argument("--dry-run", action="store_true", help="Print derived controls instead of writing YAML")
    ap.add_argument("--ollama-host", default="macmini", help="Ollama host (default: macmini)")
    ap.add_argument("--model", default=OLLAMA_MODEL)
    ap.add_argument("--verbose", action="store_true")
    args = ap.parse_args()
    if args.limit is not None and args.limit < 0:
        ap.error("--limit must be >= 0")
    if not INDEX_FILE.exists():
        print(f"ERROR: missing index. Run ingest_bsi_quaidal.py first ({INDEX_FILE})", file=sys.stderr)
        return 2
    index = json.loads(INDEX_FILE.read_text(encoding="utf-8"))
    entries = index["entries"]
    if args.only:
        entries = [e for e in entries if e["id"].upper() == args.only.upper()]
    if args.kind:
        entries = [e for e in entries if e["kind"] == args.kind]
    # "is not None" so that "--limit 0" selects nothing instead of everything.
    if args.limit is not None:
        entries = entries[: args.limit]
    if not entries:
        print("No entries match the filter.", file=sys.stderr)
        return 1
    ollama_url = args.ollama_host if "://" in args.ollama_host else f"http://{args.ollama_host}:11434"
    print(f"Derivation: {len(entries)} entries, model={args.model}, ollama={ollama_url}, limit={PLAGIARISM_LIMIT:.0%}", file=sys.stderr)
    derived: list[DerivedControl] = []
    failed: list[tuple[str, str]] = []
    for i, entry in enumerate(entries, 1):
        if args.verbose:
            print(f"[{i}/{len(entries)}] {entry['id']} ({entry['kind']}): {entry['title_de']}", file=sys.stderr)
        try:
            extract = load_source_extract(entry["source_path"])
            ctrl = derive_one(entry, extract, ollama_url, args.model, verbose=args.verbose)
            derived.append(ctrl)
        # Deliberately broad: one failing entry must not abort the whole run.
        except Exception as exc:  # noqa: BLE001
            # Some exceptions have an empty str(); fall back to repr() so the
            # failure report always has a non-empty first line.
            msg = str(exc) or repr(exc)
            failed.append((entry["id"], msg))
            print(f" FAILED {entry['id']}: {msg}", file=sys.stderr)
    print(f"\nDerived: {len(derived)} | Failed: {len(failed)}", file=sys.stderr)
    commit_sha = index.get("commit_sha")
    if args.dry_run:
        for c in derived:
            c.source["commit_sha"] = commit_sha
            print(yaml.safe_dump(control_to_dict(c), allow_unicode=True, sort_keys=False))
            print("---")
        return 0 if not failed else 1
    if args.only or args.kind or args.limit is not None:
        print(
            "WARNING: --only/--kind/--limit active: each written YAML file will contain "
            "only the entries derived in this run.",
            file=sys.stderr,
        )
    # Record the model that was actually used, not the module default.
    written = write_yaml_per_kind(derived, commit_sha, model=args.model)
    for fname, path in written.items():
        # Same kind-to-file mapping as the writer, including the fallback
        # file for unknown kinds.
        count = sum(1 for c in derived if _output_file_for(c.kind) == fname)
        print(f"Wrote {path.relative_to(REPO_ROOT)} ({count} entries)", file=sys.stderr)
    if failed:
        print("\nFailures:", file=sys.stderr)
        for fid, msg in failed:
            print(f" - {fid}: {msg.splitlines()[0]}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    sys.exit(main())