diff --git a/backend-compliance/compliance/api/agent_compare_routes.py b/backend-compliance/compliance/api/agent_compare_routes.py
new file mode 100644
index 0000000..c73c8a8
--- /dev/null
+++ b/backend-compliance/compliance/api/agent_compare_routes.py
@@ -0,0 +1,143 @@
+"""
+Agent Compare Routes — scan multiple websites and compare compliance posture.
+
+POST /api/compliance/agent/compare
+"""
+
+import asyncio
+import logging
+from datetime import datetime, timezone
+from urllib.parse import urlsplit
+
+import httpx
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/compliance/agent", tags=["agent"])
+
+# Endpoint every compared URL is forwarded to for the actual scan.
+SCAN_ENDPOINT = "http://localhost:8002/api/compliance/agent/scan"
+# Requests may list more URLs, but only this many are scanned.
+MAX_COMPARE_URLS = 5
+
+
+class CompareRequest(BaseModel):
+    """Request body for POST /compare."""
+
+    urls: list[str]  # 2-5 URLs to compare; extras beyond MAX_COMPARE_URLS are dropped
+    mode: str = "post_launch"
+
+
+class SiteResult(BaseModel):
+    """Per-site summary derived from one scan response."""
+
+    url: str
+    domain: str
+    risk_level: str = ""
+    risk_score: float = 0
+    findings_count: int = 0
+    services_count: int = 0
+    has_impressum: bool = False
+    has_datenschutz: bool = False
+    has_cookie_banner: bool = False
+    has_google_fonts: bool = False
+    tracking_before_consent: int = 0  # never set by compare_websites; stays 0
+    classification: str = ""
+    scan_status: str = "pending"  # becomes "completed", "failed" or "error"
+
+
+class CompareResponse(BaseModel):
+    """Response body for POST /compare."""
+
+    sites: list[SiteResult]
+    compared_at: str  # ISO-8601 timestamp in UTC
+
+
+def _domain_of(url: str) -> str:
+    """Return the host part of *url*, falling back to its first path segment."""
+    try:
+        host = urlsplit(url).netloc
+    except ValueError:
+        # urlsplit rejects malformed hosts such as an unclosed IPv6 bracket.
+        host = ""
+    # Scheme-less input ("example.com/path") has no netloc; use what
+    # precedes the first slash instead of an arbitrary later segment.
+    return host or url.split("/", 1)[0] or url
+
+
+def _has_finding(findings: list, marker: str) -> bool:
+    """Return True if any dict finding carries *marker* in its ``code``."""
+    return any(
+        marker in str(f.get("code") or "")
+        for f in findings
+        if isinstance(f, dict)
+    )
+
+
+def _has_service(services: list, service_id: str) -> bool:
+    """Return True if any dict service has ``id`` equal to *service_id*."""
+    return any(s.get("id") == service_id for s in services if isinstance(s, dict))
+
+
+def _summarize(url: str, domain: str, data: dict) -> SiteResult:
+    """Condense one scan response body into a SiteResult."""
+    # ``or`` also covers keys that are present but null in the response.
+    services = data.get("services") or []
+    findings = data.get("findings") or []
+    return SiteResult(
+        url=url,
+        domain=domain,
+        risk_level=data.get("risk_level") or "",
+        risk_score=data.get("risk_score") or 0,
+        findings_count=len(findings),
+        services_count=len(services),
+        # A page counts as present when no finding code mentions it.
+        has_impressum=not _has_finding(findings, "IMPRESSUM"),
+        has_datenschutz=not _has_finding(findings, "DATENSCHUTZ"),
+        # NOTE(review): "chatbot_detected" driving the cookie-banner flag
+        # looks unintended; confirm against the scan response schema.
+        has_cookie_banner=bool(data.get("chatbot_detected"))
+        or _has_service(services, "cmp"),
+        has_google_fonts=_has_service(services, "google_fonts"),
+        classification=data.get("classification") or "",
+        scan_status="completed",
+    )
+
+
+@router.post("/compare", response_model=CompareResponse)
+async def compare_websites(req: CompareRequest):
+    """Scan multiple websites and compare their compliance posture.
+
+    Each URL is posted to SCAN_ENDPOINT concurrently. A site whose scan
+    returns a non-200 status is reported as "failed"; one whose scan raises
+    is reported as "error". Neither aborts the other scans.
+    """
+    urls = req.urls[:MAX_COMPARE_URLS]
+
+    # One shared client so all scans reuse the same connection pool.
+    async with httpx.AsyncClient(timeout=120.0) as client:
+
+        async def scan_one(url: str) -> SiteResult:
+            domain = _domain_of(url)
+            try:
+                resp = await client.post(
+                    SCAN_ENDPOINT,
+                    json={"url": url, "mode": req.mode},
+                )
+                if resp.status_code != 200:
+                    return SiteResult(url=url, domain=domain, scan_status="failed")
+                return _summarize(url, domain, resp.json())
+            except Exception:
+                # Deliberately broad: one unreachable or malformed site
+                # must not abort the comparison of the others.
+                logger.exception("Compare scan failed for %s", url)
+                return SiteResult(url=url, domain=domain, scan_status="error")
+
+        results = await asyncio.gather(*(scan_one(u) for u in urls))
+
+    return CompareResponse(
+        sites=list(results),
+        compared_at=datetime.now(timezone.utc).isoformat(),
+    )
diff --git a/backend-compliance/compliance/api/agent_history_routes.py b/backend-compliance/compliance/api/agent_history_routes.py
index ac05535..d36c1a3 100644
--- a/backend-compliance/compliance/api/agent_history_routes.py
+++ b/backend-compliance/compliance/api/agent_history_routes.py
@@ -13,8 +13,11 @@ import uuid
 from datetime import datetime, timezone
 
 from fastapi import APIRouter, Query
