- Backend: mode field in request, adapts summary tone and email subject - Pre-launch: "Implementieren Sie X vor Veroeffentlichung" - Post-launch: "ACHTUNG: Maengel sind oeffentlich sichtbar, sofortige Nachbesserung" - Frontend: Mode toggle (internes Dokument vs. Live-Website) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
402 lines
15 KiB
Python
402 lines
15 KiB
Python
"""
|
|
Agent Analyze Routes — combined endpoint that fetches a URL, classifies it,
|
|
assesses DSGVO compliance, and sends a notification email.
|
|
|
|
POST /api/compliance/agent/analyze
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
|
|
from compliance.services.smtp_sender import send_email
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router for the agent endpoints; routes declared on it live under /compliance/agent.
router = APIRouter(prefix="/compliance/agent", tags=["agent"])

# Base URL of the AI SDK service; can be overridden with the AI_SDK_URL environment variable.
SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090")
# Fixed tenant and user identifiers sent with every SDK request (see SDK_HEADERS).
# NOTE(review): hard-coded IDs — presumably a default/demo tenant; confirm before multi-tenant use.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
USER_ID = "00000000-0000-0000-0000-000000000001"

# Maps an escalation level (E0-E3) to the German label of the responsible role.
ESCALATION_ROLES = {
    "E0": "Kein Handlungsbedarf",
    "E1": "Teamleitung Datenschutz",
    "E2": "Datenschutzbeauftragter (DSB)",
    "E3": "DSB + Rechtsabteilung",
}

# Headers attached to every request this module sends to the SDK.
SDK_HEADERS = {
    "Content-Type": "application/json",
    "X-Tenant-ID": TENANT_ID,
    "X-User-ID": USER_ID,
}
|
|
|
|
|
|
class AnalyzeRequest(BaseModel):
    """Request body of the analyze endpoint."""

    # URL of the page or document to fetch and analyze.
    url: str
    # Address the notification email is sent to.
    recipient: str = "dsb@breakpilot.local"
    # Review context; selects the summary tone and the email subject label.
    mode: str = "post_launch"  # "pre_launch" or "post_launch"
|
|
|
|
|
|
class FollowUpQuestion(BaseModel):
    """A question returned to the caller when an automated website check is inconclusive."""

    # Stable identifier of the question (e.g. "cancel_button_312k").
    id: str
    # Question text shown to the user.
    question: str
    # Legal provision the question relates to.
    legal_basis: str
    severity: str  # "high", "medium", "low"
    finding_if_no: str  # Finding text if user answers "no"
|
|
|
|
|
|
class AnalyzeResponse(BaseModel):
    """Response body of the analyze endpoint."""

    # URL that was analyzed (echoed from the request).
    url: str
    # Document category chosen by the classification step.
    classification: str
    # Risk level and score taken from the assessment result.
    risk_level: str
    risk_score: float
    # Escalation level (a key of ESCALATION_ROLES) and the matching role label.
    escalation_level: str
    responsible_role: str
    # Assessment findings followed by the website-check findings, as display strings.
    findings: list[str]
    # Controls required by the assessment, as display strings.
    required_controls: list[str]
    # Plain-text German summary that is also sent by email.
    summary: str
    # "status" value reported by the mail sender; "failed" when it reports none.
    email_status: str
    # UTC timestamp of the analysis in ISO 8601 format.
    analyzed_at: str
    # Questions for checks that could not be decided automatically.
    follow_up_questions: list[FollowUpQuestion] = []
|
|
|
|
|
|
@router.post("/analyze", response_model=AnalyzeResponse)
async def analyze_url(req: AnalyzeRequest):
    """Fetch a URL, classify it, assess compliance, and notify the responsible role.

    Steps: fetch and clean the page, classify it via the SDK LLM, run the UCCA
    assessment, run the website compliance checks, merge the findings, build
    the German summary, and send it by email to ``req.recipient``.

    Args:
        req: Request with the URL, the mail recipient and the review mode.
            Any mode other than ``"pre_launch"`` is handled as ``"post_launch"``
            (the default), so subject label and summary always agree.

    Returns:
        AnalyzeResponse with the merged findings, the summary and the mail
        status. A failing mail send is reported as ``email_status="failed"``
        instead of aborting the request.
    """
    # Local import keeps this block self-contained.
    from html import escape

    async with httpx.AsyncClient(timeout=60.0) as client:
        # Step 1: Fetch and clean
        text, raw_html = await _fetch_and_clean(client, req.url)

        # Step 2: Classify via SDK LLM
        classification = await _classify(client, text)

        # Step 3: Assess via UCCA
        assessment = await _assess(client, text, classification)

        # Step 4: Website compliance checks (§312k BGB etc.)
        site_findings, follow_ups = await _check_website_compliance(client, req.url, raw_html)

    # Step 5: Determine role
    esc_level = assessment.get("escalation_level", "E0")
    role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"])

    # Step 6: Merge findings
    findings_str = _to_string_list(assessment.get("triggered_rules", [])) + site_findings
    controls_str = _to_string_list(assessment.get("required_controls", []))

    # Escalate if website checks found issues
    if site_findings and esc_level == "E0":
        esc_level = "E1"
        role = ESCALATION_ROLES["E1"]

    # Normalize the mode once so the subject label and the summary cannot disagree.
    is_pre_launch = req.mode == "pre_launch"
    mode = "pre_launch" if is_pre_launch else "post_launch"
    mode_label = "INTERNE PRUEFUNG" if is_pre_launch else "LIVE-WEBSITE"

    summary = _build_summary(
        req.url, classification, assessment, role, findings_str, controls_str, mode,
    )

    # Step 7: Send notification.
    # The summary is plain text containing caller-supplied and SDK-supplied data:
    # escape it for HTML and keep its line structure with <br>.
    body_html = "<div>" + escape(summary).replace("\n", "<br>\n") + "</div>"
    try:
        email_result = send_email(
            recipient=req.recipient,
            subject=f"[{mode_label}] Compliance-Finding: {classification} — {req.url[:60]}",
            body_html=body_html,
        )
    except Exception:
        # The analysis itself succeeded; report the mail failure in the response.
        logger.exception("Sending notification email to %s failed", req.recipient)
        email_result = {}

    return AnalyzeResponse(
        url=req.url,
        classification=classification,
        # `or` guards against explicit None values coming back from the SDK.
        risk_level=assessment.get("risk_level") or "unknown",
        risk_score=assessment.get("risk_score") or 0,
        escalation_level=esc_level,
        responsible_role=role,
        findings=findings_str,
        required_controls=controls_str,
        summary=summary,
        email_status=(email_result or {}).get("status", "failed"),
        analyzed_at=datetime.now(timezone.utc).isoformat(),
        follow_up_questions=follow_ups,
    )
|
|
|
|
|
|
async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> tuple[str, str]:
    """Fetch a URL and reduce the page to plain text.

    Args:
        client: HTTP client used for the GET request (redirects are followed).
        url: Address of the page to fetch.

    Returns:
        ``(clean_text, raw_html)``: the visible text with script/style blocks
        and tags removed, HTML entities decoded and whitespace collapsed,
        truncated to 4000 characters, plus the unmodified response body.
    """
    # Local import: the name `html` is used for the page body below.
    from html import unescape

    resp = await client.get(
        url,
        follow_redirects=True,
        headers={"User-Agent": "BreakPilot-Compliance-Agent/1.0"},
    )
    html = resp.text

    # Strip script/style blocks, then all remaining tags.
    clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
    clean = re.sub(r"<[^>]+>", " ", clean)
    # Decode entities only after the tags are gone, so escaped markup such as
    # "&lt;b&gt;" stays visible text. The non-breaking spaces this produces are
    # matched by \s in the next step.
    clean = unescape(clean)
    clean = re.sub(r"\s+", " ", clean).strip()
    return clean[:4000], html
|
|
|
|
|
|
async def _classify(client: httpx.AsyncClient, text: str) -> str:
    """Ask the SDK LLM chat endpoint for the document category.

    Args:
        client: HTTP client used for the SDK request.
        text: Cleaned page text; only the first 2000 characters are sent.

    Returns:
        One of ``privacy_policy``, ``cookie_banner``, ``terms_of_service``,
        ``imprint``, ``dpa`` or ``other``. ``other`` is also returned when the
        request or the parsing of the answer fails.
    """
    system_prompt = (
        "/no_think\n"
        "Klassifiziere das Dokument in GENAU EINE Kategorie: "
        "privacy_policy, cookie_banner, terms_of_service, imprint, dpa, other. "
        "Antworte NUR mit dem Kategorienamen, nichts anderes. Kein Denken, keine Erklaerung."
    )
    # Checked in this order: the category names first, then German terms.
    keyword_to_category = (
        ("privacy_policy", "privacy_policy"),
        ("cookie_banner", "cookie_banner"),
        ("terms_of_service", "terms_of_service"),
        ("imprint", "imprint"),
        ("dpa", "dpa"),
        ("datenschutz", "privacy_policy"),
        ("cookie", "cookie_banner"),
        ("impressum", "imprint"),
    )

    try:
        payload = {
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text[:2000]},
            ],
        }
        resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json=payload)
        data = resp.json()

        # Qwen 3.5 may use think mode — content can be in message.content or response
        answer = (
            data.get("response", "")
            or data.get("content", "")
            or (data.get("message", {}) or {}).get("content", "")
            or ""
        )
        answer = answer.strip().lower()
        # Strip Qwen think tags if present
        answer = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
        logger.info("Classification raw response: %s", answer[:200])

        for keyword, category in keyword_to_category:
            if keyword in answer:
                return category
        return "other"
    except Exception as e:
        logger.warning("Classification failed: %s", e)
        return "other"
|
|
|
|
|
|
async def _assess(client: httpx.AsyncClient, text: str, classification: str) -> dict:
    """Run the UCCA assessment via the SDK and flatten the answer.

    The intake flags are derived from keywords in ``text``. Matching is
    case-insensitive and umlaut-insensitive: both "minderjährig" and
    "minderjaehrig", "Übertragung" and "Uebertragung" are recognized.

    Args:
        client: HTTP client used for the SDK request.
        text: Cleaned page text; the first 3000 characters are sent as use case.
        classification: Document category, sent as ``domain``.

    Returns:
        Dict with ``risk_level``, ``risk_score``, ``escalation_level``,
        ``triggered_rules``, ``required_controls``, ``summary``,
        ``recommendation`` and ``dsfa_recommended``. When the request or the
        parsing fails, only ``risk_level``, ``risk_score`` and
        ``escalation_level`` are present, set to neutral defaults.
    """
    try:
        # Lower-case once and fold umlauts to their ASCII transliteration so
        # every keyword below can be written in one (ASCII) spelling.
        haystack = text.lower().translate(
            str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue"})
        )

        def mentions(*needles: str) -> bool:
            """Return True if any keyword occurs in the normalized text."""
            return any(needle in haystack for needle in needles)

        # UCCA expects boolean intake flags, not string categories
        resp = await client.post(f"{SDK_URL}/sdk/v1/ucca/assess", headers=SDK_HEADERS, json={
            "use_case_text": text[:3000],
            "domain": classification,
            "data_types": {
                "personal_data": True,
                "customer_data": True,
                "location_data": mentions("tracking", "standort"),
                "images": False,
                "biometric_data": mentions("biometrisch"),
                "minor_data": mentions("kinder", "minderjaehrig"),
            },
            "purpose": {
                "marketing": mentions("werbung", "marketing"),
                "analytics": mentions("analyse", "analytics"),
                "profiling": mentions("profil", "personalis"),
                "automation": False,
                "customer_support": False,
            },
            "automation": "partially_automated",
            "outputs": {
                "content_generation": False,
                "recommendations_to_users": mentions("empfehl"),
                "data_export": mentions("export", "uebertrag"),
            },
        })
        data = resp.json()

        # Flatten: UCCA wraps result under "assessment" and "result"
        result = data.get("result") or {}
        assessment = data.get("assessment") or result or data

        # Resolve the risk level once so the reported level and the escalation
        # derived from it cannot diverge.
        risk_level = assessment.get("risk_level", result.get("risk_level", "unknown"))
        return {
            "risk_level": risk_level,
            "risk_score": assessment.get("risk_score", result.get("risk_score", 0)),
            "escalation_level": _risk_to_escalation(risk_level),
            "triggered_rules": assessment.get("triggered_rules", result.get("triggered_rules", [])),
            "required_controls": assessment.get("required_controls", result.get("required_controls", [])),
            "summary": result.get("summary", ""),
            "recommendation": result.get("recommendation", ""),
            "dsfa_recommended": assessment.get("dsfa_recommended", False),
        }
    except Exception as e:
        logger.warning("Assessment failed: %s", e)
        return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
|
|
|
|
|
|
async def _check_website_compliance(
    client: httpx.AsyncClient, url: str, html: str,
) -> tuple[list[str], list[FollowUpQuestion]]:
    """Scan public website for consumer protection compliance (§312k BGB etc.).

    Four heuristic checks run on the lower-cased page HTML: cancellation
    button, imprint link, privacy-policy link and cookie-consent tooling.
    If the page shows no cancellation link, a few common cancellation paths
    on the same host are probed with HEAD requests.

    Args:
        client: HTTP client used for the probe requests.
        url: Address of the analyzed page; only scheme and host are used.
        html: Raw HTML of the analyzed page.

    Returns:
        ``(findings, follow_ups)``: finding texts for checks that failed, and
        questions for checks that cannot be decided from the public page alone.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlsplit

    findings: list[str] = []
    follow_ups: list[FollowUpQuestion] = []
    html_lower = html.lower()

    def href_pattern(keywords: str) -> str:
        """Build a regex for an href (double- or single-quoted) containing a keyword."""
        return rf"""href=(?:"[^"]*(?:{keywords})[^"]*"|'[^']*(?:{keywords})[^']*')"""

    def found(patterns: list[str]) -> bool:
        """Return True if any pattern matches the lower-cased page HTML."""
        return any(re.search(p, html_lower) for p in patterns)

    # Scheme + host of the analyzed page. urlsplit keeps query and fragment out
    # of the host even when the URL has no path ("https://host?x=1").
    parts = urlsplit(url)
    origin = f"{parts.scheme or 'https'}://{parts.netloc}" if parts.netloc else ""

    # --- §312k BGB: cancellation button ---
    has_cancel_link = found([
        href_pattern(
            r"kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden"
        ),
        r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
    ])

    # Also check common cancel URLs
    if not has_cancel_link and origin:
        for path in (
            "/kuendigen",
            "/cancel",
            "/vertrag-kuendigen",
            "/abo-kuendigen",
            "/account/cancel",
        ):
            probe_url = f"{origin}{path}"
            try:
                probe = await client.head(probe_url, follow_redirects=True, timeout=5.0)
            except Exception as exc:
                # Best-effort probe: an unreachable path only means "not found here".
                logger.debug("Cancel URL probe %s failed: %s", probe_url, exc)
                continue
            if probe.status_code < 400:
                has_cancel_link = True
                break

    if not has_cancel_link:
        findings.append(
            "[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
            "Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
        )
        follow_ups.append(FollowUpQuestion(
            id="cancel_button_312k",
            question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
            legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
            severity="high",
            finding_if_no=(
                "[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
                "Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
                "bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
                "oder Kuendigung per Brief ist rechtswidrig."
            ),
        ))

    # --- Imprint obligation (§5 TMG / §18 MStV) ---
    # NOTE(review): the TMG was superseded by the DDG in 2024 (the same reform that
    # renamed the TTDSG to TDDDG, which this file already uses) — confirm whether
    # the finding label below should read "§5 DDG".
    has_imprint = found([
        href_pattern(r"impressum|imprint|legal.?notice|about.?us/legal"),
        r'>\s*impressum\s*<',
    ])
    if not has_imprint:
        findings.append(
            "[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
            "Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
        )

    # --- Privacy policy linked? ---
    has_privacy = found([
        href_pattern(r"datenschutz|privacy|dsgvo"),
        r'>\s*datenschutz\s*<',
    ])
    if not has_privacy:
        findings.append(
            "[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
            "Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
        )

    # --- Cookie consent banner ---
    # Banners are often injected by scripts, so absence in the static HTML is
    # only raised as a follow-up question, not as a finding.
    has_cookie_consent = found([
        r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
        r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
    ])
    if not has_cookie_consent:
        follow_ups.append(FollowUpQuestion(
            id="cookie_consent",
            question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
            legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
            severity="medium",
            finding_if_no=(
                "[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
                "Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
            ),
        ))

    return findings, follow_ups
