Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 31s
CI/CD / test-python-backend-compliance (push) Successful in 1m35s
CI/CD / test-python-document-crawler (push) Successful in 20s
CI/CD / test-python-dsms-gateway (push) Successful in 17s
CI/CD / validate-canonical-controls (push) Successful in 10s
CI/CD / Deploy (push) Has been skipped
- Control Library: parent control display, ObligationTypeBadge, GenerationStrategyBadge variants, evidence string fallback - API: expose parent_control_uuid/id/title in canonical controls - Fix: DSFA SQLAlchemy 2.0 Row._mapping compatibility - Migration 074: control_parent_links + control_dedup_reviews tables - QA scripts: benchmark, gap analysis, OSCAL import, OWASP cleanup, phase5 normalize, phase74 gap fill, sync_db, run_job - Docs: dedup engine, RAG benchmark, lessons learned, pipeline docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
309 lines
10 KiB
Python
309 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Preview Pass 0b: Turn obligation candidates into atomic controls.
|
|
|
|
Picks a few obligations from Pass 0a results, calls LLM to compose
|
|
atomic controls, and writes them to canonical_controls with parent_control_uuid.
|
|
|
|
Usage:
|
|
python3 test_pass0b_preview.py --input /tmp/pass0a_results_60controls.json --limit 3
|
|
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import urllib.parse
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import requests
|
|
|
|
# Register JSON adapter: dict values passed as query parameters are wrapped
# in psycopg2.extras.Json.
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)

# API key for the Anthropic API, read from the environment; empty string when
# the variable is unset.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Model name sent with each request; override via DECOMPOSITION_LLM_MODEL.
ANTHROPIC_MODEL = os.environ.get("DECOMPOSITION_LLM_MODEL", "claude-sonnet-4-6")

# System prompt (German). It tells the model to act as a security-compliance
# expert, to turn a single normative obligation into a practical, atomic,
# implementable security control (not a paraphrase of the law), and to answer
# with JSON only. The trailing backslashes join source lines without
# inserting a newline into the prompt.
SYSTEM_PROMPT = """\
Du bist ein Security-Compliance-Experte. Du erstellst aus einer einzelnen \
normativen Pflicht ein praxisorientiertes, atomares Security Control.

Das Control muss UMSETZBAR sein — keine Gesetzesparaphrase.
Antworte NUR als JSON. Keine Erklärungen."""
|
|
|
|
|
|
def build_pass0b_prompt(obl_text, action, obj, parent_title, category, source_ref):
    """Assemble the German user prompt for composing one atomic control.

    The prompt has two parts: the obligation itself (text, action, object)
    together with context from the originating control (title, category,
    source reference), followed by a fixed description of the JSON object
    the model is asked to return.

    All arguments are interpolated verbatim. Returns the prompt as a single
    string without a trailing newline.
    """
    # Variable part: the obligation and the parent-control context.
    header = (
        "Erstelle aus der folgenden Pflicht ein atomares Control.\n"
        "\n"
        f"PFLICHT: {obl_text}\n"
        f"HANDLUNG: {action}\n"
        f"GEGENSTAND: {obj}\n"
        "\n"
        "KONTEXT (Ursprungs-Control):\n"
        f"Titel: {parent_title}\n"
        f"Kategorie: {category}\n"
        f"Quellreferenz: {source_ref}\n"
        "\n"
    )
    # Fixed part: the expected response schema, one entry per output line.
    schema_lines = [
        "Antworte als JSON:",
        "{",
        '"title": "Kurzer Titel (max 80 Zeichen, deutsch)",',
        '"objective": "Was muss erreicht werden? (1-2 Sätze)",',
        '"requirements": ["Konkrete Anforderung 1", "Anforderung 2"],',
        '"test_procedure": ["Prüfschritt 1", "Prüfschritt 2"],',
        '"evidence": ["Nachweis 1", "Nachweis 2"],',
        '"severity": "critical|high|medium|low",',
        '"category": "security|privacy|governance|operations|finance|reporting"',
        "}",
    ]
    return header + "\n".join(schema_lines)
