diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 6e09c788..8d0f2935 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -1081,6 +1081,23 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("P102 mismatch detection failed: %s", e) + # P35 + P77 + P78: Textsignal-Checks (Save-Label, Cookies-in-DSE, + # JC-Klausel im DSE) + signals_html = "" + try: + from compliance.services.doc_text_signals import ( + run_all as run_signal_checks, + build_signals_block_html, + ) + cookie_doc_missing = not bool(doc_texts.get("cookie")) + sig_findings = run_signal_checks( + banner_result, doc_texts, cookie_doc_missing, + ) + if sig_findings: + signals_html = build_signals_block_html(sig_findings) + except Exception as e: + logger.warning("P35/P77/P78 signals-check failed: %s", e) + # P92 + P94: Banner-Konsistenz (CMP-Tool kaputt / Banner-vs-Doc-Diff) consistency_html = "" try: @@ -1115,6 +1132,28 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("P82 GF-1-pager skipped: %s", e) + # P86: Branchen-Benchmark (nur wenn scan_context.industry gesetzt) + bench_html = "" + try: + from database import SessionLocal as _SLb + from compliance.services.industry_benchmark import ( + compute_benchmark, build_benchmark_html, _extract_score, + ) + industry = (req.scan_context or {}).get("industry") if req.scan_context else None + curr_score = _extract_score(banner_result) + if industry and curr_score is not None: + _b_db = _SLb() + try: + bench = compute_benchmark( + _b_db, industry, curr_score, check_id, + ) + if bench: + bench_html = build_benchmark_html(bench) + finally: + _b_db.close() + except Exception as e: + logger.warning("P86 industry-benchmark skipped: 
%s", e) + # P84: Diff-Mode — "Seit letztem Lauf X Findings weg, Y neue". diff_html = "" try: @@ -1136,12 +1175,12 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): logger.warning("P84 diff-mode skipped: %s", e) full_html = ( - gf_one_pager_html + diff_html + gf_one_pager_html + bench_html + diff_html + critical_html + scope_disclaimer_html + exec_summary_html + cookie_arch_html + summary_html + scanned_html + profile_html + scorecard_html + redundancy_html + providers_html + banner_deep_html + library_mismatch_html - + consistency_html + + consistency_html + signals_html + vvt_html + report_html ) diff --git a/backend-compliance/compliance/services/doc_text_signals.py b/backend-compliance/compliance/services/doc_text_signals.py new file mode 100644 index 00000000..49e8c619 --- /dev/null +++ b/backend-compliance/compliance/services/doc_text_signals.py @@ -0,0 +1,214 @@ +""" +P35 + P77 + P78 — Post-hoc Textsignal-Checks auf den geladenen +Dokumenten-Texten (DSE / Cookie-Richtlinie / Banner-Text). + +P35 — "Speichern" als mehrdeutiges Reject-Label im Banner. Wenn das + einzige Schliess-Element nur "Speichern" heisst (statt + "Alle ablehnen" / "Nur notwendige"), ist das ein MEDIUM-Finding, + weil der Nutzer nicht versteht ob er gerade akzeptiert oder + abgelehnt hat. + +P77 — Cookie-Doc-Architecture: wenn keine eigene Cookie-Richtlinie + ausgeliefert wurde, aber die DSE einen prominent benannten + Cookie-Abschnitt enthaelt (mit Vendor-Liste + Speicherdauer), + ist das ein gleichwertiger OEM-Pattern. Liefert positives Signal + statt MEDIUM-Finding "Cookie-Richtlinie fehlt". + +P78 — JC-Detection in DSE-Text: erkennt 'gemeinsam Verantwortliche'- + Klauseln (Art. 26 DSGVO) im DSE-Text. Liefert positives Signal + "JC-Konstrukt dokumentiert" — verhindert False-Positive + "JC nicht erwaehnt obwohl Kooperation mit Konzern-Schwester". + +Alle drei liefern dict shape {"severity": ...} oder positive-signal-dict. 
logger = logging.getLogger(__name__)

