"""
Agent Analyze Routes — combined endpoint that fetches a URL, classifies it,
assesses DSGVO compliance, and sends a notification email.
POST /api/compliance/agent/analyze
"""
import html
import logging
import os
import re
from datetime import datetime, timezone

import httpx
from fastapi import APIRouter
from pydantic import BaseModel

from compliance.services.smtp_sender import send_email
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the agent endpoints; every route below is registered under
# the "/compliance/agent" prefix.
# NOTE(review): the module docstring shows an additional "/api" prefix,
# presumably added where this router is included — confirm.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
# Base URL of the AI SDK service; can be overridden via the AI_SDK_URL
# environment variable.
SDK_URL = os.getenv("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090")

# Fixed tenant / user identity sent along with every SDK request.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
USER_ID = "00000000-0000-0000-0000-000000000001"

# Escalation level -> role named as responsible in the notification.
ESCALATION_ROLES = {
    "E0": "Kein Handlungsbedarf",
    "E1": "Teamleitung Datenschutz",
    "E2": "Datenschutzbeauftragter (DSB)",
    "E3": "DSB + Rechtsabteilung",
}

# HTTP headers attached to every call against the SDK.
SDK_HEADERS = {
    "Content-Type": "application/json",
    "X-Tenant-ID": TENANT_ID,
    "X-User-ID": USER_ID,
}
class AnalyzeRequest(BaseModel):
    """Request body of the analyze endpoint."""

    # Address of the page that is fetched and analyzed.
    url: str
    # Mailbox that receives the notification e-mail.
    recipient: str = "dsb@breakpilot.local"
class AnalyzeResponse(BaseModel):
    """Response body of the analyze endpoint."""

    # The URL that was analyzed (echoed from the request).
    url: str
    # Document category chosen by the classification step.
    classification: str
    # Risk level and score as reported by the assessment step.
    risk_level: str
    risk_score: float
    # Escalation level (key of ESCALATION_ROLES) and the role it maps to.
    escalation_level: str
    responsible_role: str
    # Triggered rules and required controls, rendered as display strings.
    findings: list[str]
    required_controls: list[str]
    # Plain-text summary that is also sent by e-mail.
    summary: str
    # "status" value returned by the mail sender ("failed" when absent).
    email_status: str
    # ISO-8601 timestamp (UTC) of when the response was built.
    analyzed_at: str
@router.post("/analyze", response_model=AnalyzeResponse)
async def analyze_url(req: AnalyzeRequest):
    """Fetch a URL, classify it, assess compliance, and notify the responsible role.

    Steps: download and strip the page, classify the document type via the
    SDK LLM, run the UCCA assessment, map the escalation level to a role,
    build the summary and send it by e-mail to ``req.recipient``.

    Args:
        req: Target URL and notification recipient.

    Returns:
        An ``AnalyzeResponse`` carrying classification, risk data, findings,
        required controls, the summary text, the e-mail status and a UTC
        timestamp.

    Raises:
        Nothing is caught here: errors from fetching the URL or from
        ``send_email`` propagate to the framework. Classification and
        assessment failures are absorbed by their helpers, which return
        default values instead.
    """
    # Only the three network steps need the HTTP client; close it before
    # the remaining (local) work.
    async with httpx.AsyncClient(timeout=60.0) as client:
        # Step 1: Fetch and clean
        text = await _fetch_and_clean(client, req.url)
        # Step 2: Classify via SDK LLM
        classification = await _classify(client, text)
        # Step 3: Assess via UCCA
        assessment = await _assess(client, text, classification)

    # Step 4: Determine role (unknown levels fall back to the E0 role)
    esc_level = assessment.get("escalation_level", "E0")
    role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"])

    # Step 5: Build summary
    findings = assessment.get("triggered_rules", [])
    controls = assessment.get("required_controls", [])
    summary = _build_summary(req.url, classification, assessment, role)

    # Step 6: Send notification.
    # The summary is plain text that embeds the caller-supplied URL and SDK
    # output, so it is HTML-escaped and wrapped in <pre>: line breaks
    # survive and nothing in it can be interpreted as markup.
    # NOTE(review): send_email is called synchronously; if it performs
    # blocking SMTP I/O it stalls the event loop for the duration — confirm
    # and consider off-loading it to a worker thread.
    email_result = send_email(
        recipient=req.recipient,
        subject=f"Compliance-Finding: {classification} — {req.url[:60]}",
        body_html=f"<pre>{html.escape(summary)}</pre>",
    )

    return AnalyzeResponse(
        url=req.url,
        classification=classification,
        risk_level=assessment.get("risk_level", "unknown"),
        risk_score=assessment.get("risk_score", 0),
        escalation_level=esc_level,
        responsible_role=role,
        findings=_to_string_list(findings),
        required_controls=_to_string_list(controls),
        summary=summary,
        email_status=email_result.get("status", "failed"),
        analyzed_at=datetime.now(timezone.utc).isoformat(),
    )