|
|
|
|
|
|
def _to_string_list(items: list) -> list[str]:
|
|
"""Convert list of dicts or strings to list of strings."""
|
|
result = []
|
|
for item in (items or []):
|
|
if isinstance(item, dict):
|
|
# UCCA returns {code, category, description} or {id, name, description}
|
|
desc = item.get("description", item.get("name", item.get("code", str(item))))
|
|
code = item.get("code", item.get("id", ""))
|
|
result.append(f"[{code}] {desc}" if code else str(desc))
|
|
else:
|
|
result.append(str(item))
|
|
return result
|
|
|
|
|
|
def _risk_to_escalation(risk_level: str) -> str:
|
|
"""Map UCCA risk level to escalation level."""
|
|
mapping = {
|
|
"MINIMAL": "E0",
|
|
"LIMITED": "E1",
|
|
"HIGH": "E2",
|
|
"UNACCEPTABLE": "E3",
|
|
}
|
|
return mapping.get(risk_level.upper() if risk_level else "", "E0")
|
|
|
|
|
|
def _build_summary(
|
|
url: str, classification: str, assessment: dict, role: str,
|
|
findings_str: list[str], controls_str: list[str],
|
|
mode: str = "post_launch",
|
|
) -> str:
|
|
"""Build a German manager summary, adapted to pre/post-launch context."""
|
|
risk = assessment.get("risk_level", "unbekannt")
|
|
score = assessment.get("risk_score", 0)
|
|
recommendation = assessment.get("recommendation", "")
|
|
dsfa = assessment.get("dsfa_recommended", False)
|
|
is_live = mode == "post_launch"
|
|
|
|
findings_text = "\n".join(f"- {f}" for f in findings_str[:5]) if findings_str else "Keine"
|
|
controls_text = "\n".join(f"- {c}" for c in controls_str[:5]) if controls_str else "Keine"
|
|
|
|
mode_header = (
|
|
"PRUEFUNG LIVE-WEBSITE — Das Dokument ist bereits oeffentlich zugaenglich."
|
|
if is_live else
|
|
"INTERNE PRUEFUNG — Das Dokument ist noch nicht veroeffentlicht."
|
|
)
|
|
|
|
parts = [
|
|
mode_header,
|
|
"",
|
|
f"Dokumenttyp: {classification}",
|
|
f"Quelle: {url}",
|
|
f"Risikobewertung: {risk} ({score}/100)",
|
|
f"Zustaendig: {role}",
|
|
f"DSFA empfohlen: {'Ja' if dsfa else 'Nein'}",
|
|
"",
|
|
f"Findings:\n{findings_text}",
|
|
"",
|
|
f"Erforderliche Massnahmen:\n{controls_text}",
|
|
]
|
|
|
|
if is_live and findings_str:
|
|
parts.extend([
|
|
"",
|
|
"ACHTUNG: Diese Maengel sind bereits oeffentlich sichtbar. "
|
|
"Sofortige Nachbesserung empfohlen um Abmahnrisiken zu minimieren.",
|
|
])
|
|
elif not is_live and controls_str:
|
|
parts.extend([
|
|
"",
|
|
"Empfehlung: Implementieren Sie die erforderlichen Kontrollen vor der Veroeffentlichung.",
|
|
])
|
|
|
|
if recommendation:
|
|
parts.extend(["", f"Weitere Empfehlung: {recommendation}"])
|
|
return "\n".join(parts)
|