#!/usr/bin/env python3
"""
Derive doc_check_controls from existing Master Controls.

Filters MCs by document-relevant regulations, then uses Claude Haiku
to generate check_question + pass_criteria + fail_criteria per control.

Usage:
    python3 /app/scripts/derive_doc_check_controls.py --dry-run
    python3 /app/scripts/derive_doc_check_controls.py
    python3 /app/scripts/derive_doc_check_controls.py --doc-type impressum
"""

import argparse
import json
import logging
import os
import time
from pathlib import Path

import httpx
from sqlalchemy import create_engine, text

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("doc-check-derive")

# NOTE(review): the fallback URL embeds credentials; it is kept unchanged so
# existing deployments keep working, but DATABASE_URL should be set explicitly.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"

# Number of controls sent to the LLM per request.
LLM_BATCH_SIZE = 5
# Pause between consecutive LLM requests, in seconds.
LLM_BATCH_PAUSE = 0.5
# Severities accepted from the LLM; anything else falls back to the default.
VALID_SEVERITIES = ("HIGH", "MEDIUM", "LOW")
DEFAULT_SEVERITY = "MEDIUM"

JSON_EXPORT_PATH = Path("/tmp/doc_check_controls.json")

# Document types and their regulation sources.
# "articles" and "extra_tokens" are SQL LIKE patterns.
# NOTE(review): patterns such as "%5%" match every article string containing
# a "5" (e.g. 15, 25, 35), so the article filter is broader than it looks.
DOC_TYPES = {
    "dse": {
        "name": "Datenschutzinformation",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%13%", "%14%"],
        "extra_tokens": ["personal_data%", "data_subject_rights%", "consent%",
                         "data_processing_register%", "data_transfer%"],
    },
    "cookie": {
        "name": "Cookie-Richtlinie",
        "sources": ["TDDDG", "ePrivacy-Richtlinie"],
        "articles": ["%25%", "%5%"],
        "extra_tokens": ["cookie_consent%", "consent%"],
    },
    "impressum": {
        "name": "Impressum",
        "sources": ["TMG"],
        "articles": ["%5%"],
        "extra_tokens": ["ecommerce%"],
    },
    "widerruf": {
        "name": "Widerrufsbelehrung",
        "sources": ["BGB"],
        "articles": ["%355%", "%312%"],
        "extra_tokens": ["consumer_protection%"],
    },
    "agb": {
        "name": "AGB",
        "sources": ["BGB"],
        "articles": ["%305%", "%307%", "%308%", "%309%"],
        "extra_tokens": ["consumer_protection%"],
    },
    "dsfa": {
        "name": "Datenschutz-Folgenabschaetzung",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%35%"],
        "extra_tokens": ["dpia%"],
    },
    "avv": {
        "name": "Auftragsverarbeitung",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%28%"],
        "extra_tokens": ["data_processing_agreement%"],
    },
    "loeschkonzept": {
        "name": "Loeschkonzept",
        "sources": ["DSGVO (EU) 2016/679"],
        "articles": ["%5%", "%17%"],
        "extra_tokens": ["data_retention%"],
    },
}

SYSTEM_PROMPT = """Du erzeugst binäre Prüfkriterien für Compliance-Dokumente.

Für jeden Control erzeugst du:
1. check_question: Eine JA/NEIN Frage die ein LLM anhand eines Dokuments beantworten kann
2. pass_criteria: Konkrete Textinhalte die vorhanden sein MÜSSEN (3-5 Stück)
3. fail_criteria: Typische Fehler/Mängel (2-3 Stück)
4. severity: HIGH, MEDIUM oder LOW

REGELN:
- check_question muss BINÄR beantwortbar sein (nicht "wie gut")
- pass_criteria müssen KONKRET sein ("Name + Rechtsform + Anschrift", nicht "Angaben")
- fail_criteria müssen TYPISCHE Fehler beschreiben
- Alles auf Deutsch

Antworte als JSON-Array:
[{"id":"...","check_question":"...","pass_criteria":["..."],"fail_criteria":["..."],"severity":"HIGH"}]"""