|
|
|
|
|
|
def call_anthropic(prompt):
    """Send *prompt* to the Anthropic Messages API and return the reply.

    Returns a 3-tuple ``(text, usage, error)``:

    * on success: the text of the first content block that carries text
      (``""`` if there is none), the ``usage`` mapping from the response
      (``{}`` if absent), and ``None``;
    * on failure: ``(None, {}, message)`` where *message* describes the
      problem (transport error, non-200 status, or unparseable body).

    Failures are always reported through the third element; the function
    does not raise for network or response-format problems, so callers can
    skip a single obligation and carry on.
    """
    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": ANTHROPIC_MODEL,
        "max_tokens": 4096,
        # The system prompt is identical for every call, so it is marked
        # with cache_control.
        "system": [{"type": "text", "text": SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"}}],
        "messages": [{"role": "user", "content": prompt}],
    }

    try:
        resp = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers=headers,
            json=payload,
            timeout=120,
        )
    except requests.RequestException as exc:
        # Timeouts, DNS failures, connection resets: report, don't crash.
        return None, {}, f"Request failed: {exc}"

    if resp.status_code != 200:
        return None, {}, f"HTTP {resp.status_code}: {resp.text[:200]}"

    try:
        data = resp.json()
    except ValueError as exc:
        return None, {}, f"Invalid JSON response: {exc}"
    if not isinstance(data, dict):
        return None, {}, f"Unexpected response body: {resp.text[:200]}"

    # "content" may be missing, null or an empty list; never index blindly.
    blocks = data.get("content") or []
    text = next(
        (b["text"] for b in blocks if isinstance(b, dict) and "text" in b),
        "",
    )
    return text, data.get("usage") or {}, None
|
|
|
|
|
|
def parse_json_object(text):
    """Extract a JSON object (dict) from an LLM reply.

    The whole text is tried first. If that is not valid JSON, the first
    ``{`` is located and exactly one JSON value is decoded from there, so
    surrounding prose or Markdown code fences (before *or* after the
    object) do not matter.

    Returns the decoded ``dict``, or ``None`` when *text* is not a string,
    contains no decodable object, or decodes to something other than an
    object (callers rely on ``.get`` being available on the result).
    """
    if not isinstance(text, str):
        return None

    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        pass  # Not pure JSON; look for an embedded object below.
    else:
        return whole if isinstance(whole, dict) else None

    start = text.find("{")
    if start == -1:
        return None
    try:
        # raw_decode stops at the end of the first complete value, so text
        # after the object (even text containing braces) is ignored.
        # Only the first "{" is tried: retrying at later braces could hand
        # back a nested fragment of a truncated reply.
        embedded, _end = json.JSONDecoder().raw_decode(text, start)
    except json.JSONDecodeError:
        return None
    return embedded if isinstance(embedded, dict) else None
|
|
|
|
|
|
def generate_control_id(domain, cur):
    """Return the next free control_id for *domain*, e.g. ``"DSGV-002"``.

    The prefix is *domain* upper-cased and cut to four characters. The
    highest existing numeric suffix among ``<prefix>-<digits>`` IDs in
    ``compliance.canonical_controls`` is looked up through *cur* (a DB-API
    cursor using ``%s`` placeholders) and incremented. When no such ID
    exists, numbering starts at 1.

    The number is always zero-padded to at least three digits so that the
    first ID and all later ones share one format; longer numbers are kept
    as they are.

    NOTE: the value is only computed from a SELECT; nothing here reserves
    it, so concurrent writers could obtain the same ID.
    """
    prefix = domain.upper()[:4]
    cur.execute("""
        SELECT MAX(CAST(SPLIT_PART(control_id, '-', 2) AS INTEGER))
        FROM compliance.canonical_controls
        WHERE control_id LIKE %s
          AND SPLIT_PART(control_id, '-', 2) ~ '^[0-9]+$'
    """, (f"{prefix}-%",))
    row = cur.fetchone()
    highest = row[0] if row else None
    next_number = 1 if highest is None else highest + 1
    return f"{prefix}-{next_number:03d}"
|
|
|
|
|
|
def _pick_sample(obligations, limit):
    """Return up to *limit* obligations, preferring one per obligation type."""
    picked = []
    seen_types = set()
    # First pass: one obligation per distinct obligation_type.
    for o in obligations:
        if len(picked) >= limit:
            break
        otype = o["obligation_type"]
        if otype not in seen_types:
            picked.append(o)
            seen_types.add(otype)
    # Second pass: fill the remaining slots in input order.
    for o in obligations:
        if len(picked) >= limit:
            break
        if o not in picked:
            picked.append(o)
    return picked


def _connect(db_url):
    """Open a psycopg2 connection from a postgres URL with the compliance search_path."""
    p = urllib.parse.urlparse(db_url)
    # urlparse leaves percent-encoding in place; credentials containing
    # special characters must be decoded before they reach the driver.
    user = urllib.parse.unquote(p.username) if p.username else p.username
    password = urllib.parse.unquote(p.password) if p.password else p.password
    return psycopg2.connect(
        host=p.hostname, port=p.port or 5432,
        user=user, password=password,
        dbname=p.path.lstrip("/"),
        options="-c search_path=compliance,public",
    )


def _load_parent_controls(cur, ctrl_ids):
    """Fetch the parent controls for *ctrl_ids* and return them keyed by control_id."""
    cur.execute("""
        SELECT control_id, id, title, category, source_citation
        FROM compliance.canonical_controls
        WHERE control_id = ANY(%s)
    """, (ctrl_ids,))
    ctrl_map = {}
    for control_id, ctrl_uuid, title, category, raw_citation in cur.fetchall():
        # source_citation may arrive already decoded (dict) or as JSON text.
        if isinstance(raw_citation, dict):
            sc = raw_citation
        elif raw_citation:
            sc = json.loads(raw_citation)
        else:
            sc = {}
        if not isinstance(sc, dict):
            sc = {}
        # Derive domain prefix from control_id (e.g. "DSGV" from "DSGV-001")
        prefix = control_id.split("-")[0] if "-" in control_id else "COMP"
        ctrl_map[control_id] = {
            "uuid": str(ctrl_uuid), "title": title, "category": category or "",
            "source_ref": f"{sc.get('source', '')} {sc.get('article', '')}",
            # Keep source and article as separate fields: re-splitting the
            # display string on a space would break multi-word sources.
            "source_citation": {
                "source": sc.get("source", ""),
                "article": sc.get("article", ""),
            },
            "domain": prefix,
        }
    return ctrl_map


def _print_atomic_control(new_control_id, result, obligation_type):
    """Print the generated atomic control in human-readable form."""
    print(f"\n === ATOMIC CONTROL: {new_control_id} ===")
    print(f" Titel: {result.get('title', 'N/A')}")
    print(f" Ziel: {result.get('objective', 'N/A')}")
    print(f" Typ: {obligation_type}")
    sections = (
        (" Anforderungen:", result.get("requirements", [])),
        (" Pruefverfahren:", result.get("test_procedure", [])),
        (" Nachweise:", result.get("evidence", [])),
    )
    for heading, items in sections:
        if items:
            print(heading)
            for item in items:
                print(f" - {item}")
    print(f" Severity: {result.get('severity', 'medium')}")
    print(f" Category: {result.get('category', 'governance')}")


def _insert_atomic_control(cur, new_uuid, new_control_id, result, obl, ctrl):
    """Insert one atomic control as a draft row linked to its parent control."""
    cur.execute("""
        INSERT INTO compliance.canonical_controls (
            id, control_id, title, objective, requirements, test_procedure,
            evidence, severity, category, release_state,
            source_citation, generation_metadata, generation_strategy,
            pipeline_version, parent_control_uuid, framework_id
        ) VALUES (
            %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s,
            %s, %s, %s,
            %s, %s,
            (SELECT id FROM compliance.canonical_control_frameworks LIMIT 1)
        )
    """, (
        new_uuid, new_control_id,
        result.get("title", ""),
        result.get("objective", ""),
        json.dumps(result.get("requirements", []), ensure_ascii=False),
        json.dumps(result.get("test_procedure", []), ensure_ascii=False),
        json.dumps(result.get("evidence", []), ensure_ascii=False),
        result.get("severity", "medium"),
        result.get("category", "governance"),
        "draft",
        psycopg2.extras.Json(ctrl.get("source_citation", {})),
        psycopg2.extras.Json({
            "obligation_type": obl["obligation_type"],
            "obligation_text": obl["obligation_text"],
            "pass0b_model": ANTHROPIC_MODEL,
            "decomposition_method": "pass0b_preview",
        }),
        "pass0b_atomic",
        6,  # pipeline_version
        ctrl.get("uuid"),
    ))


