Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 37s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-dsms-gateway (push) Has been cancelled
CI / test-python-document-crawler (push) Has been cancelled
Import-Backend:
- GET /v1/import (Root-Alias) → list_documents; behebt URL-Mismatch im Proxy
- DELETE /v1/import/{document_id} → löscht Dokument + Gap-Analyse (mit Tenant-Isolierung)
- 6 neue Tests (65 total, alle grün)
Screening-Frontend:
- Simulierten Fortschrittsbalken (Math.random) entfernt — war inhaltlich falsch
- Ersetzt durch indeterminate Spinner + rotierende ehrliche Status-Texte
(z.B. "OSV.dev Datenbank wird abgefragt...") im 2-Sek.-Takt
- Kein scanProgress-State mehr benötigt
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
670 lines
26 KiB
Python
670 lines
26 KiB
Python
"""Tests for Document Import routes (import_routes.py)."""
|
|
|
|
import pytest
|
|
from unittest.mock import MagicMock, patch, AsyncMock
|
|
|
|
from compliance.api.import_routes import (
|
|
detect_document_type,
|
|
analyze_gaps,
|
|
extract_text_from_pdf,
|
|
)
|
|
|
|
|
|
class TestDetectDocumentType:
    """Tests for keyword-based document type detection."""

    def test_dsfa_detection(self):
        sample = "Dies ist eine Datenschutz-Folgenabschaetzung (DSFA) nach Art. 35 DSGVO"
        detected, score = detect_document_type(sample)
        assert detected == "DSFA"
        assert score >= 0.5

    def test_tom_detection(self):
        sample = "Technisch-organisatorische Massnahmen (TOM) zum Schutz personenbezogener Daten"
        detected, score = detect_document_type(sample)
        assert detected == "TOM"
        assert score >= 0.5

    def test_vvt_detection(self):
        sample = "Verarbeitungsverzeichnis nach Art. 30 DSGVO - VVT processing activities"
        detected, score = detect_document_type(sample)
        assert detected == "VVT"
        assert score >= 0.5

    def test_privacy_policy_detection(self):
        sample = "Datenschutzerklaerung - Privacy Policy fuer unsere Nutzer"
        detected, score = detect_document_type(sample)
        assert detected == "PRIVACY_POLICY"
        assert score >= 0.5

    def test_unknown_document(self):
        # Text matching no keyword set falls back to OTHER at base confidence.
        detected, score = detect_document_type("Lorem ipsum dolor sit amet")
        assert (detected, score) == ("OTHER", 0.3)

    def test_empty_text(self):
        detected, score = detect_document_type("")
        assert (detected, score) == ("OTHER", 0.3)

    def test_confidence_increases_with_more_keywords(self):
        # More matching keywords must yield strictly higher confidence.
        lone_hit = detect_document_type("dsfa")[1]
        many_hits = detect_document_type("dsfa dpia datenschutz-folgenabschaetzung privacy impact")[1]
        assert many_hits > lone_hit

    def test_confidence_capped_at_095(self):
        sample = "dsfa dpia datenschutz-folgenabschaetzung privacy impact assessment report analysis"
        assert detect_document_type(sample)[1] <= 0.95
|
|
|
|
|
|
class TestAnalyzeGaps:
    """Tests for gap analysis rules."""

    @staticmethod
    def _by_category(findings, category):
        # Filter helper: findings belonging to a single rule category.
        return [f for f in findings if f["category"] == category]

    def test_ai_gap_detected(self):
        findings = analyze_gaps("Wir setzen KI und AI in unserer Anwendung ein", "OTHER")
        # Should detect AI Act gap (missing risk classification)
        ai_related = self._by_category(findings, "AI Act Compliance")
        assert ai_related
        assert ai_related[0]["severity"] == "CRITICAL"

    def test_no_gap_when_requirement_present(self):
        findings = analyze_gaps("KI-System mit Risikoklassifizierung nach EU AI Act", "OTHER")
        assert self._by_category(findings, "AI Act Compliance") == []

    def test_tom_gap_detected(self):
        findings = analyze_gaps("Cloud-basiertes SaaS-System mit KI-Funktionen", "OTHER")
        assert self._by_category(findings, "TOMs")

    def test_no_gaps_for_irrelevant_text(self):
        assert analyze_gaps("Ein einfacher Flyer ohne Relevanz", "OTHER") == []

    def test_gap_has_required_fields(self):
        findings = analyze_gaps("KI-System mit automatisierten Entscheidungen", "OTHER")
        assert findings
        expected_keys = ("id", "category", "severity", "regulation", "required_action")
        for finding in findings:
            for key in expected_keys:
                assert key in finding
