Files
breakpilot-compliance/backend-compliance/compliance/api/agent_compliance_check_routes.py
T
Benjamin Admin b2b4d77877 fix(auto-discovery): compute missing against canonical 8 types, not submitted
Frontend filters out empty doc rows -> req.documents only contains the
N submitted entries (3 in BMW case). The old auto-discovery loop
computed 'missing' as 'entries in doc_entries with empty text', which
was always empty for those N entries -> discovery never fired.

Fix:
- missing = _ALL_DOC_TYPES - {canonical doc_types in doc_entries}
- For each missing type, APPEND a new entry to doc_entries with
  discovery_attempted=True. If a discovered doc matched, fill text/url
  and set auto_discovered=True.
- Check loop: skip entries with no URL and no text (let padding label
  them). Entries with URL but no text keep the 'Kein Text' error so the
  user sees fetch failures explicitly.
2026-05-17 01:28:51 +02:00

928 lines
37 KiB
Python

"""
Unified Compliance Check Routes — check all documents in one request.
POST /compliance/agent/extract-text — extract text from a URL
POST /compliance/agent/compliance-check — unified check for all documents
GET /compliance/agent/compliance-check/{check_id} — poll status
"""
import asyncio
import logging
import os
import uuid as _uuid
from dataclasses import asdict
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# In-memory job store (same pattern as doc-check)
_compliance_check_jobs: dict[str, dict] = {}
# ── Models ───────────────────────────────────────────────────────────
class ExtractTextRequest(BaseModel):
    """Request body of POST /compliance/agent/extract-text."""

    # URL handed to the consent-tester DSI discovery for text extraction.
    url: str
class DocumentInput(BaseModel):
    """One submitted document: its type plus a URL and/or already-known text."""

    doc_type: str # dse, agb, impressum, cookie, widerruf, avv, loeschkonzept, etc.
    # Source URL; only fetched when `text` is empty.
    url: str = ""
    text: str = "" # text has priority over URL
class ComplianceCheckRequest(BaseModel):
    """Request body of POST /compliance/agent/compliance-check."""

    # Documents the user actually submitted (may be fewer than the canonical set).
    documents: list[DocumentInput]
    # Forwarded to the Master Control checks; the COMPLIANCE_USE_AGENT
    # environment variable can also switch it on.
    use_agent: bool = False
    # Address the HTML report is e-mailed to.
    recipient: str = "dsb@breakpilot.local"
class ComplianceCheckStartResponse(BaseModel):
    """Response returned right after a compliance check has been started."""

    # Identifier to poll GET /compliance-check/{check_id} with.
    check_id: str
    status: str = "running"
class ComplianceCheckStatusResponse(BaseModel):
    """Polling response describing the current state of a compliance check."""

    check_id: str
    # Job state as stored in the in-memory job store
    # ("running", "completed" or "failed").
    status: str
    # Human-readable progress message of the current step.
    progress: str = ""
    # Progress percentage, clamped to 0..100 by `_update`.
    progress_pct: int = 0
    # Full result payload once the job has completed, otherwise None.
    result: dict | None = None
    # Truncated exception text when the job failed.
    error: str = ""
# ── Extract text endpoint ────────────────────────────────────────────
@router.post("/extract-text")
async def extract_text(req: ExtractTextRequest):
    """Extract text from a URL via consent-tester DSI discovery.

    Merges all documents found on the page (sub-pages, accordions, etc.).

    Returns:
        A dict with the keys ``text``, ``word_count``, ``title`` and
        ``error``. The endpoint never raises: every failure is reported as
        an empty ``text`` together with a non-empty ``error``.
    """

    def _failure(message: str) -> dict:
        # Uniform error payload so every failure path has the same shape.
        return {"text": "", "word_count": 0, "title": "", "error": message}

    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": req.url, "max_documents": 5},
                timeout=300.0,
            )
        if resp.status_code != 200:
            return _failure(f"HTTP {resp.status_code} von Consent-Tester")

        docs = resp.json().get("documents", [])
        if not docs:
            return _failure("Kein Text extrahierbar")

        # Merge all documents (handles multi-page DSIs like BMW).
        # Fragments of 50 characters or fewer are dropped.
        texts = []
        for doc in docs:
            t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
            if t and len(t) > 50:
                texts.append(t)
        text = "\n\n".join(texts)
        if not text:
            # Documents were found but none carried usable text. Report it
            # the same way as "no documents" instead of returning an empty
            # text with an empty error.
            return _failure("Kein Text extrahierbar")

        title = docs[0].get("title", "") or docs[0].get("doc_type", "")
        return {
            "text": text,
            "word_count": len(text.split()),
            "title": title,
            "error": "",
        }
    except Exception as e:
        # Deliberate best-effort boundary: the caller gets the error text.
        logger.warning("extract-text failed for %s: %s", req.url, e)
        return _failure(str(e)[:200])
