Files
breakpilot-compliance/backend-compliance/compliance/api/agent_analyze_routes.py
T
Benjamin Admin 4298ae17ab feat: Phase 0+1 — LLM intake extraction + control relevance filter
Phase 0: Qwen extracts 14 structured intake flags (personal_data,
marketing, profiling, ai_usage, etc.) instead of keyword matching.
Fallback to keywords if LLM unavailable. Flags feed into UCCA for
accurate scoring.

Phase 1: Control relevance filter removes false positives.
C_TRANSPARENCY only recommended if AI/ML keywords found in text.
7 control rules with keyword lists + intake flag fallback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-29 11:36:24 +02:00

421 lines
17 KiB
Python

"""
Agent Analyze Routes — combined endpoint that fetches a URL, classifies it,
assesses DSGVO compliance, and sends a notification email.
POST /api/compliance/agent/analyze
"""
import logging
import re
import os
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
from compliance.services.intake_extractor import extract_intake_flags, flags_to_ucca_intake
from compliance.services.relevance_filter import filter_controls
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
SDK_URL = os.environ.get("AI_SDK_URL", "http://bp-compliance-ai-sdk:8090")
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
USER_ID = "00000000-0000-0000-0000-000000000001"
ESCALATION_ROLES = {
"E0": "Kein Handlungsbedarf",
"E1": "Teamleitung Datenschutz",
"E2": "Datenschutzbeauftragter (DSB)",
"E3": "DSB + Rechtsabteilung",
}
SDK_HEADERS = {
"Content-Type": "application/json",
"X-Tenant-ID": TENANT_ID,
"X-User-ID": USER_ID,
}
class AnalyzeRequest(BaseModel):
url: str
recipient: str = "dsb@breakpilot.local"
mode: str = "post_launch" # "pre_launch" or "post_launch"
class FollowUpQuestion(BaseModel):
id: str
question: str
legal_basis: str
severity: str # "high", "medium", "low"
finding_if_no: str # Finding text if user answers "no"
class AnalyzeResponse(BaseModel):
url: str
classification: str
risk_level: str
risk_score: float
escalation_level: str
responsible_role: str
findings: list[str]
required_controls: list[str]
summary: str
email_status: str
analyzed_at: str
follow_up_questions: list[FollowUpQuestion] = []
@router.post("/analyze", response_model=AnalyzeResponse)
async def analyze_url(req: AnalyzeRequest):
"""Fetch URL, classify, assess compliance, and notify responsible role."""
async with httpx.AsyncClient(timeout=60.0) as client:
# Step 1: Fetch and clean
text, raw_html = await _fetch_and_clean(client, req.url)
# Step 2: Classify via SDK LLM
classification = await _classify(client, text)
# Step 3: Extract intake flags via LLM (better than keyword matching)
intake_flags = await extract_intake_flags(text)
# Step 4: Assess via UCCA with LLM-extracted flags
assessment = await _assess(client, text, classification, intake_flags)
# Step 5: Determine role
esc_level = assessment.get("escalation_level", "E0")
role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"])
# Step 6: Website compliance checks (§312k BGB etc.)
site_findings, follow_ups = await _check_website_compliance(client, req.url, raw_html)
# Step 7: Merge and filter findings/controls
findings = assessment.get("triggered_rules", [])
controls = assessment.get("required_controls", [])
findings_str = _to_string_list(findings) + site_findings
controls_str = filter_controls(_to_string_list(controls), text, intake_flags)
# Escalate if website checks found issues
if site_findings and esc_level == "E0":
esc_level = "E1"
role = ESCALATION_ROLES["E1"]
summary = _build_summary(req.url, classification, assessment, role, findings_str, controls_str, req.mode)
# Step 7: Send notification
mode_label = "INTERNE PRUEFUNG" if req.mode == "pre_launch" else "LIVE-WEBSITE"
email_result = send_email(
recipient=req.recipient,
subject=f"[{mode_label}] Compliance-Finding: {classification}{req.url[:60]}",
body_html=summary,
)
return AnalyzeResponse(
url=req.url,
classification=classification,
risk_level=assessment.get("risk_level", "unknown"),
risk_score=assessment.get("risk_score", 0),
escalation_level=esc_level,
responsible_role=role,
findings=findings_str,
required_controls=controls_str,
summary=summary,
email_status=email_result.get("status", "failed"),
analyzed_at=datetime.now(timezone.utc).isoformat(),
follow_up_questions=follow_ups,
)
async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> tuple[str, str]:
"""Fetch URL. Returns (clean_text, raw_html)."""
resp = await client.get(url, follow_redirects=True, headers={
"User-Agent": "BreakPilot-Compliance-Agent/1.0",
})
html = resp.text
# Strip script/style blocks, then all tags
clean = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
clean = re.sub(r"<[^>]+>", " ", clean)
clean = re.sub(r"&nbsp;", " ", clean)
clean = re.sub(r"\s+", " ", clean).strip()
return clean[:4000], html
async def _classify(client: httpx.AsyncClient, text: str) -> str:
"""Classify document type via SDK LLM chat."""
try:
resp = await client.post(f"{SDK_URL}/sdk/v1/llm/chat", headers=SDK_HEADERS, json={
"messages": [
{"role": "system", "content": (
"/no_think\n"
"Klassifiziere das Dokument in GENAU EINE Kategorie: "
"privacy_policy, cookie_banner, terms_of_service, imprint, dpa, other. "
"Antworte NUR mit dem Kategorienamen, nichts anderes. Kein Denken, keine Erklaerung."
)},
{"role": "user", "content": text[:2000]},
],
})
data = resp.json()
# Qwen 3.5 may use think mode — content can be in message.content or response
raw = (
data.get("response", "")
or data.get("content", "")
or (data.get("message", {}) or {}).get("content", "")
or ""
).strip().lower()
# Strip Qwen think tags if present
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
logger.info("Classification raw response: %s", raw[:200])
for cat in ["privacy_policy", "cookie_banner", "terms_of_service", "imprint", "dpa"]:
if cat in raw:
return cat
# Also check German terms
if "datenschutz" in raw:
return "privacy_policy"
if "cookie" in raw:
return "cookie_banner"
if "impressum" in raw:
return "imprint"
return "other"
except Exception as e:
logger.warning("Classification failed: %s", e)
return "other"
async def _assess(client: httpx.AsyncClient, text: str, classification: str, intake_flags: dict | None = None) -> dict:
"""Run UCCA assessment via SDK. Returns flattened result dict."""
try:
# Use LLM-extracted flags if available, otherwise minimal defaults
if intake_flags:
ucca_intake = flags_to_ucca_intake(intake_flags)
else:
ucca_intake = {
"data_types": {"personal_data": True},
"purpose": {},
"automation": "manual",
"outputs": {},
}
resp = await client.post(f"{SDK_URL}/sdk/v1/ucca/assess", headers=SDK_HEADERS, json={
"use_case_text": text[:3000],
"domain": classification,
**ucca_intake,
})
data = resp.json()
# Flatten: UCCA wraps result under "assessment" and "result"
assessment = data.get("assessment", data.get("result", data))
result = data.get("result", {})
return {
"risk_level": assessment.get("risk_level", result.get("risk_level", "unknown")),
"risk_score": assessment.get("risk_score", result.get("risk_score", 0)),
"escalation_level": _risk_to_escalation(assessment.get("risk_level", "")),
"triggered_rules": assessment.get("triggered_rules", result.get("triggered_rules", [])),
"required_controls": assessment.get("required_controls", result.get("required_controls", [])),
"summary": result.get("summary", ""),
"recommendation": result.get("recommendation", ""),
"dsfa_recommended": assessment.get("dsfa_recommended", False),
}
except Exception as e:
logger.warning("Assessment failed: %s", e)
return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
async def _check_website_compliance(
client: httpx.AsyncClient, url: str, html: str,
) -> tuple[list[str], list[FollowUpQuestion]]:
"""Scan public website for consumer protection compliance (§312k BGB etc.)."""
findings: list[str] = []
follow_ups: list[FollowUpQuestion] = []
html_lower = html.lower()
base_domain = re.sub(r"https?://([^/]+).*", r"\1", url)
# --- §312k BGB: Kündigungsbutton ---
cancel_patterns = [
r'href="[^"]*(?:kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden)[^"]*"',
r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
]
has_cancel_link = any(re.search(p, html_lower) for p in cancel_patterns)
# Also check common cancel URLs
cancel_urls_to_probe = [
f"https://{base_domain}/kuendigen",
f"https://{base_domain}/cancel",
f"https://{base_domain}/vertrag-kuendigen",
f"https://{base_domain}/abo-kuendigen",
f"https://{base_domain}/account/cancel",
]
if not has_cancel_link:
for probe_url in cancel_urls_to_probe:
try:
probe = await client.head(probe_url, follow_redirects=True, timeout=5.0)
if probe.status_code < 400:
has_cancel_link = True
break
except Exception:
continue
if not has_cancel_link:
findings.append(
"[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
"Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
)
follow_ups.append(FollowUpQuestion(
id="cancel_button_312k",
question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
severity="high",
finding_if_no=(
"[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
"Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
"bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
"oder Kuendigung per Brief ist rechtswidrig."
),
))
# --- Impressumspflicht (§5 TMG / §18 MStV) ---
imprint_patterns = [
r'href="[^"]*(?:impressum|imprint|legal.?notice|about.?us/legal)[^"]*"',
r'>impressum<',
]
has_imprint = any(re.search(p, html_lower) for p in imprint_patterns)
if not has_imprint:
findings.append(
"[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
"Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
)
# --- Datenschutzerklaerung verlinkt? ---
privacy_patterns = [
r'href="[^"]*(?:datenschutz|privacy|dsgvo)[^"]*"',
r'>datenschutz<',
]
has_privacy = any(re.search(p, html_lower) for p in privacy_patterns)
if not has_privacy:
findings.append(
"[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
"Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
)
# --- Cookie-Consent-Banner ---
cookie_patterns = [
r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
]
has_cookie_consent = any(re.search(p, html_lower) for p in cookie_patterns)
if not has_cookie_consent:
follow_ups.append(FollowUpQuestion(
id="cookie_consent",
question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
severity="medium",
finding_if_no=(
"[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
"Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
),
))
return findings, follow_ups
def _to_string_list(items: list) -> list[str]:
"""Convert list of dicts or strings to list of strings."""
result = []
for item in (items or []):
if isinstance(item, dict):
# UCCA returns {code, category, description} or {id, name, description}
desc = item.get("description", item.get("name", item.get("code", str(item))))
code = item.get("code", item.get("id", ""))
result.append(f"[{code}] {desc}" if code else str(desc))
else:
result.append(str(item))
return result
def _risk_to_escalation(risk_level: str) -> str:
"""Map UCCA risk level to escalation level."""
mapping = {
"MINIMAL": "E0",
"LIMITED": "E1",
"HIGH": "E2",
"UNACCEPTABLE": "E3",
}
return mapping.get(risk_level.upper() if risk_level else "", "E0")
DOC_TYPE_LABELS = {
"privacy_policy": "Datenschutzerklaerung",
"cookie_banner": "Cookie-Banner",
"terms_of_service": "AGB",
"imprint": "Impressum",
"dpa": "Auftragsverarbeitung (AVV)",
"other": "Sonstiges",
}
RISK_COLORS = {
"MINIMAL": ("#16a34a", "Niedrig"),
"LOW": ("#ca8a04", "Gering"),
"LIMITED": ("#ea580c", "Mittel"),
"HIGH": ("#dc2626", "Hoch"),
"UNACCEPTABLE": ("#991b1b", "Kritisch"),
}
def _build_summary(
url: str, classification: str, assessment: dict, role: str,
findings_str: list[str], controls_str: list[str],
mode: str = "post_launch",
) -> str:
"""Build HTML summary for email and frontend."""
risk = assessment.get("risk_level", "unbekannt")
score = assessment.get("risk_score", 0)
recommendation = assessment.get("recommendation", "")
dsfa = assessment.get("dsfa_recommended", False)
is_live = mode == "post_launch"
risk_color, risk_label = RISK_COLORS.get(risk, ("#6b7280", risk))
doc_label = DOC_TYPE_LABELS.get(classification, classification)
mode_banner = (
'<div style="background:#fef2f2;border-left:4px solid #dc2626;padding:12px 16px;margin-bottom:16px;">'
'<strong style="color:#991b1b;">LIVE-WEBSITE</strong> — Das Dokument ist bereits oeffentlich zugaenglich.</div>'
if is_live else
'<div style="background:#eff6ff;border-left:4px solid #3b82f6;padding:12px 16px;margin-bottom:16px;">'
'<strong style="color:#1e40af;">INTERNE PRUEFUNG</strong> — Dokument noch nicht veroeffentlicht.</div>'
)
findings_html = "".join(f'<li style="margin-bottom:4px;">{f}</li>' for f in findings_str[:8]) if findings_str else '<li style="color:#6b7280;">Keine</li>'
controls_html = "".join(f'<li style="margin-bottom:4px;">{c}</li>' for c in controls_str[:8]) if controls_str else '<li style="color:#6b7280;">Keine</li>'
warning = ""
if is_live and findings_str:
warning = (
'<div style="background:#fef2f2;border:1px solid #fecaca;border-radius:8px;padding:12px 16px;margin-top:16px;">'
'<strong style="color:#dc2626;">⚠ ACHTUNG:</strong> Diese Maengel sind bereits oeffentlich sichtbar. '
'Sofortige Nachbesserung empfohlen um Abmahnrisiken zu minimieren.</div>'
)
elif not is_live and controls_str:
warning = (
'<div style="background:#f0fdf4;border:1px solid #bbf7d0;border-radius:8px;padding:12px 16px;margin-top:16px;">'
'Empfehlung: Implementieren Sie die erforderlichen Kontrollen vor der Veroeffentlichung.</div>'
)
rec_html = f'<p style="color:#475569;margin-top:12px;"><em>{recommendation}</em></p>' if recommendation else ""
return f"""
{mode_banner}
<table style="width:100%;border-collapse:collapse;margin-bottom:16px;">
<tr><td style="padding:6px 0;color:#64748b;width:180px;">Dokumenttyp</td><td style="padding:6px 0;font-weight:600;">{doc_label}</td></tr>
<tr><td style="padding:6px 0;color:#64748b;">Quelle</td><td style="padding:6px 0;"><a href="{url}" style="color:#6366f1;">{url}</a></td></tr>
<tr><td style="padding:6px 0;color:#64748b;">Risikobewertung</td><td style="padding:6px 0;"><span style="background:{risk_color};color:white;padding:2px 8px;border-radius:4px;font-size:13px;">{risk_label} ({score}/100)</span></td></tr>
<tr><td style="padding:6px 0;color:#64748b;">Zustaendig</td><td style="padding:6px 0;font-weight:600;">{role}</td></tr>
<tr><td style="padding:6px 0;color:#64748b;">DSFA empfohlen</td><td style="padding:6px 0;">{'Ja' if dsfa else 'Nein'}</td></tr>
</table>
<h3 style="color:#1e293b;font-size:15px;margin:16px 0 8px;">Findings</h3>
<ul style="margin:0;padding-left:20px;color:#334155;">{findings_html}</ul>
<h3 style="color:#1e293b;font-size:15px;margin:16px 0 8px;">Erforderliche Massnahmen</h3>
<ul style="margin:0;padding-left:20px;color:#334155;">{controls_html}</ul>
{warning}
{rec_html}
"""