Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build a simple event loop for collective. #9593

Merged
merged 8 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions rabit/include/rabit/internal/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <string>
#include <vector>

#include "rabit/internal/utils.h"
#include "rabit/serializable.h"
#include "dmlc/io.h"
#include "xgboost/logging.h"

namespace rabit::utils {
/*! \brief re-use definition of dmlc::SeekStream */
Expand Down Expand Up @@ -84,8 +84,7 @@ struct MemoryBufferStream : public SeekStream {
}
~MemoryBufferStream() override = default;
size_t Read(void *ptr, size_t size) override {
utils::Assert(curr_ptr_ <= p_buffer_->length(),
"read can not have position excceed buffer length");
CHECK_LE(curr_ptr_, p_buffer_->length()) << "read can not have position excceed buffer length";
size_t nread = std::min(p_buffer_->length() - curr_ptr_, size);
if (nread != 0) std::memcpy(ptr, &(*p_buffer_)[0] + curr_ptr_, nread);
curr_ptr_ += nread;
Expand Down
46 changes: 33 additions & 13 deletions rabit/include/rabit/internal/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
#include <chrono>
#include <cstring>
#include <string>
#include <system_error> // make_error_code, errc
#include <unordered_map>
#include <vector>

#include "utils.h"

#if !defined(_WIN32)

#include <sys/poll.h>
Expand Down Expand Up @@ -93,6 +92,20 @@ int PollImpl(PollFD* pfd, int nfds, std::chrono::seconds timeout) noexcept(true)
#endif // IS_MINGW()
}

/**
 * \brief Translate the error bits of a poll `revents` mask into a Result.
 *
 * The bits are examined in the order POLLERR, POLLNVAL, POLLHUP; the first one that is
 * set produces a failure built by `FailWithCode`. When none is set the call succeeds.
 *
 * \param revents Returned event mask of a polled descriptor (any integral type).
 */
template <typename E>
std::enable_if_t<std::is_integral_v<E>, xgboost::collective::Result> PollError(E const& revents) {
  auto const is_set = [&revents](auto flag) { return (revents & flag) != 0; };

  if (is_set(POLLERR)) {
    return xgboost::system::FailWithCode("Poll error condition.");
  }
  if (is_set(POLLNVAL)) {
    return xgboost::system::FailWithCode("Invalid polling request.");
  }
  if (is_set(POLLHUP)) {
    return xgboost::system::FailWithCode("Poll hung up.");
  }
  return xgboost::collective::Success();
}

/*! \brief helper data structure to perform poll */
struct PollHelper {
public:
Expand Down Expand Up @@ -160,25 +173,32 @@ struct PollHelper {
*
* @param timeout specify timeout in seconds. Block if negative.
*/
[[nodiscard]] xgboost::collective::Result Poll(std::chrono::seconds timeout) {
[[nodiscard]] xgboost::collective::Result Poll(std::chrono::seconds timeout,
bool check_error = true) {
std::vector<pollfd> fdset;
fdset.reserve(fds.size());
for (auto kv : fds) {
fdset.push_back(kv.second);
}
int ret = PollImpl(fdset.data(), fdset.size(), timeout);
std::int32_t ret = PollImpl(fdset.data(), fdset.size(), timeout);
if (ret == 0) {
return xgboost::collective::Fail("Poll timeout.");
return xgboost::collective::Fail("Poll timeout.", std::make_error_code(std::errc::timed_out));
} else if (ret < 0) {
return xgboost::system::FailWithCode("Poll failed.");
} else {
for (auto& pfd : fdset) {
auto revents = pfd.revents & pfd.events;
if (!revents) {
fds.erase(pfd.fd);
} else {
fds[pfd.fd].events = revents;
}
}

for (auto& pfd : fdset) {
auto result = PollError(pfd.revents);
if (check_error && !result.OK()) {
return result;
}

auto revents = pfd.revents & pfd.events;
if (!revents) {
// FIXME(jiamingy): remove this once rabit is replaced.
fds.erase(pfd.fd);
} else {
fds[pfd.fd].events = revents;
}
}
return xgboost::collective::Success();
Expand Down
3 changes: 1 addition & 2 deletions rabit/src/allreduce_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,12 +721,11 @@ AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) {
}
finished = false;
}
watcher.WatchException(links[i].sock);
}
// finish running
if (finished) break;
// select
auto poll_res = watcher.Poll(timeout_sec);
auto poll_res = watcher.Poll(timeout_sec, false); // fail on macos
if (!poll_res.OK()) {
LOG(FATAL) << poll_res.Report();
}
Expand Down
167 changes: 167 additions & 0 deletions src/collective/loop.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/**
* Copyright 2023, XGBoost Contributors
*/
#include "loop.h"

#include <queue> // for queue

#include "rabit/internal/socket.h" // for PollHelper
#include "xgboost/collective/socket.h" // for FailWithCode
#include "xgboost/logging.h" // for CHECK

namespace xgboost::collective {
/**
 * @brief Run queued socket operations until the queue is empty, an error occurs, or the
 *        loop is asked to stop.
 *
 * Each round registers every queued op with a PollHelper, polls with `timeout_`, and
 * then calls Recv/Send on the sockets reported as ready. Ops that have not transferred
 * all `n` bytes are pushed back to `queue_` for the next round.
 *
 * @return Success() when the loop exits normally; otherwise the failure from polling,
 *         from an invalid op code, or from a socket call. On failure `stop_` is set.
 */
Result Loop::EmptyQueue() {
  // `__func__` inside a lambda names the lambda's operator(), so remember the name of
  // this function here to make sure the error path stops the timer that was started.
  char const* const timer_name = __func__;
  timer_.Start(timer_name);
  auto error = [this, timer_name] {
    this->stop_ = true;
    timer_.Stop(timer_name);
  };

  while (!queue_.empty() && !stop_) {
    std::queue<Op> qcopy;
    rabit::utils::PollHelper poll;

    // watch all ops
    while (!queue_.empty()) {
      auto op = queue_.front();
      queue_.pop();

      switch (op.code) {
        case Op::kRead: {
          poll.WatchRead(*op.sock);
          break;
        }
        case Op::kWrite: {
          poll.WatchWrite(*op.sock);
          break;
        }
        default: {
          error();
          return Fail("Invalid socket operation.");
        }
      }
      qcopy.push(op);
    }

    // poll, work on fds that are ready.
    timer_.Start("poll");
    auto rc = poll.Poll(timeout_);
    timer_.Stop("poll");
    if (!rc.OK()) {
      error();
      return rc;
    }
    // we wouldn't be here if the queue is empty.
    CHECK(!qcopy.empty());

    while (!qcopy.empty() && !stop_) {
      auto op = qcopy.front();
      qcopy.pop();

      std::int32_t n_bytes_done{0};
      CHECK(op.sock->NonBlocking());

      switch (op.code) {
        case Op::kRead: {
          if (poll.CheckRead(*op.sock)) {
            n_bytes_done = op.sock->Recv(op.ptr + op.off, op.n - op.off);
          }
          break;
        }
        case Op::kWrite: {
          if (poll.CheckWrite(*op.sock)) {
            n_bytes_done = op.sock->Send(op.ptr + op.off, op.n - op.off);
          }
          break;
        }
        default: {
          error();
          return Fail("Invalid socket operation.");
        }
      }

      if (n_bytes_done == -1) {
        if (!system::LastErrorWouldBlock()) {
          // Build the result first so that the system error code is captured before
          // anything else gets a chance to overwrite it.
          auto rc = system::FailWithCode("Invalid socket output.");
          error();
          return rc;
        }
        // The call would have blocked: nothing was transferred. Without this reset the
        // -1 would be added to the unsigned offset and move it backwards.
        n_bytes_done = 0;
      }
      // NOTE(review): a transfer of 0 bytes re-queues the op unchanged; confirm that a
      // closed peer is reported through the poll error check rather than looping here.
      op.off += static_cast<std::size_t>(n_bytes_done);
      CHECK_LE(op.off, op.n);

      if (op.off != op.n) {
        // not yet finished, push back to queue for next round.
        queue_.push(op);
      }
    }
  }
  timer_.Stop(timer_name);
  return Success();
}

/**
 * @brief Body of the worker thread (consumer side of the queue).
 *
 * Waits on `cv_` until there is queued work or `stop_` is set. While holding `mu_` it
 * drains the queue through EmptyQueue(), stores the outcome in `rc_`, and notifies a
 * waiter. The loop ends when `stop_` is observed or EmptyQueue() fails.
 */
void Loop::Process() {
  // consumer
  while (true) {
    std::unique_lock lock{mu_};
    cv_.wait(lock, [this] { return !this->queue_.empty() || stop_; });
    if (stop_) {
      break;
    }
    // The mutex must be held while the queue is processed. Ask the lock object instead
    // of calling `mu_.try_lock()`: try_lock on a std::mutex that the calling thread
    // already owns is undefined behaviour.
    CHECK(lock.owns_lock());

    this->rc_ = this->EmptyQueue();
    if (!rc_.OK()) {
      stop_ = true;
      cv_.notify_one();
      break;
    }

    CHECK(queue_.empty());
    CHECK(lock.owns_lock());
    cv_.notify_one();
  }

  // `rc_` and `queue_` are shared with the producer side, so read them under the mutex.
  std::unique_lock lock{mu_};
  if (rc_.OK()) {
    CHECK(queue_.empty());
  }
}

/**
 * @brief Ask the worker to stop and collect its final state.
 *
 * Sets `stop_`, then calls Block(), which wakes the waiting threads and hands over the
 * stored result. If the worker thread recorded an exception it is rethrown here.
 *
 * @return The result handed over by Block().
 * @throws Whatever exception the worker thread captured in `curr_exce_`.
 */
Result Loop::Stop() {
  {
    std::lock_guard<std::mutex> guard{mu_};
    stop_ = true;
  }

  // Block() moves the stored result out of `rc_`. Keep that value instead of comparing
  // it with `rc_` afterwards: `rc_` is in a moved-from state once Block() returns, and
  // the evaluation order of the two operands of such a comparison is unspecified.
  Result rc = this->Block();

  std::exception_ptr pending{nullptr};
  {
    // The worker writes `curr_exce_` while holding the mutex; read it the same way.
    std::lock_guard<std::mutex> guard{mu_};
    pending = curr_exce_;
  }
  if (pending) {
    std::rethrow_exception(pending);
  }

  return rc;
}

/**
 * @brief Construct the loop and launch the worker thread that runs Process().
 *
 * Any exception escaping Process() is recorded in `curr_exce_` (first one wins) together
 * with a failing `rc_`; the loop is then flagged as stopped and all waiters are woken.
 *
 * @param timeout Timeout stored in `timeout_` and used when polling.
 */
Loop::Loop(std::chrono::seconds timeout) : timeout_{timeout} {
  timer_.Init(__func__);

  auto run_worker = [this] {
    try {
      this->Process();
    } catch (...) {
      // Covers std::exception and everything else: the handling is the same.
      std::lock_guard<std::mutex> guard{mu_};
      if (!curr_exce_) {
        curr_exce_ = std::current_exception();
        rc_ = Fail("Exception was thrown");
      }
      stop_ = true;
      cv_.notify_all();
    }
  };
  worker_ = std::thread{std::move(run_worker)};
}
} // namespace xgboost::collective
83 changes: 83 additions & 0 deletions src/collective/loop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright 2023, XGBoost Contributors
*/
#pragma once
#include <chrono> // for seconds
#include <condition_variable> // for condition_variable
#include <cstddef> // for size_t
#include <cstdint> // for int8_t, int32_t
#include <exception> // for exception_ptr
#include <mutex> // for unique_lock, mutex
#include <queue> // for queue
#include <thread> // for thread
#include <utility> // for move

#include "../common/timer.h" // for Monitor
#include "xgboost/collective/result.h" // for Result
#include "xgboost/collective/socket.h" // for TCPSocket

namespace xgboost::collective {
/**
 * @brief A simple event loop that performs queued socket reads and writes on a
 *        dedicated worker thread.
 *
 * Producers add operations with Submit() and wait for them with Block(). The worker
 * thread is started by the constructor and joined by the destructor.
 */
class Loop {
 public:
  /**
   * @brief Description of one pending transfer on a socket.
   */
  struct Op {
    // Direction of the transfer.
    enum Code : std::int8_t { kRead = 0, kWrite = 1 } code;
    // Rank associated with the operation; -1 by default.
    std::int32_t rank{-1};
    // Start of the buffer to read into or write from.
    std::int8_t* ptr{nullptr};
    // Total number of bytes to transfer.
    std::size_t n{0};
    // Socket used for the transfer; not owned by the op.
    TCPSocket* sock{nullptr};
    // Number of bytes already transferred.
    std::size_t off{0};

    Op(Code c, std::int32_t rank, std::int8_t* ptr, std::size_t n, TCPSocket* sock, std::size_t off)
        : code{c}, rank{rank}, ptr{ptr}, n{n}, sock{sock}, off{off} {}
    Op(Op const&) = default;
    Op& operator=(Op const&) = default;
    Op(Op&&) = default;
    Op& operator=(Op&&) = default;
  };

 private:
  std::thread worker_;
  std::condition_variable cv_;
  std::mutex mu_;
  std::queue<Op> queue_;
  std::chrono::seconds timeout_;
  // Result of the most recent queue processing; handed over to the caller by Block().
  Result rc_;
  bool stop_{false};
  // First exception captured from the worker thread, if any.
  std::exception_ptr curr_exce_{nullptr};
  common::Monitor timer_;

  Result EmptyQueue();
  void Process();

 public:
  Result Stop();

  /**
   * @brief Queue an operation and wake the worker thread.
   */
  void Submit(Op op) {
    // producer
    std::unique_lock lock{mu_};
    queue_.push(op);
    lock.unlock();
    cv_.notify_one();
  }

  /**
   * @brief Wait until the queue is empty or the loop has stopped.
   *
   * @return The stored result, moved out of the loop. The stored value is left in a
   *         moved-from state afterwards.
   */
  [[nodiscard]] Result Block() {
    {
      std::unique_lock lock{mu_};
      cv_.notify_all();
    }
    std::unique_lock lock{mu_};
    cv_.wait(lock, [this] { return this->queue_.empty() || stop_; });
    return std::move(rc_);
  }

  explicit Loop(std::chrono::seconds timeout);

  /**
   * @brief Stop the loop and join the worker thread.
   *
   * An exception raised by Stop() is propagated, but only after the worker has been
   * joined: destroying a std::thread that is still joinable calls std::terminate, which
   * would otherwise happen while the exception leaves the destructor.
   */
  ~Loop() noexcept(false) {
    std::exception_ptr stop_error{nullptr};
    try {
      this->Stop();
    } catch (...) {
      stop_error = std::current_exception();
    }

    if (worker_.joinable()) {
      worker_.join();
    }

    if (stop_error) {
      std::rethrow_exception(stop_error);
    }
  }
};
} // namespace xgboost::collective
Loading