# Controls whose parent control cites the given source and a matching article.
# ORDER BY makes the LIMIT pick a reproducible subset between runs.
_CONTROLS_BY_ARTICLE_SQL = text("""
    SELECT cc.id, cc.control_id, cc.title,
           COALESCE(cc.objective, '') as objective,
           pc.source_citation->>'article' as article
    FROM compliance.canonical_controls cc
    JOIN compliance.canonical_controls pc ON pc.id = cc.parent_control_uuid
    WHERE pc.source_citation->>'source' = :source
      AND pc.source_citation->>'article' LIKE :article
      AND cc.release_state NOT IN ('deprecated', 'rejected')
    ORDER BY cc.control_id
    LIMIT 200
""")

# Controls that are members of a master control whose canonical_name matches.
_CONTROLS_BY_TOKEN_SQL = text("""
    SELECT cc.id, cc.control_id, cc.title,
           COALESCE(cc.objective, '') as objective
    FROM compliance.master_controls mc
    JOIN compliance.master_control_members mcm ON mcm.master_control_uuid = mc.id
    JOIN compliance.canonical_controls cc ON cc.id = mcm.control_uuid
    WHERE mc.canonical_name LIKE :pattern
      AND cc.release_state NOT IN ('deprecated', 'rejected')
    ORDER BY cc.control_id
    LIMIT 100
""")

_CREATE_TABLE_SQL = text("""
    CREATE TABLE IF NOT EXISTS doc_check_controls (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        control_id VARCHAR(500) NOT NULL,
        control_uuid UUID,
        doc_type VARCHAR(50) NOT NULL,
        title VARCHAR(500),
        regulation VARCHAR(200),
        article VARCHAR(100),
        check_question TEXT NOT NULL,
        pass_criteria JSONB DEFAULT '[]',
        fail_criteria JSONB DEFAULT '[]',
        severity VARCHAR(20) DEFAULT 'MEDIUM',
        created_at TIMESTAMPTZ DEFAULT NOW()
    )
""")

_CREATE_INDEX_SQL = text("""
    CREATE INDEX IF NOT EXISTS idx_doc_check_doc_type
    ON doc_check_controls(doc_type)
""")

_DELETE_DOC_TYPE_SQL = text(
    "DELETE FROM doc_check_controls WHERE doc_type = :doc_type"
)

_INSERT_SQL = text("""
    INSERT INTO doc_check_controls
        (control_id, control_uuid, doc_type, title, regulation, article,
         check_question, pass_criteria, fail_criteria, severity)
    VALUES (:cid, CAST(:uuid AS uuid), :doc_type, :title, :regulation,
            :article, :question, CAST(:pass AS jsonb),
            CAST(:fail AS jsonb), :severity)
""")


def _control_from_row(row, doc_type: str, regulation: str = "",
                      article: str = "") -> dict:
    """Build the control dict used throughout the script from a result row."""
    return {
        "uuid": str(row[0]),
        "control_id": row[1],
        "title": row[2] or "",
        "objective": row[3] or "",
        "article": article or "",
        "regulation": regulation or "",
        "doc_type": doc_type,
    }


def get_doc_controls(engine, doc_type: str, config: dict) -> list[dict]:
    """Get controls relevant for a document type.

    Two lookups are combined:

    1. controls whose parent control cites one of ``config["sources"]`` with
       an article matching one of ``config["articles"]`` (LIKE patterns);
    2. controls belonging to a master control whose ``canonical_name``
       matches one of ``config["extra_tokens"]`` (LIKE patterns, optional).

    Args:
        engine: SQLAlchemy engine connected to the compliance database.
        doc_type: Key of the document type; stored on every returned control.
        config: Entry of ``DOC_TYPES`` describing the document type.

    Returns:
        Control dicts with the keys ``uuid``, ``control_id``, ``title``,
        ``objective``, ``article``, ``regulation`` and ``doc_type``,
        de-duplicated by ``control_id`` (the first occurrence wins, so
        article-based hits take precedence over token-based hits).
    """
    controls = []

    # One connection for all lookups of this document type.
    with engine.connect() as conn:
        # Strategy 1: By source + article
        for source in config["sources"]:
            for article in config["articles"]:
                rows = conn.execute(
                    _CONTROLS_BY_ARTICLE_SQL,
                    {"source": source, "article": article},
                ).fetchall()
                controls.extend(
                    _control_from_row(r, doc_type, regulation=source,
                                      article=r[4])
                    for r in rows
                )

        # Strategy 2: By MC canonical_name
        for token_pattern in config.get("extra_tokens", []):
            rows = conn.execute(
                _CONTROLS_BY_TOKEN_SQL, {"pattern": token_pattern}
            ).fetchall()
            controls.extend(_control_from_row(r, doc_type) for r in rows)

    # Deduplicate; dicts keep insertion order, setdefault keeps the first hit.
    unique = {}
    for ctrl in controls:
        unique.setdefault(ctrl["control_id"], ctrl)
    return list(unique.values())


