"""Master Control API — G-pre3. Provides read access to Master Controls (lifecycle-grouped atomic controls). """ import json import logging from typing import Optional from fastapi import APIRouter, HTTPException, Query from sqlalchemy import text from db.session import SessionLocal logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/master-controls", tags=["master-controls"]) @router.get("") async def list_master_controls( limit: int = Query(50, ge=1, le=500), offset: int = Query(0, ge=0), search: Optional[str] = None, min_phases: Optional[int] = None, min_controls: Optional[int] = None, sort: str = Query("total_controls", regex="^(total_controls|phases|name|created_at)$"), ): """List Master Controls with optional filtering.""" db = SessionLocal() try: where_clauses = [] params: dict = {"limit": limit, "offset": offset} if search: where_clauses.append("mc.canonical_name ILIKE :search") params["search"] = f"%{search}%" if min_phases: where_clauses.append("jsonb_array_length(mc.phases_covered) >= :min_phases") params["min_phases"] = min_phases if min_controls: where_clauses.append("mc.total_controls >= :min_controls") params["min_controls"] = min_controls where = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" sort_map = { "total_controls": "mc.total_controls DESC", "phases": "jsonb_array_length(mc.phases_covered) DESC", "name": "mc.canonical_name ASC", "created_at": "mc.created_at DESC", } order = sort_map.get(sort, "mc.total_controls DESC") rows = db.execute(text(f""" SELECT mc.id, mc.master_control_id, mc.object_group_id, mc.canonical_name, mc.phases_covered, mc.phase_control_count, mc.total_controls, mc.created_at FROM master_controls mc {where} ORDER BY {order} LIMIT :limit OFFSET :offset """), params).fetchall() total = db.execute(text(f""" SELECT count(*) FROM master_controls mc {where} """), params).scalar() return { "total": total, "limit": limit, "offset": offset, "master_controls": [ { "id": str(r[0]), "master_control_id": r[1], "object_group_id": r[2], "canonical_name": r[3], "phases_covered": r[4], "phase_control_count": r[5], "total_controls": r[6], "created_at": str(r[7]), } for r in rows ], } finally: db.close() @router.get("/stats") async def master_control_stats(): """Aggregate statistics about Master Controls.""" db = SessionLocal() try: stats = db.execute(text(""" SELECT count(*) AS total_master_controls, sum(total_controls) AS total_member_controls, avg(total_controls)::int AS avg_controls_per_mc, max(total_controls) AS max_controls, avg(jsonb_array_length(phases_covered))::numeric(3,1) AS avg_phases, max(jsonb_array_length(phases_covered)) AS max_phases FROM master_controls """)).fetchone() phase_dist = db.execute(text(""" SELECT phase, count(*) AS control_count FROM master_control_members GROUP BY phase ORDER BY control_count DESC """)).fetchall() return { "total_master_controls": stats[0], "total_member_controls": stats[1], "avg_controls_per_mc": stats[2], "max_controls": stats[3], "avg_phases": float(stats[4]) if stats[4] else 0, "max_phases": stats[5], "phase_distribution": {r[0]: r[1] for r in phase_dist}, } finally: db.close() @router.get("/{mc_id}") async def get_master_control(mc_id: str): """Get a single Master Control with all phase-controls.""" db = SessionLocal() try: mc = db.execute(text(""" SELECT mc.id, mc.master_control_id, mc.object_group_id, mc.canonical_name, mc.phases_covered, mc.phase_control_count, mc.total_controls FROM master_controls mc WHERE mc.master_control_id = :mc_id """), {"mc_id": mc_id}).fetchone() if not mc: raise HTTPException(status_code=404, detail="Master Control not found") members = db.execute(text(""" SELECT mcm.phase, mcm.action, cc.control_id, cc.title, cc.severity, cc.source_citation->>'source' AS source FROM master_control_members mcm JOIN canonical_controls cc ON cc.id = mcm.control_uuid WHERE mcm.master_control_uuid = CAST(:mc_uuid AS uuid) ORDER BY mcm.phase, cc.control_id """), {"mc_uuid": str(mc[0])}).fetchall() # Group by phase phases = {} for phase, action, ctrl_id, title, severity, source in members: if phase not in phases: phases[phase] = [] phases[phase].append({ "control_id": ctrl_id, "title": title, "action": action, "severity": severity, "source": source, }) return { "id": str(mc[0]), "master_control_id": mc[1], "object_group_id": mc[2], "canonical_name": mc[3], "phases_covered": mc[4], "phase_control_count": mc[5], "total_controls": mc[6], "phases": phases, } finally: db.close()