Files
breakpilot-compliance/backend-compliance/compliance/services/dsr_export_service.py
T
Benjamin Admin 02468c94c0 feat: DSR User Data Export — Art. 15 PDF + Art. 20 JSON/CSV
- DSRExportService: aggregates all CMP data about a user from
  Banner Consents, Einwilligungen, Audit Trail, DSR History
- GET /dsr/{id}/export-user-data?format=json|csv|pdf endpoint
- PDF: A4 reportlab with 4 sections (Consents, Einwilligungen,
  Audit-Trail, DSR-Anfragen) + cover page
- CSV: BOM-encoded for Excel with flattened data rows
- JSON: structured export with all data categories
- ActionButtons.tsx: PDF/JSON/CSV export buttons now functional

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 22:42:03 +02:00

262 lines
12 KiB
Python

"""
DSR User Data Export Service — aggregates all CMP data about a user.
Supports Art. 15 (access right, PDF) and Art. 20 (data portability, JSON/CSV).
Collects from: Banner Consents, Einwilligungen, Consent Audit Trail, DSR History.
"""
import csv
import io
import json
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from xml.sax.saxutils import escape

from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import mm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from sqlalchemy import text
from sqlalchemy.orm import Session

from compliance.services.banner_dsr_service import BannerDSRService

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Colour palette used by the PDF export:
# PURPLE       - title/section heading text and table header text
# LIGHT_PURPLE - table header background
# GRAY         - text colour of the "Small" paragraph style
PURPLE = colors.HexColor("#7c3aed")
LIGHT_PURPLE = colors.HexColor("#f5f3ff")
GRAY = colors.HexColor("#6b7280")
class DSRExportService:
    """Aggregates and exports all user data stored in the CMP.

    Data is collected from four sources (banner consents, consent audit
    trail, Einwilligungen, DSR requests) and can be rendered as JSON, CSV
    or PDF.  Each ``export_*`` method returns ``(content_bytes, filename)``.
    """

    # Audit-trail rows beyond this count are omitted from the PDF (they are
    # still part of the JSON export).
    _PDF_AUDIT_TRAIL_LIMIT = 50

    # Anything that is not a word character, dot, plus or dash is replaced
    # in download filenames (quotes, slashes, whitespace, control chars ...).
    _UNSAFE_FILENAME_CHARS = re.compile(r"[^\w.+-]+")

    def __init__(self, db: "Session") -> None:
        """Store the SQLAlchemy session used for all queries."""
        self.db = db

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _row_to_dict(row: Any) -> dict[str, Any]:
        """Convert a result row to a dict with datetimes/UUIDs as strings."""
        converted: dict[str, Any] = {}
        for key, value in row._mapping.items():
            if isinstance(value, datetime):
                value = value.isoformat()
            elif isinstance(value, uuid.UUID):
                value = str(value)
            converted[key] = value
        return converted

    @staticmethod
    def _join_categories(value: Any) -> str:
        """Render a categories value as ``"a, b, c"``.

        Tolerates ``None``/empty values, non-string items and a value that
        is already a plain string (which must not be joined char by char).
        """
        if not value:
            return ""
        if isinstance(value, (list, tuple)):
            return ", ".join(str(item) for item in value)
        return str(value)

    @classmethod
    def _export_filename(cls, email: str, extension: str) -> str:
        """Build ``dsr-export-<local-part>.<extension>`` with a safe local part."""
        local_part = email.split("@")[0]
        safe = cls._UNSAFE_FILENAME_CHARS.sub("_", local_part)
        return f"dsr-export-{safe or 'user'}.{extension}"

    @staticmethod
    def _pdf_cell(value: Any, limit: Optional[int] = None) -> str:
        """Return table-cell text: ``"-"`` for missing values, else ``str(value)[:limit]``."""
        if value is None or value == "":
            return "-"
        rendered = str(value)
        return rendered if limit is None else rendered[:limit]

    @staticmethod
    def _abbreviate(value: Any, limit: int = 12) -> str:
        """Shorten a long value to ``limit`` chars plus ``"..."``; ``"-"`` if missing."""
        if value is None or value == "":
            return "-"
        rendered = str(value)
        return rendered if len(rendered) <= limit else rendered[:limit] + "..."

    # ------------------------------------------------------------------
    # Data collection
    # ------------------------------------------------------------------
    def _collect_banner_data(self, tenant_id: str, email: str) -> dict[str, Any]:
        """Fetch banner consents and audit trail; empty lists on failure."""
        banner_data: dict[str, Any] = {"banner_consents": [], "audit_trail": []}
        try:
            # A falsy result (e.g. None) is treated as "no data".
            banner_data = BannerDSRService(self.db).export_for_dsr(tenant_id, email) or {}
        except Exception as e:  # best effort: one failing source must not abort the export
            logger.warning("Banner DSR export failed: %s", e, exc_info=True)
        return banner_data

    def _collect_einwilligungen(self, tid: Any, email: str) -> list[dict]:
        """Fetch user-based consents including their per-consent history."""
        einwilligungen: list[dict] = []
        try:
            consent_q = text("""
                SELECT c.id, c.data_point_id, c.granted, c.granted_at, c.revoked_at,
                       c.consent_version, c.source, c.ip_address, c.user_agent, c.created_at
                FROM compliance_einwilligungen_consents c
                WHERE c.tenant_id = :tid AND c.user_id = :email
                ORDER BY c.created_at DESC
            """)
            # Built once; executed per consent with a different :cid.
            hist_q = text("""
                SELECT action, consent_version, ip_address, user_agent, source, created_at
                FROM compliance_einwilligungen_consent_history
                WHERE consent_id = :cid ORDER BY created_at
            """)
            rows = self.db.execute(consent_q, {"tid": tid, "email": email}).fetchall()
            for row in rows:
                entry = self._row_to_dict(row)
                hist = self.db.execute(hist_q, {"cid": entry["id"]}).fetchall()
                entry["history"] = [self._row_to_dict(h) for h in hist]
                einwilligungen.append(entry)
        except Exception as e:  # best effort, see _collect_banner_data
            # NOTE(review): on some databases a failed statement leaves the
            # transaction aborted until rollback, which would make the
            # following sources fail too — confirm whether a rollback is wanted.
            logger.warning("Einwilligungen export failed: %s", e, exc_info=True)
        return einwilligungen

    def _collect_dsr_requests(self, tid: Any, email: str) -> list[dict]:
        """Fetch all DSR requests filed under this e-mail address."""
        dsr_requests: list[dict] = []
        try:
            q = text("""
                SELECT id, request_number, request_type, status, received_at, deadline_at, completed_at
                FROM compliance_dsr_requests
                WHERE tenant_id = :tid AND requester_email = :email
                ORDER BY received_at DESC
            """)
            rows = self.db.execute(q, {"tid": tid, "email": email}).fetchall()
            dsr_requests.extend(self._row_to_dict(row) for row in rows)
        except Exception as e:  # best effort, see _collect_banner_data
            logger.warning("DSR requests export failed: %s", e, exc_info=True)
        return dsr_requests

    def aggregate_user_data(self, tenant_id: str, email: str) -> dict[str, Any]:
        """Collect ALL data about a user from all CMP sources.

        Each source is queried independently; a failing source is logged and
        contributes an empty (or partial) list instead of aborting the export.

        Args:
            tenant_id: Tenant identifier as string.
            email: E-mail address identifying the data subject.

        Returns:
            Dict with the keys ``export_date``, ``data_subject``,
            ``banner_consents``, ``consent_audit_trail``, ``einwilligungen``,
            ``dsr_requests`` and ``metadata``.

        Raises:
            ValueError: If ``tenant_id`` is longer than 20 characters but is
                not a valid UUID string.
        """
        now = datetime.now(timezone.utc)
        # Long ids are parsed as UUIDs, short ones are passed through as-is
        # (presumably slug-style tenant ids — verify against callers).
        tid = uuid.UUID(tenant_id) if len(tenant_id) > 20 else tenant_id

        banner_data = self._collect_banner_data(tenant_id, email)
        einwilligungen = self._collect_einwilligungen(tid, email)
        dsr_requests = self._collect_dsr_requests(tid, email)

        return {
            "export_date": now.isoformat(),
            "data_subject": {"email": email},
            "banner_consents": banner_data.get("banner_consents") or [],
            "consent_audit_trail": banner_data.get("audit_trail") or [],
            "einwilligungen": einwilligungen,
            "dsr_requests": dsr_requests,
            "metadata": {
                "tenant_id": tenant_id,
                "data_categories": ["Banner-Consents", "Einwilligungen", "Audit-Trail", "DSR-Anfragen"],
                "legal_basis": "Art. 15 / Art. 20 DSGVO",
            },
        }

    # ------------------------------------------------------------------
    # Exports
    # ------------------------------------------------------------------
    def export_json(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregated data as UTF-8 encoded, indented JSON plus a filename."""
        data = self.aggregate_user_data(tenant_id, email)
        data["metadata"]["export_format"] = "json"
        # default=str stringifies any remaining non-JSON types (dates, decimals, ...).
        content = json.dumps(data, indent=2, ensure_ascii=False, default=str).encode("utf-8")
        return content, self._export_filename(email, "json")

    def export_csv(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregated data as flattened CSV rows plus a filename.

        Columns: Kategorie, Schluessel, Wert, Zeitpunkt, Quelle.  The content
        is encoded as ``utf-8-sig`` so that Excel detects the encoding.
        """
        data = self.aggregate_user_data(tenant_id, email)
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow(["Kategorie", "Schluessel", "Wert", "Zeitpunkt", "Quelle"])

        # Banner consents: three rows per consent
        for c in data.get("banner_consents") or []:
            writer.writerow(["Banner-Consent", "site_id", c.get("site_id", ""), c.get("created_at", ""), "CMP"])
            writer.writerow(["Banner-Consent", "categories", self._join_categories(c.get("categories")), c.get("updated_at", ""), "CMP"])
            writer.writerow(["Banner-Consent", "ip_hash", c.get("ip_hash", ""), c.get("created_at", ""), "CMP"])

        # Audit trail
        for a in data.get("consent_audit_trail") or []:
            writer.writerow(["Audit-Trail", a.get("action", ""), self._join_categories(a.get("categories")), a.get("created_at", ""), "CMP"])

        # Einwilligungen
        for e in data.get("einwilligungen") or []:
            status = "Erteilt" if e.get("granted") else "Widerrufen"
            writer.writerow(["Einwilligung", e.get("data_point_id", ""), status, e.get("granted_at", ""), e.get("source", "")])

        # DSR requests
        for d in data.get("dsr_requests") or []:
            writer.writerow(["DSR-Anfrage", d.get("request_type", ""), d.get("status", ""), d.get("received_at", ""), ""])

        content = buf.getvalue().encode("utf-8-sig")  # BOM for Excel
        return content, self._export_filename(email, "csv")

    @staticmethod
    def _append_section(
        story: list,
        styles: Any,
        table_style: Any,
        heading: str,
        header: list[str],
        rows: list[list[str]],
        col_widths: list,
        empty_text: str,
    ) -> None:
        """Append a section heading followed by a table, or ``empty_text`` if there are no rows."""
        story.append(Paragraph(heading, styles["Section"]))
        if not rows:
            story.append(Paragraph(empty_text, styles["Body2"]))
            return
        table = Table([header, *rows], colWidths=col_widths)
        table.setStyle(table_style)
        story.append(table)

    def export_pdf(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregated data as an A4 PDF plus a filename.

        The document has a cover block followed by four sections (banner
        consents, Einwilligungen, audit trail, DSR requests).  The audit
        trail is capped at ``_PDF_AUDIT_TRAIL_LIMIT`` rows.
        """
        data = self.aggregate_user_data(tenant_id, email)
        buf = io.BytesIO()
        doc = SimpleDocTemplate(
            buf,
            pagesize=A4,
            leftMargin=20 * mm,
            rightMargin=20 * mm,
            topMargin=25 * mm,
            bottomMargin=20 * mm,
        )

        ss = getSampleStyleSheet()
        ss.add(ParagraphStyle("Title2", parent=ss["Title"], fontSize=20, textColor=PURPLE, spaceAfter=6))
        ss.add(ParagraphStyle("Section", parent=ss["Heading2"], fontSize=13, textColor=PURPLE, spaceBefore=10))
        ss.add(ParagraphStyle("Body2", parent=ss["Normal"], fontSize=9, leading=13))
        ss.add(ParagraphStyle("Small", parent=ss["Normal"], fontSize=8, textColor=GRAY))

        story: list = []

        # Cover
        story.append(Paragraph("Datenauskunft gemaess Art. 15 DSGVO", ss["Title2"]))
        # Paragraph text is parsed as markup, so the e-mail must be escaped.
        story.append(Paragraph(f"Betroffene Person: {escape(email)}", ss["Body2"]))
        story.append(Paragraph(f"Erstellt am: {data['export_date'][:10]}", ss["Small"]))
        story.append(Spacer(1, 8 * mm))

        tbl_style = TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
            ("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
            ("FONTSIZE", (0, 0), (-1, -1), 8),
            ("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ])
        cell = self._pdf_cell

        # Section 1: Banner Consents
        consents = data.get("banner_consents") or []
        self._append_section(
            story, ss, tbl_style,
            f"1. Banner-Consents ({len(consents)})",
            ["Site", "Kategorien", "IP-Hash", "Erstellt", "Aktualisiert"],
            [
                [
                    cell(c.get("site_id")),
                    self._join_categories(c.get("categories")),
                    self._abbreviate(c.get("ip_hash")),
                    cell(c.get("created_at"), 10),
                    cell(c.get("updated_at"), 10),
                ]
                for c in consents
            ],
            [30 * mm, 40 * mm, 30 * mm, 25 * mm, 25 * mm],
            "Keine Banner-Consents gespeichert.",
        )

        # Section 2: Einwilligungen
        einw = data.get("einwilligungen") or []
        self._append_section(
            story, ss, tbl_style,
            f"2. Einwilligungen ({len(einw)})",
            ["Datenpunkt", "Status", "Erteilt am", "Widerrufen am", "IP-Adresse"],
            [
                [
                    cell(e.get("data_point_id")),
                    "Erteilt" if e.get("granted") else "Widerrufen",
                    cell(e.get("granted_at"), 10),
                    cell(e.get("revoked_at"), 10),
                    cell(e.get("ip_address"), 15),
                ]
                for e in einw
            ],
            [35 * mm, 25 * mm, 25 * mm, 25 * mm, 35 * mm],
            "Keine Einwilligungen gespeichert.",
        )

        # Section 3: Audit Trail (capped for the PDF)
        trail = data.get("consent_audit_trail") or []
        limit = self._PDF_AUDIT_TRAIL_LIMIT
        self._append_section(
            story, ss, tbl_style,
            f"3. Consent-Audit-Trail ({len(trail)})",
            ["Aktion", "Kategorien", "Datum"],
            [
                [
                    cell(a.get("action")),
                    self._join_categories(a.get("categories")),
                    cell(a.get("created_at"), 19),
                ]
                for a in trail[:limit]
            ],
            [40 * mm, 60 * mm, 45 * mm],
            "Kein Audit-Trail vorhanden.",
        )
        if len(trail) > limit:
            story.append(Paragraph(f"... und {len(trail) - limit} weitere Eintraege (im JSON-Export enthalten)", ss["Small"]))

        # Section 4: DSR Requests
        dsrs = data.get("dsr_requests") or []
        self._append_section(
            story, ss, tbl_style,
            f"4. Bisherige DSR-Anfragen ({len(dsrs)})",
            ["Typ", "Status", "Eingegangen", "Abgeschlossen"],
            [
                [
                    cell(d.get("request_type")),
                    cell(d.get("status")),
                    cell(d.get("received_at"), 10),
                    cell(d.get("completed_at"), 10),
                ]
                for d in dsrs
            ],
            [35 * mm, 30 * mm, 35 * mm, 35 * mm],
            "Keine DSR-Anfragen vorhanden.",
        )

        # Footer
        story.append(Spacer(1, 15 * mm))
        story.append(Paragraph("Erstellt mit BreakPilot Compliance SDK | Art. 15 DSGVO Datenauskunft", ss["Small"]))

        doc.build(story)
        return buf.getvalue(), self._export_filename(email, "pdf")