# ── Unified compliance check ────────────────────────────────────────
@router.post("/compliance-check")
async def start_compliance_check(req: ComplianceCheckRequest):
    """Start async compliance check for all documents.

    Registers a job in the in-memory job store, schedules the background
    coroutine and returns immediately with the id to poll.
    """
    check_id = str(_uuid.uuid4())[:8]
    job = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "progress_pct": 0,
        "result": None,
        "error": "",
    }
    _compliance_check_jobs[check_id] = job

    # The event loop only keeps a weak reference to tasks, so a task whose
    # handle is dropped can be garbage-collected before it finishes. Keep a
    # strong reference on the job for the lifetime of the task and release
    # it once the task is done.
    task = asyncio.create_task(_run_compliance_check(check_id, req))
    job["_task"] = task
    task.add_done_callback(lambda _finished: job.pop("_task", None))

    return ComplianceCheckStartResponse(check_id=check_id, status="running")
@router.get("/compliance-check/{check_id}")
async def get_compliance_check_status(check_id: str):
    """Report the current state of a previously started compliance check.

    Unknown ids yield a plain dict with status "not_found"; known ids yield
    a ComplianceCheckStatusResponse built from the stored job.
    """
    entry = _compliance_check_jobs.get(check_id)
    if entry:
        snapshot = {
            "status": entry["status"],
            "progress": entry.get("progress", ""),
            "progress_pct": entry.get("progress_pct", 0),
            "result": entry.get("result"),
            "error": entry.get("error", ""),
        }
        return ComplianceCheckStatusResponse(check_id=check_id, **snapshot)
    return {"check_id": check_id, "status": "not_found"}
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
    """Background task: check all documents with business-profile context.

    Pipeline: resolve texts -> auto-discover missing types -> split/cross
    search -> detect business profile -> check each document -> banner and
    vendor cross-checks -> pad to the canonical type list -> build and mail
    the report -> store the result on the job.

    Never raises: any exception marks the job as "failed" and stores the
    truncated message in the job's ``error`` field.
    """
    try:
        from urllib.parse import urlparse

        from compliance.services.business_profiler import detect_business_profile
        from .agent_doc_check_routes import CheckItem, DocCheckResult
        from .agent_doc_check_report import build_html_report

        # Step 1: Resolve texts (fetch from URL if needed) — 0-30%
        _update(check_id, "Texte werden geladen...", 1)
        doc_texts: dict[str, str] = {}
        doc_entries: list[dict] = []
        # Cache fetched URLs to detect duplicates
        url_text_cache: dict[str, str] = {}
        n_docs = max(1, len(req.documents))
        for i, doc in enumerate(req.documents):
            pct = int(1 + (i / n_docs) * 29)
            _update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
            text = doc.text
            if not text and doc.url:
                url_key = doc.url.strip().rstrip("/").lower()
                if url_key in url_text_cache:
                    text = url_text_cache[url_key]
                else:
                    text = await _fetch_text(doc.url, doc_type=doc.doc_type)
                    if text:
                        url_text_cache[url_key] = text
            if text:
                doc_texts[doc.doc_type] = text
            doc_entries.append({
                "doc_type": doc.doc_type,
                "url": doc.url,
                "text": text,
                "word_count": len(text.split()) if text else 0,
                "auto_discovered": False,
                "discovery_attempted": False,
            })

        # Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user
        # did NOT submit a URL/text for, try to find it on the homepage of
        # the submitted URLs. This bridges the gap between "user knows the
        # exact URL" (rare) and "user pasted the homepage" (common).
        await _autodiscover_missing(
            check_id, doc_entries, doc_texts, url_text_cache,
        )

        # Step 1b: Section splitting — two cases:
        # 1. Same URL used for multiple doc_types → split by heading
        # 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
        from compliance.services.section_splitter import (
            split_shared_texts, auto_fill_from_dsi, cross_search_documents,
        )
        split_shared_texts(doc_entries, url_text_cache)
        auto_fill_from_dsi(doc_entries)

        # Step 1c: Cross-document search — find doc_types in wrong documents (30-35%)
        _update(check_id, "Dokumente werden uebergreifend durchsucht...", 32)
        placement_findings = cross_search_documents(doc_entries)

        # Refresh doc_texts after all splitting/searching
        for entry in doc_entries:
            if entry.get("text"):
                doc_texts[entry["doc_type"]] = entry["text"]

        # Step 2: Detect business profile (35-40%)
        _update(check_id, "Geschaeftsmodell wird erkannt...", 37)
        profile = await detect_business_profile(doc_texts)
        profile_dict = asdict(profile)

        # Step 3: Check each document
        results: list[DocCheckResult] = []
        total_findings = 0
        use_agent_flag = req.use_agent or os.getenv(
            "COMPLIANCE_USE_AGENT", "false"
        ).lower() == "true"
        # Filter out doc_types that don't apply to this business profile
        skip_types = _get_skip_types(profile)

        # Document checks: 40-80%
        n_entries = max(1, len(doc_entries))
        for i, entry in enumerate(doc_entries):
            text = entry["text"]
            doc_type = entry["doc_type"]
            label = _doc_type_label(doc_type)
            url = entry["url"]
            if doc_type in skip_types:
                results.append(DocCheckResult(
                    label=label, url=url, doc_type=doc_type,
                    error=skip_types[doc_type],
                ))
                continue
            pct = int(40 + (i / n_entries) * 40)
            _update(check_id, f"Pruefen {i+1}/{n_entries}: {label}...", pct)
            if not text or len(text) < 50:
                # Empty entry — either from auto-discovery padding (no URL
                # to fetch) or from a fetch that returned nothing. If there
                # was a URL we keep the error so the user knows the fetch
                # failed; otherwise let the padding step label it
                # 'Nicht eingereicht' / 'Auf der Website nicht gefunden'.
                if (entry.get("url") or "").strip():
                    results.append(DocCheckResult(
                        label=label, url=url, doc_type=doc_type,
                        error="Kein Text vorhanden oder zu kurz",
                    ))
                continue
            result = await _check_single(
                text, doc_type, label, url,
                entry["word_count"], use_agent_flag,
            )
            # Apply profile context filter
            result = _apply_profile_filter(result, profile, doc_type)
            # Add placement findings — but only if the regex checks confirm
            # the text doesn't match. If completeness >= 50%, the text IS the
            # right doc_type despite missing cross-search keywords.
            if result.completeness_pct < 50:
                for pf in placement_findings:
                    if pf.get("doc_type") == doc_type:
                        result.checks.insert(0, CheckItem(**{
                            k: v for k, v in pf.items() if k != "doc_type"
                        }))
            results.append(result)
            total_findings += result.findings_count

        # Step 3b: Banner-Check (automatic). Use the homepage (scheme +
        # host) of the first submitted URL that actually parses to one.
        # Looking only at documents[0] would silently skip the banner check
        # whenever the first row was submitted as pasted text.
        banner_result = None
        banner_url = ""
        for submitted in req.documents:
            candidate = (submitted.url or "").strip()
            if not candidate:
                continue
            parsed = urlparse(candidate)
            if parsed.scheme and parsed.netloc:
                banner_url = f"{parsed.scheme}://{parsed.netloc}"
                break
        if banner_url:
            _update(check_id, "Cookie-Banner wird geprueft...", 82)
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{CONSENT_TESTER_URL}/scan",
                        json={"url": banner_url, "timeout_per_phase": 10},
                    )
                if resp.status_code == 200:
                    banner_result = resp.json()
            except Exception as e:
                # Best effort: the report is still produced without banner data.
                logger.warning("Banner check failed: %s", e)

        # Step 3c: Cross-check Banner vs Cookie-Richtlinie (88-90%)
        if banner_result and "cookie" in doc_texts:
            _update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...", 89)
            cross_findings = _cross_check_banner_vs_cookie(
                banner_result, doc_texts["cookie"],
            )
            if cross_findings:
                for r in results:
                    if r.doc_type == "cookie":
                        for cf in cross_findings:
                            r.checks.append(CheckItem(**cf))
                        # Level-2 checks changed, so recompute correctness.
                        l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
                        l2p = sum(1 for c in l2 if c.passed)
                        r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0

        # Step 3d: TCF Vendor cross-check against DSI
        tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
        vvt_entries: list[dict] = []
        if tcf_vendors and "dse" in doc_texts:
            _update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...", 91)
            from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
            from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
            vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
            if vendor_findings:
                for r in results:
                    if r.doc_type == "dse":
                        for vf in vendor_findings:
                            r.checks.append(CheckItem(**vf))
            vvt_entries = map_vendors_to_vvt(tcf_vendors)

        # Step 4: Extract profile hints from documents (92-95%)
        _update(check_id, "Profil wird aus Dokumenten extrahiert...", 93)
        from compliance.services.profile_extractor import extract_profile_from_documents
        extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)

        # Step 4b: Determine scenario per document
        for r in results:
            if r.error:
                r.scenario = "skip"
            elif r.completeness_pct < 30:
                r.scenario = "regenerate"
            elif r.completeness_pct < 95:
                r.scenario = "fix"
            else:
                r.scenario = "import"

        # Step 4c: Always render all 8 canonical doc types. Missing types
        # are differentiated:
        # - Discovery was tried but found nothing -> 'Auf der Website
        #   nicht gefunden' (suggest user provides URL manually)
        # - No submitted URLs at all -> 'Nicht eingereicht'
        attempted = {
            e["doc_type"] for e in doc_entries if e.get("discovery_attempted")
        }
        results = _pad_results_with_missing(results, discovery_attempted=attempted)

        # Step 5: Build report with management summary (95-98%)
        _update(check_id, "Report wird erstellt...", 96)
        from .agent_doc_check_report import (
            build_management_summary,
            build_scanned_urls_html,
            build_provider_list_html,
        )
        summary_html = build_management_summary(results)
        scanned_html = build_scanned_urls_html(doc_entries)
        providers_html = build_provider_list_html(banner_result, vvt_entries)
        report_html = build_html_report(results, None)
        profile_html = _build_profile_html(profile)
        full_html = (
            summary_html + scanned_html + profile_html
            + providers_html + report_html
        )

        # Step 6: Send email — derive site name primarily from entered URL.
        # The extracted_profile.companyName is often noisy (e.g. picks up
        # juris.de from legal references). Domain-derived name is more
        # predictable for the GF email subject.
        doc_count = sum(1 for r in results if not r.error)
        url_company = _company_name_from_url(doc_entries)
        domain = _extract_domain(doc_entries)
        site_name = url_company or domain or "Unbekannt"
        _update(check_id, "E-Mail wird versendet...", 98)
        email_result = send_email(
            recipient=req.recipient,
            # Separator between site name and count; without it the subject
            # reads e.g. "BMW3 Dokumente geprueft".
            subject=f"[COMPLIANCE-CHECK] {site_name} - {doc_count} Dokumente geprueft",
            body_html=full_html,
        )

        # Step 7: Store result
        banner_summary = None
        if banner_result:
            banner_summary = {
                "detected": banner_result.get("banner_detected", False),
                "provider": banner_result.get("banner_provider", ""),
                "violations": len(
                    banner_result.get("banner_checks", {}).get("violations", [])
                ),
                "tcf_vendor_count": len(tcf_vendors),
            }
        response = {
            "results": [_result_to_dict(r) for r in results],
            "business_profile": profile_dict,
            "extracted_profile": extracted_profile,
            "banner_result": banner_summary,
            "tcf_vendors": vvt_entries if tcf_vendors else [],
            "total_documents": len(results),
            "total_findings": total_findings,
            "email_status": email_result.get("status", "failed"),
            "checked_at": datetime.now(timezone.utc).isoformat(),
        }
        job = _compliance_check_jobs[check_id]
        job["status"] = "completed"
        job["result"] = response
        job["progress"] = "Fertig"
        job["progress_pct"] = 100
    except Exception as e:
        # Top-level boundary of the background task: record the failure on
        # the job so the polling endpoint can surface it.
        logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
        _compliance_check_jobs[check_id]["status"] = "failed"
        _compliance_check_jobs[check_id]["error"] = str(e)[:500]