def main():
    """Preview Pass 0b: compose atomic controls from a few Pass 0a obligations.

    Steps:
      1. Parse CLI arguments (--input, --limit, --control, --dry-run).
      2. Load the Pass 0a results, keep entries with a truthy "passed" and,
         if --control is given, only those of that control.
      3. Pick up to --limit obligations, preferring distinct obligation types.
      4. Look up the parent controls in the database (DATABASE_URL).
      5. Per obligation: call the LLM, print the resulting control and insert
         it as a draft row with parent_control_uuid set. With --dry-run only
         the selection is printed.

    Exits with status 1 when ANTHROPIC_API_KEY (unless --dry-run) or
    DATABASE_URL is missing. A failing LLM call, unparseable reply or
    database error skips that obligation; the connection is always closed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="/tmp/pass0a_results_60controls.json")
    parser.add_argument("--limit", type=int, default=3, help="Number of obligations to process")
    parser.add_argument("--control", type=str, help="Pick obligations from this control_id")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not ANTHROPIC_API_KEY and not args.dry_run:
        print("ERROR: Set ANTHROPIC_API_KEY")
        sys.exit(1)

    # Load 0a results; explicit encoding because the texts contain umlauts.
    with open(args.input, encoding="utf-8") as f:
        obligations = json.load(f)

    # Filter: only obligations that passed Pass 0a
    obligations = [o for o in obligations if o.get("passed", False)]
    if args.control:
        obligations = [o for o in obligations if o["control_id"] == args.control]

    # Pick diverse sample
    picked = _pick_sample(obligations, args.limit)
    if not picked:
        print("No obligations found.")
        return

    # Connect to DB
    db_url = os.environ.get("DATABASE_URL")
    if not db_url:
        print("ERROR: Set DATABASE_URL")
        sys.exit(1)
    conn = _connect(db_url)

    try:
        cur = conn.cursor()

        # Get parent control info
        ctrl_ids = list({o["control_id"] for o in picked})
        ctrl_map = _load_parent_controls(cur, ctrl_ids)

        print("=" * 70)
        print(f"Pass 0b Preview — {len(picked)} Obligations → Atomic Controls")
        print("=" * 70)

        created = []
        for i, obl in enumerate(picked, 1):
            ctrl = ctrl_map.get(obl["control_id"], {})
            print(f"\n{'─'*70}")
            print(f"[{i}/{len(picked)}] {obl['control_id']}: [{obl['obligation_type'].upper()}]")
            print(f" Obligation: {obl['obligation_text'][:120]}")
            print(f" Parent: {ctrl.get('title', 'N/A')}")

            if args.dry_run:
                print(" [DRY RUN]")
                continue

            prompt = build_pass0b_prompt(
                obl["obligation_text"], obl["action"], obl["object"],
                ctrl.get("title", ""), ctrl.get("category", ""),
                ctrl.get("source_ref", ""),
            )

            t0 = time.time()
            resp_text, usage, error = call_anthropic(prompt)
            elapsed = time.time() - t0

            if error:
                print(f" ERROR: {error}")
                continue

            result = parse_json_object(resp_text)
            # A non-dict result (e.g. a JSON list) is as unusable as none.
            if not result or not isinstance(result, dict):
                print(f" PARSE ERROR: {resp_text[:200]}")
                continue

            in_tok = usage.get("input_tokens", 0)
            out_tok = usage.get("output_tokens", 0)
            print(f" LLM: {elapsed:.1f}s | {in_tok} in / {out_tok} out")

            # Generate control_id
            domain = ctrl.get("domain", "COMP")
            try:
                new_control_id = generate_control_id(domain, cur)
            except psycopg2.Error as exc:
                conn.rollback()
                print(f" DB ERROR: {exc}")
                continue

            # Show result
            _print_atomic_control(new_control_id, result, obl["obligation_type"])

            # Write to DB
            new_uuid = str(uuid.uuid4())
            try:
                _insert_atomic_control(cur, new_uuid, new_control_id, result, obl, ctrl)
                conn.commit()
            except psycopg2.Error as exc:
                # Leave the connection usable for the next obligation.
                conn.rollback()
                print(f" DB ERROR: {exc}")
                continue

            created.append({
                "control_id": new_control_id,
                "title": result.get("title", ""),
                "obligation_type": obl["obligation_type"],
                "parent_control_id": obl["control_id"],
            })
            print(f" ✓ Geschrieben: {new_control_id} (parent: {obl['control_id']})")

            # Short pause between LLM calls.
            time.sleep(0.5)

        if created:
            print(f"\n{'='*70}")
            print(f"ERGEBNIS: {len(created)} atomare Controls erstellt")
            print(f"{'='*70}")
            for c in created:
                print(f" {c['control_id']}: {c['title']} [{c['obligation_type']}] (von {c['parent_control_id']})")
    finally:
        conn.close()
|
|
|
|
|
|
# Run the preview only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|