|
|
|
|
|
|
class TestExtractTextFromPdf:
    """Tests for PDF text extraction."""

    def test_empty_bytes_returns_empty(self):
        assert extract_text_from_pdf(b"") == ""

    def test_invalid_pdf_returns_empty(self):
        assert extract_text_from_pdf(b"not a pdf") == ""

    def test_fitz_import_error(self):
        """When fitz is not installed, extract_text_from_pdf returns empty string."""
        import sys

        # Temporarily hide fitz from imports: a None entry in sys.modules
        # makes `import fitz` raise ImportError.
        saved = sys.modules.get("fitz")
        sys.modules["fitz"] = None  # type: ignore
        try:
            outcome = extract_text_from_pdf(b"fake pdf content")
            assert isinstance(outcome, str)
        finally:
            # Restore whatever was there before (or remove our sentinel).
            if saved is not None:
                sys.modules["fitz"] = saved
            else:
                sys.modules.pop("fitz", None)
|
|
|
|
|
|
# =============================================================================
|
|
# Additional tests — extended coverage
|
|
# =============================================================================
|
|
|
|
class TestDetectDocumentTypeExtended:
    """Extended tests for document type detection edge cases."""

    def test_agb_detection(self):
        detected, score = detect_document_type(
            "Allgemeine Geschaeftsbedingungen (AGB) fuer die Nutzung unserer Plattform"
        )
        assert detected == "AGB"
        assert score >= 0.5

    def test_cookie_policy_detection(self):
        detected, score = detect_document_type(
            "Cookie-Richtlinie: Wir setzen Tracking und Einwilligung nach DSGVO ein"
        )
        assert detected == "COOKIE_POLICY"
        assert score >= 0.5

    def test_risk_assessment_detection(self):
        detected, score = detect_document_type(
            "Risikobewertung und Risikoanalyse fuer Cloud-Services"
        )
        assert detected == "RISK_ASSESSMENT"
        assert score >= 0.5

    def test_audit_report_detection(self):
        detected, score = detect_document_type(
            "Audit-Pruefbericht nach ISO 27001 Zertifizierung"
        )
        assert detected == "AUDIT_REPORT"
        assert score >= 0.5

    def test_case_insensitive_matching(self):
        # Upper-case input must still match the lower-case keyword table.
        detected, _ = detect_document_type("DATENSCHUTZ-FOLGENABSCHAETZUNG NACH DSGVO")
        assert detected == "DSFA"

    def test_returns_tuple(self):
        outcome = detect_document_type("some text")
        assert isinstance(outcome, tuple)
        assert len(outcome) == 2

    def test_confidence_is_float(self):
        assert isinstance(detect_document_type("some text")[1], float)

    def test_confidence_minimum_is_03(self):
        assert detect_document_type("")[1] == 0.3

    def test_confidence_maximum_is_095(self):
        # Jam all DSFA keywords in
        sample = " ".join(5 * ["dsfa", "dpia", "datenschutz-folgenabschaetzung", "privacy impact"])
        assert detect_document_type(sample)[1] <= 0.95

    def test_winning_type_has_most_keywords(self):
        # TOM has 4 keywords, DSFA has 1
        detected, _ = detect_document_type(
            "technisch-organisatorische massnahmen tom technical measures dsfa"
        )
        assert detected == "TOM"

    def test_whitespace_only_text(self):
        detected, score = detect_document_type("  \n\t  ")
        assert detected == "OTHER"
        assert score == 0.3

    def test_numbers_only_text(self):
        detected, _ = detect_document_type("12345 67890")
        assert detected == "OTHER"
|
|
|
|
|
class TestAnalyzeGapsExtended:
    """Extended tests for gap analysis logic."""

    def test_vvt_gap_detected(self):
        findings = analyze_gaps("Verarbeitung personenbezogener Daten in unserer Plattform", "OTHER")
        assert [f for f in findings if f["category"] == "VVT"]

    def test_human_oversight_gap_detected(self):
        findings = analyze_gaps(
            "KI-System mit autonomen Entscheidungen ohne menschliche Kontrolle", "OTHER"
        )
        assert [f for f in findings if f["category"] == "Menschliche Aufsicht"]

    def test_no_oversight_gap_when_present(self):
        findings = analyze_gaps(
            "KI-System mit menschlicher Aufsicht und human-in-the-loop Prozessen", "OTHER"
        )
        assert [f for f in findings if f["category"] == "Menschliche Aufsicht"] == []

    def test_transparenz_gap_detected(self):
        findings = analyze_gaps("Wir setzen automatisierte Entscheidungen und Profiling ein", "OTHER")
        assert [f for f in findings if f["category"] == "Transparenz"]

    def test_gap_id_is_unique(self):
        findings = analyze_gaps(
            "KI-System mit Verarbeitung und automatisierten Entscheidungen ai cloud", "OTHER"
        )
        identifiers = [f["id"] for f in findings]
        # No duplicate IDs across all detected gaps.
        assert len(set(identifiers)) == len(identifiers)

    def test_gap_id_starts_with_gap(self):
        findings = analyze_gaps("KI-Anwendung mit machine learning", "OTHER")
        if findings:
            assert findings[0]["id"].startswith("gap-")

    def test_related_step_id_matches_doc_type(self):
        findings = analyze_gaps("KI-Anwendung mit machine learning", "DSFA")
        if findings:
            assert findings[0]["related_step_id"] == "dsfa"

    def test_severity_values_are_valid(self):
        findings = analyze_gaps(
            "KI-System mit cloud ai saas automatisierten Entscheidungen profiling", "OTHER"
        )
        allowed = {"CRITICAL", "HIGH", "MEDIUM", "LOW"}
        for finding in findings:
            assert finding["severity"] in allowed

    def test_returns_list(self):
        assert isinstance(analyze_gaps("", "OTHER"), list)

    def test_all_gap_fields_present(self):
        findings = analyze_gaps("KI ki ai machine learning", "TOM")
        expected = {"id", "category", "description", "severity", "regulation", "required_action", "related_step_id"}
        for finding in findings:
            assert expected.issubset(finding.keys())

    def test_no_false_positives_for_empty_text(self):
        assert analyze_gaps("", "VVT") == []

    def test_multiple_gaps_can_be_detected(self):
        # Text that triggers multiple rules
        findings = analyze_gaps("ki ai cloud verarbeitung daten automatisiert profiling", "OTHER")
        assert len(findings) >= 2
|
|
|
|
|
|
class TestDocumentTypeKeywords:
    """Tests for the DOCUMENT_TYPE_KEYWORDS constant."""

    def test_keywords_dict_not_empty(self):
        from compliance.api.import_routes import DOCUMENT_TYPE_KEYWORDS

        assert DOCUMENT_TYPE_KEYWORDS

    def test_all_types_have_keywords(self):
        from compliance.api.import_routes import DOCUMENT_TYPE_KEYWORDS

        # Every declared document type must map to at least one keyword.
        for dtype, keywords in DOCUMENT_TYPE_KEYWORDS.items():
            assert len(keywords) > 0, f"{dtype} has no keywords"

    def test_dsfa_in_keywords(self):
        from compliance.api.import_routes import DOCUMENT_TYPE_KEYWORDS

        assert "DSFA" in DOCUMENT_TYPE_KEYWORDS

    def test_tom_in_keywords(self):
        from compliance.api.import_routes import DOCUMENT_TYPE_KEYWORDS

        assert "TOM" in DOCUMENT_TYPE_KEYWORDS
|
|
|
|
|
|
class TestGapRules:
    """Tests for the GAP_RULES constant."""

    def test_gap_rules_not_empty(self):
        from compliance.api.import_routes import GAP_RULES

        assert GAP_RULES

    def test_each_rule_has_required_keys(self):
        from compliance.api.import_routes import GAP_RULES

        mandatory = {"category", "regulation", "check_keywords", "gap_if_missing", "severity", "action"}
        for rule in GAP_RULES:
            assert mandatory.issubset(rule.keys())

    def test_check_keywords_are_lowercase(self):
        from compliance.api.import_routes import GAP_RULES

        # Matching is done against lower-cased text, so keywords must be lower-case.
        for rule in GAP_RULES:
            for keyword in rule["check_keywords"]:
                assert keyword == keyword.lower(), f"Keyword '{keyword}' is not lowercase"

    def test_gap_if_missing_are_lowercase(self):
        from compliance.api.import_routes import GAP_RULES

        for rule in GAP_RULES:
            for keyword in rule["gap_if_missing"]:
                assert keyword == keyword.lower(), f"Keyword '{keyword}' is not lowercase"
|
|
|
|
|
|
# =============================================================================
|
|
# API Endpoint Tests
|
|
# =============================================================================
|
|
|
|
# Test harness for the endpoint tests below: mount the import router on a
# bare FastAPI app so routes can be exercised via TestClient without the
# full service stack.
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.import_routes import router as import_router

# Shared app/client instances reused by every endpoint test class in this file.
_app_import = FastAPI()
_app_import.include_router(import_router)
_client_import = TestClient(_app_import)

