7d721a6787
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 40s
CI / test-python-voice (push) Successful in 36s
CI / test-bqas (push) Successful in 33s
Clean-Room derivation of 195 controls from BSI QUAIDAL (10 criteria + 15 building blocks + 30 measures + 140 metrics) for EU AI Act Art. 10 training-data quality compliance. - ingest_bsi_quaidal.py parses YAML frontmatter into a structural index (no protected prose stored on disk). - derive_quaidal_mcs.py rewrites each entry via local LLM (qwen3.5:35b-a3b) with a hard 4-gram plagiarism gate < 20%; achieved mean overlap 0.5%. - Migration 011 adds compliance.derived_controls table with full source provenance (framework, section, url, commit SHA, license note). - apply_quaidal_to_db.py UPSERTs YAML into DB. - Source repo (legal-sources/bsi-quaidal/) gitignored. Same pattern as IACE module DIN-reference handling: name the norm and section, never quote. Backed by BSI license clarification 2026-05: § 5 UrhG anwendbar, share:true im Frontmatter; Clean-Room derivation is the safe path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
401 lines
15 KiB
Python
401 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Clean-Room MC derivation from BSI QUAIDAL.
|
|
|
|
For each QUAIDAL entry in the parsed index, ask a local LLM to produce our own
|
|
wording for a Master Control / atomic control / mitigation / metric. Reject any
|
|
output whose 4-gram overlap with the BSI source text exceeds PLAGIARISM_LIMIT.
|
|
|
|
We never store the BSI prose; only our own derived wording plus structural
|
|
references (BSI section ID + URL + commit SHA).
|
|
|
|
Usage:
|
|
# Single entry, prints to stdout for review:
|
|
python3 control-pipeline/scripts/derive_quaidal_mcs.py --only QKB-01 --dry-run
|
|
|
|
# Full run, writes YAML:
|
|
python3 control-pipeline/scripts/derive_quaidal_mcs.py --ollama-host macmini
|
|
|
|
Output: control-pipeline/data/quaidal/{master_controls,atomic_controls,mitigations,metrics}.yaml
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import httpx
|
|
import yaml
|
|
except ImportError as e:
|
|
print(f"ERROR: missing dependency {e.name}. Install with: pip install httpx pyyaml", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
# Repository root: two directory levels above the directory containing this script.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Local checkout of the BSI source files; read by load_source_extract().
SOURCE_ROOT = REPO_ROOT / "legal-sources" / "bsi-quaidal"
# Structural index consumed by main(); the error message there points to
# ingest_bsi_quaidal.py as the script that produces it.
INDEX_FILE = REPO_ROOT / "control-pipeline" / "data" / "quaidal" / "quaidal_index.json"
# Directory that receives the per-kind YAML output files.
OUTPUT_DIR = REPO_ROOT / "control-pipeline" / "data" / "quaidal"

PLAGIARISM_LIMIT = 0.20  # max share of 4-grams that may appear in BSI source
# n-gram size used as the default by ngram_overlap().
N_GRAM = 4
# Number of derivation attempts per entry in derive_one().
MAX_RETRIES = 3

# NOTE(review): not referenced in the visible code; main() builds the URL
# from --ollama-host instead.
DEFAULT_OLLAMA_URL = "http://macmini:11434"
# Default for the --model CLI option.
OLLAMA_MODEL = "qwen3.5:35b-a3b"
# Base URL used for per-entry provenance links and the YAML header.
QUAIDAL_REPO_URL = "https://github.com/BSI-Bund/QUAIDAL"

# Entry kind -> role wording substituted into the prompt's {role} placeholder.
KIND_TO_PROMPT_ROLE = {
    "criterion": "Master Control",
    "building_block": "atomarer technischer Control",
    "measure": "Schutzmaßnahme",
    "metric": "messbarer Qualitäts-Indikator",
}

# Entry kind -> output file name inside OUTPUT_DIR.
KIND_TO_OUTPUT_FILE = {
    "criterion": "master_controls.yaml",
    "building_block": "atomic_controls.yaml",
    "measure": "mitigations.yaml",
    "metric": "metrics.yaml",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Source-side extraction (kept in memory, never written to disk)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Matches a '---' delimited frontmatter block at the very start of the text
# (no MULTILINE flag, so '^' anchors to the beginning of the string only).
FRONTMATTER_RE = re.compile(r"^---\s*\n.*?\n---\s*\n", re.DOTALL)
# Matches a '##' or '###' markdown heading line and captures its title.
# NOTE(review): not referenced in the visible code — confirm before removing.
SECTION_RE = re.compile(r"^###?\s+(.+?)\s*$", re.MULTILINE)
|
|
|
|
|
|
def load_source_extract(rel_path: str) -> dict:
    """Read one BSI source file and return the pieces needed for prompting.

    The file is resolved relative to ``SOURCE_ROOT``. The returned dict has
    three keys:

    * ``shortdesc`` - value of the first ``shortdesc:`` line in the
      frontmatter (empty string when absent),
    * ``description_excerpt`` - at most the first two paragraphs below a
      ``Beschreibung`` heading, or of the first 1500 body characters when no
      such heading exists,
    * ``full_body`` - the text with the leading frontmatter removed.
    """
    raw = (SOURCE_ROOT / rel_path).read_text(encoding="utf-8")

    # Frontmatter is scanned line by line; only the first shortdesc line counts.
    frontmatter = re.match(r"^---\s*\n(.*?)\n---\s*\n", raw, re.DOTALL)
    fm_lines = frontmatter.group(1).splitlines() if frontmatter else []
    shortdesc = next(
        (ln.split(":", 1)[1].strip() for ln in fm_lines if ln.lower().startswith("shortdesc:")),
        "",
    )
    body = FRONTMATTER_RE.sub("", raw, count=1)

    # Prefer the text under the "Beschreibung" heading, up to the next heading.
    section = re.search(r"###?\s+Beschreibung\s*\n+(.+?)(?:\n###?\s|\Z)", body, re.DOTALL)
    if section:
        candidate = section.group(1).strip()
    else:
        candidate = body[:1500].strip()

    # Keep at most two non-empty paragraphs.
    blocks = [chunk.strip() for chunk in candidate.split("\n\n") if chunk.strip()]

    return {
        "shortdesc": shortdesc,
        "description_excerpt": "\n\n".join(blocks[:2]),
        "full_body": body,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plagiarism gate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
WORD_RE = re.compile(r"\b[\wäöüÄÖÜß]+\b", re.UNICODE)
|
|
|
|
|
|
def _tokenize(text: str) -> list[str]:
    """Return every ``WORD_RE`` match in *text*, lower-cased, in order."""
    return list(map(str.lower, WORD_RE.findall(text)))
|
|
|
|
|
|
def ngram_overlap(produced: str, source: str, n: int = N_GRAM) -> float:
    """Return the fraction of n-grams in *produced* that also occur in *source*.

    Both texts are tokenized with ``_tokenize``. The result is ``0.0`` when
    *produced* has fewer than *n* tokens or when *source* yields no n-grams.
    Repeated n-grams in *produced* are counted once per occurrence.
    """
    produced_tokens = _tokenize(produced)
    source_tokens = _tokenize(source)

    window_count = len(produced_tokens) - n + 1
    if window_count <= 0:
        return 0.0

    source_grams = {
        tuple(source_tokens[start : start + n])
        for start in range(len(source_tokens) - n + 1)
    }
    if not source_grams:
        return 0.0

    matched = 0
    for start in range(window_count):
        if tuple(produced_tokens[start : start + n]) in source_grams:
            matched += 1
    return matched / window_count
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM prompt + call
|
|
# ---------------------------------------------------------------------------
|
|
|
|
PROMPT_TEMPLATE = """Du bist Compliance-Engineer bei BreakPilot. Schreibe eine eigenständige Anforderung im Stil einer technischen Kontroll-Spezifikation.
|
|
|
|
Quelle: BSI QUAIDAL Sektion {entry_id} ("{title_de}"). Die Quelle steht unter unklarer Lizenz (BSI-Veröffentlichung, § 5 UrhG anwendbar) — wir dürfen die Idee aufgreifen, aber NICHT abschreiben.
|
|
|
|
Aufgabe: Formuliere eine eigenständige Anforderung im Stil eines {role}. Anforderungen:
|
|
- Eigene Formulierung in deutscher Sprache. Kein Satz darf aus der Quelle übernommen werden, auch nicht teilweise. Synonyme verwenden, Satzbau ändern, Inhalt strukturell anders aufbauen.
|
|
- 2-4 Sätze (max 80 Wörter).
|
|
- Sprachstil: nüchtern, technisch, normativ ("muss", "ist sicherzustellen", "ist zu prüfen").
|
|
- Bezug auf KI-Trainingsdaten oder KI-Datenqualität, je nach Quelle.
|
|
- Nicht die wörtlichen BSI-Beispiele kopieren.
|
|
|
|
Quellauszug (NUR zur Orientierung, NICHT abschreiben):
|
|
---
|
|
shortdesc: {shortdesc}
|
|
|
|
{description_excerpt}
|
|
---
|
|
|
|
Antwort: Liefere AUSSCHLIESSLICH die fertige Beschreibung als reinen Text — kein JSON, keine Überschriften, keine Anführungszeichen, keine Quellenangabe."""
|
|
|
|
|
|
def call_ollama(prompt: str, ollama_url: str, model: str, retries: int = 2) -> str:
    """Send *prompt* as a single user message to Ollama's chat endpoint.

    Args:
        prompt: Full prompt text.
        ollama_url: Base URL of the Ollama server; a trailing slash is tolerated.
        model: Model name passed through in the request body.
        retries: Number of additional attempts after the first one fails.

    Returns:
        The stripped ``message.content`` string of the JSON response.

    Raises:
        RuntimeError: When every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    # Normalise the base so "http://host:11434/" does not yield "//api/chat".
    endpoint = f"{ollama_url.rstrip('/')}/api/chat"
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.4},
        # NOTE(review): presumably disables the model's reasoning output —
        # confirm against the Ollama API version in use.
        "think": False,
    }

    last_err: Exception | None = None
    for attempt in range(retries + 1):
        try:
            resp = httpx.post(endpoint, json=payload, timeout=180.0)
            resp.raise_for_status()
            return resp.json()["message"]["content"].strip()
        except (httpx.HTTPError, KeyError, TypeError, AttributeError, ValueError) as e:
            # TypeError/AttributeError cover a response whose "message" or
            # "content" is null; treat it like any other malformed reply.
            last_err = e
            if attempt < retries:
                # Exponential backoff: 1 s, 2 s, 4 s, ...
                time.sleep(2 ** attempt)
    raise RuntimeError(f"Ollama call failed after {retries+1} attempts: {last_err}") from last_err
|
|
|
|
|
|
def strip_llm_artifacts(text: str) -> str:
    """Remove wrapping markdown, quotes and a leading label from LLM output.

    Handles, in order: a surrounding code fence, surrounding quotation marks
    (ASCII and typographic, including the German pair U+201E / U+201C), a
    leading ``Beschreibung:`` / ``Description:`` / ``Anforderung:`` /
    ``Control:`` label, and quotation marks that wrapped the text after such
    a label.

    Args:
        text: Raw model output.

    Returns:
        The cleaned text; may be empty.
    """
    # ASCII quote plus U+201E (low-9), U+201C (left / German closing) and
    # U+201D (right) double quotation marks.
    quote_chars = '"\u201e\u201c\u201d'

    cleaned = text.strip()
    # Strip surrounding code fences
    if cleaned.startswith("```"):
        cleaned = re.sub(r"^```[a-zA-Z]*\n?", "", cleaned)
        cleaned = re.sub(r"\n?```\s*$", "", cleaned)
    # Strip surrounding quotes (whitespace first, so fenced content is reached)
    cleaned = cleaned.strip().strip(quote_chars).strip()
    # Drop a leading "Beschreibung:" or similar label
    cleaned = re.sub(
        r"^(Beschreibung|Description|Anforderung|Control):\s*",
        "",
        cleaned,
        flags=re.IGNORECASE,
    )
    # The label may itself have been followed by a quoted text.
    return cleaned.strip().strip(quote_chars).strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Derivation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class DerivedControl:
    """One clean-room derived control together with its provenance.

    Instances are created by ``derive_one`` and serialized by
    ``control_to_dict``.
    """

    # Identifier built by derived_id_for() from kind prefix, source id and title slug.
    derived_id: str
    # The "id" of the index entry this control was derived from.
    source_id: str
    # The "kind" of the index entry (e.g. criterion, building_block, measure, metric).
    kind: str
    # Entry title with a leading QUAIDAL id prefix removed (raw title as fallback).
    canonical_name: str
    # LLM-produced wording after strip_llm_artifacts().
    description: str
    # n-gram overlap with the source excerpt, rounded to 4 decimals.
    plagiarism_score: float
    # Taken from the index entry's "referenced_ids".
    related_quaidal_ids: list[str]
    # Taken from the index entry's "external_refs".
    external_refs: list[dict]
    # Provenance dict: framework, section, title_original_de, url, commit_sha, license_note.
    source: dict
|
|
|
|
|
|
# Translation table folding German umlauts and sharp s to ASCII digraphs.
_ASCII_FOLD = str.maketrans(
    {
        "ä": "ae",
        "ö": "oe",
        "ü": "ue",
        "Ä": "ae",
        "Ö": "oe",
        "Ü": "ue",
        "ß": "ss",
    }
)


def slug(text: str) -> str:
    """Turn *text* into a lower-case, hyphen-separated ASCII slug.

    Umlauts and sharp s are folded via ``_ASCII_FOLD``; every run of other
    characters outside ``a-z0-9`` collapses into one hyphen, and no hyphen
    is left at either end.
    """
    folded = text.translate(_ASCII_FOLD).lower()
    pieces = re.split(r"[^a-z0-9]+", folded)
    # Empty pieces only arise at the ends (leading/trailing separators).
    return "-".join(piece for piece in pieces if piece)
|
|
|
|
|
|
def derived_id_for(entry: dict) -> str:
    """Build the derived control id for an index *entry*.

    The id has the form ``<kind prefix>-<entry id>-<title slug>``. The title
    slug is cut to 40 characters and computed from the title without a
    leading ``QKB-``/``QB-``/``MA-``/``QM-`` number prefix. Trailing hyphens
    are removed from the result.
    """
    kind_prefixes = {
        "criterion": "MC-AI-DATA",
        "building_block": "AC-AI-DATA",
        "measure": "MIT-AI-DATA",
        "metric": "MET-AI-DATA",
    }
    kind_prefix = kind_prefixes.get(entry["kind"], "X-AI-DATA")
    bare_title = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", entry["title_de"])
    derived = f"{kind_prefix}-{entry['id']}-{slug(bare_title)[:40]}"
    return derived.rstrip("-")
|
|
|
|
|
|
def derive_one(entry: dict, source_extract: dict, ollama_url: str, model: str, *, verbose: bool = False) -> DerivedControl:
    """Derive one control from an index entry and enforce the plagiarism gate.

    The LLM is asked up to ``MAX_RETRIES`` times. Each reply is cleaned with
    ``strip_llm_artifacts`` and scored with ``ngram_overlap`` against the
    excerpt that was shown to the model. The first reply below
    ``PLAGIARISM_LIMIT`` wins; after a rejected reply the prompt is extended
    with a reject notice. Empty replies are never accepted.

    Args:
        entry: Index entry; reads ``id``, ``kind``, ``title_de``,
            ``source_path``, ``referenced_ids`` and ``external_refs``.
        source_extract: Result of ``load_source_extract`` for this entry.
        ollama_url: Base URL handed to ``call_ollama``.
        model: Model name handed to ``call_ollama``.
        verbose: Print per-attempt diagnostics to stderr.

    Returns:
        The derived control; ``source["commit_sha"]`` is still ``None``.

    Raises:
        RuntimeError: When no attempt produced any text, or when the best
            attempt still reaches ``PLAGIARISM_LIMIT``.
    """
    role = KIND_TO_PROMPT_ROLE.get(entry["kind"], "Control")
    prompt = PROMPT_TEMPLATE.format(
        entry_id=entry["id"],
        title_de=entry["title_de"],
        role=role,
        shortdesc=source_extract["shortdesc"] or "(keiner)",
        description_excerpt=source_extract["description_excerpt"] or "(keine Beschreibung)",
    )

    # The gate compares against exactly the source text included in the prompt.
    source_corpus = "\n\n".join(filter(None, [source_extract["shortdesc"], source_extract["description_excerpt"]]))

    best: tuple[str, float] | None = None
    for attempt in range(1, MAX_RETRIES + 1):
        output = strip_llm_artifacts(call_ollama(prompt, ollama_url, model))
        if not output:
            # ngram_overlap() scores text with fewer than n tokens as 0.0, so
            # an empty reply would otherwise pass the gate and be stored as
            # an empty description.
            if verbose:
                print(f" attempt {attempt}: empty output", file=sys.stderr)
            continue
        score = ngram_overlap(output, source_corpus)
        if verbose:
            print(f" attempt {attempt}: overlap={score:.2%} len={len(output)}", file=sys.stderr)
        if score < PLAGIARISM_LIMIT:
            best = (output, score)
            break
        if best is None or score < best[1]:
            best = (output, score)
        # Strengthen the next prompt by appending a reject notice
        prompt += f"\n\n(Vorheriger Versuch hatte {score:.0%} Wortdeckung mit der Quelle. Verwende völlig andere Begriffe und Satzstruktur.)"

    if best is None:
        raise RuntimeError(f"Could not derive {entry['id']}: no output")
    output, score = best
    if score >= PLAGIARISM_LIMIT:
        raise RuntimeError(
            f"Plagiarism gate failed for {entry['id']}: best overlap {score:.2%} >= limit {PLAGIARISM_LIMIT:.0%}.\n"
            f"Output:\n{output}"
        )

    # Same prefix removal as in derived_id_for(): drop a leading QUAIDAL id.
    title_de_clean = re.sub(r"^\s*(QKB|QB|MA|QM)-\d+[a-zA-Z]?\s*", "", entry["title_de"]).strip()
    return DerivedControl(
        derived_id=derived_id_for(entry),
        source_id=entry["id"],
        kind=entry["kind"],
        canonical_name=title_de_clean or entry["title_de"],
        description=output,
        plagiarism_score=round(score, 4),
        related_quaidal_ids=entry["referenced_ids"],
        external_refs=entry["external_refs"],
        source={
            "framework": "BSI QUAIDAL",
            "section": entry["id"],
            "title_original_de": entry["title_de"],
            "url": f"{QUAIDAL_REPO_URL}/blob/main/{entry['source_path'].replace(' ', '%20')}",
            "commit_sha": None,  # filled in by main()
            "license_note": "§ 5 UrhG anwendbar; share:true im Frontmatter; Clean-Room-Ableitung.",
        },
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output writers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def control_to_dict(c: DerivedControl) -> dict:
    """Map a derived control onto the dict layout used for YAML output.

    Key order is fixed: id, canonical_name, description, kind,
    regulation_anchor, related_quaidal_ids, external_refs, source,
    plagiarism_score_at_generation. List and dict values are passed through
    by reference, not copied.
    """
    return dict(
        id=c.derived_id,
        canonical_name=c.canonical_name,
        description=c.description,
        kind=c.kind,
        regulation_anchor="EU AI Act Art. 10 (Datenqualität für Hochrisiko-KI)",
        related_quaidal_ids=c.related_quaidal_ids,
        external_refs=c.external_refs,
        source=c.source,
        plagiarism_score_at_generation=c.plagiarism_score,
    )
|
|
|
|
|
|
def write_yaml_per_kind(
    controls: list[DerivedControl],
    commit_sha: str | None,
    model: str = OLLAMA_MODEL,
) -> dict[str, Path]:
    """Write the derived controls to one YAML file per kind in ``OUTPUT_DIR``.

    Args:
        controls: Controls to write; each control's ``source["commit_sha"]``
            is set to *commit_sha* in place.
        commit_sha: Source commit recorded per control and in each file header.
        model: Model name recorded as ``generated_by_model``. Defaults to
            ``OLLAMA_MODEL``; callers that used another model must pass it so
            the recorded provenance matches the actual run.

    Returns:
        Mapping of written file name to its path. Kinds without controls in
        this call produce no file.

    NOTE(review): each file is rewritten in full, so a filtered run
    (--only/--kind/--limit) replaces a file's previous content with just the
    controls of that run — confirm this is intended.
    """
    grouped: dict[str, list[dict]] = {}
    for c in controls:
        c.source["commit_sha"] = commit_sha
        fname = KIND_TO_OUTPUT_FILE.get(c.kind, "other.yaml")
        grouped.setdefault(fname, []).append(control_to_dict(c))

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    written: dict[str, Path] = {}
    for fname, items in grouped.items():
        path = OUTPUT_DIR / fname
        payload = {
            "source": "Derived from BSI QUAIDAL (Clean-Room)",
            "source_url": QUAIDAL_REPO_URL,
            "commit_sha": commit_sha,
            "plagiarism_limit_4gram": PLAGIARISM_LIMIT,
            "generated_by_model": model,
            "controls": items,
        }
        path.write_text(yaml.safe_dump(payload, allow_unicode=True, sort_keys=False), encoding="utf-8")
        written[fname] = path
    return written


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main() -> int:
    """Command-line entry point: derive the selected entries and emit YAML.

    Returns:
        ``0`` on full success, ``1`` when no entry matched the filters or at
        least one derivation failed, ``2`` when the index file is missing.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the usage examples of the module docstring laid out as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--only", help="Derive only this QUAIDAL ID (e.g. QKB-01)")
    ap.add_argument("--kind", help="Derive only entries of this kind (criterion/building_block/measure/metric)")
    ap.add_argument("--limit", type=int, help="Process at most N entries")
    ap.add_argument("--dry-run", action="store_true", help="Print derived controls instead of writing YAML")
    ap.add_argument("--ollama-host", default="macmini", help="Ollama host (default: macmini)")
    ap.add_argument("--model", default=OLLAMA_MODEL)
    ap.add_argument("--verbose", action="store_true")
    args = ap.parse_args()

    if not INDEX_FILE.exists():
        print(f"ERROR: missing index. Run ingest_bsi_quaidal.py first ({INDEX_FILE})", file=sys.stderr)
        return 2
    index = json.loads(INDEX_FILE.read_text(encoding="utf-8"))
    entries = index["entries"]
    if args.only:
        entries = [e for e in entries if e["id"].upper() == args.only.upper()]
    if args.kind:
        entries = [e for e in entries if e["kind"] == args.kind]
    if args.limit:
        entries = entries[: args.limit]

    if not entries:
        print("No entries match the filter.", file=sys.stderr)
        return 1

    # A bare host name gets the default scheme and port; full URLs pass through.
    ollama_url = args.ollama_host if "://" in args.ollama_host else f"http://{args.ollama_host}:11434"
    print(f"Derivation: {len(entries)} entries, model={args.model}, ollama={ollama_url}, limit={PLAGIARISM_LIMIT:.0%}", file=sys.stderr)

    derived: list[DerivedControl] = []
    failed: list[tuple[str, str]] = []
    for i, entry in enumerate(entries, 1):
        if args.verbose:
            print(f"[{i}/{len(entries)}] {entry['id']} ({entry['kind']}): {entry['title_de']}", file=sys.stderr)
        try:
            extract = load_source_extract(entry["source_path"])
            ctrl = derive_one(entry, extract, ollama_url, args.model, verbose=args.verbose)
            derived.append(ctrl)
        except Exception as exc:  # noqa: BLE001
            # Best effort per entry: record the failure and continue the batch.
            failed.append((entry["id"], str(exc)))
            print(f" FAILED {entry['id']}: {exc}", file=sys.stderr)

    print(f"\nDerived: {len(derived)} | Failed: {len(failed)}", file=sys.stderr)

    if args.dry_run:
        for c in derived:
            c.source["commit_sha"] = index.get("commit_sha")
            print(yaml.safe_dump(control_to_dict(c), allow_unicode=True, sort_keys=False))
            print("---")
        return 0 if not failed else 1

    # Record the model that was actually used, not the module default.
    written = write_yaml_per_kind(derived, index.get("commit_sha"), model=args.model)

    # Count with the same fallback file name the writer uses, so an unknown
    # kind does not raise KeyError after the files were already written.
    per_file: dict[str, int] = {}
    for c in derived:
        target = KIND_TO_OUTPUT_FILE.get(c.kind, "other.yaml")
        per_file[target] = per_file.get(target, 0) + 1
    for fname, path in written.items():
        print(f"Wrote {path.relative_to(REPO_ROOT)} ({per_file.get(fname, 0)} entries)", file=sys.stderr)

    if failed:
        print("\nFailures:", file=sys.stderr)
        for fid, msg in failed:
            # An exception without a message stringifies to "", which has no lines.
            first_line = (msg.splitlines() or [""])[0]
            print(f" - {fid}: {first_line}", file=sys.stderr)
        return 1
    return 0
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|