# Banner labels that unambiguously offer a "reject" choice.
_REJECT_LABEL_KEYS = (
    "alle ablehnen", "ablehnen", "reject all", "deny all",
    "nur notwendige", "nur essenzielle", "nur erforderliche",
    "essentials only", "verweigern", "block all",
)

# Banner labels that only "save" the current selection.
_SAVE_ONLY_KEYS = (
    "speichern", "auswahl speichern", "save selection",
    "auswahl bestaetigen",
)

# Headings that indicate a dedicated cookie section inside the privacy policy.
_COOKIE_SECTION_HEADINGS = (
    "cookies und tracking", "cookies und vergleichbare technologien",
    "cookies und aehnliche technologien", "verwendung von cookies",
    "informationen zu cookies", "uebersicht der cookies",
    "eingesetzte cookies", "cookies im einsatz",
)

# Terms whose presence suggests the cookie section is substantial.
_VENDOR_HINTS = (
    "speicherdauer", "lebensdauer", "anbieter", "drittanbieter",
    "datenempfaenger", "datenkategorie", "rechtsgrundlage",
)

# Phrases that indicate a joint-controller (Art. 26 GDPR) clause.
_JC_PATTERNS = (
    "gemeinsam verantwortlich", "joint controller",
    "gemeinsame verantwortung", "art. 26 dsgvo", "art 26 dsgvo",
    "vereinbarung gemaess art. 26", "joint-controller-vereinbarung",
    "gemeinsame verarbeitung",
)

# All keyword tuples above are spelled in ASCII transliteration, so the text
# under test has to be brought into the same form before substring matching.
_UMLAUT_TABLE = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})


def _normalize_text(raw: object) -> str:
    """Return *raw* in the canonical form the keyword tuples are written in.

    The text is composed to NFC (so "a" + combining diaeresis becomes "ä"),
    lower-cased, umlauts/sharp-s are transliterated ("ä" -> "ae", "ß" -> "ss")
    and every run of whitespace (including line breaks and non-breaking
    spaces) is collapsed to a single space. Non-string input yields "".
    """
    if not isinstance(raw, str):
        return ""
    import unicodedata  # function-scope: only this helper needs it

    composed = unicodedata.normalize("NFC", raw)
    transliterated = composed.lower().translate(_UMLAUT_TABLE)
    return " ".join(transliterated.split())


def _dse_text(doc_texts: object) -> str:
    """Return the raw privacy-policy text stored under ``"dse"``, or ""."""
    if not isinstance(doc_texts, dict):
        return ""
    raw = doc_texts.get("dse")
    return raw if isinstance(raw, str) else ""


def check_save_only_reject(banner_result: dict) -> dict | None:
    """P35: flag a banner that offers "save" but no recognisable "reject".

    Reads ``banner_result["phases"]["initial"]["banner_text"]``. Returns a
    MEDIUM finding when the text contains one of ``_SAVE_ONLY_KEYS`` and none
    of ``_REJECT_LABEL_KEYS``; returns ``None`` otherwise, including when the
    banner text is missing or shorter than 30 characters.
    """
    if not isinstance(banner_result, dict):
        return None
    phases = banner_result.get("phases") or {}
    initial = phases.get("initial") if isinstance(phases, dict) else None
    if not isinstance(initial, dict):
        return None
    raw_text = initial.get("banner_text") or ""
    # Length threshold is applied to the raw text so that transliteration
    # and whitespace collapsing do not shift the cut-off.
    if not isinstance(raw_text, str) or len(raw_text) < 30:
        return None
    banner_text = _normalize_text(raw_text)
    if any(key in banner_text for key in _REJECT_LABEL_KEYS):
        return None
    if not any(key in banner_text for key in _SAVE_ONLY_KEYS):
        return None
    return {
        "severity": "MEDIUM",
        "code": "save_label_ambiguous",
        "label": (
            'Banner verwendet "Speichern" ohne erkennbares "Ablehnen" '
            '— mehrdeutig fuer den Nutzer'
        ),
        "detail": (
            'Der Button "Speichern" laesst offen, ob die aktuelle '
            'Vorauswahl (oft alles aktiv) bestaetigt oder nur die '
            'getroffene Auswahl uebernommen wird. EDPB 03/2022 empfiehlt '
            'eindeutige Labels: "Alle akzeptieren" + "Alle ablehnen".'
        ),
        "legal_basis": "Art. 7 (1) DSGVO + EDPB 03/2022 Guidelines on "
                       "deceptive design patterns.",
    }


