Files
breakpilot-compliance/backend-compliance/tests/test_agent_notification_routes.py
Benjamin Admin 0c0dd4e3a6 feat: ZeroClaw compliance agent — document analysis + role assignment + email
Add autonomous compliance agent that fetches web documents (cookie banners,
privacy policies), classifies them via Qwen/Ollama, assesses DSGVO compliance,
assigns to the responsible role, and sends notification emails.

Components:
- ZeroClaw SOP (6-step workflow: fetch, classify, assess, summarize, assign, notify)
- Backend: /api/compliance/agent/analyze (combined endpoint)
- Backend: /api/compliance/agent/notify (standalone email)
- Frontend: /sdk/agent page (Manager UI with URL input + results)
- Helper scripts + E2E test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 23:28:21 +02:00

84 lines
2.8 KiB
Python

"""Tests for agent notification endpoint."""
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client():
    """Return a ``TestClient`` bound to the application object from ``main``.

    The import happens inside the fixture body, so ``main`` is only loaded
    when a test actually requests this fixture.
    """
    from main import app as application

    test_client = TestClient(application)
    return test_client
class TestAgentNotify:
    """Tests for POST /api/compliance/agent/notify."""

    # Single source of truth for the route exercised by every test below.
    NOTIFY_URL = "/api/compliance/agent/notify"

    @patch("compliance.services.smtp_sender.smtplib.SMTP")
    def test_send_notification_success(self, mock_smtp, client):
        """A valid payload yields status ``sent`` and exactly one ``sendmail`` call."""
        # Object produced by ``with SMTP(...) as server`` on the mocked class;
        # presumably the sender uses SMTP as a context manager (the
        # ``sendmail`` assertion below relies on that).
        mock_instance = mock_smtp.return_value.__enter__.return_value

        resp = client.post(self.NOTIFY_URL, json={
            "recipient": "dsb@firma.de",
            "subject": "Test Finding",
            "body_html": "<p>Test body</p>",
            "role": "Datenschutzbeauftragter",
        })

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "sent"
        assert data["recipient"] == "dsb@firma.de"
        assert data["role"] == "Datenschutzbeauftragter"
        assert data["sent_at"] is not None
        mock_instance.sendmail.assert_called_once()

    @patch("compliance.services.smtp_sender.smtplib.SMTP")
    def test_send_notification_with_escalation(self, mock_smtp, client):
        """A payload carrying ``escalation_id`` is accepted and reported as sent."""
        # ``mock_smtp`` is injected by ``@patch``; it is only needed to keep
        # the request away from a real SMTP server.
        resp = client.post(self.NOTIFY_URL, json={
            "recipient": "legal@firma.de",
            "subject": "Escalation E3",
            "body_html": "<h2>Urgent</h2>",
            "role": "DSB + Rechtsabteilung",
            "escalation_id": "esc-123",
        })

        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "sent"
        assert data["role"] == "DSB + Rechtsabteilung"

    def test_send_notification_invalid_email(self, client):
        """A malformed recipient address is rejected with HTTP 422."""
        resp = client.post(self.NOTIFY_URL, json={
            "recipient": "not-an-email",
            "subject": "Test",
            "body_html": "<p>Test</p>",
            "role": "DSB",
        })

        assert resp.status_code == 422

    def test_send_notification_missing_fields(self, client):
        """A payload containing only ``recipient`` is rejected with HTTP 422."""
        resp = client.post(self.NOTIFY_URL, json={
            "recipient": "dsb@firma.de",
        })

        assert resp.status_code == 422

    @patch("compliance.services.smtp_sender.smtplib.SMTP")
    def test_send_notification_smtp_failure(self, mock_smtp, client):
        """An SMTP connection error is reported in the body, not as an HTTP error."""
        # Entering the mocked SMTP context raises, simulating an unreachable server.
        mock_smtp.return_value.__enter__.side_effect = ConnectionRefusedError("SMTP down")

        resp = client.post(self.NOTIFY_URL, json={
            "recipient": "dsb@firma.de",
            "subject": "Test",
            "body_html": "<p>Test</p>",
            "role": "DSB",
        })

        # The endpoint is expected to answer 200 and carry the failure in the payload.
        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "failed"
        assert "SMTP down" in data["error"]