feat: Phase 6-8 — PDF export, recurring scans, multi-website compare

Phase 6: PDF export via WeasyPrint — POST /agent/scans/pdf generates
printable compliance report with findings table, service comparison,
risk badge, and legal disclaimer.

Phase 7: Recurring scans — POST /agent/monitored-urls to add URLs,
POST /agent/run-scheduled triggers all enabled scans (cron/ZeroClaw).
In-memory storage with DB upgrade path.

Phase 8: Multi-website compare — POST /agent/compare with 2-5 URLs,
parallel scanning, comparison table (risk, findings, services, compliance
features per site).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-29 15:27:51 +02:00
parent e35db90232
commit 8336c01c5c
5 changed files with 327 additions and 0 deletions
@@ -0,0 +1,94 @@
"""
Agent Compare Routes — scan multiple websites and compare compliance posture.
POST /api/compliance/agent/compare
"""
import asyncio
import logging
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Every route registered on this router shares the "/compliance/agent" prefix and the "agent" tag.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
class CompareRequest(BaseModel):
    """Request body for the compare endpoint."""

    # Websites to compare. The intended range is 2-5 URLs; the model itself
    # does not enforce it (the handler only uses the first five entries).
    urls: list[str]  # 2-5 URLs to compare
    # Forwarded unchanged as the "mode" field of every per-site scan request.
    mode: str = "post_launch"
class SiteResult(BaseModel):
    """Per-website row of the comparison table."""

    # URL exactly as submitted in the request.
    url: str
    # Host portion derived from ``url`` by the compare handler.
    domain: str
    # Copied from the scan response ("risk_level" / "risk_score").
    risk_level: str = ""
    risk_score: float = 0
    # Number of entries in the scan response's "findings" / "services" lists.
    findings_count: int = 0
    services_count: int = 0
    # True when no finding code contains "IMPRESSUM" / "DATENSCHUTZ".
    has_impressum: bool = False
    has_datenschutz: bool = False
    # True when the scan reports "chatbot_detected" or a service with id "cmp".
    has_cookie_banner: bool = False
    # True when a service with id "google_fonts" was detected.
    has_google_fonts: bool = False
    # Not populated by the compare handler in this module; keeps its default.
    tracking_before_consent: int = 0
    # Copied from the scan response ("classification").
    classification: str = ""
    # "pending" by default; the compare handler sets "completed", "failed"
    # (non-200 scan response) or "error" (exception during the scan).
    scan_status: str = "pending"
class CompareResponse(BaseModel):
    """Response body of the compare endpoint."""

    # One result per scanned URL.
    sites: list[SiteResult]
    # Timezone-aware UTC timestamp in ISO 8601 format, taken when the response is built.
    compared_at: str
def _extract_domain(url: str) -> str:
    """Return the ``host[:port]`` part of *url*, or *url* itself if none is found.

    Unlike ``url.split("/")[2]`` this also works for URLs submitted without a
    scheme (``example.com/a/b`` yields ``example.com``, not ``b``).
    """
    from urllib.parse import urlsplit

    candidate = url.strip()
    # Without a scheme urlsplit() would treat the host as part of the path;
    # a leading "//" makes it parse as the network location instead.
    if "://" not in candidate:
        candidate = f"//{candidate}"
    try:
        netloc = urlsplit(candidate).netloc
    except ValueError:
        # urlsplit() rejects some malformed inputs (e.g. a broken IPv6 literal).
        return url
    return netloc or url


