From 0cf8c4724c101bbbf126db05544c9b651f9f288c Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 7 Jun 2023 05:58:12 -0700
Subject: [PATCH 1/2] Convert the delaying node to a gated node to demonstrate
 my original idea

---
 cpp/src/arrow/acero/asof_join_node_test.cc | 180 +++++++++++++++------
 1 file changed, 134 insertions(+), 46 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index 7af1379d5a26b..03d28aa2c518e 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -19,9 +19,12 @@
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
+#include "arrow/acero/exec_plan.h"
+#include "arrow/testing/future_util.h"
 #ifndef NDEBUG
 #include 
 #endif
@@ -32,6 +35,7 @@
 #include "arrow/acero/options_internal.h"
 #endif
 #include "arrow/acero/map_node.h"
+#include "arrow/acero/query_context.h"
 #include "arrow/acero/test_nodes.h"
 #include "arrow/acero/test_util_internal.h"
 #include "arrow/acero/util.h"
@@ -44,6 +48,7 @@
 #include "arrow/testing/random.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
 
 #define TRACED_TEST(t_class, t_name, t_body) \
   TEST(t_class, t_name) {                    \
@@ -1411,47 +1416,135 @@ struct BackpressureCountingNode : public MapNode {
   BackpressureCounters* counters;
 };
 
-struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
-  explicit BackpressureDelayingNodeOptions(std::function<bool()> gate) : gate(gate) {}
+class Gate {
+ public:
+  void ReleaseAllBatches() {
+    std::lock_guard<std::mutex> lg(mutex_);
+    num_allowed_batches_ = -1;
+    NotifyAll();
+  }
+
+  void ReleaseOneBatch() {
+    std::lock_guard<std::mutex> lg(mutex_);
+    DCHECK_GE(num_allowed_batches_, 0)
+        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
+    num_allowed_batches_++;
+    NotifyAll();
+  }
+
+  Future<> WaitForNextReleasedBatch() {
+    std::lock_guard<std::mutex> lg(mutex_);
+    if (current_waiter_.is_valid()) {
+      return current_waiter_;
+    }
+    Future<> fut;
+    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
+      num_released_batches_++;
+      return Future<>::MakeFinished();
+    }
+
+    current_waiter_ = Future<>::Make();
+    return current_waiter_;
+  }
+
+ private:
+  void NotifyAll() {
+    if (current_waiter_.is_valid()) {
+      Future<> to_unlock = current_waiter_;
+      current_waiter_ = {};
+      to_unlock.MarkFinished();
+    }
+  }
 
-  std::function<bool()> gate;
+  Future<> current_waiter_;
+  int num_released_batches_ = 0;
+  int num_allowed_batches_ = 0;
+  std::mutex mutex_;
 };
 
-struct BackpressureDelayingNode : public MapNode {
+struct GatedNodeOptions : public ExecNodeOptions {
+  explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
+  Gate* gate;
+};
+
+struct GatedNode : public ExecNode, public TracedNode {
   static constexpr auto kKindName = "BackpressureDelayingNode";
   static constexpr const char* kFactoryName = "backpressure_delay";
 
   static void Register() {
     auto exec_reg = default_exec_factory_registry();
     if (!exec_reg->GetFactory(kFactoryName).ok()) {
-      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make));
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make));
     }
   }
 
-  BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                           std::shared_ptr<Schema> output_schema,
-                           const BackpressureDelayingNodeOptions& options)
-      : MapNode(plan, inputs, output_schema), gate(options.gate) {}
+  GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+            std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options)
+      : ExecNode(plan, inputs, {"input"}, output_schema),
+        TracedNode(this),
+        gate_(options.gate) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
-    auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options);
-    return plan->EmplaceNode<BackpressureDelayingNode>(
-        plan, inputs, inputs[0]->output_schema(), bp_options);
+    auto gated_node_opts = static_cast<const GatedNodeOptions&>(options);
+    return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(),
+                                        gated_node_opts);
   }
 
   const char* kind_name() const override { return kKindName; }
