feat(pipeline): derive 1,874 doc_check_controls from Master Controls
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 45s
CI / test-python-voice (push) Successful in 44s
CI / test-bqas (push) Successful in 40s

8 document types: DSE (571), Cookie (381), Löschkonzept (309),
Widerrufsbelehrung (153), DSFA (147), AVV (125), AGB (113), Impressum (75).

Each control has binary check_question + pass_criteria + fail_criteria.
Derived via Claude Haiku from existing MCs filtered by regulation source.

Table: compliance.doc_check_controls (local + production synced)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-10 20:56:23 +02:00
parent 0bb9726ddd
commit 0c1561d6cc
@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
Derive doc_check_controls from existing Master Controls.
Filters MCs by document-relevant regulations, then uses Claude Haiku
to generate check_question + pass_criteria + fail_criteria per control.
Usage:
python3 /app/scripts/derive_doc_check_controls.py --dry-run
python3 /app/scripts/derive_doc_check_controls.py
"""
import argparse
import json
import logging
import os
import time
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
# Root logging for the whole script: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("doc-check-derive")

# Database connection string: DATABASE_URL wins, otherwise the fallback
# below is used.
# NOTE(review): the fallback embeds a password in source — confirm it is
# only ever valid for local/dev databases.
DB_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)

# Anthropic Messages API key and endpoint; an unset key becomes "".
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
# Document types and their regulation sources.
#
# Keys of each entry:
#   name         - display name of the document type; main() hands it to
#                  the LLM prompt
#   sources      - values compared for equality with the parent control's
#                  source_citation->>'source'
#   articles     - SQL LIKE patterns applied to source_citation->>'article'
#   extra_tokens - SQL LIKE patterns applied to master_controls.canonical_name
#
# NOTE(review): a pattern such as "%5%" matches every article string that
# contains the character "5" — confirm this breadth is intended.
DOC_TYPES = {
    "dse": {
        "name": "Datenschutzinformation",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%13%", "%14%"],
        "extra_tokens": [
            "personal_data%",
            "data_subject_rights%",
            "consent%",
            "data_processing_register%",
            "data_transfer%",
        ],
    },
    "cookie": {
        "name": "Cookie-Richtlinie",
        "sources": ["TDDDG", "ePrivacy-Richtlinie"],
        "articles": ["%25%", "%5%"],
        "extra_tokens": ["cookie_consent%", "consent%"],
    },
    "impressum": {
        "name": "Impressum",
        "sources": ["TMG"],
        "articles": ["%5%"],
        "extra_tokens": ["ecommerce%"],
    },
    "widerruf": {
        "name": "Widerrufsbelehrung",
        "sources": ["BGB"],
        "articles": ["%355%", "%312%"],
        "extra_tokens": ["consumer_protection%"],
    },
    "agb": {
        "name": "AGB",
        "sources": ["BGB"],
        "articles": ["%305%", "%307%", "%308%", "%309%"],
        "extra_tokens": ["consumer_protection%"],
    },
    "dsfa": {
        "name": "Datenschutz-Folgenabschaetzung",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%35%"],
        "extra_tokens": ["dpia%"],
    },
    "avv": {
        "name": "Auftragsverarbeitung",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%28%"],
        "extra_tokens": ["data_processing_agreement%"],
    },
    "loeschkonzept": {
        "name": "Loeschkonzept",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%5%", "%17%"],
        "extra_tokens": ["data_retention%"],
    },
}
# System prompt (German) sent with every enrichment request. It asks the
# model for a JSON array with one object per control, carrying the keys
# id, check_question, pass_criteria, fail_criteria and severity.
# enrich_with_llm() matches the returned "id" values against control_id,
# so the output format described here must stay in sync with that parser.
SYSTEM_PROMPT = """Du erzeugst binäre Prüfkriterien für Compliance-Dokumente.
Für jeden Control erzeugst du:
1. check_question: Eine JA/NEIN Frage die ein LLM anhand eines Dokuments beantworten kann
2. pass_criteria: Konkrete Textinhalte die vorhanden sein MÜSSEN (3-5 Stück)
3. fail_criteria: Typische Fehler/Mängel (2-3 Stück)
4. severity: HIGH, MEDIUM oder LOW
REGELN:
- check_question muss BINÄR beantwortbar sein (nicht "wie gut")
- pass_criteria müssen KONKRET sein ("Name + Rechtsform + Anschrift", nicht "Angaben")
- fail_criteria müssen TYPISCHE Fehler beschreiben
- Alles auf Deutsch
Antworte als JSON-Array:
[{"id":"...","check_question":"...","pass_criteria":["..."],"fail_criteria":["..."],"severity":"HIGH"}]"""
def get_doc_controls(engine, doc_type: str, config: dict) -> list[dict]:
    """Collect the canonical controls relevant for one document type.

    Two lookups are combined:

    1. Controls whose parent control cites one of ``config["sources"]``
       with an article matching one of the ``config["articles"]`` LIKE
       patterns (at most 200 rows per source/article pair).
    2. Controls that are members of a master control whose
       ``canonical_name`` matches one of the ``config["extra_tokens"]``
       LIKE patterns (at most 100 rows per pattern).

    Deprecated and rejected controls are excluded by both queries. The
    combined result is de-duplicated by ``control_id``; the first
    occurrence wins, so a hit from lookup 1 keeps its article.

    Args:
        engine: SQLAlchemy engine used to run the queries.
        doc_type: Document type key stored on every returned control.
        config: Entry with ``sources`` and ``articles`` lists and an
            optional ``extra_tokens`` list.

    Returns:
        A list of dicts with the keys ``uuid``, ``control_id``, ``title``,
        ``objective``, ``article`` and ``doc_type``.
    """
    # NOTE(review): both queries use LIMIT without ORDER BY, so which rows
    # are returned once a limit is hit is up to the database. Add an
    # ORDER BY if reproducible runs matter.
    by_citation = text("""
        SELECT cc.id, cc.control_id, cc.title,
               COALESCE(cc.objective, '') as objective,
               pc.source_citation->>'article' as article
        FROM compliance.canonical_controls cc
        JOIN compliance.canonical_controls pc ON pc.id = cc.parent_control_uuid
        WHERE pc.source_citation->>'source' = :source
          AND pc.source_citation->>'article' LIKE :article
          AND cc.release_state NOT IN ('deprecated', 'rejected')
        LIMIT 200
    """)
    by_master_name = text("""
        SELECT cc.id, cc.control_id, cc.title,
               COALESCE(cc.objective, '') as objective
        FROM compliance.master_controls mc
        JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
        JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
        WHERE mc.canonical_name LIKE :pattern
          AND cc.release_state NOT IN ('deprecated', 'rejected')
        LIMIT 100
    """)

    def _to_control(row, article: str) -> dict:
        """Map one result row to the control dict used by the pipeline."""
        return {
            "uuid": str(row[0]), "control_id": row[1],
            "title": row[2] or "", "objective": row[3] or "",
            "article": article, "doc_type": doc_type,
        }

    controls: list[dict] = []
    # One connection serves every query; the original checked out a new
    # connection for each source/article pair and each token pattern.
    with engine.connect() as conn:
        # Strategy 1: by source + article of the parent control.
        for source in config["sources"]:
            for article in config["articles"]:
                rows = conn.execute(
                    by_citation, {"source": source, "article": article}
                ).fetchall()
                controls.extend(_to_control(r, r[4] or "") for r in rows)

        # Strategy 2: by master control canonical_name.
        for token_pattern in config.get("extra_tokens", []):
            rows = conn.execute(
                by_master_name, {"pattern": token_pattern}
            ).fetchall()
            controls.extend(_to_control(r, "") for r in rows)

    # De-duplicate by control_id, keeping the first hit and the original
    # order (dicts preserve insertion order).
    unique: dict = {}
    for ctrl in controls:
        unique.setdefault(ctrl["control_id"], ctrl)
    return list(unique.values())
def _extract_json_array(content: str) -> list[dict]:
    """Return the dict items of the JSON array embedded in *content*.

    The model may surround the array with prose or a code fence, so the
    span from the first "[" to the last "]" is parsed. Returns an empty
    list when no such span exists; non-dict array items are discarded.

    Raises:
        json.JSONDecodeError: If the bracketed span is not valid JSON.
    """
    start = content.find("[")
    end = content.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    return [item for item in json.loads(content[start:end])
            if isinstance(item, dict)]


def _as_list(value) -> list:
    """Coerce a criteria value returned by the model into a list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def enrich_with_llm(controls: list[dict], doc_type_name: str) -> list[dict]:
    """Add check_question, pass/fail criteria and severity via Claude Haiku.

    Controls are sent in batches of five. Every control for which the
    model returns a non-empty ``check_question`` is updated in place and
    appended to the result; all others are left out. A failing batch is
    logged and skipped so that one bad response does not abort the run.

    Args:
        controls: Control dicts with ``control_id``, ``title`` and
            ``objective``. Matched controls are mutated in place.
        doc_type_name: Document type name included in the prompt.

    Returns:
        The enriched subset of *controls* (same dict objects). Empty when
        *controls* is empty or no API key is configured.
    """
    if not controls:
        return []
    if not ANTHROPIC_API_KEY:
        # Without a key every request would be rejected; fail once
        # instead of once per batch.
        logger.error("ANTHROPIC_API_KEY is not set; skipping LLM enrichment")
        return []

    batch_size = 5
    valid_severities = {"HIGH", "MEDIUM", "LOW"}
    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    enriched: list[dict] = []

    # A shared client reuses the HTTP connection across batches.
    with httpx.Client(headers=headers, timeout=45.0) as client:
        for i in range(0, len(controls), batch_size):
            if i:
                # Throttle between requests; no sleep after the last one.
                time.sleep(0.5)
            batch = controls[i:i + batch_size]
            items = [
                f'- id="{c["control_id"]}" doc="{doc_type_name}" '
                f't="{c["title"]}" o="{c["objective"][:100]}"'
                for c in batch
            ]
            prompt = (
                f"Dokumenttyp: {doc_type_name}\n"
                f"Erzeuge Prüfkriterien:\n" + "\n".join(items)
            )
            try:
                resp = client.post(ANTHROPIC_URL, json={
                    "model": "claude-haiku-4-5-20251001",
                    "max_tokens": 2000, "temperature": 0.1,
                    "system": SYSTEM_PROMPT,
                    "messages": [{"role": "user", "content": prompt}],
                })
                resp.raise_for_status()
                content = resp.json().get("content", [{}])[0].get("text", "")
                results = _extract_json_array(content)
                result_map = {r.get("id", ""): r for r in results}
            except Exception as e:
                # Deliberately broad: this is the per-batch boundary, and
                # transport, HTTP-status and parse errors must all be
                # survivable.
                logger.error("Batch %d failed: %s", i, e)
                continue

            matched = 0
            for ctrl in batch:
                r = result_map.get(ctrl["control_id"], {})
                if not r.get("check_question"):
                    continue
                # Accept only the three severities the prompt asks for.
                severity = str(r.get("severity") or "MEDIUM").upper()
                if severity not in valid_severities:
                    severity = "MEDIUM"
                ctrl["check_question"] = r["check_question"]
                ctrl["pass_criteria"] = _as_list(r.get("pass_criteria"))
                ctrl["fail_criteria"] = _as_list(r.get("fail_criteria"))
                ctrl["severity"] = severity
                enriched.append(ctrl)
                matched += 1
            if matched < len(batch):
                logger.warning(
                    "Batch %d: %d of %d controls got no usable criteria",
                    i, len(batch) - matched, len(batch),
                )
    return enriched
