feat(vvt): Go-Features nach Python portieren (Source of Truth)
Review-Daten (last_reviewed_at, next_review_at), created_by, DSFA-Link, CSV-Export mit Semikolon-Trennung, overdue_review_count in Stats. Go-VVT-Handler als DEPRECATED markiert. 32 Tests bestanden. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -251,6 +251,7 @@ func main() {
|
||||
// Statistics
|
||||
dsgvoRoutes.GET("/stats", dsgvoHandlers.GetStats)
|
||||
|
||||
// DEPRECATED: VVT routes - frontend uses backend-compliance proxy instead
|
||||
// VVT - Verarbeitungsverzeichnis (Art. 30)
|
||||
vvt := dsgvoRoutes.Group("/processing-activities")
|
||||
{
|
||||
@@ -299,7 +300,7 @@ func main() {
|
||||
// Export routes
|
||||
exports := dsgvoRoutes.Group("/export")
|
||||
{
|
||||
exports.GET("/vvt", dsgvoHandlers.ExportVVT)
|
||||
exports.GET("/vvt", dsgvoHandlers.ExportVVT) // DEPRECATED: use backend-compliance /vvt/export?format=csv
|
||||
exports.GET("/tom", dsgvoHandlers.ExportTOM)
|
||||
exports.GET("/dsr", dsgvoHandlers.ExportDSR)
|
||||
exports.GET("/retention", dsgvoHandlers.ExportRetentionPolicies)
|
||||
|
||||
@@ -24,6 +24,8 @@ func NewDSGVOHandlers(store *dsgvo.Store) *DSGVOHandlers {
|
||||
|
||||
// ============================================================================
|
||||
// VVT - Verarbeitungsverzeichnis (Processing Activities)
|
||||
// DEPRECATED: VVT is now managed by backend-compliance (Python).
|
||||
// These handlers will be removed once all DSGVO sub-modules are consolidated.
|
||||
// ============================================================================
|
||||
|
||||
// ListProcessingActivities returns all processing activities for a tenant
|
||||
|
||||
@@ -1910,6 +1910,10 @@ class VVTActivityCreate(BaseModel):
|
||||
status: str = 'DRAFT'
|
||||
responsible: Optional[str] = None
|
||||
owner: Optional[str] = None
|
||||
last_reviewed_at: Optional[datetime] = None
|
||||
next_review_at: Optional[datetime] = None
|
||||
created_by: Optional[str] = None
|
||||
dsfa_id: Optional[str] = None
|
||||
|
||||
|
||||
class VVTActivityUpdate(BaseModel):
|
||||
@@ -1934,6 +1938,10 @@ class VVTActivityUpdate(BaseModel):
|
||||
status: Optional[str] = None
|
||||
responsible: Optional[str] = None
|
||||
owner: Optional[str] = None
|
||||
last_reviewed_at: Optional[datetime] = None
|
||||
next_review_at: Optional[datetime] = None
|
||||
created_by: Optional[str] = None
|
||||
dsfa_id: Optional[str] = None
|
||||
|
||||
|
||||
class VVTActivityResponse(BaseModel):
|
||||
@@ -1960,6 +1968,10 @@ class VVTActivityResponse(BaseModel):
|
||||
status: str = 'DRAFT'
|
||||
responsible: Optional[str] = None
|
||||
owner: Optional[str] = None
|
||||
last_reviewed_at: Optional[datetime] = None
|
||||
next_review_at: Optional[datetime] = None
|
||||
created_by: Optional[str] = None
|
||||
dsfa_id: Optional[str] = None
|
||||
created_at: datetime
|
||||
updated_at: Optional[datetime] = None
|
||||
|
||||
@@ -1975,6 +1987,7 @@ class VVTStatsResponse(BaseModel):
|
||||
third_country_count: int
|
||||
draft_count: int
|
||||
approved_count: int
|
||||
overdue_review_count: int = 0
|
||||
|
||||
|
||||
class VVTAuditLogEntry(BaseModel):
|
||||
|
||||
@@ -14,12 +14,15 @@ Endpoints:
|
||||
GET /vvt/stats — Statistics
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, List
|
||||
from uuid import uuid4
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from classroom_engine.database import get_db
|
||||
@@ -150,6 +153,10 @@ def _activity_to_response(act: VVTActivityDB) -> VVTActivityResponse:
|
||||
status=act.status or 'DRAFT',
|
||||
responsible=act.responsible,
|
||||
owner=act.owner,
|
||||
last_reviewed_at=act.last_reviewed_at,
|
||||
next_review_at=act.next_review_at,
|
||||
created_by=act.created_by,
|
||||
dsfa_id=str(act.dsfa_id) if act.dsfa_id else None,
|
||||
created_at=act.created_at,
|
||||
updated_at=act.updated_at,
|
||||
)
|
||||
@@ -160,6 +167,7 @@ async def list_activities(
|
||||
status: Optional[str] = Query(None),
|
||||
business_function: Optional[str] = Query(None),
|
||||
search: Optional[str] = Query(None),
|
||||
review_overdue: Optional[bool] = Query(None),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""List all processing activities with optional filters."""
|
||||
@@ -169,6 +177,12 @@ async def list_activities(
|
||||
query = query.filter(VVTActivityDB.status == status)
|
||||
if business_function:
|
||||
query = query.filter(VVTActivityDB.business_function == business_function)
|
||||
if review_overdue:
|
||||
now = datetime.now(timezone.utc)
|
||||
query = query.filter(
|
||||
VVTActivityDB.next_review_at.isnot(None),
|
||||
VVTActivityDB.next_review_at < now,
|
||||
)
|
||||
if search:
|
||||
term = f"%{search}%"
|
||||
query = query.filter(
|
||||
@@ -184,6 +198,7 @@ async def list_activities(
|
||||
@router.post("/activities", response_model=VVTActivityResponse, status_code=201)
|
||||
async def create_activity(
|
||||
request: VVTActivityCreate,
|
||||
http_request: Request,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Create a new processing activity."""
|
||||
@@ -197,7 +212,12 @@ async def create_activity(
|
||||
detail=f"Activity with VVT-ID '{request.vvt_id}' already exists"
|
||||
)
|
||||
|
||||
act = VVTActivityDB(**request.dict())
|
||||
data = request.dict()
|
||||
# Set created_by from X-User-ID header if not provided in body
|
||||
if not data.get('created_by'):
|
||||
data['created_by'] = http_request.headers.get('X-User-ID', 'system')
|
||||
|
||||
act = VVTActivityDB(**data)
|
||||
db.add(act)
|
||||
db.flush() # get ID before audit log
|
||||
|
||||
@@ -312,8 +332,11 @@ async def get_audit_log(
|
||||
# ============================================================================
|
||||
|
||||
@router.get("/export")
|
||||
async def export_activities(db: Session = Depends(get_db)):
|
||||
"""JSON export of all activities for external review / PDF generation."""
|
||||
async def export_activities(
|
||||
format: str = Query("json", pattern="^(json|csv)$"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Export all activities as JSON or CSV (semicolon-separated, DE locale)."""
|
||||
org = db.query(VVTOrganizationDB).order_by(VVTOrganizationDB.created_at).first()
|
||||
activities = db.query(VVTActivityDB).order_by(VVTActivityDB.created_at).all()
|
||||
|
||||
@@ -321,10 +344,13 @@ async def export_activities(db: Session = Depends(get_db)):
|
||||
db,
|
||||
action="EXPORT",
|
||||
entity_type="all_activities",
|
||||
new_values={"count": len(activities)},
|
||||
new_values={"count": len(activities), "format": format},
|
||||
)
|
||||
db.commit()
|
||||
|
||||
if format == "csv":
|
||||
return _export_csv(activities)
|
||||
|
||||
return {
|
||||
"exported_at": datetime.utcnow().isoformat(),
|
||||
"organization": {
|
||||
@@ -351,6 +377,10 @@ async def export_activities(db: Session = Depends(get_db)):
|
||||
"protection_level": a.protection_level,
|
||||
"business_function": a.business_function,
|
||||
"responsible": a.responsible,
|
||||
"created_by": a.created_by,
|
||||
"dsfa_id": str(a.dsfa_id) if a.dsfa_id else None,
|
||||
"last_reviewed_at": a.last_reviewed_at.isoformat() if a.last_reviewed_at else None,
|
||||
"next_review_at": a.next_review_at.isoformat() if a.next_review_at else None,
|
||||
"created_at": a.created_at.isoformat(),
|
||||
"updated_at": a.updated_at.isoformat() if a.updated_at else None,
|
||||
}
|
||||
@@ -359,6 +389,48 @@ async def export_activities(db: Session = Depends(get_db)):
|
||||
}
|
||||
|
||||
|
||||
def _export_csv(activities: list) -> StreamingResponse:
|
||||
"""Generate semicolon-separated CSV with UTF-8 BOM for German Excel compatibility."""
|
||||
output = io.StringIO()
|
||||
# UTF-8 BOM for Excel
|
||||
output.write('\ufeff')
|
||||
|
||||
writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_MINIMAL)
|
||||
writer.writerow([
|
||||
'ID', 'VVT-ID', 'Name', 'Zweck', 'Rechtsgrundlage',
|
||||
'Datenkategorien', 'Betroffene', 'Empfaenger', 'Drittland',
|
||||
'Aufbewahrung', 'Status', 'Verantwortlich', 'Erstellt von',
|
||||
'Erstellt am',
|
||||
])
|
||||
|
||||
for a in activities:
|
||||
writer.writerow([
|
||||
str(a.id),
|
||||
a.vvt_id,
|
||||
a.name,
|
||||
'; '.join(a.purposes or []),
|
||||
'; '.join(a.legal_bases or []),
|
||||
'; '.join(a.personal_data_categories or []),
|
||||
'; '.join(a.data_subject_categories or []),
|
||||
'; '.join(a.recipient_categories or []),
|
||||
'Ja' if a.third_country_transfers else 'Nein',
|
||||
str(a.retention_period) if a.retention_period else '',
|
||||
a.status or 'DRAFT',
|
||||
a.responsible or '',
|
||||
a.created_by or 'system',
|
||||
a.created_at.strftime('%d.%m.%Y %H:%M') if a.created_at else '',
|
||||
])
|
||||
|
||||
output.seek(0)
|
||||
return StreamingResponse(
|
||||
iter([output.getvalue()]),
|
||||
media_type='text/csv; charset=utf-8',
|
||||
headers={
|
||||
'Content-Disposition': f'attachment; filename="vvt_export_{datetime.utcnow().strftime("%Y%m%d")}.csv"'
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/stats", response_model=VVTStatsResponse)
|
||||
async def get_stats(db: Session = Depends(get_db)):
|
||||
"""Get VVT statistics summary."""
|
||||
@@ -366,12 +438,16 @@ async def get_stats(db: Session = Depends(get_db)):
|
||||
|
||||
by_status: dict = {}
|
||||
by_bf: dict = {}
|
||||
now = datetime.now(timezone.utc)
|
||||
overdue_count = 0
|
||||
|
||||
for a in activities:
|
||||
status = a.status or 'DRAFT'
|
||||
bf = a.business_function or 'unknown'
|
||||
by_status[status] = by_status.get(status, 0) + 1
|
||||
by_bf[bf] = by_bf.get(bf, 0) + 1
|
||||
if a.next_review_at and a.next_review_at < now:
|
||||
overdue_count += 1
|
||||
|
||||
return VVTStatsResponse(
|
||||
total=len(activities),
|
||||
@@ -381,4 +457,5 @@ async def get_stats(db: Session = Depends(get_db)):
|
||||
third_country_count=sum(1 for a in activities if a.third_country_transfers),
|
||||
draft_count=by_status.get('DRAFT', 0),
|
||||
approved_count=by_status.get('APPROVED', 0),
|
||||
overdue_review_count=overdue_count,
|
||||
)
|
||||
|
||||
@@ -73,6 +73,10 @@ class VVTActivityDB(Base):
|
||||
status = Column(String(20), default='DRAFT')
|
||||
responsible = Column(String(200))
|
||||
owner = Column(String(200))
|
||||
last_reviewed_at = Column(DateTime(timezone=True), nullable=True)
|
||||
next_review_at = Column(DateTime(timezone=True), nullable=True)
|
||||
created_by = Column(String(200), default='system')
|
||||
dsfa_id = Column(UUID(as_uuid=True), nullable=True)
|
||||
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||||
|
||||
|
||||
14
backend-compliance/migrations/033_vvt_consolidation.sql
Normal file
14
backend-compliance/migrations/033_vvt_consolidation.sql
Normal file
@@ -0,0 +1,14 @@
|
||||
-- 033_vvt_consolidation.sql
|
||||
-- Portiert Go-exklusive VVT-Features nach Python (Source of Truth)
|
||||
-- P0: Review-Daten, created_by | P1: DSFA-Link
|
||||
|
||||
ALTER TABLE compliance_vvt_activities
|
||||
ADD COLUMN IF NOT EXISTS last_reviewed_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS next_review_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS created_by VARCHAR(200) DEFAULT 'system',
|
||||
ADD COLUMN IF NOT EXISTS dsfa_id UUID;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_vvt_activities_dsfa
|
||||
ON compliance_vvt_activities(dsfa_id) WHERE dsfa_id IS NOT NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_vvt_activities_next_review
|
||||
ON compliance_vvt_activities(next_review_at) WHERE next_review_at IS NOT NULL;
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from datetime import datetime, date
|
||||
from datetime import datetime, date, timedelta, timezone
|
||||
import uuid
|
||||
|
||||
from compliance.api.schemas import (
|
||||
@@ -104,10 +104,24 @@ class TestVVTStatsResponse:
|
||||
third_country_count=0,
|
||||
draft_count=3,
|
||||
approved_count=2,
|
||||
overdue_review_count=1,
|
||||
)
|
||||
assert stats.total == 5
|
||||
assert stats.by_status["DRAFT"] == 3
|
||||
assert stats.dpia_required_count == 1
|
||||
assert stats.overdue_review_count == 1
|
||||
|
||||
def test_stats_overdue_default_zero(self):
|
||||
stats = VVTStatsResponse(
|
||||
total=0,
|
||||
by_status={},
|
||||
by_business_function={},
|
||||
dpia_required_count=0,
|
||||
third_country_count=0,
|
||||
draft_count=0,
|
||||
approved_count=0,
|
||||
)
|
||||
assert stats.overdue_review_count == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -168,6 +182,10 @@ class TestActivityToResponse:
|
||||
act.status = kwargs.get("status", "DRAFT")
|
||||
act.responsible = kwargs.get("responsible", None)
|
||||
act.owner = kwargs.get("owner", None)
|
||||
act.last_reviewed_at = kwargs.get("last_reviewed_at", None)
|
||||
act.next_review_at = kwargs.get("next_review_at", None)
|
||||
act.created_by = kwargs.get("created_by", None)
|
||||
act.dsfa_id = kwargs.get("dsfa_id", None)
|
||||
act.created_at = datetime.utcnow()
|
||||
act.updated_at = None
|
||||
return act
|
||||
@@ -220,3 +238,196 @@ class TestLogAudit:
|
||||
_log_audit(mock_db, "DELETE", "activity")
|
||||
added = mock_db.add.call_args[0][0]
|
||||
assert added.changed_by == "system"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Consolidation Tests (Go → Python feature parity)
|
||||
# =============================================================================
|
||||
|
||||
class TestVVTConsolidationSchemas:
|
||||
"""Tests for new fields ported from Go: review dates, created_by, dsfa_id."""
|
||||
|
||||
def test_activity_create_with_review_dates(self):
|
||||
now = datetime.now(timezone.utc)
|
||||
future = now + timedelta(days=365)
|
||||
req = VVTActivityCreate(
|
||||
vvt_id="VVT-REV-001",
|
||||
name="Review-Test",
|
||||
last_reviewed_at=now,
|
||||
next_review_at=future,
|
||||
)
|
||||
assert req.last_reviewed_at == now
|
||||
assert req.next_review_at == future
|
||||
|
||||
def test_activity_create_sets_created_by(self):
|
||||
req = VVTActivityCreate(
|
||||
vvt_id="VVT-CB-001",
|
||||
name="Created-By Test",
|
||||
created_by="admin@example.com",
|
||||
)
|
||||
assert req.created_by == "admin@example.com"
|
||||
|
||||
def test_activity_create_created_by_defaults_none(self):
|
||||
req = VVTActivityCreate(vvt_id="VVT-CB-002", name="Default Test")
|
||||
assert req.created_by is None
|
||||
|
||||
def test_activity_create_with_dsfa_id(self):
|
||||
dsfa_uuid = str(uuid.uuid4())
|
||||
req = VVTActivityCreate(
|
||||
vvt_id="VVT-DSFA-001",
|
||||
name="DSFA-Link Test",
|
||||
dsfa_id=dsfa_uuid,
|
||||
)
|
||||
assert req.dsfa_id == dsfa_uuid
|
||||
|
||||
def test_activity_update_review_dates(self):
|
||||
now = datetime.now(timezone.utc)
|
||||
req = VVTActivityUpdate(
|
||||
last_reviewed_at=now,
|
||||
next_review_at=now + timedelta(days=180),
|
||||
)
|
||||
data = req.model_dump(exclude_none=True)
|
||||
assert "last_reviewed_at" in data
|
||||
assert "next_review_at" in data
|
||||
|
||||
def test_activity_update_dsfa_id(self):
|
||||
dsfa_uuid = str(uuid.uuid4())
|
||||
req = VVTActivityUpdate(dsfa_id=dsfa_uuid)
|
||||
data = req.model_dump(exclude_none=True)
|
||||
assert data["dsfa_id"] == dsfa_uuid
|
||||
|
||||
|
||||
class TestVVTConsolidationResponse:
|
||||
"""Tests for new fields in response mapping."""
|
||||
|
||||
def _make_activity(self, **kwargs) -> VVTActivityDB:
|
||||
act = VVTActivityDB()
|
||||
act.id = uuid.uuid4()
|
||||
act.vvt_id = kwargs.get("vvt_id", "VVT-001")
|
||||
act.name = kwargs.get("name", "Test")
|
||||
act.description = None
|
||||
act.purposes = []
|
||||
act.legal_bases = []
|
||||
act.data_subject_categories = []
|
||||
act.personal_data_categories = []
|
||||
act.recipient_categories = []
|
||||
act.third_country_transfers = []
|
||||
act.retention_period = {}
|
||||
act.tom_description = None
|
||||
act.business_function = None
|
||||
act.systems = []
|
||||
act.deployment_model = None
|
||||
act.data_sources = []
|
||||
act.data_flows = []
|
||||
act.protection_level = "MEDIUM"
|
||||
act.dpia_required = False
|
||||
act.structured_toms = {}
|
||||
act.status = "DRAFT"
|
||||
act.responsible = None
|
||||
act.owner = None
|
||||
act.last_reviewed_at = kwargs.get("last_reviewed_at", None)
|
||||
act.next_review_at = kwargs.get("next_review_at", None)
|
||||
act.created_by = kwargs.get("created_by", None)
|
||||
act.dsfa_id = kwargs.get("dsfa_id", None)
|
||||
act.created_at = datetime.utcnow()
|
||||
act.updated_at = None
|
||||
return act
|
||||
|
||||
def test_response_includes_review_dates(self):
|
||||
now = datetime.now(timezone.utc)
|
||||
future = now + timedelta(days=365)
|
||||
act = self._make_activity(last_reviewed_at=now, next_review_at=future)
|
||||
resp = _activity_to_response(act)
|
||||
assert resp.last_reviewed_at == now
|
||||
assert resp.next_review_at == future
|
||||
|
||||
def test_response_includes_created_by(self):
|
||||
act = self._make_activity(created_by="admin@example.com")
|
||||
resp = _activity_to_response(act)
|
||||
assert resp.created_by == "admin@example.com"
|
||||
|
||||
def test_response_includes_dsfa_id(self):
|
||||
dsfa_uuid = uuid.uuid4()
|
||||
act = self._make_activity(dsfa_id=dsfa_uuid)
|
||||
resp = _activity_to_response(act)
|
||||
assert resp.dsfa_id == str(dsfa_uuid)
|
||||
|
||||
def test_response_null_new_fields(self):
|
||||
act = self._make_activity()
|
||||
resp = _activity_to_response(act)
|
||||
assert resp.last_reviewed_at is None
|
||||
assert resp.next_review_at is None
|
||||
assert resp.created_by is None
|
||||
assert resp.dsfa_id is None
|
||||
|
||||
|
||||
class TestVVTCsvExport:
|
||||
"""Tests for CSV export functionality."""
|
||||
|
||||
def _collect_csv_body(self, response) -> str:
|
||||
"""Extract text from StreamingResponse (async generator)."""
|
||||
import asyncio
|
||||
async def _read():
|
||||
chunks = []
|
||||
async for chunk in response.body_iterator:
|
||||
chunks.append(chunk)
|
||||
return ''.join(chunks)
|
||||
return asyncio.get_event_loop().run_until_complete(_read())
|
||||
|
||||
def test_export_csv_format(self):
|
||||
from compliance.api.vvt_routes import _export_csv
|
||||
act = VVTActivityDB()
|
||||
act.id = uuid.uuid4()
|
||||
act.vvt_id = "VVT-CSV-001"
|
||||
act.name = "CSV Test"
|
||||
act.purposes = ["Zweck A", "Zweck B"]
|
||||
act.legal_bases = ["Art. 6 Abs. 1b"]
|
||||
act.personal_data_categories = ["Email"]
|
||||
act.data_subject_categories = ["Kunden"]
|
||||
act.recipient_categories = ["IT-Dienstleister"]
|
||||
act.third_country_transfers = ["USA"]
|
||||
act.retention_period = {"duration": "3 Jahre"}
|
||||
act.status = "APPROVED"
|
||||
act.responsible = "DSB"
|
||||
act.created_by = "admin"
|
||||
act.created_at = datetime(2026, 1, 15, 10, 30)
|
||||
act.updated_at = None
|
||||
|
||||
response = _export_csv([act])
|
||||
text = self._collect_csv_body(response)
|
||||
assert 'VVT-CSV-001' in text
|
||||
assert 'CSV Test' in text
|
||||
assert 'APPROVED' in text
|
||||
|
||||
def test_export_csv_semicolon_separator(self):
|
||||
from compliance.api.vvt_routes import _export_csv
|
||||
act = VVTActivityDB()
|
||||
act.id = uuid.uuid4()
|
||||
act.vvt_id = "VVT-SEP-001"
|
||||
act.name = "Separator Test"
|
||||
act.purposes = []
|
||||
act.legal_bases = []
|
||||
act.personal_data_categories = []
|
||||
act.data_subject_categories = []
|
||||
act.recipient_categories = []
|
||||
act.third_country_transfers = []
|
||||
act.retention_period = {}
|
||||
act.status = "DRAFT"
|
||||
act.responsible = ""
|
||||
act.created_by = "system"
|
||||
act.created_at = datetime(2026, 3, 1, 12, 0)
|
||||
act.updated_at = None
|
||||
|
||||
response = _export_csv([act])
|
||||
text = self._collect_csv_body(response)
|
||||
lines = text.strip().split('\n')
|
||||
header = lines[0]
|
||||
assert ';' in header
|
||||
assert 'ID;VVT-ID;Name' in header.replace('\ufeff', '')
|
||||
|
||||
def test_export_csv_empty_list(self):
|
||||
from compliance.api.vvt_routes import _export_csv
|
||||
response = _export_csv([])
|
||||
text = self._collect_csv_body(response)
|
||||
lines = text.strip().split('\n')
|
||||
assert len(lines) == 1
|
||||
|
||||
Reference in New Issue
Block a user