# Fixed tenant identifier for tenant-scoped endpoints; HEADERS carries it as
# the X-Tenant-ID header for tests that pass the tenant via header.
TENANT_ID = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
HEADERS = {"X-Tenant-ID": TENANT_ID}
|
|
|
|
|
|
class TestAnalyzeEndpoint:
    """API tests for POST /v1/import/analyze."""

    def test_analyze_text_file_success(self):
        """Text file upload succeeds and returns DocumentAnalysisResponse fields."""
        # Patch out the DB session factory and the LLM classifier; with the
        # LLM returning None the route falls back to keyword detection.
        with patch("compliance.api.import_routes.SessionLocal") as MockSL, \
                patch("compliance.api.import_routes.classify_with_llm", new_callable=AsyncMock) as mock_llm:
            mock_llm.return_value = None  # fallback to keyword detection
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_session.execute.return_value = MagicMock()

            text_content = b"Datenschutz-Folgenabschaetzung DSFA nach Art. 35 DSGVO"
            response = _client_import.post(
                "/v1/import/analyze",
                files={"file": ("dsfa.txt", text_content, "text/plain")},
                data={"document_type": "OTHER", "tenant_id": TENANT_ID},
            )

            # All response-model fields must be present in the JSON payload.
            assert response.status_code == 200
            data = response.json()
            assert "document_id" in data
            assert "detected_type" in data
            assert "confidence" in data
            assert "gap_analysis" in data
            assert "recommendations" in data
            assert isinstance(data["extracted_entities"], list)

    def test_analyze_explicit_type_success(self):
        """Explicit document_type bypasses detection."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session

            response = _client_import.post(
                "/v1/import/analyze",
                files={"file": ("tom.txt", b"Some TOM content", "text/plain")},
                data={"document_type": "TOM", "tenant_id": TENANT_ID},
            )

            # A caller-supplied type is taken verbatim with full confidence.
            assert response.status_code == 200
            data = response.json()
            assert data["detected_type"] == "TOM"
            assert data["confidence"] == 1.0

    def test_analyze_missing_file_returns_422(self):
        """Request without file returns 422."""
        # FastAPI's validation rejects the request before the handler runs.
        response = _client_import.post(
            "/v1/import/analyze",
            data={"document_type": "OTHER", "tenant_id": TENANT_ID},
        )
        assert response.status_code == 422

    def test_analyze_db_error_still_returns_200(self):
        """Even if DB write fails, the analysis response is returned."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL, \
                patch("compliance.api.import_routes.classify_with_llm", new_callable=AsyncMock) as mock_llm:
            mock_llm.return_value = None
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            # Any execute() call blows up, simulating an unreachable database.
            mock_session.execute.side_effect = Exception("DB connection failed")

            response = _client_import.post(
                "/v1/import/analyze",
                files={"file": ("doc.txt", b"Verarbeitungsverzeichnis VVT", "text/plain")},
                data={"document_type": "OTHER", "tenant_id": TENANT_ID},
            )

            # Analysis is returned even if DB fails (error is caught internally)
            assert response.status_code == 200

    def test_analyze_returns_filename(self):
        """Response contains the uploaded filename."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL, \
                patch("compliance.api.import_routes.classify_with_llm", new_callable=AsyncMock) as mock_llm:
            mock_llm.return_value = None
            mock_session = MagicMock()
            MockSL.return_value = mock_session

            # NOTE: no document_type sent here — exercises the default path.
            response = _client_import.post(
                "/v1/import/analyze",
                files={"file": ("my-document.txt", b"Audit report", "text/plain")},
                data={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 200
            assert response.json()["filename"] == "my-document.txt"
|
|
|
|
|
|
class TestListDocumentsEndpoint:
    """API tests for GET /v1/import/documents."""

    def test_list_documents_empty(self):
        """Returns empty list when no documents exist."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # No rows for this tenant.
            mock_result.fetchall.return_value = []
            mock_session.execute.return_value = mock_result

            response = _client_import.get("/v1/import/documents", params={"tenant_id": TENANT_ID})

            assert response.status_code == 200
            data = response.json()
            assert data["documents"] == []
            assert data["total"] == 0

    def test_list_documents_with_data(self):
        """Returns documents with correct total count."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # Row: id, filename, file_type, file_size, detected_type, confidence,
            # extracted_entities, recommendations, status, analyzed_at, created_at
            mock_result.fetchall.return_value = [
                ["uuid-1", "dsfa.pdf", "application/pdf", 2048, "DSFA", 0.85,
                 ["AI Act"], ["Review"], "analyzed", None, "2024-01-15"],
                ["uuid-2", "tom.txt", "text/plain", 512, "TOM", 0.75,
                 [], [], "analyzed", None, "2024-01-16"],
            ]
            mock_session.execute.return_value = mock_result

            response = _client_import.get("/v1/import/documents", params={"tenant_id": TENANT_ID})

            assert response.status_code == 200
            data = response.json()
            assert data["total"] == 2
            assert len(data["documents"]) == 2
            assert data["documents"][0]["filename"] == "dsfa.pdf"

    def test_list_documents_tenant_filter_used(self):
        """Tenant ID is passed as query parameter."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            mock_result.fetchall.return_value = []
            mock_session.execute.return_value = mock_result

            response = _client_import.get(
                "/v1/import/documents",
                params={"tenant_id": "custom-tenant-id"},
            )

            assert response.status_code == 200
            # Verify execute was called with the correct tenant_id
            # (string match over call_args covers both positional and kwargs).
            call_kwargs = mock_session.execute.call_args
            assert "custom-tenant-id" in str(call_kwargs)
