662327e8b4
Massive update based on the BMW test iterations (v1→v9):

Core compliance check
- Sonnet check_type classification: text/process/review for all 1874 MCs in compliance.doc_check_controls (script + sidecar /data/mc_classification.db). rag_document_checker filters on check_type='text' for doc_check. Plus fits_doc_type audit (v2) + ui_only audit for DSA/e-commerce MCs filed under the wrong doc_type.
- scope_requires filter: biometric/ai_decision/child_targeting MCs are filtered per business_profile (FRT skipped for BMW etc.).
- Embedding match (BGE-M3) as phase 3 after the regex match: per-doc_type threshold override (impressum 0.50, dse/cookie 0.60), short-field rescue (15-word chunks) for mandatory fields in the Impressum. Title + check_question as embedding input for more context.
- Cookie text routing: consent-tester returns cmp_cookie_text from the CMP reconstruct; the backend prefers it over DOM extraction when it is richer (BMW: 1824 vs 600 words).

Vendor redundancy + EU alternatives + cost saving
- vendor_redundancy.analyze(): functional categorization of the CMP vendors, detection of multiple providers per category, EU-alternative lookup (Matomo, IONOS, HERE, Friendly Captcha, Smart AdServer, ...).
- vendor_cost_estimator: tier inference from the cookie footprint (cookie count + premium-feature cookies + third-party share → starter/professional/enterprise/premier).
- Self-service advertising (Google/Meta/Pinterest/...) = 0 license cost (media spend only, tracked separately). DSP platforms keep a narrow range.
- Tier-aware saving range: for enterprise/premier we use the upper 40-100% band of the list prices, not starter→premier.
- Multi-function tools (Matomo Pro, SAP CX, IONOS Cloud, Userlike, Smart AdServer, HERE Maps, Vimeo Pro, LamaPoll): one tool replaces several categories at once.

Cookie knowledge DB + functional classification
- cookie_knowledge_db: 50 curated top cookies (Google/Meta/Adobe/MS/...) with vendor, exact_purpose, data_collected, IAB TCF IDs, reid_risk, schrems_ii_status, CJEU rulings, EU alternative.
- cookie_function_classifier: functional role per cookie (tracking_id, ad_pixel, session_id, ab_test, csrf, ...) + blocking_impact.

Country inference from legal form
- cookie_link_validator: the country field is derived from the vendor name (A/S=DK, GmbH=DE, Inc=US, B.V.=NL, ...) plus a vendor lookup table. Reduces false-positive no_country flags for unambiguously EU vendors (Adform DK, Pinterest IE).

Action recipes + doc anchor locator
- finding_action_recipes: for each finding type (no_cookies_listed, no_country, broken_opt_out, "Auftragsverarbeiter erwaehnen", "Art. 22 Profiling", ...) one structured instruction with what/why/fix_text/where/example, ready to paste 1:1 into customer documents.
- doc_anchor_locator: embedding-based (BGE-M3 cosine); finds the matching paragraph in the existing customer document for each finding. Per-run thread-local cache. Fallback: keyword match.
- Email rendering integrates recipe + anchor per failed document check, plus a vendor flag list with a collapsible action list.
- Score explanation per vendor row (3/5 subtitle + tooltip).

Migration pipeline (compliance check -> customer banner/documents)
- migration_to_banner.py: vendor list -> CookieBannerConfig with 4 categories + review flags.
- migration_to_document.py: vendor list -> cookie policy + VVT register + privacy-policy pre-fills.
- agent_migration_routes: 3 preview endpoints (banner-preview, document-preview, summary). Persists cmp_vendors in the check_payloads table of /data/compliance_audits.db.

Borlabs-parity cookie banner features
- Consent history in the banner: window.bpShowConsentHistory() + localStorage.
- Content blocker: cookie-banner-content-blocker.ts, YouTube/Maps/video placeholder until consent is given.
- Google Consent Mode v2 extended: wait_for_update + region=EEA/CH/GB.
- Consent log export (CSV/JSON) via einwilligungen_export_routes.

Bug fixes
- canonical_control_routes: _jsonish helper for string-typed jsonb, similar-controls endpoint with _has_embedding_col() cache (no more HTTP 500).
- Control library frontend: defensive .map coercer in 2 detail views.
- Embedding service batching (batches of 32 instead of 165 in one call).
- KeyError 'control_id' in MC result aggregation (defensive .get).
- Master controls click-through from /sdk/master-controls to /sdk/control-library?control=<id> with URL-param auto-open.
- Dockerfile: /data pre-chowned to appuser (write access for the audit DB).
- Cookie text routing bug (cmp_reconstructed > DOM extraction).
- doc_type-aware MC filter (instead of all text MCs).
- Master contract dedup (60 BMW-internal entries = 1 Adobe contract).
- The A3-v2 audit reclassified 24 UI-language MCs as 'process'.

Tests
- test_migration_mappers.py (9 tests)
- test_migration_endpoints.py (4 tests)

Scripts (one-shot)
- classify_mc_check_type.py (v1) + _v2 (PK=control_id,doc_type)
- audit_mc_doctype_fit.py (v1 fits) + _v2 (ui_only + scope_requires)

BMW run results, v1 (broken) -> v9 (all fixes):
  DSE        7.5% -> 81-83%
  Impressum  4%   -> 100% (6 real MCs, all fulfilled)
  Cookie     0%   -> 79-83% (CMP text routing + embedding)
  Plus: 10 consolidation categories, estimated savings of 200k-3M per year
  Plus: action recipes + doc anchors for every fail

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
182 lines
6.6 KiB
Python
"""
Consent log export (Borlabs parity + DPO audit requirement).

Auditors routinely request an extract of all granted/revoked consents
per tenant; until now the DPO had to write SQL by hand for this. These
endpoints serve CSV and JSON downloads directly in the browser.

Endpoints:
    GET /einwilligungen/export/consents.csv
    GET /einwilligungen/export/consents.json
    GET /einwilligungen/export/history.csv   (change history)
"""

from __future__ import annotations

import csv
import io
import json
import logging
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, Header, Query
from fastapi.responses import Response
from sqlalchemy.orm import Session

from classroom_engine.database import get_db
from ..db.einwilligungen_models import (
    EinwilligungenConsentDB,
    EinwilligungenConsentHistoryDB,
)

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/einwilligungen/export", tags=["einwilligungen-export"])


def _get_tenant(x_tenant_id: str | None = Header(None, alias="X-Tenant-ID")) -> str:
    """Resolve the tenant from the X-Tenant-ID header, with a fallback."""
    if not x_tenant_id:
        # No header supplied: defer to tenant_utils.get_tenant_id().
        from .tenant_utils import get_tenant_id
        return get_tenant_id()
    return x_tenant_id


def _ts() -> str:
    """Return the current UTC time as a filename-safe stamp (YYYYMMDD-HHMMSS)."""
    return datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")


def _consent_rows(consents: list[EinwilligungenConsentDB]) -> list[dict]:
    """Flatten consent records into string-valued dicts for export."""
    return [
        {
            "consent_id": str(c.id),
            "user_id": c.user_id or "",
            "data_point_id": c.data_point_id or "",
            "granted": "yes" if c.granted else "no",
            "purpose": c.purpose or "",
            "consent_version": c.consent_version or "",
            "ip_address": c.ip_address or "",
            # Cap the user agent at 200 characters to keep rows compact.
            "user_agent": (c.user_agent or "")[:200],
            "source": c.source or "",
            "created_at": c.created_at.isoformat() if c.created_at else "",
            "updated_at": c.updated_at.isoformat() if c.updated_at else "",
            # getattr() tolerates models that have no revoked_at attribute.
            "revoked_at": c.revoked_at.isoformat() if getattr(c, "revoked_at", None) else "",
        }
        for c in consents
    ]


def _history_rows(entries: list[EinwilligungenConsentHistoryDB]) -> list[dict]:
    """Flatten consent-history entries into string-valued dicts for export."""
    return [
        {
            "id": str(e.id),
            "consent_id": str(e.consent_id),
            "action": e.action or "",
            "consent_version": e.consent_version or "",
            "ip_address": e.ip_address or "",
            "user_agent": (e.user_agent or "")[:200],
            "source": e.source or "",
            "created_at": e.created_at.isoformat() if e.created_at else "",
        }
        for e in entries
    ]


def _csv_response(rows: list[dict], filename: str) -> Response:
    """Render rows as a CSV attachment; an empty result yields an empty body."""
    if not rows:
        return Response(content="", media_type="text/csv",
                        headers={"Content-Disposition": f"attachment; filename={filename}"})
    buf = io.StringIO()
    # Column order follows the keys of the first row; every field is quoted.
    w = csv.DictWriter(buf, fieldnames=list(rows[0].keys()), quoting=csv.QUOTE_ALL)
    w.writeheader()
    w.writerows(rows)
    return Response(content=buf.getvalue(), media_type="text/csv; charset=utf-8",
                    headers={"Content-Disposition": f"attachment; filename={filename}"})


def _json_response(payload: dict, filename: str) -> Response:
    """Render the payload as a pretty-printed JSON attachment."""
    # default=str serializes values json cannot encode natively (e.g. datetimes).
    body = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
    return Response(content=body, media_type="application/json; charset=utf-8",
                    headers={"Content-Disposition": f"attachment; filename={filename}"})


@router.get("/consents.csv")
async def export_consents_csv(
    user_id: str | None = Query(None, description="Filter by single user"),
    granted: bool | None = Query(None),
    since: str | None = Query(None, description="ISO timestamp"),
    tenant_id: str = Depends(_get_tenant),
    db: Session = Depends(get_db),
) -> Response:
    """Download all consent records of this tenant as CSV (auditor-ready)."""
    # Always scope the query to the calling tenant.
    q = db.query(EinwilligungenConsentDB).filter(
        EinwilligungenConsentDB.tenant_id == tenant_id,
    )
    if user_id:
        q = q.filter(EinwilligungenConsentDB.user_id == user_id)
    if granted is not None:
        q = q.filter(EinwilligungenConsentDB.granted == granted)
    if since:
        try:
            # Drop a trailing "Z" so fromisoformat() also parses it on Python < 3.11.
            since_dt = datetime.fromisoformat(since.rstrip("Z"))
            q = q.filter(EinwilligungenConsentDB.created_at >= since_dt)
        except ValueError:
            # An unparsable timestamp leaves the export unfiltered by date.
            logger.warning("Ignoring invalid 'since' filter: %r", since)
    rows = _consent_rows(q.order_by(EinwilligungenConsentDB.created_at.desc()).all())
    return _csv_response(rows, f"consents_{tenant_id[:8]}_{_ts()}.csv")


@router.get("/consents.json")
async def export_consents_json(
    user_id: str | None = Query(None),
    granted: bool | None = Query(None),
    since: str | None = Query(None),
    tenant_id: str = Depends(_get_tenant),
    db: Session = Depends(get_db),
) -> Response:
    """Same data as the CSV endpoint but JSON-shaped for further processing."""
    # Always scope the query to the calling tenant.
    q = db.query(EinwilligungenConsentDB).filter(
        EinwilligungenConsentDB.tenant_id == tenant_id,
    )
    if user_id:
        q = q.filter(EinwilligungenConsentDB.user_id == user_id)
    if granted is not None:
        q = q.filter(EinwilligungenConsentDB.granted == granted)
    if since:
        try:
            # Drop a trailing "Z" so fromisoformat() also parses it on Python < 3.11.
            since_dt = datetime.fromisoformat(since.rstrip("Z"))
            q = q.filter(EinwilligungenConsentDB.created_at >= since_dt)
        except ValueError:
            # An unparsable timestamp leaves the export unfiltered by date.
            logger.warning("Ignoring invalid 'since' filter: %r", since)
    rows = _consent_rows(q.order_by(EinwilligungenConsentDB.created_at.desc()).all())
    # Echo the applied filters so the export is self-describing.
    payload = {
        "tenant_id": tenant_id,
        "exported_at": datetime.now(timezone.utc).isoformat(),
        "filter": {"user_id": user_id, "granted": granted, "since": since},
        "count": len(rows),
        "consents": rows,
    }
    return _json_response(payload, f"consents_{tenant_id[:8]}_{_ts()}.json")


@router.get("/history.csv")
async def export_history_csv(
    consent_id: str | None = Query(None, description="Limit to one consent"),
    since: str | None = Query(None),
    tenant_id: str = Depends(_get_tenant),
    db: Session = Depends(get_db),
) -> Response:
    """Download the consent-change history (Art. 7(1) GDPR duty to demonstrate consent)."""
    # Always scope the query to the calling tenant.
    q = db.query(EinwilligungenConsentHistoryDB).filter(
        EinwilligungenConsentHistoryDB.tenant_id == tenant_id,
    )
    if consent_id:
        q = q.filter(EinwilligungenConsentHistoryDB.consent_id == consent_id)
    if since:
        try:
            # Drop a trailing "Z" so fromisoformat() also parses it on Python < 3.11.
            since_dt = datetime.fromisoformat(since.rstrip("Z"))
            q = q.filter(EinwilligungenConsentHistoryDB.created_at >= since_dt)
        except ValueError:
            # An unparsable timestamp leaves the export unfiltered by date.
            logger.warning("Ignoring invalid 'since' filter: %r", since)
    # Oldest first, so the history reads chronologically.
    rows = _history_rows(q.order_by(EinwilligungenConsentHistoryDB.created_at.asc()).all())
    return _csv_response(rows, f"consent-history_{tenant_id[:8]}_{_ts()}.csv")
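All three endpoints parse the `since` query parameter the same way and render their CSV through the same `csv.DictWriter` pattern. As a rough, standard-library-only sketch of how those two steps could be pulled into reusable helpers: the names `parse_since` and `rows_to_csv` are hypothetical and not part of the module above, and treating naive timestamps as UTC is an assumption of this sketch, not something the source states.

```python
from __future__ import annotations

import csv
import io
from datetime import datetime, timezone


def parse_since(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp; return None if it is absent or invalid."""
    if not value:
        return None
    text = value.strip()
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset first.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    # Assumption of this sketch: timestamps without an offset mean UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def rows_to_csv(rows: list[dict[str, str]]) -> str:
    """Render rows as fully quoted CSV; an empty list yields an empty string."""
    if not rows:
        return ""
    buf = io.StringIO()
    # Column order follows the keys of the first row.
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()), quoting=csv.QUOTE_ALL)
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


if __name__ == "__main__":
    print(parse_since("2024-05-01T12:00:00Z"))
    print(rows_to_csv([{"consent_id": "1", "granted": "yes"}]), end="")
```

Returning `None` for an invalid value lets a caller decide whether to ignore the filter, as the endpoints above do, or to reject the request instead.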