async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> str:
    """Fetch *url* and reduce the HTML response to plain text.

    Args:
        client: Open HTTP client used for the request.
        url: Address to download; redirects are followed.

    Returns:
        The response body with script/style elements and all tags removed,
        character entities decoded and whitespace collapsed, truncated to
        4000 characters.

    Raises:
        Whatever ``client.get`` raises; nothing is caught here. The HTTP
        status code is not checked, so error pages are returned as text too.
    """
    resp = await client.get(
        url,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"},
    )
    markup = resp.text

    # Drop <script>/<style> elements together with their content. The
    # closing tag must be a real "</name>" matching the opening one, so text
    # that merely ends in "script>" cannot terminate the match early.
    text = re.sub(
        r"<(script|style)\b[^>]*>.*?</\1\s*>",
        "",
        markup,
        flags=re.DOTALL | re.IGNORECASE,
    )
    # Replace every remaining tag with a space so adjacent words stay apart.
    text = re.sub(r"<[^>]+>", " ", text)
    # Decode entities (&nbsp;, &amp;, &auml; ...) after tag removal so that
    # escaped "<" / ">" stay literal text and keyword checks see real
    # characters. A decoded non-breaking space is whitespace for "\s" below.
    text = html.unescape(text)
    text = re.sub(r"\s+", " ", text).strip()
    return text[:4000]
async def _classify(client: httpx.AsyncClient, text: str) -> str:
    """Classify the document type via the SDK LLM chat endpoint.

    Args:
        client: Open HTTP client used for the SDK call.
        text: Plain text of the document; only the first 2000 characters
            are sent to the model.

    Returns:
        One of ``privacy_policy``, ``cookie_banner``, ``terms_of_service``,
        ``imprint``, ``dpa`` or ``other``. Any failure (transport error,
        non-JSON or unexpectedly shaped response) is logged and yields
        ``other``.
    """
    try:
        resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json={
            "messages": [
                {"role": "system", "content": (
                    "/no_think\n"
                    "Klassifiziere das Dokument in GENAU EINE Kategorie: "
                    "privacy_policy, cookie_banner, terms_of_service, imprint, dpa, other. "
                    "Antworte NUR mit dem Kategorienamen, nichts anderes. Kein Denken, keine Erklaerung."
                )},
                {"role": "user", "content": text[:2000]},
            ],
        })
        data = resp.json()
        # Qwen 3.5 may use think mode — the answer can sit in "response",
        # "content" or "message.content"; take the first non-empty one.
        raw = (
            data.get("response", "")
            or data.get("content", "")
            or (data.get("message", {}) or {}).get("content", "")
            or ""
        ).strip().lower()
        # Strip the model's reasoning block so a category name that is only
        # mentioned while "thinking" cannot win over the final answer. An
        # unterminated block (truncated output) is dropped up to the end.
        raw = re.sub(r"<think>.*?(?:</think>|\Z)", "", raw, flags=re.DOTALL).strip()
        logger.info("Classification raw response: %s", raw[:200])

        # Substring match, first hit in this order wins.
        for cat in ["privacy_policy", "cookie_banner", "terms_of_service", "imprint", "dpa"]:
            if cat in raw:
                return cat
        # Also accept German terms in case the model answers in prose.
        if "datenschutz" in raw:
            return "privacy_policy"
        if "cookie" in raw:
            return "cookie_banner"
        if "impressum" in raw:
            return "imprint"
        return "other"
    except Exception as e:
        # Deliberate best effort: classification must never abort the analysis.
        logger.warning("Classification failed: %s", e)
        return "other"
