Commit 89d3670

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into move_dropout_to_phi

phlrain committed Mar 4, 2022
2 parents 2c228cf + f3161c5 commit 89d3670
Showing 25 changed files with 1,276 additions and 424 deletions.
194 changes: 194 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupGloo.cc
@@ -25,6 +25,8 @@
#endif

#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"
@@ -144,6 +146,22 @@ void set_inputs(P& opts, const std::vector<Tensor>& tensors) { // NOLINT
opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}

template <typename T, typename P>
void set_inputs_for_scatter(P& opts, // NOLINT
const std::vector<Tensor>& tensors, // NOLINT
int nranks) {
std::vector<T*> ret(nranks);
auto raw_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[0].impl());
T* raw_pointer = reinterpret_cast<T*>(raw_tensor->data());
size_t offset = 0;
for (int i = 0; i < nranks; i++) {
ret[i] = raw_pointer + offset;
offset += tensors[0].numel() / nranks;
}
opts.setInputs(ret, tensors[0].numel() / nranks);
}
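
For intuition, the pointer arithmetic above carves the root's flat buffer into nranks equal, contiguous chunks, one pointer per destination rank. A minimal standalone sketch of the same scheme (chunk_pointers is an illustrative name, not part of this patch):

#include <cstddef>
#include <vector>

// Sketch only: rank i is handed the half-open range
// [base + i*k, base + (i+1)*k), where k = numel / nranks.
template <typename T>
std::vector<T*> chunk_pointers(T* base, std::size_t numel, int nranks) {
  std::vector<T*> chunks(nranks);
  const std::size_t k = numel / nranks;  // integer division, as in the patch
  for (int i = 0; i < nranks; ++i) {
    chunks[i] = base + static_cast<std::size_t>(i) * k;
  }
  return chunks;
}

Like the original, this assumes numel is evenly divisible by nranks; any remainder elements would simply not be scattered.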

ProcessGroupGloo::GlooTask::GlooTask(int rank,
const std::vector<Tensor>& inputs,
CommType comm_type)
@@ -257,6 +275,182 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
return task;
}

class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
public:
BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
: ProcessGroupGloo::GlooTask(rank, std::vector<Tensor>{},
CommType::BARRIER),
_context(context) {}

void Run() override { _do_barrier(); }

private:
std::shared_ptr<gloo::Context> _context;

void _do_barrier() {
gloo::BarrierOptions opts(_context);
gloo::barrier(opts);
}
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
const BarrierOptions& opts) {
std::shared_ptr<BarrierGlooTask> task;
auto context = get_context();
task = std::make_shared<BarrierGlooTask>(rank_, context);
task->Run();
return task;
}
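
A hedged usage sketch (pg stands for an already-constructed ProcessGroupGloo; the name is illustrative):

// Every rank must call Barrier(); gloo::barrier() blocks inside Run(),
// which Barrier() invokes before returning, so the call is synchronous.
std::shared_ptr<ProcessGroup::Task> task = pg->Barrier();

Note that these Gloo tasks execute eagerly: Run() completes the collective before Barrier() returns, so no separate Wait() is needed here.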

class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
public:
AllgatherGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& inputs, // NOLINT
std::vector<Tensor>& outputs, // NOLINT
uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
_context(context),
_inputs(inputs),
_outputs(outputs),
_tag(tag) {}

void Run() override { _do_allgather(_inputs, _outputs); }

private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
std::vector<Tensor> _outputs;
uint32_t _tag;

void _do_allgather(std::vector<Tensor>& in, // NOLINT
std::vector<Tensor>& out) { // NOLINT
const auto& dtype = in[0].type();
gloo::AllgatherOptions opts(_context);
GENERATE_FUNC(dtype, set_input, opts, in[0]);
GENERATE_FUNC(dtype, set_output, opts, out[0]);
opts.setTag(_tag);
gloo::allgather(opts);
}
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
std::vector<Tensor>& in_tensors, std::vector<Tensor>& out_tensors) {
std::shared_ptr<AllgatherGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<AllgatherGlooTask>(rank_, context, in_tensors,
out_tensors, tag);
task->Run();
return task;
}
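
For reference, a sketch of the shape contract this implies under the usual gloo allgather convention (the output holds all inputs concatenated in rank order; the variable names are illustrative, not from this patch):

// On a group of world_size ranks, each rank contributes n elements and
// receives every rank's contribution back-to-back:
//   in_tensors[0].numel()  == n
//   out_tensors[0].numel() == world_size * n  (preallocated by the caller)
std::vector<Tensor> in{local_chunk};
std::vector<Tensor> out{gathered_all};
auto task = pg->AllGather(in, out);  // executes eagerly, as with Barrier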

class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
public:
ReduceGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& in, ReduceOp reduce_op, // NOLINT
int dst, uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, in, CommType::REDUCE),
_context(context),
_inputs(in),
_reduce_op(reduce_op),
_dst(dst),
_tag(tag) {}

void Run() override { _do_reduce(_inputs, _dst); }

private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
const ReduceOp _reduce_op;
int _dst;
uint32_t _tag;

gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
const ReduceOp op) {
gloo::ReduceOptions::Func fn;
GENERATE_FUNC(type, _get_function_impl, fn, op);
return fn;
}

template <typename T>
void _get_function_impl(gloo::ReduceOptions::Func& fn, // NOLINT
const ReduceOp op) {
fn = get_function<T>(op);
}

void _do_reduce(std::vector<Tensor>& tensors, int dst) { // NOLINT
const auto& dtype = tensors[0].type();
gloo::ReduceOptions opts(_context);
GENERATE_FUNC(dtype, set_input, opts, tensors[0]);
GENERATE_FUNC(dtype, set_output, opts, tensors[0]);
opts.setReduceFunction(_get_function(dtype, _reduce_op));
opts.setTag(_tag);
opts.setRoot(dst);
gloo::reduce(opts);
}
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
std::vector<Tensor>& tensors, const ReduceOptions& opts) {
std::shared_ptr<ReduceGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<ReduceGlooTask>(rank_, context, tensors,
opts.reduce_op, opts.root_rank, tag);
task->Run();
return task;
}
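
_get_function defers to a get_function<T>(op) helper defined earlier in this file (outside this hunk). For intuition, a hedged sketch of one such reduction functor, assuming gloo's documented Func signature std::function<void(void*, const void*, const void*, size_t)>; sum_reduce is an illustrative name:

#include <cstddef>

// Sketch of a SUM reduction: c[i] = a[i] + b[i] over n elements.
template <typename T>
void sum_reduce(void* c, const void* a, const void* b, std::size_t n) {
  T* out = static_cast<T*>(c);
  const T* lhs = static_cast<const T*>(a);
  const T* rhs = static_cast<const T*>(b);
  for (std::size_t i = 0; i < n; ++i) {
    out[i] = lhs[i] + rhs[i];
  }
}
// e.g. opts.setReduceFunction(gloo::ReduceOptions::Func(&sum_reduce<float>));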

class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
public:
ScatterGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& inputs, // NOLINT
std::vector<Tensor>& outputs, // NOLINT
int src, int size, uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
_context(context),
_inputs(inputs),
_outputs(outputs),
_src(src),
_size(size),
_tag(tag) {}

void Run() override { _do_scatter(_inputs, _outputs, _src); }

private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
std::vector<Tensor> _outputs;
int _src;
int _size;
uint32_t _tag;

void _do_scatter(std::vector<Tensor>& in, std::vector<Tensor>& out, // NOLINT
int src) {
const auto& dtype = in[0].type();
gloo::ScatterOptions opts(_context);
if (rank_ == src) {
GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in, _size);
}
GENERATE_FUNC(dtype, set_output, opts, out[0]);
opts.setRoot(src);
opts.setTag(_tag);
gloo::scatter(opts);
}
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
std::vector<Tensor>& in_tensors, std::vector<Tensor>& out_tensors,
const ScatterOptions& opts) {
std::shared_ptr<ScatterGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<ScatterGlooTask>(
rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
task->Run();
return task;
}
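
A hedged end-to-end sketch of the scatter contract implemented above (pg, k, and the tensor names are illustrative; size_ is the process group's world size):

// The root holds one flat tensor of size_ * k elements, which
// set_inputs_for_scatter splits into size_ chunks; every rank,
// root included, receives exactly k elements.
//   root:      in_tensors[0].numel() == size_ * k
//   all ranks: out_tensors[0].numel() == k
ScatterOptions opts;
opts.root_rank = 0;  // the rank supplying the data
auto task = pg->Scatter(in_tensors, out_tensors, opts);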

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
::gloo::transport::tcp::attr attr;
14 changes: 14 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupGloo.h
@@ -114,6 +114,20 @@ class ProcessGroupGloo : public ProcessGroup {
std::vector<Tensor>& inputs,
const AllreduceOptions& opts = AllreduceOptions()) override;

std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;

std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors) override;

std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<Tensor>& tensors, const ReduceOptions& opts) override;

std::shared_ptr<ProcessGroup::Task> Scatter(std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors,
const ScatterOptions&) override;

std::shared_ptr<::gloo::Context> get_context() { return _context; }
uint64_t next_tag() { return _tag++; }

68 changes: 40 additions & 28 deletions paddle/fluid/distributed/store/tcp_store.cc
@@ -74,6 +74,7 @@ void MasterDaemon::_do_set(SocketType socket) {
}

void MasterDaemon::_do_get(SocketType socket) {
+  VLOG(3) << "MasterDaemon::_do_get";
std::string key = tcputils::receive_string(socket);
auto iter = _store.find(key);
PADDLE_ENFORCE_NE(
@@ -86,13 +87,14 @@ void MasterDaemon::_do_get(SocketType socket) {
void MasterDaemon::_do_stop(SocketType socket) {
VLOG(3) << "MasterDaemon::_do_stop";
ReplyType value = ReplyType::STOP_WAIT;
-  tcputils::send_value<ReplyType>(socket, value);
  if (--_nranks == 0) {
    _stop = true;
  }
+  tcputils::send_value<ReplyType>(socket, value);
}

void MasterDaemon::_do_wait(SocketType socket) {
+  VLOG(3) << "MasterDaemon::_do_wait";
std::string key = tcputils::receive_string(socket);
auto iter = _store.find(key);
auto reply = ReplyType::STOP_WAIT;
@@ -134,32 +136,42 @@ void MasterDaemon::run() {
}

for (size_t i = 1; i < fds.size(); i++) {
-      if (fds[i].revents == 0) {
-        continue;
-      }
-
-      Command command = tcputils::receive_value<Command>(fds[i].fd);
-      VLOG(3) << "TCPStore: recv command: " << static_cast<int>(command) << ".";
-
-      switch (command) {
-        case Command::ADD:
-          _do_add(fds[i].fd);
-          break;
-        case Command::GET:
-          _do_get(fds[i].fd);
-          break;
-        case Command::SET:
-          _do_set(fds[i].fd);
-          break;
-        case Command::WAIT:
-          _do_wait(fds[i].fd);
-          break;
-        case Command::STOP:
-          _do_stop(fds[i].fd);
-          break;
-        default:
-          VLOG(0) << "Unknow command: " << static_cast<int>(command);
-          exit(-1);
-      }
+      VLOG(0) << "fds.size:" << fds.size();
+      VLOG(0) << "fds.size-i:" << i;
+      VLOG(0) << "fds[i].revents:" << fds[i].revents;
+
+      try {
+        if (fds[i].revents == 0) {
+          continue;
+        }
+
+        Command command = tcputils::receive_value<Command>(fds[i].fd);
+        VLOG(3) << "TCPStore: recv command: " << static_cast<int>(command)
+                << ".";
+
+        switch (command) {
+          case Command::ADD:
+            _do_add(fds[i].fd);
+            break;
+          case Command::GET:
+            _do_get(fds[i].fd);
+            break;
+          case Command::SET:
+            _do_set(fds[i].fd);
+            break;
+          case Command::WAIT:
+            _do_wait(fds[i].fd);
+            break;
+          case Command::STOP:
+            _do_stop(fds[i].fd);
+            break;
+          default:
+            VLOG(0) << "Unknow command: " << static_cast<int>(command);
+            exit(-1);
+        }
+      } catch (...) {
+        fds.erase(fds.begin() + i);
+        _sockets.erase(_sockets.begin() + i - 1);
+      }
}
}
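
(The asymmetric erase in the new catch block, fds.begin() + i versus _sockets.begin() + i - 1, matches the loop starting at i = 1: fds[0] appears to be the daemon's listening socket, which has no counterpart in _sockets.)
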
@@ -281,8 +293,8 @@ void TCPStore::wait(const std::string& key) {
}

TCPStore::~TCPStore() {
-  _client->send_command_for_key(Command::STOP, "");
  VLOG(3) << "~TCPStore";
+  _client->send_command_for_key(Command::STOP, "");
ReplyType ret = _client->receive_value<ReplyType>();
PADDLE_ENFORCE_EQ(ret, ReplyType::STOP_WAIT,
platform::errors::InvalidArgument(
4 changes: 4 additions & 0 deletions paddle/fluid/framework/infershape_utils.cc
@@ -381,6 +381,10 @@ phi::InferMetaContext BuildInferMetaContext(InferShapeContext* ctx,
std::type_index(typeid(std::vector<int32_t>))) {
infer_meta_context.EmplaceBackAttr(std::move(
phi::ScalarArray(BOOST_GET_CONST(std::vector<int32_t>, attr))));
} else if (std::type_index(attr.type()) ==
std::type_index(typeid(std::vector<int64_t>))) {
infer_meta_context.EmplaceBackAttr(std::move(
phi::ScalarArray(BOOST_GET_CONST(std::vector<int64_t>, attr))));
} else if (std::type_index(attr.type()) ==
std::type_index(typeid(int))) {
infer_meta_context.EmplaceBackAttr(
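
The new branch extends an if/else chain that dispatches on the erased attribute's runtime type. A self-contained sketch of the idiom, using std::any in place of the framework's Attribute variant (an assumption for illustration only):

#include <any>
#include <cstdint>
#include <iostream>
#include <typeindex>
#include <vector>

// Mirrors the dispatch above: compare the attribute's type_index against
// each supported candidate type and convert accordingly.
void emplace_attr(const std::any& attr) {
  if (std::type_index(attr.type()) ==
      std::type_index(typeid(std::vector<int32_t>))) {
    std::cout << "ScalarArray from vector<int32_t>\n";
  } else if (std::type_index(attr.type()) ==
             std::type_index(typeid(std::vector<int64_t>))) {
    std::cout << "ScalarArray from vector<int64_t>\n";  // the branch added here
  } else if (std::type_index(attr.type()) == std::type_index(typeid(int))) {
    std::cout << "scalar int attribute\n";
  }
}

int main() { emplace_attr(std::vector<int64_t>{1, 2, 3}); }  // takes the new branch
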
(diffs for the remaining 21 changed files not shown)

1 comment on commit 89d3670

@paddle-bot-old

Congratulations! Your pull request passed all required CI checks. You can ask the reviewer(s) to approve and merge. 🎉
