bc21480a2a
Always-show-8 (user-requested): - agent_compliance_check_routes.py: _pad_results_with_missing pads the results list to always include all 8 canonical doc_types in canonical order. Missing types get a placeholder DocCheckResult with error= 'Nicht eingereicht' + scenario='missing'. - agent_doc_check_report.py: NICHT EINGEREICHT status label (neutral), friendly grey body block instead of red error. - ChecklistView.tsx: 'Nicht eingereicht' chip (neutral grey, not red 'Fehler'); SCENARIO_LABELS adds missing entry + header chip counter. Impressum-Regression fix (#18): - _fetch_text(url, doc_type): cookie/dse/social_media -> max_documents=1 (CMP capture authoritative, sub-pages dilute). Other types -> =3 (Impressum needs Versicherungsvermittler, Aufsicht, Berufsrecht sub- pages). 15s networkidle bail keeps timing safe. ODR/Verbraucherstreitbeilegung filter (#19): - _apply_profile_filter: when profile.needs_odr=True (B2C), override the check's default B2B-oriented hint with action-oriented B2C guidance pointing at Art. 14 EU-VO 524/2013 + §36 VSBG. Previously the check contradicted itself: 'profile says B2C' + hint 'only relevant for B2C online vendors'. Registergericht regex (#20): - impressum_checks.py: accept colon/dot/dash between keyword and city (BMW writes 'registergericht: münchen hrb 42243'). Add 'sitz und registergericht: X' as separate pattern. Industry detection (#21): - business_profiler.py: 'automotive' keywords broadened (antriebs, motor, leasing, werkstatt, probefahrt, plus brand names BMW/Mercedes/ Audi/VW/Porsche/Opel). 'it_services' keywords narrowed — software/ cloud/hosting are mentioned in every privacy policy and were biasing the result toward IT for any tech-aware company.
754 lines
30 KiB
Python
754 lines
30 KiB
Python
"""
|
|
Unified Compliance Check Routes — check all documents in one request.
|
|
|
|
POST /compliance/agent/extract-text — extract text from a URL
|
|
POST /compliance/agent/compliance-check — unified check for all documents
|
|
GET /compliance/agent/compliance-check/{check_id} — poll status
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import uuid as _uuid
|
|
from dataclasses import asdict
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
|
|
from compliance.services.smtp_sender import send_email
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
|
|
|
|
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
|
|
|
|
# In-memory job store (same pattern as doc-check)
|
|
_compliance_check_jobs: dict[str, dict] = {}
|
|
|
|
|
|
# ── Models ───────────────────────────────────────────────────────────
|
|
|
|
class ExtractTextRequest(BaseModel):
|
|
url: str
|
|
|
|
|
|
class DocumentInput(BaseModel):
|
|
doc_type: str # dse, agb, impressum, cookie, widerruf, avv, loeschkonzept, etc.
|
|
url: str = ""
|
|
text: str = "" # text has priority over URL
|
|
|
|
|
|
class ComplianceCheckRequest(BaseModel):
|
|
documents: list[DocumentInput]
|
|
use_agent: bool = False
|
|
recipient: str = "dsb@breakpilot.local"
|
|
|
|
|
|
class ComplianceCheckStartResponse(BaseModel):
|
|
check_id: str
|
|
status: str = "running"
|
|
|
|
|
|
class ComplianceCheckStatusResponse(BaseModel):
|
|
check_id: str
|
|
status: str
|
|
progress: str = ""
|
|
progress_pct: int = 0
|
|
result: dict | None = None
|
|
error: str = ""
|
|
|
|
|
|
# ── Extract text endpoint ────────────────────────────────────────────
|
|
|
|
@router.post("/extract-text")
|
|
async def extract_text(req: ExtractTextRequest):
|
|
"""Extract text from a URL via consent-tester DSI discovery.
|
|
|
|
Merges all documents found on the page (sub-pages, accordions, etc.)
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
resp = await client.post(
|
|
f"{CONSENT_TESTER_URL}/dsi-discovery",
|
|
json={"url": req.url, "max_documents": 5},
|
|
timeout=300.0,
|
|
)
|
|
if resp.status_code != 200:
|
|
return {
|
|
"text": "", "word_count": 0, "title": "",
|
|
"error": f"HTTP {resp.status_code} von Consent-Tester",
|
|
}
|
|
|
|
data = resp.json()
|
|
docs = data.get("documents", [])
|
|
|
|
if not docs:
|
|
return {
|
|
"text": "", "word_count": 0, "title": "",
|
|
"error": "Kein Text extrahierbar",
|
|
}
|
|
|
|
# Merge all documents (handles multi-page DSIs like BMW)
|
|
texts = []
|
|
for doc in docs:
|
|
t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
|
|
if t and len(t) > 50:
|
|
texts.append(t)
|
|
text = "\n\n".join(texts) if texts else ""
|
|
title = docs[0].get("title", "") or docs[0].get("doc_type", "")
|
|
word_count = len(text.split())
|
|
|
|
return {
|
|
"text": text,
|
|
"word_count": word_count,
|
|
"title": title,
|
|
"error": "",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning("extract-text failed for %s: %s", req.url, e)
|
|
return {
|
|
"text": "", "word_count": 0, "title": "",
|
|
"error": str(e)[:200],
|
|
}
|
|
|
|
|
|
# ── Unified compliance check ────────────────────────────────────────
|
|
|
|
@router.post("/compliance-check")
|
|
async def start_compliance_check(req: ComplianceCheckRequest):
|
|
"""Start async compliance check for all documents."""
|
|
check_id = str(_uuid.uuid4())[:8]
|
|
_compliance_check_jobs[check_id] = {
|
|
"status": "running",
|
|
"progress": "Pruefung gestartet...",
|
|
"progress_pct": 0,
|
|
"result": None,
|
|
"error": "",
|
|
}
|
|
asyncio.create_task(_run_compliance_check(check_id, req))
|
|
return ComplianceCheckStartResponse(check_id=check_id, status="running")
|
|
|
|
|
|
@router.get("/compliance-check/{check_id}")
|
|
async def get_compliance_check_status(check_id: str):
|
|
"""Poll compliance check status."""
|
|
job = _compliance_check_jobs.get(check_id)
|
|
if not job:
|
|
return {"check_id": check_id, "status": "not_found"}
|
|
return ComplianceCheckStatusResponse(
|
|
check_id=check_id,
|
|
status=job["status"],
|
|
progress=job.get("progress", ""),
|
|
progress_pct=job.get("progress_pct", 0),
|
|
result=job.get("result"),
|
|
error=job.get("error", ""),
|
|
)
|
|
|
|
|
|
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
|
"""Background task: check all documents with business-profile context."""
|
|
try:
|
|
from compliance.services.business_profiler import detect_business_profile
|
|
from compliance.services.doc_checks.runner import check_document_completeness
|
|
from compliance.services.rag_document_checker import check_document_with_controls
|
|
from .agent_doc_check_routes import CheckItem, DocCheckResult
|
|
from .agent_doc_check_report import build_html_report
|
|
|
|
# Step 1: Resolve texts (fetch from URL if needed) — 0-30%
|
|
_update(check_id, "Texte werden geladen...", 1)
|
|
doc_texts: dict[str, str] = {}
|
|
doc_entries: list[dict] = []
|
|
|
|
# Cache fetched URLs to detect duplicates
|
|
url_text_cache: dict[str, str] = {}
|
|
|
|
n_docs = max(1, len(req.documents))
|
|
for i, doc in enumerate(req.documents):
|
|
pct = int(1 + (i / n_docs) * 29)
|
|
_update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
|
|
text = doc.text
|
|
if not text and doc.url:
|
|
url_key = doc.url.strip().rstrip("/").lower()
|
|
if url_key in url_text_cache:
|
|
text = url_text_cache[url_key]
|
|
else:
|
|
text = await _fetch_text(doc.url, doc_type=doc.doc_type)
|
|
if text:
|
|
url_text_cache[url_key] = text
|
|
if text:
|
|
doc_texts[doc.doc_type] = text
|
|
doc_entries.append({
|
|
"doc_type": doc.doc_type,
|
|
"url": doc.url,
|
|
"text": text,
|
|
"word_count": len(text.split()) if text else 0,
|
|
})
|
|
|
|
# Step 1b: Section splitting — two cases:
|
|
# 1. Same URL used for multiple doc_types → split by heading
|
|
# 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
|
|
from compliance.services.section_splitter import (
|
|
split_shared_texts, auto_fill_from_dsi, cross_search_documents,
|
|
)
|
|
split_shared_texts(doc_entries, url_text_cache)
|
|
auto_fill_from_dsi(doc_entries)
|
|
|
|
# Step 1c: Cross-document search — find doc_types in wrong documents (30-35%)
|
|
_update(check_id, "Dokumente werden uebergreifend durchsucht...", 32)
|
|
placement_findings = cross_search_documents(doc_entries)
|
|
|
|
# Refresh doc_texts after all splitting/searching
|
|
for entry in doc_entries:
|
|
if entry.get("text"):
|
|
doc_texts[entry["doc_type"]] = entry["text"]
|
|
|
|
# Step 2: Detect business profile (35-40%)
|
|
_update(check_id, "Geschaeftsmodell wird erkannt...", 37)
|
|
profile = await detect_business_profile(doc_texts)
|
|
profile_dict = asdict(profile)
|
|
|
|
# Step 3: Check each document
|
|
results: list[DocCheckResult] = []
|
|
total_findings = 0
|
|
use_agent_flag = req.use_agent or os.getenv(
|
|
"COMPLIANCE_USE_AGENT", "false"
|
|
).lower() == "true"
|
|
|
|
# Filter out doc_types that don't apply to this business profile
|
|
skip_types = _get_skip_types(profile)
|
|
|
|
# Document checks: 40-80%
|
|
n_entries = max(1, len(doc_entries))
|
|
for i, entry in enumerate(doc_entries):
|
|
text = entry["text"]
|
|
doc_type = entry["doc_type"]
|
|
label = _doc_type_label(doc_type)
|
|
url = entry["url"]
|
|
|
|
if doc_type in skip_types:
|
|
results.append(DocCheckResult(
|
|
label=label, url=url, doc_type=doc_type,
|
|
error=skip_types[doc_type],
|
|
))
|
|
continue
|
|
|
|
pct = int(40 + (i / n_entries) * 40)
|
|
_update(check_id, f"Pruefen {i+1}/{n_entries}: {label}...", pct)
|
|
|
|
if not text or len(text) < 50:
|
|
results.append(DocCheckResult(
|
|
label=label, url=url, doc_type=doc_type,
|
|
error="Kein Text vorhanden oder zu kurz",
|
|
))
|
|
continue
|
|
|
|
result = await _check_single(
|
|
text, doc_type, label, url,
|
|
entry["word_count"], use_agent_flag,
|
|
)
|
|
|
|
# Apply profile context filter
|
|
result = _apply_profile_filter(result, profile, doc_type)
|
|
|
|
# Add placement findings — but only if the regex checks confirm
|
|
# the text doesn't match. If completeness >= 50%, the text IS the
|
|
# right doc_type despite missing cross-search keywords.
|
|
if result.completeness_pct < 50:
|
|
for pf in placement_findings:
|
|
if pf.get("doc_type") == doc_type:
|
|
result.checks.insert(0, CheckItem(**{
|
|
k: v for k, v in pf.items() if k != "doc_type"
|
|
}))
|
|
|
|
results.append(result)
|
|
total_findings += result.findings_count
|
|
|
|
# Step 3b: Banner-Check (automatic, uses first URL or homepage)
|
|
banner_result = None
|
|
banner_url = req.documents[0].url if req.documents and req.documents[0].url else ""
|
|
# Use the homepage (strip path) for banner check
|
|
if banner_url:
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(banner_url)
|
|
banner_url = f"{parsed.scheme}://{parsed.netloc}"
|
|
if banner_url:
|
|
_update(check_id, "Cookie-Banner wird geprueft...", 82)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
|
resp = await client.post(
|
|
f"{CONSENT_TESTER_URL}/scan",
|
|
json={"url": banner_url, "timeout_per_phase": 10},
|
|
)
|
|
if resp.status_code == 200:
|
|
banner_result = resp.json()
|
|
except Exception as e:
|
|
logger.warning("Banner check failed: %s", e)
|
|
|
|
# Step 3c: Cross-check Banner vs Cookie-Richtlinie (88-90%)
|
|
if banner_result and "cookie" in doc_texts:
|
|
_update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...", 89)
|
|
cross_findings = _cross_check_banner_vs_cookie(
|
|
banner_result, doc_texts["cookie"],
|
|
)
|
|
if cross_findings:
|
|
for r in results:
|
|
if r.doc_type == "cookie":
|
|
for cf in cross_findings:
|
|
r.checks.append(CheckItem(**cf))
|
|
l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
|
|
l2p = sum(1 for c in l2 if c.passed)
|
|
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
|
|
|
|
# Step 3d: TCF Vendor cross-check against DSI
|
|
tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
|
|
vvt_entries: list[dict] = []
|
|
if tcf_vendors and "dse" in doc_texts:
|
|
_update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...", 91)
|
|
from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
|
|
from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
|
|
vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
|
|
if vendor_findings:
|
|
for r in results:
|
|
if r.doc_type == "dse":
|
|
for vf in vendor_findings:
|
|
r.checks.append(CheckItem(**vf))
|
|
vvt_entries = map_vendors_to_vvt(tcf_vendors)
|
|
|
|
# Step 4: Extract profile hints from documents (92-95%)
|
|
_update(check_id, "Profil wird aus Dokumenten extrahiert...", 93)
|
|
from compliance.services.profile_extractor import extract_profile_from_documents
|
|
extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
|
|
|
|
# Step 4b: Determine scenario per document
|
|
for r in results:
|
|
if r.error:
|
|
r.scenario = "skip"
|
|
elif r.completeness_pct < 30:
|
|
r.scenario = "regenerate"
|
|
elif r.completeness_pct < 95:
|
|
r.scenario = "fix"
|
|
else:
|
|
r.scenario = "import"
|
|
|
|
# Step 4c: Always render all 8 canonical doc types, even when the
|
|
# user left a row blank. Missing types get a placeholder so the
|
|
# email + frontend make absent documents immediately visible.
|
|
results = _pad_results_with_missing(results)
|
|
|
|
# Step 5: Build report with management summary (95-98%)
|
|
_update(check_id, "Report wird erstellt...", 96)
|
|
from .agent_doc_check_report import (
|
|
build_management_summary,
|
|
build_scanned_urls_html,
|
|
build_provider_list_html,
|
|
)
|
|
summary_html = build_management_summary(results)
|
|
scanned_html = build_scanned_urls_html(doc_entries)
|
|
providers_html = build_provider_list_html(banner_result, vvt_entries)
|
|
report_html = build_html_report(results, None)
|
|
profile_html = _build_profile_html(profile)
|
|
full_html = (
|
|
summary_html + scanned_html + profile_html
|
|
+ providers_html + report_html
|
|
)
|
|
|
|
# Step 6: Send email — derive site name primarily from entered URL.
|
|
# The extracted_profile.companyName is often noisy (e.g. picks up
|
|
# juris.de from legal references). Domain-derived name is more
|
|
# predictable for the GF email subject.
|
|
doc_count = len([r for r in results if not r.error])
|
|
url_company = _company_name_from_url(doc_entries)
|
|
domain = _extract_domain(doc_entries)
|
|
site_name = url_company or domain or "Unbekannt"
|
|
_update(check_id, "E-Mail wird versendet...", 98)
|
|
email_result = send_email(
|
|
recipient=req.recipient,
|
|
subject=f"[COMPLIANCE-CHECK] {site_name} — {doc_count} Dokumente geprueft",
|
|
body_html=full_html,
|
|
)
|
|
|
|
# Step 7: Store result
|
|
response = {
|
|
"results": [_result_to_dict(r) for r in results],
|
|
"business_profile": profile_dict,
|
|
"extracted_profile": extracted_profile,
|
|
"banner_result": {
|
|
"detected": banner_result.get("banner_detected", False) if banner_result else False,
|
|
"provider": banner_result.get("banner_provider", "") if banner_result else "",
|
|
"violations": len(banner_result.get("banner_checks", {}).get("violations", [])) if banner_result else 0,
|
|
"tcf_vendor_count": len(tcf_vendors),
|
|
} if banner_result else None,
|
|
"tcf_vendors": vvt_entries if tcf_vendors else [],
|
|
"total_documents": len(results),
|
|
"total_findings": total_findings,
|
|
"email_status": email_result.get("status", "failed"),
|
|
"checked_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
_compliance_check_jobs[check_id]["status"] = "completed"
|
|
_compliance_check_jobs[check_id]["result"] = response
|
|
_compliance_check_jobs[check_id]["progress"] = "Fertig"
|
|
_compliance_check_jobs[check_id]["progress_pct"] = 100
|
|
|
|
except Exception as e:
|
|
logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
|
|
_compliance_check_jobs[check_id]["status"] = "failed"
|
|
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
|
|
|
|
|
|
def _update(check_id: str, msg: str, pct: int | None = None):
|
|
job = _compliance_check_jobs[check_id]
|
|
job["progress"] = msg
|
|
if pct is not None:
|
|
job["progress_pct"] = max(0, min(100, int(pct)))
|
|
|
|
|
|
async def _fetch_text(url: str, doc_type: str = "") -> str:
|
|
"""Fetch text from URL via consent-tester, with HTTP fallback.
|
|
|
|
1. Try consent-tester (Playwright) — handles JS-heavy SPAs
|
|
2. Fallback: direct HTTP fetch + HTML strip — fast, works for SSR pages
|
|
|
|
doc_type controls how aggressively we follow sub-links — cookie/dse
|
|
pages prefer self-extract only (CMP capture is authoritative); legal/
|
|
imprint pages need to follow sub-pages (Versicherungsvermittler etc).
|
|
"""
|
|
# 1. Consent-tester (Playwright-based, full JS rendering).
|
|
# max_documents depends on doc_type:
|
|
# - cookie/dse/social_media: self-extract (often + CMP capture) is
|
|
# authoritative, sub-pages dilute the policy text. max=1.
|
|
# - impressum/agb/widerruf/nutzungsbedingungen/dsb: BMW & similar
|
|
# enterprise sites split this across 3-4 short sub-pages
|
|
# (Versicherungsvermittler, Aufsicht, Berufsrecht). max=3 follows
|
|
# them. The 15s networkidle bail (dsi_helpers) keeps timing safe.
|
|
short_extract_types = {"cookie", "dse", "datenschutz", "privacy", "social_media"}
|
|
max_docs = 1 if (doc_type or "") in short_extract_types else 3
|
|
try:
|
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
|
resp = await client.post(
|
|
f"{CONSENT_TESTER_URL}/dsi-discovery",
|
|
json={"url": url, "max_documents": max_docs},
|
|
timeout=120.0,
|
|
)
|
|
if resp.status_code == 200:
|
|
docs = resp.json().get("documents", [])
|
|
if docs:
|
|
texts = []
|
|
for doc in docs:
|
|
t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
|
|
if t and len(t) > 50:
|
|
texts.append(t)
|
|
merged = "\n\n".join(texts)
|
|
if merged and len(merged.split()) > 100:
|
|
if len(texts) > 1:
|
|
logger.info("Merged %d docs from %s (%d words)",
|
|
len(texts), url, len(merged.split()))
|
|
return merged
|
|
except Exception as e:
|
|
logger.warning("Consent-tester fetch failed for %s: %s", url, e)
|
|
|
|
# 2. Fallback: direct HTTP fetch (works for SSR pages like BMW)
|
|
try:
|
|
import re as _re
|
|
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
|
|
resp = await client.get(url)
|
|
if resp.status_code == 200 and "text/html" in resp.headers.get("content-type", ""):
|
|
html = resp.text
|
|
# Strip HTML tags, decode entities
|
|
text = _re.sub(r"<script[^>]*>.*?</script>", " ", html, flags=_re.DOTALL | _re.IGNORECASE)
|
|
text = _re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=_re.DOTALL | _re.IGNORECASE)
|
|
text = _re.sub(r"<[^>]+>", " ", text)
|
|
text = _re.sub(r"\s+", " ", text).strip()
|
|
if len(text.split()) > 100:
|
|
logger.info("HTTP fallback for %s: %d words", url, len(text.split()))
|
|
return text
|
|
except Exception as e:
|
|
logger.warning("HTTP fallback failed for %s: %s", url, e)
|
|
|
|
return ""
|
|
|
|
|
|
async def _check_single(
|
|
text: str, doc_type: str, label: str, url: str,
|
|
word_count: int, use_agent: bool,
|
|
):
|
|
"""Run regex + MC checks on a single document."""
|
|
from compliance.services.doc_checks.runner import check_document_completeness
|
|
from compliance.services.rag_document_checker import check_document_with_controls
|
|
from .agent_doc_check_routes import CheckItem, DocCheckResult
|
|
|
|
# Regex checklist
|
|
findings = check_document_completeness(text, doc_type, label, url)
|
|
|
|
all_checks: list[CheckItem] = []
|
|
completeness = 0
|
|
correctness = 0
|
|
|
|
for f in findings:
|
|
if "SCORE" in f.get("code", ""):
|
|
for c in f.get("all_checks", []):
|
|
all_checks.append(CheckItem(
|
|
id=c["id"], label=c["label"], passed=c["passed"],
|
|
severity=c["severity"], matched_text=c.get("matched_text", ""),
|
|
level=c.get("level", 1), parent=c.get("parent"),
|
|
skipped=c.get("skipped", False), hint=c.get("hint", ""),
|
|
))
|
|
completeness = f.get("completeness_pct", 0)
|
|
correctness = f.get("correctness_pct", 0)
|
|
|
|
# Master Control checks (top 20 by severity to avoid noise)
|
|
try:
|
|
mc_results = await check_document_with_controls(
|
|
text, doc_type, label, max_controls=20, use_agent=use_agent,
|
|
)
|
|
if mc_results:
|
|
for mc in mc_results:
|
|
all_checks.append(CheckItem(**mc))
|
|
l2 = [c for c in all_checks if c.level == 2 and not c.skipped]
|
|
l2_passed = sum(1 for c in l2 if c.passed)
|
|
correctness = round(l2_passed / len(l2) * 100) if l2 else 0
|
|
except Exception as e:
|
|
logger.warning("MC check skipped for %s: %s", label, e)
|
|
|
|
# LLM verification of regex fails
|
|
failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
|
|
if failed:
|
|
try:
|
|
from compliance.services.doc_checks.llm_verify import verify_failed_checks
|
|
overturns = await verify_failed_checks(
|
|
text,
|
|
[{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
|
|
label,
|
|
)
|
|
for c in all_checks:
|
|
if c.id in overturns and overturns[c.id]["overturned"]:
|
|
c.passed = True
|
|
c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
|
|
l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
|
|
l2_passed = sum(1 for c in l2_active if c.passed)
|
|
if l2_active:
|
|
correctness = round(l2_passed / len(l2_active) * 100)
|
|
except Exception as e:
|
|
logger.warning("LLM verification skipped: %s", e)
|
|
|
|
non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
|
|
return DocCheckResult(
|
|
label=label, url=url, doc_type=doc_type,
|
|
word_count=word_count or len(text.split()),
|
|
completeness_pct=completeness, correctness_pct=correctness,
|
|
checks=all_checks, findings_count=len(non_score),
|
|
)
|
|
|
|
|
|
def _pad_results_with_missing(results: list) -> list:
|
|
"""Ensure every canonical doc_type has an entry in the results list.
|
|
|
|
Doc_types the user did not submit get a placeholder DocCheckResult
|
|
with a 'Nicht eingereicht' marker so the email + frontend make
|
|
absent documents visible at a glance.
|
|
|
|
Preserves the canonical ordering from _ALL_DOC_TYPES so the report
|
|
layout is stable.
|
|
"""
|
|
from .agent_doc_check_routes import DocCheckResult
|
|
|
|
by_type: dict[str, object] = {}
|
|
for r in results:
|
|
# Map alias types (datenschutz/privacy → dse) to the canonical key
|
|
canon = "dse" if r.doc_type in ("datenschutz", "privacy") else r.doc_type
|
|
by_type[canon] = r
|
|
|
|
ordered: list = []
|
|
for dt in _ALL_DOC_TYPES:
|
|
if dt in by_type:
|
|
ordered.append(by_type[dt])
|
|
continue
|
|
ordered.append(DocCheckResult(
|
|
label=_doc_type_label(dt),
|
|
url="",
|
|
doc_type=dt,
|
|
word_count=0,
|
|
completeness_pct=0,
|
|
correctness_pct=0,
|
|
checks=[],
|
|
findings_count=0,
|
|
error="Nicht eingereicht — Quelle nicht angegeben",
|
|
scenario="missing",
|
|
))
|
|
|
|
# Append any results not in _ALL_DOC_TYPES (e.g. avv, dsfa) at the end
|
|
extras = [r for r in results
|
|
if (r.doc_type if r.doc_type not in ("datenschutz", "privacy") else "dse")
|
|
not in _ALL_DOC_TYPES]
|
|
ordered.extend(extras)
|
|
return ordered
|
|
|
|
|
|
_COMPOUND_TLDS = {
|
|
"co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
|
|
"com.au", "com.br", "com.mx", "com.tr", "com.sg",
|
|
}
|
|
|
|
|
|
def _extract_domain(doc_entries: list[dict]) -> str | None:
|
|
"""Extract base domain (without www) from first URL."""
|
|
for entry in doc_entries:
|
|
url = entry.get("url", "")
|
|
if url and "://" in url:
|
|
from urllib.parse import urlparse
|
|
host = urlparse(url).netloc.lower()
|
|
if host.startswith("www."):
|
|
host = host[4:]
|
|
return host or None
|
|
return None
|
|
|
|
|
|
def _company_name_from_url(doc_entries: list[dict]) -> str | None:
|
|
"""Derive a display company name from the entered URLs.
|
|
|
|
Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
|
|
uppercase short acronyms (<=4 chars, no hyphens), title-case the rest.
|
|
|
|
Examples:
|
|
www.bmw.de -> BMW
|
|
mercedes-benz.de -> Mercedes-Benz
|
|
shop.example.co.uk -> Example
|
|
juris.de -> Juris
|
|
"""
|
|
from urllib.parse import urlparse
|
|
|
|
for entry in doc_entries:
|
|
url = entry.get("url", "")
|
|
if not url or "://" not in url:
|
|
continue
|
|
host = urlparse(url).netloc.lower()
|
|
if host.startswith("www."):
|
|
host = host[4:]
|
|
parts = host.split(".")
|
|
if len(parts) < 2:
|
|
continue
|
|
# Handle compound TLDs (.co.uk etc.)
|
|
if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
|
|
sld = parts[-3]
|
|
else:
|
|
sld = parts[-2]
|
|
if not sld:
|
|
continue
|
|
if len(sld) <= 4 and "-" not in sld:
|
|
return sld.upper()
|
|
return "-".join(p.capitalize() for p in sld.split("-"))
|
|
return None
|
|
|
|
|
|
def _get_skip_types(profile) -> dict[str, str]:
|
|
"""Doc_types to skip entirely. Currently empty — we check everything
|
|
and flag irrelevant items as INFO instead of skipping."""
|
|
return {}
|
|
|
|
|
|
def _apply_profile_filter(result, profile, doc_type: str):
|
|
"""Adjust INFO-level checks based on business profile context.
|
|
|
|
For example: ODR check only relevant for B2C online shops.
|
|
"""
|
|
from .agent_doc_check_routes import CheckItem
|
|
|
|
for check in result.checks:
|
|
cid = check.id.lower()
|
|
|
|
# ODR/OS-Link: relevant ONLY for B2C online shops. The check's
|
|
# default hint is written for B2B (it explains why it's not
|
|
# relevant) — for B2C we must replace it with action-oriented
|
|
# guidance, otherwise the report contradicts itself.
|
|
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
|
|
if profile.needs_odr:
|
|
if not check.passed:
|
|
check.hint = (
|
|
"Als B2C-Anbieter muessen Sie nach Art. 14 EU-VO 524/2013 "
|
|
"auf die OS-Plattform (https://ec.europa.eu/consumers/odr) "
|
|
"verlinken — klickbarer Link, nicht nur Text. Zusaetzlich "
|
|
"§36 VSBG: angeben, ob Sie an Verbraucher-"
|
|
"Streitbeilegungsverfahren teilnehmen (oder nicht)."
|
|
)
|
|
else:
|
|
check.skipped = True
|
|
check.hint = "Nicht relevant (kein B2C Online-Shop)"
|
|
|
|
# Widerruf: Flag entire document as unnecessary for B2B
|
|
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
|
|
check.severity = "INFO"
|
|
if not check.passed:
|
|
check.hint = (
|
|
"Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
|
|
"(§355 BGB gilt nur fuer Verbrauchervertraege). "
|
|
"Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
|
|
"Ihrer Website, da sie Verwirrung stiften kann."
|
|
)
|
|
|
|
# Regulated profession: check for Kammer info
|
|
if "kammer" in cid or "berufsordnung" in check.label.lower():
|
|
if not profile.is_regulated_profession:
|
|
check.skipped = True
|
|
check.hint = "Nicht relevant (kein regulierter Beruf)"
|
|
|
|
return result
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────
|
|
|
|
_DOC_TYPE_LABELS = {
|
|
"dse": "Datenschutzerklaerung",
|
|
"datenschutz": "Datenschutzerklaerung",
|
|
"privacy": "Datenschutzerklaerung",
|
|
"impressum": "Impressum",
|
|
"agb": "AGB",
|
|
"widerruf": "Widerrufsbelehrung",
|
|
"cookie": "Cookie-Richtlinie",
|
|
"avv": "Auftragsverarbeitung",
|
|
"loeschkonzept": "Loeschkonzept",
|
|
"dsfa": "Datenschutz-Folgenabschaetzung",
|
|
"social_media": "Social Media Datenschutz",
|
|
"nutzungsbedingungen": "Nutzungsbedingungen",
|
|
"dsb": "DSB-Kontakt",
|
|
}
|
|
|
|
# Canonical 8 doc types in the same order as the frontend ComplianceCheckTab.
|
|
# The route pads `results` to always contain an entry for each — even if
|
|
# the user did not submit a URL — so the email + frontend always show
|
|
# the complete checklist (missing rows marked as 'Nicht eingereicht').
|
|
_ALL_DOC_TYPES = [
|
|
"dse", "impressum", "social_media", "cookie",
|
|
"agb", "nutzungsbedingungen", "widerruf", "dsb",
|
|
]
|
|
|
|
|
|
def _doc_type_label(doc_type: str) -> str:
|
|
return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())
|
|
|
|
|
|
def _result_to_dict(r) -> dict:
|
|
"""Convert DocCheckResult to JSON-serializable dict."""
|
|
fields = ("id", "label", "passed", "severity", "matched_text",
|
|
"level", "parent", "skipped", "hint")
|
|
return {
|
|
"label": r.label, "url": r.url, "doc_type": r.doc_type,
|
|
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
|
|
"correctness_pct": r.correctness_pct,
|
|
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
|
|
"findings_count": r.findings_count, "error": r.error,
|
|
"scenario": getattr(r, "scenario", ""),
|
|
}
|
|
|
|
|
|
def _build_profile_html(profile) -> str:
|
|
from .agent_doc_check_report import build_profile_html
|
|
return build_profile_html(profile)
|
|
|
|
|
|
# Cross-check extracted to compliance.services.banner_cookie_cross_check
|
|
from compliance.services.banner_cookie_cross_check import cross_check_banner_vs_cookie as _cross_check_banner_vs_cookie
|