""" HTML email report builder for document checks. Generates a styled HTML report similar to the frontend ChecklistView, including L1/L2 check hierarchy, progress bars, and actionable hints. """ from __future__ import annotations import logging import re from typing import TYPE_CHECKING logger = logging.getLogger(__name__) if TYPE_CHECKING: from .agent_doc_check_routes import CheckItem, DocCheckResult def _bar(pct: int, color: str) -> str: bg = {"green": "#22c55e", "yellow": "#eab308", "red": "#ef4444", "blue": "#60a5fa"} c = bg.get(color, "#60a5fa") return ( f'
' f'
' f'
{pct}%' ) def _icon(passed: bool, skipped: bool = False) -> str: if skipped: return '' if passed: return '' return '' def _first_sentence(text: str, max_chars: int = 300) -> str: """Erster vollstaendiger Satz statt erste Zeile — robust gegen mehrzeilige Fix-Texte die mit Bullet-Listen anfangen.""" if not text: return "" # Suche Satz-Endezeichen vor max_chars snippet = text[:max_chars] m = re.search(r"^(.+?[\.\?\!])(?:\s|$)", snippet, re.DOTALL) if m: first = m.group(1).strip() # Wenn der "Satz" eine Variant-Header wie "Variante A:" ist, nimm # weiter — der echte Inhalt kommt erst danach if re.fullmatch(r"(Variante [A-Z]\s*\([^\)]+\):?|Beispiel\s*\d*:?)", first, re.IGNORECASE): rest = text[m.end():].lstrip() return _first_sentence(rest, max_chars) return first # Kein Satz-Endezeichen — nimm bis max_chars line = (text.splitlines() or [""])[0] return line[:max_chars] + ("…" if len(line) > max_chars else "") def _hint_box(hint: str, check_label: str = "", doc_text: str = "", doc_id: str | None = None) -> str: """Hint-Block mit angereichertem Recipe + Doc-Anchor wenn moeglich.""" base = ( f'
{hint}' ) # Recipe + Anker hinzufuegen wenn check_label bekannt if check_label: try: from compliance.services.finding_action_recipes import recipe_for from compliance.services.doc_anchor_locator import locate_anchor rec = recipe_for(check_label) if rec and rec.get("fix_text"): first_sentence = _first_sentence(rec["fix_text"], 300) full = rec["fix_text"] # Statt
ein einfaches Inline-Block-Layout — # robuster bei Plain-Text-Mail-Render more = "" if len(full) > len(first_sentence) + 10: more = ( f'
' f'' f'Vollstaendiger Textbaustein zum Einfuegen:' f'{full}
' ) base += ( f'
' f'Konkrete Massnahme: ' f'{first_sentence}' f'{more}' ) # Anker via Embedding-Locator (mit doc_id-Cache) if doc_text: anchor = locate_anchor(check_label, doc_text, doc_id) if anchor and anchor.get("anchor_phrase") and anchor.get("confidence") != "low": conf_label = anchor.get("confidence", "") conf_badge = ( f' ' f'(Match-Konfidenz {conf_label}, ' f'Score {anchor.get("score", "—")})' ) base += ( f'
' f'Einfuegen: {anchor["position_hint"]}' f'{conf_badge}
' ) elif rec.get("where"): # Kein guter Anchor-Match — zeige generischen Fallback base += ( f'
' f'Einfuegen: {rec["where"]} ' f'' f'(kein eindeutiger Absatz im Dokument gefunden — ' f'Anweisung allgemein)
' ) base += '
' except Exception as e: logger.debug("Hint-box enrichment failed: %s", e) pass # Recipes optional — Hint-Box muss nie crashen base += '
' return base def build_management_summary(results: list[DocCheckResult]) -> str: """Build a plain-language management summary for the CEO/GF. No legal jargon — concrete actions that can be delegated to staff, lawyers, or the DPO. """ ok = [r for r in results if r.completeness_pct == 100 and not r.error] fixable = [r for r in results if 0 < r.completeness_pct < 100 and not r.error] critical = [r for r in results if r.completeness_pct == 0 and not r.error] not_applicable = [r for r in results if r.error and r.error.startswith("Nicht anwendbar")] errors = [r for r in results if r.error and r not in not_applicable] html = [ '
', '

' 'Zusammenfassung fuer die Geschaeftsfuehrung

', ] # Overall status total = len(results) - len(errors) if total == 0: html.append('

Keine Dokumente geprueft.

') return "\n".join(html) na_note = ( f' Zusaetzlich {len(not_applicable)} Dokument{"" if len(not_applicable) == 1 else "e"} ' f'als NICHT ANWENDBAR markiert (kein Direkt-Vertrieb — ' f'OEM-Konfigurator-Pattern).' if not_applicable else "" ) if len(ok) == total: html.append( f'

' f'Alle Dokumente sind vollstaendig. Keine dringenden Massnahmen noetig.' f'{na_note}

' ) else: html.append( f'

' f'{len(ok)} von {total} Dokumenten sind vollstaendig. ' f'{len(fixable)} brauchen Korrekturen' f'{f", {len(critical)} fehlen oder sind unbrauchbar" if critical else ""}.' f'{na_note}

' ) # Concrete actions actions: list[str] = [] for r in results: if r.error or r.completeness_pct == 100: continue failed_checks = [ c for c in r.checks if c.level == 1 and not c.passed and not c.skipped and c.severity != "INFO" ] for c in failed_checks[:3]: # Max 3 per document action = _check_to_action(r.label, c.label, c.hint) if action: actions.append(action) if actions: html.append( '

' 'Konkrete Aufgaben:

' '
    ' ) for a in actions[:10]: # Max 10 actions html.append(f'
  1. {a}
  2. ') html.append('
') html.append('') return "\n".join(html) def _check_to_action(doc_label: str, check_label: str, hint: str) -> str: """Convert a failed check into a plain-language action item.""" # Map technical check labels to business-language actions label_lower = check_label.lower() if "datenschutzbeauftragter" in label_lower or "dsb" in label_lower: return (f"{doc_label}: Ihren Datenschutzbeauftragten " f"mit Kontaktdaten erwaehnen. Pflicht ab 20 Mitarbeitern.") if "beschwerderecht" in label_lower or "art. 77" in label_lower: return (f"{doc_label}: Hinweis auf das Beschwerderecht " f"bei der Aufsichtsbehoerde ergaenzen (Name + Kontakt der Behoerde).") if "betroffenenrechte" in label_lower: return (f"{doc_label}: Alle Betroffenenrechte " f"(Auskunft, Berichtigung, Loeschung, etc.) einzeln auffuehren.") if "verantwortlicher" in label_lower: return (f"{doc_label}: Vollstaendige Firmenbezeichnung " f"mit Rechtsform, Adresse, E-Mail und Telefon eintragen.") if "interessenabwaegung" in label_lower: return (f"{doc_label}: Bei 'berechtigtem Interesse' " f"die Abwaegung dokumentieren. Aufgabe fuer den DSB/Rechtsanwalt.") if "widerrufsbelehrung" in label_lower or "widerruf" in label_lower: return (f"{doc_label}: Gesetzliche Widerrufsbelehrung " f"mit 14-Tage-Frist und Musterformular bereitstellen.") if "loeschkonzept" in label_lower: return (f"{doc_label}: Loeschfristen und -prozess " f"dokumentieren. Aufgabe fuer den DSB.") if "profiling" in label_lower or "art. 22" in label_lower: return (f"{doc_label}: Hinweis ergaenzen ob " f"automatisierte Entscheidungen stattfinden oder nicht.") if "nicht im eingereichten text" in label_lower: return (f"{doc_label}: Das eingereichte Dokument " f"enthaelt nicht den erwarteten Inhalt. Bitte korrekte URL pruefen.") # Generic fallback if hint and len(hint) < 150: return f"{doc_label}: {hint[:120]}" return f"{doc_label}: '{check_label}' muss ergaenzt werden." def build_html_report( results: list[DocCheckResult], cookie_result: dict | None, doc_texts: dict[str, str] | None = None, ) -> str: """Build HTML email report styled like the frontend. `doc_texts` is the doc_type→text dict so hint-boxes can locate the relevant Absatz in the original document for the Einfuege-Empfehlung. """ doc_texts = doc_texts or {} ok_count = sum(1 for r in results if r.completeness_pct == 100) html = [ '
', '

Dokumenten-Pruefung

', f'

' f'{len(results)} Dokumente, {ok_count} vollstaendig

', ] for r in results: _render_document(html, r, doc_texts.get(r.doc_type, "")) if cookie_result: _render_cookie_banner(html, cookie_result) html.append('
') return "\n".join(html) def _render_document(html: list[str], r: DocCheckResult, doc_text: str = "") -> None: pct = r.completeness_pct cpct = r.correctness_pct bar_color = "green" if pct >= 80 else "yellow" if pct >= 50 else "red" status_label = "OK" if pct == 100 else "LUECKENHAFT" if pct >= 50 else "MANGELHAFT" is_missing = bool(r.error) and ( r.error.startswith("Nicht eingereicht") or r.error.startswith("Auf der Website nicht gefunden") ) is_not_applicable = bool(r.error) and r.error.startswith("Nicht anwendbar") if is_missing: status_label = ("NICHT GEFUNDEN" if r.error.startswith("Auf der Website") else "NICHT EINGEREICHT") elif is_not_applicable: status_label = "NICHT ANWENDBAR" elif r.error: status_label = "FEHLER" l1_checks = [c for c in r.checks if c.level == 1] l2_by_parent: dict[str, list[CheckItem]] = {} for c in r.checks: if c.level == 2 and c.parent: l2_by_parent.setdefault(c.parent, []).append(c) l1_passed = sum(1 for c in l1_checks if c.passed) l2_active = [c for c in r.checks if c.level == 2 and not c.skipped] l2_passed = sum(1 for c in l2_active if c.passed) # Header html.append( f'
' f'
' f'
' f'{status_label}' f'{r.label}' f'
' f'{l1_passed}/{len(l1_checks)} Pflichtangaben' ) if l2_active: html.append(f', {l2_passed}/{len(l2_active)} Detailpruefungen') html.append(f'
{_bar(pct, bar_color)}') if cpct and l2_active: html.append(f'
{_bar(cpct, "blue")}') html.append('
') # Body if is_missing: body_msg = ( "Wir haben die Hauptseite durchsucht, aber kein Dokument fuer " "diese Pflichtangabe gefunden. Pruefen Sie, ob es auf der " "Website existiert und tragen Sie die URL manuell nach." if r.error.startswith("Auf der Website") else "Keine URL oder Text fuer dieses Dokument angegeben. " "Tragen Sie die Quelle im Compliance-Check Formular nach, " "um diese Pflichtangabe zu pruefen." ) html.append( '
' + body_msg + '
' ) elif is_not_applicable: html.append( '
' + r.error + '
' ) elif r.error: html.append(f'
{r.error}
') else: html.append('
') for c in l1_checks: _render_l1_check(html, c, l2_by_parent.get(c.id, []), doc_text) # Master-Control aggregation: with 1874 MCs evaluated per run, # rendering every L2 check inline produces ~600 rows per doc and # makes the email unreadable. Show only top-N severe fails plus a # one-line summary. Full results live in /sdk/agent/audit/. from compliance.api.agent_doc_check_scorecard import build_top_fails_html from compliance.services.mc_scorecard import top_fails mc_results = [ {"id": c.id, "label": c.label, "passed": c.passed, "severity": c.severity, "skipped": c.skipped, "hint": c.hint, "regulation": c.regulation} for c in r.checks if c.id.startswith("mc-") ] if mc_results: n_total = len(mc_results) n_passed = sum(1 for x in mc_results if x["passed"]) n_skipped = sum(1 for x in mc_results if x["skipped"]) n_failed = n_total - n_passed - n_skipped html.append( f'
' f'Master-Controls: {n_passed}/' f'{n_total - n_skipped} bestanden ' f'({n_failed} Fail)' f'{f" + {n_skipped} nicht anwendbar" if n_skipped else ""}.' f'
' ) top = top_fails(mc_results, n=10) html.append(build_top_fails_html(top, r.label)) if r.word_count: html.append( f'
' f'{r.word_count} Woerter analysiert
' ) html.append('
') html.append('
') def _render_l1_check( html: list[str], c: CheckItem, children: list[CheckItem], doc_text: str = "", ) -> None: l2_sub = [ch for ch in children if not ch.skipped] l2_passed = sum(1 for ch in l2_sub if ch.passed) style = "color:#991b1b;font-weight:600" if not c.passed else "color:#374151" html.append( f'
{_icon(c.passed)} ' f'{c.label}' ) if l2_sub: html.append(f' ({l2_passed}/{len(l2_sub)})') if not c.passed and c.hint: html.append(_hint_box(c.hint, c.label, doc_text)) html.append('
') for ch in children: if ch.skipped: continue _render_l2_check(html, ch, doc_text) def _render_l2_check(html: list[str], ch: CheckItem, doc_text: str = "") -> None: style = "color:#dc2626;font-weight:500" if not ch.passed else "color:#6b7280" html.append( f'
' f'{_icon(ch.passed)} ' f'{ch.label}' ) if ch.passed and ch.matched_text: html.append( f'
"...{ch.matched_text[:80]}..."
' ) if not ch.passed and ch.hint: html.append(_hint_box(ch.hint, ch.label, doc_text)) html.append('
') def _render_cookie_banner(html: list[str], cookie_result: dict) -> None: html.append( '
' 'Cookie-Banner Pruefung
' f'Banner erkannt: {cookie_result.get("banner_detected", False)}
' f'Anbieter: {cookie_result.get("banner_provider", "unbekannt")}' ) violations = cookie_result.get("banner_checks", {}).get("violations", []) if violations: for v in violations[:10]: html.append(f'
{_icon(False)} {v.get("text", "")[:80]}') else: html.append('
Keine Verstoesse erkannt.') html.append('
') # Re-export the helpers extracted to agent_doc_check_extras.py so existing # callers that did `from .agent_doc_check_report import build_scanned_urls_html` # keep working. from .agent_doc_check_extras import ( # noqa: E402,F401 build_provider_list_html, build_scanned_urls_html, ) def build_profile_html(profile) -> str: """Build a small HTML block summarizing the detected business profile.""" service_tags = ", ".join(profile.detected_services[:10]) or "keine erkannt" flags = [] if profile.has_online_shop: flags.append("Online-Shop") if profile.has_editorial_content: flags.append("Redaktionelle Inhalte") if profile.is_regulated_profession: flags.append(f"Regulierter Beruf ({profile.regulated_profession_type})") if profile.needs_odr: flags.append("ODR-pflichtig") flags_str = ", ".join(flags) or "keine" return ( '
' '

' 'Erkanntes Geschaeftsmodell

' '' f'' f'' f'' f'' f'' f'' f'' f'' '
Typ:{profile.business_type.upper()}' f' ({profile.industry})
Merkmale:{flags_str}
Dienste:{service_tags}
Konfidenz:{int(profile.confidence * 100)}%
' )