-  Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
-    while (!gate()) {
-      SleepABit();
+
+  const Ordering& ordering() const override { return inputs_[0]->ordering(); }
+  Status InputFinished(ExecNode* input, int total_batches) override {
+    return output_->InputFinished(this, total_batches);
+  }
+  Status StartProducing() override {
+    NoteStartProducing(ToStringExtra());
+    return Status::OK();
+  }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->PauseProducing(this, counter);
+  }
+
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->ResumeProducing(this, counter);
+  }
+
+  Status StopProducingImpl() override { return Status::OK(); }
+
+  Status InputReceived(ExecNode* input, ExecBatch batch) override {
+    auto scope = TraceInputReceived(batch);
+    DCHECK_EQ(input, inputs_[0]);
+
+    // If we are ready to release the batch, do so immediately.
+    Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
+    if (maybe_unlocked.is_finished()) {
+      return output_->InputReceived(this, std::move(batch));
     }
-    return batch;
+
+    // Otherwise, we will wait for the gate to notify us and then check if we are
+    // ready to release a batch again.
+    maybe_unlocked.AddCallback([this, input, batch](const Status& st) {
+      DCHECK_OK(st);
+      plan_->query_context()->ScheduleTask(
+          [this, input, batch] { return InputReceived(input, batch); },
+          "GatedNode::ResumeAfterNotify");
+    });
+    return Status::OK();
   }
 
-  std::function<bool()> gate;
+  Gate* gate_;
 };
 
+AsyncGenerator<std::optional<ExecBatch>> GetGen(
+    AsyncGenerator<std::optional<ExecBatch>> gen) {
+  return gen;
+}
+AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
+  return bws.gen(false, false);
+}
+
 template <typename BatchesMaker>
 void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
                       double fast_delay, double slow_delay, bool noisy = false) {
@@ -1474,7 +1567,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
   BackpressureCountingNode::Register();
-  BackpressureDelayingNode::Register();
+  GatedNode::Register();
 
   struct BackpressureSourceConfig {
     std::string name_prefix;
@@ -1485,6 +1578,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
   };
 
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
   // must have at least one fast and one slow
   std::vector<BackpressureSourceConfig> source_configs = {
       {"0", true, l_schema, l_batches},
@@ -1498,11 +1594,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   std::vector<Declaration> bp_decls;
   for (size_t i = 0; i < source_configs.size(); i++) {
     const auto& config = source_configs[i];
-    src_decls.emplace_back(
-        "source", SourceNodeOptions(
-                      config.schema,
-                      MakeDelayedGen(config.batches, config.name(),
-                                     config.is_fast ? fast_delay : slow_delay, noisy)));
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
     bp_options.push_back(
         std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
     std::shared_ptr<ExecNodeOptions> options = bp_options.back();
@@ -1516,36 +1610,30 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
       "asofjoin", bp_decls,
       GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)};
 
-  BackpressureDelayingNodeOptions delay_options([&bp_counters]() {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  BusyWait(10.0, [&] {
+    int total_paused = 0;
     for (const auto& counters : bp_counters) {
-      if (counters.pause_count > 0 || counters.resume_count > 0) {
-        return true;
-      }
+      total_paused += counters.pause_count;
     }
-    return false;
+    // One of the inputs is gated. The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;
   });
 
-  Declaration delaying = {
-      BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options};
-
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(delaying, /*use_threads=*/false));
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
-    }
-    total_length += batch->num_rows();
-  }
-  ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+  gate.ReleaseAllBatches();
+  ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
 
-  size_t total_count = 0;
+  size_t total_resumed = 0;
   for (const auto& counters : bp_counters) {
-    total_count += counters.pause_count;
-    total_count += counters.resume_count;
+    total_resumed += counters.resume_count;
   }
-  ASSERT_GT(total_count, 0);
+  ASSERT_GE(total_resumed, 2);
 }
 
 TEST(AsofJoinTest, BackpressureWithBatches) {

From 1edc44638175cee358333032e68aa27fdeb783b7 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 14 Jun 2023 06:34:20 -0700
Subject: [PATCH 2/2] Fix the test to actually use the gated node. Fix the
 gated node so that it delivers batches in order.

---
 cpp/src/arrow/acero/asof_join_node_test.cc | 88 ++++++++++++++--------
 1 file changed, 57 insertions(+), 31 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index 03d28aa2c518e..e264bf815112b 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -1513,28 +1513,50 @@ struct GatedNode : public ExecNode, public TracedNode {
 
   Status StopProducingImpl() override { return Status::OK(); }
 
+  Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) {
+    while (!queued_batches_.empty()) {
+      // If we are ready to release the batch, do so immediately.
+      Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
+      bool callback_added = maybe_unlocked.TryAddCallback([this] {
+        return [this](const Status& st) {
+          DCHECK_OK(st);
+          plan_->query_context()->ScheduleTask(
+              [this] {
+                std::unique_lock lk(mutex_);
+                return SendBatchesUnlocked(std::move(lk));
+              },
+              "GatedNode::ResumeAfterNotify");
+        };
+      });
+      if (callback_added) {
+        break;
+      }
+      // Otherwise, the future is already finished which means the gate is unlocked
+      // and we are allowed to send a batch
+      ExecBatch next = std::move(queued_batches_.front());
+      queued_batches_.pop();
+      lock.unlock();
+      ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next)));
+      lock.lock();
+    }
+    return Status::OK();
+  }
+
   Status InputReceived(ExecNode* input, ExecBatch batch) override {
     auto scope = TraceInputReceived(batch);
     DCHECK_EQ(input, inputs_[0]);
 
-    // If we are ready to release the batch, do so immediately.
-    Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
-    if (maybe_unlocked.is_finished()) {
-      return output_->InputReceived(this, std::move(batch));
-    }
+    // This may be called concurrently by the source and by a restart attempt. Process
+    // one at a time (this critical section should be pretty small)
+    std::unique_lock lk(mutex_);
+    queued_batches_.push(std::move(batch));
 
-    // Otherwise, we will wait for the gate to notify us and then check if we are
-    // ready to release a batch again.
-    maybe_unlocked.AddCallback([this, input, batch](const Status& st) {
-      DCHECK_OK(st);
-      plan_->query_context()->ScheduleTask(
-          [this, input, batch] { return InputReceived(input, batch); },
-          "GatedNode::ResumeAfterNotify");
-    });
-    return Status::OK();
+    return SendBatchesUnlocked(std::move(lk));
   }
 
   Gate* gate_;
