Pg heter cloud #40911

Merged · 35 commits · Mar 31, 2022

Commits (35)
9ba08b1
rename TensorBase interface data_type() to dtype()
zyfncg Nov 16, 2021
3c1afc0
rename type to dtype of TensorMeta
zyfncg Nov 17, 2021
288f086
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zyfncg Nov 17, 2021
701a0bd
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zyfncg Nov 17, 2021
7bc3cbb
merge the code
zyfncg Nov 17, 2021
7b79b03
merge the code
zyfncg Nov 17, 2021
471a1bf
fix the problem when merge conflict
zyfncg Nov 18, 2021
d39a1d9
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zyfncg Nov 19, 2021
835e415
fix bug of ci caused by type of tensor_meta
zyfncg Nov 19, 2021
ab60a6d
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Nov 19, 2021
471741f
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Dec 20, 2021
691056a
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Jan 20, 2022
7d68080
Merge branch 'develop' of https://github.com/sandyhouse/Paddle into d…
Mar 8, 2022
9756b09
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 8, 2022
7dc697a
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 9, 2022
d3f0397
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 15, 2022
f187384
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 16, 2022
644ef58
Merge branch 'develop' of https://github.com/sandyhouse/Paddle into d…
Mar 17, 2022
53fac52
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 17, 2022
2f12cfe
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 22, 2022
20ebc3f
update
Mar 24, 2022
92b418c
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 25, 2022
829adae
update
Mar 28, 2022
67d84d9
update
Mar 28, 2022
8922319
update
Mar 28, 2022
e73ca14
update
Mar 28, 2022
d0179a5
update
Mar 28, 2022
f4bc987
update
Mar 29, 2022
e96932e
update
Mar 29, 2022
83c6fef
update
Mar 29, 2022
0df9a69
update
Mar 29, 2022
4f0ff5d
update
Mar 29, 2022
d8197e5
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 30, 2022
01a3cf6
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
Mar 30, 2022
3ef390b
update
Mar 30, 2022
11 changes: 9 additions & 2 deletions paddle/fluid/distributed/collective/CMakeLists.txt
@@ -6,8 +6,15 @@ if (WITH_DISTRIBUTE)
endif()

if(WITH_NCCL)
-  cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
+  cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
+  if (WITH_DISTRIBUTE)
+    cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
+  endif()
endif()

if(WITH_ASCEND_CL)
-  cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
+  cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
+  if (WITH_DISTRIBUTE)
+    cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
+  endif()
endif()
54 changes: 54 additions & 0 deletions paddle/fluid/distributed/collective/Common.cc
@@ -0,0 +1,54 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/Common.h"

namespace paddle {
namespace distributed {

std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors) {
  std::vector<Place> places;
  places.reserve(tensors.size());
  for (auto& tensor : tensors) {
    places.push_back(tensor.inner_place());
  }
  return places;
}

std::string GetKeyFromPlaces(const std::vector<Place>& places) {
  std::string placeList;
  for (auto& place : places) {
    std::stringstream tmp;
    tmp << place;
    if (placeList.empty()) {
      placeList += tmp.str();
    } else {
      placeList += "," + tmp.str();
    }
  }
  return placeList;
}

static bool CheckTensorsInPlace(const std::vector<Tensor>& tensors,
                                const PlaceType type) {
  return std::all_of(tensors.cbegin(), tensors.cend(),
                     [&](const Tensor& t) { return t.place() == type; });
}

bool CheckTensorsInCudaPlace(const std::vector<Tensor>& tensors) {
  return CheckTensorsInPlace(tensors, PlaceType::kGPU);
}

} // namespace distributed
} // namespace paddle
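
Note: GetKeyFromPlaces simply joins the textual form of each place with commas; downstream code can use the resulting string as a cache key for a given device set. A minimal self-contained sketch of the same joining logic, using plain strings as a hypothetical stand-in for paddle::platform::Place (any streamable type works), compiles and runs on its own:

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    // Join the printable form of each "place" with commas, as GetKeyFromPlaces does.
    std::string GetKeyFromPlaces(const std::vector<std::string>& places) {
      std::string place_list;
      for (const auto& place : places) {
        std::stringstream tmp;
        tmp << place;
        place_list += place_list.empty() ? tmp.str() : "," + tmp.str();
      }
      return place_list;
    }

    int main() {
      // Two hypothetical GPU places -> "Place(gpu:0),Place(gpu:1)"
      std::cout << GetKeyFromPlaces({"Place(gpu:0)", "Place(gpu:1)"}) << "\n";
    }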
33 changes: 33 additions & 0 deletions paddle/fluid/distributed/collective/Common.h
@@ -0,0 +1,33 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
namespace paddle {
namespace distributed {

using Tensor = paddle::experimental::Tensor;

using Place = paddle::platform::Place;
// Get the list of devices from list of tensors
std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors);
// Get the deviceList String from the list of devices
std::string GetKeyFromPlaces(const std::vector<Place>& places);

bool CheckTensorsInCudaPlace(const std::vector<Tensor>& tensors);

} // namespace distributed
} // namespace paddle
46 changes: 46 additions & 0 deletions paddle/fluid/distributed/collective/HCCLTools.cc
@@ -0,0 +1,46 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/distributed/collective/Types.h"

namespace paddle {
namespace distributed {

HcclReduceOp ToHCCLRedType(ReduceOp reduction) {
  static const std::map<ReduceOp, HcclReduceOp> red_type = {
      {ReduceOp::MIN, HCCL_REDUCE_MIN},
      {ReduceOp::MAX, HCCL_REDUCE_MAX},
      {ReduceOp::SUM, HCCL_REDUCE_SUM},
      {ReduceOp::PRODUCT, HCCL_REDUCE_PROD},
  };
  auto it = red_type.find(reduction);
  PADDLE_ENFORCE_EQ(
      it != red_type.end(), true,
      platform::errors::InvalidArgument("Invalid hccl reduction. "
                                        "Must be Min | Max | Prod | Sum"));
  return it->second;
}

std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID) {
  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&hcclID);
  std::ostringstream oss;
  for (size_t i = 0; i < sizeof(hcclID); ++i) {
    oss << std::hex << static_cast<int>(bytes[i]);
  }
  return oss.str();
}

} // namespace distributed
} // namespace paddle
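
Note: ToHCCLRedType is a plain table lookup from the framework-level ReduceOp to the backend's reduction enum. A self-contained sketch of the same pattern, with BackendRedOp as a placeholder for the real HcclReduceOp (which requires the Ascend toolkit headers):

    #include <iostream>
    #include <map>
    #include <stdexcept>

    enum class ReduceOp { SUM, MAX, MIN, PRODUCT };
    enum class BackendRedOp { SUM, MAX, MIN, PROD };  // stand-in for HcclReduceOp

    BackendRedOp ToRedType(ReduceOp reduction) {
      // A static table keeps the whole translation in one place per backend.
      static const std::map<ReduceOp, BackendRedOp> red_type = {
          {ReduceOp::MIN, BackendRedOp::MIN},
          {ReduceOp::MAX, BackendRedOp::MAX},
          {ReduceOp::SUM, BackendRedOp::SUM},
          {ReduceOp::PRODUCT, BackendRedOp::PROD},
      };
      auto it = red_type.find(reduction);
      if (it == red_type.end()) {
        throw std::invalid_argument("Invalid reduction: must be Min|Max|Prod|Sum");
      }
      return it->second;
    }

    int main() {
      std::cout << (ToRedType(ReduceOp::SUM) == BackendRedOp::SUM) << "\n";  // prints 1
    }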
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/collective/HCCLTools.h
@@ -18,6 +18,7 @@
#include <string>

#include "boost/variant.hpp"
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/collective_helper.h"
@@ -170,5 +171,8 @@ class HCCLCommManager {
  mutable std::mutex mutex_;
};

HcclReduceOp ToHCCLRedType(ReduceOp reduction);
std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID);

} // namespace distributed
} // namespace paddle
46 changes: 46 additions & 0 deletions paddle/fluid/distributed/collective/NCCLTools.cc
@@ -0,0 +1,46 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/NCCLTools.h"
#include "paddle/fluid/distributed/collective/Types.h"

namespace paddle {
namespace distributed {

ncclRedOp_t ToNCCLRedType(ReduceOp reduction) {
  static const std::map<ReduceOp, ncclRedOp_t> red_type = {
      {ReduceOp::MIN, ncclMin},
      {ReduceOp::MAX, ncclMax},
      {ReduceOp::SUM, ncclSum},
      {ReduceOp::PRODUCT, ncclProd},
  };
  auto it = red_type.find(reduction);
  PADDLE_ENFORCE_EQ(it != red_type.end(), true,
                    platform::errors::InvalidArgument(
                        "Invalid nccl reduction. Must be ncclMin | ncclMax | "
                        "ncclProd | ncclSum"));
  return it->second;
}

std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&ncclID);
  std::ostringstream oss;
  for (auto i = 0; i < NCCL_UNIQUE_ID_BYTES; ++i) {
    oss << std::hex << static_cast<int>(bytes[i]);
  }
  return oss.str();
}

} // namespace distributed
} // namespace paddle
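
Note: SerializeNCCLUniqueId hex-dumps the opaque id byte by byte so it can be exchanged through a key-value store during rendezvous. A self-contained version of the same idea over an arbitrary buffer (like the original, this sketch does not zero-pad each byte, so the output length can vary):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Hex-dump an opaque byte blob, mirroring SerializeNCCLUniqueId.
    std::string SerializeBytes(const void* data, std::size_t n) {
      const uint8_t* bytes = static_cast<const uint8_t*>(data);
      std::ostringstream oss;
      for (std::size_t i = 0; i < n; ++i) {
        oss << std::hex << static_cast<int>(bytes[i]);  // 0x0b prints as "b"
      }
      return oss.str();
    }

    int main() {
      const uint8_t id[4] = {0xde, 0xad, 0x0b, 0xef};
      std::cout << SerializeBytes(id, sizeof(id)) << "\n";  // "deadbef"
    }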
5 changes: 5 additions & 0 deletions paddle/fluid/distributed/collective/NCCLTools.h
@@ -26,6 +26,8 @@
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h"

#include "paddle/fluid/distributed/collective/Types.h"

namespace paddle {
namespace distributed {

@@ -194,5 +196,8 @@ class NCCLCommManager {
  mutable std::mutex mutex_;
};

ncclRedOp_t ToNCCLRedType(ReduceOp reduction);
std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID);

} // namespace distributed
} // namespace paddle
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroup.cc
@@ -34,7 +34,13 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {

void ProcessGroup::Task::Synchronize() {}

-ProcessGroup::ProcessGroup(int rank, int size) : rank_(rank), size_(size) {}
+ProcessGroup::ProcessGroup(int rank, int size, int gid)
+    : rank_(rank), size_(size) {
+  if (gid != IGNORE_ID) {
+    auto map = ProcessGroupMapFromGid::getInstance();
+    map->insert(gid, this);
+  }
+}

} // namespace distributed
} // namespace paddle
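
Note: the new gid parameter makes a group register itself in a process-wide map at construction time, so later code can recover the ProcessGroup from just a group id. A minimal self-contained sketch of that pattern, with Group and Registry as hypothetical stand-ins for ProcessGroup and ProcessGroupMapFromGid:

    #include <cassert>
    #include <unordered_map>

    constexpr int IGNORE_ID = -1;

    class Group;

    class Registry {  // stand-in for ProcessGroupMapFromGid
     public:
      static Registry& Instance() {
        static Registry inst;  // one registry per process
        return inst;
      }
      void Insert(int gid, Group* g) { map_[gid] = g; }
      Group* Get(int gid) const { return map_.at(gid); }

     private:
      std::unordered_map<int, Group*> map_;
    };

    class Group {  // stand-in for ProcessGroup
     public:
      Group(int rank, int size, int gid) : rank_(rank), size_(size) {
        if (gid != IGNORE_ID) Registry::Instance().Insert(gid, this);  // self-register
      }

     private:
      int rank_;
      int size_;
    };

    int main() {
      Group g(/*rank=*/0, /*size=*/8, /*gid=*/3);
      assert(Registry::Instance().Get(3) == &g);  // recoverable by gid
    }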
50 changes: 41 additions & 9 deletions paddle/fluid/distributed/collective/ProcessGroup.h
@@ -31,6 +31,7 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(0);
namespace paddle {
namespace distributed {

constexpr int IGNORE_ID = -1;
using Tensor = paddle::experimental::Tensor;

enum class CommType : std::uint8_t {
@@ -49,14 +50,6 @@ enum class CommType : std::uint8_t {
  UNKNOWN = 100,
};

-struct ProcessGroupStrategy {
-  int nranks_{1};
-  int local_rank_{0};
-  std::vector<std::string> trainer_endpoints_{};
-  std::string current_endpoint_{""};
-  int nrings_{1};
-};
class ProcessGroup {
public:
class Task {
@@ -76,7 +69,7 @@ class ProcessGroup {
    bool is_completed_ = false;
  };

-  explicit ProcessGroup(int rank, int size);
+  explicit ProcessGroup(int rank, int size, int gid);
  virtual ~ProcessGroup() {}

  int GetRank() const { return rank_; }
@@ -99,6 +92,12 @@ class ProcessGroup {
        "ProcessGroup%s does not support broadcast", GetBackendName()));
  }

+  virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) {
+    PADDLE_THROW(platform::errors::InvalidArgument(
+        "ProcessGroup%s does not support broadcast for static",
+        GetBackendName()));
+  }

  virtual std::shared_ptr<ProcessGroup::Task> Barrier(
      const BarrierOptions& = BarrierOptions()) {
    PADDLE_THROW(platform::errors::InvalidArgument(
@@ -151,5 +150,38 @@ class ProcessGroup {
  const int size_;
};

class ProcessGroupMapFromGid {
 public:
  bool has(int gid) {
    auto it = map_.find(gid);
    return it != map_.end();
  }

  void insert(int gid, ProcessGroup* pg) {
    PADDLE_ENFORCE_EQ(has(gid), false,
                      platform::errors::PreconditionNotMet(
                          "The process group with id %d already exists.", gid));
    map_[gid] = pg;
  }

  ProcessGroup* get(int gid) {
    PADDLE_ENFORCE_EQ(has(gid), true,
                      platform::errors::PreconditionNotMet(
                          "The process group with id %d does not exist.", gid));
    return map_.find(gid)->second;
  }

  static std::shared_ptr<ProcessGroupMapFromGid> getInstance() {
    static auto s_instance = std::make_shared<ProcessGroupMapFromGid>();
    return s_instance;
  }

  ProcessGroupMapFromGid() = default;
  ~ProcessGroupMapFromGid() = default;

 private:
  std::unordered_map<int, ProcessGroup*> map_;
};

} // namespace distributed
} // namespace paddle
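
Note: with the registry in place, framework code that only knows a group id can fetch the live group and dispatch on it. A hypothetical call site (the helper BroadcastByGid is illustrative and not part of this PR; it assumes the paddle headers above):

    // Hypothetical helper: look up a group by gid and invoke the
    // static-graph Broadcast overload introduced in this PR.
    void BroadcastByGid(int gid, const phi::DenseTensor* in,
                        phi::DenseTensor* out) {
      auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();
      if (map->has(gid)) {  // groups created with gid == IGNORE_ID are not registered
        map->get(gid)->Broadcast(in, out);
      }
    }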
6 changes: 4 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroupGloo.cc
@@ -173,8 +173,10 @@ ProcessGroupGloo::GlooTask::GlooTask(int rank,

ProcessGroupGloo::ProcessGroupGloo(
    const std::shared_ptr<paddle::distributed::Store>& store, int rank,
-    int world_size, const std::shared_ptr<GlooOptions> options)
-    : ProcessGroup(rank, world_size), _tag(0), _store(new GlooStore(store)) {
+    int world_size, int gid, const std::shared_ptr<GlooOptions> options)
+    : ProcessGroup(rank, world_size, gid),
+      _tag(0),
+      _store(new GlooStore(store)) {
  _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
  auto prefix_store =
      ::gloo::rendezvous::PrefixStore(std::to_string(0), *_store);
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/ProcessGroupGloo.h
@@ -101,7 +101,7 @@ class ProcessGroupGloo : public ProcessGroup {

  explicit ProcessGroupGloo(
      const std::shared_ptr<paddle::distributed::Store>& store, int rank,
-      int world_size, std::shared_ptr<GlooOptions> options);
+      int world_size, int gid, std::shared_ptr<GlooOptions> options);

  ~ProcessGroupGloo() = default;