# [Speed] feature/ParallelExecutor #8891 (Closed)

53 changes: 53 additions & 0 deletions doc/design/parallel_executor.md
@@ -0,0 +1,53 @@
# ParallelExecutor Design Doc

## Introduction

We introduce `ParallelExecutor`, as an alternative solution to `ParallelDo`, to run multi-GPU
training in PaddlePaddle Fluid. Essentially, it is a Python wrapper around a list of `Executor`s
and `Scope`s. In addition, it maintains inter-device variables such as the NCCL communicator.

To train a neural network on multiple GPUs, a user only needs to make the following
modifications to the code:
1. set the `append_all_reduce` flag in the optimizer;
1. use `ParallelExecutor` to run the `ProgramDesc`.

```python
avg_cost = your_neural_network()

opt = fluid.optimizer.SGDOptimizer(..., append_all_reduce=True)
opt.minimize(avg_cost)

exe = fluid.ParallelExecutor(gpu_list=[0, 1])
```

> **panyx0718 (Contributor), Mar 13, 2018** (on `append_all_reduce=True`): What if `append_all_reduce=True` is not set? Will it use another way to send gradients to each other, or just raise an exception?
>
> **Author:** Currently, it's not handled.

## Design

> **Contributor** (on this section): Briefly describe the plan for model save/restore?

#### ParallelExecutor

A `ParallelExecutor` contains a list of `Executor`s and their associated `Scope`s. All
the `Scope`s are sub-scopes of the `ParallelExecutor`'s scope, so they have access
to inter-device variables such as the NCCL communicator.

```

                                  / SubScope 0, contains weights on GPU 0
Scope, contains NCCL Communicator -- SubScope 1, contains weights on GPU 1
                                  \ ...
```

At runtime, we start one Python thread per GPU, each running its own `Executor`.
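
For illustration, here is a minimal sketch of this structure. It assumes the `core.Scope`, `Scope.new_scope`, `core.init_nccl_com`, and `CUDAPlace` APIs that this PR relies on; the real `ParallelExecutor` below additionally collects the fetch results from every GPU.

```python
import threading

import paddle.fluid as fluid
from paddle.fluid import core, executor

gpu_list = [0, 1]

# The parent scope holds inter-device state (the NCCL communicator);
# each GPU gets its own sub-scope for per-device weights.
scope = core.Scope()
sub_scopes = [scope.new_scope() for _ in gpu_list]
core.init_nccl_com(scope, gpu_list)

executors = [executor.Executor(core.CUDAPlace(i)) for i in gpu_list]

def run_one(exe, sub_scope):
    # One Python thread per GPU; each thread drives its own Executor.
    exe.run(fluid.default_main_program(), scope=sub_scope)

threads = [
    threading.Thread(target=run_one, args=(exe, s))
    for exe, s in zip(executors, sub_scopes)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```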

#### Optimize with AllReduce op

During the construction of the optimization path (in `Optimizer.minimize`), an AllReduce op is appended to the `ProgramDesc` for each parameter gradient.
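
Concretely, for every gradient the rewrite appends an `ncclAllReduce` op followed by an `assign` that writes the reduced value back into the gradient variable. The following is a condensed sketch of that insertion loop; it mirrors the `Optimizer.minimize` change in `python/paddle/fluid/optimizer.py` later in this PR.

```python
def append_all_reduce_ops(global_block, params_grads, communicator):
    """Append ncclAllReduce + assign for every gradient in params_grads."""
    for _, grad in params_grads:
        if grad is None:
            continue
        reduced = global_block.create_var(name=grad.name + '__nccl_all_reduce__')
        # Sum the gradient across all GPUs through the shared communicator.
        global_block.append_op(
            type='ncclAllReduce',
            inputs={'X': [grad], 'Communicator': [communicator]},
            outputs={'Out': [reduced]},
            attrs={'reduction': 'ncclSum'})
        # Write the summed gradient back so the optimizer ops that follow
        # consume the all-reduced value.
        global_block.append_op(
            type='assign', inputs={'X': [reduced]}, outputs={'Out': [grad]})
```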


## API

`ParallelExecutor.run` has an interface similar to `Executor.run`, with the following differences:
1. Scope: we don't expose `scope` in `ParallelExecutor.run`, since `ParallelExecutor` has its
own scope to maintain the NCCL communicator.
1. Feed: we don't expose `feed` in the API either, because the whole point of
`ParallelExecutor` is speed; the network input should be provided by a reader op.
1. Fetch: we return the fetched values from all GPUs as a list, e.g. `exe.run(..., fetch=loss)`
will return `[loss_on_gpu0, loss_on_gpu1]` (see the usage sketch below).
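
A short usage sketch of the fetch behavior described in item 3, assuming the two-GPU setup from the introduction and a reader-fed network whose loss variable is `avg_cost`:

```python
exe = fluid.ParallelExecutor(gpu_list=[0, 1])
exe.run(fluid.default_startup_program())  # parameter init runs on every GPU

outs = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
# outs has one entry per GPU, each holding that GPU's fetched values:
loss_gpu0 = outs[0][0]
loss_gpu1 = outs[1][0]
```
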
8 changes: 8 additions & 0 deletions paddle/fluid/operators/nccl/nccl_gpu_common.cc
@@ -61,5 +61,13 @@ const std::vector<ncclComm_t>& Communicator::comms() const {
return *global_comms;
}

const char* GlobalNCCLCommunicatorName() { return "__nccl_all_reduce_com__"; }

void InitNCCLCom(framework::Scope* s, std::vector<int> gpus) {
auto com = s->Var(GlobalNCCLCommunicatorName())
->GetMutable<platform::Communicator>();
com->InitAll(gpus);
}

} // namespace platform
} // namespace paddle
5 changes: 5 additions & 0 deletions paddle/fluid/operators/nccl/nccl_gpu_common.h
@@ -22,6 +22,7 @@ limitations under the License. */
#include <unordered_map>
#include <vector>

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h"
@@ -41,5 +42,9 @@ struct Communicator {
const std::vector<ncclComm_t>& comms() const;
};

const char* GlobalNCCLCommunicatorName();

void InitNCCLCom(framework::Scope*, std::vector<int>);

} // namespace platform
} // namespace paddle
2 changes: 1 addition & 1 deletion paddle/fluid/pybind/CMakeLists.txt
@@ -4,6 +4,6 @@ if(WITH_PYTHON)
DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method
${GLOB_OP_LIB})
if(NOT APPLE AND NOT ANDROID)
-    target_link_libraries(paddle_pybind rt)
+    target_link_libraries(paddle_pybind rt pybind)
endif(NOT APPLE AND NOT ANDROID)
endif(WITH_PYTHON)
11 changes: 8 additions & 3 deletions paddle/fluid/pybind/pybind.cc
@@ -401,9 +401,11 @@ All parameter, weight, gradient are variables in Paddle.

py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
.def("run",
(void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
Executor::Run);
.def("run", [](framework::Executor &self, const ProgramDesc &p, Scope *s,
int i, bool b1, bool b2) -> void {
py::gil_scoped_release release_guard;
self.Run(p, s, i, b1, b2);
});

m.def("init_gflags", framework::InitGflags);
m.def("init_glog", framework::InitGLOG);
@@ -450,6 +452,9 @@ All parameter, weight, gradient are variables in Paddle.
#ifdef PADDLE_WITH_CUDA
m.def("get_cuda_device_count", platform::GetCUDADeviceCount);

m.def("get_nccl_com_name", &platform::GlobalNCCLCommunicatorName);
m.def("init_nccl_com", &platform::InitNCCLCom);

m.def("nvprof_init", platform::CudaProfilerInit);
m.def("nvprof_start", platform::CudaProfilerStart);
m.def("nvprof_stop", platform::CudaProfilerStop);
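For reference, a sketch of how the Python side consumes these bindings (mirroring `optimizer.py` and `parallel_executor.py` later in this PR); the `gil_scoped_release` added to `Executor.run` above is what lets the per-GPU Python threads actually overlap.

```python
from paddle.fluid import core

gpu_list = [0, 1]
scope = core.Scope()

# Optimizer.minimize references the NCCL communicator variable by this
# well-known name; ParallelExecutor creates the real communicator here.
print(core.get_nccl_com_name())        # __nccl_all_reduce_com__
core.init_nccl_com(scope, gpu_list)
```
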
4 changes: 3 additions & 1 deletion python/paddle/fluid/__init__.py
@@ -19,6 +19,8 @@
# import all class inside executor into fluid module
import executor
from executor import *
import parallel_executor
from parallel_executor import *

import io
import evaluator
@@ -43,7 +45,7 @@

Tensor = LoDTensor

-__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [
+__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + parallel_executor.__all__ + [
'io',
'initializer',
'layers',
32 changes: 31 additions & 1 deletion python/paddle/fluid/optimizer.py
@@ -15,6 +15,7 @@
from collections import defaultdict

import framework
import core
import layers
from backward import append_backward
from framework import program_guard
@@ -35,7 +36,11 @@ class Optimizer(object):
but need to use one of its implementations.
"""

-    def __init__(self, learning_rate, regularization=None):
+    def __init__(self,
+                 learning_rate,
+                 global_step=None,
+                 regularization=None,
+                 append_all_reduce=False):

> **Contributor** (on the new signature): Comments for the arguments?
>
> **Contributor** (on `global_step=None`): You might want to put global step next to avoid breaking existing users?
>
> **Contributor** (on `append_all_reduce=False`): I feel this to be an implementation detail. Can we avoid exposing it to the user?
>
> **Author:** Sure.

if not isinstance(learning_rate, float) and \
not isinstance(learning_rate, framework.Variable):
raise TypeError("learning rate should be float or Variable")
@@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None):
# {accum_name : { parameter_name : accumulator_for_parameter, ...}, ...}
self._accumulators = defaultdict(lambda: dict())
self.helper = None
self.append_all_reduce = append_all_reduce

> **Contributor** (on `self.append_all_reduce`): Why is this a public attribute?
>
> **Author:** It should be private.


def _create_global_learning_rate(self):
lr = self.global_learning_rate()
@@ -219,6 +225,30 @@ def minimize(self,
"""
params_grads = append_backward(loss, parameter_list, no_grad_set,
[error_clip_callback])
if self.append_all_reduce:
global_block = loss.block.program.global_block()
dummy_communicator = global_block.create_var(
type=core.VarDesc.VarType.RAW,
name=str(core.get_nccl_com_name()))
for (_, grad) in params_grads:
if grad is None:
continue
all_reduced_var = global_block.create_var(
name=grad.name + '__nccl_all_reduce__')
global_block.append_op(
type='ncclAllReduce',
inputs={'X': [grad],
"Communicator": [dummy_communicator]},
outputs={'Out': [all_reduced_var]},
attrs={'reduction': 'ncclSum'})
global_block.append_op(
type="assign",
inputs={"X": [all_reduced_var]},
outputs={"Out": [grad]})

# The real nccl_com will be created by the parallel executor.
# Change the nccl_com name in the blockDesc to dummy
dummy_communicator.name = "__nccl_com_dummy__"

params_grads = append_gradient_clip_ops(params_grads)

73 changes: 73 additions & 0 deletions python/paddle/fluid/parallel_executor.py
@@ -0,0 +1,73 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

from threading import Thread
from Queue import Queue

import executor
from . import core
from core import CUDAPlace

__all__ = ['ParallelExecutor']


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
            fetch_var_name, cur_scope, return_numpy):
    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
                        fetch_var_name, cur_scope, return_numpy)))


class ParallelExecutor(object):
    def __init__(self, gpu_list):
        if not core.is_compiled_with_cuda():
            raise RuntimeError("ParallelExecutor only supports GPU version")

        self.executors = []
        for gpu_id in gpu_list:
            self.executors.append(executor.Executor(CUDAPlace(gpu_id)))

        self.scope = core.Scope()
        self.scopes = {}
        for idx, _ in enumerate(self.executors):
            self.scopes[idx] = self.scope.new_scope()

        core.init_nccl_com(self.scope, gpu_list)

    def run(self,
            program=None,
            feed=None,
            fetch_list=None,
            feed_var_name='feed',
            fetch_var_name='fetch',
            return_numpy=True):
        # TODO(helin): split input
        q = Queue(maxsize=len(self.executors))
        for idx, exe in enumerate(self.executors):
            cur_scope = self.scopes[idx]
            t = Thread(
                target=run_exe,
                args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
                      fetch_var_name, cur_scope, return_numpy))
            t.daemon = True
            t.start()

        results = []
        for _ in self.executors:
            results.append(q.get())

        results.sort(key=lambda x: x[0])
        return [val for (gpu_id, val) in results]

> **Contributor** (on `program=None`): Is it possible to run part of the program on multiple GPUs and keep the rest on one GPU or the CPU?
>
> **Author:** I don't think so.
>
> **panyx0718 (Contributor), Mar 13, 2018** (on `feed=None`): In the design doc, you said feed won't be exposed?
>
> **Contributor** (on `t = Thread(`): I'm a little worried about doing multi-threading in Python for such important computation, but maybe I'm wrong.
>
> **Contributor:** I think we need an Executor written in C++ and wrapped in Python, so the threads on the CPU can use multiple cores.
>
> **Author:** @typhoonzero Right. A design of a C++ ParallelExecutor will be submitted in a separate PR.
127 changes: 127 additions & 0 deletions python/paddle/fluid/tests/book/parallel_executor_example.py
@@ -0,0 +1,127 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import argparse
import time

import paddle.fluid as fluid

SEED = 1
DTYPE = "float32"

# The random seed must be set before configuring the network.
# fluid.default_startup_program().random_seed = SEED


def parse_args():
parser = argparse.ArgumentParser("mnist model benchmark.")
parser.add_argument(
'--label_size', type=int, default=10, help='The label size.')
parser.add_argument(
'--batch_size', type=int, default=10, help='The minibatch size.')
parser.add_argument(
'--iterations', type=int, default=5, help='The number of minibatches.')
parser.add_argument(
'--use_nccl',
default=False,
action='store_true',
help='If set, use nccl')
args = parser.parse_args()
return args


def print_arguments(args):
print('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
print('%s: %s' % (arg, value))
print('------------------------------------------------')


def program_summary(program):
print("--------------------")
for block in program.blocks:
for op in block.ops:
outputs = [[x + ":"] + op.output(x) for x in op.output_names]
inputs = [[x + ":"] + op.input(x) for x in op.input_names]
print(block.idx, op.type, inputs, "|", outputs)


def vgg16_bn_drop(input):
def conv_block(input, num_filter, groups, dropouts):
return fluid.nets.img_conv_group(
input=input,
pool_size=2,
pool_stride=2,
conv_num_filter=[num_filter] * groups,
conv_filter_size=3,
conv_act='relu',
conv_with_batchnorm=False,
conv_batchnorm_drop_rate=dropouts,
pool_type='max')

conv1 = conv_block(input, 64, 2, [0.3, 0])
conv2 = conv_block(conv1, 128, 2, [0.4, 0])
conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])

fc1 = fluid.layers.fc(input=conv5, size=4096, act=None)
fc2 = fluid.layers.fc(input=fc1, size=4096, act=None)
return fc2


def run_benchmark(args):
# Train program
images = fluid.layers.fill_constant(
shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01)
predict = vgg16_bn_drop(images)
label = fluid.layers.fill_constant(
shape=(args.batch_size, 1), dtype='int64', value=0)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)

# Optimization
# Note the flag append_all_reduce=True
opt = fluid.optimizer.SGDOptimizer(
learning_rate=0.001, append_all_reduce=True)
opt.minimize(avg_cost)

# program_summary(fluid.default_main_program())

exe = fluid.ParallelExecutor(
gpu_list=range(fluid.core.get_cuda_device_count()))

# Parameter initialization
exe.run(fluid.default_startup_program())

> **panyx0718 (Contributor), Mar 13, 2018** (on `exe.run(fluid.default_startup_program())`): So, parameter initialization and other startup computations are also done in parallel N times?
>
> **Author:** Yes.

for iter_id in range(0, args.iterations):
start = time.time()
outs = exe.run(fluid.default_main_program(),
feed={},
fetch_list=[avg_cost, predict, cost])
loss = np.array(outs[0][0])

end = time.time()
print("iter=%d, error=%f, elapse=%f" % (iter_id, loss, (end - start)))


if __name__ == '__main__':
args = parse_args()
print_arguments(args)
run_benchmark(args)