Files
breakpilot-compliance/backend-compliance/compliance/services/banner_consent_service.py
T
Benjamin Admin 44acd68c96 feat: Cookie-Banner ↔ Backend Integration (DSR, Retention, Consent Proof)
Phase 1: Vendor sync from service registry (82+ services → banner vendors)
Phase 2: Category-based retention (marketing=90d, statistics=790d, not hardcoded 365d)
Phase 3: DSR ↔ Banner email linking (link-email, by-email, Art.17 erasure, Art.15/20 export)
Phase 4: Consent sync (Banner → Einwilligungen bridge)
Phase 6: Consent proof (SHA256 config hash + config_version in audit log, Art. 7(1) DSGVO)

New files:
- banner_dsr_service.py — email linking + DSR integration
- vendor_banner_sync.py — service registry → vendor configs
- migration 106 — linked_email, banner_config_hash, consent_version columns

Tests: 20+ new backend tests + 2 Playwright E2E test suites (API + UI)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-02 19:55:13 +02:00

365 lines
13 KiB
Python

# mypy: disable-error-code="arg-type,assignment"
# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime.
"""
Banner consent service — SDK-facing endpoints.
Phase 1 Step 4: extracted from ``compliance.api.banner_routes``.
Covers public device consent CRUD, site config retrieval (for banner
display), export, and per-site consent statistics.
Admin-side site/category/vendor management lives in
``compliance.services.banner_admin_service.BannerAdminService``.
DSR-facing email linking lives in
``compliance.services.banner_dsr_service.BannerDSRService``.
"""
import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy.orm import Session
from compliance.db.banner_models import (
BannerCategoryConfigDB,
BannerConsentAuditLogDB,
BannerConsentDB,
BannerSiteConfigDB,
BannerVendorConfigDB,
)
from compliance.domain import NotFoundError, ValidationError
from compliance.services._banner_serializers import (
category_to_dict,
consent_to_dict,
site_config_to_dict,
vendor_to_dict,
)
# Default consent expiration per banner category (days).
# Based on: DSGVO Art. 5(1)(e), CNIL guidelines, EDPB recommendations.
CATEGORY_RETENTION_DAYS = {
"necessary": 365, # Session + functional = max 12 months
"statistics": 790, # Max 26 months (Google Analytics default)
"marketing": 90, # Max 90 days for retargeting
"functional": 365, # Max 12 months
}
class BannerConsentService:
"""Business logic for public SDK banner consent endpoints."""
def __init__(self, db: Session) -> None:
self.db = db
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _hash_ip(ip: Optional[str]) -> Optional[str]:
if not ip:
return None
return hashlib.sha256(ip.encode()).hexdigest()[:16]
def _log(
self,
tenant_id: uuid.UUID,
consent_id: Any,
action: str,
site_id: str,
device_fingerprint: Optional[str] = None,
categories: Optional[list[str]] = None,
ip_hash: Optional[str] = None,
banner_config_hash: Optional[str] = None,
consent_version: Optional[int] = None,
) -> None:
entry = BannerConsentAuditLogDB(
tenant_id=tenant_id,
consent_id=consent_id,
action=action,
site_id=site_id,
device_fingerprint=device_fingerprint,
categories=categories or [],
ip_hash=ip_hash,
banner_config_hash=banner_config_hash,
consent_version=consent_version,
)
self.db.add(entry)
def _compute_config_hash(self, tenant_id: uuid.UUID, site_id: str) -> tuple[Optional[str], Optional[int]]:
"""Compute SHA256 hash of current site config for consent proof (Art. 7(1) DSGVO)."""
config = (
self.db.query(BannerSiteConfigDB)
.filter(
BannerSiteConfigDB.tenant_id == tenant_id,
BannerSiteConfigDB.site_id == site_id,
)
.first()
)
if not config:
return None, None
snapshot = json.dumps({
"banner_title": config.banner_title,
"banner_description": config.banner_description,
"privacy_url": config.privacy_url,
"imprint_url": config.imprint_url,
}, sort_keys=True)
return hashlib.sha256(snapshot.encode()).hexdigest()[:32], config.config_version
def _get_max_retention(self, tenant_id: uuid.UUID, site_id: str, categories: list[str]) -> int:
"""Determine consent expiration based on accepted categories and vendor retention."""
config = (
self.db.query(BannerSiteConfigDB)
.filter(BannerSiteConfigDB.tenant_id == tenant_id, BannerSiteConfigDB.site_id == site_id)
.first()
)
if not config:
return 365
vendors = (
self.db.query(BannerVendorConfigDB)
.filter(
BannerVendorConfigDB.site_config_id == config.id,
BannerVendorConfigDB.category_key.in_(categories),
BannerVendorConfigDB.is_active,
)
.all()
)
if vendors:
return max(v.retention_days for v in vendors if v.retention_days)
return max((CATEGORY_RETENTION_DAYS.get(c, 365) for c in categories), default=365)
# ------------------------------------------------------------------
# Consent CRUD (public SDK)
# ------------------------------------------------------------------
def record_consent(
self,
tenant_id: str,
site_id: str,
device_fingerprint: str,
categories: list[str],
vendors: list[str],
ip_address: Optional[str],
user_agent: Optional[str],
consent_string: Optional[str],
) -> dict[str, Any]:
"""Upsert a device consent row for (tenant, site, device_fingerprint).
Expiration is derived from the maximum vendor retention for the
accepted categories (Phase 2 — DSGVO Art. 5(1)(e)).
A SHA256 hash of the banner config is stored in the audit log
for consent proof (Phase 6 — Art. 7(1) DSGVO).
"""
tid = uuid.UUID(tenant_id)
ip_hash = self._hash_ip(ip_address)
now = datetime.now(timezone.utc)
retention = self._get_max_retention(tid, site_id, categories)
expires_at = now + timedelta(days=retention)
config_hash, config_ver = self._compute_config_hash(tid, site_id)
existing = (
self.db.query(BannerConsentDB)
.filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
BannerConsentDB.device_fingerprint == device_fingerprint,
)
.first()
)
if existing:
existing.categories = categories
existing.vendors = vendors
existing.ip_hash = ip_hash
existing.user_agent = user_agent
existing.consent_string = consent_string
existing.expires_at = expires_at
existing.updated_at = now
self.db.flush()
self._log(
tid, existing.id, "consent_updated", site_id, device_fingerprint,
categories, ip_hash, config_hash, config_ver,
)
self.db.commit()
self.db.refresh(existing)
return consent_to_dict(existing)
consent = BannerConsentDB(
tenant_id=tid,
site_id=site_id,
device_fingerprint=device_fingerprint,
categories=categories,
vendors=vendors,
ip_hash=ip_hash,
user_agent=user_agent,
consent_string=consent_string,
expires_at=expires_at,
)
self.db.add(consent)
self.db.flush()
self._log(
tid, consent.id, "consent_given", site_id, device_fingerprint,
categories, ip_hash, config_hash, config_ver,
)
self.db.commit()
self.db.refresh(consent)
return consent_to_dict(consent)
def get_consent(
self, tenant_id: str, site_id: str, device_fingerprint: str
) -> dict[str, Any]:
"""Return the consent envelope for a device, or has_consent=false."""
tid = uuid.UUID(tenant_id)
consent = (
self.db.query(BannerConsentDB)
.filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
BannerConsentDB.device_fingerprint == device_fingerprint,
)
.first()
)
if not consent:
return {"has_consent": False, "consent": None}
return {"has_consent": True, "consent": consent_to_dict(consent)}
def withdraw_consent(self, tenant_id: str, consent_id: str) -> dict[str, Any]:
"""Delete a consent row + audit the withdrawal."""
tid = uuid.UUID(tenant_id)
try:
cid = uuid.UUID(consent_id)
except ValueError as exc:
raise ValidationError("Invalid consent ID") from exc
consent = (
self.db.query(BannerConsentDB)
.filter(BannerConsentDB.id == cid, BannerConsentDB.tenant_id == tid)
.first()
)
if not consent:
raise NotFoundError("Consent not found")
self._log(
tid, cid, "consent_withdrawn", consent.site_id, consent.device_fingerprint,
)
self.db.delete(consent)
self.db.commit()
return {"success": True, "message": "Consent withdrawn"}
# ------------------------------------------------------------------
# Site config retrieval (SDK embed)
# ------------------------------------------------------------------
def get_site_config(self, tenant_id: str, site_id: str) -> dict[str, Any]:
"""Load site config + active categories + active vendors for banner display."""
tid = uuid.UUID(tenant_id)
config = (
self.db.query(BannerSiteConfigDB)
.filter(
BannerSiteConfigDB.tenant_id == tid,
BannerSiteConfigDB.site_id == site_id,
)
.first()
)
if not config:
return {
"site_id": site_id,
"banner_title": "Cookie-Einstellungen",
"banner_description": (
"Wir verwenden Cookies, um Ihnen die bestmoegliche Erfahrung zu bieten."
),
"categories": [],
"vendors": [],
}
categories = (
self.db.query(BannerCategoryConfigDB)
.filter(
BannerCategoryConfigDB.site_config_id == config.id,
BannerCategoryConfigDB.is_active,
)
.order_by(BannerCategoryConfigDB.sort_order)
.all()
)
vendors = (
self.db.query(BannerVendorConfigDB)
.filter(
BannerVendorConfigDB.site_config_id == config.id,
BannerVendorConfigDB.is_active,
)
.all()
)
result = site_config_to_dict(config)
result["categories"] = [category_to_dict(c) for c in categories]
result["vendors"] = [vendor_to_dict(v) for v in vendors]
return result
# ------------------------------------------------------------------
# DSGVO export + stats
# ------------------------------------------------------------------
def export_consent(
self, tenant_id: str, site_id: str, device_fingerprint: str
) -> dict[str, Any]:
"""DSGVO export of all consent + audit rows for a device."""
tid = uuid.UUID(tenant_id)
consents = (
self.db.query(BannerConsentDB)
.filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
BannerConsentDB.device_fingerprint == device_fingerprint,
)
.all()
)
audit = (
self.db.query(BannerConsentAuditLogDB)
.filter(
BannerConsentAuditLogDB.tenant_id == tid,
BannerConsentAuditLogDB.site_id == site_id,
BannerConsentAuditLogDB.device_fingerprint == device_fingerprint,
)
.order_by(BannerConsentAuditLogDB.created_at.desc())
.all()
)
return {
"device_fingerprint": device_fingerprint,
"site_id": site_id,
"consents": [consent_to_dict(c) for c in consents],
"audit_trail": [
{
"id": str(a.id),
"action": a.action,
"categories": a.categories or [],
"created_at": a.created_at.isoformat() if a.created_at else None,
}
for a in audit
],
}
def get_site_stats(self, tenant_id: str, site_id: str) -> dict[str, Any]:
"""Compute consent count + per-category acceptance rates for a site."""
tid = uuid.UUID(tenant_id)
base = self.db.query(BannerConsentDB).filter(
BannerConsentDB.tenant_id == tid,
BannerConsentDB.site_id == site_id,
)
total = base.count()
category_stats: dict[str, int] = {}
for c in base.all():
cats: list[str] = list(c.categories or [])
for cat in cats:
category_stats[cat] = category_stats.get(cat, 0) + 1
return {
"site_id": site_id,
"total_consents": total,
"category_acceptance": {
cat: {
"count": count,
"rate": round(count / total * 100, 1) if total > 0 else 0,
}
for cat, count in category_stats.items()
},
}