[Speed] feature/ParallelExecutor #8891

Closed
25 changes: 25 additions & 0 deletions doc/design/parallel_executor.md
@@ -0,0 +1,25 @@
# ParallelExecutor Design Doc

## Background

We use `parallel_do` to describe multi-GPU training. However, this approach introduces a large
number of dependencies in the initializer, backward, optimizer, and memory optimizer. Adding
device information could solve this problem, but for the time being we introduce ParallelExecutor
as a Python wrapper of Executor.

## Design
> Review comment (Contributor): Briefly describe the plan for model save/restore?


#### API

Unlike `Executor.run`, we don't expose `scope` in the run interface, because we use it to maintain
the inter-device variables. For example:

1. NCCL communicator
1. Data reader(?)
> Review comment (Contributor), on the "Data reader(?)" item: Do we need the "?"? If so, it probably needs explaining.


We don't expose `feed` in `ParallelExecutor`, because it is time-consuming to split the feed variables
onto the different devices, while the whole point of implementing `ParallelExecutor` is speed.
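
A minimal sketch of the intended usage, taken from the example added in this PR
(`parallel_executor_example.py`); `avg_cost` stands for whatever loss variable the program defines,
and the exact argument names are illustrative rather than final:

```python
import paddle.fluid as fluid

# Build the program with the usual fluid layers first, then:
exe = fluid.ParallelExecutor(gpu_list=[0, 1])

# Parameter initialization runs once per device. Note that no scope is
# passed in: ParallelExecutor keeps one sub-scope per device internally,
# plus shared inter-device state such as the NCCL communicator.
exe.run(fluid.default_startup_program())

outs = exe.run(fluid.default_main_program(),
               fetch_list=[avg_cost])
```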

#### Python level thread
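
Briefly, the plan is to drive one `Executor` (and one sub-scope) per device from a Python thread;
the pybind change in this PR releases the GIL inside `Executor::Run`, so the per-device runs overlap
instead of serializing. A simplified sketch, with feed/fetch plumbing omitted and `executors`,
`scopes`, and `program` assumed to be set up as in `parallel_executor.py`:

```python
from Queue import Queue       # Python 2, as used elsewhere in this PR
from threading import Thread

def _run_one(q, idx, exe, program, scope):
    # exe.run drops into C++, which releases the GIL, so these threads
    # really do drive their GPUs concurrently.
    q.put((idx, exe.run(program, scope=scope)))

q = Queue(maxsize=len(executors))
for idx, exe in enumerate(executors):
    t = Thread(target=_run_one, args=(q, idx, exe, program, scopes[idx]))
    t.daemon = True
    t.start()

results = sorted(q.get() for _ in executors)   # reorder by device index
```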

#### optimize with allreduce op
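
The idea, as implemented in the `optimizer.py` change below, is that when `append_all_reduce=True`
the optimizer appends an `ncclAllReduce` op (followed by an `assign` back into the gradient) for every
gradient produced by `append_backward`, wired to a placeholder communicator variable that
ParallelExecutor later replaces with the real NCCL communicator. A condensed sketch of that step:

```python
def append_all_reduce(block, params_grads, com_name):
    # `com_name` is the well-known communicator name (core.get_nccl_com_name());
    # the real communicator is created by ParallelExecutor at run time.
    communicator = block.create_var(type=core.VarDesc.VarType.RAW, name=com_name)
    for _, grad in params_grads:
        if grad is None:
            continue
        reduced = block.create_var(name=grad.name + '__nccl_all_reduce__')
        block.append_op(type='ncclAllReduce',
                        inputs={'X': [grad], 'Communicator': [communicator]},
                        outputs={'Out': [reduced]},
                        attrs={'reduction': 'ncclSum'})
        # Overwrite the per-device gradient with the summed gradient.
        block.append_op(type='assign',
                        inputs={'X': [reduced]},
                        outputs={'Out': [grad]})
```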
8 changes: 8 additions & 0 deletions paddle/fluid/operators/nccl/nccl_gpu_common.cc
@@ -61,5 +61,13 @@ const std::vector<ncclComm_t>& Communicator::comms() const {
return *global_comms;
}

const char* GlobalNCCLCommunicatorName() { return "__nccl_all_reduce_com__"; }

void InitNCCLCom(framework::Scope* s, std::vector<int> gpus) {
auto com = s->Var(GlobalNCCLCommunicatorName())
->GetMutable<platform::Communicator>();
com->InitAll(gpus);
}

} // namespace platform
} // namespace paddle
5 changes: 5 additions & 0 deletions paddle/fluid/operators/nccl/nccl_gpu_common.h
@@ -22,6 +22,7 @@ limitations under the License. */
#include <unordered_map>
#include <vector>

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h"
@@ -41,5 +42,9 @@ struct Communicator {
const std::vector<ncclComm_t>& comms() const;
};

const char* GlobalNCCLCommunicatorName();

void InitNCCLCom(framework::Scope*, std::vector<int>);

} // namespace platform
} // namespace paddle
2 changes: 1 addition & 1 deletion paddle/fluid/pybind/CMakeLists.txt
@@ -4,6 +4,6 @@ if(WITH_PYTHON)
DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method
${GLOB_OP_LIB})
if(NOT APPLE AND NOT ANDROID)
-    target_link_libraries(paddle_pybind rt)
+    target_link_libraries(paddle_pybind rt pybind)
endif(NOT APPLE AND NOT ANDROID)
endif(WITH_PYTHON)
11 changes: 8 additions & 3 deletions paddle/fluid/pybind/pybind.cc
@@ -401,9 +401,11 @@ All parameter, weight, gradient are variables in Paddle.

py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
.def("run",
(void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
Executor::Run);
.def("run", [](framework::Executor &self, const ProgramDesc &p, Scope *s,
int i, bool b1, bool b2) -> void {
py::gil_scoped_release release_guard;
self.Run(p, s, i, b1, b2);
});

m.def("init_gflags", framework::InitGflags);
m.def("init_glog", framework::InitGLOG);
@@ -450,6 +452,9 @@ All parameter, weight, gradient are variables in Paddle.
#ifdef PADDLE_WITH_CUDA
m.def("get_cuda_device_count", platform::GetCUDADeviceCount);

m.def("get_nccl_com_name", &platform::GlobalNCCLCommunicatorName);
m.def("init_nccl_com", &platform::InitNCCLCom);

m.def("nvprof_init", platform::CudaProfilerInit);
m.def("nvprof_start", platform::CudaProfilerStart);
m.def("nvprof_stop", platform::CudaProfilerStop);
4 changes: 3 additions & 1 deletion python/paddle/fluid/__init__.py
@@ -19,6 +19,8 @@
# import all class inside executor into fluid module
import executor
from executor import *
import parallel_executor
from parallel_executor import *

import io
import evaluator
@@ -43,7 +45,7 @@

Tensor = LoDTensor

-__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [
+__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + parallel_executor.__all__ + [
'io',
'initializer',
'layers',
32 changes: 31 additions & 1 deletion python/paddle/fluid/optimizer.py
@@ -15,6 +15,7 @@
from collections import defaultdict

import framework
import core
import layers
from backward import append_backward
from framework import program_guard
@@ -35,7 +36,11 @@ class Optimizer(object):
but need to use one of it's implementation.
"""

> Review comment (Contributor), on the new signature: comments for the arguments?
> Review comment (Contributor), on `global_step=None`: you might want to put `global_step` after the existing arguments to avoid breaking existing users?
> Review comment (Contributor), on `append_all_reduce=False`: I feel this is an implementation detail. Can we avoid exposing it to the user?
> Author reply: Sure.

-    def __init__(self, learning_rate, regularization=None):
+    def __init__(self,
+                 learning_rate,
+                 global_step=None,
+                 regularization=None,
+                 append_all_reduce=False):

if not isinstance(learning_rate, float) and \
not isinstance(learning_rate, framework.Variable):
raise TypeError("learning rate should be float or Variable")
@@ -53,6 +58,7 @@ def __init__(self, learning_rate, regularization=None):
# {accum_name : { paramter_name : accumulator_for_parameter, ...}, ...}
self._accumulators = defaultdict(lambda: dict())
self.helper = None
self.append_all_reduce = append_all_reduce
> Review comment (Contributor), on `self.append_all_reduce`: why is this a public attribute?
> Author reply: It should be private.


def _create_global_learning_rate(self):
lr = self.global_learning_rate()
@@ -219,6 +225,30 @@ def minimize(self,
"""
params_grads = append_backward(loss, parameter_list, no_grad_set,
[error_clip_callback])
if self.append_all_reduce:
global_block = loss.block.program.global_block()
dummy_communicator = global_block.create_var(
type=core.VarDesc.VarType.RAW,
name=str(core.get_nccl_com_name()))
for (_, grad) in params_grads:
if grad is None:
continue
all_reduced_var = global_block.create_var(
name=grad.name + '__nccl_all_reduce__')
global_block.append_op(
type='ncclAllReduce',
inputs={'X': [grad],
"Communicator": [dummy_communicator]},
outputs={'Out': [all_reduced_var]},
attrs={'reduction': 'ncclSum'})
global_block.append_op(
type="assign",
inputs={"X": [all_reduced_var]},
outputs={"Out": [grad]})

# The real nccl_com will be created by the parallel executor.
# Change the nccl_com name in the blockDesc to dummy
dummy_communicator.name = "__nccl_com_dummy__"

params_grads = append_gradient_clip_ops(params_grads)

83 changes: 83 additions & 0 deletions python/paddle/fluid/parallel_executor.py
@@ -0,0 +1,83 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from threading import Lock

import executor
import time
from . import core
from threading import Thread
from Queue import Queue
from core import CUDAPlace

__all__ = ['ParallelExecutor']

print_lock = Lock()


> Review comment (Contributor), on `save_print`: Are `save_print` and `pretty_id_indent` necessary? They are not used in this PR.

def save_print(*args, **kwargs):
with print_lock:
print(time.time(), *args, **kwargs)


def pretty_id_indent(idx):
return '\t' * idx * 4 + str(idx) + ":"


def run_exe(q, idx, exe, program, feed, fetch_list, feed_var_name,
fetch_var_name, cur_scope, return_numpy):
q.put((idx, exe.run(program, feed, fetch_list, feed_var_name,
fetch_var_name, cur_scope, return_numpy)))


class ParallelExecutor(object):
def __init__(self, gpu_list):
self.executors = []
for gpu_id in gpu_list:
self.executors.append(executor.Executor(CUDAPlace(gpu_id)))

self.scope = core.Scope()
self.scopes = {}
for idx, _ in enumerate(self.executors):
self.scopes[idx] = self.scope.new_scope()

core.init_nccl_com(self.scope, gpu_list)

> Review comment (Contributor), on `program=None`: Is it possible to run part of the program on multiple GPUs and keep the rest on one GPU or the CPU?
> Author reply: I don't think so.
> Review comment (@panyx0718, Mar 13, 2018), on `feed=None`: In the design doc, you said feed won't be exposed?

def run(self,
        program=None,
        feed=None,
        fetch_list=None,
        feed_var_name='feed',
        fetch_var_name='fetch',
        scope=None,
        return_numpy=True):
# TODO(helin): split input
q = Queue(maxsize=len(self.executors))
for idx, exe in enumerate(self.executors):
cur_scope = self.scopes[idx]
> Review comment (Contributor), on the thread creation: I'm a little worried about doing multi-threading in Python for such important computation, but maybe I'm wrong.
> Review comment (Contributor): I think we need an Executor written in C++ and wrapped in Python, so the threads can use multiple CPU cores.
> Author reply: @typhoonzero Right. A design of a C++ ParallelExecutor will be submitted in a separate PR.

t = Thread(
    target=run_exe,
    args=(q, idx, exe, program, feed, fetch_list, feed_var_name,
          fetch_var_name, cur_scope, return_numpy))
t.daemon = True
t.start()

results = []
for _ in self.executors:
results.append(q.get())

results.sort(key=lambda x: x[0])
# TODO(helin): concat output
return results[0][1]
129 changes: 129 additions & 0 deletions python/paddle/fluid/tests/book/parallel_executor_example.py
@@ -0,0 +1,129 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import argparse
import time

import paddle.fluid as fluid

SEED = 1
DTYPE = "float32"

# random seed must set before configuring the network.
# fluid.default_startup_program().random_seed = SEED


def parse_args():
parser = argparse.ArgumentParser("mnist model benchmark.")
parser.add_argument(
'--label_size', type=int, default=10, help='The label size.')
parser.add_argument(
'--batch_size', type=int, default=10, help='The minibatch size.')
parser.add_argument(
'--iterations', type=int, default=5, help='The number of minibatches.')
parser.add_argument(
'--use_nccl',
default=False,
action='store_true',
help='If set, use nccl')
args = parser.parse_args()
return args


def print_arguments(args):
print('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
print('%s: %s' % (arg, value))
print('------------------------------------------------')


def program_summary(program):
print("--------------------")
for block in program.blocks:
for op in block.ops:
outputs = [[x + ":"] + op.output(x) for x in op.output_names]
inputs = [[x + ":"] + op.input(x) for x in op.input_names]
print(block.idx, op.type, inputs, "|", outputs)


def vgg16_bn_drop(input):
def conv_block(input, num_filter, groups, dropouts):
return fluid.nets.img_conv_group(
input=input,
pool_size=2,
pool_stride=2,
conv_num_filter=[num_filter] * groups,
conv_filter_size=3,
conv_act='relu',
conv_with_batchnorm=False,
conv_batchnorm_drop_rate=dropouts,
pool_type='max')

conv1 = conv_block(input, 64, 2, [0.3, 0])
conv2 = conv_block(conv1, 128, 2, [0.4, 0])
conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0])
conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0])
conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0])

# drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5)
> Review comment (Contributor), on the commented-out dropout line: Could you remove the commented code?

fc1 = fluid.layers.fc(input=conv5, size=4096, act=None)
# bn = fluid.layers.batch_norm(input=fc1, act='relu')
# drop2 = fluid.layers.dropout(x=fc1, dropout_prob=0.5)
fc2 = fluid.layers.fc(input=fc1, size=4096, act=None)
return fc2


def run_benchmark(args):
# Train program
images = fluid.layers.fill_constant(
shape=(args.batch_size, 3, 200, 200), dtype='float32', value=0.01)
predict = vgg16_bn_drop(images)
label = fluid.layers.fill_constant(
shape=(args.batch_size, 1), dtype='int64', value=0)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)

# Optimization
# Note the flag append_all_reduce=True
opt = fluid.optimizer.SGDOptimizer(
learning_rate=0.001, append_all_reduce=True)
opt.minimize(avg_cost)

# program_summary(fluid.default_main_program())

exe = fluid.ParallelExecutor(gpu_list=[0, 1])

# Parameter initialization
exe.run(fluid.default_startup_program())
> Review comment (@panyx0718, Mar 13, 2018): So, parameter initialization and other startup computations are also done in parallel N times?
> Author reply: Yes.


for iter_id in range(0, args.iterations):
start = time.time()
outs = exe.run(fluid.default_main_program(),
feed={},
fetch_list=[avg_cost, predict, cost])
loss = np.array(outs[0])

end = time.time()
print("iter=%d, error=%f, elapse=%f" % (iter_id, loss, (end - start)))


if __name__ == '__main__':
args = parse_args()
print_arguments(args)
run_benchmark(args)