# =============================================================================
# HAIKU BACKFILL: Extended Fields (applicability, check_type, scanner_hint, etc.)
# =============================================================================

_BACKFILL_SYSTEM_PROMPT = """\
Du bist ein Security-Compliance-Experte. Du erhaeltst bestehende atomare Controls \
und ergaenzt fehlende Felder. Aendere NIEMALS bestehende Felder (title, assertion, \
objective, requirements, evidence, severity, category).

Ergaenze NUR diese 6 Felder:

1. applicability: Unter welchen Bedingungen gilt dieses Control?
   Universell: {}
   Bedingt: {"field": "context.SIGNAL", "op": "==", "value": true}
   Zusammengesetzt: {"operator": "AND", "clauses": [{...}, {...}]}
   Typische Felder: context.uses_oauth, context.has_public_api,
   context.processes_personal_data, context.uses_ai_system,
   context.has_employees, context.sells_online, context.uses_encryption

2. check_type: EINEN der 10 Werte:
   technical_config_check, code_pattern_check, runtime_security_test,
   document_policy_check, document_classification_check, document_contract_check,
   evidence_artifact_check, process_verification, training_verification, interview_assessment

3. scanner_hint: {"search_terms": [...], "negative_indicators": [...]}

4. manual_review_required_if: ["Bedingung 1", "Bedingung 2"]

5. evidence_type: code|process|hybrid

6. provides_context: ["context.VARIABLE", ...] oder []

Antworte als JSON-Objekt mit Control-ID als Key."""


class BackfillExtendedRequest(BaseModel):
    """Parameters for an extended-field backfill job."""

    limit: int = 0        # max controls to process; 0 = all eligible
    batch_size: int = 10  # controls bundled into one Haiku batch request
    dry_run: bool = True  # when True, only estimate the work; nothing is submitted


# In-memory job registry: job_id -> status dict. Process-local and not
# persistent; a restart loses job history (same trade-off as the original).
_backfill_extended_status: dict = {}


def _extract_json_object(text_content: str) -> dict:
    """Best-effort parse of a single JSON object from an LLM text response.

    Accepts either a bare JSON object or one embedded in surrounding prose /
    markdown fences. Returns {} when nothing parseable is found; never raises.
    """
    import json as json_mod
    import re as re_mod

    stripped = text_content.strip()
    if stripped.startswith("{"):
        try:
            return json_mod.loads(stripped)
        except Exception:
            pass
    # Fall back to the widest {...} span. Greedy on purpose: the response is
    # expected to be one top-level object keyed by control id.
    match = re_mod.search(r'\{[\s\S]*\}', text_content)
    if match:
        try:
            return json_mod.loads(match.group())
        except Exception:
            pass
    return {}


