Add autonomous compliance agent that fetches web documents (cookie banners, privacy policies), classifies them via Qwen/Ollama, assesses DSGVO compliance, assigns to the responsible role, and sends notification emails. Components: - ZeroClaw SOP (6-step workflow: fetch, classify, assess, summarize, assign, notify) - Backend: /api/compliance/agent/analyze (combined endpoint) - Backend: /api/compliance/agent/notify (standalone email) - Frontend: /sdk/agent page (Manager UI with URL input + results) - Helper scripts + E2E test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
"""
Agent Analyze Routes — combined endpoint that fetches a URL, classifies it,
assesses DSGVO compliance, and sends a notification email.

POST /api/compliance/agent/analyze
"""

import html
import logging
import os
import re
from datetime import datetime, timezone

import httpx
from fastapi import APIRouter
from pydantic import BaseModel

from compliance.services.smtp_sender import send_email

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Every route declared in this module is registered under /compliance/agent.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])

# Base URL of the compliance SDK; the AI_SDK_URL environment variable
# overrides the built-in default.
SDK_URL = os.getenv("AI_SDK_URL", "http://ai-compliance-sdk:8093")

# Fixed tenant and user identifiers sent with every SDK request.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
USER_ID = "00000000-0000-0000-0000-000000000001"

# Maps an escalation level to the role named in the response and the summary.
ESCALATION_ROLES = dict(
    E0="Kein Handlungsbedarf",
    E1="Teamleitung Datenschutz",
    E2="Datenschutzbeauftragter (DSB)",
    E3="DSB + Rechtsabteilung",
)

# Headers attached to each request sent to the SDK.
SDK_HEADERS = {
    "Content-Type": "application/json",
    "X-Tenant-ID": TENANT_ID,
    "X-User-ID": USER_ID,
}


# Request body of POST /analyze. Documented with comments rather than a class
# docstring so the generated schema description stays unchanged.
class AnalyzeRequest(BaseModel):
    # Address of the web document to fetch and analyze.
    url: str

    # Receiver of the notification email.
    recipient: str = "dsb@breakpilot.local"


# Response body of POST /analyze. Documented with comments rather than a class
# docstring so the generated schema description stays unchanged.
class AnalyzeResponse(BaseModel):
    # The URL that was analyzed, echoed back from the request.
    url: str

    # Document category chosen by the classification step.
    classification: str

    # Risk level and score taken from the assessment payload.
    risk_level: str
    risk_score: float

    # Escalation level from the assessment and the role it maps to.
    escalation_level: str
    responsible_role: str

    # Triggered rules and required controls reported by the assessment.
    findings: list[str]
    required_controls: list[str]

    # Plain-text summary that is also sent in the notification email.
    summary: str

    # Status reported by the email sender.
    email_status: str

    # ISO-8601 timestamp (UTC) taken when the response is built.
    analyzed_at: str


@router.post("/analyze", response_model=AnalyzeResponse)
async def analyze_url(req: AnalyzeRequest):
    """Fetch URL, classify, assess compliance, and notify responsible role."""
    # The docstring above is kept to one line on purpose: FastAPI publishes a
    # route docstring as the operation description.
    #
    # NOTE(review): req.url is caller-supplied and fetched server-side without
    # any allow-list; confirm this endpoint is only reachable by trusted users.
    async with httpx.AsyncClient(timeout=60.0) as client:
        # Step 1: Fetch and clean
        text = await _fetch_and_clean(client, req.url)

        # Step 2: Classify via SDK LLM
        classification = await _classify(client, text)

        # Step 3: Assess via UCCA
        assessment = await _assess(client, text, classification)

    # The HTTP client is no longer needed from here on, so it is released
    # before the (potentially slow) email delivery.
    if not isinstance(assessment, dict):
        # Anything but a JSON object cannot be read field by field.
        assessment = {}

    # Step 4: Determine role. `or` also covers an explicit null in the payload.
    esc_level = str(assessment.get("escalation_level") or "E0")
    role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"])

    # Step 5: Build summary from normalized lists, so a null or scalar value
    # in the payload neither breaks the summary nor the response validation.
    findings = _as_str_list(assessment.get("triggered_rules"))
    controls = _as_str_list(assessment.get("required_controls"))
    summary = _build_summary(
        req.url,
        classification,
        {**assessment, "triggered_rules": findings, "required_controls": controls},
        role,
    )

    # Step 6: Send notification. The summary contains the request URL and
    # SDK-provided text, so it is escaped before being embedded in HTML;
    # newlines become <br> because HTML would otherwise collapse them.
    summary_html = html.escape(summary).replace("\n", "<br>\n")
    try:
        email_result = send_email(
            recipient=req.recipient,
            subject=f"Compliance-Finding: {classification} — {req.url[:60]}",
            body_html=f"<div>{summary_html}</div>",
        )
        email_status = email_result.get("status", "failed")
    except Exception:
        # A delivery problem must not discard the finished analysis; it is
        # reported through email_status instead.
        logger.exception("Notification email to %s failed", req.recipient)
        email_status = "failed"

    return AnalyzeResponse(
        url=req.url,
        classification=classification,
        risk_level=assessment.get("risk_level") or "unknown",
        risk_score=assessment.get("risk_score") or 0,
        escalation_level=esc_level,
        responsible_role=role,
        findings=findings,
        required_controls=controls,
        summary=summary,
        email_status=email_status,
        analyzed_at=datetime.now(timezone.utc).isoformat(),
    )


