breakpilot-compliance/backend-compliance/compliance/services/mc_scorecard.py
Benjamin Admin 662327e8b4
feat(compliance-check): MC classification + embedding + vendor redundancy + action recipes + Borlabs features
Major update based on the BMW test iterations (v1→v9):

Core Compliance-Check
- Sonnet check_type classification: text/process/review for all 1874 MCs
  in compliance.doc_check_controls (script + sidecar /data/mc_classification.db).
  rag_document_checker filters on check_type='text' for doc_check.
  Plus a fits_doc_type audit (v2) and a ui_only audit for DSA/e-commerce MCs
  filed under the wrong doc_type.
- scope_requires filter: biometric/ai_decision/child_targeting MCs are
  filtered by business_profile (FRT skipped for BMW, etc.).
- Embedding match (BGE-M3) as phase 3 after the regex match:
  per-doc_type threshold override (impressum 0.50, dse/cookie 0.60),
  short-field rescue (15-word chunks) for mandatory fields in the Impressum.
  Title + check_question as embedding input for more context.
- Cookie text routing: consent-tester returns cmp_cookie_text from the
  CMP reconstruct; the backend prefers it over DOM extraction
  when it is richer (BMW: 1824 vs. 600 words).
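
The per-doc_type threshold override from the embedding-match bullet can be sketched as follows. This is a minimal illustration, not the code in rag_document_checker: the names DOC_TYPE_THRESHOLDS, DEFAULT_THRESHOLD, cosine and embedding_match are hypothetical, and the 0.60 fallback is an assumption. Only the impressum 0.50 and dse/cookie 0.60 values come from this commit message. Vectors are plain lists so the sketch runs without BGE-M3.

```python
from __future__ import annotations

import math

# Values for impressum, dse and cookie are taken from the commit message.
DOC_TYPE_THRESHOLDS = {"impressum": 0.50, "dse": 0.60, "cookie": 0.60}
# Assumption: fallback for doc_types that have no override.
DEFAULT_THRESHOLD = 0.60


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if one is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def embedding_match(mc_vec: list[float], chunk_vecs: list[list[float]],
                    doc_type: str) -> bool:
    """True if any document chunk reaches the threshold for this doc_type."""
    threshold = DOC_TYPE_THRESHOLDS.get(doc_type, DEFAULT_THRESHOLD)
    best = max((cosine(mc_vec, c) for c in chunk_vecs), default=0.0)
    return best >= threshold
```

With a lower threshold for impressum, a chunk whose similarity falls between 0.50 and 0.60 counts as a match there but not in a dse or cookie document.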

Vendor redundancy + EU alternatives + cost saving
- vendor_redundancy.analyze(): functional categorisation of the CMP vendors,
  detection of multiple providers per category, EU-alternative lookup
  (Matomo, IONOS, HERE, Friendly Captcha, Smart AdServer, ...).
- vendor_cost_estimator: tier inference from the cookie footprint (cookie count
  + premium-feature cookies + third-party share → starter/professional/
  enterprise/premier).
- Self-service advertising (Google/Meta/Pinterest/...) = 0 licence cost
  (media spend only, tracked separately). DSP platforms keep a narrow range.
- Tier-aware saving range: for enterprise/premier we use the
  upper 40-100% band of the list prices, not starter→premier.
- Multi-function tools (Matomo Pro, SAP CX, IONOS Cloud, Userlike, Smart
  AdServer, HERE Maps, Vimeo Pro, LamaPoll): one tool replaces several
  categories at once.

Cookie knowledge DB + functional classification
- cookie_knowledge_db: 50 curated top cookies (Google/Meta/Adobe/MS/...)
  with vendor, exact_purpose, data_collected, IAB TCF IDs, reid_risk,
  schrems_ii_status, CJEU rulings, EU alternative.
- cookie_function_classifier: functional role per cookie (tracking_id,
  ad_pixel, session_id, ab_test, csrf, ...) + blocking_impact.

Country inference from legal form
- cookie_link_validator: the country field is derived from the vendor name
  (A/S=DK, GmbH=DE, Inc=US, B.V.=NL, ...) plus a vendor lookup table.
  Reduces false-positive no_country flags for clearly EU vendors
  (Adform DK, Pinterest IE).
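
A minimal sketch of the suffix-based country inference described above. It does not reproduce cookie_link_validator: the names LEGAL_FORM_COUNTRY, VENDOR_COUNTRY and infer_country are hypothetical, and the assumption that the lookup table takes precedence over the suffix rule is mine. The four suffix pairs and the Pinterest/IE example are the only values taken from this commit message.

```python
from __future__ import annotations

import re

# Legal-form suffix -> ISO country code (pairs named in the commit message).
LEGAL_FORM_COUNTRY = {"A/S": "DK", "GmbH": "DE", "Inc": "US", "B.V.": "NL"}

# Assumption: explicit vendor entries win over suffix inference.
VENDOR_COUNTRY = {"pinterest": "IE"}


def infer_country(vendor_name: str) -> str | None:
    """Return an ISO country code for a vendor name, or None if unknown."""
    name = (vendor_name or "").strip()
    lowered = name.lower()
    for vendor, country in VENDOR_COUNTRY.items():
        if vendor in lowered:
            return country
    for suffix, country in LEGAL_FORM_COUNTRY.items():
        # The suffix must be the last token; a trailing dot ("Inc.") is tolerated.
        if re.search(rf"(?:^|[\s,]){re.escape(suffix)}\.?$", name):
            return country
    return None
```

Requiring the suffix to be the final token keeps names such as "Income Ltd" from being read as "Inc". A vendor that matches neither table returns None and would still receive the no_country flag.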

Action recipes + doc anchor locator
- finding_action_recipes: for each finding type (no_cookies_listed, no_country,
  broken_opt_out, "Auftragsverarbeiter erwaehnen", "Art. 22 Profiling",
  ...) a structured instruction with what/why/fix_text/where/example.
  Intended for 1:1 insertion into customer documents.
- doc_anchor_locator: embedding-based (BGE-M3 cosine); finds the
  matching paragraph in the existing customer document for each finding.
  Per-run thread-local cache. Fallback: keyword match.
- Email rendering integrates recipe + anchor for each failed doc check,
  plus a vendor flag list with a collapsible action list.
- Score explanation per vendor row (3/5 subtitle + tooltip).

Migration pipeline (Compliance-Check -> customer banner/documents)
- migration_to_banner.py: vendor list -> CookieBannerConfig with
  4 categories + review flags.
- migration_to_document.py: vendor list -> cookie policy + VVT register
  + privacy-policy pre-fills.
- agent_migration_routes: 3 preview endpoints (banner-preview,
  document-preview, summary). The cmp_vendors are persisted in the
  check_payloads table of /data/compliance_audits.db.

Borlabs-parity cookie banner features
- Consent history in the banner: window.bpShowConsentHistory() + localStorage.
- Content blocker: cookie-banner-content-blocker.ts, with YouTube/Maps/video
  placeholders until consent is given.
- Google Consent Mode v2 extended: wait_for_update + region=EEA/CH/GB.
- Consent log export (CSV/JSON) via einwilligungen_export_routes.

Bug fixes
- canonical_control_routes: _jsonish helper for string-typed jsonb,
  similar-controls endpoint with _has_embedding_col() cache (no more 500s).
- Control library frontend: defensive .map coercer in 2 detail views.
- Embedding service batching (batches of 32 instead of 165 in one call).
- KeyError 'control_id' in MC result aggregation (defensive .get).
- Master-controls click-through from /sdk/master-controls to
  /sdk/control-library?control=<id> with URL-param auto-open.
- Dockerfile: /data pre-chowned to appuser (audit DB write permission).
- Cookie text routing bug (cmp_reconstructed > DOM extraction).
- doc_type-aware MC filter (instead of all text MCs).
- Master contract dedup (60 BMW-internal entries = 1 Adobe contract).
- The A3-v2 audit reclassified 24 UI-language MCs as 'process'.
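
The embedding-service batching fix can be illustrated with a short sketch. The helper names batched and embed_all and the embed_batch callback are hypothetical stand-ins for the real embedding client, which this commit message does not show. Only the batch size of 32 and the 165-item example come from the message.

```python
from __future__ import annotations

from typing import Callable, Iterator, Sequence

BATCH_SIZE = 32  # batch size named in the commit message


def batched(items: Sequence[str], size: int = BATCH_SIZE) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` items, preserving order."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def embed_all(texts: Sequence[str],
              embed_batch: Callable[[Sequence[str]], list[list[float]]]) -> list[list[float]]:
    """Embed all texts via one call per batch.

    `embed_batch` is a placeholder for the actual embedding-service call.
    """
    vectors: list[list[float]] = []
    for batch in batched(texts):
        vectors.extend(embed_batch(batch))
    return vectors
```

For 165 inputs this produces five batches of 32 and one of 5, so no single request carries more than 32 texts.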

Tests
- test_migration_mappers.py (9 Tests)
- test_migration_endpoints.py (4 Tests)

Scripts (one-shot)
- classify_mc_check_type.py (v1) + _v2 (PK=control_id,doc_type)
- audit_mc_doctype_fit.py (v1 fits) + _v2 (ui_only + scope_requires)

BMW run results, v1 (broken) -> v9 (all fixes):
  DSE        7.5% -> 81-83%
  Impressum  4%   -> 100% (all 6 real MCs fulfilled)
  Cookie     0%   -> 79-83% (CMP text routing + embedding)
  Plus: 10 consolidation categories, estimated savings of 200k-3M / year
  Plus: action recipes + doc anchors for every fail

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 18:30:08 +02:00


"""
Master-Control Scorecard: group + summarise MC results.

With max_controls=0 (#30 fix) every doc-check now evaluates 75-571 MCs
per document. Rendering all of them verbatim makes the email + frontend
unreadable. This module produces three structured artefacts:

  1. `build_scorecard(check_results)`: per-regulation aggregate (PASS /
     FAIL / SKIP counts + severity histogram of the fails + compliance %)
  2. `top_fails(check_results, n=10)`: top-N failed MCs ranked by
     severity, then label, with near-duplicates collapsed
  3. `full_audit_records(check_results, check_id, tenant_id, doc_type)`:
     flat list ready for SQLite persistence + JSON export

The functions do no DB / network I/O, so they're cheap to call
from inside the route and unit-testable.
"""
from __future__ import annotations

import logging
from collections import defaultdict
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# Severity order: CRITICAL > HIGH > MEDIUM > LOW > INFO
_SEV_RANK = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}


def build_scorecard(check_results: list[dict]) -> dict:
    """Aggregate per-regulation pass/fail/skip + severity buckets.

    Args:
        check_results: list of dicts, each typically a CheckItem-like
            record with keys: id, label, passed, severity, skipped,
            regulation, doc_type.

    Returns:
        {
          "by_regulation": [
            {"regulation": "DSGVO", "total": 193, "passed": 167,
             "failed": 24, "skipped": 2, "pct": 87,
             "severity": {"HIGH": 22, "MEDIUM": 2}}
          ],
          "totals": {"total": 1874, "passed": 1300, "failed": 540,
                     "skipped": 34, "pct": 71},
        }

    "pct" is passed / (total - skipped), rounded to a whole percent.
    "severity" counts failed MCs only.
    """
    buckets: dict[str, dict] = defaultdict(
        lambda: {"total": 0, "passed": 0, "failed": 0, "skipped": 0,
                 "severity": defaultdict(int)},
    )
    for r in check_results or []:
        # Records without a regulation are grouped under the empty string.
        reg = (r.get("regulation") or "").strip() or ""
        b = buckets[reg]
        b["total"] += 1
        if r.get("skipped"):
            b["skipped"] += 1
        elif r.get("passed"):
            b["passed"] += 1
        else:
            b["failed"] += 1
            # Missing severity on a fail defaults to MEDIUM.
            sev = (r.get("severity") or "MEDIUM").upper()
            b["severity"][sev] += 1

    rows = []
    grand_total = grand_passed = grand_failed = grand_skipped = 0
    for reg, b in buckets.items():
        # Convert defaultdict for serialisability
        sev_dict = dict(b["severity"])
        # Skipped MCs do not count towards the compliance percentage.
        active = b["total"] - b["skipped"]
        pct = round(b["passed"] / active * 100) if active else 0
        rows.append({
            "regulation": reg,
            "total": b["total"],
            "passed": b["passed"],
            "failed": b["failed"],
            "skipped": b["skipped"],
            "pct": pct,
            "severity": sev_dict,
        })
        grand_total += b["total"]
        grand_passed += b["passed"]
        grand_failed += b["failed"]
        grand_skipped += b["skipped"]

    # Regulations with the most fails first, ties broken alphabetically.
    rows.sort(key=lambda r: (-r["failed"], r["regulation"]))
    grand_active = grand_total - grand_skipped
    grand_pct = round(grand_passed / grand_active * 100) if grand_active else 0
    return {
        "by_regulation": rows,
        "totals": {
            "total": grand_total, "passed": grand_passed,
            "failed": grand_failed, "skipped": grand_skipped,
            "pct": grand_pct,
        },
    }


# German label fragments that mark repetitive "plain language" /
# "consent request" MCs; matched case-insensitively against the label.
_DEDUP_KEYWORDS = [
    "einfache sprache", "verstaendliche sprache", "verständliche sprache",
    "klare sprache", "einwilligungstexte", "einwilligungsaufforderung",
    "einwilligungserklaerung", "einwilligungserklärung",
    "mehrdeutige", "verstaendliche form", "verständliche form",
    "fachbegriffe erklaeren", "fachbegriffe erklären",
]


def _dedup_key(label: str) -> str:
    """Map a label to a stable dedup key.

    If the label contains one of the well-known repetitive language /
    consent-request concepts, all such labels collapse to that single
    concept. Otherwise the original label is returned.
    """
    lowered = (label or "").lower()
    for kw in _DEDUP_KEYWORDS:
        if kw in lowered:
            return f"_dup:{kw}"
    return label


def top_fails(check_results: list[dict], n: int = 10) -> list[dict]:
    """Return top-N failing MCs sorted by severity then label.

    Skipped + passed MCs are excluded. INFO severity is excluded by
    default since those are guidance, not findings.

    Near-duplicates (multiple MCs that all complain about "einfache
    Sprache" / "Einwilligungsaufforderung" / ...) are collapsed to ONE
    representative entry. Otherwise UI-language hints dominate the
    top list and real gaps get buried.
    """
    fails = [
        r for r in (check_results or [])
        if not r.get("passed") and not r.get("skipped")
        and (r.get("severity") or "").upper() != "INFO"
    ]
    # Unknown severities rank last (5); a missing severity counts as MEDIUM.
    # `or ""` keeps the sort from failing on labels that are None.
    fails.sort(key=lambda r: (
        _SEV_RANK.get((r.get("severity") or "MEDIUM").upper(), 5),
        r.get("label") or "",
    ))
    seen_keys: set[str] = set()
    deduped: list[dict] = []
    for r in fails:
        k = _dedup_key(r.get("label") or "")
        if k in seen_keys:
            continue
        seen_keys.add(k)
        deduped.append(r)
        if len(deduped) >= n:
            break
    return deduped


def full_audit_records(
    check_results: list[dict],
    check_id: str,
    tenant_id: str = "",
    doc_type: str = "",
) -> list[dict]:
    """Flatten check results into rows ready for SQLite persistence.

    Returns one record per MC. Keeps the original fields plus
    check_id + doc_type + tenant_id + ts.
    """
    # One UTC timestamp shared by all records of this call.
    ts = datetime.now(timezone.utc).isoformat()
    out: list[dict] = []
    for r in check_results or []:
        out.append({
            "check_id": check_id,
            "tenant_id": tenant_id,
            "doc_type": doc_type,
            "ts": ts,
            "mc_id": r.get("id", ""),
            # Free-text fields are truncated to bound the row size.
            "label": (r.get("label") or "")[:300],
            "passed": bool(r.get("passed")),
            "skipped": bool(r.get("skipped")),
            "severity": (r.get("severity") or "").upper(),
            "regulation": r.get("regulation") or "",
            "matched_text": (r.get("matched_text") or "")[:500],
            "hint": (r.get("hint") or "")[:500],
            "level": int(r.get("level") or 1),
        })
    return out
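
The records returned by full_audit_records() are described as ready for SQLite persistence. One way a caller might store them is sketched below. The table name mc_audit, its schema and the helper persist_records are assumptions for illustration, not the project's actual audit-DB layout. The column names mirror the record keys produced above.

```python
from __future__ import annotations

import sqlite3

# Column order mirrors the keys emitted by full_audit_records().
COLUMNS = ("check_id", "tenant_id", "doc_type", "ts", "mc_id", "label",
           "passed", "skipped", "severity", "regulation", "matched_text",
           "hint", "level")

# Hypothetical schema; the real audit table may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS mc_audit (
    check_id TEXT, tenant_id TEXT, doc_type TEXT, ts TEXT,
    mc_id TEXT, label TEXT, passed INTEGER, skipped INTEGER,
    severity TEXT, regulation TEXT, matched_text TEXT, hint TEXT,
    level INTEGER
)
"""


def persist_records(conn: sqlite3.Connection, records: list[dict]) -> int:
    """Insert audit records and return how many rows were written."""
    conn.execute(SCHEMA)
    placeholders = ", ".join(f":{c}" for c in COLUMNS)
    sql = f"INSERT INTO mc_audit ({', '.join(COLUMNS)}) VALUES ({placeholders})"
    with conn:  # commits on success, rolls back if an insert raises
        conn.executemany(sql, records)
    return len(records)
```

Named placeholders let each record dict be passed through unchanged, and the boolean passed/skipped fields are stored as 0/1 integers by the sqlite3 module.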