Merge pull request #9946 from chengduoZH/feature/add_reduce_op_handle
Feature/add reduce op handle
Showing 6 changed files with 609 additions and 30 deletions.
@@ -0,0 +1,94 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <algorithm>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/selected_rows.h"
namespace paddle {
namespace framework {
namespace details {

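// ReduceLoDTensor sums a list of same-shaped LoDTensors element-wise into one
// destination tensor on CPU. It is a functor templated on the element type so
// that it can be dispatched by VisitDataType below.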
struct ReduceLoDTensor {
  const std::vector<LoDTensor> &src_tensors_;
  LoDTensor &dst_tensor_;

  ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
      : src_tensors_(src), dst_tensor_(*dst) {}

  template <typename T>
  void operator()() const {
    PADDLE_ENFORCE(!src_tensors_.empty());
    auto &t0 = src_tensors_[0];
    PADDLE_ENFORCE_NE(t0.numel(), 0);
    dst_tensor_.Resize(t0.dims());
    T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
    std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);

    for (size_t i = 1; i < src_tensors_.size(); ++i) {
      auto &t = src_tensors_[i];
      PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
      PADDLE_ENFORCE_EQ(t.type(), t0.type());
      std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
                     [](T a, T b) -> T { return a + b; });
    }
  }
};

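// GatherSelectedRows concatenates the rows of several SelectedRows (which may
// live on different devices) into one output SelectedRows on out_place. Note
// that this is a gather, not a reduction: duplicate row indices are kept
// as-is rather than summed.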
inline void GatherSelectedRows(
    const std::vector<const SelectedRows *> &src_selected_rows,
    const std::vector<platform::Place> &in_places,
    const std::unordered_map<platform::Place, platform::DeviceContext *,
                             platform::PlaceHash> &dev_ctxes,
    const platform::Place &out_place, SelectedRows *dst_selected_rows) {
  PADDLE_ENFORCE(!src_selected_rows.empty());

  std::vector<Tensor> in_tensors;
  std::vector<int64_t> out_rows;

  for (auto in_sr_ptr : src_selected_rows) {
    auto &in_sr = *in_sr_ptr;
    in_tensors.emplace_back(in_sr.value());
    out_rows.insert(out_rows.end(), in_sr.rows().begin(), in_sr.rows().end());
  }

  auto &pre_in = src_selected_rows[0];

  auto &dst_sr = *dst_selected_rows;
  dst_sr.set_height(pre_in->height());
  dst_sr.set_rows(out_rows);
  size_t rows = out_rows.size();
  DDim out_dim = pre_in->GetCompleteDims();
  out_dim[0] = static_cast<int64_t>(rows);
  dst_sr.mutable_value()->Resize(out_dim);
  dst_sr.mutable_value()->mutable_data(out_place, pre_in->value().type());
  Tensor *out_tensor = dst_sr.mutable_value();

  // Copy each input's value tensor into its row slice [s, e) of the output.
  int s = 0, e = 0;
  for (size_t j = 0; j < in_tensors.size(); ++j) {
    e += in_tensors[j].dims()[0];
    auto sub_out = out_tensor->Slice(s, e);
    paddle::framework::TensorCopy(in_tensors[j], out_place,
                                  *(dev_ctxes.at(in_places[j])), &sub_out);
    s = e;
  }
}

}  // namespace details
}  // namespace framework
}  // namespace paddle
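The heart of ReduceLoDTensor::operator() is a copy of the first source buffer followed by element-wise accumulation with std::transform. Below is a minimal standalone sketch of that pattern; it is not part of the commit: std::vector<T> stands in for the LoDTensor buffers and ReduceBuffers is a hypothetical helper.

// Standalone sketch (not part of the commit): the accumulation pattern of
// ReduceLoDTensor, with std::vector<T> standing in for LoDTensor buffers.
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>

template <typename T>
void ReduceBuffers(const std::vector<std::vector<T>> &srcs,
                   std::vector<T> *dst) {
  assert(!srcs.empty());
  *dst = srcs[0];  // mirrors the std::copy of t0 into dst_tensor_
  for (size_t i = 1; i < srcs.size(); ++i) {
    assert(srcs[i].size() == srcs[0].size());  // mirrors the dims() check
    // Accumulate element-wise: (*dst)[k] += srcs[i][k].
    std::transform(srcs[i].begin(), srcs[i].end(), dst->begin(), dst->begin(),
                   [](T a, T b) -> T { return a + b; });
  }
}

int main() {
  std::vector<std::vector<float>> srcs = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
  std::vector<float> dst;
  ReduceBuffers(srcs, &dst);
  for (float v : dst) std::cout << v << " ";  // prints: 12 15 18
  std::cout << "\n";
  return 0;
}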
@@ -0,0 +1,161 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h" | ||
#include "paddle/fluid/framework/details/reduce_and_gather.h" | ||
|
||
namespace paddle { | ||
namespace framework { | ||
namespace details { | ||
|
||
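// Reduce the variable of the same name from every local scope into the single
// output variable: SelectedRows inputs are gathered row-wise, and LoDTensor
// inputs are summed element-wise (directly on CPU, via ncclReduce on GPU).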
void ReduceOpHandle::RunImpl() {
  // The inputs and outputs may contain dummy variables.
  std::vector<VarHandle *> in_var_handles = GetValidVarHandles(inputs_);
  std::vector<VarHandle *> out_var_handles = GetValidVarHandles(outputs_);

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), places_.size(),
      "The number of inputs should be equal to the number of places.");
  PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
                    "The number of outputs should be one.");

  // Wait until every input has been generated; this Wait is asynchronous.
  WaitEvents(in_var_handles);

  // Check that the inputs are all on CPU or all on GPU.
  auto in_0_handle = in_var_handles[0];
  auto pre_place = in_0_handle->place_;

  std::vector<platform::Place> in_places;
  for (auto *in_handle : in_var_handles) {
    auto in_p = in_handle->place_;
    PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
                      "Places must be all on CPU or all on CUDA.");
    in_places.emplace_back(in_p);
  }

  auto out_var = local_scopes_[out_var_handles[0]->scope_idx_]->FindVar(
      out_var_handles[0]->name_);

  auto pre_in_var =
      local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_);

  if (pre_in_var->IsType<framework::SelectedRows>()) {
    auto &pre_in = pre_in_var->Get<framework::SelectedRows>();
    std::vector<const SelectedRows *> in_selected_rows;

    for (auto *in_handle : in_var_handles) {
      auto in_var =
          local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
      auto &in_sr = in_var->Get<framework::SelectedRows>();

      PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
                        "The type of input is not consistent.");

      in_selected_rows.emplace_back(&in_sr);
    }
    auto trg = out_var->GetMutable<framework::SelectedRows>();
    GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_,
                       out_var_handles[0]->place_, trg);
  } else {
    auto &pre_in = pre_in_var->Get<framework::LoDTensor>();
    std::vector<LoDTensor> lod_tensors;

    // This gather loop can be refined.
    for (auto *in_handle : in_var_handles) {
      auto in_var =
          local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
      auto &in_tensor = in_var->Get<framework::LoDTensor>();

      PADDLE_ENFORCE_EQ(in_tensor.type(), pre_in.type(),
                        "The type of input is not consistent.");

      lod_tensors.emplace_back(in_tensor);
    }

    auto trg = out_var->GetMutable<framework::LoDTensor>();
    trg->Resize(pre_in.dims());
    trg->mutable_data(out_var_handles[0]->place_, pre_in.type());

    if (paddle::platform::is_cpu_place(pre_place)) {
      ReduceLoDTensor func(lod_tensors, trg);
      VisitDataType(ToDataType(lod_tensors[0].type()), func);
    } else if (paddle::platform::is_gpu_place(pre_place)) {
#ifdef PADDLE_WITH_CUDA
      auto out_p = out_var_handles[0]->place_;
      int root = boost::get<platform::CUDAPlace>(out_p).device;

      std::vector<std::function<void()>> all_reduce_calls;
      for (size_t i = 0; i < local_scopes_.size(); ++i) {
        auto &p = in_places[i];
        auto &lod_tensor = lod_tensors[i];

        int dev_id = boost::get<platform::CUDAPlace>(p).device;
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);
        auto stream = nccl_ctx.stream();
        auto comm = nccl_ctx.comm_;

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
        if (root == dev_id) {
          // Only the root device needs a receive buffer.
          recvbuffer = trg->mutable_data(out_var_handles[0]->place_);
        }

        all_reduce_calls.emplace_back([=] {
          PADDLE_ENFORCE(platform::dynload::ncclReduce(
              buffer, recvbuffer, static_cast<size_t>(lod_tensor.numel()),
              platform::ToNCCLDataType(lod_tensor.type()), ncclSum, root, comm,
              stream));
        });
      }

      this->RunAndRecordEvent([&] {
        // Issue all per-device ncclReduce calls inside a single NCCL group.
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
#else
      PADDLE_THROW("CUDA is not supported.");
#endif
    } else {
      PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
    }
  }
}

void ReduceOpHandle::WaitEvents(
    const std::vector<VarHandle *> &in_var_handles) {
  // Wait on the op that generated each input, in that input's device context.
  for (auto *in : in_var_handles) {
    if (in->generated_op_) {
      in->generated_op_->Wait(dev_ctxes_[in->place_]);
    }
  }
}

std::vector<VarHandle *> ReduceOpHandle::GetValidVarHandles(
    const std::vector<VarHandleBase *> &inputs) {
  std::vector<VarHandle *> in_var_handles;
  for (auto *in : inputs) {
    auto *in_handle = dynamic_cast<VarHandle *>(in);
    if (in_handle) {
      in_var_handles.push_back(in_handle);
    }
  }
  return in_var_handles;
}

std::string ReduceOpHandle::Name() const { return "reduce"; }
}  // namespace details
}  // namespace framework
}  // namespace paddle
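RunImpl above defers the per-device ncclReduce calls: it first packs one closure per device into all_reduce_calls, then invokes them together under a single platform::NCCLGroupGuard inside RunAndRecordEvent, so NCCL sees the whole group of participating ranks at once. Below is a minimal standalone sketch of that deferred-call pattern, not part of the commit, with plain prints standing in for the NCCL group and reduce calls.

// Standalone sketch (not part of the commit): build one closure per device,
// then run them together inside one "group", as RunImpl does with
// platform::NCCLGroupGuard around its ncclReduce calls.
#include <functional>
#include <iostream>
#include <vector>

int main() {
  std::vector<std::function<void()>> all_reduce_calls;
  for (int dev_id = 0; dev_id < 3; ++dev_id) {
    // Capture by value, as the [=] lambdas in RunImpl do.
    all_reduce_calls.emplace_back(
        [dev_id] { std::cout << "reduce on device " << dev_id << "\n"; });
  }

  std::cout << "group start\n";  // stands in for ncclGroupStart()
  for (auto &call : all_reduce_calls) {
    call();
  }
  std::cout << "group end\n";  // stands in for ncclGroupEnd()
  return 0;
}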