""" BreakPilot Health Aggregator Service Checks TCP connectivity to all configured services and exposes aggregate health status via a FastAPI HTTP interface. Configuration via environment variables: CHECK_SERVICES - comma-separated "host:port" pairs e.g. "postgres:5432,redis:6379,api:3000" CHECK_TIMEOUT - per-service TCP timeout in seconds (default: 3) CACHE_TTL - seconds to cache results (default: 10) """ from __future__ import annotations import asyncio import os import time from typing import Any from fastapi import FastAPI from fastapi.responses import JSONResponse app = FastAPI(title="BreakPilot Health Aggregator") # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- CHECK_TIMEOUT: float = float(os.environ.get("CHECK_TIMEOUT", "3")) CACHE_TTL: float = float(os.environ.get("CACHE_TTL", "10")) def _parse_services() -> list[dict[str, Any]]: """Parse CHECK_SERVICES env var into a list of {host, port} dicts.""" raw = os.environ.get("CHECK_SERVICES", "") services: list[dict[str, Any]] = [] for entry in raw.split(","): entry = entry.strip() if not entry: continue if ":" not in entry: continue host, port_str = entry.rsplit(":", 1) try: port = int(port_str) except ValueError: continue services.append({"host": host, "port": port}) return services # --------------------------------------------------------------------------- # In-memory cache # --------------------------------------------------------------------------- _cache: dict[str, Any] = { "timestamp": 0.0, "results": [], } # --------------------------------------------------------------------------- # TCP health check # --------------------------------------------------------------------------- async def _check_service(host: str, port: int) -> dict[str, Any]: """Attempt a TCP connection to host:port and return status + timing.""" start = time.monotonic() try: _, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=CHECK_TIMEOUT, ) elapsed_ms = round((time.monotonic() - start) * 1000, 2) writer.close() await writer.wait_closed() return { "service": f"{host}:{port}", "status": "up", "response_time_ms": elapsed_ms, } except (OSError, asyncio.TimeoutError) as exc: elapsed_ms = round((time.monotonic() - start) * 1000, 2) return { "service": f"{host}:{port}", "status": "down", "response_time_ms": elapsed_ms, "error": str(exc) or type(exc).__name__, } async def _check_all() -> list[dict[str, Any]]: """Check every configured service concurrently, with caching.""" now = time.monotonic() if _cache["results"] and (now - _cache["timestamp"]) < CACHE_TTL: return _cache["results"] services = _parse_services() if not services: return [] tasks = [_check_service(s["host"], s["port"]) for s in services] results = await asyncio.gather(*tasks) _cache["timestamp"] = time.monotonic() _cache["results"] = list(results) return _cache["results"] # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/health") async def health(): """Aggregate health endpoint. Returns 200 when all services are reachable, 503 otherwise. """ results = await _check_all() all_up = all(r["status"] == "up" for r in results) total = len(results) healthy = sum(1 for r in results if r["status"] == "up") body = { "status": "healthy" if all_up else "degraded", "services_total": total, "services_healthy": healthy, } if not results: body["status"] = "no_services_configured" return JSONResponse(content=body, status_code=200) status_code = 200 if all_up else 503 return JSONResponse(content=body, status_code=status_code) @app.get("/health/details") async def health_details(): """Detailed per-service health information. Returns 200 when all services are reachable, 503 otherwise. """ results = await _check_all() all_up = all(r["status"] == "up" for r in results) total = len(results) healthy = sum(1 for r in results if r["status"] == "up") body = { "status": "healthy" if all_up else "degraded", "services_total": total, "services_healthy": healthy, "services": results, } if not results: body["status"] = "no_services_configured" return JSONResponse(content=body, status_code=200) status_code = 200 if all_up else 503 return JSONResponse(content=body, status_code=status_code)