""" Tests for Banner Consent routes — device-based cookie consents. """ import uuid import os import sys import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from classroom_engine.database import Base, get_db from compliance.db.banner_models import ( BannerConsentDB, BannerConsentAuditLogDB, BannerSiteConfigDB, BannerCategoryConfigDB, BannerVendorConfigDB, ) from compliance.api.banner_routes import router as banner_router SQLALCHEMY_DATABASE_URL = "sqlite:///./test_banner.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" HEADERS = {"X-Tenant-ID": TENANT_ID} app = FastAPI() app.include_router(banner_router, prefix="/api/compliance") def override_get_db(): db = TestingSessionLocal() try: yield db finally: db.close() app.dependency_overrides[get_db] = override_get_db client = TestClient(app) @pytest.fixture(autouse=True) def setup_db(): Base.metadata.create_all(bind=engine) yield Base.metadata.drop_all(bind=engine) # ============================================================================= # Helpers # ============================================================================= def _create_site(site_id="example.com"): r = client.post("/api/compliance/banner/admin/sites", json={ "site_id": site_id, "site_name": "Example", "banner_title": "Cookies", }, headers=HEADERS) assert r.status_code == 200, r.text return r.json() def _record_consent(site_id="example.com", fingerprint="fp-123", categories=None): r = client.post("/api/compliance/banner/consent", json={ "site_id": site_id, "device_fingerprint": fingerprint, "categories": categories or ["necessary"], "ip_address": "1.2.3.4", }, headers=HEADERS) assert r.status_code == 200, r.text return r.json() # ============================================================================= # Public Consent Endpoints # ============================================================================= class TestRecordConsent: def test_record_consent(self): c = _record_consent() assert c["site_id"] == "example.com" assert c["device_fingerprint"] == "fp-123" assert c["categories"] == ["necessary"] assert c["ip_hash"] is not None assert c["expires_at"] is not None def test_upsert_consent(self): c1 = _record_consent(categories=["necessary"]) c2 = _record_consent(categories=["necessary", "analytics"]) assert c1["id"] == c2["id"] assert c2["categories"] == ["necessary", "analytics"] def test_different_devices(self): c1 = _record_consent(fingerprint="device-A") c2 = _record_consent(fingerprint="device-B") assert c1["id"] != c2["id"] class TestGetConsent: def test_get_existing(self): _record_consent() r = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS) assert r.status_code == 200 assert r.json()["has_consent"] is True def test_get_nonexistent(self): r = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=unknown", headers=HEADERS) assert r.status_code == 200 assert r.json()["has_consent"] is False class TestWithdrawConsent: def test_withdraw(self): c = _record_consent() r = client.delete(f"/api/compliance/banner/consent/{c['id']}", headers=HEADERS) assert r.status_code == 200 assert r.json()["success"] is True # Verify gone r2 = client.get("/api/compliance/banner/consent?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS) assert r2.json()["has_consent"] is False def test_withdraw_not_found(self): r = client.delete(f"/api/compliance/banner/consent/{uuid.uuid4()}", headers=HEADERS) assert r.status_code == 404 class TestExportConsent: def test_export(self): _record_consent() r = client.get( "/api/compliance/banner/consent/export?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS, ) assert r.status_code == 200 data = r.json() assert len(data["consents"]) == 1 assert len(data["audit_trail"]) >= 1 assert data["audit_trail"][0]["action"] == "consent_given" # ============================================================================= # Site Config Admin # ============================================================================= class TestSiteConfig: def test_create_site(self): s = _create_site() assert s["site_id"] == "example.com" assert s["banner_title"] == "Cookies" def test_create_duplicate(self): _create_site() r = client.post("/api/compliance/banner/admin/sites", json={ "site_id": "example.com", }, headers=HEADERS) assert r.status_code == 409 def test_list_sites(self): _create_site("site-a.com") _create_site("site-b.com") r = client.get("/api/compliance/banner/admin/sites", headers=HEADERS) assert r.status_code == 200 assert len(r.json()) == 2 def test_update_site(self): _create_site() r = client.put("/api/compliance/banner/admin/sites/example.com", json={ "banner_title": "Neue Cookies", "dsb_name": "Max DSB", }, headers=HEADERS) assert r.status_code == 200 assert r.json()["banner_title"] == "Neue Cookies" assert r.json()["dsb_name"] == "Max DSB" def test_update_not_found(self): r = client.put("/api/compliance/banner/admin/sites/nonexistent.com", json={ "banner_title": "X", }, headers=HEADERS) assert r.status_code == 404 def test_delete_site(self): _create_site() r = client.delete("/api/compliance/banner/admin/sites/example.com", headers=HEADERS) assert r.status_code == 204 def test_get_config_default(self): r = client.get("/api/compliance/banner/config/unknown-site", headers=HEADERS) assert r.status_code == 200 assert r.json()["banner_title"] == "Cookie-Einstellungen" assert r.json()["categories"] == [] def test_get_config_with_categories(self): _create_site() client.post("/api/compliance/banner/admin/sites/example.com/categories", json={ "category_key": "necessary", "name_de": "Notwendig", "is_required": True, }, headers=HEADERS) r = client.get("/api/compliance/banner/config/example.com", headers=HEADERS) data = r.json() assert len(data["categories"]) == 1 assert data["categories"][0]["category_key"] == "necessary" # ============================================================================= # Categories Admin # ============================================================================= class TestCategories: def test_create_category(self): _create_site() r = client.post("/api/compliance/banner/admin/sites/example.com/categories", json={ "category_key": "analytics", "name_de": "Analyse", "name_en": "Analytics", "sort_order": 20, }, headers=HEADERS) assert r.status_code == 200 assert r.json()["category_key"] == "analytics" assert r.json()["name_de"] == "Analyse" def test_list_categories(self): _create_site() client.post("/api/compliance/banner/admin/sites/example.com/categories", json={ "category_key": "marketing", "name_de": "Marketing", "sort_order": 30, }, headers=HEADERS) client.post("/api/compliance/banner/admin/sites/example.com/categories", json={ "category_key": "necessary", "name_de": "Notwendig", "sort_order": 0, "is_required": True, }, headers=HEADERS) r = client.get("/api/compliance/banner/admin/sites/example.com/categories", headers=HEADERS) data = r.json() assert len(data) == 2 assert data[0]["category_key"] == "necessary" # sorted by sort_order def test_delete_category(self): _create_site() cr = client.post("/api/compliance/banner/admin/sites/example.com/categories", json={ "category_key": "temp", "name_de": "Temp", }, headers=HEADERS) cat_id = cr.json()["id"] r = client.delete(f"/api/compliance/banner/admin/categories/{cat_id}") assert r.status_code == 204 def test_site_not_found(self): r = client.post("/api/compliance/banner/admin/sites/nonexistent/categories", json={ "category_key": "x", "name_de": "X", }, headers=HEADERS) assert r.status_code == 404 # ============================================================================= # Vendors Admin # ============================================================================= class TestVendors: def test_create_vendor(self): _create_site() r = client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={ "vendor_name": "Google Analytics", "category_key": "analytics", "cookie_names": ["_ga", "_gid"], "retention_days": 730, }, headers=HEADERS) assert r.status_code == 200 assert r.json()["vendor_name"] == "Google Analytics" assert r.json()["cookie_names"] == ["_ga", "_gid"] def test_list_vendors(self): _create_site() client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={ "vendor_name": "GA", "category_key": "analytics", }, headers=HEADERS) r = client.get("/api/compliance/banner/admin/sites/example.com/vendors", headers=HEADERS) assert len(r.json()) == 1 def test_delete_vendor(self): _create_site() cr = client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={ "vendor_name": "Temp", "category_key": "analytics", }, headers=HEADERS) vid = cr.json()["id"] r = client.delete(f"/api/compliance/banner/admin/vendors/{vid}") assert r.status_code == 204 # ============================================================================= # Stats # ============================================================================= class TestStats: def test_stats_empty(self): r = client.get("/api/compliance/banner/admin/stats/example.com", headers=HEADERS) assert r.status_code == 200 assert r.json()["total_consents"] == 0 def test_stats_with_data(self): _record_consent(fingerprint="d1", categories=["necessary", "analytics"]) _record_consent(fingerprint="d2", categories=["necessary"]) _record_consent(fingerprint="d3", categories=["necessary", "analytics", "marketing"]) r = client.get("/api/compliance/banner/admin/stats/example.com", headers=HEADERS) data = r.json() assert data["total_consents"] == 3 assert data["category_acceptance"]["necessary"]["count"] == 3 assert data["category_acceptance"]["analytics"]["count"] == 2 assert data["category_acceptance"]["marketing"]["count"] == 1 # ============================================================================= # Phase 3: Email Linking # ============================================================================= class TestEmailLinking: def test_link_email(self): _record_consent() r = client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": "fp-123", "email": "user@example.com", }, headers=HEADERS) assert r.status_code == 200 assert r.json()["linked_email"] == "user@example.com" def test_link_email_normalizes(self): _record_consent() r = client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": "fp-123", "email": " User@Example.COM ", }, headers=HEADERS) assert r.status_code == 200 assert r.json()["linked_email"] == "user@example.com" def test_link_email_invalid(self): _record_consent() r = client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": "fp-123", "email": "not-an-email", }, headers=HEADERS) assert r.status_code == 400 def test_link_email_no_consent(self): r = client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": "nonexistent", "email": "user@example.com", }, headers=HEADERS) assert r.status_code == 404 def test_get_consents_by_email(self): _record_consent(fingerprint="dev-a") _record_consent(fingerprint="dev-b") # Link both to same email for fp in ["dev-a", "dev-b"]: client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": fp, "email": "multi@example.com", }, headers=HEADERS) r = client.get("/api/compliance/banner/consent/by-email/multi@example.com", headers=HEADERS) assert r.status_code == 200 assert len(r.json()) == 2 def test_get_consents_by_email_empty(self): r = client.get("/api/compliance/banner/consent/by-email/nobody@nowhere.com", headers=HEADERS) assert r.status_code == 200 assert r.json() == [] def test_delete_consents_by_email_art17(self): _record_consent(fingerprint="del-a") _record_consent(fingerprint="del-b") for fp in ["del-a", "del-b"]: client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": fp, "email": "erasure@example.com", }, headers=HEADERS) r = client.delete("/api/compliance/banner/consent/by-email/erasure@example.com", headers=HEADERS) assert r.status_code == 200 data = r.json() assert data["deleted"] == 2 assert data["email"] == "erasure@example.com" # Verify gone r2 = client.get("/api/compliance/banner/consent/by-email/erasure@example.com", headers=HEADERS) assert r2.json() == [] def test_dsr_export_by_email(self): _record_consent(fingerprint="exp-1") client.post("/api/compliance/banner/consent/link-email", json={ "site_id": "example.com", "device_fingerprint": "exp-1", "email": "export@example.com", }, headers=HEADERS) r = client.get("/api/compliance/banner/consent/dsr-export/export@example.com", headers=HEADERS) assert r.status_code == 200 data = r.json() assert data["email"] == "export@example.com" assert len(data["banner_consents"]) == 1 assert len(data["audit_trail"]) >= 1 # ============================================================================= # Phase 4: Consent Sync (Banner → Einwilligungen) # ============================================================================= class TestConsentSync: def test_sync_consent(self): _record_consent(categories=["necessary", "analytics"]) r = client.post("/api/compliance/banner/consent/sync", json={ "site_id": "example.com", "device_fingerprint": "fp-123", "email": "sync@example.com", }, headers=HEADERS) assert r.status_code == 200 data = r.json() assert data["synced"] >= 1 assert "necessary" in data["categories"] assert data["email"] == "sync@example.com" def test_sync_consent_links_email(self): _record_consent() # Sync should auto-link email client.post("/api/compliance/banner/consent/sync", json={ "site_id": "example.com", "device_fingerprint": "fp-123", "email": "autolink@example.com", }, headers=HEADERS) r = client.get("/api/compliance/banner/consent/by-email/autolink@example.com", headers=HEADERS) assert len(r.json()) == 1 def test_sync_no_consent(self): r = client.post("/api/compliance/banner/consent/sync", json={ "site_id": "example.com", "device_fingerprint": "nonexistent", "email": "test@example.com", }, headers=HEADERS) assert r.status_code == 404 # ============================================================================= # Phase 2: Retention per Category # ============================================================================= class TestRetention: def test_retention_uses_vendor_max(self): """Consent expiry should use max vendor retention, not hardcoded 365.""" _create_site() # Add marketing vendor with 90 days retention client.post("/api/compliance/banner/admin/sites/example.com/vendors", json={ "vendor_name": "FB Pixel", "category_key": "marketing", "retention_days": 90, }, headers=HEADERS) c = _record_consent(categories=["necessary", "marketing"]) # Consent should expire in 90 days (max of vendor retentions) from datetime import datetime created = datetime.fromisoformat(c["created_at"]) expires = datetime.fromisoformat(c["expires_at"]) diff_days = (expires - created).days assert 89 <= diff_days <= 91, f"Expected ~90 days, got {diff_days}" def test_retention_default_365(self): """Without vendor config, should use category default.""" c = _record_consent(categories=["necessary"]) from datetime import datetime created = datetime.fromisoformat(c["created_at"]) expires = datetime.fromisoformat(c["expires_at"]) diff_days = (expires - created).days assert 364 <= diff_days <= 366, f"Expected ~365 days, got {diff_days}" # ============================================================================= # Phase 6: Consent Proof (Art. 7(1) DSGVO) # ============================================================================= class TestConsentProof: def test_consent_has_linked_email_field(self): """New consent should include linked_email in response.""" c = _record_consent() assert "linked_email" in c assert c["linked_email"] is None # Not linked yet def test_site_config_has_version(self): """Site config should have config_version field.""" s = _create_site() assert "config_version" in s assert s["config_version"] == 1 def test_audit_trail_after_consent(self): """Audit trail should exist after recording consent.""" _record_consent() r = client.get( "/api/compliance/banner/consent/export?site_id=example.com&device_fingerprint=fp-123", headers=HEADERS, ) data = r.json() assert len(data["audit_trail"]) >= 1 audit = data["audit_trail"][0] assert audit["action"] in ("consent_given", "consent_updated") # ============================================================================= # IP Hashing # ============================================================================= class TestIPHashing: def test_ip_is_hashed(self): """IP address should never be stored plain — only SHA256[:16].""" c = _record_consent() assert c["ip_hash"] is not None assert c["ip_hash"] != "1.2.3.4" assert len(c["ip_hash"]) == 16 def test_no_ip_returns_none(self): r = client.post("/api/compliance/banner/consent", json={ "site_id": "example.com", "device_fingerprint": "fp-no-ip", "categories": ["necessary"], }, headers=HEADERS) assert r.status_code == 200 assert r.json()["ip_hash"] is None