def _update(check_id: str, msg: str, pct: int | None = None):
job = _compliance_check_jobs[check_id]
job["progress"] = msg
if pct is not None:
job["progress_pct"] = max(0, min(100, int(pct)))
async def _fetch_text(url: str, doc_type: str = "") -> str:
    """Fetch text from URL via consent-tester, with HTTP fallback.

    1. Try consent-tester (Playwright) — handles JS-heavy SPAs
    2. Fallback: direct HTTP fetch + HTML strip — fast, works for SSR pages

    doc_type controls how aggressively we follow sub-links — cookie/dse
    pages prefer self-extract only (CMP capture is authoritative); legal/
    imprint pages need to follow sub-pages (Versicherungsvermittler etc).

    Returns:
        The extracted text, or "" when neither strategy produced more than
        100 words. Network and parsing errors are logged, never raised.
    """
    import html as _html
    import re as _re

    # 1. Consent-tester (Playwright-based, full JS rendering).
    # max_documents depends on doc_type:
    # - cookie/dse/social_media: self-extract (often + CMP capture) is
    #   authoritative, sub-pages dilute the policy text. max=1.
    # - impressum/agb/widerruf/nutzungsbedingungen/dsb: BMW & similar
    #   enterprise sites split this across 3-4 short sub-pages
    #   (Versicherungsvermittler, Aufsicht, Berufsrecht). max=3 follows
    #   them. The 15s networkidle bail (dsi_helpers) keeps timing safe.
    short_extract_types = {"cookie", "dse", "datenschutz", "privacy", "social_media"}
    max_docs = 1 if (doc_type or "") in short_extract_types else 3
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": url, "max_documents": max_docs},
                timeout=120.0,
            )
        if resp.status_code == 200:
            docs = resp.json().get("documents", [])
            texts = []
            for doc in docs:
                t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
                if t and len(t) > 50:
                    texts.append(t)
            merged = "\n\n".join(texts)
            if merged and len(merged.split()) > 100:
                if len(texts) > 1:
                    logger.info("Merged %d docs from %s (%d words)",
                                len(texts), url, len(merged.split()))
                return merged
    except Exception as e:
        logger.warning("Consent-tester fetch failed for %s: %s", url, e)

    # 2. Fallback: direct HTTP fetch (works for SSR pages like BMW)
    # NOTE(review): `url` is user-supplied and fetched from the backend
    # itself — consider restricting targets if this service can reach
    # internal hosts.
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            resp = await client.get(url)
        content_type = resp.headers.get("content-type", "").lower()
        if resp.status_code == 200 and "text/html" in content_type:
            markup = resp.text
            # Drop script/style bodies first, then every remaining tag.
            text = _re.sub(r"<script[^>]*>.*?</script>", " ", markup, flags=_re.DOTALL | _re.IGNORECASE)
            text = _re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=_re.DOTALL | _re.IGNORECASE)
            text = _re.sub(r"<[^>]+>", " ", text)
            # Decode entities AFTER tag stripping so an escaped "&lt;b&gt;"
            # is kept as literal text and not removed as a tag.
            text = _html.unescape(text)
            text = _re.sub(r"\s+", " ", text).strip()
            if len(text.split()) > 100:
                logger.info("HTTP fallback for %s: %d words", url, len(text.split()))
                return text
    except Exception as e:
        logger.warning("HTTP fallback failed for %s: %s", url, e)
    return ""
