diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 754c0fb9..6e09c788 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -1091,7 +1091,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): cookie_doc_for_check = (doc_texts.get("cookie") or doc_texts.get("dse") or "") cons_findings = run_consistency_checks( - banner_result or {}, cookie_doc_for_check, + banner_result or {}, cookie_doc_for_check, cmp_vendors, ) if cons_findings: consistency_html = build_consistency_block_html(cons_findings) @@ -1115,8 +1115,28 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("P82 GF-1-pager skipped: %s", e) + # P84: Diff-Mode — "Seit letztem Lauf X Findings weg, Y neue". + diff_html = "" + try: + from database import SessionLocal as _SL + from compliance.services.run_diff import ( + compute_diff, build_diff_block_html, + ) + _diff_db = _SL() + try: + diff = compute_diff( + _diff_db, check_id, domain_for_exec or "", + banner_result, scorecard, + ) + if diff: + diff_html = build_diff_block_html(diff) + finally: + _diff_db.close() + except Exception as e: + logger.warning("P84 diff-mode skipped: %s", e) + full_html = ( - gf_one_pager_html + gf_one_pager_html + diff_html + critical_html + scope_disclaimer_html + exec_summary_html + cookie_arch_html + summary_html + scanned_html + profile_html + scorecard_html + redundancy_html @@ -1895,6 +1915,12 @@ _DOC_TYPE_LABELS = { "social_media": "Social Media Datenschutz", "nutzungsbedingungen": "Nutzungsbedingungen", "dsb": "DSB-Kontakt", + # P74: Legal-Notice / Rechtliche Hinweise (IP, Forward-Looking, Risiko) + "legal_notice": "Rechtliche Hinweise", + # P96: Digital Services Act-Pflichtangaben (Art. 
# Number directly followed by a "partner"-like noun.  Accepts plain numbers
# ("5 Partner") and grouped thousands ("1.200 Partner", "1,200 vendors").
# The negative lookbehind keeps the pattern from matching only the tail of a
# grouped number (the "200" in "1.200"), which a bare ``\b`` would allow.
_PARTNER_COUNT_RE = re.compile(
    r"\b(?<!\d[.,'])(\d{1,3}(?:[.,']\d{3})+|\d{1,4})\s*"
    r"(?:partner|drittanbieter|vendor|anbieter|dienstleister)",
    re.I,
)


def _claimed_partner_count(banner_text: str) -> int | None:
    """Return the first partner/vendor count stated in *banner_text*.

    Thousands separators are removed before conversion, so "1.200 Partner"
    yields 1200.  Returns None when the text states no number in front of a
    partner-like noun.
    """
    match = _PARTNER_COUNT_RE.search(banner_text or "")
    if not match:
        return None
    return int(re.sub(r"\D", "", match.group(1)))


def check_banner_vs_cmp_partner_count(
    banner_result: dict,
    cmp_vendors: list | None,
) -> dict | None:
    """P75 - banner names N partners while the CMP payload lists far more.

    Only numeric claims are evaluated: the banner text of the "initial"
    phase (falling back to "before_accept") is searched for a number
    followed by a partner-like noun.  A finding is produced when that number
    is below 60 % of the vendor count found in the CMP payload.

    Args:
        banner_result: Banner scan result; its phases are read via
            ``_phases``.
        cmp_vendors: Vendors extracted from the CMP payload, or None.

    Returns:
        A finding dict (severity, code, label, detail, legal_basis), or None
        when the payload holds fewer than 20 vendors, no banner text or no
        numeric claim is available, or the claimed number is plausible.
    """
    cmp_count = len(cmp_vendors or [])
    if cmp_count < 20:
        # Small vendor lists are not worth flagging.
        return None

    phases = _phases(banner_result)
    initial_ph = phases.get("initial") or phases.get("before_accept") or {}
    # Cap the scanned text so the regex never runs over a huge banner dump.
    banner_text = (initial_ph.get("banner_text") or "")[:5000]
    if not banner_text:
        return None

    claimed = _claimed_partner_count(banner_text)
    if claimed is None:
        return None
    if claimed >= cmp_count * 0.6:
        return None  # The number stated in the banner is plausible.

    return {
        "severity": "HIGH",
        "code": "banner_understates_vendor_count",
        "label": (
            f"Banner-Text nennt {claimed} Partner, CMP-Payload listet "
            f"{cmp_count} Vendors"
        ),
        "detail": (
            f"Die im Banner-Text genannte Zahl ({claimed}) unterschaetzt die "
            f"tatsaechliche Anzahl der Empfaenger ({cmp_count}) deutlich. "
            "Empfehlung: Banner-Text auf die echte Vendor-Zahl heben oder "
            "die Vendor-Liste reduzieren."
        ),
        "legal_basis": (
            "Art. 13(1)(e) DSGVO + EDPB 5/2020 — die Empfaenger / "
            "Empfaengerkategorien muessen vollstaendig und nicht "
            "verharmlosend angegeben sein."
        ),
    }
