Distributed training with multiple machines

In the previous two tutorials, we saw that using multiple GPUs within a machine can accelerate training. The speedup, however, is limited by the number of GPUs installed in that machine. And it’s rare to find a single machine with more than 16 GPUs nowadays. For some truly large-scale applications, this speedup might still be insufficient. For example, it could still take many days to train a state-of-the-art CNN on millions of images.

In this tutorial, we’ll discuss the key concepts you’ll need in order to go from a program that does single-machine training to one that executes distributed training across multiple machines. We depict a typical distributed system in the following figure, where multiple machines are connected by network switches.

Note that the way we used copyto to copy data from one GPU to another in the multiple-GPU tutorial does not work when our GPUs are sitting on different machines. To make good use of the available resources here, we’ll need a better abstraction.

Key-value store

MXNet provides a key-value store to synchronize data among devices. The following code initializes an NDArray associated with the key “weight” on a key-value store.

In [1]:
from mxnet import kv, nd
store = kv.create('local')
shape = (2, 3)
x = nd.random_uniform(shape=shape)
store.init('weight', x)
print('=== init "weight" ==={}'.format(x))
=== init "weight" ===
[[ 0.54881352  0.59284461  0.71518934]
 [ 0.84426576  0.60276335  0.85794562]]
<NDArray 2x3 @cpu(0)>

After initialization, we can pull the value to multiple devices.

In [2]:
from mxnet import gpu
ctx = [gpu(0), gpu(1)]
y = [nd.zeros(shape, ctx=c) for c in ctx]
store.pull('weight', out=y)
print('=== pull "weight" to {} ===\n{}'.format(ctx, y))
=== pull "weight" to [gpu(0), gpu(1)] ===
[
[[ 0.54881352  0.59284461  0.71518934]
 [ 0.84426576  0.60276335  0.85794562]]
<NDArray 2x3 @gpu(0)>,
[[ 0.54881352  0.59284461  0.71518934]
 [ 0.84426576  0.60276335  0.85794562]]
<NDArray 2x3 @gpu(1)>]

We can also push new data values into the store. The store first sums all values pushed to the same key and then overwrites the current value with that sum.

In [3]:
z = [nd.ones(shape, ctx=ctx[i])+i for i in range(len(ctx))]
store.push('weight', z)
print('=== push to "weight" ===\n{}'.format(z))
store.pull('weight', out=y)
print('=== pull "weight" ===\n{}'.format(y))
=== push to "weight" ===
[
[[ 1.  1.  1.]
 [ 1.  1.  1.]]
<NDArray 2x3 @gpu(0)>,
[[ 2.  2.  2.]
 [ 2.  2.  2.]]
<NDArray 2x3 @gpu(1)>]
=== pull "weight" ===
[
[[ 3.  3.  3.]
 [ 3.  3.  3.]]
<NDArray 2x3 @gpu(0)>,
[[ 3.  3.  3.]
 [ 3.  3.  3.]]
<NDArray 2x3 @gpu(1)>]

With push and pull we can replace the allreduce function defined in multiple-gpus-scratch with:

def allreduce(data, data_name, store):
    # Sum the per-device copies of `data` on the store ...
    store.push(data_name, data)
    # ... then write the aggregated value back to every device.
    store.pull(data_name, out=data)
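
To see the function in action, here is a minimal sketch (the key name 'grad' and the toy values are purely illustrative; store, ctx, and shape are as defined above):

store.init('grad', nd.zeros(shape))                          # register a fresh key
grad = [nd.ones(shape, ctx=c) * (i + 1) for i, c in enumerate(ctx)]
allreduce(grad, 'grad', store)                               # each copy now holds 1 + 2 = 3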

Distributed key-value store

The key-value store is not only useful for data synchronization within a machine; it also supports inter-machine communication. To use it, create a distributed kvstore with the following command. (Note: the distributed key-value store requires MXNet to be compiled with the flag USE_DIST_KVSTORE=1, e.g. make USE_DIST_KVSTORE=1.)

store = kv.create('dist')

Now if we run the code from the previous section on two machines at the same time, the store will aggregate the two NDArrays pushed from each machine (each machine’s push already sums to 3), and the pulled result will be:

[[ 6.  6.  6.]
 [ 6.  6.  6.]]

In the distributed setting, MXNet launches three kinds of processes (each invocation of python myprog.py creates one process). The first is the worker, which runs the user program, such as the code in the previous section. The other two are the server, which maintains the data pushed into the store, and the scheduler, which monitors the liveness of each node.

It’s up to the user to decide which machines these processes run on. But to simplify process placement and launching, MXNet provides a tool located at tools/launch.py.

Assume there are two machines, A and B. They are reachable via ssh, and their IPs are saved in a file named hostfile. Then we can start one worker on each machine through:

$ mxnet_path/tools/launch.py -H hostfile -n 2 python myprog.py

It will also start a server on each machine, and the scheduler on the machine we launched from.
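
For reference, the hostfile is just a plain-text file with one host (IP address or hostname) per line. For our two machines it might look like this (the addresses below are placeholders):

172.16.0.1
172.16.0.2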

Use kvstore in gluon

As mentioned in our section on training with multiple GPUs from scratch, to implement data parallelism we just need to specify

  • how to split data
  • how to synchronize gradients and weights

We have already seen from multiple-gpu-gluon that a gluon Trainer can automatically aggregate the gradients among different GPUs. What it really does is keep a key-value store of type local inside it. Therefore, to switch to multi-machine training we only need to pass in a distributed key-value store, for example,

store = kv.create('dist')
trainer = gluon.Trainer(..., kvstore=store)
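
Putting the pieces together, a minimal sketch of the relevant lines might look as follows; here net, the sgd optimizer, and the learning rate are placeholders for whatever your existing training script already uses:

from mxnet import gluon, kv

store = kv.create('dist')
trainer = gluon.Trainer(net.collect_params(), 'sgd',
                        {'learning_rate': 0.1}, kvstore=store)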

To split the data, however, we cannot directly copy the previous approach. One commonly used solution is to split the whole dataset into k parts at the beginning, and then let the i-th worker read only the i-th part of the data.

We can obtain the total number of workers by the attribute num_workers and the rank of the current worker by the attribute rank.

In [4]:
print('total number of workers: %d'%(store.num_workers))
print('my rank among workers: %d'%(store.rank))
total number of workers: 1
my rank among workers: 0

With this information, we can manually seek to the proper position in the input data. In addition, several data iterators provided by MXNet already support reading only a part of the data. For example,

from mxnet.io import ImageRecordIter
data = ImageRecordIter(num_parts=store.num_workers, part_index=store.rank, ...)
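
For data already loaded into NDArrays, the manual route is also straightforward. Below is a minimal sketch, assuming X and y hold the full features and labels (these names, and the simple contiguous split, are just for illustration):

def get_shard(data, store):
    # Split the first axis into num_workers contiguous parts and
    # return the part that belongs to this worker's rank.
    part = len(data) // store.num_workers
    start = store.rank * part
    return data[start : start + part]

X_shard, y_shard = get_shard(X, store), get_shard(y, store)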

For whinges or inquiries, open an issue on GitHub.