Feature/insert reduce_op to parallel exe #10096
Conversation
Can you further verify the performance improvements in different environments? The improvement does not seem obvious in the 4-card, 8-thread setting, while this change introduces significant complexity.
    call();
  }
}
if (*out_handle != *in_var_handle) {
What is out_handle? What is the purpose of this code block? Could you add some comments to explain it?
Broadcast is meant to broadcast the root's data to all the devices except itself, so if the input and output of the root device are different, we need to add a copy operation.
What is the root device, and what are the input and output of the root device?
Sorry, the description above is a bit obscure. I regard the source device, the one that sends the data to the other devices, as the root node. The function of broadcast_op is to send the input data to all the devices, so one of the outputs must live on the same device as the input; if their values (tensor or selected_rows) are not the same object, we have to do the memory copy manually.
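To make the rule concrete, here is a minimal standalone sketch (plain C++, not the Paddle implementation; the vectors stand in for per-device variables, and the pointer check stands in for comparing the var handles):

```cpp
#include <iostream>
#include <vector>

int main() {
  const std::vector<float> root_in = {1.f, 2.f, 3.f};                  // the root's input
  std::vector<std::vector<float>> outs(3, std::vector<float>(3, 0.f)); // one output per device
  const int root = 0;

  // The broadcast itself only fills the outputs of the *other* devices.
  for (int dev = 0; dev < 3; ++dev) {
    if (dev == root) continue;
    outs[dev] = root_in;  // stands in for the device-to-device send
  }

  // The root's output is a different object than its input, so it has to be
  // filled by an explicit copy, mirroring `if (*out_handle != *in_var_handle)`.
  if (outs[root].data() != root_in.data()) {
    outs[root] = root_in;
  }

  std::cout << outs[root][2] << std::endl;  // prints 3
  return 0;
}
```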
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
                  const std::vector<platform::Place> &places);
                  const std::vector<platform::Place> &places, bool use_nccl)
Should use_nccl_ just be false, and do we even need the use_nccl argument here?
BroadcastOp can also broadcast CPU data, which cannot use nccl.
Right, and that's why we don't need the use_nccl argument here for the CPU case?
Thank you for pointing this out. I added this parameter in order to compare the performance of memory copying against ncclBcast; the result is that ncclBcast is faster, so use_nccl can be removed.
#endif

if (use_gpu_) {
#ifndef PADDLE_WITH_CUDA
There are so many #ifdefs here; can we improve this block of code?
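One possible way to thin out the scattered guards, sketched under the assumption that only the PADDLE_WITH_CUDA macro matters here (illustrative only, not the PR's actual code):

```cpp
#include <stdexcept>

#ifdef PADDLE_WITH_CUDA
constexpr bool kCudaEnabled = true;
#else
constexpr bool kCudaEnabled = false;
#endif

// Call sites branch on a plain constant instead of repeating #ifdef blocks.
void RunBroadcast(bool use_gpu) {
  if (use_gpu && !kCudaEnabled) {
    throw std::runtime_error("CUDA is not enabled in this build.");
  }
  // ... dispatch to the GPU or CPU implementation here ...
}

int main() {
  RunBroadcast(/*use_gpu=*/false);  // the CPU path is always available
  return 0;
}
```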
// Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles);

std::vector<int64_t> out_rows;
std::vector<Tensor> in_tensors;
std::vector<platform::Place> in_places;
// std::vector<platform::Place> in_places;
this shouldn't exist
Done
in_places.push_back(in_p);
PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
                  "Places must be all on CPU or all on CUDA.");
// in_places.push_back(in_handle->place_);
remove?
Done
std::vector<std::unordered_set<std::string>> var_name_on_devices;
std::vector<std::unordered_set<std::string>> bcast_var_name_set;

var_name_on_devices.resize(places_.size());
Should this be reserve()? Is it different from resize()?
Refer to: https://stackoverflow.com/questions/7397768/choice-between-vectorresize-and-vectorreserve
The resize() method (and passing argument to constructor is equivalent to that) will insert or delete appropriate number of elements to the vector to make it given size (it has optional second argument to specify their value). It will affect the size(), iteration will go over all those elements, push_back will insert after them and you can directly access them using the operator[].
The reserve() method only allocates memory, but leaves it uninitialized. It only affects capacity(), but size() will be unchanged. There is no value for the objects, because nothing is added to the vector. If you then insert the elements, no reallocation will happen, because it was done in advance, but that's the only effect.
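A small self-contained illustration of that difference (standard C++, unrelated to the Paddle code itself):

```cpp
#include <iostream>
#include <vector>

int main() {
  std::vector<int> a;
  a.reserve(4);  // only capacity changes; size stays 0, and a[0] would be undefined behavior
  std::cout << a.size() << " " << a.capacity() << std::endl;  // 0 and at least 4

  std::vector<int> b;
  b.resize(4);   // four value-initialized elements, directly indexable
  b[2] = 7;
  std::cout << b.size() << " " << b[2] << std::endl;          // 4 7
  return 0;
}
```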
@@ -55,7 +55,7 @@ struct ReduceOpHandle : public OpHandleBase {

  std::string Name() const override;

  bool IsMultiDeviceTransfer() override { return false; };
  bool IsMultiDeviceTransfer() override { return true; };
I'm not so sure about this now that many things have changed.
@@ -28,7 +28,9 @@ def __init__(self,
                 loss_name=None,
                 main_program=None,
                 num_threads=None,
                 threads_per_dev=None,
It seems confusing to have both num_threads and threads_per_dev
Done
auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
  paddle::framework::TensorCopy(
      in_tensor, out_p, *(dev_ctx),
No need for the parentheses around dev_ctx?
I have removed them.
auto *out = out_var_handles[j];
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);

if (*out != *in_var_handle) {
Are you comparing VarHandle instead of VarHandle*? Is it correct?
Hopefully, we don't use "auto" everywhere.
"Are you comparing VarHandle instead of VarHandle*?" Yes: line 90 compares the instances, not the pointers.
"Is it correct?" The pointers of the input (in_var_handle) and the output (out) must always differ, because the output VarHandle is always freshly created (see the code); a minimal sketch is below.
"Hopefully, we don't use 'auto' everywhere." Thanks, I will correct that later.
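A minimal sketch of the distinction, using a toy stand-in for VarHandle (the field names are only illustrative):

```cpp
#include <cstddef>
#include <iostream>
#include <string>

// Toy stand-in for VarHandle: two distinct objects can still describe the same variable.
struct Handle {
  std::string name_;
  std::size_t scope_idx_;
  bool operator==(const Handle& o) const {
    return name_ == o.name_ && scope_idx_ == o.scope_idx_;
  }
};

int main() {
  Handle* in = new Handle{"w", 0};
  Handle* out = new Handle{"w", 0};          // always freshly created, as in the PR
  std::cout << (in != out) << std::endl;     // 1: the pointers always differ
  std::cout << !(*in == *out) << std::endl;  // 0: the dereferenced handles compare equal
  delete in;
  delete out;
  return 0;
}
```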
}

int type = platform::ToNCCLDataType(in_tensor.type());
all_reduce_calls.emplace_back([=] {
it is "broadcast" not "all_reduce"?
Yes, thanks!
@@ -30,7 +30,8 @@ def __init__(self,
                 num_threads=None,
                 allow_op_delay=False,
                 share_vars_from=None,
                 customize_loss_grad=False):
                 customize_loss_grad=False,
                 use_nccl_allreduce=True):
any comments?
Done
@@ -565,24 +568,31 @@ def test_parallel_testing(self):
        depth = 8
        mix_hidden_lr = 1e-3
        embedding_name = 'emb'
        is_sparse = True
        use_nccl_allreduce = False
Does it cover is_sparse=False and use_nccl_allreduce=True?
Thanks, I will add a unit test.
bool operator!=(const VarHandle& o) const {
  return o.generated_op_ != generated_op_ || o.name_ != name_ ||
         o.scope_idx_ != scope_idx_;
What about version_ and place_? This seems dangerous.
Thanks. This operator is used here: if (*out != *in_var_handle). The version_ of the two handles can be different, but the place_ ...
Let me think about it again.
There is no need to implement != manually. You can just return !this->operator==(o);
Done.
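A hedged sketch of that suggestion (the field names follow the snippet in this thread; this is illustrative, not the merged Paddle code):

```cpp
#include <cstddef>
#include <string>

struct VarHandle {
  const void* generated_op_ = nullptr;
  std::size_t version_ = 0;
  std::size_t scope_idx_ = 0;
  std::string name_;

  bool operator==(const VarHandle& o) const {
    return o.generated_op_ == generated_op_ && o.name_ == name_ &&
           o.scope_idx_ == scope_idx_;
  }
  // No hand-written field-by-field comparison needed; just negate operator==.
  bool operator!=(const VarHandle& o) const { return !this->operator==(o); }
};

int main() {
  VarHandle a, b;
  a.name_ = b.name_ = "w";
  return (a != b) ? 1 : 0;  // 0: equal under the fields compared above
}
```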
This PR's work includes:
- When the input is on a GPU place, the broadcast calls nccl_Bcast; otherwise, it calls tensor copying.
- In other words, nccl_Bcast or tensor copying is chosen according to the input's place.

I compared the performance before and after the optimization; the result is: (sec/batch)

That is to say, parallel_exe now supports updating sparse parameters, and meanwhile the performance of parallel_exe is improved.
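To close, here is a tiny standalone sketch of that dispatch rule (the Place enum and the two helpers are made up for the example; the real code would use platform::Place together with ncclBcast or TensorCopy):

```cpp
#include <iostream>

enum class Place { kCPU, kCUDA };

void BroadcastWithNccl() { std::cout << "nccl_Bcast path" << std::endl; }
void BroadcastWithTensorCopy() { std::cout << "tensor-copy path" << std::endl; }

// Choose the NCCL path for GPU inputs and plain memory copies for CPU inputs.
void Broadcast(Place in_place) {
  if (in_place == Place::kCUDA) {
    BroadcastWithNccl();
  } else {
    BroadcastWithTensorCopy();
  }
}

int main() {
  Broadcast(Place::kCUDA);  // GPU input -> nccl_Bcast
  Broadcast(Place::kCPU);   // CPU input -> per-device tensor copying
  return 0;
}
```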