From 1300996bed2531c8c792d10b6d1d156f03622970 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Mon, 15 May 2023 21:50:24 +0800 Subject: [PATCH 01/13] order_by_external_impl init --- cpp/src/arrow/acero/order_by_external_impl.cc | 85 +++++++++++++++++++ cpp/src/arrow/acero/order_by_external_impl.h | 53 ++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 cpp/src/arrow/acero/order_by_external_impl.cc create mode 100644 cpp/src/arrow/acero/order_by_external_impl.h diff --git a/cpp/src/arrow/acero/order_by_external_impl.cc b/cpp/src/arrow/acero/order_by_external_impl.cc new file mode 100644 index 0000000000000..fa04c2b3cd181 --- /dev/null +++ b/cpp/src/arrow/acero/order_by_external_impl.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/acero/order_by_external_impl.h" + +#include +#include +#include +#include +#include "arrow/acero/options.h" +#include "arrow/compute/api_vector.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" + +namespace arrow { + +using internal::checked_cast; + +using compute::TakeOptions; + +namespace acero { + +class SortBasicExternalImpl : public OrderByExternalImpl { + public: + SortBasicExternalImpl(ExecContext* ctx, const std::shared_ptr& output_schema, + const SortOptions& options = SortOptions{},int64_t buffer_size, std::string external_storage_path) + : ctx_(ctx), output_schema_(output_schema), options_(options), + buffer_size_(buffer_size), external_storage_path_(external_storage_path) {} + + void InputReceived(const std::shared_ptr& batch) override { + std::unique_lock lock(mutex_); + batches_.push_back(batch); + } + + Result DoFinish() override { + std::unique_lock lock(mutex_); + ARROW_ASSIGN_OR_RAISE(auto table, + Table::FromRecordBatches(output_schema_, std::move(batches_))); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); + return Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); + } + + // todo tostring() + std::string ToString() const override { return options_.ToString(); } + + protected: + ExecContext* ctx_; + std::shared_ptr output_schema_; + std::mutex mutex_; + std::vector> batches_; + + private: + const SortOptions options_; + int64_t buffer_size_; + std::string external_storage_path_; +}; // namespace compute + +Result> OrderByExternalImpl::MakeSort( + ExecContext* ctx, const std::shared_ptr& output_schema, + const SortOptions& options, int64_t buffer_size, std::string external_storage_path) { + std::unique_ptr impl{ + new SortBasicExternalImpl(ctx, output_schema, options, buffer_size, external_storage_path)}; + return std::move(impl); +} + +} // namespace acero +} // namespace arrow diff --git a/cpp/src/arrow/acero/order_by_external_impl.h b/cpp/src/arrow/acero/order_by_external_impl.h new file mode 100644 index 0000000000000..47df3c9fac495 --- /dev/null +++ b/cpp/src/arrow/acero/order_by_external_impl.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/acero/options.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" + +namespace arrow { + +using compute::ExecContext; + +namespace acero { + +class OrderByExternalImpl { + public: + virtual ~OrderByExternalImpl() = default; + + virtual void InputReceived(const std::shared_ptr& batch) = 0; + + virtual Result DoFinish() = 0; + + virtual std::string ToString() const = 0; + + static Result> MakeSort( + ExecContext* ctx, const std::shared_ptr& output_schema, + const SortOptions& options, int64_t buffer_size, std::string external_storage_path); +}; + +} // namespace acero +} // namespace arrow From df0bb636a87646cef93c3175acce36feda7a5334 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Tue, 16 May 2023 23:19:07 +0800 Subject: [PATCH 02/13] Init Input Received --- cpp/src/arrow/acero/order_by_external_impl.cc | 24 +++++++++++++++++-- cpp/src/arrow/acero/order_by_external_impl.h | 2 +- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/order_by_external_impl.cc b/cpp/src/arrow/acero/order_by_external_impl.cc index fa04c2b3cd181..132d3feeaf365 100644 --- a/cpp/src/arrow/acero/order_by_external_impl.cc +++ b/cpp/src/arrow/acero/order_by_external_impl.cc @@ -43,17 +43,36 @@ class SortBasicExternalImpl : public OrderByExternalImpl { SortBasicExternalImpl(ExecContext* ctx, const std::shared_ptr& output_schema, const SortOptions& options = SortOptions{},int64_t buffer_size, std::string external_storage_path) : ctx_(ctx), output_schema_(output_schema), options_(options), - buffer_size_(buffer_size), external_storage_path_(external_storage_path) {} + buffer_size_(buffer_size), external_storage_path_(external_storage_path), batches_size_(0) {} - void InputReceived(const std::shared_ptr& batch) override { + Status InputReceived(const std::shared_ptr& batch) override { std::unique_lock lock(mutex_); batches_.push_back(batch); + + int64_t batch_size = 0; + for (auto column : batch->columns()) { + for (auto buffer : column->data()->buffers) { + batch_size += buffer->size(); + } + } + batches_size_ += batch_size; + + if (batches_size_ >= buffer_size_) { + batches_size_ = 0; + ARROW_ASSIGN_OR_RAISE(auto table, + Table::FromRecordBatches(output_schema_, std::move(batches_))); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); + } + + return Status::OK(); } Result DoFinish() override { std::unique_lock lock(mutex_); ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(output_schema_, std::move(batches_))); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); return Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); } @@ -66,6 +85,7 @@ class SortBasicExternalImpl : public OrderByExternalImpl { std::shared_ptr output_schema_; std::mutex mutex_; std::vector> batches_; + int64_t batches_size_; private: const SortOptions options_; diff --git a/cpp/src/arrow/acero/order_by_external_impl.h b/cpp/src/arrow/acero/order_by_external_impl.h index 47df3c9fac495..228a26076902d 100644 --- a/cpp/src/arrow/acero/order_by_external_impl.h +++ b/cpp/src/arrow/acero/order_by_external_impl.h @@ -38,7 +38,7 @@ class OrderByExternalImpl { public: virtual ~OrderByExternalImpl() = default; - virtual void InputReceived(const std::shared_ptr& batch) = 0; + virtual Status InputReceived(const std::shared_ptr& batch) = 0; virtual Result DoFinish() = 0; From c1bd294bb5414aea0fc49d50333174d1e7fdd3d1 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 17 May 2023 09:54:49 +0800 Subject: [PATCH 03/13] sort in receive --- cpp/src/arrow/acero/order_by_external_impl.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/order_by_external_impl.cc b/cpp/src/arrow/acero/order_by_external_impl.cc index 132d3feeaf365..9beab47270ef2 100644 --- a/cpp/src/arrow/acero/order_by_external_impl.cc +++ b/cpp/src/arrow/acero/order_by_external_impl.cc @@ -43,7 +43,8 @@ class SortBasicExternalImpl : public OrderByExternalImpl { SortBasicExternalImpl(ExecContext* ctx, const std::shared_ptr& output_schema, const SortOptions& options = SortOptions{},int64_t buffer_size, std::string external_storage_path) : ctx_(ctx), output_schema_(output_schema), options_(options), - buffer_size_(buffer_size), external_storage_path_(external_storage_path), batches_size_(0) {} + buffer_size_(buffer_size), external_storage_path_(external_storage_path), + batches_size_(0), files_count_(0) {} Status InputReceived(const std::shared_ptr& batch) override { std::unique_lock lock(mutex_); @@ -58,11 +59,12 @@ class SortBasicExternalImpl : public OrderByExternalImpl { batches_size_ += batch_size; if (batches_size_ >= buffer_size_) { - batches_size_ = 0; ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(output_schema_, std::move(batches_))); + Table::FromRecordBatches(output_schema_, std::move(batches_))); //todo check batches_ ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); - Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_)); + files_count_++; + batches_size_ = 0; } return Status::OK(); @@ -71,8 +73,7 @@ class SortBasicExternalImpl : public OrderByExternalImpl { Result DoFinish() override { std::unique_lock lock(mutex_); ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(output_schema_, std::move(batches_))); - + Table::FromRecordBatches(output_schema_, std::move(batches_))); ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); return Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); } @@ -86,6 +87,7 @@ class SortBasicExternalImpl : public OrderByExternalImpl { std::mutex mutex_; std::vector> batches_; int64_t batches_size_; + int64_t files_count_; private: const SortOptions options_; From a355c50812e2041760464d614b3cf40fda44077a Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Tue, 30 May 2023 23:34:21 +0800 Subject: [PATCH 04/13] init push back in OrderedSpillingAccumulationQueue --- cpp/src/arrow/acero/order_by_external_impl.cc | 107 ------------------ cpp/src/arrow/acero/order_by_external_impl.h | 53 --------- cpp/src/arrow/acero/order_by_node.cc | 91 +++++++++++++++ 3 files changed, 91 insertions(+), 160 deletions(-) delete mode 100644 cpp/src/arrow/acero/order_by_external_impl.cc delete mode 100644 cpp/src/arrow/acero/order_by_external_impl.h diff --git a/cpp/src/arrow/acero/order_by_external_impl.cc b/cpp/src/arrow/acero/order_by_external_impl.cc deleted file mode 100644 index 9beab47270ef2..0000000000000 --- a/cpp/src/arrow/acero/order_by_external_impl.cc +++ /dev/null @@ -1,107 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/acero/order_by_external_impl.h" - -#include -#include -#include -#include -#include "arrow/acero/options.h" -#include "arrow/compute/api_vector.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/table.h" -#include "arrow/type.h" -#include "arrow/util/checked_cast.h" - -namespace arrow { - -using internal::checked_cast; - -using compute::TakeOptions; - -namespace acero { - -class SortBasicExternalImpl : public OrderByExternalImpl { - public: - SortBasicExternalImpl(ExecContext* ctx, const std::shared_ptr& output_schema, - const SortOptions& options = SortOptions{},int64_t buffer_size, std::string external_storage_path) - : ctx_(ctx), output_schema_(output_schema), options_(options), - buffer_size_(buffer_size), external_storage_path_(external_storage_path), - batches_size_(0), files_count_(0) {} - - Status InputReceived(const std::shared_ptr& batch) override { - std::unique_lock lock(mutex_); - batches_.push_back(batch); - - int64_t batch_size = 0; - for (auto column : batch->columns()) { - for (auto buffer : column->data()->buffers) { - batch_size += buffer->size(); - } - } - batches_size_ += batch_size; - - if (batches_size_ >= buffer_size_) { - ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(output_schema_, std::move(batches_))); //todo check batches_ - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); - ARROW_ASSIGN_OR_RAISE(auto sorted_table, Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_)); - files_count_++; - batches_size_ = 0; - } - - return Status::OK(); - } - - Result DoFinish() override { - std::unique_lock lock(mutex_); - ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(output_schema_, std::move(batches_))); - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, options_, ctx_)); - return Take(table, indices, TakeOptions::NoBoundsCheck(), ctx_); - } - - // todo tostring() - std::string ToString() const override { return options_.ToString(); } - - protected: - ExecContext* ctx_; - std::shared_ptr output_schema_; - std::mutex mutex_; - std::vector> batches_; - int64_t batches_size_; - int64_t files_count_; - - private: - const SortOptions options_; - int64_t buffer_size_; - std::string external_storage_path_; -}; // namespace compute - -Result> OrderByExternalImpl::MakeSort( - ExecContext* ctx, const std::shared_ptr& output_schema, - const SortOptions& options, int64_t buffer_size, std::string external_storage_path) { - std::unique_ptr impl{ - new SortBasicExternalImpl(ctx, output_schema, options, buffer_size, external_storage_path)}; - return std::move(impl); -} - -} // namespace acero -} // namespace arrow diff --git a/cpp/src/arrow/acero/order_by_external_impl.h b/cpp/src/arrow/acero/order_by_external_impl.h deleted file mode 100644 index 228a26076902d..0000000000000 --- a/cpp/src/arrow/acero/order_by_external_impl.h +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "arrow/acero/options.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/type.h" - -namespace arrow { - -using compute::ExecContext; - -namespace acero { - -class OrderByExternalImpl { - public: - virtual ~OrderByExternalImpl() = default; - - virtual Status InputReceived(const std::shared_ptr& batch) = 0; - - virtual Result DoFinish() = 0; - - virtual std::string ToString() const = 0; - - static Result> MakeSort( - ExecContext* ctx, const std::shared_ptr& output_schema, - const SortOptions& options, int64_t buffer_size, std::string external_storage_path); -}; - -} // namespace acero -} // namespace arrow diff --git a/cpp/src/arrow/acero/order_by_node.cc b/cpp/src/arrow/acero/order_by_node.cc index 1811fa9f4c73c..8a70f65ef17af 100644 --- a/cpp/src/arrow/acero/order_by_node.cc +++ b/cpp/src/arrow/acero/order_by_node.cc @@ -30,12 +30,17 @@ #include "arrow/table.h" #include "arrow/util/checked_cast.h" #include "arrow/util/tracing_internal.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/writer.h" +#include "parquet/arrow/reader.h" namespace arrow { using internal::checked_cast; using compute::TakeOptions; +using parquet::ArrowWriterProperties; +using parquet::WriterProperties; namespace acero { namespace { @@ -154,6 +159,92 @@ class OrderByNode : public ExecNode, public TracedNode { std::mutex mutex_; }; +class OrderedSpillingAccumulationQueue { + public: + OrderedSpillingAccumulationQueue(int64_t buffer_size, std::string path_to_folder, + ExecPlan* plan, std::shared_ptr output_schema, + Ordering new_ordering) + : buffer_size_(buffer_size), + plan_(plan), + output_schema_(output_schema), + path_to_folder_(path_to_folder), + ordering_(new_ordering), + accumulation_queue_size_(0), + spill_count_(0) {} + + // Inserts a batch into the queue. This may trigger a write to disk if enough data is + // accumulated If it does, then SpillCount should be incremented before this method + // returns (but the write can happen in the background, asynchronously) + Status push_back(std::shared_ptr record_batch) { + mutex_.lock(); + accumulation_queue_.push_back(record_batch); + accumulation_queue_size_ += record_batch->num_rows(); + + if (accumulation_queue_size_ >= buffer_size_) { + spill_count_++; + ARROW_ASSIGN_OR_RAISE( + auto table, + Table::FromRecordBatches( + output_schema_, std::move(accumulation_queue_))); // todo check batches_ + accumulation_queue_size_ = 0; + accumulation_queue_ = make_shared>>(); + mutex_.unlock(); + + // sort + SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); + ExecContext* ctx = plan_->query_context()->exec_context(); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); + std::shared_ptr sorted_table_ptr = sorted_table.table(); + // write to external storage + std::string folder_path = path_to_folder_ + "/0_sort_" + spill_count_ + ".parquet"; + std::shared_ptr props = + WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, + arrow::io::FileOutputStream::Open(folder_path)); + plan_->query_context()->ScheduleIOTask( + [sorted_table_ptr, outfile, props, arrow_props]() mutable { + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( + *(sorted_table_ptr.get()), arrow::default_memory_pool(), outfile, + /*chunk_size=*/3, props, arrow_props)); + }, + "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + + } else { + mutex_.unlock(); + } + return Status::OK(); + } + + // The number of files that have been written to disk. This should also include any data in memory + // so it will be the number of files written to disk + 1 if there is in-memory data. + int SpillCount() { + { + std::lock_guard lk(mutex_); + return spill_count_; + } + } + + // This should only be called after all calls to InsertBatch have been completed. This starts reading + // the data that was spilled. It will grab the next batch of data from the given spilled file. If spill_index + // == SpillCount() - 1 then this might be data that is already in-memory. + Future> FetchNextBatch(int spill_index); + + private: + std::mutex mutex_; + std::vector> accumulation_queue_; + int64_t spill_count_; + int64_t accumulation_queue_size_; + int64_t buffer_size_; + std::shared_ptr output_schema_; + std::string path_to_folder_; + Ordering ordering_; + ExecPlan* plan_ ; +}; + } // namespace namespace internal { From 803c58efe03be17d0e855e7ab6a4aa3e58233f8d Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Thu, 1 Jun 2023 09:36:27 +0800 Subject: [PATCH 05/13] independent node --- cpp/src/arrow/acero/external_order_by_node.cc | 258 ++++++++++++++++++ cpp/src/arrow/acero/order_by_node.cc | 93 +------ 2 files changed, 259 insertions(+), 92 deletions(-) create mode 100644 cpp/src/arrow/acero/external_order_by_node.cc diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc new file mode 100644 index 0000000000000..a7a28940888ca --- /dev/null +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -0,0 +1,258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/util.h" +#include "arrow/result.h" +#include "arrow/table.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/tracing_internal.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/writer.h" +#include "parquet/arrow/reader.h" + +namespace arrow { + +using internal::checked_cast; + +using compute::TakeOptions; +using parquet::ArrowWriterProperties; +using parquet::WriterProperties; + +namespace acero { +namespace { + +class OrderByNode : public ExecNode, public TracedNode { + public: + OrderByNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, Ordering new_ordering) + : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), + TracedNode(this), + ordering_(std::move(new_ordering)) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); + + const auto& order_options = checked_cast(options); + + if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { + return Status::Invalid("`ordering` must be an explicit non-empty ordering"); + } + + std::shared_ptr output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(output_schema), order_options.ordering); + } + + const char* kind_name() const override { return "OrderByNode"; } + + const Ordering& ordering() const override { return ordering_; } + + Status InputFinished(ExecNode* input, int total_batches) override { + DCHECK_EQ(input, inputs_[0]); + EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}}); + // We can't send InputFinished downstream because we might change the # of batches + // when we sort it. So that happens later in DoFinish + if (counter_.SetTotal(total_batches)) { + return DoFinish(); + } + return Status::OK(); + } + + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, + batch.ToRecordBatch(output_schema_)); + + { + std::lock_guard lk(mutex_); + accumulation_queue_.push_back(std::move(record_batch)); + } + + if (counter_.Increment()) { + return DoFinish(); + } + return Status::OK(); + } + + Status DoFinish() { + ARROW_ASSIGN_OR_RAISE( + auto table, + Table::FromRecordBatches(output_schema_, std::move(accumulation_queue_))); + SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); + ExecContext* ctx = plan_->query_context()->exec_context(); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); + ARROW_ASSIGN_OR_RAISE(Datum sorted, + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); + const std::shared_ptr& sorted_table = sorted.table(); + TableBatchReader reader(*sorted_table); + reader.set_chunksize(ExecPlan::kMaxBatchSize); + int batch_index = 0; + while (true) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr next, reader.Next()); + if (!next) { + return output_->InputFinished(this, batch_index); + } + int index = batch_index++; + plan_->query_context()->ScheduleTask( + [this, batch = std::move(next), index]() mutable { + ExecBatch exec_batch(*batch); + exec_batch.index = index; + return output_->InputReceived(this, std::move(exec_batch)); + }, + "OrderByNode::ProcessBatch"); + } + } + + protected: + std::string ToStringExtra(int indent = 0) const override { + std::stringstream ss; + ss << "ordering=" << ordering_.ToString(); + return ss.str(); + } + + private: + AtomicCounter counter_; + Ordering ordering_; + std::vector> accumulation_queue_; + std::mutex mutex_; +}; + +class OrderedSpillingAccumulationQueue { + public: + OrderedSpillingAccumulationQueue(int64_t buffer_size, std::string path_to_folder, + ExecPlan* plan, std::shared_ptr output_schema, + Ordering new_ordering) + : buffer_size_(buffer_size), + plan_(plan), + output_schema_(output_schema), + path_to_folder_(path_to_folder), + ordering_(new_ordering), + accumulation_queue_size_(0), + spill_count_(0) {} + + // Inserts a batch into the queue. This may trigger a write to disk if enough data is + // accumulated If it does, then SpillCount should be incremented before this method + // returns (but the write can happen in the background, asynchronously) + Status push_back(std::shared_ptr record_batch) { + mutex_.lock(); + accumulation_queue_.push_back(record_batch); + accumulation_queue_size_ += record_batch->num_rows(); + + if (accumulation_queue_size_ >= buffer_size_) { + spill_count_++; + ARROW_ASSIGN_OR_RAISE( + auto table, + Table::FromRecordBatches( + output_schema_, std::move(accumulation_queue_))); // todo check batches_ + accumulation_queue_size_ = 0; + accumulation_queue_ = make_shared>>(); + mutex_.unlock(); + + // sort + SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); + ExecContext* ctx = plan_->query_context()->exec_context(); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); + std::shared_ptr sorted_table_ptr = sorted_table.table(); + // write to external storage + std::string folder_path = path_to_folder_ + "/0_sort_" + spill_count_ + ".parquet"; + std::shared_ptr props = + WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, + arrow::io::FileOutputStream::Open(folder_path)); + plan_->query_context()->ScheduleIOTask( + [sorted_table_ptr, outfile, props, arrow_props]() mutable { + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( + *(sorted_table_ptr.get()), arrow::default_memory_pool(), outfile, + /*chunk_size=*/3, props, arrow_props)); + }, + "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + } else { + mutex_.unlock(); + } + return Status::OK(); + } + + // The number of files that have been written to disk. This should also include any data in memory + // so it will be the number of files written to disk + 1 if there is in-memory data. + int SpillCount() { + { + std::lock_guard lk(mutex_); + return spill_count_; + } + } + + // This should only be called after all calls to InsertBatch have been completed. This starts reading + // the data that was spilled. It will grab the next batch of data from the given spilled file. If spill_index + // == SpillCount() - 1 then this might be data that is already in-memory. + Future> FetchNextBatch(int spill_index); + + private: + std::mutex mutex_; + std::vector> accumulation_queue_; + int64_t spill_count_; + int64_t accumulation_queue_size_; + int64_t buffer_size_; + std::shared_ptr output_schema_; + std::string path_to_folder_; + Ordering ordering_; + ExecPlan* plan_ ; +}; + +} // namespace + +namespace internal { + +void RegisterOrderByNode(ExecFactoryRegistry* registry) { + DCHECK_OK( + registry->AddFactory(std::string(OrderByNodeOptions::kName), OrderByNode::Make)); +} + +} // namespace internal +} // namespace acero +} // namespace arrow diff --git a/cpp/src/arrow/acero/order_by_node.cc b/cpp/src/arrow/acero/order_by_node.cc index 8a70f65ef17af..8616896cb1733 100644 --- a/cpp/src/arrow/acero/order_by_node.cc +++ b/cpp/src/arrow/acero/order_by_node.cc @@ -30,17 +30,12 @@ #include "arrow/table.h" #include "arrow/util/checked_cast.h" #include "arrow/util/tracing_internal.h" -#include "arrow/util/type_fwd.h" -#include "parquet/arrow/writer.h" -#include "parquet/arrow/reader.h" namespace arrow { using internal::checked_cast; using compute::TakeOptions; -using parquet::ArrowWriterProperties; -using parquet::WriterProperties; namespace acero { namespace { @@ -159,92 +154,6 @@ class OrderByNode : public ExecNode, public TracedNode { std::mutex mutex_; }; -class OrderedSpillingAccumulationQueue { - public: - OrderedSpillingAccumulationQueue(int64_t buffer_size, std::string path_to_folder, - ExecPlan* plan, std::shared_ptr output_schema, - Ordering new_ordering) - : buffer_size_(buffer_size), - plan_(plan), - output_schema_(output_schema), - path_to_folder_(path_to_folder), - ordering_(new_ordering), - accumulation_queue_size_(0), - spill_count_(0) {} - - // Inserts a batch into the queue. This may trigger a write to disk if enough data is - // accumulated If it does, then SpillCount should be incremented before this method - // returns (but the write can happen in the background, asynchronously) - Status push_back(std::shared_ptr record_batch) { - mutex_.lock(); - accumulation_queue_.push_back(record_batch); - accumulation_queue_size_ += record_batch->num_rows(); - - if (accumulation_queue_size_ >= buffer_size_) { - spill_count_++; - ARROW_ASSIGN_OR_RAISE( - auto table, - Table::FromRecordBatches( - output_schema_, std::move(accumulation_queue_))); // todo check batches_ - accumulation_queue_size_ = 0; - accumulation_queue_ = make_shared>>(); - mutex_.unlock(); - - // sort - SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); - ExecContext* ctx = plan_->query_context()->exec_context(); - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); - ARROW_ASSIGN_OR_RAISE(auto sorted_table, - Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); - std::shared_ptr sorted_table_ptr = sorted_table.table(); - // write to external storage - std::string folder_path = path_to_folder_ + "/0_sort_" + spill_count_ + ".parquet"; - std::shared_ptr props = - WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); - std::shared_ptr arrow_props = - ArrowWriterProperties::Builder().store_schema()->build(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, - arrow::io::FileOutputStream::Open(folder_path)); - plan_->query_context()->ScheduleIOTask( - [sorted_table_ptr, outfile, props, arrow_props]() mutable { - ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( - *(sorted_table_ptr.get()), arrow::default_memory_pool(), outfile, - /*chunk_size=*/3, props, arrow_props)); - }, - "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); - - } else { - mutex_.unlock(); - } - return Status::OK(); - } - - // The number of files that have been written to disk. This should also include any data in memory - // so it will be the number of files written to disk + 1 if there is in-memory data. - int SpillCount() { - { - std::lock_guard lk(mutex_); - return spill_count_; - } - } - - // This should only be called after all calls to InsertBatch have been completed. This starts reading - // the data that was spilled. It will grab the next batch of data from the given spilled file. If spill_index - // == SpillCount() - 1 then this might be data that is already in-memory. - Future> FetchNextBatch(int spill_index); - - private: - std::mutex mutex_; - std::vector> accumulation_queue_; - int64_t spill_count_; - int64_t accumulation_queue_size_; - int64_t buffer_size_; - std::shared_ptr output_schema_; - std::string path_to_folder_; - Ordering ordering_; - ExecPlan* plan_ ; -}; - } // namespace namespace internal { @@ -256,4 +165,4 @@ void RegisterOrderByNode(ExecFactoryRegistry* registry) { } // namespace internal } // namespace acero -} // namespace arrow +} // namespace arrow \ No newline at end of file From f031cb465a480e7905e8865c773c59ae0e415f46 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Thu, 1 Jun 2023 09:57:00 +0800 Subject: [PATCH 06/13] init external option --- cpp/src/arrow/acero/external_order_by_node.cc | 26 ++++++++++++------- cpp/src/arrow/acero/options.h | 14 ++++++++++ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index a7a28940888ca..a04e49df9923f 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -45,30 +45,35 @@ using parquet::WriterProperties; namespace acero { namespace { -class OrderByNode : public ExecNode, public TracedNode { +class ExternalOrderByNode : public ExecNode, public TracedNode { public: OrderByNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Ordering new_ordering) + std::shared_ptr output_schema, Ordering new_ordering, + int64_t buffer_size, std::string path_to_folder) : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), TracedNode(this), - ordering_(std::move(new_ordering)) {} + ordering_(std::move(new_ordering)), + buffer_size_(buffer_size), + path_to_folder_(path_to_folder) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); - - const auto& order_options = checked_cast(options); + const auto& order_options = checked_cast(options); if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { return Status::Invalid("`ordering` must be an explicit non-empty ordering"); } + //todo check buffer_size && path_to_folder + std::shared_ptr output_schema = inputs[0]->output_schema(); - return plan->EmplaceNode( - plan, std::move(inputs), std::move(output_schema), order_options.ordering); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(output_schema), order_options.ordering, + order_options.buffer_size, order_options.path_to_folder); } - const char* kind_name() const override { return "OrderByNode"; } + const char* kind_name() const override { return "ExternalOrderByNode"; } const Ordering& ordering() const override { return ordering_; } @@ -148,14 +153,15 @@ class OrderByNode : public ExecNode, public TracedNode { protected: std::string ToStringExtra(int indent = 0) const override { std::stringstream ss; - ss << "ordering=" << ordering_.ToString(); + ss << "external ordering=" << ordering_.ToString(); return ss.str(); } private: AtomicCounter counter_; + int64_t buffer_size_; + std::string path_to_folder_; Ordering ordering_; - std::vector> accumulation_queue_; std::mutex mutex_; }; diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index b1ab6b5d9d75a..41fa2a6434e2a 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -542,6 +542,20 @@ class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions { Ordering ordering; }; +class ARROW_ACERO_EXPORT ExternalOrderByNodeOptions : public OrderByNodeOptions { + public: + static constexpr std::string_view kName = "external_order_by"; + explicit OrderByNodeOptions(Ordering ordering, int64_t buffer_size, + std::string path_to_folder) + : OrderByNodeOptions(ordering), + buffer_size(std::move(buffer_size)), + path_to_folder(std::move(path_to_folder)) {} + + /// \brief buffer_size defines the number of row buffer can hold, path_to_folder defines temporal folder used for spillover. + int64_t buffer_size; + std::string path_to_folder; +}; + enum class JoinType { LEFT_SEMI, RIGHT_SEMI, From 92a90f03f66f815a50be2c1fa18de5fed38e3e6a Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sat, 3 Jun 2023 12:22:13 +0800 Subject: [PATCH 07/13] push_finshed --- cpp/src/arrow/acero/external_order_by_node.cc | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index a04e49df9923f..0b93ab75902e1 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -47,14 +47,15 @@ namespace { class ExternalOrderByNode : public ExecNode, public TracedNode { public: - OrderByNode(ExecPlan* plan, std::vector inputs, + ExternalOrderByNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, Ordering new_ordering, int64_t buffer_size, std::string path_to_folder) - : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), + : ExecNode(plan, std::move(inputs), {"input"}, output_schema), TracedNode(this), - ordering_(std::move(new_ordering)), + ordering_(new_ordering), buffer_size_(buffer_size), - path_to_folder_(path_to_folder) {} + path_to_folder_(path_to_folder), + accumulation_queue_(plan, output_schema, new_ordering, buffer_size, path_to_folder) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -110,10 +111,7 @@ class ExternalOrderByNode : public ExecNode, public TracedNode { ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, batch.ToRecordBatch(output_schema_)); - { - std::lock_guard lk(mutex_); - accumulation_queue_.push_back(std::move(record_batch)); - } + accumulation_queue_.push_back(std::move(record_batch)); if (counter_.Increment()) { return DoFinish(); @@ -122,6 +120,9 @@ class ExternalOrderByNode : public ExecNode, public TracedNode { } Status DoFinish() { + accumulation_queue_.push_finshed(); + std::vector batches; + ARROW_ASSIGN_OR_RAISE( auto table, Table::FromRecordBatches(output_schema_, std::move(accumulation_queue_))); @@ -163,13 +164,14 @@ class ExternalOrderByNode : public ExecNode, public TracedNode { std::string path_to_folder_; Ordering ordering_; std::mutex mutex_; + OrderedSpillingAccumulationQueue accumulation_queue_; }; class OrderedSpillingAccumulationQueue { public: - OrderedSpillingAccumulationQueue(int64_t buffer_size, std::string path_to_folder, - ExecPlan* plan, std::shared_ptr output_schema, - Ordering new_ordering) + OrderedSpillingAccumulationQueue(ExecPlan* plan, std::shared_ptr output_schema, + Ordering new_ordering, int64_t buffer_size, + std::string path_to_folder) : buffer_size_(buffer_size), plan_(plan), output_schema_(output_schema), @@ -224,6 +226,13 @@ class OrderedSpillingAccumulationQueue { return Status::OK(); } + Status push_finshed(){ + mutex_.lock(); + + + + } + // The number of files that have been written to disk. This should also include any data in memory // so it will be the number of files written to disk + 1 if there is in-memory data. int SpillCount() { From abdbadb5661b82c2b3eb7db2a9e249ff0758fec9 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sat, 3 Jun 2023 20:28:44 +0800 Subject: [PATCH 08/13] finish_init --- cpp/src/arrow/acero/external_order_by_node.cc | 95 ++++++++++++++----- 1 file changed, 70 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index 0b93ab75902e1..b253bb18f92de 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -190,6 +190,7 @@ class OrderedSpillingAccumulationQueue { if (accumulation_queue_size_ >= buffer_size_) { spill_count_++; + int64_t spill_index=spill_count_-1; ARROW_ASSIGN_OR_RAISE( auto table, Table::FromRecordBatches( @@ -198,45 +199,60 @@ class OrderedSpillingAccumulationQueue { accumulation_queue_ = make_shared>>(); mutex_.unlock(); - // sort - SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); - ExecContext* ctx = plan_->query_context()->exec_context(); - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); - ARROW_ASSIGN_OR_RAISE(auto sorted_table, - Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); - std::shared_ptr sorted_table_ptr = sorted_table.table(); - // write to external storage - std::string folder_path = path_to_folder_ + "/0_sort_" + spill_count_ + ".parquet"; - std::shared_ptr props = - WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); - std::shared_ptr arrow_props = - ArrowWriterProperties::Builder().store_schema()->build(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, - arrow::io::FileOutputStream::Open(folder_path)); - plan_->query_context()->ScheduleIOTask( - [sorted_table_ptr, outfile, props, arrow_props]() mutable { - ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( - *(sorted_table_ptr.get()), arrow::default_memory_pool(), outfile, - /*chunk_size=*/3, props, arrow_props)); - }, - "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, sort_table(table)); + ARROW_RETURN_NOT_OK(schedule_write_task(sorted_table, spill_index)); } else { mutex_.unlock(); } return Status::OK(); } - Status push_finshed(){ - mutex_.lock(); + Status push_finshed() { + mutex_.lock(); + batch_size_ = buffer_size_ / spill_count_; + if (accumulation_queue_size_ > 0) { + if (accumulation_queue_size_ > batch_size_) { + spill_count_++; + int64_t spill_index = spill_count_ - 1; + int64_t output_size = accumulation_queue_size_ - batch_size_; + ARROW_ASSIGN_OR_RAISE( + auto table, + Table::FromRecordBatches( + output_schema_, std::move(accumulation_queue_))); // todo check batches_ + accumulation_queue_size_ = 0; + mutex_.unlock(); + + //output oversize part of data + TableBatchReader reader(table); + reader.set_chunksize(std::move(output_size)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr next, reader.Next()); + std::vector> batches; + batches.push_back(std::move(next)); + ARROW_ASSIGN_OR_RAISE( + auto output_table, Table::FromRecordBatches(output_schema_, std::move(batches))); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, sort_table(output_table)); + ARROW_RETURN_NOT_OK(schedule_write_task(sorted_table, spill_index)); + + //todo buffer the remanent data + reader.set_chunksize(batch_size_); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, reader.Next()); + + }else{ + //todo buffer the remanent data + } + } else { + mutex_.unlock(); + } + return Status::OK(); } // The number of files that have been written to disk. This should also include any data in memory // so it will be the number of files written to disk + 1 if there is in-memory data. int SpillCount() { - { + {// to do check whether the lock is reentried std::lock_guard lk(mutex_); return spill_count_; } @@ -247,12 +263,41 @@ class OrderedSpillingAccumulationQueue { // == SpillCount() - 1 then this might be data that is already in-memory. Future> FetchNextBatch(int spill_index); + protected: + Result> sort_table(std::shared_ptr table) { + SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); + ExecContext* ctx = plan_->query_context()->exec_context(); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); + return sorted_table.table(); + } + + Status schedule_write_task(std::shared_ptr table, int spill_index) { + std::string folder_path = + path_to_folder_ + "/sort_" + std::to_string(spill_index) + ".parquet"; + std::shared_ptr props = + WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, + arrow::io::FileOutputStream::Open(folder_path)); + plan_->query_context()->ScheduleIOTask( + [table, outfile, props, arrow_props]() mutable { + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( + *(table.get()), arrow::default_memory_pool(), outfile, + /*chunk_size=*/3, props, arrow_props)); + }, + "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + } + private: std::mutex mutex_; std::vector> accumulation_queue_; int64_t spill_count_; int64_t accumulation_queue_size_; int64_t buffer_size_; + int64_t batch_size_; std::shared_ptr output_schema_; std::string path_to_folder_; Ordering ordering_; From 63725e81cbc2bf8a7722187777fca454b696208e Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sun, 4 Jun 2023 16:03:56 +0800 Subject: [PATCH 09/13] add cmake --- cpp/src/arrow/acero/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 258dcb5580c34..55e97216aabbc 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -41,6 +41,7 @@ set(ARROW_ACERO_SRCS hash_join_node.cc map_node.cc options.cc + external_order_by_node.cc order_by_node.cc order_by_impl.cc partition_util.cc From 868e6213b8963a16f1184b26daadde60d94d21a0 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Mon, 5 Jun 2023 20:07:33 +0800 Subject: [PATCH 10/13] fix option --- cpp/src/arrow/acero/options.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 41fa2a6434e2a..8d70e0ec73ce1 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -545,9 +545,9 @@ class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions { class ARROW_ACERO_EXPORT ExternalOrderByNodeOptions : public OrderByNodeOptions { public: static constexpr std::string_view kName = "external_order_by"; - explicit OrderByNodeOptions(Ordering ordering, int64_t buffer_size, + explicit ExternalOrderByNodeOptions(Ordering ordering, int64_t buffer_size, std::string path_to_folder) - : OrderByNodeOptions(ordering), + : OrderByNodeOptions(std::move(ordering)), buffer_size(std::move(buffer_size)), path_to_folder(std::move(path_to_folder)) {} From 21db372990f35ce1e7072be4c6354c9a923387d6 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Mon, 5 Jun 2023 22:51:12 +0800 Subject: [PATCH 11/13] fetchInit --- cpp/src/arrow/acero/external_order_by_node.cc | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index b253bb18f92de..9244c8907b607 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -237,6 +237,8 @@ class OrderedSpillingAccumulationQueue { reader.set_chunksize(batch_size_); ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, reader.Next()); + + plan_->query_context()->async_scheduler()-> }else{ //todo buffer the remanent data @@ -258,12 +260,46 @@ class OrderedSpillingAccumulationQueue { } } - // This should only be called after all calls to InsertBatch have been completed. This starts reading - // the data that was spilled. It will grab the next batch of data from the given spilled file. If spill_index + // This should only be called after all calls to InsertBatch have been completed. This + // starts reading the data that was spilled. It will grab the next batch of data from + // the given spilled file. If spill_index // == SpillCount() - 1 then this might be data that is already in-memory. - Future> FetchNextBatch(int spill_index); + Future> FetchNextBatch(int spill_index) { + std::string path_to_file = get_path_to_file(spill_index); + Future> future = Future>::Make(); + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + // Configure general Parquet reader settings + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + // Configure Arrow-specific Parquet reader settings + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(batch_size_); + + parquet::arrow::FileReaderBuilder reader_builder; + ARROW_RETURN_NOT_OK( + reader_builder.OpenFile(path_to_file, /*memory_map=*/false, reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + std::unique_ptr arrow_reader; + ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(&rb_reader)); + for (arrow::Result> maybe_batch : *rb_reader) { + // Operate on each batch... + } + return future; + } protected: + inline std::string get_path_to_file(int spill_index) { + return path_to_folder_ + "/sort_" + std::to_string(spill_index) + ".parquet"; + } + Result> sort_table(std::shared_ptr table) { SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); ExecContext* ctx = plan_->query_context()->exec_context(); @@ -274,14 +310,13 @@ class OrderedSpillingAccumulationQueue { } Status schedule_write_task(std::shared_ptr table, int spill_index) { - std::string folder_path = - path_to_folder_ + "/sort_" + std::to_string(spill_index) + ".parquet"; + std::string path_to_file =get_path_to_file(spill_index); std::shared_ptr props = WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); std::shared_ptr arrow_props = ArrowWriterProperties::Builder().store_schema()->build(); ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, - arrow::io::FileOutputStream::Open(folder_path)); + arrow::io::FileOutputStream::Open(path_to_file)); plan_->query_context()->ScheduleIOTask( [table, outfile, props, arrow_props]() mutable { ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( From 0e8dfcc1f5d352cf15eda09634c583b68f5500c9 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Tue, 6 Jun 2023 23:26:32 +0800 Subject: [PATCH 12/13] OrderedSpillingAccumulationQueue Done --- cpp/src/arrow/acero/external_order_by_node.cc | 188 +++++++++++------- 1 file changed, 117 insertions(+), 71 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index 9244c8907b607..e96942c18df80 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -31,6 +31,11 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/tracing_internal.h" #include "arrow/util/type_fwd.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/async_util.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/future.h" +#include "arrow/io/util_internal.h" #include "parquet/arrow/writer.h" #include "parquet/arrow/reader.h" @@ -65,7 +70,7 @@ class ExternalOrderByNode : public ExecNode, public TracedNode { if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { return Status::Invalid("`ordering` must be an explicit non-empty ordering"); } - + //todo check buffer_size && path_to_folder std::shared_ptr output_schema = inputs[0]->output_schema(); @@ -163,7 +168,6 @@ class ExternalOrderByNode : public ExecNode, public TracedNode { int64_t buffer_size_; std::string path_to_folder_; Ordering ordering_; - std::mutex mutex_; OrderedSpillingAccumulationQueue accumulation_queue_; }; @@ -178,7 +182,9 @@ class OrderedSpillingAccumulationQueue { path_to_folder_(path_to_folder), ordering_(new_ordering), accumulation_queue_size_(0), - spill_count_(0) {} + spill_count_(0), + exec_batch_in_memory_(nullptr), + has_only_memory_spill_count_(false) {} // Inserts a batch into the queue. This may trigger a write to disk if enough data is // accumulated If it does, then SpillCount should be incremented before this method @@ -190,7 +196,7 @@ class OrderedSpillingAccumulationQueue { if (accumulation_queue_size_ >= buffer_size_) { spill_count_++; - int64_t spill_index=spill_count_-1; + int64_t spill_index = spill_count_ - 1; ARROW_ASSIGN_OR_RAISE( auto table, Table::FromRecordBatches( @@ -208,56 +214,64 @@ class OrderedSpillingAccumulationQueue { } Status push_finshed() { - mutex_.lock(); + bool = false; + if(accumulation_queue_size_>0){ + spill_count_++; + } batch_size_ = buffer_size_ / spill_count_; + if (accumulation_queue_size_ > 0) { - if (accumulation_queue_size_ > batch_size_) { - spill_count_++; + ARROW_ASSIGN_OR_RAISE( + auto table, + Table::FromRecordBatches( + output_schema_, std::move(accumulation_queue_))); // todo check batches_ + + if (accumulation_queue_size_ > batch_size_) { // output oversize part of data int64_t spill_index = spill_count_ - 1; int64_t output_size = accumulation_queue_size_ - batch_size_; - ARROW_ASSIGN_OR_RAISE( - auto table, - Table::FromRecordBatches( - output_schema_, std::move(accumulation_queue_))); // todo check batches_ - accumulation_queue_size_ = 0; - mutex_.unlock(); - - //output oversize part of data + TableBatchReader reader(table); reader.set_chunksize(std::move(output_size)); ARROW_ASSIGN_OR_RAISE(std::shared_ptr next, reader.Next()); std::vector> batches; batches.push_back(std::move(next)); - ARROW_ASSIGN_OR_RAISE( - auto output_table, Table::FromRecordBatches(output_schema_, std::move(batches))); + ARROW_ASSIGN_OR_RAISE(auto output_table, Table::FromRecordBatches( + output_schema_, std::move(batches))); ARROW_ASSIGN_OR_RAISE(auto sorted_table, sort_table(output_table)); ARROW_RETURN_NOT_OK(schedule_write_task(sorted_table, spill_index)); - //todo buffer the remanent data reader.set_chunksize(batch_size_); ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, reader.Next()); + exec_batch_in_memory_ = std::make_shared(*record_batch); + } else {//in memory + has_only_memory_spill_count_ = true; - - plan_->query_context()->async_scheduler()-> - }else{ - //todo buffer the remanent data - - + TableBatchReader reader(table); + reader.set_chunksize(batch_size_); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, reader.Next()); + exec_batch_in_memory_ = std::make_shared(*record_batch); } + accumulation_queue_size_ = 0; + } - } else { - mutex_.unlock(); + //init AsyncGenerators + int64_t count=spill_count_; + if(has_only_memory_spill_count_){ + count--; + } + for (int i = 0; i > FetchNextBatch(int spill_index) { + if (spill_index == spill_count_ - 1 && exec_batch_in_memory_ != nullptr) { + auto future = Future>::MakeFinished( + *(std::move(exec_batch_in_memory_))); + exec_batch_in_memory_ = nullptr; + return future; + } + if (spill_index >= 0 && + (spill_index < spill_count_ - 1 || + (spill_index == spill_count_ - 1 && !has_only_memory_spill_count_))) { + return asyncGenerators_.at(spill_index)(); + } + + return Future>::MakeFinished(std::nullopt); + } + + protected: + inline std::string get_path_to_file(int spill_index) { + return path_to_folder_ + "/sort_" + std::to_string(spill_index) + ".parquet"; + } + + Result> sort_table(std::shared_ptr table) { + SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); + ExecContext* ctx = plan_->query_context()->exec_context(); + ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); + ARROW_ASSIGN_OR_RAISE(auto sorted_table, + Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); + return sorted_table.table(); + } + + Status schedule_write_task(std::shared_ptr table, int spill_index) { std::string path_to_file = get_path_to_file(spill_index); - Future> future = Future>::Make(); - arrow::MemoryPool* pool = arrow::default_memory_pool(); + std::shared_ptr props = + WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, + arrow::io::FileOutputStream::Open(path_to_file)); + plan_->query_context()->ScheduleIOTask( + [table, outfile, props, arrow_props]() mutable { + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( + *(table.get()), arrow::default_memory_pool(), outfile, + /*chunk_size=*/3, props, arrow_props)); + }, + "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + } + static Result>> MakeGenerator( + const std::string& path_to_file, int64_t batch_size) { + arrow::MemoryPool* pool = arrow::default_memory_pool(); // Configure general Parquet reader settings auto reader_properties = parquet::ReaderProperties(pool); reader_properties.set_buffer_size(4096 * 4); @@ -276,7 +335,7 @@ class OrderedSpillingAccumulationQueue { // Configure Arrow-specific Parquet reader settings auto arrow_reader_props = parquet::ArrowReaderProperties(); - arrow_reader_props.set_batch_size(batch_size_); + arrow_reader_props.set_batch_size(batch_size); parquet::arrow::FileReaderBuilder reader_builder; ARROW_RETURN_NOT_OK( @@ -289,46 +348,34 @@ class OrderedSpillingAccumulationQueue { std::shared_ptr<::arrow::RecordBatchReader> rb_reader; ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(&rb_reader)); - for (arrow::Result> maybe_batch : *rb_reader) { - // Operate on each batch... - } - return future; + + return MakeGenerator(rb_reader, io::internal::GetIOThreadPool()); } - protected: - inline std::string get_path_to_file(int spill_index) { - return path_to_folder_ + "/sort_" + std::to_string(spill_index) + ".parquet"; - } - - Result> sort_table(std::shared_ptr table) { - SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); - ExecContext* ctx = plan_->query_context()->exec_context(); - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); - ARROW_ASSIGN_OR_RAISE(auto sorted_table, - Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); - return sorted_table.table(); - } - - Status schedule_write_task(std::shared_ptr table, int spill_index) { - std::string path_to_file =get_path_to_file(spill_index); - std::shared_ptr props = - WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); - std::shared_ptr arrow_props = - ArrowWriterProperties::Builder().store_schema()->build(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr outfile, - arrow::io::FileOutputStream::Open(path_to_file)); - plan_->query_context()->ScheduleIOTask( - [table, outfile, props, arrow_props]() mutable { - ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( - *(table.get()), arrow::default_memory_pool(), outfile, - /*chunk_size=*/3, props, arrow_props)); - }, - "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); - } + static Result>> MakeGenerator( + const std::shared_ptr& reader, + arrow::internal::Executor* io_executor) { + auto to_exec_batch = + [](const std::shared_ptr& batch) -> std::optional { + if (batch == NULLPTR) { + return std::nullopt; + } + return std::optional(ExecBatch(*batch)); + }; + Iterator> batch_it = MakeIteratorFromReader(reader); + auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it)); + if (io_executor == nullptr) { + return MakeBlockingGenerator(std::move(exec_batch_it)); + } + return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); + } private: std::mutex mutex_; std::vector> accumulation_queue_; + std::vector>> asyncGenerators_; + std::shared_ptr exec_batch_in_memory_; + bool has_only_memory_spill_count_; int64_t spill_count_; int64_t accumulation_queue_size_; int64_t buffer_size_; @@ -336,9 +383,8 @@ class OrderedSpillingAccumulationQueue { std::shared_ptr output_schema_; std::string path_to_folder_; Ordering ordering_; - ExecPlan* plan_ ; + ExecPlan* plan_; }; - } // namespace namespace internal { From a49695c5cca591cb72c6bb38e8590dd1a8f5a027 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 7 Jun 2023 09:15:41 +0800 Subject: [PATCH 13/13] fix bug --- cpp/src/arrow/acero/external_order_by_node.cc | 251 ++++++++---------- 1 file changed, 115 insertions(+), 136 deletions(-) diff --git a/cpp/src/arrow/acero/external_order_by_node.cc b/cpp/src/arrow/acero/external_order_by_node.cc index e96942c18df80..aeee2d10043b8 100644 --- a/cpp/src/arrow/acero/external_order_by_node.cc +++ b/cpp/src/arrow/acero/external_order_by_node.cc @@ -28,6 +28,7 @@ #include "arrow/acero/util.h" #include "arrow/result.h" #include "arrow/table.h" +#include "arrow/io/file.h" #include "arrow/util/checked_cast.h" #include "arrow/util/tracing_internal.h" #include "arrow/util/type_fwd.h" @@ -50,137 +51,17 @@ using parquet::WriterProperties; namespace acero { namespace { -class ExternalOrderByNode : public ExecNode, public TracedNode { - public: - ExternalOrderByNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Ordering new_ordering, - int64_t buffer_size, std::string path_to_folder) - : ExecNode(plan, std::move(inputs), {"input"}, output_schema), - TracedNode(this), - ordering_(new_ordering), - buffer_size_(buffer_size), - path_to_folder_(path_to_folder), - accumulation_queue_(plan, output_schema, new_ordering, buffer_size, path_to_folder) {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); - const auto& order_options = checked_cast(options); - - if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { - return Status::Invalid("`ordering` must be an explicit non-empty ordering"); - } - - //todo check buffer_size && path_to_folder - - std::shared_ptr output_schema = inputs[0]->output_schema(); - return plan->EmplaceNode( - plan, std::move(inputs), std::move(output_schema), order_options.ordering, - order_options.buffer_size, order_options.path_to_folder); - } - - const char* kind_name() const override { return "ExternalOrderByNode"; } - - const Ordering& ordering() const override { return ordering_; } - - Status InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}}); - // We can't send InputFinished downstream because we might change the # of batches - // when we sort it. So that happens later in DoFinish - if (counter_.SetTotal(total_batches)) { - return DoFinish(); - } - return Status::OK(); - } - - Status StartProducing() override { - NoteStartProducing(ToStringExtra()); - return Status::OK(); - } - - void PauseProducing(ExecNode* output, int32_t counter) override { - inputs_[0]->PauseProducing(this, counter); - } - - void ResumeProducing(ExecNode* output, int32_t counter) override { - inputs_[0]->ResumeProducing(this, counter); - } - - Status StopProducingImpl() override { return Status::OK(); } - - Status InputReceived(ExecNode* input, ExecBatch batch) override { - auto scope = TraceInputReceived(batch); - DCHECK_EQ(input, inputs_[0]); - - ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, - batch.ToRecordBatch(output_schema_)); - - accumulation_queue_.push_back(std::move(record_batch)); - - if (counter_.Increment()) { - return DoFinish(); - } - return Status::OK(); - } - - Status DoFinish() { - accumulation_queue_.push_finshed(); - std::vector batches; - - ARROW_ASSIGN_OR_RAISE( - auto table, - Table::FromRecordBatches(output_schema_, std::move(accumulation_queue_))); - SortOptions sort_options(ordering_.sort_keys(), ordering_.null_placement()); - ExecContext* ctx = plan_->query_context()->exec_context(); - ARROW_ASSIGN_OR_RAISE(auto indices, SortIndices(table, sort_options, ctx)); - ARROW_ASSIGN_OR_RAISE(Datum sorted, - Take(table, indices, TakeOptions::NoBoundsCheck(), ctx)); - const std::shared_ptr
& sorted_table = sorted.table(); - TableBatchReader reader(*sorted_table); - reader.set_chunksize(ExecPlan::kMaxBatchSize); - int batch_index = 0; - while (true) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr next, reader.Next()); - if (!next) { - return output_->InputFinished(this, batch_index); - } - int index = batch_index++; - plan_->query_context()->ScheduleTask( - [this, batch = std::move(next), index]() mutable { - ExecBatch exec_batch(*batch); - exec_batch.index = index; - return output_->InputReceived(this, std::move(exec_batch)); - }, - "OrderByNode::ProcessBatch"); - } - } - - protected: - std::string ToStringExtra(int indent = 0) const override { - std::stringstream ss; - ss << "external ordering=" << ordering_.ToString(); - return ss.str(); - } - - private: - AtomicCounter counter_; - int64_t buffer_size_; - std::string path_to_folder_; - Ordering ordering_; - OrderedSpillingAccumulationQueue accumulation_queue_; -}; - class OrderedSpillingAccumulationQueue { public: OrderedSpillingAccumulationQueue(ExecPlan* plan, std::shared_ptr output_schema, Ordering new_ordering, int64_t buffer_size, std::string path_to_folder) - : buffer_size_(buffer_size), - plan_(plan), + : plan_(plan), output_schema_(output_schema), - path_to_folder_(path_to_folder), ordering_(new_ordering), + buffer_size_(buffer_size), + path_to_folder_(path_to_folder), + accumulation_queue_size_(0), spill_count_(0), exec_batch_in_memory_(nullptr), @@ -202,7 +83,7 @@ class OrderedSpillingAccumulationQueue { Table::FromRecordBatches( output_schema_, std::move(accumulation_queue_))); // todo check batches_ accumulation_queue_size_ = 0; - accumulation_queue_ = make_shared>>(); + accumulation_queue_ = std::vector>(); mutex_.unlock(); ARROW_ASSIGN_OR_RAISE(auto sorted_table, sort_table(table)); @@ -214,7 +95,6 @@ class OrderedSpillingAccumulationQueue { } Status push_finshed() { - bool = false; if(accumulation_queue_size_>0){ spill_count_++; } @@ -321,8 +201,10 @@ class OrderedSpillingAccumulationQueue { ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable( *(table.get()), arrow::default_memory_pool(), outfile, /*chunk_size=*/3, props, arrow_props)); + return Status::OK(); }, "OrderByNode::OrderedSpillingAccumulationQueue::Spillover"); + return Status::OK(); } static Result>> MakeGenerator( @@ -371,27 +253,124 @@ class OrderedSpillingAccumulationQueue { } private: + ExecPlan* plan_; + std::shared_ptr output_schema_; + Ordering ordering_; + int64_t buffer_size_; + std::string path_to_folder_; + int64_t accumulation_queue_size_; + int64_t spill_count_; + std::shared_ptr exec_batch_in_memory_; + bool has_only_memory_spill_count_; + std::mutex mutex_; std::vector> accumulation_queue_; std::vector>> asyncGenerators_; - std::shared_ptr exec_batch_in_memory_; - bool has_only_memory_spill_count_; - int64_t spill_count_; - int64_t accumulation_queue_size_; - int64_t buffer_size_; int64_t batch_size_; - std::shared_ptr output_schema_; - std::string path_to_folder_; +}; + +class ExternalOrderByNode : public ExecNode, public TracedNode { + public: + ExternalOrderByNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, Ordering new_ordering, + int64_t buffer_size, std::string path_to_folder) + : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), + TracedNode(this), + ordering_(std::move(new_ordering)), + buffer_size_(std::move(buffer_size)), + path_to_folder_(std::move(path_to_folder)), + accumulation_queue_(plan, output_schema_, ordering_, buffer_size_, + path_to_folder_) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); + const auto& order_options = checked_cast(options); + + if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { + return Status::Invalid("`ordering` must be an explicit non-empty ordering"); + } + + //todo check buffer_size && path_to_folder + + std::shared_ptr output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(output_schema), order_options.ordering, + order_options.buffer_size, order_options.path_to_folder); + } + + const char* kind_name() const override { return "ExternalOrderByNode"; } + + const Ordering& ordering() const override { return ordering_; } + + Status InputFinished(ExecNode* input, int total_batches) override { + DCHECK_EQ(input, inputs_[0]); + EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}}); + // We can't send InputFinished downstream because we might change the # of batches + // when we sort it. So that happens later in DoFinish + if (counter_.SetTotal(total_batches)) { + return DoFinish(); + } + return Status::OK(); + } + + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, + batch.ToRecordBatch(output_schema_)); + RETURN_NOT_OK(accumulation_queue_.push_back(std::move(record_batch))); + + if (counter_.Increment()) { + return DoFinish(); + } + return Status::OK(); + } + + Status DoFinish() { + ARROW_RETURN_NOT_OK(accumulation_queue_.push_finshed()); + std::vector batches; + return Status::OK(); + } + + protected: + std::string ToStringExtra(int indent = 0) const override { + std::stringstream ss; + ss << "external ordering=" << ordering_.ToString(); + return ss.str(); + } + + private: + AtomicCounter counter_; Ordering ordering_; - ExecPlan* plan_; + int64_t buffer_size_; + std::string path_to_folder_; + OrderedSpillingAccumulationQueue accumulation_queue_; }; + } // namespace namespace internal { -void RegisterOrderByNode(ExecFactoryRegistry* registry) { +void RegisterExternalOrderByNode(ExecFactoryRegistry* registry) { DCHECK_OK( - registry->AddFactory(std::string(OrderByNodeOptions::kName), OrderByNode::Make)); + registry->AddFactory(std::string(ExternalOrderByNodeOptions::kName), ExternalOrderByNode::Make)); } } // namespace internal