feat(pipeline): MC Quality Overhaul — 74.5% → 92.8% accuracy, 5.3K → 13.6K MCs

Phase 0: Quality Audit script (Claude Sonnet, 1750 samples)
Phase 1: Object ontology expanded 31 → 74 tokens with descriptions + boundaries
Phase 2: 174K controls re-classified via Haiku (10 batches, $50)
  - Generic tokens removed (documentation, procedure, process)
  - L2 sub-topics added (108K + 64K controls)
  - Bad subtopics fixed (stakeholder_*, escalation fragments)
Phase 3: Re-clustering K=18704 (37K objects → 16.7K groups)
Phase 4: Direct MC generation from canonical tokens (gpre2_direct_mc.py)
Phase 5: Regulation-source split (gpre3, dry-run tested)

New features:
- Tenant-isolated document upload API (rag-service)
- BAuA crawler (Playwright, 131 PDFs downloaded)
- OSHA Technical Manual crawler (23 chapters)
- CE obligation extractor (6141 obligations from Qdrant)

RAG ingestion:
- 126 BAuA PDFs (TRBS/TRGS/ASR): 27,664 chunks
- OSHA Technical Manual: 7,241 chunks
- OSHA 1910 Subpart O (full): 745 chunks
- EuGH C-588/21 P: 216 chunks
- EU 2018/1725: 842 chunks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-10 15:08:15 +02:00
parent 81db904b3e
commit 8510af46eb
19 changed files with 3173 additions and 6 deletions
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
Extract CE-relevant obligations from TRBS/TRGS/ASR/OSHA chunks in Qdrant.
Searches for MUSS/SOLL patterns in chunk texts and classifies them.
Output: JSON file with structured obligations for the CE session.
Usage:
python3 /app/scripts/extract_ce_obligations.py
python3 /app/scripts/extract_ce_obligations.py --output /tmp/ce_obligations.json
"""
import argparse
import json
import logging
import os
import re
from pathlib import Path
import httpx
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("ce-obligations")
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
COLLECTION = "bp_compliance_ce"
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = "qwen3.5:35b-a3b"
# Obligation patterns (DE + EN)
OBLIGATION_PATTERNS = re.compile(
r"(muss|müssen|hat\s+[\w\s]*zu\s|ist\s+[\w\s]*sicherzustellen|"
r"ist\s+verpflichtet|sind\s+verpflichtet|darf\s+nicht|"
r"shall|must|required\s+to|is\s+required|shall\s+not)",
re.IGNORECASE,
)
# CE relevance keywords
CE_KEYWORDS = re.compile(
r"(maschine|schutzeinrichtung|gefährdung|quetsch|scher|stoß|"
r"schneid|fang|einzug|absturz|druck|explosion|brand|"
r"elektrisch|spannung|erdung|schutzleiter|not-halt|"
r"betriebsanleitung|kennzeichnung|prüfung|prüfpflicht|"
r"instandhaltung|wartung|sicherheitsabstand|"
r"schutzmaßnahme|persönliche schutzausrüstung|psa|"
r"machine|guard|hazard|crush|shear|cut|entangle|"
r"lockout|tagout|electrical|grounding|emergency stop|"
r"safety distance|protective device|ppe|inspection)",
re.IGNORECASE,
)
HAZARD_CATEGORIES = {
"quetsch|crush|squeeze": "mechanical_crushing",
"schneid|cut": "mechanical_cutting",
"fang|einzug|entangle|draw": "mechanical_entanglement",
"absturz|fall": "fall_hazard",
"explosion|ex-bereich|atex": "explosion_hazard",
"brand|fire|feuer": "fire_hazard",
"elektrisch|electrical|spannung|voltage": "electrical_hazard",
"lärm|noise|schall": "noise_hazard",
"gefahrstoff|hazardous substance|chemical": "chemical_hazard",
"ergonomie|ergonomic|heben|lift": "ergonomic_hazard",
"temperatur|heat|hitze|kälte|cold": "thermal_hazard",
"strahlung|radiation|laser": "radiation_hazard",
"not-halt|emergency stop|e-stop": "emergency_stop",
"lockout|tagout|loto": "lockout_tagout",
"kennzeichnung|label|marking|sign": "safety_marking",
"prüfung|inspection|test": "inspection_requirement",
"instandhaltung|maintenance|wartung": "maintenance",
"schutzeinrichtung|guard|protective device": "protective_device",
"betriebsanleitung|instruction|manual": "operating_instructions",
"druck|pressure|behälter|vessel|kessel|boiler": "pressure_hazard",
}
# Source-based overrides: TRGS docs about chemicals/storage
# should never be classified as mechanical hazards
_CHEMICAL_SOURCES = re.compile(
r"trgs\s*(5[0-9]{2}|7[0-9]{2}|9[0-9]{2}|4[0-9]{2}|6[0-9]{2})",
re.IGNORECASE,
)
def _classify_hazard(text: str, source: str) -> str:
"""Classify hazard with source-aware overrides."""
# TRGS sources → chemical/pressure/explosion, never mechanical
if _CHEMICAL_SOURCES.search(source):
if re.search(r"explosion|ex-bereich|atex|zündfähig", text, re.IGNORECASE):
return "explosion_hazard"
if re.search(r"druck|pressure|behälter|vessel", text, re.IGNORECASE):
return "pressure_hazard"
if re.search(r"brand|fire|feuer", text, re.IGNORECASE):
return "fire_hazard"
return "chemical_hazard"
# Standard pattern matching (order matters — specific first)
for pattern, category in HAZARD_CATEGORIES.items():
if re.search(pattern, text, re.IGNORECASE):
return category
return "general"
def scroll_chunks(source_filter: str = None) -> list[dict]:
"""Scroll through Qdrant to get all relevant chunks."""
chunks = []
offset = None
batch = 100
while True:
scroll_body = {
"limit": batch,
"with_payload": True,
"with_vector": False,
}
if offset is not None:
scroll_body["offset"] = offset
resp = httpx.post(
f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
json=scroll_body,
timeout=30.0,
)
data = resp.json()
points = data.get("result", {}).get("points", [])
next_offset = data.get("result", {}).get("next_page_offset")
for pt in points:
payload = pt.get("payload", {})
source = payload.get("source", payload.get("filename", ""))
text = payload.get("chunk_text", "")
# Filter for TRBS/TRGS/ASR/OSHA
source_lower = source.lower()
is_relevant = any(k in source_lower for k in
["trbs", "trgs", "asr", "osha"])
if not is_relevant:
continue
# Check for obligation patterns
if not OBLIGATION_PATTERNS.search(text):
continue
# Check CE relevance
if not CE_KEYWORDS.search(text):
continue
# Classify hazard category (source-aware)
hazard = _classify_hazard(text, source)
# Determine obligation type
if re.search(r"muss|müssen|shall|must|required", text, re.IGNORECASE):
obl_type = "MUSS"
elif re.search(r"soll|sollte|should", text, re.IGNORECASE):
obl_type = "SOLL"
else:
obl_type = "MUSS"
chunks.append({
"source": source,
"section": payload.get("section", ""),
"paragraph": payload.get("paragraph", ""),
"obligation_text": text.strip()[:500],
"hazard_category": hazard,
"obligation_type": obl_type,
"ce_relevance": "high" if hazard != "general" else "medium",
"filename": payload.get("filename", ""),
})
if next_offset is None or not points:
break
offset = next_offset
if len(chunks) % 500 == 0:
logger.info(" Scanned... %d obligations found so far", len(chunks))
return chunks
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--output", default="/tmp/ce_obligations.json")
args = parser.parse_args()
logger.info("Scanning %s for CE obligations...", COLLECTION)
obligations = scroll_chunks()
logger.info("Found %d CE-relevant obligations", len(obligations))
# Stats
by_source = {}
by_hazard = {}
for o in obligations:
src = o["source"][:30]
by_source[src] = by_source.get(src, 0) + 1
by_hazard[o["hazard_category"]] = by_hazard.get(o["hazard_category"], 0) + 1
logger.info("\nBy source:")
for src, cnt in sorted(by_source.items(), key=lambda x: -x[1])[:20]:
logger.info(" %4d %s", cnt, src)
logger.info("\nBy hazard category:")
for cat, cnt in sorted(by_hazard.items(), key=lambda x: -x[1]):
logger.info(" %4d %s", cnt, cat)
# Save
Path(args.output).write_text(
json.dumps(obligations, indent=2, ensure_ascii=False)
)
logger.info("\nSaved to %s", args.output)
if __name__ == "__main__":
main()
@@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""
Add L2 sub-topics to broad tokens. Instead of just "incident",
produces "incident:response", "incident:detection", etc.
Only processes tokens with >500 controls AND <90% audit accuracy.
Usage:
python3 /app/scripts/gpre0_add_subtopics.py --dry-run
python3 /app/scripts/gpre0_add_subtopics.py
"""
import argparse
import json
import logging
import os
import time
from collections import defaultdict
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre0-subtopics")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
CHECKPOINT_DIR = Path("/tmp/gpre0_subtopic_checkpoints")
# Tokens that are too broad — need L2 sub-topics
BROAD_TOKENS = {
# Round 1 (already done)
"risk_management", "policy", "audit_logging", "incident",
"access_control", "compliance_audit", "asset_management",
"key_management", "third_party_management", "monitoring",
"financial_reporting", "data_classification", "change_management",
"alerting", "multi_factor_auth", "api_security",
"certificate_management", "human_resources_security",
"training", "data_processing_agreement", "data_processing_register",
"consumer_protection", "input_validation", "vulnerability",
"dpia", "data_breach_notification", "backup",
"supply_chain_due_diligence", "awareness",
"privacy_by_design", "credentials", "logging_configuration",
# Round 2 (remaining large tokens)
"supervisory_authority", "certification", "secure_development",
"product_safety", "personal_data", "data_subject_rights", "consent",
"ai_system", "encryption", "data_retention", "disaster_recovery",
"data_transfer", "aml", "transport_encryption", "network_security",
"physical_security", "medical_device", "patch_management",
"cookie_consent", "video_surveillance", "network_segmentation",
"telecommunications", "privileged_access", "session_management",
"password_policy", "governance", "whistleblowing", "payment_services",
"health_data", "sensitive_data", "ecommerce", "sustainability_reporting",
"critical_infrastructure", "regulatory",
}
SYSTEM_PROMPT = """Du bist ein Compliance-Spezialist. Jeder Control hat bereits ein Hauptthema (L1 Token).
Deine Aufgabe: Bestimme ein SPEZIFISCHES Sub-Thema (L2) innerhalb des Hauptthemas.
Das L2 Sub-Thema soll den KONKRETEN Aspekt beschreiben. Verwende kurze, klare englische Bezeichnungen.
Beispiele:
- L1=incident, Titel="Incident Response Plan erstellen" → L2="response_plan"
- L1=incident, Titel="Sicherheitsvorfälle erkennen" → L2="detection"
- L1=incident, Titel="Recovery nach Vorfall dokumentieren" → L2="recovery"
- L1=incident, Titel="Forensische Analyse durchführen" → L2="forensics"
- L1=risk_management, Titel="Risikobewertung durchführen" → L2="assessment"
- L1=risk_management, Titel="Risikominderungsmaßnahmen umsetzen" → L2="treatment"
- L1=risk_management, Titel="Restrisiko akzeptieren" → L2="acceptance"
- L1=access_control, Titel="Rollenbasierte Zugriffskontrolle" → L2="rbac"
- L1=access_control, Titel="Zugriffsrechte regelmäßig prüfen" → L2="access_review"
- L1=access_control, Titel="Identitätsmanagement implementieren" → L2="identity_management"
- L1=monitoring, Titel="Systemverfügbarkeit überwachen" → L2="availability"
- L1=monitoring, Titel="Sicherheitsereignisse überwachen" → L2="security_events"
- L1=policy, Titel="Datenschutzrichtlinie erstellen" → L2="data_protection"
- L1=policy, Titel="Acceptable Use Policy definieren" → L2="acceptable_use"
- L1=policy, Titel="Passwortrichtlinie festlegen" → L2="password"
- L1=financial_reporting, Titel="Jahresabschluss erstellen" → L2="annual_accounts"
- L1=financial_reporting, Titel="Steuererklärung einreichen" → L2="tax"
- L1=alerting, Titel="Datenpanne an Behörde melden" → L2="breach_notification"
- L1=alerting, Titel="Sicherheitswarnung eskalieren" → L2="escalation"
REGELN:
- L2 soll 1-3 Wörter sein, snake_case
- L2 soll SPEZIFISCH sein (nicht das L1 wiederholen)
- Verwende konsistente L2-Bezeichnungen für ähnliche Controls
Antworte NUR als JSON-Array: [{"id":"...","l2":"subtopic"}, ...]"""
def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]:
"""Send batch to Claude for L2 sub-topic assignment."""
items = []
for c in controls_batch:
items.append(
f'- id="{c["control_id"]}" '
f'L1="{c["current_object"]}" '
f't="{c["title"]}" '
f'o="{c["objective"][:80]}"'
)
prompt = "Bestimme L2 Sub-Topics:\n" + "\n".join(items)
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": ANTHROPIC_MODEL,
"max_tokens": 1500,
"temperature": 0.0,
"system": SYSTEM_PROMPT,
"messages": [{"role": "user", "content": prompt}],
}
try:
resp = httpx.post(
ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0
)
resp.raise_for_status()
data = resp.json()
content = data.get("content", [{}])[0].get("text", "")
usage = data.get("usage", {})
start = content.find("[")
end = content.rfind("]") + 1
if start >= 0 and end > start:
return json.loads(content[start:end]), usage
return [], usage
except httpx.TimeoutException:
logger.error("TIMEOUT — skipping")
return [], {}
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
logger.warning("Rate limited — waiting 60s")
time.sleep(60)
else:
logger.error("API error %d", e.response.status_code)
return [], {}
except Exception as e:
logger.error("Failed: %s", e)
return [], {}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--batch-size", type=int, default=20)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Build LIKE patterns for broad tokens
like_clauses = " OR ".join(
f"cc.generation_metadata->>'merge_group_hint' LIKE '%:{tok}:%'"
for tok in BROAD_TOKENS
)
with engine.connect() as c:
rows = c.execute(text(f"""
SELECT cc.id, cc.control_id, cc.title,
COALESCE(cc.objective, '') as objective,
cc.generation_metadata->>'merge_group_hint' as hint
FROM canonical_controls cc
WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL
AND cc.release_state NOT IN ('deprecated', 'rejected')
AND ({like_clauses})
""")).fetchall()
controls = []
for uuid, cid, title, objective, hint in rows:
parts = hint.split(":", 2) if hint else []
obj = parts[1] if len(parts) > 1 else ""
if obj in BROAD_TOKENS:
controls.append({
"uuid": str(uuid), "control_id": cid,
"title": title or "", "objective": objective or "",
"current_hint": hint, "current_object": obj,
})
logger.info("Found %d controls in broad tokens to add L2 sub-topics", len(controls))
# Process
total_tagged = 0
total_skipped = 0
total_input_tokens = 0
total_output_tokens = 0
corrections = []
l2_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for i in range(0, len(controls), args.batch_size):
batch = controls[i:i + args.batch_size]
results, usage = call_claude(batch)
total_input_tokens += usage.get("input_tokens", 0)
total_output_tokens += usage.get("output_tokens", 0)
if not results:
total_skipped += len(batch)
continue
result_map = {r.get("id", ""): r for r in results}
for ctrl in batch:
r = result_map.get(ctrl["control_id"], {})
l2 = r.get("l2", "")
if not l2:
total_skipped += 1
continue
total_tagged += 1
old_hint = ctrl["current_hint"]
parts = old_hint.split(":", 2)
action = parts[0] if parts else "implement"
l1 = parts[1] if len(parts) > 1 else "unknown"
phase = parts[2] if len(parts) > 2 else "implementation"
# New format: action:L1_L2:phase
new_obj = f"{l1}_{l2}"
new_hint = f"{action}:{new_obj}:{phase}"
corrections.append({
"uuid": ctrl["uuid"],
"old_hint": old_hint,
"new_hint": new_hint,
})
l2_stats[l1][l2] += 1
processed = min(i + args.batch_size, len(controls))
if processed % 5000 < args.batch_size or processed >= len(controls):
logger.info(
"Progress: %d/%d (tagged=%d skip=%d)",
processed, len(controls), total_tagged, total_skipped,
)
time.sleep(0.3)
# Report
cost_in = total_input_tokens / 1_000_000 * 0.80
cost_out = total_output_tokens / 1_000_000 * 4.00
logger.info("\n" + "=" * 60)
logger.info("SUBTOPIC REPORT")
logger.info("=" * 60)
logger.info("Total: %d | Tagged: %d | Skipped: %d", len(controls), total_tagged, total_skipped)
logger.info("Cost: $%.2f (Haiku)", cost_in + cost_out)
# Show L2 distribution per L1
for l1, subs in sorted(l2_stats.items()):
top_subs = sorted(subs.items(), key=lambda x: -x[1])[:10]
logger.info("\n%s (%d unique L2):", l1, len(subs))
for l2, cnt in top_subs:
logger.info(" %4d %s_%s", cnt, l1, l2)
# Save corrections
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
corr_file = CHECKPOINT_DIR / "corrections_subtopics.json"
corr_file.write_text(json.dumps(corrections))
logger.info("\nSaved %d corrections to %s", len(corrections), corr_file)
if args.dry_run:
logger.info("DRY RUN — not updating DB")
return
if corrections:
logger.info("Applying %d corrections...", len(corrections))
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for corr in corrections:
c.execute(text("""
UPDATE canonical_controls
SET generation_metadata = jsonb_set(
generation_metadata,
'{merge_group_hint}',
to_jsonb(CAST(:new_hint AS text))
)
WHERE id = CAST(:uuid AS uuid)
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
logger.info("Done. %d hints updated.", len(corrections))
if __name__ == "__main__":
main()
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""Apply saved corrections from JSON file to DB (crash recovery)."""
import argparse
import json
import logging
import os
from pathlib import Path
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("apply-corrections")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("file", help="Path to corrections JSON file")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
corrections = json.loads(Path(args.file).read_text())
logger.info("Loaded %d corrections from %s", len(corrections), args.file)
if args.dry_run:
for c in corrections[:10]:
logger.info(" %s: %s%s", c["uuid"][:8], c["old_hint"], c["new_hint"])
logger.info("DRY RUN — not applying")
return
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
applied = 0
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for corr in corrections:
c.execute(text("""
UPDATE canonical_controls
SET generation_metadata = jsonb_set(
generation_metadata,
'{merge_group_hint}',
to_jsonb(CAST(:new_hint AS text))
)
WHERE id = CAST(:uuid AS uuid)
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
applied += 1
logger.info("Applied %d corrections.", applied)
if __name__ == "__main__":
main()
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""Fix bad L2 subtopics: stakeholder_*, escalation fragments, *_approval*, *_documentation."""
import json
import logging
import os
import time
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("fix-subtopics")
DB_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
SYSTEM_PROMPT = """Du klassifizierst Controls mit einem L1_L2 Token. Das L2 soll den KONKRETEN fachlichen Aspekt beschreiben.
VERBOTENE L2-Wörter (zu generisch):
- stakeholder (zu vage — WER sind die Stakeholder? WAS wird getan?)
- documentation (ist eine Handlung, kein Thema)
- approval (ist eine Handlung)
- communication (zu vage)
Stattdessen SPEZIFISCH:
- "stakeholder_notification" bei Behördenmeldung → "authority_reporting"
- "stakeholder_consultation" bei DSFA → "impact_consultation"
- "stakeholder_engagement" bei Training → "participant_selection"
- "escalation_procedure""severity_classification" oder "response_plan"
- "access_documentation""access_policy" oder "permission_matrix"
- "approval_process""authorization_workflow" oder "sign_off"
L2 = 1-3 Wörter, snake_case, FACHLICH SPEZIFISCH.
Antworte NUR als JSON-Array: [{"id":"...","token":"L1_L2"}, ...]"""
def main():
engine = create_engine(DB_URL, connect_args={"options": "-c search_path=compliance,public"})
with engine.connect() as c:
rows = c.execute(text("""
SELECT cc.id, cc.control_id, cc.title,
COALESCE(cc.objective, '') as objective,
cc.generation_metadata->>'merge_group_hint' as hint
FROM canonical_controls cc
WHERE cc.release_state NOT IN ('deprecated', 'rejected')
AND cc.generation_metadata->>'merge_group_hint' IS NOT NULL
AND (
cc.generation_metadata->>'merge_group_hint' LIKE '%stakeholder%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%_escalation_%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%_approval_%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%response_time%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%machine_re%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%management_app%'
)
""")).fetchall()
controls = []
for uuid, cid, title, objective, hint in rows:
parts = hint.split(":", 2) if hint else []
controls.append({
"uuid": str(uuid), "control_id": cid,
"title": title or "", "objective": objective or "",
"current_hint": hint,
"current_object": parts[1] if len(parts) > 1 else "",
})
logger.info("Found %d controls with bad subtopics to fix", len(controls))
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
corrections = []
total_fixed = 0
batch_size = 20
for i in range(0, len(controls), batch_size):
batch = controls[i:i + batch_size]
items = [
f'- id="{c["control_id"]}" cur="{c["current_object"]}" t="{c["title"]}" o="{c["objective"][:80]}"'
for c in batch
]
try:
resp = httpx.post(ANTHROPIC_URL, headers=headers, json={
"model": "claude-haiku-4-5-20251001",
"max_tokens": 1500, "temperature": 0.0,
"system": SYSTEM_PROMPT,
"messages": [{"role": "user", "content": "Fix:\n" + "\n".join(items)}],
}, timeout=45.0)
resp.raise_for_status()
content = resp.json().get("content", [{}])[0].get("text", "")
start = content.find("[")
end = content.rfind("]") + 1
results = json.loads(content[start:end]) if start >= 0 else []
except Exception as e:
logger.error("Batch %d failed: %s", i, e)
continue
result_map = {r.get("id", ""): r for r in results}
for ctrl in batch:
r = result_map.get(ctrl["control_id"], {})
new_token = r.get("token", "")
if not new_token or new_token == ctrl["current_object"]:
continue
if "stakeholder" in new_token or "approval" in new_token:
continue # Still bad
parts = ctrl["current_hint"].split(":", 2)
action = parts[0] if parts else "implement"
phase = parts[2] if len(parts) > 2 else "implementation"
corrections.append({
"uuid": ctrl["uuid"],
"old_hint": ctrl["current_hint"],
"new_hint": f"{action}:{new_token}:{phase}",
})
total_fixed += 1
if (i + batch_size) % 200 < batch_size:
logger.info("Progress: %d/%d (fixed=%d)", min(i + batch_size, len(controls)), len(controls), total_fixed)
time.sleep(0.3)
logger.info("Fixed: %d of %d controls", total_fixed, len(controls))
# Save + apply
Path("/tmp/corrections_bad_subtopics.json").write_text(json.dumps(corrections))
if corrections:
logger.info("Applying %d corrections...", len(corrections))
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for corr in corrections:
c.execute(text("""
UPDATE canonical_controls
SET generation_metadata = jsonb_set(
generation_metadata,
'{merge_group_hint}',
to_jsonb(CAST(:new_hint AS text))
)
WHERE id = CAST(:uuid AS uuid)
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
logger.info("Done.")
if __name__ == "__main__":
main()
@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Fix generic tokens: Re-classify controls that were assigned to
action-based tokens (documentation, procedure, process, etc.)
instead of topic-based tokens.
Runs sequentially in 5 batches. NO retry on timeout.
Usage:
python3 /app/scripts/gpre0_fix_generic_tokens.py --dry-run
python3 /app/scripts/gpre0_fix_generic_tokens.py
"""
import argparse
import json
import logging
import os
import time
from collections import defaultdict
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre0-fix-generic")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
CHECKPOINT_DIR = Path("/tmp/gpre0_fix_checkpoints")
# Tokens that are ACTION-based, not TOPIC-based → must be re-classified
FORBIDDEN_TOKENS = {
"documentation", "procedure", "process",
"compliance_reporting", "records_management",
}
SYSTEM_PROMPT = """Du bist ein Compliance-Klassifizierer. Ordne jeden Control dem THEMA zu, nicht der Handlung.
KRITISCH: Die Tokens "documentation", "procedure", "process", "compliance_reporting",
"records_management" sind VERBOTEN. Klassifiziere nach dem INHALTLICHEN THEMA.
Beispiele:
- "Risikobewertung dokumentieren" → risk_management (NICHT documentation)
- "Incident-Verfahren definieren" → incident (NICHT procedure)
- "Verschlüsselungsprozess implementieren" → encryption (NICHT process)
- "Audit-Ergebnisse berichten" → compliance_audit (NICHT compliance_reporting)
- "Datenschutz-Unterlagen verwalten" → personal_data (NICHT records_management)
- "Löschkonzept dokumentieren" → data_retention (NICHT documentation)
- "Zertifizierungsverfahren definieren" → certification (NICHT procedure)
- "Schulungsprozess durchführen" → training (NICHT process)
ERLAUBTE TOKENS:
SECURITY: multi_factor_auth, password_policy, credentials, session_management,
privileged_access, access_control, encryption, transport_encryption,
key_management, certificate_management, network_security, network_segmentation,
firewall, vpn, remote_access, monitoring, audit_logging, siem, alerting,
compliance_audit, vulnerability, patch_management, backup, disaster_recovery,
physical_security, secure_development, api_security, input_validation,
container_security, logging_configuration
DATA_PROTECTION: personal_data, sensitive_data, health_data, consent,
data_subject_rights, data_retention, data_transfer, data_breach_notification,
dpia, data_processing_agreement, privacy_by_design, data_processing_register,
data_classification, cookie_consent, video_surveillance
GOVERNANCE: policy, training, awareness, incident, risk_management,
third_party_management, change_management, asset_management,
human_resources_security
REGULATORY: supervisory_authority, certification, product_safety, ai_system,
financial_reporting, aml, whistleblowing, consumer_protection, ecommerce,
telecommunications, medical_device, payment_services, critical_infrastructure,
supply_chain_due_diligence, sustainability_reporting
Antworte NUR als JSON-Array: [{"id":"...","token":"...","conf":0.9}, ...]"""
def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]:
"""Send batch to Claude. NO retry on timeout."""
items = []
for c in controls_batch:
items.append(
f'- id="{c["control_id"]}" '
f'cur="{c["current_object"]}" '
f't="{c["title"]}" '
f'o="{c["objective"][:100]}"'
)
prompt = "Klassifiziere nach THEMA (nicht Handlung):\n" + "\n".join(items)
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": ANTHROPIC_MODEL,
"max_tokens": 1500,
"temperature": 0.0,
"system": SYSTEM_PROMPT,
"messages": [{"role": "user", "content": prompt}],
}
try:
resp = httpx.post(
ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0
)
resp.raise_for_status()
data = resp.json()
content = data.get("content", [{}])[0].get("text", "")
usage = data.get("usage", {})
start = content.find("[")
end = content.rfind("]") + 1
if start >= 0 and end > start:
return json.loads(content[start:end]), usage
return [], usage
except httpx.TimeoutException:
logger.error("TIMEOUT — skipping batch")
return [], {}
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
logger.warning("Rate limited — waiting 60s")
time.sleep(60)
else:
logger.error("API error %d", e.response.status_code)
return [], {}
except Exception as e:
logger.error("Failed: %s", e)
return [], {}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--batch-size", type=int, default=20)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Load only controls with forbidden tokens
forbidden_pattern = "|".join(
f":{tok}:" for tok in FORBIDDEN_TOKENS
)
with engine.connect() as c:
rows = c.execute(text("""
SELECT cc.id, cc.control_id, cc.title,
COALESCE(cc.objective, '') as objective,
cc.generation_metadata->>'merge_group_hint' as hint
FROM canonical_controls cc
WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL
AND cc.release_state NOT IN ('deprecated', 'rejected')
AND (
cc.generation_metadata->>'merge_group_hint' LIKE '%:documentation:%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%:procedure:%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%:process:%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%:compliance_reporting:%'
OR cc.generation_metadata->>'merge_group_hint' LIKE '%:records_management:%'
)
""")).fetchall()
controls = []
for uuid, cid, title, objective, hint in rows:
parts = hint.split(":", 2) if hint else []
controls.append({
"uuid": str(uuid), "control_id": cid,
"title": title or "", "objective": objective or "",
"current_hint": hint,
"current_object": parts[1] if len(parts) > 1 else hint,
})
logger.info("Found %d controls with forbidden tokens to re-classify", len(controls))
# Process
total_fixed = 0
total_kept = 0
total_skipped = 0
total_input_tokens = 0
total_output_tokens = 0
corrections = []
change_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for i in range(0, len(controls), args.batch_size):
batch = controls[i:i + args.batch_size]
results, usage = call_claude(batch)
total_input_tokens += usage.get("input_tokens", 0)
total_output_tokens += usage.get("output_tokens", 0)
if not results:
total_skipped += len(batch)
continue
result_map = {r.get("id", ""): r for r in results}
for ctrl in batch:
r = result_map.get(ctrl["control_id"], {})
new_token = r.get("token", "")
if not new_token or new_token in FORBIDDEN_TOKENS:
total_kept += 1
continue
old_obj = ctrl["current_object"]
if new_token != old_obj:
total_fixed += 1
parts = ctrl["current_hint"].split(":", 2)
action = parts[0] if parts else "implement"
phase = parts[2] if len(parts) > 2 else "implementation"
corrections.append({
"uuid": ctrl["uuid"],
"old_hint": ctrl["current_hint"],
"new_hint": f"{action}:{new_token}:{phase}",
})
change_stats[old_obj][new_token] += 1
else:
total_kept += 1
processed = min(i + args.batch_size, len(controls))
if processed % 2000 < args.batch_size or processed >= len(controls):
logger.info(
"Progress: %d/%d (fixed=%d kept=%d skip=%d)",
processed, len(controls), total_fixed, total_kept, total_skipped,
)
time.sleep(0.3)
# Report
cost_in = total_input_tokens / 1_000_000 * 0.80
cost_out = total_output_tokens / 1_000_000 * 4.00
logger.info("\n" + "=" * 60)
logger.info("GENERIC TOKEN FIX REPORT")
logger.info("=" * 60)
logger.info("Total: %d controls", len(controls))
logger.info("Fixed: %d", total_fixed)
logger.info("Kept: %d (LLM also chose forbidden → kept as-is)", total_kept)
logger.info("Skipped: %d", total_skipped)
logger.info("Cost: $%.2f (Haiku)", cost_in + cost_out)
logger.info("\nTop changes:")
flat = []
for old, news in change_stats.items():
for new, cnt in news.items():
flat.append((cnt, old, new))
for cnt, old, new in sorted(flat, reverse=True)[:30]:
logger.info(" %4d × %s%s", cnt, old, new)
# Save corrections
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
corr_file = CHECKPOINT_DIR / "corrections_generic_fix.json"
corr_file.write_text(json.dumps(corrections))
logger.info("Saved %d corrections to %s", len(corrections), corr_file)
if args.dry_run:
logger.info("DRY RUN — not updating DB")
return
if corrections:
logger.info("Applying %d corrections...", len(corrections))
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for corr in corrections:
c.execute(text("""
UPDATE canonical_controls
SET generation_metadata = jsonb_set(
generation_metadata,
'{merge_group_hint}',
to_jsonb(CAST(:new_hint AS text))
)
WHERE id = CAST(:uuid AS uuid)
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
logger.info("Done. %d hints corrected.", len(corrections))
if __name__ == "__main__":
main()
+37
View File
@@ -0,0 +1,37 @@
#!/bin/bash
# Run all 10 batches sequentially. Safe: if one fails, the rest don't run.
# Each batch saves corrections to JSON before applying to DB.
#
# Usage: bash /app/scripts/gpre0_run_all.sh
# bash /app/scripts/gpre0_run_all.sh 5 # start from batch 5
set -e
START=${1:-1}
TOTAL=10
echo "=== Starting from batch $START of $TOTAL ==="
for i in $(seq $START $TOTAL); do
echo ""
echo "================================================================"
echo " BATCH $i/$TOTAL$(date)"
echo "================================================================"
PYTHONPATH=/app python3 /app/scripts/gpre0_validate_hints.py \
--batch-id $i \
--total-batches $TOTAL \
--batch-size 20
EXIT_CODE=$?
if [ $EXIT_CODE -ne 0 ]; then
echo "BATCH $i FAILED with exit code $EXIT_CODE"
echo "Resume with: bash /app/scripts/gpre0_run_all.sh $i"
exit $EXIT_CODE
fi
echo "BATCH $i DONE — $(date)"
done
echo ""
echo "ALL $TOTAL BATCHES COMPLETE!"
@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
Phase 2: Validate and correct merge_group_hints using Claude Haiku.
Re-classifies each control's object token against the expanded ontology
(74 canonical tokens). Corrects wrong hints in the DB.
SAFETY: Split into 4 batches. NEVER retries on timeout (double-billing!).
Writes checkpoint after each API call for safe resume.
Usage:
python3 /app/scripts/gpre0_validate_hints.py --batch-id 1 --dry-run
python3 /app/scripts/gpre0_validate_hints.py --batch-id 1
python3 /app/scripts/gpre0_validate_hints.py --batch-id 2
python3 /app/scripts/gpre0_validate_hints.py --batch-id 3
python3 /app/scripts/gpre0_validate_hints.py --batch-id 4
"""
import argparse
import json
import logging
import os
import time
from collections import defaultdict
from pathlib import Path
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre0-validate")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = "claude-haiku-4-5-20251001"
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
CHECKPOINT_DIR = Path("/tmp/gpre0_checkpoints")
SYSTEM_PROMPT = """Du bist ein Compliance-Klassifizierer. Ordne jeden Control GENAU EINEM Token zu.
REGEL: Waehle IMMER den naechstbesten Token aus der Liste. OTHER nur wenn ABSOLUT
kein Token auch nur entfernt passt (<1% der Faelle). Im Zweifel: den breitesten
passenden Token waehlen (z.B. "policy" fuer Governance-Dokumente, "procedure" fuer
Ablauf-Definitionen, "risk_management" fuer Bewertungen).
TOKENS:
SECURITY: multi_factor_auth, password_policy, credentials, session_management,
privileged_access, access_control, encryption, transport_encryption,
key_management, certificate_management, network_security, network_segmentation,
firewall, vpn, remote_access, monitoring (NUR Echtzeit-Systemueberwachung),
audit_logging (Protokollierung/Audit Trail), siem, alerting (Meldepflichten),
compliance_audit (externe Pruefungen), vulnerability, patch_management,
backup, disaster_recovery, physical_security, secure_development,
api_security, input_validation, container_security, logging_configuration
DATA_PROTECTION: personal_data (DSGVO-Verarbeitung), sensitive_data (Art.9),
health_data, consent, data_subject_rights, data_retention, data_transfer,
data_breach_notification, dpia, data_processing_agreement, privacy_by_design,
data_processing_register, data_classification, cookie_consent, video_surveillance
GOVERNANCE: policy (Richtlinie definieren), procedure (Verfahren definieren),
process (Betriebsprozess ausfuehren), training (Schulung), awareness,
incident (Vorfallsbehandlung), risk_management, third_party_management,
change_management, documentation, records_management, compliance_reporting,
asset_management, human_resources_security
REGULATORY: supervisory_authority, certification (Zertifizierung/Konformitaet),
product_safety, ai_system, financial_reporting, aml, whistleblowing,
consumer_protection, ecommerce, telecommunications, medical_device,
payment_services, critical_infrastructure, supply_chain_due_diligence,
sustainability_reporting
ABGRENZUNGEN:
- monitoring = NUR Echtzeit-Systemueberwachung, NICHT Audit/Schulung/Bewertung
- audit_logging = Protokollierung, NICHT externe Pruefung (→ compliance_audit)
- procedure = Verfahren DEFINIEREN, NICHT Vorfaelle behandeln (→ incident)
- personal_data = DSGVO-Verarbeitung, NICHT Zertifizierung (→ certification)
- alerting = Meldepflichten, NICHT Vorfallsbehandlung (→ incident)
Antworte NUR als JSON-Array: [{"id":"...","token":"...","conf":0.9}, ...]
KEIN weiterer Text. Nur das Array."""
def call_claude(controls_batch: list[dict]) -> tuple[list[dict], dict]:
"""Send batch to Claude. NO RETRY on timeout (double-billing risk!)."""
items = []
for c in controls_batch:
items.append(
f'- id="{c["control_id"]}" '
f'cur="{c["current_object"]}" '
f't="{c["title"]}" '
f'o="{c["objective"][:100]}"'
)
prompt = "Klassifiziere:\n" + "\n".join(items)
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": ANTHROPIC_MODEL,
"max_tokens": 1500,
"temperature": 0.0,
"system": SYSTEM_PROMPT,
"messages": [{"role": "user", "content": prompt}],
}
try:
resp = httpx.post(
ANTHROPIC_URL, headers=headers, json=payload, timeout=45.0
)
resp.raise_for_status()
data = resp.json()
content = data.get("content", [{}])[0].get("text", "")
usage = data.get("usage", {})
start = content.find("[")
end = content.rfind("]") + 1
if start >= 0 and end > start:
return json.loads(content[start:end]), usage
logger.warning("No JSON array in response")
return [], usage
except httpx.TimeoutException:
# CRITICAL: Do NOT retry! Log and skip.
logger.error("TIMEOUT — skipping batch (NOT retrying to avoid double-billing)")
return [], {}
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
logger.warning("Rate limited — waiting 60s then skipping")
time.sleep(60)
else:
logger.error("API error %d — skipping batch", e.response.status_code)
return [], {}
except Exception as e:
logger.error("Request failed — skipping: %s", e)
return [], {}
def load_checkpoint(batch_id: int) -> int:
"""Load last processed index for this batch."""
cp_file = CHECKPOINT_DIR / f"batch_{batch_id}.json"
if cp_file.exists():
data = json.loads(cp_file.read_text())
return data.get("last_index", 0)
return 0
def save_checkpoint(batch_id: int, last_index: int, stats: dict):
"""Save progress checkpoint."""
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
cp_file = CHECKPOINT_DIR / f"batch_{batch_id}.json"
cp_file.write_text(json.dumps({
"batch_id": batch_id,
"last_index": last_index,
**stats,
}))
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--batch-id", type=int, required=True)
parser.add_argument("--total-batches", type=int, default=10)
parser.add_argument("--batch-size", type=int, default=20)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--resume", action="store_true",
help="Resume from checkpoint")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Load ALL control IDs ordered deterministically, then select quarter
with engine.connect() as c:
all_ids = c.execute(text("""
SELECT cc.id
FROM canonical_controls cc
WHERE cc.generation_metadata->>'merge_group_hint' IS NOT NULL
AND cc.generation_metadata->>'merge_group_hint' != ''
AND cc.release_state NOT IN ('deprecated', 'rejected')
ORDER BY cc.id
""")).fetchall()
total = len(all_ids)
chunk = total // args.total_batches
start_idx = (args.batch_id - 1) * chunk
end_idx = total if args.batch_id == args.total_batches else args.batch_id * chunk
batch_ids = [str(r[0]) for r in all_ids[start_idx:end_idx]]
logger.info("Batch %d/%d: controls %d-%d (%d controls of %d total)",
args.batch_id, args.total_batches, start_idx, end_idx, len(batch_ids), total)
# Load full data for this batch
id_list = ",".join(f"'{uid}'" for uid in batch_ids)
with engine.connect() as c:
rows = c.execute(text(f"""
SELECT cc.id, cc.control_id, cc.title,
COALESCE(cc.objective, '') as objective,
cc.generation_metadata->>'merge_group_hint' as hint
FROM canonical_controls cc
WHERE cc.id IN ({id_list})
ORDER BY cc.id
""")).fetchall()
controls = []
for uuid, cid, title, objective, hint in rows:
parts = hint.split(":", 2) if hint else []
controls.append({
"uuid": str(uuid), "control_id": cid,
"title": title or "", "objective": objective or "",
"current_hint": hint, "current_object": parts[1] if len(parts) > 1 else hint,
})
# Resume from checkpoint?
start_from = 0
if args.resume:
start_from = load_checkpoint(args.batch_id)
if start_from > 0:
logger.info("Resuming from index %d", start_from)
# Process
total_same = 0
total_changed = 0
total_other = 0
total_skipped = 0
total_input_tokens = 0
total_output_tokens = 0
corrections: list[dict] = []
change_stats: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for i in range(start_from, len(controls), args.batch_size):
batch = controls[i:i + args.batch_size]
results, usage = call_claude(batch)
total_input_tokens += usage.get("input_tokens", 0)
total_output_tokens += usage.get("output_tokens", 0)
if not results:
total_skipped += len(batch)
save_checkpoint(args.batch_id, i + args.batch_size, {
"same": total_same, "changed": total_changed,
"other": total_other, "skipped": total_skipped,
})
continue
result_map = {r.get("id", ""): r for r in results}
for ctrl in batch:
r = result_map.get(ctrl["control_id"], {})
new_token = r.get("token", "")
if not new_token:
total_skipped += 1
continue
old_obj = ctrl["current_object"]
if new_token == "OTHER":
total_other += 1
elif new_token == old_obj:
total_same += 1
else:
total_changed += 1
parts = ctrl["current_hint"].split(":", 2)
action = parts[0] if parts else "implement"
phase = parts[2] if len(parts) > 2 else "implementation"
corrections.append({
"uuid": ctrl["uuid"],
"old_hint": ctrl["current_hint"],
"new_hint": f"{action}:{new_token}:{phase}",
})
change_stats[old_obj][new_token] += 1
# Checkpoint every batch
save_checkpoint(args.batch_id, i + args.batch_size, {
"same": total_same, "changed": total_changed,
"other": total_other, "skipped": total_skipped,
})
processed = min(i + args.batch_size, len(controls))
if processed % 1000 < args.batch_size or processed >= len(controls):
logger.info(
"Batch %d: %d/%d (same=%d changed=%d other=%d skip=%d)",
args.batch_id, processed, len(controls),
total_same, total_changed, total_other, total_skipped,
)
time.sleep(0.3)
# Report
cost_in = total_input_tokens / 1_000_000 * 0.80 # Haiku
cost_out = total_output_tokens / 1_000_000 * 4.00 # Haiku
total_cost = cost_in + cost_out
total_proc = total_same + total_changed + total_other
logger.info("\n" + "=" * 60)
logger.info("BATCH %d REPORT", args.batch_id)
logger.info("=" * 60)
logger.info("Processed: %d | Skipped: %d", total_proc, total_skipped)
logger.info("Same: %d (%.1f%%)", total_same, total_same / max(total_proc, 1) * 100)
logger.info("Changed: %d (%.1f%%)", total_changed, total_changed / max(total_proc, 1) * 100)
logger.info("OTHER: %d (%.1f%%)", total_other, total_other / max(total_proc, 1) * 100)
logger.info("Cost: $%.2f (Haiku)", total_cost)
logger.info("Cost/ctrl: $%.5f", total_cost / max(total_proc, 1))
# Top changes
flat = []
for old, news in change_stats.items():
for new, cnt in news.items():
flat.append((cnt, old, new))
logger.info("\nTop Changes:")
for cnt, old, new in sorted(flat, reverse=True)[:20]:
logger.info(" %4d × %s%s", cnt, old, new)
# Always save corrections to file (recovery safety)
corr_file = CHECKPOINT_DIR / f"corrections_batch_{args.batch_id}.json"
if corrections:
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
corr_file.write_text(json.dumps(corrections))
logger.info("Saved %d corrections to %s", len(corrections), corr_file)
if args.dry_run:
logger.info("\nDRY RUN — not updating DB")
return
# Apply corrections in single transaction
if corrections:
logger.info("\nApplying %d corrections...", len(corrections))
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
for corr in corrections:
c.execute(text("""
UPDATE canonical_controls
SET generation_metadata = jsonb_set(
generation_metadata,
'{merge_group_hint}',
to_jsonb(CAST(:new_hint AS text))
)
WHERE id = CAST(:uuid AS uuid)
"""), {"uuid": corr["uuid"], "new_hint": corr["new_hint"]})
logger.info("Done. %d hints corrected.", len(corrections))
else:
logger.info("No corrections needed.")
if __name__ == "__main__":
main()
+214
View File
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
G-pre2 v2: Build Master Controls directly from canonical tokens.
No K-Means needed — Phase 2 already normalized merge_group_hints
to 74 canonical tokens. Each token = one object group.
Groups controls by (canonical_token, phase) and creates MCs
for tokens with >=2 distinct phases.
Usage:
python3 /app/scripts/gpre2_direct_mc.py --dry-run
python3 /app/scripts/gpre2_direct_mc.py --min-phases 2
"""
import argparse
import json
import logging
import os
from collections import defaultdict
from sqlalchemy import create_engine, text
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre2-direct")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
PHASE_ORDER = {
"scope": 0, "definition": 1, "governance": 1,
"design": 2, "implementation": 3, "configuration": 3,
"operation": 4, "training": 4, "monitoring": 5,
"testing": 6, "review": 7, "assessment": 8, "remediation": 8,
"validation": 9, "reporting": 10, "evidence": 11,
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--min-phases", type=int, default=2)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Step 1: Load all controls with merge_group_hint
logger.info("Loading controls...")
with engine.connect() as c:
rows = c.execute(text("""
SELECT id, control_id,
generation_metadata->>'merge_group_hint' AS hint
FROM canonical_controls
WHERE generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
AND release_state NOT IN ('deprecated', 'rejected')
""")).fetchall()
logger.info("Loaded %d controls", len(rows))
# Step 2: Group by (object_token, phase)
token_phases: dict[str, dict[str, list]] = defaultdict(
lambda: defaultdict(list)
)
for uuid, control_id, hint in rows:
parts = hint.split(":", 2)
if len(parts) < 2:
continue
action = parts[0]
obj = parts[1]
phase = parts[2] if len(parts) > 2 else "implementation"
token_phases[obj][phase].append((str(uuid), control_id, action))
logger.info("Found %d unique object tokens", len(token_phases))
# Step 3: Create Master Controls
master_controls = []
master_members = []
for token, phases in token_phases.items():
if len(phases) < args.min_phases:
continue
sorted_phases = sorted(
phases.keys(), key=lambda p: PHASE_ORDER.get(p, 99)
)
phase_counts = {p: len(ctrls) for p, ctrls in phases.items()}
total = sum(phase_counts.values())
master_controls.append({
"canonical_name": token,
"phases_covered": json.dumps(sorted_phases),
"phase_control_count": json.dumps(phase_counts),
"total_controls": total,
})
for phase, controls in phases.items():
for ctrl_uuid, ctrl_id, action in controls:
master_members.append({
"canonical_name": token,
"control_uuid": ctrl_uuid,
"phase": phase,
"action": action,
})
logger.info(
"Created %d Master Controls with %d members (min %d phases)",
len(master_controls), len(master_members), args.min_phases,
)
# Stats
if master_controls:
counts = [mc["total_controls"] for mc in master_controls]
phases_per = [
len(json.loads(mc["phases_covered"])) for mc in master_controls
]
logger.info(" Avg controls/MC: %.1f", sum(counts) / len(counts))
logger.info(" Max controls/MC: %d", max(counts))
logger.info(" Avg phases/MC: %.1f", sum(phases_per) / len(phases_per))
logger.info(" Max phases/MC: %d", max(phases_per))
# Size distribution
logger.info("\n Size distribution:")
logger.info(" ≤10: %d", sum(1 for c in counts if c <= 10))
logger.info(" 11-50: %d", sum(1 for c in counts if 11 <= c <= 50))
logger.info(" 51-200: %d", sum(1 for c in counts if 51 <= c <= 200))
logger.info(" 201-500: %d", sum(1 for c in counts if 201 <= c <= 500))
logger.info(" 501-2K: %d", sum(1 for c in counts if 501 <= c <= 2000))
logger.info(" >2K: %d", sum(1 for c in counts if c > 2000))
# Top 15
top = sorted(master_controls, key=lambda x: -x["total_controls"])[:15]
logger.info("\n Top 15 Master Controls:")
for mc in top:
logger.info(
" %6d %s (%d phases)",
mc["total_controls"],
mc["canonical_name"],
len(json.loads(mc["phases_covered"])),
)
if args.dry_run:
logger.info("\nDRY RUN — not writing to DB")
return
# Step 4: Write to DB
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
c.execute(text("DELETE FROM master_control_members"))
c.execute(text("DELETE FROM master_controls"))
# Get next object_group_id
max_gid = c.execute(
text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")
).scalar()
next_gid = max_gid + 1
mc_uuids = {}
for mc in master_controls:
gid = next_gid
next_gid += 1
mc_id = f"MC-{gid}"
c.execute(text("""
INSERT INTO master_controls
(master_control_id, object_group_id, canonical_name,
phases_covered, phase_control_count, total_controls)
VALUES (:mcid, :gid, :name,
CAST(:phases AS jsonb),
CAST(:pcounts AS jsonb), :total)
"""), {
"mcid": mc_id, "gid": gid,
"name": mc["canonical_name"],
"phases": mc["phases_covered"],
"pcounts": mc["phase_control_count"],
"total": mc["total_controls"],
})
mc_uuid = c.execute(text(
"SELECT id FROM master_controls WHERE master_control_id = :mcid"
), {"mcid": mc_id}).scalar()
mc_uuids[mc["canonical_name"]] = str(mc_uuid)
# Insert members
mem_count = 0
for mem in master_members:
mc_uuid = mc_uuids.get(mem["canonical_name"])
if not mc_uuid:
continue
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid),
:phase, :action)
"""), {
"mc": mc_uuid,
"ctrl": mem["control_uuid"],
"phase": mem["phase"],
"action": mem["action"],
})
mem_count += 1
logger.info("Wrote %d MCs + %d members to DB", len(master_controls), mem_count)
if __name__ == "__main__":
main()
@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
G-pre3: Split large Master Controls by regulation source.
For each MC with >200 controls:
1. Load member controls with parent's source_citation->>'source'
2. Group by regulation source
3. Sources with >= MIN_SOURCE_SIZE → new sub-MC
4. Small sources → merge into "mixed" bucket
5. UNKNOWN (no source_citation) → sub-cluster by embedding if >MAX_MC
6. Delete original large MC, create new sub-MCs
Usage:
python3 /app/scripts/gpre3_regulation_split.py --dry-run
python3 /app/scripts/gpre3_regulation_split.py --min-source 15 --max-mc 100
"""
import argparse
import json
import logging
import os
import re
from collections import defaultdict
from sqlalchemy import create_engine, text
from services.embedding_utils import subcluster_controls
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("gpre3")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
# ── Source key normalization ────────────────────────────────────────
# fmt: off
_SOURCE_SHORT: dict[str, str] = {
"DSGVO (EU) 2016/679": "dsgvo", "NIS2-Richtlinie (EU) 2022/2555": "nis2",
"KI-Verordnung (EU) 2024/1689": "ai_act", "Cyber Resilience Act (CRA)": "cra",
"Digital Services Act (DSA)": "dsa", "Digital Markets Act (DMA)": "dma",
"Digital Operational Resilience Act": "dora", "Data Governance Act (DGA)": "dga",
"Data Act": "data_act", "Maschinenverordnung (EU) 2023/1230": "machinery_reg",
"Medizinprodukteverordnung (EU) 2017/745 (MDR)": "mdr",
"European Health Data Space": "ehds", "European Accessibility Act": "eaa",
"EU Cybersecurity Act": "eu_csa", "EU Blue Guide 2022": "eu_blue_guide",
"EU-US Data Privacy Framework": "eu_us_dpf", "Markets in Crypto-Assets (MiCA)": "mica",
"Standardvertragsklauseln (SCC)": "scc", "ePrivacy-Richtlinie": "eprivacy",
"Batterieverordnung (EU) 2023/1542": "battery_reg",
"Bundesdatenschutzgesetz (BDSG)": "bdsg",
"BSI-Gesetz (BSIG 2025, NIS2-Umsetzung)": "bsig",
"BSI-Kritisverordnung (BSI-KritisV)": "bsi_kritisv",
"Geldwaeschegesetz (GwG)": "gwg", "Hinweisgeberschutzgesetz (HinSchG)": "hinschg",
"Lieferkettensorgfaltspflichtengesetz (LkSG)": "lksg",
"KRITIS-Dachgesetz (KRITISDachG)": "kritisdachg",
"NIST SP 800-53 Rev. 5": "nist_800_53", "NIST Cybersecurity Framework 2.0": "nist_csf",
"NIST Privacy Framework 1.0": "nist_privacy",
"NIST SP 800-207 (Zero Trust)": "nist_zero_trust",
"NIST SP 800-218 (SSDF)": "nist_ssdf", "NIST SP 800-63-3": "nist_800_63",
"NIST AI Risk Management Framework": "nist_ai_rmf",
"NISTIR 8259A IoT Security": "nist_iot",
"OWASP Top 10 (2021)": "owasp_top10", "OWASP API Security Top 10 (2023)": "owasp_api",
"OWASP ASVS 4.0": "owasp_asvs", "OWASP SAMM 2.0": "owasp_samm",
"OWASP MASVS 2.0": "owasp_masvs", "OWASP Mobile Top 10": "owasp_mobile",
"ENISA": "enisa", "TDDDG": "tdddg", "TKG": "tkg", "TMG": "tmg",
"BGB": "bgb", "UWG": "uwg", "UrhG": "urhg",
"BAIT (BaFin 2024)": "bait", "VAIT (BaFin 2022)": "vait",
"AML-Verordnung": "aml_reg", "Zahlungsdiensterichtlinie 2": "psd2",
"Telekommunikationsgesetz Oesterreich": "at_tkg",
"Österreichisches Datenschutzgesetz (DSG)": "at_dsg",
"Allgemeines Gleichbehandlungsgesetz (AGG)": "agg",
"Aktiengesetz (AktG)": "aktg", "Handelsgesetzbuch (HGB)": "hgb",
"GmbH-Gesetz (GmbHG)": "gmbhg", "Insolvenzordnung (InsO)": "inso",
"Gewerbeordnung (GewO)": "gewo", "Abgabenordnung (AO)": "ao",
}
# fmt: on
def source_to_key(source: str) -> str:
"""Normalize regulation source name to a short slug key."""
if source in _SOURCE_SHORT:
return _SOURCE_SHORT[source]
s = source.lower()
s = re.sub(r"\(.*?\)", "", s)
s = re.sub(r"[^a-z0-9äöüß]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
return s[:40] if s else "unknown"
# ── Main ───────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--min-source", type=int, default=15,
help="Min controls per source for own sub-MC")
parser.add_argument("--max-mc", type=int, default=100,
help="Max controls per sub-MC before sub-clustering")
parser.add_argument("--threshold", type=int, default=200,
help="Only split MCs with more than N controls")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Step 1: Find large master controls
with engine.connect() as c:
large_mcs = c.execute(text("""
SELECT mc.id, mc.master_control_id, mc.object_group_id,
mc.canonical_name, mc.total_controls
FROM master_controls mc
WHERE mc.total_controls > :threshold
ORDER BY mc.total_controls DESC
"""), {"threshold": args.threshold}).fetchall()
logger.info("Found %d MCs with >%d controls", len(large_mcs), args.threshold)
if not large_mcs:
return
# Step 2: Build split plans
all_splits = []
for mc_uuid, mc_id, og_id, canonical, total in large_mcs:
plan = _build_split_plan(engine, mc_uuid, mc_id, og_id, canonical, total, args)
all_splits.append(plan)
total_new = sum(len(sp["sub_groups"]) for sp in all_splits)
total_covered = sum(
sum(len(sg["controls"]) for sg in sp["sub_groups"]) for sp in all_splits
)
logger.info("SUMMARY: %d large MCs → %d sub-MCs (%d controls)", len(all_splits), total_new, total_covered)
if args.dry_run:
logger.info("DRY RUN — not writing to DB")
return
_write_splits(engine, all_splits)
def _build_split_plan(engine, mc_uuid, mc_id, og_id, canonical, total, args) -> dict:
"""Build a regulation-source split plan for one large MC."""
logger.info("\n━━━ %s: %s (%d controls) ━━━", mc_id, canonical, total)
with engine.connect() as c:
members = c.execute(text("""
SELECT mcm.control_uuid, mcm.phase, mcm.action,
cc.control_id, cc.title,
COALESCE(pc.source_citation->>'source', 'UNKNOWN') AS src
FROM master_control_members mcm
JOIN canonical_controls cc ON cc.id = mcm.control_uuid
LEFT JOIN canonical_controls pc ON pc.id = cc.parent_control_uuid
WHERE mcm.master_control_uuid = CAST(:mc_uuid AS uuid)
"""), {"mc_uuid": str(mc_uuid)}).fetchall()
by_source: dict[str, list[dict]] = defaultdict(list)
for ctrl_uuid, phase, action, cid, title, src in members:
by_source[src].append({
"control_uuid": str(ctrl_uuid), "phase": phase,
"action": action, "control_id": cid, "title": title,
})
sorted_sources = sorted(by_source.items(), key=lambda x: -len(x[1]))
for src, ctrls in sorted_sources[:8]:
logger.info(" %4d %s", len(ctrls), src)
if len(sorted_sources) > 8:
logger.info(" ... +%d more sources", len(sorted_sources) - 8)
plan = {"mc_uuid": str(mc_uuid), "mc_id": mc_id, "og_id": og_id,
"canonical": canonical, "total": total, "sub_groups": []}
own_mc_sources = []
mixed_controls = []
for src, ctrls in sorted_sources:
if src == "UNKNOWN":
continue
if len(ctrls) >= args.min_source:
own_mc_sources.append((src, ctrls))
else:
mixed_controls.extend(ctrls)
unknown_controls = by_source.get("UNKNOWN", [])
# (a) Named regulation sub-MCs
for src, ctrls in own_mc_sources:
key = source_to_key(src)
name = f"{canonical}_{key}"
_add_subgroups(plan, name, src, ctrls, args.max_mc)
# (b) Mixed small-source bucket
if mixed_controls:
_add_subgroups(plan, f"{canonical}_mixed", "mixed", mixed_controls, args.max_mc)
# (c) UNKNOWN bucket
if unknown_controls:
_add_subgroups(plan, f"{canonical}_general", "general", unknown_controls, args.max_mc)
logger.info("%d sub-groups:", len(plan["sub_groups"]))
for sg in sorted(plan["sub_groups"], key=lambda x: -len(x["controls"])):
logger.info(" %4d %s", len(sg["controls"]), sg["name"])
return plan
def _add_subgroups(plan: dict, name: str, source: str,
controls: list[dict], max_mc: int):
"""Add controls as one or more sub-groups to the plan."""
if len(controls) <= max_mc:
plan["sub_groups"].append({"name": name, "source": source, "controls": controls})
else:
clusters = subcluster_controls(controls, max_mc)
for i, cluster in enumerate(clusters):
sub_name = f"{name}_{i+1}" if len(clusters) > 1 else name
plan["sub_groups"].append({"name": sub_name, "source": source, "controls": cluster})
def _write_splits(engine, splits: list[dict]):
"""Apply split plan: delete old MCs, create new object_groups + MCs."""
with engine.begin() as c:
c.execute(text("SET search_path TO compliance, public"))
max_gid = c.execute(
text("SELECT COALESCE(MAX(group_id), 0) FROM object_groups")
).scalar()
next_gid = max_gid + 1
total_mc = 0
total_mem = 0
for sp in splits:
c.execute(text(
"DELETE FROM master_control_members "
"WHERE master_control_uuid = CAST(:u AS uuid)"
), {"u": sp["mc_uuid"]})
c.execute(text(
"DELETE FROM master_controls WHERE id = CAST(:u AS uuid)"
), {"u": sp["mc_uuid"]})
logger.info("Deleted %s (%s)", sp["mc_id"], sp["canonical"])
for sg in sp["sub_groups"]:
if not sg["controls"]:
continue
gid = next_gid
next_gid += 1
members_list = list({ctrl["control_id"] for ctrl in sg["controls"]})
c.execute(text("""
INSERT INTO object_groups
(group_id, canonical_name, member_count, members, top_controls_count)
VALUES (:gid, :name, :cnt, CAST(:members AS jsonb), 0)
"""), {"gid": gid, "name": sg["name"], "cnt": len(members_list),
"members": json.dumps(members_list)})
by_phase: dict[str, list[dict]] = defaultdict(list)
for ctrl in sg["controls"]:
by_phase[ctrl["phase"]].append(ctrl)
sorted_phases = sorted(by_phase.keys())
phase_counts = {p: len(v) for p, v in by_phase.items()}
mc_id = f"MC-{gid}"
c.execute(text("""
INSERT INTO master_controls
(master_control_id, object_group_id, canonical_name,
phases_covered, phase_control_count, total_controls)
VALUES (:mcid, :gid, :name,
CAST(:phases AS jsonb), CAST(:pcounts AS jsonb), :total)
"""), {"mcid": mc_id, "gid": gid, "name": sg["name"],
"phases": json.dumps(sorted_phases),
"pcounts": json.dumps(phase_counts),
"total": sum(phase_counts.values())})
mc_uuid = c.execute(text(
"SELECT id FROM master_controls WHERE master_control_id = :mcid"
), {"mcid": mc_id}).scalar()
for ctrl in sg["controls"]:
c.execute(text("""
INSERT INTO master_control_members
(master_control_uuid, control_uuid, phase, action)
VALUES (CAST(:mc AS uuid), CAST(:ctrl AS uuid), :phase, :action)
"""), {"mc": str(mc_uuid), "ctrl": ctrl["control_uuid"],
"phase": ctrl["phase"], "action": ctrl["action"]})
total_mem += 1
total_mc += 1
logger.info("Created %d new MCs with %d members", total_mc, total_mem)
with engine.connect() as c:
stats = c.execute(text("""
SELECT count(*), count(CASE WHEN total_controls > 200 THEN 1 END),
AVG(total_controls)::int
FROM compliance.master_controls
""")).fetchone()
logger.info("Final: %d MCs, %d still >200, avg %d controls/MC", stats[0], stats[1], stats[2])
if __name__ == "__main__":
main()
@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
Phase 0: Quality Audit for Master Control Assignments.
Uses Claude Sonnet to validate whether controls are correctly assigned
to their Master Controls. Samples controls from large and small MCs.
Usage:
python3 /app/scripts/gpre_quality_audit.py
python3 /app/scripts/gpre_quality_audit.py --large-sample 50 --small-sample 10
python3 /app/scripts/gpre_quality_audit.py --mc MC-8292 # single MC
"""
import argparse
import json
import logging
import os
import random
import time
from collections import defaultdict
import httpx
from sqlalchemy import create_engine, text
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("quality-audit")
DB_URL = os.getenv(
"DATABASE_URL",
"postgresql://breakpilot:breakpilot123@postgres:5432/breakpilot_db",
)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = os.getenv("AUDIT_MODEL", "claude-sonnet-4-20250514")
ANTHROPIC_URL = "https://api.anthropic.com/v1/messages"
SYSTEM_PROMPT = """Du bist ein Compliance-Experte der prüft ob Controls korrekt zu Master Controls zugeordnet sind.
Für jeden Control beantworte:
1. MATCH: Gehört dieser Control thematisch zum Master Control Topic?
2. CONFIDENCE: Wie sicher bist du? (0.0-1.0)
3. REASON: Kurze Begründung (max 1 Satz)
4. SUGGESTED_TOPIC: Falls MATCH=false, welches Topic wäre korrekt?
Wichtige Unterscheidungen:
- "monitoring" = kontinuierliche Überwachung, Alerting, Log-Analyse
- "training" = Schulung, Awareness, Lernmaterialien
- "personal_data" = personenbezogene Daten, DSGVO-Betroffenenrechte
- "procedure" = Verfahren, Prozesse (aber NICHT wenn es spezifisch um Incidents geht)
- "incident" = Sicherheitsvorfälle, Breach Notification, Recovery
- "policy" = Richtlinien, Regelwerke, Governance-Dokumente
- "encryption" = Verschlüsselung, Kryptografie, Key Management
- "audit_logging" = Protokollierung, Audit Trail, Nachvollziehbarkeit
Antworte NUR als JSON-Array, ein Objekt pro Control."""
def call_claude(controls_batch: list[dict], mc_topic: str) -> list[dict]:
"""Send a batch of controls to Claude for validation."""
items = []
for c in controls_batch:
items.append(
f"- Control '{c['control_id']}': "
f"Titel=\"{c['title']}\", "
f"Objective=\"{c['objective'][:150]}...\", "
f"Phase={c['phase']}, Action={c['action']}"
)
prompt = (
f"Master Control Topic: \"{mc_topic}\"\n\n"
f"Prüfe diese {len(controls_batch)} Controls:\n\n"
+ "\n".join(items)
+ "\n\nAntwort als JSON-Array mit Feldern: "
"control_id, match (bool), confidence (float), reason (str), "
"suggested_topic (str, nur wenn match=false)."
)
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": ANTHROPIC_MODEL,
"max_tokens": 2048,
"temperature": 0.1,
"system": SYSTEM_PROMPT,
"messages": [{"role": "user", "content": prompt}],
}
for attempt in range(3):
try:
resp = httpx.post(
ANTHROPIC_URL,
headers=headers,
json=payload,
timeout=60.0,
)
resp.raise_for_status()
data = resp.json()
content = data.get("content", [{}])[0].get("text", "")
usage = data.get("usage", {})
# Parse JSON from response
start = content.find("[")
end = content.rfind("]") + 1
if start >= 0 and end > start:
results = json.loads(content[start:end])
return results, usage
logger.warning("No JSON array in response: %s", content[:200])
return [], usage
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
wait = 30 * (attempt + 1)
logger.warning("Rate limited, waiting %ds...", wait)
time.sleep(wait)
else:
logger.error("API error: %s", e)
return [], {}
except Exception as e:
logger.error("Request failed (attempt %d): %s", attempt + 1, e)
if attempt < 2:
time.sleep(5)
return [], {}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--large-sample", type=int, default=50,
help="Controls to sample per large MC")
parser.add_argument("--small-sample", type=int, default=10,
help="Controls to sample per small MC")
parser.add_argument("--small-mc-count", type=int, default=50,
help="Number of small MCs to audit")
parser.add_argument("--mc", type=str, default=None,
help="Audit a single MC by ID (e.g., MC-8292)")
parser.add_argument("--batch-size", type=int, default=10,
help="Controls per API call")
args = parser.parse_args()
engine = create_engine(
DB_URL, connect_args={"options": "-c search_path=compliance,public"}
)
# Load MCs to audit
with engine.connect() as c:
if args.mc:
mcs = c.execute(text("""
SELECT id, master_control_id, canonical_name, total_controls
FROM master_controls WHERE master_control_id = :mc
"""), {"mc": args.mc}).fetchall()
else:
# Large MCs (>200) + random small MCs
large = c.execute(text("""
SELECT id, master_control_id, canonical_name, total_controls
FROM master_controls WHERE total_controls > 200
ORDER BY total_controls DESC
""")).fetchall()
small = c.execute(text("""
SELECT id, master_control_id, canonical_name, total_controls
FROM master_controls WHERE total_controls BETWEEN 10 AND 200
ORDER BY RANDOM() LIMIT :cnt
"""), {"cnt": args.small_mc_count}).fetchall()
mcs = list(large) + list(small)
logger.info("Auditing %d Master Controls", len(mcs))
# Results tracking
total_checked = 0
total_match = 0
total_mismatch = 0
total_input_tokens = 0
total_output_tokens = 0
mc_results: dict[str, dict] = {}
all_mismatches: list[dict] = []
for mc_uuid, mc_id, canonical, total in mcs:
is_large = total > 200
sample_size = args.large_sample if is_large else args.small_sample
# Sample controls
with engine.connect() as c:
controls = c.execute(text("""
SELECT mcm.control_uuid, mcm.phase, mcm.action,
cc.control_id, cc.title,
COALESCE(cc.objective, '') as objective
FROM master_control_members mcm
JOIN canonical_controls cc ON cc.id = mcm.control_uuid
WHERE mcm.master_control_uuid = CAST(:mc AS uuid)
ORDER BY RANDOM()
LIMIT :n
"""), {"mc": str(mc_uuid), "n": sample_size}).fetchall()
if not controls:
continue
control_dicts = [
{"control_uuid": str(r[0]), "phase": r[1], "action": r[2],
"control_id": r[3], "title": r[4] or "", "objective": r[5] or ""}
for r in controls
]
logger.info("\n%s: %s (%d total, sampling %d)",
mc_id, canonical, total, len(control_dicts))
mc_match = 0
mc_mismatch = 0
# Process in batches
for i in range(0, len(control_dicts), args.batch_size):
batch = control_dicts[i:i + args.batch_size]
results, usage = call_claude(batch, canonical)
total_input_tokens += usage.get("input_tokens", 0)
total_output_tokens += usage.get("output_tokens", 0)
for r in results:
if r.get("match", True):
mc_match += 1
total_match += 1
else:
mc_mismatch += 1
total_mismatch += 1
mismatch = {
"mc_id": mc_id,
"mc_topic": canonical,
"control_id": r.get("control_id", "?"),
"confidence": r.get("confidence", 0),
"reason": r.get("reason", ""),
"suggested_topic": r.get("suggested_topic", ""),
}
all_mismatches.append(mismatch)
total_checked += len(results)
# Rate limit
time.sleep(1)
accuracy = mc_match / (mc_match + mc_mismatch) if (mc_match + mc_mismatch) > 0 else 1.0
mc_results[mc_id] = {
"canonical": canonical, "total": total,
"checked": mc_match + mc_mismatch,
"match": mc_match, "mismatch": mc_mismatch,
"accuracy": accuracy,
}
logger.info("%d/%d correct (%.1f%%)",
mc_match, mc_match + mc_mismatch, accuracy * 100)
# Final report
_print_report(mc_results, all_mismatches, total_checked, total_match,
total_mismatch, total_input_tokens, total_output_tokens)
def _print_report(mc_results, mismatches, checked, match, mismatch,
input_tok, output_tok):
"""Print the quality audit report."""
logger.info("\n" + "=" * 70)
logger.info("QUALITY AUDIT REPORT")
logger.info("=" * 70)
logger.info("Total controls checked: %d", checked)
logger.info("Correct assignments: %d (%.1f%%)",
match, match / max(checked, 1) * 100)
logger.info("Wrong assignments: %d (%.1f%%)",
mismatch, mismatch / max(checked, 1) * 100)
# Cost estimate
cost_input = input_tok / 1_000_000 * 3.0 # Sonnet input: $3/MTok
cost_output = output_tok / 1_000_000 * 15.0 # Sonnet output: $15/MTok
logger.info("\nAPI Usage: %d input + %d output tokens",
input_tok, output_tok)
logger.info("Estimated cost: $%.2f", cost_input + cost_output)
# Per-MC breakdown (worst first)
logger.info("\n--- Per-MC Accuracy (worst first) ---")
sorted_mcs = sorted(mc_results.values(), key=lambda x: x["accuracy"])
for mc in sorted_mcs:
flag = "" if mc["accuracy"] < 0.9 else "⚠️" if mc["accuracy"] < 0.95 else ""
logger.info(" %s %s (%s): %d/%d = %.1f%% [total: %d]",
flag, mc["canonical"][:30].ljust(30),
"large" if mc["total"] > 200 else "small",
mc["match"], mc["checked"],
mc["accuracy"] * 100, mc["total"])
# Top mismatches
if mismatches:
logger.info("\n--- Mismatches (all %d) ---", len(mismatches))
for m in sorted(mismatches, key=lambda x: -x.get("confidence", 0)):
logger.info(" %s in %s (%s) → should be '%s': %s",
m["control_id"], m["mc_id"], m["mc_topic"],
m["suggested_topic"], m["reason"])
# Size-class breakdown
large_mcs = [m for m in mc_results.values() if m["total"] > 200]
small_mcs = [m for m in mc_results.values() if m["total"] <= 200]
if large_mcs:
lg_acc = sum(m["match"] for m in large_mcs) / max(sum(m["checked"] for m in large_mcs), 1)
logger.info("\nLarge MCs (>200): %.1f%% accuracy (%d MCs)",
lg_acc * 100, len(large_mcs))
if small_mcs:
sm_acc = sum(m["match"] for m in small_mcs) / max(sum(m["checked"] for m in small_mcs), 1)
logger.info("Small MCs (≤200): %.1f%% accuracy (%d MCs)",
sm_acc * 100, len(small_mcs))
if __name__ == "__main__":
main()