def _build_prompt(batch: list[dict], doc_type_name: str) -> str:
    """Render the user prompt listing one line per control of the batch."""
    items = [
        f'- id="{c["control_id"]}" doc="{doc_type_name}" '
        f't="{c["title"]}" o="{c["objective"][:100]}"'
        for c in batch
    ]
    return (
        f"Dokumenttyp: {doc_type_name}\n"
        f"Erzeuge Prüfkriterien:\n" + "\n".join(items)
    )


def _parse_llm_results(content: str) -> list[dict]:
    """Extract the JSON array from the LLM answer.

    The outermost ``[...]`` span is parsed so that surrounding prose or code
    fences are ignored. Non-dict array items are dropped.

    Raises:
        json.JSONDecodeError: If the bracketed span is not valid JSON.
    """
    start = content.find("[")
    end = content.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    parsed = json.loads(content[start:end])
    if not isinstance(parsed, list):
        return []
    return [item for item in parsed if isinstance(item, dict)]


def _request_checks(client: httpx.Client, prompt: str) -> list[dict]:
    """Send one prompt to the Anthropic Messages API and parse the answer.

    Raises:
        httpx.HTTPError: On transport errors or non-2xx responses.
        ValueError: If the response body or the embedded array is not JSON.
    """
    resp = client.post(ANTHROPIC_URL, json={
        "model": ANTHROPIC_MODEL,
        "max_tokens": 2000,
        "temperature": 0.1,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": prompt}],
    })
    resp.raise_for_status()
    body = resp.json()
    if body.get("stop_reason") == "max_tokens":
        # A truncated answer usually means the JSON array is incomplete.
        logger.warning("LLM answer was cut off at max_tokens")
    content = "".join(
        block.get("text", "")
        for block in body.get("content", [])
        if isinstance(block, dict)
    )
    return _parse_llm_results(content)


def _as_str_list(value) -> list[str]:
    """Coerce an LLM-provided criteria value to a list of non-empty strings."""
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    return [str(item) for item in value if item]


def _normalize_severity(value) -> str:
    """Return a valid upper-case severity, defaulting to MEDIUM."""
    severity = str(value or "").strip().upper()
    return severity if severity in VALID_SEVERITIES else DEFAULT_SEVERITY


def enrich_with_llm(controls: list[dict], doc_type_name: str) -> list[dict]:
    """Add check_question, pass/fail_criteria via Haiku.

    Controls are sent in batches of ``LLM_BATCH_SIZE``. A failing batch is
    logged and skipped so that one bad response does not abort the run.

    Args:
        controls: Control dicts as returned by ``get_doc_controls``. Controls
            that receive an answer are updated in place.
        doc_type_name: Human-readable document type name used in the prompt.

    Returns:
        The controls for which the LLM returned a non-empty
        ``check_question``, each extended with ``check_question``,
        ``pass_criteria``, ``fail_criteria`` and ``severity``.
    """
    enriched = []
    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }

    # A single client reuses the HTTPS connection across batches.
    with httpx.Client(timeout=45.0, headers=headers) as client:
        for offset in range(0, len(controls), LLM_BATCH_SIZE):
            if offset:
                # Throttle between requests; no pause needed before the first.
                time.sleep(LLM_BATCH_PAUSE)

            batch = controls[offset:offset + LLM_BATCH_SIZE]
            try:
                results = _request_checks(
                    client, _build_prompt(batch, doc_type_name)
                )
            except Exception:  # noqa: BLE001 - deliberate best-effort per batch
                logger.exception("Batch %d failed", offset)
                continue

            by_id = {str(r.get("id", "")).strip(): r for r in results}
            for ctrl in batch:
                result = by_id.get(str(ctrl["control_id"]), {})
                question = result.get("check_question")
                if not isinstance(question, str) or not question.strip():
                    logger.warning(
                        "No check question returned for %s", ctrl["control_id"]
                    )
                    continue
                ctrl["check_question"] = question
                ctrl["pass_criteria"] = _as_str_list(result.get("pass_criteria"))
                ctrl["fail_criteria"] = _as_str_list(result.get("fail_criteria"))
                ctrl["severity"] = _normalize_severity(result.get("severity"))
                enriched.append(ctrl)

    return enriched


