"""Woodpecker CI proxy API.

Reads pipeline data directly from the Woodpecker SQLite database.
Used as a fallback when no WOODPECKER_TOKEN is configured.
"""
import sqlite3
from datetime import datetime
from pathlib import Path

from fastapi import APIRouter, Query

router = APIRouter(prefix="/v1/woodpecker", tags=["Woodpecker CI"])

# Location of the Woodpecker server's SQLite file (mounted data volume).
WOODPECKER_DB = Path("/woodpecker-data/woodpecker.sqlite")


def get_db():
    """Open a read-only connection to the Woodpecker DB, or return None.

    Returns None both when the file is missing and when SQLite cannot open
    it (e.g. locked or corrupt), so the endpoints can degrade to their
    "offline" responses instead of surfacing a 500.
    """
    if not WOODPECKER_DB.exists():
        return None
    try:
        # mode=ro via URI: never write to Woodpecker's own database.
        conn = sqlite3.connect(f"file:{WOODPECKER_DB}?mode=ro", uri=True)
    except sqlite3.Error:
        return None
    conn.row_factory = sqlite3.Row
    return conn


@router.get("/status")
async def get_status():
    """Overall CI status: repo list, pipeline counts, success rate, last activity."""
    conn = get_db()
    if not conn:
        return {"status": "offline", "error": "Woodpecker DB nicht gefunden"}
    try:
        repos = [dict(r) for r in conn.execute(
            "SELECT id, name, full_name, active FROM repos ORDER BY id"
        ).fetchall()]
        # One aggregate scan instead of a separate COUNT query per metric.
        total_pipelines, success, failure, latest = conn.execute(
            "SELECT COUNT(*),"
            " SUM(CASE WHEN status='success' THEN 1 ELSE 0 END),"
            " SUM(CASE WHEN status='failure' THEN 1 ELSE 0 END),"
            " MAX(created)"
            " FROM pipelines"
        ).fetchone()
        success = success or 0  # SUM() yields NULL on an empty table
        failure = failure or 0
        return {
            "status": "online",
            "repos": repos,
            "stats": {
                "total_pipelines": total_pipelines,
                "success": success,
                "failure": failure,
                "success_rate": round(success / total_pipelines * 100, 1) if total_pipelines > 0 else 0,
            },
            # 'created' appears to be a unix timestamp in seconds; converted to
            # naive local time — NOTE(review): confirm timezone expectations.
            "last_activity": datetime.fromtimestamp(latest).isoformat() if latest else None,
        }
    finally:
        conn.close()


@router.get("/pipelines")
async def get_pipelines(
    repo: int = Query(default=0, description="Repo ID (0 = alle)"),
    limit: int = Query(default=10, ge=1, le=100),
):
    """List recent pipelines (newest first), optionally filtered by repo ID.

    Each pipeline entry carries its steps, a short (7-char) commit hash and
    the first line of the commit message truncated to 100 characters.
    """
    conn = get_db()
    if not conn:
        return {"status": "offline", "pipelines": [], "lastUpdate": datetime.now().isoformat()}
    try:
        # "commit" must be quoted: it is a reserved word in SQLite.
        base_sql = """SELECT p.id, p.repo_id, p.number, p.status, p.event,
            p.branch, p."commit", p.message, p.author,
            p.created, p.started, p.finished,
            r.name as repo_name
            FROM pipelines p JOIN repos r ON r.id = p.repo_id"""
        if repo > 0:
            rows = conn.execute(
                base_sql + " WHERE p.repo_id = ? ORDER BY p.id DESC LIMIT ?",
                (repo, limit)
            ).fetchall()
        else:
            rows = conn.execute(
                base_sql + " ORDER BY p.id DESC LIMIT ?", (limit,)
            ).fetchall()

        pipelines = []
        for r in rows:
            p = dict(r)
            # Get steps directly (steps.pipeline_id links to pipelines.id).
            # One query per pipeline; acceptable since limit is capped at 100.
            steps = [dict(s) for s in conn.execute(
                """SELECT s.name, s.state, s.exit_code, s.error
                   FROM steps s WHERE s.pipeline_id = ? ORDER BY s.pid""",
                (p["id"],)
            ).fetchall()]
            p["steps"] = steps
            p["commit"] = (p.get("commit") or "")[:7]
            msg = p.get("message") or ""
            p["message"] = msg.split("\n")[0][:100]
            pipelines.append(p)

        return {
            "status": "online",
            "pipelines": pipelines,
            "lastUpdate": datetime.now().isoformat(),
        }
    finally:
        conn.close()


@router.get("/repos")
async def get_repos():
    """List all repos, each annotated with its latest pipeline status/activity."""
    conn = get_db()
    if not conn:
        return []
    try:
        repos = []
        for r in conn.execute("SELECT id, name, full_name, active FROM repos ORDER BY id").fetchall():
            repo = dict(r)
            latest = conn.execute(
                'SELECT status, created FROM pipelines WHERE repo_id = ? ORDER BY id DESC LIMIT 1',
                (repo["id"],)
            ).fetchone()
            if latest:
                repo["last_status"] = latest["status"]
                repo["last_activity"] = datetime.fromtimestamp(latest["created"]).isoformat()
            repos.append(repo)
        return repos
    finally:
        conn.close()