diff --git a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp index ea7e69202f58..cebec47403c6 100644 --- a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp +++ b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp @@ -175,7 +175,7 @@ namespace hpx::detail { } // only void* is allowed to be converted to uintptr_t - void* ptr = ::operator new(offset_to_data + sizeof(T) * capacity); + void* ptr = ::operator new(mem); if (nullptr == ptr) { throw std::bad_alloc(); @@ -319,9 +319,13 @@ namespace hpx::detail { { // indirect -> direct auto* storage = indirect(); - uninitialized_move_and_destroy( - storage->data(), direct_data(), storage->size()); - set_direct_and_size(storage->size()); + auto const data_size = storage->size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + storage->data(), direct_data(), data_size); + set_direct_and_size(data_size); + } detail::storage::dealloc(storage); } } @@ -332,16 +336,26 @@ namespace hpx::detail { if (is_direct()) { // direct -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + storage->size(data_size); + } } else { // indirect -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + storage->size(data_size); + } detail::storage::dealloc(indirect()); } set_indirect(storage); diff --git a/libs/core/futures/src/future_data.cpp b/libs/core/futures/src/future_data.cpp index a05f4d577be7..28d8eda865d3 100644 --- a/libs/core/futures/src/future_data.cpp +++ b/libs/core/futures/src/future_data.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2023 Hartmut Kaiser +// Copyright (c) 2015-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -22,13 +22,14 @@ #include #include -#include #include #include namespace hpx::lcos::detail { - static run_on_completed_error_handler_type run_on_completed_error_handler; + namespace { + run_on_completed_error_handler_type run_on_completed_error_handler; + } void set_run_on_completed_error_handler( run_on_completed_error_handler_type f) @@ -66,16 +67,12 @@ namespace hpx::lcos::detail { /////////////////////////////////////////////////////////////////////////// template - static void run_on_completed_on_new_thread(Callback&& f) + void run_on_completed_on_new_thread(Callback&& f) { lcos::local::futures_factory p(HPX_FORWARD(Callback, f)); - bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + HPX_ASSERT(nullptr != hpx::threads::get_self_ptr()); hpx::launch policy = launch::fork; - if (!is_hpx_thread) - { - policy = launch::async; - } policy.set_priority(threads::thread_priority::boost); policy.set_stacksize(threads::thread_stacksize::current); @@ -84,17 +81,12 @@ namespace hpx::lcos::detail { threads::thread_id_ref_type const tid = //-V821 p.post("run_on_completed_on_new_thread", policy); - // wait for the task to run - if (is_hpx_thread) - { - // make sure this thread is executed last - this_thread::suspend( - threads::thread_schedule_state::pending, tid.noref()); - return p.get_future().get(); - } + // make sure this thread is executed last + this_thread::suspend( + threads::thread_schedule_state::pending, tid.noref()); - // If we are not on a HPX thread, we need to return immediately, to - // allow the newly spawned thread to execute. + // wait for the task to run + return p.get_future().get(); } /////////////////////////////////////////////////////////////////////////// @@ -124,15 +116,13 @@ namespace hpx::lcos::detail { } auto const state = this->state_.load(std::memory_order_acquire); - if (state != this->empty) + if (state != future_data_base::empty) { return false; } // this thread would block on the future - - auto* thrd = get_thread_id_data(runs_child); - HPX_UNUSED(thrd); // might be unused + [[maybe_unused]] auto* thrd = get_thread_id_data(runs_child); LTM_(debug).format("task_object::get_result_void: attempting to " "directly execute child({}), description({})", @@ -161,8 +151,6 @@ namespace hpx::lcos::detail { return false; } - static util::unused_type unused_; - util::unused_type* future_data_base::get_result_void( void const* storage, error_code& ec) @@ -190,6 +178,7 @@ namespace hpx::lcos::detail { if (s == value) { + static util::unused_type unused_; return &unused_; } @@ -232,12 +221,12 @@ namespace hpx::lcos::detail { hpx::scoped_annotation annotate(on_completed); HPX_MOVE(on_completed)(); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If the completion handler throws an exception, there's // nothing we can do, report the exception and terminate. if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { @@ -272,7 +261,9 @@ namespace hpx::lcos::detail { cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH || (hpx::threads::get_self_ptr() == nullptr); #endif - if (!recurse_asynchronously) + + bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + if (!is_hpx_thread || !recurse_asynchronously) { // directly execute continuation on this thread run_on_completed(HPX_FORWARD(Callback, on_completed)); @@ -289,17 +280,17 @@ namespace hpx::lcos::detail { run_on_completed_on_new_thread(util::deferred_call( p, HPX_FORWARD(Callback, on_completed))); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If an exception while creating the new task or inside the // completion handler is thrown, there is nothing we can do... // ... but terminate and report the error if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { - std::rethrow_exception(HPX_MOVE(ep)); + std::rethrow_exception(ep); } }); } diff --git a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp index deeaeffc5e3b..8ee7ebf0a133 100644 --- a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp +++ b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp @@ -170,7 +170,7 @@ namespace hpx::lcos::local { protected: // Set the data which has to go into the segment \a which. template - bool set(std::size_t which, OuterLock outer_lock, F&& f, + bool set(std::size_t which, OuterLock& outer_lock, F&& f, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(outer_lock); @@ -224,15 +224,12 @@ namespace hpx::lcos::local { std::decay_t>) { // invoke callback with the outer lock being held - HPX_FORWARD(F, f)(outer_lock, *this); + HPX_FORWARD(F, f)(outer_lock, *this, ec); } - outer_lock.unlock(); return true; } } - - outer_lock.unlock(); return false; } @@ -242,7 +239,7 @@ namespace hpx::lcos::local { { hpx::no_mutex mtx; std::unique_lock lk(mtx); - return set(which, HPX_MOVE(lk), HPX_FORWARD(F, f), ec); + return set(which, lk, HPX_FORWARD(F, f), ec); } protected: @@ -324,7 +321,8 @@ namespace hpx::lcos::local { public: template - std::size_t next_generation(Lock& l, std::size_t new_generation) + std::size_t next_generation( + Lock& l, std::size_t new_generation, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(l); @@ -335,10 +333,11 @@ namespace hpx::lcos::local { if (new_generation < generation_) { l.unlock(); - HPX_THROW_EXCEPTION(hpx::error::invalid_status, + HPX_THROWS_IF(ec, hpx::error::invalid_status, "and_gate::next_generation", "sequencing error, new generational counter value too " "small"); + return generation_; } generation_ = new_generation; } @@ -351,10 +350,11 @@ namespace hpx::lcos::local { } std::size_t next_generation( - std::size_t new_generation = static_cast(-1)) + std::size_t new_generation = static_cast(-1), + error_code& ec = throws) { std::unique_lock l(mtx_); - return next_generation(l, new_generation); + return next_generation(l, new_generation, ec); } template @@ -441,11 +441,10 @@ namespace hpx::lcos::local { } template - bool set(std::size_t which, Lock l, F&& f = nullptr, + bool set(std::size_t which, Lock& l, F&& f = nullptr, error_code& ec = hpx::throws) { - return this->base_type::set( - which, HPX_MOVE(l), HPX_FORWARD(F, f), ec); + return this->base_type::set(which, l, HPX_FORWARD(F, f), ec); } template diff --git a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp index d4d06612f5ba..059f8184bc88 100644 --- a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp @@ -165,6 +165,12 @@ namespace hpx::collectives::detail { }; private: + std::size_t get_num_sites(std::size_t num_values) const noexcept + { + return num_values == static_cast(-1) ? num_sites_ : + num_values; + } + // re-initialize data template void reinitialize_data(std::size_t num_values) @@ -174,10 +180,8 @@ namespace hpx::collectives::detail { needs_initialization_ = false; data_available_ = false; - auto const new_size = - num_values == static_cast(-1) ? num_sites_ : - num_values; - auto* data = hpx::any_cast>(&data_); + auto const new_size = get_num_sites(num_values); + auto const* data = hpx::any_cast>(&data_); if (data == nullptr || data->size() < new_size) { data_ = std::vector(new_size); @@ -201,6 +205,7 @@ namespace hpx::collectives::detail { { needs_initialization_ = true; data_available_ = false; + on_ready_count_ = 0; } } @@ -212,8 +217,7 @@ namespace hpx::collectives::detail { auto sf = gate_.get_shared_future(l); traits::detail::get_shared_state(sf)->reserve_callbacks( - capacity == static_cast(-1) ? num_sites_ : - capacity); + get_num_sites(capacity)); auto fut = sf.then(hpx::launch::sync, HPX_FORWARD(F, f)); @@ -225,6 +229,27 @@ namespace hpx::collectives::detail { return fut; } + template + struct on_exit + { + explicit constexpr on_exit(F&& f_) noexcept + : f(HPX_MOVE(f_)) + { + } + + on_exit(on_exit const&) = delete; + on_exit(on_exit&&) = delete; + on_exit& operator=(on_exit const&) = delete; + on_exit& operator=(on_exit&&) = delete; + + ~on_exit() + { + f(); + } + + F f; + }; + // Step will be invoked under lock for each site that checks in (either // set or get). // @@ -239,6 +264,16 @@ namespace hpx::collectives::detail { shared_future&& f) mutable { f.get(); // propagate any exceptions + // It does not matter whether the lock will be acquired here. It + // either is still being held by the surrounding logic or is + // re-acquired here (if `on_ready` happens to run on a new + // thread asynchronously). + std::unique_lock l(mtx_, std::try_to_lock); + + // On exit, keep track of number of invocations of this + // callback. + on_exit _([this] { ++on_ready_count_; }); + if constexpr (!std::is_same_v>) { @@ -268,12 +303,34 @@ namespace hpx::collectives::detail { // Make sure next generation is enabled only after previous // generation has finished executing. - // - // set() consumes the lock - gate_.set( - which, HPX_MOVE(l), [this, generation](auto& l, auto& gate) { - gate.next_generation(l, generation); - this->invalidate_data(l); + gate_.set(which, l, + [this, generation, num_values]( + auto& l, auto& gate, error_code& ec) { + // This callback is invoked synchronously once for each + // collective operation after all data has been received and + // all (shared) futures were triggered. + + HPX_ASSERT_OWNS_LOCK(l); + + // Verify that all `on_ready` callbacks have finished + // executing at this point. + if (on_ready_count_ != get_num_sites(num_values)) + { + HPX_THROWS_IF(ec, hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, not all on_ready callbacks have " + "been invoked at the end of the collective " + "operation"); + return; + } + + // Reset communicator state before proceeding to the next + // generation. + invalidate_data(l); + + // Release threads possibly waiting for the next generation + // to be handled. + gate.next_generation(l, generation, ec); }); return f; @@ -281,7 +338,7 @@ namespace hpx::collectives::detail { // protect against vector idiosyncrasies template - static constexpr decltype(auto) handle_bool(Data&& data) + static constexpr decltype(auto) handle_bool(Data&& data) noexcept { if constexpr (std::is_same_v) { @@ -298,10 +355,11 @@ namespace hpx::collectives::detail { mutex_type mtx_; hpx::unique_any_nonser data_; - lcos::local::and_gate gate_; + hpx::lcos::local::and_gate gate_; std::size_t const num_sites_; - bool needs_initialization_; - bool data_available_; + std::size_t on_ready_count_ = 0; + bool needs_initialization_ = true; + bool data_available_ = false; }; } // namespace hpx::collectives::detail diff --git a/libs/full/collectives/src/create_communicator.cpp b/libs/full/collectives/src/create_communicator.cpp index 7f13d2a17b36..cd1d13f735b0 100644 --- a/libs/full/collectives/src/create_communicator.cpp +++ b/libs/full/collectives/src/create_communicator.cpp @@ -50,8 +50,6 @@ namespace hpx::collectives { communicator_server::communicator_server() noexcept //-V730 : num_sites_(0) - , needs_initialization_(false) - , data_available_(false) { HPX_ASSERT(false); // shouldn't ever be called } @@ -59,8 +57,6 @@ namespace hpx::collectives { communicator_server::communicator_server(std::size_t num_sites) noexcept : gate_(num_sites) , num_sites_(num_sites) - , needs_initialization_(true) - , data_available_(false) { HPX_ASSERT(num_sites != 0); }