Files
breakpilot-core/backend-core/woodpecker_proxy_api.py
Benjamin Boenisch b7d21daa24
All checks were successful
CI / test-bqas (push) Successful in 32s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-consent (push) Successful in 46s
CI / test-python-voice (push) Successful in 38s
feat: Add DevSecOps tools, Woodpecker proxy, Vault persistent storage, pitch-deck annex slides
- Install Gitleaks, Trivy, Grype, Syft, Semgrep, Bandit in backend-core Dockerfile
- Add Woodpecker SQLite proxy API (fallback without API token)
- Mount woodpecker_data volume read-only to backend-core
- Add backend proxy fallback in admin-core Woodpecker route
- Add Vault file-based persistent storage (config.hcl, init-vault.sh)
- Auto-init, unseal and root-token persistence for Vault
- Add 6 pitch-deck annex slides (Assumptions, Architecture, GTM, Regulatory, Engineering, AI Pipeline)
- Dynamic margin/amortization KPIs in BusinessModelSlide
- Market sources modal with citations in MarketSlide
- Redesign nginx landing page to 3-column layout (Lehrer/Compliance/Core)
- Extend MkDocs nav with Services and SDK documentation sections
- Add SDK Protection architecture doc

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 15:42:43 +01:00

134 lines
4.2 KiB
Python

"""
Woodpecker CI Proxy API
Liest Pipeline-Daten direkt aus der Woodpecker SQLite-Datenbank.
Wird als Fallback verwendet, wenn kein WOODPECKER_TOKEN konfiguriert ist.
"""
import sqlite3
from pathlib import Path
from datetime import datetime
from fastapi import APIRouter, Query
router = APIRouter(prefix="/v1/woodpecker", tags=["Woodpecker CI"])
WOODPECKER_DB = Path("/woodpecker-data/woodpecker.sqlite")
def get_db():
if not WOODPECKER_DB.exists():
return None
conn = sqlite3.connect(f"file:{WOODPECKER_DB}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
@router.get("/status")
async def get_status():
conn = get_db()
if not conn:
return {"status": "offline", "error": "Woodpecker DB nicht gefunden"}
try:
repos = [dict(r) for r in conn.execute(
"SELECT id, name, full_name, active FROM repos ORDER BY id"
).fetchall()]
total_pipelines = conn.execute("SELECT COUNT(*) FROM pipelines").fetchone()[0]
success = conn.execute("SELECT COUNT(*) FROM pipelines WHERE status='success'").fetchone()[0]
failure = conn.execute("SELECT COUNT(*) FROM pipelines WHERE status='failure'").fetchone()[0]
latest = conn.execute("SELECT MAX(created) FROM pipelines").fetchone()[0]
return {
"status": "online",
"repos": repos,
"stats": {
"total_pipelines": total_pipelines,
"success": success,
"failure": failure,
"success_rate": round(success / total_pipelines * 100, 1) if total_pipelines > 0 else 0,
},
"last_activity": datetime.fromtimestamp(latest).isoformat() if latest else None,
}
finally:
conn.close()
@router.get("/pipelines")
async def get_pipelines(
repo: int = Query(default=0, description="Repo ID (0 = alle)"),
limit: int = Query(default=10, ge=1, le=100),
):
conn = get_db()
if not conn:
return {"status": "offline", "pipelines": [], "lastUpdate": datetime.now().isoformat()}
try:
base_sql = """SELECT p.id, p.repo_id, p.number, p.status, p.event, p.branch,
p."commit", p.message, p.author, p.created, p.started, p.finished,
r.name as repo_name
FROM pipelines p
JOIN repos r ON r.id = p.repo_id"""
if repo > 0:
rows = conn.execute(
base_sql + " WHERE p.repo_id = ? ORDER BY p.id DESC LIMIT ?",
(repo, limit)
).fetchall()
else:
rows = conn.execute(
base_sql + " ORDER BY p.id DESC LIMIT ?",
(limit,)
).fetchall()
pipelines = []
for r in rows:
p = dict(r)
# Get steps directly (steps.pipeline_id links to pipelines.id)
steps = [dict(s) for s in conn.execute(
"""SELECT s.name, s.state, s.exit_code, s.error
FROM steps s
WHERE s.pipeline_id = ?
ORDER BY s.pid""",
(p["id"],)
).fetchall()]
p["steps"] = steps
p["commit"] = (p.get("commit") or "")[:7]
msg = p.get("message") or ""
p["message"] = msg.split("\n")[0][:100]
pipelines.append(p)
return {
"status": "online",
"pipelines": pipelines,
"lastUpdate": datetime.now().isoformat(),
}
finally:
conn.close()
@router.get("/repos")
async def get_repos():
conn = get_db()
if not conn:
return []
try:
repos = []
for r in conn.execute("SELECT id, name, full_name, active FROM repos ORDER BY id").fetchall():
repo = dict(r)
latest = conn.execute(
'SELECT status, created FROM pipelines WHERE repo_id = ? ORDER BY id DESC LIMIT 1',
(repo["id"],)
).fetchone()
if latest:
repo["last_status"] = latest["status"]
repo["last_activity"] = datetime.fromtimestamp(latest["created"]).isoformat()
repos.append(repo)
return repos
finally:
conn.close()