def _ensure_table(conn) -> None:
    """Create doc_check_controls and its doc_type index if missing."""
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS doc_check_controls (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            control_id VARCHAR(500) NOT NULL,
            control_uuid UUID,
            doc_type VARCHAR(50) NOT NULL,
            title VARCHAR(500),
            regulation VARCHAR(200),
            article VARCHAR(100),
            check_question TEXT NOT NULL,
            pass_criteria JSONB DEFAULT '[]',
            fail_criteria JSONB DEFAULT '[]',
            severity VARCHAR(20) DEFAULT 'MEDIUM',
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """))
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_doc_check_doc_type
        ON doc_check_controls(doc_type)
    """))


def _replace_checks(conn, checks: list[dict]) -> None:
    """Replace the stored rows of every doc type present in *checks*.

    Only doc types that actually produced checks are deleted, so a
    ``--doc-type`` run or a doc type whose enrichment failed leaves the
    other rows in place.
    """
    if not checks:
        return
    for doc_type in sorted({dc["doc_type"] for dc in checks}):
        conn.execute(
            text("DELETE FROM doc_check_controls WHERE doc_type = :doc_type"),
            {"doc_type": doc_type},
        )
    # A list of parameter dicts makes this a single executemany call.
    conn.execute(text("""
        INSERT INTO doc_check_controls
            (control_id, control_uuid, doc_type, title, article,
             check_question, pass_criteria, fail_criteria, severity)
        VALUES (:cid, CAST(:uuid AS uuid), :doc_type, :title, :article,
                :question, CAST(:pass AS jsonb),
                CAST(:fail AS jsonb), :severity)
    """), [
        {
            "cid": dc["control_id"],
            "uuid": dc["uuid"],
            "doc_type": dc["doc_type"],
            "title": dc["title"],
            # The column is VARCHAR(100); an empty article is stored as NULL.
            "article": (dc.get("article") or "")[:100] or None,
            "question": dc.get("check_question", ""),
            "pass": json.dumps(dc.get("pass_criteria", [])),
            "fail": json.dumps(dc.get("fail_criteria", [])),
            "severity": dc.get("severity", "MEDIUM"),
        }
        for dc in checks
    ])


