""" Tests for Compliance API endpoints. Tests cover: - GET /api/v1/compliance/regulations - GET /api/v1/compliance/requirements (with pagination) - GET /api/v1/compliance/controls - GET /api/v1/compliance/dashboard - POST /api/v1/compliance/evidence/collect - GET /api/v1/compliance/evidence/ci-status """ import pytest from datetime import datetime, timedelta from unittest.mock import MagicMock, patch from fastapi.testclient import TestClient # Test with in-memory SQLite for isolation from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool from classroom_engine.database import Base, get_db from compliance.api.routes import router from compliance.db.models import ( RegulationDB, RequirementDB, ControlDB, EvidenceDB, RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, EvidenceStatusEnum, ControlTypeEnum ) from compliance.db.repository import ( RegulationRepository, RequirementRepository, ControlRepository, EvidenceRepository ) # FastAPI app setup for testing from fastapi import FastAPI app = FastAPI() app.include_router(router, prefix="/api/v1") @pytest.fixture def db_session(): """Create in-memory SQLite session for tests.""" # Use StaticPool to ensure single connection for SQLite in-memory # This is critical because SQLite :memory: DBs are connection-specific engine = create_engine( "sqlite:///:memory:", echo=False, connect_args={"check_same_thread": False}, poolclass=StaticPool ) # Ensure all compliance models are imported and registered with Base # before creating tables (import order matters for SQLAlchemy metadata) from compliance.db import models as compliance_models # noqa: F401 from classroom_engine import db_models as classroom_models # noqa: F401 Base.metadata.create_all(engine) SessionLocal = sessionmaker(bind=engine) session = SessionLocal() yield session session.close() @pytest.fixture def client(db_session): """Create test client with DB override.""" def override_get_db(): try: yield db_session finally: pass app.dependency_overrides[get_db] = override_get_db client = TestClient(app) yield client app.dependency_overrides.clear() @pytest.fixture def sample_regulation(db_session): """Create a sample regulation for testing.""" repo = RegulationRepository(db_session) return repo.create( code="GDPR", name="General Data Protection Regulation", regulation_type=RegulationTypeEnum.EU_REGULATION, full_name="Regulation (EU) 2016/679", description="EU data protection regulation", ) @pytest.fixture def sample_requirement(db_session, sample_regulation): """Create a sample requirement for testing.""" repo = RequirementRepository(db_session) return repo.create( regulation_id=sample_regulation.id, article="Art. 32", title="Security of processing", description="Test requirement", requirement_text="The controller shall implement appropriate technical measures...", is_applicable=True, priority=1, ) @pytest.fixture def sample_control(db_session): """Create a sample control for testing.""" repo = ControlRepository(db_session) return repo.create( control_id="CRYPTO-001", title="TLS 1.3 Encryption", description="All external communication uses TLS 1.3", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.PREVENTIVE, pass_criteria="All connections use TLS 1.3", ) # ============================================================================ # Regulations Tests # ============================================================================ class TestRegulationsAPI: """Tests for regulations endpoints.""" def test_list_regulations_empty(self, client, db_session): """Test listing regulations when database is empty.""" response = client.get("/api/v1/compliance/regulations") assert response.status_code == 200 data = response.json() assert data["total"] == 0 assert data["regulations"] == [] def test_list_regulations_with_data(self, client, db_session, sample_regulation): """Test listing regulations with data.""" response = client.get("/api/v1/compliance/regulations") assert response.status_code == 200 data = response.json() assert data["total"] == 1 assert len(data["regulations"]) == 1 assert data["regulations"][0]["code"] == "GDPR" assert data["regulations"][0]["name"] == "General Data Protection Regulation" def test_list_regulations_filter_by_type(self, client, db_session): """Test filtering regulations by type.""" # Create regulations of different types repo = RegulationRepository(db_session) repo.create( code="GDPR", name="GDPR", regulation_type=RegulationTypeEnum.EU_REGULATION, ) repo.create( code="BSI-TR", name="BSI Technical Guideline", regulation_type=RegulationTypeEnum.BSI_STANDARD, ) # Filter by EU_REGULATION response = client.get("/api/v1/compliance/regulations?regulation_type=eu_regulation") assert response.status_code == 200 data = response.json() assert data["total"] == 1 assert data["regulations"][0]["code"] == "GDPR" def test_list_regulations_filter_by_active(self, client, db_session): """Test filtering regulations by active status.""" repo = RegulationRepository(db_session) active = repo.create(code="ACTIVE", name="Active Reg", regulation_type=RegulationTypeEnum.EU_REGULATION) inactive = repo.create(code="INACTIVE", name="Inactive Reg", regulation_type=RegulationTypeEnum.EU_REGULATION) repo.update(inactive.id, is_active=False) # Get only active response = client.get("/api/v1/compliance/regulations?is_active=true") assert response.status_code == 200 data = response.json() assert data["total"] == 1 assert data["regulations"][0]["code"] == "ACTIVE" def test_get_regulation_by_code(self, client, db_session, sample_regulation): """Test getting specific regulation by code.""" response = client.get("/api/v1/compliance/regulations/GDPR") assert response.status_code == 200 data = response.json() assert data["code"] == "GDPR" assert data["name"] == "General Data Protection Regulation" assert "requirement_count" in data def test_get_regulation_not_found(self, client, db_session): """Test getting non-existent regulation.""" response = client.get("/api/v1/compliance/regulations/NONEXISTENT") assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() # ============================================================================ # Requirements Tests # ============================================================================ class TestRequirementsAPI: """Tests for requirements endpoints.""" def test_list_requirements_paginated_empty(self, client, db_session): """Test paginated requirements with empty database.""" response = client.get("/api/v1/compliance/requirements") assert response.status_code == 200 data = response.json() assert data["pagination"]["total"] == 0 assert data["data"] == [] def test_list_requirements_paginated_with_data(self, client, db_session, sample_regulation, sample_requirement): """Test paginated requirements with data.""" response = client.get("/api/v1/compliance/requirements") assert response.status_code == 200 data = response.json() assert data["pagination"]["total"] == 1 assert len(data["data"]) == 1 assert data["data"][0]["article"] == "Art. 32" assert data["data"][0]["title"] == "Security of processing" def test_list_requirements_pagination_parameters(self, client, db_session, sample_regulation): """Test pagination parameters.""" # Create 5 requirements repo = RequirementRepository(db_session) for i in range(5): repo.create( regulation_id=sample_regulation.id, article=f"Art. {i}", title=f"Requirement {i}", is_applicable=True, ) # Test page size response = client.get("/api/v1/compliance/requirements?page=1&page_size=2") assert response.status_code == 200 data = response.json() assert len(data["data"]) == 2 assert data["pagination"]["page"] == 1 assert data["pagination"]["page_size"] == 2 assert data["pagination"]["total"] == 5 assert data["pagination"]["total_pages"] == 3 assert data["pagination"]["has_next"] is True assert data["pagination"]["has_prev"] is False # Test page 2 response = client.get("/api/v1/compliance/requirements?page=2&page_size=2") data = response.json() assert data["pagination"]["page"] == 2 assert data["pagination"]["has_next"] is True assert data["pagination"]["has_prev"] is True def test_list_requirements_filter_by_regulation(self, client, db_session): """Test filtering requirements by regulation code.""" # Create two regulations with requirements repo_reg = RegulationRepository(db_session) repo_req = RequirementRepository(db_session) gdpr = repo_reg.create(code="GDPR", name="GDPR", regulation_type=RegulationTypeEnum.EU_REGULATION) bsi = repo_reg.create(code="BSI", name="BSI", regulation_type=RegulationTypeEnum.BSI_STANDARD) repo_req.create(regulation_id=gdpr.id, article="Art. 1", title="GDPR Req") repo_req.create(regulation_id=bsi.id, article="T.1", title="BSI Req") # Filter by GDPR response = client.get("/api/v1/compliance/requirements?regulation_code=GDPR") data = response.json() assert data["pagination"]["total"] == 1 assert data["data"][0]["title"] == "GDPR Req" def test_list_requirements_filter_by_applicable(self, client, db_session, sample_regulation): """Test filtering by applicability.""" repo = RequirementRepository(db_session) applicable = repo.create( regulation_id=sample_regulation.id, article="Art. 1", title="Applicable", is_applicable=True, ) not_applicable = repo.create( regulation_id=sample_regulation.id, article="Art. 2", title="Not Applicable", is_applicable=False, ) # Get only applicable response = client.get("/api/v1/compliance/requirements?is_applicable=true") data = response.json() assert data["pagination"]["total"] == 1 assert data["data"][0]["title"] == "Applicable" def test_list_requirements_search(self, client, db_session, sample_regulation): """Test search functionality.""" repo = RequirementRepository(db_session) repo.create( regulation_id=sample_regulation.id, article="Art. 1", title="Security of processing", description="Encryption requirements", ) repo.create( regulation_id=sample_regulation.id, article="Art. 2", title="Data minimization", description="Minimize data collection", ) # Search for "security" response = client.get("/api/v1/compliance/requirements?search=security") data = response.json() assert data["pagination"]["total"] == 1 assert "security" in data["data"][0]["title"].lower() def test_get_requirement_by_id(self, client, db_session, sample_requirement): """Test getting specific requirement by ID.""" response = client.get(f"/api/v1/compliance/requirements/{sample_requirement.id}") assert response.status_code == 200 data = response.json() assert data["id"] == sample_requirement.id assert data["article"] == "Art. 32" def test_get_requirement_not_found(self, client, db_session): """Test getting non-existent requirement.""" response = client.get("/api/v1/compliance/requirements/nonexistent-id") assert response.status_code == 404 # ============================================================================ # Controls Tests # ============================================================================ class TestControlsAPI: """Tests for controls endpoints.""" def test_list_controls_empty(self, client, db_session): """Test listing controls with empty database.""" response = client.get("/api/v1/compliance/controls") assert response.status_code == 200 data = response.json() assert data["total"] == 0 assert data["controls"] == [] def test_list_controls_with_data(self, client, db_session, sample_control): """Test listing controls with data.""" response = client.get("/api/v1/compliance/controls") assert response.status_code == 200 data = response.json() assert data["total"] == 1 assert len(data["controls"]) == 1 assert data["controls"][0]["control_id"] == "CRYPTO-001" def test_list_controls_filter_by_domain(self, client, db_session): """Test filtering controls by domain.""" repo = ControlRepository(db_session) repo.create( control_id="CRYPTO-001", title="Crypto Control", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.PREVENTIVE, pass_criteria="Test criteria", ) repo.create( control_id="IAM-001", title="IAM Control", domain=ControlDomainEnum.IAM, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="Test criteria", ) response = client.get("/api/v1/compliance/controls?domain=crypto") data = response.json() assert data["total"] == 1 assert data["controls"][0]["control_id"] == "CRYPTO-001" def test_list_controls_filter_by_status(self, client, db_session): """Test filtering controls by status.""" repo = ControlRepository(db_session) control1 = repo.create( control_id="PASS-001", title="Passing Control", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.PREVENTIVE, pass_criteria="Test criteria", ) # Update status after creation control1.status = ControlStatusEnum.PASS db_session.commit() control2 = repo.create( control_id="FAIL-001", title="Failing Control", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="Test criteria", ) control2.status = ControlStatusEnum.FAIL db_session.commit() response = client.get("/api/v1/compliance/controls?status=pass") data = response.json() assert data["total"] == 1 assert data["controls"][0]["control_id"] == "PASS-001" # ============================================================================ # Dashboard Tests # ============================================================================ class TestDashboardAPI: """Tests for dashboard endpoint.""" def test_dashboard_empty(self, client, db_session): """Test dashboard with empty database.""" response = client.get("/api/v1/compliance/dashboard") assert response.status_code == 200 data = response.json() assert data["compliance_score"] == 0 assert data["total_regulations"] == 0 assert data["total_requirements"] == 0 assert data["total_controls"] == 0 def test_dashboard_with_data(self, client, db_session, sample_regulation, sample_requirement, sample_control): """Test dashboard with data.""" response = client.get("/api/v1/compliance/dashboard") assert response.status_code == 200 data = response.json() # Check basic counts assert data["total_regulations"] > 0 assert data["total_requirements"] > 0 assert data["total_controls"] > 0 # Check compliance score calculation assert 0 <= data["compliance_score"] <= 100 # Check structure assert "controls_by_status" in data assert "controls_by_domain" in data assert "evidence_by_status" in data assert "risks_by_level" in data def test_dashboard_compliance_score_calculation(self, client, db_session): """Test compliance score is calculated correctly.""" repo = ControlRepository(db_session) # Create controls with different statuses c1 = repo.create(control_id="PASS-1", title="Pass 1", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.PREVENTIVE, pass_criteria="Test") c1.status = ControlStatusEnum.PASS c2 = repo.create(control_id="PASS-2", title="Pass 2", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.PREVENTIVE, pass_criteria="Test") c2.status = ControlStatusEnum.PASS c3 = repo.create(control_id="PARTIAL-1", title="Partial", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="Test") c3.status = ControlStatusEnum.PARTIAL c4 = repo.create(control_id="FAIL-1", title="Fail", domain=ControlDomainEnum.CRYPTO, control_type=ControlTypeEnum.CORRECTIVE, pass_criteria="Test") c4.status = ControlStatusEnum.FAIL db_session.commit() response = client.get("/api/v1/compliance/dashboard") data = response.json() # Score = (2 pass + 0.5 * 1 partial) / 4 total = 2.5 / 4 = 62.5% expected_score = ((2 + 0.5) / 4) * 100 assert data["compliance_score"] == round(expected_score, 1) # ============================================================================ # Evidence Collection Tests # ============================================================================ class TestEvidenceCollectionAPI: """Tests for evidence collection endpoints.""" def test_collect_evidence_missing_source(self, client, db_session): """Test evidence collection without source parameter.""" response = client.post("/api/v1/compliance/evidence/collect") assert response.status_code == 422 # Missing required parameter def test_collect_evidence_invalid_source(self, client, db_session): """Test evidence collection with invalid source.""" response = client.post("/api/v1/compliance/evidence/collect?source=invalid_source") assert response.status_code == 400 assert "Unknown source" in response.json()["detail"] def test_collect_evidence_control_not_found(self, client, db_session): """Test evidence collection when control doesn't exist.""" response = client.post("/api/v1/compliance/evidence/collect?source=sast") # Should return 404 because control SDLC-001 doesn't exist assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() def test_collect_evidence_sast(self, client, db_session): """Test SAST evidence collection.""" # First create the control repo = ControlRepository(db_session) control = repo.create( control_id="SDLC-001", title="SAST Scanning", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="No critical vulnerabilities", ) control.status = ControlStatusEnum.PASS db_session.commit() report_data = { "findings": [ {"severity": "high", "rule": "sql-injection"}, ], "summary": {"total": 1, "high": 1} } response = client.post( "/api/v1/compliance/evidence/collect?source=sast&ci_job_id=12345", json=report_data ) assert response.status_code == 200 data = response.json() assert data["success"] is True assert "evidence_id" in data def test_collect_evidence_dependency_scan(self, client, db_session): """Test dependency scan evidence collection.""" repo = ControlRepository(db_session) repo.create( control_id="SDLC-002", title="Dependency Scanning", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="No critical vulnerabilities", ) report_data = { "vulnerabilities": [], "summary": {"total": 0, "critical": 0} } response = client.post( "/api/v1/compliance/evidence/collect?source=dependency_scan", json=report_data ) assert response.status_code == 200 def test_collect_evidence_with_ci_metadata(self, client, db_session): """Test evidence collection with CI/CD metadata.""" repo = ControlRepository(db_session) repo.create( control_id="SDLC-001", title="SAST Scanning", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="No critical vulnerabilities", ) response = client.post( "/api/v1/compliance/evidence/collect" "?source=sast" "&ci_job_id=job-123" "&ci_job_url=https://github.com/actions/runs/123", json={"findings": []} ) assert response.status_code == 200 class TestEvidenceStatusAPI: """Tests for CI evidence status endpoint.""" def test_ci_status_empty(self, client, db_session): """Test CI status with no evidence.""" response = client.get("/api/v1/compliance/evidence/ci-status") assert response.status_code == 200 data = response.json() assert "controls" in data or "message" in data def test_ci_status_with_evidence(self, client, db_session): """Test CI status with evidence.""" # Create control and evidence ctrl_repo = ControlRepository(db_session) evidence_repo = EvidenceRepository(db_session) control = ctrl_repo.create( control_id="SDLC-001", title="SAST", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="No critical vulnerabilities", ) evidence_repo.create( control_id=control.control_id, # Use control_id string, not UUID evidence_type="report", title="CI Pipeline Evidence", source="ci_pipeline", ci_job_id="123", ) response = client.get("/api/v1/compliance/evidence/ci-status") assert response.status_code == 200 def test_ci_status_filter_by_control(self, client, db_session): """Test filtering CI status by control ID.""" ctrl_repo = ControlRepository(db_session) evidence_repo = EvidenceRepository(db_session) control1 = ctrl_repo.create(control_id="SDLC-001", title="SAST", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="Test") control2 = ctrl_repo.create(control_id="SDLC-002", title="Deps", domain=ControlDomainEnum.SDLC, control_type=ControlTypeEnum.DETECTIVE, pass_criteria="Test") evidence_repo.create(control_id=control1.control_id, evidence_type="report", title="Evidence 1", source="ci_pipeline") evidence_repo.create(control_id=control2.control_id, evidence_type="report", title="Evidence 2", source="ci_pipeline") response = client.get("/api/v1/compliance/evidence/ci-status?control_id=SDLC-001") assert response.status_code == 200 def test_ci_status_days_filter(self, client, db_session): """Test filtering CI status by days.""" response = client.get("/api/v1/compliance/evidence/ci-status?days=7") assert response.status_code == 200 if __name__ == "__main__": pytest.main([__file__, "-v"])