From 5f0a8eefb3623a735d531df26586962ac069977c Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 22 Jan 2024 14:13:00 -0600 Subject: [PATCH] Fixing race in collective operations - adding test --- libs/core/futures/src/future_data.cpp | 2 +- .../include/hpx/lcos_local/and_gate.hpp | 2 +- .../hpx/collectives/detail/communicator.hpp | 53 +- .../collectives/tests/unit/CMakeLists.txt | 3 +- .../tests/unit/concurrent_collectives.cpp | 814 ++++++++++++++++++ 5 files changed, 849 insertions(+), 25 deletions(-) create mode 100644 libs/full/collectives/tests/unit/concurrent_collectives.cpp diff --git a/libs/core/futures/src/future_data.cpp b/libs/core/futures/src/future_data.cpp index 28d8eda865d3..c49b5a863c96 100644 --- a/libs/core/futures/src/future_data.cpp +++ b/libs/core/futures/src/future_data.cpp @@ -263,7 +263,7 @@ namespace hpx::lcos::detail { #endif bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); - if (!is_hpx_thread || !recurse_asynchronously) + if (is_hpx_thread && !recurse_asynchronously) { // directly execute continuation on this thread run_on_completed(HPX_FORWARD(Callback, on_completed)); diff --git a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp index 8ee7ebf0a133..96fe88bdd5a5 100644 --- a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp +++ b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp @@ -408,7 +408,7 @@ namespace hpx::lcos::local { // Note: This type is not thread-safe. It has to be protected from // concurrent access by different threads by the code using instances // of this type. - struct and_gate : public base_and_gate + struct and_gate : base_and_gate { private: using base_type = base_and_gate; diff --git a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp index a8821eea3c6f..0dc3ae6b4b0c 100644 --- a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp @@ -181,7 +181,8 @@ namespace hpx::collectives::detail { }; private: - std::size_t get_num_sites(std::size_t num_values) const noexcept + [[nodiscard]] constexpr std::size_t get_num_sites( + std::size_t num_values) const noexcept { return num_values == static_cast(-1) ? num_sites_ : num_values; @@ -231,19 +232,22 @@ namespace hpx::collectives::detail { std::size_t generation, std::size_t capacity, F&& f, Lock& l) { HPX_ASSERT_OWNS_LOCK(l); - auto sf = gate_.get_shared_future(l); - - traits::detail::get_shared_state(sf)->reserve_callbacks( - get_num_sites(capacity)); - - auto fut = sf.then(hpx::launch::sync, HPX_FORWARD(F, f)); + // Wait for the requested generation to be processed. gate_.synchronize(generation == static_cast(-1) ? gate_.generation(l) : generation, l); - return fut; + // Get future from gate only after synchronization as otherwise we + // may get a future returned that does not belong to the requested + // generation. + auto sf = gate_.get_shared_future(l); + + traits::detail::get_shared_state(sf)->reserve_callbacks( + get_num_sites(capacity)); + + return sf.then(hpx::launch::sync, HPX_FORWARD(F, f)); } template @@ -262,9 +266,16 @@ namespace hpx::collectives::detail { "collective operation {}, which {}, generation {}.", basename_, operation, which, generation); } - current_operation_ = operation; + + if (generation == static_cast(-1) || + generation == gate_.generation(l)) + { + current_operation_ = operation; + } + return true; } + return false; } @@ -284,6 +295,11 @@ namespace hpx::collectives::detail { // This callback will be invoked once for each participating // site after all sites have checked in. + // On exit, keep track of number of invocations of this + // callback. + auto on_exit = hpx::experimental::scope_exit( + [this] { ++on_ready_count_; }); + f.get(); // propagate any exceptions // It does not matter whether the lock will be acquired here. It @@ -315,7 +331,7 @@ namespace hpx::collectives::detail { l.unlock(); HPX_THROW_EXCEPTION(hpx::error::invalid_status, "communicator::handle_data::on_ready", - "communictor {}: sequencing error, an excessive " + "communicator {}: sequencing error, an excessive " "number of on_ready callbacks have been invoked before " "the end of the collective operation {}, which {}, " "generation {}. Expected count {}, received count {}.", @@ -323,11 +339,6 @@ namespace hpx::collectives::detail { on_ready_count_, num_sites_); } - // On exit, keep track of number of invocations of this - // callback. - auto on_exit = hpx::experimental::scope_exit( - [this] { ++on_ready_count_; }); - if constexpr (!std::is_same_v>) { @@ -338,8 +349,6 @@ namespace hpx::collectives::detail { else { HPX_UNUSED(this); - HPX_UNUSED(which); - HPX_UNUSED(generation); HPX_UNUSED(num_values); HPX_UNUSED(finalizer); } @@ -373,7 +382,7 @@ namespace hpx::collectives::detail { if constexpr (!std::is_same_v>) { - // call provided step function for each invocation site + // Call provided step function for each invocation site. HPX_FORWARD(Step, step)(access_data(num_values), which); } @@ -399,7 +408,7 @@ namespace hpx::collectives::detail { "been invoked at the end of the collective {} " "operation. Expected count {}, received count {}, " "which {}, generation {}.", - *operation, on_ready_count_, num_sites_, which, + operation, on_ready_count_, num_sites_, which, generation); return; } @@ -416,7 +425,7 @@ namespace hpx::collectives::detail { return f; } - // protect against vector idiosyncrasies + // Protect against vector idiosyncrasies. template static constexpr decltype(auto) handle_bool(Data&& data) noexcept { @@ -433,15 +442,15 @@ namespace hpx::collectives::detail { template friend struct hpx::traits::communication_operation; - mutex_type mtx_; hpx::unique_any_nonser data_; hpx::lcos::local::and_gate gate_; std::size_t const num_sites_; std::size_t on_ready_count_ = 0; char const* current_operation_ = nullptr; + char const* basename_ = nullptr; + mutex_type mtx_; bool needs_initialization_ = true; bool data_available_ = false; - char const* basename_ = nullptr; }; } // namespace hpx::collectives::detail diff --git a/libs/full/collectives/tests/unit/CMakeLists.txt b/libs/full/collectives/tests/unit/CMakeLists.txt index fbe375a17734..6e75b80c4909 100644 --- a/libs/full/collectives/tests/unit/CMakeLists.txt +++ b/libs/full/collectives/tests/unit/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023 Hartmut Kaiser +# Copyright (c) 2019-2024 Hartmut Kaiser # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -23,6 +23,7 @@ if(HPX_WITH_NETWORKING) set(tests ${tests} broadcast_direct + concurrent_collectives exclusive_scan_ gather inclusive_scan_ diff --git a/libs/full/collectives/tests/unit/concurrent_collectives.cpp b/libs/full/collectives/tests/unit/concurrent_collectives.cpp new file mode 100644 index 000000000000..7e242d743b42 --- /dev/null +++ b/libs/full/collectives/tests/unit/concurrent_collectives.cpp @@ -0,0 +1,814 @@ +// Copyright (c) 2019-2024 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace hpx::collectives; + +constexpr char const* concurrent_basename = "/test/concurrent_communicator/"; +#if defined(HPX_DEBUG) +constexpr int ITERATIONS = 100; +#else +constexpr int ITERATIONS = 1000; +#endif +constexpr std::uint32_t num_sites = 10; + +std::atomic generation(0); + +double test_all_gather( + communicator const& comm, std::uint32_t num_localities, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + hpx::future> overall_result = + all_gather(comm, here + i, generation_arg(gen)); + + std::vector r = overall_result.get(); + HPX_TEST_EQ(r.size(), num_localities); + + for (std::size_t j = 0; j != r.size(); ++j) + { + HPX_TEST_EQ(r[j], j + i); + } + } + + return t.elapsed(); +} + +double test_all_reduce( + communicator const& comm, std::uint32_t num_localities, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + hpx::future overall_result = all_reduce( + comm, here + i, std::plus{}, generation_arg(gen)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result.get()); + } + + return t.elapsed(); +} + +double test_all_to_all( + communicator const& comm, std::uint32_t num_localities, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + std::vector values(num_localities); + std::fill(values.begin(), values.end(), here + i); + + hpx::future> overall_result = + all_to_all(comm, std::move(values), generation_arg(gen)); + + std::vector r = overall_result.get(); + + HPX_TEST_EQ(r.size(), num_localities); + + for (std::size_t j = 0; j != r.size(); ++j) + { + HPX_TEST_EQ(r[j], j + i); + } + } + + return t.elapsed(); +} + +double test_broadcast(communicator const& comm, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + if (here == 0) + { + hpx::future result = + broadcast_to(comm, i + 42, generation_arg(gen)); + + HPX_TEST_EQ(i + 42, result.get()); + } + else + { + hpx::future result = + hpx::collectives::broadcast_from( + comm, generation_arg(gen)); + + HPX_TEST_EQ(i + 42, result.get()); + } + } + + return t.elapsed(); +} + +double test_exclusive_scan(communicator const& comm, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + hpx::future overall_result = + exclusive_scan(comm, here + i, std::plus<>{}, generation_arg(gen)); + + std::uint32_t sum = i; + for (std::uint32_t j = 0; j < here; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result.get()); + } + + return t.elapsed(); +} + +double test_gather(communicator const& comm, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + if (here == 0) + { + hpx::future> overall_result = + gather_here(comm, 42 + i, generation_arg(gen)); + + std::vector sol = overall_result.get(); + for (std::size_t j = 0; j != sol.size(); ++j) + { + HPX_TEST(j + 42 + i == sol[j]); + } + } + else + { + hpx::future overall_result = + gather_there(comm, here + 42 + i, generation_arg(gen)); + overall_result.get(); + } + } + + return t.elapsed(); +} + +double test_inclusive_scan(communicator const& comm, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + hpx::future overall_result = inclusive_scan( + comm, here + i, std::plus{}, generation_arg(gen)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != here + 1; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result.get()); + } + + return t.elapsed(); +} + +double test_reduce( + communicator const& comm, std::uint32_t num_localities, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + std::uint32_t value = here + i; + if (here == 0) + { + hpx::future overall_result = reduce_here( + comm, std::move(value), std::plus<>{}, generation_arg(gen)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result.get()); + } + else + { + hpx::future overall_result = + reduce_there(comm, std::move(value), generation_arg(gen)); + overall_result.get(); + } + } + + return t.elapsed(); +} + +double test_scatter( + communicator const& comm, std::uint32_t num_localities, std::uint32_t here) +{ + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + std::size_t const gen = ++generation; + + if (here == 0) + { + std::vector data(num_localities); + std::iota(data.begin(), data.end(), 42 + i); + + hpx::future result = + scatter_to(comm, std::move(data), generation_arg(gen)); + + HPX_TEST_EQ(i + 42 + here, result.get()); + } + else + { + hpx::future result = + scatter_from(comm, generation_arg(gen)); + + HPX_TEST_EQ(i + 42 + here, result.get()); + } + } + + return t.elapsed(); +} + +//////////////////////////////////////////////////////////////////////////////// +double test_local_all_gather(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site, i] { + hpx::chrono::high_resolution_timer const t; + + auto const value = site; + + hpx::future> overall_result = + all_gather(comms[site], value + i, this_site_arg(site), + generation_arg(gen)); + + std::vector const r = overall_result.get(); + HPX_TEST_EQ(r.size(), num_sites); + + for (std::size_t j = 0; j != r.size(); ++j) + { + HPX_TEST_EQ(r[j], j + i); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_all_reduce(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site] { + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + auto value = site; + + hpx::future result = + all_reduce(comms[site], value, std::plus<>{}, + this_site_arg(site), generation_arg(gen)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_sites; ++j) + { + sum += j; + } + + HPX_TEST_EQ(sum, result.get()); + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_all_to_all(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + auto value = site; + + hpx::future> overall_result = + all_gather(comms[site], value, this_site_arg(value), + generation_arg(gen)); + + std::vector const r = overall_result.get(); + HPX_TEST_EQ(r.size(), num_sites); + + for (std::size_t j = 0; j != r.size(); ++j) + { + HPX_TEST_EQ(r[j], j); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_broadcast(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + if (site == 0) + { + hpx::future result = + broadcast_to(comms[site], 42 + i, this_site_arg(site), + generation_arg(gen)); + + HPX_TEST_EQ(42 + i, result.get()); + } + else + { + hpx::future result = + hpx::collectives::broadcast_from( + comms[site], this_site_arg(site), + generation_arg(gen)); + + HPX_TEST_EQ(42 + i, result.get()); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_exclusive_scan(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + hpx::future overall_result = + exclusive_scan(comms[site], site + i, std::plus<>{}, + this_site_arg(site), generation_arg(gen)); + + auto const result = overall_result.get(); + + std::uint32_t sum = i; + for (std::uint32_t j = 0; j != site; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, result); + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_gather(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + if (site == 0) + { + hpx::future> overall_result = + gather_here(comms[site], 42 + i, generation_arg(gen), + this_site_arg(site)); + + std::vector const sol = overall_result.get(); + for (std::size_t j = 0; j != sol.size(); ++j) + { + HPX_TEST(j + 42 + i == sol[j]); + } + } + else + { + hpx::future overall_result = + gather_there(comms[site], site + 42 + i, + generation_arg(gen), this_site_arg(site)); + overall_result.get(); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_inclusive_scan(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + hpx::future overall_result = + inclusive_scan(comms[site], site + i, std::plus<>{}, + this_site_arg(site), generation_arg(gen)); + + auto const result = overall_result.get(); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != site + 1; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, result); + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_reduce(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site]() { + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + auto value = site + i; + if (site == 0) + { + hpx::future overall_result = reduce_here( + comms[site], std::move(value), std::plus<>{}, + generation_arg(gen), this_site_arg(site)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_sites; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result.get()); + } + else + { + hpx::future overall_result = + reduce_there(comms[site], std::move(value), + generation_arg(gen), this_site_arg(site)); + overall_result.get(); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +double test_local_scatter(std::vector const& comms) +{ + double elapsed = 0.; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + std::vector> sites; + sites.reserve(num_sites); + + auto const gen = ++generation; + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([&, site] { + hpx::chrono::high_resolution_timer const t; + + if (site == 0) + { + std::vector data(num_sites); + std::iota(data.begin(), data.end(), 42 + i); + + hpx::future result = + scatter_to(comms[site], std::move(data), + generation_arg(gen), this_site_arg(site)); + + HPX_TEST_EQ(i + 42 + site, result.get()); + } + else + { + hpx::future result = + scatter_from(comms[site], + generation_arg(gen), this_site_arg(site)); + + HPX_TEST_EQ(i + 42 + site, result.get()); + } + + if (site == 0) + { + elapsed += t.elapsed(); + } + })); + } + + hpx::wait_all(std::move(sites)); + } + + return elapsed; +} + +//////////////////////////////////////////////////////////////////////////////// +int hpx_main() +{ + std::uint32_t const here = hpx::get_locality_id(); + +#if defined(HPX_HAVE_NETWORKING) + if (hpx::get_num_localities(hpx::launch::sync) > 1) + { + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const concurrent_comm = create_communicator(concurrent_basename, + num_sites_arg(num_localities), this_site_arg(here)); + + auto f1 = + hpx::async(&test_all_gather, concurrent_comm, num_localities, here); + auto f2 = + hpx::async(&test_all_reduce, concurrent_comm, num_localities, here); + auto f3 = + hpx::async(&test_all_to_all, concurrent_comm, num_localities, here); + auto f4 = hpx::async(&test_broadcast, concurrent_comm, here); + auto f5 = hpx::async(&test_exclusive_scan, concurrent_comm, here); + auto f6 = hpx::async(&test_gather, concurrent_comm, here); + auto f7 = hpx::async(&test_inclusive_scan, concurrent_comm, here); + auto f8 = + hpx::async(&test_reduce, concurrent_comm, num_localities, here); + auto f9 = + hpx::async(&test_scatter, concurrent_comm, num_localities, here); + + hpx::wait_all(f1, f2, f3, f4, f5, f6, f7, f8, f9); + + if (here == 0) + { + std::cout << "remote all_gather timing: " + << f1.get() / ITERATIONS << " [s]\n"; + std::cout << "remote all_reduce timing: " + << f2.get() / ITERATIONS << " [s]\n"; + std::cout << "remote all_to_all timing: " + << f3.get() / ITERATIONS << " [s]\n"; + std::cout << "remote broadcast timing: " + << f4.get() / ITERATIONS << " [s]\n"; + std::cout << "remote exclusive_scan timing: " + << f5.get() / ITERATIONS << " [s]\n"; + std::cout << "remote gather timing: " + << f6.get() / ITERATIONS << " [s]\n"; + std::cout << "remote inclusive_scan timing: " + << f7.get() / ITERATIONS << " [s]\n"; + std::cout << "remote reduce timing: " + << f8.get() / ITERATIONS << " [s]\n"; + std::cout << "remote scatter timing: " + << f9.get() / ITERATIONS << " [s]\n"; + } + } +#endif + + if (here == 0) + { + generation = 0; + + std::vector concurrent_comms; + concurrent_comms.reserve(num_sites); + + for (std::uint32_t site = 0; site != num_sites; ++site) + { + concurrent_comms.push_back( + create_local_communicator(concurrent_basename, + num_sites_arg(num_sites), this_site_arg(site))); + } + + auto f1 = hpx::async(&test_local_all_gather, concurrent_comms); + auto f2 = hpx::async(&test_local_all_reduce, concurrent_comms); + auto f3 = hpx::async(&test_local_all_to_all, concurrent_comms); + auto f4 = hpx::async(&test_local_broadcast, concurrent_comms); + auto f5 = hpx::async(&test_local_exclusive_scan, concurrent_comms); + auto f6 = hpx::async(&test_local_gather, concurrent_comms); + auto f7 = hpx::async(&test_local_inclusive_scan, concurrent_comms); + auto f8 = hpx::async(&test_local_reduce, concurrent_comms); + auto f9 = hpx::async(&test_local_scatter, concurrent_comms); + + hpx::wait_all(f1, f2, f3, f4, f5, f6, f7, f8, f9); + + std::cout << "local all_gather timing: " + << f1.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local all_reduce timing: " + << f2.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local all_to_all timing: " + << f3.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local broadcast timing: " + << f4.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local exclusive_scan timing: " + << f5.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local gather timing: " + << f6.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local inclusive_scan timing: " + << f7.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local reduce timing: " + << f8.get() / (10 * ITERATIONS) << " [s]\n"; + std::cout << "local scatter timing: " + << f9.get() / (10 * ITERATIONS) << " [s]\n"; + } + + return hpx::finalize(); +} + +int main(int argc, char* argv[]) +{ + std::vector const cfg = {"hpx.run_hpx_main!=1"}; + + hpx::init_params init_args; + init_args.cfg = cfg; + + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + return hpx::util::report_errors(); +} + +#endif