Skip to content

Commit

Permalink
Merge pull request #6411 from uiuc-hpc/fix-lci-pp
Browse files Browse the repository at this point in the history
Minor refactoring and fixes to the LCI parcelport and pingpong_performance2 benchmark
  • Loading branch information
hkaiser authored Jan 13, 2024
2 parents 797ad73 + 9b1c5c9 commit b8cc430
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace hpx::parcelset::policies::lci {
static int ndevices;
// How many completion managers to use
static int ncomps;
// Whether to enable in-buffer assembly for the header messages.
static bool enable_in_buffer_assembly;

static void init_config(util::runtime_configuration const& rtcfg);
};
Expand Down
31 changes: 31 additions & 0 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,37 @@ namespace hpx::parcelset::policies::lci {
pos_piggy_back_address = 8 * sizeof(value_type) + 2
};

template <typename buffer_type, typename ChunkType>
static size_t get_header_size(
parcel_buffer<buffer_type, ChunkType> const& buffer,
size_t max_header_size) noexcept
{
HPX_ASSERT(max_header_size >= pos_piggy_back_address);

size_t current_header_size = pos_piggy_back_address;
if (buffer.data_.size() <= (max_header_size - current_header_size))
{
current_header_size += buffer.data_.size();
}
int num_zero_copy_chunks = buffer.num_chunks_.first;
[[maybe_unused]] int num_non_zero_copy_chunks =
buffer.num_chunks_.second;
if (num_zero_copy_chunks != 0)
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
{
current_header_size += tchunk_size;
}
}
return current_header_size;
}

template <typename buffer_type, typename ChunkType>
header(parcel_buffer<buffer_type, ChunkType> const& buffer,
char* header_buffer, size_t max_header_size) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,29 +210,30 @@ namespace hpx::traits {
hpx::threads::policies::scheduler_mode::
do_background_work_only);

