Files
breakpilot-compliance/backend-compliance/compliance/services/cross_domain_doc_check.py
T
Benjamin Admin d6b8bf87c2
CI / detect-changes (push) Successful in 9s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 10s
CI / loc-budget (push) Successful in 13s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
fix: 4 Bugs gemeinsam — B22 PDF + B17 Walk-Fallback + company_name + Plausibility-Fallback
(1) B22 Cross-Domain (fix #59):
  Elli-Test fand AGB auf logpay.de NICHT, obwohl die URL in doc_entries
  korrekt war. Vermutete Ursache: Discovery-Phase A verwirft/überschreibt
  die Original-URL bei PDF-Fetch-Fail (word_count=0).
  Fix: _collect_audit_urls() iteriert über state.doc_entries +
  rejected_url + req.documents — Cross-Domain-Hosting ist
  unabhängig vom Text-Inhalt. Plus Trace-Logging für künftige
  Diagnose. Dedup per (doc_type, host_sld).

(2) B17 Audit-Walk-Fail-Fallback (fix #60):
  BMW v5 hatte audit_walk=None ohne Mail-Hinweis. Vermutlich
  180s-Timeout bei OneTrust-CMP-Banner-Tour.
  Fix: Timeout 180s → 300s. Plus: Bei Fail wird ein Hinweis-
  Stub mit error-Grund in state["audit_walk"] + HTML-Block
  geschrieben — Reviewer sieht den Fail statt silent-skip.

(3) company_name + origin_domain im Backend (fix #61):
  Frontend sendet seit ec03317 die zwei Felder — Backend ignorierte
  sie.
  Fix: ComplianceCheckRequest-Schema um company_name +
  origin_domain erweitert. phase_e_email priorisiert User-Input
  vor URL-Heuristik für site_name. Bei origin_domain ohne
  ableitbare doc_entries-domain wird der User-Input als domain
  übernommen.

(4) Plausibility-LLM Fallback-Modell (fix #62):
  qwen3:30b-a3b liefert auf großen DSEs (BMW 122 FAIL) gehäuft
  leere format='json'-Responses — der Circuit-Breaker griff, aber
  die Phase blieb nutzlos.
  Fix: Default-Modell auf qwen2.5:7b umgestellt (4× kleiner,
  zuverlässiger bei format=json, ausreichendes Reasoning für
  PASS/MODIFY/DROP-Klassifikation). Plus Strategy-C eingeführt
  — Fallback-Modell (llama3.2:3b) wenn primary leer bleibt.
  BATCH_SIZE 4 → 3. ENV-Switches PLAUSIBILITY_LLM_MODEL +
  PLAUSIBILITY_FALLBACK_MODEL für Tuning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-08 16:39:33 +02:00

193 lines
6.8 KiB
Python

"""B22 — Cross-Domain-Legal-Doc-Detector.
Erkennt: vertragsrelevante Dokumente (AGB, DSE, Widerrufsbelehrung,
Nutzungsbedingungen) liegen auf einer anderen Second-Level-Domain als
die Site selbst. Beispiel Elli/LogPay: AGB von Elli (elli.eco) liegt
auf docs.logpay.de.
Norm-Argument:
- DSGVO Art. 28: das Hosten von Vertragsdokumenten durch einen
Dritten ist Auftragsverarbeitung — AVV-Pflicht.
- DSGVO Art. 13 Abs. 1 lit. e: Empfänger / Auftragsverarbeiter
müssen in der DSE benannt sein.
- Vertragsrechtlich: AGB-Verbindlichkeit wackelig wenn der
Dokumenten-Host wechselt — was passiert wenn der externe Host
den Pfad ändert (Cool-URLs-Problem § 312i BGB).
Severity:
- HIGH bei AGB / Widerrufsbelehrung (vertragsrelevant)
- MEDIUM bei DSE / Nutzungsbedingungen
- INFO bei Cookie-Policy / Impressum (eher Best-Practice)
"""
from __future__ import annotations
import logging
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
_COMPOUND_TLDS = {
"co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
"com.au", "com.br", "com.mx", "com.tr", "com.sg",
}
_SEVERITY_BY_DOC = {
"agb": "HIGH",
"widerruf": "HIGH",
"dse": "MEDIUM",
"nutzungsbedingungen": "MEDIUM",
"cookie": "INFO",
"impressum": "INFO",
"social_media": "INFO",
}
def _sld(host: str) -> str:
"""Extract the second-level domain. Handles compound TLDs."""
if not host:
return ""
host = host.lower().lstrip("www.")
parts = host.split(".")
if len(parts) < 2:
return host
if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
return parts[-3]
return parts[-2]
def _site_origin_sld(state: dict) -> str:
"""Find the primary site SLD by counting most common host in
submitted URLs."""
counter: dict[str, int] = {}
for e in (state.get("doc_entries") or []):
url = (e.get("url") or "").strip()
if not url or "://" not in url:
continue
# Skip auto-discovered docs (they may already be cross-domain
# by design — we want the USER's stated origin).
if e.get("auto_discovered"):
continue
try:
host = urlparse(url).netloc
sld = _sld(host)
if sld:
counter[sld] = counter.get(sld, 0) + 1
except Exception:
continue
if not counter:
# Fallback: use any URL
for e in (state.get("doc_entries") or []):
url = (e.get("url") or "").strip()
if url and "://" in url:
return _sld(urlparse(url).netloc)
return ""
return max(counter, key=counter.get)
def _collect_audit_urls(state: dict) -> list[tuple[str, str]]:
"""Sammle (doc_type, url) aus BEIDEN Quellen — state.doc_entries
(nach Discovery) UND req.documents (USER-Original-Input). Discovery
kann Original-URLs verlieren (PDF-Fetch-Fail, Auto-Reclassify), aber
Cross-Domain-Hosting ist juristisch unabhängig vom Text-Inhalt
der Datei.
"""
seen: set[tuple[str, str]] = set()
out: list[tuple[str, str]] = []
for e in (state.get("doc_entries") or []):
url = (e.get("url") or "").strip()
doc_type = (e.get("doc_type") or "").lower()
if url and doc_type and (doc_type, url) not in seen:
seen.add((doc_type, url))
out.append((doc_type, url))
# rejected_url ist die Original-URL die Discovery rejected hat
rej = (e.get("rejected_url") or "").strip()
if rej and doc_type and (doc_type, rej) not in seen:
seen.add((doc_type, rej))
out.append((doc_type, rej))
# Fallback: req.documents — USER hat sie explizit eingegeben
req = state.get("req")
if req is not None:
for d in getattr(req, "documents", []) or []:
url = (getattr(d, "url", "") or "").strip()
doc_type = (getattr(d, "doc_type", "") or "").lower()
if url and doc_type and (doc_type, url) not in seen:
seen.add((doc_type, url))
out.append((doc_type, url))
return out
def check_cross_domain_docs(state: dict) -> list[dict]:
    """Emit findings for document URLs hosted on a different SLD than the site.

    The primary SLD comes from ``_site_origin_sld(state)`` and the candidate
    URLs from ``_collect_audit_urls(state)``.  Every URL whose SLD differs
    from the primary one produces one finding, deduplicated per
    (doc_type, host SLD).

    Args:
        state: Pipeline state, passed through to the two helpers.

    Returns:
        A list of finding dicts with the keys check_id, severity,
        severity_reason, doc_type, site_sld, host_sld, url, title, norm,
        evidence and recommended_action.  Empty when no primary SLD can be
        determined or nothing is hosted cross-domain.
    """
    primary = _site_origin_sld(state)
    if not primary:
        logger.info("B22 cross-domain: kein primary SLD ermittelbar")
        return []

    # Human-readable labels per doc_type; built once instead of per URL.
    doc_labels = {
        "agb": "Allgemeine Geschäftsbedingungen",
        "widerruf": "Widerrufsbelehrung",
        "dse": "Datenschutzerklärung",
        "nutzungsbedingungen": "Nutzungsbedingungen",
        "cookie": "Cookie-Richtlinie",
        "impressum": "Impressum",
        "social_media": "Social-Media-Hinweise",
    }

    findings: list[dict] = []
    audit_urls = _collect_audit_urls(state)
    logger.info("B22 cross-domain: primary=%s, prüfe %d URL(s)",
                primary, len(audit_urls))
    emitted_keys: set[tuple[str, str]] = set()
    for doc_type, url in audit_urls:
        if "://" not in url:
            continue
        try:
            host = urlparse(url).netloc
        except ValueError:
            # Malformed netloc (e.g. broken IPv6 literal): nothing to compare.
            continue
        url_sld = _sld(host)
        if not url_sld or url_sld == primary:
            continue
        # Dedup per (doc_type, host_sld) so that url and rejected_url of the
        # same document are not reported twice.
        e_key = (doc_type, url_sld)
        if e_key in emitted_keys:
            continue
        emitted_keys.add(e_key)

        # Cross-domain hosting detected.
        severity = _SEVERITY_BY_DOC.get(doc_type, "MEDIUM")
        doc_label = doc_labels.get(doc_type, doc_type.upper())
        findings.append({
            "check_id": "CROSS-DOMAIN-DOC-001",
            "severity": severity,
            "severity_reason": "third_party_hosted",
            "doc_type": doc_type,
            "site_sld": primary,
            "host_sld": url_sld,
            "url": url,
            "title": (
                f"{doc_label} liegt auf Drittanbieter-Domain "
                f"({host}) statt {primary}"
            ),
            "norm": (
                "DSGVO Art. 28 (AVV) + Art. 13 Abs. 1 lit. e (Empfänger) + "
                "§ 312i BGB (Cool-URLs / Vertragspflicht)"
            ),
            "evidence": (
                f"Site-Origin: {primary} · "
                f"Dokument gehostet auf: {host} · "
                f"URL: {url[:120]}"
            ),
            "recommended_action": (
                f"Entweder das Dokument auf eigene Domain ({primary}) "
                # This segment must be an f-string, otherwise the literal
                # text "{host}" ends up in the report.
                f"migrieren ODER (a) den externen Host {host} als "
                "Auftragsverarbeiter in der DSE benennen, (b) AVV "
                "abschließen, (c) sicherstellen dass URL-Stabilität "
                "vertraglich garantiert ist (§ 312i BGB Cool-URL-Pflicht)."
            ),
        })
    if findings:
        logger.info("B22 cross-domain: %d finding(s)", len(findings))
    return findings