def check_cookies_in_dse(
    doc_texts: dict[str, str],
    cookie_doc_missing: bool,
) -> dict | None:
    """P77: accept a cookie section inside the privacy policy as equivalent.

    Only runs when ``cookie_doc_missing`` is true. Returns an INFO (positive)
    signal when ``doc_texts["dse"]`` is at least 1000 characters long,
    contains one of ``_COOKIE_SECTION_HEADINGS`` and at least three of
    ``_VENDOR_HINTS``; returns ``None`` otherwise.
    """
    if not cookie_doc_missing:
        return None
    raw_dse = _dse_text(doc_texts)
    if len(raw_dse) < 1000:
        return None
    dse = _normalize_text(raw_dse)
    if not any(heading in dse for heading in _COOKIE_SECTION_HEADINGS):
        return None
    vendor_hint_count = sum(1 for hint in _VENDOR_HINTS if hint in dse)
    if vendor_hint_count < 3:
        return None  # section exists but is not substantial enough
    return {
        "severity": "INFO",  # positive signal, not a finding
        "code": "cookies_in_dse_accepted",
        "label": (
            "Cookie-Informationen sind im Datenschutz-Dokument enthalten "
            "(eigener Abschnitt mit Vendor-Hinweisen)"
        ),
        "detail": (
            "Die Praxis vieler OEM-Sites, Cookies als eigenen Abschnitt "
            'in der DSE zu fuehren (statt als separate Datei), wird als '
            "gleichwertig akzeptiert. Empfehlung trotzdem: separate "
            "Cookie-Richtlinie erleichtert kuenftige Aenderungen und "
            "Versionierung."
        ),
        "legal_basis": "Art. 13(1)(c) DSGVO — Form ist nicht vorgegeben, "
                       "Inhalt muss vollstaendig sein.",
    }


def check_jc_clause_in_dse(doc_texts: dict[str, str]) -> dict | None:
    """P78: detect a joint-controller (Art. 26 GDPR) clause in the policy.

    Returns an INFO (positive) signal listing up to three matched
    ``_JC_PATTERNS`` (alphabetically sorted) when ``doc_texts["dse"]``
    contains any of them; returns ``None`` otherwise.
    """
    dse = _normalize_text(_dse_text(doc_texts))
    if not dse:
        return None
    matches = [pattern for pattern in _JC_PATTERNS if pattern in dse]
    if not matches:
        return None
    shown = ", ".join(sorted(set(matches))[:3])
    return {
        "severity": "INFO",
        "code": "jc_clause_documented",
        "label": "Gemeinsame Verantwortlichkeit (Art. 26 DSGVO) im "
                 "DSE-Text dokumentiert",
        "detail": (
            f'Erkannte Signale: {shown}. '
            'Das verhindert das False-Positive "JC-Konstrukt nicht '
            'erwaehnt" bei Sites mit Konzern-Schwesterunternehmen.'
        ),
        "legal_basis": "Art. 26 DSGVO + EDPB 7/2020 Guidelines on the "
                       "concepts of controller and processor.",
    }


