Files
breakpilot-compliance/backend-compliance/compliance/services/banner_consent_service.py
T
Benjamin Admin 397de741c1 feat(cmp): Phase 2 — script blocking + cookie tracking
Migration 108: scripts_blocked, scripts_released, cookies_set JSONB columns.
Backend models/schema/service/serializer/routes extended.
Admin detail modal shows released scripts and set cookies with categories.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 22:52:26 +02:00

455 lines
17 KiB
Python

# mypy: disable-error-code="arg-type,assignment"
# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime.
"""
Banner consent service — SDK-facing endpoints.
Phase 1 Step 4: extracted from ``compliance.api.banner_routes``.
Covers public device consent CRUD, site config retrieval (for banner
display), export, and per-site consent statistics.
Admin-side site/category/vendor management lives in
``compliance.services.banner_admin_service.BannerAdminService``.
DSR-facing email linking lives in
``compliance.services.banner_dsr_service.BannerDSRService``.
"""
import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy.orm import Session
from compliance.db.banner_models import (
BannerCategoryConfigDB,
BannerConsentAuditLogDB,
BannerConsentDB,
BannerSiteConfigDB,
BannerVendorConfigDB,
)
from compliance.domain import NotFoundError, ValidationError
from compliance.services._banner_serializers import (
category_to_dict,
consent_to_dict,
site_config_to_dict,
vendor_to_dict,
)
# Fallback consent lifetime in days, keyed by banner category key.
# Based on: DSGVO (GDPR) Art. 5(1)(e), CNIL guidelines, EDPB recommendations.
CATEGORY_RETENTION_DAYS: dict[str, int] = {
    # Session + functional cookies: at most 12 months.
    "necessary": 365,
    # At most 26 months (Google Analytics default).
    "statistics": 790,
    # At most 90 days for retargeting.
    "marketing": 90,
    # At most 12 months.
    "functional": 365,
}
class BannerConsentService:
    """Business logic for public SDK banner consent endpoints.

    Every public method takes the tenant ID as a string and parses it with
    ``uuid.UUID``; a malformed tenant ID therefore raises ``ValueError``.
    Methods that write (``record_consent``, ``withdraw_consent``) commit the
    session themselves.
    """

    # Consent lifetime (days) used when nothing more specific is configured.
    _DEFAULT_RETENTION_DAYS = 365

    def __init__(self, db: "Session") -> None:
        """Bind the service to the SQLAlchemy session used for all queries."""
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _hash_ip(ip: Optional[str]) -> Optional[str]:
        """Return the first 16 hex chars of SHA-256(ip), or None for a missing IP."""
        if not ip:
            return None
        # NOTE(review): the digest is unsalted, so it is only a weak
        # pseudonym for an IP address. Adding a salt would change every
        # digest produced from now on — confirm whether stored hashes are
        # ever compared before changing this.
        return hashlib.sha256(ip.encode()).hexdigest()[:16]

    @staticmethod
    def _max_vendor_retention(retention_values: list[Optional[int]]) -> Optional[int]:
        """Return the largest truthy retention value, or None if there is none.

        ``None`` and ``0`` entries are ignored. Returning ``None`` instead of
        calling ``max()`` on an empty sequence is what lets the caller fall
        back to category defaults rather than crash with ``ValueError``.
        """
        return max((days for days in retention_values if days), default=None)

    def _find_site_config(
        self, tenant_id: uuid.UUID, site_id: str,
    ) -> Optional["BannerSiteConfigDB"]:
        """Return the site config row for (tenant, site), or None if absent."""
        return (
            self.db.query(BannerSiteConfigDB)
            .filter(
                BannerSiteConfigDB.tenant_id == tenant_id,
                BannerSiteConfigDB.site_id == site_id,
            )
            .first()
        )

    def _device_consent_query(
        self, tenant_id: uuid.UUID, site_id: str, device_fingerprint: str,
    ) -> Any:
        """Return the query selecting consent rows for (tenant, site, device)."""
        return self.db.query(BannerConsentDB).filter(
            BannerConsentDB.tenant_id == tenant_id,
            BannerConsentDB.site_id == site_id,
            BannerConsentDB.device_fingerprint == device_fingerprint,
        )

    def _log(
        self,
        tenant_id: uuid.UUID,
        consent_id: Any,
        action: str,
        site_id: str,
        device_fingerprint: Optional[str] = None,
        categories: Optional[list[str]] = None,
        ip_hash: Optional[str] = None,
        banner_config_hash: Optional[str] = None,
        consent_version: Optional[int] = None,
        vendor_consents: Optional[dict[str, bool]] = None,
        user_agent: Optional[str] = None,
        *,
        consent_method: Optional[str] = None,
        page_url: Optional[str] = None,
    ) -> None:
        """Add an audit-log row to the session.

        The row is only added, not flushed or committed; the caller is
        responsible for committing. Missing ``categories`` /
        ``vendor_consents`` are stored as an empty list / dict.
        """
        entry = BannerConsentAuditLogDB(
            tenant_id=tenant_id,
            consent_id=consent_id,
            action=action,
            site_id=site_id,
            device_fingerprint=device_fingerprint,
            categories=categories or [],
            vendor_consents=vendor_consents or {},
            ip_hash=ip_hash,
            user_agent=user_agent,
            banner_config_hash=banner_config_hash,
            consent_version=consent_version,
            consent_method=consent_method,
            page_url=page_url,
        )
        self.db.add(entry)

    def _compute_config_hash(
        self, tenant_id: uuid.UUID, site_id: str,
    ) -> tuple[Optional[str], Optional[int]]:
        """Compute a hash of the current site config for consent proof (Art. 7(1) GDPR).

        Returns ``(hash, config_version)`` where ``hash`` is the first 32 hex
        chars of the SHA-256 over the banner title, description, privacy URL
        and imprint URL. Returns ``(None, None)`` if the site has no config.
        """
        config = self._find_site_config(tenant_id, site_id)
        if not config:
            return None, None
        # sort_keys keeps the serialisation — and therefore the hash — stable.
        snapshot = json.dumps(
            {
                "banner_title": config.banner_title,
                "banner_description": config.banner_description,
                "privacy_url": config.privacy_url,
                "imprint_url": config.imprint_url,
            },
            sort_keys=True,
        )
        return hashlib.sha256(snapshot.encode()).hexdigest()[:32], config.config_version

    def _get_max_retention(
        self, tenant_id: uuid.UUID, site_id: str, categories: list[str],
    ) -> int:
        """Determine the consent lifetime in days for the accepted categories.

        Resolution order:
        1. no site config -> 365 days;
        2. the largest ``retention_days`` among active vendors of the
           accepted categories, if any vendor has one set;
        3. otherwise the largest ``CATEGORY_RETENTION_DAYS`` entry for the
           accepted categories (365 for unknown categories or an empty list).
        """
        config = self._find_site_config(tenant_id, site_id)
        if not config:
            return self._DEFAULT_RETENTION_DAYS
        vendors = (
            self.db.query(BannerVendorConfigDB)
            .filter(
                BannerVendorConfigDB.site_config_id == config.id,
                BannerVendorConfigDB.category_key.in_(categories),
                BannerVendorConfigDB.is_active,
            )
            .all()
        )
        # Vendors may exist without any retention configured; in that case
        # fall through to the category defaults instead of raising.
        vendor_max = self._max_vendor_retention([v.retention_days for v in vendors])
        if vendor_max is not None:
            return vendor_max
        return max(
            (
                CATEGORY_RETENTION_DAYS.get(c, self._DEFAULT_RETENTION_DAYS)
                for c in categories
            ),
            default=self._DEFAULT_RETENTION_DAYS,
        )

    def _maybe_generate_tc_string(
        self, tenant_id: uuid.UUID, site_id: str, categories: list[str],
    ) -> Optional[str]:
        """Generate a TC String if TCF is enabled for this site.

        Returns None when the site has no config, TCF is disabled, or the
        encoder cannot be imported or fails. Generation is best-effort: a
        failure is logged and must not prevent the consent from being stored.
        """
        config = self._find_site_config(tenant_id, site_id)
        if not config or not config.tcf_enabled:
            return None
        try:
            # Imported lazily so a missing/broken encoder only disables TCF.
            from compliance.services.tcf_encoder_service import TCFEncoderService

            return TCFEncoderService().encode_from_categories(categories)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "TC string generation failed for site %s", site_id, exc_info=True,
            )
            return None

    # ------------------------------------------------------------------
    # Consent CRUD (public SDK)
    # ------------------------------------------------------------------

    def record_consent(
        self,
        tenant_id: str,
        site_id: str,
        device_fingerprint: str,
        categories: list[str],
        vendors: list[str],
        ip_address: Optional[str],
        user_agent: Optional[str],
        consent_string: Optional[str],
        vendor_consents: Optional[dict[str, bool]] = None,
        *,
        consent_method: Optional[str] = None,
        page_url: Optional[str] = None,
        referrer: Optional[str] = None,
        device_type: Optional[str] = None,
        browser: Optional[str] = None,
        os: Optional[str] = None,
        screen_resolution: Optional[str] = None,
        session_id: Optional[str] = None,
        consent_scope: Optional[str] = None,
        scripts_blocked: Optional[list[dict]] = None,
        scripts_released: Optional[list[dict]] = None,
        cookies_set: Optional[list[dict]] = None,
    ) -> dict[str, Any]:
        """Upsert a device consent row for (tenant, site, device_fingerprint).

        Expiration is derived from the maximum vendor retention for the
        accepted categories (Phase 2 — GDPR Art. 5(1)(e)). A hash of the
        banner config is stored on the row and in the audit log for consent
        proof (Phase 6 — GDPR Art. 7(1)).

        An existing row is overwritten and audited as ``consent_updated``;
        otherwise a new row is inserted and audited as ``consent_given``.
        The consent row and its audit entry are committed together.

        Returns:
            The stored consent as produced by ``consent_to_dict``.

        Raises:
            ValueError: if ``tenant_id`` is not a valid UUID string.
        """
        tid = uuid.UUID(tenant_id)
        ip_hash = self._hash_ip(ip_address)
        now = datetime.now(timezone.utc)
        retention = self._get_max_retention(tid, site_id, categories)
        expires_at = now + timedelta(days=retention)
        config_hash, config_ver = self._compute_config_hash(tid, site_id)

        # Auto-generate a TC String if the client sent none and TCF is enabled.
        if not consent_string:
            consent_string = self._maybe_generate_tc_string(tid, site_id, categories)

        # Vendor-agnostic additional fields.
        extra = {
            "consent_method": consent_method,
            "banner_version": config_ver,
            "banner_config_hash": config_hash,
            "page_url": page_url,
            "referrer": referrer,
            "device_type": device_type,
            "browser": browser,
            "os": os,
            "screen_resolution": screen_resolution,
            "session_id": session_id,
            "consent_scope": consent_scope or "domain",
            "scripts_blocked": scripts_blocked or [],
            "scripts_released": scripts_released or [],
            "cookies_set": cookies_set or [],
        }

        row = self._device_consent_query(tid, site_id, device_fingerprint).first()
        if row:
            action = "consent_updated"
            row.categories = categories
            row.vendors = vendors
            row.vendor_consents = vendor_consents or {}
            row.ip_hash = ip_hash
            row.user_agent = user_agent
            row.consent_string = consent_string
            row.expires_at = expires_at
            row.updated_at = now
            for key, val in extra.items():
                setattr(row, key, val)
        else:
            action = "consent_given"
            row = BannerConsentDB(
                tenant_id=tid,
                site_id=site_id,
                device_fingerprint=device_fingerprint,
                categories=categories,
                vendors=vendors,
                vendor_consents=vendor_consents or {},
                ip_hash=ip_hash,
                user_agent=user_agent,
                consent_string=consent_string,
                expires_at=expires_at,
                **extra,
            )
            self.db.add(row)

        # Flush first so a freshly inserted row has its ID for the audit entry.
        self.db.flush()
        self._log(
            tid, row.id, action, site_id, device_fingerprint,
            categories, ip_hash, config_hash, config_ver,
            vendor_consents=vendor_consents, user_agent=user_agent,
            consent_method=consent_method, page_url=page_url,
        )
        self.db.commit()
        self.db.refresh(row)
        return consent_to_dict(row)

    def get_consent(
        self, tenant_id: str, site_id: str, device_fingerprint: str
    ) -> dict[str, Any]:
        """Return the consent envelope for a device.

        Returns ``{"has_consent": True, "consent": {...}}`` when a row exists
        and ``{"has_consent": False, "consent": None}`` otherwise.
        """
        tid = uuid.UUID(tenant_id)
        consent = self._device_consent_query(tid, site_id, device_fingerprint).first()
        if not consent:
            return {"has_consent": False, "consent": None}
        return {"has_consent": True, "consent": consent_to_dict(consent)}

    def withdraw_consent(self, tenant_id: str, consent_id: str) -> dict[str, Any]:
        """Delete a consent row and audit the withdrawal.

        Raises:
            ValidationError: if ``consent_id`` is not a valid UUID string.
            NotFoundError: if no such consent exists for this tenant.
            ValueError: if ``tenant_id`` is not a valid UUID string.
        """
        tid = uuid.UUID(tenant_id)
        try:
            cid = uuid.UUID(consent_id)
        except ValueError as exc:
            raise ValidationError("Invalid consent ID") from exc
        # Filtering on tenant_id as well keeps one tenant from withdrawing
        # another tenant's consent by guessing its ID.
        consent = (
            self.db.query(BannerConsentDB)
            .filter(BannerConsentDB.id == cid, BannerConsentDB.tenant_id == tid)
            .first()
        )
        if not consent:
            raise NotFoundError("Consent not found")
        self._log(
            tid, cid, "consent_withdrawn", consent.site_id, consent.device_fingerprint,
        )
        self.db.delete(consent)
        self.db.commit()
        return {"success": True, "message": "Consent withdrawn"}

    # ------------------------------------------------------------------
    # Site config retrieval (SDK embed)
    # ------------------------------------------------------------------

    def get_site_config(self, tenant_id: str, site_id: str) -> dict[str, Any]:
        """Load site config + active categories + active vendors for banner display.

        If the site has no stored config, a built-in default banner (German
        title and description, no categories or vendors) is returned instead.
        """
        tid = uuid.UUID(tenant_id)
        config = self._find_site_config(tid, site_id)
        if not config:
            return {
                "site_id": site_id,
                "banner_title": "Cookie-Einstellungen",
                "banner_description": (
                    "Wir verwenden Cookies, um Ihnen die bestmoegliche Erfahrung zu bieten."
                ),
                "categories": [],
                "vendors": [],
            }
        categories = (
            self.db.query(BannerCategoryConfigDB)
            .filter(
                BannerCategoryConfigDB.site_config_id == config.id,
                BannerCategoryConfigDB.is_active,
            )
            .order_by(BannerCategoryConfigDB.sort_order)
            .all()
        )
        vendors = (
            self.db.query(BannerVendorConfigDB)
            .filter(
                BannerVendorConfigDB.site_config_id == config.id,
                BannerVendorConfigDB.is_active,
            )
            .all()
        )
        result = site_config_to_dict(config)
        result["categories"] = [category_to_dict(c) for c in categories]
        result["vendors"] = [vendor_to_dict(v) for v in vendors]
        return result

    # ------------------------------------------------------------------
    # GDPR export + stats
    # ------------------------------------------------------------------

    def export_consent(
        self, tenant_id: str, site_id: str, device_fingerprint: str
    ) -> dict[str, Any]:
        """GDPR export of all consent + audit rows for a device.

        The audit trail is ordered newest first and contains the entry ID,
        action, categories and ISO-formatted creation time of each entry.
        """
        tid = uuid.UUID(tenant_id)
        consents = self._device_consent_query(tid, site_id, device_fingerprint).all()
        audit = (
            self.db.query(BannerConsentAuditLogDB)
            .filter(
                BannerConsentAuditLogDB.tenant_id == tid,
                BannerConsentAuditLogDB.site_id == site_id,
                BannerConsentAuditLogDB.device_fingerprint == device_fingerprint,
            )
            .order_by(BannerConsentAuditLogDB.created_at.desc())
            .all()
        )
        return {
            "device_fingerprint": device_fingerprint,
            "site_id": site_id,
            "consents": [consent_to_dict(c) for c in consents],
            "audit_trail": [
                {
                    "id": str(a.id),
                    "action": a.action,
                    "categories": a.categories or [],
                    "created_at": a.created_at.isoformat() if a.created_at else None,
                }
                for a in audit
            ],
        }

    def list_consents(
        self, tenant_id: str, site_id: Optional[str] = None,
        limit: int = 50, offset: int = 0,
    ) -> dict[str, Any]:
        """List paginated banner consents for the admin dashboard.

        Rows are ordered newest first. ``total`` is the number of matching
        rows before pagination; ``site_id`` narrows the result when given.
        """
        tid = uuid.UUID(tenant_id)
        base = self.db.query(BannerConsentDB).filter(BannerConsentDB.tenant_id == tid)
        if site_id:
            base = base.filter(BannerConsentDB.site_id == site_id)
        total = base.count()
        rows = (
            base.order_by(BannerConsentDB.created_at.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )
        return {
            "consents": [consent_to_dict(c) for c in rows],
            "total": total,
            "limit": limit,
            "offset": offset,
        }

    def get_site_stats(self, tenant_id: str, site_id: str) -> dict[str, Any]:
        """Compute consent count + per-category acceptance rates for a site.

        ``rate`` is the percentage of the site's consent rows that include
        the category, rounded to one decimal place.
        """
        tid = uuid.UUID(tenant_id)
        # Fetch once and derive the total from the same rows, so the counts
        # and the denominator always describe the same snapshot.
        rows = (
            self.db.query(BannerConsentDB)
            .filter(
                BannerConsentDB.tenant_id == tid,
                BannerConsentDB.site_id == site_id,
            )
            .all()
        )
        total = len(rows)
        category_stats: dict[str, int] = {}
        for row in rows:
            for cat in list(row.categories or []):
                category_stats[cat] = category_stats.get(cat, 0) + 1
        return {
            "site_id": site_id,
            "total_consents": total,
            "category_acceptance": {
                cat: {
                    "count": count,
                    "rate": round(count / total * 100, 1) if total > 0 else 0,
                }
                for cat, count in category_stats.items()
            },
        }