Merge feat/zeroclaw-compliance-agent into main
Brings all compliance doc-check features: - 162 regex checks + 1874 Master Controls - LLM-agnostic agent with tool calling - Banner check (46 checks, 30 CMPs, stealth, Shadow DOM) - Impressum check (24 checks) - Deep consent verification (DataLayer, GCM, TCF) - CMP E2E tests (39 tests) - HTML email reports, FAQ, persistent history Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,23 +23,10 @@ def consent_to_dict(c: BannerConsentDB) -> dict[str, Any]:
|
||||
"device_fingerprint": c.device_fingerprint,
|
||||
"categories": c.categories or [],
|
||||
"vendors": c.vendors or [],
|
||||
"vendor_consents": c.vendor_consents or {},
|
||||
"ip_hash": c.ip_hash,
|
||||
"user_agent": c.user_agent,
|
||||
"consent_string": c.consent_string,
|
||||
"linked_email": c.linked_email,
|
||||
"consent_method": c.consent_method,
|
||||
"banner_version": c.banner_version,
|
||||
"banner_config_hash": c.banner_config_hash,
|
||||
"geo_country": c.geo_country,
|
||||
"geo_region": c.geo_region,
|
||||
"consent_scope": c.consent_scope,
|
||||
"page_url": c.page_url,
|
||||
"referrer": c.referrer,
|
||||
"device_type": c.device_type,
|
||||
"browser": c.browser,
|
||||
"os": c.os,
|
||||
"screen_resolution": c.screen_resolution,
|
||||
"session_id": c.session_id,
|
||||
"expires_at": c.expires_at.isoformat() if c.expires_at else None,
|
||||
"created_at": c.created_at.isoformat() if c.created_at else None,
|
||||
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
Agent PDF Export — generates printable compliance scan reports.
|
||||
|
||||
Uses WeasyPrint to convert HTML report to PDF.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from io import BytesIO
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_scan_pdf(scan_data: dict) -> bytes:
|
||||
"""Generate a PDF report from scan results."""
|
||||
from weasyprint import HTML
|
||||
|
||||
html = _build_report_html(scan_data)
|
||||
pdf_buffer = BytesIO()
|
||||
HTML(string=html).write_pdf(pdf_buffer)
|
||||
return pdf_buffer.getvalue()
|
||||
|
||||
|
||||
def _severity_color(sev: str) -> str:
|
||||
return {"HIGH": "#dc2626", "CRITICAL": "#991b1b", "MEDIUM": "#ea580c", "LOW": "#2563eb"}.get(sev, "#6b7280")
|
||||
|
||||
|
||||
def _build_report_html(data: dict) -> str:
|
||||
"""Build HTML for the PDF report."""
|
||||
url = data.get("url", "")
|
||||
scan_type = data.get("scan_type", "scan")
|
||||
mode = data.get("analysis_mode", "post_launch")
|
||||
findings = data.get("findings", [])
|
||||
services = data.get("services", [])
|
||||
risk = data.get("risk_level", "")
|
||||
score = data.get("risk_score", 0)
|
||||
pages = data.get("pages_scanned", 0)
|
||||
now = datetime.now(timezone.utc).strftime("%d.%m.%Y %H:%M UTC")
|
||||
|
||||
mode_label = "Live-Website Pruefung" if mode == "post_launch" else "Interne Pruefung"
|
||||
type_label = {"quick": "Schnellanalyse", "scan": "Website-Scan", "consent_test": "Cookie-Test"}.get(scan_type, scan_type)
|
||||
|
||||
findings_rows = ""
|
||||
for f in findings:
|
||||
sev = f.get("severity", "MEDIUM") if isinstance(f, dict) else "MEDIUM"
|
||||
text = f.get("text", str(f)) if isinstance(f, dict) else str(f)
|
||||
color = _severity_color(sev)
|
||||
findings_rows += f'<tr><td style="color:{color};font-weight:bold;padding:6px 8px;border-bottom:1px solid #e5e7eb;">{sev}</td><td style="padding:6px 8px;border-bottom:1px solid #e5e7eb;">{text}</td></tr>'
|
||||
|
||||
services_rows = ""
|
||||
for s in services:
|
||||
if isinstance(s, dict):
|
||||
status_icon = "✓" if s.get("in_dse") or s.get("status") == "ok" else "✗"
|
||||
status_color = "#16a34a" if status_icon == "✓" else "#dc2626"
|
||||
services_rows += f'<tr><td style="color:{status_color};font-weight:bold;padding:4px 8px;border-bottom:1px solid #f3f4f6;">{status_icon}</td><td style="padding:4px 8px;border-bottom:1px solid #f3f4f6;">{s.get("name","")}</td><td style="padding:4px 8px;border-bottom:1px solid #f3f4f6;">{s.get("country","")}</td><td style="padding:4px 8px;border-bottom:1px solid #f3f4f6;">{s.get("category","")}</td></tr>'
|
||||
|
||||
return f"""<!DOCTYPE html>
|
||||
<html><head><meta charset="utf-8">
|
||||
<style>
|
||||
body {{ font-family: -apple-system, Arial, sans-serif; font-size: 11px; color: #1e293b; margin: 40px; }}
|
||||
h1 {{ font-size: 20px; color: #1e1b4b; margin-bottom: 4px; }}
|
||||
h2 {{ font-size: 14px; color: #334155; border-bottom: 2px solid #e2e8f0; padding-bottom: 4px; margin-top: 24px; }}
|
||||
.meta {{ color: #64748b; font-size: 10px; margin-bottom: 20px; }}
|
||||
.badge {{ display: inline-block; padding: 2px 8px; border-radius: 4px; color: white; font-size: 10px; font-weight: bold; }}
|
||||
table {{ width: 100%; border-collapse: collapse; }}
|
||||
th {{ text-align: left; padding: 6px 8px; background: #f8fafc; border-bottom: 2px solid #e2e8f0; font-size: 10px; color: #64748b; }}
|
||||
.warning {{ background: #fef2f2; border-left: 4px solid #dc2626; padding: 10px 14px; margin: 16px 0; }}
|
||||
.footer {{ margin-top: 30px; padding-top: 10px; border-top: 1px solid #e2e8f0; color: #94a3b8; font-size: 9px; }}
|
||||
</style></head><body>
|
||||
|
||||
<h1>Compliance Agent Report</h1>
|
||||
<p class="meta">{type_label} | {mode_label} | {now}</p>
|
||||
|
||||
<table style="margin-bottom:20px;">
|
||||
<tr><td style="padding:4px 0;color:#64748b;width:150px;">URL</td><td style="padding:4px 0;"><strong>{url}</strong></td></tr>
|
||||
<tr><td style="padding:4px 0;color:#64748b;">Risikobewertung</td><td style="padding:4px 0;"><span class="badge" style="background:{_severity_color(risk) if risk else '#6b7280'}">{risk} ({score}/100)</span></td></tr>
|
||||
<tr><td style="padding:4px 0;color:#64748b;">Seiten gescannt</td><td style="padding:4px 0;">{pages}</td></tr>
|
||||
<tr><td style="padding:4px 0;color:#64748b;">Findings</td><td style="padding:4px 0;"><strong>{len(findings)}</strong></td></tr>
|
||||
</table>
|
||||
|
||||
{'<div class="warning"><strong>ACHTUNG:</strong> Maengel auf einer bereits veroeffentlichten Website. Sofortige Korrektur empfohlen.</div>' if mode == "post_launch" and findings else ''}
|
||||
|
||||
<h2>Findings ({len(findings)})</h2>
|
||||
<table>
|
||||
<tr><th>Schwere</th><th>Beschreibung</th></tr>
|
||||
{findings_rows if findings_rows else '<tr><td colspan="2" style="padding:8px;color:#16a34a;">Keine Findings — alles OK</td></tr>'}
|
||||
</table>
|
||||
|
||||
{'<h2>Dienstleister-Abgleich</h2><table><tr><th>Status</th><th>Dienst</th><th>Land</th><th>Kategorie</th></tr>' + services_rows + '</table>' if services_rows else ''}
|
||||
|
||||
<div class="footer">
|
||||
Automatisch erstellt vom BreakPilot Compliance Agent | {now}<br>
|
||||
Dieses Dokument ersetzt keine Rechtsberatung.
|
||||
</div>
|
||||
</body></html>"""
|
||||
@@ -0,0 +1,193 @@
|
||||
"""
|
||||
Banner A/B Testing Service — variant assignment, stats, significance.
|
||||
|
||||
Deterministic variant assignment via device fingerprint hash ensures
|
||||
the same device always sees the same variant (sticky bucketing).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import math
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
class BannerABService:
|
||||
"""A/B testing for consent banner variants."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Variant CRUD
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def list_variants(self, tenant_id: str, site_config_id: str) -> list[dict]:
|
||||
q = text("""
|
||||
SELECT * FROM compliance_banner_variants
|
||||
WHERE tenant_id = :tid AND site_config_id = :scid
|
||||
ORDER BY variant_key
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tenant_id, "scid": site_config_id}).fetchall()
|
||||
return [dict(r._mapping) for r in rows]
|
||||
|
||||
def create_variant(self, tenant_id: str, site_config_id: str, data: dict) -> dict:
|
||||
q = text("""
|
||||
INSERT INTO compliance_banner_variants
|
||||
(tenant_id, site_config_id, variant_name, variant_key, traffic_percent, is_control,
|
||||
banner_title, banner_description, position, style, primary_color, show_decline_all, theme_overrides)
|
||||
VALUES (:tid, :scid, :name, :key, :pct, :ctrl,
|
||||
:title, :desc, :pos, :style, :color, :decline, :theme)
|
||||
RETURNING *
|
||||
""")
|
||||
row = self.db.execute(q, {
|
||||
"tid": tenant_id, "scid": site_config_id,
|
||||
"name": data.get("variant_name", ""),
|
||||
"key": data.get("variant_key", "A"),
|
||||
"pct": data.get("traffic_percent", 50),
|
||||
"ctrl": data.get("is_control", False),
|
||||
"title": data.get("banner_title"),
|
||||
"desc": data.get("banner_description"),
|
||||
"pos": data.get("position"),
|
||||
"style": data.get("style"),
|
||||
"color": data.get("primary_color"),
|
||||
"decline": data.get("show_decline_all"),
|
||||
"theme": data.get("theme_overrides", "{}"),
|
||||
}).fetchone()
|
||||
self.db.commit()
|
||||
return dict(row._mapping)
|
||||
|
||||
def update_variant(self, variant_id: str, data: dict) -> Optional[dict]:
|
||||
sets, params = [], {"vid": variant_id}
|
||||
for field in ["variant_name", "traffic_percent", "is_control", "banner_title",
|
||||
"banner_description", "position", "style", "primary_color",
|
||||
"show_decline_all", "is_active"]:
|
||||
if field in data and data[field] is not None:
|
||||
sets.append(f"{field} = :{field}")
|
||||
params[field] = data[field]
|
||||
if not sets:
|
||||
return None
|
||||
sets.append("updated_at = NOW()")
|
||||
q = text(f"UPDATE compliance_banner_variants SET {', '.join(sets)} WHERE id = :vid RETURNING *")
|
||||
row = self.db.execute(q, params).fetchone()
|
||||
self.db.commit()
|
||||
return dict(row._mapping) if row else None
|
||||
|
||||
def delete_variant(self, variant_id: str) -> bool:
|
||||
q = text("DELETE FROM compliance_banner_variants WHERE id = :vid")
|
||||
result = self.db.execute(q, {"vid": variant_id})
|
||||
self.db.commit()
|
||||
return result.rowcount > 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Variant Assignment (deterministic sticky bucketing)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def assign_variant(self, site_config_id: str, device_fingerprint: str) -> Optional[dict]:
|
||||
"""Assign a variant based on device fingerprint hash. Returns variant or None."""
|
||||
variants = self.db.execute(text("""
|
||||
SELECT * FROM compliance_banner_variants
|
||||
WHERE site_config_id = :scid AND is_active = TRUE
|
||||
ORDER BY variant_key
|
||||
"""), {"scid": site_config_id}).fetchall()
|
||||
if not variants:
|
||||
return None
|
||||
|
||||
# Deterministic bucket 0-99 from device fingerprint
|
||||
bucket = int(hashlib.md5(f"{site_config_id}:{device_fingerprint}".encode()).hexdigest(), 16) % 100
|
||||
|
||||
cumulative = 0
|
||||
for v in variants:
|
||||
cumulative += v.traffic_percent
|
||||
if bucket < cumulative:
|
||||
return dict(v._mapping)
|
||||
# Fallback to last variant
|
||||
return dict(variants[-1]._mapping)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Stats with statistical significance
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_variant_stats(self, tenant_id: str, site_config_id: str) -> list[dict]:
|
||||
"""Per-variant stats with chi-squared significance test."""
|
||||
variants = self.list_variants(tenant_id, site_config_id)
|
||||
if not variants:
|
||||
return []
|
||||
|
||||
results = []
|
||||
for v in variants:
|
||||
vid = str(v["id"])
|
||||
vkey = v["variant_key"]
|
||||
q = text("""
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
COUNT(*) FILTER (WHERE action = 'consent_given') AS accepted,
|
||||
COUNT(*) FILTER (WHERE action IN ('consent_withdrawn', 'consent_revoked')) AS rejected
|
||||
FROM compliance_banner_consent_audit_log
|
||||
WHERE tenant_id = :tid AND variant_key = :vkey
|
||||
""")
|
||||
row = self.db.execute(q, {"tid": tenant_id, "vkey": vkey}).fetchone()
|
||||
total = row.total if row else 0
|
||||
accepted = row.accepted if row else 0
|
||||
results.append({
|
||||
"variant_id": vid,
|
||||
"variant_key": vkey,
|
||||
"variant_name": v["variant_name"],
|
||||
"traffic_percent": v["traffic_percent"],
|
||||
"is_control": v["is_control"],
|
||||
"total": total,
|
||||
"accepted": accepted,
|
||||
"opt_in_rate": round(accepted / total * 100, 1) if total > 0 else 0,
|
||||
})
|
||||
|
||||
# Chi-squared test between control and best variant
|
||||
control = next((r for r in results if r["is_control"]), None)
|
||||
if control and len(results) > 1:
|
||||
best = max((r for r in results if not r["is_control"]), key=lambda x: x["opt_in_rate"], default=None)
|
||||
if best and control["total"] > 0 and best["total"] > 0:
|
||||
sig = self._chi_squared_significance(
|
||||
control["accepted"], control["total"],
|
||||
best["accepted"], best["total"],
|
||||
)
|
||||
best["is_winner"] = sig > 0.95
|
||||
best["significance"] = round(sig * 100, 1)
|
||||
control["is_winner"] = False
|
||||
control["significance"] = round((1 - sig) * 100, 1)
|
||||
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def _chi_squared_significance(a_success: int, a_total: int, b_success: int, b_total: int) -> float:
|
||||
"""Simple chi-squared test for 2x2 contingency table. Returns confidence 0-1."""
|
||||
a_fail = a_total - a_success
|
||||
b_fail = b_total - b_success
|
||||
n = a_total + b_total
|
||||
if n == 0:
|
||||
return 0.0
|
||||
|
||||
# Expected values
|
||||
exp_a_s = a_total * (a_success + b_success) / n
|
||||
exp_a_f = a_total * (a_fail + b_fail) / n
|
||||
exp_b_s = b_total * (a_success + b_success) / n
|
||||
exp_b_f = b_total * (a_fail + b_fail) / n
|
||||
|
||||
chi2 = 0.0
|
||||
for obs, exp in [(a_success, exp_a_s), (a_fail, exp_a_f), (b_success, exp_b_s), (b_fail, exp_b_f)]:
|
||||
if exp > 0:
|
||||
chi2 += (obs - exp) ** 2 / exp
|
||||
|
||||
# Approximate p-value for 1 df using Wilson-Hilferty
|
||||
if chi2 < 0.001:
|
||||
return 0.0
|
||||
if chi2 > 10.83:
|
||||
return 0.999
|
||||
# Lookup table for common thresholds (1 df)
|
||||
thresholds = [(2.706, 0.90), (3.841, 0.95), (5.024, 0.975), (6.635, 0.99), (10.83, 0.999)]
|
||||
confidence = 0.0
|
||||
for threshold, conf in thresholds:
|
||||
if chi2 >= threshold:
|
||||
confidence = conf
|
||||
return confidence
|
||||
@@ -0,0 +1,135 @@
|
||||
"""
|
||||
Banner consent analytics — time-series, device breakdown, bounce rate.
|
||||
|
||||
Reads from BannerConsentAuditLogDB for aggregated analytics.
|
||||
"""
|
||||
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
class BannerAnalyticsService:
|
||||
"""Provides aggregated consent analytics for a site."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
def get_time_series(
|
||||
self,
|
||||
tenant_id: str,
|
||||
site_id: str,
|
||||
period: str = "daily",
|
||||
days: int = 30,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Opt-in rate per day/week over the last N days."""
|
||||
trunc = "day" if period == "daily" else "week"
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
q = text(f"""
|
||||
SELECT DATE_TRUNC(:trunc, created_at) AS period,
|
||||
COUNT(*) FILTER (WHERE action = 'consent_given') AS given,
|
||||
COUNT(*) FILTER (WHERE action = 'consent_updated') AS updated,
|
||||
COUNT(*) FILTER (WHERE action IN ('consent_withdrawn', 'consent_revoked')) AS withdrawn,
|
||||
COUNT(*) AS total
|
||||
FROM compliance_banner_consent_audit_log
|
||||
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
|
||||
GROUP BY 1 ORDER BY 1
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff, "trunc": trunc}).fetchall()
|
||||
return [
|
||||
{
|
||||
"period": r.period.isoformat() if r.period else None,
|
||||
"given": r.given,
|
||||
"updated": r.updated,
|
||||
"withdrawn": r.withdrawn,
|
||||
"total": r.total,
|
||||
"opt_in_rate": round((r.given + r.updated) / r.total * 100, 1) if r.total > 0 else 0,
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
||||
def get_category_breakdown(
|
||||
self,
|
||||
tenant_id: str,
|
||||
site_id: str,
|
||||
days: int = 30,
|
||||
) -> dict[str, dict[str, int]]:
|
||||
"""Acceptance count per category."""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
q = text("""
|
||||
SELECT categories FROM compliance_banner_consent_audit_log
|
||||
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
|
||||
AND action IN ('consent_given', 'consent_updated')
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchall()
|
||||
counts: dict[str, int] = {}
|
||||
total = len(rows)
|
||||
for r in rows:
|
||||
cats = r.categories if isinstance(r.categories, list) else []
|
||||
for cat in cats:
|
||||
counts[cat] = counts.get(cat, 0) + 1
|
||||
return {
|
||||
cat: {"count": count, "total": total, "rate": round(count / total * 100, 1) if total > 0 else 0}
|
||||
for cat, count in sorted(counts.items())
|
||||
}
|
||||
|
||||
def get_device_breakdown(
|
||||
self,
|
||||
tenant_id: str,
|
||||
site_id: str,
|
||||
days: int = 30,
|
||||
) -> dict[str, int]:
|
||||
"""Mobile/Desktop/Tablet classification from user_agent."""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
q = text("""
|
||||
SELECT user_agent FROM compliance_banner_consent_audit_log
|
||||
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
|
||||
AND user_agent IS NOT NULL
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchall()
|
||||
result = {"desktop": 0, "mobile": 0, "tablet": 0, "unknown": 0}
|
||||
mobile_re = re.compile(r"Mobile|Android|iPhone|iPod", re.IGNORECASE)
|
||||
tablet_re = re.compile(r"iPad|Tablet|PlayBook|Silk", re.IGNORECASE)
|
||||
for r in rows:
|
||||
ua = r.user_agent or ""
|
||||
if tablet_re.search(ua):
|
||||
result["tablet"] += 1
|
||||
elif mobile_re.search(ua):
|
||||
result["mobile"] += 1
|
||||
elif ua:
|
||||
result["desktop"] += 1
|
||||
else:
|
||||
result["unknown"] += 1
|
||||
return result
|
||||
|
||||
def get_overview_stats(
|
||||
self,
|
||||
tenant_id: str,
|
||||
site_id: str,
|
||||
days: int = 30,
|
||||
) -> dict[str, Any]:
|
||||
"""High-level stats: total consents, active, withdrawn, opt-in rate."""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
q = text("""
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE action = 'consent_given') AS given,
|
||||
COUNT(*) FILTER (WHERE action = 'consent_updated') AS updated,
|
||||
COUNT(*) FILTER (WHERE action IN ('consent_withdrawn', 'consent_revoked')) AS withdrawn,
|
||||
COUNT(*) AS total
|
||||
FROM compliance_banner_consent_audit_log
|
||||
WHERE tenant_id = :tid AND site_id = :sid AND created_at >= :cutoff
|
||||
""")
|
||||
r = self.db.execute(q, {"tid": tenant_id, "sid": site_id, "cutoff": cutoff}).fetchone()
|
||||
total = r.total if r else 0
|
||||
given = (r.given or 0) + (r.updated or 0) if r else 0
|
||||
return {
|
||||
"period_days": days,
|
||||
"total_interactions": total,
|
||||
"consents_given": r.given if r else 0,
|
||||
"consents_updated": r.updated if r else 0,
|
||||
"consents_withdrawn": r.withdrawn if r else 0,
|
||||
"opt_in_rate": round(given / total * 100, 1) if total > 0 else 0,
|
||||
}
|
||||
@@ -73,9 +73,8 @@ class BannerConsentService:
|
||||
ip_hash: Optional[str] = None,
|
||||
banner_config_hash: Optional[str] = None,
|
||||
consent_version: Optional[int] = None,
|
||||
*,
|
||||
consent_method: Optional[str] = None,
|
||||
page_url: Optional[str] = None,
|
||||
vendor_consents: Optional[dict[str, bool]] = None,
|
||||
user_agent: Optional[str] = None,
|
||||
) -> None:
|
||||
entry = BannerConsentAuditLogDB(
|
||||
tenant_id=tenant_id,
|
||||
@@ -84,11 +83,11 @@ class BannerConsentService:
|
||||
site_id=site_id,
|
||||
device_fingerprint=device_fingerprint,
|
||||
categories=categories or [],
|
||||
vendor_consents=vendor_consents or {},
|
||||
ip_hash=ip_hash,
|
||||
user_agent=user_agent,
|
||||
banner_config_hash=banner_config_hash,
|
||||
consent_version=consent_version,
|
||||
consent_method=consent_method,
|
||||
page_url=page_url,
|
||||
)
|
||||
self.db.add(entry)
|
||||
|
||||
@@ -134,6 +133,24 @@ class BannerConsentService:
|
||||
return max(v.retention_days for v in vendors if v.retention_days)
|
||||
return max((CATEGORY_RETENTION_DAYS.get(c, 365) for c in categories), default=365)
|
||||
|
||||
def _maybe_generate_tc_string(
|
||||
self, tenant_id: uuid.UUID, site_id: str, categories: list[str],
|
||||
) -> Optional[str]:
|
||||
"""Generate TC String if TCF is enabled for this site."""
|
||||
config = (
|
||||
self.db.query(BannerSiteConfigDB)
|
||||
.filter(BannerSiteConfigDB.tenant_id == tenant_id, BannerSiteConfigDB.site_id == site_id)
|
||||
.first()
|
||||
)
|
||||
if not config or not config.tcf_enabled:
|
||||
return None
|
||||
try:
|
||||
from compliance.services.tcf_encoder_service import TCFEncoderService
|
||||
encoder = TCFEncoderService()
|
||||
return encoder.encode_from_categories(categories)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Consent CRUD (public SDK)
|
||||
# ------------------------------------------------------------------
|
||||
@@ -148,16 +165,7 @@ class BannerConsentService:
|
||||
ip_address: Optional[str],
|
||||
user_agent: Optional[str],
|
||||
consent_string: Optional[str],
|
||||
*,
|
||||
consent_method: Optional[str] = None,
|
||||
page_url: Optional[str] = None,
|
||||
referrer: Optional[str] = None,
|
||||
device_type: Optional[str] = None,
|
||||
browser: Optional[str] = None,
|
||||
os: Optional[str] = None,
|
||||
screen_resolution: Optional[str] = None,
|
||||
session_id: Optional[str] = None,
|
||||
consent_scope: Optional[str] = None,
|
||||
vendor_consents: Optional[dict[str, bool]] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Upsert a device consent row for (tenant, site, device_fingerprint).
|
||||
|
||||
@@ -173,20 +181,9 @@ class BannerConsentService:
|
||||
expires_at = now + timedelta(days=retention)
|
||||
config_hash, config_ver = self._compute_config_hash(tid, site_id)
|
||||
|
||||
# Vendor-agnostische Zusatzfelder
|
||||
extra = {
|
||||
"consent_method": consent_method,
|
||||
"banner_version": config_ver,
|
||||
"banner_config_hash": config_hash,
|
||||
"page_url": page_url,
|
||||
"referrer": referrer,
|
||||
"device_type": device_type,
|
||||
"browser": browser,
|
||||
"os": os,
|
||||
"screen_resolution": screen_resolution,
|
||||
"session_id": session_id,
|
||||
"consent_scope": consent_scope or "domain",
|
||||
}
|
||||
# Auto-generate TC String if TCF is enabled for this site
|
||||
if not consent_string:
|
||||
consent_string = self._maybe_generate_tc_string(tid, site_id, categories)
|
||||
|
||||
existing = (
|
||||
self.db.query(BannerConsentDB)
|
||||
@@ -201,18 +198,17 @@ class BannerConsentService:
|
||||
if existing:
|
||||
existing.categories = categories
|
||||
existing.vendors = vendors
|
||||
existing.vendor_consents = vendor_consents or {}
|
||||
existing.ip_hash = ip_hash
|
||||
existing.user_agent = user_agent
|
||||
existing.consent_string = consent_string
|
||||
existing.expires_at = expires_at
|
||||
existing.updated_at = now
|
||||
for key, val in extra.items():
|
||||
setattr(existing, key, val)
|
||||
self.db.flush()
|
||||
self._log(
|
||||
tid, existing.id, "consent_updated", site_id, device_fingerprint,
|
||||
categories, ip_hash, config_hash, config_ver,
|
||||
consent_method=consent_method, page_url=page_url,
|
||||
vendor_consents=vendor_consents, user_agent=user_agent,
|
||||
)
|
||||
self.db.commit()
|
||||
self.db.refresh(existing)
|
||||
@@ -224,18 +220,18 @@ class BannerConsentService:
|
||||
device_fingerprint=device_fingerprint,
|
||||
categories=categories,
|
||||
vendors=vendors,
|
||||
vendor_consents=vendor_consents or {},
|
||||
ip_hash=ip_hash,
|
||||
user_agent=user_agent,
|
||||
consent_string=consent_string,
|
||||
expires_at=expires_at,
|
||||
**extra,
|
||||
)
|
||||
self.db.add(consent)
|
||||
self.db.flush()
|
||||
self._log(
|
||||
tid, consent.id, "consent_given", site_id, device_fingerprint,
|
||||
categories, ip_hash, config_hash, config_ver,
|
||||
consent_method=consent_method, page_url=page_url,
|
||||
vendor_consents=vendor_consents, user_agent=user_agent,
|
||||
)
|
||||
self.db.commit()
|
||||
self.db.refresh(consent)
|
||||
@@ -383,14 +379,7 @@ class BannerConsentService:
|
||||
total = base.count()
|
||||
category_stats: dict[str, int] = {}
|
||||
for c in base.all():
|
||||
raw = c.categories or []
|
||||
if isinstance(raw, str):
|
||||
try:
|
||||
import json
|
||||
raw = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
raw = []
|
||||
cats: list[str] = list(raw) if isinstance(raw, list) else []
|
||||
cats: list[str] = list(c.categories or [])
|
||||
for cat in cats:
|
||||
category_stats[cat] = category_stats.get(cat, 0) + 1
|
||||
return {
|
||||
@@ -404,58 +393,3 @@ class BannerConsentService:
|
||||
for cat, count in category_stats.items()
|
||||
},
|
||||
}
|
||||
|
||||
def list_consents(
|
||||
self, tenant_id: str, site_id: Optional[str] = None,
|
||||
limit: int = 50, offset: int = 0,
|
||||
) -> dict[str, Any]:
|
||||
"""List paginated banner consents with parsed categories."""
|
||||
import json as _json
|
||||
tid = uuid.UUID(tenant_id)
|
||||
base = self.db.query(BannerConsentDB).filter(BannerConsentDB.tenant_id == tid)
|
||||
if site_id:
|
||||
base = base.filter(BannerConsentDB.site_id == site_id)
|
||||
total = base.count()
|
||||
rows = base.order_by(BannerConsentDB.created_at.desc()).offset(offset).limit(limit).all()
|
||||
consents = []
|
||||
for c in rows:
|
||||
raw_cats = c.categories or []
|
||||
if isinstance(raw_cats, str):
|
||||
try:
|
||||
raw_cats = _json.loads(raw_cats)
|
||||
except (ValueError, TypeError):
|
||||
raw_cats = []
|
||||
raw_vendors = c.vendors or []
|
||||
if isinstance(raw_vendors, str):
|
||||
try:
|
||||
raw_vendors = _json.loads(raw_vendors)
|
||||
except (ValueError, TypeError):
|
||||
raw_vendors = []
|
||||
consents.append({
|
||||
"id": str(c.id),
|
||||
"site_id": c.site_id,
|
||||
"device_fingerprint": c.device_fingerprint,
|
||||
"categories": list(raw_cats) if isinstance(raw_cats, list) else [],
|
||||
"vendors": list(raw_vendors) if isinstance(raw_vendors, list) else [],
|
||||
"ip_hash": c.ip_hash,
|
||||
"user_agent": c.user_agent,
|
||||
"linked_email": c.linked_email,
|
||||
"consent_string": c.consent_string,
|
||||
"consent_method": c.consent_method,
|
||||
"banner_version": c.banner_version,
|
||||
"banner_config_hash": c.banner_config_hash,
|
||||
"geo_country": c.geo_country,
|
||||
"geo_region": c.geo_region,
|
||||
"consent_scope": c.consent_scope,
|
||||
"page_url": c.page_url,
|
||||
"referrer": c.referrer,
|
||||
"device_type": c.device_type,
|
||||
"browser": c.browser,
|
||||
"os": c.os,
|
||||
"screen_resolution": c.screen_resolution,
|
||||
"session_id": c.session_id,
|
||||
"expires_at": c.expires_at.isoformat() if c.expires_at else None,
|
||||
"created_at": c.created_at.isoformat() if c.created_at else None,
|
||||
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
|
||||
})
|
||||
return {"consents": consents, "total": total, "limit": limit, "offset": offset}
|
||||
|
||||
@@ -40,6 +40,22 @@ _CONTROL_COLUMNS = """
|
||||
"""
|
||||
|
||||
|
||||
def _ensure_list(val: Any) -> list:
|
||||
"""Ensure a JSONB value is always a Python list."""
|
||||
if isinstance(val, list):
|
||||
return val
|
||||
if val is None:
|
||||
return []
|
||||
if isinstance(val, str):
|
||||
try:
|
||||
import json
|
||||
parsed = json.loads(val)
|
||||
return parsed if isinstance(parsed, list) else []
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return []
|
||||
return []
|
||||
|
||||
|
||||
def _control_row(r: Any) -> dict[str, Any]:
|
||||
"""Serialize a canonical_controls SELECT row to a response dict."""
|
||||
return {
|
||||
@@ -49,19 +65,19 @@ def _control_row(r: Any) -> dict[str, Any]:
|
||||
"title": r.title,
|
||||
"objective": r.objective,
|
||||
"rationale": r.rationale,
|
||||
"scope": r.scope,
|
||||
"requirements": r.requirements,
|
||||
"test_procedure": r.test_procedure,
|
||||
"evidence": r.evidence,
|
||||
"scope": r.scope if isinstance(r.scope, dict) else {},
|
||||
"requirements": _ensure_list(r.requirements),
|
||||
"test_procedure": _ensure_list(r.test_procedure),
|
||||
"evidence": _ensure_list(r.evidence),
|
||||
"severity": r.severity,
|
||||
"risk_score": float(r.risk_score) if r.risk_score is not None else None,
|
||||
"implementation_effort": r.implementation_effort,
|
||||
"evidence_confidence": (
|
||||
float(r.evidence_confidence) if r.evidence_confidence is not None else None
|
||||
),
|
||||
"open_anchors": r.open_anchors,
|
||||
"open_anchors": _ensure_list(r.open_anchors),
|
||||
"release_state": r.release_state,
|
||||
"tags": r.tags or [],
|
||||
"tags": _ensure_list(r.tags),
|
||||
"created_at": r.created_at.isoformat() if r.created_at else None,
|
||||
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,216 @@
|
||||
"""
|
||||
Compliance Report PDF Generator — generates a comprehensive A4 PDF
|
||||
covering all compliance modules for a project.
|
||||
|
||||
Uses reportlab (same as audit_pdf_generator.py).
|
||||
"""
|
||||
|
||||
import io
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from reportlab.lib import colors
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.lib.units import mm
|
||||
from reportlab.platypus import (
|
||||
SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak,
|
||||
)
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Colors
|
||||
PURPLE = colors.HexColor("#7c3aed")
|
||||
LIGHT_PURPLE = colors.HexColor("#f5f3ff")
|
||||
GRAY = colors.HexColor("#6b7280")
|
||||
GREEN = colors.HexColor("#16a34a")
|
||||
RED = colors.HexColor("#dc2626")
|
||||
YELLOW = colors.HexColor("#ca8a04")
|
||||
|
||||
|
||||
def _styles():
|
||||
ss = getSampleStyleSheet()
|
||||
ss.add(ParagraphStyle("Title2", parent=ss["Title"], fontSize=24, textColor=PURPLE, spaceAfter=6))
|
||||
ss.add(ParagraphStyle("Section", parent=ss["Heading2"], fontSize=14, textColor=PURPLE, spaceBefore=12, spaceAfter=6))
|
||||
ss.add(ParagraphStyle("Body2", parent=ss["Normal"], fontSize=10, leading=14, spaceAfter=4))
|
||||
ss.add(ParagraphStyle("Small", parent=ss["Normal"], fontSize=8, textColor=GRAY))
|
||||
return ss
|
||||
|
||||
|
||||
class CompliancePDFGenerator:
|
||||
"""Generates a full compliance status report as PDF."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
def generate(self, tenant_id: str, project_id: str | None = None, language: str = "de") -> tuple[bytes, str]:
|
||||
buf = io.BytesIO()
|
||||
doc = SimpleDocTemplate(buf, pagesize=A4, leftMargin=20 * mm, rightMargin=20 * mm, topMargin=25 * mm, bottomMargin=20 * mm)
|
||||
ss = _styles()
|
||||
story: list = []
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
story.append(Paragraph("Compliance-Report", ss["Title2"]))
|
||||
story.append(Paragraph(f"Stand: {now.strftime('%d.%m.%Y %H:%M')} UTC", ss["Small"]))
|
||||
story.append(Spacer(1, 10 * mm))
|
||||
|
||||
# Company Profile
|
||||
self._add_company_section(story, ss, tenant_id, project_id)
|
||||
# TOM
|
||||
self._add_count_section(story, ss, "TOM (Technisch-Organisatorische Massnahmen)",
|
||||
"compliance_toms", tenant_id)
|
||||
# VVT
|
||||
self._add_count_section(story, ss, "VVT (Verarbeitungstaetigkeiten)",
|
||||
"compliance_vvt_activities", tenant_id)
|
||||
# DSFA
|
||||
self._add_count_section(story, ss, "Datenschutz-Folgenabschaetzungen",
|
||||
"compliance_dsfa_assessments", tenant_id)
|
||||
# Risks
|
||||
self._add_risk_section(story, ss, tenant_id)
|
||||
# Vendors
|
||||
self._add_count_section(story, ss, "Auftragsverarbeiter",
|
||||
"compliance_vendor_assessments", tenant_id)
|
||||
# Incidents
|
||||
self._add_count_section(story, ss, "Datenschutz-Vorfaelle",
|
||||
"compliance_notfallplan_incidents", tenant_id)
|
||||
# Document Reviews
|
||||
self._add_review_section(story, ss, tenant_id)
|
||||
# Banner Consents
|
||||
self._add_consent_section(story, ss, tenant_id)
|
||||
# Org Roles
|
||||
self._add_role_section(story, ss, tenant_id, project_id)
|
||||
# Footer
|
||||
story.append(Spacer(1, 15 * mm))
|
||||
story.append(Paragraph("Erstellt mit BreakPilot Compliance SDK", ss["Small"]))
|
||||
|
||||
doc.build(story)
|
||||
filename = f"compliance-report-{now.strftime('%Y%m%d')}.pdf"
|
||||
return buf.getvalue(), filename
|
||||
|
||||
def _add_company_section(self, story, ss, tid, pid):
|
||||
story.append(Paragraph("Unternehmensprofil", ss["Section"]))
|
||||
try:
|
||||
where = "tenant_id = :tid"
|
||||
params: dict[str, Any] = {"tid": tid}
|
||||
if pid:
|
||||
where += " AND project_id = :pid"
|
||||
params["pid"] = pid
|
||||
row = self.db.execute(text(f"SELECT * FROM compliance_company_profiles WHERE {where} LIMIT 1"), params).fetchone()
|
||||
if row:
|
||||
d = dict(row._mapping)
|
||||
data = [
|
||||
["Feld", "Wert"],
|
||||
["Firma", d.get("company_name", "-")],
|
||||
["Branche", d.get("industry", "-")],
|
||||
["Rechtsform", d.get("legal_form", "-")],
|
||||
["Mitarbeiter", str(d.get("employee_count", "-"))],
|
||||
]
|
||||
t = Table(data, colWidths=[60 * mm, 100 * mm])
|
||||
t.setStyle(TableStyle([
|
||||
("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
|
||||
("FONTSIZE", (0, 0), (-1, -1), 9),
|
||||
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
|
||||
("VALIGN", (0, 0), (-1, -1), "TOP"),
|
||||
]))
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Kein Unternehmensprofil hinterlegt.", ss["Body2"]))
|
||||
except Exception as e:
|
||||
story.append(Paragraph(f"Fehler beim Laden: {e}", ss["Small"]))
|
||||
story.append(Spacer(1, 5 * mm))
|
||||
|
||||
def _add_count_section(self, story, ss, title, table_name, tid):
|
||||
story.append(Paragraph(title, ss["Section"]))
|
||||
try:
|
||||
count = self.db.execute(text(f"SELECT COUNT(*) FROM {table_name} WHERE tenant_id = :tid"), {"tid": tid}).scalar()
|
||||
story.append(Paragraph(f"Eintraege: <b>{count or 0}</b>", ss["Body2"]))
|
||||
except Exception:
|
||||
story.append(Paragraph("Tabelle nicht vorhanden oder leer.", ss["Small"]))
|
||||
story.append(Spacer(1, 3 * mm))
|
||||
|
||||
def _add_risk_section(self, story, ss, tid):
|
||||
story.append(Paragraph("Risikobewertung", ss["Section"]))
|
||||
try:
|
||||
q = text("""
|
||||
SELECT severity, COUNT(*) as cnt FROM compliance_risks
|
||||
WHERE tenant_id = :tid GROUP BY severity ORDER BY severity
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tid}).fetchall()
|
||||
if rows:
|
||||
data = [["Schweregrad", "Anzahl"]]
|
||||
for r in rows:
|
||||
data.append([r.severity or "UNKNOWN", str(r.cnt)])
|
||||
t = Table(data, colWidths=[80 * mm, 40 * mm])
|
||||
t.setStyle(TableStyle([
|
||||
("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
|
||||
("FONTSIZE", (0, 0), (-1, -1), 9),
|
||||
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
|
||||
]))
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Keine Risiken erfasst.", ss["Body2"]))
|
||||
except Exception:
|
||||
story.append(Paragraph("Risiko-Tabelle nicht vorhanden.", ss["Small"]))
|
||||
story.append(Spacer(1, 3 * mm))
|
||||
|
||||
def _add_review_section(self, story, ss, tid):
|
||||
story.append(Paragraph("Dokumenten-Reviews", ss["Section"]))
|
||||
try:
|
||||
q = text("SELECT status, COUNT(*) as cnt FROM compliance_document_reviews WHERE tenant_id = :tid GROUP BY status")
|
||||
rows = self.db.execute(q, {"tid": tid}).fetchall()
|
||||
if rows:
|
||||
data = [["Status", "Anzahl"]]
|
||||
for r in rows:
|
||||
data.append([r.status, str(r.cnt)])
|
||||
t = Table(data, colWidths=[80 * mm, 40 * mm])
|
||||
t.setStyle(TableStyle([
|
||||
("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
|
||||
("FONTSIZE", (0, 0), (-1, -1), 9),
|
||||
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
|
||||
]))
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Keine Reviews vorhanden.", ss["Body2"]))
|
||||
except Exception:
|
||||
story.append(Paragraph("Review-Tabelle nicht vorhanden.", ss["Small"]))
|
||||
story.append(Spacer(1, 3 * mm))
|
||||
|
||||
def _add_consent_section(self, story, ss, tid):
|
||||
story.append(Paragraph("Banner-Consents", ss["Section"]))
|
||||
try:
|
||||
count = self.db.execute(text("SELECT COUNT(*) FROM compliance_banner_consents WHERE tenant_id = :tid"), {"tid": tid}).scalar()
|
||||
story.append(Paragraph(f"Gesamte Consents: <b>{count or 0}</b>", ss["Body2"]))
|
||||
except Exception:
|
||||
story.append(Paragraph("Banner-Tabelle nicht vorhanden.", ss["Small"]))
|
||||
story.append(Spacer(1, 3 * mm))
|
||||
|
||||
def _add_role_section(self, story, ss, tid, pid):
|
||||
story.append(Paragraph("Rollenkonzept", ss["Section"]))
|
||||
try:
|
||||
where = "tenant_id = :tid"
|
||||
params: dict[str, Any] = {"tid": tid}
|
||||
if pid:
|
||||
where += " AND (project_id = :pid OR project_id IS NULL)"
|
||||
params["pid"] = pid
|
||||
rows = self.db.execute(text(f"SELECT role_key, role_label, person_name, person_email FROM compliance_org_roles WHERE {where} ORDER BY role_key"), params).fetchall()
|
||||
if rows:
|
||||
data = [["Rolle", "Name", "E-Mail"]]
|
||||
for r in rows:
|
||||
data.append([r.role_label or r.role_key, r.person_name or "-", r.person_email or "-"])
|
||||
t = Table(data, colWidths=[60 * mm, 50 * mm, 50 * mm])
|
||||
t.setStyle(TableStyle([
|
||||
("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
|
||||
("FONTSIZE", (0, 0), (-1, -1), 9),
|
||||
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
|
||||
]))
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Keine Rollen zugewiesen.", ss["Body2"]))
|
||||
except Exception:
|
||||
story.append(Paragraph("Rollen-Tabelle nicht vorhanden.", ss["Small"]))
|
||||
@@ -87,9 +87,10 @@ def compare_services(
|
||||
|
||||
for key, svc in detected_names.items():
|
||||
# Skip CMP — consent managers don't need DSE mention
|
||||
if svc.get("category") == "other" and svc.get("id") == "cmp":
|
||||
if svc.get("category") == "cmp" or (svc.get("category") == "other" and svc.get("id") == "cmp"):
|
||||
continue
|
||||
matched = False
|
||||
# Method 1: Match against LLM-extracted service list
|
||||
for dse_key, dse_svc in dse_names.items():
|
||||
if key == dse_key or _fuzzy_match(svc["name"], dse_svc["name"]):
|
||||
documented.append({"detected": svc, "dse": dse_svc, "status": "ok"})
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
"""
|
||||
DSR Art. 11 Service — handles "data subject not identifiable" rejections.
|
||||
|
||||
Art. 11 Abs. 1 DSGVO: If the controller is unable to identify the data
|
||||
subject, it is not obligated to obtain additional information solely to
|
||||
comply with Art. 15-20 requests.
|
||||
|
||||
Common scenario: Website visitor requests access, but only anonymous
|
||||
cookies/IP-hashes are stored — no way to link to a person.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from compliance.domain import ValidationError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DSRArt11Service:
|
||||
"""Handles Art. 11 DSGVO rejections for non-identifiable data subjects."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self._db = db
|
||||
|
||||
def reject_not_identifiable(
|
||||
self, dsr_id: str, tenant_id: str, notes: str = "",
|
||||
) -> Dict[str, Any]:
|
||||
"""Reject DSR because data subject cannot be identified."""
|
||||
from compliance.db.dsr_models import DSRRequestDB
|
||||
from compliance.services.dsr_workflow_service import _dsr_to_dict, _record_history
|
||||
|
||||
dsr = (
|
||||
self._db.query(DSRRequestDB)
|
||||
.filter(DSRRequestDB.id == dsr_id, DSRRequestDB.tenant_id == tenant_id)
|
||||
.first()
|
||||
)
|
||||
if not dsr:
|
||||
raise ValidationError("DSR not found")
|
||||
if dsr.status in ("completed", "rejected", "cancelled"):
|
||||
raise ValidationError("DSR already closed")
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
reason = (
|
||||
"Die bei uns gespeicherten Daten (anonymisierte Cookies, IP-Hashes, "
|
||||
"Device-Fingerprints) erlauben keine Identifikation der betroffenen Person. "
|
||||
"Gemaess Art. 11 Abs. 1 DSGVO sind wir nicht verpflichtet, zusaetzliche "
|
||||
"Informationen zu erheben, um die betroffene Person zu identifizieren."
|
||||
)
|
||||
if notes:
|
||||
reason += f" Ergaenzung: {notes}"
|
||||
|
||||
_record_history(self._db, dsr, "rejected",
|
||||
comment="Art. 11 DSGVO — Identifikation nicht moeglich")
|
||||
dsr.status = "rejected"
|
||||
dsr.rejection_reason = reason
|
||||
dsr.rejection_legal_basis = "Art. 11 Abs. 1 DSGVO"
|
||||
dsr.identity_verified = False
|
||||
dsr.verification_method = "art11_not_identifiable"
|
||||
dsr.verification_notes = "Daten erlauben keine Identifikation der betroffenen Person"
|
||||
dsr.completed_at = now
|
||||
dsr.updated_at = now
|
||||
self._db.commit()
|
||||
self._db.refresh(dsr)
|
||||
|
||||
# Send rejection notification
|
||||
self._send_art11_notification(dsr)
|
||||
|
||||
return _dsr_to_dict(dsr)
|
||||
|
||||
def _send_art11_notification(self, dsr: Any) -> None:
|
||||
if not dsr.requester_email:
|
||||
return
|
||||
try:
|
||||
from compliance.services.email_delivery_service import EmailDeliveryService
|
||||
delivery = EmailDeliveryService(self._db)
|
||||
variables = {
|
||||
"requester_name": dsr.requester_name or "Antragsteller/in",
|
||||
"reference_number": dsr.request_number or "",
|
||||
"rejection_reason": "Identifikation nicht moeglich — Art. 11 Abs. 1 DSGVO",
|
||||
"legal_basis": "Art. 11 Abs. 1 DSGVO",
|
||||
"sender_name": "Datenschutzbeauftragter",
|
||||
}
|
||||
# Use published dsr_rejection template, fallback to inline
|
||||
delivery.send(
|
||||
tenant_id=str(dsr.tenant_id),
|
||||
template_type="dsr_rejection",
|
||||
recipient=dsr.requester_email,
|
||||
variables=variables,
|
||||
fallback_subject=f"Zu Ihrer Anfrage {dsr.request_number} — Art. 11 DSGVO",
|
||||
fallback_html=f"""<p>Sehr geehrte/r {dsr.requester_name or 'Antragsteller/in'},</p>
|
||||
<p>wir koennen die bei uns gespeicherten Daten keiner identifizierbaren Person zuordnen.
|
||||
Gemaess Art. 11 Abs. 1 DSGVO ist eine Auskunftserteilung nicht moeglich.</p>
|
||||
<p>Mit freundlichen Gruessen<br/>Datenschutzbeauftragter</p>""",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Art. 11 notification failed: %s", e)
|
||||
@@ -0,0 +1,273 @@
|
||||
"""
|
||||
DSR User Data Export Service — aggregates all CMP data about a user.
|
||||
|
||||
Supports Art. 15 (access right, PDF) and Art. 20 (data portability, JSON/CSV).
|
||||
Collects from: Banner Consents, Einwilligungen, Consent Audit Trail, DSR History.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from reportlab.lib import colors
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.lib.units import mm
|
||||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from compliance.services.banner_dsr_service import BannerDSRService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PURPLE = colors.HexColor("#7c3aed")
|
||||
LIGHT_PURPLE = colors.HexColor("#f5f3ff")
|
||||
GRAY = colors.HexColor("#6b7280")
|
||||
|
||||
|
||||
class DSRExportService:
|
||||
"""Aggregates and exports all user data stored in the CMP."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
def aggregate_user_data(self, tenant_id: str, email: str) -> dict[str, Any]:
|
||||
"""Collect ALL data about a user from all CMP sources."""
|
||||
now = datetime.now(timezone.utc)
|
||||
tid = tenant_id # Keep as string — let PostgreSQL cast
|
||||
|
||||
# 1. Banner consents + audit trail
|
||||
banner_data: dict[str, Any] = {"banner_consents": [], "audit_trail": []}
|
||||
try:
|
||||
banner_svc = BannerDSRService(self.db)
|
||||
banner_data = banner_svc.export_for_dsr(tenant_id, email)
|
||||
except Exception as e:
|
||||
logger.warning("Banner DSR export failed: %s", e)
|
||||
try:
|
||||
self.db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2. Einwilligungen (user-based consents)
|
||||
einwilligungen: list[dict] = []
|
||||
try:
|
||||
q = text("""
|
||||
SELECT c.id, c.data_point_id, c.granted, c.granted_at, c.revoked_at,
|
||||
c.consent_version, c.source, c.ip_address, c.user_agent, c.created_at
|
||||
FROM compliance_einwilligungen_consents c
|
||||
WHERE c.tenant_id = CAST(:tid AS VARCHAR) AND c.user_id = :email
|
||||
ORDER BY c.created_at DESC
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tid, "email": email}).fetchall()
|
||||
for r in rows:
|
||||
entry = dict(r._mapping)
|
||||
for k, v in entry.items():
|
||||
if isinstance(v, datetime):
|
||||
entry[k] = v.isoformat()
|
||||
elif isinstance(v, uuid.UUID):
|
||||
entry[k] = str(v)
|
||||
# Get history
|
||||
hist_q = text("""
|
||||
SELECT action, consent_version, ip_address, user_agent, source, created_at
|
||||
FROM compliance_einwilligungen_consent_history
|
||||
WHERE consent_id = :cid ORDER BY created_at
|
||||
""")
|
||||
hist = self.db.execute(hist_q, {"cid": entry["id"]}).fetchall()
|
||||
entry["history"] = [
|
||||
{k: (v.isoformat() if isinstance(v, datetime) else str(v) if isinstance(v, uuid.UUID) else v)
|
||||
for k, v in dict(h._mapping).items()}
|
||||
for h in hist
|
||||
]
|
||||
einwilligungen.append(entry)
|
||||
except Exception as e:
|
||||
logger.warning("Einwilligungen export failed: %s", e)
|
||||
try:
|
||||
self.db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 3. DSR requests by this user
|
||||
dsr_requests: list[dict] = []
|
||||
try:
|
||||
q = text("""
|
||||
SELECT id, request_number, request_type, status, received_at, deadline_at, completed_at
|
||||
FROM compliance_dsr_requests
|
||||
WHERE tenant_id = :tid AND requester_email = :email
|
||||
ORDER BY received_at DESC
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tid, "email": email}).fetchall()
|
||||
for r in rows:
|
||||
entry = dict(r._mapping)
|
||||
for k, v in entry.items():
|
||||
if isinstance(v, datetime):
|
||||
entry[k] = v.isoformat()
|
||||
elif isinstance(v, uuid.UUID):
|
||||
entry[k] = str(v)
|
||||
dsr_requests.append(entry)
|
||||
except Exception as e:
|
||||
logger.warning("DSR requests export failed: %s", e)
|
||||
try:
|
||||
self.db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"export_date": now.isoformat(),
|
||||
"data_subject": {"email": email},
|
||||
"banner_consents": banner_data.get("banner_consents", []),
|
||||
"consent_audit_trail": banner_data.get("audit_trail", []),
|
||||
"einwilligungen": einwilligungen,
|
||||
"dsr_requests": dsr_requests,
|
||||
"metadata": {
|
||||
"tenant_id": tenant_id,
|
||||
"data_categories": ["Banner-Consents", "Einwilligungen", "Audit-Trail", "DSR-Anfragen"],
|
||||
"legal_basis": "Art. 15 / Art. 20 DSGVO",
|
||||
},
|
||||
}
|
||||
|
||||
def export_json(self, tenant_id: str, email: str) -> tuple[bytes, str]:
|
||||
data = self.aggregate_user_data(tenant_id, email)
|
||||
data["metadata"]["export_format"] = "json"
|
||||
content = json.dumps(data, indent=2, ensure_ascii=False, default=str).encode("utf-8")
|
||||
return content, f"dsr-export-{email.split('@')[0]}.json"
|
||||
|
||||
def export_csv(self, tenant_id: str, email: str) -> tuple[bytes, str]:
|
||||
data = self.aggregate_user_data(tenant_id, email)
|
||||
buf = io.StringIO()
|
||||
writer = csv.writer(buf)
|
||||
writer.writerow(["Kategorie", "Schluessel", "Wert", "Zeitpunkt", "Quelle"])
|
||||
|
||||
# Banner consents
|
||||
for c in data.get("banner_consents", []):
|
||||
writer.writerow(["Banner-Consent", "site_id", c.get("site_id", ""), c.get("created_at", ""), "CMP"])
|
||||
writer.writerow(["Banner-Consent", "categories", ", ".join(c.get("categories", [])), c.get("updated_at", ""), "CMP"])
|
||||
writer.writerow(["Banner-Consent", "ip_hash", c.get("ip_hash", ""), c.get("created_at", ""), "CMP"])
|
||||
|
||||
# Audit trail
|
||||
for a in data.get("consent_audit_trail", []):
|
||||
writer.writerow(["Audit-Trail", a.get("action", ""), ", ".join(a.get("categories", [])), a.get("created_at", ""), "CMP"])
|
||||
|
||||
# Einwilligungen
|
||||
for e in data.get("einwilligungen", []):
|
||||
status = "Erteilt" if e.get("granted") else "Widerrufen"
|
||||
writer.writerow(["Einwilligung", e.get("data_point_id", ""), status, e.get("granted_at", ""), e.get("source", "")])
|
||||
|
||||
# DSR requests
|
||||
for d in data.get("dsr_requests", []):
|
||||
writer.writerow(["DSR-Anfrage", d.get("request_type", ""), d.get("status", ""), d.get("received_at", ""), ""])
|
||||
|
||||
content = buf.getvalue().encode("utf-8-sig") # BOM for Excel
|
||||
return content, f"dsr-export-{email.split('@')[0]}.csv"
|
||||
|
||||
def export_pdf(self, tenant_id: str, email: str) -> tuple[bytes, str]:
|
||||
data = self.aggregate_user_data(tenant_id, email)
|
||||
buf = io.BytesIO()
|
||||
doc = SimpleDocTemplate(buf, pagesize=A4, leftMargin=20 * mm, rightMargin=20 * mm, topMargin=25 * mm, bottomMargin=20 * mm)
|
||||
ss = getSampleStyleSheet()
|
||||
ss.add(ParagraphStyle("Title2", parent=ss["Title"], fontSize=20, textColor=PURPLE, spaceAfter=6))
|
||||
ss.add(ParagraphStyle("Section", parent=ss["Heading2"], fontSize=13, textColor=PURPLE, spaceBefore=10))
|
||||
ss.add(ParagraphStyle("Body2", parent=ss["Normal"], fontSize=9, leading=13))
|
||||
ss.add(ParagraphStyle("Small", parent=ss["Normal"], fontSize=8, textColor=GRAY))
|
||||
story: list = []
|
||||
|
||||
# Cover
|
||||
story.append(Paragraph("Datenauskunft gemaess Art. 15 DSGVO", ss["Title2"]))
|
||||
story.append(Paragraph(f"Betroffene Person: {email}", ss["Body2"]))
|
||||
story.append(Paragraph(f"Erstellt am: {data['export_date'][:10]}", ss["Small"]))
|
||||
story.append(Spacer(1, 8 * mm))
|
||||
|
||||
tbl_style = TableStyle([
|
||||
("BACKGROUND", (0, 0), (-1, 0), LIGHT_PURPLE),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), PURPLE),
|
||||
("FONTSIZE", (0, 0), (-1, -1), 8),
|
||||
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
|
||||
("VALIGN", (0, 0), (-1, -1), "TOP"),
|
||||
("TOPPADDING", (0, 0), (-1, -1), 3),
|
||||
("BOTTOMPADDING", (0, 0), (-1, -1), 3),
|
||||
])
|
||||
|
||||
# Section 1: Banner Consents
|
||||
consents = data.get("banner_consents", [])
|
||||
story.append(Paragraph(f"1. Banner-Consents ({len(consents)})", ss["Section"]))
|
||||
if consents:
|
||||
rows = [["Site", "Kategorien", "IP-Hash", "Erstellt", "Aktualisiert"]]
|
||||
for c in consents:
|
||||
rows.append([
|
||||
str(c.get("site_id", "")),
|
||||
", ".join(c.get("categories", [])),
|
||||
str(c.get("ip_hash", ""))[:12] + "...",
|
||||
str(c.get("created_at", ""))[:10],
|
||||
str(c.get("updated_at", ""))[:10],
|
||||
])
|
||||
t = Table(rows, colWidths=[30 * mm, 40 * mm, 30 * mm, 25 * mm, 25 * mm])
|
||||
t.setStyle(tbl_style)
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Keine Banner-Consents gespeichert.", ss["Body2"]))
|
||||
|
||||
# Section 2: Einwilligungen
|
||||
einw = data.get("einwilligungen", [])
|
||||
story.append(Paragraph(f"2. Einwilligungen ({len(einw)})", ss["Section"]))
|
||||
if einw:
|
||||
rows = [["Datenpunkt", "Status", "Erteilt am", "Widerrufen am", "IP-Adresse"]]
|
||||
for e in einw:
|
||||
rows.append([
|
||||
str(e.get("data_point_id", "")),
|
||||
"Erteilt" if e.get("granted") else "Widerrufen",
|
||||
str(e.get("granted_at", ""))[:10],
|
||||
str(e.get("revoked_at", ""))[:10] if e.get("revoked_at") else "-",
|
||||
str(e.get("ip_address", ""))[:15] if e.get("ip_address") else "-",
|
||||
])
|
||||
t = Table(rows, colWidths=[35 * mm, 25 * mm, 25 * mm, 25 * mm, 35 * mm])
|
||||
t.setStyle(tbl_style)
|
||||
story.append(t)
|
||||
else:
|
||||
story.append(Paragraph("Keine Einwilligungen gespeichert.", ss["Body2"]))
|
||||
|
||||
# Section 3: Audit Trail
|
||||
trail = data.get("consent_audit_trail", [])
|
||||
story.append(Paragraph(f"3. Consent-Audit-Trail ({len(trail)})", ss["Section"]))
|
||||
if trail:
|
||||
rows = [["Aktion", "Kategorien", "Datum"]]
|
||||
for a in trail[:50]: # Limit to 50 for PDF
|
||||
rows.append([
|
||||
str(a.get("action", "")),
|
||||
", ".join(a.get("categories", [])),
|
||||
str(a.get("created_at", ""))[:19],
|
||||
])
|
||||
t = Table(rows, colWidths=[40 * mm, 60 * mm, 45 * mm])
|
||||
t.setStyle(tbl_style)
|
||||
story.append(t)
|
||||
if len(trail) > 50:
|
||||
story.append(Paragraph(f"... und {len(trail) - 50} weitere Eintraege (im JSON-Export enthalten)", ss["Small"]))
|
||||
else:
|
||||
story.append(Paragraph("Kein Audit-Trail vorhanden.", ss["Body2"]))
|
||||
|
||||
# Section 4: DSR Requests
|
||||
dsrs = data.get("dsr_requests", [])
|
||||
story.append(Paragraph(f"4. Bisherige DSR-Anfragen ({len(dsrs)})", ss["Section"]))
|
||||
if dsrs:
|
||||
rows = [["Typ", "Status", "Eingegangen", "Abgeschlossen"]]
|
||||
for d in dsrs:
|
||||
rows.append([
|
||||
str(d.get("request_type", "")),
|
||||
str(d.get("status", "")),
|
||||
str(d.get("received_at", ""))[:10],
|
||||
str(d.get("completed_at", ""))[:10] if d.get("completed_at") else "-",
|
||||
])
|
||||
t = Table(rows, colWidths=[35 * mm, 30 * mm, 35 * mm, 35 * mm])
|
||||
t.setStyle(tbl_style)
|
||||
story.append(t)
|
||||
|
||||
# Footer
|
||||
story.append(Spacer(1, 15 * mm))
|
||||
story.append(Paragraph("Erstellt mit BreakPilot Compliance SDK | Art. 15 DSGVO Datenauskunft", ss["Small"]))
|
||||
|
||||
doc.build(story)
|
||||
return buf.getvalue(), f"dsr-export-{email.split('@')[0]}.pdf"
|
||||
@@ -0,0 +1,122 @@
|
||||
"""
|
||||
Email Template Delivery Service — the missing integration layer.
|
||||
|
||||
Combines: template loading → published version → variable rendering → SMTP → audit log.
|
||||
Used by DSR workflow, document reviews, and other modules that need to send
|
||||
templated emails.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from compliance.db.email_template_models import (
|
||||
EmailSendLogDB,
|
||||
EmailTemplateDB,
|
||||
EmailTemplateVersionDB,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _render(html: str, variables: dict[str, str]) -> str:
|
||||
"""Replace {{variable}} placeholders with values."""
|
||||
result = html
|
||||
for key, value in variables.items():
|
||||
result = result.replace(f"{{{{{key}}}}}", str(value))
|
||||
return result
|
||||
|
||||
|
||||
class EmailDeliveryService:
|
||||
"""Load template → render → send via SMTP → log."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
def get_published_version(
|
||||
self, tenant_id: str, template_type: str,
|
||||
) -> Optional[EmailTemplateVersionDB]:
|
||||
"""Get the latest published version of a template by type."""
|
||||
tid = uuid.UUID(tenant_id)
|
||||
template = (
|
||||
self.db.query(EmailTemplateDB)
|
||||
.filter(EmailTemplateDB.tenant_id == tid, EmailTemplateDB.template_type == template_type)
|
||||
.first()
|
||||
)
|
||||
if not template:
|
||||
return None
|
||||
return (
|
||||
self.db.query(EmailTemplateVersionDB)
|
||||
.filter(
|
||||
EmailTemplateVersionDB.template_id == template.id,
|
||||
EmailTemplateVersionDB.status == "published",
|
||||
)
|
||||
.order_by(EmailTemplateVersionDB.created_at.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
def send(
|
||||
self,
|
||||
tenant_id: str,
|
||||
template_type: str,
|
||||
recipient: str,
|
||||
variables: dict[str, str],
|
||||
fallback_subject: Optional[str] = None,
|
||||
fallback_html: Optional[str] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Send a templated email. Falls back to inline HTML if no published template.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant UUID string.
|
||||
template_type: E.g. 'dsr_receipt', 'dsr_completion'.
|
||||
recipient: Email address.
|
||||
variables: Dict of {{key}}: value for rendering.
|
||||
fallback_subject: Subject if no template found.
|
||||
fallback_html: HTML body if no template found.
|
||||
"""
|
||||
from compliance.services.smtp_sender import send_email
|
||||
|
||||
tid = uuid.UUID(tenant_id)
|
||||
version = self.get_published_version(tenant_id, template_type)
|
||||
|
||||
if version:
|
||||
subject = _render(version.subject, variables)
|
||||
body_html = _render(version.body_html, variables)
|
||||
version_id = version.id
|
||||
elif fallback_subject and fallback_html:
|
||||
subject = _render(fallback_subject, variables)
|
||||
body_html = _render(fallback_html, variables)
|
||||
version_id = None
|
||||
else:
|
||||
logger.warning("No published template for '%s' and no fallback provided", template_type)
|
||||
return {"success": False, "error": f"No template for {template_type}"}
|
||||
|
||||
result = send_email(recipient=recipient, subject=subject, body_html=body_html)
|
||||
|
||||
# Audit log
|
||||
try:
|
||||
log = EmailSendLogDB(
|
||||
tenant_id=tid,
|
||||
template_type=template_type,
|
||||
version_id=version_id,
|
||||
recipient=recipient,
|
||||
subject=subject,
|
||||
status=result.get("status", "unknown"),
|
||||
variables=variables,
|
||||
error_message=result.get("error"),
|
||||
)
|
||||
self.db.add(log)
|
||||
self.db.commit()
|
||||
except Exception as e:
|
||||
logger.warning("Failed to log email send: %s", e)
|
||||
|
||||
return {
|
||||
"success": result.get("status") == "sent",
|
||||
"template_type": template_type,
|
||||
"recipient": recipient,
|
||||
"subject": subject,
|
||||
"used_template": version is not None,
|
||||
"status": result.get("status"),
|
||||
}
|
||||
@@ -0,0 +1,179 @@
|
||||
"""
|
||||
Intake Extractor — derives UCCA intake flags from DETECTED SERVICES,
|
||||
not from website text content.
|
||||
|
||||
The actual data processing happens through APIs, scripts, and cookies —
|
||||
NOT through visible text on the page. A news website reporting about
|
||||
healthcare does NOT process health data.
|
||||
|
||||
Flags are derived deterministically from:
|
||||
1. Which third-party services are embedded (Google Analytics → tracking)
|
||||
2. Which payment providers are used (Stripe → payment_data)
|
||||
3. Which CDN/fonts are loaded (Google Fonts → cross_border_transfer)
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Service category → intake flags mapping
|
||||
# This is the ONLY source of truth for what a service implies
|
||||
SERVICE_TO_FLAGS: dict[str, dict[str, bool]] = {
|
||||
# Tracking & Analytics → personal_data + tracking
|
||||
"tracking": {
|
||||
"personal_data": True,
|
||||
"tracking": True,
|
||||
},
|
||||
# Marketing → marketing + tracking + third_party_sharing
|
||||
"marketing": {
|
||||
"personal_data": True,
|
||||
"tracking": True,
|
||||
"marketing": True,
|
||||
"third_party_sharing": True,
|
||||
},
|
||||
# Heatmap/Session Recording → tracking + profiling
|
||||
"heatmap": {
|
||||
"personal_data": True,
|
||||
"tracking": True,
|
||||
"profiling": True,
|
||||
},
|
||||
# Payment → payment_data
|
||||
"payment": {
|
||||
"personal_data": True,
|
||||
"payment_data": True,
|
||||
},
|
||||
# Chatbot → personal_data (user sends messages)
|
||||
"chatbot": {
|
||||
"personal_data": True,
|
||||
"customer_data": True,
|
||||
},
|
||||
# CRM → customer_data + profiling
|
||||
"crm": {
|
||||
"personal_data": True,
|
||||
"customer_data": True,
|
||||
"profiling": True,
|
||||
},
|
||||
# CDN from non-EU → cross_border_transfer (IP sent to US)
|
||||
"cdn": {
|
||||
"personal_data": True,
|
||||
},
|
||||
}
|
||||
|
||||
# Specific services with special flags
|
||||
SPECIFIC_SERVICE_FLAGS: dict[str, dict[str, bool]] = {
|
||||
"klarna": {"automated_decisions": True, "payment_data": True},
|
||||
"paypal": {"cross_border_transfer": True, "payment_data": True},
|
||||
"stripe": {"cross_border_transfer": True, "payment_data": True},
|
||||
"google_analytics": {"cross_border_transfer": True, "tracking": True},
|
||||
"facebook_pixel": {"cross_border_transfer": True, "marketing": True, "profiling": True},
|
||||
"hotjar": {"profiling": True, "tracking": True},
|
||||
"ms_clarity": {"cross_border_transfer": True, "profiling": True},
|
||||
"tiktok_pixel": {"cross_border_transfer": True, "marketing": True},
|
||||
"intercom": {"cross_border_transfer": True, "ai_usage": True},
|
||||
}
|
||||
|
||||
|
||||
def extract_intake_flags_from_services(detected_services: list[dict]) -> dict:
|
||||
"""Derive intake flags from detected third-party services.
|
||||
|
||||
This is deterministic and 100% accurate — if Google Analytics is
|
||||
embedded, tracking IS happening. No guessing needed.
|
||||
"""
|
||||
flags = {
|
||||
"personal_data": False,
|
||||
"customer_data": False,
|
||||
"payment_data": False,
|
||||
"location_data": False,
|
||||
"biometric_data": False,
|
||||
"minor_data": False,
|
||||
"health_data": False,
|
||||
"marketing": False,
|
||||
"profiling": False,
|
||||
"automated_decisions": False,
|
||||
"third_party_sharing": False,
|
||||
"cross_border_transfer": False,
|
||||
"tracking": False,
|
||||
"ai_usage": False,
|
||||
}
|
||||
|
||||
for svc in detected_services:
|
||||
category = svc.get("category", "other")
|
||||
service_id = svc.get("id", "")
|
||||
eu_adequate = svc.get("eu_adequate", True)
|
||||
|
||||
# Apply category-level flags
|
||||
cat_flags = SERVICE_TO_FLAGS.get(category, {})
|
||||
for key, value in cat_flags.items():
|
||||
if value:
|
||||
flags[key] = True
|
||||
|
||||
# Apply service-specific flags
|
||||
svc_flags = SPECIFIC_SERVICE_FLAGS.get(service_id, {})
|
||||
for key, value in svc_flags.items():
|
||||
if value:
|
||||
flags[key] = True
|
||||
|
||||
# Non-EU service → cross_border_transfer
|
||||
if not eu_adequate:
|
||||
flags["cross_border_transfer"] = True
|
||||
flags["third_party_sharing"] = True
|
||||
|
||||
# Any website with detected services processes personal data (IP at minimum)
|
||||
if detected_services:
|
||||
flags["personal_data"] = True
|
||||
|
||||
active = {k: v for k, v in flags.items() if v}
|
||||
logger.info("Intake flags from %d services: %s", len(detected_services), active)
|
||||
return flags
|
||||
|
||||
|
||||
# Keep backward compatibility
|
||||
async def extract_intake_flags(text: str) -> dict:
|
||||
"""DEPRECATED — use extract_intake_flags_from_services() instead.
|
||||
|
||||
This function used LLM to guess flags from text content.
|
||||
Text content does NOT represent actual data processing.
|
||||
"""
|
||||
logger.warning(
|
||||
"extract_intake_flags(text) called — DEPRECATED. "
|
||||
"Use extract_intake_flags_from_services(detected_services) instead."
|
||||
)
|
||||
# Return minimal flags — website exists = personal_data (IP)
|
||||
return {"personal_data": True, "tracking": False}
|
||||
|
||||
|
||||
def flags_to_ucca_intake(flags: dict) -> dict:
|
||||
"""Convert extracted flags to UCCA intake format."""
|
||||
return {
|
||||
"data_types": {
|
||||
"personal_data": flags.get("personal_data", False),
|
||||
"customer_data": flags.get("customer_data", False),
|
||||
"location_data": flags.get("location_data", False),
|
||||
"biometric_data": flags.get("biometric_data", False),
|
||||
"minor_data": flags.get("minor_data", False),
|
||||
"images": False,
|
||||
"audio": False,
|
||||
"financial_data": flags.get("payment_data", False),
|
||||
"employee_data": False,
|
||||
"article_9_data": flags.get("health_data", False) or flags.get("biometric_data", False),
|
||||
},
|
||||
"purpose": {
|
||||
"marketing": flags.get("marketing", False),
|
||||
"analytics": flags.get("tracking", False),
|
||||
"profiling": flags.get("profiling", False),
|
||||
"automation": flags.get("ai_usage", False),
|
||||
"customer_support": False,
|
||||
"evaluation_scoring": flags.get("automated_decisions", False),
|
||||
"decision_making": flags.get("automated_decisions", False),
|
||||
},
|
||||
"automation": "fully_automated" if flags.get("automated_decisions") else
|
||||
"partially_automated" if flags.get("ai_usage") else "manual",
|
||||
"outputs": {
|
||||
"recommendations_to_users": flags.get("profiling", False),
|
||||
"data_export": flags.get("cross_border_transfer", False),
|
||||
"legal_effects": flags.get("automated_decisions", False),
|
||||
},
|
||||
"hosting": {
|
||||
"region": "non_eu" if flags.get("cross_border_transfer") else "eu",
|
||||
},
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
"""
|
||||
Control Relevance Filter — filters out controls that are not relevant
|
||||
for the analyzed document based on keyword matching.
|
||||
|
||||
Prevents false positives like C_TRANSPARENCY being recommended when
|
||||
no AI usage is evident.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Top controls with their relevance conditions.
|
||||
# A control is only relevant if ANY keyword from 'requires_any' matches the text.
|
||||
# If 'requires_any' is empty, the control is always relevant.
|
||||
CONTROL_RELEVANCE: dict[str, dict] = {
|
||||
"C_TRANSPARENCY": {
|
||||
"description": "KI-Transparenz-Hinweis (Art. 52 AI Act)",
|
||||
"requires_any": [
|
||||
"künstliche intelligenz", "kuenstliche intelligenz",
|
||||
"artificial intelligence", "machine learning", "maschinelles lernen",
|
||||
"ki-gestützt", "ki-gestuetzt", "ai-powered", "ai system",
|
||||
"chatbot", "neural", "deep learning", "algorithmus", "algorithmen",
|
||||
"automatisierte entscheidung", "automated decision",
|
||||
],
|
||||
"reason": "Nur relevant wenn KI/ML tatsaechlich eingesetzt wird",
|
||||
},
|
||||
"C_DSFA_REQUIRED": {
|
||||
"description": "Datenschutz-Folgenabschaetzung durchfuehren",
|
||||
"requires_any": [
|
||||
"gesundheit", "biometrisch", "genetisch", "health", "biometric",
|
||||
"scoring", "profiling", "systematisch", "umfangreich",
|
||||
"videoüberwachung", "videoueberwachung", "kamera",
|
||||
"minderjährig", "minderjaehrig", "kinder",
|
||||
],
|
||||
"reason": "Nur bei hohem Risiko (Art. 9 Daten, Profiling, Ueberwachung)",
|
||||
},
|
||||
"C_ART22_INFO": {
|
||||
"description": "Info ueber automatisierte Einzelentscheidung (Art. 22 DSGVO)",
|
||||
"requires_any": [
|
||||
"automatisierte entscheidung", "automated decision", "scoring",
|
||||
"bonitaet", "kredit", "rating", "algorithmische entscheidung",
|
||||
"profiling", "klarna", "ratenzahlung",
|
||||
],
|
||||
"reason": "Nur bei automatisierten Einzelentscheidungen mit Rechtswirkung",
|
||||
},
|
||||
"C_DPO_REQUIRED": {
|
||||
"description": "Datenschutzbeauftragten bestellen",
|
||||
"requires_any": [], # Always relevant — empty means no filter
|
||||
"reason": "Generell relevant fuer Unternehmen",
|
||||
},
|
||||
"C_EXPLICIT_CONSENT": {
|
||||
"description": "Explizite Einwilligung einholen",
|
||||
"requires_any": [
|
||||
"cookie", "tracking", "analytics", "pixel", "marketing",
|
||||
"werbung", "newsletter", "remarketing", "retargeting",
|
||||
"einwilligung", "consent", "opt-in",
|
||||
],
|
||||
"reason": "Nur bei Tracking/Marketing das Einwilligung erfordert",
|
||||
},
|
||||
"C_CHILD_PROTECTION": {
|
||||
"description": "Besonderer Schutz fuer Minderdjaehrige",
|
||||
"requires_any": [
|
||||
"kinder", "minderjährig", "minderjaehrig", "jugend",
|
||||
"under 16", "unter 16", "schüler", "schueler", "child",
|
||||
],
|
||||
"reason": "Nur wenn Daten von Minderjaehrigen verarbeitet werden",
|
||||
},
|
||||
"C_THIRD_COUNTRY_SAFEGUARDS": {
|
||||
"description": "Drittlandtransfer absichern (Art. 44-49 DSGVO)",
|
||||
"requires_any": [
|
||||
"usa", "united states", "drittland", "drittst", "third countr",
|
||||
"standardvertragsklausel", "sccs", "binding corporate",
|
||||
"angemessenheitsbeschluss", "adequacy",
|
||||
"google", "meta", "facebook", "amazon", "microsoft", "apple",
|
||||
"cloudflare", "stripe", "paypal",
|
||||
],
|
||||
"reason": "Nur bei Datentransfer in Drittlaender",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def filter_controls(
|
||||
controls: list[str],
|
||||
source_text: str,
|
||||
intake_flags: dict | None = None,
|
||||
) -> list[str]:
|
||||
"""Filter controls based on relevance to the analyzed text.
|
||||
|
||||
Returns only controls that are relevant (keyword match or no filter defined).
|
||||
"""
|
||||
if not controls:
|
||||
return controls
|
||||
|
||||
text_lower = source_text.lower()
|
||||
filtered = []
|
||||
removed = []
|
||||
|
||||
for control in controls:
|
||||
# Extract control ID from string like "[C_TRANSPARENCY] Nutzer informieren..."
|
||||
control_id = _extract_control_id(control)
|
||||
|
||||
if control_id and control_id in CONTROL_RELEVANCE:
|
||||
rules = CONTROL_RELEVANCE[control_id]
|
||||
keywords = rules["requires_any"]
|
||||
|
||||
if not keywords:
|
||||
# No filter = always relevant
|
||||
filtered.append(control)
|
||||
continue
|
||||
|
||||
# Check if any keyword matches
|
||||
if any(kw in text_lower for kw in keywords):
|
||||
filtered.append(control)
|
||||
else:
|
||||
# Also check intake flags as fallback
|
||||
if intake_flags and _check_flags(control_id, intake_flags):
|
||||
filtered.append(control)
|
||||
else:
|
||||
removed.append((control_id, rules["reason"]))
|
||||
else:
|
||||
# Unknown control — keep it (don't filter what we don't understand)
|
||||
filtered.append(control)
|
||||
|
||||
if removed:
|
||||
logger.info(
|
||||
"Relevance filter removed %d controls: %s",
|
||||
len(removed),
|
||||
", ".join(f"{cid} ({reason})" for cid, reason in removed),
|
||||
)
|
||||
|
||||
return filtered
|
||||
|
||||
|
||||
def _extract_control_id(control: str) -> str | None:
|
||||
"""Extract control ID from '[C_XXX] description' format."""
|
||||
match = re.match(r"\[([A-Z_0-9]+)\]", control)
|
||||
return match.group(1) if match else None
|
||||
|
||||
|
||||
def _check_flags(control_id: str, flags: dict) -> bool:
|
||||
"""Check if intake flags make a control relevant."""
|
||||
flag_map = {
|
||||
"C_TRANSPARENCY": flags.get("ai_usage", False),
|
||||
"C_DSFA_REQUIRED": flags.get("health_data", False) or flags.get("biometric_data", False),
|
||||
"C_ART22_INFO": flags.get("automated_decisions", False),
|
||||
"C_EXPLICIT_CONSENT": flags.get("tracking", False) or flags.get("marketing", False),
|
||||
"C_CHILD_PROTECTION": flags.get("minor_data", False),
|
||||
"C_THIRD_COUNTRY_SAFEGUARDS": flags.get("cross_border_transfer", False),
|
||||
}
|
||||
return flag_map.get(control_id, False)
|
||||
@@ -0,0 +1,209 @@
|
||||
"""
|
||||
TCF 2.2 TC String Encoder — generates IAB Transparency & Consent strings.
|
||||
|
||||
Implements the TC String v2.2 format per IAB specification.
|
||||
The TC String is a base64url-encoded bitfield containing:
|
||||
- CMP metadata (ID, version, screen, consent language)
|
||||
- Purpose consents (12 standard IAB purposes)
|
||||
- Vendor consents (per IAB vendor ID)
|
||||
- Legitimate interest signals
|
||||
|
||||
Reference: https://github.com/InteractiveAdvertisingBureau/GDPR-Transparency-and-Consent-Framework
|
||||
|
||||
NOTE: This is a simplified encoder for CMP integration. For full GVL
|
||||
(Global Vendor List) support, integrate with the IAB GVL API.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
|
||||
# IAB TCF 2.2 Standard Purposes
|
||||
IAB_PURPOSES = {
|
||||
1: {"name": "Store and/or access information on a device", "name_de": "Informationen auf Geraet speichern/abrufen"},
|
||||
2: {"name": "Select basic ads", "name_de": "Einfache Anzeigen auswaehlen"},
|
||||
3: {"name": "Create a personalised ads profile", "name_de": "Personalisiertes Anzeigenprofil erstellen"},
|
||||
4: {"name": "Select personalised ads", "name_de": "Personalisierte Anzeigen auswaehlen"},
|
||||
5: {"name": "Create a personalised content profile", "name_de": "Personalisiertes Inhaltsprofil erstellen"},
|
||||
6: {"name": "Select personalised content", "name_de": "Personalisierte Inhalte auswaehlen"},
|
||||
7: {"name": "Measure ad performance", "name_de": "Anzeigen-Leistung messen"},
|
||||
8: {"name": "Measure content performance", "name_de": "Inhalte-Leistung messen"},
|
||||
9: {"name": "Apply market research to generate audience insights", "name_de": "Marktforschung fuer Zielgruppen"},
|
||||
10: {"name": "Develop and improve products", "name_de": "Produkte entwickeln und verbessern"},
|
||||
11: {"name": "Use limited data to select content", "name_de": "Eingeschraenkte Daten fuer Inhalte nutzen"},
|
||||
12: {"name": "Use limited data to select ads", "name_de": "Eingeschraenkte Daten fuer Anzeigen nutzen"},
|
||||
}
|
||||
|
||||
# IAB Special Features
|
||||
IAB_SPECIAL_FEATURES = {
|
||||
1: {"name": "Use precise geolocation data", "name_de": "Praezise Standortdaten verwenden"},
|
||||
2: {"name": "Actively scan device characteristics for identification", "name_de": "Geraetemerkmale aktiv scannen"},
|
||||
}
|
||||
|
||||
# Category-to-Purpose mapping (how our banner categories map to IAB purposes)
|
||||
CATEGORY_PURPOSE_MAP = {
|
||||
"necessary": [], # No consent needed
|
||||
"functional": [1, 11], # Device access + limited data for content
|
||||
"statistics": [1, 7, 8, 9, 10], # Device access + measurement + research
|
||||
"marketing": [1, 2, 3, 4, 5, 6, 7, 12], # Most purposes
|
||||
}
|
||||
|
||||
|
||||
def _int_to_bits(value: int, length: int) -> str:
|
||||
"""Convert integer to fixed-length bit string."""
|
||||
return bin(value)[2:].zfill(length)
|
||||
|
||||
|
||||
def _datetime_to_deciseconds(dt: datetime) -> int:
|
||||
"""Convert datetime to deciseconds since epoch (IAB format)."""
|
||||
epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)
|
||||
return int((dt - epoch).total_seconds() * 10)
|
||||
|
||||
|
||||
def _bits_to_base64url(bits: str) -> str:
|
||||
"""Convert bit string to base64url encoding (TC String format)."""
|
||||
# Pad to multiple of 8
|
||||
padding = (8 - len(bits) % 8) % 8
|
||||
bits += "0" * padding
|
||||
# Convert to bytes
|
||||
byte_array = bytearray()
|
||||
for i in range(0, len(bits), 8):
|
||||
byte_array.append(int(bits[i:i+8], 2))
|
||||
# Base64url encode (no padding)
|
||||
return base64.urlsafe_b64encode(bytes(byte_array)).rstrip(b"=").decode("ascii")
|
||||
|
||||
|
||||
class TCFEncoderService:
|
||||
"""Generates TC Strings per IAB TCF 2.2 specification."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cmp_id: int = 1,
|
||||
cmp_version: int = 1,
|
||||
consent_screen: int = 1,
|
||||
consent_language: str = "DE",
|
||||
):
|
||||
self.cmp_id = cmp_id
|
||||
self.cmp_version = cmp_version
|
||||
self.consent_screen = consent_screen
|
||||
self.consent_language = consent_language
|
||||
|
||||
def encode(
|
||||
self,
|
||||
purpose_consents: dict[int, bool],
|
||||
vendor_consents: dict[int, bool],
|
||||
purpose_li: dict[int, bool] | None = None,
|
||||
special_features: dict[int, bool] | None = None,
|
||||
) -> str:
|
||||
"""Generate a TC String from consent decisions.
|
||||
|
||||
Args:
|
||||
purpose_consents: {purpose_id: True/False} for purposes 1-12
|
||||
vendor_consents: {vendor_id: True/False} for IAB vendor IDs
|
||||
purpose_li: Legitimate interest signals per purpose
|
||||
special_features: Special feature opt-ins
|
||||
Returns:
|
||||
Base64url-encoded TC String
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
created = _datetime_to_deciseconds(now)
|
||||
updated = created
|
||||
|
||||
bits = ""
|
||||
# Core TC String v2 fields
|
||||
bits += _int_to_bits(2, 6) # Version (6 bits) = 2
|
||||
bits += _int_to_bits(created, 36) # Created (36 bits)
|
||||
bits += _int_to_bits(updated, 36) # LastUpdated (36 bits)
|
||||
bits += _int_to_bits(self.cmp_id, 12) # CmpId (12 bits)
|
||||
bits += _int_to_bits(self.cmp_version, 12) # CmpVersion (12 bits)
|
||||
bits += _int_to_bits(self.consent_screen, 6) # ConsentScreen (6 bits)
|
||||
|
||||
# ConsentLanguage (12 bits = 2 × 6-bit letters)
|
||||
lang = self.consent_language.upper()[:2]
|
||||
bits += _int_to_bits(ord(lang[0]) - ord("A"), 6)
|
||||
bits += _int_to_bits(ord(lang[1]) - ord("A"), 6)
|
||||
|
||||
# VendorListVersion (12 bits) — use 0 if not fetching GVL
|
||||
bits += _int_to_bits(0, 12)
|
||||
# TcfPolicyVersion (6 bits) = 4 for TCF 2.2
|
||||
bits += _int_to_bits(4, 6)
|
||||
# IsServiceSpecific (1 bit) = 1
|
||||
bits += "1"
|
||||
# UseNonStandardTexts (1 bit) = 0
|
||||
bits += "0"
|
||||
|
||||
# SpecialFeatureOptIns (12 bits)
|
||||
sf = special_features or {}
|
||||
for i in range(1, 13):
|
||||
bits += "1" if sf.get(i, False) else "0"
|
||||
|
||||
# PurposesConsent (24 bits)
|
||||
for i in range(1, 25):
|
||||
bits += "1" if purpose_consents.get(i, False) else "0"
|
||||
|
||||
# PurposesLITransparency (24 bits)
|
||||
li = purpose_li or {}
|
||||
for i in range(1, 25):
|
||||
bits += "1" if li.get(i, False) else "0"
|
||||
|
||||
# Purpose one treatment (1 bit) = 0, PublisherCC (12 bits) = DE
|
||||
bits += "0"
|
||||
bits += _int_to_bits(ord("D") - ord("A"), 6)
|
||||
bits += _int_to_bits(ord("E") - ord("A"), 6)
|
||||
|
||||
# Vendor consents — Range encoding
|
||||
max_vendor = max(vendor_consents.keys()) if vendor_consents else 0
|
||||
bits += _int_to_bits(max_vendor, 16) # MaxVendorId
|
||||
# Use bitfield encoding (simpler than range)
|
||||
bits += "0" # IsRangeEncoding = 0 (bitfield)
|
||||
for i in range(1, max_vendor + 1):
|
||||
bits += "1" if vendor_consents.get(i, False) else "0"
|
||||
|
||||
# Vendor legitimate interests (same pattern)
|
||||
bits += _int_to_bits(max_vendor, 16)
|
||||
bits += "0"
|
||||
for i in range(1, max_vendor + 1):
|
||||
bits += "1" if vendor_consents.get(i, False) else "0" # Simplified: same as consent
|
||||
|
||||
return _bits_to_base64url(bits)
|
||||
|
||||
def encode_from_categories(
|
||||
self,
|
||||
categories: list[str],
|
||||
vendor_consents: dict[int, bool] | None = None,
|
||||
) -> str:
|
||||
"""Generate TC String from banner category selections.
|
||||
|
||||
Maps our banner categories (necessary, statistics, marketing, functional)
|
||||
to IAB purposes and generates the TC String.
|
||||
"""
|
||||
purpose_consents: dict[int, bool] = {}
|
||||
for cat in categories:
|
||||
for purpose_id in CATEGORY_PURPOSE_MAP.get(cat, []):
|
||||
purpose_consents[purpose_id] = True
|
||||
|
||||
return self.encode(
|
||||
purpose_consents=purpose_consents,
|
||||
vendor_consents=vendor_consents or {},
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_purposes() -> list[dict[str, Any]]:
|
||||
"""Return all 12 IAB purposes with translations."""
|
||||
return [
|
||||
{"id": pid, "name": info["name"], "name_de": info["name_de"]}
|
||||
for pid, info in IAB_PURPOSES.items()
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def get_special_features() -> list[dict[str, Any]]:
|
||||
return [
|
||||
{"id": fid, "name": info["name"], "name_de": info["name_de"]}
|
||||
for fid, info in IAB_SPECIAL_FEATURES.items()
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def get_category_purpose_map() -> dict[str, list[int]]:
|
||||
return CATEGORY_PURPOSE_MAP
|
||||
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
Training Link Service — bridges document review approvals with the Academy.
|
||||
|
||||
After a document is approved, checks which roles need training on that
|
||||
document type and identifies gaps (missing/overdue assignments).
|
||||
|
||||
Gracefully handles missing training tables (Go service not migrated yet).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TrainingLinkService:
|
||||
"""Links document approvals to training requirements."""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
|
||||
def _training_tables_exist(self) -> bool:
|
||||
"""Check if the Go-managed training tables exist."""
|
||||
try:
|
||||
self.db.execute(text("SELECT 1 FROM training_modules LIMIT 0"))
|
||||
return True
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
return False
|
||||
|
||||
def get_role_codes_for_document(self, tenant_id: str, document_type: str) -> list[dict]:
|
||||
"""Map document type → org roles → training role codes."""
|
||||
try:
|
||||
q = text("""
|
||||
SELECT m.role_key, t.training_role_code
|
||||
FROM compliance_document_role_mapping m
|
||||
LEFT JOIN compliance_role_training_mapping t
|
||||
ON t.org_role_key = m.role_key
|
||||
AND (t.tenant_id = :tid OR t.tenant_id = '__default__')
|
||||
WHERE m.tenant_id = :tid OR m.tenant_id = '__default__'
|
||||
AND m.document_type = :dt
|
||||
""")
|
||||
rows = self.db.execute(q, {"tid": tenant_id, "dt": document_type}).fetchall()
|
||||
return [{"role_key": r.role_key, "training_role_code": r.training_role_code} for r in rows]
|
||||
except Exception as e:
|
||||
logger.warning("Failed to get role codes: %s", e)
|
||||
return []
|
||||
|
||||
def get_training_requirements(self, tenant_id: str, document_type: str) -> dict[str, Any]:
|
||||
"""Get training modules required for roles associated with a document type."""
|
||||
if not self._training_tables_exist():
|
||||
return {
|
||||
"academy_available": False,
|
||||
"message": "Academy noch nicht eingerichtet. Training-Module werden nach Aktivierung automatisch verknuepft.",
|
||||
"requirements": [],
|
||||
}
|
||||
|
||||
role_mappings = self.get_role_codes_for_document(tenant_id, document_type)
|
||||
if not role_mappings:
|
||||
return {"academy_available": True, "message": "Keine Rollen-Zuordnung fuer diesen Dokumenttyp.", "requirements": []}
|
||||
|
||||
role_codes = [r["training_role_code"] for r in role_mappings if r.get("training_role_code")]
|
||||
if not role_codes:
|
||||
return {"academy_available": True, "message": "Keine Training-Codes konfiguriert.", "requirements": []}
|
||||
|
||||
try:
|
||||
placeholders = ",".join(f":rc{i}" for i in range(len(role_codes)))
|
||||
params: dict[str, Any] = {"tid": tenant_id}
|
||||
for i, rc in enumerate(role_codes):
|
||||
params[f"rc{i}"] = rc
|
||||
|
||||
q = text(f"""
|
||||
SELECT tm.role_code, m.module_code, m.title, m.description,
|
||||
m.frequency_type, m.duration_minutes, tm.is_mandatory
|
||||
FROM training_matrix tm
|
||||
JOIN training_modules m ON m.id = tm.module_id
|
||||
WHERE tm.tenant_id = :tid AND tm.role_code IN ({placeholders})
|
||||
AND m.is_active = TRUE
|
||||
ORDER BY tm.role_code, m.sort_order
|
||||
""")
|
||||
rows = self.db.execute(q, params).fetchall()
|
||||
reqs = [dict(r._mapping) for r in rows]
|
||||
return {"academy_available": True, "requirements": reqs, "total": len(reqs)}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to query training requirements: %s", e)
|
||||
return {"academy_available": True, "requirements": [], "error": str(e)}
|
||||
|
||||
def check_training_gaps(
|
||||
self, tenant_id: str, document_type: str, project_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Check which persons assigned to roles have outstanding training."""
|
||||
if not self._training_tables_exist():
|
||||
return {"academy_available": False, "gaps": [], "total_gaps": 0}
|
||||
|
||||
role_mappings = self.get_role_codes_for_document(tenant_id, document_type)
|
||||
if not role_mappings:
|
||||
return {"academy_available": True, "gaps": [], "total_gaps": 0}
|
||||
|
||||
gaps = []
|
||||
for rm in role_mappings:
|
||||
role_key = rm["role_key"]
|
||||
role_code = rm.get("training_role_code")
|
||||
if not role_code:
|
||||
continue
|
||||
|
||||
# Get person assigned to this role
|
||||
where = "tenant_id = :tid AND role_key = :rk"
|
||||
params: dict[str, Any] = {"tid": tenant_id, "rk": role_key}
|
||||
if project_id:
|
||||
where += " AND (project_id = :pid OR project_id IS NULL)"
|
||||
params["pid"] = project_id
|
||||
|
||||
try:
|
||||
person = self.db.execute(text(
|
||||
f"SELECT person_name, person_email, role_label FROM compliance_org_roles WHERE {where} LIMIT 1"
|
||||
), params).fetchone()
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not person or not person.person_name:
|
||||
continue
|
||||
|
||||
# Get required modules for this role code
|
||||
try:
|
||||
modules = self.db.execute(text("""
|
||||
SELECT m.id, m.module_code, m.title FROM training_matrix tm
|
||||
JOIN training_modules m ON m.id = tm.module_id
|
||||
WHERE tm.tenant_id = :tid AND tm.role_code = :rc AND m.is_active = TRUE AND tm.is_mandatory = TRUE
|
||||
"""), {"tid": tenant_id, "rc": role_code}).fetchall()
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
for mod in modules:
|
||||
# Check if assignment exists and is completed
|
||||
try:
|
||||
assignment = self.db.execute(text("""
|
||||
SELECT status, progress_percent FROM training_assignments
|
||||
WHERE tenant_id = :tid AND module_id = :mid AND user_email = :email
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
"""), {"tid": tenant_id, "mid": mod.id, "email": person.person_email}).fetchone()
|
||||
except Exception:
|
||||
assignment = None
|
||||
|
||||
if not assignment or assignment.status not in ("completed", "passed"):
|
||||
gaps.append({
|
||||
"person_name": person.person_name,
|
||||
"person_email": person.person_email,
|
||||
"role": person.role_label,
|
||||
"role_key": role_key,
|
||||
"module_code": mod.module_code,
|
||||
"module_title": mod.title,
|
||||
"status": assignment.status if assignment else "nicht_begonnen",
|
||||
"progress": assignment.progress_percent if assignment else 0,
|
||||
})
|
||||
|
||||
return {"academy_available": True, "gaps": gaps, "total_gaps": len(gaps)}
|
||||
@@ -0,0 +1,148 @@
|
||||
"""
|
||||
Website Compliance Checks — checks public website for consumer protection
|
||||
compliance (§312k BGB, §5 TMG, Art. 13 DSGVO, Cookie-Banner).
|
||||
|
||||
Extracted from agent_analyze_routes.py to keep route files slim.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class FollowUpQuestion:
|
||||
def __init__(self, id: str, question: str, legal_basis: str, severity: str, finding_if_no: str):
|
||||
self.id = id
|
||||
self.question = question
|
||||
self.legal_basis = legal_basis
|
||||
self.severity = severity
|
||||
self.finding_if_no = finding_if_no
|
||||
|
||||
|
||||
async def check_website_compliance(
|
||||
client: httpx.AsyncClient, url: str, html: str,
|
||||
) -> tuple[list[str], list[FollowUpQuestion]]:
|
||||
"""Scan public website for consumer protection compliance."""
|
||||
findings: list[str] = []
|
||||
follow_ups: list[FollowUpQuestion] = []
|
||||
html_lower = html.lower()
|
||||
base_domain = re.sub(r"https?://([^/]+).*", r"\1", url)
|
||||
|
||||
# E-Commerce detection — §312k only applies to sites with online contracts
|
||||
ecommerce_indicators = [
|
||||
r"warenkorb", r"cart", r"shop", r"bestell", r"order",
|
||||
r"checkout", r"kasse", r"kaufen", r"add.?to.?cart",
|
||||
r"stripe|paypal|klarna|mollie|adyen",
|
||||
r"abo", r"mitgliedschaft", r"subscription", r"premium",
|
||||
]
|
||||
is_ecommerce = any(re.search(p, html_lower) for p in ecommerce_indicators)
|
||||
|
||||
# --- §312k BGB: Kündigungsbutton (NUR bei E-Commerce/Abo-Websites) ---
|
||||
cancel_patterns = [
|
||||
r'href="[^"]*(?:kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden)[^"]*"',
|
||||
r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
|
||||
]
|
||||
has_cancel_link = any(re.search(p, html_lower) for p in cancel_patterns)
|
||||
|
||||
cancel_urls_to_probe = [
|
||||
f"https://{base_domain}/kuendigen",
|
||||
f"https://{base_domain}/cancel",
|
||||
f"https://{base_domain}/vertrag-kuendigen",
|
||||
f"https://{base_domain}/abo-kuendigen",
|
||||
f"https://{base_domain}/account/cancel",
|
||||
]
|
||||
if not has_cancel_link:
|
||||
for probe_url in cancel_urls_to_probe:
|
||||
try:
|
||||
probe = await client.head(probe_url, follow_redirects=True, timeout=5.0)
|
||||
if probe.status_code < 400:
|
||||
has_cancel_link = True
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not has_cancel_link and is_ecommerce:
|
||||
findings.append(
|
||||
"[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
|
||||
"Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
|
||||
)
|
||||
follow_ups.append(FollowUpQuestion(
|
||||
id="cancel_button_312k",
|
||||
question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
|
||||
legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
|
||||
severity="high",
|
||||
finding_if_no=(
|
||||
"[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
|
||||
"Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
|
||||
"bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
|
||||
"oder Kuendigung per Brief ist rechtswidrig."
|
||||
),
|
||||
))
|
||||
|
||||
# --- Impressumspflicht (§5 TMG / §18 MStV) ---
|
||||
imprint_patterns = [
|
||||
r'href="[^"]*(?:impressum|imprint|legal.?notice|about.?us/legal)[^"]*"',
|
||||
r'>impressum<',
|
||||
]
|
||||
has_imprint = any(re.search(p, html_lower) for p in imprint_patterns)
|
||||
if not has_imprint:
|
||||
findings.append(
|
||||
"[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
|
||||
"Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
|
||||
)
|
||||
|
||||
# --- Datenschutzerklaerung verlinkt? ---
|
||||
privacy_patterns = [
|
||||
r'href="[^"]*(?:datenschutz|privacy|dsgvo)[^"]*"',
|
||||
r'>datenschutz<',
|
||||
]
|
||||
has_privacy = any(re.search(p, html_lower) for p in privacy_patterns)
|
||||
if not has_privacy:
|
||||
findings.append(
|
||||
"[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
|
||||
"Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
|
||||
)
|
||||
|
||||
# --- Cookie-Consent-Banner ---
|
||||
cookie_patterns = [
|
||||
r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
|
||||
r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
|
||||
]
|
||||
has_cookie_consent = any(re.search(p, html_lower) for p in cookie_patterns)
|
||||
if not has_cookie_consent:
|
||||
follow_ups.append(FollowUpQuestion(
|
||||
id="cookie_consent",
|
||||
question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
|
||||
legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
|
||||
severity="medium",
|
||||
finding_if_no=(
|
||||
"[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
|
||||
"Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
|
||||
),
|
||||
))
|
||||
|
||||
return findings, follow_ups
|
||||
|
||||
|
||||
def to_string_list(items: list) -> list[str]:
|
||||
"""Convert list of dicts or strings to list of strings."""
|
||||
result = []
|
||||
for item in (items or []):
|
||||
if isinstance(item, dict):
|
||||
desc = item.get("description", item.get("name", item.get("code", str(item))))
|
||||
code = item.get("code", item.get("id", ""))
|
||||
result.append(f"[{code}] {desc}" if code else str(desc))
|
||||
else:
|
||||
result.append(str(item))
|
||||
return result
|
||||
|
||||
|
||||
def risk_to_escalation(risk_level: str) -> str:
|
||||
"""Map UCCA risk level to escalation level."""
|
||||
mapping = {
|
||||
"MINIMAL": "E0",
|
||||
"LIMITED": "E1",
|
||||
"HIGH": "E2",
|
||||
"UNACCEPTABLE": "E3",
|
||||
}
|
||||
return mapping.get(risk_level.upper() if risk_level else "", "E0")
|
||||
@@ -40,107 +40,8 @@ class ScanResult:
|
||||
missing_pages: dict = field(default_factory=dict) # url -> status_code
|
||||
|
||||
|
||||
# ── Service Registry ──────────────────────────────────────────────────────────
|
||||
# Each entry: regex pattern -> service metadata
|
||||
SERVICE_REGISTRY: dict[str, dict] = {
|
||||
# --- Tracking & Analytics ---
|
||||
r"google.?analytics|gtag\(|UA-\d+|G-\w{5,}": {
|
||||
"id": "google_analytics", "name": "Google Analytics", "category": "tracking",
|
||||
"provider": "Google LLC", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, §25 TDDDG",
|
||||
},
|
||||
r"googletagmanager|gtm\.js": {
|
||||
"id": "google_tag_manager", "name": "Google Tag Manager", "category": "tracking",
|
||||
"provider": "Google LLC", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO",
|
||||
},
|
||||
r"facebook\.net/.*fbevents|fbq\(": {
|
||||
"id": "facebook_pixel", "name": "Meta/Facebook Pixel", "category": "marketing",
|
||||
"provider": "Meta Platforms", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, §25 TDDDG",
|
||||
},
|
||||
r"hotjar\.com|_hjSettings": {
|
||||
"id": "hotjar", "name": "Hotjar", "category": "tracking",
|
||||
"provider": "Hotjar Ltd", "country": "MT", "eu_adequate": True,
|
||||
"requires_consent": True, "legal_ref": "§25 TDDDG (Session Recording)",
|
||||
},
|
||||
r"clarity\.ms": {
|
||||
"id": "ms_clarity", "name": "Microsoft Clarity", "category": "tracking",
|
||||
"provider": "Microsoft", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "§25 TDDDG (Session Replay), Art. 44 DSGVO",
|
||||
},
|
||||
r"matomo|piwik": {
|
||||
"id": "matomo", "name": "Matomo", "category": "tracking",
|
||||
"provider": "InnoCraft/Self-hosted", "country": "EU/Self", "eu_adequate": True,
|
||||
"requires_consent": False, "legal_ref": "Cookieless moeglich, §25 TDDDG",
|
||||
},
|
||||
r"plausible\.io": {
|
||||
"id": "plausible", "name": "Plausible Analytics", "category": "tracking",
|
||||
"provider": "Plausible Insights", "country": "EE", "eu_adequate": True,
|
||||
"requires_consent": False, "legal_ref": "EU-Anbieter, cookieless",
|
||||
},
|
||||
# --- CDN & Fonts ---
|
||||
r"fonts\.googleapis\.com|fonts\.gstatic\.com": {
|
||||
"id": "google_fonts", "name": "Google Fonts (remote)", "category": "cdn",
|
||||
"provider": "Google LLC", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "LG Muenchen I, Az. 3 O 17493/20",
|
||||
},
|
||||
r"cdn\.cloudflare\.com|cdnjs\.cloudflare\.com": {
|
||||
"id": "cloudflare_cdn", "name": "Cloudflare CDN", "category": "cdn",
|
||||
"provider": "Cloudflare Inc", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": False, "legal_ref": "Art. 44-49 DSGVO, berechtigtes Interesse",
|
||||
},
|
||||
# --- Chatbots ---
|
||||
r"widget\.intercom\.io|intercomcdn": {
|
||||
"id": "intercom", "name": "Intercom", "category": "chatbot",
|
||||
"provider": "Intercom Inc", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, KI-gestuetzt",
|
||||
},
|
||||
r"tidio\.co|tidioChatApi": {
|
||||
"id": "tidio", "name": "Tidio Chat", "category": "chatbot",
|
||||
"provider": "Tidio LLC", "country": "PL", "eu_adequate": True,
|
||||
"requires_consent": False, "legal_ref": "EU-Anbieter",
|
||||
},
|
||||
r"zendesk\.com/embeddable|zdassets": {
|
||||
"id": "zendesk", "name": "Zendesk", "category": "chatbot",
|
||||
"provider": "Zendesk Inc", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO",
|
||||
},
|
||||
# --- Payment ---
|
||||
r"js\.stripe\.com|stripe\.com/v3": {
|
||||
"id": "stripe", "name": "Stripe", "category": "payment",
|
||||
"provider": "Stripe Inc", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": False, "legal_ref": "Art. 6(1)(b) Vertragserfuellung, SCCs",
|
||||
},
|
||||
r"paypal\.com/sdk|paypalobjects": {
|
||||
"id": "paypal", "name": "PayPal", "category": "payment",
|
||||
"provider": "PayPal Holdings", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": False, "legal_ref": "Art. 6(1)(b) Vertragserfuellung",
|
||||
},
|
||||
r"klarna\.com|klarna-payments": {
|
||||
"id": "klarna", "name": "Klarna", "category": "payment",
|
||||
"provider": "Klarna AB", "country": "SE", "eu_adequate": True,
|
||||
"requires_consent": False, "legal_ref": "EU, aber Art. 22 DSGVO bei Bonitaetspruefung!",
|
||||
},
|
||||
# --- Captcha ---
|
||||
r"recaptcha|grecaptcha": {
|
||||
"id": "recaptcha", "name": "Google reCAPTCHA", "category": "other",
|
||||
"provider": "Google LLC", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, §25 TDDDG",
|
||||
},
|
||||
# --- Video ---
|
||||
r"youtube\.com/embed|youtube-nocookie|ytimg": {
|
||||
"id": "youtube", "name": "YouTube", "category": "other",
|
||||
"provider": "Google LLC", "country": "US", "eu_adequate": False,
|
||||
"requires_consent": True, "legal_ref": "Art. 44-49 DSGVO, 2-Klick empfohlen",
|
||||
},
|
||||
# --- Consent Management ---
|
||||
r"didomi|cookiebot|onetrust|usercentrics|consentmanager|quantcast": {
|
||||
"id": "cmp", "name": "Consent Management Platform", "category": "other",
|
||||
"provider": "Various", "country": "EU", "eu_adequate": True,
|
||||
"requires_consent": False, "legal_ref": "CMP vorhanden — gut",
|
||||
},
|
||||
}
|
||||
# ── Service Registry (imported from master) ──────────────────────────────────
|
||||
from compliance.services.service_registry import SERVICE_REGISTRY # noqa: E402
|
||||
|
||||
AI_TEXT_PATTERNS = [
|
||||
r"k(?:ue|ü)nstliche.?intelligenz",
|
||||
@@ -157,9 +58,13 @@ AI_TEXT_PATTERNS = [
|
||||
|
||||
FOOTER_LINK_PATTERNS = [
|
||||
(r'href="([^"]*(?:impressum|imprint|legal-notice)[^"]*)"', "impressum"),
|
||||
(r'href="([^"]*(?:datenschutz|privacy|dsgvo)[^"]*)"', "datenschutz"),
|
||||
(r'href="([^"]*(?:datenschutz|privacy|dsgvo|hinweise.?zum.?datenschutz)[^"]*)"', "datenschutz"),
|
||||
(r'href="([^"]*(?:agb|terms|nutzungsbedingungen)[^"]*)"', "agb"),
|
||||
(r'href="([^"]*(?:cookie)[^"]*)"', "cookies"),
|
||||
# Deep DSE links (regional pages, sub-pages, service marks)
|
||||
(r'href="([^"]*(?:datenschutzinformation|datenschutzerklaerung|datenschutzerkl)[^"]*)"', "datenschutz_deep"),
|
||||
# Navigation links often contain DSB/privacy sub-pages
|
||||
(r'href="([^"]*(?:ueber.?uns.*datenschutz|servicemarken.*datenschutz|kontakt.*datenschutz)[^"]*)"', "datenschutz_nav"),
|
||||
]
|
||||
|
||||
|
||||
@@ -183,15 +88,46 @@ async def scan_website(base_url: str) -> ScanResult:
|
||||
href = match.group(1)
|
||||
if href.startswith("/"):
|
||||
href = urljoin(origin, href)
|
||||
if href.startswith(origin):
|
||||
if href.startswith(origin) and not re.search(r"\.(css|js|png|jpg|gif|svg|pdf|zip)(\?|$)", href):
|
||||
page_urls.add(href)
|
||||
|
||||
# 3. Scan all pages (max 10)
|
||||
for url in list(page_urls)[:10]:
|
||||
html = start_html if url == origin else await _fetch_page(client, url, result)
|
||||
if html:
|
||||
# 3. Scan all pages in PARALLEL (max 10)
|
||||
import asyncio
|
||||
other_urls = [u for u in list(page_urls)[:10] if u != origin]
|
||||
fetch_tasks = [_fetch_page(client, u, result) for u in other_urls]
|
||||
other_htmls = await asyncio.gather(*fetch_tasks, return_exceptions=True)
|
||||
|
||||
# Process start page
|
||||
_detect_services(start_html, origin, result)
|
||||
_detect_ai_mentions(start_html, origin, result)
|
||||
|
||||
# Process other pages + discover DSE-internal links
|
||||
dse_internal_urls = set()
|
||||
for url, html in zip(other_urls, other_htmls):
|
||||
if isinstance(html, str) and html:
|
||||
_detect_services(html, url, result)
|
||||
_detect_ai_mentions(html, url, result)
|
||||
# If this is a DSE page, find links within it (SAME DOMAIN only)
|
||||
if re.search(r"datenschutz|privacy|dsgvo", url, re.IGNORECASE):
|
||||
for pattern, _ in FOOTER_LINK_PATTERNS:
|
||||
for match in re.finditer(pattern, html, re.IGNORECASE):
|
||||
href = match.group(1)
|
||||
if href.startswith("/"):
|
||||
href = urljoin(origin, href)
|
||||
# IMPORTANT: Only follow links on the SAME domain
|
||||
# External links (etracker.com, google.de) must NOT be scanned
|
||||
if href.startswith(origin) and href not in page_urls:
|
||||
dse_internal_urls.add(href)
|
||||
|
||||
# 4. Follow DSE-internal links (additional pages linked from privacy policy)
|
||||
if dse_internal_urls:
|
||||
extra_urls = [u for u in list(dse_internal_urls)[:5] if u not in page_urls]
|
||||
if extra_urls:
|
||||
extra_tasks = [_fetch_page(client, u, result) for u in extra_urls]
|
||||
extra_htmls = await asyncio.gather(*extra_tasks, return_exceptions=True)
|
||||
for url, html in zip(extra_urls, extra_htmls):
|
||||
if isinstance(html, str) and html:
|
||||
_detect_services(html, url, result)
|
||||
|
||||
# Deduplicate services
|
||||
seen = set()
|
||||
|
||||
Reference in New Issue
Block a user