Files
breakpilot-compliance/backend-compliance/compliance/services/dsr_export_service.py
T
Benjamin Admin c55d0ab12a fix: DSR export type-cast bug + session rollback on partial failures
- tenant_id kept as string (PostgreSQL handles UUID cast)
- Einwilligungen query uses CAST(:tid AS VARCHAR) for compatibility
- Each data source query wrapped with rollback on failure to prevent
  cascading "transaction aborted" errors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 23:15:25 +02:00

274 lines
12 KiB
Python

"""
DSR User Data Export Service — aggregates all CMP data about a user.
Supports Art. 15 (access right, PDF) and Art. 20 (data portability, JSON/CSV).
Collects from: Banner Consents, Einwilligungen, Consent Audit Trail, DSR History.
"""
import csv
import io
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from xml.sax.saxutils import escape

from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import mm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from sqlalchemy import text
from sqlalchemy.orm import Session

from compliance.services.banner_dsr_service import BannerDSRService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Colours used by the PDF export:
# PURPLE is the text colour of the title, section headings and table header row.
PURPLE = colors.HexColor("#7c3aed")
# LIGHT_PURPLE is the background of the table header row.
LIGHT_PURPLE = colors.HexColor("#f5f3ff")
# GRAY is the text colour of the "Small" paragraph style (date line, notes, footer).
GRAY = colors.HexColor("#6b7280")
class DSRExportService:
    """Aggregate and export the data the CMP stores about one data subject.

    Data is read from banner consents and their audit trail (through
    ``BannerDSRService``), from ``compliance_einwilligungen_consents`` and its
    history table, and from ``compliance_dsr_requests``. The aggregate can be
    rendered as JSON, CSV or PDF.

    Args:
        db: SQLAlchemy session used for every query. It is rolled back whenever
            a data source fails so that the following sources do not run into a
            "transaction aborted" error.
    """

    # Number of audit-trail rows rendered in the PDF; the rest is only counted.
    _PDF_AUDIT_TRAIL_LIMIT = 50
    # Leading characters that make spreadsheet applications treat a cell as a formula.
    _CSV_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")
    # Non-alphanumeric characters kept as-is in the export filename.
    _FILENAME_EXTRA_CHARS = "._+-"

    def __init__(self, db: Session) -> None:
        self.db = db

    # ------------------------------------------------------------------
    # Small, side-effect-free helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _jsonable(value: Any) -> Any:
        """Return ``value`` with datetimes as ISO strings and UUIDs as strings."""
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, uuid.UUID):
            return str(value)
        return value

    @classmethod
    def _row_to_dict(cls, row: Any) -> dict[str, Any]:
        """Convert a result row (anything exposing ``_mapping``) to a plain dict."""
        return {key: cls._jsonable(value) for key, value in row._mapping.items()}

    @classmethod
    def _export_filename(cls, email: str, extension: str) -> str:
        """Build ``dsr-export-<local part>.<extension>`` from an e-mail address.

        The local part is caller-supplied, so every character that is not an
        ASCII letter/digit or one of ``._+-`` is replaced by ``_``. This keeps
        path separators, quotes and line breaks out of the filename.
        """
        local_part = email.split("@")[0]
        safe = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in cls._FILENAME_EXTRA_CHARS else "_"
            for ch in local_part
        )
        return f"dsr-export-{safe or 'unknown'}.{extension}"

    @staticmethod
    def _join_categories(value: Any) -> str:
        """Render a categories value as a comma-separated string.

        Tolerates ``None``/empty values, non-string items and a value that is
        already a single string (which must not be split into characters).
        """
        if not value:
            return ""
        if isinstance(value, str):
            return value
        try:
            return ", ".join(str(item) for item in value)
        except TypeError:
            # Not iterable (e.g. a number): fall back to its string form.
            return str(value)

    @staticmethod
    def _cell(value: Any, width: Optional[int] = None) -> str:
        """Return ``str(value)`` cut to ``width`` characters, or ``-`` if empty."""
        if value is None or value == "":
            return "-"
        return str(value)[:width]

    @classmethod
    def _abbreviate(cls, value: Any, width: int) -> str:
        """Like ``_cell`` but appends ``...`` when the value was actually cut."""
        full = cls._cell(value)
        if len(full) > width:
            return full[:width] + "..."
        return full

    @classmethod
    def _csv_safe(cls, value: Any) -> Any:
        """Neutralise spreadsheet formulas by prefixing such cells with ``'``."""
        if isinstance(value, str) and value.startswith(cls._CSV_FORMULA_PREFIXES):
            return "'" + value
        return value

    # ------------------------------------------------------------------
    # Data collection
    # ------------------------------------------------------------------
    def _rollback_after_failure(self) -> None:
        """Roll the session back after a failed query; log if that fails too."""
        try:
            self.db.rollback()
        except Exception as exc:
            logger.warning("Session rollback failed: %s", exc)

    def _load_einwilligungen(self, tenant_id: str, email: str, into: list[dict[str, Any]]) -> None:
        """Append every consent of the user, including its history, to ``into``.

        Results are appended one by one so that the entries read before a
        failing history query are kept by the caller.
        """
        consent_query = text("""
            SELECT c.id, c.data_point_id, c.granted, c.granted_at, c.revoked_at,
                   c.consent_version, c.source, c.ip_address, c.user_agent, c.created_at
            FROM compliance_einwilligungen_consents c
            WHERE c.tenant_id = CAST(:tid AS VARCHAR) AND c.user_id = :email
            ORDER BY c.created_at DESC
        """)
        # Built once; only the bound consent id changes per row.
        history_query = text("""
            SELECT action, consent_version, ip_address, user_agent, source, created_at
            FROM compliance_einwilligungen_consent_history
            WHERE consent_id = :cid ORDER BY created_at
        """)
        rows = self.db.execute(consent_query, {"tid": tenant_id, "email": email}).fetchall()
        for row in rows:
            entry = self._row_to_dict(row)
            history = self.db.execute(history_query, {"cid": entry["id"]}).fetchall()
            entry["history"] = [self._row_to_dict(h) for h in history]
            into.append(entry)

    def _fetch_dsr_requests(self, tenant_id: str, email: str) -> list[dict[str, Any]]:
        """Return all DSR requests filed under ``email``, newest first."""
        query = text("""
            SELECT id, request_number, request_type, status, received_at, deadline_at, completed_at
            FROM compliance_dsr_requests
            WHERE tenant_id = :tid AND requester_email = :email
            ORDER BY received_at DESC
        """)
        rows = self.db.execute(query, {"tid": tenant_id, "email": email}).fetchall()
        return [self._row_to_dict(row) for row in rows]

    def aggregate_user_data(self, tenant_id: str, email: str) -> dict[str, Any]:
        """Collect all data about a user from every CMP source.

        Every source is read best-effort: when one fails, a warning is logged,
        the session is rolled back, the source is listed in
        ``metadata["failed_sources"]`` and collection continues.

        Args:
            tenant_id: Tenant identifier, passed to the queries as a string
                (PostgreSQL performs the cast).
            email: E-mail address identifying the data subject.

        Returns:
            A dict with the keys ``export_date``, ``data_subject``,
            ``banner_consents``, ``consent_audit_trail``, ``einwilligungen``,
            ``dsr_requests`` and ``metadata``.
        """
        now = datetime.now(timezone.utc)
        failed_sources: list[str] = []

        # 1. Banner consents + audit trail (both come from the same service call)
        banner_data: dict[str, Any] = {"banner_consents": [], "audit_trail": []}
        try:
            banner_data = BannerDSRService(self.db).export_for_dsr(tenant_id, email)
        except Exception as exc:
            logger.warning("Banner DSR export failed: %s", exc)
            self._rollback_after_failure()
            failed_sources += ["Banner-Consents", "Audit-Trail"]

        # 2. Einwilligungen (user-based consents)
        einwilligungen: list[dict[str, Any]] = []
        try:
            self._load_einwilligungen(tenant_id, email, einwilligungen)
        except Exception as exc:
            logger.warning("Einwilligungen export failed: %s", exc)
            self._rollback_after_failure()
            failed_sources.append("Einwilligungen")

        # 3. DSR requests by this user
        dsr_requests: list[dict[str, Any]] = []
        try:
            dsr_requests = self._fetch_dsr_requests(tenant_id, email)
        except Exception as exc:
            logger.warning("DSR requests export failed: %s", exc)
            self._rollback_after_failure()
            failed_sources.append("DSR-Anfragen")

        return {
            "export_date": now.isoformat(),
            "data_subject": {"email": email},
            "banner_consents": banner_data.get("banner_consents", []),
            "consent_audit_trail": banner_data.get("audit_trail", []),
            "einwilligungen": einwilligungen,
            "dsr_requests": dsr_requests,
            "metadata": {
                "tenant_id": tenant_id,
                "data_categories": ["Banner-Consents", "Einwilligungen", "Audit-Trail", "DSR-Anfragen"],
                "legal_basis": "Art. 15 / Art. 20 DSGVO",
                # Sources that could not be read; empty when the export is complete.
                "failed_sources": failed_sources,
            },
        }

    # ------------------------------------------------------------------
    # Renderers
    # ------------------------------------------------------------------
    def export_json(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregate as UTF-8 encoded JSON plus a download filename."""
        data = self.aggregate_user_data(tenant_id, email)
        data["metadata"]["export_format"] = "json"
        # default=str covers values the row conversion leaves untouched (e.g. dates).
        content = json.dumps(data, indent=2, ensure_ascii=False, default=str).encode("utf-8")
        return content, self._export_filename(email, "json")

    @classmethod
    def _render_csv(cls, data: dict[str, Any]) -> bytes:
        """Render the aggregate as CSV bytes (UTF-8 with BOM)."""
        buf = io.StringIO()
        writer = csv.writer(buf)

        def emit(*cells: Any) -> None:
            writer.writerow([cls._csv_safe(cell) for cell in cells])

        emit("Kategorie", "Schluessel", "Wert", "Zeitpunkt", "Quelle")
        # Banner consents
        for c in data.get("banner_consents", []):
            emit("Banner-Consent", "site_id", c.get("site_id", ""), c.get("created_at", ""), "CMP")
            emit("Banner-Consent", "categories", cls._join_categories(c.get("categories")),
                 c.get("updated_at", ""), "CMP")
            emit("Banner-Consent", "ip_hash", c.get("ip_hash", ""), c.get("created_at", ""), "CMP")
        # Audit trail
        for a in data.get("consent_audit_trail", []):
            emit("Audit-Trail", a.get("action", ""), cls._join_categories(a.get("categories")),
                 a.get("created_at", ""), "CMP")
        # Einwilligungen
        for e in data.get("einwilligungen", []):
            status = "Erteilt" if e.get("granted") else "Widerrufen"
            emit("Einwilligung", e.get("data_point_id", ""), status, e.get("granted_at", ""),
                 e.get("source", ""))
        # DSR requests
        for d in data.get("dsr_requests", []):
            emit("DSR-Anfrage", d.get("request_type", ""), d.get("status", ""),
                 d.get("received_at", ""), "")
        return buf.getvalue().encode("utf-8-sig")  # BOM for Excel

    def export_csv(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregate as CSV bytes plus a download filename."""
        data = self.aggregate_user_data(tenant_id, email)
        return self._render_csv(data), self._export_filename(email, "csv")

    def export_pdf(self, tenant_id: str, email: str) -> tuple[bytes, str]:
        """Return the aggregate as a PDF document plus a download filename.

        The PDF has a cover block followed by four sections (banner consents,
        Einwilligungen, audit trail, DSR requests). The audit trail is limited
        to the first 50 entries; the remainder is only counted.
        """
        data = self.aggregate_user_data(tenant_id, email)
        buf = io.BytesIO()
        doc = SimpleDocTemplate(
            buf, pagesize=A4, leftMargin=20 * mm, rightMargin=20 * mm,
            topMargin=25 * mm, bottomMargin=20 * mm,
        )
        ss = getSampleStyleSheet()
        ss.add(ParagraphStyle("Title2", parent=ss["Title"], fontSize=20, textColor=PURPLE, spaceAfter=6))
        ss.add(ParagraphStyle("Section", parent=ss["Heading2"], fontSize=13, textColor=PURPLE, spaceBefore=10))
        ss.add(ParagraphStyle("Body2", parent=ss["Normal"], fontSize=9, leading=13))
        ss.add(ParagraphStyle("Small", parent=ss["Normal"], fontSize=8, textColor=GRAY))
        tbl_style = TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
            ("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
            ("FONTSIZE", (0, 0), (-1, -1), 8),
            ("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ])
        cell = self._cell
        join_categories = self._join_categories
        story: list = []

        def add_table(rows: list, col_widths: list) -> None:
            table = Table(rows, colWidths=col_widths)
            table.setStyle(tbl_style)
            story.append(table)

        # Cover. Paragraph text is parsed as markup, so the caller-supplied
        # e-mail address has to be escaped ("&" and "<" are otherwise invalid).
        story.append(Paragraph("Datenauskunft gemaess Art. 15 DSGVO", ss["Title2"]))
        story.append(Paragraph(f"Betroffene Person: {escape(email)}", ss["Body2"]))
        story.append(Paragraph(f"Erstellt am: {data['export_date'][:10]}", ss["Small"]))
        failed = data.get("metadata", {}).get("failed_sources", [])
        if failed:
            # Do not let an empty section pass as "no data stored" when the
            # source could in fact not be read.
            story.append(Paragraph(
                "Hinweis: Folgende Datenquellen konnten nicht abgerufen werden und fehlen "
                "in dieser Auskunft: " + ", ".join(failed),
                ss["Small"],
            ))
        story.append(Spacer(1, 8 * mm))

        # Section 1: Banner Consents
        consents = data.get("banner_consents", [])
        story.append(Paragraph(f"1. Banner-Consents ({len(consents)})", ss["Section"]))
        if consents:
            rows = [["Site", "Kategorien", "IP-Hash", "Erstellt", "Aktualisiert"]]
            for c in consents:
                rows.append([
                    cell(c.get("site_id")),
                    join_categories(c.get("categories")),
                    self._abbreviate(c.get("ip_hash"), 12),
                    cell(c.get("created_at"), 10),
                    cell(c.get("updated_at"), 10),
                ])
            add_table(rows, [30 * mm, 40 * mm, 30 * mm, 25 * mm, 25 * mm])
        else:
            story.append(Paragraph("Keine Banner-Consents gespeichert.", ss["Body2"]))

        # Section 2: Einwilligungen
        einw = data.get("einwilligungen", [])
        story.append(Paragraph(f"2. Einwilligungen ({len(einw)})", ss["Section"]))
        if einw:
            rows = [["Datenpunkt", "Status", "Erteilt am", "Widerrufen am", "IP-Adresse"]]
            for e in einw:
                rows.append([
                    cell(e.get("data_point_id")),
                    "Erteilt" if e.get("granted") else "Widerrufen",
                    cell(e.get("granted_at"), 10),
                    cell(e.get("revoked_at"), 10),
                    # NOTE(review): 15 characters fit an IPv4 address; an IPv6
                    # address would be cut off here - confirm this is intended.
                    cell(e.get("ip_address"), 15),
                ])
            add_table(rows, [35 * mm, 25 * mm, 25 * mm, 25 * mm, 35 * mm])
        else:
            story.append(Paragraph("Keine Einwilligungen gespeichert.", ss["Body2"]))

        # Section 3: Audit Trail
        trail = data.get("consent_audit_trail", [])
        limit = self._PDF_AUDIT_TRAIL_LIMIT
        story.append(Paragraph(f"3. Consent-Audit-Trail ({len(trail)})", ss["Section"]))
        if trail:
            rows = [["Aktion", "Kategorien", "Datum"]]
            for a in trail[:limit]:  # Limit the table size in the PDF
                rows.append([
                    cell(a.get("action")),
                    join_categories(a.get("categories")),
                    cell(a.get("created_at"), 19),
                ])
            add_table(rows, [40 * mm, 60 * mm, 45 * mm])
            if len(trail) > limit:
                story.append(Paragraph(
                    f"... und {len(trail) - limit} weitere Eintraege (im JSON-Export enthalten)",
                    ss["Small"],
                ))
        else:
            story.append(Paragraph("Kein Audit-Trail vorhanden.", ss["Body2"]))

        # Section 4: DSR Requests
        dsrs = data.get("dsr_requests", [])
        story.append(Paragraph(f"4. Bisherige DSR-Anfragen ({len(dsrs)})", ss["Section"]))
        if dsrs:
            rows = [["Typ", "Status", "Eingegangen", "Abgeschlossen"]]
            for d in dsrs:
                rows.append([
                    cell(d.get("request_type")),
                    cell(d.get("status")),
                    cell(d.get("received_at"), 10),
                    cell(d.get("completed_at"), 10),
                ])
            add_table(rows, [35 * mm, 30 * mm, 35 * mm, 35 * mm])
        else:
            # Empty-state line, consistent with the other three sections.
            story.append(Paragraph("Keine DSR-Anfragen gespeichert.", ss["Body2"]))

        # Footer
        story.append(Spacer(1, 15 * mm))
        story.append(Paragraph("Erstellt mit BreakPilot Compliance SDK | Art. 15 DSGVO Datenauskunft", ss["Small"]))
        doc.build(story)
        return buf.getvalue(), self._export_filename(email, "pdf")