Files
breakpilot-compliance/backend-compliance/compliance/api/agent_check/_helpers.py
T
Benjamin Admin c2c8783fee refactor(agent-check): split routes file (2692→347 LOC) + wire B1/B3/A1 [guardrail-change]
Phase-5 split of agent_compliance_check_routes.py — the 2700-line
monolith was decomposed into 19 modules in compliance/api/agent_check/:

  - Phase A-F: resolve / profile+check / banner+TCF / vendors raw+finalize /
    HTML blocks top+mid+bot / email / persist
  - Helpers: _constants, _helpers, _fetch, _discovery, _single_check
  - Schemas + State + thin _orchestrator

A1 ZIP-Anhang nativ in _phase_e_email: evidence_zip_builder.py bundles
slices + manifest.json + audit_metadata.json (SHA256 per slice +
build_sha + source_url). smtp_sender.py erweitert um attachments-Parameter.

B1 COOKIE-CONSENT-UX-001 (Mobile Reachability): consent_reachability_check.py
parses footer anchors, classifies intent (reopen_cmp / info_only /
browser_deflect) + target (same_page_cmp / new_tab / external).
_b1_wiring.py fetches homepage with iPhone-UA + renders Art-7-Abs-3
severity-coloured block.

B3 TH-RETENTION (Cross-Doc Speicherdauer): retention_comparator.py
compares DSI claim ↔ cookie-table duration ↔ actual Max-Age/expires
with 5% tolerance + severity hierarchy (dsi_under_actual HIGH,
table_under_actual HIGH, dsi_vs_table MEDIUM, actual_under_table LOW
Safari-ITP-Hint). _b3_wiring.py + Top-10 mismatches table in mail.

Side-effects:
- Fixed silent UnboundLocalError in original Step 5 (gf_one_pager used
  audit_quality_findings before declaration, caught by surrounding
  except → block never rendered). New _phase_d3_blocks_bot.py runs
  audit-quality FIRST.
- agent_compliance_check_routes.py removed from loc-exceptions.txt
  ("Phase 5 split target" — done).

Tests: 55/55 grün (B1 22 + B3 27 + saving_scan 6).
E2E: smoke against Elli DSE+Cookie produced HIGH/missing B1 finding,
TH-RETENTION table (17 cookies / 3 ✓ / 3 ✗ / 11 ?), evidence-zip
with 2 slices + manifest + audit_metadata (12089B, SHA256-chained,
source verified), email sent (attachments=1).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-06 14:47:25 +02:00

229 lines
8.0 KiB
Python

