6c223c7c9b
P1: Exec summary at the top of the email report (4 KPIs + 2 CTAs, dark gradient)
P3: no_direct_sales flag for OEM configurator sites; AGB/Widerruf (terms and
    conditions / right of withdrawal) are shown as "NICHT ANWENDBAR" (not
    applicable, grey) instead of "NICHT GEFUNDEN" (not found, red)
P5: Full-audit unification: all findings (MC + mandatory disclosures + vendor +
    redundancy) are stored in /data/compliance_audits.db.unified_findings; new
    /api/compliance/agent/findings/<id> endpoint + FindingsTab in the audit UI
    with filter + CSV export
P7: Crawl hardening: TDM reservation check (robots.txt / ai.txt / header /
    meta) before every run, with a 24h cache; HeadlessChrome UA (company not
    yet founded; switch via the BREAKPILOT_BRANDED_UA env variable); per-domain
    rate limit of 1 req/s + max 2 concurrent requests
P2: Cookie knowledge DB extended additively (35 -> 74 cookies): Adobe, Meta,
    Microsoft, LinkedIn, TikTok, HubSpot, Marketo, Salesforce, Hotjar,
    FullStory, Mouseflow, Intercom, Drift, Zendesk, Cloudflare, Stripe,
    OneTrust/Cookiebot/Usercentrics, Matomo, Pinterest, Snapchat, X/Twitter,
    YouTube, Vimeo, Klaviyo, Mailchimp, Mixpanel, Segment, Amplitude,
    Optimizely, Datadog; wiring it into cookie_function_classifier yields a
    compliance_risk label (kritisch/hoch/mittel/gering, i.e.
    critical/high/medium/low) per vendor
A: k-anonymity helper (benchmark_k_anonymity) in preparation for P6
B: Cross-tenant domain assertion in the /findings endpoint (expected_domain
   query param -> 403 on mismatch)
C: Saving-scan funnel: /api/compliance/agent/saving-scan/start with
   validation + 24h rate limit per domain + lead persistence in
   saving_scan_leads + auto-discovery via _run_compliance_check; 6 tests
D: Risk badge in the email vendor row
Legal guardrails (Memory feedback_oem_data_legal.md): only our own brief
assessments + source pointers, no 1:1 copies of third-party CMP texts.
TDM opt-out is respected per § 44b UrhG. NO schema changes; everything lives
in sidecar SQLite.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
150 lines
4.6 KiB
Python
"""
k-anonymity helper for industry benchmarks (preparation for P6).

Before publishing any benchmark statement, check that the underlying sample
is large enough that individual manufacturers cannot be re-identified.

Default k=5: every publishable statement must rest on at least 5 distinct
data subjects (e.g. OEM sites). In an OEM market with ~30 players, k=5 is
the minimum needed to rule out statements such as "a German premium
manufacturer with X models".

Memory: feedback_oem_data_legal.md + project_legal_contracts_2026_07.md.

Usage:
    from compliance.services.benchmark_k_anonymity import (
        enforce_k_anonymity, quantize_value, KAnonymityError,
    )

    rows = [...]  # one row per manufacturer
    safe_groups = enforce_k_anonymity(rows, group_keys=["segment", "country"])
    # safe_groups: only rows from groups with count >= 5 are returned
"""

from __future__ import annotations

from collections.abc import Iterable
from typing import Any

DEFAULT_K = 5


class KAnonymityError(RuntimeError):
    """The sample is too small for a publishable statement."""


def assert_min_sample(n: int, k: int = DEFAULT_K, context: str = "") -> None:
    """Raise KAnonymityError if n < k."""
    if n < k:
        # The message text stays in German, as in the original runtime output.
        raise KAnonymityError(
            f"Stichprobe zu klein fuer Publikation: n={n} < k={k}"
            + (f" — Kontext: {context}" if context else "")
        )


def quantize_value(value: float | int, step: int = 5) -> int:
    """Quantize a number down to a multiple of `step` (generalization).

    quantize_value(67, 5) -> 65
    quantize_value(83, 10) -> 80

    Prevents exact identification via numeric signals.
    """
    if step <= 0:
        # A non-positive step disables quantization; only truncate to int.
        return int(value)
    return int(value // step) * step


def quantize_range(value: float | int, step: int = 10) -> str:
    """Return a range bucket as a string: '60-70%', '80-90%'."""
    base = quantize_value(value, step)
    return f"{base}-{base + step}%"


def group_and_count(
    rows: Iterable[dict],
    keys: list[str],
) -> dict[tuple, int]:
    """Group rows by all `keys` and count the rows per bucket."""
    counts: dict[tuple, int] = {}
    for r in rows:
        # A missing key falls into the "" bucket rather than raising.
        bucket = tuple(r.get(k, "") for k in keys)
        counts[bucket] = counts.get(bucket, 0) + 1
    return counts


def enforce_k_anonymity(
    rows: list[dict],
    group_keys: list[str],
    k: int = DEFAULT_K,
) -> list[dict]:
    """Filter rows so that every surviving group has >= k members.

    Returns: the rows that belong to sufficiently large groups.
    Rows in groups that are too small are suppressed (removed).
    """
    counts = group_and_count(rows, group_keys)
    safe_buckets = {bucket for bucket, n in counts.items() if n >= k}
    return [
        r for r in rows
        if tuple(r.get(key, "") for key in group_keys) in safe_buckets
    ]


def summarize_benchmark(
    rows: list[dict],
    group_keys: list[str],
    measure_key: str,
    k: int = DEFAULT_K,
    quantize_step: int = 5,
) -> list[dict]:
    """Build publishable benchmark aggregate rows.

    Per group: count and quantized mean, emitted only if count >= k.
    Rows whose measure value is None are skipped and do not count towards n.
    The result is sorted by count, descending.

    Example:
        rows = [{"segment": "premium", "consent_score": 84}, ...]
        summarize_benchmark(rows, ["segment"], "consent_score")
        -> [{"segment": "premium", "n": 8, "mean_quantized": 80,
             "mean_range": "80-90%"}, ...]
    """
    buckets: dict[tuple, list[float]] = {}
    for r in rows:
        # Loop variable is named `key` so it cannot be confused with the k parameter.
        bucket = tuple(r.get(key, "") for key in group_keys)
        val = r.get(measure_key)
        if val is not None:
            buckets.setdefault(bucket, []).append(float(val))

    out: list[dict] = []
    for bucket, values in buckets.items():
        n = len(values)
        if n < k:
            # Suppress groups that are too small to publish.
            continue
        mean = sum(values) / n
        entry: dict[str, Any] = {key: bucket[i] for i, key in enumerate(group_keys)}
        entry["n"] = n
        entry["mean_quantized"] = quantize_value(mean, quantize_step)
        entry["mean_range"] = quantize_range(mean, quantize_step * 2)
        out.append(entry)
    out.sort(key=lambda e: e["n"], reverse=True)
    return out


def safe_to_publish(
    statement: str,
    sample_size: int,
    k: int = DEFAULT_K,
) -> tuple[bool, str]:
    """Validator for marketing/press statements.

    Returns (ok, message). If ok is False, do NOT publish.
    """
    if sample_size < k:
        # The message text stays in German, as in the original runtime output.
        return False, (
            f'Aussage NICHT publizierbar: "{statement[:60]}…" '
            f'(n={sample_size} < k={k}). Risiko: Re-Identifikation '
            f'einzelner Hersteller moeglich.'
        )
    return True, f"OK (n={sample_size}, k={k})"
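
The suppression and quantization steps in this file can be tried out with a small standalone sketch. It re-declares compact versions of `enforce_k_anonymity` and `quantize_value` so that it runs without the `compliance` package; the sample rows and the `consent_score` values are invented purely for illustration and are not real benchmark data.

```python
DEFAULT_K = 5


def quantize_value(value: float, step: int = 5) -> int:
    """Round down to a multiple of step (same logic as in the module)."""
    if step <= 0:
        return int(value)
    return int(value // step) * step


def enforce_k_anonymity(rows: list, group_keys: list, k: int = DEFAULT_K) -> list:
    """Keep only rows whose group has at least k members (compact restatement)."""
    counts: dict = {}
    for r in rows:
        bucket = tuple(r.get(key, "") for key in group_keys)
        counts[bucket] = counts.get(bucket, 0) + 1
    return [
        r for r in rows
        if counts[tuple(r.get(key, "") for key in group_keys)] >= k
    ]


# Hypothetical sample: 6 "volume" sites and only 2 "premium" sites.
rows = (
    [{"segment": "volume", "consent_score": 60 + i} for i in range(6)]
    + [{"segment": "premium", "consent_score": 84 + i} for i in range(2)]
)

# The "premium" group has fewer than k=5 members and is suppressed.
safe_rows = enforce_k_anonymity(rows, group_keys=["segment"])
segments = {r["segment"] for r in safe_rows}

# Publish only a generalized mean, never the exact value (62.5 here).
mean_volume = sum(r["consent_score"] for r in safe_rows) / len(safe_rows)
published = quantize_value(mean_volume, 5)

print(segments)   # {'volume'}
print(published)  # 60
```

Suppressing the two-member group first and quantizing afterwards means the published figure never reflects a group small enough to point at a single manufacturer.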