Skip to content

Commit

Permalink
Merge pull request #4 from bkietz/10183-async-csv
Browse files Browse the repository at this point in the history
simplify Loop slightly
  • Loading branch information
westonpace authored Feb 15, 2021
2 parents 4f0e764 + f096126 commit c29c19e
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions cpp/src/arrow/util/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ class ARROW_MUST_USE_TYPE Future {
///
/// Returns true if a callback was actually added and false if the callback failed
/// to add because the future was marked complete.
template <typename OnComplete>
bool TryAddCallback(const std::function<OnComplete()>& callback_factory) const {
template <typename CallbackFactory>
bool TryAddCallback(const CallbackFactory& callback_factory) const {
return impl_->TryAddCallback([this, &callback_factory]() {
return Callback<OnComplete>{WeakFuture<T>(*this), callback_factory()};
return Callback<detail::result_of_t<CallbackFactory()>>{WeakFuture<T>(*this),
callback_factory()};
});
}

Expand Down Expand Up @@ -691,25 +692,22 @@ Future<BreakValueType> Loop(Iterate iterate) {

auto control_fut = iterate();
while (true) {
if (control_fut.is_finished()) {
// There's no need to AddCallback on a finished future; we can
// CheckForTermination now. This also avoids recursion and potential stack
// overflow.
if (CheckForTermination(control_fut.result())) return;

control_fut = iterate();
} else {
std::function<Callback()> callback_factory = [this]() { return *this; };
if (control_fut.TryAddCallback(callback_factory)) {
break;
}
// Else we tried to add a callback but someone had stolen in and marked the
// future finished so we can just resume iteration
if (control_fut.TryAddCallback([this]() { return *this; })) {
// Adding a callback succeeded; control_fut was not finished
// and we must wait to CheckForTermination.
return;
}
// Adding a callback failed; control_fut was finished and we
// can CheckForTermination immediately. This also avoids recursion and potential
// stack overflow.
if (CheckForTermination(control_fut.result())) return;

control_fut = iterate();
}
}

Iterate iterate;

// If the future returned by control_fut is never completed then we will be hanging on
// to break_fut forever even if the listener has given up listening on it. Instead we
// rely on the fact that a producer (the caller of Future<>::Make) is always
Expand Down

0 comments on commit c29c19e

Please sign in to comment.