44acd68c96
Phase 1: Vendor sync from service registry (82+ services → banner vendors) Phase 2: Category-based retention (marketing=90d, statistics=790d, not hardcoded 365d) Phase 3: DSR ↔ Banner email linking (link-email, by-email, Art.17 erasure, Art.15/20 export) Phase 4: Consent sync (Banner → Einwilligungen bridge) Phase 6: Consent proof (SHA256 config hash + config_version in audit log, Art. 7(1) DSGVO) New files: - banner_dsr_service.py — email linking + DSR integration - vendor_banner_sync.py — service registry → vendor configs - migration 106 — linked_email, banner_config_hash, consent_version columns Tests: 20+ new backend tests + 2 Playwright E2E test suites (API + UI) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
267 lines
9.0 KiB
Python
267 lines
9.0 KiB
Python
# mypy: disable-error-code="arg-type,assignment"
|
|
"""
|
|
Banner DSR service — bridges device-based banner consents with
|
|
user-based DSR (Data Subject Request) processing.
|
|
|
|
Phase 3: Email linking allows correlating anonymous device fingerprints
|
|
with user emails (e.g. after newsletter signup or account creation).
|
|
This enables Art. 15 (access), Art. 17 (erasure), and Art. 20
|
|
(portability) requests to include/delete banner consent data.
|
|
|
|
Phase 4: Consent sync bridges banner consents (device-based) with
|
|
Einwilligungen (user-based) for unified consent management.
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from compliance.db.banner_models import (
|
|
BannerConsentAuditLogDB,
|
|
BannerConsentDB,
|
|
)
|
|
from compliance.db.einwilligungen_models import (
|
|
EinwilligungenConsentDB,
|
|
EinwilligungenConsentHistoryDB,
|
|
)
|
|
from compliance.domain import NotFoundError, ValidationError
|
|
from compliance.services._banner_serializers import consent_to_dict
|
|
|
|
|
|
class BannerDSRService:
|
|
"""Email linking + DSR integration for banner consents."""
|
|
|
|
def __init__(self, db: Session) -> None:
|
|
self.db = db
|
|
|
|
# ------------------------------------------------------------------
|
|
# Phase 3: Email linking
|
|
# ------------------------------------------------------------------
|
|
|
|
def link_email(
|
|
self,
|
|
tenant_id: str,
|
|
site_id: str,
|
|
device_fingerprint: str,
|
|
email: str,
|
|
) -> dict[str, Any]:
|
|
"""Link an email address to a device fingerprint's consent.
|
|
|
|
Typically called after newsletter signup, account creation, or
|
|
login — any point where the user's email becomes known.
|
|
"""
|
|
tid = uuid.UUID(tenant_id)
|
|
if not email or "@" not in email:
|
|
raise ValidationError("Invalid email address")
|
|
|
|
consent = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
BannerConsentDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.first()
|
|
)
|
|
if not consent:
|
|
raise NotFoundError("No consent found for this device")
|
|
|
|
consent.linked_email = email.lower().strip()
|
|
consent.updated_at = datetime.now(timezone.utc)
|
|
|
|
# Audit the linking
|
|
self.db.add(BannerConsentAuditLogDB(
|
|
tenant_id=tid,
|
|
consent_id=consent.id,
|
|
action="email_linked",
|
|
site_id=site_id,
|
|
device_fingerprint=device_fingerprint,
|
|
categories=consent.categories or [],
|
|
))
|
|
self.db.commit()
|
|
self.db.refresh(consent)
|
|
return consent_to_dict(consent)
|
|
|
|
def get_consents_by_email(
|
|
self, tenant_id: str, email: str,
|
|
) -> list[dict[str, Any]]:
|
|
"""Find all banner consents linked to an email (Art. 15 DSGVO)."""
|
|
tid = uuid.UUID(tenant_id)
|
|
consents = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.linked_email == email.lower().strip(),
|
|
)
|
|
.all()
|
|
)
|
|
return [consent_to_dict(c) for c in consents]
|
|
|
|
def delete_consents_by_email(
|
|
self, tenant_id: str, email: str,
|
|
) -> dict[str, Any]:
|
|
"""Delete all banner consents for an email (Art. 17 DSGVO erasure).
|
|
|
|
Creates audit log entries before deletion for compliance proof.
|
|
"""
|
|
tid = uuid.UUID(tenant_id)
|
|
normalized = email.lower().strip()
|
|
consents = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.linked_email == normalized,
|
|
)
|
|
.all()
|
|
)
|
|
deleted = 0
|
|
for c in consents:
|
|
self.db.add(BannerConsentAuditLogDB(
|
|
tenant_id=tid,
|
|
consent_id=c.id,
|
|
action="consent_deleted_dsr",
|
|
site_id=c.site_id,
|
|
device_fingerprint=c.device_fingerprint,
|
|
categories=c.categories or [],
|
|
))
|
|
self.db.delete(c)
|
|
deleted += 1
|
|
|
|
self.db.commit()
|
|
return {"deleted": deleted, "email": normalized}
|
|
|
|
def export_for_dsr(
|
|
self, tenant_id: str, email: str,
|
|
) -> dict[str, Any]:
|
|
"""Export all banner consent data for a DSR (Art. 15/20 DSGVO).
|
|
|
|
Returns consent records + audit trail for the email.
|
|
"""
|
|
tid = uuid.UUID(tenant_id)
|
|
normalized = email.lower().strip()
|
|
consents = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.linked_email == normalized,
|
|
)
|
|
.all()
|
|
)
|
|
consent_ids = [c.id for c in consents]
|
|
audit = []
|
|
if consent_ids:
|
|
audit = (
|
|
self.db.query(BannerConsentAuditLogDB)
|
|
.filter(BannerConsentAuditLogDB.consent_id.in_(consent_ids))
|
|
.order_by(BannerConsentAuditLogDB.created_at.desc())
|
|
.all()
|
|
)
|
|
return {
|
|
"email": normalized,
|
|
"banner_consents": [consent_to_dict(c) for c in consents],
|
|
"audit_trail": [
|
|
{
|
|
"id": str(a.id),
|
|
"action": a.action,
|
|
"site_id": a.site_id,
|
|
"categories": a.categories or [],
|
|
"banner_config_hash": a.banner_config_hash,
|
|
"consent_version": a.consent_version,
|
|
"created_at": a.created_at.isoformat() if a.created_at else None,
|
|
}
|
|
for a in audit
|
|
],
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Phase 4: Consent sync (Banner ↔ Einwilligungen)
|
|
# ------------------------------------------------------------------
|
|
|
|
def sync_consent_to_einwilligungen(
|
|
self,
|
|
tenant_id: str,
|
|
device_fingerprint: str,
|
|
email: str,
|
|
site_id: str,
|
|
) -> dict[str, Any]:
|
|
"""Sync banner consent categories to user-based Einwilligungen.
|
|
|
|
Called when a user logs in and their email becomes known.
|
|
Creates/updates EinwilligungenConsent entries for each accepted
|
|
banner category, bridging device-based and user-based systems.
|
|
"""
|
|
tid = uuid.UUID(tenant_id)
|
|
consent = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
BannerConsentDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.first()
|
|
)
|
|
if not consent:
|
|
raise NotFoundError("No banner consent found for this device")
|
|
|
|
# Link email if not already linked
|
|
normalized = email.lower().strip()
|
|
if not consent.linked_email:
|
|
consent.linked_email = normalized
|
|
consent.updated_at = datetime.now(timezone.utc)
|
|
|
|
synced = 0
|
|
categories = consent.categories or []
|
|
for cat in categories:
|
|
data_point_id = f"banner_{cat}"
|
|
existing = (
|
|
self.db.query(EinwilligungenConsentDB)
|
|
.filter(
|
|
EinwilligungenConsentDB.tenant_id == tid,
|
|
EinwilligungenConsentDB.user_id == normalized,
|
|
EinwilligungenConsentDB.data_point_id == data_point_id,
|
|
)
|
|
.first()
|
|
)
|
|
now = datetime.now(timezone.utc)
|
|
if existing:
|
|
if not existing.granted:
|
|
existing.granted = True
|
|
existing.granted_at = now
|
|
existing.revoked_at = None
|
|
existing.source = "banner_sync"
|
|
self.db.add(EinwilligungenConsentHistoryDB(
|
|
consent_id=existing.id,
|
|
tenant_id=tid,
|
|
action="granted",
|
|
source="banner_sync",
|
|
))
|
|
synced += 1
|
|
else:
|
|
new_consent = EinwilligungenConsentDB(
|
|
tenant_id=tid,
|
|
user_id=normalized,
|
|
data_point_id=data_point_id,
|
|
granted=True,
|
|
granted_at=now,
|
|
consent_version="1",
|
|
source="banner_sync",
|
|
)
|
|
self.db.add(new_consent)
|
|
self.db.flush()
|
|
self.db.add(EinwilligungenConsentHistoryDB(
|
|
consent_id=new_consent.id,
|
|
tenant_id=tid,
|
|
action="granted",
|
|
source="banner_sync",
|
|
))
|
|
synced += 1
|
|
|
|
self.db.commit()
|
|
return {
|
|
"synced": synced,
|
|
"categories": categories,
|
|
"email": normalized,
|
|
}
|