plexus.cli.procedure.test_mcp_transport module

Tests for the in-process MCP transport system.

Tests the EmbeddedMCPServer, InProcessMCPTransport, and ProcedureMCPClient to ensure they properly implement MCP protocol patterns without requiring a separate server process.

class plexus.cli.procedure.test_mcp_transport.TestConvenienceFunctions

Bases: object

Test convenience functions for procedure integration.

async test_create_procedure_mcp_server_basic()

Test basic server creation.

async test_create_procedure_mcp_server_with_tools()

Test server creation with Plexus tool registration.

class plexus.cli.procedure.test_mcp_transport.TestEmbeddedMCPServer

Bases: object

Test the embedded MCP server functionality.

experiment_context()
server(experiment_context)
async test_connection_context_manager(server)

Test the connection context manager.

async test_connection_default_client_info(server)

Test connection with default client info.

async test_core_tool_get_context(server)

Test the get_experiment_context core tool.

async test_core_tool_log_message(server)

Test the log_message core tool.

test_server_initialization(server, experiment_context)

Test server initialization with context.

class plexus.cli.procedure.test_mcp_transport.TestInProcessMCPTransport

Bases: object

Test the core in-process MCP transport functionality.

sample_resource()
sample_tool()
async test_call_tool_error_handling(transport)

Test error handling in tool execution.

async test_call_tool_not_found(transport)

Test calling non-existent tool.

async test_call_tool_not_initialized(transport, sample_tool)

Test calling tool without initialization.

async test_call_tool_success(transport, sample_tool)

Test successful tool execution.

async test_initialize_connection(transport)

Test MCP connection initialization.

async test_list_resources(transport, sample_resource)

Test listing available resources.

async test_list_tools(transport, sample_tool)

Test listing available tools.

async test_read_resource_not_found(transport)

Test reading non-existent resource.

async test_read_resource_success(transport, sample_resource)

Test successful resource reading.

test_resource_registration(transport, sample_resource)

Test resource registration.

test_tool_registration(transport, sample_tool)

Test tool registration.

test_transport_initialization(transport)

Test basic transport initialization.

transport()
class plexus.cli.procedure.test_mcp_transport.TestIntegrationScenarios

Bases: object

Integration tests for common procedure MCP usage patterns.

async test_error_handling_workflow()

Test error handling in a typical workflow.

async test_full_experiment_mcp_workflow()

Test a complete procedure MCP workflow.

class plexus.cli.procedure.test_mcp_transport.TestProcedureMCPClient

Bases: object

Test the procedure MCP client interface.

client(transport)
initialized_client(transport)
async test_client_call_tool(transport)

Test client tool calling.

async test_client_list_resources(transport)

Test client resource listing.

async test_client_list_tools(transport)

Test client tool listing.

transport()