Convert the delaying node to a gated node to demonstrate my original idea #4

Merged — 2 commits, Jun 14, 2023

Changes from all commits
240 changes: 177 additions & 63 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -19,9 +19,12 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <string_view>
#include "arrow/acero/exec_plan.h"
#include "arrow/testing/future_util.h"
#ifndef NDEBUG
#include <sstream>
#endif
@@ -32,6 +35,7 @@
#include "arrow/acero/options_internal.h"
#endif
#include "arrow/acero/map_node.h"
#include "arrow/acero/query_context.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
@@ -44,6 +48,7 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

#define TRACED_TEST(t_class, t_name, t_body) \
TEST(t_class, t_name) { \
@@ -1411,50 +1416,159 @@ struct BackpressureCountingNode : public MapNode {
  BackpressureCounters* counters;
};

class Gate {
 public:
  void ReleaseAllBatches() {
    std::lock_guard lg(mutex_);
    num_allowed_batches_ = -1;
    NotifyAll();
  }

  void ReleaseOneBatch() {
    std::lock_guard lg(mutex_);
    DCHECK_GE(num_allowed_batches_, 0)
        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
    num_allowed_batches_++;
    NotifyAll();
  }

  Future<> WaitForNextReleasedBatch() {
    std::lock_guard lg(mutex_);
    if (current_waiter_.is_valid()) {
      return current_waiter_;
    }
    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
      num_released_batches_++;
      return Future<>::MakeFinished();
    }

    current_waiter_ = Future<>::Make();
    return current_waiter_;
  }

 private:
  void NotifyAll() {
    if (current_waiter_.is_valid()) {
      Future<> to_unlock = current_waiter_;
      current_waiter_ = {};
      to_unlock.MarkFinished();
    }
  }

  Future<> current_waiter_;
  int num_released_batches_ = 0;
  int num_allowed_batches_ = 0;
  std::mutex mutex_;
};

struct GatedNodeOptions : public ExecNodeOptions {
  explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
  Gate* gate;
};

struct GatedNode : public ExecNode, public TracedNode {
  static constexpr auto kKindName = "BackpressureDelayingNode";
  static constexpr const char* kFactoryName = "backpressure_delay";
Comment on lines 1471 to 1472

Owner:

Change these constants to "GatedNode" and "gated".
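A sketch of the requested change (not part of the PR as shown; assumes no other code still references the old strings):

```cpp
static constexpr auto kKindName = "GatedNode";
static constexpr const char* kFactoryName = "gated";
```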


  static void Register() {
    auto exec_reg = default_exec_factory_registry();
    if (!exec_reg->GetFactory(kFactoryName).ok()) {
      ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make));
    }
  }

  GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
            std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options)
      : ExecNode(plan, inputs, {"input"}, output_schema),
        TracedNode(this),
        gate_(options.gate) {}

  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                const ExecNodeOptions& options) {
    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
    auto gated_node_opts = static_cast<const GatedNodeOptions&>(options);
    return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(),
                                        gated_node_opts);
  }

  const char* kind_name() const override { return kKindName; }

  const Ordering& ordering() const override { return inputs_[0]->ordering(); }

  Status InputFinished(ExecNode* input, int total_batches) override {
    return output_->InputFinished(this, total_batches);
  }

  Status StartProducing() override {
    NoteStartProducing(ToStringExtra());
    return Status::OK();
  }

  void PauseProducing(ExecNode* output, int32_t counter) override {
    inputs_[0]->PauseProducing(this, counter);
  }

  void ResumeProducing(ExecNode* output, int32_t counter) override {
    inputs_[0]->ResumeProducing(this, counter);
  }

  Status StopProducingImpl() override { return Status::OK(); }

  Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) {
    while (!queued_batches_.empty()) {
      // If we are ready to release the batch, do so immediately.
      Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
      bool callback_added = maybe_unlocked.TryAddCallback([this] {
        return [this](const Status& st) {
          DCHECK_OK(st);
          plan_->query_context()->ScheduleTask(
Collaborator:

This reminds me of a (not so related) question I had. In the asof join node, should we call output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?

https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#LL1356C21-L1356C67

And in general, when should we call InputReceived directly vs. InputReceived within a ScheduleTask?

Author:

> This reminds me of a (not so related) question I had. In the asof join node, should we call output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?

Yes, I noticed that as well. You probably should. That being said, it probably won't make too much difference since you're running everything single threaded anyways. However, even that isn't really true. Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream. If you use schedule task then you'd just be transferring more work to the other thread so it might actually hurt.

So I was going to wait and make this suggestion when / if you add multithreading support to asof-join.

> And in general, when should we call InputReceived directly vs. InputReceived within a ScheduleTask?

You should call it directly if you are going to keep processing the data that is in the current thread's CPU cache. You should schedule a new task if you are going to start processing a batch of data that isn't in the current thread's CPU cache.

This case fits the second condition. However, this case is also a special case anyways. This is because the thread that executes that callback will be the unit test thread (as part of the call to ReleaseAllBatches). We definitely want to transfer to the scheduler anytime we are coming from "outside the exec plan".
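A minimal sketch of the two patterns being discussed — not code from this PR, assuming a node with the usual output_ and plan_ members; out_b and the predicate name are illustrative:

```cpp
// Inside some node's processing code, with an already-computed output batch
// out_b. The two branches are the alternatives discussed above.
if (downstream_will_reuse_hot_data) {  // hypothetical predicate, for illustration
  // Call directly: processing continues on this thread, so the downstream node
  // consumes data that is still in this thread's CPU cache.
  ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(out_b)));
} else {
  // Hand off to the scheduler: better when the batch is cold for this thread,
  // or when the call comes from "outside the exec plan" (e.g. a unit-test
  // thread releasing a gate).
  plan_->query_context()->ScheduleTask(
      [this, batch = std::move(out_b)]() mutable {
        return output_->InputReceived(this, std::move(batch));
      },
      "ExampleNode::ForwardBatch");
}
```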

Owner:

> Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.

A bit more accurately, this "thread-splitting" occurs for each as-of-join node in the plan. We could create an issue to replace the internal as-of-join thread with some kind of execution facility.

Collaborator:

> Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.

Yeah, that has been something I have a slight concern over. I don't love the fact that downstream work, e.g., projections, happens outside the serial execution thread. I was trying to get to a model where the asof join processing thread pulls data from the (buffered) asof join input queues, does the join, and sends the output to the downstream via the scheduler. So from the downstream node's point of view, the asof join processing thread is transparent to any other node and purely something internal to the asof join node.

It doesn't really bring any performance benefit I think (like you mentioned, it might actually make it run slower because more work is shifted to the scheduler thread), but I do like the simpler execution model of it (the processing thread being transparent). And I think if we want to do that, we can do it by calling output_->InputReceived(this, std::move(out_b)) within ScheduleTask. (Yaron is working on an internal benchmark suite, so once that is ready we can play with this more.)

              [this] {
                std::unique_lock lk(mutex_);
                return SendBatchesUnlocked(std::move(lk));
              },
              "GatedNode::ResumeAfterNotify");
        };
      });
      if (callback_added) {
        break;
      }
      // Otherwise, the future is already finished which means the gate is unlocked
      // and we are allowed to send a batch
      ExecBatch next = std::move(queued_batches_.front());
      queued_batches_.pop();
      lock.unlock();
      ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next)));
      lock.lock();
    }
    return Status::OK();
  }

  Status InputReceived(ExecNode* input, ExecBatch batch) override {
    auto scope = TraceInputReceived(batch);
    DCHECK_EQ(input, inputs_[0]);

    // This may be called concurrently by the source and by a restart attempt. Process
    // one at a time (this critical section should be pretty small)
    std::unique_lock lk(mutex_);
    queued_batches_.push(std::move(batch));

    return SendBatchesUnlocked(std::move(lk));
  }

  Gate* gate_;
  std::queue<ExecBatch> queued_batches_;
  std::mutex mutex_;
};

AsyncGenerator<std::optional<ExecBatch>> GetGen(
    AsyncGenerator<std::optional<ExecBatch>> gen) {
  return gen;
}
AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
  return bws.gen(false, false);
}

template <typename BatchesMaker>
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
  auto l_schema =
      schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
  auto r0_schema =
@@ -1474,22 +1588,27 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
  ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

  BackpressureCountingNode::Register();
  GatedNode::Register();

  struct BackpressureSourceConfig {
    std::string name_prefix;
    bool is_gated;
    std::shared_ptr<Schema> schema;
    decltype(l_batches) batches;

    std::string name() const {
      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
    }
  };

  Gate gate;
  GatedNodeOptions gate_options(&gate);
Owner:

I don't see gate_options used anywhere. @westonpace, are you sure the tester is doing what you intended?

Author:

Good catch. I must be missing something or getting lucky with timing.
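(For context, a sketch — not from the PR — of how the options are meant to be wired so the gate actually blocks a source; the final diff does this in the config.is_gated branch below. src_decl is a stand-in for one of the test's source declarations.)

```cpp
// Gate and options shared by the test (as in the diff above).
Gate gate;
GatedNodeOptions gate_options(&gate);

// Wrap a source declaration in a GatedNode: its batches queue up inside the
// node until the test calls gate.ReleaseOneBatch() or gate.ReleaseAllBatches().
Declaration gated = {GatedNode::kFactoryName, {src_decl}, gate_options};
```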


  // Two ungated and one gated
  std::vector<BackpressureSourceConfig> source_configs = {
      {"0", false, l_schema, l_batches},
      {"1", true, r0_schema, r0_batches},
      {"2", false, r1_schema, r1_batches},
  };

  std::vector<BackpressureCounters> bp_counters(source_configs.size());
@@ -1498,59 +1617,55 @@
  std::vector<Declaration::Input> bp_decls;
  for (size_t i = 0; i < source_configs.size(); i++) {
    const auto& config = source_configs[i];

    src_decls.emplace_back("source",
                           SourceNodeOptions(config.schema, GetGen(config.batches)));
    bp_options.push_back(
        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
    std::vector<Declaration::Input> bp_in = {src_decls.back()};
    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
                           std::move(options)};
    if (config.is_gated) {
      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
    }
    bp_decls.push_back(bp_decl);
  }

  Declaration asofjoin = {"asofjoin", bp_decls,
                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};

  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
                       internal::ThreadPool::Make(1));
  ExecContext exec_ctx(default_memory_pool(), tpool.get());
  Future<BatchesWithCommonSchema> batches_fut =
      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);

  auto has_bp_been_applied = [&] {
    int total_paused = 0;
    for (const auto& counters : bp_counters) {
      total_paused += counters.pause_count;
    }
    // One of the inputs is gated. The other two will eventually be paused by the asof
    // join node
    return total_paused >= 2;
  };

  BusyWait(10.0, has_bp_been_applied);
  ASSERT_TRUE(has_bp_been_applied());

  gate.ReleaseAllBatches();
  ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);

  size_t total_resumed = 0;
  for (const auto& counters : bp_counters) {
    total_resumed += counters.resume_count;
  }
  ASSERT_GE(total_resumed, 2);
}

TEST(AsofJoinTest, BackpressureWithBatches) {
  return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
}

template <typename BatchesMaker>
@@ -1615,8 +1730,7 @@ T GetEnvValue(const std::string& var, T default_value) {
TEST(AsofJoinTest, BackpressureWithBatchesGen) {
  int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
  int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
  return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
}

} // namespace acero