44acd68c96
Phase 1: Vendor sync from service registry (82+ services → banner vendors) Phase 2: Category-based retention (marketing=90d, statistics=790d, not hardcoded 365d) Phase 3: DSR ↔ Banner email linking (link-email, by-email, Art.17 erasure, Art.15/20 export) Phase 4: Consent sync (Banner → Einwilligungen bridge) Phase 6: Consent proof (SHA256 config hash + config_version in audit log, Art. 7(1) DSGVO) New files: - banner_dsr_service.py — email linking + DSR integration - vendor_banner_sync.py — service registry → vendor configs - migration 106 — linked_email, banner_config_hash, consent_version columns Tests: 20+ new backend tests + 2 Playwright E2E test suites (API + UI) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
537 lines
20 KiB
Python
537 lines
20 KiB
Python
"""
|
|
Tests for Banner Consent routes — device-based cookie consents.
|
|
"""
|
|
|
|
import uuid
|
|
import os
|
|
import sys
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from classroom_engine.database import Base, get_db
|
|
from compliance.db.banner_models import (
|
|
BannerConsentDB, BannerConsentAuditLogDB,
|
|
BannerSiteConfigDB, BannerCategoryConfigDB, BannerVendorConfigDB,
|
|
)
|
|
from compliance.api.banner_routes import router as banner_router
|
|
|
|
SQLALCHEMY_DATABASE_URL = "sqlite:///./test_banner.db"
|
|
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
|
|
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
|
|
HEADERS = {"X-Tenant-ID": TENANT_ID}
|
|
|
|
app = FastAPI()
|
|
app.include_router(banner_router, prefix="/api/compliance")
|
|
|
|
|
|
def override_get_db():
|
|
db = TestingSessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
client = TestClient(app)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_db():
|
|
Base.metadata.create_all(bind=engine)
|
|
yield
|
|
Base.metadata.drop_all(bind=engine)
|
|
|
|
|
|
# =============================================================================
|
|
# Helpers
|
|
# =============================================================================
|
|
|
|
def _create_site(site_id="example.com"):
|
|
r = client.post("/api/compliance/banner/admin/sites", json={
|
|
"site_id": site_id,
|
|
"site_name": "Example",
|
|
"banner_title": "Cookies",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200, r.text
|
|
return r.json()
|
|
|
|
|
|
def _record_consent(site_id="example.com", fingerprint="fp-123", categories=None):
|
|
r = client.post("/api/compliance/banner/consent", json={
|
|
"site_id": site_id,
|
|
"device_fingerprint": fingerprint,
|
|
"categories": categories or ["necessary"],
|
|
"ip_address": "1.2.3.4",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200, r.text
|
|
return r.json()
|
|
|
|
|
|
# =============================================================================
|
|
# Public Consent Endpoints
|
|
# =============================================================================
|
|
|
|
class TestRecordConsent:
|
|
def test_record_consent(self):
|
|
c = _record_consent()
|
|
assert c["site_id"] == "example.com"
|
|
assert c["device_fingerprint"] == "fp-123"
|
|
assert c["categories"] == ["necessary"]
|
|
assert c["ip_hash"] is not None
|
|
assert c["expires_at"] is not None
|
|
|
|
def test_upsert_consent(self):
|
|
c1 = _record_consent(categories=["necessary"])
|
|
c2 = _record_consent(categories=["necessary", "analytics"])
|
|
assert c1["id"] == c2["id"]
|
|
assert c2["categories"] == ["necessary", "analytics"]
|
|
|
|
def test_different_devices(self):
|
|
c1 = _record_consent(fingerprint="device-A")
|
|
c2 = _record_consent(fingerprint="device-B")
|
|
assert c1["id"] != c2["id"]
|
|
|
|
|
|
class TestGetConsent:
|
|
def test_get_existing(self):
|
|
_record_consent()
|
|
r = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["has_consent"] is True
|
|
|
|
def test_get_nonexistent(self):
|
|
r = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=unknown", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["has_consent"] is False
|
|
|
|
|
|
class TestWithdrawConsent:
|
|
def test_withdraw(self):
|
|
c = _record_consent()
|
|
r = client.delete(f"/api/compliance/banner/consent/{c['id']}", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["success"] is True
|
|
|
|
# Verify gone
|
|
r2 = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS)
|
|
assert r2.json()["has_consent"] is False
|
|
|
|
def test_withdraw_not_found(self):
|
|
r = client.delete(f"/api/compliance/banner/consent/{uuid.uuid4()}", headers=HEADERS)
|
|
assert r.status_code == 404
|
|
|
|
|
|
class TestExportConsent:
|
|
def test_export(self):
|
|
_record_consent()
|
|
r = client.get(
|
|
"/api/compliance/banner/consent/export?site_id=example.com&device_fingerprint=fp-123",
|
|
headers=HEADERS,
|
|
)
|
|
assert r.status_code == 200
|
|
data = r.json()
|
|
assert len(data["consents"]) == 1
|
|
assert len(data["audit_trail"]) >= 1
|
|
assert data["audit_trail"][0]["action"] == "consent_given"
|
|
|
|
|
|
# =============================================================================
|
|
# Site Config Admin
|
|
# =============================================================================
|
|
|
|
class TestSiteConfig:
|
|
def test_create_site(self):
|
|
s = _create_site()
|
|
assert s["site_id"] == "example.com"
|
|
assert s["banner_title"] == "Cookies"
|
|
|
|
def test_create_duplicate(self):
|
|
_create_site()
|
|
r = client.post("/api/compliance/banner/admin/sites", json={
|
|
"site_id": "example.com",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 409
|
|
|
|
def test_list_sites(self):
|
|
_create_site("site-a.com")
|
|
_create_site("site-b.com")
|
|
r = client.get("/api/compliance/banner/admin/sites", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert len(r.json()) == 2
|
|
|
|
def test_update_site(self):
|
|
_create_site()
|
|
r = client.put("/api/compliance/banner/admin/sites/example.com", json={
|
|
"banner_title": "Neue Cookies",
|
|
"dsb_name": "Max DSB",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["banner_title"] == "Neue Cookies"
|
|
assert r.json()["dsb_name"] == "Max DSB"
|
|
|
|
def test_update_not_found(self):
|
|
r = client.put("/api/compliance/banner/admin/sites/nonexistent.com", json={
|
|
"banner_title": "X",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 404
|
|
|
|
def test_delete_site(self):
|
|
_create_site()
|
|
r = client.delete("/api/compliance/banner/admin/sites/example.com", headers=HEADERS)
|
|
assert r.status_code == 204
|
|
|
|
def test_get_config_default(self):
|
|
r = client.get("/api/compliance/banner/config/unknown-site", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["banner_title"] == "Cookie-Einstellungen"
|
|
assert r.json()["categories"] == []
|
|
|
|
def test_get_config_with_categories(self):
|
|
_create_site()
|
|
client.post("/api/compliance/banner/admin/sites/example.com/categories", json={
|
|
"category_key": "necessary",
|
|
"name_de": "Notwendig",
|
|
"is_required": True,
|
|
}, headers=HEADERS)
|
|
|
|
r = client.get("/api/compliance/banner/config/example.com", headers=HEADERS)
|
|
data = r.json()
|
|
assert len(data["categories"]) == 1
|
|
assert data["categories"][0]["category_key"] == "necessary"
|
|
|
|
|
|
# =============================================================================
|
|
# Categories Admin
|
|
# =============================================================================
|
|
|
|
class TestCategories:
|
|
def test_create_category(self):
|
|
_create_site()
|
|
r = client.post("/api/compliance/banner/admin/sites/example.com/categories", json={
|
|
"category_key": "analytics",
|
|
"name_de": "Analyse",
|
|
"name_en": "Analytics",
|
|
"sort_order": 20,
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["category_key"] == "analytics"
|
|
assert r.json()["name_de"] == "Analyse"
|
|
|
|
def test_list_categories(self):
|
|
_create_site()
|
|
client.post("/api/compliance/banner/admin/sites/example.com/categories", json={
|
|
"category_key": "marketing", "name_de": "Marketing", "sort_order": 30,
|
|
}, headers=HEADERS)
|
|
client.post("/api/compliance/banner/admin/sites/example.com/categories", json={
|
|
"category_key": "necessary", "name_de": "Notwendig", "sort_order": 0, "is_required": True,
|
|
}, headers=HEADERS)
|
|
|
|
r = client.get("/api/compliance/banner/admin/sites/example.com/categories", headers=HEADERS)
|
|
data = r.json()
|
|
assert len(data) == 2
|
|
assert data[0]["category_key"] == "necessary" # sorted by sort_order
|
|
|
|
def test_delete_category(self):
|
|
_create_site()
|
|
cr = client.post("/api/compliance/banner/admin/sites/example.com/categories", json={
|
|
"category_key": "temp", "name_de": "Temp",
|
|
}, headers=HEADERS)
|
|
cat_id = cr.json()["id"]
|
|
r = client.delete(f"/api/compliance/banner/admin/categories/{cat_id}")
|
|
assert r.status_code == 204
|
|
|
|
def test_site_not_found(self):
|
|
r = client.post("/api/compliance/banner/admin/sites/nonexistent/categories", json={
|
|
"category_key": "x", "name_de": "X",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Vendors Admin
|
|
# =============================================================================
|
|
|
|
class TestVendors:
|
|
def test_create_vendor(self):
|
|
_create_site()
|
|
r = client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={
|
|
"vendor_name": "Google Analytics",
|
|
"category_key": "analytics",
|
|
"cookie_names": ["_ga", "_gid"],
|
|
"retention_days": 730,
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["vendor_name"] == "Google Analytics"
|
|
assert r.json()["cookie_names"] == ["_ga", "_gid"]
|
|
|
|
def test_list_vendors(self):
|
|
_create_site()
|
|
client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={
|
|
"vendor_name": "GA", "category_key": "analytics",
|
|
}, headers=HEADERS)
|
|
r = client.get("/api/compliance/banner/admin/sites/example.com/vendors", headers=HEADERS)
|
|
assert len(r.json()) == 1
|
|
|
|
def test_delete_vendor(self):
|
|
_create_site()
|
|
cr = client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={
|
|
"vendor_name": "Temp", "category_key": "analytics",
|
|
}, headers=HEADERS)
|
|
vid = cr.json()["id"]
|
|
r = client.delete(f"/api/compliance/banner/admin/vendors/{vid}")
|
|
assert r.status_code == 204
|
|
|
|
|
|
# =============================================================================
|
|
# Stats
|
|
# =============================================================================
|
|
|
|
class TestStats:
|
|
def test_stats_empty(self):
|
|
r = client.get("/api/compliance/banner/admin/stats/example.com", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["total_consents"] == 0
|
|
|
|
def test_stats_with_data(self):
|
|
_record_consent(fingerprint="d1", categories=["necessary", "analytics"])
|
|
_record_consent(fingerprint="d2", categories=["necessary"])
|
|
_record_consent(fingerprint="d3", categories=["necessary", "analytics", "marketing"])
|
|
|
|
r = client.get("/api/compliance/banner/admin/stats/example.com", headers=HEADERS)
|
|
data = r.json()
|
|
assert data["total_consents"] == 3
|
|
assert data["category_acceptance"]["necessary"]["count"] == 3
|
|
assert data["category_acceptance"]["analytics"]["count"] == 2
|
|
assert data["category_acceptance"]["marketing"]["count"] == 1
|
|
|
|
|
|
# =============================================================================
|
|
# Phase 3: Email Linking
|
|
# =============================================================================
|
|
|
|
class TestEmailLinking:
|
|
def test_link_email(self):
|
|
_record_consent()
|
|
r = client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-123",
|
|
"email": "user@example.com",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["linked_email"] == "user@example.com"
|
|
|
|
def test_link_email_normalizes(self):
|
|
_record_consent()
|
|
r = client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-123",
|
|
"email": " User@Example.COM ",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["linked_email"] == "user@example.com"
|
|
|
|
def test_link_email_invalid(self):
|
|
_record_consent()
|
|
r = client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-123",
|
|
"email": "not-an-email",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 400
|
|
|
|
def test_link_email_no_consent(self):
|
|
r = client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "nonexistent",
|
|
"email": "user@example.com",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 404
|
|
|
|
def test_get_consents_by_email(self):
|
|
_record_consent(fingerprint="dev-a")
|
|
_record_consent(fingerprint="dev-b")
|
|
# Link both to same email
|
|
for fp in ["dev-a", "dev-b"]:
|
|
client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": fp,
|
|
"email": "multi@example.com",
|
|
}, headers=HEADERS)
|
|
|
|
r = client.get("/api/compliance/banner/consent/by-email/multi@example.com", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert len(r.json()) == 2
|
|
|
|
def test_get_consents_by_email_empty(self):
|
|
r = client.get("/api/compliance/banner/consent/by-email/nobody@nowhere.com", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json() == []
|
|
|
|
def test_delete_consents_by_email_art17(self):
|
|
_record_consent(fingerprint="del-a")
|
|
_record_consent(fingerprint="del-b")
|
|
for fp in ["del-a", "del-b"]:
|
|
client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": fp,
|
|
"email": "erasure@example.com",
|
|
}, headers=HEADERS)
|
|
|
|
r = client.delete("/api/compliance/banner/consent/by-email/erasure@example.com", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
data = r.json()
|
|
assert data["deleted"] == 2
|
|
assert data["email"] == "erasure@example.com"
|
|
|
|
# Verify gone
|
|
r2 = client.get("/api/compliance/banner/consent/by-email/erasure@example.com", headers=HEADERS)
|
|
assert r2.json() == []
|
|
|
|
def test_dsr_export_by_email(self):
|
|
_record_consent(fingerprint="exp-1")
|
|
client.post("/api/compliance/banner/consent/link-email", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "exp-1",
|
|
"email": "export@example.com",
|
|
}, headers=HEADERS)
|
|
|
|
r = client.get("/api/compliance/banner/consent/dsr-export/export@example.com", headers=HEADERS)
|
|
assert r.status_code == 200
|
|
data = r.json()
|
|
assert data["email"] == "export@example.com"
|
|
assert len(data["banner_consents"]) == 1
|
|
assert len(data["audit_trail"]) >= 1
|
|
|
|
|
|
# =============================================================================
|
|
# Phase 4: Consent Sync (Banner → Einwilligungen)
|
|
# =============================================================================
|
|
|
|
class TestConsentSync:
|
|
def test_sync_consent(self):
|
|
_record_consent(categories=["necessary", "analytics"])
|
|
r = client.post("/api/compliance/banner/consent/sync", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-123",
|
|
"email": "sync@example.com",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
data = r.json()
|
|
assert data["synced"] >= 1
|
|
assert "necessary" in data["categories"]
|
|
assert data["email"] == "sync@example.com"
|
|
|
|
def test_sync_consent_links_email(self):
|
|
_record_consent()
|
|
# Sync should auto-link email
|
|
client.post("/api/compliance/banner/consent/sync", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-123",
|
|
"email": "autolink@example.com",
|
|
}, headers=HEADERS)
|
|
|
|
r = client.get("/api/compliance/banner/consent/by-email/autolink@example.com", headers=HEADERS)
|
|
assert len(r.json()) == 1
|
|
|
|
def test_sync_no_consent(self):
|
|
r = client.post("/api/compliance/banner/consent/sync", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "nonexistent",
|
|
"email": "test@example.com",
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 404
|
|
|
|
|
|
# =============================================================================
|
|
# Phase 2: Retention per Category
|
|
# =============================================================================
|
|
|
|
class TestRetention:
|
|
def test_retention_uses_vendor_max(self):
|
|
"""Consent expiry should use max vendor retention, not hardcoded 365."""
|
|
_create_site()
|
|
# Add marketing vendor with 90 days retention
|
|
client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={
|
|
"vendor_name": "FB Pixel",
|
|
"category_key": "marketing",
|
|
"retention_days": 90,
|
|
}, headers=HEADERS)
|
|
|
|
c = _record_consent(categories=["necessary", "marketing"])
|
|
# Consent should expire in 90 days (max of vendor retentions)
|
|
from datetime import datetime
|
|
created = datetime.fromisoformat(c["created_at"])
|
|
expires = datetime.fromisoformat(c["expires_at"])
|
|
diff_days = (expires - created).days
|
|
assert 89 <= diff_days <= 91, f"Expected ~90 days, got {diff_days}"
|
|
|
|
def test_retention_default_365(self):
|
|
"""Without vendor config, should use category default."""
|
|
c = _record_consent(categories=["necessary"])
|
|
from datetime import datetime
|
|
created = datetime.fromisoformat(c["created_at"])
|
|
expires = datetime.fromisoformat(c["expires_at"])
|
|
diff_days = (expires - created).days
|
|
assert 364 <= diff_days <= 366, f"Expected ~365 days, got {diff_days}"
|
|
|
|
|
|
# =============================================================================
|
|
# Phase 6: Consent Proof (Art. 7(1) DSGVO)
|
|
# =============================================================================
|
|
|
|
class TestConsentProof:
|
|
def test_consent_has_linked_email_field(self):
|
|
"""New consent should include linked_email in response."""
|
|
c = _record_consent()
|
|
assert "linked_email" in c
|
|
assert c["linked_email"] is None # Not linked yet
|
|
|
|
def test_site_config_has_version(self):
|
|
"""Site config should have config_version field."""
|
|
s = _create_site()
|
|
assert "config_version" in s
|
|
assert s["config_version"] == 1
|
|
|
|
def test_audit_trail_after_consent(self):
|
|
"""Audit trail should exist after recording consent."""
|
|
_record_consent()
|
|
r = client.get(
|
|
"/api/compliance/banner/consent/export?site_id=example.com&device_fingerprint=fp-123",
|
|
headers=HEADERS,
|
|
)
|
|
data = r.json()
|
|
assert len(data["audit_trail"]) >= 1
|
|
audit = data["audit_trail"][0]
|
|
assert audit["action"] in ("consent_given", "consent_updated")
|
|
|
|
|
|
# =============================================================================
|
|
# IP Hashing
|
|
# =============================================================================
|
|
|
|
class TestIPHashing:
|
|
def test_ip_is_hashed(self):
|
|
"""IP address should never be stored plain — only SHA256[:16]."""
|
|
c = _record_consent()
|
|
assert c["ip_hash"] is not None
|
|
assert c["ip_hash"] != "1.2.3.4"
|
|
assert len(c["ip_hash"]) == 16
|
|
|
|
def test_no_ip_returns_none(self):
|
|
r = client.post("/api/compliance/banner/consent", json={
|
|
"site_id": "example.com",
|
|
"device_fingerprint": "fp-no-ip",
|
|
"categories": ["necessary"],
|
|
}, headers=HEADERS)
|
|
assert r.status_code == 200
|
|
assert r.json()["ip_hash"] is None
|