@router.post("/compare", response_model=CompareResponse)
async def compare_websites(req: CompareRequest):
    """Scan multiple websites and compare their compliance posture.

    At most the first five URLs of the request are scanned. Each URL is sent
    to the agent scan endpoint concurrently; a site whose scan returns a
    non-200 status is reported with ``scan_status="failed"``, and one whose
    scan raises is reported with ``scan_status="error"``. A single failing
    site never fails the whole comparison.

    Returns:
        CompareResponse with one SiteResult per scanned URL (in request
        order) and the UTC timestamp of the comparison.
    """
    urls = req.urls[:5]  # Max 5

    async def scan_one(client: httpx.AsyncClient, url: str) -> SiteResult:
        """Run one scan and condense its response into a SiteResult."""
        domain = _extract_domain(url)
        try:
            resp = await client.post(
                "http://localhost:8002/api/compliance/agent/scan",
                json={"url": url, "mode": req.mode},
            )
            if resp.status_code != 200:
                return SiteResult(url=url, domain=domain, scan_status="failed")

            data = resp.json()
            # Treat an explicit null the same as a missing key.
            services = data.get("services") or []
            findings = data.get("findings") or []
            finding_codes = [
                str(f.get("code") or "") for f in findings if isinstance(f, dict)
            ]
            service_ids = [s.get("id") for s in services if isinstance(s, dict)]

            return SiteResult(
                url=url,
                domain=domain,
                risk_level=data.get("risk_level") or "",
                risk_score=data.get("risk_score") or 0,
                findings_count=len(findings),
                services_count=len(services),
                # A finding mentioning the page means it is missing or deficient.
                has_impressum=not any("IMPRESSUM" in code for code in finding_codes),
                has_datenschutz=not any("DATENSCHUTZ" in code for code in finding_codes),
                # NOTE(review): "chatbot_detected" feeding a cookie-banner flag
                # looks unintended — confirm against the scan response schema.
                has_cookie_banner=bool(data.get("chatbot_detected", False))
                or "cmp" in service_ids,
                has_google_fonts="google_fonts" in service_ids,
                classification=data.get("classification") or "",
                scan_status="completed",
            )
        except Exception as e:
            # Boundary of a best-effort fan-out: log with traceback and report
            # the site as errored instead of failing the whole comparison.
            logger.exception("Compare scan failed for %s: %s", url, e)
            return SiteResult(url=url, domain=domain, scan_status="error")

    # Scan all in parallel over one shared client (connection pool reuse).
    async with httpx.AsyncClient(timeout=120.0) as client:
        results = await asyncio.gather(*(scan_one(client, u) for u in urls))

    return CompareResponse(
        sites=list(results),
        compared_at=datetime.now(timezone.utc).isoformat(),
    )
@@ -13,8 +13,11 @@ import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Query
from fastapi.responses import Response
from pydantic import BaseModel
from compliance.services.agent_pdf_export import generate_scan_pdf
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
@@ -195,3 +198,23 @@ async def get_scan(scan_id: str):
return ScanDetail(id=scan_id, url="", scan_type="", analysis_mode="", result={}, created_at="")
finally:
await pool.close()
def _report_filename(url: str) -> str:
    """Build a header-safe download filename for the report of *url*.

    The host part of the URL (at most 30 characters) is embedded in the name.
    Characters other than letters, digits, ".", "_" and "-" are replaced with
    "_" so the value cannot break out of the quoted Content-Disposition
    parameter. URLs without a recognisable host fall back to "scan".
    """
    import re
    from urllib.parse import urlsplit

    candidate = url.strip()
    # Make scheme-less input ("example.com/page") parse with a network location.
    if "://" not in candidate:
        candidate = f"//{candidate}"
    try:
        host = urlsplit(candidate).netloc
    except ValueError:
        # urlsplit() rejects some malformed inputs (e.g. a broken IPv6 literal).
        host = ""
    safe_host = re.sub(r"[^A-Za-z0-9._-]", "_", host)[:30]
    return f"compliance-report-{safe_host or 'scan'}.pdf"


@router.post("/scans/pdf")
async def export_scan_pdf(req: SaveScanRequest):
    """Generate a PDF report from scan results (no DB required).

    The request's url, scan_type and analysis_mode are merged with the keys of
    ``req.result`` (result keys win on collision, as they are spread last) and
    handed to ``generate_scan_pdf``.

    Returns:
        An ``application/pdf`` attachment response on success. On any failure
        a JSON object ``{"error": <message>}`` is returned instead.
    """
    try:
        pdf_bytes = generate_scan_pdf({
            "url": req.url,
            "scan_type": req.scan_type,
            "analysis_mode": req.analysis_mode,
            **req.result,
        })
        # The filename helper never raises, so a URL without a scheme no longer
        # discards an already rendered PDF.
        filename = _report_filename(req.url)
        return Response(
            content=pdf_bytes,
            media_type="application/pdf",
            headers={"Content-Disposition": f'attachment; filename="{filename}"'},
        )
    except Exception as e:
        # NOTE(review): the error is returned as a plain dict (default success
        # status) to keep the existing response shape; consider an HTTP 5xx.
        logger.exception("PDF generation failed: %s", e)
        return {"error": str(e)}
@@ -0,0 +1,111 @@
"""
Agent Recurring Scan Routes — schedule and run automated periodic scans.
POST /api/compliance/agent/monitored-urls — add URL to monitoring
GET /api/compliance/agent/monitored-urls — list monitored URLs
DELETE /api/compliance/agent/monitored-urls/{url_id} — remove URL from monitoring
POST /api/compliance/agent/run-scheduled — trigger all enabled scans
"""
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter
from pydantic import BaseModel
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Every route registered on this router shares the "/compliance/agent" prefix and the "agent" tag.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
# Database connection string: COMPLIANCE_DATABASE_URL takes precedence, then
# DATABASE_URL, and finally the empty string when neither variable is set.
DATABASE_URL = os.getenv("COMPLIANCE_DATABASE_URL", os.getenv("DATABASE_URL", ""))