def _write_checks(engine, checks: list[dict], doc_types: list[str]) -> None:
    """Replace the stored rows of ``doc_types`` with ``checks`` atomically.

    Table creation, deletion and insertion run in one transaction, so a
    failure leaves the previous content untouched. Only rows of the given
    document types are deleted.
    """
    rows = [
        {
            "cid": dc["control_id"],
            "uuid": dc["uuid"],
            "doc_type": dc["doc_type"],
            "title": dc["title"],
            # Truncate to the column widths declared in the table definition.
            "regulation": (dc.get("regulation") or "")[:200] or None,
            "article": (dc.get("article") or "")[:100] or None,
            "question": dc.get("check_question", ""),
            "pass": json.dumps(dc.get("pass_criteria", [])),
            "fail": json.dumps(dc.get("fail_criteria", [])),
            "severity": dc.get("severity", DEFAULT_SEVERITY),
        }
        for dc in checks
    ]

    with engine.begin() as conn:
        conn.execute(text("SET search_path TO compliance, public"))
        conn.execute(_CREATE_TABLE_SQL)
        conn.execute(_CREATE_INDEX_SQL)
        for doc_type in doc_types:
            conn.execute(_DELETE_DOC_TYPE_SQL, {"doc_type": doc_type})
        if rows:
            # A list of parameter dicts triggers a single executemany call.
            conn.execute(_INSERT_SQL, rows)


def main():
    """Derive check criteria for the selected document types and store them.

    With ``--dry-run`` the controls are fetched and enriched, a preview is
    logged, and neither the database nor the JSON export is written.
    Otherwise the rows of every successfully processed document type are
    replaced and all generated checks are exported to
    ``/tmp/doc_check_controls.json``.

    Raises:
        SystemExit: If ``ANTHROPIC_API_KEY`` is missing, or if enrichment
            failed for every selected document type.
    """
    parser = argparse.ArgumentParser(
        description="Derive doc_check_controls from existing Master Controls."
    )
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--doc-type", choices=list(DOC_TYPES.keys()),
                        help="Only one doc type")
    args = parser.parse_args()

    if not ANTHROPIC_API_KEY:
        # Without a key every batch fails; abort before touching any data.
        raise SystemExit("ANTHROPIC_API_KEY is not set")

    engine = create_engine(
        DB_URL, connect_args={"options": "-c search_path=compliance,public"}
    )

    doc_types = [args.doc_type] if args.doc_type else list(DOC_TYPES.keys())
    all_checks = []
    # Document types whose stored rows may safely be replaced.
    replaceable = []

    for dt in doc_types:
        config = DOC_TYPES[dt]
        logger.info("\n=== %s (%s) ===", dt, config["name"])

        controls = get_doc_controls(engine, dt, config)
        logger.info("Found %d relevant controls", len(controls))

        if not controls:
            # Nothing is relevant any more, so stale rows are cleared.
            replaceable.append(dt)
            continue

        enriched = enrich_with_llm(controls, config["name"])
        logger.info("Enriched %d with check criteria", len(enriched))

        if not enriched:
            # Enrichment failed completely: keep what is already stored.
            logger.warning(
                "No checks generated for %s; keeping existing rows", dt
            )
            continue

        replaceable.append(dt)
        all_checks.extend(enriched)

    logger.info("\nTotal: %d doc_check_controls across %d doc types",
                len(all_checks), len(doc_types))

    if args.dry_run:
        for dc in all_checks[:5]:
            logger.info("  [%s] %s: %s", dc["doc_type"], dc["control_id"],
                        dc.get("check_question", "?")[:80])
        logger.info("DRY RUN — not writing")
        return

    if not replaceable:
        logger.error("Enrichment failed for all doc types; nothing written")
        raise SystemExit(1)

    # Write to DB
    _write_checks(engine, all_checks, replaceable)
    logger.info("Wrote %d doc_check_controls to DB", len(all_checks))

    # Save as JSON too; UTF-8 is explicit because ensure_ascii=False emits
    # umlauts verbatim and the locale encoding may not be able to encode them.
    JSON_EXPORT_PATH.write_text(
        json.dumps(all_checks, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("Saved to %s", JSON_EXPORT_PATH)


if __name__ == "__main__":
    main()