8336c01c5c
Phase 6: PDF export via WeasyPrint — POST /agent/scans/pdf generates printable compliance report with findings table, service comparison, risk badge, and legal disclaimer. Phase 7: Recurring scans — POST /agent/monitored-urls to add URLs, POST /agent/run-scheduled triggers all enabled scans (cron/ZeroClaw). In-memory storage with DB upgrade path. Phase 8: Multi-website compare — POST /agent/compare with 2-5 URLs, parallel scanning, comparison table (risk, findings, services, compliance features per site). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
95 lines
3.2 KiB
Python
95 lines
3.2 KiB
Python
"""
|
|
Agent Compare Routes — scan multiple websites and compare compliance posture.
|
|
|
|
POST /api/compliance/agent/compare
|
|
"""
|
|
|
|
import asyncio
import logging
from datetime import datetime, timezone
from urllib.parse import urlparse

import httpx
from fastapi import APIRouter
from pydantic import BaseModel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
|
|
|
|
|
|
class CompareRequest(BaseModel):
    """Request payload for the multi-site compliance comparison."""

    # Websites to scan side by side; the endpoint caps this at 5.
    urls: list[str]
    # Scan mode forwarded verbatim to the single-site scan endpoint.
    mode: str = "post_launch"
|
|
|
|
|
|
class SiteResult(BaseModel):
    """Condensed scan outcome for a single site in the comparison table."""

    url: str
    domain: str
    # Aggregated risk assessment reported by the scan endpoint.
    risk_level: str = ""
    risk_score: float = 0
    # Counts of scan findings and detected third-party services.
    findings_count: int = 0
    services_count: int = 0
    # German legal/compliance features detected (or inferred from findings).
    has_impressum: bool = False
    has_datenschutz: bool = False
    has_cookie_banner: bool = False
    has_google_fonts: bool = False
    # Presumably the number of trackers firing before consent — TODO confirm;
    # nothing in this module ever assigns it, so it stays at the default.
    tracking_before_consent: int = 0
    classification: str = ""
    # Lifecycle marker: "pending", "completed", "failed", or "error".
    scan_status: str = "pending"
|
|
|
|
|
|
class CompareResponse(BaseModel):
    """Response for /compare: one SiteResult per requested URL."""

    sites: list[SiteResult]
    # ISO-8601 UTC timestamp taken when the comparison completed.
    compared_at: str
|
|
|
|
|
|
@router.post("/compare", response_model=CompareResponse)
|
|
async def compare_websites(req: CompareRequest):
|
|
"""Scan multiple websites and compare their compliance posture."""
|
|
urls = req.urls[:5] # Max 5
|
|
|
|
async def scan_one(url: str) -> SiteResult:
|
|
domain = url.split("/")[2] if len(url.split("/")) > 2 else url
|
|
try:
|
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
|
resp = await client.post(
|
|
"http://localhost:8002/api/compliance/agent/scan",
|
|
json={"url": url, "mode": req.mode},
|
|
)
|
|
if resp.status_code != 200:
|
|
return SiteResult(url=url, domain=domain, scan_status="failed")
|
|
|
|
data = resp.json()
|
|
services = data.get("services", [])
|
|
findings = data.get("findings", [])
|
|
|
|
return SiteResult(
|
|
url=url,
|
|
domain=domain,
|
|
risk_level=data.get("risk_level", ""),
|
|
risk_score=data.get("risk_score", 0),
|
|
findings_count=len(findings),
|
|
services_count=len(services),
|
|
has_impressum=not any("IMPRESSUM" in f.get("code", "") for f in findings if isinstance(f, dict)),
|
|
has_datenschutz=not any("DATENSCHUTZ" in f.get("code", "") for f in findings if isinstance(f, dict)),
|
|
has_cookie_banner=data.get("chatbot_detected", False) or any(
|
|
s.get("id") == "cmp" for s in services if isinstance(s, dict)
|
|
),
|
|
has_google_fonts=any(
|
|
s.get("id") == "google_fonts" for s in services if isinstance(s, dict)
|
|
),
|
|
classification=data.get("classification", ""),
|
|
scan_status="completed",
|
|
)
|
|
except Exception as e:
|
|
logger.error("Compare scan failed for %s: %s", url, e)
|
|
return SiteResult(url=url, domain=domain, scan_status="error")
|
|
|
|
# Scan all in parallel
|
|
results = await asyncio.gather(*[scan_one(u) for u in urls])
|
|
|
|
return CompareResponse(
|
|
sites=list(results),
|
|
compared_at=datetime.now(timezone.utc).isoformat(),
|
|
)
|