def main():
    """Derive doc_check_controls for all (or one) document types.

    Flags:
        --dry-run   Select and enrich controls, log a sample, write nothing.
        --doc-type  Restrict the run to a single key of DOC_TYPES.

    Without --dry-run the generated checks replace the existing rows of
    the affected doc types in one transaction and are also dumped to
    /tmp/doc_check_controls.json.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--doc-type", choices=list(DOC_TYPES.keys()),
                        help="Only one doc type")
    args = parser.parse_args()

    engine = create_engine(
        DB_URL, connect_args={"options": "-c search_path=compliance,public"}
    )

    doc_types = [args.doc_type] if args.doc_type else list(DOC_TYPES.keys())
    all_checks = []
    for dt in doc_types:
        config = DOC_TYPES[dt]
        logger.info("\n=== %s (%s) ===", dt, config["name"])
        controls = get_doc_controls(engine, dt, config)
        logger.info("Found %d relevant controls", len(controls))
        if not controls:
            continue
        enriched = enrich_with_llm(controls, config["name"])
        logger.info("Enriched %d with check criteria", len(enriched))
        all_checks.extend(enriched)

    logger.info("\nTotal: %d doc_check_controls across %d doc types",
                len(all_checks), len(doc_types))

    if args.dry_run:
        # A dry run issues no DDL or DML; table creation happens only on
        # the write path below.
        for dc in all_checks[:5]:
            logger.info("  [%s] %s: %s", dc["doc_type"], dc["control_id"],
                        dc.get("check_question", "?")[:80])
        logger.info("DRY RUN — not writing")
        return

    if not all_checks:
        # Nothing was generated (e.g. the LLM was unreachable): keep the
        # existing rows instead of wiping the table.
        logger.error("No checks generated; leaving doc_check_controls untouched")
        return

    # Write to DB: DDL, delete and insert share one transaction.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO compliance, public"))
        _ensure_table(conn)
        _replace_checks(conn, all_checks)
    logger.info("Wrote %d doc_check_controls to DB", len(all_checks))

    # Save as JSON too; explicit UTF-8 because ensure_ascii=False emits
    # raw umlauts.
    Path("/tmp/doc_check_controls.json").write_text(
        json.dumps(all_checks, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("Saved to /tmp/doc_check_controls.json")


if __name__ == "__main__":
    main()