|
|
|
|
|
|
class TestGapAnalysisEndpoint:
    """API tests for GET /v1/import/gap-analysis/{document_id}."""

    def test_get_gap_analysis_success(self):
        """Returns gap analysis when found."""
        # Row shaped like the gap_analysis table record the route reads.
        gap_row = {
            "id": "gap-uuid-001",
            "document_id": "doc-uuid-001",
            "tenant_id": TENANT_ID,
            "total_gaps": 2,
            "critical_gaps": 1,
            "high_gaps": 1,
            "medium_gaps": 0,
            "low_gaps": 0,
            "gaps": [],
            "recommended_packages": ["analyse"],
        }
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            mock_result.fetchone.return_value = gap_row
            mock_session.execute.return_value = mock_result

            response = _client_import.get(
                "/v1/import/gap-analysis/doc-uuid-001",
                params={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 200
            data = response.json()
            assert data["document_id"] == "doc-uuid-001"
            assert data["total_gaps"] == 2

    def test_get_gap_analysis_not_found(self):
        """Returns 404 when no gap analysis exists for the document."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # fetchone -> None models "no row for this document/tenant".
            mock_result.fetchone.return_value = None
            mock_session.execute.return_value = mock_result

            response = _client_import.get(
                "/v1/import/gap-analysis/nonexistent-doc",
                params={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 404
            assert "not found" in response.json()["detail"].lower()

    def test_get_gap_analysis_uses_header_tenant(self):
        """X-Tenant-ID header takes precedence over query param."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            mock_result.fetchone.return_value = None
            mock_session.execute.return_value = mock_result

            # Send conflicting tenant values via header and query string.
            _client_import.get(
                "/v1/import/gap-analysis/doc-uuid",
                headers={"X-Tenant-ID": "header-tenant"},
                params={"tenant_id": "query-tenant"},
            )

            # execute call should use "header-tenant" (X-Tenant-ID takes precedence)
            call_args = mock_session.execute.call_args
            assert "header-tenant" in str(call_args)
|
|
|
|
|
|
class TestListDocumentsRootEndpoint:
    """API tests for GET /v1/import (root alias — proxy-compatible URL)."""

    def test_root_alias_returns_documents(self):
        """GET /v1/import returns same result as /v1/import/documents."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # Single document row in the same column order as the documents query.
            mock_result.fetchall.return_value = [
                ["uuid-1", "dsfa.pdf", "application/pdf", 1024, "DSFA", 0.85,
                 [], [], "analyzed", None, "2024-01-15"],
            ]
            mock_session.execute.return_value = mock_result

            response = _client_import.get("/v1/import", params={"tenant_id": TENANT_ID})

            assert response.status_code == 200
            data = response.json()
            assert data["total"] == 1
            assert data["documents"][0]["filename"] == "dsfa.pdf"

    def test_root_alias_empty(self):
        """GET /v1/import returns empty list when no documents."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            mock_result.fetchall.return_value = []
            mock_session.execute.return_value = mock_result

            response = _client_import.get("/v1/import", params={"tenant_id": TENANT_ID})

            assert response.status_code == 200
            assert response.json()["total"] == 0
|
|
|
|
|
|
class TestDeleteDocumentEndpoint:
    """API tests for DELETE /v1/import/{document_id}."""

    def test_delete_existing_document(self):
        """Deletes document and returns success."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # rowcount == 1 models a successful DELETE of one row.
            mock_result.rowcount = 1
            mock_session.execute.return_value = mock_result

            response = _client_import.delete(
                "/v1/import/doc-uuid-001",
                params={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 200
            data = response.json()
            assert data["success"] is True
            assert data["deleted_id"] == "doc-uuid-001"
            # The route must commit exactly once after deleting.
            mock_session.commit.assert_called_once()

    def test_delete_not_found(self):
        """Returns 404 when document does not exist for tenant."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            # rowcount == 0: no matching row for this id/tenant combination.
            mock_result.rowcount = 0
            mock_session.execute.return_value = mock_result

            response = _client_import.delete(
                "/v1/import/nonexistent-doc",
                params={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 404

    def test_delete_uses_tenant_isolation(self):
        """Tenant ID is passed to the delete query."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            mock_result = MagicMock()
            mock_result.rowcount = 1
            mock_session.execute.return_value = mock_result

            _client_import.delete(
                "/v1/import/doc-uuid",
                params={"tenant_id": "custom-tenant"},
            )

            # Both execute calls should use the tenant ID
            # (document delete plus the associated gap-analysis delete).
            call_args_list = mock_session.execute.call_args_list
            for call in call_args_list:
                assert "custom-tenant" in str(call)

    def test_delete_db_error_returns_500(self):
        """Database error returns 500."""
        with patch("compliance.api.import_routes.SessionLocal") as MockSL:
            mock_session = MagicMock()
            MockSL.return_value = mock_session
            # Any execute() call fails, simulating a broken DB connection.
            mock_session.execute.side_effect = Exception("DB error")

            response = _client_import.delete(
                "/v1/import/doc-uuid",
                params={"tenant_id": TENANT_ID},
            )

            assert response.status_code == 500
|