size_t ncores_to_add =
size_t npus_to_add =
parcelset::policies::lci::config_t::progress_thread_num;
std::vector<const hpx::resource::core*> cores;
std::vector<const hpx::resource::pu*> pus;
for (auto& numa_domain : rp.numa_domains())
{
for (auto& core : numa_domain.cores())
{
cores.push_back(&core);
for (auto& pu : core.pus())
pus.push_back(&pu);
}
}
if (cores.size() <= 1)
if (pus.size() <= 1)
{
fprintf(stderr, "We don't have enough cores!\n");
fprintf(stderr, "We don't have enough pus!\n");
exit(1);
}
if ((size_t) ncores_to_add > cores.size() / 2)
if ((size_t) npus_to_add > pus.size() / 2)
{
ncores_to_add = cores.size() / 2;
npus_to_add = pus.size() / 2;
}
for (size_t i = 0; i < ncores_to_add; ++i)
for (size_t i = 0; i < npus_to_add; ++i)
{
size_t next_core = i * cores.size() / ncores_to_add;
rp.add_resource(*cores[next_core], "lci-progress-pool");
size_t next_pu = i * pus.size() / npus_to_add;
rp.add_resource(*pus[next_pu], "lci-progress-pool");
}
}
}
Expand Down Expand Up @@ -264,8 +265,9 @@ namespace hpx::traits {
"progress_type = rp\n"
"prepost_recv_num = 1\n"
"reg_mem = 1\n"
"ndevices = 2\n"
"ncomps = 1\n";
"ndevices = 1\n"
"ncomps = 1\n"
"enable_in_buffer_assembly = 1\n";
}
};
} // namespace hpx::traits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace hpx::parcelset::policies::lci {
hpx::chrono::high_resolution_timer timer_;
header header_;
LCI_mbuffer_t header_buffer;
std::vector<char> header_buffer_vector;
bool need_send_data;
bool need_send_tchunks;
LCI_tag_t tag;
Expand Down
3 changes: 3 additions & 0 deletions libs/full/parcelport_lci/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace hpx::parcelset::policies::lci {
bool config_t::reg_mem;
int config_t::ndevices;
int config_t::ncomps;
bool config_t::enable_in_buffer_assembly;

void config_t::init_config(util::runtime_configuration const& rtcfg)
{
Expand Down Expand Up @@ -105,6 +106,8 @@ namespace hpx::parcelset::policies::lci {
reg_mem = util::get_entry_as(rtcfg, "hpx.parcel.lci.reg_mem", 1);
ndevices = util::get_entry_as(rtcfg, "hpx.parcel.lci.ndevices", 1);
ncomps = util::get_entry_as(rtcfg, "hpx.parcel.lci.ncomps", 1);
enable_in_buffer_assembly = util::get_entry_as(
rtcfg, "hpx.parcel.lci.enable_in_buffer_assembly", 1);

if (!enable_send_immediate && enable_lci_backlog_queue)
{
Expand Down
17 changes: 8 additions & 9 deletions libs/full/parcelport_lci/src/parcelport_lci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ namespace hpx::parcelset::policies::lci {
if (hpx::this_thread::get_pool() ==
&hpx::resource::get_thread_pool("lci-progress-pool"))
{
std::size_t prg_thread_id =
hpx::get_local_worker_thread_num();
double rate = (double) config_t::ndevices /
config_t::progress_thread_num;
for (int i = prg_thread_id * rate;
i < (prg_thread_id + 1) * rate; ++i)
int prg_thread_id =
static_cast<int>(hpx::get_local_worker_thread_num());
HPX_ASSERT(prg_thread_id < config_t::progress_thread_num);
for (int i = prg_thread_id * config_t::ndevices /
config_t::progress_thread_num;
i < (prg_thread_id + 1) * config_t::ndevices /
config_t::progress_thread_num;
++i)
{
devices_to_progress.push_back(&devices[i]);
}
Expand Down Expand Up @@ -459,9 +461,6 @@ namespace hpx::parcelset::policies::lci {
hpx::threads::get_self_id() == hpx::threads::invalid_thread_id))
{
static thread_local unsigned int tls_rand_seed = rand();
util::lci_environment::log(
util::lci_environment::log_level_t::debug, "device",
"Rank %d unusual phase\n", LCI_RANK);
return devices[rand_r(&tls_rand_seed) % devices.size()];
}
if (tls_device_idx == std::size_t(-1))
Expand Down
6 changes: 3 additions & 3 deletions libs/full/parcelport_lci/src/sender_connection_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ namespace hpx::parcelset::policies::lci {
char buf[1024];
size_t consumed = 0;
consumed += snprintf(buf + consumed, sizeof(buf) - consumed,
"%d:%lf:send_connection(%p) start:%d:%d:%d:[", LCI_RANK,
"%d:%lf:send_connection(%p) start:%d:%d:%d:%d:[", LCI_RANK,
hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this,
header_.numbytes_nonzero_copy(), header_.numbytes_tchunk(),
header_.num_zero_copy_chunks());
dst_rank, header_.numbytes_nonzero_copy(),
header_.numbytes_tchunk(), header_.num_zero_copy_chunks());
HPX_ASSERT(sizeof(buf) > consumed);
for (int i = 0; i < header_.num_zero_copy_chunks(); ++i)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,24 @@ namespace hpx::parcelset::policies::lci {
postprocess_handler_ = HPX_MOVE(parcel_postprocess);

// build header
while (LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
if (config_t::enable_in_buffer_assembly)
{
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
}
else
{
header_buffer_vector.resize(
header::get_header_size(buffer_, LCI_MEDIUM_SIZE));
header_ =
header(buffer_, static_cast<char*>(header_buffer_vector.data()),
header_buffer_vector.size());
}
HPX_ASSERT((header_.num_zero_copy_chunks() == 0) ==
buffer_.transmission_chunks_.empty());
need_send_data = false;
Expand Down Expand Up @@ -85,6 +97,15 @@ namespace hpx::parcelset::policies::lci {
"Rank %d Wrap around!\n", LCI_RANK);
header_.set_device_idx(device_p->idx);
header_.set_tag(tag);
if (!config_t::enable_in_buffer_assembly)
{
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
memcpy(header_buffer.address, header_buffer_vector.data(),
header_buffer_vector.size());
header_buffer.length = header_buffer_vector.size();
}
send_chunks_idx = 0;
completion = nullptr;
segment_to_use = LCI_SEGMENT_ALL;
Expand Down
61 changes: 44 additions & 17 deletions tests/performance/network/pingpong_performance2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const std::size_t nsteps_default = 1;
const std::size_t window_default = 10000;
const std::size_t inject_rate_default = 0;
const std::size_t batch_size_default = 10;
const std::size_t nwarmups_default = 1;
const std::size_t niters_default = 1;

size_t window;
size_t inject_rate;
Expand Down Expand Up @@ -79,6 +81,7 @@ void on_recv(hpx::id_type to, std::vector<char> const& in, std::size_t counter)
if (result + 1 == window)
{
hpx::post<on_done_action>(hpx::find_root_locality());
done_counter = 0;
}
return;
}
Expand All @@ -104,13 +107,21 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
window = b_arg["window"].as<std::size_t>();
inject_rate = b_arg["inject-rate"].as<std::size_t>();
batch_size = b_arg["batch-size"].as<std::size_t>();
std::size_t const nwarmups = b_arg["nwarmups"].as<std::size_t>();
std::size_t const niters = b_arg["niters"].as<std::size_t>();

if (nsteps == 0)
{
std::cout << "nsteps is 0!" << std::endl;
return 0;
}

if (window == 0)
{
std::cout << "window is 0!" << std::endl;
return 0;
}

std::vector<hpx::id_type> localities = hpx::find_remote_localities();

hpx::id_type to;
Expand All @@ -126,32 +137,43 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
set_window_action act;
act(to, window);

hpx::chrono::high_resolution_timer timer_total;

for (size_t i = 0; i < window; i += batch_size)
double inject_time = 0;
double time = 0;
for (size_t j = 0; j < nwarmups + niters; ++j)
{
while (inject_rate > 0 &&
static_cast<double>(i) / timer_total.elapsed() >
static_cast<double>(inject_rate))
hpx::chrono::high_resolution_timer timer_total;

for (size_t i = 0; i < window; i += batch_size)
{
continue;
while (inject_rate > 0 &&
static_cast<double>(i) / timer_total.elapsed() >
static_cast<double>(inject_rate))
{
continue;
}
hpx::post<on_inject_action>(hpx::find_here(), to, nbytes, nsteps);
}
hpx::post<on_inject_action>(hpx::find_here(), to, nbytes, nsteps);
}
double achieved_inject_rate =
static_cast<double>(window) / timer_total.elapsed() / 1e3;
if (j >= nwarmups)
inject_time += timer_total.elapsed();

semaphore.wait();
semaphore.wait();
if (j >= nwarmups)
time += timer_total.elapsed();
}

double time = timer_total.elapsed();
double latency = time * 1e6 / static_cast<double>(nsteps);
double msg_rate = static_cast<double>(nsteps * window) / time / 1e3;
double achieved_inject_rate =
static_cast<double>(window * niters) / inject_time / 1e3;
double latency = time * 1e6 / static_cast<double>(nsteps * niters);
double msg_rate =
static_cast<double>(nsteps * window * niters) / time / 1e3;
double bandwidth =
static_cast<double>(nbytes * nsteps * window) / time / 1e6;
static_cast<double>(nbytes * nsteps * window * niters) / time / 1e6;
if (verbose)
{
std::cout << "[hpx_pingpong]" << std::endl
<< "total_time(secs)=" << time << std::endl
<< "nwarmups=" << nwarmups << std::endl
<< "niters=" << niters << std::endl
<< "nbytes=" << nbytes << std::endl
<< "window=" << window << std::endl
<< "latency(us)=" << latency << std::endl
Expand All @@ -165,6 +187,7 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
{
std::cout << "[hpx_pingpong]"
<< ":total_time(secs)=" << time << ":nbytes=" << nbytes
<< ":nwarmups=" << nwarmups << ":niters=" << niters
<< ":window=" << window << ":latency(us)=" << latency
<< ":inject_rate(K/s)=" << achieved_inject_rate
<< ":msg_rate(M/s)=" << msg_rate
Expand Down Expand Up @@ -192,7 +215,11 @@ int main(int argc, char* argv[])
po::value<std::size_t>()->default_value(inject_rate_default),
"the rate of injecting the first message of ping-pong")("batch-size",
po::value<std::size_t>()->default_value(batch_size_default),
"the number of messages to inject per inject thread")("verbose",
"the number of messages to inject per inject thread")("nwarmups",
po::value<std::size_t>()->default_value(nwarmups_default),
"the iteration count of warmup runs")("niters",
po::value<std::size_t>()->default_value(niters_default),
"the iteration count of measurement iterations.")("verbose",
po::value<bool>()->default_value(true),
"verbosity of output,if false output is for awk");

Expand Down

0 comments on commit b8cc430

Please sign in to comment.