async def _autodiscover_missing(
    check_id: str,
    doc_entries: list[dict],
    doc_texts: dict[str, str],
    url_text_cache: dict[str, str],
) -> None:
    """For each canonical doc_type the user did not submit, try to find
    the corresponding document on the homepage of the site they DID submit.

    Modifies doc_entries in place: appends one entry per missing canonical
    type, in the order of `_ALL_DOC_TYPES`. Discovered documents get
    text/url/word_count filled and `auto_discovered=True`. Every appended
    entry is marked `discovery_attempted=True` when a crawl was possible
    (even when nothing was found) so the report can distinguish
    'Nicht eingereicht' from 'Auf der Website nicht gefunden'.

    Args:
        check_id: Job id used for progress updates.
        doc_entries: Entries built from the submitted documents; extended
            in place.
        doc_texts: doc_type -> text map; receives the discovered texts.
        url_text_cache: Accepted for call compatibility; not used here.
    """
    from urllib.parse import urlparse

    # Canonical types the user actually entered a URL or text for
    # (alias types are folded into "dse").
    submitted_canon: set[str] = set()
    for e in doc_entries:
        if e.get("text") or (e.get("url") or "").strip():
            t = e["doc_type"]
            submitted_canon.add("dse" if t in ("datenschutz", "privacy") else t)

    # Missing = canonical types the user did NOT submit. Kept as a list in
    # canonical order so the appended entries have a stable order — set
    # iteration order over strings differs between interpreter runs.
    missing = [dt for dt in _ALL_DOC_TYPES if dt not in submitted_canon]
    if not missing:
        return

    # Pick the most common base (scheme://netloc) from submitted URLs.
    bases: dict[str, int] = {}
    for e in doc_entries:
        u = (e.get("url") or "").strip()
        if u and "://" in u:
            p = urlparse(u)
            base = f"{p.scheme}://{p.netloc}"
            bases[base] = bases.get(base, 0) + 1
    if not bases:
        # No submitted URL at all — nothing to crawl from. Add empty
        # placeholders (with discovery_attempted=False) so the padding
        # step renders them as 'Nicht eingereicht' (not 'Nicht gefunden').
        for dt in missing:
            doc_entries.append({
                "doc_type": dt, "url": "", "text": "", "word_count": 0,
                "auto_discovered": False, "discovery_attempted": False,
            })
        return

    base = max(bases, key=bases.get) + "/"
    # Text loading has already advanced the bar to just under 30 and the
    # next step reports 32, so report 30 here to keep progress monotonic.
    _update(
        check_id,
        f"Suche fehlende Dokumente auf {urlparse(base).netloc}...",
        30,
    )
    discovered: list[dict] = []
    try:
        async with httpx.AsyncClient(timeout=180.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": base, "max_documents": 15},
                timeout=180.0,
            )
        if resp.status_code != 200:
            logger.warning("auto-discovery: HTTP %d for %s", resp.status_code, base)
        else:
            discovered = resp.json().get("documents", [])
    except Exception as e:
        # Best effort: a failed crawl just means nothing was discovered.
        logger.warning("auto-discovery failed for %s: %s", base, e)
        discovered = []

    # Classify each discovered doc into a canonical doc_type. The first
    # document per type that carries at least 100 words of real text wins;
    # measuring the actual text (not the reported word_count) prevents a
    # short-text document from occupying the slot of a usable one.
    by_type: dict[str, tuple[dict, str]] = {}
    for d in discovered:
        full = d.get("full_text") or d.get("text_preview") or ""
        if len(full.split()) < 100:
            continue
        title = (d.get("title") or "").lower()
        url = (d.get("url") or "").lower()
        canon = _classify_discovered_doc(title, url)
        if canon and canon in missing and canon not in by_type:
            by_type[canon] = (d, full)

    # Append a new entry for every missing canonical type. Auto-discovered
    # ones get the text/URL filled; unmatched ones stay empty so the
    # padding step renders them as 'Auf der Website nicht gefunden'.
    filled = 0
    for dt in missing:
        new_entry: dict = {
            "doc_type": dt, "url": "", "text": "", "word_count": 0,
            "auto_discovered": False, "discovery_attempted": True,
        }
        match = by_type.get(dt)
        if match:
            d, full = match
            found_url = d.get("url") or ""  # tolerate an explicit null URL
            new_entry["text"] = full
            new_entry["url"] = found_url
            new_entry["word_count"] = len(full.split())
            new_entry["auto_discovered"] = True
            doc_texts[dt] = full
            filled += 1
            logger.info(
                "auto-discovered %s on %s: %s (%d words)",
                dt, base, found_url[:80], new_entry["word_count"],
            )
        doc_entries.append(new_entry)
    logger.info(
        "auto-discovery: filled %d/%d missing types from %s",
        filled, len(missing), base,
    )
