refactor: split agent_analyze_routes (420→309 LOC) + agent docs + migration

- Extracted website compliance checks + helpers to website_compliance_checks.py
- Created agent documentation (zeroclaw/docs/compliance-agent.md)
- DB migration 086 executed (compliance_agent_scans table)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-02 08:22:52 +02:00
parent 6864849115
commit e318215cc5
3 changed files with 261 additions and 119 deletions
@@ -17,6 +17,12 @@ from pydantic import BaseModel
from compliance.services.smtp_sender import send_email
from compliance.services.intake_extractor import extract_intake_flags, flags_to_ucca_intake
from compliance.services.relevance_filter import filter_controls
from compliance.services.website_compliance_checks import (
check_website_compliance as _check_website_compliance,
FollowUpQuestion,
to_string_list as _to_string_list,
risk_to_escalation as _risk_to_escalation,
)
logger = logging.getLogger(__name__)
@@ -222,126 +228,9 @@ async def _assess(client: httpx.AsyncClient, text: str, classification: str, int
return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
async def _check_website_compliance(
client: httpx.AsyncClient, url: str, html: str,
) -> tuple[list[str], list[FollowUpQuestion]]:
"""Scan public website for consumer protection compliance (§312k BGB etc.)."""
findings: list[str] = []
follow_ups: list[FollowUpQuestion] = []
html_lower = html.lower()
base_domain = re.sub(r"https?://([^/]+).*", r"\1", url)
# --- §312k BGB: Kündigungsbutton ---
cancel_patterns = [
r'href="[^"]*(?:kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden)[^"]*"',
r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
]
has_cancel_link = any(re.search(p, html_lower) for p in cancel_patterns)
# Also check common cancel URLs
cancel_urls_to_probe = [
f"https://{base_domain}/kuendigen",
f"https://{base_domain}/cancel",
f"https://{base_domain}/vertrag-kuendigen",
f"https://{base_domain}/abo-kuendigen",
f"https://{base_domain}/account/cancel",
]
if not has_cancel_link:
for probe_url in cancel_urls_to_probe:
try:
probe = await client.head(probe_url, follow_redirects=True, timeout=5.0)
if probe.status_code < 400:
has_cancel_link = True
break
except Exception:
continue
if not has_cancel_link:
findings.append(
"[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
"Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
)
follow_ups.append(FollowUpQuestion(
id="cancel_button_312k",
question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
severity="high",
finding_if_no=(
"[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
"Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
"bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
"oder Kuendigung per Brief ist rechtswidrig."
),
))
# --- Impressumspflicht (§5 TMG / §18 MStV) ---
imprint_patterns = [
r'href="[^"]*(?:impressum|imprint|legal.?notice|about.?us/legal)[^"]*"',
r'>impressum<',
]
has_imprint = any(re.search(p, html_lower) for p in imprint_patterns)
if not has_imprint:
findings.append(
"[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
"Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
)
# --- Datenschutzerklaerung verlinkt? ---
privacy_patterns = [
r'href="[^"]*(?:datenschutz|privacy|dsgvo)[^"]*"',
r'>datenschutz<',
]
has_privacy = any(re.search(p, html_lower) for p in privacy_patterns)
if not has_privacy:
findings.append(
"[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
"Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
)
# --- Cookie-Consent-Banner ---
cookie_patterns = [
r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
]
has_cookie_consent = any(re.search(p, html_lower) for p in cookie_patterns)
if not has_cookie_consent:
follow_ups.append(FollowUpQuestion(
id="cookie_consent",
question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
severity="medium",
finding_if_no=(
"[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
"Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
),
))
return findings, follow_ups
def _to_string_list(items: list) -> list[str]:
"""Convert list of dicts or strings to list of strings."""
result = []
for item in (items or []):
if isinstance(item, dict):
# UCCA returns {code, category, description} or {id, name, description}
desc = item.get("description", item.get("name", item.get("code", str(item))))
code = item.get("code", item.get("id", ""))
result.append(f"[{code}] {desc}" if code else str(desc))
else:
result.append(str(item))
return result
def _risk_to_escalation(risk_level: str) -> str:
"""Map UCCA risk level to escalation level."""
mapping = {
"MINIMAL": "E0",
"LIMITED": "E1",
"HIGH": "E2",
"UNACCEPTABLE": "E3",
}
return mapping.get(risk_level.upper() if risk_level else "", "E0")
# _check_website_compliance, _to_string_list, _risk_to_escalation
# → extracted to compliance/services/website_compliance_checks.py
DOC_TYPE_LABELS = {
@@ -0,0 +1,139 @@
"""
Website Compliance Checks — checks public website for consumer protection
compliance (§312k BGB, §5 TMG, Art. 13 DSGVO, Cookie-Banner).
Extracted from agent_analyze_routes.py to keep route files slim.
"""
import re
import httpx
class FollowUpQuestion:
def __init__(self, id: str, question: str, legal_basis: str, severity: str, finding_if_no: str):
self.id = id
self.question = question
self.legal_basis = legal_basis
self.severity = severity
self.finding_if_no = finding_if_no
async def check_website_compliance(
client: httpx.AsyncClient, url: str, html: str,
) -> tuple[list[str], list[FollowUpQuestion]]:
"""Scan public website for consumer protection compliance."""
findings: list[str] = []
follow_ups: list[FollowUpQuestion] = []
html_lower = html.lower()
base_domain = re.sub(r"https?://([^/]+).*", r"\1", url)
# --- §312k BGB: Kündigungsbutton ---
cancel_patterns = [
r'href="[^"]*(?:kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden)[^"]*"',
r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
]
has_cancel_link = any(re.search(p, html_lower) for p in cancel_patterns)
cancel_urls_to_probe = [
f"https://{base_domain}/kuendigen",
f"https://{base_domain}/cancel",
f"https://{base_domain}/vertrag-kuendigen",
f"https://{base_domain}/abo-kuendigen",
f"https://{base_domain}/account/cancel",
]
if not has_cancel_link:
for probe_url in cancel_urls_to_probe:
try:
probe = await client.head(probe_url, follow_redirects=True, timeout=5.0)
if probe.status_code < 400:
has_cancel_link = True
break
except Exception:
continue
if not has_cancel_link:
findings.append(
"[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
"Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
)
follow_ups.append(FollowUpQuestion(
id="cancel_button_312k",
question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
severity="high",
finding_if_no=(
"[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
"Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
"bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
"oder Kuendigung per Brief ist rechtswidrig."
),
))
# --- Impressumspflicht (§5 TMG / §18 MStV) ---
imprint_patterns = [
r'href="[^"]*(?:impressum|imprint|legal.?notice|about.?us/legal)[^"]*"',
r'>impressum<',
]
has_imprint = any(re.search(p, html_lower) for p in imprint_patterns)
if not has_imprint:
findings.append(
"[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
"Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
)
# --- Datenschutzerklaerung verlinkt? ---
privacy_patterns = [
r'href="[^"]*(?:datenschutz|privacy|dsgvo)[^"]*"',
r'>datenschutz<',
]
has_privacy = any(re.search(p, html_lower) for p in privacy_patterns)
if not has_privacy:
findings.append(
"[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
"Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
)
# --- Cookie-Consent-Banner ---
cookie_patterns = [
r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
]
has_cookie_consent = any(re.search(p, html_lower) for p in cookie_patterns)
if not has_cookie_consent:
follow_ups.append(FollowUpQuestion(
id="cookie_consent",
question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
severity="medium",
finding_if_no=(
"[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
"Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
),
))
return findings, follow_ups
def to_string_list(items: list) -> list[str]:
"""Convert list of dicts or strings to list of strings."""
result = []
for item in (items or []):
if isinstance(item, dict):
desc = item.get("description", item.get("name", item.get("code", str(item))))
code = item.get("code", item.get("id", ""))
result.append(f"[{code}] {desc}" if code else str(desc))
else:
result.append(str(item))
return result
def risk_to_escalation(risk_level: str) -> str:
"""Map UCCA risk level to escalation level."""
mapping = {
"MINIMAL": "E0",
"LIMITED": "E1",
"HIGH": "E2",
"UNACCEPTABLE": "E3",
}
return mapping.get(risk_level.upper() if risk_level else "", "E0")