[Speed] feature/ParallelExecutor #8891
@@ -0,0 +1,25 @@
# ParallelExecutor Design Doc

## Background

We currently use `parallel_do` to describe multi-GPU training. However, this approach
introduces a large number of dependencies in the initializer, the backward pass, the
optimizer, and the memory optimizer. Adding device information could solve this problem,
but for the time being we introduce `ParallelExecutor` as a Python wrapper around `Executor`.

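For a sense of the intended user-facing flow, here is a minimal end-to-end sketch distilled from the benchmark script added in this PR. The tiny constant-input model is only illustrative, and the `append_all_reduce=True` flag reflects the optimizer interface as it stands in this revision:

```python
import paddle.fluid as fluid

# A toy program: constant input through one fc layer, mean as the loss.
x = fluid.layers.fill_constant(shape=(4, 8), dtype='float32', value=0.1)
loss = fluid.layers.mean(x=fluid.layers.fc(input=x, size=1))
fluid.optimizer.SGDOptimizer(
    learning_rate=0.001, append_all_reduce=True).minimize(loss)

# One underlying Executor per listed GPU; NCCL state is set up internally.
exe = fluid.ParallelExecutor(gpu_list=[0, 1])
exe.run(fluid.default_startup_program())      # parameters initialized on every device
outs = exe.run(fluid.default_main_program(),  # one iteration on all GPUs
               feed={},
               fetch_list=[loss])
```
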
## Design

#### API

We don't expose `scope` in the run interface because we use it internally to maintain the
inter-device variables, for example (see the sketch after this list):

1. the NCCL communicator
1. the data reader (?)

Review comment: Do we need the "?"? If so, it probably needs an explanation.

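A sketch of why `scope` stays internal, condensed from the `ParallelExecutor` constructor added in this PR (only the constructor is shown; import paths assume the standard `paddle.fluid` package layout): the executor owns a root scope for shared inter-device state and gives each device a child scope.

```python
import paddle.fluid.core as core
from paddle.fluid import executor


class ParallelExecutor(object):
    def __init__(self, gpu_list):
        # One plain Executor per GPU place.
        self.executors = [
            executor.Executor(core.CUDAPlace(gpu_id)) for gpu_id in gpu_list
        ]

        # The root scope is owned here rather than passed in by the caller;
        # it holds the inter-device variables listed above.
        self.scope = core.Scope()
        # Every device runs inside its own child scope.
        self.scopes = dict(
            (idx, self.scope.new_scope()) for idx in range(len(self.executors)))

        # The NCCL communicator lives in the shared root scope.
        core.init_nccl_com(self.scope, gpu_list)
```
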
Unlike `parallel_do`, we don't expose `feed`, because splitting the fed variables onto
different devices is time consuming, while the whole point of implementing `ParallelExecutor`
is speed.

#### Python level thread

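A condensed sketch of the Python-level threading as implemented in `parallel_executor.py` in this PR: one thread per device calls that device's `Executor.run`, and results come back through a `Queue` tagged with the device index. The free function `run_parallel` below is a hypothetical rendering of `ParallelExecutor.run`, for illustration only.

```python
from Queue import Queue
from threading import Thread


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
            fetch_var_name, cur_scope, return_numpy):
    # Run one device's Executor in its own scope; report (device index, results).
    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
                        fetch_var_name, cur_scope, return_numpy)))


def run_parallel(executors, scopes, program, feed=None, fetch_list=None):
    q = Queue(maxsize=len(executors))
    for idx, exe in enumerate(executors):
        t = Thread(
            target=run_exe,
            args=(q, idx, exe, program, feed, fetch_list, 'feed', 'fetch',
                  scopes[idx], True))
        t.daemon = True
        t.start()

    results = [q.get() for _ in executors]
    results.sort(key=lambda x: x[0])  # restore device order
    # For now only device 0's result is returned; concatenation is a TODO in the PR.
    return results[0][1]
```
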
#### optimize with allreduce op
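A sketch of what `Optimizer.minimize` appends when `append_all_reduce=True`, condensed from the `optimizer.py` change in this PR (the helper name `append_all_reduce_ops` is only for illustration): each gradient gets an `ncclAllReduce` op followed by an `assign` that writes the summed result back into the gradient variable, so the parameter update that follows sees the cross-device sum.

```python
import paddle.fluid.core as core


def append_all_reduce_ops(loss, params_grads):
    global_block = loss.block.program.global_block()
    # A placeholder RAW variable standing in for the NCCL communicator; the
    # real communicator is created later by ParallelExecutor in its root scope.
    dummy_communicator = global_block.create_var(
        type=core.VarDesc.VarType.RAW, name=str(core.get_nccl_com_name()))
    for _, grad in params_grads:
        if grad is None:
            continue
        all_reduced_var = global_block.create_var(
            name=grad.name + '__nccl_all_reduce__')
        # Sum this gradient across all devices ...
        global_block.append_op(
            type='ncclAllReduce',
            inputs={'X': [grad],
                    'Communicator': [dummy_communicator]},
            outputs={'Out': [all_reduced_var]},
            attrs={'reduction': 'ncclSum'})
        # ... then overwrite the local gradient with the reduced value.
        global_block.append_op(
            type='assign',
            inputs={'X': [all_reduced_var]},
            outputs={'Out': [grad]})
    # The real nccl_com is created by ParallelExecutor; rename the placeholder
    # in the block desc to a dummy name.
    dummy_communicator.name = "__nccl_com_dummy__"
```
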
@@ -15,6 +15,7 @@
 from collections import defaultdict

 import framework
+import core
 import layers
 from backward import append_backward
 from framework import program_guard

@@ -35,7 +36,11 @@ class Optimizer(object):
     but need to use one of its implementations.
     """

-    def __init__(self, learning_rate, regularization=None):
+    def __init__(self,
Review comment: Comments for the arguments?
+                 learning_rate,
+                 global_step=None,
Review comment: You might want to put global step next to avoid breaking existing users?
+                 regularization=None,
+                 append_all_reduce=False):
Review comment: This feels like an implementation detail. Can we avoid exposing it to the user?
Reply: Sure.
         if not isinstance(learning_rate, float) and \
                 not isinstance(learning_rate, framework.Variable):
             raise TypeError("learning rate should be float or Variable")

@@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None):
         # {accum_name : { parameter_name : accumulator_for_parameter, ...}, ...}
         self._accumulators = defaultdict(lambda: dict())
         self.helper = None
+        self.append_all_reduce = append_all_reduce
Review comment: Why is this a public attribute?
Reply: It should be private.

     def _create_global_learning_rate(self):
         lr = self.global_learning_rate()

@@ -219,6 +225,30 @@ def minimize(self,
         """
         params_grads = append_backward(loss, parameter_list, no_grad_set,
                                        [error_clip_callback])
+        if self.append_all_reduce:
+            global_block = loss.block.program.global_block()
+            dummy_communicator = global_block.create_var(
+                type=core.VarDesc.VarType.RAW,
+                name=str(core.get_nccl_com_name()))
+            for (_, grad) in params_grads:
+                if grad is None:
+                    continue
+                all_reduced_var = global_block.create_var(
+                    name=grad.name + '__nccl_all_reduce__')
+                global_block.append_op(
+                    type='ncclAllReduce',
+                    inputs={'X': [grad],
+                            "Communicator": [dummy_communicator]},
+                    outputs={'Out': [all_reduced_var]},
+                    attrs={'reduction': 'ncclSum'})
+                global_block.append_op(
+                    type="assign",
+                    inputs={"X": [all_reduced_var]},
+                    outputs={"Out": [grad]})
+
+            # The real nccl_com will be created by the parallel executor.
+            # Change the nccl_com name in the blockDesc to dummy.
+            dummy_communicator.name = "__nccl_com_dummy__"

         params_grads = append_gradient_clip_ops(params_grads)

@@ -0,0 +1,83 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from threading import Lock

import executor
import time
from . import core
from threading import Thread
from Queue import Queue
from core import CUDAPlace

__all__ = ['ParallelExecutor']

print_lock = Lock()


def save_print(*args, **kwargs):
Review comment: Is …
    with print_lock:
        print(time.time(), *args, **kwargs)


def pretty_id_indent(idx):
    return '\t' * idx * 4 + str(idx) + ":"


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
            fetch_var_name, cur_scope, return_numpy):
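    # Run one device's Executor in its own scope and put (device index, results) on the queue.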
    q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
                        fetch_var_name, cur_scope, return_numpy)))


class ParallelExecutor(object):
    def __init__(self, gpu_list):
        self.executors = []
        for gpu_id in gpu_list:
            self.executors.append(executor.Executor(CUDAPlace(gpu_id)))

        self.scope = core.Scope()
        self.scopes = {}
        for idx, _ in enumerate(self.executors):
            self.scopes[idx] = self.scope.new_scope()

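        # Create the NCCL communicator in the root scope so it is shared across devices.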
        core.init_nccl_com(self.scope, gpu_list)

    def run(self,
            program=None,
Review comment: Is it possible to run part of the program on multiple GPUs and keep the rest on one GPU or the CPU?
Reply: I don't think so.
            feed=None,
Review comment: In the design doc, you said feed won't be exposed?
            fetch_list=None,
            feed_var_name='feed',
            fetch_var_name='fetch',
            scope=None,
            return_numpy=True):
        # TODO(helin): split input
        q = Queue(maxsize=len(self.executors))
        for idx, exe in enumerate(self.executors):
            cur_scope = self.scopes[idx]
            t = Thread(
Review comment: I'm a little worried about doing multi-threading in Python for such important computation, but maybe I'm wrong.
Review comment: I think we need an …
Reply: @typhoonzero Right. A design of C++ …
                target=run_exe,
                args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
                      fetch_var_name, cur_scope, return_numpy))
            t.daemon = True
            t.start()

        results = []
        for _ in self.executors:
            results.append(q.get())

        results.sort(key=lambda x: x[0])
        # TODO(helin): concat output
        return results[0][1]
@@ -0,0 +1,129 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import argparse
import time

import paddle.fluid as fluid

SEED = 1
DTYPE = "float32"

# The random seed must be set before configuring the network.
# fluid.default_startup_program().random_seed = SEED


def parse_args():
    parser = argparse.ArgumentParser("mnist model benchmark.")
    parser.add_argument(
        '--label_size', type=int, default=10, help='The label size.')
    parser.add_argument(
        '--batch_size', type=int, default=10, help='The minibatch size.')
    parser.add_argument(
        '--iterations', type=int, default=5, help='The number of minibatches.')
    parser.add_argument(
        '--use_nccl',
        default=False,
        action='store_true',
        help='If set, use nccl')
    args = parser.parse_args()
    return args


def print_arguments(args):
    print('----------- Configuration Arguments -----------')
    for arg, value in sorted(vars(args).iteritems()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


def program_summary(program):
    print("--------------------")
    for block in program.blocks:
        for op in block.ops:
            outputs = [[x + ":"] + op.output(x) for x in op.output_names]
            inputs = [[x + ":"] + op.input(x) for x in op.input_names]
            print(block.idx, op.type, inputs, "|", outputs)


def vgg16_bn_drop(input):
    def conv_block(input, num_filter, groups, dropouts):
        return fluid.nets.img_conv_group(
            input=input,
            pool_size=2,
            pool_stride=2,
            conv_num_filter=[num_filter] * groups,
            conv_filter_size=3,
            conv_act='relu',
            conv_with_batchnorm=False,
            conv_batchnorm_drop_rate=dropouts,
            pool_type='max')

    conv1 = conv_block(input, 64, 2, [0.3, 0])
    conv2 = conv_block(conv1, 128, 2, [0.4, 0])
    conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
    conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
    conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])

    # drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5)
Review comment: Could you remove the commented code?
    fc1 = fluid.layers.fc(input=conv5, size=4096, act=None)
    # bn = fluid.layers.batch_norm(input=fc1, act='relu')
    # drop2 = fluid.layers.dropout(x=fc1, dropout_prob=0.5)
    fc2 = fluid.layers.fc(input=fc1, size=4096, act=None)
    return fc2


def run_benchmark(args):
    # Train program
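    # Constant fake inputs and labels: no data loading or feeding, so the timing
    # below measures graph execution (forward, backward, allreduce, update) only.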
    images = fluid.layers.fill_constant(
        shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01)
    predict = vgg16_bn_drop(images)
    label = fluid.layers.fill_constant(
        shape=(args.batch_size, 1), dtype='int64', value=0)
    cost = fluid.layers.cross_entropy(input=predict, label=label)
    avg_cost = fluid.layers.mean(x=cost)

    # Optimization
    # Note the flag append_all_reduce=True
    opt = fluid.optimizer.SGDOptimizer(
        learning_rate=0.001, append_all_reduce=True)
    opt.minimize(avg_cost)

    # program_summary(fluid.default_main_program())

    exe = fluid.ParallelExecutor(gpu_list=[0, 1])

    # Parameter initialization
    exe.run(fluid.default_startup_program())
Review comment: So, parameter initialization and other startup computations are also done in parallel N times?
Reply: Yes.

    for iter_id in range(0, args.iterations):
        start = time.time()
        outs = exe.run(fluid.default_main_program(),
                       feed={},
                       fetch_list=[avg_cost, predict, cost])
        loss = np.array(outs[0])

        end = time.time()
        print("iter=%d, error=%f, elapse=%f" % (iter_id, loss, (end - start)))


if __name__ == '__main__':
    args = parse_args()
    print_arguments(args)
    run_benchmark(args)
Review comment: Briefly describe the plan for model save/restore?