Skip to content

Commit

Permalink
Merge pull request #4 from westonpace/apacheGH-35838
Browse files Browse the repository at this point in the history
Convert the delaying node to a gated node to demonstrate my original idea
  • Loading branch information
rtpsw authored Jun 14, 2023
2 parents e29520b + 1edc446 commit 4ecd7ed
Showing 1 changed file with 177 additions and 63 deletions.
240 changes: 177 additions & 63 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <string_view>
#include "arrow/acero/exec_plan.h"
#include "arrow/testing/future_util.h"
#ifndef NDEBUG
#include <sstream>
#endif
Expand All @@ -32,6 +35,7 @@
#include "arrow/acero/options_internal.h"
#endif
#include "arrow/acero/map_node.h"
#include "arrow/acero/query_context.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
Expand All @@ -44,6 +48,7 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

#define TRACED_TEST(t_class, t_name, t_body) \
TEST(t_class, t_name) { \
Expand Down Expand Up @@ -1411,50 +1416,159 @@ struct BackpressureCountingNode : public MapNode {
BackpressureCounters* counters;
};

struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
explicit BackpressureDelayingNodeOptions(std::function<bool()> gate) : gate(gate) {}
/// \brief A thread-safe valve deciding how many batches a gated node may forward.
///
/// The gate starts closed: zero batches are allowed. Each call to
/// ReleaseOneBatch() allows one more batch, and ReleaseAllBatches() lifts the
/// limit entirely. Consumers call WaitForNextReleasedBatch() before emitting
/// each batch. All state is guarded by `mutex_`.
class Gate {
 public:
  /// \brief Open the gate permanently.
  ///
  /// A negative allowance is the "unlimited" sentinel checked by
  /// WaitForNextReleasedBatch(). Any pending waiter is completed.
  void ReleaseAllBatches() {
    std::lock_guard lg(mutex_);
    num_allowed_batches_ = -1;
    NotifyAll();
  }

  /// \brief Allow exactly one more batch through and wake any pending waiter.
  ///
  /// Must not be called after ReleaseAllBatches(); this is DCHECK-ed because
  /// incrementing the -1 sentinel would silently close the gate again.
  void ReleaseOneBatch() {
    std::lock_guard lg(mutex_);
    DCHECK_GE(num_allowed_batches_, 0)
        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
    num_allowed_batches_++;
    NotifyAll();
  }

  /// \brief Ask for permission to emit the next batch.
  ///
  /// \return the already-pending waiter if one exists; otherwise a finished
  ///         future (and one batch is counted as released) when the allowance
  ///         permits; otherwise a new unfinished future that is completed by
  ///         the next ReleaseOneBatch()/ReleaseAllBatches() call.
  Future<> WaitForNextReleasedBatch() {
    std::lock_guard lg(mutex_);
    if (current_waiter_.is_valid()) {
      // Someone is already waiting; share the same future.
      return current_waiter_;
    }
    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
      num_released_batches_++;
      return Future<>::MakeFinished();
    }

    current_waiter_ = Future<>::Make();
    return current_waiter_;
  }

 private:
  /// Complete and clear the pending waiter, if any. Callers hold `mutex_`.
  void NotifyAll() {
    if (current_waiter_.is_valid()) {
      // Reset the member before finishing so a later wait creates a new future.
      Future<> to_unlock = current_waiter_;
      current_waiter_ = {};
      to_unlock.MarkFinished();
    }
  }

  // Future handed out to callers blocked on the gate; invalid when nobody waits.
  Future<> current_waiter_;
  // Number of batches already granted by WaitForNextReleasedBatch().
  int num_released_batches_ = 0;
  // Number of batches permitted so far; negative means "no limit".
  int num_allowed_batches_ = 0;
  std::mutex mutex_;
};

struct BackpressureDelayingNode : public MapNode {
struct GatedNodeOptions : public ExecNodeOptions {
explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
Gate* gate;
};

struct GatedNode : public ExecNode, public TracedNode {
static constexpr auto kKindName = "BackpressureDelayingNode";
static constexpr const char* kFactoryName = "backpressure_delay";

static void Register() {
auto exec_reg = default_exec_factory_registry();
if (!exec_reg->GetFactory(kFactoryName).ok()) {
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make));
ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make));
}
}

BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
const BackpressureDelayingNodeOptions& options)
: MapNode(plan, inputs, output_schema), gate(options.gate) {}
GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options)
: ExecNode(plan, inputs, {"input"}, output_schema),
TracedNode(this),
gate_(options.gate) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options);
return plan->EmplaceNode<BackpressureDelayingNode>(
plan, inputs, inputs[0]->output_schema(), bp_options);
auto gated_node_opts = static_cast<const GatedNodeOptions&>(options);
return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(),
gated_node_opts);
}

const char* kind_name() const override { return kKindName; }
Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
while (!gate()) {
SleepABit();

const Ordering& ordering() const override { return inputs_[0]->ordering(); }
Status InputFinished(ExecNode* input, int total_batches) override {
return output_->InputFinished(this, total_batches);
}
Status StartProducing() override {
NoteStartProducing(ToStringExtra());
return Status::OK();
}

void PauseProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->PauseProducing(this, counter);
}

void ResumeProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->ResumeProducing(this, counter);
}