# In-memory fallback store for monitored-URL entries when no DB is available.
_monitored_urls: list[dict] = list()
class MonitoredURL(BaseModel):
    """Request body describing a URL to put under recurring monitoring."""

    # Address of the website to scan.
    url: str
    # "consent_test" routes the run to the consent tester service; any other
    # value goes to the regular agent scan endpoint.
    scan_type: str = "scan"  # scan, consent_test
    # Stored with the entry; the run-scheduled handler in this module does not
    # evaluate it.
    frequency: str = "weekly"  # daily, weekly, monthly
    # Sent as "recipient" with regular agent scans.
    recipient: str = "dsb@breakpilot.local"
    # Entries with enabled=False are skipped by the run-scheduled handler.
    enabled: bool = True
@router.post("/monitored-urls")
async def add_monitored_url(req: MonitoredURL):
    """Register a URL for recurring scans and echo the stored entry back.

    The entry gets a fresh UUID and a UTC creation timestamp; ``last_scan_at``
    starts out as ``None``. The response is the stored entry preceded by
    ``"status": "added"``.
    """
    record = dict(
        id=str(uuid.uuid4()),
        url=req.url,
        scan_type=req.scan_type,
        frequency=req.frequency,
        recipient=req.recipient,
        enabled=req.enabled,
        created_at=datetime.now(timezone.utc).isoformat(),
        last_scan_at=None,
    )
    _monitored_urls.append(record)
    logger.info("Added monitored URL: %s (%s)", req.url, req.frequency)

    payload = {"status": "added"}
    payload.update(record)
    return payload
@router.get("/monitored-urls")
async def list_monitored_urls():
    """Return every stored monitoring entry, enabled or not, under the "urls" key."""
    listing = dict(urls=_monitored_urls)
    return listing
@router.delete("/monitored-urls/{url_id}")
async def remove_monitored_url(url_id: str):
    """Drop the entry whose id equals *url_id* from the monitoring list.

    The response is ``{"status": "removed"}`` whether or not a matching entry
    existed.
    """
    global _monitored_urls
    remaining = []
    for item in _monitored_urls:
        if item["id"] == url_id:
            continue
        remaining.append(item)
    # Rebind to a new list rather than mutating the existing one in place.
    _monitored_urls = remaining
    return dict(status="removed")
@router.post("/run-scheduled")
async def run_scheduled_scans():
    """Trigger all enabled scheduled scans. Called by cron/ZeroClaw.

    Enabled entries are scanned one after another. Entries with scan_type
    "consent_test" are posted to the consent tester service; all others are
    posted to the agent scan endpoint in "post_launch" mode. After a scan
    request returns (whatever its status code), the entry's ``last_scan_at``
    is set to the current UTC time. A scan that raises is recorded with
    status "error" and does not stop the remaining scans.

    Returns:
        Dict with ``scans_triggered`` (number of attempted scans) and
        ``results`` (one dict per attempt with url, scan_type, status and
        either status_code or error).
    """
    import httpx

    results = []
    backend_url = "http://localhost:8002"

    # Snapshot the work list up front: the loop awaits network calls, during
    # which other requests may append to the module-level list.
    due_entries = [entry for entry in _monitored_urls if entry["enabled"]]

    # One client for the whole run instead of a new one per entry.
    async with httpx.AsyncClient(timeout=300.0) as client:
        for entry in due_entries:
            url = entry["url"]
            scan_type = entry["scan_type"]
            logger.info("Running scheduled %s for %s", scan_type, url)
            try:
                if scan_type == "consent_test":
                    resp = await client.post(
                        "http://bp-compliance-consent-tester:8094/scan",
                        json={"url": url},
                    )
                else:
                    resp = await client.post(
                        f"{backend_url}/api/compliance/agent/scan",
                        json={"url": url, "mode": "post_launch", "recipient": entry["recipient"]},
                    )
                entry["last_scan_at"] = datetime.now(timezone.utc).isoformat()
                results.append({
                    "url": url,
                    "scan_type": scan_type,
                    "status": "completed" if resp.status_code == 200 else "failed",
                    "status_code": resp.status_code,
                })
            except Exception as e:
                # Best-effort batch: log with traceback and carry on with the
                # next entry.
                logger.exception("Scheduled scan failed for %s: %s", url, e)
                results.append({"url": url, "scan_type": scan_type, "status": "error", "error": str(e)})

    return {"scans_triggered": len(results), "results": results}