Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-417] Sort spill support framework (#369)
Browse files Browse the repository at this point in the history
* adding unit test for sort spill

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix unit test

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* support spill in sort

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix read path

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* support multiple keys

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* update API for more kernels

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* clean up

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* support Spill API in sort

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* reenable CoalesceBatch before Sort

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* remove use cnt

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* integrate with spark mem mgmt framework

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* more spill to sort result iterator

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* clean up

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* release recordbatch from java side

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* rebase

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix unit test

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* refine repeated spill

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan authored Jul 23, 2021
1 parent 7168359 commit 5d05fd2
Show file tree
Hide file tree
Showing 23 changed files with 584 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;

import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16;
import org.apache.arrow.gandiva.exceptions.GandivaException;
import org.apache.arrow.gandiva.expression.ExpressionTree;
Expand Down Expand Up @@ -142,6 +143,25 @@ public ArrowRecordBatch[] evaluate(ArrowRecordBatch recordBatch) throws RuntimeE
return evaluate(recordBatch, null);
}

/**
 * Evaluate the input batch with the built native expression and return the
 * result batches.
 *
 * Unlike {@link #evaluate}, the input batch is handed to the native side as a
 * single byte array produced by {@code UnsafeRecordBatchSerializer} rather
 * than as raw buffer addresses and sizes.
 *
 * @param recordBatch input batch to evaluate
 * @return one rebuilt {@code ArrowRecordBatch} per native result builder;
 *         entries from the first null builder onward are left {@code null}
 * @throws RuntimeException if native evaluation fails
 * @throws IOException if batch serialization fails
 */
public ArrowRecordBatch[] evaluate2(ArrowRecordBatch recordBatch) throws RuntimeException, IOException {
  byte[] bytes = UnsafeRecordBatchSerializer.serializeUnsafe(recordBatch);
  ArrowRecordBatchBuilder[] resRecordBatchBuilderList = jniWrapper.nativeEvaluate2(nativeHandler, bytes);
  ArrowRecordBatch[] recordBatchList = new ArrowRecordBatch[resRecordBatchBuilderList.length];
  for (int i = 0; i < resRecordBatchBuilderList.length; i++) {
    // A null builder apparently marks the end of the native results
    // (TODO confirm against the native side); remaining slots stay null.
    if (resRecordBatchBuilderList[i] == null) {
      recordBatchList[i] = null;
      break;
    }
    ArrowRecordBatchBuilderImpl resRecordBatchBuilderImpl = new ArrowRecordBatchBuilderImpl(
        resRecordBatchBuilderList[i]);
    recordBatchList[i] = resRecordBatchBuilderImpl.build();
  }
  return recordBatchList;
}

/**
* Evaluate input data using the built native function, and output as recordBatch.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ native long nativeBuildWithFinish(long memoryPool, byte[] schemaBuf, byte[] expr
native ArrowRecordBatchBuilder[] nativeEvaluate(long nativeHandler, int numRows, long[] bufAddrs,
long[] bufSizes) throws RuntimeException;

/**
 * Evaluate a serialized record batch with the native expression evaluator
 * identified by {@code nativeHandler}.
 *
 * @param nativeHandler handle of the built native evaluator
 * @param bytes input batch serialized by {@code UnsafeRecordBatchSerializer}
 * @return builders describing the native result batches
 */
native ArrowRecordBatchBuilder[] nativeEvaluate2(long nativeHandler, byte[] bytes) throws RuntimeException;