Status StopProducingImpl() override { return Status::OK(); }

Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) {
while (!queued_batches_.empty()) {
// If we are ready to release the batch, do so immediately.
Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
bool callback_added = maybe_unlocked.TryAddCallback([this] {
return [this](const Status& st) {
DCHECK_OK(st);
plan_->query_context()->ScheduleTask(
[this] {
std::unique_lock lk(mutex_);
return SendBatchesUnlocked(std::move(lk));
},
"GatedNode::ResumeAfterNotify");
};
});
if (callback_added) {
break;
}
// Otherwise, the future is already finished which means the gate is unlocked
// and we are allowed to send a batch
ExecBatch next = std::move(queued_batches_.front());
queued_batches_.pop();
lock.unlock();
ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next)));
lock.lock();
}
return batch;
return Status::OK();
}

Status InputReceived(ExecNode* input, ExecBatch batch) override {
auto scope = TraceInputReceived(batch);
DCHECK_EQ(input, inputs_[0]);

// This may be called concurrently by the source and by a restart attempt. Process
// one at a time (this critical section should be pretty small)
std::unique_lock lk(mutex_);
queued_batches_.push(std::move(batch));

return SendBatchesUnlocked(std::move(lk));
}

std::function<bool()> gate;
Gate* gate_;
std::queue<ExecBatch> queued_batches_;
std::mutex mutex_;
};

// Identity overload: the input is already an async generator of batches, so it
// is returned as-is. Together with the BatchesWithSchema overload this lets
// callers obtain a generator from either kind of batch source with one call.
AsyncGenerator<std::optional<ExecBatch>> GetGen(
AsyncGenerator<std::optional<ExecBatch>> gen) {
return gen;
}
// Overload for materialized batches: asks the BatchesWithSchema for a generator
// over its batches.
// NOTE(review): the two `false` arguments are presumably the `parallel` and
// `slow` flags of BatchesWithSchema::gen -- confirm against its declaration.
AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
return bws.gen(false, false);
}

template <typename BatchesMaker>
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
double fast_delay, double slow_delay, bool noisy = false) {
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
Expand All @@ -1474,22 +1588,27 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

BackpressureCountingNode::Register();
BackpressureDelayingNode::Register();
GatedNode::Register();

struct BackpressureSourceConfig {
std::string name_prefix;
bool is_fast;
bool is_gated;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;

std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
std::string name() const {
return name_prefix + ";" + (is_gated ? "gated" : "ungated");
}
};

// must have at least one fast and one slow
Gate gate;
GatedNodeOptions gate_options(&gate);

// Two ungated and one gated
std::vector<BackpressureSourceConfig> source_configs = {
{"0", true, l_schema, l_batches},
{"1", false, r0_schema, r0_batches},
{"2", true, r1_schema, r1_batches},
{"0", false, l_schema, l_batches},
{"1", true, r0_schema, r0_batches},
{"2", false, r1_schema, r1_batches},
};

std::vector<BackpressureCounters> bp_counters(source_configs.size());
Expand All @@ -1498,59 +1617,55 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];
src_decls.emplace_back(
"source", SourceNodeOptions(
config.schema,
MakeDelayedGen(config.batches, config.name(),
config.is_fast ? fast_delay : slow_delay, noisy)));

src_decls.emplace_back("source",
SourceNodeOptions(config.schema, GetGen(config.batches)));
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
std::vector<Declaration::Input> bp_in = {src_decls.back()};
Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
std::move(options)};
if (config.is_gated) {
bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
}
bp_decls.push_back(bp_decl);
}

Declaration asofjoin = {
"asofjoin", bp_decls,
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)};
Declaration asofjoin = {"asofjoin", bp_decls,
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};

ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
internal::ThreadPool::Make(1));
ExecContext exec_ctx(default_memory_pool(), tpool.get());
Future<BatchesWithCommonSchema> batches_fut =
DeclarationToExecBatchesAsync(asofjoin, exec_ctx);

BackpressureDelayingNodeOptions delay_options([&bp_counters]() {
auto has_bp_been_applied = [&] {
int total_paused = 0;
for (const auto& counters : bp_counters) {
if (counters.pause_count > 0 || counters.resume_count > 0) {
return true;
}
total_paused += counters.pause_count;
}
return false;
});
Declaration delaying = {
BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options};

ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
DeclarationToReader(delaying, /*use_threads=*/false));

int64_t total_length = 0;
for (;;) {
ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
if (!batch) {
break;
}
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
// One of the inputs is gated. The other two will eventually be paused by the asof
// join node
return total_paused >= 2;
};

BusyWait(10.0, has_bp_been_applied);
ASSERT_TRUE(has_bp_been_applied());

gate.ReleaseAllBatches();
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);

size_t total_count = 0;
size_t total_resumed = 0;
for (const auto& counters : bp_counters) {
total_count += counters.pause_count;
total_count += counters.resume_count;
total_resumed += counters.resume_count;
}
ASSERT_GT(total_count, 0);
ASSERT_GE(total_resumed, 2);
}

// Runs the gated backpressure scenario with MakeIntegerBatches as the batch
// maker. TestBackpressure no longer takes delay/noise arguments, since the
// gate controls the timing.
TEST(AsofJoinTest, BackpressureWithBatches) {
  return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
}

template <typename BatchesMaker>
Expand Down Expand Up @@ -1615,8 +1730,7 @@ T GetEnvValue(const std::string& var, T default_value) {
// Runs the gated backpressure scenario with MakeIntegerBatchGenForTest as the
// batch maker. The batch count and batch size come from GetEnvValue with the
// ARROW_BACKPRESSURE_DEMO_* names, using defaults of 20 and 1.
TEST(AsofJoinTest, BackpressureWithBatchesGen) {
  const int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
  const int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
  return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
}

} // namespace acero
Expand Down

0 comments on commit 4ecd7ed

Please sign in to comment.