breakpilot-compliance/backend-compliance/compliance/api/agent_compliance_check_routes.py
Benjamin Admin 662327e8b4
feat(compliance-check): MC classification + embedding + vendor redundancy + action recipes + Borlabs features
Major update based on the BMW test iterations (v1→v9):

Core compliance check
- Sonnet check_type classification: text/process/review for all 1874 MCs
  in compliance.doc_check_controls (script + sidecar /data/mc_classification.db).
  rag_document_checker filters on check_type='text' for doc_check.
  Plus fits_doc_type audit (v2) + ui_only audit for DSA/e-commerce MCs
  filed under the wrong doc_type.
- scope_requires filter: biometric/ai_decision/child_targeting MCs are
  filtered by business_profile (FRT skipped for BMW, etc.).
- Embedding match (BGE-M3) as phase 3 after the regex match:
  per-doc_type threshold override (impressum 0.50, dse/cookie 0.60),
  short-field rescue (15-word chunks) for mandatory fields in the Impressum.
  Title + check_question as embedding input for more context.
- Cookie text routing: consent-tester returns cmp_cookie_text from the
  CMP reconstruction; the backend prefers it over DOM extraction
  when it is richer (BMW: 1824 vs. 600 words).
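
The phase order described above (regex first, embedding match as a fallback with a per-doc_type threshold) could look roughly like the sketch below. Only the 0.50 and 0.60 overrides and the 15-word chunk size come from the commit message; the function names, the default threshold of 0.55, the non-overlapping chunking, and the precomputed `similarity` input are assumptions for illustration, not the actual rag_document_checker code.

```python
import re

# Overrides from the commit message; DEFAULT_THRESHOLD is an assumed value.
DEFAULT_THRESHOLD = 0.55
THRESHOLD_OVERRIDES = {"impressum": 0.50, "dse": 0.60, "cookie": 0.60}


def threshold_for(doc_type: str) -> float:
    """Return the similarity threshold for a doc_type, else the default."""
    return THRESHOLD_OVERRIDES.get(doc_type, DEFAULT_THRESHOLD)


def word_chunks(text: str, size: int = 15) -> list[str]:
    """Split text into consecutive chunks of `size` words (assumed: no overlap)."""
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


def match_phase(text: str, pattern: str, similarity: float, doc_type: str) -> str:
    """Return 'regex', 'embedding', or 'none' depending on which phase matched.

    `similarity` is a hypothetical stand-in for the best cosine score between
    the control's title + check_question and any chunk of the document.
    """
    if re.search(pattern, text, flags=re.IGNORECASE):
        return "regex"
    if similarity >= threshold_for(doc_type):
        return "embedding"
    return "none"
```

With these assumed values, a similarity of 0.52 would pass for an Impressum but not for a privacy policy, which is the effect the per-doc_type override is meant to have.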

Vendor redundancy + EU alternatives + cost saving
- vendor_redundancy.analyze(): functional categorization of the CMP vendors,
  detection of multiple providers per category, EU-alternative lookup
  (Matomo, IONOS, HERE, Friendly Captcha, Smart AdServer, ...).
- vendor_cost_estimator: tier inference from the cookie footprint (cookie count
  + premium-feature cookies + third-party share → starter/professional/
  enterprise/premier).
- Self-service advertising (Google/Meta/Pinterest/...) = 0 license cost
  (media spend only, tracked separately). DSP platforms keep a narrow range.
- Tier-aware saving range: for enterprise/premier we use the
  upper 40-100% band of the list prices, not starter→premier.
- Multi-function tools (Matomo Pro, SAP CX, IONOS Cloud, Userlike, Smart
  AdServer, HERE Maps, Vimeo Pro, LamaPoll): one tool replaces several
  categories at once.
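
The detection of multiple providers per category can be pictured as a simple group-by. The sketch below is an assumption about the shape of that step, not the real vendor_redundancy.analyze(); the function name is hypothetical, and the `name` and `category` keys are borrowed from the vendor dicts used elsewhere in this file.

```python
from collections import defaultdict


def find_redundant_categories(vendors: list[dict]) -> dict[str, list[str]]:
    """Group vendor names by functional category and keep categories
    that are served by more than one distinct vendor."""
    by_category: dict[str, list[str]] = defaultdict(list)
    for vendor in vendors:
        category = vendor.get("category") or "unknown"
        name = vendor.get("name", "")
        # Skip unnamed records and avoid counting the same vendor twice.
        if name and name not in by_category[category]:
            by_category[category].append(name)
    return {c: names for c, names in by_category.items() if len(names) > 1}


if __name__ == "__main__":
    sample = [
        {"name": "Google Analytics", "category": "analytics"},
        {"name": "Adobe Analytics", "category": "analytics"},
        {"name": "HERE Maps", "category": "maps"},
    ]
    print(find_redundant_categories(sample))
```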

Cookie knowledge DB + functional classification
- cookie_knowledge_db: 50 curated top cookies (Google/Meta/Adobe/MS/...)
  with vendor, exact_purpose, data_collected, IAB TCF IDs, reid_risk,
  schrems_ii_status, CJEU rulings, EU alternative.
- cookie_function_classifier: functional role per cookie (tracking_id,
  ad_pixel, session_id, ab_test, csrf, ...) + blocking_impact.
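
A per-cookie functional classification of this kind can be sketched with a small rule table. The role names come from the commit message; the patterns, the `blocking_impact` values, and the function name are assumptions for illustration and do not reflect the actual cookie_function_classifier.

```python
import re

# (pattern, role) pairs, checked in order. Patterns are illustrative only.
ROLE_RULES = [
    (re.compile(r"^_ga($|_)"), "tracking_id"),
    (re.compile(r"^_fbp$"), "ad_pixel"),
    (re.compile(r"^(PHPSESSID|JSESSIONID)$", re.IGNORECASE), "session_id"),
    (re.compile(r"csrf|xsrf", re.IGNORECASE), "csrf"),
]

# Assumed: blocking session or CSRF cookies breaks the site; others do not.
BLOCKING_IMPACT = {"session_id": "breaks_site", "csrf": "breaks_site"}


def classify_cookie(name: str) -> dict:
    """Return the functional role and blocking impact for a cookie name."""
    for pattern, role in ROLE_RULES:
        if pattern.search(name):
            return {
                "name": name,
                "role": role,
                "blocking_impact": BLOCKING_IMPACT.get(role, "none"),
            }
    return {"name": name, "role": "unknown", "blocking_impact": "unknown"}
```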

Country inference from legal form
- cookie_link_validator: the country field is derived from the vendor name
  (A/S=DK, GmbH=DE, Inc=US, B.V.=NL, ...) plus a vendor lookup table.
  Reduces false-positive no_country flags for clearly EU-based vendors
  (Adform DK, Pinterest IE).
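
As a minimal sketch of this inference, the legal-form suffix can be matched first and a lookup table consulted second. The four suffix mappings and the Pinterest IE entry are taken from the commit message; the function name, the order of the two steps, and the substring-based lookup are assumptions.

```python
import re
from typing import Optional

# Legal-form suffix -> country, as listed in the commit message.
LEGAL_FORM_COUNTRY = [
    (re.compile(r"\bA/S$", re.IGNORECASE), "DK"),
    (re.compile(r"\bGmbH$", re.IGNORECASE), "DE"),
    (re.compile(r"\bInc\.?$", re.IGNORECASE), "US"),
    (re.compile(r"\bB\.V\.?$", re.IGNORECASE), "NL"),
]

# Hypothetical lookup table for vendors whose name carries no usable suffix.
VENDOR_COUNTRY = {"pinterest": "IE"}


def infer_country(vendor_name: str) -> Optional[str]:
    """Infer an ISO country code from a vendor name, or None if unknown."""
    name = vendor_name.strip()
    for pattern, country in LEGAL_FORM_COUNTRY:
        if pattern.search(name):
            return country
    lowered = name.lower()
    for key, country in VENDOR_COUNTRY.items():
        if key in lowered:
            return country
    return None
```

Returning None instead of a guessed default keeps the no_country flag meaningful for vendors that neither rule covers.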

Action recipes + doc anchor locator
- finding_action_recipes: for each finding type (no_cookies_listed, no_country,
  broken_opt_out, "Auftragsverarbeiter erwaehnen", "Art. 22 Profiling",
  ...) one structured instruction with what/why/fix_text/where/example.
  Meant for 1:1 insertion into customer documents.
- doc_anchor_locator: embedding-based (BGE-M3 cosine); finds the
  matching paragraph in the existing customer document for each finding.
  Per-run thread-local cache. Fallback: keyword match.
- Email rendering integrates recipe + anchor for each failed document check
  plus a vendor flag list with a collapsible action list.
- Score explanation per vendor row (3/5 subtitle + tooltip).
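
The anchor lookup with its keyword fallback could be sketched as follows. This is not the actual doc_anchor_locator: the function names, the injected `embed` callable (standing in for a BGE-M3 call), and the 0.5 minimum score are assumptions made for the example.

```python
import math
from typing import Callable, Optional, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def locate_anchor(
    finding: str,
    paragraphs: Sequence[str],
    embed: Optional[Callable[[str], Sequence[float]]] = None,
    min_score: float = 0.5,  # assumed cut-off
) -> Optional[int]:
    """Return the index of the paragraph that best fits the finding."""
    if embed is not None and paragraphs:
        target = embed(finding)
        scored = [(cosine(target, embed(p)), i) for i, p in enumerate(paragraphs)]
        best_score, best_idx = max(scored)
        if best_score >= min_score:
            return best_idx
    # Fallback: count shared keywords (words longer than three characters).
    keywords = {w.lower() for w in finding.split() if len(w) > 3}
    best_idx, best_hits = None, 0
    for i, paragraph in enumerate(paragraphs):
        hits = len(keywords & set(paragraph.lower().split()))
        if hits > best_hits:
            best_idx, best_hits = i, hits
    return best_idx
```

Passing `embed=None` exercises only the keyword path, which mirrors the case where the embedding service is unavailable.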

Migration pipeline (compliance check -> customer banner/documents)
- migration_to_banner.py: vendor list -> CookieBannerConfig with
  4 categories + review flags.
- migration_to_document.py: vendor list -> cookie policy + VVT register
  + privacy-policy pre-fills.
- agent_migration_routes: 3 preview endpoints (banner-preview,
  document-preview, summary). The cmp_vendors are persisted in the
  check_payloads table of /data/compliance_audits.db.
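
A vendor-list-to-banner mapping with review flags might look like the sketch below. The commit message only states that there are 4 categories and review flags; the category names, the mapping table, the choice of "marketing" as the default bucket, and the function name are all assumptions and not the real migration_to_banner.py.

```python
# Assumed banner categories; the source only says there are four.
BANNER_CATEGORIES = ("necessary", "functional", "statistics", "marketing")

# Hypothetical mapping from vendor category to banner category.
CATEGORY_MAP = {
    "session": "necessary",
    "maps": "functional",
    "analytics": "statistics",
    "advertising": "marketing",
}


def vendors_to_banner_config(vendors: list[dict]) -> dict:
    """Sort vendors into banner categories and flag unmapped ones for review."""
    categories: dict[str, list[str]] = {c: [] for c in BANNER_CATEGORIES}
    needs_review: list[str] = []
    for vendor in vendors:
        name = vendor.get("name", "")
        target = CATEGORY_MAP.get((vendor.get("category") or "").lower())
        if target is None:
            # Unknown category: place in the strictest bucket and flag it.
            target = "marketing"
            needs_review.append(name)
        categories[target].append(name)
    return {"categories": categories, "needs_review": needs_review}
```

Defaulting unknown vendors to the consent-gated bucket is a conservative choice here: a human reviewer can relax the classification, but nothing loads without consent by accident.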

Borlabs-parity cookie banner features
- Consent history in the banner: window.bpShowConsentHistory() + localStorage.
- Content blocker: cookie-banner-content-blocker.ts, YouTube/Maps/video
  placeholders until consent is given.
- Google Consent Mode v2 extended: wait_for_update + region=EEA/CH/GB.
- Consent log export (CSV/JSON) via einwilligungen_export_routes.
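
For the consent log export, a minimal serializer for both formats could be written as below. The record fields and the function name are hypothetical; the real einwilligungen_export_routes may use a different schema and streaming responses.

```python
import csv
import io
import json

# Assumed record fields for a consent log entry.
FIELDS = ("consent_id", "timestamp", "categories")


def export_consent_log(records: list[dict], fmt: str = "json") -> str:
    """Serialize consent records as JSON or CSV text."""
    if fmt == "json":
        return json.dumps(records, ensure_ascii=False, indent=2)
    if fmt == "csv":
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=FIELDS, extrasaction="ignore")
        writer.writeheader()
        for record in records:
            row = dict(record)
            # Flatten the category list into one semicolon-separated cell.
            row["categories"] = ";".join(record.get("categories", []))
            writer.writerow(row)
        return buffer.getvalue()
    raise ValueError(f"unsupported format: {fmt}")
```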

Bug fixes
- canonical_control_routes: _jsonish helper for string-typed jsonb,
  similar-controls endpoint with _has_embedding_col() cache (no more 500s).
- Control library frontend: defensive .map coercer in 2 detail views.
- Embedding service batching (batches of 32 instead of 165 in one call).
- KeyError 'control_id' in MC result aggregation (defensive .get).
- Master controls click-through from /sdk/master-controls to
  /sdk/control-library?control=<id> with URL-param auto-open.
- Dockerfile: /data pre-chowned to appuser (write access for the audit DB).
- Cookie text routing bug (cmp_reconstructed > DOM extraction).
- doc_type-aware MC filter (instead of all text MCs).
- Master contract dedup (60 BMW-internal entries = 1 Adobe contract).
- The A3 v2 audit reclassified 24 UI-language MCs as 'process'.
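
The batching fix (32 texts per call instead of 165 at once) follows a common pattern, sketched here. The helper names and the injected `embed_batch` callable are assumptions; only the batch size of 32 and the 165-item example come from the commit message.

```python
from typing import Callable, Iterator, Sequence


def batched(items: Sequence[str], size: int = 32) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def embed_all(
    texts: Sequence[str],
    embed_batch: Callable[[Sequence[str]], list],
    batch_size: int = 32,
) -> list:
    """Embed all texts by calling `embed_batch` once per batch.

    `embed_batch` is a hypothetical stand-in for the embedding-service
    call; it must return one vector per input text, in order.
    """
    vectors: list = []
    for batch in batched(texts, batch_size):
        vectors.extend(embed_batch(batch))
    return vectors
```

For 165 inputs this results in six calls: five with 32 texts and one with the remaining 5.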

Tests
- test_migration_mappers.py (9 tests)
- test_migration_endpoints.py (4 tests)

Scripts (one-shot)
- classify_mc_check_type.py (v1) + _v2 (PK=control_id,doc_type)
- audit_mc_doctype_fit.py (v1 fits) + _v2 (ui_only + scope_requires)

BMW run results, v1 (broken) -> v9 (all fixes):
  DSE       7.5% -> 81-83%
  Impressum 4%   -> 100% (all 6 real MCs fulfilled)
  Cookie    0%   -> 79-83% (CMP text routing + embedding)
  Plus: 10 consolidation categories, estimated saving 200k-3M / year
  Plus: action recipes + doc anchors for every fail

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 18:30:08 +02:00

1298 lines
55 KiB
Python

"""
Unified Compliance Check Routes — check all documents in one request.
POST /compliance/agent/extract-text — extract text from a URL
POST /compliance/agent/compliance-check — unified check for all documents
GET /compliance/agent/compliance-check/{check_id} — poll status
"""
import asyncio
import logging
import os
import re
import uuid as _uuid
from dataclasses import asdict
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter
from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance/agent", tags=["agent"])
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# In-memory job store (same pattern as doc-check)
_compliance_check_jobs: dict[str, dict] = {}
# ── Models ───────────────────────────────────────────────────────────
class ExtractTextRequest(BaseModel):
    url: str


class DocumentInput(BaseModel):
    doc_type: str  # dse, agb, impressum, cookie, widerruf, avv, loeschkonzept, etc.
    url: str = ""
    text: str = ""  # text has priority over URL


class ComplianceCheckRequest(BaseModel):
    documents: list[DocumentInput]
    use_agent: bool = False
    recipient: str = "dsb@breakpilot.local"


class ComplianceCheckStartResponse(BaseModel):
    check_id: str
    status: str = "running"


class ComplianceCheckStatusResponse(BaseModel):
    check_id: str
    status: str
    progress: str = ""
    progress_pct: int = 0
    result: dict | None = None
    error: str = ""
# ── Extract text endpoint ────────────────────────────────────────────
@router.post("/extract-text")
async def extract_text(req: ExtractTextRequest):
    """Extract text from a URL via consent-tester DSI discovery.

    Merges all documents found on the page (sub-pages, accordions, etc.)
    """
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": req.url, "max_documents": 5},
                timeout=300.0,
            )
            if resp.status_code != 200:
                return {
                    "text": "", "word_count": 0, "title": "",
                    "error": f"HTTP {resp.status_code} von Consent-Tester",
                }
            data = resp.json()
            docs = data.get("documents", [])
            if not docs:
                return {
                    "text": "", "word_count": 0, "title": "",
                    "error": "Kein Text extrahierbar",
                }
            # Merge all documents (handles multi-page DSIs like BMW)
            texts = []
            for doc in docs:
                t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
                if t and len(t) > 50:
                    texts.append(t)
            text = "\n\n".join(texts) if texts else ""
            title = docs[0].get("title", "") or docs[0].get("doc_type", "")
            word_count = len(text.split())
            return {
                "text": text,
                "word_count": word_count,
                "title": title,
                "error": "",
            }
    except Exception as e:
        logger.warning("extract-text failed for %s: %s", req.url, e)
        return {
            "text": "", "word_count": 0, "title": "",
            "error": str(e)[:200],
        }
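# ── Unified compliance check ────────────────────────────────────────
# Strong references to running background tasks. The event loop only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: set[asyncio.Task] = set()


@router.post("/compliance-check")
async def start_compliance_check(req: ComplianceCheckRequest):
    """Start async compliance check for all documents."""
    check_id = str(_uuid.uuid4())[:8]
    _compliance_check_jobs[check_id] = {
        "status": "running",
        "progress": "Pruefung gestartet...",
        "progress_pct": 0,
        "result": None,
        "error": "",
    }
    task = asyncio.create_task(_run_compliance_check(check_id, req))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return ComplianceCheckStartResponse(check_id=check_id, status="running")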


@router.get("/compliance-check/{check_id}")
async def get_compliance_check_status(check_id: str):
    """Poll compliance check status."""
    job = _compliance_check_jobs.get(check_id)
    if not job:
        return {"check_id": check_id, "status": "not_found"}
    return ComplianceCheckStatusResponse(
        check_id=check_id,
        status=job["status"],
        progress=job.get("progress", ""),
        progress_pct=job.get("progress_pct", 0),
        result=job.get("result"),
        error=job.get("error", ""),
    )


async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
    """Background task: check all documents with business-profile context."""
    try:
        from compliance.services.business_profiler import detect_business_profile
        from compliance.services.doc_checks.runner import check_document_completeness
        from compliance.services.rag_document_checker import check_document_with_controls
        from .agent_doc_check_routes import CheckItem, DocCheckResult
        from .agent_doc_check_report import build_html_report
        # Reset anchor-locator cache per run (avoid cross-run leak)
        try:
            from compliance.services.doc_anchor_locator import reset_cache
            reset_cache()
        except Exception:
            pass
        # Step 1: Resolve texts (fetch from URL if needed), 0-30%
        _update(check_id, "Texte werden geladen...", 1)
        doc_texts: dict[str, str] = {}
        doc_entries: list[dict] = []
        # Cache fetched URLs to detect duplicates
        url_text_cache: dict[str, str] = {}
        n_docs = max(1, len(req.documents))
        for i, doc in enumerate(req.documents):
            pct = int(1 + (i / n_docs) * 29)
            _update(check_id, f"Texte laden {i+1}/{n_docs}: {doc.doc_type}...", pct)
            text = doc.text
            cmp_payloads: list[dict] = []
            if not text and doc.url:
                url_key = doc.url.strip().rstrip("/").lower()
                if url_key in url_text_cache:
                    text = url_text_cache[url_key]
                else:
                    text, cmp_payloads = await _fetch_text(doc.url, doc_type=doc.doc_type)
                    if text:
                        url_text_cache[url_key] = text
            if text:
                doc_texts[doc.doc_type] = text
            doc_entries.append({
                "doc_type": doc.doc_type,
                "url": doc.url,
                "text": text,
                "word_count": len(text.split()) if text else 0,
                "auto_discovered": False,
                "discovery_attempted": False,
                "cmp_payloads": cmp_payloads,
            })
        # Step 1a-bis: AUTO-DISCOVERY. For each canonical doc_type the user
        # did NOT submit a URL/text for, try to find it on the homepage of
        # the submitted URLs. This bridges the gap between "user knows the
        # exact URL" (rare) and "user pasted the homepage" (common).
        await _autodiscover_missing(
            check_id, doc_entries, doc_texts, url_text_cache,
        )
        # Step 1b: Section splitting, two cases:
        #   1. Same URL used for multiple doc_types → split by heading
        #   2. DSI text contains Cookie/Social-Media sections → auto-fill empty rows
        from compliance.services.section_splitter import (
            split_shared_texts, auto_fill_from_dsi, cross_search_documents,
        )
        split_shared_texts(doc_entries, url_text_cache)
        auto_fill_from_dsi(doc_entries)
        # Step 1c: Cross-document search, find doc_types in wrong documents (30-35%)
        _update(check_id, "Dokumente werden uebergreifend durchsucht...", 32)
        placement_findings = cross_search_documents(doc_entries)
        # Refresh doc_texts after all splitting/searching
        for entry in doc_entries:
            if entry.get("text"):
                doc_texts[entry["doc_type"]] = entry["text"]
        # Step 2: Detect business profile (35-40%)
        _update(check_id, "Geschaeftsmodell wird erkannt...", 37)
        profile = await detect_business_profile(doc_texts)
        profile_dict = asdict(profile)
        # Step 3: Check each document
        results: list[DocCheckResult] = []
        total_findings = 0
        use_agent_flag = req.use_agent or os.getenv(
            "COMPLIANCE_USE_AGENT", "false"
        ).lower() == "true"
        # Filter out doc_types that don't apply to this business profile
        skip_types = _get_skip_types(profile)
        # Derive business_scope hints for the MC filter (O1, doc-type scope flag).
        # MCs that explicitly require a feature (e.g. 'biometric_processing',
        # 'ai_decision_making', 'child_targeting') get dropped when the
        # detected profile doesn't declare it.
        business_scope: set[str] = set()
        for svc in (getattr(profile, "detected_services", []) or []):
            business_scope.add(str(svc).lower())
        if (getattr(profile, "business_type", "") or "").lower() == "b2c":
            business_scope.add("b2c")
        if getattr(profile, "has_online_shop", False):
            business_scope.add("ecommerce")
        if getattr(profile, "is_regulated_profession", False):
            business_scope.add("regulated_profession")
        # Document checks: 40-80%
        n_entries = max(1, len(doc_entries))
        for i, entry in enumerate(doc_entries):
            text = entry["text"]
            doc_type = entry["doc_type"]
            label = _doc_type_label(doc_type)
            url = entry["url"]
            if doc_type in skip_types:
                results.append(DocCheckResult(
                    label=label, url=url, doc_type=doc_type,
                    error=skip_types[doc_type],
                ))
                continue
            pct = int(40 + (i / n_entries) * 40)
            _update(check_id, f"Pruefen {i+1}/{n_entries}: {label}...", pct)
            if not text or len(text) < 50:
                # Empty entry: either from auto-discovery padding (no URL
                # to fetch) or from a fetch that returned nothing. If there
                # was a URL we keep the error so the user knows the fetch
                # failed; otherwise let the padding step label it
                # 'Nicht eingereicht' / 'Auf der Website nicht gefunden'.
                if (entry.get("url") or "").strip():
                    results.append(DocCheckResult(
                        label=label, url=url, doc_type=doc_type,
                        error="Kein Text vorhanden oder zu kurz",
                    ))
                continue
            result = await _check_single(
                text, doc_type, label, url,
                entry["word_count"], use_agent_flag,
                business_scope=business_scope,
            )
            # Apply profile context filter
            result = _apply_profile_filter(result, profile, doc_type)
            # Add placement findings, but only if the regex checks confirm
            # the text doesn't match. If completeness >= 50%, the text IS the
            # right doc_type despite missing cross-search keywords.
            if result.completeness_pct < 50:
                for pf in placement_findings:
                    if pf.get("doc_type") == doc_type:
                        result.checks.insert(0, CheckItem(**{
                            k: v for k, v in pf.items() if k != "doc_type"
                        }))
            results.append(result)
            total_findings += result.findings_count
        # Step 3b: Banner-Check (automatic, uses first URL or homepage)
        banner_result = None
        banner_url = req.documents[0].url if req.documents and req.documents[0].url else ""
        # Use the homepage (strip path) for banner check
        if banner_url:
            from urllib.parse import urlparse
            parsed = urlparse(banner_url)
            banner_url = f"{parsed.scheme}://{parsed.netloc}"
        if banner_url:
            _update(check_id, "Cookie-Banner wird geprueft...", 82)
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{CONSENT_TESTER_URL}/scan",
                        json={"url": banner_url, "timeout_per_phase": 10},
                    )
                    if resp.status_code == 200:
                        banner_result = resp.json()
            except Exception as e:
                logger.warning("Banner check failed: %s", e)
        # Step 3c: Cross-check Banner vs Cookie-Richtlinie (88-90%)
        if banner_result and "cookie" in doc_texts:
            _update(check_id, "Banner vs. Cookie-Richtlinie abgleichen...", 89)
            cross_findings = _cross_check_banner_vs_cookie(
                banner_result, doc_texts["cookie"],
            )
            if cross_findings:
                for r in results:
                    if r.doc_type == "cookie":
                        for cf in cross_findings:
                            r.checks.append(CheckItem(**cf))
                        # Recompute level-2 correctness after adding the findings
                        l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
                        l2p = sum(1 for c in l2 if c.passed)
                        r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
        # Step 3d: TCF Vendor cross-check against DSI
        tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
        vvt_entries: list[dict] = []
        if tcf_vendors and "dse" in doc_texts:
            _update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...", 91)
            from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
            from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
            vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
            if vendor_findings:
                for r in results:
                    if r.doc_type == "dse":
                        for vf in vendor_findings:
                            r.checks.append(CheckItem(**vf))
            vvt_entries = map_vendors_to_vvt(tcf_vendors)
        # Step 4: Extract profile hints from documents (92-95%)
        _update(check_id, "Profil wird aus Dokumenten extrahiert...", 93)
        from compliance.services.profile_extractor import extract_profile_from_documents
        extracted_profile = extract_profile_from_documents(doc_texts, profile_dict)
        # Step 4b: Determine scenario per document
        for r in results:
            if r.error:
                r.scenario = "skip"
            elif r.completeness_pct < 30:
                r.scenario = "regenerate"
            elif r.completeness_pct < 95:
                r.scenario = "fix"
            else:
                r.scenario = "import"
        # Step 4c: Always render all 8 canonical doc types. Missing types
        # are differentiated:
        #   - Discovery was tried but found nothing -> 'Auf der Website
        #     nicht gefunden' (suggest user provides URL manually)
        #   - No submitted URLs at all -> 'Nicht eingereicht'
        attempted = {
            e["doc_type"] for e in doc_entries if e.get("discovery_attempted")
        }
        results = _pad_results_with_missing(results, discovery_attempted=attempted)
        # Step 5: Build report with management summary (95-98%)
        _update(check_id, "Report wird erstellt...", 96)
        from .agent_doc_check_report import (
            build_management_summary,
            build_scanned_urls_html,
            build_provider_list_html,
        )
        from .agent_doc_check_extras import build_vvt_table_html
        # Extract structured vendor records from any CMP payloads captured
        # for the cookie doc (BMW ePaaS, OneTrust, etc.), validate their
        # opt-out + privacy URLs concurrently, score each entry.
        cmp_vendors: list[dict] = []
        try:
            from compliance.services.vendor_extractor import (
                extract_vendors_from_payloads,
            )
            from compliance.services.cookie_link_validator import (
                validate_vendor_urls, score_vendors,
            )
            cookie_payloads = []
            cookie_text = ""
            for e in doc_entries:
                if e.get("doc_type") == "cookie":
                    if e.get("cmp_payloads"):
                        cookie_payloads.extend(e["cmp_payloads"])
                    if e.get("text"):
                        cookie_text = e["text"]
            # Site owner derived from the submitted URLs; drives the
            # INTERNAL/GROUP_COMPANY classification of vendor records.
            owner_name = _company_name_from_url(doc_entries) or ""
            if cookie_payloads:
                cmp_vendors = extract_vendors_from_payloads(
                    cookie_payloads, owner_name=owner_name,
                )
            # V3 fallback: no named CMP captured but we have substantive
            # cookie text → ask Qwen/OVH to extract vendor list from the text.
            # Skip on very short text (likely navigation) to save LLM cost.
            if not cmp_vendors and cookie_text and len(cookie_text.split()) >= 500:
                from compliance.services.vendor_llm_extractor import (
                    extract_vendors_via_llm,
                )
                from compliance.services.vendor_classifier import classify
                _update(check_id, "Vendor-Liste per LLM extrahieren...", 94)
                cmp_vendors = await extract_vendors_via_llm(cookie_text)
                # LLM path doesn't run through extract_vendors_from_payloads,
                # so classify here.
                for v in cmp_vendors:
                    v["recipient_type"] = classify(
                        vendor_name=v.get("name", ""),
                        category=v.get("category", ""),
                        owner_name=owner_name,
                    )
            if cmp_vendors:
                logger.info("VVT: %d vendors extracted, validating links",
                            len(cmp_vendors))
                cmp_vendors = await validate_vendor_urls(cmp_vendors)
                cmp_vendors = score_vendors(cmp_vendors)
                # Enrich each vendor with per-cookie functional roles
                try:
                    from compliance.services.cookie_function_classifier import (
                        annotate_vendor_cookies,
                    )
                    cmp_vendors = [annotate_vendor_cookies(v) for v in cmp_vendors]
                except Exception as e:
                    logger.warning("Cookie function classification skipped: %s", e)
        except Exception as e:
            logger.warning("VVT vendor extraction skipped: %s", e)
        # Vendor redundancy + EU alternatives + cost/savings (O4)
        redundancy_report = None
        try:
            from compliance.services.vendor_redundancy import analyze as analyze_redundancy
            from compliance.services.vendor_cost_estimator import infer_company_tier
            if cmp_vendors:
                # Derive the company tier from the business profile. It shapes
                # the cost range so that, e.g., starter prices do not push down
                # the lower bound for DAX corporations.
                bp_dict = {
                    "type": getattr(profile, "business_type", ""),
                    "features": list(business_scope),
                }
                ctier = infer_company_tier(bp_dict)
                redundancy_report = analyze_redundancy(cmp_vendors, company_tier=ctier)
                logger.info(
                    "Redundanz: %d Kategorien mit Mehrfach-Anbietern, "
                    "Spar-Schaetzung %s pro Jahr (company_tier=%s)",
                    redundancy_report["summary"]["redundancy_count"],
                    redundancy_report["summary"]["estimated_saving_pct"],
                    ctier,
                )
        except Exception as e:
            logger.warning("Vendor redundancy analysis skipped: %s", e)
        summary_html = build_management_summary(results)
        scanned_html = build_scanned_urls_html(doc_entries)
        providers_html = build_provider_list_html(banner_result, vvt_entries)
        vvt_html = build_vvt_table_html(cmp_vendors)
        # MC scorecard aggregated across ALL docs in this run (DSGVO/TDDDG/
        # BGB/...). Sits at the top so the GF sees the regulation-by-
        # regulation view before drilling into per-doc details.
        from compliance.services.mc_scorecard import build_scorecard
        from .agent_doc_check_scorecard import build_scorecard_html
        all_mc_checks: list[dict] = []
        for r in results:
            for c in r.checks:
                if c.id.startswith("mc-"):
                    all_mc_checks.append({
                        "id": c.id, "label": c.label, "passed": c.passed,
                        "severity": c.severity, "skipped": c.skipped,
                        "regulation": c.regulation,
                    })
        scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {}
        # Trend: load previous scorecard for the same tenant + domain so the
        # email can show delta indicators (A6).
        prev_scorecard: dict | None = None
        if scorecard:
            try:
                from compliance.services.compliance_audit_log import (
                    list_runs_for_tenant,
                )
                tenant_id_for_trend = req.recipient or ""
                base_domain_for_trend = _extract_domain(doc_entries) or ""
                prev_runs = list_runs_for_tenant(
                    tenant_id_for_trend,
                    base_domain=base_domain_for_trend,
                    limit=1,
                )
                if prev_runs:
                    prev_scorecard = prev_runs[0].get("scorecard")
            except Exception as e:
                logger.debug("trend lookup skipped: %s", e)
        scorecard_html = (
            build_scorecard_html(scorecard, previous_scorecard=prev_scorecard)
            if scorecard else ""
        )
        report_html = build_html_report(results, None, doc_texts)
        profile_html = _build_profile_html(profile)
        # O4: vendor redundancy / EU alternatives + cost-savings block,
        # placed between the VVT and the doc report so that management
        # sees the savings before going into the detailed check.
        from .agent_doc_check_redundancy import build_redundancy_html
        redundancy_html = build_redundancy_html(redundancy_report)
        full_html = (
            summary_html + scanned_html + profile_html + scorecard_html
            + providers_html + vvt_html + redundancy_html + report_html
        )
        # Step 6: Send email. Derive the site name primarily from the entered
        # URL. The extracted_profile.companyName is often noisy (e.g. picks up
        # juris.de from legal references). Domain-derived name is more
        # predictable for the GF email subject.
        doc_count = len([r for r in results if not r.error])
        url_company = _company_name_from_url(doc_entries)
        domain = _extract_domain(doc_entries)
        site_name = url_company or domain or "Unbekannt"
        _update(check_id, "E-Mail wird versendet...", 98)
        email_result = send_email(
            recipient=req.recipient,
            # Separator added so the site name and the count do not run together.
            subject=f"[COMPLIANCE-CHECK] {site_name}: {doc_count} Dokumente geprueft",
            body_html=full_html,
        )
        # Step 7: Store result
        response = {
            "check_id": check_id,
            "results": [_result_to_dict(r) for r in results],
            "business_profile": profile_dict,
            "extracted_profile": extracted_profile,
            "banner_result": {
                "detected": banner_result.get("banner_detected", False) if banner_result else False,
                "provider": banner_result.get("banner_provider", "") if banner_result else "",
                "violations": len(banner_result.get("banner_checks", {}).get("violations", [])) if banner_result else 0,
                "tcf_vendor_count": len(tcf_vendors),
            } if banner_result else None,
            "tcf_vendors": vvt_entries if tcf_vendors else [],
            "cmp_vendors": cmp_vendors,
            "total_documents": len(results),
            "total_findings": total_findings,
            "email_status": email_result.get("status", "failed"),
            "checked_at": datetime.now(timezone.utc).isoformat(),
        }
        _compliance_check_jobs[check_id]["status"] = "completed"
        _compliance_check_jobs[check_id]["result"] = response
        _compliance_check_jobs[check_id]["progress"] = "Fertig"
        _compliance_check_jobs[check_id]["progress_pct"] = 100
        # Persist to sidecar SQLite audit log. Enables /audit endpoints
        # (A5 admin tab) and trend view (A6). Best-effort; failures here
        # do not affect the user-facing response.
        try:
            from compliance.services.compliance_audit_log import record_check_run
            from compliance.services.mc_scorecard import full_audit_records
            audit_rows: list[dict] = []
            for r in results:
                doc_mc = [c for c in r.checks if c.id.startswith("mc-")]
                audit_rows.extend(full_audit_records(
                    [{"id": c.id, "label": c.label, "passed": c.passed,
                      "severity": c.severity, "skipped": c.skipped,
                      "regulation": c.regulation, "matched_text": c.matched_text,
                      "hint": c.hint, "level": c.level}
                     for c in doc_mc],
                    check_id=check_id,
                    doc_type=r.doc_type,
                ))
            record_check_run(
                check_id=check_id,
                tenant_id=req.recipient or "",
                site_name=site_name,
                base_domain=domain or "",
                doc_count=doc_count,
                scorecard=scorecard,
                vvt_summary={
                    "total": len(cmp_vendors),
                    "internal": sum(1 for v in cmp_vendors
                                    if (v.get("recipient_type") or "").upper()
                                    in ("INTERNAL", "GROUP_COMPANY")),
                    "external": sum(1 for v in cmp_vendors
                                    if (v.get("recipient_type") or "").upper()
                                    in ("PROCESSOR", "CONTROLLER")),
                },
                mc_records=audit_rows,
            )
            from compliance.services.compliance_audit_log import record_check_payload
            record_check_payload(
                check_id=check_id,
                vendors=cmp_vendors,
                profile=extracted_profile,
            )
        except Exception as e:
            logger.warning("Audit persistence skipped: %s", e)
    except Exception as e:
        logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True)
        _compliance_check_jobs[check_id]["status"] = "failed"
        _compliance_check_jobs[check_id]["error"] = str(e)[:500]


def _update(check_id: str, msg: str, pct: int | None = None):
    """Update the progress message and, optionally, the clamped percentage."""
    job = _compliance_check_jobs[check_id]
    job["progress"] = msg
    if pct is not None:
        job["progress_pct"] = max(0, min(100, int(pct)))


async def _fetch_text(url: str, doc_type: str = "") -> tuple[str, list[dict]]:
    """Fetch text from URL via consent-tester, with HTTP fallback.

    Returns (text, cmp_payloads). cmp_payloads is the raw CMP JSON captured
    during navigation (ePaaS, OneTrust, …). It is empty when no CMP fired or
    HTTP fallback was used. Backend turns payloads into structured vendor
    records for the VVT table in the email.
    """
    # 1. Consent-tester (Playwright-based, full JS rendering).
    # max_documents depends on doc_type:
    #   - cookie/dse/social_media: self-extract (often + CMP capture) is
    #     authoritative, sub-pages dilute the policy text. max=1.
    #   - impressum/agb/widerruf/nutzungsbedingungen/dsb: BMW & similar
    #     enterprise sites split this across 3-4 short sub-pages
    #     (Versicherungsvermittler, Aufsicht, Berufsrecht). max=3 follows
    #     them. The 15s networkidle bail (dsi_helpers) keeps timing safe.
    short_extract_types = {"cookie", "dse", "datenschutz", "privacy", "social_media"}
    max_docs = 1 if (doc_type or "") in short_extract_types else 3
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{CONSENT_TESTER_URL}/dsi-discovery",
                json={"url": url, "max_documents": max_docs},
                timeout=120.0,
            )
            if resp.status_code == 200:
                payload = resp.json()
                docs = payload.get("documents", [])
                cmp_payloads = payload.get("cmp_payloads") or []
                cmp_cookie_text = payload.get("cmp_cookie_text") or ""
                if docs:
                    texts = []
                    for doc in docs:
                        t = doc.get("full_text", "") or doc.get("text_preview", "") or ""
                        if t and len(t) > 50:
                            texts.append(t)
                    merged = "\n\n".join(texts)
                    # For cookie/dse/social_media: when CMP reconstruction is
                    # substantially richer than DOM extraction, use it. This
                    # fixes the BMW case where DOM yields ~600 words of
                    # navigation but the ePaaS payload reconstructs to ~1800
                    # words of actual cookie policy.
                    if (doc_type in short_extract_types
                            and cmp_cookie_text
                            and len(cmp_cookie_text.split()) > len(merged.split())):
                        logger.info(
                            "Preferring CMP-reconstructed text for %s on %s "
                            "(%d words CMP vs %d words DOM)",
                            doc_type, url,
                            len(cmp_cookie_text.split()),
                            len(merged.split()),
                        )
                        merged = cmp_cookie_text
                    if merged and len(merged.split()) > 100:
                        if len(texts) > 1:
                            logger.info("Merged %d docs from %s (%d words)",
                                        len(texts), url, len(merged.split()))
                        return merged, cmp_payloads
    except Exception as e:
        logger.warning("Consent-tester fetch failed for %s: %s", url, e)
    # 2. Fallback: direct HTTP fetch (works for SSR pages like BMW)
    try:
        import re as _re
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            resp = await client.get(url)
            if resp.status_code == 200 and "text/html" in resp.headers.get("content-type", ""):
                html = resp.text
                # Drop script/style blocks, strip remaining tags, collapse whitespace
                text = _re.sub(r"<script[^>]*>.*?</script>", " ", html, flags=_re.DOTALL | _re.IGNORECASE)
                text = _re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=_re.DOTALL | _re.IGNORECASE)
                text = _re.sub(r"<[^>]+>", " ", text)
                text = _re.sub(r"\s+", " ", text).strip()
                if len(text.split()) > 100:
                    logger.info("HTTP fallback for %s: %d words", url, len(text.split()))
                    return text, []
    except Exception as e:
        logger.warning("HTTP fallback failed for %s: %s", url, e)
    return "", []


async def _autodiscover_missing(
    check_id: str,
    doc_entries: list[dict],
    doc_texts: dict[str, str],
    url_text_cache: dict[str, str],
) -> None:
    """For each canonical doc_type the user did not submit, try to find
    the corresponding document on the homepage of the site they DID submit.

    Modifies doc_entries in place: fills text/url/word_count and sets
    `auto_discovered=True`. Marks `discovery_attempted=True` on every
    missing entry (even when nothing was found) so the report can
    distinguish 'Nicht eingereicht' from 'Auf der Website nicht gefunden'.
    """
    from urllib.parse import urlparse
    # Submitted doc_types (those the user actually entered URL or text for).
    submitted_types = {
        e["doc_type"] for e in doc_entries
        if e.get("text") or (e.get("url") or "").strip()
    }
    # Map alias types to canonical
    submitted_canon = {
        "dse" if t in ("datenschutz", "privacy") else t for t in submitted_types
    }
    # Missing = canonical types the user did NOT submit
    missing = set(_ALL_DOC_TYPES) - submitted_canon
    if not missing:
        return
    # Pick the most common base (scheme://netloc) from submitted URLs.
    bases: dict[str, int] = {}
    for e in doc_entries:
        u = (e.get("url") or "").strip()
        if u and "://" in u:
            p = urlparse(u)
            base = f"{p.scheme}://{p.netloc}"
            bases[base] = bases.get(base, 0) + 1
    if not bases:
        # No submitted URL at all, so there is nothing to crawl from. Add
        # empty placeholders (with discovery_attempted=False) so the padding
        # step renders them as 'Nicht eingereicht' (not 'Nicht gefunden').
        for dt in missing:
            doc_entries.append({
                "doc_type": dt, "url": "", "text": "", "word_count": 0,
                "auto_discovered": False, "discovery_attempted": False,
            })
        return
    # Build crawl plan: primary base + any related domains mentioned in
    # the submitted texts that share the owner's SLD. Example: BMW Group
    # text mentions bmwgroup.com and bmwgroup.jobs in addition to bmw.de.
    primary_base = max(bases, key=bases.get) + "/"
    crawl_bases: list[str] = [primary_base]
    # removeprefix, not lstrip: lstrip("www.") strips any leading 'w' or '.'
    # characters and would turn e.g. 'web.de' into 'eb.de'.
    primary_netloc = urlparse(primary_base).netloc.lower().removeprefix("www.")
    owner_token = primary_netloc.split(".")[0]  # 'bmw'
    if owner_token and len(owner_token) >= 3:
        domain_re = re.compile(
            r"https?://([a-z0-9][a-z0-9\-]*\.)*" + re.escape(owner_token)
            + r"[a-z0-9\-]*\.[a-z]{2,}",
            re.IGNORECASE,
        )
        seen_bases = {primary_base}
        for entry in doc_entries:
            text = entry.get("text") or ""
            for m in domain_re.finditer(text):
                p = urlparse(m.group(0))
                base = f"{p.scheme}://{p.netloc}/"
                base_netloc = p.netloc.lower().removeprefix("www.")
                if base_netloc == primary_netloc:
                    continue
                if base in seen_bases:
                    continue
                seen_bases.add(base)
                crawl_bases.append(base)
                if len(crawl_bases) >= 3:
                    break
            if len(crawl_bases) >= 3:
                break
_update(
check_id,
f"Suche fehlende Dokumente auf {', '.join(urlparse(b).netloc for b in crawl_bases)}...",
18,
)
discovered: list[dict] = []
disc_payloads: list[dict] = []
disc_cookie_texts: list[str] = []
for base in crawl_bases:
try:
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": base, "max_documents": 15},
timeout=180.0,
)
if resp.status_code != 200:
logger.warning("auto-discovery: HTTP %d for %s",
resp.status_code, base)
continue
body = resp.json()
discovered.extend(body.get("documents", []) or [])
disc_payloads.extend(body.get("cmp_payloads") or [])
cmp_text = body.get("cmp_cookie_text") or ""
if cmp_text:
disc_cookie_texts.append(cmp_text)
logger.info("auto-discovery on %s: %d docs, %d CMP payloads, "
"cmp_cookie_text=%d words", base,
len(body.get("documents", []) or []),
len(body.get("cmp_payloads") or []),
len(cmp_text.split()))
except Exception as e:
logger.warning("auto-discovery failed for %s: %s", base, e)
# Classify each discovered doc into a canonical doc_type
by_type: dict[str, dict] = {}
for d in discovered:
title = (d.get("title") or "").lower()
url = (d.get("url") or "").lower()
wc = d.get("word_count") or 0
if wc < 100:
continue
canon = _classify_discovered_doc(title, url)
if canon and canon in missing and canon not in by_type:
by_type[canon] = d
# Append a new entry for every missing canonical type. Auto-discovered
# ones get the text/URL filled; ungratched ones stay empty so the
# padding step renders them as 'Auf der Website nicht gefunden'.
filled = 0
for dt in missing:
new_entry: dict = {
"doc_type": dt, "url": "", "text": "", "word_count": 0,
"auto_discovered": False, "discovery_attempted": True,
"cmp_payloads": [],
}
d = by_type.get(dt)
if d:
full = d.get("full_text") or d.get("text_preview") or ""
# For cookie: prefer the CMP-reconstructed text when it's
# substantially richer than the auto-discovered DOM extraction.
# BMW homepage CMP yields ~1800 words of authoritative policy;
# DOM extraction typically yields ~600 words of site chrome.
if dt == "cookie" and disc_cookie_texts:
cmp_merged = "\n\n".join(disc_cookie_texts)
if len(cmp_merged.split()) > len(full.split()):
logger.info(
"cookie: using CMP-reconstructed text (%d words) "
"instead of DOM (%d words)",
len(cmp_merged.split()), len(full.split()),
)
full = cmp_merged
if len(full.split()) >= 100:
new_entry["text"] = full
new_entry["url"] = d.get("url", "")
new_entry["word_count"] = len(full.split())
new_entry["auto_discovered"] = True
# Auto-discovery happens on the HOMEPAGE — any CMP payload
# captured at that level likely belongs to the cookie page
# (CMP widget loaded site-wide). Attach to 'cookie' entry.
if dt == "cookie" and disc_payloads:
new_entry["cmp_payloads"] = disc_payloads
doc_texts[dt] = full
filled += 1
logger.info(
"auto-discovered %s on %s: %s (%d words)",
dt, base, d.get("url", "")[:80], new_entry["word_count"],
)
doc_entries.append(new_entry)
logger.info(
"auto-discovery: filled %d/%d missing types from %s",
filled, len(missing), base,
)


# Title/URL keywords → canonical doc_type. Order matters: most-specific first.
_DISCOVERY_RULES: list[tuple[str, tuple[str, ...]]] = [
    ("cookie", ("cookie", "kuche", "biscuit", "cookies-")),
    ("widerruf", ("widerruf", "rueckgabe", "rückgabe", "cancellation",
                  "right-of-withdrawal", "ruecktritts", "rücktritts")),
    ("social_media", ("social-media", "soziale-medien", "social_media",
                      "social-media-policy")),
    ("agb", ("/agb", "geschaeftsbedingungen", "geschäftsbedingungen",
             "terms-and-conditions", "general-terms")),
    ("nutzungsbedingungen", ("nutzungsbedingung", "terms-of-use",
                             "nutzungsordnung", "terms-of-service")),
    ("dsb", ("datenschutzbeauftragt", "data-protection-officer",
             "dpo-contact", "/dsb")),
    ("impressum", ("impressum", "imprint", "legal-notice", "site-notice",
                   "anbieterkennzeichnung", "legal-disclaimer-pool")),
    ("dse", ("data-privacy", "datenschutz", "data-protection",
             "privacy-policy", "privacy-notice", "dsgvo",
             "data_privacy", "datenschutzinformation")),
]


def _classify_discovered_doc(title: str, url: str) -> str | None:
    """Map a discovered doc (by its title + URL) to one of the 8 doc_types
    listed in _DISCOVERY_RULES, or None if no keyword matches.

    Callers are expected to pass lower-cased title and URL; the first
    matching rule wins.
    """
    haystack = f"{title} {url}"
    for canon, keywords in _DISCOVERY_RULES:
        if any(kw in haystack for kw in keywords):
            return canon
    return None
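

# Illustrative sketch, not part of the original module: why rule order matters
# for first-match keyword classification. The two-rule table and the names
# _SKETCH_RULES / _sketch_classify are hypothetical stand-ins for
# _DISCOVERY_RULES / _classify_discovered_doc. Because 'datenschutz' is a
# substring of 'datenschutzbeauftragter', the more specific 'dsb' rule has to
# come before the generic 'dse' rule, otherwise a DPO page would be filed as a
# privacy policy.
_SKETCH_RULES: list[tuple[str, tuple[str, ...]]] = [
    ("dsb", ("datenschutzbeauftragt", "/dsb")),
    ("dse", ("datenschutz", "privacy-policy")),
]


def _sketch_classify(title: str, url: str) -> str | None:
    """Return the first rule whose keyword occurs in title or URL."""
    haystack = f"{title} {url}".lower()
    for canon, keywords in _SKETCH_RULES:
        if any(kw in haystack for kw in keywords):
            return canon
    return None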


async def _check_single(
    text: str, doc_type: str, label: str, url: str,
    word_count: int, use_agent: bool,
    business_scope: set[str] | None = None,
):
    """Run regex + MC checks on a single document."""
    from compliance.services.doc_checks.runner import check_document_completeness
    from compliance.services.rag_document_checker import check_document_with_controls
    from .agent_doc_check_routes import CheckItem, DocCheckResult

    # Regex checklist
    findings = check_document_completeness(text, doc_type, label, url)
    all_checks: list[CheckItem] = []
    completeness = 0
    correctness = 0
    for f in findings:
        if "SCORE" in f.get("code", ""):
            for c in f.get("all_checks", []):
                all_checks.append(CheckItem(
                    id=c["id"], label=c["label"], passed=c["passed"],
                    severity=c["severity"], matched_text=c.get("matched_text", ""),
                    level=c.get("level", 1), parent=c.get("parent"),
                    skipped=c.get("skipped", False), hint=c.get("hint", ""),
                ))
            completeness = f.get("completeness_pct", 0)
            correctness = f.get("correctness_pct", 0)

    # Master Control checks (all MCs for this doc_type, no top-N cap)
    try:
        # max_controls=0 -> evaluate ALL MCs for this doc_type (DB has
        # 1874 across 8 types; regex matching is cheap and stays well
        # under 1s per doc). Caps remain on the LLM-enrich step
        # (top-10 FAILs) so cost stays bounded.
        mc_results = await check_document_with_controls(
            text, doc_type, label, max_controls=0, use_agent=use_agent,
            business_scope=business_scope,
        )
        if mc_results:
            for mc in mc_results:
                all_checks.append(CheckItem(**mc))
            l2 = [c for c in all_checks if c.level == 2 and not c.skipped]
            l2_passed = sum(1 for c in l2 if c.passed)
            correctness = round(l2_passed / len(l2) * 100) if l2 else 0
    except Exception as e:
        logger.warning("MC check skipped for %s: %s", label, e)

    # LLM verification of failed checks that carry a hint
    failed = [c for c in all_checks if not c.passed and not c.skipped and c.hint]
    if failed:
        try:
            from compliance.services.doc_checks.llm_verify import verify_failed_checks
            overturns = await verify_failed_checks(
                text,
                [{"id": c.id, "label": c.label, "hint": c.hint} for c in failed],
                label,
            )
            for c in all_checks:
                if c.id in overturns and overturns[c.id]["overturned"]:
                    c.passed = True
                    c.matched_text = f"[LLM] {overturns[c.id]['evidence']}"
            l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
            l2_passed = sum(1 for c in l2_active if c.passed)
            if l2_active:
                correctness = round(l2_passed / len(l2_active) * 100)
        except Exception as e:
            logger.warning("LLM verification skipped: %s", e)

    # Cookie-policy only: actively HTTP-probe the Opt-Out + Privacy-Policy
    # URLs the document advertises. Broken links make individual provider
    # entries non-compliant under Art. 7(3) DSGVO.
    if doc_type == "cookie":
        try:
            from compliance.services.cookie_link_validator import (
                extract_links, validate_links, build_check_items,
            )
            links = extract_links(text)
            if links:
                logger.info("Cookie-link validator: %d urls extracted from %s",
                            len(links), label)
                validated = await validate_links(links)
                for item in build_check_items(validated):
                    all_checks.append(CheckItem(**item))
                # Re-compute correctness with the new L2 items
                l2_active = [c for c in all_checks if c.level == 2 and not c.skipped]
                l2_passed = sum(1 for c in l2_active if c.passed)
                if l2_active:
                    correctness = round(l2_passed / len(l2_active) * 100)
        except Exception as e:
            logger.warning("Cookie-link validation skipped for %s: %s", label, e)

    non_score = [f for f in findings if "SCORE" not in f.get("code", "")]
    return DocCheckResult(
        label=label, url=url, doc_type=doc_type,
        word_count=word_count or len(text.split()),
        completeness_pct=completeness, correctness_pct=correctness,
        checks=all_checks, findings_count=len(non_score),
    )
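

# Illustrative sketch, not part of the original module: _check_single
# recomputes the level-2 correctness percentage in three places (after the MC
# checks, after LLM verification, after cookie-link validation). One way to
# factor that out is a small helper like the one below. _SketchCheck is a
# hypothetical stand-in for CheckItem that carries only the three attributes
# the calculation reads; _sketch_l2_correctness is likewise an assumed name.
from dataclasses import dataclass


@dataclass
class _SketchCheck:
    level: int
    passed: bool
    skipped: bool = False


def _sketch_l2_correctness(checks: list, default: int = 0) -> int:
    """Percentage of passed level-2 checks, ignoring skipped ones.

    Returns `default` when there is no active level-2 check, so a caller can
    pass the previous value to keep it unchanged.
    """
    active = [c for c in checks if c.level == 2 and not c.skipped]
    if not active:
        return default
    passed = sum(1 for c in active if c.passed)
    return round(passed / len(active) * 100)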


def _pad_results_with_missing(
    results: list,
    discovery_attempted: set[str] | None = None,
) -> list:
    """Ensure every canonical doc_type has an entry in the results list.

    Doc_types the user did not submit AND auto-discovery did not find get
    a placeholder DocCheckResult. The error message distinguishes:
      - 'Auf der Website nicht gefunden' (discovery was attempted)
      - 'Nicht eingereicht' (no submitted URLs to crawl from)

    Preserves the canonical ordering from _ALL_DOC_TYPES so the report
    layout is stable.
    """
    from .agent_doc_check_routes import DocCheckResult

    attempted = discovery_attempted or set()
    by_type: dict[str, object] = {}
    for r in results:
        canon = "dse" if r.doc_type in ("datenschutz", "privacy") else r.doc_type
        by_type[canon] = r

    ordered: list = []
    for dt in _ALL_DOC_TYPES:
        if dt in by_type:
            ordered.append(by_type[dt])
            continue
        if dt in attempted:
            msg = ("Auf der Website nicht gefunden — bitte URL des "
                   "Dokuments manuell eintragen, falls vorhanden")
        else:
            msg = "Nicht eingereicht — Quelle nicht angegeben"
        ordered.append(DocCheckResult(
            label=_doc_type_label(dt),
            url="",
            doc_type=dt,
            word_count=0,
            completeness_pct=0,
            correctness_pct=0,
            checks=[],
            findings_count=0,
            error=msg,
            scenario="missing",
        ))

    # Non-canonical results (e.g. 'dsb', 'avv') are appended after the
    # canonical block in their original order.
    extras = [r for r in results
              if (r.doc_type if r.doc_type not in ("datenschutz", "privacy") else "dse")
              not in _ALL_DOC_TYPES]
    ordered.extend(extras)
    return ordered
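

# Illustrative sketch, not part of the original module: the padding idea above
# reduced to plain data. Every canonical type yields exactly one row, in
# canonical order; missing types are labelled according to whether discovery
# was attempted. The function name and the status strings 'not_found_on_site'
# and 'not_submitted' are hypothetical placeholders for the German report
# messages.
def _sketch_pad(
    found: dict[str, str],
    canonical: list[str],
    attempted: set[str],
) -> list[tuple[str, str]]:
    rows: list[tuple[str, str]] = []
    for dt in canonical:
        if dt in found:
            rows.append((dt, found[dt]))
        elif dt in attempted:
            rows.append((dt, "not_found_on_site"))
        else:
            rows.append((dt, "not_submitted"))
    return rows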


_COMPOUND_TLDS = {
    "co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
    "com.au", "com.br", "com.mx", "com.tr", "com.sg",
}


def _extract_domain(doc_entries: list[dict]) -> str | None:
    """Return the host (without a leading 'www.') of the first entry that
    has an absolute URL, or None if there is none.

    Subdomains other than 'www' are kept, so 'shop.example.de' stays as is.
    """
    from urllib.parse import urlparse

    for entry in doc_entries:
        url = entry.get("url", "")
        if url and "://" in url:
            host = urlparse(url).netloc.lower()
            if host.startswith("www."):
                host = host[4:]
            return host or None
    return None
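

# Illustrative sketch, not part of the original module: stripping a leading
# 'www.' from a host name. str.lstrip takes a set of characters rather than a
# prefix, so lstrip("www.") also eats leading 'w' characters of the real
# domain; str.removeprefix (Python 3.9+) or the startswith/slice pattern used
# in _extract_domain removes exactly the prefix. The helper name
# _sketch_strip_www is hypothetical.
def _sketch_strip_www(host: str) -> str:
    return host.lower().removeprefix("www.")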


def _company_name_from_url(doc_entries: list[dict]) -> str | None:
    """Derive a display company name from the entered URLs.

    Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
    uppercase short acronyms (<=4 chars, no hyphens), title-case the rest.

    Examples:
        www.bmw.de          -> BMW
        mercedes-benz.de    -> Mercedes-Benz
        shop.example.co.uk  -> Example
        juris.de            -> Juris
    """
    from urllib.parse import urlparse

    for entry in doc_entries:
        url = entry.get("url", "")
        if not url or "://" not in url:
            continue
        host = urlparse(url).netloc.lower()
        if host.startswith("www."):
            host = host[4:]
        parts = host.split(".")
        if len(parts) < 2:
            continue
        # Handle compound TLDs (.co.uk etc.)
        if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
            sld = parts[-3]
        else:
            sld = parts[-2]
        if not sld:
            continue
        if len(sld) <= 4 and "-" not in sld:
            return sld.upper()
        return "-".join(p.capitalize() for p in sld.split("-"))
    return None


def _get_skip_types(profile) -> dict[str, str]:
    """Doc_types to skip entirely. Currently empty: we check everything
    and flag irrelevant items as INFO instead of skipping."""
    return {}


def _apply_profile_filter(result, profile, doc_type: str):
    """Adjust INFO-level checks based on business profile context.

    For example: ODR check only relevant for B2C online shops.
    """
    from .agent_doc_check_routes import CheckItem

    for check in result.checks:
        cid = check.id.lower()

        # ODR/OS-Link: relevant ONLY for B2C online shops. The check's
        # default hint is written for B2B (it explains why it's not
        # relevant); for B2C we must replace it with action-oriented
        # guidance, otherwise the report contradicts itself.
        if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
            if profile.needs_odr:
                if not check.passed:
                    check.hint = (
                        "Als B2C-Anbieter muessen Sie nach Art. 14 EU-VO 524/2013 "
                        "auf die OS-Plattform (https://ec.europa.eu/consumers/odr) "
                        "verlinken — klickbarer Link, nicht nur Text. Zusaetzlich "
                        "§36 VSBG: angeben, ob Sie an Verbraucher-"
                        "Streitbeilegungsverfahren teilnehmen (oder nicht)."
                    )
            else:
                check.skipped = True
                check.hint = "Nicht relevant (kein B2C Online-Shop)"

        # Widerruf: Flag entire document as unnecessary for B2B
        if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
            check.severity = "INFO"
            if not check.passed:
                check.hint = (
                    "Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
                    "(§355 BGB gilt nur fuer Verbrauchervertraege). "
                    "Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
                    "Ihrer Website, da sie Verwirrung stiften kann."
                )

        # Regulated profession: check for Kammer info
        if "kammer" in cid or "berufsordnung" in check.label.lower():
            if not profile.is_regulated_profession:
                check.skipped = True
                check.hint = "Nicht relevant (kein regulierter Beruf)"

    return result


# ── Helpers ──────────────────────────────────────────────────────────

_DOC_TYPE_LABELS = {
    "dse": "Datenschutzerklaerung",
    "datenschutz": "Datenschutzerklaerung",
    "privacy": "Datenschutzerklaerung",
    "impressum": "Impressum",
    "agb": "AGB",
    "widerruf": "Widerrufsbelehrung",
    "cookie": "Cookie-Richtlinie",
    "avv": "Auftragsverarbeitung",
    "loeschkonzept": "Loeschkonzept",
    "dsfa": "Datenschutz-Folgenabschaetzung",
    "social_media": "Social Media Datenschutz",
    "nutzungsbedingungen": "Nutzungsbedingungen",
    "dsb": "DSB-Kontakt",
}

# Canonical doc types in the same order as the frontend ComplianceCheckTab.
# The route pads `results` to always contain an entry for each (even if
# the user did not submit a URL) so the email + frontend always show
# the complete checklist (missing rows marked as 'Nicht eingereicht').
#
# DSB-Kontakt is intentionally NOT canonical: per GDPR practice the DSB is
# named *inside* the DSI/datenschutz document (email or contact block), not
# as a separate page. We check 'DSB benannt' as a sub-check of the DSE
# instead. If a tenant insists on a separate DSB document, they can still
# submit one; it just won't appear as a missing checklist row.
_ALL_DOC_TYPES = [
    "dse", "impressum", "social_media", "cookie",
    "agb", "nutzungsbedingungen", "widerruf",
]


def _doc_type_label(doc_type: str) -> str:
    return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())


def _result_to_dict(r) -> dict:
    """Convert DocCheckResult to JSON-serializable dict."""
    fields = ("id", "label", "passed", "severity", "matched_text",
              "level", "parent", "skipped", "hint")
    return {
        "label": r.label, "url": r.url, "doc_type": r.doc_type,
        "word_count": r.word_count, "completeness_pct": r.completeness_pct,
        "correctness_pct": r.correctness_pct,
        "checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
        "findings_count": r.findings_count, "error": r.error,
        "scenario": getattr(r, "scenario", ""),
    }


def _build_profile_html(profile) -> str:
    from .agent_doc_check_report import build_profile_html
    return build_profile_html(profile)


# Cross-check extracted to compliance.services.banner_cookie_cross_check
from compliance.services.banner_cookie_cross_check import (
    cross_check_banner_vs_cookie as _cross_check_banner_vs_cookie,
)


# ── Admin: audit drill-down (A5) + trend view (A6) ──────────────────

@router.get("/audit/{check_id}")
async def audit_drill_down(
    check_id: str,
    doc_type: str = "",
    regulation: str = "",
    only_failed: bool = False,
):
    """Return scorecard + filterable MC results for a single check run.

    Frontend uses this to render the /sdk/agent/audit/<check_id> view.
    """
    from compliance.services.compliance_audit_log import (
        get_check_run, list_mc_results,
    )
    run = get_check_run(check_id)
    if not run:
        return {"check_id": check_id, "found": False}
    rows = list_mc_results(
        check_id,
        doc_type=doc_type or None,
        regulation=regulation or None,
        only_failed=only_failed,
    )
    return {
        "check_id": check_id,
        "found": True,
        "run": run,
        "mc_count": len(rows),
        "results": rows,
    }


@router.get("/audit/tenant/{tenant_id}")
async def audit_tenant_history(
    tenant_id: str,
    base_domain: str = "",
    limit: int = 30,
):
    """Tenant-level history for the trend view (A6)."""
    from compliance.services.compliance_audit_log import list_runs_for_tenant
    runs = list_runs_for_tenant(
        tenant_id, base_domain=base_domain or None, limit=limit,
    )
    return {"tenant_id": tenant_id, "count": len(runs), "runs": runs}