Skip to content

Commit

Permalink
[BP] Fixes for large size clusters. (#10880) (#10899)
Browse files Browse the repository at this point in the history
- Increase listener backlog.
- Check for empty kernels.
  • Loading branch information
trivialfis authored Oct 17, 2024
1 parent 41c2680 commit 5556bc3
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 28 deletions.
9 changes: 3 additions & 6 deletions include/xgboost/collective/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,10 @@ class TCPSocket {
[[nodiscard]] HandleT const &Handle() const { return handle_; }
/**
* @brief Listen to incoming requests. Should be called after bind.
*
* Both the default and the minimum backlog are set to 256.
*/
[[nodiscard]] Result Listen(std::int32_t backlog = 16) {
  // Forward the requested backlog to listen() on this socket's handle.
  // A zero status is treated as success; anything else is reported as a failure.
  auto const status = listen(handle_, backlog);
  if (status == 0) {
    return Success();
  }
  return system::FailWithCode("Failed to listen.");
}
[[nodiscard]] Result Listen(std::int32_t backlog = 256);
/**
* @brief Bind socket to INADDR_ANY, return the port selected by the OS.
*/
Expand Down
9 changes: 9 additions & 0 deletions src/collective/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
#include "xgboost/collective/socket.h"

#include <algorithm> // for max
#include <array> // for array
#include <cstddef> // std::size_t
#include <cstdint> // std::int32_t
Expand Down Expand Up @@ -58,6 +59,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4();
SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); }
SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); }

/**
 * @brief Start listening on this socket.
 *
 * @param backlog Requested backlog; values below 256 are raised to 256 before
 *                being passed to listen().
 *
 * @return Success() when listen() returns 0, otherwise the result of
 *         system::FailWithCode("Failed to listen.").
 */
[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) {
  // Floor applied to every request, regardless of what the caller asked for.
  constexpr std::int32_t kMinBacklog = 256;
  auto const effective_backlog = std::max(backlog, kMinBacklog);
  if (listen(this->handle_, effective_backlog) == 0) {
    return Success();
  }
  return system::FailWithCode("Failed to listen.");
}

std::size_t TCPSocket::Send(StringView str) {
CHECK(!this->IsClosed());
CHECK_LT(str.size(), std::numeric_limits<std::int32_t>::max());
Expand Down
3 changes: 2 additions & 1 deletion src/collective/tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} {
listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6);
return listener_.Bind(host_, &this->port_);
} << [&] {
return listener_.Listen();
CHECK_GT(this->n_workers_, 0);
return listener_.Listen(this->n_workers_);
};
SafeColl(rc);
}
Expand Down
7 changes: 0 additions & 7 deletions src/common/device_helpers.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
lambda(i);
}
}
/**
 * \brief Kernel that applies a callable to every index produced by
 *        GridStrideRange(begin, end), passing the device index along.
 *
 * \tparam L Callable type; it is invoked as lambda(i, device_idx).
 *
 * \param device_idx Value forwarded unchanged as the second argument of every call.
 * \param begin      First argument given to GridStrideRange.
 * \param end        Second argument given to GridStrideRange.
 * \param lambda     Callable executed once per index yielded by the range.
 *
 * NOTE(review): GridStrideRange is defined elsewhere; presumably it splits
 * [begin, end) across the launched threads -- confirm against its definition.
 */
template <typename L>
__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
L lambda) {
for (auto i : GridStrideRange(begin, end)) {
lambda(i, device_idx);
}
}

/* \brief A wrapper around kernel launching syntax, used to guard against empty input.
*
Expand Down
15 changes: 9 additions & 6 deletions src/tree/gpu_hist/row_partitioner.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ void SortPositionBatch(common::Span<const PerNodeData<OpDataT>> d_batch_info,

// Value found by experimentation
const int kItemsThread = 12;
const int grid_size = xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);

SortPositionCopyKernel<kBlockSize, RowIndexT, OpDataT>
<<<grid_size, kBlockSize, 0>>>(batch_info_itr, ridx, ridx_tmp, total_rows);
std::uint32_t const kGridSize =
xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
dh::LaunchKernel{kGridSize, kBlockSize, 0}(SortPositionCopyKernel<kBlockSize, RowIndexT, OpDataT>,
batch_info_itr, ridx, ridx_tmp, total_rows);
}

struct NodePositionInfo {
Expand Down Expand Up @@ -328,11 +329,13 @@ class RowPartitioner {
sizeof(NodePositionInfo) * ridx_segments_.size(),
cudaMemcpyDefault));

constexpr int kBlockSize = 512;
constexpr std::uint32_t kBlockSize = 512;
const int kItemsThread = 8;
const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
const std::uint32_t grid_size =
xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
common::Span<const RowIndexT> d_ridx(ridx_.data().get(), ridx_.size());
FinalisePositionKernel<kBlockSize><<<grid_size, kBlockSize, 0>>>(
dh::LaunchKernel{grid_size, kBlockSize}(
FinalisePositionKernel<kBlockSize, RowIndexT, FinalisePositionOpT>,
dh::ToSpan(d_node_info_storage), d_ridx, d_out_position, op);
}
};
Expand Down
16 changes: 8 additions & 8 deletions tests/cpp/tree/gpu_hist/test_row_partitioner.cu
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
#include <thrust/host_vector.h>
#include <thrust/sequence.h>

#include <algorithm>
#include <vector>

#include "../../../../src/tree/gpu_hist/row_partitioner.cuh"
#include "../../helpers.h"
#include "xgboost/base.h"
#include "xgboost/context.h"
#include "xgboost/task.h"
#include "xgboost/tree_model.h"
#include "../../helpers.h" // for RandomDataGenerator

namespace xgboost::tree {
void TestUpdatePositionBatch() {
Expand Down Expand Up @@ -55,7 +52,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
thrust::device_vector<uint32_t> ridx_tmp(ridx_in.size());
thrust::device_vector<bst_uint> counts(segments.size());

auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; };
auto op = [=] __device__(auto ridx, int split_index, int data) {
return ridx % 2 == 0;
};
std::vector<int> op_data(segments.size());
std::vector<PerNodeData<int>> h_batch_info(segments.size());
dh::TemporaryArray<PerNodeData<int>> d_batch_info(segments.size());
Expand All @@ -73,7 +72,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
dh::ToSpan(ridx_tmp), dh::ToSpan(counts),
total_rows, op, &tmp);

auto op_without_data = [=] __device__(auto ridx) { return ridx % 2 == 0; };
auto op_without_data = [=] __device__(auto ridx) {
return ridx % 2 == 0;
};
for (size_t i = 0; i < segments.size(); i++) {
auto begin = ridx.begin() + segments[i].begin;
auto end = ridx.begin() + segments[i].end;
Expand All @@ -87,11 +88,10 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
}
}

TEST(GpuHist, SortPositionBatch) {
TEST(RowPartitioner, SortPositionBatch) {
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 3}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 1}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{3, 6}, {0, 2}});
}

} // namespace xgboost::tree

0 comments on commit 5556bc3

Please sign in to comment.