async def _run_backfill_extended(req: BackfillExtendedRequest, job_id: str):
    """Run extended field backfill via Haiku Batch API.

    Loads draft pass0b controls missing the 'applicability' extended field,
    submits them to the Anthropic batch endpoint in groups of req.batch_size,
    polls until the batch ends, then merges the returned fields into each
    control's generation_metadata. Progress and the final outcome are reported
    through _backfill_extended_status[job_id].
    """
    import json as json_mod  # hoisted: was re-imported inside the result loop

    from services.decomposition_pass import (
        create_anthropic_batch, fetch_batch_results, check_batch_status,
    )
    db = SessionLocal()
    try:
        _backfill_extended_status[job_id] = {"status": "loading_controls"}

        query = """
            SELECT id::text, control_id, title,
                   generation_metadata->>'assertion' as assertion,
                   objective, category, severity,
                   generation_metadata->>'merge_group_hint' as merge_key
            FROM canonical_controls
            WHERE release_state = 'draft'
              AND generation_metadata->>'decomposition_method' = 'pass0b'
              AND (generation_metadata->>'applicability' IS NULL
                   OR generation_metadata->>'applicability' = ''
                   OR generation_metadata->>'applicability' = '{}')
        """
        params: dict = {}
        if req.limit > 0:
            # Bound parameter instead of f-string interpolation into SQL.
            query += " LIMIT :row_limit"
            params["row_limit"] = req.limit

        rows = db.execute(text(query), params).fetchall()
        total = len(rows)
        _backfill_extended_status[job_id] = {
            "status": "preparing_batches", "total_controls": total,
        }

        if total == 0:
            _backfill_extended_status[job_id] = {
                "status": "completed", "total_controls": 0, "message": "Nothing to backfill",
            }
            return

        if req.dry_run:
            _backfill_extended_status[job_id] = {
                "status": "dry_run_complete", "total_controls": total,
                "estimated_requests": (total + req.batch_size - 1) // req.batch_size,
            }
            return

        # Build one batch request per group of req.batch_size controls.
        requests = []
        for i in range(0, total, req.batch_size):
            batch = rows[i:i + req.batch_size]
            controls_text = ""
            for r in batch:
                controls_text += f"""
Control-ID: {r[1]}
Titel: {r[2]}
Assertion: {r[3] or ''}
Objective: {r[4] or ''}
Kategorie: {r[5] or ''}
Severity: {r[6] or ''}
Merge-Key: {r[7] or ''}
---
"""
            prompt = f"Ergaenze die fehlenden 6 Felder fuer diese {len(batch)} Controls:\n{controls_text}"

            batch_idx = i // req.batch_size
            requests.append({
                "custom_id": f"bf_ext_b{batch_idx:05d}",
                "params": {
                    "model": "claude-haiku-4-5-20251001",
                    # Scale the output budget with batch size, floor at 2048.
                    "max_tokens": max(2048, len(batch) * 400),
                    # Shared system prompt is marked cacheable across requests.
                    "system": [
                        {
                            "type": "text",
                            "text": _BACKFILL_SYSTEM_PROMPT,
                            "cache_control": {"type": "ephemeral"},
                        }
                    ],
                    "messages": [{"role": "user", "content": prompt}],
                },
            })

        _backfill_extended_status[job_id] = {
            "status": "submitting_batch", "total_controls": total,
            "total_requests": len(requests),
        }

        batch_result = await create_anthropic_batch(requests)
        batch_id = batch_result.get("id", "")

        _backfill_extended_status[job_id] = {
            "status": "batch_submitted", "batch_id": batch_id,
            "total_controls": total, "total_requests": len(requests),
        }

        # Poll for completion (max ~1 hour). On timeout, fail the job instead
        # of fetching results from a batch that is still processing.
        batch_ended = False
        for _ in range(360):
            await asyncio.sleep(10)
            status = await check_batch_status(batch_id)
            if status.get("processing_status") == "ended":
                batch_ended = True
                break
        if not batch_ended:
            _backfill_extended_status[job_id] = {
                "status": "failed", "batch_id": batch_id,
                "error": "Batch did not reach 'ended' within 1 hour",
            }
            return

        # Process results: each succeeded result carries one JSON object
        # mapping control_id -> the 6 backfilled fields.
        results = await fetch_batch_results(batch_id)
        updated = 0
        errors = 0

        for result in results:
            result_data = result.get("result", {})
            if result_data.get("type") != "succeeded":
                errors += 1
                continue

            message = result_data.get("message", {})
            content = message.get("content", [])
            text_content = content[0].get("text", "") if content else ""

            try:
                parsed = _extract_json_object(text_content)

                for control_id, fields in parsed.items():
                    if not isinstance(fields, dict):
                        continue
                    # Merge only the 6 backfill keys into generation_metadata;
                    # jsonb || overwrites just those keys.
                    res = db.execute(text("""
                        UPDATE canonical_controls
                        SET generation_metadata = generation_metadata || CAST(:new_fields AS jsonb),
                            updated_at = NOW()
                        WHERE control_id = :cid
                          AND release_state = 'draft'
                    """), {
                        "cid": control_id,
                        "new_fields": json_mod.dumps({
                            "applicability": fields.get("applicability", {}),
                            "check_type": fields.get("check_type", ""),
                            "scanner_hint": fields.get("scanner_hint", {}),
                            "manual_review_required_if": fields.get("manual_review_required_if", []),
                            "evidence_type": fields.get("evidence_type", ""),
                            "provides_context": fields.get("provides_context", []),
                        }),
                    })
                    # Count only rows the UPDATE actually matched (the model
                    # may return ids that are unknown or no longer draft).
                    if res.rowcount and res.rowcount > 0:
                        updated += res.rowcount
                db.commit()
            except Exception as e:
                logger.error("Backfill parse error: %s", e)
                errors += 1
                try:
                    db.rollback()
                except Exception:
                    pass

        _backfill_extended_status[job_id] = {
            "status": "completed", "batch_id": batch_id,
            "total_controls": total, "updated": updated, "errors": errors,
        }
    except Exception as e:
        logger.error("Backfill extended %s failed: %s", job_id, e)
        _backfill_extended_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()


@router.post("/generate/backfill-extended")
async def start_backfill_extended(req: BackfillExtendedRequest):
    """Backfill extended fields (applicability, scanner_hint, etc.) via Haiku Batch API."""
    import uuid as uuid_mod
    job_id = str(uuid_mod.uuid4())[:8]
    _backfill_extended_status[job_id] = {"status": "starting"}
    # Fire-and-forget: progress is polled via the companion status endpoint.
    asyncio.create_task(_run_backfill_extended(req, job_id))
    return {
        "status": "running", "job_id": job_id,
        "message": f"Backfill started. Poll /generate/backfill-extended-status/{job_id}",
    }


@router.get("/generate/backfill-extended-status/{job_id}")
async def get_backfill_extended_status(job_id: str):
    """Return the in-memory status record for a backfill job; 404 if unknown."""
    status = _backfill_extended_status.get(job_id)
    if not status:
        raise HTTPException(status_code=404, detail="Backfill job not found")
    return status
