Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-928] Add ARROW_CHECK for batch_size check (#973)
Browse files Browse the repository at this point in the history
* [NSE-928] Add ARROW_CHECK for batch_size check

* Use uint32_t for array_id

* Retain uint32_t for sort index

* Add ARROW_CHECK for recordbatch number currently
  • Loading branch information
zhixingheyi-tian authored Jun 28, 2022
1 parent 3bef6d4 commit c4d4a65
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
11 changes: 11 additions & 0 deletions native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,9 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
if (nulls_total_ == 0) {
// if all batches have no null value,
// we do not need to check whether the value is null
ARROW_CHECK_LE(num_batches_, 64 * 1024);
for (int array_id = 0; array_id < num_batches_; array_id++) {
ARROW_CHECK_LE(length_list_[array_id], 64 * 1024);
for (int64_t i = 0; i < length_list_[array_id]; i++) {
(indices_begin + indices_i)->array_id = array_id;
(indices_begin + indices_i)->id = i;
Expand All @@ -1408,7 +1410,9 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
}
} else {
// we should support nulls first and nulls last here
ARROW_CHECK_LE(num_batches_, 64 * 1024);
for (int array_id = 0; array_id < num_batches_; array_id++) {
ARROW_CHECK_LE(length_list_[array_id], 64 * 1024);
if (cached_key_[array_id]->null_count() == 0) {
// if this array has no null value,
// we do not need to check if the value is null
Expand Down Expand Up @@ -1456,7 +1460,9 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
int64_t indices_i = 0;
int64_t indices_nan = 0;

ARROW_CHECK_LE(num_batches_, 64 * 1024);
for (int array_id = 0; array_id < num_batches_; array_id++) {
ARROW_CHECK_LE(length_list_[array_id], 64 * 1024);
for (int64_t i = 0; i < length_list_[array_id]; i++) {
if (cached_key_[array_id]->IsNull(i)) {
continue;
Expand Down Expand Up @@ -1988,6 +1994,7 @@ class SortArraysCodegenKernel : public SortArraysToIndicesKernel::Impl {

return BaseCodes() + R"(
#include "arrow/util/logging.h"
#include "precompile/wscgapi.hpp"
Expand Down Expand Up @@ -2023,7 +2030,9 @@ class TypedSorterImpl : public CodeGenBase {
ArrayItemIndexS* indices_end = indices_begin + items_total_;
int64_t indices_i = 0;
ARROW_CHECK_LE(num_batches_, 64 * 1024);
for (int array_id = 0; array_id < num_batches_; array_id++) {
ARROW_CHECK_LE(length_list_[array_id], 64*1024);
for (int64_t i = 0; i < length_list_[array_id]; i++) {
(indices_begin + indices_i)->array_id = array_id;
(indices_begin + indices_i)->id = i;
Expand Down Expand Up @@ -2581,7 +2590,9 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
void Partition(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) {
int64_t indices_i = 0;
int64_t indices_null = 0;
ARROW_CHECK_LE(num_batches_, 64 * 1024);
for (int array_id = 0; array_id < num_batches_; array_id++) {
ARROW_CHECK_LE(length_list_[array_id], 64 * 1024);
for (int64_t i = 0; i < length_list_[array_id]; i++) {
(indices_begin + indices_i)->array_id = array_id;
(indices_begin + indices_i)->id = i;
Expand Down
3 changes: 2 additions & 1 deletion native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,8 @@ Splitter::row_offset_type Splitter::CalculateSplitBatchSize(

arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
// buffer is allocated less than 64K
// ARROW_CHECK_LE(rb.num_rows(),64*1024);
// Uncomment the ARROW_CHECK_LE here once the max batch_size issue is fixed
// ARROW_CHECK_LE(rb.num_rows(), 64 * 1024);

reducer_offsets_.resize(rb.num_rows());

Expand Down

0 comments on commit c4d4a65

Please sign in to comment.