feat(pipeline): add quality metrics endpoint for Pass 0b controls

GET /generate/quality-metrics — reports:
- controls_per_obligation ratio
- duplicate merge_key rate
- evidence leak rate
- truncated title rate
- MCP field coverage
- merge_key coverage

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-26 09:51:27 +02:00
parent d660a45bb5
commit d583971afd

View File

@@ -2319,3 +2319,135 @@ async def submit_pass0b(req: SubmitPass0bRequest):
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
finally: finally:
db.close() db.close()
@router.get("/generate/quality-metrics")
async def get_quality_metrics(
since_hours: int = Query(default=0, description="Only count controls created in last N hours (0=all)"),
):
"""Quality metrics for Pass 0b atomic controls.
Measures:
- controls_per_obligation: ratio of controls to obligations (target: 0.2-0.4)
- duplicate_rate: controls sharing merge_key (target: <5%)
- evidence_leak_rate: controls with evidence-like titles (target: <2%)
- container_rate: controls with composite-like titles (target: <10%)
- truncated_title_rate: titles ending abruptly (target: 0%)
- mcp_coverage: controls with assertion + pass/fail criteria
"""
db = SessionLocal()
try:
time_filter = ""
if since_hours > 0:
time_filter = f"AND cc.created_at > NOW() - interval '{since_hours} hours'"
# Total pass0b draft controls
total = db.execute(text(f"""
SELECT COUNT(*) FROM compliance.canonical_controls cc
WHERE cc.release_state = 'draft'
AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
{time_filter}
""")).scalar() or 0
# Total validated obligations
total_obligations = db.execute(text("""
SELECT COUNT(*) FROM compliance.obligation_candidates
WHERE release_state = 'validated'
""")).scalar() or 0
if total == 0:
return {
"total_controls": 0,
"total_obligations": total_obligations,
"message": "No pass0b controls found",
}
# Controls per obligation
controls_per_obligation = round(total / max(total_obligations, 1), 3)
# Duplicate rate: merge_keys that appear >1 time
dup_rows = db.execute(text(f"""
SELECT COUNT(*) as dup_count FROM (
SELECT generation_metadata->>'merge_group_hint' as mk, COUNT(*) as cnt
FROM compliance.canonical_controls cc
WHERE cc.release_state = 'draft'
AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
AND generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
{time_filter}
GROUP BY mk
HAVING COUNT(*) > 1
) sub
""")).scalar() or 0
total_with_mk = db.execute(text(f"""
SELECT COUNT(DISTINCT generation_metadata->>'merge_group_hint')
FROM compliance.canonical_controls cc
WHERE cc.release_state = 'draft'
AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
AND generation_metadata->>'merge_group_hint' IS NOT NULL
AND generation_metadata->>'merge_group_hint' != ''
{time_filter}
""")).scalar() or 0
duplicate_rate = round(dup_rows / max(total_with_mk, 1) * 100, 1)
# Evidence leak rate: controls that look like evidence
evidence_keywords = ("nachweis", "screenshot", "export", "zertifizierung",
"auditbericht", "prüfbericht", "protokoll")
evidence_count = 0
rows = db.execute(text(f"""
SELECT title FROM compliance.canonical_controls cc
WHERE cc.release_state = 'draft'
AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
{time_filter}
""")).fetchall()
for row in rows:
title_lower = (row[0] or "").lower()
if any(kw in title_lower for kw in evidence_keywords):
evidence_count += 1
evidence_leak_rate = round(evidence_count / max(total, 1) * 100, 1)
# Truncated title rate: titles ending mid-word (heuristic)
truncated_count = 0
for row in rows:
title = (row[0] or "").strip()
if title and len(title) >= 75:
# Likely truncated if it's close to max and doesn't end with a word boundary
if not title[-1] in ".!?)\"'":
truncated_count += 1
truncated_title_rate = round(truncated_count / max(total, 1) * 100, 1)
# MCP coverage
mcp_count = db.execute(text(f"""
SELECT COUNT(*) FROM compliance.canonical_controls cc
WHERE cc.release_state = 'draft'
AND cc.generation_metadata->>'decomposition_method' = 'pass0b'
AND cc.generation_metadata->>'assertion' IS NOT NULL
AND cc.generation_metadata->>'assertion' != ''
{time_filter}
""")).scalar() or 0
mcp_coverage = round(mcp_count / max(total, 1) * 100, 1)
# Merge key coverage
mk_coverage = round(total_with_mk / max(total, 1) * 100, 1) if total_with_mk else 0
return {
"total_controls": total,
"total_obligations": total_obligations,
"controls_per_obligation": controls_per_obligation,
"duplicate_merge_key_rate": f"{duplicate_rate}%",
"evidence_leak_rate": f"{evidence_leak_rate}%",
"truncated_title_rate": f"{truncated_title_rate}%",
"mcp_coverage": f"{mcp_coverage}%",
"merge_key_coverage": f"{mk_coverage}%",
"targets": {
"controls_per_obligation": "0.2-0.4",
"duplicate_rate": "<5%",
"evidence_leak_rate": "<2%",
"truncated_title_rate": "0%",
"mcp_coverage": "100%",
},
}
finally:
db.close()