Skip to content

Commit

Permalink
Merge pull request #310 from FelixPetriconi/Fix303
Browse files Browse the repository at this point in the history
Fix303
  • Loading branch information
FelixPetriconi authored Sep 4, 2020
2 parents fe2de9e + 1f83b84 commit 4083a1f
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 70 deletions.
180 changes: 111 additions & 69 deletions stlab/concurrency/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1127,26 +1127,37 @@ template <typename F, typename Args>
struct when_all_shared {
// decay
Args _args;
std::mutex _guard;
future<void> _holds[std::tuple_size<Args>::value]{};
std::atomic_size_t _remaining{std::tuple_size<Args>::value};
std::atomic_flag _error_happened = ATOMIC_FLAG_INIT;
std::size_t _remaining{std::tuple_size<Args>::value};
std::exception_ptr _exception;
packaged_task<> _f;

template <std::size_t index, typename FF>
void done(FF&& f) {
assign_ready_future<FF>::assign(std::get<index>(_args), std::forward<FF>(f));
if (--_remaining == 0) _f();
auto run{ false };
{
std::unique_lock lock{ _guard };
if (!_exception) {
assign_ready_future<FF>::assign(std::get<index>(_args), std::forward<FF>(f));
if (--_remaining == 0) run = true;
}
}
if (run) _f();
}

void failure(std::exception_ptr error) {
auto before = _error_happened.test_and_set();
if (before == false) {
for (auto& h : _holds)
h.reset();
_exception = std::move(error);
_f();
auto run{ false };
{
std::unique_lock lock{ _guard };
if (!_exception) {
for (auto& h : _holds)
h.reset();
_exception = std::move(error);
run = true;
}
}
if (run) _f();
}
};

Expand All @@ -1155,28 +1166,37 @@ struct when_any_shared {
using result_type = R;
// decay
stlab::optional<R> _arg;
std::mutex _guard;
future<void> _holds[S]{};
std::atomic_size_t _remaining{S};
std::atomic_flag _value_received = ATOMIC_FLAG_INIT;
std::size_t _remaining{S};
std::exception_ptr _exception;
size_t _index;
std::size_t _index = std::numeric_limits<std::size_t>::max();
packaged_task<> _f;

void failure(std::exception_ptr error) {
if (--_remaining == 0) {
_exception = std::move(error);
_f();
auto run{ false };
{
std::unique_lock lock{ _guard };
if (--_remaining == 0) {
_exception = std::move(error);
run = true;
}
}
if (run) _f();
}

template <size_t index, typename FF>
void done(FF&& f) {
auto before = _value_received.test_and_set();
if (before == false) {
_arg = std::move(*std::forward<FF>(f).get_try());
_index = index;
_f();
auto run{ false };
{
std::unique_lock lock{ _guard };
if (_index == std::numeric_limits<std::size_t>::max()) {
_arg = std::move(*std::forward<FF>(f).get_try());
_index = index;
run = true;
}
}
if (run) _f();
}

template <typename F>
Expand All @@ -1189,27 +1209,36 @@ template <size_t S>
struct when_any_shared<S, void> {
using result_type = void;
// decay
std::mutex _guard;
future<void> _holds[S]{};
std::atomic_size_t _remaining{S};
std::atomic_flag _value_received = ATOMIC_FLAG_INIT;
std::size_t _remaining{S};
std::exception_ptr _exception;
size_t _index;
std::size_t _index = std::numeric_limits<std::size_t>::max();
packaged_task<> _f;

void failure(std::exception_ptr error) {
if (--_remaining == 0) {
_exception = std::move(error);
_f();
auto run{ false };
std::unique_lock lock{ _guard };
{
if (--_remaining == 0) {
_exception = std::move(error);
run = true;
}
}
if (run) _f();
}

template <size_t index, typename FF>
void done(FF&&) {
auto before = _value_received.test_and_set();
if (before == false) {
_index = index;
_f();
auto run{ false };
{
std::unique_lock lock{ _guard };
if (_index == std::numeric_limits<std::size_t>::max()) {
_index = index;
run = true;
}
}
if (run) _f();
}

template <typename F>
Expand Down Expand Up @@ -1248,6 +1277,7 @@ auto apply_when_any_arg(F& f, P& p) {

template <std::size_t i, typename E, typename P, typename T>
void attach_when_arg_(E&& executor, std::shared_ptr<P>& p, T a) {
std::unique_lock lock{ p->_guard };
p->_holds[i] = std::move(a).recover(std::forward<E>(executor), [_w = std::weak_ptr<P>(p)](auto x) {
auto p = _w.lock();
if (!p) return;
Expand Down Expand Up @@ -1345,21 +1375,21 @@ namespace detail {
template <typename T>
struct value_storer {
template <typename C, typename F>
static void store(C& c, F&& f, size_t index) {
c._results = std::move(*std::forward<F>(f).get_try());
c._index = index;
static void store(C& context, F&& f, std::size_t index) {
context._results = std::move(*std::forward<F>(f).get_try());
context._index = index;
}
};

template <typename T>
struct value_storer<std::vector<T>> {
template <typename C, typename F>
static void store(C& c, F&& f, size_t index) {
c._results[index] = std::move(*std::forward<F>(f).get_try());
static void store(C& context, F&& f, std::size_t index) {
context._results[index] = std::move(*std::forward<F>(f).get_try());
}
};

template <bool Indxed, typename R>
template <bool Indexed, typename R>
struct result_creator;

template <>
Expand Down Expand Up @@ -1400,43 +1430,43 @@ struct context_result {

R _results;
std::exception_ptr _exception;
size_t _index;
std::size_t _index{0};
F _f;

context_result(F f, size_t s) : _index(0), _f(std::move(f)) { init(_results, s); }
context_result(F f, std::size_t s) : _f(std::move(f)) { init(_results, s); }

template <typename T>
void init(std::vector<T>& v, size_t s) {
void init(std::vector<T>& v, std::size_t s) {
v.resize(s);
}

template <typename T>
void init(T&, size_t) {}
void init(T&, std::size_t) {}

template <typename FF>
void apply(FF&& f, size_t index) {
void apply(FF&& f, std::size_t index) {
value_storer<R>::store(*this, std::forward<FF>(f), index);
}

void apply(std::exception_ptr error, size_t) { _exception = std::move(error); }
void apply(std::exception_ptr error, std::size_t) { _exception = std::move(error); }

auto operator()() { return result_creator<Indexed, R>::go(*this); }
};

template <typename F, bool Indexed>
struct context_result<F, Indexed, void> {
std::exception_ptr _exception;
size_t _index;
std::size_t _index{0};
F _f;

context_result(F f, size_t) : _f(std::move(f)) {}
context_result(F f, std::size_t) : _f(std::move(f)) {}

template <typename FF>
void apply(FF&&, size_t index) {
void apply(FF&&, std::size_t index) {
_index = index;
}

void apply(std::exception_ptr error, size_t) { _exception = std::move(error); }
void apply(std::exception_ptr error, std::size_t) { _exception = std::move(error); }

auto operator()() { return result_creator<Indexed, void>::go(*this); }
};
Expand All @@ -1445,24 +1475,26 @@ struct context_result<F, Indexed, void> {

/*
* This specialization is used for cases when only one ready future is enough to move forward.
* In case of when_any, the first successfull future triggers the continuation. All others are
* cancelled. In case of when_all, after the first error, this future cannot be fullfilled anymore
* In case of when_any, the first successful future triggers the continuation. All others are
* cancelled. In case of when_all, after the first error, this future cannot be fulfilled anymore
* and so we cancel the all the others.
*/
struct single_trigger {
template <typename C, typename F>
static void go(C& context, F&& f, size_t index) {
auto before = context._single_event_trigger.test_and_set();
if (!before) {
{
std::unique_lock guard(context._hold_guard);
static bool go(C& context, F&& f, size_t index) {
auto run{ false };
{
std::unique_lock lock{ context._guard };
if (!context._single_event) {
for (auto i = 0u; i < context._holds.size(); ++i) {
if (i != index) context._holds[i].reset();
}
context._single_event = true;
context.apply(std::forward<F>(f), index);
run = true;
}
context.apply(std::forward<F>(f), index);
context._f();
}
return run;
}
};

Expand All @@ -1474,25 +1506,35 @@ struct single_trigger {
*/
struct all_trigger {
template <typename C, typename F>
static void go(C& context, F&& f, size_t index) {
context.apply(std::forward<F>(f), index);
if (--context._remaining == 0) context._f();
static bool go(C& context, F&& f, size_t index) {
auto run{ false };
{
std::unique_lock lock{ context._guard };
context.apply(std::forward<F>(f), index);
if (--context._remaining == 0) run = true;
}
return run;
}

template <typename C>
static void go(C& context, std::exception_ptr error, size_t index) {
if (--context._remaining == 0) {
context.apply(std::move(error), index);
context._f();
static bool go(C& context, std::exception_ptr error, size_t index) {
auto run{ false };
{
std::unique_lock lock{ context._guard };
if (--context._remaining == 0) {
context.apply(std::move(error), index);
run = true;
}
}
return run;
}
};

template <typename CR, typename F, typename ResultCollector, typename FailureCollector>
struct common_context : CR {
std::atomic_size_t _remaining;
std::atomic_flag _single_event_trigger = ATOMIC_FLAG_INIT;
std::mutex _hold_guard;
std::mutex _guard;
std::size_t _remaining;
bool _single_event{false};
std::vector<future<void>> _holds;
packaged_task<> _f;

Expand All @@ -1506,20 +1548,20 @@ struct common_context : CR {
}

void failure(std::exception_ptr& error, size_t index) {
FailureCollector::go(*this, error, index);
if (FailureCollector::go(*this, error, index)) _f();
}

template <typename FF>
void done(FF&& f, size_t index) {
ResultCollector::go(*this, std::forward<FF>(f), index);
if (ResultCollector::go(*this, std::forward<FF>(f), index)) _f();
}
};

/**************************************************************************************************/

template <typename C, typename E, typename T>
void attach_tasks(size_t index, E executor, const std::shared_ptr<C>& context, T&& a) {
std::unique_lock guard(context->_hold_guard);
std::unique_lock guard(context->_guard);
context->_holds[index] =
std::move(a).recover(std::move(executor), [_context = make_weak_ptr(context), _i = index](auto x) {
auto p = _context.lock();
Expand Down
32 changes: 32 additions & 0 deletions test/future_when_all_range_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/future.hpp>
#include <stlab/concurrency/serial_queue.hpp>
#include <stlab/concurrency/utility.hpp>
#include <stlab/test/model.hpp>

Expand Down Expand Up @@ -73,6 +74,37 @@ BOOST_AUTO_TEST_CASE(future_when_all_void_range_with_one_element) {
BOOST_REQUIRE_LE(1, custom_scheduler<1>::usage_counter());
}

BOOST_AUTO_TEST_CASE(future_when_all_void_range_with_all_delayed) {
using namespace std::chrono_literals;

stlab::serial_queue_t seriel_queue_1 {stlab::default_executor};
stlab::serial_queue_t seriel_queue_2 {stlab::default_executor};
stlab::serial_queue_t seriel_queue_3 {stlab::default_executor};

std::vector<stlab::future<void>> test_futures;
test_futures.emplace_back(
stlab::async(seriel_queue_1.executor(), []{
std::this_thread::sleep_for(0.1s);})
);

test_futures.emplace_back(
stlab::async(seriel_queue_2.executor(), []{
std::this_thread::sleep_for(0.1s);})
);

test_futures.emplace_back(
stlab::async(seriel_queue_3.executor(), []{
std::this_thread::sleep_for(0.1s);})
);
bool done{false};
auto done_future = stlab::when_all(stlab::default_executor, [&done] { done = true;},
std::make_pair(test_futures.begin(), test_futures.end()));

stlab::blocking_get(done_future);

BOOST_REQUIRE(done);
}

BOOST_AUTO_TEST_CASE(future_when_all_void_range_with_many_elements) {
BOOST_TEST_MESSAGE("running future when_all void with range with many elements");
size_t p = 0;
Expand Down
1 change: 0 additions & 1 deletion test/sanitizer_suppressions
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
race:stlab::v1::detail::when_all_shared

race:stlab::v1::blocking_get

0 comments on commit 4083a1f

Please sign in to comment.