c281464071
CI / detect-changes (push) Successful in 10s
CI / branch-name (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
CI / test-python-backend (push) Successful in 39s
CI / test-python-document-crawler (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
jc_avv_decision.py: detect_ambiguous_jc_avv prueft ob DSE-Text sowohl JC-Signale (gemeinsame Auswertung, Schwesterunternehmen, Konzern...) als auch AVV-Signale (Auftragsverarbeiter, weisungsgebunden...) enthaelt. Bei Treffer rendert build_jc_avv_decision_html einen Block mit 4 EDPB- basierten Leitfragen + jeweiliger Empfehlung. Quellen: EDPB Guidelines 7/2020, EuGH C-25/17, C-40/17. In Mail-Render zwischen Solutions-Block und VVT eingehaengt. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2197 lines
98 KiB
Python
2197 lines
98 KiB
Python
"""
|
|
Unified Compliance Check Routes — check all documents in one request.
|
|
|
|
POST /compliance/agent/extract-text — extract text from a URL
|
|
POST /compliance/agent/compliance-check — unified check for all documents
|
|
GET /compliance/agent/compliance-check/{check_id} — poll status
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import re
|
|
import uuid as _uuid
|
|
from dataclasses import asdict
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from fastapi import APIRouter
|
|
from pydantic import BaseModel
|
|
|
|
from compliance.services.smtp_sender import send_email
|
|
|
|
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/compliance/agent", tags=["agent"])

# Base URL of the consent-tester service. Defaults to the previously
# hard-coded address; set the CONSENT_TESTER_URL environment variable to
# point at a different instance (e.g. for local development). An empty
# value falls back to the default, and a trailing slash is stripped so that
# f"{CONSENT_TESTER_URL}/path" never produces a double slash.
CONSENT_TESTER_URL = (
    os.getenv("CONSENT_TESTER_URL")
    or "http://bp-compliance-consent-tester:8094"
).rstrip("/")

# In-memory job store (same pattern as doc-check), keyed by check_id.
_compliance_check_jobs: dict[str, dict] = {}
|
|
|
|
|
|
# ── Models ───────────────────────────────────────────────────────────
|
|
|
|
class ExtractTextRequest(BaseModel):
    """Request body for POST /extract-text."""

    # URL that is handed to the consent-tester for text extraction.
    url: str
|
|
|
|
|
|
class DocumentInput(BaseModel):
    """One document of a compliance check, given as URL and/or inline text."""

    # Document kind: dse, agb, impressum, cookie, widerruf, avv,
    # loeschkonzept, etc.
    doc_type: str
    # Source URL; only fetched when ``text`` is empty.
    url: str = ""
    # Inline document text; has priority over ``url``.
    text: str = ""
|
|
|
|
|
|
class ComplianceCheckRequest(BaseModel):
    """Request body for POST /compliance-check."""

    documents: list[DocumentInput]
    use_agent: bool = False
    recipient: str = "dsb@breakpilot.local"

    # P12: override for a TDM reservation when the customer has given
    # documented permission. tdm_override_reason is mandatory when
    # tdm_override=True
    # (e.g. "Auftragsbeziehung Safetykon GmbH, Email Hr. X 18.05.2026").
    # NOTE: the model itself does not enforce this; the background check
    # ignores the override when the stripped reason has fewer than 10
    # characters.
    tdm_override: bool = False
    tdm_override_reason: str = ""

    # P79: 8-field pre-scan wizard (industry, B2B/B2C, direct sales,
    # legal form, corporate group, employees, special-category data,
    # third country). Persisted in the snapshot and used to filter the
    # MC evaluation (P72).
    scan_context: dict | None = None
|
|
|
|
|
|
class ComplianceCheckStartResponse(BaseModel):
    """Response of POST /compliance-check: the id to poll with."""

    # Identifier of the job in the in-memory job store.
    check_id: str
    # Initial job state.
    status: str = "running"
|
|
|
|
|
|
class ComplianceCheckStatusResponse(BaseModel):
    """Response of GET /compliance-check/{check_id} for a known job."""

    check_id: str
    # Job state, e.g. "running" or "skipped_tdm".
    status: str
    # Human-readable description of the current step.
    progress: str = ""
    # Progress percentage.
    progress_pct: int = 0
    # Result payload of the job; None until one has been stored.
    result: dict | None = None
    # Error message; empty when none has been recorded.
    error: str = ""
|
|
|
|
|
|
# ── Extract text endpoint ────────────────────────────────────────────
|
|
|
|
@router.post("/extract-text")
async def extract_text(req: ExtractTextRequest):
    """Extract text from a URL via consent-tester DSI discovery.

    Merges all documents found on the page (sub-pages, accordions, etc.)
    into one text, joined by blank lines.

    Args:
        req: Request carrying the URL to extract from.

    Returns:
        A dict with the keys ``text``, ``word_count``, ``title`` and
        ``error``. ``error`` is empty on success. This endpoint never
        raises: every failure (non-200 upstream status, no documents, no
        usable text, network or parsing errors) is reported via ``error``.
    """

    def _failure(message: str, title: str = "") -> dict:
        # Single definition of the error payload shape.
        return {"text": "", "word_count": 0, "title": title, "error": message}

    try:
        # The client-level timeout already applies to every request, so it
        # does not need to be repeated on the individual call.
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": req.url, "max_documents": 5},
            )

        if resp.status_code != 200:
            return _failure(f"HTTP {resp.status_code} von Consent-Tester")

        docs = resp.json().get("documents", [])
        if not docs:
            return _failure("Kein Text extrahierbar")

        # Merge all documents (handles multi-page DSIs like BMW). Prefer the
        # full text, fall back to the preview, and drop fragments of 50
        # characters or fewer.
        candidates = (
            doc.get("full_text", "") or doc.get("text_preview", "") or ""
            for doc in docs
        )
        text = "\n\n".join(t for t in candidates if len(t) > 50)
        title = docs[0].get("title", "") or docs[0].get("doc_type", "")

        if not text:
            # Documents were found but none carried usable text. Report
            # this as an error instead of a "successful" empty result.
            return _failure("Kein Text extrahierbar", title)

        return {
            "text": text,
            "word_count": len(text.split()),
            "title": title,
            "error": "",
        }
    except Exception as e:
        # Deliberate best-effort boundary: callers get an error payload
        # rather than an HTTP 500.
        logger.warning("extract-text failed for %s: %s", req.url, e)
        return _failure(str(e)[:200])
|
|
|
|
|
|
# ── Unified compliance check ────────────────────────────────────────
|
|
|
|
# Strong references to running background checks. The event loop only keeps
# weak references to tasks, so a task whose handle is discarded may be
# garbage-collected before it finishes.
_background_check_tasks: set[asyncio.Task] = set()


@router.post("/compliance-check")
async def start_compliance_check(req: ComplianceCheckRequest):
    """Start async compliance check for all documents.

    Registers a new job in the in-memory job store, schedules the check as
    a background task and returns immediately.

    Args:
        req: Documents and options of the check.

    Returns:
        ComplianceCheckStartResponse with the ``check_id`` to poll.
    """
    # Short ids are convenient for polling URLs, but only carry 32 bits.
    # Draw again on a collision so an existing job is never overwritten.
    check_id = str(_uuid.uuid4())[:8]
    while check_id in _compliance_check_jobs:
        check_id = str(_uuid.uuid4())[:8]

    _compliance_check_jobs[check_id] = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "progress_pct": 0,
        "result": None,
        "error": "",
    }

    task = asyncio.create_task(_run_compliance_check(check_id, req))
    # Keep the task alive until it is done, then drop the reference.
    _background_check_tasks.add(task)
    task.add_done_callback(_background_check_tasks.discard)

    return ComplianceCheckStartResponse(check_id=check_id, status="running")
|
|
|
|
|
|
@router.get("/compliance-check/{check_id}")
async def get_compliance_check_status(check_id: str):
    """Poll compliance check status.

    Args:
        check_id: Id returned when the check was started.

    Returns:
        ComplianceCheckStatusResponse built from the stored job, or a plain
        dict with status ``not_found`` when no job exists for ``check_id``.
    """
    job = _compliance_check_jobs.get(check_id)
    if job:
        # Optional keys fall back to the same defaults the model declares.
        return ComplianceCheckStatusResponse(
            check_id=check_id,
            status=job["status"],
            progress=job.get("progress", ""),
            progress_pct=job.get("progress_pct", 0),
            result=job.get("result"),
            error=job.get("error", ""),
        )
    return {"check_id": check_id, "status": "not_found"}
|
|
|
|
|
|
# ── P80: Snapshot + Replay ───────────────────────────────────────────
|
|
|
|
@router.get("/snapshots")
async def list_snapshots(domain: str = "", limit: int = 20):
    """P80: list recent snapshots, optionally filtered by site_domain.

    Args:
        domain: When non-empty, the lookup is delegated to
            ``list_snapshots_for_domain``.
        limit: Maximum number of snapshots to return.

    Returns:
        A dict with the single key ``snapshots``.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import list_snapshots_for_domain

    session = SessionLocal()
    try:
        if domain:
            return {
                "snapshots": list_snapshots_for_domain(session, domain, limit),
            }

        from sqlalchemy import text

        query = text("""
            SELECT id, check_id, site_domain, site_label, created_at,
                   replay_count, notes
            FROM compliance.compliance_check_snapshots
            ORDER BY created_at DESC
            LIMIT :lim
        """)
        snapshots = []
        for row in session.execute(query, {"lim": limit}).fetchall():
            (snap_id, check_id, site_domain, site_label,
             created_at, replay_count, notes) = row
            # id and created_at are stringified for the response.
            snapshots.append({
                "id": str(snap_id),
                "check_id": check_id,
                "site_domain": site_domain,
                "site_label": site_label,
                "created_at": str(created_at),
                "replay_count": replay_count,
                "notes": notes,
            })
        return {"snapshots": snapshots}
    finally:
        session.close()
|
|
|
|
|
|
@router.get("/snapshots/{snapshot_id}")
async def get_snapshot(snapshot_id: str):
    """P80: load full snapshot raw data.

    Args:
        snapshot_id: Id of the snapshot to load.

    Returns:
        Whatever ``load_snapshot`` returns for the id.

    Raises:
        HTTPException: 404 when ``load_snapshot`` yields a falsy result.
    """
    from fastapi import HTTPException
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if snapshot:
            return snapshot
        raise HTTPException(status_code=404, detail="snapshot not found")
    finally:
        # Runs on both the return and the 404 path.
        session.close()
|
|
|
|
|
|
@router.get("/snapshots/{snapshot_id}/pdf")
async def export_snapshot_pdf(snapshot_id: str):
    """P88: PDF export of the audit mail; responds with application/pdf.

    Args:
        snapshot_id: Id of the snapshot to render.

    Returns:
        A Response carrying the PDF as an attachment whose file name
        contains the first 8 characters of ``snapshot_id``.

    Raises:
        HTTPException: 404 when ``render_snapshot_as_pdf`` yields a falsy
            result.
    """
    from fastapi import HTTPException
    from fastapi.responses import Response
    from database import SessionLocal
    from compliance.services.mail_pdf_export import render_snapshot_as_pdf

    session = SessionLocal()
    try:
        pdf_bytes = render_snapshot_as_pdf(session, snapshot_id)
    finally:
        # The session is only needed for rendering; release it before the
        # response is built.
        session.close()

    if not pdf_bytes:
        raise HTTPException(
            status_code=404,
            detail=f"Snapshot {snapshot_id} nicht gefunden "
                   "oder PDF-Render fehlgeschlagen.",
        )

    filename = f"breakpilot-audit-{snapshot_id[:8]}.pdf"
    disposition = f'attachment; filename="{filename}"'
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
@router.post("/snapshots/{snapshot_id}/replay")
async def replay_snapshot(
    snapshot_id: str,
    recipient: str = "",
    dry_run: bool = True,
):
    """P80: replay the audit mail render from a snapshot.

    Meant to shorten the test cycle (7min->2sec). With the default
    dry_run=true only the rendered HTML size + section breakdown are
    returned. Pass recipient + dry_run=false to actually send a [REPLAY]
    mail.

    Args:
        snapshot_id: Id of the snapshot to replay.
        recipient: Mail recipient; an empty string is forwarded as None.
        dry_run: Forwarded unchanged to ``replay_from_snapshot``.

    Returns:
        Whatever ``replay_from_snapshot`` returns.
    """
    from database import SessionLocal
    from compliance.services.check_replay import replay_from_snapshot

    session = SessionLocal()
    try:
        outcome = replay_from_snapshot(
            session,
            snapshot_id=snapshot_id,
            recipient=recipient or None,
            dry_run=dry_run,
        )
        return outcome
    finally:
        session.close()
|
|
|
|
|
|
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
|
"""Background task: check all documents with business-profile context."""
|
|
try:
|
|
from compliance.services.business_profiler import detect_business_profile
|
|
from compliance.services.doc_checks.runner import check_document_completeness
|
|
from compliance.services.rag_document_checker import check_document_with_controls
|
|
from .agent_doc_check_routes import CheckItem, DocCheckResult
|
|
from .agent_doc_check_report import build_html_report
|
|
|
|
# Reset anchor-locator cache per run (avoid cross-run leak)
|
|
try:
|
|
from compliance.services.doc_anchor_locator import reset_cache
|
|
reset_cache()
|
|
except Exception:
|
|
pass
|
|
|
|
# P7: TDM-Reservation-Check der Base-Domain (§ 44b UrhG).
|
|
# Bei reserved/denied: Run sofort beenden, kein Crawl.
|
|
try:
|
|
from compliance.services.tdm_reservation_check import (
|
|
check_tdm_reservation, is_crawl_allowed,
|
|
)
|
|
first_url = next(
|
|
(d.url for d in req.documents if d.url), "",
|
|
)
|
|
if first_url:
|
|
tdm = await check_tdm_reservation(first_url)
|
|
_compliance_check_jobs[check_id]["tdm"] = tdm
|
|
# P12: Bei tdm_override + Reason wird NICHT abgebrochen,
|
|
# sondern nur dokumentiert. Override ohne Reason wird ignoriert.
|
|
override_active = (
|
|
req.tdm_override
|
|
and len((req.tdm_override_reason or "").strip()) >= 10
|
|
)
|
|
if not is_crawl_allowed(tdm) and not override_active:
|
|
_compliance_check_jobs[check_id]["status"] = "skipped_tdm"
|
|
_compliance_check_jobs[check_id]["error"] = (
|
|
f"TDM-Vorbehalt fuer {tdm.get('domain')} erkannt "
|
|
f"(status={tdm.get('status')}) — Crawl nach § 44b "
|
|
f"UrhG nicht zulaessig. Signals: "
|
|
f"{[s.get('src') for s in tdm.get('signals', [])]}"
|
|
)
|
|
_compliance_check_jobs[check_id]["progress_pct"] = 100
|
|
logger.info("TDM-skip check_id=%s domain=%s status=%s",
|
|
check_id, tdm.get("domain"), tdm.get("status"))
|
|
return
|
|
if override_active and not is_crawl_allowed(tdm):
|
|
_compliance_check_jobs[check_id]["tdm_override"] = {
|
|
"reason": req.tdm_override_reason.strip()[:500],
|
|
"original_status": tdm.get("status"),
|
|
}
|
|
logger.warning(
|
|
"TDM-Override aktiv: check_id=%s domain=%s "
|
|
"status=%s reason=%r",
|
|
check_id, tdm.get("domain"), tdm.get("status"),
|
|
req.tdm_override_reason.strip()[:80],
|
|
)
|
|
except Exception as e:
|
|
logger.warning("TDM-check failed (proceeding): %s", e)
|
|
|
|
# Step 1: Resolve texts (fetch from URL if needed) — 0-30%
|
|
_update(check_id, "Texte werden geladen...", 1)
|
|
doc_texts: dict[str, str] = {}
|
|
doc_entries: list[dict] = []
|
|
|
|
# Cache fetched URLs to detect duplicates
|
|
url_text_cache: dict[str, str] = {}
|
|
|
|
n_docs = max(1, len(req.documents))
|
|
for i, doc in enumerate(req.documents):
|
|
pct = int(1 + (i / n_docs) * 29)
|
|
_update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
|
|
text = doc.text
|
|
cmp_payloads: list[dict] = []
|
|
if not text and doc.url:
|
|
url_key = doc.url.strip().rstrip("/").lower()
|
|
if url_key in url_text_cache:
|
|
text = url_text_cache[url_key]
|
|
else:
|
|
text, cmp_payloads = await _fetch_text(doc.url, doc_type=doc.doc_type)
|
|
if text:
|
|
url_text_cache[url_key] = text
|
|
if text:
|
|
doc_texts[doc.doc_type] = text
|
|
doc_entries.append({
|
|
"doc_type": doc.doc_type,
|
|
"url": doc.url,
|
|
"text": text,
|
|
"word_count": len(text.split()) if text else 0,
|
|
"auto_discovered": False,
|
|
"discovery_attempted": False,
|
|
"cmp_payloads": cmp_payloads,
|
|
})
|
|
|
|
# Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user
|
|
# did NOT submit a URL/text for, try to find it on the homepage of
|
|
# the submitted URLs. This bridges the gap between "user knows the
|
|
# exact URL" (rare) and "user pasted the homepage" (common).
|
|
await _autodiscover_missing(
|
|
check_id, doc_entries, doc_texts, url_text_cache,
|
|
)
|
|
|
|
# Step 1b: Section splitting — two cases:
|
|
# 1. Same URL used for multiple doc_types → split by heading
|
|
# 2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
|
|
from compliance.services.section_splitter import (
|
|
split_shared_texts, auto_fill_from_dsi, cross_search_documents,
|
|
)
|
|
split_shared_texts(doc_entries, url_text_cache)
|
|
auto_fill_from_dsi(doc_entries)
|
|
|
|
# Step 1c: Cross-document search — find doc_types in wrong documents (30-35%)
|
|
_update(check_id, "Dokumente werden uebergreifend durchsucht...", 32)
|
|
placement_findings = cross_search_documents(doc_entries)
|
|
|
|
# Refresh doc_texts after all splitting/searching
|
|
for entry in doc_entries:
|
|
if entry.get("text"):
|
|
doc_texts[entry["doc_type"]] = entry["text"]
|
|
|
|
# P15: Dedupe — wenn mehrere Doc-Types DASSELBE Dokument referenzieren
|
|
# (z.B. Safetykon: User gibt /datenschutz fuer dse + cookie + widerruf),
|
|
# behalten wir nur den primaeren Doc-Type. Andere: leeren + note.
|
|
# Priorität: dse > impressum > cookie > widerruf > agb > nutzungsbedingungen
|
|
_DOC_PRIORITY = ["dse", "impressum", "cookie", "widerruf", "agb",
|
|
"nutzungsbedingungen", "social_media", "dsb"]
|
|
seen_text_hash: dict[int, str] = {}
|
|
for dt in _DOC_PRIORITY:
|
|
entry = next((e for e in doc_entries if e.get("doc_type") == dt
|
|
and e.get("text")), None)
|
|
if not entry:
|
|
continue
|
|
text_hash = hash((entry.get("text") or "").strip()[:1000])
|
|
if text_hash in seen_text_hash:
|
|
primary = seen_text_hash[text_hash]
|
|
logger.info(
|
|
"P15 dedup: doc_type=%s referenziert dasselbe Dokument "
|
|
"wie %s (URL=%s) -> als Duplikat markiert.",
|
|
dt, primary, entry.get("url", "")[:60],
|
|
)
|
|
entry["text"] = ""
|
|
entry["word_count"] = 0
|
|
entry["url"] = ""
|
|
entry["dup_of"] = primary
|
|
doc_texts.pop(dt, None)
|
|
else:
|
|
seen_text_hash[text_hash] = dt
|
|
|
|
# Step 2: Detect business profile (35-40%)
|
|
_update(check_id, "Geschaeftsmodell wird erkannt...", 37)
|
|
# P16: Homepage-Text mit fuer Profile-Detection (no_direct_sales
|
|
# B2B-Indikatoren wie "CE-Zertifizierung" / "Schulungen" stehen oft
|
|
# nur im Homepage-Menue, nicht im Pflichttext).
|
|
profile_input = dict(doc_texts)
|
|
try:
|
|
base_url = ""
|
|
for e in doc_entries:
|
|
if e.get("url"):
|
|
from urllib.parse import urlparse
|
|
p = urlparse(e["url"])
|
|
if p.scheme and p.netloc:
|
|
base_url = f"{p.scheme}://{p.netloc}/"
|
|
break
|
|
if base_url:
|
|
import re as _re
|
|
async with httpx.AsyncClient(
|
|
timeout=8.0, follow_redirects=True,
|
|
headers={"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) "
|
|
"AppleWebKit/537.36 HeadlessChrome/120.0.0.0"},
|
|
) as _hc:
|
|
_hr = await _hc.get(base_url)
|
|
if _hr.status_code == 200 and "text/html" in _hr.headers.get(
|
|
"content-type", ""):
|
|
_html = _hr.text[:60000]
|
|
_html = _re.sub(r"<script[^>]*>.*?</script>", " ",
|
|
_html, flags=_re.DOTALL | _re.IGNORECASE)
|
|
_html = _re.sub(r"<style[^>]*>.*?</style>", " ",
|
|
_html, flags=_re.DOTALL | _re.IGNORECASE)
|
|
_html = _re.sub(r"<[^>]+>", " ", _html)
|
|
_html = _re.sub(r"\s+", " ", _html).strip()
|
|
if len(_html.split()) > 30:
|
|
profile_input["__homepage"] = _html[:20000]
|
|
logger.info("P16 homepage merged for profile: %d words",
|
|
len(_html.split()))
|
|
except Exception as e:
|
|
logger.debug("homepage fetch for profile failed: %s", e)
|
|
profile = await detect_business_profile(profile_input)
|
|
profile_dict = asdict(profile)
|
|
|
|
# Step 3: Check each document
|
|
results: list[DocCheckResult] = []
|
|
total_findings = 0
|
|
use_agent_flag = req.use_agent or os.getenv(
|
|
"COMPLIANCE_USE_AGENT", "false"
|
|
).lower() == "true"
|
|
|
|
# Filter out doc_types that don't apply to this business profile
|
|
skip_types = _get_skip_types(profile)
|
|
|
|
# Derive business_scope hints for the MC filter (O1 — Doc-type Scope-Flag).
|
|
# MCs that explicitly require a feature (e.g. 'biometric_processing',
|
|
# 'ai_decision_making', 'child_targeting') get dropped when the
|
|
# detected profile doesn't declare it.
|
|
business_scope: set[str] = set()
|
|
for svc in (getattr(profile, "detected_services", []) or []):
|
|
business_scope.add(str(svc).lower())
|
|
if (getattr(profile, "business_type", "") or "").lower() == "b2c":
|
|
business_scope.add("b2c")
|
|
if getattr(profile, "has_online_shop", False):
|
|
business_scope.add("ecommerce")
|
|
if getattr(profile, "is_regulated_profession", False):
|
|
business_scope.add("regulated_profession")
|
|
|
|
# Document checks: 40-80%
|
|
n_entries = max(1, len(doc_entries))
|
|
for i, entry in enumerate(doc_entries):
|
|
text = entry["text"]
|
|
doc_type = entry["doc_type"]
|
|
label = _doc_type_label(doc_type)
|
|
url = entry["url"]
|
|
|
|
if doc_type in skip_types:
|
|
results.append(DocCheckResult(
|
|
label=label, url=url, doc_type=doc_type,
|
|
error=skip_types[doc_type],
|
|
))
|
|
continue
|
|
|
|
pct = int(40 + (i / n_entries) * 40)
|
|
_update(check_id, f"Pruefen {i+1}/{n_entries}: {label}...", pct)
|
|
|
|
if not text or len(text) < 50:
|
|
# P15: duplicate doc that was deduped against a primary doc
|
|
if entry.get("dup_of"):
|
|
results.append(DocCheckResult(
|
|
label=label, url="", doc_type=doc_type,
|
|
error=f"Nicht separat vorhanden — wird im Dokument "
|
|
f"'{_doc_type_label(entry['dup_of'])}' "
|
|
f"mit-geprueft.",
|
|
))
|
|
continue
|
|
# P24: DSB-Kontakt ist Pflichtangabe in der DSE (Art. 13(1)(b)
|
|
# DSGVO) — wenn kein separates DSB-Dokument vorliegt, ist das
|
|
# KEIN Fehler. DSB-Pruefung passiert ohnehin in der DSE.
|
|
if doc_type == "dsb" and not (entry.get("url") or "").strip():
|
|
results.append(DocCheckResult(
|
|
label=label, url="", doc_type=doc_type,
|
|
error="Nicht separat vorhanden — DSB-Kontaktdaten "
|
|
"werden in der Datenschutzerklaerung als "
|
|
"Pflichtangabe nach Art. 13(1)(b) DSGVO geprueft.",
|
|
))
|
|
continue
|
|
# Empty entry — either from auto-discovery padding (no URL
|
|
# to fetch) or from a fetch that returned nothing. If there
|
|
# was a URL we keep the error so the user knows the fetch
|
|
# failed; otherwise let the padding step label it
|
|
# 'Nicht eingereicht' / 'Auf der Website nicht gefunden'.
|
|
if (entry.get("url") or "").strip():
|
|
results.append(DocCheckResult(
|
|
label=label, url=url, doc_type=doc_type,
|
|
error="Kein Text vorhanden oder zu kurz",
|
|
))
|
|
continue
|
|
|
|
result = await _check_single(
|
|
text, doc_type, label, url,
|
|
entry["word_count"], use_agent_flag,
|
|
business_scope=business_scope,
|
|
business_profile={"no_direct_sales": getattr(profile, "no_direct_sales", False)},
|
|
)
|
|
|
|
# Apply profile context filter
|
|
result = _apply_profile_filter(result, profile, doc_type)
|
|
|
|
# Add placement findings — but only if the regex checks confirm
|
|
# the text doesn't match. If completeness >= 50%, the text IS the
|
|
# right doc_type despite missing cross-search keywords.
|
|
if result.completeness_pct < 50:
|
|
for pf in placement_findings:
|
|
if pf.get("doc_type") == doc_type:
|
|
result.checks.insert(0, CheckItem(**{
|
|
k: v for k, v in pf.items() if k != "doc_type"
|
|
}))
|
|
|
|
results.append(result)
|
|
total_findings += result.findings_count
|
|
|
|
# Step 3b: Banner-Check (automatic, uses first URL or homepage)
|
|
banner_result = None
|
|
banner_url = req.documents[0].url if req.documents and req.documents[0].url else ""
|
|
# Use the homepage (strip path) for banner check
|
|
if banner_url:
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(banner_url)
|
|
banner_url = f"{parsed.scheme}://{parsed.netloc}"
|
|
if banner_url:
|
|
_update(check_id, "Cookie-Banner wird geprueft...", 82)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=900.0) as client: # P50: +10min for vendor-detail-phase
|
|
resp = await client.post(
|
|
f"{CONSENT_TESTER_URL}/scan",
|
|
json={"url": banner_url, "timeout_per_phase": 10},
|
|
)
|
|
if resp.status_code == 200:
|
|
banner_result = resp.json()
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Banner check failed: %s (%s)", e or "<empty>", type(e).__name__
|
|
)
|
|
|
|
# Step 3c: Cross-check Banner vs Cookie-Richtlinie (88-90%)
|
|
if banner_result and "cookie" in doc_texts:
|
|
_update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...", 89)
|
|
cross_findings = _cross_check_banner_vs_cookie(
|
|
banner_result, doc_texts["cookie"],
|
|
)
|
|
if cross_findings:
|
|
for r in results:
|
|
if r.doc_type == "cookie":
|
|
for cf in cross_findings:
|
|
r.checks.append(CheckItem(**cf))
|
|
l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
|
|
l2p = sum(1 for c in l2 if c.passed)
|
|
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
|
|
|
|
# Step 3d: TCF Vendor cross-check against DSI
|
|
tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
|
|
vvt_entries: list[dict] = []
|
|
if tcf_vendors and "dse" in doc_texts:
|
|
_update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...", 91)
|
|
from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
|
|
from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
|
|
vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
|
|
if vendor_findings:
|
|
for r in results:
|
|
if r.doc_type == "dse":
|
|
for vf in vendor_findings:
|
|
r.checks.append(CheckItem(**vf))
|
|
vvt_entries = map_vendors_to_vvt(tcf_vendors)
|
|
|
|
# Step 4: Extract profile hints from documents (92-95%)
|
|
_update(check_id, "Profil wird aus Dokumenten extrahiert...", 93)
|
|
from compliance.services.profile_extractor import extract_profile_from_documents
|
|
extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
|
|
|
|
# Step 4b: Determine scenario per document
|
|
for r in results:
|
|
if r.error:
|
|
r.scenario = "skip"
|
|
elif r.completeness_pct < 30:
|
|
r.scenario = "regenerate"
|
|
elif r.completeness_pct < 95:
|
|
r.scenario = "fix"
|
|
else:
|
|
r.scenario = "import"
|
|
|
|
# Step 4c: Always render all 8 canonical doc types. Missing types
|
|
# are differentiated:
|
|
# - Discovery was tried but found nothing -> 'Auf der Website
|
|
# nicht gefunden' (suggest user provides URL manually)
|
|
# - No submitted URLs at all -> 'Nicht eingereicht'
|
|
attempted = {
|
|
e["doc_type"] for e in doc_entries if e.get("discovery_attempted")
|
|
}
|
|
results = _pad_results_with_missing(results, discovery_attempted=attempted)
|
|
|
|
# Step 5: Build report with management summary (95-98%)
|
|
_update(check_id, "Report wird erstellt...", 96)
|
|
from .agent_doc_check_report import (
|
|
build_management_summary,
|
|
build_scanned_urls_html,
|
|
build_provider_list_html,
|
|
)
|
|
from .agent_doc_check_extras import build_vvt_table_html
|
|
|
|
# Extract structured vendor records from any CMP payloads captured
|
|
# for the cookie doc (BMW ePaaS, OneTrust, etc.), validate their
|
|
# opt-out + privacy URLs concurrently, score each entry.
|
|
cmp_vendors: list[dict] = []
|
|
try:
|
|
from compliance.services.vendor_extractor import (
|
|
extract_vendors_from_payloads,
|
|
)
|
|
from compliance.services.cookie_link_validator import (
|
|
validate_vendor_urls, score_vendors,
|
|
)
|
|
cookie_payloads = []
|
|
cookie_text = ""
|
|
# P30: aggregate cmp_payloads from ALL doc_entries — sites
|
|
# like Mercedes load Usercentrics only on the homepage, so
|
|
# the JSON gets captured during DSE/Impressum discovery, not
|
|
# in the cookies.html fetch. Dedup by URL since the same
|
|
# payload is captured on every page load.
|
|
seen_cmp_urls: set[str] = set()
|
|
for e in doc_entries:
|
|
for p in (e.get("cmp_payloads") or []):
|
|
p_url = p.get("url") or ""
|
|
if p_url and p_url in seen_cmp_urls:
|
|
continue
|
|
seen_cmp_urls.add(p_url)
|
|
cookie_payloads.append(p)
|
|
if e.get("doc_type") == "cookie" and e.get("text"):
|
|
cookie_text = e["text"]
|
|
# P48: also pull cmp_payloads from the Banner-Scan (homepage
|
|
# 3-phase consent test). Mercedes' Usercentrics-JSON is
|
|
# captured there even when not in DSI-Discovery of static
|
|
# legal pages.
|
|
if banner_result:
|
|
for p in (banner_result.get("cmp_payloads") or []):
|
|
p_url = p.get("url") or ""
|
|
if p_url and p_url in seen_cmp_urls:
|
|
continue
|
|
seen_cmp_urls.add(p_url)
|
|
cookie_payloads.append(p)
|
|
if cookie_payloads:
|
|
logger.info("P48: %d CMP-payloads available for vendor-extract (after Banner-Scan merge)",
|
|
len(cookie_payloads))
|
|
# P17-D: Fallback wenn cookie via P15 deduped wurde — nutze DSE-Text
|
|
# sofern Cookie-Begriffe drin sind, damit LLM-Vendor-Extract trotzdem
|
|
# greifen kann.
|
|
if not cookie_text and not cookie_payloads:
|
|
dse_t = doc_texts.get("dse", "")
|
|
if dse_t and any(w in dse_t.lower() for w in
|
|
("cookie", "tracking", "google analytics", "consent")):
|
|
cookie_text = dse_t
|
|
logger.info("P17-D: vendor-extract Fallback auf DSE (Cookie deduped)")
|
|
# Site-owner derived from the submitted URLs — drives the
|
|
# INTERNAL/GROUP_COMPANY classification of vendor records.
|
|
owner_name = _company_name_from_url(doc_entries) or ""
|
|
if cookie_payloads:
|
|
cmp_vendors = extract_vendors_from_payloads(
|
|
cookie_payloads, owner_name=owner_name,
|
|
)
|
|
# P52: LLM-Fallback nicht nur wenn 0 Vendors, sondern auch
|
|
# wenn die strukturierten Quellen < 5 Vendors lieferten und
|
|
# der Cookie-Text substantiell ist. So holt sich VW-typische
|
|
# Setups (Generic CMP, 28 Cookies aber 0 cmp_payloads) noch
|
|
# ihre echten Vendors aus dem Text.
|
|
if (len(cmp_vendors) < 5
|
|
and cookie_text and len(cookie_text.split()) >= 500):
|
|
from compliance.services.vendor_llm_extractor import (
|
|
extract_vendors_via_llm,
|
|
)
|
|
from compliance.services.vendor_classifier import classify
|
|
_update(check_id, "Vendor-Liste per LLM extrahieren...", 94)
|
|
llm_vendors = await extract_vendors_via_llm(cookie_text)
|
|
# P52: classify die LLM-Vendors und MERGE mit existing
|
|
# statt zu ueberschreiben.
|
|
existing_names = {(v.get("name") or "").strip().lower()
|
|
for v in cmp_vendors}
|
|
added_llm = 0
|
|
for v in llm_vendors:
|
|
nm = (v.get("name") or "").strip()
|
|
if not nm or nm.lower() in existing_names:
|
|
continue
|
|
v["recipient_type"] = classify(
|
|
vendor_name=nm,
|
|
category=v.get("category", ""),
|
|
owner_name=owner_name,
|
|
)
|
|
v.setdefault("source", "llm_cascade")
|
|
cmp_vendors.append(v)
|
|
existing_names.add(nm.lower())
|
|
added_llm += 1
|
|
if added_llm:
|
|
logger.info(
|
|
"P52 LLM-Cascade: +%d Vendors (total: %d)",
|
|
added_llm, len(cmp_vendors),
|
|
)
|
|
# P57: Phase G vendor_details als zusätzliche Vendor-Quelle.
|
|
# Wenn extract_vendors_from_payloads weniger findet als
|
|
# Phase G's Info-Click-Through (z.B. Mercedes-Settings nicht
|
|
# erkannt als usercentrics-kind), die Phase-G-Namen als
|
|
# eigenständige Vendors hinzufügen.
|
|
if banner_result:
|
|
vd_list = banner_result.get("vendor_details") or []
|
|
vd_list = [v for v in vd_list if v.get("name") != "__TDM_OPTOUT__"]
|
|
existing_names = {(v.get("name") or "").strip().lower()
|
|
for v in cmp_vendors}
|
|
added = 0
|
|
for d in vd_list:
|
|
n = (d.get("name") or "").strip()
|
|
if not n or n.lower() in existing_names:
|
|
continue
|
|
# Skip generic category-labels (Mercedes-Kategorien)
|
|
if n.lower() in ("technisch erforderlich", "analyse und statistik",
|
|
"marketing", "alles auswählen",
|
|
"alles auswaehlen"):
|
|
continue
|
|
from compliance.services.vendor_classifier import classify
|
|
cmp_vendors.append({
|
|
"name": n,
|
|
"country": "",
|
|
"purpose": d.get("description", "")[:500],
|
|
"category": "",
|
|
"opt_out_url": d.get("opt_out_url", ""),
|
|
"privacy_policy_url": d.get("privacy_url", ""),
|
|
"persistence": d.get("retention", ""),
|
|
"cookies": d.get("cookies", []),
|
|
"processing_company": d.get("processing_company", ""),
|
|
"address": d.get("address", ""),
|
|
"purposes": d.get("purposes", []),
|
|
"technologies": d.get("technologies", []),
|
|
"recipient_type": classify(
|
|
vendor_name=n, category="", owner_name=owner_name,
|
|
),
|
|
})
|
|
existing_names.add(n.lower())
|
|
added += 1
|
|
if added:
|
|
logger.info("P57: added %d new vendors from Phase G (total: %d)",
|
|
added, len(cmp_vendors))
|
|
|
|
# Cookie-Library-Fallback (P52 Lite): wenn weiterhin wenige
|
|
# Vendors aber viele after_accept-Cookies, aus Library auflösen.
|
|
if banner_result and len(cmp_vendors) < 3:
|
|
try:
|
|
from compliance.services.cookie_to_vendor_fallback import (
|
|
fallback_vendors_for_run,
|
|
)
|
|
from database import SessionLocal as _SLfb
|
|
_fb_db = _SLfb()
|
|
try:
|
|
extra = fallback_vendors_for_run(
|
|
_fb_db, banner_result, len(cmp_vendors),
|
|
)
|
|
if extra:
|
|
existing_names = {(v.get("name") or "").strip().lower()
|
|
for v in cmp_vendors}
|
|
for v in extra:
|
|
if v["name"].lower() in existing_names:
|
|
continue
|
|
cmp_vendors.append(v)
|
|
logger.info(
|
|
"Cookie-Library-Fallback: cmp_vendors %d -> %d",
|
|
len(cmp_vendors) - len(extra), len(cmp_vendors),
|
|
)
|
|
finally:
|
|
_fb_db.close()
|
|
except Exception as e:
|
|
logger.warning("Cookie-Library-Fallback skipped: %s", e)
|
|
|
|
# P50: enrich vendors with per-vendor detail-modal-extracts
|
|
# (description, opt-out URL, privacy URL, cookies). Detail
|
|
# comes from Phase G Info-button-click-through in /scan.
|
|
tdm_opt_out_notice = ""
|
|
if cmp_vendors and banner_result:
|
|
vendor_details = banner_result.get("vendor_details") or []
|
|
# P50f: filter out TDM-opt-out sentinel
|
|
tdm_sentinel = next((v for v in vendor_details
|
|
if v.get("name") == "__TDM_OPTOUT__"), None)
|
|
if tdm_sentinel:
|
|
tdm_opt_out_notice = tdm_sentinel.get("description", "")
|
|
logger.info("P50f: TDM opt-out — skipped detail-enrichment for vendors")
|
|
vendor_details = [v for v in vendor_details
|
|
if v.get("name") != "__TDM_OPTOUT__"]
|
|
if vendor_details:
|
|
details_by_name = {}
|
|
for d in vendor_details:
|
|
n = (d.get("name") or "").strip().lower()
|
|
if n:
|
|
details_by_name[n] = d
|
|
enriched = 0
|
|
for v in cmp_vendors:
|
|
key = (v.get("name") or "").strip().lower()
|
|
# Substring fallback for fuzzy matches (e.g.
|
|
# "Google Analytics" detail-name may differ slightly)
|
|
d = details_by_name.get(key)
|
|
if not d:
|
|
for dn, dv in details_by_name.items():
|
|
if key in dn or dn in key:
|
|
d = dv
|
|
break
|
|
if not d:
|
|
continue
|
|
if not v.get("country") and (d.get("processing_company") or d.get("address")):
|
|
# Heuristic country extract from address (DE/EU keywords)
|
|
addr = d.get("address", "")
|
|
if re.search(r"\b(deutschland|germany|berlin|m(?:ue|ü)nchen|hamburg|stuttgart)\b", addr, re.I):
|
|
v["country"] = "DE"
|
|
elif re.search(r"\bireland|irland|dublin\b", addr, re.I):
|
|
v["country"] = "IE"
|
|
elif re.search(r"\busa|united states|california|new york|delaware\b", addr, re.I):
|
|
v["country"] = "US"
|
|
if not v.get("purpose"):
|
|
v["purpose"] = d.get("description", "")[:500]
|
|
if not v.get("opt_out_url"):
|
|
v["opt_out_url"] = d.get("opt_out_url", "")
|
|
if not v.get("privacy_policy_url"):
|
|
v["privacy_policy_url"] = d.get("privacy_url", "")
|
|
if not v.get("cookies"):
|
|
v["cookies"] = d.get("cookies", [])
|
|
v["purposes"] = d.get("purposes", [])
|
|
v["technologies"] = d.get("technologies", [])
|
|
if not v.get("persistence"):
|
|
v["persistence"] = d.get("retention", "")
|
|
v["processing_company"] = d.get("processing_company", "")
|
|
v["address"] = d.get("address", "")
|
|
enriched += 1
|
|
logger.info("P50: enriched %d/%d vendors with detail-modal data",
|
|
enriched, len(cmp_vendors))
|
|
# P59b: Cookie-Behavior-Validator — pruefe alle gesetzten Cookies
|
|
# gegen unsere Library, generiere 3-Tier-Severity-Findings.
|
|
# Background-Task hat keinen DB-Dependency-Inject -> SessionLocal
|
|
# selber oeffnen + sauber schliessen.
|
|
cookie_behavior_findings: list[dict] = []
|
|
if banner_result:
|
|
cookies_detailed = banner_result.get("cookies_detailed") or []
|
|
if cookies_detailed:
|
|
cb_session = None
|
|
try:
|
|
from database import SessionLocal
|
|
from compliance.services.cookie_behavior_validator import (
|
|
validate_cookie_behavior,
|
|
)
|
|
from urllib.parse import urlparse
|
|
fp_domain = ""
|
|
if banner_url:
|
|
fp_domain = urlparse(banner_url).netloc.replace("www.", "")
|
|
cb_session = SessionLocal()
|
|
cookie_behavior_findings = validate_cookie_behavior(
|
|
cb_session, cookies_detailed,
|
|
network_requests=[], # TODO Layer B in P59d
|
|
first_party_domain=fp_domain,
|
|
)
|
|
if cookie_behavior_findings:
|
|
sevs = {f["severity"] for f in cookie_behavior_findings}
|
|
logger.info(
|
|
"P59b: Cookie-Behavior-Check %d findings "
|
|
"(severities: %s) ueber %d Cookies",
|
|
len(cookie_behavior_findings),
|
|
sorted(sevs),
|
|
len(cookies_detailed),
|
|
)
|
|
banner_result["cookie_behavior_findings"] = (
|
|
cookie_behavior_findings
|
|
)
|
|
else:
|
|
logger.info(
|
|
"P59b: Cookie-Behavior-Check 0 findings "
|
|
"ueber %d Cookies (library miss / clean)",
|
|
len(cookies_detailed),
|
|
)
|
|
except Exception as cb_err:
|
|
logger.warning("P59b Cookie-Behavior-Check failed: %s", cb_err)
|
|
finally:
|
|
if cb_session is not None:
|
|
try:
|
|
cb_session.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# P61: "Untergeschobene Cookies" — wenn z.B. Google Tag Manager
|
|
# deklariert ist, kommen GA + GCL_AU + DoubleClick automatisch mit.
|
|
# Findings landen im banner_result fuer Mail-Render.
|
|
if banner_result and cmp_vendors:
|
|
try:
|
|
from compliance.services.vendor_package_cookies import (
|
|
detect_implicit_cookies,
|
|
)
|
|
declared = [v.get("name", "") for v in cmp_vendors if v.get("name")]
|
|
actual_cookies: list[str] = []
|
|
for phase_data in (banner_result.get("phases") or {}).values():
|
|
if isinstance(phase_data, dict):
|
|
for ck in (phase_data.get("cookies") or []):
|
|
if isinstance(ck, dict) and ck.get("name"):
|
|
actual_cookies.append(ck["name"])
|
|
implicit_findings = detect_implicit_cookies(
|
|
declared, actual_cookies_set=actual_cookies or None,
|
|
)
|
|
if implicit_findings:
|
|
banner_result["implicit_vendor_findings"] = implicit_findings
|
|
logger.info(
|
|
"P61: %d implicit vendor-package items detected "
|
|
"(%d cookies + %d vendors)",
|
|
len(implicit_findings),
|
|
sum(1 for f in implicit_findings if f["implicit"]["type"] == "cookie"),
|
|
sum(1 for f in implicit_findings if f["implicit"]["type"] == "vendor"),
|
|
)
|
|
except Exception as p61_err:
|
|
logger.warning("P61 implicit-vendor detection failed: %s", p61_err)
|
|
|
|
if cmp_vendors:
|
|
logger.info("VVT: %d vendors extracted, validating links",
|
|
len(cmp_vendors))
|
|
cmp_vendors = await validate_vendor_urls(cmp_vendors)
|
|
cmp_vendors = score_vendors(cmp_vendors)
|
|
# Enrich each vendor with per-cookie functional roles
|
|
try:
|
|
from compliance.services.cookie_function_classifier import (
|
|
annotate_vendor_cookies,
|
|
)
|
|
cmp_vendors = [annotate_vendor_cookies(v) for v in cmp_vendors]
|
|
except Exception as e:
|
|
logger.warning("Cookie function classification skipped: %s", e)
|
|
except Exception as e:
|
|
logger.warning("VVT vendor extraction skipped: %s", e)
|
|
|
|
# Vendor-Redundanz + EU-Alternativen + Cost/Savings (O4)
|
|
redundancy_report = None
|
|
try:
|
|
from compliance.services.vendor_redundancy import analyze as analyze_redundancy
|
|
from compliance.services.vendor_cost_estimator import infer_company_tier
|
|
if cmp_vendors:
|
|
# Company-Tier aus business_profile ableiten — beeinflusst die
|
|
# Cost-Range so dass z.B. fuer DAX-Konzerne nicht starter-Preise
|
|
# die untere Schranke druecken.
|
|
bp_dict = {
|
|
"type": getattr(profile, "business_type", ""),
|
|
"features": list(business_scope),
|
|
}
|
|
ctier = infer_company_tier(bp_dict)
|
|
redundancy_report = analyze_redundancy(cmp_vendors, company_tier=ctier)
|
|
logger.info(
|
|
"Redundanz: %d Kategorien mit Mehrfach-Anbietern, "
|
|
"Spar-Schaetzung %s pro Jahr (company_tier=%s)",
|
|
redundancy_report["summary"]["redundancy_count"],
|
|
redundancy_report["summary"]["estimated_saving_pct"],
|
|
ctier,
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Vendor redundancy analysis skipped: %s", e)
|
|
|
|
summary_html = build_management_summary(results)
|
|
scanned_html = build_scanned_urls_html(doc_entries)
|
|
providers_html = build_provider_list_html(banner_result, vvt_entries)
|
|
# P18: Deep-Block mit Phases + Quality-Score + Per-Category-Tracker
|
|
from .agent_doc_check_banner import build_banner_deep_html
|
|
banner_deep_html = build_banner_deep_html(banner_result)
|
|
vvt_html = build_vvt_table_html(cmp_vendors)
|
|
|
|
# MC scorecard aggregated across ALL docs in this run (DSGVO/TDDDG/
|
|
# BGB/...). Sits at the top so the GF sees the regulation-by-
|
|
# regulation view before drilling into per-doc details.
|
|
from compliance.services.mc_scorecard import build_scorecard
|
|
from .agent_doc_check_scorecard import build_scorecard_html
|
|
all_mc_checks: list[dict] = []
|
|
# P73: pro-doc Fails sammeln um Solution-Generator pro Doc-Type
|
|
# mit dem korrekten doc_text aufzurufen.
|
|
fails_by_doc: dict[str, list[dict]] = {}
|
|
for r in results:
|
|
for c in r.checks:
|
|
if c.id.startswith("mc-"):
|
|
rec = {
|
|
"id": c.id, "label": c.label, "passed": c.passed,
|
|
"severity": c.severity, "skipped": c.skipped,
|
|
"regulation": c.regulation,
|
|
"hint": getattr(c, "hint", "") or "",
|
|
}
|
|
all_mc_checks.append(rec)
|
|
if (not c.passed and not c.skipped
|
|
and (c.severity or "").upper() in ("CRITICAL", "HIGH")):
|
|
fails_by_doc.setdefault(r.doc_type, []).append(rec)
|
|
scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {}
|
|
# Trend: load previous scorecard for the same tenant + domain so the
|
|
# email can show delta indicators (A6).
|
|
prev_scorecard: dict | None = None
|
|
if scorecard:
|
|
try:
|
|
from compliance.services.compliance_audit_log import (
|
|
list_runs_for_tenant,
|
|
)
|
|
tenant_id_for_trend = req.recipient or ""
|
|
base_domain_for_trend = _extract_domain(doc_entries) or ""
|
|
prev_runs = list_runs_for_tenant(
|
|
tenant_id_for_trend,
|
|
base_domain=base_domain_for_trend,
|
|
limit=1,
|
|
)
|
|
if prev_runs:
|
|
prev_scorecard = prev_runs[0].get("scorecard")
|
|
except Exception as e:
|
|
logger.debug("trend lookup skipped: %s", e)
|
|
scorecard_html = (
|
|
build_scorecard_html(scorecard, previous_scorecard=prev_scorecard)
|
|
if scorecard else ""
|
|
)
|
|
|
|
report_html = build_html_report(results, None, doc_texts)
|
|
profile_html = _build_profile_html(profile)
|
|
|
|
# O4: Vendor-Redundanz / EU-Alternativen + Cost-Savings-Block
|
|
from .agent_doc_check_redundancy import build_redundancy_html
|
|
redundancy_html = build_redundancy_html(redundancy_report)
|
|
|
|
# P1: Executive-Summary GANZ oben — CFO/GF sieht 4 KPIs + 2 CTAs.
|
|
from .agent_doc_check_exec_summary import build_exec_summary_html
|
|
# Site-Name fuer Header bestimmen (gleiche Logik wie Email-Subject)
|
|
url_company_for_exec = _company_name_from_url(doc_entries)
|
|
domain_for_exec = _extract_domain(doc_entries)
|
|
site_name_for_exec = url_company_for_exec or domain_for_exec or ""
|
|
exec_summary_html = build_exec_summary_html(
|
|
scorecard=scorecard,
|
|
previous_scorecard=prev_scorecard,
|
|
cmp_vendors=cmp_vendors,
|
|
redundancy_report=redundancy_report,
|
|
site_name=site_name_for_exec,
|
|
)
|
|
|
|
# P18: Critical-Findings-Block (rot oben, mit Sofortmassnahmen +
|
|
# Quellen + Bussgeld-Praezedenz). Wird nur gerendert wenn echte
|
|
# kritische Verstoesse vorliegen.
|
|
critical_html = ""
|
|
try:
|
|
from .agent_doc_check_critical import build_critical_findings_html
|
|
critical_html = build_critical_findings_html(
|
|
banner_result=banner_result,
|
|
scorecard=scorecard,
|
|
results=results,
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Critical-findings block skipped: %s", e)
|
|
|
|
# P10: Cookie-Policy-Architecture-Detection (BMW-Pattern erkennen)
|
|
cookie_arch_html = ""
|
|
try:
|
|
from compliance.services.cookie_policy_architecture import (
|
|
detect_architecture, build_architecture_html,
|
|
)
|
|
cookie_doc_url = ""
|
|
cookie_doc_text = doc_texts.get("cookie", "")
|
|
cookie_cmp_payloads: list[dict] = []
|
|
for e in doc_entries:
|
|
if (e.get("doc_type") or "").lower() in ("cookie", "cookie_policy"):
|
|
cookie_doc_url = e.get("url", "")
|
|
cookie_cmp_payloads = e.get("cmp_payloads") or []
|
|
break
|
|
# P17-A: Fallback wenn Cookie-Doc via P15 deduped wurde — nutze
|
|
# den DSE-Text wenn er Cookie-Schluesselwoerter enthaelt.
|
|
if not cookie_doc_text:
|
|
dse_text = doc_texts.get("dse", "")
|
|
if dse_text and any(w in dse_text.lower() for w in
|
|
("cookie", "tracking", "google analytics",
|
|
"consent")):
|
|
cookie_doc_text = dse_text
|
|
dse_entry = next((e for e in doc_entries
|
|
if e.get("doc_type") == "dse"), {})
|
|
cookie_doc_url = dse_entry.get("url", "")
|
|
cookie_cmp_payloads = dse_entry.get("cmp_payloads") or []
|
|
logger.info("P17-A: cookie-arch fallback auf DSE (Cookie-Doc deduped)")
|
|
if cookie_doc_text:
|
|
arch = detect_architecture(
|
|
doc_url=cookie_doc_url,
|
|
doc_text=cookie_doc_text,
|
|
cmp_payloads=cookie_cmp_payloads,
|
|
homepage_cmp_payloads=cmp_payloads or [],
|
|
)
|
|
cookie_arch_html = build_architecture_html(arch)
|
|
logger.info("cookie-arch: layer=%s versioned=%s risk=%s",
|
|
arch["layer_separation"], arch["versioned"], arch["risk_label"])
|
|
except Exception as e:
|
|
logger.warning("cookie-architecture detection failed: %s", e)
|
|
|
|
# Reihenfolge — Sales-optimiert:
|
|
# 1) Exec-Summary (KPIs + Saving + CTAs)
|
|
# 2) summary_html (Konkrete Aufgaben fuer die Geschaeftsfuehrung)
|
|
# 3) scanned_urls (Quellen-Transparenz)
|
|
# 4) profile_html (Erkanntes Geschaeftsmodell)
|
|
# 5) scorecard_html (MC-Scorecard)
|
|
# 6) redundancy_html (Optimierungspotenzial — direkt nach Compliance-Score)
|
|
# 7) providers_html + vvt_html (Vendor-Liste)
|
|
# 8) report_html (Doc-Pruefung Details)
|
|
# P62: Marketing-Manager-Disclaimer — was wir sehen vs nicht sehen
|
|
scope_disclaimer_html = ""
|
|
try:
|
|
from .scope_disclaimer import build_scope_disclaimer_html
|
|
scope_disclaimer_html = build_scope_disclaimer_html()
|
|
except Exception as e:
|
|
logger.warning("Scope-disclaimer block skipped: %s", e)
|
|
|
|
# P102: Cookie-Klassifikations-Pruefung (deklariert vs Library)
|
|
library_mismatch_html = ""
|
|
mismatches: list[dict] = []
|
|
try:
|
|
from compliance.services.cookie_library_mismatch import (
|
|
detect_mismatches, build_mismatch_block_html,
|
|
)
|
|
from database import SessionLocal
|
|
cookie_doc_for_check = doc_texts.get("cookie") or doc_texts.get("dse") or ""
|
|
all_cookies_seen: list[str] = []
|
|
if banner_result:
|
|
for ph in (banner_result.get("phases") or {}).values():
|
|
if isinstance(ph, dict):
|
|
for ck in (ph.get("cookies") or []):
|
|
if isinstance(ck, str):
|
|
all_cookies_seen.append(ck)
|
|
elif isinstance(ck, dict) and ck.get("name"):
|
|
all_cookies_seen.append(ck["name"])
|
|
if all_cookies_seen and cookie_doc_for_check:
|
|
_mm_db = SessionLocal()
|
|
try:
|
|
mismatches = detect_mismatches(
|
|
_mm_db, all_cookies_seen, cookie_doc_for_check,
|
|
)
|
|
if mismatches:
|
|
library_mismatch_html = build_mismatch_block_html(mismatches)
|
|
logger.info(
|
|
"P102: %d Cookie-Mismatches gefunden", len(mismatches)
|
|
)
|
|
finally:
|
|
_mm_db.close()
|
|
except Exception as e:
|
|
logger.warning("P102 mismatch detection failed: %s", e)
|
|
|
|
# P35 + P77 + P78: Textsignal-Checks (Save-Label, Cookies-in-DSE,
|
|
# JC-Klausel im DSE)
|
|
signals_html = ""
|
|
try:
|
|
from compliance.services.doc_text_signals import (
|
|
run_all as run_signal_checks,
|
|
build_signals_block_html,
|
|
)
|
|
cookie_doc_missing = not bool(doc_texts.get("cookie"))
|
|
sig_findings = run_signal_checks(
|
|
banner_result, doc_texts, cookie_doc_missing,
|
|
)
|
|
if sig_findings:
|
|
signals_html = build_signals_block_html(sig_findings)
|
|
except Exception as e:
|
|
logger.warning("P35/P77/P78 signals-check failed: %s", e)
|
|
|
|
# P92 + P94: Banner-Konsistenz (CMP-Tool kaputt / Banner-vs-Doc-Diff)
|
|
consistency_html = ""
|
|
try:
|
|
from compliance.services.banner_consistency_checks import (
|
|
run_all as run_consistency_checks,
|
|
build_consistency_block_html,
|
|
)
|
|
cookie_doc_for_check = (doc_texts.get("cookie")
|
|
or doc_texts.get("dse") or "")
|
|
cons_findings = run_consistency_checks(
|
|
banner_result or {}, cookie_doc_for_check, cmp_vendors,
|
|
doc_texts=doc_texts,
|
|
)
|
|
if cons_findings:
|
|
consistency_html = build_consistency_block_html(cons_findings)
|
|
logger.info("P92/P94: %d Konsistenz-Findings", len(cons_findings))
|
|
except Exception as e:
|
|
logger.warning("P92/P94 consistency-check failed: %s", e)
|
|
|
|
# P73: MC-Solution-Generator — LLM-Vorschlaege pro HIGH-Fail.
|
|
# Max 5 Solutions pro Doc-Type um Latenz < 60s zu halten.
|
|
solutions_html = ""
|
|
try:
|
|
from compliance.services.mc_solution_generator import (
|
|
generate_solutions_for_fails, build_solutions_block_html,
|
|
)
|
|
all_solutions: list[dict] = []
|
|
for dt, fails in fails_by_doc.items():
|
|
if not fails:
|
|
continue
|
|
doc_txt = doc_texts.get(dt) or doc_texts.get("dse") or ""
|
|
if not doc_txt or len(doc_txt) < 500:
|
|
continue
|
|
sols = await generate_solutions_for_fails(
|
|
fails, doc_txt, dt, limit=3,
|
|
)
|
|
all_solutions.extend(sols)
|
|
if len(all_solutions) >= 8:
|
|
break # global cap
|
|
if all_solutions:
|
|
solutions_html = build_solutions_block_html(all_solutions[:8])
|
|
logger.info("P73: %d MC-Solutions generiert", len(all_solutions))
|
|
except Exception as e:
|
|
logger.warning("P73 MC-Solution-Generator skipped: %s", e)
|
|
|
|
# P71: JC-vs-AVV Entscheidungsbaum (nur wenn DSE ambig)
|
|
jc_decision_html = ""
|
|
try:
|
|
from compliance.services.jc_avv_decision import (
|
|
build_jc_avv_decision_html,
|
|
)
|
|
jc_decision_html = build_jc_avv_decision_html(doc_texts.get("dse"))
|
|
except Exception as e:
|
|
logger.warning("P71 jc_avv_decision skipped: %s", e)
|
|
|
|
# P82: GF-1-Pager ganz oben in der Mail — 5-Bullet-Zusammenfassung
|
|
# damit die GF nicht 124k Char lesen muss.
|
|
gf_one_pager_html = ""
|
|
try:
|
|
from compliance.services.gf_one_pager import build_gf_one_pager_html
|
|
gf_one_pager_html = build_gf_one_pager_html(
|
|
site_name=site_name_for_exec,
|
|
scorecard=scorecard,
|
|
previous_scorecard=prev_scorecard,
|
|
banner_result=banner_result,
|
|
library_mismatch_findings=mismatches,
|
|
scan_context=req.scan_context,
|
|
)
|
|
except Exception as e:
|
|
logger.warning("P82 GF-1-pager skipped: %s", e)
|
|
|
|
# P86: Branchen-Benchmark (nur wenn scan_context.industry gesetzt)
|
|
bench_html = ""
|
|
try:
|
|
from database import SessionLocal as _SLb
|
|
from compliance.services.industry_benchmark import (
|
|
compute_benchmark, build_benchmark_html, _extract_score,
|
|
)
|
|
industry = (req.scan_context or {}).get("industry") if req.scan_context else None
|
|
curr_score = _extract_score(banner_result)
|
|
if industry and curr_score is not None:
|
|
_b_db = _SLb()
|
|
try:
|
|
bench = compute_benchmark(
|
|
_b_db, industry, curr_score, check_id,
|
|
)
|
|
if bench:
|
|
bench_html = build_benchmark_html(bench)
|
|
finally:
|
|
_b_db.close()
|
|
except Exception as e:
|
|
logger.warning("P86 industry-benchmark skipped: %s", e)
|
|
|
|
# P84: Diff-Mode — "Seit letztem Lauf X Findings weg, Y neue".
|
|
diff_html = ""
|
|
try:
|
|
from database import SessionLocal as _SL
|
|
from compliance.services.run_diff import (
|
|
compute_diff, build_diff_block_html,
|
|
)
|
|
_diff_db = _SL()
|
|
try:
|
|
diff = compute_diff(
|
|
_diff_db, check_id, domain_for_exec or "",
|
|
banner_result, scorecard,
|
|
)
|
|
if diff:
|
|
diff_html = build_diff_block_html(diff)
|
|
finally:
|
|
_diff_db.close()
|
|
except Exception as e:
|
|
logger.warning("P84 diff-mode skipped: %s", e)
|
|
|
|
full_html = (
|
|
gf_one_pager_html + bench_html + diff_html
|
|
+ critical_html + scope_disclaimer_html + exec_summary_html
|
|
+ cookie_arch_html + summary_html + scanned_html + profile_html
|
|
+ scorecard_html + redundancy_html
|
|
+ providers_html + banner_deep_html + library_mismatch_html
|
|
+ consistency_html + signals_html + solutions_html
|
|
+ jc_decision_html
|
|
+ vvt_html + report_html
|
|
)
|
|
|
|
# Step 6: Send email — derive site name primarily from entered URL.
|
|
# The extracted_profile.companyName is often noisy (e.g. picks up
|
|
# juris.de from legal references). Domain-derived name is more
|
|
# predictable for the GF email subject.
|
|
doc_count = len([r for r in results if not r.error])
|
|
url_company = _company_name_from_url(doc_entries)
|
|
domain = _extract_domain(doc_entries)
|
|
site_name = url_company or domain or "Unbekannt"
|
|
_update(check_id, "E-Mail wird versendet...", 98)
|
|
email_result = send_email(
|
|
recipient=req.recipient,
|
|
subject=f"[COMPLIANCE-CHECK] {site_name} — {doc_count} Dokumente geprueft",
|
|
body_html=full_html,
|
|
)
|
|
|
|
# Step 7: Store result
|
|
response = {
|
|
"check_id": check_id,
|
|
"results": [_result_to_dict(r) for r in results],
|
|
"business_profile": profile_dict,
|
|
"extracted_profile": extracted_profile,
|
|
# P18: vollen consent-tester-Output durchreichen statt nur 4 Felder.
|
|
# phases (before/after-accept/reject) + banner_checks.violations +
|
|
# category_tests werden vom Renderer + Critical-Findings-Block genutzt.
|
|
"banner_result": ({
|
|
"detected": banner_result.get("banner_detected", False),
|
|
"provider": banner_result.get("banner_provider", ""),
|
|
"violations": len((banner_result.get("banner_checks") or {})
|
|
.get("violations", [])),
|
|
"tcf_vendor_count": len(tcf_vendors),
|
|
"completeness_pct": banner_result.get("completeness_pct"),
|
|
"correctness_pct": banner_result.get("correctness_pct"),
|
|
"phases": banner_result.get("phases", {}),
|
|
"banner_checks": banner_result.get("banner_checks", {}),
|
|
"category_tests": banner_result.get("category_tests", []),
|
|
"structured_checks": banner_result.get("structured_checks", []),
|
|
"summary": banner_result.get("summary", {}),
|
|
} if banner_result else None),
|
|
"tcf_vendors": vvt_entries if tcf_vendors else [],
|
|
"cmp_vendors": cmp_vendors,
|
|
"total_documents": len(results),
|
|
"total_findings": total_findings,
|
|
"email_status": email_result.get("status", "failed"),
|
|
"checked_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
_compliance_check_jobs[check_id]["status"] = "completed"
|
|
_compliance_check_jobs[check_id]["result"] = response
|
|
_compliance_check_jobs[check_id]["progress"] = "Fertig"
|
|
_compliance_check_jobs[check_id]["progress_pct"] = 100
|
|
|
|
# P80: persist raw scan data so we can replay audit pipeline
|
|
# without re-crawling (7min -> 5sec test cycle).
|
|
try:
|
|
from database import SessionLocal
|
|
from compliance.services.check_snapshot import save_snapshot
|
|
snap_db = SessionLocal()
|
|
try:
|
|
save_snapshot(
|
|
snap_db,
|
|
check_id=check_id,
|
|
doc_entries=doc_entries,
|
|
banner_result=banner_result,
|
|
profile=profile,
|
|
cmp_vendors=cmp_vendors,
|
|
scan_context=req.scan_context, # P79
|
|
site_label=site_name,
|
|
notes=f"recipient={req.recipient}",
|
|
)
|
|
finally:
|
|
snap_db.close()
|
|
except Exception as snap_err:
|
|
logger.warning("P80 snapshot save skipped: %s", snap_err)
|
|
|
|
# Persist to sidecar SQLite audit log — enables /audit endpoints
|
|
# (A5 admin tab) and trend view (A6). Best-effort; failures here
|
|
# do not affect the user-facing response.
|
|
try:
|
|
from compliance.services.compliance_audit_log import record_check_run
|
|
from compliance.services.mc_scorecard import full_audit_records
|
|
audit_rows: list[dict] = []
|
|
for r in results:
|
|
doc_mc = [c for c in r.checks if c.id.startswith("mc-")]
|
|
audit_rows.extend(full_audit_records(
|
|
[{"id": c.id, "label": c.label, "passed": c.passed,
|
|
"severity": c.severity, "skipped": c.skipped,
|
|
"regulation": c.regulation, "matched_text": c.matched_text,
|
|
"hint": c.hint, "level": c.level}
|
|
for c in doc_mc],
|
|
check_id=check_id,
|
|
doc_type=r.doc_type,
|
|
))
|
|
record_check_run(
|
|
check_id=check_id,
|
|
tenant_id=req.recipient or "",
|
|
site_name=site_name,
|
|
base_domain=domain or "",
|
|
doc_count=doc_count,
|
|
scorecard=scorecard,
|
|
vvt_summary={
|
|
"total": len(cmp_vendors),
|
|
"internal": sum(1 for v in cmp_vendors
|
|
if (v.get("recipient_type") or "").upper()
|
|
in ("INTERNAL", "GROUP_COMPANY")),
|
|
"external": sum(1 for v in cmp_vendors
|
|
if (v.get("recipient_type") or "").upper()
|
|
in ("PROCESSOR", "CONTROLLER")),
|
|
},
|
|
mc_records=audit_rows,
|
|
)
|
|
from compliance.services.compliance_audit_log import record_check_payload
|
|
record_check_payload(
|
|
check_id=check_id,
|
|
vendors=cmp_vendors,
|
|
profile=extracted_profile,
|
|
banner=banner_result,
|
|
)
|
|
# Unified findings (P5): bundle MC + Pflichtangaben + Vendor +
|
|
# Redundanz in one searchable table behind /agent/findings/<id>.
|
|
try:
|
|
from compliance.services.unified_findings_collector import collect
|
|
from compliance.services.unified_findings_store import record_findings
|
|
unified = collect(
|
|
check_id=check_id,
|
|
results=results,
|
|
cmp_vendors=cmp_vendors,
|
|
redundancy_report=redundancy_report,
|
|
doc_texts=doc_texts,
|
|
)
|
|
record_findings(check_id, unified)
|
|
except Exception as e:
|
|
logger.warning("Unified findings collect failed: %s", e)
|
|
except Exception as e:
|
|
logger.warning("Audit persistence skipped: %s", e)
|
|
|
|
except Exception as e:
|
|
logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
|
|
_compliance_check_jobs[check_id]["status"] = "failed"
|
|
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
|
|
|
|
|
|
def _update(check_id: str, msg: str, pct: int | None = None):
    """Store a progress message, and optionally a percentage, on a job.

    Args:
        check_id: Key of the job entry in the module-level
            ``_compliance_check_jobs`` mapping. An unknown key propagates
            the mapping's lookup error.
        msg: Progress text written to the job's ``"progress"`` field.
        pct: Optional completion percentage. When given it is converted
            with ``int()`` and clamped to the range 0..100 before being
            written to ``"progress_pct"``; when ``None`` the stored
            percentage is left untouched.
    """
    entry = _compliance_check_jobs[check_id]
    entry["progress"] = msg
    if pct is None:
        return
    # Clamp into the displayable 0..100 range.
    value = int(pct)
    if value > 100:
        value = 100
    if value < 0:
        value = 0
    entry["progress_pct"] = value
|
|
|
|
|
|
async def _fetch_text(url: str, doc_type: str = "") -> tuple[str, list[dict]]:
    """Fetch document text from ``url`` via consent-tester, with HTTP fallback.

    The consent-tester service (``/dsi-discovery``) is asked first for the
    rendered document(s). If that yields no usable result, the URL is
    fetched directly and the HTML is reduced to plain text with regexes.

    Args:
        url: Address of the document to fetch.
        doc_type: Document type as used by the caller. For the types in
            ``short_extract_types`` only one document is requested and a
            richer CMP-reconstructed text may replace the DOM text; for any
            other value up to three documents are requested and merged.

    Returns:
        ``(text, cmp_payloads)``. ``cmp_payloads`` is the raw CMP JSON
        captured during navigation (ePaaS, OneTrust, …) — empty when no CMP
        fired or the HTTP fallback was used. The backend turns payloads into
        structured vendor records for the VVT table in the email.
        ``("", [])`` is returned when neither path produced usable text.
        Exceptions raised by either path are logged as warnings and
        swallowed rather than propagated.
    """
    # 1. Consent-tester (Playwright-based, full JS rendering).
    # max_documents depends on doc_type:
    # - cookie/dse/social_media: self-extract (often + CMP capture) is
    #   authoritative, sub-pages dilute the policy text. max=1.
    # - impressum/agb/widerruf/nutzungsbedingungen/dsb: BMW & similar
    #   enterprise sites split this across 3-4 short sub-pages
    #   (Versicherungsvermittler, Aufsicht, Berufsrecht). max=3 follows
    #   them. The 15s networkidle bail (dsi_helpers) keeps timing safe.
    short_extract_types = {"cookie", "dse", "datenschutz", "privacy", "social_media"}
    max_docs = 1 if (doc_type or "") in short_extract_types else 3
    try:
        # P90: 120s is not enough for the BMW Impressum (auto-discovery
        # follows 3 sub-docs). 240s leaves headroom. Mercedes currently
        # also often fails at 120s because of Akamai latency.
        async with httpx.AsyncClient(timeout=240.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": url, "max_documents": max_docs},
                timeout=240.0,
            )
            if resp.status_code == 200:
                payload = resp.json()
                docs = payload.get("documents", [])
                cmp_payloads = payload.get("cmp_payloads") or []
                cmp_cookie_text = payload.get("cmp_cookie_text") or ""
                if docs:
                    # Keep only documents with more than 50 characters of text.
                    texts = []
                    for doc in docs:
                        t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
                        if t and len(t) > 50:
                            texts.append(t)
                    merged = "\n\n".join(texts)
                    # For cookie/dse/social_media: when CMP reconstruction is
                    # substantially richer than DOM extraction, use it. This
                    # fixes the BMW case where DOM yields ~600 words of
                    # navigation but the ePaaS payload reconstructs to ~1800
                    # words of actual cookie policy.
                    if (doc_type in short_extract_types
                            and cmp_cookie_text
                            and len(cmp_cookie_text.split()) > len(merged.split())):
                        logger.info(
                            "Preferring CMP-reconstructed text for %s on %s "
                            "(%d words CMP vs %d words DOM)",
                            doc_type, url,
                            len(cmp_cookie_text.split()),
                            len(merged.split()),
                        )
                        merged = cmp_cookie_text
                    if merged and len(merged.split()) > 100:
                        if len(texts) > 1:
                            logger.info("Merged %d docs from %s (%d words)",
                                        len(texts), url, len(merged.split()))
                        return merged, cmp_payloads
                    # P90 bug fix: even when the text is too short for the
                    # 100-word threshold, do NOT discard the captured CMP
                    # payloads. BMW bug: the DSE yields a 10-word SPA shell,
                    # but the ePaaS JSON (393KB) was captured. The backend
                    # needs it for extract_vendors_from_payloads (VVT table).
                    if cmp_payloads:
                        logger.info(
                            "P90: keeping %d CMP payloads for %s despite "
                            "short text (%d words) — HTTP fallback runs in parallel",
                            len(cmp_payloads), url,
                            len((merged or cmp_cookie_text).split()),
                        )
                        fallback_text = merged or cmp_cookie_text or ""
                        return fallback_text, cmp_payloads
    except Exception as e:
        # P90: verbose exception for diagnosis (str(e) can be empty).
        logger.warning("Consent-tester fetch failed for %s: %s (%s)",
                       url, str(e) or "(empty)", type(e).__name__)

    # 2. Fallback: direct HTTP fetch (works for SSR pages like BMW).
    # P7: identifiable user agent + per-domain rate limit.
    try:
        import re as _re
        from html import unescape as _unescape
        from compliance.services.compliance_user_agent import (
            default_request_headers, DomainRateLimiter,
        )
        async with httpx.AsyncClient(
            timeout=30.0, follow_redirects=True,
            headers=default_request_headers(),
        ) as client:
            async with DomainRateLimiter(url):
                resp = await client.get(url)
            if resp.status_code == 200 and "text/html" in resp.headers.get("content-type", ""):
                html = resp.text
                # Drop <script>/<style> elements including their contents,
                # then strip every remaining tag.
                text = _re.sub(r"<script[^>]*>.*?</script>", " ", html, flags=_re.DOTALL | _re.IGNORECASE)
                text = _re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=_re.DOTALL | _re.IGNORECASE)
                text = _re.sub(r"<[^>]+>", " ", text)
                # Decode character references (&amp;, &uuml;, &nbsp;, …) after
                # tag removal, so escaped markup such as "&lt;b&gt;" survives
                # as literal text and is not mistaken for a tag.
                text = _unescape(text)
                # Collapse whitespace last: this also normalises the
                # non-breaking spaces produced by decoding &nbsp;.
                text = _re.sub(r"\s+", " ", text).strip()
                if len(text.split()) > 100:
                    logger.info("HTTP fallback for %s: %d words", url, len(text.split()))
                    return text, []
    except Exception as e:
        logger.warning("HTTP fallback failed for %s: %s", url, e)

    return "", []
|
|
|
|
|
|
async def _autodiscover_missing(
|
|
check_id: str,
|
|
doc_entries: list[dict],
|
|
doc_texts: dict[str, str],
|
|
url_text_cache: dict[str, str],
|
|
) -> None:
|
|
"""For each canonical doc_type the user did not submit, try to find
|
|
the corresponding document on the homepage of the site they DID submit.
|
|
|
|
Modifies doc_entries in place: fills text/url/word_count and sets
|
|
`auto_discovered=True`. Marks `discovery_attempted=True` on every
|
|
missing entry (even when nothing was found) so the report can
|
|
distinguish 'Nicht eingereicht' from 'Auf der Website nicht gefunden'.
|
|
"""
|
|
from urllib.parse import urlparse
|
|
|
|
# VW-Fix: nur Doc-Types mit substantieller Text-Ausbeute zaehlen
|
|
# als 'submitted'. Wenn der User eine URL eingegeben hat aber die
|
|
# 404 liefert (VW cookie-richtlinie.html), oder der Crawler weniger
|
|
# als 200 Zeichen extrahiert (SPA-Shell), als 'missing' behandeln
|
|
# damit der Discovery-Pass alternative URLs probiert.
|
|
_MIN_USEFUL_CHARS = 200
|
|
submitted_types = {
|
|
e["doc_type"] for e in doc_entries
|
|
if len((e.get("text") or "").strip()) >= _MIN_USEFUL_CHARS
|
|
}
|
|
# Markiere die fehlgeschlagenen URL-Submissions damit der Discovery
|
|
# ihre URL nicht erneut probiert (waere sinnlos).
|
|
failed_urls: set[str] = {
|
|
(e.get("url") or "").strip()
|
|
for e in doc_entries
|
|
if (e.get("url") or "").strip()
|
|
and len((e.get("text") or "").strip()) < _MIN_USEFUL_CHARS
|
|
}
|
|
if failed_urls:
|
|
logger.info(
|
|
"VW-Fix: %d eingegebene URLs lieferten <%d Zeichen — Discovery "
|
|
"soll Alternativen probieren: %s",
|
|
len(failed_urls), _MIN_USEFUL_CHARS,
|
|
", ".join(list(failed_urls)[:3]),
|
|
)
|
|
# Map alias types to canonical
|
|
submitted_canon = {
|
|
"dse" if t in ("datenschutz", "privacy") else t for t in submitted_types
|
|
}
|
|
# Missing = canonical types the user did NOT submit
|
|
missing = set(_ALL_DOC_TYPES) - submitted_canon
|
|
if not missing:
|
|
return
|
|
|
|
# Pick the most common base (scheme://netloc) from submitted URLs.
|
|
bases: dict[str, int] = {}
|
|
for e in doc_entries:
|
|
u = (e.get("url") or "").strip()
|
|
if u and "://" in u:
|
|
p = urlparse(u)
|
|
base = f"{p.scheme}://{p.netloc}"
|
|
bases[base] = bases.get(base, 0) + 1
|
|
if not bases:
|
|
# No submitted URL at all — nothing to crawl from. Add empty
|
|
# placeholders (with discovery_attempted=False) so the padding
|
|
# step renders them as 'Nicht eingereicht' (not 'Nicht gefunden').
|
|
for dt in missing:
|
|
doc_entries.append({
|
|
"doc_type": dt, "url": "", "text": "", "word_count": 0,
|
|
"auto_discovered": False, "discovery_attempted": False,
|
|
})
|
|
return
|
|
|
|
# Build crawl plan: primary base + any related domains mentioned in
|
|
# the submitted texts that share the owner's SLD. Example: BMW Group
|
|
# text mentions bmwgroup.com and bmwgroup.jobs in addition to bmw.de.
|
|
primary_base = max(bases, key=bases.get) + "/"
|
|
crawl_bases: list[str] = [primary_base]
|
|
primary_netloc = urlparse(primary_base).netloc.lower().lstrip("www.")
|
|
owner_token = primary_netloc.split(".")[0] # 'bmw'
|
|
|
|
if owner_token and len(owner_token) >= 3:
|
|
domain_re = re.compile(
|
|
r"https?://([a-z0-9][a-z0-9\-]*\.)*" + re.escape(owner_token)
|
|
+ r"[a-z0-9\-]*\.[a-z]{2,}",
|
|
re.IGNORECASE,
|
|
)
|
|
seen_bases = {primary_base}
|
|
for entry in doc_entries:
|
|
text = entry.get("text") or ""
|
|
for m in domain_re.finditer(text):
|
|
p = urlparse(m.group(0))
|
|
base = f"{p.scheme}://{p.netloc}/"
|
|
base_netloc = p.netloc.lower().lstrip("www.")
|
|
if base_netloc == primary_netloc:
|
|
continue
|
|
if base in seen_bases:
|
|
continue
|
|
seen_bases.add(base)
|
|
crawl_bases.append(base)
|
|
if len(crawl_bases) >= 3:
|
|
break
|
|
if len(crawl_bases) >= 3:
|
|
break
|
|
|
|
_update(
|
|
check_id,
|
|
f"Suche fehlende Dokumente auf {', '.join(urlparse(b).netloc for b in crawl_bases)}...",
|
|
18,
|
|
)
|
|
|
|
discovered: list[dict] = []
|
|
disc_payloads: list[dict] = []
|
|
disc_cookie_texts: list[str] = []
|
|
for base in crawl_bases:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client: # P90: 180s -> 300s
|
|
resp = await client.post(
|
|
f"{CONSENT_TESTER_URL}/dsi-discovery",
|
|
json={"url": base, "max_documents": 15},
|
|
timeout=300.0, # P90: 180s -> 300s
|
|
)
|
|
if resp.status_code != 200:
|
|
logger.warning("auto-discovery: HTTP %d for %s",
|
|
resp.status_code, base)
|
|
continue
|
|
body = resp.json()
|
|
discovered.extend(body.get("documents", []) or [])
|
|
disc_payloads.extend(body.get("cmp_payloads") or [])
|
|
cmp_text = body.get("cmp_cookie_text") or ""
|
|
if cmp_text:
|
|
disc_cookie_texts.append(cmp_text)
|
|
logger.info("auto-discovery on %s: %d docs, %d CMP payloads, "
|
|
"cmp_cookie_text=%d words", base,
|
|
len(body.get("documents", []) or []),
|
|
len(body.get("cmp_payloads") or []),
|
|
len(cmp_text.split()))
|
|
except Exception as e:
|
|
# P90: verbose exception fuer Diagnose
|
|
logger.warning("auto-discovery failed for %s: %s (%s)",
|
|
base, str(e) or "(empty)", type(e).__name__)
|
|
|
|
# Classify each discovered doc into a canonical doc_type
|
|
by_type: dict[str, dict] = {}
|
|
for d in discovered:
|
|
title = (d.get("title") or "").lower()
|
|
url = (d.get("url") or "").lower()
|
|
wc = d.get("word_count") or 0
|
|
if wc < 100:
|
|
continue
|
|
canon = _classify_discovered_doc(title, url)
|
|
if canon and canon in missing and canon not in by_type:
|
|
by_type[canon] = d
|
|
|
|
# Append/Update entry for every missing canonical type. Auto-discovered
|
|
# ones get the text/URL filled; unmatched ones stay empty so the
|
|
# padding step renders them as 'Auf der Website nicht gefunden'.
|
|
# VW-Fix: wenn schon ein leerer entry existiert (URL gesetzt, aber
|
|
# fetch hat 0/Mini-Text geliefert), in-place updaten statt duplizieren.
|
|
filled = 0
|
|
for dt in missing:
|
|
existing = next((e for e in doc_entries
|
|
if e.get("doc_type") == dt), None)
|
|
new_entry: dict = existing if existing else {
|
|
"doc_type": dt, "url": "", "text": "", "word_count": 0,
|
|
"auto_discovered": False, "discovery_attempted": True,
|
|
"cmp_payloads": [],
|
|
}
|
|
new_entry["discovery_attempted"] = True
|
|
d = by_type.get(dt)
|
|
if d:
|
|
full = d.get("full_text") or d.get("text_preview") or ""
|
|
# For cookie: prefer the CMP-reconstructed text when it's
|
|
# substantially richer than the auto-discovered DOM extraction.
|
|
# BMW homepage CMP yields ~1800 words of authoritative policy;
|
|
# DOM extraction typically yields ~600 words of site chrome.
|
|
if dt == "cookie" and disc_cookie_texts:
|
|
cmp_merged = "\n\n".join(disc_cookie_texts)
|
|
if len(cmp_merged.split()) > len(full.split()):
|
|
logger.info(
|
|
"cookie: using CMP-reconstructed text (%d words) "
|
|
"instead of DOM (%d words)",
|
|
len(cmp_merged.split()), len(full.split()),
|
|
)
|
|
full = cmp_merged
|
|
if len(full.split()) >= 100:
|
|
new_entry["text"] = full
|
|
# Behalte die original URL als "rejected_url" damit Audit
|
|
# zeigt 'X war 404, wir haben Y gefunden'.
|
|
if existing and (existing.get("url") or "").strip() in failed_urls:
|
|
new_entry["rejected_url"] = existing.get("url")
|
|
new_entry["url"] = d.get("url", "")
|
|
new_entry["word_count"] = len(full.split())
|
|
new_entry["auto_discovered"] = True
|
|
if dt == "cookie" and disc_payloads:
|
|
new_entry["cmp_payloads"] = disc_payloads
|
|
doc_texts[dt] = full
|
|
filled += 1
|
|
logger.info(
|
|
"auto-discovered %s on %s: %s (%d words)%s",
|
|
dt, base, d.get("url", "")[:80], new_entry["word_count"],
|
|
" [REPLACED failed URL]" if existing else "",
|
|
)
|
|
if not existing:
|
|
doc_entries.append(new_entry)
|
|
|
|
logger.info(
|
|
"auto-discovery: filled %d/%d missing types from %s",
|
|
filled, len(missing), base,
|
|
)
|
|
|
|
|
|
# Title/URL keywords → canonical doc_type. Order matters: most-specific first.
|
|
_DISCOVERY_RULES: list[tuple[str, tuple[str, ...]]] = [
|
|
("cookie", ("cookie", "kuche", "biscuit", "cookies-")),
|
|
("widerruf", ("widerruf", "rueckgabe", "rückgabe", "cancellation",
|
|
"right-of-withdrawal", "ruecktritts", "rücktritts")),
|
|
("social_media", ("social-media", "soziale-medien", "social_media",
|
|
"social-media-policy")),
|
|
# P23: 'terms-and-conditions' kann Allgemeine Geschaeftsbedingungen ODER
|
|
# Nutzungsbedingungen meinen. Discovery-Funktion klassifiziert spaeter
|
|
# praeziser per Titel + Inhalt. Hier nur Url-Hint:
|
|
("agb", ("/agb", "geschaeftsbedingungen", "geschäftsbedingungen",
|
|
"general-terms")),
|
|
("nutzungsbedingungen", ("nutzungsbedingung", "nutzungsbedingungen",
|
|
"terms-of-use", "terms-and-conditions",
|
|
"nutzungsordnung", "terms-of-service",
|
|
"allgemeine-nutzungsbedingungen")),
|
|
("dsb", ("datenschutzbeauftragt", "data-protection-officer",
|
|
"dpo-contact", "/dsb")),
|
|
("impressum", ("impressum", "imprint", "legal-notice", "site-notice",
|
|
"anbieterkennzeichnung", "legal-disclaimer-pool")),
|
|
("dse", ("data-privacy", "datenschutz", "data-protection",
|
|
"privacy-policy", "privacy-notice", "dsgvo",
|
|
"data_privacy", "datenschutzinformation")),
|
|
]
|
|
|
|
|
|
def _classify_discovered_doc(title: str, url: str) -> str | None:
|
|
"""Map a discovered doc (by its title + URL) to one of our 8 canonical types."""
|
|
haystack = f"{title} {url}"
|
|
for canon, keywords in _DISCOVERY_RULES:
|
|
if any(kw in haystack for kw in keywords):
|
|
return canon
|
|
return None
|
|
|
|
|
|
async def _check_single(
    text: str, doc_type: str, label: str, url: str,
    word_count: int, use_agent: bool,
    business_scope: set[str] | None = None,
    business_profile: dict | None = None,
):
    """Evaluate a single document and return its ``DocCheckResult``.

    Stages, in order:
      1. Regex checklist (``check_document_completeness``); the finding whose
         code contains ``"SCORE"`` supplies the check items and both
         percentages.
      2. Master Control checks (``check_document_with_controls``).
      3. LLM re-verification of failed, non-skipped checks that carry a hint.
      4. Only for ``doc_type == "cookie"``: validation of the links extracted
         from the text.

    Stages 2-4 are best effort: an exception is logged as a warning and the
    stage is skipped. After each of these stages the correctness percentage
    is recomputed from the active (non-skipped) level-2 checks.
    """
    from compliance.services.doc_checks.runner import check_document_completeness
    from compliance.services.rag_document_checker import check_document_with_controls
    from .agent_doc_check_routes import CheckItem, DocCheckResult

    def _l2_pass_rate(checks) -> int | None:
        """Pass percentage over active level-2 checks; None if there are none."""
        active = [item for item in checks if item.level == 2 and not item.skipped]
        if not active:
            return None
        passed = sum(1 for item in active if item.passed)
        return round(passed / len(active) * 100)

    # ── Stage 1: regex checklist ─────────────────────────────────────
    findings = check_document_completeness(
        text, doc_type, label, url, business_profile=business_profile,
    )

    all_checks: list[CheckItem] = []
    completeness = 0
    correctness = 0

    for finding in findings:
        if "SCORE" not in finding.get("code", ""):
            continue
        all_checks.extend(
            CheckItem(
                id=raw["id"], label=raw["label"], passed=raw["passed"],
                severity=raw["severity"], matched_text=raw.get("matched_text", ""),
                level=raw.get("level", 1), parent=raw.get("parent"),
                skipped=raw.get("skipped", False), hint=raw.get("hint", ""),
            )
            for raw in finding.get("all_checks", [])
        )
        completeness = finding.get("completeness_pct", 0)
        correctness = finding.get("correctness_pct", 0)

    # ── Stage 2: Master Control checks ───────────────────────────────
    try:
        # max_controls=0 -> evaluate ALL MCs for this doc_type (DB has
        # 1874 across 8 types; regex matching is cheap and dominates
        # well under 1s per doc). Caps remain on the LLM-enrich step
        # (top-10 FAILs) so cost stays bounded.
        mc_results = await check_document_with_controls(
            text, doc_type, label, max_controls=0, use_agent=use_agent,
            business_scope=business_scope,
        )
        if mc_results:
            all_checks.extend(CheckItem(**mc) for mc in mc_results)
            rate = _l2_pass_rate(all_checks)
            # Unlike the later stages, this one resets to 0 when no active
            # level-2 check exists.
            correctness = rate if rate is not None else 0
    except Exception as exc:
        logger.warning("MC check skipped for %s: %s", label, exc)

    # ── Stage 3: LLM verification of regex fails ─────────────────────
    needs_review = [
        item for item in all_checks
        if item.hint and not item.passed and not item.skipped
    ]
    if needs_review:
        try:
            from compliance.services.doc_checks.llm_verify import verify_failed_checks
            review_payload = [
                {"id": item.id, "label": item.label, "hint": item.hint}
                for item in needs_review
            ]
            overturns = await verify_failed_checks(text, review_payload, label)
            for item in all_checks:
                if item.id in overturns and overturns[item.id]["overturned"]:
                    item.passed = True
                    item.matched_text = f"[LLM] {overturns[item.id]['evidence']}"
            rate = _l2_pass_rate(all_checks)
            if rate is not None:
                correctness = rate
        except Exception as exc:
            logger.warning("LLM verification skipped: %s", exc)

    # ── Stage 4: cookie policy only ──────────────────────────────────
    # Actively HTTP-probe the Opt-Out + Privacy-Policy URLs the document
    # advertises. Broken links make individual provider entries
    # non-compliant under Art. 7(3) DSGVO.
    if doc_type == "cookie":
        try:
            from compliance.services.cookie_link_validator import (
                extract_links, validate_links, build_check_items,
            )
            links = extract_links(text)
            if links:
                logger.info("Cookie-link validator: %d urls extracted from %s",
                            len(links), label)
                validated = await validate_links(links)
                all_checks.extend(
                    CheckItem(**item) for item in build_check_items(validated)
                )
                # The new items may be level-2, so refresh the percentage.
                rate = _l2_pass_rate(all_checks)
                if rate is not None:
                    correctness = rate
        except Exception as exc:
            logger.warning("Cookie-link validation skipped for %s: %s", label, exc)

    # Findings that are not the SCORE summary count as individual findings.
    other_findings = [f for f in findings if "SCORE" not in f.get("code", "")]
    return DocCheckResult(
        label=label, url=url, doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness, correctness_pct=correctness,
        checks=all_checks, findings_count=len(other_findings),
    )
|
|
|
|
|
|
def _pad_results_with_missing(
    results: list,
    discovery_attempted: set[str] | None = None,
) -> list:
    """Return ``results`` padded so every canonical doc_type has an entry.

    Canonical types (``_ALL_DOC_TYPES``) without a result get a placeholder
    ``DocCheckResult`` with ``scenario="missing"``. Its error text depends on
    whether the type is listed in ``discovery_attempted``:
      - listed: 'Auf der Website nicht gefunden ...'
      - not listed: 'Nicht eingereicht ...'

    The output follows the order of ``_ALL_DOC_TYPES`` so the report layout
    is stable; results with a non-canonical doc_type are appended afterwards
    in their original order. ``datenschutz`` and ``privacy`` are treated as
    aliases of ``dse``. If several results map to the same canonical type,
    the last one wins.
    """
    from .agent_doc_check_routes import DocCheckResult

    def _canonical(doc_type):
        # Fold the DSE aliases onto the canonical key.
        return "dse" if doc_type in ("datenschutz", "privacy") else doc_type

    attempted = discovery_attempted or set()
    latest_by_type: dict[str, object] = {
        _canonical(res.doc_type): res for res in results
    }

    padded: list = []
    for canon in _ALL_DOC_TYPES:
        if canon in latest_by_type:
            padded.append(latest_by_type[canon])
            continue

        if canon not in attempted:
            reason = "Nicht eingereicht — Quelle nicht angegeben"
        else:
            reason = ("Auf der Website nicht gefunden — bitte URL des "
                      "Dokuments manuell eintragen, falls vorhanden")

        placeholder = DocCheckResult(
            label=_doc_type_label(canon),
            url="",
            doc_type=canon,
            word_count=0,
            completeness_pct=0,
            correctness_pct=0,
            checks=[],
            findings_count=0,
            error=reason,
            scenario="missing",
        )
        padded.append(placeholder)

    # Keep anything the caller supplied that is outside the canonical list.
    padded.extend(
        res for res in results
        if _canonical(res.doc_type) not in _ALL_DOC_TYPES
    )
    return padded
|
|
|
|
|
|
_COMPOUND_TLDS = {
|
|
"co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
|
|
"com.au", "com.br", "com.mx", "com.tr", "com.sg",
|
|
}
|
|
|
|
|
|
def _extract_domain(doc_entries: list[dict]) -> str | None:
|
|
"""Extract base domain (without www) from first URL."""
|
|
for entry in doc_entries:
|
|
url = entry.get("url", "")
|
|
if url and "://" in url:
|
|
from urllib.parse import urlparse
|
|
host = urlparse(url).netloc.lower()
|
|
if host.startswith("www."):
|
|
host = host[4:]
|
|
return host or None
|
|
return None
|
|
|
|
|
|
def _company_name_from_url(doc_entries: list[dict]) -> str | None:
    """Derive a display company name from the entered URLs.

    Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
    uppercase short acronyms (<=4 chars, no hyphens), title-case the rest
    (each hyphen-separated part is capitalised). For hosts ending in one of
    ``_COMPOUND_TLDS`` the label before that suffix is used instead.

    The first entry whose URL yields a usable label decides the result.
    Port and credentials in the URL are ignored.

    Examples:
        www.bmw.de           -> BMW
        mercedes-benz.de     -> Mercedes-Benz
        shop.example.co.uk   -> Example
        juris.de             -> Juris

    Returns:
        The display name, or ``None`` when no entry has a usable URL.
    """
    from urllib.parse import urlparse

    for entry in doc_entries:
        # Tolerate None and surrounding whitespace in the submitted URL.
        url = (entry.get("url") or "").strip()
        if "://" not in url:
            continue
        try:
            # .hostname strips ":port" and "user@", which would otherwise
            # corrupt the label split below (e.g. "co.uk:8080").
            host = urlparse(url).hostname or ""
        except ValueError:
            continue
        if host.startswith("www."):
            host = host[4:]

        labels = host.split(".")
        if len(labels) < 2:
            continue

        # Handle compound TLDs (.co.uk etc.)
        has_compound_tld = (
            len(labels) >= 3 and ".".join(labels[-2:]) in _COMPOUND_TLDS
        )
        sld = labels[-3] if has_compound_tld else labels[-2]
        if not sld:
            continue

        if len(sld) <= 4 and "-" not in sld:
            return sld.upper()
        return "-".join(part.capitalize() for part in sld.split("-"))
    return None
|
|
|
|
|
|
def _get_skip_types(profile) -> dict[str, str]:
|
|
"""Doc_types to skip entirely with a per-type reason message.
|
|
|
|
Heute primaer fuer OEM-Konfigurator-Pattern (BMW/Audi/Mercedes):
|
|
wenn die Site kein Direkt-Vertrieb macht, sind AGB/Widerruf/
|
|
Nutzungsbedingungen nicht Pflicht auf der Website — sie werden
|
|
beim Vertragshaendler ausgehaendigt.
|
|
"""
|
|
if getattr(profile, "no_direct_sales", False):
|
|
msg = (
|
|
"Nicht anwendbar — die Webseite schliesst keinen Direkt-"
|
|
"Kaufvertrag (OEM-Konfigurator-Pattern, Vertrag laeuft "
|
|
"ueber Vertragshaendler). AGB/Widerruf werden beim "
|
|
"Haendler ausgehaendigt."
|
|
)
|
|
return {
|
|
"agb": msg,
|
|
"widerruf": msg,
|
|
"nutzungsbedingungen": msg,
|
|
}
|
|
return {}
|
|
|
|
|
|
def _apply_profile_filter(result, profile, doc_type: str):
|
|
"""Adjust INFO-level checks based on business profile context.
|
|
|
|
For example: ODR check only relevant for B2C online shops.
|
|
"""
|
|
from .agent_doc_check_routes import CheckItem
|
|
|
|
for check in result.checks:
|
|
cid = check.id.lower()
|
|
|
|
# ODR/OS-Link: relevant ONLY for B2C online shops. The check's
|
|
# default hint is written for B2B (it explains why it's not
|
|
# relevant) — for B2C we must replace it with action-oriented
|
|
# guidance, otherwise the report contradicts itself.
|
|
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
|
|
if profile.needs_odr:
|
|
if not check.passed:
|
|
check.hint = (
|
|
"Als B2C-Anbieter muessen Sie nach Art. 14 EU-VO 524/2013 "
|
|
"auf die OS-Plattform (https://ec.europa.eu/consumers/odr) "
|
|
"verlinken — klickbarer Link, nicht nur Text. Zusaetzlich "
|
|
"§36 VSBG: angeben, ob Sie an Verbraucher-"
|
|
"Streitbeilegungsverfahren teilnehmen (oder nicht)."
|
|
)
|
|
else:
|
|
check.skipped = True
|
|
check.hint = "Nicht relevant (kein B2C Online-Shop)"
|
|
|
|
# Widerruf: Flag entire document as unnecessary for B2B
|
|
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
|
|
check.severity = "INFO"
|
|
if not check.passed:
|
|
check.hint = (
|
|
"Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
|
|
"(§355 BGB gilt nur fuer Verbrauchervertraege). "
|
|
"Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
|
|
"Ihrer Website, da sie Verwirrung stiften kann."
|
|
)
|
|
|
|
# Regulated profession: check for Kammer info
|
|
if "kammer" in cid or "berufsordnung" in check.label.lower():
|
|
if not profile.is_regulated_profession:
|
|
check.skipped = True
|
|
check.hint = "Nicht relevant (kein regulierter Beruf)"
|
|
|
|
return result
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────
|
|
|
|
_DOC_TYPE_LABELS = {
|
|
"dse": "Datenschutzerklaerung",
|
|
"datenschutz": "Datenschutzerklaerung",
|
|
"privacy": "Datenschutzerklaerung",
|
|
"impressum": "Impressum",
|
|
"agb": "AGB",
|
|
"widerruf": "Widerrufsbelehrung",
|
|
"cookie": "Cookie-Richtlinie",
|
|
"avv": "Auftragsverarbeitung",
|
|
"loeschkonzept": "Loeschkonzept",
|
|
"dsfa": "Datenschutz-Folgenabschaetzung",
|
|
"social_media": "Social Media Datenschutz",
|
|
"nutzungsbedingungen": "Nutzungsbedingungen",
|
|
"dsb": "DSB-Kontakt",
|
|
# P74: Legal-Notice / Rechtliche Hinweise (IP, Forward-Looking, Risiko)
|
|
"legal_notice": "Rechtliche Hinweise",
|
|
# P96: Digital Services Act-Pflichtangaben (Art. 12+17 DSA)
|
|
"dsa": "DSA-Pflichtangaben",
|
|
# P97: Lizenzhinweise Dritter (OSS-Compliance)
|
|
"lizenzhinweise": "Lizenzhinweise Dritter",
|
|
}
|
|
|
|
# Canonical doc types in the same order as the frontend ComplianceCheckTab.
|
|
# The route pads `results` to always contain an entry for each — even if
|
|
# the user did not submit a URL — so the email + frontend always show
|
|
# the complete checklist (missing rows marked as 'Nicht eingereicht').
|
|
#
|
|
# DSB-Kontakt is intentionally NOT canonical: per GDPR practice the DSB is
|
|
# named *inside* the DSI/datenschutz document (email or contact block), not
|
|
# as a separate page. We check 'DSB benannt' as a sub-check of the DSE
|
|
# instead. If a tenant insists on a separate DSB document, they can still
|
|
# submit one — it just won't appear as a missing checklist row.
|
|
_ALL_DOC_TYPES = [
|
|
"dse", "impressum", "social_media", "cookie",
|
|
"agb", "nutzungsbedingungen", "widerruf",
|
|
]
|
|
|
|
|
|
def _doc_type_label(doc_type: str) -> str:
|
|
return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())
|
|
|
|
|
|
def _result_to_dict(r) -> dict:
|
|
"""Convert DocCheckResult to JSON-serializable dict."""
|
|
fields = ("id", "label", "passed", "severity", "matched_text",
|
|
"level", "parent", "skipped", "hint")
|
|
return {
|
|
"label": r.label, "url": r.url, "doc_type": r.doc_type,
|
|
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
|
|
"correctness_pct": r.correctness_pct,
|
|
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
|
|
"findings_count": r.findings_count, "error": r.error,
|
|
"scenario": getattr(r, "scenario", ""),
|
|
}
|
|
|
|
|
|
def _build_profile_html(profile) -> str:
    """Delegate to ``agent_doc_check_report.build_profile_html``.

    The import is done at call time rather than at module import time.
    """
    from .agent_doc_check_report import build_profile_html as _render_profile

    html = _render_profile(profile)
    return html
|
|
|
|
|
|
# Cross-check extracted to compliance.services.banner_cookie_cross_check
|
|
from compliance.services.banner_cookie_cross_check import cross_check_banner_vs_cookie as _cross_check_banner_vs_cookie
|
|
|
|
|
|
# ── Admin: audit drill-down (A5) + trend view (A6) ──────────────────
|
|
|
|
@router.get("/audit/{check_id}")
async def audit_drill_down(
    check_id: str,
    doc_type: str = "",
    regulation: str = "",
    only_failed: bool = False,
):
    """Return the stored run plus filterable MC results for one check run.

    Frontend uses this to render the /sdk/agent/audit/<check_id> view.

    An unknown ``check_id`` yields ``{"check_id": ..., "found": False}``.
    Empty ``doc_type`` / ``regulation`` values are passed on as ``None``.
    """
    from compliance.services.compliance_audit_log import (
        get_check_run, list_mc_results,
    )

    response: dict = {"check_id": check_id, "found": False}

    run = get_check_run(check_id)
    if run:
        filters = {
            "doc_type": doc_type or None,
            "regulation": regulation or None,
            "only_failed": only_failed,
        }
        rows = list_mc_results(check_id, **filters)
        response.update(
            found=True,
            run=run,
            mc_count=len(rows),
            results=rows,
        )
    return response
|
|
|
|
|
|
@router.get("/audit/tenant/{tenant_id}")
async def audit_tenant_history(
    tenant_id: str,
    base_domain: str = "",
    limit: int = 30,
):
    """Return a tenant's check-run history for the trend view (A6).

    An empty ``base_domain`` is passed on as ``None``; ``limit`` is passed
    through unchanged.
    """
    from compliance.services.compliance_audit_log import list_runs_for_tenant

    domain_filter = base_domain or None
    runs = list_runs_for_tenant(
        tenant_id, base_domain=domain_filter, limit=limit,
    )
    return dict(tenant_id=tenant_id, count=len(runs), runs=runs)
|