diff --git a/admin-compliance/app/sdk/agent/_components/ChecklistView.tsx b/admin-compliance/app/sdk/agent/_components/ChecklistView.tsx index 17ff82c8..e6ade776 100644 --- a/admin-compliance/app/sdk/agent/_components/ChecklistView.tsx +++ b/admin-compliance/app/sdk/agent/_components/ChecklistView.tsx @@ -167,7 +167,11 @@ export function ChecklistView({ results }: { results: DocResult[] }) {
- {r.error && r.error.startsWith("Nicht eingereicht") ? ( + {r.error && r.error.startsWith("Auf der Website nicht gefunden") ? ( + + Nicht gefunden + + ) : r.error && r.error.startsWith("Nicht eingereicht") ? ( Nicht eingereicht diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index ee99d960..926cb240 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -186,8 +186,18 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): "url": doc.url, "text": text, "word_count": len(text.split()) if text else 0, + "auto_discovered": False, + "discovery_attempted": False, }) + # Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user + # did NOT submit a URL/text for, try to find it on the homepage of + # the submitted URLs. This bridges the gap between "user knows the + # exact URL" (rare) and "user pasted the homepage" (common). + await _autodiscover_missing( + check_id, doc_entries, doc_texts, url_text_cache, + ) + # Step 1b: Section splitting — two cases: # 1. Same URL used for multiple doc_types → split by heading # 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows @@ -334,10 +344,15 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): else: r.scenario = "import" - # Step 4c: Always render all 8 canonical doc types, even when the - # user left a row blank. Missing types get a placeholder so the - # email + frontend make absent documents immediately visible. - results = _pad_results_with_missing(results) + # Step 4c: Always render all 8 canonical doc types. Missing types + # are differentiated: + # - Discovery was tried but found nothing -> 'Auf der Website + # nicht gefunden' (suggest user provides URL manually) + # - No submitted URLs at all -> 'Nicht eingereicht' + attempted = { + e["doc_type"] for e in doc_entries if e.get("discovery_attempted") + } + results = _pad_results_with_missing(results, discovery_attempted=attempted) # Step 5: Build report with management summary (95-98%) _update(check_id, "Report wird erstellt...", 96) @@ -472,6 +487,136 @@ async def _fetch_text(url: str, doc_type: str = "") -> str: return "" +async def _autodiscover_missing( + check_id: str, + doc_entries: list[dict], + doc_texts: dict[str, str], + url_text_cache: dict[str, str], +) -> None: + """For each canonical doc_type the user did not submit, try to find + the corresponding document on the homepage of the site they DID submit. + + Modifies doc_entries in place: fills text/url/word_count and sets + `auto_discovered=True`. Marks `discovery_attempted=True` on every + missing entry (even when nothing was found) so the report can + distinguish 'Nicht eingereicht' from 'Auf der Website nicht gefunden'. + """ + from urllib.parse import urlparse + + # Which canonical types are still empty (no text, no submitted URL)? + missing = { + e["doc_type"] for e in doc_entries + if not e.get("text") and not (e.get("url") or "").strip() + } + if not missing: + return + + # Pick the most common base (scheme://netloc) from submitted URLs. + bases: dict[str, int] = {} + for e in doc_entries: + u = (e.get("url") or "").strip() + if u and "://" in u: + p = urlparse(u) + base = f"{p.scheme}://{p.netloc}" + bases[base] = bases.get(base, 0) + 1 + if not bases: + # No submitted URL at all — nothing to crawl from. + for e in doc_entries: + if not e.get("text") and not (e.get("url") or "").strip(): + e["discovery_attempted"] = False + return + + base = max(bases, key=bases.get) + "/" + _update( + check_id, + f"Suche fehlende Dokumente auf {urlparse(base).netloc}...", + 18, + ) + + try: + async with httpx.AsyncClient(timeout=180.0) as client: + resp = await client.post( + f"{CONSENT_TESTER_URL}/dsi-discovery", + json={"url": base, "max_documents": 15}, + timeout=180.0, + ) + if resp.status_code != 200: + logger.warning("auto-discovery: HTTP %d for %s", resp.status_code, base) + discovered: list[dict] = [] + else: + discovered = resp.json().get("documents", []) + except Exception as e: + logger.warning("auto-discovery failed for %s: %s", base, e) + discovered = [] + + # Classify each discovered doc into a canonical doc_type + by_type: dict[str, dict] = {} + for d in discovered: + title = (d.get("title") or "").lower() + url = (d.get("url") or "").lower() + wc = d.get("word_count") or 0 + if wc < 100: + continue + canon = _classify_discovered_doc(title, url) + if canon and canon in missing and canon not in by_type: + by_type[canon] = d + + # Fill matching entries + filled = 0 + for entry in doc_entries: + dt = entry["doc_type"] + entry["discovery_attempted"] = dt in missing + if dt not in missing or dt not in by_type: + continue + d = by_type[dt] + full = d.get("full_text") or d.get("text_preview") or "" + if len(full.split()) < 100: + continue + entry["text"] = full + entry["url"] = d.get("url", "") + entry["word_count"] = len(full.split()) + entry["auto_discovered"] = True + doc_texts[dt] = full + filled += 1 + logger.info( + "auto-discovered %s on %s: %s (%d words)", + dt, base, d.get("url", "")[:80], entry["word_count"], + ) + + if filled: + logger.info("auto-discovery: filled %d/%d missing types", filled, len(missing)) + + +# Title/URL keywords → canonical doc_type. Order matters: most-specific first. +_DISCOVERY_RULES: list[tuple[str, tuple[str, ...]]] = [ + ("cookie", ("cookie", "kuche", "biscuit", "cookies-")), + ("widerruf", ("widerruf", "rueckgabe", "rückgabe", "cancellation", + "right-of-withdrawal", "ruecktritts", "rücktritts")), + ("social_media", ("social-media", "soziale-medien", "social_media", + "social-media-policy")), + ("agb", ("/agb", "geschaeftsbedingungen", "geschäftsbedingungen", + "terms-and-conditions", "general-terms")), + ("nutzungsbedingungen", ("nutzungsbedingung", "terms-of-use", + "nutzungsordnung", "terms-of-service")), + ("dsb", ("datenschutzbeauftragt", "data-protection-officer", + "dpo-contact", "/dsb")), + ("impressum", ("impressum", "imprint", "legal-notice", "site-notice", + "anbieterkennzeichnung", "legal-disclaimer-pool")), + ("dse", ("data-privacy", "datenschutz", "data-protection", + "privacy-policy", "privacy-notice", "dsgvo", + "data_privacy", "datenschutzinformation")), +] + + +def _classify_discovered_doc(title: str, url: str) -> str | None: + """Map a discovered doc (by its title + URL) to one of our 8 canonical types.""" + haystack = f"{title} {url}" + for canon, keywords in _DISCOVERY_RULES: + if any(kw in haystack for kw in keywords): + return canon + return None + + async def _check_single( text: str, doc_type: str, label: str, url: str, word_count: int, use_agent: bool, @@ -544,21 +689,25 @@ async def _check_single( ) -def _pad_results_with_missing(results: list) -> list: +def _pad_results_with_missing( + results: list, + discovery_attempted: set[str] | None = None, +) -> list: """Ensure every canonical doc_type has an entry in the results list. - Doc_types the user did not submit get a placeholder DocCheckResult - with a 'Nicht eingereicht' marker so the email + frontend make - absent documents visible at a glance. + Doc_types the user did not submit AND auto-discovery did not find get + a placeholder DocCheckResult. The error message distinguishes: + - 'Auf der Website nicht gefunden' (discovery was attempted) + - 'Nicht eingereicht' (no submitted URLs to crawl from) Preserves the canonical ordering from _ALL_DOC_TYPES so the report layout is stable. """ from .agent_doc_check_routes import DocCheckResult + attempted = discovery_attempted or set() by_type: dict[str, object] = {} for r in results: - # Map alias types (datenschutz/privacy → dse) to the canonical key canon = "dse" if r.doc_type in ("datenschutz", "privacy") else r.doc_type by_type[canon] = r @@ -567,6 +716,11 @@ def _pad_results_with_missing(results: list) -> list: if dt in by_type: ordered.append(by_type[dt]) continue + if dt in attempted: + msg = ("Auf der Website nicht gefunden — bitte URL des " + "Dokuments manuell eintragen, falls vorhanden") + else: + msg = "Nicht eingereicht — Quelle nicht angegeben" ordered.append(DocCheckResult( label=_doc_type_label(dt), url="", @@ -576,11 +730,10 @@ def _pad_results_with_missing(results: list) -> list: correctness_pct=0, checks=[], findings_count=0, - error="Nicht eingereicht — Quelle nicht angegeben", + error=msg, scenario="missing", )) - # Append any results not in _ALL_DOC_TYPES (e.g. avv, dsfa) at the end extras = [r for r in results if (r.doc_type if r.doc_type not in ("datenschutz", "privacy") else "dse") not in _ALL_DOC_TYPES] diff --git a/backend-compliance/compliance/api/agent_doc_check_extras.py b/backend-compliance/compliance/api/agent_doc_check_extras.py index 74578347..c9c121af 100644 --- a/backend-compliance/compliance/api/agent_doc_check_extras.py +++ b/backend-compliance/compliance/api/agent_doc_check_extras.py @@ -30,15 +30,21 @@ def build_scanned_urls_html(doc_entries: list[dict]) -> str: seen.add(url) label = _doc_type_label(entry.get("doc_type", "")) words = entry.get("word_count") or 0 + auto = entry.get("auto_discovered") try: netloc = urlparse(url).netloc.lower().lstrip("www.") if netloc: domains.setdefault(netloc, []).append(label) except Exception: pass + badge = ('' + 'auto-entdeckt') if auto else "" rows.append( f'' - f'{label}' + f'' + f'{label}{badge}' f'' f'{url}' diff --git a/backend-compliance/compliance/api/agent_doc_check_report.py b/backend-compliance/compliance/api/agent_doc_check_report.py index 739ec8b9..0435ab0b 100644 --- a/backend-compliance/compliance/api/agent_doc_check_report.py +++ b/backend-compliance/compliance/api/agent_doc_check_report.py @@ -184,9 +184,14 @@ def _render_document(html: list[str], r: DocCheckResult) -> None: cpct = r.correctness_pct bar_color = "green" if pct >= 80 else "yellow" if pct >= 50 else "red" status_label = "OK" if pct == 100 else "LUECKENHAFT" if pct >= 50 else "MANGELHAFT" - is_missing = bool(r.error) and r.error.startswith("Nicht eingereicht") + is_missing = bool(r.error) and ( + r.error.startswith("Nicht eingereicht") + or r.error.startswith("Auf der Website nicht gefunden") + ) if is_missing: - status_label = "NICHT EINGEREICHT" + status_label = ("NICHT GEFUNDEN" + if r.error.startswith("Auf der Website") + else "NICHT EINGEREICHT") elif r.error: status_label = "FEHLER" @@ -220,13 +225,19 @@ def _render_document(html: list[str], r: DocCheckResult) -> None: # Body if is_missing: + body_msg = ( + "Wir haben die Hauptseite durchsucht, aber kein Dokument fuer " + "diese Pflichtangabe gefunden. Pruefen Sie, ob es auf der " + "Website existiert und tragen Sie die URL manuell nach." + if r.error.startswith("Auf der Website") + else "Keine URL oder Text fuer dieses Dokument angegeben. " + "Tragen Sie die Quelle im Compliance-Check Formular nach, " + "um diese Pflichtangabe zu pruefen." + ) html.append( '
' - 'Keine URL oder Text fuer dieses Dokument angegeben. ' - 'Tragen Sie die Quelle im Compliance-Check Formular nach, ' - 'um diese Pflichtangabe zu pruefen.' - '
' + + body_msg + '
' ) elif r.error: html.append(f'
{r.error}
')