""" P84 — Diff-Mode pro Mail. Vergleicht den aktuellen Lauf mit dem letzten Snapshot derselben Site: "Seit letztem Lauf 3 Findings weg, 1 neues." USP — keiner der grossen Anbieter (Borlabs, OneTrust, Cookiebot, Usercentrics) hat das. Wird in der Mail-Composition nach dem GF-1-Pager gerendert (klein, neutral). Wenn kein vorheriger Lauf existiert: skip silently. Heuristik: Extrahiert Finding-Labels aus banner_result.phases[].findings und (wenn vorhanden) scorecard.failed. Vergleicht set-basiert auf normalisiertem Label. """ from __future__ import annotations import logging import re from datetime import datetime, timezone from typing import Any from sqlalchemy import text from sqlalchemy.orm import Session logger = logging.getLogger(__name__) def _norm_label(s: str) -> str: s = (s or "").lower().strip() s = re.sub(r"\s+", " ", s) s = re.sub(r"[^\w\s äöüß]", "", s) return s[:200] def _extract_finding_labels( banner_result: dict | None, scorecard: dict | None = None, ) -> set[str]: out: set[str] = set() if isinstance(banner_result, dict): for ph in (banner_result.get("phases") or {}).values(): if not isinstance(ph, dict): continue for f in (ph.get("findings") or []): if isinstance(f, dict): lbl = f.get("label") or f.get("title") or f.get("check") or "" if lbl: out.add(_norm_label(lbl)) if isinstance(scorecard, dict): for ent in (scorecard.get("failed") or scorecard.get("items") or []): if isinstance(ent, dict): lbl = ent.get("label") or ent.get("title") or "" if lbl: out.add(_norm_label(lbl)) return out def _previous_snapshot(db: Session, site_domain: str, exclude_check_id: str) -> dict | None: """Returns the most recent snapshot for the same site (excluding the current one).""" row = db.execute(text( """ SELECT check_id, banner_result, created_at FROM compliance.compliance_check_snapshots WHERE site_domain = :dom AND check_id != :ex ORDER BY created_at DESC LIMIT 1 """ ), {"dom": site_domain, "ex": exclude_check_id}).fetchone() if not row: return None return { "check_id": row[0], "banner_result": row[1] or {}, "created_at": row[2], } def compute_diff( db: Session, current_check_id: str, site_domain: str, banner_result: dict | None, scorecard: dict | None = None, ) -> dict | None: """Returns {prev_check_id, prev_at, added, removed, unchanged_count} or None if there is no previous snapshot.""" prev = _previous_snapshot(db, site_domain, current_check_id) if not prev: return None curr_set = _extract_finding_labels(banner_result, scorecard) prev_set = _extract_finding_labels(prev["banner_result"], None) if not curr_set and not prev_set: return None return { "prev_check_id": prev["check_id"], "prev_at": prev["created_at"], "added": sorted(curr_set - prev_set)[:20], "removed": sorted(prev_set - curr_set)[:20], "unchanged_count": len(curr_set & prev_set), } def _fmt_age(when: Any) -> str: if not isinstance(when, datetime): return "frueher" if when.tzinfo is None: when = when.replace(tzinfo=timezone.utc) delta = datetime.now(timezone.utc) - when days = delta.days if days <= 0: hours = delta.seconds // 3600 return f"vor {hours}h" if hours else "soeben" if days == 1: return "vor 1 Tag" if days < 14: return f"vor {days} Tagen" weeks = days // 7 return f"vor {weeks} Wochen" def build_diff_block_html(diff: dict) -> str: if not diff: return "" added = diff.get("added") or [] removed = diff.get("removed") or [] if not added and not removed: return ( '