feat: hybrid website compliance checks (§312k BGB, §5 TMG, Art. 13 DSGVO)
- Scan public website for cancellation button, imprint, privacy link, cookie consent
- Generate follow-up questions when checks can't be verified without login
- User answers "no" → finding with legal basis is added to results
- Frontend: FollowUpQuestions component with Ja/Nein buttons
- Sidebar: "Compliance Agent" entry added under KI-Compliance

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -43,6 +43,14 @@ class AnalyzeRequest(BaseModel):
|
||||
recipient: str = "dsb@breakpilot.local"
|
||||
|
||||
|
||||
class FollowUpQuestion(BaseModel):
    """Yes/no question returned when a check cannot be verified from the public page.

    If the user answers "no", ``finding_if_no`` is the finding text to add
    to the results.
    """

    # Stable identifier of the question (e.g. "cancel_button_312k").
    id: str
    # Question text shown to the user.
    question: str
    # Legal provision(s) the question relates to.
    legal_basis: str
    # One of "high", "medium", "low".
    severity: str
    # Finding text if the user answers "no".
    finding_if_no: str
|
||||
|
||||
|
||||
class AnalyzeResponse(BaseModel):
|
||||
url: str
|
||||
classification: str
|
||||
@@ -55,6 +63,7 @@ class AnalyzeResponse(BaseModel):
|
||||
summary: str
|
||||
email_status: str
|
||||
analyzed_at: str
|
||||
follow_up_questions: list[FollowUpQuestion] = []
|
||||
|
||||
|
||||
@router.post("/analyze", response_model=AnalyzeResponse)
|
||||
@@ -62,7 +71,7 @@ async def analyze_url(req: AnalyzeRequest):
|
||||
"""Fetch URL, classify, assess compliance, and notify responsible role."""
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
# Step 1: Fetch and clean
|
||||
text = await _fetch_and_clean(client, req.url)
|
||||
text, raw_html = await _fetch_and_clean(client, req.url)
|
||||
|
||||
# Step 2: Classify via SDK LLM
|
||||
classification = await _classify(client, text)
|
||||
@@ -74,15 +83,23 @@ async def analyze_url(req: AnalyzeRequest):
|
||||
esc_level = assessment.get("escalation_level", "E0")
|
||||
role = ESCALATION_ROLES.get(esc_level, ESCALATION_ROLES["E0"])
|
||||
|
||||
# Step 5: Build summary
|
||||
# Step 5: Website compliance checks (§312k BGB etc.)
|
||||
site_findings, follow_ups = await _check_website_compliance(client, req.url, raw_html)
|
||||
|
||||
# Step 6: Merge findings
|
||||
findings = assessment.get("triggered_rules", [])
|
||||
controls = assessment.get("required_controls", [])
|
||||
# Convert for summary (use string lists, not raw dicts)
|
||||
findings_str = _to_string_list(findings)
|
||||
findings_str = _to_string_list(findings) + site_findings
|
||||
controls_str = _to_string_list(controls)
|
||||
|
||||
# Escalate if website checks found issues
|
||||
if site_findings and esc_level == "E0":
|
||||
esc_level = "E1"
|
||||
role = ESCALATION_ROLES["E1"]
|
||||
|
||||
summary = _build_summary(req.url, classification, assessment, role, findings_str, controls_str)
|
||||
|
||||
# Step 6: Send notification
|
||||
# Step 7: Send notification
|
||||
email_result = send_email(
|
||||
recipient=req.recipient,
|
||||
subject=f"Compliance-Finding: {classification} — {req.url[:60]}",
|
||||
@@ -96,16 +113,17 @@ async def analyze_url(req: AnalyzeRequest):
|
||||
risk_score=assessment.get("risk_score", 0),
|
||||
escalation_level=esc_level,
|
||||
responsible_role=role,
|
||||
findings=_to_string_list(findings),
|
||||
required_controls=_to_string_list(controls),
|
||||
findings=findings_str,
|
||||
required_controls=controls_str,
|
||||
summary=summary,
|
||||
email_status=email_result.get("status", "failed"),
|
||||
analyzed_at=datetime.now(timezone.utc).isoformat(),
|
||||
follow_up_questions=follow_ups,
|
||||
)
|
||||
|
||||
|
||||
async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> str:
|
||||
"""Fetch URL and strip HTML to plain text."""
|
||||
async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> tuple[str, str]:
|
||||
"""Fetch URL. Returns (clean_text, raw_html)."""
|
||||
resp = await client.get(url, follow_redirects=True, headers={
|
||||
"User-Agent": "BreakPilot-Compliance-Agent/1.0",
|
||||
})
|
||||
@@ -115,7 +133,7 @@ async def _fetch_and_clean(client: httpx.AsyncClient, url: str) -> str:
|
||||
clean = re.sub(r"<[^>]+>", " ", clean)
|
||||
clean = re.sub(r" ", " ", clean)
|
||||
clean = re.sub(r"\s+", " ", clean).strip()
|
||||
return clean[:4000]
|
||||
return clean[:4000], html
|
||||
|
||||
|
||||
async def _classify(client: httpx.AsyncClient, text: str) -> str:
|
||||
@@ -207,6 +225,103 @@ async def _assess(client: httpx.AsyncClient, text: str, classification: str) ->
|
||||
return {"risk_level": "unknown", "risk_score": 0, "escalation_level": "E0"}
|
||||
|
||||
|
||||
async def _check_website_compliance(
    client: httpx.AsyncClient, url: str, html: str,
) -> tuple[list[str], list[FollowUpQuestion]]:
    """Scan the public page for consumer-protection compliance signals.

    Checks the fetched HTML for a cancellation link (§312k BGB), an imprint
    link, a privacy-policy link and a cookie-consent tool. If no cancellation
    link is found in the HTML, a few well-known cancellation paths on the
    same origin are probed with HEAD requests.

    Args:
        client: HTTP client used for the cancellation-path probes.
        url: URL of the analysed page; only scheme and host are used.
        html: Raw HTML of the analysed page.

    Returns:
        ``(findings, follow_ups)``: finding texts for checks that failed, and
        follow-up questions for checks that cannot be settled from the
        public page alone.
    """
    findings: list[str] = []
    follow_ups: list[FollowUpQuestion] = []
    html_lower = html.lower()

    # --- §312k BGB: cancellation button ---
    cancel_patterns = [
        _href_pattern(
            r"kuendig|kündig|cancel|vertrag.?beenden|abo.?beenden|mitgliedschaft.?beenden"
        ),
        r'(?:kündigen|kuendigen|vertrag beenden|abo beenden|mitgliedschaft kündigen)',
    ]
    has_cancel_link = _matches_any(cancel_patterns, html_lower)

    # Nothing in the markup: fall back to probing common cancel URLs.
    if not has_cancel_link:
        has_cancel_link = await _probe_cancel_urls(client, url)

    if not has_cancel_link:
        findings.append(
            "[§312k BGB] Kein oeffentlich sichtbarer Kuendigungsbutton gefunden. "
            "Seit 01.07.2022 muessen online geschlossene Vertraege mit max. 2 Klicks kuendbar sein."
        )
        # The button may exist behind a login, so ask the user as well.
        follow_ups.append(FollowUpQuestion(
            id="cancel_button_312k",
            question="Koennen Sie nach Login im Kundenbereich innerhalb von 2 Klicks Ihren Vertrag kuendigen?",
            legal_basis="§ 312k BGB (Kuendigungsbutton), Omnibus-Richtlinie (EU) 2019/2161",
            severity="high",
            finding_if_no=(
                "[§312k BGB] VERSTOSS: Kein funktionaler Kuendigungsbutton vorhanden. "
                "Der Anbieter ist verpflichtet, einen leicht auffindbaren Kuendigungsbutton "
                "bereitzustellen (max. 2 Klicks). Ein Zwang zur telefonischen Kuendigung "
                "oder Kuendigung per Brief ist rechtswidrig."
            ),
        ))

    # --- Imprint obligation (§5 TMG / §18 MStV) ---
    # NOTE(review): the TMG appears to have been superseded by the DDG in
    # 2024 (imprint duty now §5 DDG) — confirm before changing the finding
    # text, since consumers may match on the "[§5 TMG]" prefix.
    imprint_patterns = [
        _href_pattern(r"impressum|imprint|legal.?notice|about.?us/legal"),
        r'>\s*impressum\s*<',
    ]
    if not _matches_any(imprint_patterns, html_lower):
        findings.append(
            "[§5 TMG] Kein Impressum-Link auf der Seite gefunden. "
            "Geschaeftsmaessige Online-Dienste muessen ein leicht erreichbares Impressum bereitstellen."
        )

    # --- Privacy policy linked? ---
    privacy_patterns = [
        _href_pattern(r"datenschutz|privacy|dsgvo"),
        r'>\s*datenschutz\s*<',
    ]
    if not _matches_any(privacy_patterns, html_lower):
        findings.append(
            "[Art. 13 DSGVO] Kein Link zur Datenschutzerklaerung gefunden. "
            "Nutzer muessen ueber die Verarbeitung personenbezogener Daten informiert werden."
        )

    # --- Cookie consent banner ---
    # Consent tools are often injected by script, so a miss here only
    # produces a follow-up question, not a finding.
    cookie_patterns = [
        r'(?:cookie.?consent|cookie.?banner|consent.?manager|didomi|cookiebot|onetrust|usercentrics)',
        r'(?:gdpr|dsgvo).?(?:consent|einwilligung)',
    ]
    if not _matches_any(cookie_patterns, html_lower):
        follow_ups.append(FollowUpQuestion(
            id="cookie_consent",
            question="Wird beim ersten Besuch der Website ein Cookie-Consent-Banner angezeigt?",
            legal_basis="§ 25 TDDDG (ehem. TTDSG), Art. 5(3) ePrivacy-Richtlinie",
            severity="medium",
            finding_if_no=(
                "[§25 TDDDG] Kein Cookie-Consent-Banner erkannt. "
                "Vor dem Setzen nicht-essentieller Cookies ist eine Einwilligung erforderlich."
            ),
        ))

    return findings, follow_ups


