-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert the delaying node to a gated node to demonstrate my original idea #4
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,12 @@ | |
|
||
#include <chrono> | ||
#include <memory> | ||
#include <mutex> | ||
#include <numeric> | ||
#include <random> | ||
#include <string_view> | ||
#include "arrow/acero/exec_plan.h" | ||
#include "arrow/testing/future_util.h" | ||
#ifndef NDEBUG | ||
#include <sstream> | ||
#endif | ||
|
@@ -32,6 +35,7 @@ | |
#include "arrow/acero/options_internal.h" | ||
#endif | ||
#include "arrow/acero/map_node.h" | ||
#include "arrow/acero/query_context.h" | ||
#include "arrow/acero/test_nodes.h" | ||
#include "arrow/acero/test_util_internal.h" | ||
#include "arrow/acero/util.h" | ||
|
@@ -44,6 +48,7 @@ | |
#include "arrow/testing/random.h" | ||
#include "arrow/util/checked_cast.h" | ||
#include "arrow/util/thread_pool.h" | ||
#include "arrow/util/tracing_internal.h" | ||
|
||
#define TRACED_TEST(t_class, t_name, t_body) \ | ||
TEST(t_class, t_name) { \ | ||
|
@@ -1411,50 +1416,159 @@ struct BackpressureCountingNode : public MapNode { | |
BackpressureCounters* counters; | ||
}; | ||
|
||
struct BackpressureDelayingNodeOptions : public ExecNodeOptions { | ||
explicit BackpressureDelayingNodeOptions(std::function<bool()> gate) : gate(gate) {} | ||
class Gate { | ||
public: | ||
void ReleaseAllBatches() { | ||
std::lock_guard lg(mutex_); | ||
num_allowed_batches_ = -1; | ||
NotifyAll(); | ||
} | ||
|
||
void ReleaseOneBatch() { | ||
std::lock_guard lg(mutex_); | ||
DCHECK_GE(num_allowed_batches_, 0) | ||
<< "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()"; | ||
num_allowed_batches_++; | ||
NotifyAll(); | ||
} | ||
|
||
Future<> WaitForNextReleasedBatch() { | ||
std::lock_guard lg(mutex_); | ||
if (current_waiter_.is_valid()) { | ||
return current_waiter_; | ||
} | ||
Future<> fut; | ||
if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) { | ||
num_released_batches_++; | ||
return Future<>::MakeFinished(); | ||
} | ||
|
||
current_waiter_ = Future<>::Make(); | ||
return current_waiter_; | ||
} | ||
|
||
private: | ||
void NotifyAll() { | ||
if (current_waiter_.is_valid()) { | ||
Future<> to_unlock = current_waiter_; | ||
current_waiter_ = {}; | ||
to_unlock.MarkFinished(); | ||
} | ||
} | ||
|
||
std::function<bool()> gate; | ||
Future<> current_waiter_; | ||
int num_released_batches_ = 0; | ||
int num_allowed_batches_ = 0; | ||
std::mutex mutex_; | ||
}; | ||
|
||
struct BackpressureDelayingNode : public MapNode { | ||
struct GatedNodeOptions : public ExecNodeOptions { | ||
explicit GatedNodeOptions(Gate* gate) : gate(gate) {} | ||
Gate* gate; | ||
}; | ||
|
||
struct GatedNode : public ExecNode, public TracedNode { | ||
static constexpr auto kKindName = "BackpressureDelayingNode"; | ||
static constexpr const char* kFactoryName = "backpressure_delay"; | ||
|
||
static void Register() { | ||
auto exec_reg = default_exec_factory_registry(); | ||
if (!exec_reg->GetFactory(kFactoryName).ok()) { | ||
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make)); | ||
ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make)); | ||
} | ||
} | ||
|
||
BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs, | ||
std::shared_ptr<Schema> output_schema, | ||
const BackpressureDelayingNodeOptions& options) | ||
: MapNode(plan, inputs, output_schema), gate(options.gate) {} | ||
GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs, | ||
std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options) | ||
: ExecNode(plan, inputs, {"input"}, output_schema), | ||
TracedNode(this), | ||
gate_(options.gate) {} | ||
|
||
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs, | ||
const ExecNodeOptions& options) { | ||
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); | ||
auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options); | ||
return plan->EmplaceNode<BackpressureDelayingNode>( | ||
plan, inputs, inputs[0]->output_schema(), bp_options); | ||
auto gated_node_opts = static_cast<const GatedNodeOptions&>(options); | ||
return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(), | ||
gated_node_opts); | ||
} | ||
|
||
const char* kind_name() const override { return kKindName; } | ||
Result<ExecBatch> ProcessBatch(ExecBatch batch) override { | ||
while (!gate()) { | ||
SleepABit(); | ||
|
||
const Ordering& ordering() const override { return inputs_[0]->ordering(); } | ||
Status InputFinished(ExecNode* input, int total_batches) override { | ||
return output_->InputFinished(this, total_batches); | ||
} | ||
Status StartProducing() override { | ||
NoteStartProducing(ToStringExtra()); | ||
return Status::OK(); | ||
} | ||
|
||
void PauseProducing(ExecNode* output, int32_t counter) override { | ||
inputs_[0]->PauseProducing(this, counter); | ||
} | ||
|
||
void ResumeProducing(ExecNode* output, int32_t counter) override { | ||
inputs_[0]->ResumeProducing(this, counter); | ||
} | ||
|
||
Status StopProducingImpl() override { return Status::OK(); } | ||
|
||
Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) { | ||
while (!queued_batches_.empty()) { | ||
// If we are ready to release the batch, do so immediately. | ||
Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); | ||
bool callback_added = maybe_unlocked.TryAddCallback([this] { | ||
return [this](const Status& st) { | ||
DCHECK_OK(st); | ||
plan_->query_context()->ScheduleTask( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reminds me of a (not so related) question I had. In asof join node, should we call
https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#LL1356C21-L1356C67 And in general, when to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, I noticed that as well. You probably should. That being said, it probably won't make too much difference since you're running everything single threaded anyways. However, even that isn't really true. Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream. If you use schedule task then you'd just be transferring more work to the other thread so it might actually hurt. So I was going to wait and make this suggestion when / if you add multithreading support to asof-join.
You should call it directly if you are going to keep processing the data that is in the current thread's CPU cache. You should schedule a new task if you are going to start processing a batch of data that isn't in the current thread's CPU cache. This case fits the second condition. However, this case is also a special case anyways. This is because the thread that executes that callback will be the unit test thread (as part of the call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
A bit more accurately, this "thread-splitting" occurs for each as-of-join node in the plan. We could create an issue to replace the internal-as-of-join-thread with some kind of execution facility. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah that has been something that I have slight concern over. I don't love the fact that downstream works, e.g., projections happens outside the serial execution thread. I was trying to get to the model where the asof join processing thread pulls data from the (buffered) asof join input queues, do the join and send the output to the downstream via the scheduler. So from downstream node point of view, the asof join processing thread is transparent to any other node and purely something internal to the asof join node. It doesn't really bring any performance benefit I think (like you mentioned, it might actually make it run slower because more work is shifted to the scheduler thread), but I do like the simpler execution model of it (processing thread being transparent). And I think if we want to do that, we can do this by calling |
||
[this] { | ||
std::unique_lock lk(mutex_); | ||
return SendBatchesUnlocked(std::move(lk)); | ||
}, | ||
"GatedNode::ResumeAfterNotify"); | ||
}; | ||
}); | ||
if (callback_added) { | ||
break; | ||
} | ||
// Otherwise, the future is already finished which means the gate is unlocked | ||
// and we are allowed to send a batch | ||
ExecBatch next = std::move(queued_batches_.front()); | ||
queued_batches_.pop(); | ||
lock.unlock(); | ||
ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); | ||
lock.lock(); | ||
} | ||
return batch; | ||
return Status::OK(); | ||
} | ||
|
||
Status InputReceived(ExecNode* input, ExecBatch batch) override { | ||
auto scope = TraceInputReceived(batch); | ||
DCHECK_EQ(input, inputs_[0]); | ||
|
||
// This may be called concurrently by the source and by a restart attempt. Process | ||
// one at a time (this critical section should be pretty small) | ||
std::unique_lock lk(mutex_); | ||
queued_batches_.push(std::move(batch)); | ||
|
||
return SendBatchesUnlocked(std::move(lk)); | ||
} | ||
|
||
std::function<bool()> gate; | ||
Gate* gate_; | ||
std::queue<ExecBatch> queued_batches_; | ||
std::mutex mutex_; | ||
}; | ||
|
||
AsyncGenerator<std::optional<ExecBatch>> GetGen( | ||
AsyncGenerator<std::optional<ExecBatch>> gen) { | ||
return gen; | ||
} | ||
AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) { | ||
return bws.gen(false, false); | ||
} | ||
|
||
template <typename BatchesMaker> | ||
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, | ||
double fast_delay, double slow_delay, bool noisy = false) { | ||
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { | ||
auto l_schema = | ||
schema({field("time", int32()), field("key", int32()), field("l_value", int32())}); | ||
auto r0_schema = | ||
|
@@ -1474,22 +1588,27 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, | |
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); | ||
|
||
BackpressureCountingNode::Register(); | ||
BackpressureDelayingNode::Register(); | ||
GatedNode::Register(); | ||
|
||
struct BackpressureSourceConfig { | ||
std::string name_prefix; | ||
bool is_fast; | ||
bool is_gated; | ||
std::shared_ptr<Schema> schema; | ||
decltype(l_batches) batches; | ||
|
||
std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); } | ||
std::string name() const { | ||
return name_prefix + ";" + (is_gated ? "gated" : "ungated"); | ||
} | ||
}; | ||
|
||
// must have at least one fast and one slow | ||
Gate gate; | ||
GatedNodeOptions gate_options(&gate); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. I must be missing something or getting lucky with timing |
||
|
||
// Two ungated and one gated | ||
std::vector<BackpressureSourceConfig> source_configs = { | ||
{"0", true, l_schema, l_batches}, | ||
{"1", false, r0_schema, r0_batches}, | ||
{"2", true, r1_schema, r1_batches}, | ||
{"0", false, l_schema, l_batches}, | ||
{"1", true, r0_schema, r0_batches}, | ||
{"2", false, r1_schema, r1_batches}, | ||
}; | ||
|
||
std::vector<BackpressureCounters> bp_counters(source_configs.size()); | ||
|
@@ -1498,59 +1617,55 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, | |
std::vector<Declaration::Input> bp_decls; | ||
for (size_t i = 0; i < source_configs.size(); i++) { | ||
const auto& config = source_configs[i]; | ||
src_decls.emplace_back( | ||
"source", SourceNodeOptions( | ||
config.schema, | ||
MakeDelayedGen(config.batches, config.name(), | ||
config.is_fast ? fast_delay : slow_delay, noisy))); | ||
|
||
src_decls.emplace_back("source", | ||
SourceNodeOptions(config.schema, GetGen(config.batches))); | ||
bp_options.push_back( | ||
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i])); | ||
std::shared_ptr<ExecNodeOptions> options = bp_options.back(); | ||
std::vector<Declaration::Input> bp_in = {src_decls.back()}; | ||
Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, | ||
std::move(options)}; | ||
if (config.is_gated) { | ||
bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options}; | ||
} | ||
bp_decls.push_back(bp_decl); | ||
} | ||
|
||
Declaration asofjoin = { | ||
"asofjoin", bp_decls, | ||
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)}; | ||
Declaration asofjoin = {"asofjoin", bp_decls, | ||
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)}; | ||
|
||
ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool, | ||
internal::ThreadPool::Make(1)); | ||
ExecContext exec_ctx(default_memory_pool(), tpool.get()); | ||
Future<BatchesWithCommonSchema> batches_fut = | ||
DeclarationToExecBatchesAsync(asofjoin, exec_ctx); | ||
|
||
BackpressureDelayingNodeOptions delay_options([&bp_counters]() { | ||
auto has_bp_been_applied = [&] { | ||
int total_paused = 0; | ||
for (const auto& counters : bp_counters) { | ||
if (counters.pause_count > 0 || counters.resume_count > 0) { | ||
return true; | ||
} | ||
total_paused += counters.pause_count; | ||
} | ||
return false; | ||
}); | ||
Declaration delaying = { | ||
BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options}; | ||
|
||
ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader, | ||
DeclarationToReader(delaying, /*use_threads=*/false)); | ||
|
||
int64_t total_length = 0; | ||
for (;;) { | ||
ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next()); | ||
if (!batch) { | ||
break; | ||
} | ||
total_length += batch->num_rows(); | ||
} | ||
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length); | ||
// One of the inputs is gated. The other two will eventually be paused by the asof | ||
// join node | ||
return total_paused >= 2; | ||
}; | ||
|
||
BusyWait(10.0, has_bp_been_applied); | ||
ASSERT_TRUE(has_bp_been_applied()); | ||
|
||
gate.ReleaseAllBatches(); | ||
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); | ||
|
||
size_t total_count = 0; | ||
size_t total_resumed = 0; | ||
for (const auto& counters : bp_counters) { | ||
total_count += counters.pause_count; | ||
total_count += counters.resume_count; | ||
total_resumed += counters.resume_count; | ||
} | ||
ASSERT_GT(total_count, 0); | ||
ASSERT_GE(total_resumed, 2); | ||
} | ||
|
||
TEST(AsofJoinTest, BackpressureWithBatches) { | ||
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1, | ||
/*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false); | ||
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); | ||
} | ||
|
||
template <typename BatchesMaker> | ||
|
@@ -1615,8 +1730,7 @@ T GetEnvValue(const std::string& var, T default_value) { | |
TEST(AsofJoinTest, BackpressureWithBatchesGen) { | ||
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); | ||
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); | ||
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size, | ||
/*fast_delay=*/0.001, /*slow_delay=*/0.01); | ||
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size); | ||
} | ||
|
||
} // namespace acero | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change these constants to "GatedNode" and "gated".