/**
* Get native kernel signature by the nativeHandler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
child match {
case CoalesceBatchesExec(fwdChild: SparkPlan) =>
ColumnarSortExec(plan.sortOrder, plan.global, fwdChild, plan.testSpillFrequency)
case _ =>
ColumnarSortExec(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class ColumnarSorter(
elapse.set(NANOSECONDS.toMillis(total_elapse))
sortTime.set(NANOSECONDS.toMillis(sort_elapse))
shuffleTime.set(NANOSECONDS.toMillis(shuffle_elapse))
inputBatchHolder.foreach(cb => cb.close())
inputBatchHolder.clear
//inputBatchHolder.foreach(cb => cb.close())
//inputBatchHolder.clear
if (sorter != null) {
sorter.close()
}
Expand All @@ -110,7 +110,7 @@ class ColumnarSorter(
(0 until input.numCols).toList.foreach(i =>
input.column(i).asInstanceOf[ArrowWritableColumnVector].retain())
val beforeSort = System.nanoTime()
sorter.evaluate(input_batch)
sorter.evaluate2(input_batch)
sort_elapse += System.nanoTime() - beforeSort
total_elapse += System.nanoTime() - beforeSort
ConverterUtils.releaseArrowRecordBatch(input_batch)
Expand Down Expand Up @@ -160,8 +160,8 @@ class ColumnarSorter(
return true
} else {
has_next = false
inputBatchHolder.foreach(cb => cb.close())
inputBatchHolder.clear
//inputBatchHolder.foreach(cb => cb.close())
//inputBatchHolder.clear
return false
}
}
Expand Down Expand Up @@ -373,7 +373,7 @@ object ColumnarSorter extends Logging {
val (sort_expr, arrowSchema) = init(sortOrder, outputAttributes, sparkConf)
val sorter = new ExpressionEvaluator()
val signature = sorter
.build(arrowSchema, Lists.newArrayList(sort_expr), arrowSchema, true /*return at finish*/ )
.build(arrowSchema, Lists.newArrayList(sort_expr), arrowSchema, true /*return at finish*/)
sorter.close
signature
}
Expand All @@ -391,7 +391,7 @@ object ColumnarSorter extends Logging {
val (sort_expr, arrowSchema) = init(sortOrder, outputAttributes, sparkConf)
val sorter = new ExpressionEvaluator(listJars.toList.asJava)
sorter
.build(arrowSchema, Lists.newArrayList(sort_expr), arrowSchema, true /*return at finish*/ )
.build(arrowSchema, Lists.newArrayList(sort_expr), arrowSchema, true /*return at finish*/, true/*enable spill*/)
new ColumnarSorter(
sorter,
outputAttributes,
Expand Down
2 changes: 1 addition & 1 deletion native-sql-engine/cpp/compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fi
mkdir build
cd build
cmake .. -DTESTS=${TESTS} -DBUILD_ARROW=${BUILD_ARROW} -DSTATIC_ARROW=${STATIC_ARROW} -DBUILD_PROTOBUF=${BUILD_PROTOBUF} -DARROW_ROOT=${ARROW_ROOT} -DARROW_BFS_INSTALL_DIR=${ARROW_BFS_INSTALL_DIR}
make
make -j2

set +eu

Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class ArrowComputeCodeGenerator : public CodeGenerator {
return arrow::Status::OK();
}

arrow::Status evaluate(const std::shared_ptr<arrow::RecordBatch>& in,
arrow::Status evaluate(std::shared_ptr<arrow::RecordBatch>& in,
std::vector<std::shared_ptr<arrow::RecordBatch>>* out) {
arrow::Status status = arrow::Status::OK();
std::vector<ArrayList> batch_array;
Expand Down Expand Up @@ -256,14 +256,15 @@ class ArrowComputeCodeGenerator : public CodeGenerator {
}
int64_t current_spilled = 0L;
for (auto visitor : visitor_list_) {
int64_t single_call_spilled;
int64_t single_call_spilled = 0;
RETURN_NOT_OK(visitor->Spill(size - current_spilled, &single_call_spilled));
current_spilled += single_call_spilled;
if (current_spilled >= size) {
*spilled_size = current_spilled;
return arrow::Status::OK();
}
}

*spilled_size = current_spilled;
return arrow::Status::OK();
}
Expand Down
11 changes: 6 additions & 5 deletions native-sql-engine/cpp/src/codegen/arrow_compute/expr_visitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ arrow::Status ExprVisitor::Eval(const std::shared_ptr<arrow::Array>& selection_i
return arrow::Status::OK();
}

arrow::Status ExprVisitor::Eval(const std::shared_ptr<arrow::RecordBatch>& in) {
in_record_batch_ = in;
arrow::Status ExprVisitor::Eval(std::shared_ptr<arrow::RecordBatch>& in) {
in_record_batch_ = std::move(in);
RETURN_NOT_OK(Eval());
return arrow::Status::OK();
}
Expand Down Expand Up @@ -757,19 +757,20 @@ arrow::Status ExprVisitor::GetResult(
}

arrow::Status ExprVisitor::Spill(int64_t size, int64_t* spilled_size) {
int64_t current_spilled = 0L;
int64_t current_spilled = 0;
if (dependency_) {
// fixme cycle invocation?
int64_t single_call_spilled;
int64_t single_call_spilled = 0;
RETURN_NOT_OK(dependency_->Spill(size - current_spilled, &single_call_spilled));
current_spilled += single_call_spilled;

if (current_spilled >= size) {
*spilled_size = current_spilled;
return arrow::Status::OK();
}
}
if (!finish_visitor_) {
int64_t single_call_spilled;
int64_t single_call_spilled = 0;
RETURN_NOT_OK(impl_->Spill(size - current_spilled, &single_call_spilled));
current_spilled += single_call_spilled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class ExprVisitor : public std::enable_shared_from_this<ExprVisitor> {
arrow::Status Init();
arrow::Status Eval(const std::shared_ptr<arrow::Array>& selection_in,
const std::shared_ptr<arrow::RecordBatch>& in);
arrow::Status Eval(const std::shared_ptr<arrow::RecordBatch>& in);
arrow::Status Eval(std::shared_ptr<arrow::RecordBatch>& in);
arrow::Status Eval();
std::string GetSignature() { return signature_; }
arrow::Status SetMember(const std::shared_ptr<arrow::RecordBatch>& ms);
Expand Down Expand Up @@ -190,6 +190,7 @@ class ExprVisitor : public std::enable_shared_from_this<ExprVisitor> {
std::shared_ptr<ExprVisitor> dependency_;
std::shared_ptr<arrow::Array> in_selection_array_;
std::shared_ptr<arrow::RecordBatch> in_record_batch_;
std::vector<std::shared_ptr<arrow::RecordBatch>> in_record_batch_holder_;
std::vector<std::shared_ptr<arrow::Field>> ret_fields_;

// For dual input kernels like probe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,9 @@ class SortArraysToIndicesVisitorImpl : public ExprVisitorImpl {
case ArrowComputeResultType::None: {
std::vector<std::shared_ptr<arrow::Array>> col_list;
for (auto col : p_->in_record_batch_->columns()) {
col_list.push_back(col);
col_list.push_back(std::move(col));
}
p_->in_record_batch_holder_.push_back(std::move(p_->in_record_batch_));
RETURN_NOT_OK(kernel_->Evaluate(col_list));
} break;
default:
Expand All @@ -508,6 +509,16 @@ class SortArraysToIndicesVisitorImpl : public ExprVisitorImpl {
return arrow::Status::OK();
}

// Spill up to `size` bytes of cached sort data to free memory.
// Delegates to the sort kernel, which reports the number of bytes it
// actually released via *spilled_size.
arrow::Status Spill(int64_t size, int64_t* spilled_size) override {
  // NOTE(review): removed leftover debug print of the target size to
  // stdout (with an endl flush) — spill is invoked under memory
  // pressure and should not write to the console.
  RETURN_NOT_OK(kernel_->Spill(size, spilled_size));

  if (*spilled_size != 0) {
    // The kernel has persisted its cached data, so the input batches
    // retained by the visitor can be dropped from memory.
    (p_->in_record_batch_holder_).clear();
  }
  return arrow::Status::OK();
}

arrow::Status MakeResultIterator(std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<ResultIteratorBase>* out) override {
switch (finish_return_type_) {
Expand Down
Loading

0 comments on commit 5d05fd2

Please sign in to comment.