def _href_pattern(keywords: str) -> str:
    """Build a regex matching an ``href`` whose value contains one of *keywords*.

    Accepts double- and single-quoted attribute values and optional
    whitespace around ``=``. *keywords* is a regex alternation body.
    """
    return (
        rf"""href\s*=\s*(?:"[^"]*(?:{keywords})[^"]*"|'[^']*(?:{keywords})[^']*')"""
    )


def _matches_any(patterns: list[str], text: str) -> bool:
    """Return True if any regex in *patterns* matches somewhere in *text*."""
    return any(re.search(pattern, text) for pattern in patterns)


def _site_origin(url: str) -> str:
    """Return ``scheme://host[:port]`` for *url*, or ``""`` if it has no host."""
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
    except ValueError:  # e.g. malformed IPv6 literal
        return ""
    # Drop any "user:password@" prefix so credentials are not replayed.
    host = parts.netloc.rpartition("@")[2]
    if not host:
        return ""
    scheme = parts.scheme.lower()
    if scheme not in ("http", "https"):
        scheme = "https"
    return f"{scheme}://{host}"


async def _probe_cancel_urls(client: httpx.AsyncClient, url: str) -> bool:
    """Return True if a well-known cancellation path on *url*'s origin answers < 400."""
    origin = _site_origin(url)
    if not origin:
        return False
    probe_paths = (
        "/kuendigen",
        "/cancel",
        "/vertrag-kuendigen",
        "/abo-kuendigen",
        "/account/cancel",
    )
    for path in probe_paths:
        try:
            probe = await client.head(
                f"{origin}{path}", follow_redirects=True, timeout=5.0,
            )
        except (httpx.HTTPError, httpx.InvalidURL):
            # Best-effort probe: an unreachable path just means "not found".
            continue
        if probe.status_code < 400:
            return True
    return False
|
||||
|
||||
|
||||
def _to_string_list(items: list) -> list[str]:
|
||||
"""Convert list of dicts or strings to list of strings."""
|
||||
result = []
|
||||
|
||||
Reference in New Issue
Block a user