+from fastapi.responses import JSONResponse, Response
 from pydantic import BaseModel
 
+from compliance.services.agent_pdf_export import generate_scan_pdf
+
 logger = logging.getLogger(__name__)
 
 router = APIRouter(prefix="/compliance/agent", tags=["agent"])
@@ -195,3 +198,36 @@ async def get_scan(scan_id: str):
         return ScanDetail(id=scan_id, url="", scan_type="", analysis_mode="", result={}, created_at="")
     finally:
         await pool.close()
+
+
+@router.post("/scans/pdf")
+async def export_scan_pdf(req: SaveScanRequest):
+    """Generate a PDF report from scan results (no DB required).
+
+    Returns the PDF as an attachment named after the scanned host, or a
+    JSON ``{"error": ...}`` body with HTTP 500 when rendering fails.
+    """
+    # Host part for the download name. Tolerates URLs without a scheme and
+    # keeps only header-safe characters, since the URL is client-supplied.
+    host = req.url.split("://", 1)[-1].split("/", 1)[0]
+    safe_host = "".join(c for c in host if c.isascii() and (c.isalnum() or c in ".-"))
+    filename = f"compliance-report-{safe_host[:30] or 'scan'}.pdf"
+
+    try:
+        pdf_bytes = generate_scan_pdf({
+            **(req.result or {}),
+            # Request-level fields go last so the result cannot override them.
+            "url": req.url,
+            "scan_type": req.scan_type,
+            "analysis_mode": req.analysis_mode,
+        })
+    except Exception as e:
+        logger.exception("PDF generation failed")
+        # Signal the failure with an error status rather than a 200 body.
+        return JSONResponse(status_code=500, content={"error": str(e)})
+
+    return Response(
+        content=pdf_bytes,
+        media_type="application/pdf",
+        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
+    )
diff --git a/backend-compliance/compliance/api/agent_recurring_routes.py b/backend-compliance/compliance/api/agent_recurring_routes.py
new file mode 100644
index 0000000..ca76d5b
--- /dev/null
+++ b/backend-compliance/compliance/api/agent_recurring_routes.py
@@ -0,0 +1,133 @@
+"""
+Agent Recurring Scan Routes — schedule and run automated periodic scans.
+
+POST   /api/compliance/agent/monitored-urls          — add URL to monitoring
+GET    /api/compliance/agent/monitored-urls          — list monitored URLs
+DELETE /api/compliance/agent/monitored-urls/{url_id} — remove URL from monitoring
+POST   /api/compliance/agent/run-scheduled           — trigger all scheduled scans
+"""
+
+import json
+import logging
+import os
+import uuid
+from datetime import datetime, timezone
+
+import httpx
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/compliance/agent", tags=["agent"])
+
+# Resolved at import time; no code in this module reads it yet.
+DATABASE_URL = os.environ.get(
+    "COMPLIANCE_DATABASE_URL",
+    os.environ.get("DATABASE_URL", ""),
+)
+
+# Targets of the scheduled scans, chosen by scan_type in run_scheduled_scans.
+SCAN_ENDPOINT = "http://localhost:8002/api/compliance/agent/scan"
+CONSENT_TEST_ENDPOINT = "http://bp-compliance-consent-tester:8094/scan"
+
+# In-memory registry of monitored URLs; entries do not survive a restart.
+_monitored_urls: list[dict] = []
+
+
+class MonitoredURL(BaseModel):
+    """Request body describing one URL to monitor."""
+
+    url: str
+    scan_type: str = "scan"  # scan, consent_test
+    frequency: str = "weekly"  # daily, weekly, monthly
+    recipient: str = "dsb@breakpilot.local"
+    enabled: bool = True
+
+
+@router.post("/monitored-urls")
+async def add_monitored_url(req: MonitoredURL):
+    """Add a URL to the monitoring list and return the stored entry."""
+    entry = {
+        "id": str(uuid.uuid4()),
+        "url": req.url,
+        "scan_type": req.scan_type,
+        "frequency": req.frequency,
+        "recipient": req.recipient,
+        "enabled": req.enabled,
+        "created_at": datetime.now(timezone.utc).isoformat(),
+        "last_scan_at": None,
+    }
+    _monitored_urls.append(entry)
+    logger.info("Added monitored URL: %s (%s)", req.url, req.frequency)
+    return {"status": "added", **entry}
+
+
+@router.get("/monitored-urls")
+async def list_monitored_urls():
+    """List all monitored URLs."""
+    return {"urls": _monitored_urls}
+
+
+@router.delete("/monitored-urls/{url_id}")
+async def remove_monitored_url(url_id: str):
+    """Remove a URL from monitoring.
+
+    Returns ``{"status": "removed"}`` when an entry was dropped and
+    ``{"status": "not_found"}`` when no entry has the given id.
+    """
+    global _monitored_urls
+    remaining = [u for u in _monitored_urls if u["id"] != url_id]
+    found = len(remaining) != len(_monitored_urls)
+    _monitored_urls = remaining
+    return {"status": "removed" if found else "not_found"}
+
+
+@router.post("/run-scheduled")
+async def run_scheduled_scans():
+    """Trigger all enabled scheduled scans. Called by cron/ZeroClaw.
+
+    Scans run one after another. Every enabled entry is scanned on each
+    call; the stored ``frequency`` is not consulted here.
+    """
+    results = []
+    # Snapshot first: other requests may add or remove entries while a
+    # scan is awaited.
+    due = [e for e in _monitored_urls if e["enabled"]]
+
+    async with httpx.AsyncClient(timeout=300.0) as client:
+        for entry in due:
+            url = entry["url"]
+            scan_type = entry["scan_type"]
+            logger.info("Running scheduled %s for %s", scan_type, url)
+
+            try:
+                if scan_type == "consent_test":
+                    resp = await client.post(CONSENT_TEST_ENDPOINT, json={"url": url})
+                else:
+                    resp = await client.post(
+                        SCAN_ENDPOINT,
+                        json={
+                            "url": url,
+                            "mode": "post_launch",
+                            "recipient": entry["recipient"],
+                        },
+                    )
+            except Exception as e:
+                # Deliberately broad: one failing target must not stop the run.
+                logger.exception("Scheduled scan failed for %s", url)
+                results.append(
+                    {"url": url, "scan_type": scan_type, "status": "error", "error": str(e)}
+                )
+                continue
+
+            # Records the attempt time, whether or not the scan returned 200.
+            entry["last_scan_at"] = datetime.now(timezone.utc).isoformat()
+            results.append({
+                "url": url,
+                "scan_type": scan_type,
+                "status": "completed" if resp.status_code == 200 else "failed",
+                "status_code": resp.status_code,
+            })
+
+    return {"scans_triggered": len(results), "results": results}
diff --git a/backend-compliance/compliance/services/agent_pdf_export.py b/backend-compliance/compliance/services/agent_pdf_export.py
new file mode 100644
index 0000000..7785ec5
--- /dev/null
+++ b/backend-compliance/compliance/services/agent_pdf_export.py
@@ -0,0 +1,154 @@
+"""
+Agent PDF Export — generates printable compliance scan reports.
+
+Uses WeasyPrint to convert HTML report to PDF.
+"""
+
+import logging
+from datetime import datetime, timezone
+from html import escape
+from io import BytesIO
+
+logger = logging.getLogger(__name__)
+
+_SEVERITY_COLORS = {
+    "HIGH": "#dc2626",
+    "CRITICAL": "#991b1b",
+    "MEDIUM": "#ea580c",
+    "LOW": "#2563eb",
+}
+_DEFAULT_SEVERITY_COLOR = "#6b7280"
+
+_REPORT_CSS = """
+body { font-family: sans-serif; font-size: 11pt; color: #111827; }
+h1 { font-size: 20pt; margin-bottom: 0; }
+h2 { font-size: 14pt; margin-top: 24px; }
+.meta { color: #6b7280; margin-bottom: 16px; }
+.warning { border: 1px solid #dc2626; color: #991b1b; padding: 8px; }
+table { width: 100%; border-collapse: collapse; }
+th, td { text-align: left; padding: 4px 8px; border-bottom: 1px solid #e5e7eb; }
+"""
+
+
+def generate_scan_pdf(scan_data: dict) -> bytes:
+    """Generate a PDF report from scan results.
+
+    Raises whatever WeasyPrint raises, including ImportError when it is
+    not installed.
+    """
+    # Function-scope import: the module stays importable without WeasyPrint.
+    from weasyprint import HTML
+
+    html = _build_report_html(scan_data)
+    pdf_buffer = BytesIO()
+    HTML(string=html).write_pdf(pdf_buffer)
+    return pdf_buffer.getvalue()
+
+
+def _severity_color(sev: str) -> str:
+    """Return the hex color for a severity label (grey for unknown labels)."""
+    return _SEVERITY_COLORS.get(sev, _DEFAULT_SEVERITY_COLOR)
+
+
+def _esc(value: object) -> str:
+    """Return *value* as text that is safe to embed in HTML."""
+    return escape(str(value))
+
+
+def _finding_row(finding: object) -> str:
+    """Render one finding as a table row; non-dict findings become the text."""
+    if isinstance(finding, dict):
+        sev = str(finding.get("severity", "MEDIUM"))
+        text = finding.get("text", str(finding))
+    else:
+        sev, text = "MEDIUM", str(finding)
+    color = _severity_color(sev)
+    return f'<tr><td style="color: {color}">{_esc(sev)}</td><td>{_esc(text)}</td></tr>'
+
+
+def _service_row(service: dict) -> str:
+    """Render one service as a table row with a pass/fail marker."""
+    ok = bool(service.get("in_dse")) or service.get("status") == "ok"
+    icon, color = ("✓", "#16a34a") if ok else ("✗", "#dc2626")
+    cells = "".join(
+        f"<td>{_esc(service.get(key, ''))}</td>"
+        for key in ("name", "country", "category")
+    )
+    return f'<tr><td style="color: {color}">{icon}</td>{cells}</tr>'
+
+
+def _build_report_html(data: dict) -> str:
+    """Build HTML for the PDF report.
+
+    All values taken from *data* are HTML-escaped before being embedded:
+    the data is caller-supplied, and WeasyPrint would otherwise interpret
+    any markup it contains.
+    """
+    url = data.get("url", "")
+    scan_type = data.get("scan_type", "scan")
+    mode = data.get("analysis_mode", "post_launch")
+    findings = data.get("findings") or []
+    services = data.get("services") or []
+    risk = data.get("risk_level", "")
+    score = data.get("risk_score", 0)
+    pages = data.get("pages_scanned", 0)
+    now = datetime.now(timezone.utc).strftime("%d.%m.%Y %H:%M UTC")
+
+    mode_label = "Live-Website Pruefung" if mode == "post_launch" else "Interne Pruefung"
+    type_label = {
+        "quick": "Schnellanalyse",
+        "scan": "Website-Scan",
+        "consent_test": "Cookie-Test",
+    }.get(scan_type, scan_type)
+
+    # join() instead of += so row building stays linear in the row count.
+    findings_rows = "".join(_finding_row(f) for f in findings)
+    services_rows = "".join(_service_row(s) for s in services if isinstance(s, dict))
+
+    warning_html = ""
+    if mode == "post_launch" and findings:
+        warning_html = (
+            '<p class="warning">ACHTUNG: Maengel auf einer bereits veroeffentlichten '
+            "Website. Sofortige Korrektur empfohlen.</p>"
+        )
+
+    services_html = ""
+    if services_rows:
+        services_html = (
+            "<h2>Dienstleister-Abgleich</h2>"
+            "<table><tr><th>Status</th><th>Dienst</th><th>Land</th><th>Kategorie</th></tr>"
+            f"{services_rows}</table>"
+        )
+
+    no_findings_row = '<tr><td colspan="2">Keine Findings — alles OK</td></tr>'
+
+    return f"""<!DOCTYPE html>
+<html>
+<head>
+<meta charset="utf-8">
+<style>{_REPORT_CSS}</style>
+</head>
+<body>
+<h1>Compliance Agent Report</h1>
+<p class="meta">{_esc(type_label)} | {mode_label} | {now}</p>
+
+<table>
+  <tr><th>URL</th><td>{_esc(url)}</td></tr>
+  <tr><th>Risikobewertung</th><td>{_esc(risk)} ({_esc(score)}/100)</td></tr>
+  <tr><th>Seiten gescannt</th><td>{_esc(pages)}</td></tr>
+  <tr><th>Findings</th><td>{len(findings)}</td></tr>
+</table>
+
+{warning_html}
+
+<h2>Findings ({len(findings)})</h2>
+<table>
+  <tr><th>Schwere</th><th>Beschreibung</th></tr>
+  {findings_rows or no_findings_row}
+</table>
+
+{services_html}
+
+</body>
+</html>
+"""
diff --git a/backend-compliance/main.py b/backend-compliance/main.py
index 671f833..1698af6 100644
--- a/backend-compliance/main.py
+++ b/backend-compliance/main.py
@@ -46,6 +46,8 @@ from compliance.api.agent_notification_routes import router as agent_notify_rout
 from compliance.api.agent_analyze_routes import router as agent_analyze_router
 from compliance.api.agent_scan_routes import router as agent_scan_router
 from compliance.api.agent_history_routes import router as agent_history_router
+from compliance.api.agent_recurring_routes import router as agent_recurring_router
+from compliance.api.agent_compare_routes import router as agent_compare_router
 
 # Middleware
 from middleware import (
@@ -146,6 +148,8 @@ app.include_router(agent_notify_router, prefix="/api")
 app.include_router(agent_analyze_router, prefix="/api")
 app.include_router(agent_scan_router, prefix="/api")
 app.include_router(agent_history_router, prefix="/api")
+app.include_router(agent_recurring_router, prefix="/api")
+app.include_router(agent_compare_router, prefix="/api")
 
 
 if __name__ == "__main__":