diff --git a/ci/docker/runtime_functions.sh b/ci/docker/runtime_functions.sh
index e171767d51f3..272f1591de78 100755
--- a/ci/docker/runtime_functions.sh
+++ b/ci/docker/runtime_functions.sh
@@ -1346,7 +1346,10 @@ integrationtest_ubuntu_gpu_scala() {
 integrationtest_ubuntu_gpu_dist_kvstore() {
     set -ex
     pushd .
-    cd tests/nightly
+    cd /work/mxnet/python
+    pip3 install -e .
+    pip3 install --no-cache-dir horovod
+    cd /work/mxnet/tests/nightly
     ./test_distributed_training-gpu.sh
     popd
 }
diff --git a/example/distributed_training/cifar10_kvstore_hvd.py b/example/distributed_training/cifar10_kvstore_hvd.py
new file mode 100644
index 000000000000..e6780e5db85e
--- /dev/null
+++ b/example/distributed_training/cifar10_kvstore_hvd.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""cifar10_kvstore_hvd.py contains code that runs distributed training of a
+ResNet18 network using the Horovod framework"""
+
+import argparse
+import logging
+import time
+import random
+import types
+import warnings
+
+import numpy as np
+import mxnet as mx
+from mxnet import autograd, gluon, kv, nd
+from mxnet.gluon.model_zoo import vision
+
+logging.basicConfig(level=logging.INFO)
+
+# Training settings
+parser = argparse.ArgumentParser(description='MXNet CIFAR Example')
+
+parser.add_argument('--batch-size', type=int, default=64,
+                    help='training batch size per worker (default: 64)')
+parser.add_argument('--epochs', type=int, default=5,
+                    help='number of training epochs (default: 5)')
+parser.add_argument('--lr', type=float, default=0.01,
+                    help='learning rate (default: 0.01)')
+parser.add_argument('--no-cuda', action='store_true', default=False,
+                    help='disable training on GPU (default: False)')
+args = parser.parse_args()
+
+if not args.no_cuda:
+    # Disable CUDA if there are no GPUs.
+    if mx.context.num_gpus() == 0:
+        args.no_cuda = True
+
+
+# Transform input data
+def transform(data, label):
+    return nd.transpose(data.astype(np.float32), (2, 0, 1))/255,\
+        label.astype(np.float32)
+
+
+# Train a batch using multiple GPUs
+def train(batch_list, context, network, gluon_trainer, metric):
+    """ Training with multiple GPUs
+
+    Parameters
+    ----------
+    batch_list: list
+        the data and label of the current batch
+    context: list
+        a list of all GPUs to be used for training
+    network:
+        the ResNet network being trained
+    gluon_trainer:
+        the gluon Trainer that updates the network parameters
+    metric:
+        the metric that accumulates training accuracy
+    """
+
+    # Run one forward and backward pass
+    def forward_backward(network, data, labels, metric):
+        with autograd.record():
+            # Compute outputs
+            outputs = [network(X) for X in data]
+            # Compute the loss
+            losses = [loss(yhat, y) for yhat, y in zip(outputs, labels)]
+
+        # Run the backward pass (calculate gradients)
+        for l in losses:
+            l.backward()
+
+        metric.update(preds=outputs, labels=labels)
+
+    # Use cross entropy loss
+    loss = gluon.loss.SoftmaxCrossEntropyLoss()
+
+    # Split and load data
+    data = batch_list[0]
+    data = gluon.utils.split_and_load(data, context)
+
+    # Split and load label
+    label = batch_list[1]
+    label = gluon.utils.split_and_load(label, context)
+
+    # Run the forward and backward pass
+    forward_backward(network, data, label, metric)
+
+    # Update the parameters
+    this_batch_size = batch_list[0].shape[0]
+    gluon_trainer.step(this_batch_size)
+
+
+# Evaluate accuracy of the given network using the given data
+def evaluate(data_iterator, network, context):
+    """ Measure the accuracy of ResNet
+
+    Parameters
+    ----------
+    data_iterator: Iter
+        iterator over the evaluation dataset
+    network:
+        the ResNet network to evaluate
+    context: Context
+        the device on which evaluation runs
+
+    Returns
+    -------
+    float: the accuracy of the network on the given data
+    """
+    acc = mx.metric.Accuracy()
+
+    # Iterate through data and label
+    for i, (data, label) in enumerate(data_iterator):
+
+        # Copy the data and label to the evaluation context
+        data = data.as_in_context(context)
+        label = label.as_in_context(context)
+
+        # Get network's output which is a probability distribution
+        # Apply argmax on the probability distribution to get network's
+        # classification.
+        output = network(data)
+        predictions = nd.argmax(output, axis=1)
+
+        # Give network's prediction and the correct label to update the metric
+        acc.update(preds=predictions, labels=label)
+
+    # Return the accuracy
+    return acc.get()[1]
+
+
+class SplitSampler(gluon.data.sampler.Sampler):
+    """ Split the dataset into `num_parts` parts and sample from the part with
+    index `part_index`
+
+    Parameters
+    ----------
+    length: int
+        Number of examples in the dataset
+    num_parts: int
+        Number of partitions into which the data is split
+    part_index: int
+        The index of the part to read from
+    """
+    def __init__(self, length, num_parts=1, part_index=0):
+        # Compute the length of each partition
+        self.part_len = length // num_parts
+        # Compute the start index for this partition
+        self.start = self.part_len * part_index
+        # Compute the end index for this partition
+        self.end = self.start + self.part_len
+
+    def __iter__(self):
+        # Extract examples between `start` and `end`, shuffle and return them.
+        indices = list(range(self.start, self.end))
+        random.shuffle(indices)
+        return iter(indices)
+
+    def __len__(self):
+        return self.part_len
+
+
+# Use Horovod as the KVStore
+store = kv.create('horovod')
+
+# Get the number of workers
+num_workers = store.num_workers
+
+# Create the context based on the local rank of the current process
+ctx = mx.cpu(store.local_rank) if args.no_cuda else mx.gpu(store.local_rank)
+
+# Load the training data
+train_data = gluon.data.DataLoader(gluon.data.vision.CIFAR10(train=True,
+                                   transform=transform), args.batch_size,
+                                   sampler=SplitSampler(50000,
+                                                        num_workers,
+                                                        store.rank))
+
+# Load the test data
+test_data = gluon.data.DataLoader(gluon.data.vision.CIFAR10(train=False,
+                                   transform=transform),
+                                   args.batch_size, shuffle=False)
+
+# Load the ResNet18 model from the Gluon model zoo
+net = vision.resnet18_v1()
+
+# Initialize the parameters with Xavier initializer
+net.initialize(mx.init.Xavier(), ctx=ctx)
+
+# Use Adam optimizer. Ask the trainer to use the distributed kv store.
+trainer = gluon.Trainer(net.collect_params(), optimizer='adam',
+                        optimizer_params={'learning_rate': args.lr},
+                        kvstore=store)
+
+train_metric = mx.metric.Accuracy()
+
+# Run as many epochs as required
+for epoch in range(args.epochs):
+    tic = time.time()
+    train_metric.reset()
+
+    # Iterate through batches and run training using multiple GPUs
+    batch_num = 1
+    btic = time.time()
+    for batch in train_data:
+        # Train the batch using multiple GPUs
+        train(batch, [ctx], net, trainer, train_metric)
+        if store.rank == 0 and batch_num % 100 == 0:
+            speed = args.batch_size / (time.time() - btic)
+            logging.info('Epoch[{}] Rank [{}] Batch[{}]\tSpeed: {:.2f} samples/sec'
+                         .format(epoch, store.rank, batch_num, speed))
+            logging.info('{} = {:.2f}'.format(*train_metric.get()))
+
+        btic = time.time()
+        batch_num += 1
+
+    elapsed = time.time() - tic
+    # Print test accuracy after every epoch
+    test_accuracy = evaluate(test_data, net, ctx)
+    if store.rank == 0:
+        logging.info("Epoch %d: Test_acc %f" % (epoch, test_accuracy))
\ No newline at end of file
diff --git a/python/mxnet/gluon/trainer.py b/python/mxnet/gluon/trainer.py
index 303167d8abf2..fd03393b6374 100644
--- a/python/mxnet/gluon/trainer.py
+++ b/python/mxnet/gluon/trainer.py
@@ -25,6 +25,7 @@
 from .parameter import ParameterDict, Parameter
 from ..kvstore import KVStore
 
+
 class Trainer(object):
     """Applies an `Optimizer` on a set of Parameters. Trainer should
     be used together with `autograd`.
diff --git a/python/mxnet/kvstore/__init__.py b/python/mxnet/kvstore/__init__.py
index ccb58a1c6229..0547ed40631d 100644
--- a/python/mxnet/kvstore/__init__.py
+++ b/python/mxnet/kvstore/__init__.py
@@ -22,3 +22,4 @@
 from .kvstore import *
 from .base import *
 from .kvstore_server import *
+from .horovod import *
diff --git a/python/mxnet/kvstore/horovod.py b/python/mxnet/kvstore/horovod.py
new file mode 100644
index 000000000000..20a0cd89edaa
--- /dev/null
+++ b/python/mxnet/kvstore/horovod.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# coding: utf-8
+""" Key value store interface of MXNet for Horovod """
+from __future__ import absolute_import
+from .base import KVStoreBase
+
+__all__ = ['Horovod']
+
+
+@KVStoreBase.register
+class Horovod(KVStoreBase):
+    """A communication backend using Horovod."""
+
+    def __init__(self):
+        import horovod.mxnet as hvd
+        hvd.init()
+
+    @property
+    def type(self):
+        return 'horovod'
+
+    def broadcast(self, key, value, out, priority=0):
+        """ Broadcast the `value` NDArray at rank 0 to all ranks
+
+        Parameters
+        ----------
+        key : str, or int
+            The key is used to name the tensor for the broadcast. Its
+            usage is different from that of parameter servers.
+
+        value : NDArray
+            The tensor to be broadcast.
+
+        out : NDArray, list of NDArray
+            Output tensor that receives the value broadcast from the root process
+
+        priority : int, optional
+            The priority of the operation.
+            Higher priority operations are likely to be executed before other actions.
+
+        Examples
+        --------
+        >>> a = mx.nd.ones((2, 3))
+        >>> b = mx.nd.zeros((2, 3))
+        >>> kv.broadcast('2', value=a, out=b)
+        >>> print(b.asnumpy())
+        [[ 1.  1.  1.]
+        [ 1.  1.  1.]]
+        """
+        import horovod.mxnet as hvd
+
+        out = out if isinstance(out, list) else [out]
+
+        # TODO (lnyuan): need to copy data to each device memory
+        for o in out:
+            o[:] = hvd.broadcast(tensor=value, root_rank=0, name=str(key),
+                                 priority=priority)
+
+    def pushpull(self, key, value, out=None, priority=0):
+        """ Performs allreduce on a single tensor or a list of tensor objects
+
+        This function performs in-place summation of the input tensor over all the processes.
+
+        The name `pushpull` is a generic term. In Horovod, its action is implemented via
+        ring allreduce. Each operation is identified by the `key`. The tensor type and shape
+        must be the same on all processes for a given key. The reduction will not start until
+        all processes are ready to send and receive the tensor.
+
+        Parameters
+        ----------
+        key : str, int, or sequence of str or int
+            Keys used to uniquely tag an operation.
+
+        value : NDArray
+            Tensor value on one process to be summed. If `out` is not specified, the `value` will
+            be modified in-place
+
+        out: NDArray
+            Output tensor after allreduce. If not specified, the input tensor `value` will be
+            modified in-place.
+
+        priority : int, optional
+            The priority of the operation.
+            Higher priority operations are likely to be executed before other actions.
+
+        Examples
+        --------
+        >>> # perform in-place allreduce on tensor a
+        >>> shape = (2, 3)
+        >>> nworker = kv.num_workers # assume there are 8 processes
+        >>> a = mx.nd.ones(shape)
+        >>> kv.pushpull('1', a)
+        >>> print(a.asnumpy())
+        [[ 8.  8.  8.]
+        [ 8.  8.  8.]]
+
+        >>> # perform allreduce on tensor a and output to b
+        >>> a = mx.nd.ones(shape)
+        >>> b = mx.nd.zeros(shape)
+        >>> kv.pushpull('2', a, out=b)
+        >>> print(b.asnumpy())
+        [[ 8.  8.  8.]
+        [ 8.  8.  8.]]
+        """
+        import horovod.mxnet as hvd
+
+        if out is None:
+            value = value if isinstance(value, list) else [value]
+            for v in value:
+                hvd.allreduce_(v, average=False, name=str(key),
+                               priority=priority)
+        else:
+            out = out if isinstance(out, list) else [out]
+            value = value if isinstance(value, list) else [value]
+            for o, v in zip(out, value):
+                o[:] = hvd.allreduce(v, average=False, name=str(key),
+                                     priority=priority)
+
+    def set_optimizer(self, optimizer):
+        pass  # no-op: Horovod has no server process on which to run an optimizer
+
+    @staticmethod
+    def is_capable(capability):
+        return False  # none of the optional KVStore capabilities are supported
+
+    def save_optimizer_states(self, fname, dump_optimizer=False):
+        pass  # no-op: optimizer states are kept on each worker
+
+    def load_optimizer_states(self, fname):
+        pass  # no-op: optimizer states are kept on each worker
+
+    @property
+    def rank(self):
+        import horovod.mxnet as hvd
+        return hvd.rank()
+
+    @property
+    def local_rank(self):
+        import horovod.mxnet as hvd
+        return hvd.local_rank()
+
+    @property
+    def num_workers(self):
+        import horovod.mxnet as hvd
+        return hvd.size()
diff --git a/python/mxnet/kvstore/kvstore.py b/python/mxnet/kvstore/kvstore.py
index 11ec3f98178f..ad83ad4fac7c 100644
--- a/python/mxnet/kvstore/kvstore.py
+++ b/python/mxnet/kvstore/kvstore.py
@@ -209,6 +209,7 @@ def push(self, key, value, priority=0):
         Examples
         --------
         >>> # push a single key-value pair
+        >>> shape = (2,3)
         >>> kv.push('3', mx.nd.ones(shape)*8)
         >>> kv.pull('3', out=a) # pull out the value
         >>> print a.asnumpy()
@@ -295,6 +296,7 @@ def pull(self, key, out=None, priority=0, ignore_sparse=True):
         Examples
         --------
         >>> # pull a single key-value pair
+        >>> shape = (2,3)
         >>> a = mx.nd.zeros(shape)
         >>> kv.pull('3', out=a)
         >>> print a.asnumpy()
@@ -367,6 +369,7 @@ def pushpull(self, key, value, out=None, priority=0):
         Examples
         --------
         >>> # pushpull a single key-value pair
+        >>> shape = (2,3)
         >>> kv.pushpull('3', mx.nd.ones(shape)*8, out=a)
         >>> print a.asnumpy()
         [[ 8.  8.  8.]
diff --git a/tests/nightly/dist_device_sync_kvstore_horovod.py b/tests/nightly/dist_device_sync_kvstore_horovod.py
new file mode 100644
index 000000000000..b5dfcafc8af1
--- /dev/null
+++ b/tests/nightly/dist_device_sync_kvstore_horovod.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+sys.path.insert(0, "../../python/")
+import mxnet as mx
+import numpy as np
+import numpy.random as rnd
+import time
+import argparse
+
+# parser
+parser = argparse.ArgumentParser(description='kvstore test')
+args = parser.parse_args()
+
+
+def check_diff_to_scalar(A, x, rank=None):
+    """ assert A == x"""
+    assert(np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
+
+
+# setup
+keys = ['3', '5', '7']
+init_test_keys = [str(i) for i in range(200,300)]
+init_test_keys_big = [str(i) for i in range(300,400)]
+init_test_keys_device = [str(i) for i in range(400,500)]
+init_test_keys_device_big = [str(i) for i in range(500,600)]
+
+shape = (2, 3)
+big_shape = (1200, 1200)  # bigger than MXNET_KVSTORE_BIGARRAY_BOUND
+
+kv = mx.kv.create('horovod')
+my_rank = kv.rank
+my_num_workers = kv.num_workers
+
+
+def test_pushpull():
+    ctx = mx.gpu(kv.local_rank) if mx.context.num_gpus() > 0 else mx.cpu(kv.local_rank)
+    scale = kv.rank + 1
+    tensor = mx.nd.ones(shape, ctx) * scale
+    kv.pushpull('3', tensor)
+
+    expected = (kv.num_workers + 1) * kv.num_workers / 2
+    check_diff_to_scalar(tensor, expected)
+    print('worker ' + str(kv.local_rank) + ' passed test_pushpull')
+
+
+def test_broadcast():
+    ctx = mx.gpu(kv.local_rank) if mx.context.num_gpus() > 0 else mx.cpu(kv.local_rank)
+    val = mx.nd.zeros(shape, ctx)
+    kv.broadcast('0', mx.nd.ones(shape), out=val)
+    expected = 1
+    check_diff_to_scalar(val, expected, kv.rank)
+    print('worker ' + str(kv.local_rank) + ' passed test_broadcast')
+
+
+def test_type():
+    assert kv.type == 'horovod'
+
+
+if __name__ == "__main__":
+    test_type()
+    test_broadcast()
+    test_pushpull()
diff --git a/tests/nightly/test_distributed_training-gpu.sh b/tests/nightly/test_distributed_training-gpu.sh
index 9ce9cccb09da..40b6e1464a0d 100755
--- a/tests/nightly/test_distributed_training-gpu.sh
+++ b/tests/nightly/test_distributed_training-gpu.sh
@@ -31,7 +31,6 @@ test_kvstore() {
     )
 
    for arg in "${test_args[@]}"; do
-        echo $arg
        python3 ../../tools/launch.py $arg
        if [ $? -ne 0 ]; then
            return $?
@@ -39,6 +38,16 @@
    done
 }
 
+test_horovod() {
+    echo "localhost slots=2" > hosts
+    mpirun -np 2 --hostfile hosts --bind-to none --map-by slot -mca pml ob1 \
+        -mca btl ^openib python3 dist_device_sync_kvstore_horovod.py
+    if [ $? -ne 0 ]; then
+        return 1  # note: $? here is the exit status of [ ], not of mpirun
+    fi
+}
+
 test_kvstore
+test_horovod
 
 exit $errors
\ No newline at end of file
diff --git a/tools/launch.py b/tools/launch.py
index 7000e061fd4b..117dab69b1b6 100755
--- a/tools/launch.py
+++ b/tools/launch.py
@@ -28,6 +28,7 @@
 curr_path = os.path.abspath(os.path.dirname(__file__))
 sys.path.append(os.path.join(curr_path, "../3rdparty/dmlc-core/tracker"))
 
+
 def dmlc_opts(opts):
     """convert from mxnet's opts to dmlc's opts
     """
@@ -41,14 +42,14 @@ def dmlc_opts(opts):
     dopts = vars(opts)
     for key in ['env_server', 'env_worker', 'env']:
         for v in dopts[key]:
-            args.append('--' + key.replace("_","-"))
+            args.append('--' + key.replace("_", "-"))
             args.append(v)
     args += opts.command
     try:
         from dmlc_tracker import opts
     except ImportError:
-        print("Can't load dmlc_tracker package. Perhaps you need to run")
-        print("    git submodule update --init --recursive")
+        logging.info("Can't load dmlc_tracker package. Perhaps you need to run")
+        logging.info("    git submodule update --init --recursive")
         raise
     dmlc_opts = opts.get_opts(args)
     return dmlc_opts
@@ -57,39 +58,39 @@
 def main():
     parser = argparse.ArgumentParser(description='Launch a distributed job')
     parser.add_argument('-n', '--num-workers', required=True, type=int,
-                        help = 'number of worker nodes to be launched')
+                        help='number of worker nodes to be launched')
     parser.add_argument('-s', '--num-servers', type=int,
-                        help = 'number of server nodes to be launched, \
+                        help='number of server nodes to be launched, \
                         in default it is equal to NUM_WORKERS')
     parser.add_argument('-H', '--hostfile', type=str,
                         help = 'the hostfile of slave machines which will run \
                         the job. Required for ssh and mpi launcher')
     parser.add_argument('--sync-dst-dir', type=str,
-                        help = 'if specificed, it will sync the current \
-                        directory into slave machines\'s SYNC_DST_DIR if ssh \
+                        help='if specified, it will sync the current \
+                        directory into slave machines\' SYNC_DST_DIR if ssh \
                         launcher is used')
     parser.add_argument('--launcher', type=str, default='ssh',
                         choices = ['local', 'ssh', 'mpi', 'sge', 'yarn'],
-                        help = 'the launcher to use')
+                        help='the launcher to use')
     parser.add_argument('--env-server', action='append', default=[],
-                        help = 'Given a pair of environment_variable:value, sets this value of \
+                        help='Given a pair of environment_variable:value, sets this value of \
                         environment variable for the server processes. This overrides values of \
                         those environment variable on the machine where this script is run from. \
                         Example OMP_NUM_THREADS:3')
     parser.add_argument('--env-worker', action='append', default=[],
-                        help = 'Given a pair of environment_variable:value, sets this value of \
+                        help='Given a pair of environment_variable:value, sets this value of \
                         environment variable for the worker processes. This overrides values of \
                         those environment variable on the machine where this script is run from. \
                         Example OMP_NUM_THREADS:3')
     parser.add_argument('--env', action='append', default=[],
-                        help = 'given a environment variable, passes their \
-                        values from current system to all workers and servers. \
+                        help='given an environment variable, passes its value \
+                        from the current system to all workers and servers. \
                        Not necessary when launcher is local as in that case \
                        all environment variables which are set are copied.')
     parser.add_argument('--p3', action='store_true', default=False,
                         help = 'Use P3 distributed training')
     parser.add_argument('command', nargs='+',
-                        help = 'command for launching the program')
+                        help='command for launching the program')
     args, unknown = parser.parse_known_args()
     args.command += unknown
     if args.num_servers is None:
@@ -100,31 +101,33 @@
     args = dmlc_opts(args)
 
     if args.host_file is None or args.host_file == 'None':
-      if args.cluster == 'yarn':
-          from dmlc_tracker import yarn
-          yarn.submit(args)
-      elif args.cluster == 'local':
-          from dmlc_tracker import local
-          local.submit(args)
-      elif args.cluster == 'sge':
-          from dmlc_tracker import sge
-          sge.submit(args)
-      else:
-          raise RuntimeError('Unknown submission cluster type %s' % args.cluster)
+        if args.cluster == 'yarn':
+            from dmlc_tracker import yarn
+            yarn.submit(args)
+        elif args.cluster == 'local':
+            from dmlc_tracker import local
+            local.submit(args)
+        elif args.cluster == 'sge':
+            from dmlc_tracker import sge
+            sge.submit(args)
+        else:
+            raise RuntimeError('Unknown submission cluster type %s' % args.cluster)
     else:
-      if args.cluster == 'ssh':
-          from dmlc_tracker import ssh
-          ssh.submit(args)
-      elif args.cluster == 'mpi':
-          from dmlc_tracker import mpi
-          mpi.submit(args)
-      else:
-          raise RuntimeError('Unknown submission cluster type %s' % args.cluster)
+        if args.cluster == 'ssh':
+            from dmlc_tracker import ssh
+            ssh.submit(args)
+        elif args.cluster == 'mpi':
+            from dmlc_tracker import mpi
+            mpi.submit(args)
+        else:
+            raise RuntimeError('Unknown submission cluster type %s' % args.cluster)
 
+
 def signal_handler(signal, frame):
     logging.info('Stop launcher')
     sys.exit(0)
 
+
 if __name__ == '__main__':
     fmt = '%(asctime)s %(levelname)s %(message)s'
     logging.basicConfig(format=fmt, level=logging.INFO)
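
A minimal usage sketch of the new 'horovod' backend, distilled from cifar10_kvstore_hvd.py and dist_device_sync_kvstore_horovod.py above; it is illustrative, not part of the patch. It assumes Horovod is installed and that the script is started with the same mpirun invocation used in test_distributed_training-gpu.sh (e.g. mpirun -np 2 --hostfile hosts python3 minimal_hvd_kvstore.py); the file name is hypothetical.

# minimal_hvd_kvstore.py -- illustrative sketch, not part of this patch
import mxnet as mx

# 'horovod' resolves to the backend registered in python/mxnet/kvstore/horovod.py
kv = mx.kv.create('horovod')
ctx = mx.gpu(kv.local_rank) if mx.context.num_gpus() > 0 else mx.cpu(kv.local_rank)

# broadcast: every rank receives rank 0's tensor
a = mx.nd.ones((2, 3), ctx) * kv.rank
b = mx.nd.zeros((2, 3), ctx)
kv.broadcast('w', value=a, out=b)   # b == 0 on every rank (rank 0's a is all zeros)

# pushpull: in-place allreduce (sum across ranks), as exercised in test_pushpull
c = mx.nd.ones((2, 3), ctx) * (kv.rank + 1)
kv.pushpull('g', c)                 # c == n*(n+1)/2 everywhere, with n = kv.num_workers

# During training, gluon.Trainer drives pushpull, as in cifar10_kvstore_hvd.py:
# trainer = mx.gluon.Trainer(net.collect_params(), 'adam',
#                            {'learning_rate': 0.01}, kvstore=kv)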