+  std::queue<ExecBatch> queued_batches_;
+  std::mutex mutex_;
 };
 
 AsyncGenerator<std::optional<ExecBatch>> GetGen(
@@ -1546,8 +1568,7 @@ AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
 }
 
 template <typename BatchesMaker>
-void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
-                      double fast_delay, double slow_delay, bool noisy = false) {
+void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
   auto l_schema =
       schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
   auto r0_schema =
@@ -1571,21 +1592,23 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
 
   struct BackpressureSourceConfig {
     std::string name_prefix;
-    bool is_fast;
+    bool is_gated;
     std::shared_ptr<Schema> schema;
     decltype(l_batches) batches;
 
-    std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
+    std::string name() const {
"gated" : "ungated"); + } }; Gate gate; GatedNodeOptions gate_options(&gate); - // must have at least one fast and one slow + // Two ungated and one gated std::vector source_configs = { - {"0", true, l_schema, l_batches}, - {"1", false, r0_schema, r0_batches}, - {"2", true, r1_schema, r1_batches}, + {"0", false, l_schema, l_batches}, + {"1", true, r0_schema, r0_batches}, + {"2", false, r1_schema, r1_batches}, }; std::vector bp_counters(source_configs.size()); @@ -1603,12 +1626,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, std::vector bp_in = {src_decls.back()}; Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, std::move(options)}; + if (config.is_gated) { + bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options}; + } bp_decls.push_back(bp_decl); } - Declaration asofjoin = { - "asofjoin", bp_decls, - GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; + Declaration asofjoin = {"asofjoin", bp_decls, + GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)}; ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, internal::ThreadPool::Make(1)); @@ -1616,7 +1641,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, Future batches_fut = DeclarationToExecBatchesAsync(asofjoin, exec_ctx); - BusyWait(10.0, [&] { + auto has_bp_been_applied = [&] { int total_paused = 0; for (const auto& counters : bp_counters) { total_paused += counters.pause_count; @@ -1624,7 +1649,10 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, // One of the inputs is gated. The other two will eventually be paused by the asof // join node return total_paused >= 2; - }); + }; + + BusyWait(10.0, has_bp_been_applied); + ASSERT_TRUE(has_bp_been_applied()); gate.ReleaseAllBatches(); ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); @@ -1637,8 +1665,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1, - /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false); + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); } template @@ -1703,8 +1730,7 @@ T GetEnvValue(const std::string& var, T default_value) { TEST(AsofJoinTest, BackpressureWithBatchesGen) { int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); - return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size, - /*fast_delay=*/0.001, /*slow_delay=*/0.01); + return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size); } } // namespace acero