
edf: adding a construction option with a given number of pre-picks #31592
Merged (13 commits, Jan 25, 2024)
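
For orientation, a minimal usage sketch of the new factory method, based on the signature added in source/common/upstream/edf_scheduler.h below (the wrapper function name, entry values, and pick count are illustrative and not taken from the PR):

#include <cstdint>
#include <memory>
#include <vector>

#include "source/common/upstream/edf_scheduler.h"

namespace Envoy {
namespace Upstream {

// Build a scheduler that behaves as if `picks` picks had already been
// performed, then take the "next" pick.
std::shared_ptr<uint32_t> firstPickAfterPrePicks() {
  std::vector<std::shared_ptr<uint32_t>> entries = {std::make_shared<uint32_t>(25),
                                                    std::make_shared<uint32_t>(75)};
  auto weight = [](const uint32_t& entry) -> double { return entry; };
  auto scheduler = EdfScheduler<uint32_t>::createWithPicks(entries, weight, /*picks=*/1000);
  return scheduler.pickAndAdd(weight);
}

} // namespace Upstream
} // namespace Envoy
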
source/common/upstream/edf_scheduler.h (110 additions, 0 deletions)
@@ -27,6 +27,8 @@ namespace Upstream {
// weights and an O(log n) pick time.
template <class C> class EdfScheduler : public Scheduler<C> {
public:
EdfScheduler() {}

// See scheduler.h for an explanation of each public method.
std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
std::shared_ptr<C> ret = popEntry();
@@ -64,7 +66,110 @@ template <class C> class EdfScheduler : public Scheduler<C> {

bool empty() const override { return queue_.empty(); }

// Creates an EdfScheduler with the given weights and their corresponding
// entries, and emulates a given number of initial picks. Note that the
// internal state of the scheduler will be very similar to creating an empty
// scheduler, adding the entries one after the other, and then performing
// "picks" pickAndAdd() operations without modifying the entries' weights.
// The only difference is that entries with the same weight may be chosen in
// a slightly different order (their order_offset_ values may differ). The
// tie-breaking between same-weight entries is preserved in future picks from
// the scheduler.
static EdfScheduler<C> createWithPicks(const std::vector<std::shared_ptr<C>>& entries,
std::function<double(const C&)> calculate_weight,
uint32_t picks) {
// Limiting the number of picks, as over 400M picks should be sufficient
// for most scenarios.
picks = picks % 429496729; // % UINT_MAX/10
EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks);
// Assume no non-positive weights.
ASSERT(std::none_of(entries.cbegin(), entries.cend(),
[&calculate_weight](const std::shared_ptr<C>& entry) {
return calculate_weight(*entry) <= 0;
}));

// Nothing to do if there are no entries.
if (entries.size() == 0) {
[Review comment, Member]: Can you refresh my memory on whether we already have an optimization here if all weights are equal somewhere earlier in the call chain?
[Author reply, Contributor]: The typical answer is "it depends":

return EdfScheduler<C>();
}

// Augment the weight computation to add a small epsilon to each entry's
// weight, to avoid cases where weights are exact multiples of each other. For
// example, if there are 2 weights, 25 and 75, and picks=23, then the
// floor_picks will be {5, 17} (respectively), and the deadlines will be
// {0.24000000000000002, 0.24} (respectively). This small difference will
// cause a "wrong" pick compared to starting from an empty scheduler and
// picking 23 times. Adding a small value to each weight circumvents this
// problem. This was added as a result of the following comment:
// https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769.
auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
return calculate_weight(entry) + 1e-13;
};

// Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0),
// W = sum(w_i), and P be the number of times to "pick" from the scheduler.
// Let p'_i = floor(P * w_i/W), then the number of times each entry is being
// picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N.
//
// The following code does P picks, by first emulating p'_i picks for each
// entry, and then executing the leftover P - sum(p'_i) picks.
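// As an illustrative example (numbers not taken from the PR): with weights
// {2, 3, 5} the sum is W = 10, and for P = 12 picks the floor picks are
// p' = {floor(2.4), floor(3.6), floor(6)} = {2, 3, 6}. sum(p'_i) = 11, so a
// single leftover pick (P - 11 = 1) is executed via pickAndAdd() below.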
double weights_sum = std::accumulate(
entries.cbegin(), entries.cend(), 0.0,
[&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) {
return sum_so_far + aug_calculate_weight(*entry);
});
std::vector<uint32_t> floor_picks;
floor_picks.reserve(entries.size());
std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks),
[picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) {
// Getting the lower-bound by casting to an integer.
return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks /
weights_sum);
});

// Pre-compute the priority-queue entries to use an O(N) initialization c'tor.
std::vector<EdfEntry> scheduler_entries;
scheduler_entries.reserve(entries.size());
uint32_t picks_so_far = 0;
double max_pick_time = 0.0;
// Emulate a per-entry addition to a deadline that is applicable to N picks.
for (size_t i = 0; i < entries.size(); ++i) {
// Add the entry with p'_i picks. As there were p'_i picks, the entry's
// next deadline is (p'_i + 1) / w_i.
const double weight = aug_calculate_weight(*entries[i]);
// While validating the algorithm there were a few cases where the exact math
// and floating-point arithmetic did not agree (specifically, the computed
// floor(A*B) was greater than the exact value of A*B). The following if
// statement solves the problem by reducing the entry's floor-picks, which may
// result in more iterations in the code after the loop.
if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) {
floor_picks[i]--;
}
const double pick_time = floor_picks[i] / weight;
const double deadline = (floor_picks[i] + 1) / weight;
EDF_TRACE("Insertion {} in queue with emulated {} picks, deadline {} and weight {}.",
static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight);
scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]});
max_pick_time = std::max(max_pick_time, pick_time);
picks_so_far += floor_picks[i];
}
// The scheduler's current_time_ needs to be the largest time that some entry was picked.
EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size());
ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_);

// Perform the remaining picks one after the other.
EDF_TRACE("Emulated {} picks in the init step, {} picks remaining to be executed one by one.",
picks_so_far, picks - picks_so_far);
while (picks_so_far < picks) {
scheduler.pickAndAdd(calculate_weight);
picks_so_far++;
}
return scheduler;
}

private:
friend class EdfSchedulerTest;

/**
* Clears expired entries and pops the next unexpired entry in the queue.
*/
@@ -106,6 +211,11 @@ template <class C> class EdfScheduler : public Scheduler<C> {
}
};

EdfScheduler(std::vector<EdfEntry>&& scheduler_entries, double current_time,
uint32_t order_offset)
: current_time_(current_time), order_offset_(order_offset),
queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {}
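// Note: std::priority_queue's iterator-range constructor above heapifies its
// input with std::make_heap, so building the queue is O(N) rather than N
// individual O(log N) pushes.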

// Current time in EDF scheduler.
// TODO(htuch): Is it worth the small extra complexity to use integer time for performance
// reasons?
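
Aside (not part of the PR): a minimal standalone sketch that reproduces the floor-pick/deadline arithmetic described in the comments above, using the {25, 75} weights and picks=23 example from the PR; the 1e-13 epsilon augmentation and the floating-point guard are omitted for brevity.

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<double> weights = {25.0, 75.0};
  const uint32_t picks = 23;
  double weights_sum = 0.0;
  for (double w : weights) {
    weights_sum += w;
  }
  uint32_t picks_so_far = 0;
  for (size_t i = 0; i < weights.size(); ++i) {
    // floor(P * w_i / W): the number of picks emulated for entry i.
    const auto floor_picks = static_cast<uint32_t>(weights[i] * picks / weights_sum);
    const double pick_time = floor_picks / weights[i];
    const double deadline = (floor_picks + 1) / weights[i];
    std::printf("entry %zu: floor_picks=%u pick_time=%f deadline=%.17g\n", i, floor_picks,
                pick_time, deadline);
    picks_so_far += floor_picks;
  }
  // Prints floor_picks {5, 17}; the remaining 23 - 22 = 1 pick would be
  // executed one-by-one via pickAndAdd().
  std::printf("leftover picks: %u\n", picks - picks_so_far);
  return 0;
}
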
test/common/upstream/BUILD (4 additions, 1 deletion)
@@ -168,7 +168,10 @@ envoy_cc_test(
envoy_cc_test(
name = "edf_scheduler_test",
srcs = ["edf_scheduler_test.cc"],
deps = ["//source/common/upstream:scheduler_lib"],
deps = [
"//source/common/upstream:scheduler_lib",
"//test/test_common:utility_lib",
],
)

envoy_cc_test_library(
test/common/upstream/edf_scheduler_test.cc (189 additions, 9 deletions)
@@ -1,19 +1,60 @@
#include "source/common/upstream/edf_scheduler.h"

#include "test/test_common/test_random_generator.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Upstream {
namespace {

TEST(EdfSchedulerTest, Empty) {
class EdfSchedulerTest : public testing::Test {
public:
template <typename T>
static void compareEdfSchedulers(EdfScheduler<T>& scheduler1, EdfScheduler<T>& scheduler2) {
// Compares the given EdfSchedulers' internal queues for equality
// (ignoring the order_offset_ values).
EXPECT_EQ(scheduler1.queue_.size(), scheduler2.queue_.size());
// Cannot iterate over std::priority_queue directly, so need to copy the
// contents to a vector first.
auto copyFunc = [](EdfScheduler<T>& scheduler) {
std::vector<typename EdfScheduler<T>::EdfEntry> result;
result.reserve(scheduler.queue_.size());
while (!scheduler.empty()) {
result.emplace_back(std::move(scheduler.queue_.top()));
scheduler.queue_.pop();
}
// Re-add all elements so the contents of the input scheduler aren't
// changed.
for (auto& entry : result) {
scheduler.queue_.push(entry);
}
return result;
};
std::vector<typename EdfScheduler<T>::EdfEntry> contents1 = copyFunc(scheduler1);
std::vector<typename EdfScheduler<T>::EdfEntry> contents2 = copyFunc(scheduler2);
for (size_t i = 0; i < contents1.size(); ++i) {
// Given two queues and some number of picks P, where one queue is created
// empty and P picks are performed on it, and the other queue is created using
// `EdfScheduler::createWithPicks()`, their deadlines may differ slightly due
// to floating-point arithmetic. The comparison therefore uses a NEAR
// comparison to account for such differences.
EXPECT_NEAR(contents1[i].deadline_, contents2[i].deadline_, 1e-5)
<< "unequal deadline in element " << i;
std::shared_ptr<T> entry1 = contents1[i].entry_.lock();
std::shared_ptr<T> entry2 = contents2[i].entry_.lock();
EXPECT_EQ(*entry1, *entry2) << "unequal entry in element " << i;
}
}
};

TEST_F(EdfSchedulerTest, Empty) {
EdfScheduler<uint32_t> sched;
EXPECT_EQ(nullptr, sched.peekAgain([](const double&) { return 0; }));
EXPECT_EQ(nullptr, sched.pickAndAdd([](const double&) { return 0; }));
}

// Validate we get regular RR behavior when all weights are the same.
TEST(EdfSchedulerTest, Unweighted) {
TEST_F(EdfSchedulerTest, Unweighted) {
EdfScheduler<uint32_t> sched;
constexpr uint32_t num_entries = 128;
std::shared_ptr<uint32_t> entries[num_entries];
@@ -34,7 +75,7 @@ TEST(EdfSchedulerTest, Unweighted) {
}

// Validate we get weighted RR behavior when weights are distinct.
TEST(EdfSchedulerTest, Weighted) {
TEST_F(EdfSchedulerTest, Weighted) {
EdfScheduler<uint32_t> sched;
constexpr uint32_t num_entries = 128;
std::shared_ptr<uint32_t> entries[num_entries];
@@ -59,7 +100,7 @@ TEST(EdfSchedulerTest, Weighted) {
}

// Validate that expired entries are ignored.
TEST(EdfSchedulerTest, Expired) {
TEST_F(EdfSchedulerTest, Expired) {
EdfScheduler<uint32_t> sched;

auto second_entry = std::make_shared<uint32_t>(42);
@@ -77,7 +118,7 @@ TEST(EdfSchedulerTest, Expired) {
}

// Validate that expired entries are not peeked.
TEST(EdfSchedulerTest, ExpiredPeek) {
TEST_F(EdfSchedulerTest, ExpiredPeek) {
EdfScheduler<uint32_t> sched;

{
@@ -93,7 +134,7 @@ TEST(EdfSchedulerTest, ExpiredPeek) {
}

// Validate that expired entries are ignored.
TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
TEST_F(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
EdfScheduler<uint32_t> sched;

{
@@ -110,7 +151,7 @@ TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
EXPECT_TRUE(sched.pickAndAdd([](const double&) { return 1; }) == nullptr);
}

TEST(EdfSchedulerTest, ManyPeekahead) {
TEST_F(EdfSchedulerTest, ManyPeekahead) {
EdfScheduler<uint32_t> sched1;
EdfScheduler<uint32_t> sched2;
constexpr uint32_t num_entries = 128;
@@ -134,6 +175,145 @@ TEST(EdfSchedulerTest, ManyPeekahead) {
}
}

} // namespace
// Validates that creating a scheduler using createWithPicks() (with 0 picks)
// is equivalent to creating an empty scheduler and adding entries one after the other.
TEST_F(EdfSchedulerTest, SchedulerWithZeroPicksEqualToEmptyWithAddedEntries) {
constexpr uint32_t num_entries = 128;
std::vector<std::shared_ptr<uint32_t>> entries;
entries.reserve(num_entries);

// Populate sched1 one entry after the other.
EdfScheduler<uint32_t> sched1;
for (uint32_t i = 0; i < num_entries; ++i) {
entries.emplace_back(std::make_shared<uint32_t>(i + 1));
sched1.add(i + 1, entries.back());
}

EdfScheduler<uint32_t> sched2 = EdfScheduler<uint32_t>::createWithPicks(
entries, [](const double& w) { return w; }, 0);

compareEdfSchedulers(sched1, sched2);
}

// Validates that creating a scheduler using createWithPicks() (with various
// pick counts) is equivalent to creating an empty scheduler, adding the entries
// one after the other, and then performing the same number of picks.
TEST_F(EdfSchedulerTest, SchedulerWithSomePicksEqualToEmptyWithAddedEntries) {
constexpr uint32_t num_entries = 128;
// Use double-precision weights from the range [0.01, 100.5].
// Distinct weights are used to avoid cases where entries with the same
// weight are chosen in a different order.
std::vector<std::shared_ptr<double>> entries;
entries.reserve(num_entries);
for (uint32_t i = 0; i < num_entries; ++i) {
const double entry_weight = (100.5 - 0.01) / num_entries * i + 0.01;
entries.emplace_back(std::make_shared<double>(entry_weight));
}

const std::vector<uint32_t> all_picks{5, 140, 501, 123456, 894571};
for (const auto picks : all_picks) {
// Populate sched1 one entry after the other.
EdfScheduler<double> sched1;
for (uint32_t i = 0; i < num_entries; ++i) {
sched1.add(*entries[i], entries[i]);
}
// Perform the picks on sched1.
for (uint32_t i = 0; i < picks; ++i) {
sched1.pickAndAdd([](const double& w) { return w; });
}

// Create sched2 with pre-built and pre-picked entries.
EdfScheduler<double> sched2 = EdfScheduler<double>::createWithPicks(
entries, [](const double& w) { return w; }, picks);

compareEdfSchedulers(sched1, sched2);
}
}

// Emulates first-pick scenarios by creating a scheduler with the given
// weights and a random number of pre-picks, and validates that the
// distribution of the next ("first") pick across the entries is close to the
// given weights.
void firstPickTest(const std::vector<double> weights) {
TestRandomGenerator rand;
ASSERT(std::accumulate(weights.begin(), weights.end(), 0.) == 100.0);
// To be able to converge to the expected weights, a decent number of iterations
// should be used. If the number of weights is large, the number of iterations
// should be larger than 10000.
constexpr uint64_t iterations = 4e5;
// The expected range of the weights is [0,100). If this is no longer the
// case, this value may need to be updated.
constexpr double tolerance_pct = 1.0;

// Set up the entries as simple integers.
std::vector<std::shared_ptr<size_t>> entries;
entries.reserve(weights.size());
for (size_t i = 0; i < weights.size(); ++i) {
entries.emplace_back(std::make_shared<size_t>(i));
}

absl::flat_hash_map<size_t, int> sched_picks;
auto calc_weight = [&weights](const size_t& i) -> double { return weights[i]; };

for (uint64_t i = 0; i < iterations; ++i) {
// Create a scheduler with the given weights with a random number of
// emulated pre-picks.
uint32_t r = rand.random();
auto sched = EdfScheduler<size_t>::createWithPicks(entries, calc_weight, r);

// Perform a "first-pick" from that scheduler, and increase the counter for
// that entry.
sched_picks[*sched.pickAndAdd(calc_weight)]++;
}

// Validate that the observed distribution and expected weights are close.
ASSERT_EQ(weights.size(), sched_picks.size());
for (const auto& it : sched_picks) {
const double expected = calc_weight(it.first);
const double observed = 100 * static_cast<double>(it.second) / iterations;
EXPECT_NEAR(expected, observed, tolerance_pct);
}
}

// Validates that after creating schedulers using createWithPicks() (with random
// picks) and then performing a "first-pick", the distribution of the "first-picks"
// matches the weights within tolerance.
TEST_F(EdfSchedulerTest, SchedulerWithRandomPicksFirstPickDistribution) {
firstPickTest({25.0, 75.0});
firstPickTest({1.0, 99.0});
firstPickTest({50.0, 50.0});
firstPickTest({1.0, 20.0, 79.0});
}

constexpr uint64_t BATCH_SIZE = 50;
static std::vector<uint64_t> picksStarts() {
std::vector<uint64_t> start_idxs;
// Add the first range, as it starts at 1 (and not 0).
start_idxs.emplace_back(1);
// The weight delta between iterations is 0.001, so to cover the range
// from 0 to 100, the largest start_idx will be 100 / 0.001.
for (uint64_t i = 50; i < 100 * 1000; i += BATCH_SIZE) {
start_idxs.emplace_back(i);
}
return start_idxs;
}
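// picksStarts() produces {1, 50, 100, ..., 99950}; each parameterized test
// instance below then sweeps BATCH_SIZE consecutive weight steps of 0.001
// starting at 0.001 * start_idx.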

class EdfSchedulerSpecialTest : public testing::TestWithParam<uint64_t> {};
// Validates that after creating schedulers using createWithPicks() (with random
// picks) and then performing a "first-pick", the distribution of the "first-picks"
// matches the weights. This covers the case of 2 weights between 0 and 100, in
// steps of 0.001. The test takes too long to run and is therefore disabled by
// default; if the EDF scheduler is changed, it can be executed manually.
TEST_P(EdfSchedulerSpecialTest, DISABLED_ExhaustiveValidator) {
[Review comment, Member]: Nit: Exhustive => Exhaustive

const uint64_t start_idx = GetParam();
for (uint64_t i = start_idx; i < start_idx + BATCH_SIZE; ++i) {
const double w1 = 0.001 * i;
ENVOY_LOG_MISC(trace, "Testing weights: w1={}, w2={}", w1, 100.0 - w1);
firstPickTest({w1, 100.0 - w1});
}
}

INSTANTIATE_TEST_SUITE_P(ExhaustiveValidator, EdfSchedulerSpecialTest,
testing::ValuesIn(picksStarts()));

} // namespace Upstream
} // namespace Envoy