# Title/URL keywords → canonical doc_type. Order matters: most-specific first.
_DISCOVERY_RULES: list[tuple[str, tuple[str, ...]]] = [
("cookie", ("cookie", "kuche", "biscuit", "cookies-")),
("widerruf", ("widerruf", "rueckgabe", "rückgabe", "cancellation",
"right-of-withdrawal", "ruecktritts", "rücktritts")),
("social_media", ("social-media", "soziale-medien", "social_media",
"social-media-policy")),
("agb", ("/agb", "geschaeftsbedingungen", "geschäftsbedingungen",
"terms-and-conditions", "general-terms")),
("nutzungsbedingungen", ("nutzungsbedingung", "terms-of-use",
"nutzungsordnung", "terms-of-service")),
("dsb", ("datenschutzbeauftragt", "data-protection-officer",
"dpo-contact", "/dsb")),
("impressum", ("impressum", "imprint", "legal-notice", "site-notice",
"anbieterkennzeichnung", "legal-disclaimer-pool")),
("dse", ("data-privacy", "datenschutz", "data-protection",
"privacy-policy", "privacy-notice", "dsgvo",
"data_privacy", "datenschutzinformation")),
]
def _classify_discovered_doc(title: str, url: str) -> str | None:
"""Map a discovered doc (by its title + URL) to one of our 8 canonical types."""
haystack = f"{title} {url}"
for canon, keywords in _DISCOVERY_RULES:
if any(kw in haystack for kw in keywords):
return canon
return None
async def _check_single(
    text: str, doc_type: str, label: str, url: str,
    word_count: int, use_agent: bool,
):
    """Evaluate one document: regex checklist, Master Controls, LLM re-check.

    Returns a DocCheckResult carrying every collected check, the
    completeness/correctness percentages and the number of non-score
    findings. Failures of the Master Control or LLM stage are logged and
    skipped; they do not abort the document check.
    """
    from compliance.services.doc_checks.runner import check_document_completeness
    from compliance.services.rag_document_checker import check_document_with_controls
    from .agent_doc_check_routes import CheckItem, DocCheckResult

    def _is_score(finding: dict) -> bool:
        return "SCORE" in finding.get("code", "")

    def _level2_counts(checks: list) -> tuple[int, int]:
        # (passed, total) over the active (non-skipped) level-2 checks.
        active = [item for item in checks if item.level == 2 and not item.skipped]
        return sum(1 for item in active if item.passed), len(active)

    # Stage 1: regex checklist
    findings = check_document_completeness(text, doc_type, label, url)
    collected: list[CheckItem] = []
    completeness_pct = 0
    correctness_pct = 0
    for finding in findings:
        if not _is_score(finding):
            continue
        for raw in finding.get("all_checks", []):
            collected.append(CheckItem(
                id=raw["id"], label=raw["label"], passed=raw["passed"],
                severity=raw["severity"], matched_text=raw.get("matched_text", ""),
                level=raw.get("level", 1), parent=raw.get("parent"),
                skipped=raw.get("skipped", False), hint=raw.get("hint", ""),
            ))
        completeness_pct = finding.get("completeness_pct", 0)
        correctness_pct = finding.get("correctness_pct", 0)

    # Stage 2: Master Control checks (top 20 by severity to avoid noise)
    try:
        control_hits = await check_document_with_controls(
            text, doc_type, label, max_controls=20, use_agent=use_agent,
        )
        if control_hits:
            for hit in control_hits:
                collected.append(CheckItem(**hit))
            passed_l2, total_l2 = _level2_counts(collected)
            if total_l2:
                correctness_pct = round(passed_l2 / total_l2 * 100)
            else:
                correctness_pct = 0
    except Exception as e:
        logger.warning("MC check skipped for %s: %s", label, e)

    # Stage 3: LLM verification of failed checks that carry a hint
    to_verify = [
        item for item in collected
        if not item.passed and not item.skipped and item.hint
    ]
    if to_verify:
        try:
            from compliance.services.doc_checks.llm_verify import verify_failed_checks
            verdicts = await verify_failed_checks(
                text,
                [{"id": item.id, "label": item.label, "hint": item.hint} for item in to_verify],
                label,
            )
            for item in collected:
                if item.id in verdicts and verdicts[item.id]["overturned"]:
                    item.passed = True
                    item.matched_text = f"[LLM] {verdicts[item.id]['evidence']}"
            passed_l2, total_l2 = _level2_counts(collected)
            if total_l2:
                correctness_pct = round(passed_l2 / total_l2 * 100)
        except Exception as e:
            logger.warning("LLM verification skipped: %s", e)

    plain_findings = [finding for finding in findings if not _is_score(finding)]
    return DocCheckResult(
        label=label, url=url, doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness_pct, correctness_pct=correctness_pct,
        checks=collected, findings_count=len(plain_findings),
    )
