Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Fix DPP enabled some sort and aggregate input may be empty issue (#59)
Browse files Browse the repository at this point in the history
Signed-off-by: Chendi Xue <chendi.xue@intel.com>
  • Loading branch information
xuechendi authored Jan 26, 2021
1 parent b47116d commit 9ce64d7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ class ColumnarAggregation(
}
numInputBatches += 1
}
if (processedNumRows == 0) {
data_loaded = true
aggrTime += NANOSECONDS.toMillis(eval_elapse)
nextBatch = false
return false
}
if (groupingFieldList.size > 0) {
val beforeFinish = System.nanoTime()
result_iterator = aggregator.finishByIterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ColumnarSorter(
var has_next: Boolean = true

override def hasNext: Boolean = {
if (sort_iterator == null) {
if (has_next && sort_iterator == null) {
while (cbIterator.hasNext) {
cb = cbIterator.next()

Expand All @@ -148,14 +148,18 @@ class ColumnarSorter(
}

val beforeSort = System.nanoTime()
sort_iterator = sorter.finishByIterator();
if (processedNumRows > 0) {
sort_iterator = sorter.finishByIterator();
}
sort_elapse += System.nanoTime() - beforeSort
total_elapse += System.nanoTime() - beforeSort
}
if (sort_iterator.hasNext()) {
if (sort_iterator != null && sort_iterator.hasNext()) {
return true
} else {
has_next = false
inputBatchHolder.foreach(cb => cb.close())
inputBatchHolder.clear
return false
}
}
Expand Down

0 comments on commit 9ce64d7

Please sign in to comment.