"""The MCP HTTP transport must gate on the Bearer token the scanner sends. The scanner registers us with an access_token (McpServerConfig.access_token) and calls over Streamable HTTP. When CRA_MCP_TOKEN is set, requests without the exact ``Authorization: Bearer `` must be rejected before reaching the MCP layer. """ import importlib import pytest pytest.importorskip("mcp") # MCP SDK only present in the container/CI image _MCP_HDR = {"Accept": "application/json, text/event-stream", "Content-Type": "application/json"} _INIT = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "t", "version": "1"}}, } @pytest.fixture() def client(monkeypatch): monkeypatch.setenv("CRA_MCP_TOKEN", "testtok") from starlette.testclient import TestClient import compliance.mcp.server as srv importlib.reload(srv) # rebuild app under the patched env with TestClient(srv._build_http_app()) as c: yield c def test_missing_token_rejected(client): r = client.post("/mcp", json=_INIT, headers=_MCP_HDR) assert r.status_code == 401 def test_wrong_token_rejected(client): r = client.post("/mcp", json=_INIT, headers={**_MCP_HDR, "Authorization": "Bearer nope"}) assert r.status_code == 401 def test_correct_token_passes_auth(client): # Reaches the MCP layer (any non-401 status) → the Bearer gate let it through. r = client.post("/mcp", json=_INIT, headers={**_MCP_HDR, "Authorization": "Bearer testtok"}) assert r.status_code != 401 def test_tools_registered(): import compliance.mcp.server as srv importlib.reload(srv) names = {t.name for t in srv.mcp._tool_manager.list_tools()} assert {"cra_assess_findings", "cra_list_requirements"} <= names