Files
breakpilot-compliance/backend-compliance/compliance/services/banner_analytics_service.py
T
Benjamin Admin c3fcfe88ee feat: Vendor-level consent + Consent analytics (F4 + F6)
F4: Granular Vendor-Level Consent
- Migration 113: vendor_consents JSONB on banner_consents + audit_log
- ConsentCreate schema + BannerConsentDB model extended
- banner_consent_service stores vendor_consents alongside categories
- Audit trail includes vendor-level decisions + user_agent

F6: Consent Rate Analytics
- Migration 114: user_agent on audit_log + time-series index
- BannerAnalyticsService: time series, category breakdown, device stats
- banner_analytics_routes: 4 endpoints (overview, time-series, categories, devices)
- AnalyticsDashboard.tsx: KPIs, bar chart, category bars, device breakdown
- New "Analytik" tab in cookie-banner page

[migration-approved]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 20:58:06 +02:00

136 lines
5.3 KiB
Python

"""
Banner consent analytics — time-series, device breakdown, bounce rate.
Reads from BannerConsentAuditLogDB for aggregated analytics.
"""
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
class BannerAnalyticsService:
"""Provides aggregated consent analytics for a site."""
def __init__(self, db: Session) -> None:
self.db = db
def get_time_series(
self,
tenant_id: str,
site_id: str,
period: str = "daily",
days: int = 30,
) -> list[dict[str, Any]]:
"""Opt-in rate per day/week over the last N days."""
trunc = "day" if period == "daily" else "week"
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
q = text(f"""
SELECT DATE_TRUNC(:trunc, created_at) AS period,
COUNT(*) FILTER (WHERE action = 'consent_given') AS given,
COUNT(*) FILTER (WHERE action = 'consent_updated') AS updated,
COUNT(*) FILTER (WHERE action IN ('consent_withdrawn', 'consent_revoked')) AS withdrawn,
COUNT(*) AS total
FROM compliance_banner_consent_audit_log
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
GROUP BY 1 ORDER BY 1
""")
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff, "trunc": trunc}).fetchall()
return [
{
"period": r.period.isoformat() if r.period else None,
"given": r.given,
"updated": r.updated,
"withdrawn": r.withdrawn,
"total": r.total,
"opt_in_rate": round((r.given + r.updated) / r.total * 100, 1) if r.total > 0 else 0,
}
for r in rows
]
def get_category_breakdown(
self,
tenant_id: str,
site_id: str,
days: int = 30,
) -> dict[str, dict[str, int]]:
"""Acceptance count per category."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
q = text("""
SELECT categories FROM compliance_banner_consent_audit_log
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
AND action IN ('consent_given', 'consent_updated')
""")
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchall()
counts: dict[str, int] = {}
total = len(rows)
for r in rows:
cats = r.categories if isinstance(r.categories, list) else []
for cat in cats:
counts[cat] = counts.get(cat, 0) + 1
return {
cat: {"count": count, "total": total, "rate": round(count / total * 100, 1) if total > 0 else 0}
for cat, count in sorted(counts.items())
}
def get_device_breakdown(
self,
tenant_id: str,
site_id: str,
days: int = 30,
) -> dict[str, int]:
"""Mobile/Desktop/Tablet classification from user_agent."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
q = text("""
SELECT user_agent FROM compliance_banner_consent_audit_log
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
AND user_agent IS NOT NULL
""")
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchall()
result = {"desktop": 0, "mobile": 0, "tablet": 0, "unknown": 0}
mobile_re = re.compile(r"Mobile|Android|iPhone|iPod", re.IGNORECASE)
tablet_re = re.compile(r"iPad|Tablet|PlayBook|Silk", re.IGNORECASE)
for r in rows:
ua = r.user_agent or ""
if tablet_re.search(ua):
result["tablet"] += 1
elif mobile_re.search(ua):
result["mobile"] += 1
elif ua:
result["desktop"] += 1
else:
result["unknown"] += 1
return result
def get_overview_stats(
self,
tenant_id: str,
site_id: str,
days: int = 30,
) -> dict[str, Any]:
"""High-level stats: total consents, active, withdrawn, opt-in rate."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
q = text("""
SELECT
COUNT(*) FILTER (WHERE action = 'consent_given') AS given,
COUNT(*) FILTER (WHERE action = 'consent_updated') AS updated,
COUNT(*) FILTER (WHERE action IN ('consent_withdrawn', 'consent_revoked')) AS withdrawn,
COUNT(*) AS total
FROM compliance_banner_consent_audit_log
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
""")
r = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchone()
total = r.total if r else 0
given = (r.given or 0) + (r.updated or 0) if r else 0
return {
"period_days": days,
"total_interactions": total,
"consents_given": r.given if r else 0,
"consents_updated": r.updated if r else 0,
"consents_withdrawn": r.withdrawn if r else 0,
"opt_in_rate": round(given / total * 100, 1) if total > 0 else 0,
}