feat(compliance-check): auto-discover missing doc types from homepage

When the user leaves some doc-type rows empty, the tool now actively
searches the website for them — only marks 'not found' as last resort.

Flow:
1. User submits N URLs (e.g. just DSI)
2. For each canonical doc_type with no submitted URL/text, the route
   identifies the most-common base (scheme://netloc) from submitted URLs
3. Calls consent-tester /dsi-discovery on the homepage with
   max_documents=15 (180s timeout)
4. Classifies every discovered doc into a canonical doc_type via
   title/URL keyword rules (_DISCOVERY_RULES — covers cookie/widerruf/
   social_media/agb/nutzungsbedingungen/dsb/impressum/dse)
5. Fills matching empty entries with the discovered text, marks
   auto_discovered=True and discovery_attempted=True

Padding now differentiates:
- 'Auf der Website nicht gefunden' — discovery was attempted, no doc
  matched. Amber badge, friendly hint to add URL manually.
- 'Nicht eingereicht — Quelle nicht angegeben' — user gave NO URLs at
  all, nothing to crawl from. Grey badge.

Email + frontend:
- Status labels: NICHT GEFUNDEN (amber) vs NICHT EINGEREICHT (grey)
- 'Gepruefte Quellen' table tags auto-discovered URLs with a small blue
  'auto-entdeckt' badge so GF sees what tool found vs user submitted.

Implementation only runs when ≥1 URL was submitted (no base to crawl
from otherwise). Adds 30-90s for unsubmitted types but avoids the
'just say nicht gefunden' anti-pattern.
This commit is contained in:
Benjamin Admin
2026-05-17 01:14:05 +02:00
parent 79efa54898
commit 525038359a
4 changed files with 193 additions and 19 deletions
@@ -167,7 +167,11 @@ export function ChecklistView({ results }: { results: DocResult[] }) {
</div>
</div>
<div className="flex items-center gap-3 shrink-0 ml-3">
{r.error && r.error.startsWith("Nicht eingereicht") ? (
{r.error && r.error.startsWith("Auf der Website nicht gefunden") ? (
<span className="text-xs text-amber-700 font-medium px-2 py-0.5 bg-amber-100 rounded-full whitespace-nowrap">
Nicht gefunden
</span>
) : r.error && r.error.startsWith("Nicht eingereicht") ? (
<span className="text-xs text-gray-500 font-medium px-2 py-0.5 bg-gray-100 rounded-full whitespace-nowrap">
Nicht eingereicht
</span>
@@ -186,8 +186,18 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
"url": doc.url,
"text": text,
"word_count": len(text.split()) if text else 0,
"auto_discovered": False,
"discovery_attempted": False,
})
# Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user
# did NOT submit a URL/text for, try to find it on the homepage of
# the submitted URLs. This bridges the gap between "user knows the
# exact URL" (rare) and "user pasted the homepage" (common).
await _autodiscover_missing(
check_id, doc_entries, doc_texts, url_text_cache,
)
# Step 1b: Section splitting — two cases:
# 1. Same URL used for multiple doc_types → split by heading
# 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
@@ -334,10 +344,15 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
else:
r.scenario = "import"
# Step 4c: Always render all 8 canonical doc types, even when the
# user left a row blank. Missing types get a placeholder so the
# email + frontend make absent documents immediately visible.
results = _pad_results_with_missing(results)
# Step 4c: Always render all 8 canonical doc types. Missing types
# are differentiated:
# - Discovery was tried but found nothing -> 'Auf der Website
# nicht gefunden' (suggest user provides URL manually)
# - No submitted URLs at all -> 'Nicht eingereicht'
attempted = {
e["doc_type"] for e in doc_entries if e.get("discovery_attempted")
}
results = _pad_results_with_missing(results, discovery_attempted=attempted)
# Step 5: Build report with management summary (95-98%)
_update(check_id, "Report wird erstellt...", 96)
@@ -472,6 +487,136 @@ async def _fetch_text(url: str, doc_type: str = "") -> str:
return ""
async def _autodiscover_missing(
check_id: str,
doc_entries: list[dict],
doc_texts: dict[str, str],
url_text_cache: dict[str, str],
) -> None:
"""For each canonical doc_type the user did not submit, try to find
the corresponding document on the homepage of the site they DID submit.
Modifies doc_entries in place: fills text/url/word_count and sets
`auto_discovered=True`. Marks `discovery_attempted=True` on every
missing entry (even when nothing was found) so the report can
distinguish 'Nicht eingereicht' from 'Auf der Website nicht gefunden'.
"""
from urllib.parse import urlparse
# Which canonical types are still empty (no text, no submitted URL)?
missing = {
e["doc_type"] for e in doc_entries
if not e.get("text") and not (e.get("url") or "").strip()
}
if not missing:
return
# Pick the most common base (scheme://netloc) from submitted URLs.
bases: dict[str, int] = {}
for e in doc_entries:
u = (e.get("url") or "").strip()
if u and "://" in u:
p = urlparse(u)
base = f"{p.scheme}://{p.netloc}"
bases[base] = bases.get(base, 0) + 1
if not bases:
# No submitted URL at all — nothing to crawl from.
for e in doc_entries:
if not e.get("text") and not (e.get("url") or "").strip():
e["discovery_attempted"] = False
return
base = max(bases, key=bases.get) + "/"
_update(
check_id,
f"Suche fehlende Dokumente auf {urlparse(base).netloc}...",
18,
)
try:
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": base, "max_documents": 15},
timeout=180.0,
)
if resp.status_code != 200:
logger.warning("auto-discovery: HTTP %d for %s", resp.status_code, base)
discovered: list[dict] = []
else:
discovered = resp.json().get("documents", [])
except Exception as e:
logger.warning("auto-discovery failed for %s: %s", base, e)
discovered = []
# Classify each discovered doc into a canonical doc_type
by_type: dict[str, dict] = {}
for d in discovered:
title = (d.get("title") or "").lower()
url = (d.get("url") or "").lower()
wc = d.get("word_count") or 0
if wc < 100:
continue
canon = _classify_discovered_doc(title, url)
if canon and canon in missing and canon not in by_type:
by_type[canon] = d
# Fill matching entries
filled = 0
for entry in doc_entries:
dt = entry["doc_type"]
entry["discovery_attempted"] = dt in missing
if dt not in missing or dt not in by_type:
continue
d = by_type[dt]
full = d.get("full_text") or d.get("text_preview") or ""
if len(full.split()) < 100:
continue
entry["text"] = full
entry["url"] = d.get("url", "")
entry["word_count"] = len(full.split())
entry["auto_discovered"] = True
doc_texts[dt] = full
filled += 1
logger.info(
"auto-discovered %s on %s: %s (%d words)",
dt, base, d.get("url", "")[:80], entry["word_count"],
)
if filled:
logger.info("auto-discovery: filled %d/%d missing types", filled, len(missing))
# Title/URL keywords → canonical doc_type. Order matters: most-specific first.
_DISCOVERY_RULES: list[tuple[str, tuple[str, ...]]] = [
("cookie", ("cookie", "kuche", "biscuit", "cookies-")),
("widerruf", ("widerruf", "rueckgabe", "rückgabe", "cancellation",
"right-of-withdrawal", "ruecktritts", "rücktritts")),
("social_media", ("social-media", "soziale-medien", "social_media",
"social-media-policy")),
("agb", ("/agb", "geschaeftsbedingungen", "geschäftsbedingungen",
"terms-and-conditions", "general-terms")),
("nutzungsbedingungen", ("nutzungsbedingung", "terms-of-use",
"nutzungsordnung", "terms-of-service")),
("dsb", ("datenschutzbeauftragt", "data-protection-officer",
"dpo-contact", "/dsb")),
("impressum", ("impressum", "imprint", "legal-notice", "site-notice",
"anbieterkennzeichnung", "legal-disclaimer-pool")),
("dse", ("data-privacy", "datenschutz", "data-protection",
"privacy-policy", "privacy-notice", "dsgvo",
"data_privacy", "datenschutzinformation")),
]
def _classify_discovered_doc(title: str, url: str) -> str | None:
"""Map a discovered doc (by its title + URL) to one of our 8 canonical types."""
haystack = f"{title} {url}"
for canon, keywords in _DISCOVERY_RULES:
if any(kw in haystack for kw in keywords):
return canon
return None
async def _check_single(
text: str, doc_type: str, label: str, url: str,
word_count: int, use_agent: bool,
@@ -544,21 +689,25 @@ async def _check_single(
)
def _pad_results_with_missing(results: list) -> list:
def _pad_results_with_missing(
results: list,
discovery_attempted: set[str] | None = None,
) -> list:
"""Ensure every canonical doc_type has an entry in the results list.
Doc_types the user did not submit get a placeholder DocCheckResult
with a 'Nicht eingereicht' marker so the email + frontend make
absent documents visible at a glance.
Doc_types the user did not submit AND auto-discovery did not find get
a placeholder DocCheckResult. The error message distinguishes:
- 'Auf der Website nicht gefunden' (discovery was attempted)
- 'Nicht eingereicht' (no submitted URLs to crawl from)
Preserves the canonical ordering from _ALL_DOC_TYPES so the report
layout is stable.
"""
from .agent_doc_check_routes import DocCheckResult
attempted = discovery_attempted or set()
by_type: dict[str, object] = {}
for r in results:
# Map alias types (datenschutz/privacy → dse) to the canonical key
canon = "dse" if r.doc_type in ("datenschutz", "privacy") else r.doc_type
by_type[canon] = r
@@ -567,6 +716,11 @@ def _pad_results_with_missing(results: list) -> list:
if dt in by_type:
ordered.append(by_type[dt])
continue
if dt in attempted:
msg = ("Auf der Website nicht gefunden — bitte URL des "
"Dokuments manuell eintragen, falls vorhanden")
else:
msg = "Nicht eingereicht — Quelle nicht angegeben"
ordered.append(DocCheckResult(
label=_doc_type_label(dt),
url="",
@@ -576,11 +730,10 @@ def _pad_results_with_missing(results: list) -> list:
correctness_pct=0,
checks=[],
findings_count=0,
error="Nicht eingereicht — Quelle nicht angegeben",
error=msg,
scenario="missing",
))
# Append any results not in _ALL_DOC_TYPES (e.g. avv, dsfa) at the end
extras = [r for r in results
if (r.doc_type if r.doc_type not in ("datenschutz", "privacy") else "dse")
not in _ALL_DOC_TYPES]
@@ -30,15 +30,21 @@ def build_scanned_urls_html(doc_entries: list[dict]) -> str:
seen.add(url)
label = _doc_type_label(entry.get("doc_type", ""))
words = entry.get("word_count") or 0
auto = entry.get("auto_discovered")
try:
netloc = urlparse(url).netloc.lower().lstrip("www.")
if netloc:
domains.setdefault(netloc, []).append(label)
except Exception:
pass
badge = ('<span style="display:inline-block;margin-left:6px;'
'background:#dbeafe;color:#1e40af;font-size:10px;'
'padding:1px 6px;border-radius:8px;font-family:sans-serif">'
'auto-entdeckt</span>') if auto else ""
rows.append(
f'<tr>'
f'<td style="padding:3px 12px 3px 0;color:#475569;font-size:12px">{label}</td>'
f'<td style="padding:3px 12px 3px 0;color:#475569;font-size:12px">'
f'{label}{badge}</td>'
f'<td style="padding:3px 12px 3px 0;font-size:12px;'
f'font-family:ui-monospace,monospace;color:#1e293b;word-break:break-all">'
f'<a href="{url}" style="color:#2563eb;text-decoration:none">{url}</a></td>'
@@ -184,9 +184,14 @@ def _render_document(html: list[str], r: DocCheckResult) -> None:
cpct = r.correctness_pct
bar_color = "green" if pct >= 80 else "yellow" if pct >= 50 else "red"
status_label = "OK" if pct == 100 else "LUECKENHAFT" if pct >= 50 else "MANGELHAFT"
is_missing = bool(r.error) and r.error.startswith("Nicht eingereicht")
is_missing = bool(r.error) and (
r.error.startswith("Nicht eingereicht")
or r.error.startswith("Auf der Website nicht gefunden")
)
if is_missing:
status_label = "NICHT EINGEREICHT"
status_label = ("NICHT GEFUNDEN"
if r.error.startswith("Auf der Website")
else "NICHT EINGEREICHT")
elif r.error:
status_label = "FEHLER"
@@ -220,13 +225,19 @@ def _render_document(html: list[str], r: DocCheckResult) -> None:
# Body
if is_missing:
body_msg = (
"Wir haben die Hauptseite durchsucht, aber kein Dokument fuer "
"diese Pflichtangabe gefunden. Pruefen Sie, ob es auf der "
"Website existiert und tragen Sie die URL manuell nach."
if r.error.startswith("Auf der Website")
else "Keine URL oder Text fuer dieses Dokument angegeben. "
"Tragen Sie die Quelle im Compliance-Check Formular nach, "
"um diese Pflichtangabe zu pruefen."
)
html.append(
'<div style="padding:12px 16px;color:#6b7280;font-size:12px;'
'background:#fafafa;border-top:1px solid #f3f4f6">'
'Keine URL oder Text fuer dieses Dokument angegeben. '
'Tragen Sie die Quelle im Compliance-Check Formular nach, '
'um diese Pflichtangabe zu pruefen.'
'</div>'
+ body_msg + '</div>'
)
elif r.error:
html.append(f'<div style="padding:12px 16px;color:#991b1b">{r.error}</div>')