[Distributed] Opt nccl connection by lazy initialization #55005

Merged
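
This PR makes ProcessGroup's NCCL connections lazily initialized. In the portion of the diff captured here, the legacy vector-of-tensors collective entry points in process_group.h stop throwing "ProcessGroup%s does not support ..." errors and instead forward to the new-style single-tensor overloads, so any backend that implements the new APIs gets the legacy ones for free. The CommType enum is also removed from this header (the visible hunks do not show where, if anywhere, it was relocated), and collective/utils.h is now included by process_group.h instead of by the individual backend .cc files. The lazy-connection change itself presumably lives in the NCCL backend sources, which are not part of this excerpt.
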
102 changes: 37 additions & 65 deletions paddle/fluid/distributed/collective/process_group.h
@@ -21,6 +21,7 @@
 #include <vector>

 #include "paddle/fluid/distributed/collective/types.h"
+#include "paddle/fluid/distributed/collective/utils.h"
 #include "paddle/fluid/eager/api/utils/tensor_utils.h"
 #include "paddle/phi/core/dense_tensor.h"
 #include "paddle/phi/core/device_context.h"
@@ -34,22 +35,6 @@ namespace distributed {

 constexpr int kIgnoreId = -1;

-enum class CommType : std::uint8_t {
-  BROADCAST = 0,
-  ALLREDUCE = 1,
-  ALLREDUCE_SPARSE = 2,  // TODO(shenliang03): to support sparse in allreduce
-  REDUCE = 3,
-  ALLGATHER = 4,
-  GATHER = 5,
-  SCATTER = 6,
-  REDUCE_SCATTER = 7,
-  ALLTOALL = 8,
-  SEND = 9,
-  RECV = 10,
-  BARRIER = 11,
-  UNKNOWN = 100,
-};
-
 class ProcessGroup {
  public:
   class Task {
@@ -405,68 +390,57 @@ class ProcessGroup {
   // legacy APIs
   // TODO(liyurui): This API will be moved later
   virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
-      std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
-      std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
-      const AllreduceOptions& = AllreduceOptions()) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support allreduce", GetBackendName()));
+      std::vector<phi::DenseTensor>& inputs,   // NOLINT
+      std::vector<phi::DenseTensor>& outputs,  // NOLINT
+      const AllreduceOptions& options = AllreduceOptions()) {
+    return AllReduce(outputs.data(), inputs.front(), options, false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
-      std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
-      std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
-      const AllreduceOptions&,
-      bool) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support allreduce with sync_op flag",
-        GetBackendName()));
+      std::vector<phi::DenseTensor>& inputs,   // NOLINT
+      std::vector<phi::DenseTensor>& outputs,  // NOLINT
+      const AllreduceOptions& options,
+      bool sync_op) {
+    return AllReduce(outputs.data(), inputs.front(), options, sync_op);
   }

   // TODO(sunyilun): methods below will be removed later
   virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
-      std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
-      std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
-      const BroadcastOptions& = BroadcastOptions()) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support broadcast", GetBackendName()));
+      std::vector<phi::DenseTensor>& inputs,   // NOLINT
+      std::vector<phi::DenseTensor>& outputs,  // NOLINT
+      const BroadcastOptions& options = BroadcastOptions()) {
+    return Broadcast(outputs.data(), inputs.front(), options, false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
-      std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
-      std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
-      const BroadcastOptions&,
-      bool) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support broadcast with sync_op flag",
-        GetBackendName()));
+      std::vector<phi::DenseTensor>& inputs,   // NOLINT
+      std::vector<phi::DenseTensor>& outputs,  // NOLINT
+      const BroadcastOptions& options,
+      bool sync_op) {
+    return Broadcast(outputs.data(), inputs.front(), options, sync_op);
   }

   virtual std::shared_ptr<ProcessGroup::Task> Send(
-      std::vector<phi::DenseTensor>&, int) {  // NOLINT
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support send", GetBackendName()));
+      std::vector<phi::DenseTensor>& tensors, int dst_rank) {  // NOLINT
+    return Send(tensors.front(), dst_rank, false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> Recv(
-      std::vector<phi::DenseTensor>&, int) {  // NOLINT
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support recv", GetBackendName()));
+      std::vector<phi::DenseTensor>& tensors, int src_rank) {  // NOLINT
+    return Recv(&tensors.front(), src_rank, false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> AllGather(
-      std::vector<phi::DenseTensor>&,    // NOLINT
-      std::vector<phi::DenseTensor>&) {  // NOLINT
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support all_gather", GetBackendName()));
+      std::vector<phi::DenseTensor>& in_tensors,     // NOLINT
+      std::vector<phi::DenseTensor>& out_tensors) {  // NOLINT
+    return AllGather(out_tensors.data(), in_tensors.front(), false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> AllGather(
-      std::vector<phi::DenseTensor>&,  // NOLINT
-      std::vector<phi::DenseTensor>&,  // NOLINT
-      bool) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support all_gather with sync_op flag",
-        GetBackendName()));
+      std::vector<phi::DenseTensor>& in_tensors,   // NOLINT
+      std::vector<phi::DenseTensor>& out_tensors,  // NOLINT
+      bool sync_op) {
+    return AllGather(out_tensors.data(), in_tensors.front(), sync_op);
   }

   virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
@@ -477,19 +451,17 @@ class ProcessGroup {
   }

   virtual std::shared_ptr<ProcessGroup::Task> Reduce(
-      std::vector<phi::DenseTensor>&,  // NOLINT
-      std::vector<phi::DenseTensor>&,  // NOLINT
+      std::vector<phi::DenseTensor>& ins,   // NOLINT
+      std::vector<phi::DenseTensor>& outs,  // NOLINT
       const ReduceOptions& opts) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support reduce", GetBackendName()));
+    return Reduce(outs.data(), ins.front(), opts, false);
   }

   virtual std::shared_ptr<ProcessGroup::Task> Scatter(
-      std::vector<phi::DenseTensor>&,  // NOLINT
-      std::vector<phi::DenseTensor>&,  // NOLINT
-      const ScatterOptions&) {
-    PADDLE_THROW(phi::errors::InvalidArgument(
-        "ProcessGroup%s does not support scatter", GetBackendName()));
+      std::vector<phi::DenseTensor>& ins,   // NOLINT
+      std::vector<phi::DenseTensor>& outs,  // NOLINT
+      const ScatterOptions& opts) {
+    return Scatter(outs.data(), ins.front(), opts, false);
   }

  protected:
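
To make the delegation pattern concrete, here is a minimal, self-contained C++ sketch. It is not Paddle's real code: DenseTensor, AllreduceOptions, and the task return type are stubbed out, ProcessGroupSketch is a hypothetical class, and the new-style overload only prints instead of launching a collective. In the real header the forwarded-to overload is the one declared earlier in ProcessGroup, whose NCCL implementation (per the PR title) can defer communicator setup until the first call.

#include <cstdio>
#include <vector>

// Hypothetical stand-ins for phi::DenseTensor and AllreduceOptions.
struct DenseTensor {
  int id;
};
struct AllreduceOptions {};

class ProcessGroupSketch {
 public:
  virtual ~ProcessGroupSketch() = default;

  // New-style API: one output pointer, one input tensor, explicit sync flag.
  // In the real NCCL backend this is the natural place to create the
  // communicator lazily, on first use, rather than at group construction.
  virtual void AllReduce(DenseTensor* out_tensor,
                         const DenseTensor& in_tensor,
                         const AllreduceOptions& /*opts*/,
                         bool sync_op) {
    std::printf("allreduce tensor %d (sync_op=%d)\n", in_tensor.id, sync_op);
    out_tensor->id = in_tensor.id;
  }

  // Legacy vector-based API: instead of throwing "does not support
  // allreduce", it forwards to the new-style overload, mirroring the
  // pattern in the diff above.
  virtual void AllReduce(std::vector<DenseTensor>& inputs,
                         std::vector<DenseTensor>& outputs,
                         const AllreduceOptions& options = AllreduceOptions()) {
    AllReduce(outputs.data(), inputs.front(), options, /*sync_op=*/false);
  }
};

int main() {
  ProcessGroupSketch pg;
  std::vector<DenseTensor> ins{{7}};
  std::vector<DenseTensor> outs{{0}};
  pg.AllReduce(ins, outs);  // legacy entry point, routed to the new API
  return 0;
}

Since the legacy overloads now share one code path with the new API, a backend only has to implement the single-tensor overloads; legacy call sites keep working unchanged and reach the lazily initialized connection on their first collective.
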
1 change: 0 additions & 1 deletion paddle/fluid/distributed/collective/process_group_bkcl.cc
@@ -16,7 +16,6 @@

#include "paddle/fluid/distributed/collective/bkcl_tools.h"
#include "paddle/fluid/distributed/collective/common.h"
#include "paddle/fluid/distributed/collective/utils.h"
#include "paddle/fluid/platform/device/xpu/bkcl_helper.h"
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/phi/api/lib/utils/allocator.h"
1 change: 0 additions & 1 deletion paddle/fluid/distributed/collective/process_group_custom.cc
@@ -16,7 +16,6 @@

#include "paddle/fluid/distributed/collective/common.h"
#include "paddle/fluid/distributed/collective/custom_ccl_tools.h"
#include "paddle/fluid/distributed/collective/utils.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
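
A note on the two .cc hunks above: both backend sources drop their direct include of collective/utils.h. This pairs with the first hunk of the diff, which adds that include to process_group.h; assuming each backend .cc still reaches process_group.h through its own header, the direct includes had become redundant.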