""" Agent Analyze Routes — combined endpoint that fetches a URL, classifies it, assesses DSGVO compliance, and sends a notification email. POST /api/compliance/agent/analyze """ import logging import re import os from datetime import datetime, timezone import httpx from fastapi import APIRouter from pydantic import BaseModel from compliance.services.smtp_sender import send_email logger = logging.getLogger(__name__) router = APIRouter(prefix="/compliance/agent", tags=["agent"]) SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090") TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" USER_ID = "00000000-0000-0000-0000-000000000001" ESCALATION_ROLES = { "E0": "Kein Handlungsbedarf", "E1": "Teamleitung Datenschutz", "E2": "Datenschutzbeauftragter (DSB)", "E3": "DSB + Rechtsabteilung", } SDK_HEADERS = { "Content-Type": "application/json", "X-Tenant-ID": TENANT_ID, "X-User-ID": USER_ID, } class AnalyzeRequest(BaseModel): url: str recipient: str = "dsb@breakpilot.local" class AnalyzeResponse(BaseModel): url: str classification: str risk_level: str risk_score: float escalation_level: str responsible_role: str findings: list[str] required_controls: list[str] summary: str email_status: str analyzed_at: str @router.post("/analyze", response_model=AnalyzeResponse) async def analyze_url(req: AnalyzeRequest): """Fetch URL, classify, assess compliance, and notify responsible role.""" async with httpx.AsyncClient(timeout=60.0) as client: # Step 1: Fetch and clean text = await _fetch_and_clean(client, req.url) # Step 2: Classify via SDK LLM classification = await _classify(client, text) # Step 3: Assess via UCCA assessment = await _assess(client, text, classification) # Step 4: Determine role esc_level = assessment.get("escalation_level", "E0") role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"]) # Step 5: Build summary findings = assessment.get("triggered_rules", []) controls = assessment.get("required_controls", []) summary = _build_summary(req.url, classification, assessment, role) # Step 6: Send notification email_result = send_email( recipient=req.recipient, subject=f"Compliance-Finding: {classification} — {req.url[:60]}", body_html=f"
{summary}
", ) return AnalyzeResponse( url=req.url, classification=classification, risk_level=assessment.get("risk_level", "unknown"), risk_score=assessment.get("risk_score", 0), escalation_level=esc_level, responsible_role=role, findings=findings if isinstance(findings, list) else [str(findings)], required_controls=controls if isinstance(controls, list) else [str(controls)], summary=summary, email_status=email_result.get("status", "failed"), analyzed_at=datetime.now(timezone.utc).isoformat(), ) async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> str: """Fetch URL and strip HTML to plain text.""" resp = await client.get(url, follow_redirects=True, headers={ "User-Agent": "BreakPilot-Compliance-Agent/1.0", }) html = resp.text # Strip script/style blocks, then all tags clean = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) clean = re.sub(r"<[^>]+>", " ", clean) clean = re.sub(r" ", " ", clean) clean = re.sub(r"\s+", " ", clean).strip() return clean[:4000] async def _classify(client: httpx.AsyncClient, text: str) -> str: """Classify document type via SDK LLM chat.""" try: resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json={ "messages": [ {"role": "system", "content": ( "/no_think\n" "Klassifiziere das Dokument in GENAU EINE Kategorie: " "privacy_policy, cookie_banner, terms_of_service, imprint, dpa, other. " "Antworte NUR mit dem Kategorienamen, nichts anderes. Kein Denken, keine Erklaerung." )}, {"role": "user", "content": text[:2000]}, ], }) data = resp.json() # Qwen 3.5 may use think mode — content can be in message.content or response raw = ( data.get("response", "") or data.get("content", "") or (data.get("message", {}) or {}).get("content", "") or "" ).strip().lower() # Strip Qwen think tags if present raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() logger.info("Classification raw response: %s", raw[:200]) for cat in ["privacy_policy", "cookie_banner", "terms_of_service", "imprint", "dpa"]: if cat in raw: return cat # Also check German terms if "datenschutz" in raw: return "privacy_policy" if "cookie" in raw: return "cookie_banner" if "impressum" in raw: return "imprint" return "other" except Exception as e: logger.warning("Classification failed: %s", e) return "other" async def _assess(client: httpx.AsyncClient, text: str, classification: str) -> dict: """Run UCCA assessment via SDK.""" try: resp = await client.post(f"{SDK_URL}/sdk/v1/ucca/assess", headers=SDK_HEADERS, json={ "use_case_text": text[:3000], "domain": classification, "data_categories": ["personal_data", "tracking", "cookies", "third_party_sharing"], }) return resp.json() except Exception as e: logger.warning("Assessment failed: %s", e) return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"} def _build_summary(url: str, classification: str, assessment: dict, role: str) -> str: """Build a German manager summary.""" risk = assessment.get("risk_level", "unbekannt") score = assessment.get("risk_score", 0) findings = assessment.get("triggered_rules", []) controls = assessment.get("required_controls", []) findings_text = "\n".join(f"- {f}" for f in findings[:5]) if findings else "Keine" controls_text = "\n".join(f"- {c}" for c in controls[:5]) if controls else "Keine" return ( f"Dokumenttyp: {classification}\n" f"Quelle: {url}\n" f"Risikobewertung: {risk} ({score}/100)\n" f"Zustaendig: {role}\n\n" f"Findings:\n{findings_text}\n\n" f"Erforderliche Massnahmen:\n{controls_text}" )