# =============================================================================
# APPLICABILITY BACKFILL — Industry, company size, scope conditions
# =============================================================================

class ApplicabilityBackfillRequest(BaseModel):
    """Request body for starting an applicability backfill job."""

    dry_run: bool = True  # when True no UPDATE is executed and nothing is committed
    limit: int = 0  # 0 = all
    batch_size: int = 10  # number of controls sent per LLM request


# Latest status of each backfill job, keyed by the short backfill id.
_applicability_backfill_status: dict = {}

# Strong references to running backfill tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_applicability_backfill_tasks: set = set()


def _build_applicability_prompt(batch) -> str:
    """Render the classification prompt for one batch of control rows.

    Each row is read via its ``control_id``, ``title``, ``objective``,
    ``category``, ``source_name`` and ``regulation_code`` attributes; the
    objective is truncated to 400 characters.
    """
    entries = [
        f"--- CONTROL {idx + 1}: {row.control_id} ---\n"
        f"Titel: {row.title or ''}\n"
        f"Objective: {(row.objective or '')[:400]}\n"
        f"Kategorie: {row.category or ''}\n"
        f"Quelle: {row.source_name or ''} ({row.regulation_code or ''})"
        for idx, row in enumerate(batch)
    ]

    # NOTE: the NIS2 example uses "Logistik/Transport" so that every example
    # value is a member of the "Moegliche Werte" list given to the model.
    return f"""Analysiere die folgenden {len(batch)} Compliance-Controls und bestimme fuer jedes:

1. applicable_industries: Liste der Branchen fuer die dieser Control relevant ist.
   Verwende "all" wenn der Control branchenuebergreifend gilt.
   Moegliche Werte: "all", "Technologie/IT", "Finanzdienstleistungen", "Versicherungen",
   "Gesundheitswesen", "Pharma", "Telekommunikation", "Energie", "Produktion/Industrie",
   "Logistik/Transport", "E-Commerce/Handel", "Oeffentlicher Dienst", "Bildung",
   "Beratung/Consulting", "Immobilien", "Bau", "Automobil", "Maschinenbau",
   "Luft-/Raumfahrt", "Medien/Verlage", "Gastronomie/Hotellerie", "Recht/Kanzlei",
   "Agrar", "Chemie", "Verteidigung"
   Beispiel DSGVO: ["all"], Beispiel TKG: ["Telekommunikation"], Beispiel NIS2: ["Energie", "Gesundheitswesen", "Logistik/Transport", "Finanzdienstleistungen"]

2. applicable_company_size: Ab welcher Unternehmensgroesse gilt dieser Control?
   Verwende "all" wenn keine Groessenbeschraenkung.
   Moegliche Werte: "all", "micro", "small", "medium", "large", "enterprise"
   Beispiel NIS2 Art.21: ["medium", "large", "enterprise"], Beispiel DSGVO Art.5: ["all"]

3. scope_conditions: Optionale Bedingungen. Null wenn keine besonderen Bedingungen.
   Sonst JSON-Objekt mit requires_any und description.
   Moegliche Signale: "uses_ai", "third_country_transfer", "processes_health_data",
   "processes_minors_data", "automated_decisions", "employee_monitoring",
   "video_surveillance", "financial_data", "is_kritis_operator", "payment_services"
   Beispiel AI Act: {{"requires_any": ["uses_ai"], "description": "Nur bei KI-Einsatz"}}
   Beispiel DSGVO Art.32: null (gilt immer)

Antworte mit einem JSON-Array. Jedes Objekt hat:
- control_index: 1-basierter Index
- applicable_industries: Liste
- applicable_company_size: Liste
- scope_conditions: Objekt oder null

{chr(10).join(entries)}"""


async def _classify_applicability_batch(client, headers: dict, model: str, batch) -> list:
    """Send one batch to the Anthropic Messages API and return the parsed items.

    Args:
        client: The ``httpx.AsyncClient`` used for the request.
        headers: HTTP headers including the API key.
        model: Model name placed in the request payload.
        batch: The control rows to classify.

    Returns:
        A list of parsed items (a single parsed object is wrapped in a list).

    Raises:
        RuntimeError: If the API answers with a non-200 status or the reply
            cannot be parsed by ``_parse_llm_json``.
    """
    payload = {
        "model": model,
        "max_tokens": 4096,
        "system": "Du bist ein Compliance-Experte. Klassifiziere Controls nach Branche, Unternehmensgroesse und Scope-Bedingungen. Antworte NUR mit validem JSON.",
        "messages": [{"role": "user", "content": _build_applicability_prompt(batch)}],
    }
    resp = await client.post(
        "https://api.anthropic.com/v1/messages",
        headers=headers,
        json=payload,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"API {resp.status_code}")

    # `or [{}]` also covers an explicitly empty "content" list, which would
    # otherwise raise IndexError on [0].
    blocks = resp.json().get("content") or [{}]
    parsed = _parse_llm_json(blocks[0].get("text", ""))
    if not parsed:
        raise RuntimeError("JSON parse failed")
    return parsed if isinstance(parsed, list) else [parsed]


def _store_applicability_items(db, batch, items: list, dry_run: bool) -> int:
    """Apply classified items to their controls and return how many were applied.

    Items that are not dicts, carry a missing/non-numeric/out-of-range
    ``control_index``, or repeat an index already handled in this batch are
    skipped individually instead of failing the whole batch. Nothing is
    committed here; with ``dry_run`` no UPDATE is executed at all.
    """
    applied = 0
    seen_indexes = set()
    for item in items:
        if not isinstance(item, dict):
            continue
        try:
            idx = int(item.get("control_index", 0)) - 1  # prompt asks for 1-based
        except (TypeError, ValueError):
            continue
        if idx < 0 or idx >= len(batch) or idx in seen_indexes:
            continue
        seen_indexes.add(idx)

        row = batch[idx]
        industries = item.get("applicable_industries", ["all"])
        company_size = item.get("applicable_company_size", ["all"])
        scope = item.get("scope_conditions")

        if not dry_run:
            db.execute(text("""
                UPDATE compliance.canonical_controls
                SET applicable_industries = CAST(:ind AS jsonb),
                    applicable_company_size = CAST(:size AS jsonb),
                    scope_conditions = CAST(:scope AS jsonb),
                    updated_at = NOW()
                WHERE id = CAST(:cid AS uuid)
            """), {
                "ind": json.dumps(industries),
                "size": json.dumps(company_size),
                # A falsy scope (None, {}) is stored as SQL NULL.
                "scope": json.dumps(scope) if scope else None,
                "cid": str(row.id),
            })
        applied += 1
    return applied


async def _run_applicability_backfill(req: ApplicabilityBackfillRequest, backfill_id: str):
    """Backfill applicable_industries, applicable_company_size, scope_conditions.

    Selects draft controls whose ``applicable_industries`` is NULL or JSON
    null, classifies them in batches through the Anthropic API and, unless
    ``req.dry_run`` is set, writes and commits the result per batch. Progress
    and the final outcome are published in ``_applicability_backfill_status``
    under ``backfill_id``. A failing batch is rolled back and recorded in the
    ``errors`` list; the remaining batches still run.

    NOTE(review): the database calls are synchronous and run inside this
    coroutine — confirm that blocking the event loop for their duration is
    acceptable.
    """
    import os
    import httpx

    api_key = os.getenv("ANTHROPIC_API_KEY", "")
    model = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")

    if not api_key:
        _applicability_backfill_status[backfill_id] = {
            "status": "failed", "error": "ANTHROPIC_API_KEY not set"
        }
        return

    # range() raises on a zero step and yields nothing for a negative one.
    batch_size = max(1, req.batch_size)

    db = SessionLocal()
    try:
        query = """
            SELECT id, control_id, title, objective, category,
                   source_citation->>'source' as source_name,
                   source_citation->>'regulation_code' as regulation_code
            FROM compliance.canonical_controls
            WHERE release_state = 'draft'
              AND (applicable_industries IS NULL OR applicable_industries::text = 'null')
            ORDER BY control_id
        """
        params = {}
        if req.limit > 0:
            # Bound parameter instead of interpolating the value into the SQL.
            query += " LIMIT :limit"
            params["limit"] = req.limit
        rows = db.execute(text(query), params).fetchall()

        total = len(rows)
        updated = 0
        errors = []

        _applicability_backfill_status[backfill_id] = {
            "status": "running", "total": total, "updated": 0,
            "dry_run": req.dry_run, "errors": [],
        }

        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }

        # One client for the whole job rather than one per batch.
        async with httpx.AsyncClient(timeout=120.0) as client:
            for batch_start in range(0, total, batch_size):
                batch = rows[batch_start:batch_start + batch_size]

                try:
                    items = await _classify_applicability_batch(client, headers, model, batch)
                    applied = _store_applicability_items(db, batch, items, req.dry_run)
                    if not req.dry_run:
                        db.commit()
                    # Count only after the commit, so rolled-back batches
                    # never inflate the total.
                    updated += applied
                except Exception as e:
                    errors.append(f"Batch {batch_start}: {str(e)[:200]}")
                    logger.warning("Applicability backfill batch %d error: %s", batch_start, e)
                    db.rollback()

                # Published after every batch, including failed ones.
                _applicability_backfill_status[backfill_id] = {
                    "status": "running", "total": total, "updated": updated,
                    "progress": f"{min(batch_start + batch_size, total)}/{total}",
                    "dry_run": req.dry_run, "errors": errors[-10:],
                }

        _applicability_backfill_status[backfill_id] = {
            "status": "completed", "total": total, "updated": updated,
            "dry_run": req.dry_run, "errors": errors[-50:],
        }
        logger.info("Applicability backfill %s completed: %d/%d updated",
                    backfill_id, updated, total)

    except Exception as e:
        logger.error("Applicability backfill %s failed: %s", backfill_id, e)
        _applicability_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()


@router.post("/generate/backfill-applicability")
async def start_applicability_backfill(req: ApplicabilityBackfillRequest):
    """Backfill applicable_industries, applicable_company_size, scope_conditions
    for controls missing these fields. Uses Anthropic API.
    Default is dry_run=True (preview only).

    Returns immediately with a ``backfill_id``; the work runs as a background
    task whose state can be polled via the status endpoint.
    """
    import uuid
    backfill_id = str(uuid.uuid4())[:8]
    _applicability_backfill_status[backfill_id] = {"status": "starting"}

    # Keep a reference until the task is done so it cannot be garbage
    # collected mid-run; the callback drops it again afterwards.
    task = asyncio.create_task(_run_applicability_backfill(req, backfill_id))
    _applicability_backfill_tasks.add(task)
    task.add_done_callback(_applicability_backfill_tasks.discard)

    return {
        "status": "running",
        "backfill_id": backfill_id,
        "message": f"Applicability backfill started. Poll /generate/applicability-backfill-status/{backfill_id}",
    }


@router.get("/generate/applicability-backfill-status/{backfill_id}")
async def get_applicability_backfill_status(backfill_id: str):
    """Get status of an applicability backfill job.

    Raises:
        HTTPException: 404 if no job with ``backfill_id`` is known.
    """
    status = _applicability_backfill_status.get(backfill_id)
    if status is None:
        raise HTTPException(status_code=404, detail="Applicability backfill job not found")
    return status


# ---------------------------------------------------------------------------
# Source-Type Backfill — Classify law vs guideline vs standard vs restricted
# ---------------------------------------------------------------------------