diff --git a/example/distributed_training/cifar10_kvstore_hvd.py b/example/distributed_training/cifar10_kvstore_hvd.py
new file mode 100644
index 000000000000..e6780e5db85e
--- /dev/null
+++ b/example/distributed_training/cifar10_kvstore_hvd.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""cifar10_kvstore_hvd.py contains code that runs distributed training of a
+ResNet-18 network on CIFAR-10 using the Horovod framework."""
+
+import argparse
+import logging
+import time
+import random
+import types
+import warnings
+
+import numpy as np
+import mxnet as mx
+from mxnet import autograd, gluon, kv, nd
+from mxnet.gluon.model_zoo import vision
+
+logging.basicConfig(level=logging.INFO)
+
+# Training settings
+parser = argparse.ArgumentParser(description='MXNet CIFAR Example')
+
+parser.add_argument('--batch-size', type=int, default=64,
+                    help='training batch size per worker (default: 64)')
+parser.add_argument('--epochs', type=int, default=5,
+                    help='number of training epochs (default: 5)')
+parser.add_argument('--lr', type=float, default=0.01,
+                    help='learning rate (default: 0.01)')
+parser.add_argument('--no-cuda', action='store_true', default=False,
+                    help='disable training on GPU (default: False)')
+args = parser.parse_args()
+
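+# Note: `--batch-size` is the per-worker batch size. Because every worker
+# trains on its own shard of the data, each training step processes
+# num_workers * batch_size examples in total across the cluster.
+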
+if not args.no_cuda:
+    # Disable CUDA if there are no GPUs.
+    if mx.context.num_gpus() == 0:
+        args.no_cuda = True
+
+
+# Transform input data
+def transform(data, label):
+    # Convert HWC uint8 images to CHW float32 scaled to [0, 1]
+    return nd.transpose(data.astype(np.float32), (2, 0, 1))/255,\
+        label.astype(np.float32)
+
+
+# Train a batch using multiple GPUs
+def train(batch_list, context, network, gluon_trainer, metric):
+    """ Training with multiple GPUs
+
+    Parameters
+    ----------
+    batch_list: List
+        list containing the batch of data and the corresponding labels
+    context: List
+        a list of all the contexts (GPUs) to be used for training
+    network: gluon.Block
+        the network to train (ResNet-18)
+    gluon_trainer: gluon.Trainer
+        the Gluon Trainer that performs the parameter updates
+    metric: mx.metric.EvalMetric
+        metric used to accumulate training accuracy
+    """
+
+    # Run one forward and backward pass
+    def forward_backward(network, data, labels, metric):
+        with autograd.record():
+            # Compute outputs
+            outputs = [network(X) for X in data]
+            # Compute the loss
+            losses = [loss(yhat, y) for yhat, y in zip(outputs, labels)]
+
+        # Run the backward pass (calculate gradients)
+        for l in losses:
+            l.backward()
+
+        metric.update(preds=outputs, labels=labels)
+
+    # Use cross entropy loss
+    loss = gluon.loss.SoftmaxCrossEntropyLoss()
+
+    # Split and load data
+    data = batch_list[0]
+    data = gluon.utils.split_and_load(data, context)
+
+    # Split and load label
+    label = batch_list[1]
+    label = gluon.utils.split_and_load(label, context)
+
+    # Run the forward and backward pass
+    forward_backward(network, data, label, metric)
+
+    # Update the parameters
+    this_batch_size = batch_list[0].shape[0]
+    gluon_trainer.step(this_batch_size)
+
+
+# Evaluate accuracy of the given network using the given data
+def evaluate(data_iterator, network, context):
+    """ Measure the accuracy of ResNet
+
+    Parameters
+    ----------
+    data_iterator: mx.gluon.data.DataLoader
+        loader that yields batches of the evaluation dataset
+    network: gluon.Block
+        the network to evaluate (ResNet-18)
+    context: mx.context.Context
+        the context (CPU or GPU) on which to run evaluation
+
+    Returns
+    -------
+    float
+        classification accuracy of the network on the given dataset
+    """
+    acc = mx.metric.Accuracy()
+
+    # Iterate through data and label
+    for i, (data, label) in enumerate(data_iterator):
+
+        # Copy the data and label to the evaluation context
+        data = data.as_in_context(context)
+        label = label.as_in_context(context)
+
+        # Get network's output which is a probability distribution
+        # Apply argmax on the probability distribution to get network's
+        # classification.
+        output = network(data)
+        predictions = nd.argmax(output, axis=1)
+
+        # Give network's prediction and the correct label to update the metric
+        acc.update(preds=predictions, labels=label)
+
+    # Return the accuracy
+    return acc.get()[1]
+
+
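+# Each worker samples from a disjoint shard of the training set (see
+# SplitSampler below). For example, with 50000 examples and 2 workers,
+# worker 0 draws shuffled indices from [0, 25000) and worker 1 from
+# [25000, 50000).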
+class SplitSampler(gluon.data.sampler.Sampler):
+    """ Split the dataset into `num_parts` parts and sample from the part with
+    index `part_index`
+
+    Parameters
+    ----------
+    length: int
+        Number of examples in the dataset
+    num_parts: int
+        Number of partitions the data is split into
+    part_index: int
+        The index of the partition to read from
+    """
+    def __init__(self, length, num_parts=1, part_index=0):
+        # Compute the length of each partition
+        self.part_len = length // num_parts
+        # Compute the start index for this partition
+        self.start = self.part_len * part_index
+        # Compute the end index for this partition
+        self.end = self.start + self.part_len
+
+    def __iter__(self):
+        # Extract examples between `start` and `end`, shuffle and return them.
+        indices = list(range(self.start, self.end))
+        random.shuffle(indices)
+        return iter(indices)
+
+    def __len__(self):
+        return self.part_len
+
+
+# Use Horovod as the KVStore
+store = kv.create('horovod')
+
+# Get the number of workers
+num_workers = store.num_workers
+
+# Create the context based on the local rank of the current process
+ctx = mx.cpu(store.local_rank) if args.no_cuda else mx.gpu(store.local_rank)
+
+# Load the training data
+train_data = gluon.data.DataLoader(gluon.data.vision.CIFAR10(train=True,
+                                   transform=transform), args.batch_size,
+                                   sampler=SplitSampler(50000,
+                                                        num_workers,
+                                                        store.rank))
+
+# Load the test data
+test_data = gluon.data.DataLoader(gluon.data.vision.CIFAR10(train=False,
+                                   transform=transform),
+                                   args.batch_size, shuffle=False)
+
+# Load ResNet-18 model from the Gluon model zoo
+net = vision.resnet18_v1()
+
+# Initialize the parameters with Xavier initializer
+net.initialize(mx.init.Xavier(), ctx=ctx)
+
+# Use Adam optimizer. Ask trainer to use the distributed kv store.
+trainer = gluon.Trainer(net.collect_params(), optimizer='adam',
+                        optimizer_params={'learning_rate': args.lr},
+                        kvstore=store)
+
+train_metric = mx.metric.Accuracy()
+
+# Run as many epochs as required
+for epoch in range(args.epochs):
+    tic = time.time()
+    train_metric.reset()
+
+    # Iterate through batches and run training using multiple GPUs
+    batch_num = 1
+    btic = time.time()
+    for batch in train_data:
+        # Train the batch using multiple GPUs
+        train(batch, [ctx], net, trainer, train_metric)
+        if store.rank == 0 and batch_num % 100 == 0:
+            speed = args.batch_size / (time.time() - btic)
+            logging.info('Epoch[{}] Rank [{}] Batch[{}]\tSpeed: {:.2f} samples/sec'
+                         .format(epoch, store.rank, batch_num, speed))
+            logging.info('{} = {:.2f}'.format(*train_metric.get()))
+
+        btic = time.time()
+        batch_num += 1
+
+    elapsed = time.time() - tic
+    # Print test accuracy and epoch time after every epoch
+    test_accuracy = evaluate(test_data, net, ctx)
+    if store.rank == 0:
+        logging.info("Epoch %d: Test_acc %f, Time %.2f s"
+                     % (epoch, test_accuracy, elapsed))
\ No newline at end of file
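
Note: a Horovod-backed script such as this one is started through Horovod's launcher (or an equivalent MPI launcher) rather than invoked directly, and it requires an MXNet build that provides the Horovod KVStore. The exact command depends on the local installation; a minimal sketch, assuming `horovodrun` is available and using an illustrative worker count of 2, would be:

    horovodrun -np 2 python cifar10_kvstore_hvd.py --epochs 5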