-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathCollectNSucceeded-inl.h
69 lines (57 loc) · 2.26 KB
/
CollectNSucceeded-inl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include <folly/ExceptionWrapper.h>
namespace nebula {
// Not thread-safe, all futures need to be on the same executor
template <class FutureIter, typename ResultEval>
folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(FutureIter first,
FutureIter last,
size_t n,
ResultEval&& eval) {
using Result = SucceededResultList<FutureIter>;
if (n == 0) {
return folly::Future<Result>(Result());
}
struct Context {
Context(size_t total, ResultEval&& e) : eval(std::forward<ResultEval>(e)), nTotal(total) {}
ResultEval eval;
Result results;
std::atomic<size_t> numCompleted = {0};
std::atomic<size_t> nSucceeded = {0};
folly::Promise<Result> promise;
size_t nTotal;
};
size_t total = size_t(std::distance(first, last));
DCHECK_GE(total, 0U);
if (total < n) {
return folly::Future<Result>(
folly::exception_wrapper(std::runtime_error("Not enough futures")));
}
auto ctx = std::make_shared<Context>(total, std::forward<ResultEval>(eval));
// for each succeeded Future, add to the result list, until
// we have required number of futures, at which point we fulfil
// the promise with the result list
for (size_t index = 0; first != last; ++first, ++index) {
first->setCallback_([n, ctx, index](auto, folly::Try<FutureReturnType<FutureIter>>&& t) {
if (!ctx->promise.isFulfilled()) {
if (!t.hasException()) {
if (ctx->eval(index, t.value())) {
++ctx->nSucceeded;
}
ctx->results.emplace_back(index, std::move(t.value()));
}
if ((++ctx->numCompleted) == ctx->nTotal || ctx->nSucceeded == n) {
// Done
VLOG(2) << "Set Value [completed=" << ctx->numCompleted << ", total=" << ctx->nTotal
<< ", Result list size=" << ctx->results.size() << "]";
ctx->promise.setValue(std::move(ctx->results));
}
}
});
}
return ctx->promise.getFuture();
}
} // namespace nebula