feat: Async scan with polling — no more timeout issues

Fundamental fix: scans now run asynchronously with progress polling.

Backend:
- POST /scan starts background task, returns scan_id immediately
- GET /scan/{scan_id} returns status + progress + result when done (lifecycle example below)
- 7 progress steps shown: Website scan, DSI discovery, DSE analysis,
  SOLL/IST comparison, corrections, report, email
- In-memory job store (dict with scan_id → status/result)
- No timeout limits on scan duration
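
For reference, the wire-level lifecycle of one scan. The field shapes follow the
ScanStartResponse/ScanStatusResponse models in the diff below; the concrete
values here are illustrative:

    POST /scan ->
        {"scan_id": "3f9c1a2b", "status": "running",
         "message": "Scan gestartet. Ergebnisse unter GET /scan/{scan_id}"}
    GET /scan/3f9c1a2b (while running) ->
        {"scan_id": "3f9c1a2b", "status": "running",
         "progress": "Schritt 3/7: Datenschutzerklaerung analysieren...",
         "result": null, "error": ""}
    GET /scan/3f9c1a2b (when finished) ->
        {"scan_id": "3f9c1a2b", "status": "completed", "progress": "Fertig",
         "result": { ...full ScanResponse... }, "error": ""}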

Frontend:
- POST starts scan, receives scan_id
- Polls GET every 5 seconds (max 120 attempts = 10 min); the loop is sketched below
- Shows live progress message during scan
- Displays result when completed, error when failed
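
The JS polling loop itself is not in this diff; below is a minimal Python sketch
of the same client behavior against the endpoints added in this commit (base URL
and the ScanRequest payload fields are assumptions, not part of the change):

    import time
    import httpx

    API = "http://localhost:8000"  # assumed base URL; production goes through the proxy

    def run_scan_blocking(url: str, mode: str = "post_launch") -> dict:
        """Start a scan, then poll every 5 s for up to 10 minutes."""
        start = httpx.post(f"{API}/scan", json={"url": url, "mode": mode}, timeout=30.0)
        start.raise_for_status()
        scan_id = start.json()["scan_id"]
        for _ in range(120):  # 120 attempts x 5 s = 10 min budget
            time.sleep(5)
            job = httpx.get(f"{API}/scan/{scan_id}", timeout=10.0).json()
            if job["status"] == "completed":
                return job["result"]
            if job["status"] in ("failed", "not_found"):
                raise RuntimeError(job.get("error") or "scan failed")
            # still "running": job["progress"] carries the live step message
        raise TimeoutError(f"scan {scan_id} did not finish within 10 minutes")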

Proxy:
- POST timeout reduced to 30s (just starts the job)
- GET timeout 10s (just status check)
- No more 504/connection-dropped errors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -107,3 +107,41 @@ def build_scan_summary(
    ])
    return "\n".join(parts)
async def fetch_dse_text(url: str, scanned_pages: list[str]) -> str:
    """Find and fetch the privacy policy page text."""
    dse_url = None
    for page in scanned_pages:
        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
            dse_url = page
            break
    if not dse_url:
        dse_url = url
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
            html = resp.text
            clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
            clean = re.sub(r"<[^>]+>", " ", clean)
            clean = re.sub(r"\s+", " ", clean).strip()
            return clean[:8000]
    except Exception:
        return ""

async def fetch_dse_html(url: str, scanned_pages: list[str]) -> str:
    """Fetch the raw HTML of the privacy policy page."""
    dse_url = None
    for page in scanned_pages:
        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
            dse_url = page
            break
    if not dse_url:
        dse_url = url
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
            return resp.text
    except Exception:
        return ""
@@ -23,7 +23,9 @@ from compliance.services.mandatory_content_checker import (
    check_mandatory_documents, check_dse_mandatory_content, MandatoryFinding,
)
from compliance.services.legal_basis_validator import validate_legal_bases
from compliance.api.agent_scan_helpers import add_corrections, build_scan_summary
from compliance.api.agent_scan_helpers import (
    add_corrections, build_scan_summary, fetch_dse_text, fetch_dse_html,
)
logger = logging.getLogger(__name__)
@@ -106,11 +108,76 @@ class ScanResponse(BaseModel):
    scanned_at: str

@router.post("/scan", response_model=ScanResponse)
async def scan_website_endpoint(req: ScanRequest):
    """Deep website scan: multi-page crawl + SOLL/IST service comparison."""
    is_live = req.mode == "post_launch"

import asyncio
import uuid as _uuid

# In-memory scan job store (survives until container restart)
_scan_jobs: dict[str, dict] = {}
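# NOTE: a module-level dict is per-process, so this pattern assumes a single
# uvicorn worker; entries are also never evicted until the container restarts.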

class ScanStartResponse(BaseModel):
    scan_id: str
    status: str = "running"
    message: str = ""

class ScanStatusResponse(BaseModel):
    scan_id: str
    status: str  # "running", "completed", "failed"
    progress: str = ""
    result: ScanResponse | None = None
    error: str = ""

@router.post("/scan")
async def scan_website_endpoint(req: ScanRequest):
    """Start async website scan. Returns scan_id immediately.
    Poll GET /scan/{scan_id} for status and results."""
    scan_id = str(_uuid.uuid4())[:8]
    _scan_jobs[scan_id] = {"status": "running", "progress": "Scan gestartet...", "result": None, "error": ""}
    # Launch scan in background
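    # (note: no reference to the created task is kept; asyncio holds only weak
    # references to tasks, so storing it would guard against garbage collection)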
    asyncio.create_task(_run_scan(scan_id, req))
    return ScanStartResponse(scan_id=scan_id, status="running", message="Scan gestartet. Ergebnisse unter GET /scan/{scan_id}")

@router.get("/scan/{scan_id}")
async def get_scan_status(scan_id: str):
    """Poll scan status. Returns result when completed."""
    job = _scan_jobs.get(scan_id)
    if not job:
        return {"scan_id": scan_id, "status": "not_found", "error": "Scan-ID nicht gefunden"}
    return ScanStatusResponse(
        scan_id=scan_id,
        status=job["status"],
        progress=job.get("progress", ""),
        result=job.get("result"),
        error=job.get("error", ""),
    )

async def _run_scan(scan_id: str, req: ScanRequest):
    """Background scan task; updates _scan_jobs with progress."""
    try:
        result = await _execute_scan(req, scan_id)
        _scan_jobs[scan_id]["status"] = "completed"
        _scan_jobs[scan_id]["result"] = result
        _scan_jobs[scan_id]["progress"] = "Fertig"
    except Exception as e:
        logger.error("Scan %s failed: %s", scan_id, e)
        _scan_jobs[scan_id]["status"] = "failed"
        _scan_jobs[scan_id]["error"] = str(e)[:500]

async def _execute_scan(req: ScanRequest, scan_id: str = "") -> ScanResponse:
    """Execute the full scan pipeline (called as background task)."""
    is_live = req.mode == "post_launch"

    def _progress(msg: str):
        if scan_id and scan_id in _scan_jobs:
            _scan_jobs[scan_id]["progress"] = msg

    _progress("Schritt 1/7: Website wird gescannt...")
    # Step 1: Scan website - try Playwright first (JS-rendered), fallback to httpx
    playwright_htmls: dict[str, str] = {}
    try:
@@ -153,14 +220,15 @@ async def scan_website_endpoint(req: ScanRequest):
logger.info("Scanned %d pages, found %d services", len(scan.pages_scanned), len(scan.detected_services))
_progress(f"Schritt 2/7: Rechtliche Dokumente suchen... ({len(scan.pages_scanned)} Seiten gescannt)")
# Step 1b: DSI Discovery — find all legal documents on the website
discovered_docs: list[DiscoveredDocument] = []
dsi_findings: list[ScanFinding] = []
try:
async with httpx.AsyncClient(timeout=180.0) as dsi_client:
async with httpx.AsyncClient(timeout=300.0) as dsi_client:
dsi_resp = await dsi_client.post(
"http://bp-compliance-consent-tester:8094/dsi-discovery",
json={"url": req.url, "max_documents": 20},
json={"url": req.url, "max_documents": 30},
)
if dsi_resp.status_code == 200:
dsi_data = dsi_resp.json()
@@ -198,8 +266,9 @@ async def scan_website_endpoint(req: ScanRequest):
code=df["code"], severity=df["severity"], text=df["text"],
))
except Exception as e:
logger.warning("DSI discovery failed: %s", e)
logger.warning("DSI discovery failed: %s %s", type(e).__name__, e)
_progress(f"Schritt 3/7: Datenschutzerklaerung analysieren... ({len(discovered_docs)} Dokumente gefunden)")
# Step 2: Fetch privacy policy text
# Priority: 1) Playwright HTMLs, 2) DSI Discovery full_text, 3) httpx fallback
dse_text = ""
@@ -223,7 +292,7 @@ async def scan_website_endpoint(req: ScanRequest):
        except Exception:
            pass
    if not dse_text:
        dse_text = await _fetch_dse_text(req.url, scan.pages_scanned)
        dse_text = await fetch_dse_text(req.url, scan.pages_scanned)

    # Step 3: Extract services mentioned in DSE via LLM + text fallback
    dse_services = await extract_dse_services(dse_text) if dse_text else []
@@ -248,10 +317,11 @@ async def scan_website_endpoint(req: ScanRequest):
            dse_html = html
            break
    if not dse_html:
        dse_html = await _fetch_dse_html(req.url, scan.pages_scanned)
        dse_html = await fetch_dse_html(req.url, scan.pages_scanned)
    dse_sections = parse_dse(dse_html, req.url) if dse_html else []
    logger.info("Parsed %d DSE sections", len(dse_sections))

    _progress("Schritt 4/7: SOLL/IST Vergleich...")
    # Step 5: SOLL/IST comparison
    detected_dicts = [_service_to_dict(s) for s in scan.detected_services]
    comparison = compare_services(detected_dicts, dse_services)
@@ -290,13 +360,16 @@ async def scan_website_endpoint(req: ScanRequest):
    # Step 8c: Add DSI document findings
    findings.extend(dsi_findings)

    _progress(f"Schritt 5/7: Korrekturen generieren... ({len(findings)} Findings)")
    # Step 9: Generate corrections for pre-launch mode
    if not is_live and findings:
        await add_corrections(findings, dse_text)

    _progress("Schritt 6/7: Report erstellen...")
    # Step 7: Build summary
    summary = build_scan_summary(req.url, scan, comparison, findings, is_live, discovered_docs)

    _progress("Schritt 7/7: E-Mail senden...")
    # Step 8: Send notification
    mode_label = "INTERNE PRUEFUNG" if not is_live else "LIVE-WEBSITE"
    email_result = send_email(
@@ -322,46 +395,6 @@ async def scan_website_endpoint(req: ScanRequest):
    )

async def _fetch_dse_text(url: str, scanned_pages: list[str]) -> str:
    """Find and fetch the privacy policy page text."""
    import re
    # Find DSE URL from scanned pages
    dse_url = None
    for page in scanned_pages:
        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
            dse_url = page
            break
    if not dse_url:
        dse_url = url  # Fallback to provided URL
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
            html = resp.text
            clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
            clean = re.sub(r"<[^>]+>", " ", clean)
            clean = re.sub(r"\s+", " ", clean).strip()
            return clean[:4000]
    except Exception:
        return ""

async def _fetch_dse_html(url: str, scanned_pages: list[str]) -> str:
    """Fetch the raw HTML of the privacy policy page (for structured parsing)."""
    import re
    dse_url = None
    for page in scanned_pages:
        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
            dse_url = page
            break
    if not dse_url:
        dse_url = url
    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
            return resp.text
    except Exception:
        return ""
def _service_to_dict(svc: DetectedService) -> dict: