44acd68c96
Phase 1: Vendor sync from service registry (82+ services → banner vendors) Phase 2: Category-based retention (marketing=90d, statistics=790d, not hardcoded 365d) Phase 3: DSR ↔ Banner email linking (link-email, by-email, Art.17 erasure, Art.15/20 export) Phase 4: Consent sync (Banner → Einwilligungen bridge) Phase 6: Consent proof (SHA256 config hash + config_version in audit log, Art. 7(1) DSGVO) New files: - banner_dsr_service.py — email linking + DSR integration - vendor_banner_sync.py — service registry → vendor configs - migration 106 — linked_email, banner_config_hash, consent_version columns Tests: 20+ new backend tests + 2 Playwright E2E test suites (API + UI) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
365 lines
13 KiB
Python
365 lines
13 KiB
Python
# mypy: disable-error-code="arg-type,assignment"
|
|
# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime.
|
|
"""
|
|
Banner consent service — SDK-facing endpoints.
|
|
|
|
Phase 1 Step 4: extracted from ``compliance.api.banner_routes``.
|
|
Covers public device consent CRUD, site config retrieval (for banner
|
|
display), export, and per-site consent statistics.
|
|
|
|
Admin-side site/category/vendor management lives in
|
|
``compliance.services.banner_admin_service.BannerAdminService``.
|
|
DSR-facing email linking lives in
|
|
``compliance.services.banner_dsr_service.BannerDSRService``.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from compliance.db.banner_models import (
|
|
BannerCategoryConfigDB,
|
|
BannerConsentAuditLogDB,
|
|
BannerConsentDB,
|
|
BannerSiteConfigDB,
|
|
BannerVendorConfigDB,
|
|
)
|
|
from compliance.domain import NotFoundError, ValidationError
|
|
from compliance.services._banner_serializers import (
|
|
category_to_dict,
|
|
consent_to_dict,
|
|
site_config_to_dict,
|
|
vendor_to_dict,
|
|
)
|
|
|
|
# Default consent expiration per banner category (days).
|
|
# Based on: DSGVO Art. 5(1)(e), CNIL guidelines, EDPB recommendations.
|
|
CATEGORY_RETENTION_DAYS = {
|
|
"necessary": 365, # Session + functional = max 12 months
|
|
"statistics": 790, # Max 26 months (Google Analytics default)
|
|
"marketing": 90, # Max 90 days for retargeting
|
|
"functional": 365, # Max 12 months
|
|
}
|
|
|
|
|
|
class BannerConsentService:
|
|
"""Business logic for public SDK banner consent endpoints."""
|
|
|
|
def __init__(self, db: Session) -> None:
|
|
self.db = db
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _hash_ip(ip: Optional[str]) -> Optional[str]:
|
|
if not ip:
|
|
return None
|
|
return hashlib.sha256(ip.encode()).hexdigest()[:16]
|
|
|
|
def _log(
|
|
self,
|
|
tenant_id: uuid.UUID,
|
|
consent_id: Any,
|
|
action: str,
|
|
site_id: str,
|
|
device_fingerprint: Optional[str] = None,
|
|
categories: Optional[list[str]] = None,
|
|
ip_hash: Optional[str] = None,
|
|
banner_config_hash: Optional[str] = None,
|
|
consent_version: Optional[int] = None,
|
|
) -> None:
|
|
entry = BannerConsentAuditLogDB(
|
|
tenant_id=tenant_id,
|
|
consent_id=consent_id,
|
|
action=action,
|
|
site_id=site_id,
|
|
device_fingerprint=device_fingerprint,
|
|
categories=categories or [],
|
|
ip_hash=ip_hash,
|
|
banner_config_hash=banner_config_hash,
|
|
consent_version=consent_version,
|
|
)
|
|
self.db.add(entry)
|
|
|
|
def _compute_config_hash(self, tenant_id: uuid.UUID, site_id: str) -> tuple[Optional[str], Optional[int]]:
|
|
"""Compute SHA256 hash of current site config for consent proof (Art. 7(1) DSGVO)."""
|
|
config = (
|
|
self.db.query(BannerSiteConfigDB)
|
|
.filter(
|
|
BannerSiteConfigDB.tenant_id == tenant_id,
|
|
BannerSiteConfigDB.site_id == site_id,
|
|
)
|
|
.first()
|
|
)
|
|
if not config:
|
|
return None, None
|
|
snapshot = json.dumps({
|
|
"banner_title": config.banner_title,
|
|
"banner_description": config.banner_description,
|
|
"privacy_url": config.privacy_url,
|
|
"imprint_url": config.imprint_url,
|
|
}, sort_keys=True)
|
|
return hashlib.sha256(snapshot.encode()).hexdigest()[:32], config.config_version
|
|
|
|
def _get_max_retention(self, tenant_id: uuid.UUID, site_id: str, categories: list[str]) -> int:
|
|
"""Determine consent expiration based on accepted categories and vendor retention."""
|
|
config = (
|
|
self.db.query(BannerSiteConfigDB)
|
|
.filter(BannerSiteConfigDB.tenant_id == tenant_id, BannerSiteConfigDB.site_id == site_id)
|
|
.first()
|
|
)
|
|
if not config:
|
|
return 365
|
|
vendors = (
|
|
self.db.query(BannerVendorConfigDB)
|
|
.filter(
|
|
BannerVendorConfigDB.site_config_id == config.id,
|
|
BannerVendorConfigDB.category_key.in_(categories),
|
|
BannerVendorConfigDB.is_active,
|
|
)
|
|
.all()
|
|
)
|
|
if vendors:
|
|
return max(v.retention_days for v in vendors if v.retention_days)
|
|
return max((CATEGORY_RETENTION_DAYS.get(c, 365) for c in categories), default=365)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Consent CRUD (public SDK)
|
|
# ------------------------------------------------------------------
|
|
|
|
def record_consent(
|
|
self,
|
|
tenant_id: str,
|
|
site_id: str,
|
|
device_fingerprint: str,
|
|
categories: list[str],
|
|
vendors: list[str],
|
|
ip_address: Optional[str],
|
|
user_agent: Optional[str],
|
|
consent_string: Optional[str],
|
|
) -> dict[str, Any]:
|
|
"""Upsert a device consent row for (tenant, site, device_fingerprint).
|
|
|
|
Expiration is derived from the maximum vendor retention for the
|
|
accepted categories (Phase 2 — DSGVO Art. 5(1)(e)).
|
|
A SHA256 hash of the banner config is stored in the audit log
|
|
for consent proof (Phase 6 — Art. 7(1) DSGVO).
|
|
"""
|
|
tid = uuid.UUID(tenant_id)
|
|
ip_hash = self._hash_ip(ip_address)
|
|
now = datetime.now(timezone.utc)
|
|
retention = self._get_max_retention(tid, site_id, categories)
|
|
expires_at = now + timedelta(days=retention)
|
|
config_hash, config_ver = self._compute_config_hash(tid, site_id)
|
|
|
|
existing = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
BannerConsentDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if existing:
|
|
existing.categories = categories
|
|
existing.vendors = vendors
|
|
existing.ip_hash = ip_hash
|
|
existing.user_agent = user_agent
|
|
existing.consent_string = consent_string
|
|
existing.expires_at = expires_at
|
|
existing.updated_at = now
|
|
self.db.flush()
|
|
self._log(
|
|
tid, existing.id, "consent_updated", site_id, device_fingerprint,
|
|
categories, ip_hash, config_hash, config_ver,
|
|
)
|
|
self.db.commit()
|
|
self.db.refresh(existing)
|
|
return consent_to_dict(existing)
|
|
|
|
consent = BannerConsentDB(
|
|
tenant_id=tid,
|
|
site_id=site_id,
|
|
device_fingerprint=device_fingerprint,
|
|
categories=categories,
|
|
vendors=vendors,
|
|
ip_hash=ip_hash,
|
|
user_agent=user_agent,
|
|
consent_string=consent_string,
|
|
expires_at=expires_at,
|
|
)
|
|
self.db.add(consent)
|
|
self.db.flush()
|
|
self._log(
|
|
tid, consent.id, "consent_given", site_id, device_fingerprint,
|
|
categories, ip_hash, config_hash, config_ver,
|
|
)
|
|
self.db.commit()
|
|
self.db.refresh(consent)
|
|
return consent_to_dict(consent)
|
|
|
|
def get_consent(
|
|
self, tenant_id: str, site_id: str, device_fingerprint: str
|
|
) -> dict[str, Any]:
|
|
"""Return the consent envelope for a device, or has_consent=false."""
|
|
tid = uuid.UUID(tenant_id)
|
|
consent = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
BannerConsentDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.first()
|
|
)
|
|
if not consent:
|
|
return {"has_consent": False, "consent": None}
|
|
return {"has_consent": True, "consent": consent_to_dict(consent)}
|
|
|
|
def withdraw_consent(self, tenant_id: str, consent_id: str) -> dict[str, Any]:
|
|
"""Delete a consent row + audit the withdrawal."""
|
|
tid = uuid.UUID(tenant_id)
|
|
try:
|
|
cid = uuid.UUID(consent_id)
|
|
except ValueError as exc:
|
|
raise ValidationError("Invalid consent ID") from exc
|
|
|
|
consent = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(BannerConsentDB.id == cid, BannerConsentDB.tenant_id == tid)
|
|
.first()
|
|
)
|
|
if not consent:
|
|
raise NotFoundError("Consent not found")
|
|
|
|
self._log(
|
|
tid, cid, "consent_withdrawn", consent.site_id, consent.device_fingerprint,
|
|
)
|
|
self.db.delete(consent)
|
|
self.db.commit()
|
|
return {"success": True, "message": "Consent withdrawn"}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Site config retrieval (SDK embed)
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_site_config(self, tenant_id: str, site_id: str) -> dict[str, Any]:
|
|
"""Load site config + active categories + active vendors for banner display."""
|
|
tid = uuid.UUID(tenant_id)
|
|
config = (
|
|
self.db.query(BannerSiteConfigDB)
|
|
.filter(
|
|
BannerSiteConfigDB.tenant_id == tid,
|
|
BannerSiteConfigDB.site_id == site_id,
|
|
)
|
|
.first()
|
|
)
|
|
if not config:
|
|
return {
|
|
"site_id": site_id,
|
|
"banner_title": "Cookie-Einstellungen",
|
|
"banner_description": (
|
|
"Wir verwenden Cookies, um Ihnen die bestmoegliche Erfahrung zu bieten."
|
|
),
|
|
"categories": [],
|
|
"vendors": [],
|
|
}
|
|
|
|
categories = (
|
|
self.db.query(BannerCategoryConfigDB)
|
|
.filter(
|
|
BannerCategoryConfigDB.site_config_id == config.id,
|
|
BannerCategoryConfigDB.is_active,
|
|
)
|
|
.order_by(BannerCategoryConfigDB.sort_order)
|
|
.all()
|
|
)
|
|
vendors = (
|
|
self.db.query(BannerVendorConfigDB)
|
|
.filter(
|
|
BannerVendorConfigDB.site_config_id == config.id,
|
|
BannerVendorConfigDB.is_active,
|
|
)
|
|
.all()
|
|
)
|
|
result = site_config_to_dict(config)
|
|
result["categories"] = [category_to_dict(c) for c in categories]
|
|
result["vendors"] = [vendor_to_dict(v) for v in vendors]
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# DSGVO export + stats
|
|
# ------------------------------------------------------------------
|
|
|
|
def export_consent(
|
|
self, tenant_id: str, site_id: str, device_fingerprint: str
|
|
) -> dict[str, Any]:
|
|
"""DSGVO export of all consent + audit rows for a device."""
|
|
tid = uuid.UUID(tenant_id)
|
|
consents = (
|
|
self.db.query(BannerConsentDB)
|
|
.filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
BannerConsentDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.all()
|
|
)
|
|
audit = (
|
|
self.db.query(BannerConsentAuditLogDB)
|
|
.filter(
|
|
BannerConsentAuditLogDB.tenant_id == tid,
|
|
BannerConsentAuditLogDB.site_id == site_id,
|
|
BannerConsentAuditLogDB.device_fingerprint == device_fingerprint,
|
|
)
|
|
.order_by(BannerConsentAuditLogDB.created_at.desc())
|
|
.all()
|
|
)
|
|
return {
|
|
"device_fingerprint": device_fingerprint,
|
|
"site_id": site_id,
|
|
"consents": [consent_to_dict(c) for c in consents],
|
|
"audit_trail": [
|
|
{
|
|
"id": str(a.id),
|
|
"action": a.action,
|
|
"categories": a.categories or [],
|
|
"created_at": a.created_at.isoformat() if a.created_at else None,
|
|
}
|
|
for a in audit
|
|
],
|
|
}
|
|
|
|
def get_site_stats(self, tenant_id: str, site_id: str) -> dict[str, Any]:
|
|
"""Compute consent count + per-category acceptance rates for a site."""
|
|
tid = uuid.UUID(tenant_id)
|
|
base = self.db.query(BannerConsentDB).filter(
|
|
BannerConsentDB.tenant_id == tid,
|
|
BannerConsentDB.site_id == site_id,
|
|
)
|
|
total = base.count()
|
|
category_stats: dict[str, int] = {}
|
|
for c in base.all():
|
|
cats: list[str] = list(c.categories or [])
|
|
for cat in cats:
|
|
category_stats[cat] = category_stats.get(cat, 0) + 1
|
|
return {
|
|
"site_id": site_id,
|
|
"total_consents": total,
|
|
"category_acceptance": {
|
|
cat: {
|
|
"count": count,
|
|
"rate": round(count / total * 100, 1) if total > 0 else 0,
|
|
}
|
|
for cat, count in category_stats.items()
|
|
},
|
|
}
|