def run_all(
    banner_result: dict | None,
    doc_texts: dict[str, str] | None,
    cookie_doc_missing: bool = False,
) -> list[dict]:
    """Run the P35, P77 and P78 checks and collect their findings.

    Each check is isolated: an exception in one is logged as a warning and
    does not prevent the others from running. Findings are returned in the
    order P35, P77, P78; checks that return nothing are omitted.
    """
    banner = banner_result or {}
    docs = doc_texts or {}
    checks = (
        ("P35 save_only_reject", lambda: check_save_only_reject(banner)),
        ("P77 cookies_in_dse",
         lambda: check_cookies_in_dse(docs, cookie_doc_missing)),
        ("P78 jc_clause", lambda: check_jc_clause_in_dse(docs)),
    )
    findings: list[dict] = []
    for label, check in checks:
        try:
            finding = check()
        except Exception as exc:  # best-effort: one bad check must not kill the rest
            logger.warning("%s failed: %s", label, exc)
            continue
        if finding:
            findings.append(finding)
    return findings
  • ' + f'[{tag}] {f.get("label","")}' + f'
    {f.get("detail","")}
    ' + f'
    ' + f'{f.get("legal_basis","")}
  • ' + ) + return ( + '
    ' + '
    ' + 'Weitere Textsignale
    ' + '
    ' + ) diff --git a/backend-compliance/compliance/services/industry_benchmark.py b/backend-compliance/compliance/services/industry_benchmark.py new file mode 100644 index 00000000..befcad43 --- /dev/null +++ b/backend-compliance/compliance/services/industry_benchmark.py @@ -0,0 +1,117 @@ +""" +P86 — Branchen-Benchmark. + +Vergleicht den eigenen Compliance-Score mit dem Branchen-Median aus +allen bisherigen Snapshots derselben industry (P79 scan_context). +Liefert: "Sie 42% — Automotive-Median 58% (Stichprobe: 12 Sites)". + +Wird in der Mail-Composition direkt unter dem Score im GF-1-Pager +gerendert. Mindest-Stichprobe = 3 vergleichbare Snapshots, sonst skip. + +Heuristik fuer Score-Extraktion aus banner_result: +- banner_result.completeness_pct ODER +- banner_result.correctness_pct ODER +- 100 - len(banner_checks.violations) * 5 als Fallback. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from sqlalchemy import text +from sqlalchemy.orm import Session + +logger = logging.getLogger(__name__) + +_MIN_SAMPLE = 3 + + +def _extract_score(banner_result: dict | None) -> float | None: + if not isinstance(banner_result, dict): + return None + for key in ("compliance_score", "completeness_pct", "correctness_pct"): + v = banner_result.get(key) + if isinstance(v, (int, float)): + return float(v) + bc = banner_result.get("banner_checks") or {} + if isinstance(bc, dict): + viols = bc.get("violations") or [] + if isinstance(viols, list): + return max(0.0, 100.0 - len(viols) * 5) + return None + + +def compute_benchmark( + db: Session, + industry: str, + current_score: float | None, + current_check_id: str, +) -> dict | None: + if not industry or current_score is None: + return None + # Snapshots mit gleicher industry in scan_context. 
+ rows = db.execute(text( + """ + SELECT banner_result FROM compliance.compliance_check_snapshots + WHERE check_id != :cid + AND scan_context IS NOT NULL + AND scan_context->>'industry' = :ind + ORDER BY created_at DESC + LIMIT 50 + """ + ), {"cid": current_check_id, "ind": industry}).fetchall() + scores: list[float] = [] + for r in rows: + br = r[0] + if isinstance(br, str): + try: + br = json.loads(br) + except Exception: + continue + s = _extract_score(br) + if s is not None: + scores.append(s) + if len(scores) < _MIN_SAMPLE: + return None + scores.sort() + n = len(scores) + median = scores[n // 2] if n % 2 else (scores[n // 2 - 1] + scores[n // 2]) / 2 + pct_lower = round(sum(1 for s in scores if s < current_score) / n * 100) + return { + "industry": industry, + "current": round(current_score, 1), + "median": round(median, 1), + "sample_size": n, + "percentile": pct_lower, # 80 = besser als 80% der Branche + } + + +def build_benchmark_html(bench: dict) -> str: + if not bench: + return "" + delta = bench["current"] - bench["median"] + if delta >= 5: + color = "#16a34a" + verdict = "ueber dem Branchen-Median" + elif delta <= -5: + color = "#dc2626" + verdict = "unter dem Branchen-Median" + else: + color = "#ca8a04" + verdict = "etwa auf Branchen-Median" + return ( + '
    ' + f'Branchen-Vergleich ({bench["industry"]}): ' + f'Ihr Score {bench["current"]:.1f} ' + f'({verdict}, ' + f'Median {bench["median"]:.1f}). ' + f'Sie sind besser als ' + f'{bench["percentile"]}% der bisher von uns gepruften ' + f'{bench["sample_size"]} Sites in dieser Branche.' + '
    ' + )