Files
breakpilot-compliance/backend-compliance/compliance/services/banner_consent_service.py
T
Benjamin Admin 289ec5f396
Build + Deploy / build-admin-compliance (push) Successful in 2m28s
Build + Deploy / build-backend-compliance (push) Successful in 3m48s
Build + Deploy / build-ai-sdk (push) Failing after 45s
Build + Deploy / build-developer-portal (push) Successful in 1m28s
Build + Deploy / build-tts (push) Successful in 1m48s
Build + Deploy / build-document-crawler (push) Successful in 48s
Build + Deploy / build-dsms-gateway (push) Successful in 34s
Build + Deploy / build-dsms-node (push) Successful in 20s
CI / branch-name (push) Has been skipped
Build + Deploy / trigger-orca (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 24s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m1s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 49s
CI / test-python-backend (push) Successful in 45s
CI / test-python-document-crawler (push) Successful in 31s
CI / test-python-dsms-gateway (push) Successful in 27s
CI / validate-canonical-controls (push) Successful in 18s
feat(cmp): vendor-agnostic consent data model — 13 new fields
Extend banner consent records with consent_method, banner_version,
banner_config_hash, geo, page_url, referrer, device info, session_id
and consent_scope for full Art. 7 DSGVO proof with any tracking vendor.

Migration 107, backward-compatible (all fields nullable).
Admin detail modal shows tracking context, device info and technical data.
Fix pre-existing str|None → Optional[str] for Python 3.9 compat.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-10 23:12:20 +02:00

462 lines
17 KiB
Python

# mypy: disable-error-code="arg-type,assignment"
# SQLAlchemy 1.x Column() descriptors are Column[T] statically, T at runtime.
"""
Banner consent service — SDK-facing endpoints.
Phase 1 Step 4: extracted from ``compliance.api.banner_routes``.
Covers public device consent CRUD, site config retrieval (for banner
display), export, and per-site consent statistics.
Admin-side site/category/vendor management lives in
``compliance.services.banner_admin_service.BannerAdminService``.
DSR-facing email linking lives in
``compliance.services.banner_dsr_service.BannerDSRService``.
"""
import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy.orm import Session
from compliance.db.banner_models import (
BannerCategoryConfigDB,
BannerConsentAuditLogDB,
BannerConsentDB,
BannerSiteConfigDB,
BannerVendorConfigDB,
)
from compliance.domain import NotFoundError, ValidationError
from compliance.services._banner_serializers import (
category_to_dict,
consent_to_dict,
site_config_to_dict,
vendor_to_dict,
)
# Default consent expiration per banner category (days).
# Based on: DSGVO Art. 5(1)(e), CNIL guidelines, EDPB recommendations.
# Read by BannerConsentService._get_max_retention as the fallback when no
# active vendor matches the accepted categories; a category key missing from
# this mapping is treated there as 365 days.
CATEGORY_RETENTION_DAYS: dict[str, int] = {
    "necessary": 365,  # Session + functional = max 12 months
    "statistics": 790,  # Max 26 months (Google Analytics default)
    "marketing": 90,  # Max 90 days for retargeting
    "functional": 365,  # Max 12 months
}
class BannerConsentService:
    """Business logic for public SDK banner consent endpoints.

    Every method works through the SQLAlchemy session passed to the
    constructor; the write methods (record/withdraw) commit on that session
    themselves.
    """

    def __init__(self, db: Session) -> None:
        """Keep a reference to the session used for all queries and commits."""
        self.db = db
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _hash_ip(ip: Optional[str]) -> Optional[str]:
if not ip:
return None
return hashlib.sha256(ip.encode()).hexdigest()[:16]
def _log(
    self,
    tenant_id: uuid.UUID,
    consent_id: Any,
    action: str,
    site_id: str,
    device_fingerprint: Optional[str] = None,
    categories: Optional[list[str]] = None,
    ip_hash: Optional[str] = None,
    banner_config_hash: Optional[str] = None,
    consent_version: Optional[int] = None,
    *,
    consent_method: Optional[str] = None,
    page_url: Optional[str] = None,
) -> None:
    """Stage one consent audit-log row on the session.

    The row is only added to ``self.db``; this method neither flushes nor
    commits, so the caller decides when the entry is persisted.

    Args:
        tenant_id: Tenant the event belongs to.
        consent_id: Primary key of the consent row the event refers to.
        action: Event name, e.g. ``"consent_given"``.
        site_id: Site the consent was recorded for.
        device_fingerprint: Device identifier, if known.
        categories: Accepted categories; a missing or empty value is
            stored as an empty list.
        ip_hash: Hashed client IP, if any.
        banner_config_hash: Hash of the banner config shown to the user.
        consent_version: Config version the hash was computed for.
        consent_method: How the consent was given (keyword-only).
        page_url: Page on which the consent was given (keyword-only).
    """
    audit_fields = {
        "tenant_id": tenant_id,
        "consent_id": consent_id,
        "action": action,
        "site_id": site_id,
        "device_fingerprint": device_fingerprint,
        # Never store None here: normalise a missing list to [].
        "categories": categories if categories else [],
        "ip_hash": ip_hash,
        "banner_config_hash": banner_config_hash,
        "consent_version": consent_version,
        "consent_method": consent_method,
        "page_url": page_url,
    }
    self.db.add(BannerConsentAuditLogDB(**audit_fields))
def _compute_config_hash(self, tenant_id: uuid.UUID, site_id: str) -> tuple[Optional[str], Optional[int]]:
    """Fingerprint the current banner texts for consent proof (Art. 7(1) DSGVO).

    Looks up the site config for ``(tenant_id, site_id)`` and hashes a
    key-sorted JSON snapshot of its title, description, privacy URL and
    imprint URL. Only these four fields enter the hash.

    Returns:
        ``(hash, config_version)`` where ``hash`` is the first 32 hex
        characters of the SHA-256 digest, or ``(None, None)`` when the
        site has no config row.
    """
    site_query = self.db.query(BannerSiteConfigDB).filter(
        BannerSiteConfigDB.tenant_id == tenant_id,
        BannerSiteConfigDB.site_id == site_id,
    )
    config = site_query.first()
    if not config:
        return None, None

    hashed_fields = {
        "banner_title": config.banner_title,
        "banner_description": config.banner_description,
        "privacy_url": config.privacy_url,
        "imprint_url": config.imprint_url,
    }
    # sort_keys keeps the serialisation (and therefore the hash) stable.
    canonical = json.dumps(hashed_fields, sort_keys=True)
    digest = hashlib.sha256(canonical.encode()).hexdigest()
    return digest[:32], config.config_version
def _get_max_retention(self, tenant_id: uuid.UUID, site_id: str, categories: list[str]) -> int:
    """Return the consent lifetime in days for the accepted categories.

    Resolution order:

    1. No site config for ``(tenant_id, site_id)``: 365.
    2. Otherwise the largest ``retention_days`` among the site's active
       vendors whose ``category_key`` is one of ``categories``, ignoring
       vendors without a retention value.
    3. Otherwise the largest ``CATEGORY_RETENTION_DAYS`` entry for the
       accepted categories (365 for unknown keys, and 365 when
       ``categories`` is empty).

    Args:
        tenant_id: Tenant owning the site.
        site_id: Site the consent is recorded for.
        categories: Category keys the user accepted.

    Returns:
        Number of days until the consent should expire.
    """
    config = (
        self.db.query(BannerSiteConfigDB)
        .filter(BannerSiteConfigDB.tenant_id == tenant_id, BannerSiteConfigDB.site_id == site_id)
        .first()
    )
    if not config:
        return 365
    vendors = (
        self.db.query(BannerVendorConfigDB)
        .filter(
            BannerVendorConfigDB.site_config_id == config.id,
            BannerVendorConfigDB.category_key.in_(categories),
            BannerVendorConfigDB.is_active,
        )
        .all()
    )
    # Matching vendors may all lack a retention value. max() over an empty
    # sequence raises ValueError, so collect the usable values first and
    # fall through to the category defaults when there are none.
    vendor_days = [v.retention_days for v in vendors if v.retention_days]
    if vendor_days:
        return max(vendor_days)
    return max((CATEGORY_RETENTION_DAYS.get(c, 365) for c in categories), default=365)
# ------------------------------------------------------------------
# Consent CRUD (public SDK)
# ------------------------------------------------------------------
def record_consent(
    self,
    tenant_id: str,
    site_id: str,
    device_fingerprint: str,
    categories: list[str],
    vendors: list[str],
    ip_address: Optional[str],
    user_agent: Optional[str],
    consent_string: Optional[str],
    *,
    consent_method: Optional[str] = None,
    page_url: Optional[str] = None,
    referrer: Optional[str] = None,
    device_type: Optional[str] = None,
    browser: Optional[str] = None,
    os: Optional[str] = None,
    screen_resolution: Optional[str] = None,
    session_id: Optional[str] = None,
    consent_scope: Optional[str] = None,
) -> dict[str, Any]:
    """Insert or update the consent row for (tenant, site, device_fingerprint).

    The client IP is stored only as a hash. The expiry is "now (UTC)" plus
    the retention returned by ``_get_max_retention`` for the accepted
    categories (DSGVO Art. 5(1)(e)). The current banner config hash and
    version are written both to the consent row and to the audit entry
    for consent proof (Art. 7(1) DSGVO).

    An existing row is overwritten in place and audited as
    ``consent_updated``; otherwise a new row is created and audited as
    ``consent_given``. The change is committed before returning.

    Args:
        tenant_id: Tenant UUID as a string.
        site_id: Site the banner is embedded on.
        device_fingerprint: Device identifier used as the upsert key.
        categories: Accepted category keys.
        vendors: Accepted vendor identifiers.
        ip_address: Raw client IP (hashed before storage), or ``None``.
        user_agent: Client user-agent string, or ``None``.
        consent_string: Opaque consent string supplied by the SDK.
        consent_method, page_url, referrer, device_type, browser, os,
        screen_resolution, session_id: Optional tracking context stored
            as given.
        consent_scope: Scope of the consent; stored as ``"domain"`` when
            missing or empty.

    Returns:
        The stored consent serialised by ``consent_to_dict``.

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """
    tid = uuid.UUID(tenant_id)
    ip_hash = self._hash_ip(ip_address)
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(days=self._get_max_retention(tid, site_id, categories))
    config_hash, config_ver = self._compute_config_hash(tid, site_id)

    # Columns written identically on insert and on update.
    core = {
        "categories": categories,
        "vendors": vendors,
        "ip_hash": ip_hash,
        "user_agent": user_agent,
        "consent_string": consent_string,
        "expires_at": expires_at,
    }
    # Vendor-agnostic supplementary fields.
    extra = {
        "consent_method": consent_method,
        "banner_version": config_ver,
        "banner_config_hash": config_hash,
        "page_url": page_url,
        "referrer": referrer,
        "device_type": device_type,
        "browser": browser,
        "os": os,
        "screen_resolution": screen_resolution,
        "session_id": session_id,
        "consent_scope": consent_scope or "domain",
    }

    record = (
        self.db.query(BannerConsentDB)
        .filter(
            BannerConsentDB.tenant_id == tid,
            BannerConsentDB.site_id == site_id,
            BannerConsentDB.device_fingerprint == device_fingerprint,
        )
        .first()
    )
    if record:
        for column, value in core.items():
            setattr(record, column, value)
        record.updated_at = now
        for column, value in extra.items():
            setattr(record, column, value)
        action = "consent_updated"
    else:
        record = BannerConsentDB(
            tenant_id=tid,
            site_id=site_id,
            device_fingerprint=device_fingerprint,
            **core,
            **extra,
        )
        self.db.add(record)
        action = "consent_given"

    # Flush first so a freshly inserted row has its id for the audit entry.
    self.db.flush()
    self._log(
        tid, record.id, action, site_id, device_fingerprint,
        categories, ip_hash, config_hash, config_ver,
        consent_method=consent_method, page_url=page_url,
    )
    self.db.commit()
    self.db.refresh(record)
    return consent_to_dict(record)
def get_consent(
    self, tenant_id: str, site_id: str, device_fingerprint: str
) -> dict[str, Any]:
    """Look up the stored consent for one device on one site.

    Returns:
        ``{"has_consent": True, "consent": <serialised row>}`` when a row
        exists for (tenant, site, device_fingerprint), otherwise
        ``{"has_consent": False, "consent": None}``. The lookup does not
        filter on ``expires_at``.

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """
    tid = uuid.UUID(tenant_id)
    device_match = (
        BannerConsentDB.tenant_id == tid,
        BannerConsentDB.site_id == site_id,
        BannerConsentDB.device_fingerprint == device_fingerprint,
    )
    row = self.db.query(BannerConsentDB).filter(*device_match).first()
    if row:
        return {"has_consent": True, "consent": consent_to_dict(row)}
    return {"has_consent": False, "consent": None}
def withdraw_consent(self, tenant_id: str, consent_id: str) -> dict[str, Any]:
    """Delete a tenant's consent row and record the withdrawal in the audit log.

    Args:
        tenant_id: Tenant UUID as a string.
        consent_id: Consent row UUID as a string.

    Returns:
        ``{"success": True, "message": "Consent withdrawn"}``.

    Raises:
        ValidationError: If ``consent_id`` is not a valid UUID string.
        NotFoundError: If no consent with that id exists for the tenant.
        ValueError: If ``tenant_id`` is not a valid UUID string (it is
            parsed outside the guarded block).
    """
    tid = uuid.UUID(tenant_id)
    try:
        cid = uuid.UUID(consent_id)
    except ValueError as exc:
        raise ValidationError("Invalid consent ID") from exc

    lookup = self.db.query(BannerConsentDB).filter(
        BannerConsentDB.id == cid, BannerConsentDB.tenant_id == tid
    )
    consent = lookup.first()
    if not consent:
        raise NotFoundError("Consent not found")

    # Stage the audit entry before the delete; both are committed together.
    self._log(
        tenant_id=tid,
        consent_id=cid,
        action="consent_withdrawn",
        site_id=consent.site_id,
        device_fingerprint=consent.device_fingerprint,
    )
    self.db.delete(consent)
    self.db.commit()
    return {"success": True, "message": "Consent withdrawn"}
# ------------------------------------------------------------------
# Site config retrieval (SDK embed)
# ------------------------------------------------------------------
def get_site_config(self, tenant_id: str, site_id: str) -> dict[str, Any]:
    """Assemble the banner payload (config, categories, vendors) for a site.

    When the site has no config row, a built-in default is returned with a
    German title/description and empty ``categories`` / ``vendors`` lists.
    Otherwise the serialised config is extended with the site's active
    categories (ordered by ``sort_order``) and its active vendors.

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """
    tid = uuid.UUID(tenant_id)
    config = (
        self.db.query(BannerSiteConfigDB)
        .filter(
            BannerSiteConfigDB.tenant_id == tid,
            BannerSiteConfigDB.site_id == site_id,
        )
        .first()
    )
    if not config:
        # Unconfigured site: hand the SDK a minimal default banner.
        default_description = (
            "Wir verwenden Cookies, um Ihnen die bestmoegliche Erfahrung zu bieten."
        )
        return {
            "site_id": site_id,
            "banner_title": "Cookie-Einstellungen",
            "banner_description": default_description,
            "categories": [],
            "vendors": [],
        }

    category_query = self.db.query(BannerCategoryConfigDB).filter(
        BannerCategoryConfigDB.site_config_id == config.id,
        BannerCategoryConfigDB.is_active,
    )
    active_categories = category_query.order_by(BannerCategoryConfigDB.sort_order).all()

    vendor_query = self.db.query(BannerVendorConfigDB).filter(
        BannerVendorConfigDB.site_config_id == config.id,
        BannerVendorConfigDB.is_active,
    )
    active_vendors = vendor_query.all()

    payload = site_config_to_dict(config)
    payload["categories"] = list(map(category_to_dict, active_categories))
    payload["vendors"] = list(map(vendor_to_dict, active_vendors))
    return payload
# ------------------------------------------------------------------
# DSGVO export + stats
# ------------------------------------------------------------------
def export_consent(
    self, tenant_id: str, site_id: str, device_fingerprint: str
) -> dict[str, Any]:
    """Build the DSGVO export for one device on one site.

    Returns:
        A dict with the requested ``device_fingerprint`` and ``site_id``,
        every matching consent row serialised under ``consents``, and the
        matching audit entries (newest first) under ``audit_trail``. Each
        audit entry carries ``id``, ``action``, ``categories`` and an ISO
        ``created_at`` (``None`` when the timestamp is missing).

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """

    def _audit_entry(entry: Any) -> dict[str, Any]:
        # Serialise a single audit row into the export shape.
        stamp = entry.created_at
        return {
            "id": str(entry.id),
            "action": entry.action,
            "categories": entry.categories or [],
            "created_at": stamp.isoformat() if stamp else None,
        }

    tid = uuid.UUID(tenant_id)

    consent_rows = self.db.query(BannerConsentDB).filter(
        BannerConsentDB.tenant_id == tid,
        BannerConsentDB.site_id == site_id,
        BannerConsentDB.device_fingerprint == device_fingerprint,
    ).all()

    audit_query = self.db.query(BannerConsentAuditLogDB).filter(
        BannerConsentAuditLogDB.tenant_id == tid,
        BannerConsentAuditLogDB.site_id == site_id,
        BannerConsentAuditLogDB.device_fingerprint == device_fingerprint,
    )
    audit_rows = audit_query.order_by(BannerConsentAuditLogDB.created_at.desc()).all()

    return {
        "device_fingerprint": device_fingerprint,
        "site_id": site_id,
        "consents": list(map(consent_to_dict, consent_rows)),
        "audit_trail": list(map(_audit_entry, audit_rows)),
    }
def get_site_stats(self, tenant_id: str, site_id: str) -> dict[str, Any]:
    """Compute the consent count and per-category acceptance rates for a site.

    All consent rows of the site are loaded once; the total and the
    per-category counters are both derived from that single result set so
    they always describe the same snapshot.

    Returns:
        ``{"site_id", "total_consents", "category_acceptance"}`` where
        ``category_acceptance`` maps each seen category to its ``count``
        and its ``rate`` in percent (one decimal place).

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """
    tid = uuid.UUID(tenant_id)
    rows = (
        self.db.query(BannerConsentDB)
        .filter(
            BannerConsentDB.tenant_id == tid,
            BannerConsentDB.site_id == site_id,
        )
        .all()
    )
    # Count the fetched rows rather than issuing a separate COUNT query: a
    # row written between two queries would otherwise skew the rates.
    total = len(rows)
    category_stats: dict[str, int] = {}
    for row in rows:
        raw = row.categories or []
        if isinstance(raw, str):
            # NOTE(review): categories presumably arrive as JSON text on
            # some backends or legacy rows; confirm against the model.
            try:
                raw = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                raw = []
        if not isinstance(raw, list):
            # Anything that is not a list contributes nothing.
            continue
        for cat in raw:
            category_stats[cat] = category_stats.get(cat, 0) + 1
    return {
        "site_id": site_id,
        "total_consents": total,
        "category_acceptance": {
            cat: {
                "count": count,
                "rate": round(count / total * 100, 1) if total > 0 else 0,
            }
            for cat, count in category_stats.items()
        },
    }
def list_consents(
    self, tenant_id: str, site_id: Optional[str] = None,
    limit: int = 50, offset: int = 0,
) -> dict[str, Any]:
    """Return one page of a tenant's banner consents, newest first.

    Args:
        tenant_id: Tenant UUID as a string.
        site_id: When given (and non-empty), restrict the listing to it.
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip.

    Returns:
        ``{"consents", "total", "limit", "offset"}``. ``total`` counts all
        rows matching the filter, not just the page. ``categories`` and
        ``vendors`` are always lists: JSON text is decoded, and anything
        that is not (or does not decode to) a list becomes ``[]``.

    Raises:
        ValueError: If ``tenant_id`` is not a valid UUID string.
    """

    def _as_list(value: Any) -> list:
        # Normalise a JSON column that may hold a list, JSON text or nothing.
        value = value or []
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except (ValueError, TypeError):
                value = []
        return list(value) if isinstance(value, list) else []

    def _iso(moment: Any) -> Optional[str]:
        return moment.isoformat() if moment else None

    # Columns copied to the output unchanged, in output order.
    passthrough = (
        "ip_hash", "user_agent", "linked_email", "consent_string",
        "consent_method", "banner_version", "banner_config_hash",
        "geo_country", "geo_region", "consent_scope", "page_url",
        "referrer", "device_type", "browser", "os",
        "screen_resolution", "session_id",
    )

    tid = uuid.UUID(tenant_id)
    query = self.db.query(BannerConsentDB).filter(BannerConsentDB.tenant_id == tid)
    if site_id:
        query = query.filter(BannerConsentDB.site_id == site_id)
    total = query.count()
    page = (
        query.order_by(BannerConsentDB.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    consents = []
    for row in page:
        parsed_categories = _as_list(row.categories)
        parsed_vendors = _as_list(row.vendors)
        item: dict[str, Any] = {
            "id": str(row.id),
            "site_id": row.site_id,
            "device_fingerprint": row.device_fingerprint,
            "categories": parsed_categories,
            "vendors": parsed_vendors,
        }
        for column in passthrough:
            item[column] = getattr(row, column)
        item["expires_at"] = _iso(row.expires_at)
        item["created_at"] = _iso(row.created_at)
        item["updated_at"] = _iso(row.updated_at)
        consents.append(item)
    return {"consents": consents, "total": total, "limit": limit, "offset": offset}