8336c01c5c
Phase 6: PDF export via WeasyPrint — POST /agent/scans/pdf generates printable compliance report with findings table, service comparison, risk badge, and legal disclaimer. Phase 7: Recurring scans — POST /agent/monitored-urls to add URLs, POST /agent/run-scheduled triggers all enabled scans (cron/ZeroClaw). In-memory storage with DB upgrade path. Phase 8: Multi-website compare — POST /agent/compare with 2-5 URLs, parallel scanning, comparison table (risk, findings, services, compliance features per site). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
112 lines
3.5 KiB
Python
112 lines
3.5 KiB
Python
"""
|
|
Agent Recurring Scan Routes — schedule and run automated periodic scans.
|
|
|
|
POST /api/compliance/agent/monitored-urls — add URL to monitoring
|
|
GET /api/compliance/agent/monitored-urls — list monitored URLs
|
|
POST /api/compliance/agent/run-scheduled — trigger all scheduled scans
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
|
|
|
|
DATABASE_URL = os.environ.get(
|
|
"COMPLIANCE_DATABASE_URL",
|
|
os.environ.get("DATABASE_URL", ""),
|
|
)
|
|
|
|
# In-memory fallback when no DB available
|
|
_monitored_urls: list[dict] = []
|
|
|
|
|
|
class MonitoredURL(BaseModel):
|
|
url: str
|
|
scan_type: str = "scan" # scan, consent_test
|
|
frequency: str = "weekly" # daily, weekly, monthly
|
|
recipient: str = "dsb@breakpilot.local"
|
|
enabled: bool = True
|
|
|
|
|
|
@router.post("/monitored-urls")
|
|
async def add_monitored_url(req: MonitoredURL):
|
|
"""Add a URL to the monitoring list."""
|
|
entry = {
|
|
"id": str(uuid.uuid4()),
|
|
"url": req.url,
|
|
"scan_type": req.scan_type,
|
|
"frequency": req.frequency,
|
|
"recipient": req.recipient,
|
|
"enabled": req.enabled,
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"last_scan_at": None,
|
|
}
|
|
_monitored_urls.append(entry)
|
|
logger.info("Added monitored URL: %s (%s)", req.url, req.frequency)
|
|
return {"status": "added", **entry}
|
|
|
|
|
|
@router.get("/monitored-urls")
|
|
async def list_monitored_urls():
|
|
"""List all monitored URLs."""
|
|
return {"urls": _monitored_urls}
|
|
|
|
|
|
@router.delete("/monitored-urls/{url_id}")
|
|
async def remove_monitored_url(url_id: str):
|
|
"""Remove a URL from monitoring."""
|
|
global _monitored_urls
|
|
_monitored_urls = [u for u in _monitored_urls if u["id"] != url_id]
|
|
return {"status": "removed"}
|
|
|
|
|
|
@router.post("/run-scheduled")
|
|
async def run_scheduled_scans():
|
|
"""Trigger all enabled scheduled scans. Called by cron/ZeroClaw."""
|
|
import httpx
|
|
|
|
results = []
|
|
backend_url = "http://localhost:8002"
|
|
|
|
for entry in _monitored_urls:
|
|
if not entry["enabled"]:
|
|
continue
|
|
|
|
url = entry["url"]
|
|
scan_type = entry["scan_type"]
|
|
logger.info("Running scheduled %s for %s", scan_type, url)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
if scan_type == "consent_test":
|
|
resp = await client.post(
|
|
"http://bp-compliance-consent-tester:8094/scan",
|
|
json={"url": url},
|
|
)
|
|
else:
|
|
resp = await client.post(
|
|
f"{backend_url}/api/compliance/agent/scan",
|
|
json={"url": url, "mode": "post_launch", "recipient": entry["recipient"]},
|
|
)
|
|
|
|
entry["last_scan_at"] = datetime.now(timezone.utc).isoformat()
|
|
results.append({
|
|
"url": url,
|
|
"scan_type": scan_type,
|
|
"status": "completed" if resp.status_code == 200 else "failed",
|
|
"status_code": resp.status_code,
|
|
})
|
|
except Exception as e:
|
|
logger.error("Scheduled scan failed for %s: %s", url, e)
|
|
results.append({"url": url, "scan_type": scan_type, "status": "error", "error": str(e)})
|
|
|
|
return {"scans_triggered": len(results), "results": results}
|