[Speed] feature/ParallelExecutor #8891
@@ -0,0 +1,53 @@
# ParallelExecutor Design Doc

## Introduction

We introduce `ParallelExecutor` as an alternative to `ParallelDo` for running multi-GPU
training in PaddlePaddle Fluid. Essentially, it is a Python wrapper around a list of
`Executor`s and `Scope`s. In addition, it maintains inter-device variables such as the
NCCL communicator.

To train a neural network on multiple GPUs, a user only needs to make the following
modifications to the code:
1. Specify the `append_all_reduce` flag in the optimizer.
1. Use `ParallelExecutor` to run the `ProgramDesc`.

```python
cost = your_neural_network()

opt = fluid.optimizer.SGDOptimizer(..., append_all_reduce=True)
opt.minimize(avg_cost)

exe = fluid.ParallelExecutor(gpu_list=[0, 1])
```
## Design

> **Review:** Briefly describe the plan for model save/restore?
#### ParallelExecutor

A `ParallelExecutor` contains a list of `Executor`s and their associated `Scope`s. All
of these `Scope`s are sub-scopes of the `ParallelExecutor`'s own scope, hence they have
access to inter-device variables such as the NCCL communicator.

```
                                       / SubScope 0, contains weights on GPU 0
Scope, contains NCCL Communicator  --  SubScope 1, contains weights on GPU 1
                                       \ ...
```

At runtime, we start `#gpu` Python threads, one to run each `Executor`.
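The dispatch just described can be sketched as follows. This is only an illustration of the thread-per-`Executor` pattern; the PR's actual `ParallelExecutor.run`, shown later in this diff, follows the same shape, but the helper name `run_all` and its argument list here are illustrative rather than part of the API.

```python
# Illustration only: one Python thread per Executor, results gathered through a Queue.
from threading import Thread
from Queue import Queue  # Python 2, matching the rest of this PR


def run_all(executors, scopes, program, fetch_list):
    q = Queue(maxsize=len(executors))

    def run_one(idx, exe):
        # Each Executor runs the same program in its own sub-scope (one GPU each).
        q.put((idx, exe.run(program, fetch_list=fetch_list, scope=scopes[idx])))

    for idx, exe in enumerate(executors):
        t = Thread(target=run_one, args=(idx, exe))
        t.daemon = True
        t.start()

    results = [q.get() for _ in executors]
    results.sort(key=lambda x: x[0])  # restore GPU order
    return [val for (_, val) in results]
```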
#### Optimize with AllReduce op

During the construction of the optimization path, an AllReduce op is added to the
`ProgramDesc` for each parameter gradient.
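Concretely, for every parameter gradient the optimizer appends an `ncclAllReduce` op that sums the gradient across devices, followed by an `assign` op that writes the summed value back into the original gradient variable; the `optimizer.py` diff later in this PR does exactly this. The helper below restates that rewrite in isolation for readability, and its name `append_all_reduce_ops` is illustrative:

```python
# Restatement of the rewrite performed inside Optimizer.minimize()
# when append_all_reduce=True (see the optimizer.py diff below).
def append_all_reduce_ops(global_block, params_grads, communicator):
    for (_, grad) in params_grads:
        if grad is None:
            continue
        # Sum the gradient across all GPUs into a temporary variable ...
        reduced = global_block.create_var(name=grad.name + '__nccl_all_reduce__')
        global_block.append_op(
            type='ncclAllReduce',
            inputs={'X': [grad],
                    'Communicator': [communicator]},
            outputs={'Out': [reduced]},
            attrs={'reduction': 'ncclSum'})
        # ... then copy the summed value back into the original gradient variable.
        global_block.append_op(
            type='assign', inputs={'X': [reduced]}, outputs={'Out': [grad]})
```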
## API

`ParallelExecutor.run` has a similar interface to `Executor.run`, with the following differences:
1. Scope: we don't expose `scope` in `ParallelExecutor.run`, since `ParallelExecutor` has its
   own scope to maintain the NCCL communicator.
1. Feed: we don't expose `feed` in the API either, because the whole point of implementing
   `ParallelExecutor` is speed; the input for the network should come from a reader op.
1. Fetch: we return the fetched values on all GPUs as a list, e.g. `exe.run(..., fetch=loss)`
   will return `[loss_on_gpu0, loss_on_gpu1]` (see the sketch after this list).
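A minimal sketch of consuming the per-GPU fetch results, assuming the two-GPU `exe` from the introduction and using the `fetch_list` argument name from the implementation later in this PR; averaging the per-device losses is just one simple way to report a single number:

```python
import numpy as np

# ParallelExecutor.run returns one result per GPU, in GPU order.
per_gpu_outs = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
# e.g. [[loss_on_gpu0], [loss_on_gpu1]]; average into a single scalar for logging.
mean_loss = np.mean([np.array(outs[0]) for outs in per_gpu_outs])
print("mean loss across GPUs: %f" % mean_loss)
```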
```diff
@@ -15,6 +15,7 @@
 from collections import defaultdict
 
 import framework
+import core
 import layers
 from backward import append_backward
 from framework import program_guard
```
```diff
@@ -35,7 +36,11 @@ class Optimizer(object):
     but need to use one of it's implementation.
     """
 
-    def __init__(self, learning_rate, regularization=None):
+    def __init__(self,
+                 learning_rate,
+                 global_step=None,
+                 regularization=None,
+                 append_all_reduce=False):
         if not isinstance(learning_rate, float) and \
                 not isinstance(learning_rate, framework.Variable):
             raise TypeError("learning rate should be float or Variable")
```

> **Review** (on the new `__init__` signature): Comments for the arguments?
>
> **Review** (on `global_step`): You might want to put `global_step` after the existing arguments to avoid breaking existing users?
>
> **Review** (on `append_all_reduce`): I feel this is an implementation detail. Can we avoid exposing it to the user?
> **Reply:** Sure.
```diff
@@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None):
         # {accum_name : { paramter_name : accumulator_for_parameter, ...}, ...}
         self._accumulators = defaultdict(lambda: dict())
         self.helper = None
+        self.append_all_reduce = append_all_reduce
 
     def _create_global_learning_rate(self):
         lr = self.global_learning_rate()
```

> **Review** (on `self.append_all_reduce`): Why is this a public attribute?
> **Reply:** It should be private.
```diff
@@ -219,6 +225,30 @@ def minimize(self,
         """
         params_grads = append_backward(loss, parameter_list, no_grad_set,
                                        [error_clip_callback])
+        if self.append_all_reduce:
+            global_block = loss.block.program.global_block()
+            dummy_communicator = global_block.create_var(
+                type=core.VarDesc.VarType.RAW,
+                name=str(core.get_nccl_com_name()))
+            for (_, grad) in params_grads:
+                if grad is None:
+                    continue
+                all_reduced_var = global_block.create_var(
+                    name=grad.name + '__nccl_all_reduce__')
+                global_block.append_op(
+                    type='ncclAllReduce',
+                    inputs={'X': [grad],
+                            "Communicator": [dummy_communicator]},
+                    outputs={'Out': [all_reduced_var]},
+                    attrs={'reduction': 'ncclSum'})
+                global_block.append_op(
+                    type="assign",
+                    inputs={"X": [all_reduced_var]},
+                    outputs={"Out": [grad]})
+
+            # The real nccl_com will be created by the parallel executor.
+            # Change the nccl_com name in the blockDesc to dummy
+            dummy_communicator.name = "__nccl_com_dummy__"
+
         params_grads = append_gradient_clip_ops(params_grads)
```
@@ -0,0 +1,73 @@
```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from threading import Lock

import executor
import time
from . import core
from threading import Thread
from Queue import Queue
from core import CUDAPlace

__all__ = ['ParallelExecutor']


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
            fetch_var_name, cur_scope, return_numpy):
    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
                        fetch_var_name, cur_scope, return_numpy)))


class ParallelExecutor(object):
    def __init__(self, gpu_list):
        if not core.is_compiled_with_cuda():
            raise RuntimeError("ParallelExecutor only supports GPU version")

        self.executors = []
        for gpu_id in gpu_list:
            self.executors.append(executor.Executor(CUDAPlace(gpu_id)))

        self.scope = core.Scope()
        self.scopes = {}
        for idx, _ in enumerate(self.executors):
            self.scopes[idx] = self.scope.new_scope()

        core.init_nccl_com(self.scope, gpu_list)

    def run(self,
            program=None,
            feed=None,
            fetch_list=None,
            feed_var_name='feed',
            fetch_var_name='fetch',
            return_numpy=True):
        # TODO(helin): split input
        q = Queue(maxsize=len(self.executors))
        for idx, exe in enumerate(self.executors):
            cur_scope = self.scopes[idx]
            t = Thread(
                target=run_exe,
                args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
                      fetch_var_name, cur_scope, return_numpy))
            t.daemon = True
            t.start()

        results = []
        for _ in self.executors:
            results.append(q.get())

        results.sort(key=lambda x: x[0])
        return [val for (gpu_id, val) in results]
```

> **Review** (on `program=None`): Is it possible to run part of the program on multiple GPUs and keep other parts on one GPU or the CPU?
> **Reply:** I don't think so.
>
> **Review** (on `feed=None`): In the design doc, you said `feed` won't be exposed?
>
> **Review** (on the `Thread` launch): I'm a little worried about doing multi-threading in Python for such important computation, but maybe I'm wrong.
> **Reply:** I think need an ...
> **Reply:** @typhoonzero Right. A design of C++ ...
@@ -0,0 +1,127 @@
```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import argparse
import time

import paddle.fluid as fluid

SEED = 1
DTYPE = "float32"

# random seed must set before configuring the network.
# fluid.default_startup_program().random_seed = SEED


def parse_args():
    parser = argparse.ArgumentParser("mnist model benchmark.")
    parser.add_argument(
        '--label_size', type=int, default=10, help='The label size.')
    parser.add_argument(
        '--batch_size', type=int, default=10, help='The minibatch size.')
    parser.add_argument(
        '--iterations', type=int, default=5, help='The number of minibatches.')
    parser.add_argument(
        '--use_nccl',
        default=False,
        action='store_true',
        help='If set, use nccl')
    args = parser.parse_args()
    return args


def print_arguments(args):
    print('----------- Configuration Arguments -----------')
    for arg, value in sorted(vars(args).iteritems()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


def program_summary(program):
    print("--------------------")
    for block in program.blocks:
        for op in block.ops:
            outputs = [[x + ":"] + op.output(x) for x in op.output_names]
            inputs = [[x + ":"] + op.input(x) for x in op.input_names]
            print(block.idx, op.type, inputs, "|", outputs)


def vgg16_bn_drop(input):
    def conv_block(input, num_filter, groups, dropouts):
        return fluid.nets.img_conv_group(
            input=input,
            pool_size=2,
            pool_stride=2,
            conv_num_filter=[num_filter] * groups,
            conv_filter_size=3,
            conv_act='relu',
            conv_with_batchnorm=False,
            conv_batchnorm_drop_rate=dropouts,
            pool_type='max')

    conv1 = conv_block(input, 64, 2, [0.3, 0])
    conv2 = conv_block(conv1, 128, 2, [0.4, 0])
    conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
    conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
    conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])

    fc1 = fluid.layers.fc(input=conv5, size=4096, act=None)
    fc2 = fluid.layers.fc(input=fc1, size=4096, act=None)
    return fc2


def run_benchmark(args):
    # Train program
    images = fluid.layers.fill_constant(
        shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01)
    predict = vgg16_bn_drop(images)
    label = fluid.layers.fill_constant(
        shape=(args.batch_size, 1), dtype='int64', value=0)
    cost = fluid.layers.cross_entropy(input=predict, label=label)
    avg_cost = fluid.layers.mean(x=cost)

    # Optimization
    # Note the flag append_all_reduce=True
    opt = fluid.optimizer.SGDOptimizer(
        learning_rate=0.001, append_all_reduce=True)
    opt.minimize(avg_cost)

    # program_summary(fluid.default_main_program())

    exe = fluid.ParallelExecutor(
        gpu_list=range(fluid.core.get_cuda_device_count()))

    # Parameter initialization
    exe.run(fluid.default_startup_program())

    for iter_id in range(0, args.iterations):
        start = time.time()
        outs = exe.run(fluid.default_main_program(),
                       feed={},
                       fetch_list=[avg_cost, predict, cost])
        loss = np.array(outs[0][0])

        end = time.time()
        print("iter=%d, error=%f, elapse=%f" % (iter_id, loss, (end - start)))


if __name__ == '__main__':
    args = parse_args()
    print_arguments(args)
    run_benchmark(args)
```

> **Review** (on `exe.run(fluid.default_startup_program())`): So, parameter initialization and other startup computations are also done in parallel N times?
> **Reply:** Yes.
> **Review:** What if `append_all_reduce=True` is not set? Will gradients be exchanged some other way, or will it just raise an exception?
> **Reply:** Currently, it's not handled.