def _as_str_list(value) -> list[str]:
    """Coerce an assessment payload value into a list of strings.

    None becomes an empty list, a list or tuple is converted item by item,
    and any other value is wrapped as a single stringified entry.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return [item if isinstance(item, str) else str(item) for item in value]
    return [str(value)]


async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> str:
    """Fetch URL and strip HTML to plain text.

    Args:
        client: HTTP client used for the GET request.
        url: Address of the document to download.

    Returns:
        The page text with script/style blocks and tags removed, HTML
        entities decoded and whitespace collapsed, cut to 4000 characters.
    """
    resp = await client.get(
        url,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"},
    )
    if resp.status_code >= 400:
        # The body is still analyzed, but an error page is worth a log entry.
        logger.warning("Fetching %s returned HTTP %s", url, resp.status_code)

    markup = resp.text
    # Strip script/style blocks, then all tags
    text = re.sub(
        r"<(script|style)[^>]*>.*?</\1>", "", markup, flags=re.DOTALL | re.IGNORECASE
    )
    text = re.sub(r"<[^>]+>", " ", text)
    # Decode entities only after the tags are gone, so escaped markup such as
    # "&lt;b&gt;" survives as literal text instead of being stripped as a tag.
    text = html.unescape(text)
    # \s also matches the no-break space produced by "&nbsp;".
    text = re.sub(r"\s+", " ", text).strip()
    return text[:4000]


async def _classify(client: httpx.AsyncClient, text: str) -> str:
    """Ask the SDK LLM chat endpoint for the document category.

    Returns the first known category name found in the model's answer, or
    "other" when none is found or the request fails for any reason.
    """
    instruction = (
        "Klassifiziere das Dokument in GENAU EINE Kategorie: "
        "privacy_policy, cookie_banner, terms_of_service, imprint, dpa, other. "
        "Antworte NUR mit dem Kategorienamen, nichts anderes."
    )
    known_categories = (
        "privacy_policy",
        "cookie_banner",
        "terms_of_service",
        "imprint",
        "dpa",
    )
    try:
        payload = {
            "messages": [
                {"role": "system", "content": instruction},
                # Only the first 2000 characters are sent to the model.
                {"role": "user", "content": text[:2000]},
            ],
        }
        reply = await client.post(
            f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json=payload
        )
        body = reply.json()
        # The answer is read from "response", falling back to "content".
        fallback = body.get("content", "other")
        answer = body.get("response", fallback).strip().lower()
        return next((name for name in known_categories if name in answer), "other")
    except Exception as exc:
        # Best effort: any failure degrades to the catch-all category.
        logger.warning("Classification failed: %s", exc)
        return "other"


async def _assess(client: httpx.AsyncClient, text: str, classification: str) -> dict:
    """Run UCCA assessment via SDK.

    Args:
        client: HTTP client used for the POST request.
        text: Cleaned document text; only the first 3000 characters are sent.
        classification: Document category, sent as the assessment domain.

    Returns:
        The decoded JSON object from the SDK. When the request fails, the SDK
        answers with an error status, or the body is not a JSON object, a
        neutral fallback (unknown risk, score 0, level E0) is returned.
    """
    fallback = {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
    try:
        resp = await client.post(
            f"{SDK_URL}/sdk/v1/ucca/assess",
            headers=SDK_HEADERS,
            json={
                "use_case_text": text[:3000],
                "domain": classification,
                "data_categories": [
                    "personal_data",
                    "tracking",
                    "cookies",
                    "third_party_sharing",
                ],
            },
        )
        # An error status carries no assessment; treat it like any other
        # failure so that it gets logged.
        resp.raise_for_status()
        result = resp.json()
    except Exception as e:
        logger.warning("Assessment failed: %s", e)
        return fallback

    if not isinstance(result, dict):
        # Callers read the result with .get(), so only a JSON object is usable.
        logger.warning(
            "Assessment returned unexpected payload type: %s", type(result).__name__
        )
        return fallback
    return result


def _build_summary(url: str, classification: str, assessment: dict, role: str) -> str:
|
|
"""Build a German manager summary."""
|
|
risk = assessment.get("risk_level", "unbekannt")
|
|
score = assessment.get("risk_score", 0)
|
|
findings = assessment.get("triggered_rules", [])
|
|
controls = assessment.get("required_controls", [])
|
|
|
|
findings_text = "\n".join(f"- {f}" for f in findings[:5]) if findings else "Keine"
|
|
controls_text = "\n".join(f"- {c}" for c in controls[:5]) if controls else "Keine"
|
|
|
|
return (
|
|
f"Dokumenttyp: {classification}\n"
|
|
f"Quelle: {url}\n"
|
|
f"Risikobewertung: {risk} ({score}/100)\n"
|
|
f"Zustaendig: {role}\n\n"
|
|
f"Findings:\n{findings_text}\n\n"
|
|
f"Erforderliche Massnahmen:\n{controls_text}"
|
|
)
|