def _norm_label(s: str) -> str:
    """Normalize a finding label so equal findings compare equal.

    Lower-cases the text, removes punctuation, collapses whitespace runs to
    a single space and caps the result at 200 characters.  Punctuation is
    removed *before* whitespace is collapsed so that "a - b" and "a b" end
    up identical.
    """
    s = str(s or "").lower()
    # ``\w`` is Unicode-aware for str patterns, so umlauts and ß survive.
    s = re.sub(r"[^\w\s]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s[:200].rstrip()


def _extract_finding_labels(
    banner_result: dict | None,
    scorecard: dict | None = None,
) -> set[str]:
    """Collect normalized finding labels from a banner result and scorecard.

    Reads ``banner_result["phases"][*]["findings"]`` (label, title or check)
    and ``scorecard["failed"]`` (falling back to ``scorecard["items"]``;
    label or title).  Entries of an unexpected shape are skipped, as are
    labels that normalize to an empty string.
    """
    labels: set[str] = set()

    def _collect(entries: Any, keys: tuple) -> None:
        if not isinstance(entries, (list, tuple)):
            return
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            raw = next((entry[k] for k in keys if entry.get(k)), "")
            norm = _norm_label(raw)
            if norm:
                labels.add(norm)

    if isinstance(banner_result, dict):
        phases = banner_result.get("phases")
        if isinstance(phases, dict):
            for phase in phases.values():
                if isinstance(phase, dict):
                    _collect(phase.get("findings"), ("label", "title", "check"))
    if isinstance(scorecard, dict):
        _collect(
            scorecard.get("failed") or scorecard.get("items"),
            ("label", "title"),
        )
    return labels


def _previous_snapshot(db: "Session", site_domain: str,
                       exclude_check_id: str) -> dict | None:
    """Return the most recent snapshot of the same site, or None.

    The snapshot belonging to ``exclude_check_id`` (the current run) is
    left out.  The result carries ``check_id``, ``banner_result`` (always a
    dict) and ``created_at``.
    """
    row = db.execute(text(
        """
        SELECT check_id, banner_result, created_at
        FROM compliance.compliance_check_snapshots
        WHERE site_domain = :dom AND check_id != :ex
        ORDER BY created_at DESC LIMIT 1
        """
    ), {"dom": site_domain, "ex": exclude_check_id}).fetchone()
    if not row:
        return None
    stored = row[1]
    # NOTE(review): depending on column type and driver the value may come
    # back as serialized JSON rather than a dict - confirm against schema.
    if isinstance(stored, (str, bytes, bytearray)):
        import json
        try:
            stored = json.loads(stored)
        except ValueError:
            stored = None
    return {
        "check_id": row[0],
        "banner_result": stored if isinstance(stored, dict) else {},
        "created_at": row[2],
    }


def _diff_against_previous(
    prev: dict,
    banner_result: dict | None,
    scorecard: dict | None = None,
) -> dict | None:
    """Compare the current findings with those of a previous snapshot.

    Both sides are built from the same sources.  Scorecard labels take part
    only when the previous snapshot carries a ``scorecard`` dict as well;
    otherwise every current scorecard failure would be reported as "added"
    on every run, because the baseline has nothing to match it against.
    """
    prev_scorecard = prev.get("scorecard")
    if not isinstance(prev_scorecard, dict):
        prev_scorecard = None
        scorecard = None
    curr_set = _extract_finding_labels(banner_result, scorecard)
    prev_set = _extract_finding_labels(prev.get("banner_result"), prev_scorecard)
    if not curr_set and not prev_set:
        return None
    return {
        "prev_check_id": prev.get("check_id"),
        "prev_at": prev.get("created_at"),
        "added": sorted(curr_set - prev_set)[:20],
        "removed": sorted(prev_set - curr_set)[:20],
        "unchanged_count": len(curr_set & prev_set),
    }


def compute_diff(
    db: "Session",
    current_check_id: str,
    site_domain: str,
    banner_result: dict | None,
    scorecard: dict | None = None,
) -> dict | None:
    """Diff the current run against the previous snapshot of the same site.

    Returns:
        ``{prev_check_id, prev_at, added, removed, unchanged_count}`` with
        ``added``/``removed`` as sorted label lists capped at 20 entries, or
        None when no site domain is given, no previous snapshot exists, or
        neither run has any findings.
    """
    if not site_domain:
        # An empty domain would match unrelated snapshots stored without one.
        return None
    prev = _previous_snapshot(db, site_domain, current_check_id)
    if not prev:
        return None
    return _diff_against_previous(prev, banner_result, scorecard)


def _fmt_age(when: Any) -> str:
    """Render the age of *when* as a short German phrase.

    Non-datetime values yield "frueher".  Naive datetimes are treated as
    UTC.  Timestamps in the future (clock skew) are rendered as "soeben"
    instead of a bogus hour count.
    """
    if not isinstance(when, datetime):
        return "frueher"
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    # Clamp at zero: a negative timedelta has days == -1 and a large
    # ``seconds`` value, which would otherwise print e.g. "vor 23h".
    seconds = max((datetime.now(timezone.utc) - when).total_seconds(), 0)
    days = int(seconds // 86400)
    if days == 0:
        hours = int(seconds // 3600)
        return f"vor {hours}h" if hours else "soeben"
    if days == 1:
        return "vor 1 Tag"
    if days < 14:
        return f"vor {days} Tagen"
    return f"vor {days // 7} Wochen"