"""Pure helpers for the compliance-check route — no I/O, no async.
Grouped here because each is small and they share the same constants
imports. Splitting further would not improve readability.
"""
from __future__ import annotations
import logging
from urllib.parse import urlparse
from ._constants import (
_ALL_DOC_TYPES,
_COMPOUND_TLDS,
_DISCOVERY_RULES,
_DOC_TYPE_LABELS,
_compliance_check_jobs,
)
logger = logging.getLogger(__name__)
def _update(check_id: str, msg: str, pct: int | None = None) -> None:
"""Update the in-memory job entry with a progress message + pct."""
job = _compliance_check_jobs[check_id]
job["progress"] = msg
if pct is not None:
job["progress_pct"] = max(0, min(100, int(pct)))
def _doc_type_label(doc_type: str) -> str:
return _DOC_TYPE_LABELS.get(doc_type, doc_type.upper())
def _classify_discovered_doc(title: str, url: str) -> str | None:
"""Map a discovered doc (by its title + URL) to one of our 8 canonical types."""
haystack = f"{title} {url}"
for canon, keywords in _DISCOVERY_RULES:
if any(kw in haystack for kw in keywords):
return canon
return None
def _extract_domain(doc_entries: list[dict]) -> str | None:
"""Extract base domain (without www) from first URL."""
for entry in doc_entries:
url = entry.get("url", "")
if url and "://" in url:
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
return host or None
return None
def _company_name_from_url(doc_entries: list[dict]) -> str | None:
"""Derive a display company name from the entered URLs.
Heuristic: take the second-level domain (e.g. "bmw" from "www.bmw.de"),
uppercase short acronyms (<=4 chars, no hyphens), title-case the rest.
Examples:
www.bmw.de -> BMW
mercedes-benz.de -> Mercedes-Benz
shop.example.co.uk -> Example
juris.de -> Juris
"""
for entry in doc_entries:
url = entry.get("url", "")
if not url or "://" not in url:
continue
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
parts = host.split(".")
if len(parts) < 2:
continue
# Handle compound TLDs (.co.uk etc.)
if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
sld = parts[-3]
else:
sld = parts[-2]
if not sld:
continue
if len(sld) <= 4 and "-" not in sld:
return sld.upper()
return "-".join(p.capitalize() for p in sld.split("-"))
return None
def _get_skip_types(profile) -> dict[str, str]:
"""Doc_types to skip entirely with a per-type reason message.
Heute primaer fuer OEM-Konfigurator-Pattern (BMW/Audi/Mercedes):
wenn die Site kein Direkt-Vertrieb macht, sind AGB/Widerruf/
Nutzungsbedingungen nicht Pflicht auf der Website — sie werden
beim Vertragshaendler ausgehaendigt.
"""
if getattr(profile, "no_direct_sales", False):
msg = (
"Nicht anwendbar — die Webseite schliesst keinen Direkt-"
"Kaufvertrag (OEM-Konfigurator-Pattern, Vertrag laeuft "
"ueber Vertragshaendler). AGB/Widerruf werden beim "
"Haendler ausgehaendigt."
)
return {
"agb": msg,
"widerruf": msg,
"nutzungsbedingungen": msg,
}
return {}
def _apply_profile_filter(result, profile, doc_type: str):
"""Adjust INFO-level checks based on business profile context.
For example: ODR check only relevant for B2C online shops.
"""
for check in result.checks:
cid = check.id.lower()
# ODR/OS-Link: relevant ONLY for B2C online shops. The check's
# default hint is written for B2B (it explains why it's not
# relevant) — for B2C we must replace it with action-oriented
# guidance, otherwise the report contradicts itself.
if "odr" in cid or "os-link" in cid or "streitbeilegung" in check.label.lower():
if profile.needs_odr:
if not check.passed:
check.hint = (
"Als B2C-Anbieter muessen Sie nach Art. 14 EU-VO 524/2013 "
"auf die OS-Plattform (https://ec.europa.eu/consumers/odr) "
"verlinken — klickbarer Link, nicht nur Text. Zusaetzlich "
"§36 VSBG: angeben, ob Sie an Verbraucher-"
"Streitbeilegungsverfahren teilnehmen (oder nicht)."
)
else:
check.skipped = True
check.hint = "Nicht relevant (kein B2C Online-Shop)"
# Widerruf: Flag entire document as unnecessary for B2B
if doc_type == "widerruf" and profile.business_type not in ("b2c", "unknown"):
check.severity = "INFO"
if not check.passed:
check.hint = (
"Als B2B-Unternehmen benoetigen Sie keine Widerrufsbelehrung "
"(§355 BGB gilt nur fuer Verbrauchervertraege). "
"Empfehlung: Entfernen Sie die Widerrufsbelehrung von "
"Ihrer Website, da sie Verwirrung stiften kann."
)
# Regulated profession: check for Kammer info
if "kammer" in cid or "berufsordnung" in check.label.lower():
if not profile.is_regulated_profession:
check.skipped = True
check.hint = "Nicht relevant (kein regulierter Beruf)"
return result
def _pad_results_with_missing(
results: list,
discovery_attempted: set[str] | None = None,
) -> list:
"""Ensure every canonical doc_type has an entry in the results list.
Doc_types the user did not submit AND auto-discovery did not find get
a placeholder DocCheckResult. The error message distinguishes:
- 'Auf der Website nicht gefunden' (discovery was attempted)
- 'Nicht eingereicht' (no submitted URLs to crawl from)
Preserves the canonical ordering from _ALL_DOC_TYPES so the report
layout is stable.
"""
from ..agent_doc_check_routes import DocCheckResult
attempted = discovery_attempted or set()
by_type: dict[str, object] = {}
for r in results:
canon = "dse" if r.doc_type in ("datenschutz", "privacy") else r.doc_type
by_type[canon] = r
ordered: list = []
for dt in _ALL_DOC_TYPES:
if dt in by_type:
ordered.append(by_type[dt])
continue
if dt in attempted:
msg = ("Auf der Website nicht gefunden — bitte URL des "
"Dokuments manuell eintragen, falls vorhanden")
else:
msg = "Nicht eingereicht — Quelle nicht angegeben"
ordered.append(DocCheckResult(
label=_doc_type_label(dt),
url="",
doc_type=dt,
word_count=0,
completeness_pct=0,
correctness_pct=0,
checks=[],
findings_count=0,
error=msg,
scenario="missing",
))
extras = [r for r in results
if (r.doc_type if r.doc_type not in ("datenschutz", "privacy") else "dse")
not in _ALL_DOC_TYPES]
ordered.extend(extras)
return ordered
def _result_to_dict(r) -> dict:
"""Convert DocCheckResult to JSON-serializable dict."""
fields = ("id", "label", "passed", "severity", "matched_text",
"level", "parent", "skipped", "hint")
return {
"label": r.label, "url": r.url, "doc_type": r.doc_type,
"word_count": r.word_count, "completeness_pct": r.completeness_pct,
"correctness_pct": r.correctness_pct,
"checks": [{f: getattr(c, f) for f in fields} for c in r.checks],
"findings_count": r.findings_count, "error": r.error,
"scenario": getattr(r, "scenario", ""),
}
def _build_profile_html(profile) -> str:
from ..agent_doc_check_report import build_profile_html
return build_profile_html(profile)