
Integrate GLOOParallelContext to support multi-CPU-core Dygraph DataParallel #35154

Merged: 13 commits, Sep 8, 2021
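This PR adds a Gloo-backed ParallelContext so Dygraph DataParallel gradient all-reduce can run across multiple CPU processes instead of requiring NCCL/RCCL/BKCL. The C++ sketch below is illustrative only and is not part of the diff: it shows how the new context would be driven, using the ParallelStrategy fields that gloo_context.cc itself reads. The function name, endpoint, rank, and world size are placeholders, and the call assumes a peer process is running for every other rank.

#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/platform/place.h"

void CpuAllReduceExample() {
  namespace imp = paddle::imperative;
  namespace fw = paddle::framework;

  imp::ParallelStrategy strategy;
  strategy.nranks_ = 2;                              // total CPU workers (placeholder)
  strategy.local_rank_ = 0;                          // this worker's rank (placeholder)
  strategy.trainer_endpoints_ = {"127.0.0.1:6170"};  // HTTP-store endpoint (placeholder)

  imp::GLOOParallelContext ctx(strategy, paddle::platform::CPUPlace());
  ctx.Init();  // sets up the GlooWrapper and the shared HTTP store

  fw::Variable src, dst;
  auto *src_tensor = src.GetMutable<fw::LoDTensor>();
  fw::TensorFromVector<float>({1.f, 2.f, 3.f}, src_tensor);

  // Element-wise all-reduce across all CPU workers, followed by a barrier.
  ctx.AllReduceByStream(src, &dst, /*ring_id=*/0, /*use_calc_stream=*/true);
}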
6 changes: 6 additions & 0 deletions paddle/fluid/imperative/CMakeLists.txt
@@ -29,6 +29,12 @@ if(NOT WIN32)
endif()
cc_library(data_loader SRCS data_loader.cc DEPS enforce)
endif(NOT WIN32)
if(WITH_GLOO)
cc_library(imperative_gloo_context SRCS gloo_context.cc DEPS collective_helper device_context tensor var_type_traits)
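# Build the reducer here when no NCCL/RCCL/BKCL backend is enabled (or on Windows), so the CPU-only Gloo path still gets it.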
if(WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL)))
cc_library(reducer SRCS reducer.cc DEPS layer)
endif()
endif()

cc_library(gradient_accumulator SRCS gradient_accumulator.cc DEPS blas operator lod_tensor selected_rows selected_rows_functor var_type_traits layer math_function)

109 changes: 109 additions & 0 deletions paddle/fluid/imperative/gloo_context.cc
@@ -0,0 +1,109 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle

namespace paddle {
namespace imperative {

void GLOOParallelContext::Init() {
VLOG(4) << "Start GLOOParallelContext initialization";
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
gloo_wrapper->SetSize(strategy_.nranks_);
gloo_wrapper->SetRank(strategy_.local_rank_);
gloo_wrapper->SetPrefix("");
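// Use the loopback interface ("lo"): Gloo traffic stays on the local host.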
gloo_wrapper->SetIface("lo");
auto addr = paddle::string::Split(strategy_.trainer_endpoints_[0], ':');
VLOG(4) << "Server is" << strategy_.trainer_endpoints_[0];
std::string host = addr[0];
int port = std::stoi(addr[1]);
gloo_wrapper->SetHttpStore(host, port, "worker");
gloo_wrapper->Init();
device_ = std::unique_ptr<platform::CPUDeviceContext>(
new platform::CPUDeviceContext(platform::CPUPlace()));
}

void GLOOParallelContext::InitWithRingID(int ring_id) {
PADDLE_THROW(platform::errors::Unimplemented(
"InitWithRingID is not implemented in GLOOParallelContext yet."));
}

#define GLOO_CASE(type, T, gw) \
case type: { \
VLOG(4) << "Use the gloo all reduce to sync. SRC:" << src_tensor; \
std::vector<T> send_vector##T; \
framework::TensorToVector<T>(src_tensor, &send_vector##T); \
auto recv_vector##T = gw->AllReduce<T>(send_vector##T); \
framework::TensorFromVector<T>(recv_vector##T, dst_tensor); \
VLOG(4) << "DST:" << *dst_tensor; \
break; \
}

void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
framework::Variable *dst,
int ring_id, bool use_calc_stream) {
// AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
auto src_tensor = src.Get<framework::LoDTensor>();
auto *dst_tensor = dst->GetMutable<framework::LoDTensor>();
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
dst_tensor->Resize(src_tensor.dims());
switch (src_tensor.type()) {
GLOO_CASE(framework::proto::VarType::FP32, float, gloo_wrapper);
GLOO_CASE(framework::proto::VarType::FP64, double, gloo_wrapper);
GLOO_CASE(framework::proto::VarType::INT32, int, gloo_wrapper);
GLOO_CASE(framework::proto::VarType::INT64, int64_t, gloo_wrapper);
default: {
PADDLE_THROW(
platform::errors::InvalidArgument("Invalid datatype for allreduce"));
}
}
gloo_wrapper->Barrier();
}

paddle::platform::DeviceContext *GLOOParallelContext::GetDeviceContext(
int ring_id) {
// return the CPUDeviceContext
return device_.get();
}

void GLOOParallelContext::WaitCompute(int ring_id) {
// do nothing: the CPU path needs no explicit synchronization
return;
}

void GLOOParallelContext::WaitComm(int ring_id) {
// do nothing: the CPU path needs no explicit synchronization
return;
}

void GLOOParallelContext::SynchronizeCompute() {
// do nothing: the CPU path needs no explicit synchronization
return;
}

} // namespace imperative
} // namespace paddle
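For readers unpacking the GLOO_CASE macro above: every arm follows the same copy-out / all-reduce / copy-back pattern and only changes the element type. The standalone sketch below (illustrative only, with a hypothetical function name) shows that pattern for float, reusing the GlooWrapper and tensor_util calls the file relies on and assuming the wrapper has already been initialized as in Init().

#include <vector>

#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/tensor_util.h"

void AllReduceFloatTensor(const paddle::framework::Tensor &src,
                          paddle::framework::Tensor *dst) {
  auto gloo = paddle::framework::GlooWrapper::GetInstance();
  std::vector<float> send;
  paddle::framework::TensorToVector<float>(src, &send);   // tensor -> host vector
  auto recv = gloo->AllReduce<float>(send);               // element-wise reduce across ranks
  paddle::framework::TensorFromVector<float>(recv, dst);  // host vector -> tensor
  dst->Resize(src.dims());                                // give dst the source shape
}

Note the round trip through std::vector: the Gloo path stages data in host vectors rather than reducing in place, which keeps the code simple at the cost of an extra copy per reduced tensor.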
60 changes: 60 additions & 0 deletions paddle/fluid/imperative/gloo_context.h
@@ -0,0 +1,60 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle

namespace paddle {
namespace imperative {

class GLOOParallelContext : public ParallelContext {
public:
explicit GLOOParallelContext(const ParallelStrategy& strategy,
const platform::Place& place)
: ParallelContext(strategy, place) {}

~GLOOParallelContext() override = default;

void Init() override;

void InitWithRingID(int ring_id) override;

void AllReduceByStream(const framework::Variable& src,
framework::Variable* dst, int ring_id,
bool use_calc_stream) override;

paddle::platform::DeviceContext* GetDeviceContext(int ring_id) override;

void WaitCompute(int ring_id) override;

void WaitComm(int ring_id) override;

void SynchronizeCompute() override;

private:
std::unique_ptr<platform::CPUDeviceContext> device_;
};

} // namespace imperative
} // namespace paddle
17 changes: 11 additions & 6 deletions paddle/fluid/imperative/reducer.cc
@@ -28,7 +28,7 @@ namespace paddle {
namespace imperative {

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
// div the nranks
void Group::DivNRanks(const platform::DeviceContext &context, int64_t nranks) {
framework::Tensor *tensor =
@@ -42,9 +42,12 @@ void Group::DivNRanks(const platform::DeviceContext &context, int64_t nranks) {
DivNRanks(tensor, nranks, context);
#endif
} else if (platform::is_cpu_place(tensor->place())) {
VLOG(4) << "before div 2" << *tensor;
VLOG(4) << "NDiv for cpu devices : rank = " << nranks;
framework::VisitDataTypeSmall(
dtype_, DivNRanksForAllReduce<platform::CPUDeviceContext>(
tensor, nranks, context));
VLOG(4) << "after div 2" << *tensor;
} else if (platform::is_xpu_place(tensor->place())) {
#ifdef PADDLE_WITH_XPU_BKCL
// TODO(liuyuhui) support xpu about div nranks in the future
@@ -758,8 +761,8 @@ void Reducer::MarkGroupReady(size_t group_index) {

for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
++next_group_) {
auto &group = groups_[next_group_];
const int run_order = next_group_ % nrings_;
UNUSED auto &group = groups_[next_group_];
UNUSED const int run_order = next_group_ % nrings_;

// For CUDA or XPU, compute_stream --> comm_stream.
// For CPU, do nothing.
@@ -786,11 +789,12 @@ void Reducer::MarkGroupReady(size_t group_index) {
cv_.notify_all();
}
});
#elif defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
#elif defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL) || \
defined(PADDLE_WITH_GLOO)
FusedAllReduceSchedule(run_order, group, next_group_);
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Not compiled with BKCL or NCCL."));
"Not compiled with BKCL or NCCL or GLOO."));
#endif
}
}
@@ -963,7 +967,8 @@ void Reducer::FinalizeBackward() {

if (find_unused_vars_each_step_) {
// TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_GLOO)
ProcessUnusedDenseVars();
#endif
// Initialize local used vars
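The reducer changes route the CPU branch through the same DivNRanks step as the GPU backends: the Gloo all-reduce produces the sum of each gradient across workers, and DivNRanks then divides by nranks so DataParallel ends up with the mean gradient. The sketch below is a conceptual stand-in for that division (it is not the actual DivNRanksForAllReduce functor, which operates on framework::Tensor via VisitDataTypeSmall).

#include <cstdint>
#include <vector>

// Turn an all-reduced (summed) gradient into a mean by dividing by nranks.
template <typename T>
void DivByNRanks(std::vector<T> *summed_grad, int64_t nranks) {
  for (auto &value : *summed_grad) {
    value /= static_cast<T>(nranks);
  }
}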
2 changes: 1 addition & 1 deletion paddle/fluid/imperative/reducer.h
@@ -49,7 +49,7 @@ namespace paddle {
namespace imperative {

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)

template <typename T>
struct DivNRanksFunctor {
2 changes: 2 additions & 0 deletions paddle/fluid/pybind/CMakeLists.txt
@@ -69,6 +69,8 @@ endif()
if(WITH_GLOO)
set(PYBIND_DEPS ${PYBIND_DEPS} gloo_context)
set(PYBIND_SRCS ${PYBIND_SRCS} gloo_context_py.cc)
set(PYBIND_DEPS ${PYBIND_DEPS} imperative_gloo_context)
set(PYBIND_DEPS ${PYBIND_DEPS} reducer)
endif(WITH_GLOO)

if (WITH_CRYPTO)
17 changes: 16 additions & 1 deletion paddle/fluid/pybind/imperative.cc
@@ -34,6 +34,7 @@ limitations under the License. */
#include "paddle/fluid/imperative/basic_engine.h"
#include "paddle/fluid/imperative/bkcl_context.h"
#include "paddle/fluid/imperative/data_loader.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/imperative/hooks.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/imperative/nccl_context.h"
@@ -1887,7 +1888,7 @@ void BindImperative(py::module *m_ptr) {
py::call_guard<py::gil_scoped_release>());

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
py::class_<imperative::ParallelContext,
std::shared_ptr<imperative::ParallelContext>>(m,
"ParallelContext");
@@ -1932,6 +1933,20 @@ void BindImperative(py::module *m_ptr) {
&imperative::BKCLParallelContext::InitWithRingID,
py::arg("ring_id"));
#endif

#if defined(PADDLE_WITH_GLOO)
py::class_<imperative::GLOOParallelContext, imperative::ParallelContext,
std::shared_ptr<imperative::GLOOParallelContext>>(
m, "GLOOParallelContext")
.def(py::init<const imperative::ParallelStrategy &,
const platform::CPUPlace &>())
.def("init", [](imperative::GLOOParallelContext &self) { self.Init(); })
.def("init_with_ring_id",
&imperative::GLOOParallelContext::InitWithRingID,
py::arg("ring_id"));
#endif

m.def("pylayer_apply",
[](const platform::CPUPlace &place, const py::object &cls,
const py::args args, const py::kwargs kwargs) {