[coll] Increase listener backlog.
Debug GCP.

log.

don't clean.

Disable log.

Logs.

More.

Less.

check grid size.

work on test.

finalise.

Log early.

Type.

build.

Log.

Remove.

Cleanup.
trivialfis committed Oct 11, 2024
1 parent 59e6c92 commit a27cd7c
Showing 6 changed files with 67 additions and 27 deletions.
2 changes: 1 addition & 1 deletion include/xgboost/collective/socket.h
@@ -539,7 +539,7 @@ class TCPSocket {
/**
* @brief Listen to incoming requests. Should be called after bind.
*/
- [[nodiscard]] Result Listen(std::int32_t backlog = 16) {
+ [[nodiscard]] Result Listen(std::int32_t backlog = 256) {
if (listen(handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
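For context on the headline change: the wrapper above forwards the backlog argument to POSIX listen(2), where it bounds how many established connections the OS will queue for the accept loop; with many workers connecting to the tracker at roughly the same time, a backlog of 16 can be exhausted quickly. Below is a minimal stand-alone sketch of that underlying call (plain BSD sockets; the names and the port are illustrative only, error handling omitted):

#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

int main() {
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = INADDR_ANY;
  addr.sin_port = htons(9091);
  bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  // Connection attempts beyond the backlog may be dropped or retried by the
  // kernel; raising it from 16 to 256 gives a slow accept loop more headroom.
  listen(listen_fd, 256);
  close(listen_fd);
  return 0;
}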
3 changes: 2 additions & 1 deletion src/collective/comm.cc
@@ -197,7 +197,8 @@ std::string InitLog(std::string task_id, std::int32_t rank) {
if (task_id.empty()) {
return "Rank " + std::to_string(rank);
}
return "Task " + task_id + " got rank " + std::to_string(rank);
return "Task " + task_id + " pid:" + std::to_string(getpid()) + " got rank " +
std::to_string(rank);
}
} // namespace

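With the pid added, a worker's log prefix reads along the lines of "Task 0 pid:4242 got rank 0" (the task id and pid shown here are made up); this makes it easier to match log lines to processes when debugging multi-worker runs such as the GCP sessions referenced in the commit message.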
7 changes: 0 additions & 7 deletions src/common/device_helpers.cuh
@@ -203,13 +203,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
lambda(i);
}
}
- template <typename L>
- __global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
- L lambda) {
- for (auto i : GridStrideRange(begin, end)) {
- lambda(i, device_idx);
- }
- }

/* \brief A wrapper around kernel launching syntax, used to guard against empty input.
*
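The deleted overload duplicated the grid-stride pattern with an extra device_idx parameter; since no call sites needed updating in this commit, it appears to have been unused. For reference, a minimal stand-alone CUDA sketch of the pattern that GridStrideRange wraps (ScaleKernel is an illustrative name, not XGBoost code):

#include <cstddef>

__global__ void ScaleKernel(float* data, std::size_t n) {
  // Each thread starts at its global index and advances by the total number of
  // threads in the grid, so any grid size covers the whole range [0, n).
  for (std::size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < n;
       i += static_cast<std::size_t>(gridDim.x) * blockDim.x) {
    data[i] *= 2.0f;
  }
}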
9 changes: 9 additions & 0 deletions src/data/iterative_dmatrix.cc
@@ -27,6 +27,10 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro
XGDMatrixCallbackNext* next, float missing, int nthread,
bst_bin_t max_bin)
: proxy_{proxy}, reset_{reset}, next_{next} {
+ common::Monitor monitor;
+ monitor.Init("Iterator-Ctor");
+
+ monitor.Start(__func__);
// fetch the first batch
auto iter =
DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{iter_handle, reset_, next_};
@@ -42,16 +46,21 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro
BatchParam p{max_bin, tree::TrainParam::DftSparseThreshold()};

if (ctx.IsCUDA()) {
monitor.Start("InitCUDA");
this->InitFromCUDA(&ctx, p, iter_handle, missing, ref);
monitor.Stop("InitCUDA");
} else {
monitor.Start("InitCPU");
this->InitFromCPU(&ctx, p, iter_handle, missing, ref);
monitor.Stop("InitCPU");
}

this->fmat_ctx_ = ctx;
this->batch_ = p;

LOG(INFO) << "Finished constructing the `IterativeDMatrix`: (" << this->Info().num_row_ << ", "
<< this->Info().num_col_ << ", " << this->Info().num_nonzero_ << ").";
+ monitor.Stop(__func__);
}

void IterativeDMatrix::InitFromCPU(Context const* ctx, BatchParam const& p,
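The timing added above follows the common::Monitor Init/Start/Stop pattern visible in this diff: Init sets the label under which the timings are grouped, and each Start/Stop pair brackets one phase of the constructor. A condensed sketch of the pattern (the phase names mirror the ones in the diff; when the timings get printed depends on the logging verbosity, which this diff does not show):

common::Monitor monitor;
monitor.Init("Iterator-Ctor");  // label for the reported timings
monitor.Start("InitCUDA");
// ... build the GPU representation of the streamed batches ...
monitor.Stop("InitCUDA");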
28 changes: 15 additions & 13 deletions src/tree/gpu_hist/row_partitioner.cuh
@@ -177,11 +177,10 @@ void SortPositionBatch(Context const* ctx, common::Span<const PerNodeData<OpData

// Value found by experimentation
const int kItemsThread = 12;
- const int grid_size = xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
-
- SortPositionCopyKernel<kBlockSize, OpDataT>
- <<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(batch_info_itr, ridx, ridx_tmp,
- total_rows);
+ std::uint32_t const kGridSize =
+ xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
+ dh::LaunchKernel{kGridSize, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
+ SortPositionCopyKernel<kBlockSize, OpDataT>, batch_info_itr, ridx, ridx_tmp, total_rows);
}

struct NodePositionInfo {
@@ -211,13 +210,14 @@ XGBOOST_DEV_INLINE int GetPositionFromSegments(std::size_t idx,
return position;
}

- template <int kBlockSize, typename RowIndexT, typename OpT>
+ template <int kBlockSize, typename OpT>
__global__ __launch_bounds__(kBlockSize) void FinalisePositionKernel(
- const common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
- const common::Span<const RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position, OpT op) {
+ common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
+ common::Span<const cuda_impl::RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position,
+ OpT op) {
for (auto idx : dh::GridStrideRange<std::size_t>(0, d_ridx.size())) {
auto position = GetPositionFromSegments(idx, d_node_info.data());
- RowIndexT ridx = d_ridx[idx] - base_ridx;
+ cuda_impl::RowIndexT ridx = d_ridx[idx] - base_ridx;
bst_node_t new_position = op(ridx, position);
d_out_position[ridx] = new_position;
}
@@ -377,12 +377,14 @@ class RowPartitioner {
sizeof(NodePositionInfo) * ridx_segments_.size(),
cudaMemcpyDefault, ctx->CUDACtx()->Stream()));

- constexpr int kBlockSize = 512;
+ constexpr std::uint32_t kBlockSize = 512;
const int kItemsThread = 8;
- const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
+ const std::uint32_t grid_size =
+ xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
common::Span<RowIndexT const> d_ridx{ridx_.data(), ridx_.size()};
- FinalisePositionKernel<kBlockSize><<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(
- dh::ToSpan(d_node_info_storage), base_ridx, d_ridx, d_out_position, op);
+ dh::LaunchKernel{grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
+ FinalisePositionKernel<kBlockSize, FinalisePositionOpT>, dh::ToSpan(d_node_info_storage),
+ base_ridx, d_ridx, d_out_position, op);
}
};
}; // namespace xgboost::tree
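Both launches in this file switch from the raw triple-chevron syntax to the dh::LaunchKernel wrapper, which takes the grid size, block size, shared-memory bytes, and stream in braces and then the kernel plus its arguments. A schematic before/after with a hypothetical MyKernel (not XGBoost code):

// before: raw CUDA launch
MyKernel<<<grid_size, block_size, 0, stream>>>(arg0, arg1);
// after: the wrapper used in this commit, forwarding the same configuration
dh::LaunchKernel{grid_size, block_size, 0, stream}(MyKernel, arg0, arg1);

Note that with the wrapper the kernel is passed as an ordinary function argument, which is why FinalisePositionKernel now has its OpT template argument spelled out explicitly instead of being deduced at the call site.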
45 changes: 40 additions & 5 deletions tests/cpp/tree/gpu_hist/test_row_partitioner.cu
@@ -16,8 +16,9 @@
#include "../../../../src/data/ellpack_page.cuh"
#include "../../../../src/tree/gpu_hist/expand_entry.cuh" // for GPUExpandEntry
#include "../../../../src/tree/gpu_hist/row_partitioner.cuh"
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../helpers.h" // for RandomDataGenerator
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../collective/test_worker.h" // for TestDistributedGlobal
#include "../../helpers.h" // for RandomDataGenerator

namespace xgboost::tree {
void TestUpdatePositionBatch() {
@@ -61,7 +62,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
thrust::device_vector<cuda_impl::RowIndexT> ridx_tmp(ridx_in.size());
thrust::device_vector<cuda_impl::RowIndexT> counts(segments.size());

- auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; };
+ auto op = [=] __device__(auto ridx, int split_index, int data) {
+ return ridx % 2 == 0;
+ };
std::vector<int> op_data(segments.size());
std::vector<PerNodeData<int>> h_batch_info(segments.size());
dh::TemporaryArray<PerNodeData<int>> d_batch_info(segments.size());
@@ -79,7 +82,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
dh::ToSpan(ridx_tmp), dh::ToSpan(counts), total_rows, op,
&tmp);

- auto op_without_data = [=] __device__(auto ridx) { return ridx % 2 == 0; };
+ auto op_without_data = [=] __device__(auto ridx) {
+ return ridx % 2 == 0;
+ };
for (size_t i = 0; i < segments.size(); i++) {
auto begin = ridx.begin() + segments[i].begin;
auto end = ridx.begin() + segments[i].end;
@@ -93,7 +98,7 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
}
}

- TEST(GpuHist, SortPositionBatch) {
+ TEST(RowPartitioner, SortPositionBatch) {
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 3}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 1}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 6}});
@@ -178,4 +183,34 @@ void TestExternalMemory() {
} // anonymous namespace

TEST(RowPartitioner, LeafPartitionExternalMemory) { TestExternalMemory(); }

+ namespace {
+ void TestEmptyNode(std::int32_t n_workers) {
+ collective::TestDistributedGlobal(n_workers, [] {
+ auto ctx = MakeCUDACtx(DistGpuIdx());
+ RowPartitioner partitioner;
+ bst_idx_t n_samples = (collective::GetRank() == 0) ? 0 : 1024;
+ bst_idx_t base_rowid = 0;
+ partitioner.Reset(&ctx, /*n_samples=*/0, base_rowid);
+ std::vector<RegTree::Node> splits(1);
+ partitioner.UpdatePositionBatch(
+ &ctx, {0}, {1}, {2}, splits,
+ [] XGBOOST_DEVICE(bst_idx_t ridx, std::int32_t /*nidx_in_batch*/, RegTree::Node) {
+ return ridx < 3;
+ });
+ ASSERT_EQ(partitioner.GetNumNodes(), 3);
+ if (collective::GetRank() == 0) {
+ for (std::size_t i = 0; i < 3; ++i) {
+ ASSERT_TRUE(partitioner.GetRows(i).empty());
+ }
+ }
+ ctx.CUDACtx()->Stream().Sync();
+ });
+ }
+ } // anonymous namespace
+
+ TEST(RowPartitioner, MGPUEmpty) {
+ std::int32_t n_workers = curt::AllVisibleGPUs();
+ TestEmptyNode(n_workers);
+ }
} // namespace xgboost::tree
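Assuming the standard googletest runner for XGBoost's C++ suite (the testxgboost binary name is an assumption here; adjust to your build directory), the new multi-GPU case can be selected on its own with a filter:

./testxgboost --gtest_filter=RowPartitioner.MGPUEmpty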
