diff --git a/doc/design/parallel_executor.md b/doc/design/parallel_executor.md
new file mode 100644
index 0000000000000..4093b4d9ea104
--- /dev/null
+++ b/doc/design/parallel_executor.md
@@ -0,0 +1,53 @@
+# ParallelExecutor Design Doc
+
+## Introduction
+
+We introduce `ParallelExecutor`, as an alternative solution to `ParallelDo`, to run multi-GPU
+training in PaddlePaddle Fluid. Essentially, it is a Python wrapper around a list of `Executor`s
+and `Scope`s. In addition, it maintains inter-device variables such as the NCCL communicator.
+
+To train a neural network on multiple GPUs, a user only needs to make the following
+modifications to the code:
+1. specify the `append_all_reduce` flag in the optimizer
+1. use `ParallelExecutor` to run the `ProgramDesc`.
+
+```python
+avg_cost = your_neural_network()
+
+opt = fluid.optimizer.SGDOptimizer(..., append_all_reduce=True)
+opt.minimize(avg_cost)
+
+exe = fluid.ParallelExecutor(gpu_list=[0, 1])
+```
+
+## Design
+
+#### ParallelExecutor
+
+A `ParallelExecutor` contains a list of `Executor`s and their associated `Scope`s. All
+the `Scope`s are sub-scopes of `ParallelExecutor`'s scope, hence they have access
+to inter-device variables such as the NCCL communicator:
+
+```
+                                     / SubScope 0, contains weights on GPU 0
+Scope, contains NCCL Communicator -- SubScope 1, contains weights on GPU 1
+                                     \ ...
+```
+
+At runtime, we start `#gpu` Python threads, one to run each `Executor`. The pybind
+binding of `Executor.run` releases the GIL (see the `pybind.cc` change in this patch),
+so these threads can truly run in parallel.
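+
+The following is a minimal sketch of the setup performed by `ParallelExecutor.__init__`
+(it mirrors `python/paddle/fluid/parallel_executor.py` in this patch; `gpu_list` is the
+constructor argument):
+
+```python
+scope = core.Scope()  # the parent scope, which will hold the NCCL communicator
+# one sub-scope per GPU, each holding that GPU's copy of the weights
+sub_scopes = [scope.new_scope() for _ in gpu_list]
+# create and initialize the NCCL communicator in the parent scope
+core.init_nccl_com(scope, gpu_list)
+```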
+
+#### Optimize with AllReduce op
+
+During the construction of the optimization pass, an AllReduce op is appended to the
+`ProgramDesc` for every parameter gradient, followed by an `assign` op that writes the
+reduced value back into the gradient variable, as sketched below.
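+
+The sketch is a simplified version of the logic this patch adds to `Optimizer.minimize`
+in `python/paddle/fluid/optimizer.py` (`communicator` stands for the communicator
+variable created in the global block):
+
+```python
+for _, grad in params_grads:
+    if grad is None:
+        continue
+    reduced = global_block.create_var(name=grad.name + '__nccl_all_reduce__')
+    # sum this gradient across all GPUs through the NCCL communicator
+    global_block.append_op(
+        type='ncclAllReduce',
+        inputs={'X': [grad], 'Communicator': [communicator]},
+        outputs={'Out': [reduced]},
+        attrs={'reduction': 'ncclSum'})
+    # write the summed gradient back so the optimizer ops consume it
+    global_block.append_op(
+        type='assign', inputs={'X': [reduced]}, outputs={'Out': [grad]})
+```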
""" - def __init__(self, learning_rate, regularization=None): + def __init__(self, + learning_rate, + global_step=None, + regularization=None, + append_all_reduce=False): if not isinstance(learning_rate, float) and \ not isinstance(learning_rate, framework.Variable): raise TypeError("learning rate should be float or Variable") @@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None): # {accum_name : { paramter_name : accumulator_for_parameter, ...}, ...} self._accumulators = defaultdict(lambda: dict()) self.helper = None + self.append_all_reduce = append_all_reduce def _create_global_learning_rate(self): lr = self.global_learning_rate() @@ -219,6 +225,30 @@ def minimize(self, """ params_grads = append_backward(loss, parameter_list, no_grad_set, [error_clip_callback]) + if self.append_all_reduce: + global_block = loss.block.program.global_block() + dummy_communicator = global_block.create_var( + type=core.VarDesc.VarType.RAW, + name=str(core.get_nccl_com_name())) + for (_, grad) in params_grads: + if grad is None: + continue + all_reduced_var = global_block.create_var( + name=grad.name + '__nccl_all_reduce__') + global_block.append_op( + type='ncclAllReduce', + inputs={'X': [grad], + "Communicator": [dummy_communicator]}, + outputs={'Out': [all_reduced_var]}, + attrs={'reduction': 'ncclSum'}) + global_block.append_op( + type="assign", + inputs={"X": [all_reduced_var]}, + outputs={"Out": [grad]}) + + # The real nccl_com will be created by the parallel executor. + # Change the nccl_com name in the blockDesc to dummy + dummy_communicator.name = "__nccl_com_dummy__" params_grads = append_gradient_clip_ops(params_grads) diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py new file mode 100644 index 0000000000000..b9e1c01a32d57 --- /dev/null +++ b/python/paddle/fluid/parallel_executor.py @@ -0,0 +1,73 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +from threading import Lock + +import executor +import time +from . 
diff --git a/paddle/fluid/operators/nccl/nccl_gpu_common.cc b/paddle/fluid/operators/nccl/nccl_gpu_common.cc
index 08b61765c2f0f..254bb2b09f658 100644
--- a/paddle/fluid/operators/nccl/nccl_gpu_common.cc
+++ b/paddle/fluid/operators/nccl/nccl_gpu_common.cc
@@ -61,5 +61,13 @@ const std::vector<ncclComm_t>& Communicator::comms() const {
   return *global_comms;
 }
 
+const char* GlobalNCCLCommunicatorName() { return "__nccl_all_reduce_com__"; }
+
+void InitNCCLCom(framework::Scope* s, std::vector<int> gpus) {
+  auto com = s->Var(GlobalNCCLCommunicatorName())
+                 ->GetMutable<Communicator>();
+  com->InitAll(gpus);
+}
+
 }  // namespace platform
 }  // namespace paddle
diff --git a/paddle/fluid/operators/nccl/nccl_gpu_common.h b/paddle/fluid/operators/nccl/nccl_gpu_common.h
index 113f93e346681..f4cb5b74ecfec 100644
--- a/paddle/fluid/operators/nccl/nccl_gpu_common.h
+++ b/paddle/fluid/operators/nccl/nccl_gpu_common.h
@@ -22,6 +22,7 @@ limitations under the License. */
 
 #include
 #include
+#include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/dynload/nccl.h"
 #include "paddle/fluid/platform/enforce.h"
@@ -41,5 +42,9 @@ struct Communicator {
   const std::vector<ncclComm_t>& comms() const;
 };
 
+const char* GlobalNCCLCommunicatorName();
+
+void InitNCCLCom(framework::Scope*, std::vector<int>);
+
 }  // namespace platform
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt
index d62f34030894e..cc40fdb1c2527 100644
--- a/paddle/fluid/pybind/CMakeLists.txt
+++ b/paddle/fluid/pybind/CMakeLists.txt
@@ -4,6 +4,6 @@ if(WITH_PYTHON)
     DEPS pybind python backward proto_desc paddle_memory executor prune init
     profiler feed_fetch_method ${GLOB_OP_LIB})
   if(NOT APPLE AND NOT ANDROID)
-    target_link_libraries(paddle_pybind rt)
+    target_link_libraries(paddle_pybind rt pybind)
   endif(NOT APPLE AND NOT ANDROID)
 endif(WITH_PYTHON)
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index ac7d1efb57750..80479c97e98b1 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -401,9 +401,11 @@ All parameter, weight, gradient are variables in Paddle.
   py::class_<Executor>(m, "Executor")
       .def(py::init<const platform::Place &>())
-      .def("run",
-           (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
-               Executor::Run);
+      .def("run", [](framework::Executor &self, const ProgramDesc &p, Scope *s,
+                     int i, bool b1, bool b2) -> void {
+        // release the GIL so ParallelExecutor's Python threads can run
+        // multiple Executors concurrently
+        py::gil_scoped_release release_guard;
+        self.Run(p, s, i, b1, b2);
+      });
 
   m.def("init_gflags", framework::InitGflags);
   m.def("init_glog", framework::InitGLOG);
@@ -450,6 +452,9 @@ All parameter, weight, gradient are variables in Paddle.
 #ifdef PADDLE_WITH_CUDA
   m.def("get_cuda_device_count", platform::GetCUDADeviceCount);
 
+  m.def("get_nccl_com_name", &platform::GlobalNCCLCommunicatorName);
+  m.def("init_nccl_com", &platform::InitNCCLCom);
+
   m.def("nvprof_init", platform::CudaProfilerInit);
   m.def("nvprof_start", platform::CudaProfilerStart);
   m.def("nvprof_stop", platform::CudaProfilerStop);
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index 0df3fd0343dbd..81b2ed4862ba1 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -19,6 +19,8 @@
 # import all class inside executor into fluid module
 import executor
 from executor import *
+import parallel_executor
+from parallel_executor import *
 import io
 import evaluator
@@ -43,7 +45,7 @@
 
 Tensor = LoDTensor
 
-__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [
+__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + parallel_executor.__all__ + [
     'io',
     'initializer',
     'layers',
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index 1c12d53e4f352..c8647e3bb9924 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -15,6 +15,7 @@
 from collections import defaultdict
 
 import framework
+import core
 import layers
 from backward import append_backward
 from framework import program_guard
@@ -35,7 +36,11 @@ class Optimizer(object):
     but need to use one of its implementations.
     """
 
-    def __init__(self, learning_rate, regularization=None):
+    def __init__(self,
+                 learning_rate,
+                 global_step=None,
+                 regularization=None,
+                 append_all_reduce=False):
         if not isinstance(learning_rate, float) and \
                 not isinstance(learning_rate, framework.Variable):
             raise TypeError("learning rate should be float or Variable")
@@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None):
         # {accum_name : { parameter_name : accumulator_for_parameter, ...}, ...}
         self._accumulators = defaultdict(lambda: dict())
         self.helper = None
+        self.append_all_reduce = append_all_reduce
 
     def _create_global_learning_rate(self):
         lr = self.global_learning_rate()
@@ -219,6 +225,30 @@ def minimize(self,
         """
         params_grads = append_backward(loss, parameter_list, no_grad_set,
                                        [error_clip_callback])
+        if self.append_all_reduce:
+            global_block = loss.block.program.global_block()
+            dummy_communicator = global_block.create_var(
+                type=core.VarDesc.VarType.RAW,
+                name=str(core.get_nccl_com_name()))
+            for (_, grad) in params_grads:
+                if grad is None:
+                    continue
+                all_reduced_var = global_block.create_var(
+                    name=grad.name + '__nccl_all_reduce__')
+                global_block.append_op(
+                    type='ncclAllReduce',
+                    inputs={'X': [grad],
+                            "Communicator": [dummy_communicator]},
+                    outputs={'Out': [all_reduced_var]},
+                    attrs={'reduction': 'ncclSum'})
+                global_block.append_op(
+                    type="assign",
+                    inputs={"X": [all_reduced_var]},
+                    outputs={"Out": [grad]})
+
+            # The real NCCL communicator will be created by the parallel
+            # executor; rename the placeholder in the BlockDesc so that it
+            # does not collide with it.
+            dummy_communicator.name = "__nccl_com_dummy__"
 
         params_grads = append_gradient_clip_ops(params_grads)
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
new file mode 100644
index 0000000000000..b9e1c01a32d57
--- /dev/null
+++ b/python/paddle/fluid/parallel_executor.py
@@ -0,0 +1,73 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from threading import Thread
+from Queue import Queue
+
+import executor
+from . import core
+from core import CUDAPlace
+
+__all__ = ['ParallelExecutor']
+
+
+def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
+            fetch_var_name, cur_scope, return_numpy):
+    # run one executor and report the result together with its GPU index,
+    # so that the caller can restore GPU order
+    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
+                        fetch_var_name, cur_scope, return_numpy)))
+
+
+class ParallelExecutor(object):
+    def __init__(self, gpu_list):
+        if not core.is_compiled_with_cuda():
+            raise RuntimeError("ParallelExecutor only supports GPU version")
+
+        self.executors = []
+        for gpu_id in gpu_list:
+            self.executors.append(executor.Executor(CUDAPlace(gpu_id)))
+
+        self.scope = core.Scope()
+        self.scopes = {}
+        for idx, _ in enumerate(self.executors):
+            self.scopes[idx] = self.scope.new_scope()
+
+        core.init_nccl_com(self.scope, gpu_list)
+
+    def run(self,
+            program=None,
+            feed=None,
+            fetch_list=None,
+            feed_var_name='feed',
+            fetch_var_name='fetch',
+            return_numpy=True):
+        # TODO(helin): split input
+        q = Queue(maxsize=len(self.executors))
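+        # fan out: one Python thread per GPU, each running its Executor in
+        # its own sub-scope; Executor.run releases the GIL on the C++ side,
+        # so these threads make progress concurrently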
+        for idx, exe in enumerate(self.executors):
+            cur_scope = self.scopes[idx]
+            t = Thread(
+                target=run_exe,
+                args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
+                      fetch_var_name, cur_scope, return_numpy))
+            t.daemon = True
+            t.start()
+
+        results = []
+        for _ in self.executors:
+            results.append(q.get())
+
+        # restore GPU order and return one result per GPU
+        results.sort(key=lambda x: x[0])
+        return [val for (gpu_id, val) in results]
diff --git a/python/paddle/fluid/tests/book/parallel_executor_example.py b/python/paddle/fluid/tests/book/parallel_executor_example.py
new file mode 100644
index 0000000000000..9601e8aef6bd4
--- /dev/null
+++ b/python/paddle/fluid/tests/book/parallel_executor_example.py
@@ -0,0 +1,127 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import time
+
+import paddle.fluid as fluid
+
+SEED = 1
+DTYPE = "float32"
+
+# The random seed must be set before configuring the network.
+# fluid.default_startup_program().random_seed = SEED
+
+
+def parse_args():
+    parser = argparse.ArgumentParser("vgg model benchmark.")
+    parser.add_argument(
+        '--label_size', type=int, default=10, help='The label size.')
+    parser.add_argument(
+        '--batch_size', type=int, default=10, help='The minibatch size.')
+    parser.add_argument(
+        '--iterations', type=int, default=5, help='The number of minibatches.')
+    parser.add_argument(
+        '--use_nccl',
+        default=False,
+        action='store_true',
+        help='If set, use NCCL.')
+    args = parser.parse_args()
+    return args
+
+
+def print_arguments(args):
+    print('----------- Configuration Arguments -----------')
+    for arg, value in sorted(vars(args).iteritems()):
+        print('%s: %s' % (arg, value))
+    print('------------------------------------------------')
+
+
+def program_summary(program):
+    print("--------------------")
+    for block in program.blocks:
+        for op in block.ops:
+            outputs = [[x + ":"] + op.output(x) for x in op.output_names]
+            inputs = [[x + ":"] + op.input(x) for x in op.input_names]
+            print(block.idx, op.type, inputs, "|", outputs)
+
+
+def vgg16_bn_drop(input):
+    def conv_block(input, num_filter, groups, dropouts):
+        return fluid.nets.img_conv_group(
+            input=input,
+            pool_size=2,
+            pool_stride=2,
+            conv_num_filter=[num_filter] * groups,
+            conv_filter_size=3,
+            conv_act='relu',
+            conv_with_batchnorm=False,
+            conv_batchnorm_drop_rate=dropouts,
+            pool_type='max')
+
+    conv1 = conv_block(input, 64, 2, [0.3, 0])
+    conv2 = conv_block(conv1, 128, 2, [0.4, 0])
+    conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
+    conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
+    conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])
+
+    fc1 = fluid.layers.fc(input=conv5, size=4096, act=None)
+    fc2 = fluid.layers.fc(input=fc1, size=4096, act=None)
+    return fc2
+
+
+def run_benchmark(args):
+    # Train program
+    images = fluid.layers.fill_constant(
+        shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01)
+    predict = vgg16_bn_drop(images)
+    label = fluid.layers.fill_constant(
+        shape=(args.batch_size, 1), dtype='int64', value=0)
+    cost = fluid.layers.cross_entropy(input=predict, label=label)
+    avg_cost = fluid.layers.mean(x=cost)
+
+    # Optimization
+    # Note the flag append_all_reduce=True
+    opt = fluid.optimizer.SGDOptimizer(
+        learning_rate=0.001, append_all_reduce=True)
+    opt.minimize(avg_cost)
+
+    # program_summary(fluid.default_main_program())
+
+    exe = fluid.ParallelExecutor(
+        gpu_list=range(fluid.core.get_cuda_device_count()))
+
+    # Parameter initialization
+    exe.run(fluid.default_startup_program())
+
+    for iter_id in range(0, args.iterations):
+        start = time.time()
+        outs = exe.run(fluid.default_main_program(),
+                       feed={},
+                       fetch_list=[avg_cost, predict, cost])
+        loss = np.array(outs[0][0])
+
+        end = time.time()
+        print("iter=%d, error=%f, elapsed=%f" % (iter_id, loss, (end - start)))
+
+
+if __name__ == '__main__':
+    args = parse_args()
+    print_arguments(args)
+    run_benchmark(args)
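+
+# A possible invocation, using the flags defined in parse_args above:
+#   python parallel_executor_example.py --batch_size 32 --iterations 5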