Docker Compose with 24+ services: - PostgreSQL (PostGIS), Valkey, MinIO, Qdrant - Vault (PKI/TLS), Nginx (Reverse Proxy) - Backend Core API, Consent Service, Billing Service - RAG Service, Embedding Service - Gitea, Woodpecker CI/CD - Night Scheduler, Health Aggregator - Jitsi (Web/XMPP/JVB/Jicofo), Mailpit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
170 lines
4.9 KiB
Python
170 lines
4.9 KiB
Python
"""
|
|
BreakPilot Health Aggregator Service
|
|
|
|
Checks TCP connectivity to all configured services and exposes
|
|
aggregate health status via a FastAPI HTTP interface.
|
|
|
|
Configuration via environment variables:
|
|
CHECK_SERVICES - comma-separated "host:port" pairs
|
|
e.g. "postgres:5432,redis:6379,api:3000"
|
|
CHECK_TIMEOUT - per-service TCP timeout in seconds (default: 3)
|
|
CACHE_TTL - seconds to cache results (default: 10)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from typing import Any
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.responses import JSONResponse
|
|
|
|
# ASGI application object; the /health and /health/details route handlers
# in this module register themselves on it via @app.get.
app = FastAPI(title="BreakPilot Health Aggregator")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Per-service TCP connect timeout in seconds (env CHECK_TIMEOUT, default 3).
CHECK_TIMEOUT: float = float(os.getenv("CHECK_TIMEOUT", "3"))
# Seconds a stored set of check results is reused (env CACHE_TTL, default 10).
CACHE_TTL: float = float(os.getenv("CACHE_TTL", "10"))
|
|
|
|
|
|
def _parse_services() -> list[dict[str, Any]]:
    """Parse the CHECK_SERVICES env var into a list of {host, port} dicts.

    The variable holds comma-separated ``host:port`` pairs, for example
    ``"postgres:5432,redis:6379,api:3000"``. Each entry is split on its
    *last* colon, so the host part may itself contain colons.

    Entries that cannot be checked are skipped silently: blank entries,
    entries without a colon, entries with an empty host, a non-integer
    port, or a port outside the valid TCP range 1-65535.

    Returns:
        One ``{"host": str, "port": int}`` dict per valid entry, in the
        order the entries appear in the variable. Empty when the variable
        is unset or contains no valid entry.
    """
    raw = os.environ.get("CHECK_SERVICES", "")
    services: list[dict[str, Any]] = []
    for entry in raw.split(","):
        # rpartition yields an empty separator when there is no colon at
        # all, which also covers blank entries.
        host, sep, port_str = entry.strip().rpartition(":")
        host = host.strip()
        if not sep or not host:
            continue
        try:
            port = int(port_str)
        except ValueError:
            continue
        # A port outside this range can never be connected to, so drop the
        # entry here instead of letting the connection attempt fail oddly.
        if not 1 <= port <= 65535:
            continue
        services.append({"host": host, "port": port})
    return services
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# In-memory cache
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Most recent check results, shared by all requests.
#   "timestamp": time.monotonic() reading taken when "results" was stored
#   "results":   list of per-service result dicts (empty until the first run)
_cache: dict[str, Any] = {"timestamp": 0.0, "results": []}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TCP health check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _check_service(host: str, port: int) -> dict[str, Any]:
    """Attempt a TCP connection to host:port and return status + timing.

    Only the connection attempt decides the outcome. Once the connection
    has been established the service counts as "up", even if closing the
    socket afterwards fails or stalls.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.

    Returns:
        A dict with ``"service"`` (``"host:port"``), ``"status"``
        (``"up"`` or ``"down"``) and ``"response_time_ms"`` (time spent on
        the connection attempt, rounded to 2 decimals). A ``"down"``
        result also carries ``"error"``: the exception message, or the
        exception class name when the message is empty.
    """
    label = f"{host}:{port}"
    start = time.monotonic()
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=CHECK_TIMEOUT,
        )
    except (OSError, asyncio.TimeoutError, UnicodeError, OverflowError) as exc:
        # UnicodeError / OverflowError are not OSError subclasses; they can
        # surface for a host name that cannot be encoded or a port outside
        # the socket range. Report them as "down" rather than letting one
        # bad entry abort the checks for every other service.
        elapsed_ms = round((time.monotonic() - start) * 1000, 2)
        return {
            "service": label,
            "status": "down",
            "response_time_ms": elapsed_ms,
            "error": str(exc) or type(exc).__name__,
        }

    # Timing covers the connect only, measured before teardown starts.
    elapsed_ms = round((time.monotonic() - start) * 1000, 2)

    writer.close()
    try:
        # Bound the teardown as well so a stalled close cannot hang the
        # health endpoint.
        await asyncio.wait_for(writer.wait_closed(), timeout=CHECK_TIMEOUT)
    except (OSError, asyncio.TimeoutError):
        # The peer may reset the connection while it is being closed; the
        # connect already succeeded, so this does not make the service down.
        pass

    return {
        "service": label,
        "status": "up",
        "response_time_ms": elapsed_ms,
    }
|
|
|
|
|
|
async def _check_all() -> list[dict[str, Any]]:
    """Return one result dict per configured service, using the cache.

    A non-empty cached result list younger than CACHE_TTL seconds is
    returned as-is. Otherwise every service from ``_parse_services()`` is
    checked concurrently, the outcome is stored in ``_cache`` together with
    the current monotonic time, and the stored list is returned.

    Returns:
        The list of per-service result dicts, or an empty list when no
        service is configured (the cache is left untouched in that case).
    """
    cached = _cache["results"]
    age = time.monotonic() - _cache["timestamp"]
    if cached and age < CACHE_TTL:
        return cached

    targets = _parse_services()
    if not targets:
        return []

    # gather() keeps the results in the same order as the targets.
    fresh = await asyncio.gather(
        *(_check_service(target["host"], target["port"]) for target in targets)
    )

    _cache["timestamp"] = time.monotonic()
    _cache["results"] = list(fresh)
    return _cache["results"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Aggregate health endpoint.

    Responds with the overall status plus the total and healthy service
    counts. The HTTP status is 200 when every service is up or when no
    services are configured, and 503 when at least one service is down.
    """
    checks = await _check_all()
    up_count = len([check for check in checks if check["status"] == "up"])

    if not checks:
        overall, http_status = "no_services_configured", 200
    elif up_count == len(checks):
        overall, http_status = "healthy", 200
    else:
        overall, http_status = "degraded", 503

    payload = {
        "status": overall,
        "services_total": len(checks),
        "services_healthy": up_count,
    }
    return JSONResponse(content=payload, status_code=http_status)
|
|
|
|
|
|
@app.get("/health/details")
async def health_details():
    """Detailed per-service health information.

    Responds with the overall status, the total and healthy service
    counts, and the individual result of every checked service. The HTTP
    status is 200 when every service is up or when no services are
    configured, and 503 when at least one service is down.
    """
    checks = await _check_all()
    up_count = len([check for check in checks if check["status"] == "up"])

    if not checks:
        overall, http_status = "no_services_configured", 200
    elif up_count == len(checks):
        overall, http_status = "healthy", 200
    else:
        overall, http_status = "degraded", 503

    payload = {
        "status": overall,
        "services_total": len(checks),
        "services_healthy": up_count,
        "services": checks,
    }
    return JSONResponse(content=payload, status_code=http_status)
|