diff --git a/docs/manual/cluster-user/README.md b/docs/manual/cluster-user/README.md index 56f5009700..4107b41c53 100644 --- a/docs/manual/cluster-user/README.md +++ b/docs/manual/cluster-user/README.md @@ -11,6 +11,7 @@ This manual is for cluster users to learn how to submit job, debug job, manage d 3. [How to Manage Data](./how-to-manage-data.md) 4. [How to Debug Jobs](./how-to-debug-jobs.md) 5. [How to Use Advanced Job Settings](./how-to-use-advanced-job-settings.md) -6. [Use Marketplace](./use-marketplace.md) -7. [Use VSCode Extension](./use-vscode-extension.md) -8. [Use Jupyter Notebook Extension](./use-jupyter-notebook-extension.md) \ No newline at end of file +6. [How to Run Distributed Job](./how-to-run-distributed-job.md) +7. [Use Marketplace](./use-marketplace.md) +8. [Use VSCode Extension](./use-vscode-extension.md) +9. [Use Jupyter Notebook Extension](./use-jupyter-notebook-extension.md) diff --git a/docs/manual/cluster-user/how-to-run-distributed-job.md b/docs/manual/cluster-user/how-to-run-distributed-job.md new file mode 100644 index 0000000000..3ac0ee716d --- /dev/null +++ b/docs/manual/cluster-user/how-to-run-distributed-job.md @@ -0,0 +1,34 @@ +## How OpenPAI Deploy Distributed Jobs +### Taskrole and Instance +When we execute distributed programs on PAI, we can add different task roles for our job. For single server jobs, there is only one task role. For distributed jobs, there may be multiple task roles. For example, when TensorFlow is used to running distributed jobs, it has two roles, including the parameter server and the worker. In distributed jobs, each role may have one or more instances. For example, if it's 8 instances in a worker role of TensorFlow. It means there should be 8 Docker containers for the worker role. Please visit [here](./how-to-use-advanced-job-settings.md#multiple-task-roles) for specific operations. + +### Environmental variables +In a distributed job, one task might communicate with others (When we say task, we mean a single instance of a task role). So a task need to be aware of other tasks' runtime information such as IP, port, etc. The system exposes such runtime information as environment variables to each task's Docker container. For mutual communication, users can write code in the container to access those runtime environment variables. Please visit [here](./how-to-use-advanced-job-settings.md#environmental-variables-and-port-reservation) for specific operations. + +### Retry policy and Completion policy +If unknown error happens, PAI will retry the job according to user settings. To set a retry policy and completion policy for user's job,PAI asks user to switch to Advanced mode. Please visit [here](./how-to-use-advanced-job-settings.md#job-exit-spec-retry-policy-and-completion-policy) for specific operations. +### Run PyTorch Distributed Jobs in OpenPAI +Example Name | Multi-GPU | Multi-Node | Backend |Apex| Job protocol | +---|---|---|---|---|---| +Single-Node DataParallel CIFAR-10 | ✓| x | -|-| [cifar10-single-node-gpus-cpu-DP.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.yaml)| +cifar10-single-mul-DDP-gloo.yaml | ✓| ✓ | gloo|-| [cifar10-single-mul-DDP-gloo.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml)| +cifar10-single-mul-DDP-nccl | ✓| ✓ |nccl|-| [cifar10-single-mul-DDP-nccl.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml)| +cifar10-single-mul-DDP-gloo-Apex-mixed | ✓| ✓ | gloo|✓ | [cifar10-single-mul-DDP-gloo-Apex-mixed.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml)| +cifar10-single-mul-DDP-nccl-Apex-mixed | ✓| ✓ | nccl| ✓ | [cifar10-single-mul-DDP-gloo-Apex-mixed.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml)| +imagenet-single-mul-DDP-gloo | ✓| ✓| gloo|-| [imagenet-single-mul-DDP-gloo.yaml](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/Lite-imagenet-single-mul-DDP-gloo.yaml)| + +## DataParallel +The single node program is simple. The program executed in PAI is exactly the same as the program in our machine. It should be noted that an Worker can be applied in PAI and a Instance can be applied in Worker. In a worker, we can apply for GPUs that we need. We provide an [example](../../../examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py) of DP. + +## DistributedDataParallel +DDP requires users set a master node ip and port for synchronization in PyTorch. For the port, you can simply set one certain port, such as `5000` as your master port. However, this port may conflict with others. To prevent port conflict, you can reserve a port in OpenPAI, as we mentioned [here](./how-to-use-advanced-job-settings.md#environmental-variables-and-port-reservation). The port you reserved is available in environmental variables like `PAI_PORT_LIST_$taskRole_$taskIndex_$portLabel`, where `$taskIndex` means the instance index of that task role. For example, if your task role name is `work` and port label is `SyncPort`, you can add the following code in your PyTorch DDP program: + +``` +os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] +os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] +``` +If you are using `gloo` as your DDP communication backend, please set correct network interface such as `export GLOO_SOCKET_IFNAME=eth0`. + + +We provide examples with [gloo](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml) and [nccl](https://github.com/microsoft/pai/tree/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml) as backend. + diff --git a/examples/Distributed-example/Lite-imagenet-single-mul-DDP-gloo.yaml b/examples/Distributed-example/Lite-imagenet-single-mul-DDP-gloo.yaml new file mode 100644 index 0000000000..c17ce4b726 --- /dev/null +++ b/examples/Distributed-example/Lite-imagenet-single-mul-DDP-gloo.yaml @@ -0,0 +1,50 @@ +protocolVersion: 2 +name: imagenet-gloo_8ba8ed42_7606233c +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 +taskRoles: + worker: + instances: 2 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 4 + cpu: 16 + memoryMB: 32768 + ports: + SynPort: 1 + commands: + - export GLOO_SOCKET_IFNAME=eth0 + - 'git clone https://github.com/NVIDIA/apex' + - cd apex + - python setup.py install + - cd .. + - apt update + - apt install -y nfs-common + - mkdir -p /mnt/data + - 'mount 10.151.40.32:/mnt/ImagenetData /mnt/data' + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/Lite-imagenet-single-mul-DDP-nccl-gloo.py + - >- + python Lite-imagenet-single-mul-DDP-nccl-gloo.py -n 2 -g 4 + --dist-backend gloo --epochs 2 /mnt/data/imagenet/unzipped +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true + userssh: {} + hivedScheduler: + taskRoles: + worker: + skuNum: 1 + skuType: null diff --git a/examples/Distributed-example/Lite-imagenet-single-mul-DDP-nccl-gloo.py b/examples/Distributed-example/Lite-imagenet-single-mul-DDP-nccl-gloo.py new file mode 100644 index 0000000000..0fb0f15c04 --- /dev/null +++ b/examples/Distributed-example/Lite-imagenet-single-mul-DDP-nccl-gloo.py @@ -0,0 +1,117 @@ +import os +from datetime import datetime +import argparse +import torch.multiprocessing as mp +import torch.backends.cudnn as cudnn +import torchvision +import torchvision.transforms as transforms +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.distributed as dist +from apex.parallel import DistributedDataParallel as DDP +from apex import amp + +import torchvision.datasets as datasets +import torchvision.models as models +model_names = sorted(name for name in models.__dict__ + if name.islower() and not name.startswith("__") + and callable(models.__dict__[name])) +def main(): + print('run main') + parser = argparse.ArgumentParser() + parser.add_argument('data', metavar='DIR', + help='path to dataset') + parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', + choices=model_names, + help='model architecture: ' + + ' | '.join(model_names) + + ' (default: resnet18)') + parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N', + help='number of data loading workers (default: 4)') + parser.add_argument('-g', '--gpus', default=1, type=int, + help='number of gpus per node') + parser.add_argument('-nr', '--nr', default=0, type=int, + help='ranking within the nodes') + parser.add_argument('-b', '--batch-size', default=256, type=int, + metavar='N', + help='mini-batch size (default: 256), this is the total ' + 'batch size of all GPUs on the current node when ' + 'using Data Parallel or Distributed Data Parallel') + parser.add_argument('--epochs', default=2, type=int, metavar='N', + help='number of total epochs to run') + parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') + args = parser.parse_args() + args.world_size = args.gpus * args.nodes + print('world_size:',args.world_size) + os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] + os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] + print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT']) + mp.spawn(train, nprocs=args.gpus, args=(args,)) + +def train(gpu, args): + print("start train") + rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu + dist.init_process_group(backend=args.dist_backend, init_method='env://', world_size=args.world_size, rank=rank) + torch.manual_seed(0) + model=model = models.__dict__[args.arch]() + torch.cuda.set_device(gpu) + model.cuda(gpu) + batch_size = args.batch_size + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(gpu) + optimizer = torch.optim.SGD(model.parameters(), 1e-4) + # Wrap the model + model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) + # Data loading code + traindir = os.path.join(args.data, 'train') + valdir = os.path.join(args.data, 'val') + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + train_dataset = datasets.ImageFolder( + traindir, + transforms.Compose([ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + normalize, + ])) + train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), + num_workers=args.nodes, pin_memory=True, sampler=train_sampler) + + val_loader = torch.utils.data.DataLoader( + datasets.ImageFolder(valdir, transforms.Compose([ + transforms.Resize(256), + transforms.CenterCrop(224), + transforms.ToTensor(), + normalize, + ])), + batch_size=args.batch_size, shuffle=False, + num_workers=args.nodes, pin_memory=True) + start = datetime.now() + total_step = len(train_loader) + for epoch in range(args.epochs): + for i, (images, labels) in enumerate(train_loader): + images = images.cuda(non_blocking=True) + labels = labels.cuda(non_blocking=True) + # Forward pass + outputs = model(images) + loss = criterion(outputs, labels) + + # Backward and optimize + optimizer.zero_grad() + loss.backward() + optimizer.step() + #if (i + 1) % 100 == 0 and gpu == 0: + print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step, + loss.item())) + if gpu == 0: + print("Training complete in: " + str(datetime.now() - start)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/Distributed-example/LiteApex-imagenet-single-mul-DDP-nccl-gloo.py b/examples/Distributed-example/LiteApex-imagenet-single-mul-DDP-nccl-gloo.py new file mode 100644 index 0000000000..d76b454786 --- /dev/null +++ b/examples/Distributed-example/LiteApex-imagenet-single-mul-DDP-nccl-gloo.py @@ -0,0 +1,122 @@ +import os +from datetime import datetime +import argparse +import torch.multiprocessing as mp +import torch.backends.cudnn as cudnn +import torchvision +import torchvision.transforms as transforms +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.distributed as dist +from apex.parallel import DistributedDataParallel as DDP +from apex import amp + +import torchvision.datasets as datasets +import torchvision.models as models +model_names = sorted(name for name in models.__dict__ + if name.islower() and not name.startswith("__") + and callable(models.__dict__[name])) +def main(): + print('run main') + parser = argparse.ArgumentParser() + parser.add_argument('data', metavar='DIR', + help='path to dataset') + parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', + choices=model_names, + help='model architecture: ' + + ' | '.join(model_names) + + ' (default: resnet18)') + parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N', + help='number of data loading workers (default: 4)') + parser.add_argument('-g', '--gpus', default=1, type=int, + help='number of gpus per node') + parser.add_argument('-nr', '--nr', default=0, type=int, + help='ranking within the nodes') + parser.add_argument('-b', '--batch-size', default=256, type=int, + metavar='N', + help='mini-batch size (default: 256), this is the total ' + 'batch size of all GPUs on the current node when ' + 'using Data Parallel or Distributed Data Parallel') + parser.add_argument('--epochs', default=2, type=int, metavar='N', + help='number of total epochs to run') + parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') + args = parser.parse_args() + args.world_size = args.gpus * args.nodes + print('world_size:',args.world_size) + os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] + os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] + print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT']) + mp.spawn(train, nprocs=args.gpus, args=(args,)) + +def train(gpu, args): + print("start train") + rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu + dist.init_process_group(backend=args.dist_backend, init_method='env://', world_size=args.world_size, rank=rank) + torch.manual_seed(0) + model=model = models.__dict__[args.arch]() + torch.cuda.set_device(gpu) + model.cuda(gpu) + batch_size = args.batch_size + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(gpu) + optimizer = torch.optim.SGD(model.parameters(), 1e-4) + # Wrap the model + model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) + # Wrap the model + model, optimizer = amp.initialize(model, optimizer, opt_level='O2') + model = DDP(model) + # Data loading code + traindir = os.path.join(args.data, 'train') + valdir = os.path.join(args.data, 'val') + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + + train_dataset = datasets.ImageFolder( + traindir, + transforms.Compose([ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + normalize, + ])) + train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), + num_workers=args.nodes, pin_memory=True, sampler=train_sampler) + + val_loader = torch.utils.data.DataLoader( + datasets.ImageFolder(valdir, transforms.Compose([ + transforms.Resize(256), + transforms.CenterCrop(224), + transforms.ToTensor(), + normalize, + ])), + batch_size=args.batch_size, shuffle=False, + num_workers=args.nodes, pin_memory=True) + start = datetime.now() + total_step = len(train_loader) + for epoch in range(args.epochs): + for i, (images, labels) in enumerate(train_loader): + images = images.cuda(non_blocking=True) + labels = labels.cuda(non_blocking=True) + # Forward pass + outputs = model(images) + loss = criterion(outputs, labels) + + # Backward and optimize + optimizer.zero_grad() + with amp.scale_loss(loss, optimizer) as scaled_loss: + scaled_loss.backward() + loss.backward() + optimizer.step() + #if (i + 1) % 100 == 0 and gpu == 0: + print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step, + loss.item())) + if gpu == 0: + print("Training complete in: " + str(datetime.now() - start)) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/Distributed-example/PytorchExample-imagenet-single-mul-DDP-nccl-gloo.py b/examples/Distributed-example/PytorchExample-imagenet-single-mul-DDP-nccl-gloo.py new file mode 100644 index 0000000000..69087322b4 --- /dev/null +++ b/examples/Distributed-example/PytorchExample-imagenet-single-mul-DDP-nccl-gloo.py @@ -0,0 +1,427 @@ +import argparse +import os +import random +import shutil +import time +import warnings + +import torch +import torch.nn as nn +import torch.nn.parallel +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.optim +import torch.multiprocessing as mp +import torch.utils.data +import torch.utils.data.distributed +import torchvision.transforms as transforms +import torchvision.datasets as datasets +import torchvision.models as models + +model_names = sorted(name for name in models.__dict__ + if name.islower() and not name.startswith("__") + and callable(models.__dict__[name])) + +parser = argparse.ArgumentParser(description='PyTorch ImageNet Training') +parser.add_argument('data', metavar='DIR', + help='path to dataset') +parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', + choices=model_names, + help='model architecture: ' + + ' | '.join(model_names) + + ' (default: resnet18)') +parser.add_argument('-j', '--workers', default=4, type=int, metavar='N', + help='number of data loading workers (default: 4)') +parser.add_argument('--epochs', default=90, type=int, metavar='N', + help='number of total epochs to run') +parser.add_argument('--start-epoch', default=0, type=int, metavar='N', + help='manual epoch number (useful on restarts)') +parser.add_argument('-b', '--batch-size', default=256, type=int, + metavar='N', + help='mini-batch size (default: 256), this is the total ' + 'batch size of all GPUs on the current node when ' + 'using Data Parallel or Distributed Data Parallel') +parser.add_argument('--lr', '--learning-rate', default=0.1, type=float, + metavar='LR', help='initial learning rate', dest='lr') +parser.add_argument('--momentum', default=0.9, type=float, metavar='M', + help='momentum') +parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float, + metavar='W', help='weight decay (default: 1e-4)', + dest='weight_decay') +parser.add_argument('-p', '--print-freq', default=10, type=int, + metavar='N', help='print frequency (default: 10)') +parser.add_argument('--resume', default='', type=str, metavar='PATH', + help='path to latest checkpoint (default: none)') +parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true', + help='evaluate model on validation set') +parser.add_argument('--pretrained', dest='pretrained', action='store_true', + help='use pre-trained model') +parser.add_argument('--world-size', default=-1, type=int, + help='number of nodes for distributed training') +parser.add_argument('--rank', default=-1, type=int, + help='node rank for distributed training') +parser.add_argument('--dist-url', default='tcp://127.0.0.1:10000', type=str, + help='url used to set up distributed training') +parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') +parser.add_argument('--seed', default=None, type=int, + help='seed for initializing training. ') +parser.add_argument('--gpu', default=None, type=int, + help='GPU id to use.') +parser.add_argument('--multiprocessing-distributed', action='store_true', + help='Use multi-processing distributed training to launch ' + 'N processes per node, which has N GPUs. This is the ' + 'fastest way to use PyTorch for either single node or ' + 'multi node data parallel training') + +best_acc1 = 0 + + +def main(): + args = parser.parse_args() + + if args.seed is not None: + random.seed(args.seed) + torch.manual_seed(args.seed) + cudnn.deterministic = True + warnings.warn('You have chosen to seed training. ' + 'This will turn on the CUDNN deterministic setting, ' + 'which can slow down your training considerably! ' + 'You may see unexpected behavior when restarting ' + 'from checkpoints.') + + if args.gpu is not None: + warnings.warn('You have chosen a specific GPU. This will completely ' + 'disable data parallelism.') + + if args.dist_url == "env://" and args.world_size == -1: + args.world_size = int(os.environ["WORLD_SIZE"]) + + args.distributed = args.world_size > 1 or args.multiprocessing_distributed + + ngpus_per_node = torch.cuda.device_count() + if args.multiprocessing_distributed: + # Since we have ngpus_per_node processes per node, the total world_size + # needs to be adjusted accordingly + args.world_size = ngpus_per_node * args.world_size + # Use torch.multiprocessing.spawn to launch distributed processes: the + # main_worker process function + mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) + else: + # Simply call main_worker function + main_worker(args.gpu, ngpus_per_node, args) + + +def main_worker(gpu, ngpus_per_node, args): + global best_acc1 + args.gpu = gpu + + if args.gpu is not None: + print("Use GPU: {} for training".format(args.gpu)) + + if args.distributed: + if args.dist_url == "env://" and args.rank == -1: + args.rank = int(os.environ["RANK"]) + if args.multiprocessing_distributed: + print("env rank:",int(os.environ['PAI_TASK_INDEX']) * ngpus_per_node + gpu) + os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] + os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] + dist.init_process_group(backend=args.dist_backend, init_method='env://', + world_size=args.world_size, rank = int(os.environ['PAI_TASK_INDEX']) * ngpus_per_node + gpu) + # create model + if args.pretrained: + print("=> using pre-trained model '{}'".format(args.arch)) + model = models.__dict__[args.arch](pretrained=True) + else: + print("=> creating model '{}'".format(args.arch)) + model = models.__dict__[args.arch]() + if not torch.cuda.is_available(): + print('using CPU, this will be slow') + elif args.distributed: + # For multiprocessing distributed, DistributedDataParallel constructor + # should always set the single device scope, otherwise, + # DistributedDataParallel will use all available devices. + if args.gpu is not None: + torch.cuda.set_device(args.gpu) + model.cuda(args.gpu) + # When using a single GPU per process and per + # DistributedDataParallel, we need to divide the batch size + # ourselves based on the total number of GPUs we have + args.batch_size = int(args.batch_size / ngpus_per_node) + args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node) + model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) + else: + model.cuda() + # DistributedDataParallel will divide and allocate batch_size to all + # available GPUs if device_ids are not set + model = torch.nn.parallel.DistributedDataParallel(model) + elif args.gpu is not None: + torch.cuda.set_device(args.gpu) + model = model.cuda(args.gpu) + else: + # DataParallel will divide and allocate batch_size to all available GPUs + if args.arch.startswith('alexnet') or args.arch.startswith('vgg'): + model.features = torch.nn.DataParallel(model.features) + model.cuda() + else: + model = torch.nn.DataParallel(model).cuda() + + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(args.gpu) + + optimizer = torch.optim.SGD(model.parameters(), args.lr, + momentum=args.momentum, + weight_decay=args.weight_decay) + + # optionally resume from a checkpoint + if args.resume: + if os.path.isfile(args.resume): + print("=> loading checkpoint '{}'".format(args.resume)) + if args.gpu is None: + checkpoint = torch.load(args.resume) + else: + # Map model to be loaded to specified single gpu. + loc = 'cuda:{}'.format(args.gpu) + checkpoint = torch.load(args.resume, map_location=loc) + args.start_epoch = checkpoint['epoch'] + best_acc1 = checkpoint['best_acc1'] + if args.gpu is not None: + # best_acc1 may be from a checkpoint from a different GPU + best_acc1 = best_acc1.to(args.gpu) + model.load_state_dict(checkpoint['state_dict']) + optimizer.load_state_dict(checkpoint['optimizer']) + print("=> loaded checkpoint '{}' (epoch {})" + .format(args.resume, checkpoint['epoch'])) + else: + print("=> no checkpoint found at '{}'".format(args.resume)) + + cudnn.benchmark = True + + # Data loading code + traindir = os.path.join(args.data, 'train') + valdir = os.path.join(args.data, 'val') + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + + train_dataset = datasets.ImageFolder( + traindir, + transforms.Compose([ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + normalize, + ])) + + if args.distributed: + train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + else: + train_sampler = None + + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), + num_workers=args.workers, pin_memory=True, sampler=train_sampler) + + val_loader = torch.utils.data.DataLoader( + datasets.ImageFolder(valdir, transforms.Compose([ + transforms.Resize(256), + transforms.CenterCrop(224), + transforms.ToTensor(), + normalize, + ])), + batch_size=args.batch_size, shuffle=False, + num_workers=args.workers, pin_memory=True) + + if args.evaluate: + validate(val_loader, model, criterion, args) + return + + for epoch in range(args.start_epoch, args.epochs): + if args.distributed: + train_sampler.set_epoch(epoch) + adjust_learning_rate(optimizer, epoch, args) + + # train for one epoch + train(train_loader, model, criterion, optimizer, epoch, args) + + # evaluate on validation set + acc1 = validate(val_loader, model, criterion, args) + + # remember best acc@1 and save checkpoint + is_best = acc1 > best_acc1 + best_acc1 = max(acc1, best_acc1) + + if not args.multiprocessing_distributed or (args.multiprocessing_distributed + and args.rank % ngpus_per_node == 0): + save_checkpoint({ + 'epoch': epoch + 1, + 'arch': args.arch, + 'state_dict': model.state_dict(), + 'best_acc1': best_acc1, + 'optimizer' : optimizer.state_dict(), + }, is_best) + +def train(train_loader, model, criterion, optimizer, epoch, args): + batch_time = AverageMeter('Time', ':6.3f') + data_time = AverageMeter('Data', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + top1 = AverageMeter('Acc@1', ':6.2f') + top5 = AverageMeter('Acc@5', ':6.2f') + progress = ProgressMeter( + len(train_loader), + [batch_time, data_time, losses, top1, top5], + prefix="Epoch: [{}]".format(epoch)) + + # switch to train mode + model.train() + + end = time.time() + for i, (images, target) in enumerate(train_loader): + # measure data loading time + data_time.update(time.time() - end) + + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + if torch.cuda.is_available(): + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = criterion(output, target) + + # measure accuracy and record loss + acc1, acc5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), images.size(0)) + top1.update(acc1[0], images.size(0)) + top5.update(acc5[0], images.size(0)) + + # compute gradient and do SGD step + optimizer.zero_grad() + loss.backward() + optimizer.step() + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + +def validate(val_loader, model, criterion, args): + batch_time = AverageMeter('Time', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + top1 = AverageMeter('Acc@1', ':6.2f') + top5 = AverageMeter('Acc@5', ':6.2f') + progress = ProgressMeter( + len(val_loader), + [batch_time, losses, top1, top5], + prefix='Test: ') + + # switch to evaluate mode + model.eval() + + with torch.no_grad(): + end = time.time() + for i, (images, target) in enumerate(val_loader): + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + if torch.cuda.is_available(): + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = criterion(output, target) + + # measure accuracy and record loss + acc1, acc5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), images.size(0)) + top1.update(acc1[0], images.size(0)) + top5.update(acc5[0], images.size(0)) + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + # TODO: this should also be done with the ProgressMeter + print(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}' + .format(top1=top1, top5=top5)) + + return top1.avg + + +def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'): + torch.save(state, filename) + if is_best: + shutil.copyfile(filename, 'model_best.pth.tar') + + +class AverageMeter(object): + """Computes and stores the average and current value""" + def __init__(self, name, fmt=':f'): + self.name = name + self.fmt = fmt + self.reset() + + def reset(self): + self.val = 0 + self.avg = 0 + self.sum = 0 + self.count = 0 + + def update(self, val, n=1): + self.val = val + self.sum += val * n + self.count += n + self.avg = self.sum / self.count + + def __str__(self): + fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})' + return fmtstr.format(**self.__dict__) + + +class ProgressMeter(object): + def __init__(self, num_batches, meters, prefix=""): + self.batch_fmtstr = self._get_batch_fmtstr(num_batches) + self.meters = meters + self.prefix = prefix + + def display(self, batch): + entries = [self.prefix + self.batch_fmtstr.format(batch)] + entries += [str(meter) for meter in self.meters] + print('\t'.join(entries)) + + def _get_batch_fmtstr(self, num_batches): + num_digits = len(str(num_batches // 1)) + fmt = '{:' + str(num_digits) + 'd}' + return '[' + fmt + '/' + fmt.format(num_batches) + ']' + + +def adjust_learning_rate(optimizer, epoch, args): + """Sets the learning rate to the initial LR decayed by 10 every 30 epochs""" + lr = args.lr * (0.1 ** (epoch // 30)) + for param_group in optimizer.param_groups: + param_group['lr'] = lr + + +def accuracy(output, target, topk=(1,)): + """Computes the accuracy over the k top predictions for the specified values of k""" + with torch.no_grad(): + maxk = max(topk) + batch_size = target.size(0) + + _, pred = output.topk(maxk, 1, True, True) + pred = pred.t() + correct = pred.eq(target.view(1, -1).expand_as(pred)) + + res = [] + for k in topk: + correct_k = correct[:k].view(-1).float().sum(0, keepdim=True) + res.append(correct_k.mul_(100.0 / batch_size)) + return res + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml b/examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml new file mode 100644 index 0000000000..349f6964a2 --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-gloo-Apex-mixed.yaml @@ -0,0 +1,46 @@ +protocolVersion: 2 +name: Apex-cifar-10-gloo-2node-4gpu-change +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 +taskRoles: + worker: + instances: 2 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 2 + cpu: 8 + memoryMB: 16384 + ports: + SynPort: 1 + commands: + - export GLOO_SOCKET_IFNAME=eth0 + - 'git clone https://github.com/NVIDIA/apex' + - cd apex + - python setup.py install + - cd .. + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py + - >- + python cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py -n 2 -g 2 + --epochs 2 --dist-backend gloo +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true + userssh: {} + hivedScheduler: + taskRoles: + worker: + skuNum: 1 + skuType: null diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml b/examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml new file mode 100644 index 0000000000..ef51b3b798 --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-gloo.yaml @@ -0,0 +1,46 @@ +protocolVersion: 2 +name: cifar10-single-mul-DDP-gloo-1_5b3c1b5c +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 +taskRoles: + worker: + instances: 2 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 2 + cpu: 8 + memoryMB: 16384 + ports: + SynPort: 1 + commands: + - export GLOO_SOCKET_IFNAME=eth0 + - 'git clone https://github.com/NVIDIA/apex' + - cd apex + - python setup.py install + - cd .. + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo.py + - >- + python cifar10-single-mul-DDP-nccl-gloo.py -n 2 -g 2 --epochs 2 + --dist-backend gloo +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true + userssh: {} + hivedScheduler: + taskRoles: + worker: + skuNum: 1 + skuType: null diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-nccl-Apex-mixed.yaml b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-Apex-mixed.yaml new file mode 100644 index 0000000000..6fd46a3efa --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-Apex-mixed.yaml @@ -0,0 +1,45 @@ +protocolVersion: 2 +name: Apex-cifar-10-nccl-2node-4gpu_2_748a8371 +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 +taskRoles: + worker: + instances: 2 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 2 + cpu: 8 + memoryMB: 16384 + ports: + SynPort: 1 + commands: + - 'git clone https://github.com/NVIDIA/apex' + - cd apex + - python setup.py install + - cd .. + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py + - >- + python cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py -n 2 -g 2 + --epochs 2 --dist-backend nccl +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true + userssh: {} + hivedScheduler: + taskRoles: + worker: + skuNum: 1 + skuType: null diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py new file mode 100644 index 0000000000..5f3ad33af2 --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo-Apex-mixed.py @@ -0,0 +1,125 @@ +import os +from datetime import datetime +import argparse +import torch.multiprocessing as mp +import torch.backends.cudnn as cudnn +import torchvision +import torchvision.transforms as transforms +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.distributed as dist +from apex.parallel import DistributedDataParallel as DDP +from apex import amp + +def main(): + print('run main') + parser = argparse.ArgumentParser() + parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N', + help='number of data loading workers (default: 4)') + parser.add_argument('-g', '--gpus', default=1, type=int, + help='number of gpus per node') + parser.add_argument('-nr', '--nr', default=0, type=int, + help='ranking within the nodes') + parser.add_argument('--epochs', default=2, type=int, metavar='N', + help='number of total epochs to run') + parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') + args = parser.parse_args() + args.world_size = args.gpus * args.nodes + print('world_size:',args.world_size) + os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] + os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] + print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT']) + mp.spawn(train, nprocs=args.gpus, args=(args,)) +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = F.max_pool2d(F.relu(self.conv1(x)), 2) + x = F.max_pool2d(F.relu(self.conv2(x)), 2) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x +def train(gpu, args): + print("start train") + rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu + dist.init_process_group(backend=args.dist_backend, init_method='env://', world_size=args.world_size, rank=rank) + torch.manual_seed(0) + model=Net() + torch.cuda.set_device(gpu) + model.cuda(gpu) + batch_size = 100 + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(gpu) + optimizer = torch.optim.SGD(model.parameters(), 1e-4) + # Wrap the model + model, optimizer = amp.initialize(model, optimizer, opt_level='O2') + model = DDP(model) + # Data loading code + transform_train = transforms.Compose([ + transforms.RandomCrop(32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), + ]) + + transform_test = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), + ]) + + trainset = torchvision.datasets.CIFAR10( + root='./data', train=True, download=True, transform=transform_train) + trainsampler = torch.utils.data.distributed.DistributedSampler( + trainset, + num_replicas=args.world_size, + rank=rank, + shuffle=True, + ) + trainloader = torch.utils.data.DataLoader( + trainset, batch_size=batch_size, shuffle=False, num_workers=2, sampler=trainsampler) + + testset = torchvision.datasets.CIFAR10( + root='./data', train=False, download=True, transform=transform_test) + testloader = torch.utils.data.DataLoader( + testset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True, sampler=trainsampler) + + classes = ('plane', 'car', 'bird', 'cat', 'deer', + 'dog', 'frog', 'horse', 'ship', 'truck') + start = datetime.now() + total_step = len(trainloader) + for epoch in range(args.epochs): + for i, (images, labels) in enumerate(trainloader): + images = images.cuda(non_blocking=True) + labels = labels.cuda(non_blocking=True) + # Forward pass + outputs = model(images) + loss = criterion(outputs, labels) + + # Backward and optimize + optimizer.zero_grad() + with amp.scale_loss(loss, optimizer) as scaled_loss: + scaled_loss.backward() + optimizer.step() + print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( + epoch + 1, + args.epochs, + i + 1, + total_step, + loss.item()) + ) + if gpu == 0: + print("Training complete in: " + str(datetime.now() - start)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo.py b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo.py new file mode 100644 index 0000000000..d55104b58a --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo.py @@ -0,0 +1,122 @@ +import os +from datetime import datetime +import argparse +import torch.multiprocessing as mp +import torch.backends.cudnn as cudnn +import torchvision +import torchvision.transforms as transforms +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.distributed as dist +from apex.parallel import DistributedDataParallel as DDP +from apex import amp +def main(): + print('run main') + parser = argparse.ArgumentParser() + parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N', + help='number of data loading workers (default: 4)') + parser.add_argument('-g', '--gpus', default=1, type=int, + help='number of gpus per node') + parser.add_argument('-nr', '--nr', default=0, type=int, + help='ranking within the nodes') + parser.add_argument('--epochs', default=2, type=int, metavar='N', + help='number of total epochs to run') + parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') + args = parser.parse_args() + args.world_size = args.gpus * args.nodes + print('world_size:',args.world_size) + os.environ['MASTER_ADDR'] = os.environ['PAI_HOST_IP_worker_0'] + os.environ['MASTER_PORT'] = os.environ['PAI_worker_0_SynPort_PORT'] + print('master:', os.environ['MASTER_ADDR'], 'port:', os.environ['MASTER_PORT']) + mp.spawn(train, nprocs=args.gpus, args=(args,)) + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = F.max_pool2d(F.relu(self.conv1(x)), 2) + x = F.max_pool2d(F.relu(self.conv2(x)), 2) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x + +def train(gpu, args): + print("start train") + rank = int(os.environ['PAI_TASK_INDEX']) * args.gpus + gpu + dist.init_process_group(backend=args.dist_backend, init_method='env://', world_size=args.world_size, rank=rank) + torch.manual_seed(0) + model=Net() + torch.cuda.set_device(gpu) + model.cuda(gpu) + batch_size = 100 + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(gpu) + optimizer = torch.optim.SGD(model.parameters(), 1e-4) + # Wrap the model + model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) + # Data loading code + transform_train = transforms.Compose([ + transforms.RandomCrop(32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), + ]) + + transform_test = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), + ]) + + trainset = torchvision.datasets.CIFAR10( + root='./data', train=True, download=True, transform=transform_train) + + trainsampler = torch.utils.data.distributed.DistributedSampler( + trainset, + num_replicas=args.world_size, + rank=rank, + shuffle=True, + ) + trainloader = torch.utils.data.DataLoader( + trainset, batch_size=batch_size, shuffle=False, num_workers=2, sampler=trainsampler) + + testset = torchvision.datasets.CIFAR10( + root='./data', train=False, download=True, transform=transform_test) + testloader = torch.utils.data.DataLoader( + testset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True, sampler=trainsampler) + + classes = ('plane', 'car', 'bird', 'cat', 'deer', + 'dog', 'frog', 'horse', 'ship', 'truck') + start = datetime.now() + total_step = len(trainloader) + for epoch in range(args.epochs): + for i, (images, labels) in enumerate(trainloader): + images = images.cuda(non_blocking=True) + labels = labels.cuda(non_blocking=True) + # Forward pass + outputs = model(images) + loss = criterion(outputs, labels) + + # Backward and optimize + optimizer.zero_grad() + loss.backward() + optimizer.step() + #if (i + 1) % 100 == 0 and gpu == 0: + print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step, + loss.item())) + if gpu == 0: + print("Training complete in: " + str(datetime.now() - start)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml b/examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml new file mode 100644 index 0000000000..23eb6d6c06 --- /dev/null +++ b/examples/Distributed-example/cifar10-single-mul-DDP-nccl.yaml @@ -0,0 +1,45 @@ +protocolVersion: 2 +name: cifar10-single-mul-DDP-nccl-1 +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 +taskRoles: + worker: + instances: 2 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 2 + cpu: 8 + memoryMB: 16384 + ports: + SynPort: 1 + commands: + - 'git clone https://github.com/NVIDIA/apex' + - cd apex + - python setup.py install + - cd .. + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/cifar10-single-mul-DDP-nccl-gloo.py + - >- + python cifar10-single-mul-DDP-nccl-gloo.py -n 2 -g 2 --epochs 2 + --dist-backend nccl +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true + userssh: {} + hivedScheduler: + taskRoles: + worker: + skuNum: 1 + skuType: null diff --git a/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py b/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py new file mode 100644 index 0000000000..526e781d0e --- /dev/null +++ b/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py @@ -0,0 +1,148 @@ +#!/bin/env python +import os +import re +import sys +import argparse +import torch +import torch.cuda as cuda +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +import torch.backends.cudnn as cudnn +import torchvision +import torchvision.transforms as transforms +def parse_args(): + # Training configurations + parser = argparse.ArgumentParser(description='Configuration for cifar training') + parser.add_argument('--lr', default=0.1, type=float, help='Learing Rate') + parser.add_argument('--batchsize', type=int, default=128, help='Batchsize for training') + parser.add_argument('--epoch', type=int, default=200, help='The number of epochs') + parser.add_argument('--momentum', type=float, default=0.9, help='Momentum value for optimizer') + parser.add_argument('--weight_decay', type=float, default=5e-4, help='Weight decay for the optimizer') + parser.add_argument('--cpu', default=False, action='store_true', help='Only use CPU to train') + parser.add_argument('--gpuid', default='0', type=str, help='Gpus used for training') + parser.add_argument('--outdir', type=str, default='./log', help='Outdir of results') + return parser.parse_args() +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = F.max_pool2d(F.relu(self.conv1(x)), 2) + x = F.max_pool2d(F.relu(self.conv2(x)), 2) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x +def build_net(args): + print('Build network') + net = Net() + if args.cpu: + # Use CPU to run the model + args.gpu = False + return net + if not cuda.is_available() or cuda.device_count() == 0: + # If the cuda is not available or GPU count equals to 0 + args.gpu = False + return net + args.gpu = True + args.gpus = list(filter(lambda x: len(x) > 0, re.split(',', args.gpuid))) + if len(args.gpus) > 1: + net = nn.DataParallel(net) + return net.cuda() +def train_epoch(net, train_loader, optimizer, args): + net.train() + loss_sum = 0 + count = 0 + correct = 0 + for bid, (data, target) in enumerate(train_loader): + if args.gpu: + data, target = data.cuda(), target.cuda() + optimizer.zero_grad() + output = net(data) + loss = F.cross_entropy(output, target) + # print(bid,' ',loss.item()) + loss.backward() + optimizer.step() + + loss_sum += loss.item() + count += data.size(0) + _, predict = output.max(1) + correct += predict.eq(target).sum().item() + print('Loss: %.3f , Accuracy: %.3f' % (loss_sum / len(train_loader), correct / count)) + + +def validate(net, data_loader, args): + net.eval() + loss_sum = 0 + correct = 0 + count = 0 + with torch.no_grad(): + for bid, (data, target) in enumerate(data_loader): + if args.gpu: + data, target = data.cuda(), target.cuda() + output = net(data) + loss = F.cross_entropy(output, target) + _, predict = output.max(1) + loss_sum += loss.item() + count += target.size(0) + correct += predict.eq(target).sum().item() + print('Loss: %.3f Accuracy: %.3f' % (loss_sum / len(data_loader), correct / count)) + return correct / count + + +def prepare_data(args): + print('==> Preparing Data...') + cifar_transform_train = transforms.Compose([ + transforms.RandomCrop(32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), + ]) + cifar_transform_val = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)) + ]) + train_set = torchvision.datasets.CIFAR10(root='./data', transform=cifar_transform_train, train=True, download=True) + val_set = torchvision.datasets.CIFAR10(root='./data', transform=cifar_transform_val, train=False, download=True) + train_loader = torch.utils.data.DataLoader(train_set, batch_size=args.batchsize, shuffle=True, num_workers=4) + val_loader = torch.utils.data.DataLoader(val_set, batch_size=args.batchsize, shuffle=False, num_workers=4) + return train_loader, val_loader +def train(args): + train_loader, val_loader = prepare_data(args) + net = build_net(args) + optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay) + lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[int(args.epoch * 0.5), int(args.epoch * 0.75)], + gamma=0.1) + best_acc = 0 + checkpoint = {} + for epochid in range(args.epoch): + print("==> Training Epoch %d, Learning Rate %.4f" % (epochid, lr_scheduler.get_lr()[0])) + train_epoch(net, train_loader, optimizer, args) + print('==> Validating ') + acc = validate(net, val_loader, args) + lr_scheduler.step() + if acc > best_acc: + best_acc = acc + if args.cpu or len(args.gpus) == 1: + # Use cpu or one single gpu to train the model + checkpoint = net.state_dict() + elif len(args.gpus) > 1: + checkpoint = net.module.state_dict() + + print('Best Accuracy: ', best_acc) + +if __name__ == '__main__': + args = parse_args() + os.environ['CUDA_VISIBLE_DEVICES'] = args.gpuid + train(args) + + + + diff --git a/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.yaml b/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.yaml new file mode 100644 index 0000000000..bdefb68f1e --- /dev/null +++ b/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.yaml @@ -0,0 +1,32 @@ +protocolVersion: 2 +name: vv-cifar10-single-node-gpus-cpu-DP-4 +type: job +jobRetryCount: 0 +prerequisites: + - type: dockerimage + uri: 'openpai/standard:python_3.6-pytorch_1.2.0-gpu' + name: docker_image_0 + +taskRoles: + taskrole: + instances: 1 + completion: + minFailedInstances: 1 + taskRetryCount: 0 + dockerImage: docker_image_0 + resourcePerInstance: + gpu: 4 + cpu: 16 + memoryMB: 32768 + commands: + - >- + wget + https://raw.githubusercontent.com/microsoft/pai/master/examples/Distributed-example/cifar10-single-node-gpus-cpu-DP.py + - 'python cifar10-single-node-gpus-cpu-DP.py --gpuid 0,1,2,3' +defaults: + virtualCluster: default +extras: + com.microsoft.pai.runtimeplugin: + - plugin: ssh + parameters: + jobssh: true diff --git a/mkdocs.yml b/mkdocs.yml index da8989579d..68128f44fc 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,6 +7,7 @@ nav: - How to Manage Data: manual/cluster-user/how-to-manage-data.md - How to Debug Jobs: manual/cluster-user/how-to-debug-jobs.md - How to Use Advanced Job Settings: manual/cluster-user/how-to-use-advanced-job-settings.md + - How to Run Distributed Job: manual/cluster-user/how-to-run-distributed-job.md - Use Marketplace: manual/cluster-user/use-marketplace.md - Use VSCode Extension: manual/cluster-user/use-vscode-extension.md - Use Jupyter Notebook Extension: manual/cluster-user/use-jupyter-notebook-extension.md