diff --git a/backend-compliance/compliance/api/agent_scan_routes.py b/backend-compliance/compliance/api/agent_scan_routes.py
new file mode 100644
index 0000000..c308974
--- /dev/null
+++ b/backend-compliance/compliance/api/agent_scan_routes.py
@@ -0,0 +1,302 @@
+"""
+Agent Website Scan Routes — deep scan endpoint that performs multi-page
+website analysis with SOLL/IST service comparison.
+
+POST /api/compliance/agent/scan
+"""
+
+import logging
+import os
+import re
+from datetime import datetime, timezone
+
+import httpx
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+from compliance.services.website_scanner import scan_website, DetectedService, ScanResult
+from compliance.services.dse_service_extractor import extract_dse_services, compare_services
+from compliance.services.smtp_sender import send_email
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/compliance/agent", tags=["agent"])
+
+SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090")
+TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
+USER_ID = "00000000-0000-0000-0000-000000000001"
+SDK_HEADERS = {
+    "Content-Type": "application/json",
+    "X-Tenant-ID": TENANT_ID,
+    "X-User-ID": USER_ID,
+}
+
+
+class ScanRequest(BaseModel):
+    url: str
+    mode: str = "post_launch"  # "post_launch" = live website; anything else = pre-launch
+    recipient: str = "dsb@breakpilot.local"
+
+
+class ServiceInfo(BaseModel):
+    name: str
+    category: str
+    provider: str
+    country: str
+    eu_adequate: bool
+    requires_consent: bool
+    legal_ref: str
+    in_dse: bool
+    status: str  # "ok", "undocumented", "outdated"
+
+
+class ScanFinding(BaseModel):
+    code: str
+    severity: str
+    text: str
+    correction: str = ""
+
+
+class ScanResponse(BaseModel):
+    url: str
+    pages_scanned: int
+    services: list[ServiceInfo]
+    findings: list[ScanFinding]
+    ai_detected: bool
+    chatbot_detected: bool
+    chatbot_provider: str
+    missing_pages: dict
+    summary: str
+    email_status: str
+    scanned_at: str
+
+
+@router.post("/scan", response_model=ScanResponse)
+async def scan_website_endpoint(req: ScanRequest):
+    """Deep website scan: multi-page crawl + SOLL/IST service comparison."""
+    is_live = req.mode == "post_launch"
+
+    # Step 1: Scan website (5-10 pages)
+    scan = await scan_website(req.url)
+    logger.info("Scanned %d pages, found %d services", len(scan.pages_scanned), len(scan.detected_services))
+
+    # Step 2: Fetch privacy policy text for SOLL extraction
+    dse_text = await _fetch_dse_text(req.url, scan.pages_scanned)
+
+    # Step 3: Extract services mentioned in DSE via LLM
+    dse_services = await extract_dse_services(dse_text) if dse_text else []
+    logger.info("DSE mentions %d services", len(dse_services))
+
+    # Step 4: SOLL/IST comparison
+    detected_dicts = [_service_to_dict(s) for s in scan.detected_services]
+    comparison = compare_services(detected_dicts, dse_services)
+
+    # Step 5: Generate findings
+    services_info, findings = _build_findings(comparison, scan, is_live)
+
+    # Step 6: Generate corrections for pre-launch mode
+    if not is_live and findings:
+        await _add_corrections(findings, dse_text)
+
+    # Step 7: Build summary
+    summary = _build_scan_summary(req.url, scan, comparison, findings, is_live)
+
+    # Step 8: Send notification
+    mode_label = "LIVE-WEBSITE" if is_live else "INTERNE PRUEFUNG"
+    email_result = send_email(
+        recipient=req.recipient,
+        subject=f"[{mode_label}] Website-Scan: {req.url[:50]}",
+        body_html=f"<pre>{summary}</pre>",
+    )
+
+    return ScanResponse(
+        url=req.url,
+        pages_scanned=len(scan.pages_scanned),
+        services=services_info,
+        findings=findings,
+        ai_detected=len(scan.ai_mentions) > 0,
+        chatbot_detected=scan.chatbot_detected,
+        chatbot_provider=scan.chatbot_provider,
+        missing_pages=scan.missing_pages,
+        summary=summary,
+        email_status=email_result.get("status", "failed"),
+        scanned_at=datetime.now(timezone.utc).isoformat(),
+    )
+
+
+async def _fetch_dse_text(url: str, scanned_pages: list[str]) -> str:
+    """Find and fetch the privacy policy page text."""
+    # Find the DSE URL among the scanned pages
+    dse_url = None
+    for page in scanned_pages:
+        if re.search(r"datenschutz|privacy|dsgvo", page, re.IGNORECASE):
+            dse_url = page
+            break
+    if not dse_url:
+        dse_url = url  # Fall back to the provided URL
+
+    try:
+        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
+            resp = await client.get(dse_url, headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"})
+        html = resp.text
+        # Strip <script>/<style> blocks, then all remaining tags, then collapse whitespace
+        clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
+        clean = re.sub(r"<[^>]+>", " ", clean)
+        clean = re.sub(r"\s+", " ", clean).strip()
+        return clean[:4000]
+    except Exception:
+        return ""
+
+
+def _service_to_dict(svc: DetectedService) -> dict:
+    return {
+        "id": svc.id, "name": svc.name, "category": svc.category,
+        "provider": svc.provider, "country": svc.country,
+        "eu_adequate": svc.eu_adequate, "requires_consent": svc.requires_consent,
+        "legal_ref": svc.legal_ref,
+    }
+
+
+def _build_findings(
+    comparison: dict, scan: ScanResult, is_live: bool,
+) -> tuple[list[ServiceInfo], list[ScanFinding]]:
+    """Build service info list and findings from the comparison."""
+    services = []
+    findings = []
+
+    # Undocumented services (on website, NOT in DSE)
+    for svc in comparison["undocumented"]:
+        services.append(ServiceInfo(
+            name=svc["name"], category=svc.get("category", "other"),
+            provider=svc.get("provider", ""), country=svc.get("country", ""),
+            eu_adequate=svc.get("eu_adequate", False),
+            requires_consent=svc.get("requires_consent", False),
+            legal_ref=svc.get("legal_ref", ""), in_dse=False, status="undocumented",
+        ))
+        severity = "HIGH" if is_live else "MEDIUM"
+        findings.append(ScanFinding(
+            code=f"DSE-MISSING-{svc['id'].upper()}",
+            severity=severity,
+            text=f"{svc['name']} ({svc.get('provider', '')}, {svc.get('country', '')}) "
+                 f"ist auf der Website eingebunden, aber NICHT in der Datenschutzerklaerung "
+                 f"dokumentiert (Art. 13 DSGVO).",
+        ))
+
+    # Documented services (OK)
+    for item in comparison["documented"]:
+        svc = item["detected"]
+        services.append(ServiceInfo(
+            name=svc["name"], category=svc.get("category", "other"),
+            provider=svc.get("provider", ""), country=svc.get("country", ""),
+            eu_adequate=svc.get("eu_adequate", False),
+            requires_consent=svc.get("requires_consent", False),
+            legal_ref=svc.get("legal_ref", ""), in_dse=True, status="ok",
+        ))
+        # Check third-country transfer
+        if not svc.get("eu_adequate", False):
+            findings.append(ScanFinding(
+                code=f"TRANSFER-{svc['id'].upper()}",
+                severity="MEDIUM",
+                text=f"{svc['name']} ({svc.get('country', '')}) — Drittlandtransfer. "
" + f"Pruefen ob SCCs oder Angemessenheitsbeschluss dokumentiert sind.", + )) + + # Outdated services (in DSE, NOT on website) + for svc in comparison["outdated"]: + services.append(ServiceInfo( + name=svc["name"], category="other", + provider=svc.get("provider", ""), country=svc.get("country", ""), + eu_adequate=True, requires_consent=False, + legal_ref="", in_dse=True, status="outdated", + )) + findings.append(ScanFinding( + code=f"DSE-OUTDATED-{svc['name'].upper().replace(' ', '_')[:20]}", + severity="LOW", + text=f"{svc['name']} in Datenschutzerklaerung erwaehnt aber auf der Website " + f"nicht mehr gefunden. Eintrag bei naechster Aktualisierung entfernen.", + )) + + # Missing pages (e.g., /impressum returns 404) + for page_url, status_code in scan.missing_pages.items(): + if "impressum" in page_url.lower(): + findings.append(ScanFinding( + code="MISSING-IMPRESSUM", + severity="HIGH", + text=f"Impressum-Seite gibt HTTP {status_code} zurueck (§5 TMG Verstoss).", + )) + + return services, findings + + +async def _add_corrections(findings: list[ScanFinding], dse_text: str) -> None: + """Add correction suggestions for pre-launch mode via LLM.""" + for finding in findings: + if finding.severity in ("HIGH", "MEDIUM") and "MISSING" in finding.code: + service_name = finding.code.replace("DSE-MISSING-", "").replace("_", " ").title() + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json={ + "messages": [ + {"role": "system", "content": ( + "/no_think\n" + "Du bist Datenschutzexperte. Erstelle einen einbaufertigen " + "Textbaustein fuer eine deutsche Datenschutzerklaerung fuer " + f"den Dienst '{service_name}'. Enthalte: Ueberschrift, " + "Anbietername, Zweck, Rechtsgrundlage nach DSGVO, " + "Drittlandtransfer-Hinweis wenn noetig, " + "Widerspruchsmoeglichkeit. Max 150 Woerter." + )}, + {"role": "user", "content": f"Erstelle DSE-Textbaustein fuer: {service_name}"}, + ], + }) + data = resp.json() + import re + raw = ( + data.get("response", "") + or (data.get("message", {}) or {}).get("content", "") + or "" + ).strip() + raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + if raw: + finding.correction = raw + except Exception as e: + logger.warning("Correction generation failed for %s: %s", service_name, e) + + +def _build_scan_summary( + url: str, scan, comparison: dict, findings: list[ScanFinding], is_live: bool, +) -> str: + """Build German scan summary.""" + mode = "PRUEFUNG LIVE-WEBSITE" if is_live else "INTERNE PRUEFUNG" + n_undoc = len(comparison["undocumented"]) + n_ok = len(comparison["documented"]) + n_outdated = len(comparison["outdated"]) + n_findings = len(findings) + high = sum(1 for f in findings if f.severity == "HIGH") + + parts = [ + f"{mode} — Website-Scan", + f"URL: {url}", + f"Seiten gescannt: {len(scan.pages_scanned)}", + "", + f"Dienstleister-Abgleich (DSE vs. Website):", + f" Korrekt dokumentiert: {n_ok}", + f" NICHT in DSE (Verstoss): {n_undoc}", + f" Veraltet in DSE: {n_outdated}", + "", + f"Findings: {n_findings} ({high} mit hoher Prioritaet)", + ] + + if findings: + parts.append("") + for f in findings[:10]: + marker = "!!" if f.severity == "HIGH" else "!" if f.severity == "MEDIUM" else "i" + parts.append(f" [{marker}] {f.text}") + + if is_live and high > 0: + parts.extend([ + "", + "ACHTUNG: Verstoesse auf einer bereits veroeffentlichten Website. 
" + "Sofortige Korrektur empfohlen.", + ]) + + return "\n".join(parts) diff --git a/backend-compliance/compliance/services/dse_service_extractor.py b/backend-compliance/compliance/services/dse_service_extractor.py new file mode 100644 index 0000000..ab2917e --- /dev/null +++ b/backend-compliance/compliance/services/dse_service_extractor.py @@ -0,0 +1,127 @@ +""" +DSE Service Extractor — extracts mentioned third-party services from +a privacy policy text using LLM (Qwen) and compares against detected services. + +Produces SOLL/IST comparison: what's in the DSE vs. what's on the website. +""" + +import logging +import os +import re + +import httpx + +logger = logging.getLogger(__name__) + +SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090") +TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" +USER_ID = "00000000-0000-0000-0000-000000000001" + +SDK_HEADERS = { + "Content-Type": "application/json", + "X-Tenant-ID": TENANT_ID, + "X-User-ID": USER_ID, +} + + +async def extract_dse_services(dse_text: str) -> list[dict]: + """Extract mentioned services from privacy policy text via LLM.""" + prompt = ( + "/no_think\n" + "Extrahiere aus dieser Datenschutzerklaerung ALLE erwaehnten Dienstleister, " + "Tools und externen Dienste. Fuer jeden nenne:\n" + "- name: Name des Dienstes (z.B. 'Google Analytics')\n" + "- purpose: Zweck (z.B. 'Webanalyse')\n" + "- country: Land/Sitz (z.B. 'USA')\n" + "- legal_basis: Genannte Rechtsgrundlage (z.B. 'Einwilligung')\n\n" + "Antworte als JSON-Array. Wenn keine Dienstleister erwaehnt werden, " + "antworte mit [].\n" + "Beispiel: [{\"name\": \"Google Analytics\", \"purpose\": \"Webanalyse\", " + "\"country\": \"USA\", \"legal_basis\": \"Einwilligung\"}]" + ) + try: + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json={ + "messages": [ + {"role": "system", "content": prompt}, + {"role": "user", "content": dse_text[:3500]}, + ], + }) + data = resp.json() + raw = ( + data.get("response", "") + or (data.get("message", {}) or {}).get("content", "") + or "" + ).strip() + raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + # Extract JSON array from response + match = re.search(r"\[.*\]", raw, re.DOTALL) + if match: + import json + return json.loads(match.group()) + except Exception as e: + logger.warning("DSE service extraction failed: %s", e) + return [] + + +def compare_services( + detected: list[dict], dse_services: list[dict], +) -> dict: + """Compare detected website services against DSE-mentioned services. + + Returns dict with three categories: + - undocumented: on website but NOT in DSE (Art. 
13 violation) + - outdated: in DSE but NOT on website (cleanup) + - documented: on website AND in DSE (OK, check details) + """ + # Normalize names for matching + def normalize(name: str) -> str: + return re.sub(r"[^a-z0-9]", "", name.lower()) + + detected_names = {normalize(d["name"]): d for d in detected} + dse_names = {normalize(d["name"]): d for d in dse_services} + + undocumented = [] + documented = [] + outdated = [] + + for key, svc in detected_names.items(): + # Skip CMP — consent managers don't need DSE mention + if svc.get("category") == "other" and svc.get("id") == "cmp": + continue + matched = False + for dse_key, dse_svc in dse_names.items(): + if key == dse_key or _fuzzy_match(svc["name"], dse_svc["name"]): + documented.append({"detected": svc, "dse": dse_svc, "status": "ok"}) + matched = True + break + if not matched: + undocumented.append(svc) + + for key, dse_svc in dse_names.items(): + matched = False + for det_key in detected_names: + if key == det_key or _fuzzy_match(dse_svc["name"], detected_names[det_key]["name"]): + matched = True + break + if not matched: + outdated.append(dse_svc) + + return { + "undocumented": undocumented, + "documented": documented, + "outdated": outdated, + } + + +def _fuzzy_match(a: str, b: str) -> bool: + """Simple fuzzy matching — checks if one name contains the core of the other.""" + a_lower = a.lower() + b_lower = b.lower() + # Direct substring + if a_lower in b_lower or b_lower in a_lower: + return True + # Core word match (e.g., "Google" in "Google Analytics" and "Google Ireland") + a_words = set(re.findall(r"\w{4,}", a_lower)) + b_words = set(re.findall(r"\w{4,}", b_lower)) + return bool(a_words & b_words) diff --git a/backend-compliance/compliance/services/website_scanner.py b/backend-compliance/compliance/services/website_scanner.py new file mode 100644 index 0000000..18256a7 --- /dev/null +++ b/backend-compliance/compliance/services/website_scanner.py @@ -0,0 +1,248 @@ +""" +Website Scanner — scans multiple pages of a website for third-party services, +chatbots, tracking, AI indicators, and compares against privacy policy. + +Used by the Compliance Agent for SOLL/IST analysis. +""" + +import logging +import re +from dataclasses import dataclass, field +from urllib.parse import urljoin, urlparse + +import httpx + +logger = logging.getLogger(__name__) + +USER_AGENT = "BreakPilot-Compliance-Agent/1.0" + + +@dataclass +class DetectedService: + id: str + name: str + category: str # "tracking", "chatbot", "cdn", "payment", "marketing", "other" + provider: str + country: str + eu_adequate: bool + requires_consent: bool + legal_ref: str + found_on: str = "" # URL where detected + + +@dataclass +class ScanResult: + pages_scanned: list[str] = field(default_factory=list) + detected_services: list[DetectedService] = field(default_factory=list) + ai_mentions: list[str] = field(default_factory=list) + chatbot_detected: bool = False + chatbot_provider: str = "" + missing_pages: dict = field(default_factory=dict) # url -> status_code + + +# ── Service Registry ────────────────────────────────────────────────────────── +# Each entry: regex pattern -> service metadata +SERVICE_REGISTRY: dict[str, dict] = { + # --- Tracking & Analytics --- + r"google.?analytics|gtag\(|UA-\d+|G-\w{5,}": { + "id": "google_analytics", "name": "Google Analytics", "category": "tracking", + "provider": "Google LLC", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 
44-49 DSGVO, §25 TDDDG", + }, + r"googletagmanager|gtm\.js": { + "id": "google_tag_manager", "name": "Google Tag Manager", "category": "tracking", + "provider": "Google LLC", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO", + }, + r"facebook\.net/.*fbevents|fbq\(": { + "id": "facebook_pixel", "name": "Meta/Facebook Pixel", "category": "marketing", + "provider": "Meta Platforms", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, §25 TDDDG", + }, + r"hotjar\.com|_hjSettings": { + "id": "hotjar", "name": "Hotjar", "category": "tracking", + "provider": "Hotjar Ltd", "country": "MT", "eu_adequate": True, + "requires_consent": True, "legal_ref": "§25 TDDDG (Session Recording)", + }, + r"clarity\.ms": { + "id": "ms_clarity", "name": "Microsoft Clarity", "category": "tracking", + "provider": "Microsoft", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "§25 TDDDG (Session Replay), Art. 44 DSGVO", + }, + r"matomo|piwik": { + "id": "matomo", "name": "Matomo", "category": "tracking", + "provider": "InnoCraft/Self-hosted", "country": "EU/Self", "eu_adequate": True, + "requires_consent": False, "legal_ref": "Cookieless moeglich, §25 TDDDG", + }, + r"plausible\.io": { + "id": "plausible", "name": "Plausible Analytics", "category": "tracking", + "provider": "Plausible Insights", "country": "EE", "eu_adequate": True, + "requires_consent": False, "legal_ref": "EU-Anbieter, cookieless", + }, + # --- CDN & Fonts --- + r"fonts\.googleapis\.com|fonts\.gstatic\.com": { + "id": "google_fonts", "name": "Google Fonts (remote)", "category": "cdn", + "provider": "Google LLC", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "LG Muenchen I, Az. 3 O 17493/20", + }, + r"cdn\.cloudflare\.com|cdnjs\.cloudflare\.com": { + "id": "cloudflare_cdn", "name": "Cloudflare CDN", "category": "cdn", + "provider": "Cloudflare Inc", "country": "US", "eu_adequate": False, + "requires_consent": False, "legal_ref": "Art. 44-49 DSGVO, berechtigtes Interesse", + }, + # --- Chatbots --- + r"widget\.intercom\.io|intercomcdn": { + "id": "intercom", "name": "Intercom", "category": "chatbot", + "provider": "Intercom Inc", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, KI-gestuetzt", + }, + r"tidio\.co|tidioChatApi": { + "id": "tidio", "name": "Tidio Chat", "category": "chatbot", + "provider": "Tidio LLC", "country": "PL", "eu_adequate": True, + "requires_consent": False, "legal_ref": "EU-Anbieter", + }, + r"zendesk\.com/embeddable|zdassets": { + "id": "zendesk", "name": "Zendesk", "category": "chatbot", + "provider": "Zendesk Inc", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO", + }, + # --- Payment --- + r"js\.stripe\.com|stripe\.com/v3": { + "id": "stripe", "name": "Stripe", "category": "payment", + "provider": "Stripe Inc", "country": "US", "eu_adequate": False, + "requires_consent": False, "legal_ref": "Art. 6(1)(b) Vertragserfuellung, SCCs", + }, + r"paypal\.com/sdk|paypalobjects": { + "id": "paypal", "name": "PayPal", "category": "payment", + "provider": "PayPal Holdings", "country": "US", "eu_adequate": False, + "requires_consent": False, "legal_ref": "Art. 
6(1)(b) Vertragserfuellung", + }, + r"klarna\.com|klarna-payments": { + "id": "klarna", "name": "Klarna", "category": "payment", + "provider": "Klarna AB", "country": "SE", "eu_adequate": True, + "requires_consent": False, "legal_ref": "EU, aber Art. 22 DSGVO bei Bonitaetspruefung!", + }, + # --- Captcha --- + r"recaptcha|grecaptcha": { + "id": "recaptcha", "name": "Google reCAPTCHA", "category": "other", + "provider": "Google LLC", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, §25 TDDDG", + }, + # --- Video --- + r"youtube\.com/embed|youtube-nocookie|ytimg": { + "id": "youtube", "name": "YouTube", "category": "other", + "provider": "Google LLC", "country": "US", "eu_adequate": False, + "requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, 2-Klick empfohlen", + }, + # --- Consent Management --- + r"didomi|cookiebot|onetrust|usercentrics|consentmanager|quantcast": { + "id": "cmp", "name": "Consent Management Platform", "category": "other", + "provider": "Various", "country": "EU", "eu_adequate": True, + "requires_consent": False, "legal_ref": "CMP vorhanden — gut", + }, +} + +AI_TEXT_PATTERNS = [ + r"k(?:ue|ü)nstliche.?intelligenz", + r"artificial.?intelligence", + r"machine.?learning", + r"maschinelles.?lernen", + r"KI.?gest(?:ue|ü)tzt", + r"AI.?powered", + r"chatgpt|openai", + r"deep.?learning", + r"neural.?net", + r"automatisierte.?entscheidung", +] + +FOOTER_LINK_PATTERNS = [ + (r'href="([^"]*(?:impressum|imprint|legal-notice)[^"]*)"', "impressum"), + (r'href="([^"]*(?:datenschutz|privacy|dsgvo)[^"]*)"', "datenschutz"), + (r'href="([^"]*(?:agb|terms|nutzungsbedingungen)[^"]*)"', "agb"), + (r'href="([^"]*(?:cookie)[^"]*)"', "cookies"), +] + + +async def scan_website(base_url: str) -> ScanResult: + """Scan a website: start page + footer links for services and AI indicators.""" + result = ScanResult() + parsed = urlparse(base_url) + origin = f"{parsed.scheme}://{parsed.netloc}" + + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: + # 1. Fetch start page + start_html = await _fetch_page(client, origin, result) + if not start_html: + return result + + # 2. Discover footer links + page_urls = {origin} + page_urls.add(base_url) # Also scan the provided URL + for pattern, _ in FOOTER_LINK_PATTERNS: + for match in re.finditer(pattern, start_html, re.IGNORECASE): + href = match.group(1) + if href.startswith("/"): + href = urljoin(origin, href) + if href.startswith(origin): + page_urls.add(href) + + # 3. Scan all pages (max 10) + for url in list(page_urls)[:10]: + html = start_html if url == origin else await _fetch_page(client, url, result) + if html: + _detect_services(html, url, result) + _detect_ai_mentions(html, url, result) + + # Deduplicate services + seen = set() + unique = [] + for svc in result.detected_services: + if svc.id not in seen: + seen.add(svc.id) + unique.append(svc) + result.detected_services = unique + + result.chatbot_detected = any(s.category == "chatbot" for s in result.detected_services) + if result.chatbot_detected: + result.chatbot_provider = next( + s.name for s in result.detected_services if s.category == "chatbot" + ) + + return result + + +async def _fetch_page( + client: httpx.AsyncClient, url: str, result: ScanResult, +) -> str: + """Fetch a single page. 
Returns HTML or empty string on failure.""" + try: + resp = await client.get(url, headers={"User-Agent": USER_AGENT}) + result.pages_scanned.append(url) + if resp.status_code >= 400: + result.missing_pages[url] = resp.status_code + return "" + return resp.text + except Exception as e: + logger.warning("Failed to fetch %s: %s", url, e) + return "" + + +def _detect_services(html: str, url: str, result: ScanResult) -> None: + """Detect third-party services in HTML.""" + for pattern, meta in SERVICE_REGISTRY.items(): + if re.search(pattern, html, re.IGNORECASE): + result.detected_services.append(DetectedService( + found_on=url, **meta, + )) + + +def _detect_ai_mentions(html: str, url: str, result: ScanResult) -> None: + """Detect AI/ML text mentions in page content.""" + # Strip scripts/styles first for text-only search + clean = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) + clean = re.sub(r"<[^>]+>", " ", clean) + for pattern in AI_TEXT_PATTERNS: + match = re.search(pattern, clean, re.IGNORECASE) + if match: + context = clean[max(0, match.start() - 40):match.end() + 40].strip() + result.ai_mentions.append(f"{url}: ...{context}...") diff --git a/backend-compliance/main.py b/backend-compliance/main.py index 792c78a..e60d92d 100644 --- a/backend-compliance/main.py +++ b/backend-compliance/main.py @@ -44,6 +44,7 @@ from compliance.api.company_profile_routes import router as company_profile_rout # Agent (ZeroClaw compliance agent) from compliance.api.agent_notification_routes import router as agent_notify_router from compliance.api.agent_analyze_routes import router as agent_analyze_router +from compliance.api.agent_scan_routes import router as agent_scan_router # Middleware from middleware import ( @@ -142,6 +143,7 @@ app.include_router(company_profile_router, prefix="/api") # Agent (ZeroClaw compliance agent → analyze + email via SMTP) app.include_router(agent_notify_router, prefix="/api") app.include_router(agent_analyze_router, prefix="/api") +app.include_router(agent_scan_router, prefix="/api") if __name__ == "__main__":
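
Usage note (not part of the patch): a minimal sketch of how the new endpoint can be exercised once the router is mounted. The host and port are assumptions for a local deployment; the request and response fields follow ScanRequest and ScanResponse above.

    import asyncio
    import httpx

    async def main() -> None:
        # "post_launch" scans a live site; any other mode value is treated as
        # pre-launch, which also triggers LLM-generated DSE correction snippets
        # for undocumented services.
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                "http://localhost:8000/api/compliance/agent/scan",  # host/port assumed
                json={
                    "url": "https://example.com",
                    "mode": "pre_launch",
                    "recipient": "dsb@breakpilot.local",
                },
            )
        data = resp.json()
        print(data["summary"])
        for finding in data["findings"]:
            print(f"[{finding['severity']}] {finding['code']}: {finding['text']}")

    asyncio.run(main())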