diff --git a/control-pipeline/services/control_generator.py b/control-pipeline/services/control_generator.py index f1ac8d1..1f99ca1 100644 --- a/control-pipeline/services/control_generator.py +++ b/control-pipeline/services/control_generator.py @@ -544,6 +544,8 @@ class GeneratorResult: controls_too_close: int = 0 controls_duplicates_found: int = 0 controls_qa_fixed: int = 0 + controls_stored: int = 0 # Actually persisted to DB + controls_store_failed: int = 0 # Generated but failed to persist chunks_skipped_prefilter: int = 0 errors: list = field(default_factory=list) controls: list = field(default_factory=list) @@ -645,6 +647,13 @@ async def _llm_anthropic(prompt: str, system_prompt: Optional[str] = None, max_r json=payload, ) if resp.status_code != 200: + # Retry on transient HTTP errors + if resp.status_code in (429, 500, 502, 503, 504) and attempt < max_retries: + wait = 2 ** attempt + logger.warning("Anthropic API %d (transient) — retry in %ds...", resp.status_code, wait) + import asyncio + await asyncio.sleep(wait) + continue logger.error("Anthropic API %d: %s", resp.status_code, resp.text[:300]) return "" data = resp.json() @@ -1732,20 +1741,52 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne ) def _generate_control_id(self, domain: str, db: Session) -> str: - """Generate next sequential control ID like AUTH-011.""" + """Generate unique control ID using numeric MAX + collision guard. + + Uses CAST to INTEGER for correct numeric ordering (not string sort). + Falls back to UUID suffix if collision is detected. + """ prefix = domain.upper()[:4] try: + # Numeric ordering — CAST to INTEGER, not string sort result = db.execute( - text("SELECT control_id FROM canonical_controls WHERE control_id LIKE :prefix ORDER BY control_id DESC LIMIT 1"), - {"prefix": f"{prefix}-%"}, + text(""" + SELECT COALESCE( + MAX(CAST(SUBSTRING(control_id FROM :prefix_len) AS INTEGER)), + 0 + ) + 1 + FROM canonical_controls + WHERE control_id ~ (:pattern) + """), + {"prefix_len": len(prefix) + 2, "pattern": f"^{prefix}-[0-9]+$"}, ) - row = result.fetchone() - if row: - last_num = int(row[0].split("-")[-1]) - return f"{prefix}-{last_num + 1:03d}" - except Exception: - pass - return f"{prefix}-001" + next_num = result.scalar() or 1 + candidate = f"{prefix}-{next_num:03d}" + + # Collision guard — check if ID already exists + exists = db.execute( + text("SELECT 1 FROM canonical_controls WHERE control_id = :cid LIMIT 1"), + {"cid": candidate}, + ).fetchone() + + if exists: + # UUID suffix as fallback for race conditions + suffix = uuid.uuid4().hex[:6] + candidate = f"{prefix}-{next_num:03d}-{suffix}" + logger.warning( + "ID collision for %s-%03d — using unique suffix: %s", + prefix, next_num, candidate, + ) + + return candidate + except Exception as e: + # NEVER swallow silently — UUID as safe fallback + fallback = f"{prefix}-{uuid.uuid4().hex[:8]}" + logger.error( + "Failed to generate control_id for domain %s: %s — using fallback %s", + domain, e, fallback, + ) + return fallback # ── Stage QA: Automated Quality Validation ─────────────────────── @@ -1929,7 +1970,11 @@ Kategorien: {CATEGORY_LIST_STR}""" :target_audience, :pipeline_version, :applicable_industries, :applicable_company_size, :scope_conditions ) - ON CONFLICT (framework_id, control_id) DO NOTHING + ON CONFLICT (framework_id, control_id) DO UPDATE SET + updated_at = NOW(), + title = EXCLUDED.title, + objective = EXCLUDED.objective, + generation_metadata = EXCLUDED.generation_metadata RETURNING id """), { @@ -2169,12 +2214,21 @@ Kategorien: {CATEGORY_LIST_STR}""" if ctrl_uuid: path = control.generation_metadata.get("processing_path", "structured_batch") self._mark_chunk_processed(chunk, lic_info, path, [ctrl_uuid], job_id) + result.controls_generated += 1 + result.controls_stored += 1 + controls_count += 1 else: - self._mark_chunk_processed(chunk, lic_info, "store_failed", [], job_id) + # CRITICAL FIX: Do NOT mark chunk as processed — allow retry + logger.error( + "STORE_FAILED: Control '%s' (%s) nicht gespeichert — Chunk bleibt unverarbeitet fuer Retry", + control.control_id, control.title[:60], + ) + result.controls_store_failed += 1 + else: + result.controls_generated += 1 + controls_count += 1 - result.controls_generated += 1 result.controls.append(asdict(control)) - controls_count += 1 if self._existing_controls is not None: self._existing_controls.append({ @@ -2187,10 +2241,18 @@ Kategorien: {CATEGORY_LIST_STR}""" try: # Progress logging every 50 chunks if i > 0 and i % 50 == 0: + store_rate = (result.controls_stored / max(result.controls_generated, 1)) * 100 if result.controls_generated > 0 else 100 logger.info( - "Progress: %d/%d chunks processed, %d controls generated, %d skipped by prefilter", - i, len(chunks), controls_count, chunks_skipped_prefilter, + "Progress: %d/%d chunks | %d generated | %d stored (%.0f%%) | %d store_failed | %d skipped", + i, len(chunks), result.controls_generated, result.controls_stored, + store_rate, result.controls_store_failed, chunks_skipped_prefilter, ) + # ALARM bei niedriger Store-Rate + if result.controls_generated > 10 and store_rate < 80: + logger.error( + "ALARM: Store-Erfolgsrate nur %.0f%% — moeglicherweise ID-Kollisionen!", + store_rate, + ) self._update_job(job_id, result) # Stage 1.5: Local LLM pre-filter — skip chunks without requirements @@ -2235,11 +2297,38 @@ Kategorien: {CATEGORY_LIST_STR}""" await _flush_batch() result.chunks_skipped_prefilter = chunks_skipped_prefilter + + # Post-Job Validierung — DB-Realitaet pruefen + try: + actual_stored = self.db.execute( + text("SELECT count(*) FROM canonical_controls WHERE generation_metadata::text LIKE :jid"), + {"jid": f"%{job_id}%"}, + ).scalar() or 0 + except Exception: + actual_stored = -1 + + final_store_rate = (result.controls_stored / max(result.controls_generated, 1)) * 100 if result.controls_generated > 0 else 0 + logger.info( - "Pipeline complete: %d controls generated, %d chunks skipped by prefilter, %d total chunks", - controls_count, chunks_skipped_prefilter, len(chunks), + "Pipeline complete: %d chunks | %d generated | %d stored (%.0f%%) | %d store_failed | %d skipped | DB actual: %d", + len(chunks), result.controls_generated, result.controls_stored, + final_store_rate, result.controls_store_failed, + chunks_skipped_prefilter, actual_stored, ) + if result.controls_store_failed > 0: + logger.error( + "WARNUNG: %d Controls konnten NICHT gespeichert werden! " + "Diese Chunks bleiben unverarbeitet und muessen erneut verarbeitet werden.", + result.controls_store_failed, + ) + + if result.controls_generated > 0 and final_store_rate < 90: + logger.error( + "KRITISCH: Store-Rate nur %.0f%% — %d von %d Controls verloren!", + final_store_rate, result.controls_store_failed, result.controls_generated, + ) + result.status = "completed" except Exception as e: