Skip to content

Commit

Permalink
Tightening up collective operation semantics
Browse files Browse the repository at this point in the history
- flyby: small_vector tweaks
  • Loading branch information
hkaiser committed Jan 10, 2024
1 parent 7b35448 commit 8790cf8
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ namespace hpx::detail {
}

// only void* is allowed to be converted to uintptr_t
void* ptr = ::operator new(offset_to_data + sizeof(T) * capacity);
void* ptr = ::operator new(mem);
if (nullptr == ptr)
{
throw std::bad_alloc();
Expand Down Expand Up @@ -319,9 +319,13 @@ namespace hpx::detail {
{
// indirect -> direct
auto* storage = indirect();
uninitialized_move_and_destroy(
storage->data(), direct_data(), storage->size());
set_direct_and_size(storage->size());
auto const data_size = storage->size();
if (data_size != 0)
{
uninitialized_move_and_destroy(
storage->data(), direct_data(), data_size);
set_direct_and_size(data_size);
}
detail::storage<T>::dealloc(storage);
}
}
Expand All @@ -332,16 +336,26 @@ namespace hpx::detail {
if (is_direct())
{
// direct -> indirect
uninitialized_move_and_destroy(data<direction::direct>(),
storage->data(), size<direction::direct>());
storage->size(size<direction::direct>());
auto const data_size = size<direction::direct>();
if (data_size != 0)
{
uninitialized_move_and_destroy(
data<direction::direct>(), storage->data(),
data_size);
storage->size(data_size);
}
}
else
{
// indirect -> indirect
uninitialized_move_and_destroy(data<direction::indirect>(),
storage->data(), size<direction::indirect>());
storage->size(size<direction::indirect>());
auto const data_size = size<direction::indirect>();
if (data_size != 0)
{
uninitialized_move_and_destroy(
data<direction::indirect>(), storage->data(),
data_size);
storage->size(data_size);
}
detail::storage<T>::dealloc(indirect());
}
set_indirect(storage);
Expand Down
53 changes: 22 additions & 31 deletions libs/core/futures/src/future_data.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2023 Hartmut Kaiser
// Copyright (c) 2015-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -22,13 +22,14 @@

#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
#include <utility>

namespace hpx::lcos::detail {

static run_on_completed_error_handler_type run_on_completed_error_handler;
namespace {
run_on_completed_error_handler_type run_on_completed_error_handler;
}

void set_run_on_completed_error_handler(
run_on_completed_error_handler_type f)
Expand Down Expand Up @@ -66,16 +67,12 @@ namespace hpx::lcos::detail {

///////////////////////////////////////////////////////////////////////////
template <typename Callback>
static void run_on_completed_on_new_thread(Callback&& f)
void run_on_completed_on_new_thread(Callback&& f)
{
lcos::local::futures_factory<void()> p(HPX_FORWARD(Callback, f));

bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
HPX_ASSERT(nullptr != hpx::threads::get_self_ptr());
hpx::launch policy = launch::fork;
if (!is_hpx_thread)
{
policy = launch::async;
}

policy.set_priority(threads::thread_priority::boost);
policy.set_stacksize(threads::thread_stacksize::current);
Expand All @@ -84,17 +81,12 @@ namespace hpx::lcos::detail {
threads::thread_id_ref_type const tid = //-V821
p.post("run_on_completed_on_new_thread", policy);

// wait for the task to run
if (is_hpx_thread)
{
// make sure this thread is executed last
this_thread::suspend(
threads::thread_schedule_state::pending, tid.noref());
return p.get_future().get();
}
// make sure this thread is executed last
this_thread::suspend(
threads::thread_schedule_state::pending, tid.noref());

// If we are not on a HPX thread, we need to return immediately, to
// allow the newly spawned thread to execute.
// wait for the task to run
return p.get_future().get();
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -124,15 +116,13 @@ namespace hpx::lcos::detail {
}

auto const state = this->state_.load(std::memory_order_acquire);
if (state != this->empty)
if (state != future_data_base::empty)
{
return false;
}

// this thread would block on the future

auto* thrd = get_thread_id_data(runs_child);
HPX_UNUSED(thrd); // might be unused
[[maybe_unused]] auto* thrd = get_thread_id_data(runs_child);

LTM_(debug).format("task_object::get_result_void: attempting to "
"directly execute child({}), description({})",
Expand Down Expand Up @@ -161,8 +151,6 @@ namespace hpx::lcos::detail {
return false;
}

static util::unused_type unused_;

util::unused_type*
future_data_base<traits::detail::future_data_void>::get_result_void(
void const* storage, error_code& ec)
Expand Down Expand Up @@ -190,6 +178,7 @@ namespace hpx::lcos::detail {

if (s == value)
{
static util::unused_type unused_;
return &unused_;
}

Expand Down Expand Up @@ -232,12 +221,12 @@ namespace hpx::lcos::detail {
hpx::scoped_annotation annotate(on_completed);
HPX_MOVE(on_completed)();
},
[&](std::exception_ptr ep) {
[&](std::exception_ptr const& ep) {
// If the completion handler throws an exception, there's
// nothing we can do, report the exception and terminate.
if (run_on_completed_error_handler)
{
run_on_completed_error_handler(HPX_MOVE(ep));
run_on_completed_error_handler(ep);
}
else
{
Expand Down Expand Up @@ -272,7 +261,9 @@ namespace hpx::lcos::detail {
cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH ||
(hpx::threads::get_self_ptr() == nullptr);
#endif
if (!recurse_asynchronously)

bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
if (!is_hpx_thread || !recurse_asynchronously)
{
// directly execute continuation on this thread
run_on_completed(HPX_FORWARD(Callback, on_completed));
Expand All @@ -289,17 +280,17 @@ namespace hpx::lcos::detail {
run_on_completed_on_new_thread(util::deferred_call(
p, HPX_FORWARD(Callback, on_completed)));
},
[&](std::exception_ptr ep) {
[&](std::exception_ptr const& ep) {
// If an exception while creating the new task or inside the
// completion handler is thrown, there is nothing we can do...
// ... but terminate and report the error
if (run_on_completed_error_handler)
{
run_on_completed_error_handler(HPX_MOVE(ep));
run_on_completed_error_handler(ep);
}
else
{
std::rethrow_exception(HPX_MOVE(ep));
std::rethrow_exception(ep);
}
});
}
Expand Down
25 changes: 12 additions & 13 deletions libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ namespace hpx::lcos::local {
protected:
// Set the data which has to go into the segment \a which.
template <typename OuterLock, typename F>
bool set(std::size_t which, OuterLock outer_lock, F&& f,
bool set(std::size_t which, OuterLock& outer_lock, F&& f,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(outer_lock);
Expand Down Expand Up @@ -224,15 +224,12 @@ namespace hpx::lcos::local {
std::decay_t<F>>)
{
// invoke callback with the outer lock being held
HPX_FORWARD(F, f)(outer_lock, *this);
HPX_FORWARD(F, f)(outer_lock, *this, ec);
}

outer_lock.unlock();
return true;
}
}

outer_lock.unlock();
return false;
}

Expand All @@ -242,7 +239,7 @@ namespace hpx::lcos::local {
{
hpx::no_mutex mtx;
std::unique_lock<hpx::no_mutex> lk(mtx);
return set(which, HPX_MOVE(lk), HPX_FORWARD(F, f), ec);
return set(which, lk, HPX_FORWARD(F, f), ec);
}

protected:
Expand Down Expand Up @@ -324,7 +321,8 @@ namespace hpx::lcos::local {

public:
template <typename Lock>
std::size_t next_generation(Lock& l, std::size_t new_generation)
std::size_t next_generation(
Lock& l, std::size_t new_generation, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(l);

Expand All @@ -335,10 +333,11 @@ namespace hpx::lcos::local {
if (new_generation < generation_)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::error::invalid_status,
HPX_THROWS_IF(ec, hpx::error::invalid_status,
"and_gate::next_generation",
"sequencing error, new generational counter value too "
"small");
return generation_;
}
generation_ = new_generation;
}
Expand All @@ -351,10 +350,11 @@ namespace hpx::lcos::local {
}

std::size_t next_generation(
std::size_t new_generation = static_cast<std::size_t>(-1))
std::size_t new_generation = static_cast<std::size_t>(-1),
error_code& ec = throws)
{
std::unique_lock<mutex_type> l(mtx_);
return next_generation(l, new_generation);
return next_generation(l, new_generation, ec);
}

template <typename Lock>
Expand Down Expand Up @@ -441,11 +441,10 @@ namespace hpx::lcos::local {
}

template <typename Lock, typename F = std::nullptr_t>
bool set(std::size_t which, Lock l, F&& f = nullptr,
bool set(std::size_t which, Lock& l, F&& f = nullptr,
error_code& ec = hpx::throws)
{
return this->base_type::set(
which, HPX_MOVE(l), HPX_FORWARD(F, f), ec);
return this->base_type::set(which, l, HPX_FORWARD(F, f), ec);
}

template <typename Lock>
Expand Down
Loading

0 comments on commit 8790cf8

Please sign in to comment.