async def _assess(client: httpx.AsyncClient, text: str, classification: str) -> dict:
    """Run the UCCA assessment via the SDK and return a flattened result dict.

    The boolean intake flags are derived from simple keyword checks on the
    lower-cased document text.

    Args:
        client: Open HTTP client used for the SDK call.
        text: Plain text of the document; the first 3000 characters are sent
            as ``use_case_text``, the keyword checks use the whole text.
        classification: Document category, sent as ``domain``.

    Returns:
        Dict with ``risk_level``, ``risk_score``, ``escalation_level``,
        ``triggered_rules``, ``required_controls``, ``summary``,
        ``recommendation`` and ``dsfa_recommended``. On any failure the error
        is logged and a minimal dict (``unknown`` / ``0`` / ``E0``) is returned.
    """
    try:
        # Lower-case once instead of once per keyword.
        lowered = text.lower()

        def mentions(*keywords: str) -> bool:
            """Return True if any keyword occurs in the lower-cased text."""
            return any(keyword in lowered for keyword in keywords)

        # UCCA expects boolean intake flags, not string categories.
        # German keywords are matched in both umlaut and transliterated form.
        resp = await client.post(f"{SDK_URL}/sdk/v1/ucca/assess", headers=SDK_HEADERS, json={
            "use_case_text": text[:3000],
            "domain": classification,
            "data_types": {
                "personal_data": True,
                "customer_data": True,
                "location_data": mentions("tracking", "standort"),
                "images": False,
                "biometric_data": mentions("biometrisch"),
                "minor_data": mentions("kinder", "minderjährig", "minderjaehrig"),
            },
            "purpose": {
                "marketing": mentions("werbung", "marketing"),
                "analytics": mentions("analyse", "analytics"),
                "profiling": mentions("profil", "personalis"),
                "automation": False,
                "customer_support": False,
            },
            "automation": "partially_automated",
            "outputs": {
                "content_generation": False,
                "recommendations_to_users": mentions("empfehl"),
                "data_export": mentions("export", "uebertrag", "übertrag"),
            },
        })
        data = resp.json()

        # Flatten: UCCA wraps the result under "assessment" and/or "result".
        assessment = data.get("assessment", data.get("result", data))
        result = data.get("result", {})

        # Resolve the risk level once so that the reported level and the
        # escalation derived from it can never disagree.
        risk_level = assessment.get("risk_level", result.get("risk_level", "unknown"))
        return {
            "risk_level": risk_level,
            "risk_score": assessment.get("risk_score", result.get("risk_score", 0)),
            "escalation_level": _risk_to_escalation(risk_level),
            "triggered_rules": assessment.get("triggered_rules", result.get("triggered_rules", [])),
            "required_controls": assessment.get("required_controls", result.get("required_controls", [])),
            "summary": result.get("summary", ""),
            "recommendation": result.get("recommendation", ""),
            "dsfa_recommended": assessment.get("dsfa_recommended", False),
        }
    except Exception as e:
        # Deliberate best effort: an unavailable SDK must not abort the analysis.
        logger.warning("Assessment failed: %s", e)
        return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
def _to_string_list(items: list) -> list[str]:
    """Render a list of dicts and/or arbitrary values as display strings.

    Dict entries use the first key present among ``description``, ``name``
    and ``code`` as label (the dict's ``str()`` if none is present) and are
    prefixed with ``[code]`` — or ``[id]`` when there is no ``code`` key —
    if that value is truthy. Everything else is passed through ``str()``.
    ``None`` or an empty list yields ``[]``.
    """

    def render(entry) -> str:
        if not isinstance(entry, dict):
            return str(entry)
        # UCCA returns {code, category, description} or {id, name, description}
        for key in ("description", "name", "code"):
            if key in entry:
                label = entry[key]
                break
        else:
            label = str(entry)
        tag = entry["code"] if "code" in entry else entry.get("id", "")
        return f"[{tag}] {label}" if tag else str(label)

    return [render(entry) for entry in items or []]
def _risk_to_escalation(risk_level: str) -> str:
    """Translate a UCCA risk level into an escalation level (E0-E3).

    The lookup is case-insensitive; an empty, ``None`` or unrecognized
    level maps to ``"E0"``.
    """
    if not risk_level:
        return "E0"
    levels = {
        "MINIMAL": "E0",
        "LIMITED": "E1",
        "HIGH": "E2",
        "UNACCEPTABLE": "E3",
    }
    return levels.get(risk_level.upper(), "E0")
def _bullet_list(items) -> str:
    """Render up to five items as "- ..." lines, or "Keine" if there are none."""
    if not items:
        return "Keine"
    # Items may be dicts; format them the same way as the API response does
    # so the e-mail shows readable text instead of a Python dict repr.
    return "\n".join(f"- {line}" for line in _to_string_list(items[:5]))


def _build_summary(url: str, classification: str, assessment: dict, role: str) -> str:
    """Build the German plain-text management summary of an analysis.

    Args:
        url: Source address of the analyzed document.
        classification: Document category.
        assessment: Flattened assessment dict; reads ``risk_level``,
            ``risk_score``, ``triggered_rules``, ``required_controls``,
            ``recommendation`` and ``dsfa_recommended``, all optional.
        role: Name of the responsible role.

    Returns:
        Newline-separated summary text. At most five findings and five
        controls are listed; the recommendation line is appended only when
        a recommendation is present.
    """
    risk = assessment.get("risk_level", "unbekannt")
    score = assessment.get("risk_score", 0)
    findings_text = _bullet_list(assessment.get("triggered_rules", []))
    controls_text = _bullet_list(assessment.get("required_controls", []))
    recommendation = assessment.get("recommendation", "")
    dsfa = assessment.get("dsfa_recommended", False)

    parts = [
        f"Dokumenttyp: {classification}",
        f"Quelle: {url}",
        f"Risikobewertung: {risk} ({score}/100)",
        f"Zustaendig: {role}",
        f"DSFA empfohlen: {'Ja' if dsfa else 'Nein'}",
        "",
        f"Findings:\n{findings_text}",
        "",
        f"Erforderliche Massnahmen:\n{controls_text}",
    ]
    if recommendation:
        parts.extend(["", f"Empfehlung: {recommendation}"])
    return "\n".join(parts)