def _pad_results_with_missing(
    results: list,
    discovery_attempted: set[str] | None = None,
) -> list:
    """Return `results` padded so every canonical doc_type is represented.

    Canonical types without a result receive a placeholder DocCheckResult
    (scenario "missing") whose error text depends on whether discovery was
    attempted for that type:
    - attempted      -> 'Auf der Website nicht gefunden ...'
    - not attempted  -> 'Nicht eingereicht ...'
    The output follows the order of _ALL_DOC_TYPES; results of any other
    type are appended afterwards in their original order.
    """
    from .agent_doc_check_routes import DocCheckResult

    def _canonical(doc_type):
        # Alias types are reported under "dse".
        if doc_type in ("datenschutz", "privacy"):
            return "dse"
        return doc_type

    tried = discovery_attempted or set()
    # When several results share a canonical type, the last one wins.
    latest: dict[str, object] = {_canonical(r.doc_type): r for r in results}

    padded: list = []
    for canon_type in _ALL_DOC_TYPES:
        if canon_type in latest:
            padded.append(latest[canon_type])
            continue
        if canon_type not in tried:
            reason = "Nicht eingereicht — Quelle nicht angegeben"
        else:
            reason = ("Auf der Website nicht gefunden — bitte URL des "
                      "Dokuments manuell eintragen, falls vorhanden")
        placeholder = DocCheckResult(
            label=_doc_type_label(canon_type),
            url="",
            doc_type=canon_type,
            word_count=0,
            completeness_pct=0,
            correctness_pct=0,
            checks=[],
            findings_count=0,
            error=reason,
            scenario="missing",
        )
        padded.append(placeholder)

    for r in results:
        if _canonical(r.doc_type) not in _ALL_DOC_TYPES:
            padded.append(r)
    return padded
# Two-label domain suffixes consulted by _company_name_from_url: for hosts
# ending in one of these, the company label is the third label from the
# right (e.g. "example" in shop.example.co.uk) instead of the second.
_COMPOUND_TLDS = {
    "co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
    "com.au", "com.br", "com.mx", "com.tr", "com.sg",
}
def _extract_domain(doc_entries: list[dict]) -> str | None:
"""Extract base domain (without www) from first URL."""
for entry in doc_entries:
url = entry.get("url", "")
if url and "://" in url:
from urllib.parse import urlparse
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
return host or None
return None
def _company_name_from_url(doc_entries: list[dict]) -> str | None:
"""Derive a display company name from the entered URLs.
Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
uppercase short acronyms (<=4 chars, no hyphens), title-case the rest.
Examples:
www.bmw.de -> BMW
mercedes-benz.de -> Mercedes-Benz
shop.example.co.uk -> Example
juris.de -> Juris
"""
from urllib.parse import urlparse
for entry in doc_entries:
url = entry.get("url", "")
if not url or "://" not in url:
continue
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
parts = host.split(".")
if len(parts) < 2:
continue
# Handle compound TLDs (.co.uk etc.)
if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
sld = parts[-3]
else:
sld = parts[-2]
if not sld:
continue
if len(sld) <= 4 and "-" not in sld:
return sld.upper()
return "-".join(p.capitalize() for p in sld.split("-"))
return None
def _get_skip_types(profile) -> dict[str, str]:
"""Doc_types to skip entirely. Currently empty — we check everything
and flag irrelevant items as INFO instead of skipping."""
return {}
def _apply_profile_filter(result, profile, doc_type: str):
"""Adjust INFO-level checks based on business profile context.
For example: ODR check only relevant for B2C online shops.
"""
from .agent_doc_check_routes import CheckItem
for check in result.checks:
cid = check.id.lower()
# ODR/OS-Link: relevant ONLY for B2C online shops. The check's
# default hint is written for B2B (it explains why it's not
# relevant) — for B2C we must replace it with action-oriented
# guidance, otherwise the report contradicts itself.
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
if profile.needs_odr:
if not check.passed:
check.hint = (
"Als B2C-Anbieter muessen Sie nach Art. 14 EU-VO 524/2013 "
"auf die OS-Plattform (https://ec.europa.eu/consumers/odr) "
"verlinken — klickbarer Link, nicht nur Text. Zusaetzlich "
"§36 VSBG: angeben, ob Sie an Verbraucher-"
"Streitbeilegungsverfahren teilnehmen (oder nicht)."
)
else:
check.skipped = True
check.hint = "Nicht relevant (kein B2C Online-Shop)"
# Widerruf: Flag entire document as unnecessary for B2B
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
check.severity = "INFO"
if not check.passed:
check.hint = (
"Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
"(§355 BGB gilt nur fuer Verbrauchervertraege). "
"Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
"Ihrer Website, da sie Verwirrung stiften kann."
)
# Regulated profession: check for Kammer info
if "kammer" in cid or "berufsordnung" in check.label.lower():
if not profile.is_regulated_profession:
check.skipped = True
check.hint = "Nicht relevant (kein regulierter Beruf)"
return result
# ── Helpers ──────────────────────────────────────────────────────────
# Display labels per doc_type, looked up by _doc_type_label. The alias
# types "datenschutz" and "privacy" share the label of "dse". Types not
# listed here fall back to their upper-cased key.
_DOC_TYPE_LABELS = {
    "dse": "Datenschutzerklaerung",
    "datenschutz": "Datenschutzerklaerung",
    "privacy": "Datenschutzerklaerung",
    "impressum": "Impressum",
    "agb": "AGB",
    "widerruf": "Widerrufsbelehrung",
    "cookie": "Cookie-Richtlinie",
    "avv": "Auftragsverarbeitung",
    "loeschkonzept": "Loeschkonzept",
    "dsfa": "Datenschutz-Folgenabschaetzung",
    "social_media": "Social Media Datenschutz",
    "nutzungsbedingungen": "Nutzungsbedingungen",
    "dsb": "DSB-Kontakt",
}
# Canonical 8 doc types in the same order as the frontend ComplianceCheckTab.
# The route pads `results` to always contain an entry for each — even if
# the user did not submit a URL — so the email + frontend always show
# the complete checklist (missing rows marked as 'Nicht eingereicht').
# The list order is significant: _pad_results_with_missing emits results in
# exactly this order, and _autodiscover_missing computes the missing types
# against this list.
_ALL_DOC_TYPES = [
    "dse", "impressum", "social_media", "cookie",
    "agb", "nutzungsbedingungen", "widerruf", "dsb",
]
def _doc_type_label(doc_type: str) -> str:
    """Return the display label of a doc_type, or its upper-cased key if unknown."""
    try:
        return _DOC_TYPE_LABELS[doc_type]
    except KeyError:
        return doc_type.upper()
def _result_to_dict(r) -> dict:
"""Convert DocCheckResult to JSON-serializable dict."""
fields = ("id", "label", "passed", "severity", "matched_text",
"level", "parent", "skipped", "hint")
return {
"label": r.label, "url": r.url, "doc_type": r.doc_type,
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
"correctness_pct": r.correctness_pct,
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
"findings_count": r.findings_count, "error": r.error,
"scenario": getattr(r, "scenario", ""),
}
def _build_profile_html(profile) -> str:
    """Render the business profile as HTML via the report module.

    The report module is imported at call time, not at module load.
    """
    from . import agent_doc_check_report as _report

    return _report.build_profile_html(profile)
# Cross-check extracted to compliance.services.banner_cookie_cross_check
from compliance.services.banner_cookie_cross_check import cross_check_banner_vs_cookie as _cross_check_banner_vs_cookie