+ Wenn universell anwendbar: leeres Objekt {} + Sonst: {"field": "context.SIGNAL", "op": "==", "value": true} + Zusammengesetzt: {"operator": "AND", "clauses": [{...}, {...}]} + Typische Felder: context.uses_oauth, context.has_public_api, + context.processes_personal_data, context.uses_ai_system, + context.has_employees, context.sells_online, context.uses_encryption, + context.has_third_party_components, context.is_critical_infrastructure + - check_type: Praeziser Prueftyp (EINEN der 10 Werte waehlen) + - scanner_hint: Technische Suchbegriffe fuer automatisierte Pruefung + + negative_indicators die auf Nicht-Einhaltung hindeuten + - manual_review_required_if: Wann manuelle Pruefung statt Scanner noetig + - evidence_type: code (technisch pruefbar), process (organisatorisch), hybrid + - provides_context: Welche Context-Variablen erzeugt dieses Control bei Pruefung? + Beispiel: Ein Control "OAuth-Clients klassifizieren" liefert context.oauth_client_types + Das Control muss UMSETZBAR sein — keine Gesetzesparaphrase. Antworte NUR als JSON. 
Keine Erklaerungen.""" @@ -2137,10 +2165,15 @@ Antworte als JSON: "fail_criteria": ["Wann gilt dieses Control als nicht erfuellt?"], "severity": "critical|high|medium|low", "category": "security|privacy|governance|operations|finance|reporting", - "check_type": "technical_config_check|document_clause_check|code_pattern_check|evidence_check|interview_required", + "check_type": "technical_config_check|code_pattern_check|runtime_security_test|document_policy_check|document_classification_check|document_contract_check|evidence_artifact_check|process_verification|training_verification|interview_assessment", "merge_key": "action_type:normalized_object:control_phase", "dependency_hints": ["dependency_type:action_type:normalized_object (Voraussetzungen, Ersetzungen, Kompensationen)"], - "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)" + "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)", + "applicability": {{}}, + "scanner_hint": {{"search_terms": ["technischer Suchbegriff"], "negative_indicators": ["Negativindikator"]}}, + "manual_review_required_if": ["Bedingung fuer manuelle Pruefung"], + "evidence_type": "code|process|hybrid", + "provides_context": ["context.VARIABLE die dieses Control bei Pruefung erzeugt"] }}""" @@ -2235,10 +2268,15 @@ Jedes Control hat dieses Format: "fail_criteria": ["Wann gilt dieses Control als nicht erfuellt?"], "severity": "critical|high|medium|low", "category": "security|privacy|governance|operations|finance|reporting", - "check_type": "technical_config_check|document_clause_check|code_pattern_check|evidence_check|interview_required", + "check_type": "technical_config_check|code_pattern_check|runtime_security_test|document_policy_check|document_classification_check|document_contract_check|evidence_artifact_check|process_verification|training_verification|interview_assessment", "merge_key": 
"action_type:normalized_object:control_phase", "dependency_hints": ["dependency_type:action_type:normalized_object (Voraussetzungen, Ersetzungen, Kompensationen)"], - "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)" + "lifecycle_phase_order": "1-13 (1=scope, 2=definition, 4=implementation, 7=monitoring, 8=testing, 12=reporting)", + "applicability": {{}}, + "scanner_hint": {{"search_terms": ["technischer Suchbegriff"], "negative_indicators": ["Negativindikator"]}}, + "manual_review_required_if": ["Bedingung fuer manuelle Pruefung"], + "evidence_type": "code|process|hybrid", + "provides_context": ["context.VARIABLE die dieses Control bei Pruefung erzeugt"] }}""" @@ -2982,6 +3020,11 @@ class DecompositionPass: check_type=parsed.get("check_type", ""), dependency_hints=_ensure_list(parsed.get("dependency_hints", [])), lifecycle_phase_order=int(parsed.get("lifecycle_phase_order", 0) or 0), + applicability=parsed.get("applicability") or {}, + scanner_hint=parsed.get("scanner_hint") or {}, + manual_review_required_if=_ensure_list(parsed.get("manual_review_required_if", [])), + evidence_type=parsed.get("evidence_type", ""), + provides_context=_ensure_list(parsed.get("provides_context", [])), ) # Store merge_key from LLM output in metadata llm_merge_key = parsed.get("merge_key", "") @@ -3458,6 +3501,12 @@ class DecompositionPass: # Dependency Engine Felder "dependency_hints": atomic.dependency_hints or [], "lifecycle_phase_order": atomic.lifecycle_phase_order or 0, + # Erweiterte Felder (v4) + "applicability": atomic.applicability or {}, + "scanner_hint": atomic.scanner_hint or {}, + "manual_review_required_if": atomic.manual_review_required_if or [], + "evidence_type": atomic.evidence_type or "", + "provides_context": atomic.provides_context or [], }), "framework_id": "14b1bdd2-abc7-4a43-adae-14471ee5c7cf", },