# Lower-case (German) title fragments that suggest a control is really an
# evidence artifact rather than an actionable control.
_EVIDENCE_KEYWORDS = (
    "nachweis", "screenshot", "export", "zertifizierung",
    "auditbericht", "prüfbericht", "protokoll",
)

# A title at least this long that does not end in closing punctuation is
# counted as "probably truncated". Purely a heuristic.
_TRUNCATION_MIN_LENGTH = 75
_TITLE_TERMINATORS = ".!?)\"'"

# Predicate shared by every query below: draft controls produced by pass 0b.
_PASS0B_DRAFT_WHERE = """
    cc.release_state = 'draft'
    AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
"""

# Additional predicate: the control carries a non-empty merge key.
_HAS_MERGE_KEY = """
    AND cc.generation_metadata->>'merge_group_hint' IS NOT NULL
    AND cc.generation_metadata->>'merge_group_hint' != ''
"""


def _percent(part, whole):
    """Return ``part / whole`` as a percentage rounded to one decimal place.

    ``whole`` is clamped to at least 1 so an empty denominator yields 0.0
    instead of raising ``ZeroDivisionError``.
    """
    return round(part / max(whole, 1) * 100, 1)


def _is_evidence_title(title):
    """Return True if ``title`` contains any evidence keyword (case-insensitive)."""
    lowered = (title or "").lower()
    return any(keyword in lowered for keyword in _EVIDENCE_KEYWORDS)


def _is_truncated_title(title):
    """Return True if ``title`` is long and does not end in closing punctuation."""
    stripped = (title or "").strip()
    if len(stripped) < _TRUNCATION_MIN_LENGTH:
        return False
    return stripped[-1] not in _TITLE_TERMINATORS


@router.get("/generate/quality-metrics")
async def get_quality_metrics(
    since_hours: int = Query(default=0, description="Only count controls created in last N hours (0=all)"),
):
    """Quality metrics for Pass 0b atomic controls.

    Reported measures:
    - controls_per_obligation: ratio of pass0b draft controls to validated
      obligations (target: 0.2-0.4)
    - duplicate_merge_key_rate: share of distinct merge keys that are used by
      more than one control (target: <5%)
    - evidence_leak_rate: controls whose title contains an evidence keyword
      (target: <2%)
    - truncated_title_rate: long titles that do not end in closing
      punctuation (target: 0%)
    - mcp_coverage: controls with a non-empty ``assertion`` in their
      generation metadata (target: 100%)
    - merge_key_coverage: controls that carry a non-empty merge key

    A container/composite-title rate is not computed by this endpoint.

    Args:
        since_hours: When greater than 0, only controls created within the
            last ``since_hours`` hours are counted. Values <= 0 count all
            controls.

    Returns:
        A dict of metrics. Rates are strings with a trailing ``%``. When no
        matching controls exist, a reduced dict with ``total_controls``,
        ``total_obligations`` and ``message`` is returned instead.
    """
    db = SessionLocal()
    try:
        # The time window is passed as a bound parameter instead of being
        # formatted into the SQL text.
        if since_hours > 0:
            time_filter = (
                "AND cc.created_at > NOW() - make_interval(hours => :since_hours)"
            )
            params = {"since_hours": since_hours}
        else:
            time_filter = ""
            params = {}

        # Only trusted module-level constants are interpolated below.
        controls_from_where = f"""
            FROM compliance.canonical_controls cc
            WHERE {_PASS0B_DRAFT_WHERE}
            {time_filter}
        """

        total = db.execute(
            text(f"SELECT COUNT(*) {controls_from_where}"), params
        ).scalar() or 0

        # NOTE(review): obligations are not restricted by ``since_hours``, so
        # controls_per_obligation compares a windowed numerator against an
        # all-time denominator when a window is given — confirm this is intended.
        total_obligations = db.execute(text("""
            SELECT COUNT(*) FROM compliance.obligation_candidates
            WHERE release_state = 'validated'
        """)).scalar() or 0

        if total == 0:
            return {
                "total_controls": 0,
                "total_obligations": total_obligations,
                "message": "No pass0b controls found",
            }

        controls_per_obligation = round(total / max(total_obligations, 1), 3)

        # Number of merge keys that occur on more than one control.
        duplicated_keys = db.execute(text(f"""
            SELECT COUNT(*) FROM (
                SELECT cc.generation_metadata->>'merge_group_hint' AS mk
                {controls_from_where}
                {_HAS_MERGE_KEY}
                GROUP BY mk
                HAVING COUNT(*) > 1
            ) sub
        """), params).scalar() or 0

        # One round trip yields both merge-key counts: controls that have a
        # key (coverage numerator) and distinct keys (duplicate-rate
        # denominator). These differ as soon as any key is shared.
        key_counts = db.execute(text(f"""
            SELECT COUNT(*),
                   COUNT(DISTINCT cc.generation_metadata->>'merge_group_hint')
            {controls_from_where}
            {_HAS_MERGE_KEY}
        """), params).fetchone()
        controls_with_key = (key_counts[0] or 0) if key_counts else 0
        distinct_keys = (key_counts[1] or 0) if key_counts else 0

        duplicate_rate = _percent(duplicated_keys, distinct_keys)
        mk_coverage = _percent(controls_with_key, total)

        # Title-based heuristics are evaluated in Python in a single pass.
        title_rows = db.execute(
            text(f"SELECT cc.title {controls_from_where}"), params
        ).fetchall()
        evidence_count = 0
        truncated_count = 0
        for row in title_rows:
            title = row[0]
            if _is_evidence_title(title):
                evidence_count += 1
            if _is_truncated_title(title):
                truncated_count += 1
        evidence_leak_rate = _percent(evidence_count, total)
        truncated_title_rate = _percent(truncated_count, total)

        # MCP coverage: controls with a non-empty assertion.
        mcp_count = db.execute(text(f"""
            SELECT COUNT(*)
            {controls_from_where}
            AND cc.generation_metadata->>'assertion' IS NOT NULL
            AND cc.generation_metadata->>'assertion' != ''
        """), params).scalar() or 0
        mcp_coverage = _percent(mcp_count, total)

        return {
            "total_controls": total,
            "total_obligations": total_obligations,
            "controls_per_obligation": controls_per_obligation,
            "duplicate_merge_key_rate": f"{duplicate_rate}%",
            "evidence_leak_rate": f"{evidence_leak_rate}%",
            "truncated_title_rate": f"{truncated_title_rate}%",
            "mcp_coverage": f"{mcp_coverage}%",
            "merge_key_coverage": f"{mk_coverage}%",
            "targets": {
                "controls_per_obligation": "0.2-0.4",
                "duplicate_rate": "<5%",
                "evidence_leak_rate": "<2%",
                "truncated_title_rate": "0%",
                "mcp_coverage": "100%",
            },
        }
    finally:
        db.close()