From 08d0dc32f2ec0f33764513ef70ac9dc7d6e74f47 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 17 Jan 2022 12:24:01 +0800 Subject: [PATCH 1/8] Change SPMS to use heap sort, use SPMS instead of in-mem-sort as well --- .../src/physical_plan/sorts/external_sort.rs | 26 +- .../src/physical_plan/sorts/in_mem_sort.rs | 241 ------------------ datafusion/src/physical_plan/sorts/mod.rs | 66 +++-- .../sorts/sort_preserving_merge.rs | 190 ++++++++------ 4 files changed, 170 insertions(+), 353 deletions(-) delete mode 100644 datafusion/src/physical_plan/sorts/in_mem_sort.rs diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs index 8550cb5ad433..6f436fc7f7fd 100644 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ b/datafusion/src/physical_plan/sorts/external_sort.rs @@ -27,7 +27,6 @@ use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; -use crate::physical_plan::sorts::in_mem_sort::InMemSortStream; use crate::physical_plan::sorts::sort::sort_batch; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; use crate::physical_plan::sorts::SortedStream; @@ -136,6 +135,7 @@ impl ExternalSorter { &self.expr, self.runtime.batch_size(), baseline_metrics, + self.runtime.clone(), ) .await?; streams.push(SortedStream::new(in_mem_stream, self.used())); @@ -149,7 +149,7 @@ impl ExternalSorter { let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin( - SortPreservingMergeStream::new_from_stream( + SortPreservingMergeStream::new_from_streams( streams, self.schema.clone(), &self.expr, @@ -229,6 +229,7 @@ impl MemoryConsumer for ExternalSorter { &*self.expr, self.runtime.batch_size(), baseline_metrics, + self.runtime.clone(), ) .await; @@ -256,6 +257,7 @@ async fn in_mem_partial_sort( expressions: &[PhysicalSortExpr], target_batch_size: usize, baseline_metrics: BaselineMetrics, + runtime: Arc, ) -> Result { if sorted_bathes.len() == 1 { Ok(Box::pin(SizedRecordBatchStream::new( @@ -263,15 +265,19 @@ async fn in_mem_partial_sort( vec![Arc::new(sorted_bathes.pop().unwrap())], ))) } else { - let new = sorted_bathes.drain(..).collect(); + let batches = sorted_bathes.drain(..).collect(); assert_eq!(sorted_bathes.len(), 0); - Ok(Box::pin(InMemSortStream::new( - new, - schema, - expressions, - target_batch_size, - baseline_metrics, - )?)) + Ok(Box::pin( + SortPreservingMergeStream::new_from_batches( + batches, + schema, + expressions, + target_batch_size, + baseline_metrics, + runtime, + ) + .await, + )) } } diff --git a/datafusion/src/physical_plan/sorts/in_mem_sort.rs b/datafusion/src/physical_plan/sorts/in_mem_sort.rs deleted file mode 100644 index 9e7753d42472..000000000000 --- a/datafusion/src/physical_plan/sorts/in_mem_sort.rs +++ /dev/null @@ -1,241 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::BinaryHeap; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use arrow::{ - array::{make_array as make_arrow_array, MutableArrayData}, - compute::SortOptions, - datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, -}; -use futures::Stream; - -use crate::error::Result; -use crate::physical_plan::metrics::BaselineMetrics; -use crate::physical_plan::sorts::{RowIndex, SortKeyCursor}; -use crate::physical_plan::{ - expressions::PhysicalSortExpr, PhysicalExpr, RecordBatchStream, -}; - -/// Merge buffered, self-sorted record batches to get an order. -/// -/// Internally, it uses MinHeap to reduce extra memory consumption -/// by not concatenating all batches into one and sorting it as done by `SortExec`. -pub(crate) struct InMemSortStream { - /// The schema of the RecordBatches yielded by this stream - schema: SchemaRef, - /// Self sorted batches to be merged together - batches: Vec>, - /// The accumulated row indexes for the next record batch - in_progress: Vec, - /// The desired RecordBatch size to yield - target_batch_size: usize, - /// used to record execution metrics - baseline_metrics: BaselineMetrics, - /// If the stream has encountered an error - aborted: bool, - /// min heap for record comparison - min_heap: BinaryHeap, -} - -impl InMemSortStream { - pub(crate) fn new( - sorted_batches: Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - target_batch_size: usize, - baseline_metrics: BaselineMetrics, - ) -> Result { - let len = sorted_batches.len(); - let mut cursors = Vec::with_capacity(len); - let mut min_heap = BinaryHeap::with_capacity(len); - - let column_expressions: Vec> = - expressions.iter().map(|x| x.expr.clone()).collect(); - - // The sort options for each expression - let sort_options: Arc> = - Arc::new(expressions.iter().map(|x| x.options).collect()); - - sorted_batches - .into_iter() - .enumerate() - .try_for_each(|(idx, batch)| { - let batch = Arc::new(batch); - let cursor = match SortKeyCursor::new( - idx, - batch.clone(), - &column_expressions, - sort_options.clone(), - ) { - Ok(cursor) => cursor, - Err(e) => return Err(e), - }; - min_heap.push(cursor); - cursors.insert(idx, batch); - Ok(()) - })?; - - Ok(Self { - schema, - batches: cursors, - target_batch_size, - baseline_metrics, - aborted: false, - in_progress: vec![], - min_heap, - }) - } - - /// Returns the index of the next batch to pull a row from, or None - /// if all cursors for all batch are exhausted - fn next_cursor(&mut self) -> Result> { - match self.min_heap.pop() { - None => Ok(None), - Some(cursor) => Ok(Some(cursor)), - } - } - - /// Drains the in_progress row indexes, and builds a new RecordBatch from them - /// - /// Will then drop any cursors for which all rows have been yielded to the output - fn build_record_batch(&mut self) -> ArrowResult { - let columns = self - .schema - .fields() - .iter() - .enumerate() - .map(|(column_idx, field)| { - let arrays = self - .batches - .iter() - .map(|batch| batch.column(column_idx).data()) - .collect(); - - let mut array_data = 
MutableArrayData::new( - arrays, - field.is_nullable(), - self.in_progress.len(), - ); - - if self.in_progress.is_empty() { - return make_arrow_array(array_data.freeze()); - } - - let first = &self.in_progress[0]; - let mut buffer_idx = first.stream_idx; - let mut start_row_idx = first.row_idx; - let mut end_row_idx = start_row_idx + 1; - - for row_index in self.in_progress.iter().skip(1) { - let next_buffer_idx = row_index.stream_idx; - - if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx { - // subsequent row in same batch - end_row_idx += 1; - continue; - } - - // emit current batch of rows for current buffer - array_data.extend(buffer_idx, start_row_idx, end_row_idx); - - // start new batch of rows - buffer_idx = next_buffer_idx; - start_row_idx = row_index.row_idx; - end_row_idx = start_row_idx + 1; - } - - // emit final batch of rows - array_data.extend(buffer_idx, start_row_idx, end_row_idx); - make_arrow_array(array_data.freeze()) - }) - .collect(); - - self.in_progress.clear(); - RecordBatch::try_new(self.schema.clone(), columns) - } - - #[inline] - fn poll_next_inner( - self: &mut Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - if self.aborted { - return Poll::Ready(None); - } - - loop { - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); - let _timer = elapsed_compute.timer(); - - match self.next_cursor() { - Ok(Some(mut cursor)) => { - let batch_idx = cursor.batch_idx; - let row_idx = cursor.advance(); - - // insert the cursor back to min_heap if the record batch is not exhausted - if !cursor.is_finished() { - self.min_heap.push(cursor); - } - - self.in_progress.push(RowIndex { - stream_idx: batch_idx, - cursor_idx: 0, - row_idx, - }); - } - Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None), - Ok(None) => return Poll::Ready(Some(self.build_record_batch())), - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new( - e, - ))))); - } - }; - - if self.in_progress.len() == self.target_batch_size { - return Poll::Ready(Some(self.build_record_batch())); - } - } - } -} - -impl Stream for InMemSortStream { - type Item = ArrowResult; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let poll = self.poll_next_inner(cx); - self.baseline_metrics.record_poll(poll) - } -} - -impl RecordBatchStream for InMemSortStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 3dda13b1a178..11e9d3845f66 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -19,6 +19,7 @@ use crate::error; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::common::batch_byte_size; use crate::physical_plan::{PhysicalExpr, SendableRecordBatchStream}; use arrow::array::{ArrayRef, DynComparator}; use arrow::compute::SortOptions; @@ -32,11 +33,11 @@ use std::borrow::BorrowMut; use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::pin::Pin; +use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; pub mod external_sort; -mod in_mem_sort; pub mod sort; pub mod sort_preserving_merge; @@ -50,8 +51,9 @@ pub mod sort_preserving_merge; /// by this row cursor, with that of another `SortKeyCursor`. 
A cursor stores /// a row comparator for each other cursor that it is compared to. struct SortKeyCursor { - columns: Vec, - cur_row: usize, + stream_idx: usize, + sort_columns: Vec, + cur_row: AtomicUsize, num_rows: usize, // An index uniquely identifying the record batch scanned by this cursor. @@ -68,8 +70,8 @@ struct SortKeyCursor { impl<'a> std::fmt::Debug for SortKeyCursor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SortKeyCursor") - .field("columns", &self.columns) - .field("cur_row", &self.cur_row) + .field("sort_columns", &self.sort_columns) + .field("cur_row", &self.cur_row()) .field("num_rows", &self.num_rows) .field("batch_idx", &self.batch_idx) .field("batch", &self.batch) @@ -80,19 +82,21 @@ impl<'a> std::fmt::Debug for SortKeyCursor { impl SortKeyCursor { fn new( + stream_idx: usize, batch_idx: usize, batch: Arc, sort_key: &[Arc], sort_options: Arc>, ) -> error::Result { - let columns = sort_key + let sort_columns = sort_key .iter() .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows()))) .collect::>()?; Ok(Self { - cur_row: 0, + stream_idx, + cur_row: AtomicUsize::new(0), num_rows: batch.num_rows(), - columns, + sort_columns, batch, batch_idx, batch_comparators: RwLock::new(HashMap::new()), @@ -101,38 +105,41 @@ impl SortKeyCursor { } fn is_finished(&self) -> bool { - self.num_rows == self.cur_row + self.num_rows == self.cur_row() } - fn advance(&mut self) -> usize { + fn advance(&self) -> usize { assert!(!self.is_finished()); - let t = self.cur_row; - self.cur_row += 1; - t + self.cur_row + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + fn cur_row(&self) -> usize { + self.cur_row.load(std::sync::atomic::Ordering::SeqCst) } /// Compares the sort key pointed to by this instance's row cursor with that of another fn compare(&self, other: &SortKeyCursor) -> error::Result { - if self.columns.len() != other.columns.len() { + if self.sort_columns.len() != other.sort_columns.len() { return Err(DataFusionError::Internal(format!( "SortKeyCursors had inconsistent column counts: {} vs {}", - self.columns.len(), - other.columns.len() + self.sort_columns.len(), + other.sort_columns.len() ))); } - if self.columns.len() != self.sort_options.len() { + if self.sort_columns.len() != self.sort_options.len() { return Err(DataFusionError::Internal(format!( "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}", - self.columns.len(), + self.sort_columns.len(), self.sort_options.len() ))); } let zipped: Vec<((&ArrayRef, &ArrayRef), &SortOptions)> = self - .columns + .sort_columns .iter() - .zip(other.columns.iter()) + .zip(other.sort_columns.iter()) .zip(self.sort_options.iter()) .collect::>(); @@ -146,7 +153,7 @@ impl SortKeyCursor { })?; for (i, ((l, r), sort_options)) in zipped.iter().enumerate() { - match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) { + match (l.is_valid(self.cur_row()), r.is_valid(other.cur_row())) { (false, true) if sort_options.nulls_first => return Ok(Ordering::Less), (false, true) => return Ok(Ordering::Greater), (true, false) if sort_options.nulls_first => { @@ -154,7 +161,7 @@ impl SortKeyCursor { } (true, false) => return Ok(Ordering::Less), (false, false) => {} - (true, true) => match cmp[i](self.cur_row, other.cur_row) { + (true, true) => match cmp[i](self.cur_row(), other.cur_row()) { Ordering::Equal => {} o if sort_options.descending => return Ok(o.reverse()), o => return Ok(o), @@ -179,7 +186,7 @@ impl SortKeyCursor { let cmp = map .borrow_mut() 
.entry(other.batch_idx) - .or_insert_with(|| Vec::with_capacity(other.columns.len())); + .or_insert_with(|| Vec::with_capacity(other.sort_columns.len())); for (i, ((l, r), _)) in zipped.iter().enumerate() { if i >= cmp.len() { @@ -247,14 +254,15 @@ impl SortedStream { enum StreamWrapper { Receiver(mpsc::Receiver>), Stream(Option), + SingleBatch(Option), } impl StreamWrapper { fn mem_used(&self) -> usize { - if let StreamWrapper::Stream(Some(s)) = &self { - s.mem_used - } else { - 0 + match &self { + StreamWrapper::Stream(Some(s)) => s.mem_used, + StreamWrapper::SingleBatch(Some(b)) => batch_byte_size(b), + _ => 0, } } } @@ -281,6 +289,9 @@ impl Stream for StreamWrapper { Poll::Pending => Poll::Pending, } } + StreamWrapper::SingleBatch(ref mut ob) => { + Poll::Ready(Ok(ob.take()).transpose()) + } } } } @@ -290,6 +301,7 @@ impl FusedStream for StreamWrapper { match self { StreamWrapper::Receiver(receiver) => receiver.is_terminated(), StreamWrapper::Stream(stream) => stream.is_none(), + StreamWrapper::SingleBatch(b) => b.is_none(), } } } diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index fa49daf5a1a6..d00ff81cdd17 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -22,8 +22,7 @@ use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; use std::any::Any; -use std::cmp::Ordering; -use std::collections::VecDeque; +use std::collections::{BinaryHeap, VecDeque}; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -177,7 +176,7 @@ impl ExecutionPlan for SortPreservingMergeExec { .unzip(); Ok(Box::pin( - SortPreservingMergeStream::new_from_receiver( + SortPreservingMergeStream::new_from_receivers( receivers, AbortOnDropMany(join_handles), self.schema(), @@ -293,7 +292,7 @@ pub(crate) struct SortPreservingMergeStream { /// /// Exhausted cursors will be popped off the front once all /// their rows have been yielded to the output - cursors: Vec>, + cursors: Vec>>, /// The accumulated row indexes for the next record batch in_progress: Vec, @@ -316,6 +315,9 @@ pub(crate) struct SortPreservingMergeStream { /// An index to uniquely identify the input stream batch next_batch_index: usize, + /// min heap for record comparison + min_heap: BinaryHeap>, + /// runtime runtime: Arc, } @@ -328,7 +330,7 @@ impl Drop for SortPreservingMergeStream { impl SortPreservingMergeStream { #[allow(clippy::too_many_arguments)] - pub(crate) async fn new_from_receiver( + pub(crate) async fn new_from_receivers( receivers: Vec>>, _drop_helper: AbortOnDropMany<()>, schema: SchemaRef, @@ -338,16 +340,16 @@ impl SortPreservingMergeStream { partition: usize, runtime: Arc, ) -> Self { - let cursors = (0..receivers.len()) + let stream_count = receivers.len(); + let cursors = (0..stream_count) .into_iter() .map(|_| VecDeque::new()) .collect(); - let wrappers = receivers.into_iter().map(StreamWrapper::Receiver).collect(); let streams = Arc::new(MergingStreams::new(partition, wrappers, runtime.clone())); runtime.register_consumer(&(streams.clone() as Arc)); - Self { + SortPreservingMergeStream { schema, cursors, streams, @@ -359,11 +361,12 @@ impl SortPreservingMergeStream { aborted: false, in_progress: vec![], next_batch_index: 0, + min_heap: BinaryHeap::with_capacity(stream_count), runtime, } } - pub(crate) async fn new_from_stream( + pub(crate) async fn new_from_streams( streams: Vec, 
schema: SchemaRef, expressions: &[PhysicalSortExpr], @@ -372,16 +375,15 @@ impl SortPreservingMergeStream { partition: usize, runtime: Arc, ) -> Self { - let cursors = (0..streams.len()) + let stream_count = streams.len(); + let cursors = (0..stream_count) .into_iter() .map(|_| VecDeque::new()) .collect(); - let wrappers = streams .into_iter() .map(|s| StreamWrapper::Stream(Some(s))) - .collect::>(); - + .collect(); let streams = Arc::new(MergingStreams::new(partition, wrappers, runtime.clone())); runtime.register_consumer(&(streams.clone() as Arc)); @@ -397,6 +399,47 @@ impl SortPreservingMergeStream { aborted: false, in_progress: vec![], next_batch_index: 0, + min_heap: BinaryHeap::with_capacity(stream_count), + runtime, + } + } + + pub(crate) async fn new_from_batches( + batches: Vec, + schema: SchemaRef, + expressions: &[PhysicalSortExpr], + target_batch_size: usize, + baseline_metrics: BaselineMetrics, + runtime: Arc, + ) -> Self { + let batch_count = batches.len(); + let wrappers = batches + .into_iter() + .map(|s| StreamWrapper::SingleBatch(Some(s))) + .collect(); + + let cursors = (0..batch_count) + .into_iter() + .map(|_| VecDeque::new()) + .collect(); + + // We are not registering this into runtime since it's only used within + // an external sorter, already a registered memory consumer. + let streams = Arc::new(MergingStreams::new(0, wrappers, runtime.clone())); + + SortPreservingMergeStream { + schema, + cursors, + streams, + _drop_helper: AbortOnDropMany(vec![]), + column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), + sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()), + target_batch_size, + baseline_metrics, + aborted: false, + in_progress: vec![], + next_batch_index: 0, + min_heap: BinaryHeap::with_capacity(batch_count), runtime, } } @@ -430,18 +473,24 @@ impl SortPreservingMergeStream { return Poll::Ready(Err(e)); } Some(Ok(batch)) => { - let cursor = match SortKeyCursor::new( - self.next_batch_index, // assign this batch an ID - Arc::new(batch), - &self.column_expressions, - self.sort_options.clone(), - ) { - Ok(cursor) => cursor, - Err(e) => { - return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e)))); - } - }; + let cursor = Arc::new( + match SortKeyCursor::new( + idx, + self.next_batch_index, // assign this batch an ID + Arc::new(batch), + &self.column_expressions, + self.sort_options.clone(), + ) { + Ok(cursor) => cursor, + Err(e) => { + return Poll::Ready(Err(ArrowError::ExternalError( + Box::new(e), + ))); + } + }, + ); self.next_batch_index += 1; + self.min_heap.push(cursor.clone()); self.cursors[idx].push_back(cursor) } } @@ -449,28 +498,13 @@ impl SortPreservingMergeStream { Poll::Ready(Ok(())) } - /// Returns the index of the next stream to pull a row from, or None + /// Returns the cursor of the next stream to pull a row from, or None /// if all cursors for all streams are exhausted - fn next_stream_idx(&mut self) -> Result> { - let mut min_cursor: Option<(usize, &mut SortKeyCursor)> = None; - for (idx, candidate) in self.cursors.iter_mut().enumerate() { - if let Some(candidate) = candidate.back_mut() { - if candidate.is_finished() { - continue; - } - - match min_cursor { - None => min_cursor = Some((idx, candidate)), - Some((_, ref mut min)) => { - if min.compare(candidate)? 
== Ordering::Greater { - min_cursor = Some((idx, candidate)) - } - } - } - } + fn next_cursor(&mut self) -> Result>> { + match self.min_heap.pop() { + None => Ok(None), + Some(cursor) => Ok(Some(cursor)), } - - Ok(min_cursor.map(|(idx, _)| idx)) } /// Drains the in_progress row indexes, and builds a new RecordBatch from them @@ -604,8 +638,42 @@ impl SortPreservingMergeStream { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - let stream_idx = match self.next_stream_idx() { - Ok(Some(idx)) => idx, + match self.next_cursor() { + Ok(Some(cursor)) => { + let stream_idx = cursor.stream_idx; + let cursor_idx = self.cursors[stream_idx].len() - 1; + let row_idx = cursor.advance(); + + let mut cursor_finished = false; + // insert the cursor back to min_heap if the record batch is not exhausted + if !cursor.is_finished() { + self.min_heap.push(cursor); + } else { + cursor_finished = true; + } + + self.in_progress.push(RowIndex { + stream_idx, + cursor_idx, + row_idx, + }); + + if self.in_progress.len() == self.target_batch_size { + return Poll::Ready(Some(self.build_record_batch())); + } + + // If removed the last row from the cursor, need to fetch a new record + // batch if possible, before looping round again + if cursor_finished { + match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) { + Ok(_) => {} + Err(e) => { + self.aborted = true; + return Poll::Ready(Some(Err(e))); + } + } + } + } Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None), Ok(None) => return Poll::Ready(Some(self.build_record_batch())), Err(e) => { @@ -614,34 +682,6 @@ impl SortPreservingMergeStream { e, ))))); } - }; - - let cursors = &mut self.cursors[stream_idx]; - let cursor_idx = cursors.len() - 1; - let cursor = cursors.back_mut().unwrap(); - let row_idx = cursor.advance(); - let cursor_finished = cursor.is_finished(); - - self.in_progress.push(RowIndex { - stream_idx, - cursor_idx, - row_idx, - }); - - if self.in_progress.len() == self.target_batch_size { - return Poll::Ready(Some(self.build_record_batch())); - } - - // If removed the last row from the cursor, need to fetch a new record - // batch if possible, before looping round again - if cursor_finished { - match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) { - Ok(_) => {} - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(e))); - } - } } } } @@ -1257,7 +1297,7 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); let baseline_metrics = BaselineMetrics::new(&metrics, 0); - let merge_stream = SortPreservingMergeStream::new_from_receiver( + let merge_stream = SortPreservingMergeStream::new_from_receivers( receivers, // Use empty vector since we want to use the join handles ourselves AbortOnDropMany(vec![]), From 4c099f714817b5db4321cc01cb25a95701582aea Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 17 Jan 2022 19:54:45 +0800 Subject: [PATCH 2/8] Incorporate metrics, external_sort pass all sort tests --- datafusion/src/physical_plan/common.rs | 14 +- datafusion/src/physical_plan/explain.rs | 5 + .../src/physical_plan/sorts/external_sort.rs | 367 ++++++++++++++++-- datafusion/src/physical_plan/sorts/mod.rs | 5 +- datafusion/tests/provider_filter_pushdown.rs | 6 +- 5 files changed, 354 insertions(+), 43 deletions(-) diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index dd0c8248e459..391ee84ff026 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -20,6 +20,7 
@@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::metrics::BaselineMetrics; use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; use arrow::compute::concat; use arrow::datatypes::{Schema, SchemaRef}; @@ -41,15 +42,21 @@ pub struct SizedRecordBatchStream { schema: SchemaRef, batches: Vec>, index: usize, + baseline_metrics: BaselineMetrics, } impl SizedRecordBatchStream { /// Create a new RecordBatchIterator - pub fn new(schema: SchemaRef, batches: Vec>) -> Self { + pub fn new( + schema: SchemaRef, + batches: Vec>, + baseline_metrics: BaselineMetrics, + ) -> Self { SizedRecordBatchStream { schema, index: 0, batches, + baseline_metrics, } } } @@ -61,12 +68,13 @@ impl Stream for SizedRecordBatchStream { mut self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { - Poll::Ready(if self.index < self.batches.len() { + let poll = Poll::Ready(if self.index < self.batches.len() { self.index += 1; Some(Ok(self.batches[self.index - 1].as_ref().clone())) } else { None - }) + }); + self.baseline_metrics.record_poll(poll) } } diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index df3dc98f196d..f827dc32eca4 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -32,6 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc use super::SendableRecordBatchStream; use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use async_trait::async_trait; /// Explain execution plan operator. This operator contains the string @@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec { ], )?; + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, partition); + Ok(Box::pin(SizedRecordBatchStream::new( self.schema.clone(), vec![Arc::new(record_batch)], + baseline_metrics, ))) } diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs index 6f436fc7f7fd..ebda27887480 100644 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ b/datafusion/src/physical_plan/sorts/external_sort.rs @@ -25,7 +25,7 @@ use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time, }; use crate::physical_plan::sorts::sort::sort_batch; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; @@ -50,6 +50,7 @@ use std::fs::File; use std::io::BufReader; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; use tokio::task; @@ -63,7 +64,7 @@ use tokio::task; /// let batch = input.next(); /// // no enough memory available, spill first. 
/// if exec_memory_available < size_of(batch) { -/// let ordered_stream = in_mem_heap_sort(in_mem_batches.drain(..)); +/// let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..)); /// let tmp_file = spill_write(ordered_stream); /// spills.push(tmp_file); /// } @@ -73,7 +74,7 @@ use tokio::task; /// } /// /// let partial_ordered_streams = vec![]; -/// let in_mem_stream = in_mem_heap_sort(in_mem_batches.drain(..)); +/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..)); /// partial_ordered_streams.push(in_mem_stream); /// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); /// let result = sort_preserving_merge(partial_ordered_streams); @@ -85,7 +86,8 @@ struct ExternalSorter { /// Sort expressions expr: Vec, runtime: Arc, - metrics: ExecutionPlanMetricsSet, + metrics: AggregatedMetricsSet, + inner_metrics: BaselineMetrics, used: AtomicUsize, spilled_bytes: AtomicUsize, spilled_count: AtomicUsize, @@ -96,8 +98,10 @@ impl ExternalSorter { partition_id: usize, schema: SchemaRef, expr: Vec, + metrics: AggregatedMetricsSet, runtime: Arc, ) -> Self { + let inner_metrics = metrics.new_intermediate_baseline(partition_id); Self { id: MemoryConsumerId::new(partition_id), schema, @@ -105,7 +109,8 @@ impl ExternalSorter { spills: Mutex::new(vec![]), expr, runtime, - metrics: ExecutionPlanMetricsSet::new(), + metrics, + inner_metrics, used: AtomicUsize::new(0), spilled_bytes: AtomicUsize::new(0), spilled_count: AtomicUsize::new(0), @@ -117,49 +122,70 @@ impl ExternalSorter { self.try_grow(size).await?; self.used.fetch_add(size, Ordering::SeqCst); // sort each batch as it's inserted, more probably to be cache-resident + let elapsed_compute = self.inner_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; + timer.done(); let mut in_mem_batches = self.in_mem_batches.lock().await; in_mem_batches.push(sorted_batch); Ok(()) } + async fn spilled_before(&self) -> bool { + let spills = self.spills.lock().await; + !spills.is_empty() + } + /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. async fn sort(&self) -> Result { let partition = self.partition_id(); let mut in_mem_batches = self.in_mem_batches.lock().await; - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let mut streams: Vec = vec![]; - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - self.runtime.clone(), - ) - .await?; - streams.push(SortedStream::new(in_mem_stream, self.used())); - let mut spills = self.spills.lock().await; + if self.spilled_before().await { + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); + let mut streams: Vec = vec![]; + let in_mem_stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + self.runtime.batch_size(), + baseline_metrics, + self.runtime.clone(), + ) + .await?; + streams.push(SortedStream::new(in_mem_stream, self.used())); - for spill in spills.drain(..) { - let stream = read_spill_as_stream(spill, self.schema.clone()).await?; - streams.push(SortedStream::new(stream, 0)); - } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let mut spills = self.spills.lock().await; - Ok(Box::pin( - SortPreservingMergeStream::new_from_streams( - streams, + for spill in spills.drain(..) 
{ + let stream = read_spill_as_stream(spill, self.schema.clone()).await?; + streams.push(SortedStream::new(stream, 0)); + } + let baseline_metrics = self.metrics.new_final_baseline(partition); + Ok(Box::pin( + SortPreservingMergeStream::new_from_streams( + streams, + self.schema.clone(), + &self.expr, + self.runtime.batch_size(), + baseline_metrics, + partition, + self.runtime.clone(), + ) + .await, + )) + } else { + let baseline_metrics = self.metrics.new_final_baseline(partition); + in_mem_partial_sort( + &mut *in_mem_batches, self.schema.clone(), &self.expr, self.runtime.batch_size(), baseline_metrics, - partition, self.runtime.clone(), ) - .await, - )) + .await + } } fn used(&self) -> usize { @@ -220,7 +246,7 @@ impl MemoryConsumer for ExternalSorter { return Ok(0); } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); let path = self.runtime.disk_manager.create_tmp_file()?; let stream = in_mem_partial_sort( @@ -263,6 +289,7 @@ async fn in_mem_partial_sort( Ok(Box::pin(SizedRecordBatchStream::new( schema, vec![Arc::new(sorted_bathes.pop().unwrap())], + baseline_metrics, ))) } else { let batches = sorted_bathes.drain(..).collect(); @@ -359,12 +386,79 @@ pub struct ExternalSortExec { input: Arc, /// Sort expressions expr: Vec, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, + /// Containing all metrics set created for sort, such as all sets for `sort_merge_join`s + all_metrics: AggregatedMetricsSet, /// Preserve partitions of input plan preserve_partitioning: bool, } +#[derive(Debug, Clone)] +struct AggregatedMetricsSet { + intermediate: Arc>>, + final_: Arc>>, +} + +impl AggregatedMetricsSet { + fn new() -> Self { + Self { + intermediate: Arc::new(std::sync::Mutex::new(vec![])), + final_: Arc::new(std::sync::Mutex::new(vec![])), + } + } + + fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.intermediate.lock().unwrap().push(ms); + result + } + + fn new_final_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.final_.lock().unwrap().push(ms); + result + } + + fn merge_compute_time(&self, dest: &Time) { + let time1 = self + .intermediate + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + let time2 = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + dest.add_duration(Duration::from_nanos(time1)); + dest.add_duration(Duration::from_nanos(time2)); + } + + fn merge_output_count(&self, dest: &Count) { + let count = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| es.clone_inner().output_rows().map_or(0, |v| v)) + .sum(); + dest.add(count); + } +} + impl ExternalSortExec { /// Create a new sort execution plan pub fn try_new( @@ -384,7 +478,7 @@ impl ExternalSortExec { Self { expr, input, - metrics: ExecutionPlanMetricsSet::new(), + all_metrics: AggregatedMetricsSet::new(), preserve_partitioning, } } @@ -467,14 +561,25 @@ impl ExecutionPlan for ExternalSortExec { } } - let _baseline_metrics = BaselineMetrics::new(&self.metrics, partition); let input = self.input.execute(partition, runtime.clone()).await?; - external_sort(input, partition, self.expr.clone(), runtime).await + 
external_sort( + input, + partition, + self.expr.clone(), + self.all_metrics.clone(), + runtime, + ) + .await } fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) + let metrics = ExecutionPlanMetricsSet::new(); + let baseline = BaselineMetrics::new(&metrics, 0); + self.all_metrics + .merge_compute_time(baseline.elapsed_compute()); + self.all_metrics.merge_output_count(baseline.output_rows()); + Some(metrics.clone_inner()) } fn fmt_as( @@ -499,6 +604,7 @@ async fn external_sort( mut input: SendableRecordBatchStream, partition_id: usize, expr: Vec, + metrics: AggregatedMetricsSet, runtime: Arc, ) -> Result { let schema = input.schema(); @@ -506,6 +612,7 @@ async fn external_sort( partition_id, schema.clone(), expr, + metrics, runtime.clone(), )); runtime.register_consumer(&(sorter.clone() as Arc)); @@ -527,15 +634,20 @@ mod tests { use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; + use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::{ collect, file_format::{CsvExec, PhysicalPlanConfig}, }; use crate::test; + use crate::test::assert_is_pending; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test_util; use arrow::array::*; use arrow::compute::SortOptions; use arrow::datatypes::*; + use futures::FutureExt; + use std::collections::{BTreeMap, HashMap}; async fn sort_with_runtime(runtime: Arc) -> Result> { let schema = test_util::aggr_test_schema(); @@ -662,4 +774,187 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_sort_metadata() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let field_metadata: BTreeMap = + vec![("foo".to_string(), "bar".to_string())] + .into_iter() + .collect(); + let schema_metadata: HashMap = + vec![("baz".to_string(), "barf".to_string())] + .into_iter() + .collect(); + + let mut field = Field::new("field_name", DataType::UInt64, true); + field.set_metadata(Some(field_metadata.clone())); + let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone()); + let schema = Arc::new(schema); + + let data: ArrayRef = + Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::()); + + let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap(); + let input = + Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); + + let sort_exec = Arc::new(ExternalSortExec::try_new( + vec![PhysicalSortExpr { + expr: col("field_name", &schema)?, + options: SortOptions::default(), + }], + input, + )?); + + let result: Vec = collect(sort_exec, runtime).await?; + + let expected_data: ArrayRef = + Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::()); + let expected_batch = + RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap(); + + // Data is correct + assert_eq!(&vec![expected_batch], &result); + + // explicitlty ensure the metadata is present + assert_eq!( + result[0].schema().fields()[0].metadata(), + &Some(field_metadata) + ); + assert_eq!(result[0].schema().metadata(), &schema_metadata); + + Ok(()) + } + + #[tokio::test] + async fn test_lex_sort_by_float() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float32, true), + Field::new("b", DataType::Float64, true), + ])); + + // define data. 
+ let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float32Array::from(vec![ + Some(f32::NAN), + None, + None, + Some(f32::NAN), + Some(1.0_f32), + Some(1.0_f32), + Some(2.0_f32), + Some(3.0_f32), + ])), + Arc::new(Float64Array::from(vec![ + Some(200.0_f64), + Some(20.0_f64), + Some(10.0_f64), + Some(100.0_f64), + Some(f64::NAN), + None, + None, + Some(f64::NAN), + ])), + ], + )?; + + let sort_exec = Arc::new(ExternalSortExec::try_new( + vec![ + PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }, + PhysicalSortExpr { + expr: col("b", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + ], + Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), + )?); + + assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); + assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); + + let result: Vec = collect(sort_exec.clone(), runtime).await?; + let metrics = sort_exec.metrics().unwrap(); + assert!(metrics.elapsed_compute().unwrap() > 0); + assert_eq!(metrics.output_rows().unwrap(), 8); + assert_eq!(result.len(), 1); + + let columns = result[0].columns(); + + assert_eq!(DataType::Float32, *columns[0].data_type()); + assert_eq!(DataType::Float64, *columns[1].data_type()); + + let a = as_primitive_array::(&columns[0]); + let b = as_primitive_array::(&columns[1]); + + // convert result to strings to allow comparing to expected result containing NaN + let result: Vec<(Option, Option)> = (0..result[0].num_rows()) + .map(|i| { + let aval = if a.is_valid(i) { + Some(a.value(i).to_string()) + } else { + None + }; + let bval = if b.is_valid(i) { + Some(b.value(i).to_string()) + } else { + None + }; + (aval, bval) + }) + .collect(); + + let expected: Vec<(Option, Option)> = vec![ + (None, Some("10".to_owned())), + (None, Some("20".to_owned())), + (Some("NaN".to_owned()), Some("100".to_owned())), + (Some("NaN".to_owned()), Some("200".to_owned())), + (Some("3".to_owned()), Some("NaN".to_owned())), + (Some("2".to_owned()), None), + (Some("1".to_owned()), Some("NaN".to_owned())), + (Some("1".to_owned()), None), + ]; + + assert_eq!(expected, result); + + Ok(()) + } + + #[tokio::test] + async fn test_drop_cancel() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); + let refs = blocking_exec.refs(); + let sort_exec = Arc::new(ExternalSortExec::try_new( + vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions::default(), + }], + blocking_exec, + )?); + + let fut = collect(sort_exec, runtime); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } } diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 11e9d3845f66..62f7d0b9bd53 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -200,7 +200,7 @@ impl SortKeyCursor { } impl Ord for SortKeyCursor { - /// Needed by min-heap comparison in `in_mem_sort` and reverse the order at the same time. + /// Needed by min-heap comparison and reverse the order at the same time. 
fn cmp(&self, other: &Self) -> Ordering { other.compare(self).unwrap() } @@ -226,8 +226,7 @@ impl PartialOrd for SortKeyCursor { struct RowIndex { /// The index of the stream stream_idx: usize, - /// For sort_preserving_merge, it's the index of the cursor within the stream's VecDequeue. - /// For in_mem_sort which have only one batch for each stream, cursor_idx always 0 + /// The index of the cursor within the stream's VecDequeue. cursor_idx: usize, /// The row index row_idx: usize, diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 330e95c6b037..396ae21fb699 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -25,6 +25,7 @@ use datafusion::execution::context::ExecutionContext; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr; use datafusion::physical_plan::common::SizedRecordBatchStream; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -81,12 +82,15 @@ impl ExecutionPlan for CustomPlan { async fn execute( &self, - _partition: usize, + partition: usize, _runtime: Arc, ) -> Result { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, partition); Ok(Box::pin(SizedRecordBatchStream::new( self.schema(), self.batches.clone(), + baseline_metrics, ))) } From 419a3622221eb9c22074925bd2348bb17051cfd0 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 18 Jan 2022 15:07:44 +0800 Subject: [PATCH 3/8] Remove the original sort, substitute with external sort --- datafusion/src/execution/context.rs | 1 + .../src/physical_plan/sorts/external_sort.rs | 960 ------------------ datafusion/src/physical_plan/sorts/mod.rs | 1 - datafusion/src/physical_plan/sorts/sort.rs | 658 +++++++++--- datafusion/tests/sql/joins.rs | 32 +- 5 files changed, 556 insertions(+), 1096 deletions(-) delete mode 100644 datafusion/src/physical_plan/sorts/external_sort.rs diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 89ccd7b2b938..6bae15bfef67 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1214,6 +1214,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; + use crate::execution::context::QueryPlanner; use crate::logical_plan::plan::Projection; use crate::logical_plan::TableScan; use crate::logical_plan::{binary_expr, lit, Operator}; diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs deleted file mode 100644 index ebda27887480..000000000000 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ /dev/null @@ -1,960 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines the External-Sort plan - -use crate::error::{DataFusionError, Result}; -use crate::execution::memory_manager::{ - ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, -}; -use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; -use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time, -}; -use crate::physical_plan::sorts::sort::sort_batch; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; -use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::RecordBatchReceiverStream; -use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, -}; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::ipc::reader::FileReader; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use futures::lock::Mutex; -use futures::StreamExt; -use log::{error, info}; -use std::any::Any; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::fs::File; -use std::io::BufReader; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; -use tokio::task; - -/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). -/// -/// The basic architecture of the algorithm: -/// -/// let spills = vec![]; -/// let in_mem_batches = vec![]; -/// while (input.has_next()) { -/// let batch = input.next(); -/// // no enough memory available, spill first. -/// if exec_memory_available < size_of(batch) { -/// let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..)); -/// let tmp_file = spill_write(ordered_stream); -/// spills.push(tmp_file); -/// } -/// // sort the batch while it's probably still in cache and buffer it. 
-/// let sorted = sort_by_key(batch); -/// in_mem_batches.push(sorted); -/// } -/// -/// let partial_ordered_streams = vec![]; -/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..)); -/// partial_ordered_streams.push(in_mem_stream); -/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); -/// let result = sort_preserving_merge(partial_ordered_streams); -struct ExternalSorter { - id: MemoryConsumerId, - schema: SchemaRef, - in_mem_batches: Mutex>, - spills: Mutex>, - /// Sort expressions - expr: Vec, - runtime: Arc, - metrics: AggregatedMetricsSet, - inner_metrics: BaselineMetrics, - used: AtomicUsize, - spilled_bytes: AtomicUsize, - spilled_count: AtomicUsize, -} - -impl ExternalSorter { - pub fn new( - partition_id: usize, - schema: SchemaRef, - expr: Vec, - metrics: AggregatedMetricsSet, - runtime: Arc, - ) -> Self { - let inner_metrics = metrics.new_intermediate_baseline(partition_id); - Self { - id: MemoryConsumerId::new(partition_id), - schema, - in_mem_batches: Mutex::new(vec![]), - spills: Mutex::new(vec![]), - expr, - runtime, - metrics, - inner_metrics, - used: AtomicUsize::new(0), - spilled_bytes: AtomicUsize::new(0), - spilled_count: AtomicUsize::new(0), - } - } - - async fn insert_batch(&self, input: RecordBatch) -> Result<()> { - let size = batch_byte_size(&input); - self.try_grow(size).await?; - self.used.fetch_add(size, Ordering::SeqCst); - // sort each batch as it's inserted, more probably to be cache-resident - let elapsed_compute = self.inner_metrics.elapsed_compute().clone(); - let timer = elapsed_compute.timer(); - let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; - timer.done(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - in_mem_batches.push(sorted_batch); - Ok(()) - } - - async fn spilled_before(&self) -> bool { - let spills = self.spills.lock().await; - !spills.is_empty() - } - - /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - async fn sort(&self) -> Result { - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - - if self.spilled_before().await { - let baseline_metrics = self.metrics.new_intermediate_baseline(partition); - let mut streams: Vec = vec![]; - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - self.runtime.clone(), - ) - .await?; - streams.push(SortedStream::new(in_mem_stream, self.used())); - - let mut spills = self.spills.lock().await; - - for spill in spills.drain(..) 
{ - let stream = read_spill_as_stream(spill, self.schema.clone()).await?; - streams.push(SortedStream::new(stream, 0)); - } - let baseline_metrics = self.metrics.new_final_baseline(partition); - Ok(Box::pin( - SortPreservingMergeStream::new_from_streams( - streams, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - partition, - self.runtime.clone(), - ) - .await, - )) - } else { - let baseline_metrics = self.metrics.new_final_baseline(partition); - in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - self.runtime.clone(), - ) - .await - } - } - - fn used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } - - fn spilled_bytes(&self) -> usize { - self.spilled_bytes.load(Ordering::SeqCst) - } - - fn spilled_count(&self) -> usize { - self.spilled_count.load(Ordering::SeqCst) - } -} - -impl Debug for ExternalSorter { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter") - .field("id", &self.id()) - .field("memory_used", &self.used()) - .field("spilled_bytes", &self.spilled_bytes()) - .field("spilled_count", &self.spilled_count()) - .finish() - } -} - -#[async_trait] -impl MemoryConsumer for ExternalSorter { - fn name(&self) -> String { - "ExternalSorter".to_owned() - } - - fn id(&self) -> &MemoryConsumerId { - &self.id - } - - fn memory_manager(&self) -> Arc { - self.runtime.memory_manager.clone() - } - - fn type_(&self) -> &ConsumerType { - &ConsumerType::Requesting - } - - async fn spill(&self) -> Result { - info!( - "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", - self.name(), - self.id(), - self.used(), - self.spilled_count() - ); - - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - // we could always get a chance to free some memory as long as we are holding some - if in_mem_batches.len() == 0 { - return Ok(0); - } - - let baseline_metrics = self.metrics.new_intermediate_baseline(partition); - - let path = self.runtime.disk_manager.create_tmp_file()?; - let stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &*self.expr, - self.runtime.batch_size(), - baseline_metrics, - self.runtime.clone(), - ) - .await; - - let total_size = - spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) - .await?; - - let mut spills = self.spills.lock().await; - let used = self.used.swap(0, Ordering::SeqCst); - self.spilled_count.fetch_add(1, Ordering::SeqCst); - self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst); - spills.push(path); - Ok(used) - } - - fn mem_used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } -} - -/// consume the `sorted_bathes` and do in_mem_sort -async fn in_mem_partial_sort( - sorted_bathes: &mut Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - target_batch_size: usize, - baseline_metrics: BaselineMetrics, - runtime: Arc, -) -> Result { - if sorted_bathes.len() == 1 { - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(sorted_bathes.pop().unwrap())], - baseline_metrics, - ))) - } else { - let batches = sorted_bathes.drain(..).collect(); - assert_eq!(sorted_bathes.len(), 0); - Ok(Box::pin( - SortPreservingMergeStream::new_from_batches( - batches, - schema, - expressions, - target_batch_size, - baseline_metrics, - runtime, - ) - .await, - )) - } -} - -async fn spill_partial_sorted_stream( - in_mem_stream: &mut SendableRecordBatchStream, - path: String, - 
schema: SchemaRef, -) -> Result { - let (sender, receiver) = tokio::sync::mpsc::channel(2); - while let Some(item) = in_mem_stream.next().await { - sender.send(Some(item)).await.ok(); - } - sender.send(None).await.ok(); - let path_clone = path.clone(); - let res = - task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; - match res { - Ok(r) => r, - Err(e) => Err(DataFusionError::Execution(format!( - "Error occurred while spilling {}", - e - ))), - } -} - -async fn read_spill_as_stream( - path: String, - schema: SchemaRef, -) -> Result { - let (sender, receiver): ( - TKSender>, - TKReceiver>, - ) = tokio::sync::mpsc::channel(2); - let path_clone = path.clone(); - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_spill(sender, path_clone) { - error!("Failure while reading spill file: {}. Error: {}", path, e); - } - }); - Ok(RecordBatchReceiverStream::create( - &schema, - receiver, - join_handle, - )) -} - -fn write_sorted( - mut receiver: TKReceiver>>, - path: String, - schema: SchemaRef, -) -> Result { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - while let Some(Some(batch)) = receiver.blocking_recv() { - writer.write(&batch?)?; - } - writer.finish()?; - info!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, writer.num_rows, writer.num_bytes - ); - Ok(writer.num_bytes as usize) -} - -fn read_spill(sender: TKSender>, path: String) -> Result<()> { - let file = BufReader::new(File::open(&path)?); - let reader = FileReader::try_new(file)?; - for batch in reader { - sender - .blocking_send(batch) - .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; - } - Ok(()) -} - -/// External Sort execution plan -#[derive(Debug)] -pub struct ExternalSortExec { - /// Input schema - input: Arc, - /// Sort expressions - expr: Vec, - /// Containing all metrics set created for sort, such as all sets for `sort_merge_join`s - all_metrics: AggregatedMetricsSet, - /// Preserve partitions of input plan - preserve_partitioning: bool, -} - -#[derive(Debug, Clone)] -struct AggregatedMetricsSet { - intermediate: Arc>>, - final_: Arc>>, -} - -impl AggregatedMetricsSet { - fn new() -> Self { - Self { - intermediate: Arc::new(std::sync::Mutex::new(vec![])), - final_: Arc::new(std::sync::Mutex::new(vec![])), - } - } - - fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics { - let ms = ExecutionPlanMetricsSet::new(); - let result = BaselineMetrics::new(&ms, partition); - self.intermediate.lock().unwrap().push(ms); - result - } - - fn new_final_baseline(&self, partition: usize) -> BaselineMetrics { - let ms = ExecutionPlanMetricsSet::new(); - let result = BaselineMetrics::new(&ms, partition); - self.final_.lock().unwrap().push(ms); - result - } - - fn merge_compute_time(&self, dest: &Time) { - let time1 = self - .intermediate - .lock() - .unwrap() - .iter() - .map(|es| { - es.clone_inner() - .elapsed_compute() - .map_or(0u64, |v| v as u64) - }) - .sum(); - let time2 = self - .final_ - .lock() - .unwrap() - .iter() - .map(|es| { - es.clone_inner() - .elapsed_compute() - .map_or(0u64, |v| v as u64) - }) - .sum(); - dest.add_duration(Duration::from_nanos(time1)); - dest.add_duration(Duration::from_nanos(time2)); - } - - fn merge_output_count(&self, dest: &Count) { - let count = self - .final_ - .lock() - .unwrap() - .iter() - .map(|es| es.clone_inner().output_rows().map_or(0, |v| v)) - .sum(); - dest.add(count); - } -} - -impl ExternalSortExec { - /// Create a new sort execution plan - 
pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { - Ok(Self::new_with_partitioning(expr, input, false)) - } - - /// Create a new sort execution plan with the option to preserve - /// the partitioning of the input plan - pub fn new_with_partitioning( - expr: Vec, - input: Arc, - preserve_partitioning: bool, - ) -> Self { - Self { - expr, - input, - all_metrics: AggregatedMetricsSet::new(), - preserve_partitioning, - } - } - - /// Input schema - pub fn input(&self) -> &Arc { - &self.input - } - - /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { - &self.expr - } -} - -#[async_trait] -impl ExecutionPlan for ExternalSortExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.input.schema() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - if self.preserve_partitioning { - self.input.output_partitioning() - } else { - Partitioning::UnknownPartitioning(1) - } - } - - fn required_child_distribution(&self) -> Distribution { - if self.preserve_partitioning { - Distribution::UnspecifiedDistribution - } else { - Distribution::SinglePartition - } - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn with_new_children( - &self, - children: Vec>, - ) -> Result> { - match children.len() { - 1 => Ok(Arc::new(ExternalSortExec::try_new( - self.expr.clone(), - children[0].clone(), - )?)), - _ => Err(DataFusionError::Internal( - "SortExec wrong number of children".to_string(), - )), - } - } - - async fn execute( - &self, - partition: usize, - runtime: Arc, - ) -> Result { - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), - )); - } - } - - let input = self.input.execute(partition, runtime.clone()).await?; - - external_sort( - input, - partition, - self.expr.clone(), - self.all_metrics.clone(), - runtime, - ) - .await - } - - fn metrics(&self) -> Option { - let metrics = ExecutionPlanMetricsSet::new(); - let baseline = BaselineMetrics::new(&metrics, 0); - self.all_metrics - .merge_compute_time(baseline.elapsed_compute()); - self.all_metrics.merge_output_count(baseline.output_rows()); - Some(metrics.clone_inner()) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortExec: [{}]", expr.join(",")) - } - } - } - - fn statistics(&self) -> Statistics { - self.input.statistics() - } -} - -async fn external_sort( - mut input: SendableRecordBatchStream, - partition_id: usize, - expr: Vec, - metrics: AggregatedMetricsSet, - runtime: Arc, -) -> Result { - let schema = input.schema(); - let sorter = Arc::new(ExternalSorter::new( - partition_id, - schema.clone(), - expr, - metrics, - runtime.clone(), - )); - runtime.register_consumer(&(sorter.clone() as Arc)); - - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; - } - - let result = sorter.sort().await; - runtime.drop_consumer(sorter.id()); - result -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::datasource::object_store::local::LocalFileSystem; - use 
crate::execution::runtime_env::RuntimeConfig; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::expressions::col; - use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::{ - collect, - file_format::{CsvExec, PhysicalPlanConfig}, - }; - use crate::test; - use crate::test::assert_is_pending; - use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::test_util; - use arrow::array::*; - use arrow::compute::SortOptions; - use arrow::datatypes::*; - use futures::FutureExt; - use std::collections::{BTreeMap, HashMap}; - - async fn sort_with_runtime(runtime: Arc) -> Result> { - let schema = test_util::aggr_test_schema(); - let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - PhysicalPlanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - batch_size: 1024, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); - - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![ - // c1 string column - PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }, - // c2 uin32 column - PhysicalSortExpr { - expr: col("c2", &schema)?, - options: SortOptions::default(), - }, - // c7 uin8 column - PhysicalSortExpr { - expr: col("c7", &schema)?, - options: SortOptions::default(), - }, - ], - Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), - )?); - - collect(sort_exec, runtime).await - } - - #[tokio::test] - async fn test_in_mem_sort() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_spill() -> Result<()> { - let config = RuntimeConfig::new() - .with_memory_fraction(1.0) - // trigger spill there will be 4 batches with 5.5KB for each - .with_max_execution_memory(12288); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_multi_output_batch() -> Result<()> { - let config = RuntimeConfig::new().with_batch_size(26); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 4); - - let columns_b1 = result[0].columns(); - let columns_b3 = result[3].columns(); - - let c1 = as_string_array(&columns_b1[0]); - let c13 = as_string_array(&columns_b3[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c13.value(c13.len() - 1), "e"); 
- - let c2 = as_primitive_array::(&columns_b1[1]); - let c23 = as_primitive_array::(&columns_b3[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c23.value(c23.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns_b1[6]); - let c73 = as_primitive_array::(&columns_b3[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c73.value(c73.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_metadata() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let field_metadata: BTreeMap = - vec![("foo".to_string(), "bar".to_string())] - .into_iter() - .collect(); - let schema_metadata: HashMap = - vec![("baz".to_string(), "barf".to_string())] - .into_iter() - .collect(); - - let mut field = Field::new("field_name", DataType::UInt64, true); - field.set_metadata(Some(field_metadata.clone())); - let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone()); - let schema = Arc::new(schema); - - let data: ArrayRef = - Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::()); - - let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap(); - let input = - Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); - - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![PhysicalSortExpr { - expr: col("field_name", &schema)?, - options: SortOptions::default(), - }], - input, - )?); - - let result: Vec = collect(sort_exec, runtime).await?; - - let expected_data: ArrayRef = - Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::()); - let expected_batch = - RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap(); - - // Data is correct - assert_eq!(&vec![expected_batch], &result); - - // explicitlty ensure the metadata is present - assert_eq!( - result[0].schema().fields()[0].metadata(), - &Some(field_metadata) - ); - assert_eq!(result[0].schema().metadata(), &schema_metadata); - - Ok(()) - } - - #[tokio::test] - async fn test_lex_sort_by_float() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Float32, true), - Field::new("b", DataType::Float64, true), - ])); - - // define data. 
- let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Float32Array::from(vec![ - Some(f32::NAN), - None, - None, - Some(f32::NAN), - Some(1.0_f32), - Some(1.0_f32), - Some(2.0_f32), - Some(3.0_f32), - ])), - Arc::new(Float64Array::from(vec![ - Some(200.0_f64), - Some(20.0_f64), - Some(10.0_f64), - Some(100.0_f64), - Some(f64::NAN), - None, - None, - Some(f64::NAN), - ])), - ], - )?; - - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![ - PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: true, - nulls_first: true, - }, - }, - PhysicalSortExpr { - expr: col("b", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }, - ], - Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), - )?); - - assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); - assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); - - let result: Vec = collect(sort_exec.clone(), runtime).await?; - let metrics = sort_exec.metrics().unwrap(); - assert!(metrics.elapsed_compute().unwrap() > 0); - assert_eq!(metrics.output_rows().unwrap(), 8); - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - assert_eq!(DataType::Float32, *columns[0].data_type()); - assert_eq!(DataType::Float64, *columns[1].data_type()); - - let a = as_primitive_array::(&columns[0]); - let b = as_primitive_array::(&columns[1]); - - // convert result to strings to allow comparing to expected result containing NaN - let result: Vec<(Option, Option)> = (0..result[0].num_rows()) - .map(|i| { - let aval = if a.is_valid(i) { - Some(a.value(i).to_string()) - } else { - None - }; - let bval = if b.is_valid(i) { - Some(b.value(i).to_string()) - } else { - None - }; - (aval, bval) - }) - .collect(); - - let expected: Vec<(Option, Option)> = vec![ - (None, Some("10".to_owned())), - (None, Some("20".to_owned())), - (Some("NaN".to_owned()), Some("100".to_owned())), - (Some("NaN".to_owned()), Some("200".to_owned())), - (Some("3".to_owned()), Some("NaN".to_owned())), - (Some("2".to_owned()), None), - (Some("1".to_owned()), Some("NaN".to_owned())), - (Some("1".to_owned()), None), - ]; - - assert_eq!(expected, result); - - Ok(()) - } - - #[tokio::test] - async fn test_drop_cancel() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); - - let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); - let refs = blocking_exec.refs(); - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions::default(), - }], - blocking_exec, - )?); - - let fut = collect(sort_exec, runtime); - let mut fut = fut.boxed(); - - assert_is_pending(&mut fut); - drop(fut); - assert_strong_count_converges_to_zero(refs).await; - - Ok(()) - } -} diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 62f7d0b9bd53..e7caf53309e7 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -37,7 +37,6 @@ use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; -pub mod external_sort; pub mod sort; pub mod sort_preserving_merge; diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 678ad03745d9..64e2b465c22b 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ 
b/datafusion/src/physical_plan/sorts/sort.rs @@ -15,47 +15,456 @@ // specific language governing permissions and limitations // under the License. -//! Defines the SORT plan +//! Sort that deals with an arbitrary size of the input. +//! It will do in-memory sorting if it has enough memory budget +//! but spills to disk if needed. use crate::error::{DataFusionError, Result}; +use crate::execution::memory_manager::{ + ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, +}; use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::AbortOnDropSingle; +use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time, }; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; +use crate::physical_plan::sorts::SortedStream; +use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, }; -use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics}; +use arrow::array::ArrayRef; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; +use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; -use arrow::{array::ArrayRef, error::ArrowError}; use async_trait::async_trait; -use futures::stream::Stream; -use futures::Future; -use pin_project_lite::pin_project; +use futures::lock::Mutex; +use futures::StreamExt; +use log::{error, info}; use std::any::Any; -use std::pin::Pin; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::File; +use std::io::BufReader; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; +use tokio::task; + +/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). +/// +/// The basic architecture of the algorithm: +/// +/// let spills = vec![]; +/// let in_mem_batches = vec![]; +/// while (input.has_next()) { +/// let batch = input.next(); +/// // no enough memory available, spill first. +/// if exec_memory_available < size_of(batch) { +/// let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..)); +/// let tmp_file = spill_write(ordered_stream); +/// spills.push(tmp_file); +/// } +/// // sort the batch while it's probably still in cache and buffer it. 
+/// let sorted = sort_by_key(batch); +/// in_mem_batches.push(sorted); +/// } +/// +/// let partial_ordered_streams = vec![]; +/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..)); +/// partial_ordered_streams.push(in_mem_stream); +/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); +/// let result = sort_preserving_merge(partial_ordered_streams); +struct ExternalSorter { + id: MemoryConsumerId, + schema: SchemaRef, + in_mem_batches: Mutex>, + spills: Mutex>, + /// Sort expressions + expr: Vec, + runtime: Arc, + metrics: AggregatedMetricsSet, + inner_metrics: BaselineMetrics, + used: AtomicUsize, + spilled_bytes: AtomicUsize, + spilled_count: AtomicUsize, +} + +impl ExternalSorter { + pub fn new( + partition_id: usize, + schema: SchemaRef, + expr: Vec, + metrics: AggregatedMetricsSet, + runtime: Arc, + ) -> Self { + let inner_metrics = metrics.new_intermediate_baseline(partition_id); + Self { + id: MemoryConsumerId::new(partition_id), + schema, + in_mem_batches: Mutex::new(vec![]), + spills: Mutex::new(vec![]), + expr, + runtime, + metrics, + inner_metrics, + used: AtomicUsize::new(0), + spilled_bytes: AtomicUsize::new(0), + spilled_count: AtomicUsize::new(0), + } + } + + async fn insert_batch(&self, input: RecordBatch) -> Result<()> { + if input.num_rows() > 0 { + let size = batch_byte_size(&input); + self.try_grow(size).await?; + self.used.fetch_add(size, Ordering::SeqCst); + // sort each batch as it's inserted, more probably to be cache-resident + let elapsed_compute = self.inner_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; + timer.done(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + in_mem_batches.push(sorted_batch); + } + Ok(()) + } + + async fn spilled_before(&self) -> bool { + let spills = self.spills.lock().await; + !spills.is_empty() + } + + /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. + async fn sort(&self) -> Result { + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + + if self.spilled_before().await { + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); + let mut streams: Vec = vec![]; + let in_mem_stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + self.runtime.batch_size(), + baseline_metrics, + self.runtime.clone(), + ) + .await?; + streams.push(SortedStream::new(in_mem_stream, self.used())); + + let mut spills = self.spills.lock().await; + + for spill in spills.drain(..) 
{ + let stream = read_spill_as_stream(spill, self.schema.clone()).await?; + streams.push(SortedStream::new(stream, 0)); + } + let baseline_metrics = self.metrics.new_final_baseline(partition); + Ok(Box::pin( + SortPreservingMergeStream::new_from_streams( + streams, + self.schema.clone(), + &self.expr, + self.runtime.batch_size(), + baseline_metrics, + partition, + self.runtime.clone(), + ) + .await, + )) + } else { + let baseline_metrics = self.metrics.new_final_baseline(partition); + in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + self.runtime.batch_size(), + baseline_metrics, + self.runtime.clone(), + ) + .await + } + } + + fn used(&self) -> usize { + self.used.load(Ordering::SeqCst) + } + + fn spilled_bytes(&self) -> usize { + self.spilled_bytes.load(Ordering::SeqCst) + } + + fn spilled_count(&self) -> usize { + self.spilled_count.load(Ordering::SeqCst) + } +} + +impl Debug for ExternalSorter { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("ExternalSorter") + .field("id", &self.id()) + .field("memory_used", &self.used()) + .field("spilled_bytes", &self.spilled_bytes()) + .field("spilled_count", &self.spilled_count()) + .finish() + } +} + +#[async_trait] +impl MemoryConsumer for ExternalSorter { + fn name(&self) -> String { + "ExternalSorter".to_owned() + } + + fn id(&self) -> &MemoryConsumerId { + &self.id + } + + fn memory_manager(&self) -> Arc { + self.runtime.memory_manager.clone() + } + + fn type_(&self) -> &ConsumerType { + &ConsumerType::Requesting + } + + async fn spill(&self) -> Result { + info!( + "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", + self.name(), + self.id(), + self.used(), + self.spilled_count() + ); + + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + // we could always get a chance to free some memory as long as we are holding some + if in_mem_batches.len() == 0 { + return Ok(0); + } + + let baseline_metrics = self.metrics.new_intermediate_baseline(partition); + + let path = self.runtime.disk_manager.create_tmp_file()?; + let stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &*self.expr, + self.runtime.batch_size(), + baseline_metrics, + self.runtime.clone(), + ) + .await; + + let total_size = + spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) + .await?; + + let mut spills = self.spills.lock().await; + let used = self.used.swap(0, Ordering::SeqCst); + self.spilled_count.fetch_add(1, Ordering::SeqCst); + self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst); + spills.push(path); + Ok(used) + } + + fn mem_used(&self) -> usize { + self.used.load(Ordering::SeqCst) + } +} + +/// consume the `sorted_bathes` and do in_mem_sort +async fn in_mem_partial_sort( + sorted_bathes: &mut Vec, + schema: SchemaRef, + expressions: &[PhysicalSortExpr], + target_batch_size: usize, + baseline_metrics: BaselineMetrics, + runtime: Arc, +) -> Result { + if sorted_bathes.len() == 1 { + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(sorted_bathes.pop().unwrap())], + baseline_metrics, + ))) + } else { + let batches = sorted_bathes.drain(..).collect(); + assert_eq!(sorted_bathes.len(), 0); + Ok(Box::pin( + SortPreservingMergeStream::new_from_batches( + batches, + schema, + expressions, + target_batch_size, + baseline_metrics, + runtime, + ) + .await, + )) + } +} + +async fn spill_partial_sorted_stream( + in_mem_stream: &mut SendableRecordBatchStream, + path: String, + 
schema: SchemaRef, +) -> Result { + let (sender, receiver) = tokio::sync::mpsc::channel(2); + while let Some(item) = in_mem_stream.next().await { + sender.send(Some(item)).await.ok(); + } + sender.send(None).await.ok(); + let path_clone = path.clone(); + let res = + task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; + match res { + Ok(r) => r, + Err(e) => Err(DataFusionError::Execution(format!( + "Error occurred while spilling {}", + e + ))), + } +} -/// Sort execution plan +async fn read_spill_as_stream( + path: String, + schema: SchemaRef, +) -> Result { + let (sender, receiver): ( + TKSender>, + TKReceiver>, + ) = tokio::sync::mpsc::channel(2); + let path_clone = path.clone(); + let join_handle = task::spawn_blocking(move || { + if let Err(e) = read_spill(sender, path_clone) { + error!("Failure while reading spill file: {}. Error: {}", path, e); + } + }); + Ok(RecordBatchReceiverStream::create( + &schema, + receiver, + join_handle, + )) +} + +fn write_sorted( + mut receiver: TKReceiver>>, + path: String, + schema: SchemaRef, +) -> Result { + let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; + while let Some(Some(batch)) = receiver.blocking_recv() { + writer.write(&batch?)?; + } + writer.finish()?; + info!( + "Spilled {} batches of total {} rows to disk, memory released {}", + writer.num_batches, writer.num_rows, writer.num_bytes + ); + Ok(writer.num_bytes as usize) +} + +fn read_spill(sender: TKSender>, path: String) -> Result<()> { + let file = BufReader::new(File::open(&path)?); + let reader = FileReader::try_new(file)?; + for batch in reader { + sender + .blocking_send(batch) + .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; + } + Ok(()) +} + +/// External Sort execution plan #[derive(Debug)] pub struct SortExec { /// Input schema input: Arc, /// Sort expressions expr: Vec, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, + /// Containing all metrics set created for sort, such as all sets for `sort_merge_join`s + all_metrics: AggregatedMetricsSet, /// Preserve partitions of input plan preserve_partitioning: bool, } +#[derive(Debug, Clone)] +struct AggregatedMetricsSet { + intermediate: Arc>>, + final_: Arc>>, +} + +impl AggregatedMetricsSet { + fn new() -> Self { + Self { + intermediate: Arc::new(std::sync::Mutex::new(vec![])), + final_: Arc::new(std::sync::Mutex::new(vec![])), + } + } + + fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.intermediate.lock().unwrap().push(ms); + result + } + + fn new_final_baseline(&self, partition: usize) -> BaselineMetrics { + let ms = ExecutionPlanMetricsSet::new(); + let result = BaselineMetrics::new(&ms, partition); + self.final_.lock().unwrap().push(ms); + result + } + + fn merge_compute_time(&self, dest: &Time) { + let time1 = self + .intermediate + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + let time2 = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| { + es.clone_inner() + .elapsed_compute() + .map_or(0u64, |v| v as u64) + }) + .sum(); + dest.add_duration(Duration::from_nanos(time1)); + dest.add_duration(Duration::from_nanos(time2)); + } + + fn merge_output_count(&self, dest: &Count) { + let count = self + .final_ + .lock() + .unwrap() + .iter() + .map(|es| es.clone_inner().output_rows().map_or(0, |v| v)) + .sum(); + dest.add(count); + } +} + impl 
SortExec { /// Create a new sort execution plan pub fn try_new( @@ -75,7 +484,7 @@ impl SortExec { Self { expr, input, - metrics: ExecutionPlanMetricsSet::new(), + all_metrics: AggregatedMetricsSet::new(), preserve_partitioning, } } @@ -93,7 +502,6 @@ impl SortExec { #[async_trait] impl ExecutionPlan for SortExec { - /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self } @@ -102,10 +510,6 @@ impl ExecutionPlan for SortExec { self.input.schema() } - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { if self.preserve_partitioning { @@ -123,6 +527,10 @@ impl ExecutionPlan for SortExec { } } + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + fn with_new_children( &self, children: Vec>, @@ -159,14 +567,25 @@ impl ExecutionPlan for SortExec { } } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let input = self.input.execute(partition, runtime).await?; + let input = self.input.execute(partition, runtime.clone()).await?; - Ok(Box::pin(SortStream::new( + external_sort( input, + partition, self.expr.clone(), - baseline_metrics, - ))) + self.all_metrics.clone(), + runtime, + ) + .await + } + + fn metrics(&self) -> Option { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline = BaselineMetrics::new(&metrics, 0); + self.all_metrics + .merge_compute_time(baseline.elapsed_compute()); + self.all_metrics.merge_output_count(baseline.output_rows()); + Some(metrics.clone_inner()) } fn fmt_as( @@ -182,16 +601,12 @@ impl ExecutionPlan for SortExec { } } - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - fn statistics(&self) -> Statistics { self.input.statistics() } } -pub(crate) fn sort_batch( +fn sort_batch( batch: RecordBatch, schema: SchemaRef, expr: &[PhysicalSortExpr], @@ -227,97 +642,38 @@ pub(crate) fn sort_batch( ) } -pin_project! { - /// stream for sort plan - struct SortStream { - #[pin] - output: futures::channel::oneshot::Receiver>>, - finished: bool, - schema: SchemaRef, - drop_helper: AbortOnDropSingle<()>, - } -} - -impl SortStream { - fn new( - input: SendableRecordBatchStream, - expr: Vec, - baseline_metrics: BaselineMetrics, - ) -> Self { - let (tx, rx) = futures::channel::oneshot::channel(); - let schema = input.schema(); - let join_handle = tokio::spawn(async move { - let schema = input.schema(); - let sorted_batch = common::collect(input) - .await - .map_err(DataFusionError::into_arrow_external_error) - .and_then(move |batches| { - let timer = baseline_metrics.elapsed_compute().timer(); - // combine all record batches into one for each column - let combined = common::combine_batches(&batches, schema.clone())?; - // sort combined record batch - let result = combined - .map(|batch| sort_batch(batch, schema, &expr)) - .transpose()? 
- .record_output(&baseline_metrics); - timer.done(); - Ok(result) - }); - - // failing here is OK, the receiver is gone and does not care about the result - tx.send(sorted_batch).ok(); - }); - - Self { - output: rx, - finished: false, - schema, - drop_helper: AbortOnDropSingle::new(join_handle), - } +async fn external_sort( + mut input: SendableRecordBatchStream, + partition_id: usize, + expr: Vec, + metrics: AggregatedMetricsSet, + runtime: Arc, +) -> Result { + let schema = input.schema(); + let sorter = Arc::new(ExternalSorter::new( + partition_id, + schema.clone(), + expr, + metrics, + runtime.clone(), + )); + runtime.register_consumer(&(sorter.clone() as Arc)); + + while let Some(batch) = input.next().await { + let batch = batch?; + sorter.insert_batch(batch).await?; } -} - -impl Stream for SortStream { - type Item = ArrowResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.finished { - return Poll::Ready(None); - } - - // is the output ready? - let this = self.project(); - let output_poll = this.output.poll(cx); - - match output_poll { - Poll::Ready(result) => { - *this.finished = true; - - // check for error in receiving channel and unwrap actual result - let result = match result { - Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving - Ok(result) => result.transpose(), - }; - - Poll::Ready(result) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl RecordBatchStream for SortStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } + let result = sorter.sort().await; + runtime.drop_consumer(sorter.id()); + result } #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; - use super::*; use crate::datasource::object_store::local::LocalFileSystem; + use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; @@ -325,17 +681,17 @@ mod tests { collect, file_format::{CsvExec, PhysicalPlanConfig}, }; + use crate::test; use crate::test::assert_is_pending; - use crate::test::exec::assert_strong_count_converges_to_zero; - use crate::test::{self, exec::BlockingExec}; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test_util; use arrow::array::*; + use arrow::compute::SortOptions; use arrow::datatypes::*; use futures::FutureExt; + use std::collections::{BTreeMap, HashMap}; - #[tokio::test] - async fn test_sort() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); + async fn sort_with_runtime(runtime: Arc) -> Result> { let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = @@ -377,7 +733,42 @@ mod tests { Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), )?); - let result: Vec = collect(sort_exec, runtime).await?; + collect(sort_exec, runtime).await + } + + #[tokio::test] + async fn test_in_mem_sort() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let result = sort_with_runtime(runtime).await?; + + assert_eq!(result.len(), 1); + + let columns = result[0].columns(); + + let c1 = as_string_array(&columns[0]); + assert_eq!(c1.value(0), "a"); + assert_eq!(c1.value(c1.len() - 1), "e"); + + let c2 = as_primitive_array::(&columns[1]); + assert_eq!(c2.value(0), 1); + assert_eq!(c2.value(c2.len() - 1), 5,); + + let c7 = as_primitive_array::(&columns[6]); + assert_eq!(c7.value(0), 15); + assert_eq!(c7.value(c7.len() - 1), 254,); + + Ok(()) + } + + 
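
The `ExternalSorter` doc comment earlier in this file sketches the spill-then-merge control flow that `test_in_mem_sort` above (no spill) and `test_sort_spill` below (spill forced by the 12 KB execution-memory cap) exercise. The following is a rough, self-contained illustration of that flow using only the Rust standard library; it is not DataFusion code, and the names (`MEMORY_BUDGET`, `spill_run`, `merge_runs`), the `Vec<i32>` "batches", and the plain-text spill format are all invented for the sketch.

// Std-only sketch of the spill-then-merge flow, with invented names.
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};

const MEMORY_BUDGET: usize = 64; // pretend each i32 costs one "unit" of execution memory

/// Write one sorted run to disk (stand-in for spilling a sorted stream via IPCWriter).
fn spill_run(run: &[i32], path: &Path) -> std::io::Result<()> {
    let mut w = BufWriter::new(File::create(path)?);
    for v in run {
        writeln!(w, "{}", v)?;
    }
    Ok(())
}

/// Read a spilled run back (stand-in for reading a spill file back as a stream).
fn read_run(path: &Path) -> std::io::Result<Vec<i32>> {
    BufReader::new(File::open(path)?)
        .lines()
        .map(|line| Ok(line?.trim().parse::<i32>().expect("spilled value")))
        .collect()
}

/// K-way merge of sorted runs with a min-heap; this is the role the
/// sort-preserving merge plays over sorted batches and spill files.
fn merge_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() -> std::io::Result<()> {
    // Eight unsorted input "batches" of 32 values each.
    let input: Vec<Vec<i32>> = (0..8)
        .map(|i| (0..32).map(|j| (j * 7 + i) % 97).collect())
        .collect();

    let mut in_mem: Vec<Vec<i32>> = Vec::new();
    let mut spills: Vec<PathBuf> = Vec::new();
    let mut used = 0usize;

    for mut batch in input {
        // Not enough budget for this batch: merge the buffered sorted batches
        // into one run, spill it, and release the memory.
        if used + batch.len() > MEMORY_BUDGET && !in_mem.is_empty() {
            let run = merge_runs(std::mem::take(&mut in_mem));
            let path = std::env::temp_dir().join(format!("sort_spill_{}.txt", spills.len()));
            spill_run(&run, &path)?;
            spills.push(path);
            used = 0;
        }
        // Sort each batch while it is (probably) still cache resident, then buffer it.
        batch.sort_unstable();
        used += batch.len();
        in_mem.push(batch);
    }

    // Final pass: merge the buffered sorted batches together with every spilled run.
    let mut runs = in_mem;
    for path in &spills {
        runs.push(read_run(path)?);
    }
    let sorted = merge_runs(runs);
    assert!(sorted.windows(2).all(|w| w[0] <= w[1]));
    println!("{} values sorted, {} spill file(s)", sorted.len(), spills.len());
    Ok(())
}

The operator in this patch does the same thing over Arrow record batches: `insert_batch` sorts each batch as it arrives so it is likely still cache resident, spills go through `IPCWriter` and come back via `FileReader` on a blocking task, and the final merge of the in-memory run and the spill files is performed by `SortPreservingMergeStream::new_from_streams`.
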
#[tokio::test] + async fn test_sort_spill() -> Result<()> { + let config = RuntimeConfig::new() + .with_memory_fraction(1.0) + // trigger spill there will be 4 batches with 5.5KB for each + .with_max_execution_memory(12288); + let runtime = Arc::new(RuntimeEnv::new(config)?); + let result = sort_with_runtime(runtime).await?; + assert_eq!(result.len(), 1); let columns = result[0].columns(); @@ -397,6 +788,35 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_multi_output_batch() -> Result<()> { + let config = RuntimeConfig::new().with_batch_size(26); + let runtime = Arc::new(RuntimeEnv::new(config)?); + let result = sort_with_runtime(runtime).await?; + + assert_eq!(result.len(), 4); + + let columns_b1 = result[0].columns(); + let columns_b3 = result[3].columns(); + + let c1 = as_string_array(&columns_b1[0]); + let c13 = as_string_array(&columns_b3[0]); + assert_eq!(c1.value(0), "a"); + assert_eq!(c13.value(c13.len() - 1), "e"); + + let c2 = as_primitive_array::(&columns_b1[1]); + let c23 = as_primitive_array::(&columns_b3[1]); + assert_eq!(c2.value(0), 1); + assert_eq!(c23.value(c23.len() - 1), 5,); + + let c7 = as_primitive_array::(&columns_b1[6]); + let c73 = as_primitive_array::(&columns_b3[6]); + assert_eq!(c7.value(0), 15); + assert_eq!(c73.value(c73.len() - 1), 254,); + + Ok(()) + } + #[tokio::test] async fn test_sort_metadata() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs index 1613463550f0..47907f740636 100644 --- a/datafusion/tests/sql/joins.rs +++ b/datafusion/tests/sql/joins.rs @@ -418,32 +418,32 @@ async fn cross_join_unbalanced() { // the order of the values is not determinisitic, so we need to sort to check the values let sql = - "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name"; + "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name, t2_name"; let actual = execute_to_batches(&mut ctx, sql).await; let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", "+-------+---------+---------+", - "| 11 | a | z |", - "| 11 | a | y |", - "| 11 | a | x |", "| 11 | a | w |", - "| 22 | b | z |", - "| 22 | b | y |", - "| 22 | b | x |", + "| 11 | a | x |", + "| 11 | a | y |", + "| 11 | a | z |", "| 22 | b | w |", - "| 33 | c | z |", - "| 33 | c | y |", - "| 33 | c | x |", + "| 22 | b | x |", + "| 22 | b | y |", + "| 22 | b | z |", "| 33 | c | w |", - "| 44 | d | z |", - "| 44 | d | y |", - "| 44 | d | x |", + "| 33 | c | x |", + "| 33 | c | y |", + "| 33 | c | z |", "| 44 | d | w |", - "| 77 | e | z |", - "| 77 | e | y |", - "| 77 | e | x |", + "| 44 | d | x |", + "| 44 | d | y |", + "| 44 | d | z |", "| 77 | e | w |", + "| 77 | e | x |", + "| 77 | e | y |", + "| 77 | e | z |", "+-------+---------+---------+", ]; assert_batches_eq!(expected, &actual); From 58817bbaf11cf38d45b30847942eeb641d5fadef Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 18 Jan 2022 15:41:21 +0800 Subject: [PATCH 4/8] Fix different batch_size setting in SPMS test --- .../src/physical_plan/sorts/sort_preserving_merge.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 42dfe01e93f2..9c1544f2ab60 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -1127,8 +1127,6 @@ mod tests { #[tokio::test] async 
fn test_partition_sort_streaming_input_output() { - let runtime = - Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap()); let schema = test_util::aggr_test_schema(); let sort = vec![ @@ -1144,12 +1142,15 @@ mod tests { }, ]; + let runtime = Arc::new(RuntimeEnv::default()); let input = sorted_partitioned_input(sort.clone(), &[10, 5, 13], runtime.clone()).await; - let basic = basic_sort(input.clone(), sort.clone(), runtime.clone()).await; + let basic = basic_sort(input.clone(), sort.clone(), runtime).await; + let runtime_bs_23 = + Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap()); let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); - let merged = collect(merge, runtime.clone()).await.unwrap(); + let merged = collect(merge, runtime_bs_23).await.unwrap(); assert_eq!(merged.len(), 14); From dd9886dc8fe293e46a864a1576da4fe6df66061e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 19 Jan 2022 17:22:17 +0800 Subject: [PATCH 5/8] Change to use combine and sort for in memory N-way merge --- .../src/physical_plan/sorts/external_sort.rs | 657 ------------------ datafusion/src/physical_plan/sorts/mod.rs | 7 - datafusion/src/physical_plan/sorts/sort.rs | 153 ++-- .../sorts/sort_preserving_merge.rs | 61 +- 4 files changed, 57 insertions(+), 821 deletions(-) delete mode 100644 datafusion/src/physical_plan/sorts/external_sort.rs diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs deleted file mode 100644 index 6c60aac398b4..000000000000 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ /dev/null @@ -1,657 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
Defines the External-Sort plan - -use crate::error::{DataFusionError, Result}; -use crate::execution::memory_manager::{ - ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, -}; -use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; -use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, -}; -use crate::physical_plan::sorts::in_mem_sort::InMemSortStream; -use crate::physical_plan::sorts::sort::sort_batch; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; -use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::RecordBatchReceiverStream; -use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, -}; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::ipc::reader::FileReader; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use futures::lock::Mutex; -use futures::StreamExt; -use log::{error, info}; -use std::any::Any; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::fs::File; -use std::io::BufReader; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; -use tokio::task; - -/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). -/// -/// The basic architecture of the algorithm: -/// -/// let spills = vec![]; -/// let in_mem_batches = vec![]; -/// while (input.has_next()) { -/// let batch = input.next(); -/// // no enough memory available, spill first. -/// if exec_memory_available < size_of(batch) { -/// let ordered_stream = in_mem_heap_sort(in_mem_batches.drain(..)); -/// let tmp_file = spill_write(ordered_stream); -/// spills.push(tmp_file); -/// } -/// // sort the batch while it's probably still in cache and buffer it. 
-/// let sorted = sort_by_key(batch); -/// in_mem_batches.push(sorted); -/// } -/// -/// let partial_ordered_streams = vec![]; -/// let in_mem_stream = in_mem_heap_sort(in_mem_batches.drain(..)); -/// partial_ordered_streams.push(in_mem_stream); -/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); -/// let result = sort_preserving_merge(partial_ordered_streams); -struct ExternalSorter { - id: MemoryConsumerId, - schema: SchemaRef, - in_mem_batches: Mutex>, - spills: Mutex>, - /// Sort expressions - expr: Vec, - runtime: Arc, - metrics: ExecutionPlanMetricsSet, - used: AtomicUsize, - spilled_bytes: AtomicUsize, - spilled_count: AtomicUsize, -} - -impl ExternalSorter { - pub fn new( - partition_id: usize, - schema: SchemaRef, - expr: Vec, - runtime: Arc, - ) -> Self { - Self { - id: MemoryConsumerId::new(partition_id), - schema, - in_mem_batches: Mutex::new(vec![]), - spills: Mutex::new(vec![]), - expr, - runtime, - metrics: ExecutionPlanMetricsSet::new(), - used: AtomicUsize::new(0), - spilled_bytes: AtomicUsize::new(0), - spilled_count: AtomicUsize::new(0), - } - } - - async fn insert_batch(&self, input: RecordBatch) -> Result<()> { - let size = batch_byte_size(&input); - self.try_grow(size).await?; - self.used.fetch_add(size, Ordering::SeqCst); - // sort each batch as it's inserted, more probably to be cache-resident - let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; - let mut in_mem_batches = self.in_mem_batches.lock().await; - in_mem_batches.push(sorted_batch); - Ok(()) - } - - /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - async fn sort(&self) -> Result { - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let mut streams: Vec = vec![]; - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - self.runtime.batch_size(), - baseline_metrics, - ) - .await?; - streams.push(SortedStream::new(in_mem_stream, self.used())); - - let mut spills = self.spills.lock().await; - - for spill in spills.drain(..) 
{ - let stream = read_spill_as_stream(spill, self.schema.clone()).await?; - streams.push(SortedStream::new(stream, 0)); - } - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - Ok(Box::pin( - SortPreservingMergeStream::new_from_stream( - streams, - self.schema.clone(), - &self.expr, - baseline_metrics, - partition, - self.runtime.clone(), - ) - .await, - )) - } - - fn used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } - - fn spilled_bytes(&self) -> usize { - self.spilled_bytes.load(Ordering::SeqCst) - } - - fn spilled_count(&self) -> usize { - self.spilled_count.load(Ordering::SeqCst) - } -} - -impl Debug for ExternalSorter { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter") - .field("id", &self.id()) - .field("memory_used", &self.used()) - .field("spilled_bytes", &self.spilled_bytes()) - .field("spilled_count", &self.spilled_count()) - .finish() - } -} - -#[async_trait] -impl MemoryConsumer for ExternalSorter { - fn name(&self) -> String { - "ExternalSorter".to_owned() - } - - fn id(&self) -> &MemoryConsumerId { - &self.id - } - - fn memory_manager(&self) -> Arc { - self.runtime.memory_manager.clone() - } - - fn type_(&self) -> &ConsumerType { - &ConsumerType::Requesting - } - - async fn spill(&self) -> Result { - info!( - "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", - self.name(), - self.id(), - self.used(), - self.spilled_count() - ); - - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - // we could always get a chance to free some memory as long as we are holding some - if in_mem_batches.len() == 0 { - return Ok(0); - } - - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - let path = self.runtime.disk_manager.create_tmp_file()?; - let stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &*self.expr, - self.runtime.batch_size(), - baseline_metrics, - ) - .await; - - let total_size = - spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) - .await?; - - let mut spills = self.spills.lock().await; - let used = self.used.swap(0, Ordering::SeqCst); - self.spilled_count.fetch_add(1, Ordering::SeqCst); - self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst); - spills.push(path); - Ok(used) - } - - fn mem_used(&self) -> usize { - self.used.load(Ordering::SeqCst) - } -} - -/// consume the `sorted_bathes` and do in_mem_sort -async fn in_mem_partial_sort( - sorted_bathes: &mut Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - target_batch_size: usize, - baseline_metrics: BaselineMetrics, -) -> Result { - if sorted_bathes.len() == 1 { - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(sorted_bathes.pop().unwrap())], - ))) - } else { - let new = sorted_bathes.drain(..).collect(); - assert_eq!(sorted_bathes.len(), 0); - Ok(Box::pin(InMemSortStream::new( - new, - schema, - expressions, - target_batch_size, - baseline_metrics, - )?)) - } -} - -async fn spill_partial_sorted_stream( - in_mem_stream: &mut SendableRecordBatchStream, - path: String, - schema: SchemaRef, -) -> Result { - let (sender, receiver) = tokio::sync::mpsc::channel(2); - while let Some(item) = in_mem_stream.next().await { - sender.send(Some(item)).await.ok(); - } - sender.send(None).await.ok(); - let path_clone = path.clone(); - let res = - task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; - match res { - Ok(r) => r, - Err(e) => 
Err(DataFusionError::Execution(format!( - "Error occurred while spilling {}", - e - ))), - } -} - -async fn read_spill_as_stream( - path: String, - schema: SchemaRef, -) -> Result { - let (sender, receiver): ( - TKSender>, - TKReceiver>, - ) = tokio::sync::mpsc::channel(2); - let path_clone = path.clone(); - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_spill(sender, path_clone) { - error!("Failure while reading spill file: {}. Error: {}", path, e); - } - }); - Ok(RecordBatchReceiverStream::create( - &schema, - receiver, - join_handle, - )) -} - -fn write_sorted( - mut receiver: TKReceiver>>, - path: String, - schema: SchemaRef, -) -> Result { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - while let Some(Some(batch)) = receiver.blocking_recv() { - writer.write(&batch?)?; - } - writer.finish()?; - info!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, writer.num_rows, writer.num_bytes - ); - Ok(writer.num_bytes as usize) -} - -fn read_spill(sender: TKSender>, path: String) -> Result<()> { - let file = BufReader::new(File::open(&path)?); - let reader = FileReader::try_new(file)?; - for batch in reader { - sender - .blocking_send(batch) - .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; - } - Ok(()) -} - -/// External Sort execution plan -#[derive(Debug)] -pub struct ExternalSortExec { - /// Input schema - input: Arc, - /// Sort expressions - expr: Vec, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Preserve partitions of input plan - preserve_partitioning: bool, -} - -impl ExternalSortExec { - /// Create a new sort execution plan - pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { - Ok(Self::new_with_partitioning(expr, input, false)) - } - - /// Create a new sort execution plan with the option to preserve - /// the partitioning of the input plan - pub fn new_with_partitioning( - expr: Vec, - input: Arc, - preserve_partitioning: bool, - ) -> Self { - Self { - expr, - input, - metrics: ExecutionPlanMetricsSet::new(), - preserve_partitioning, - } - } - - /// Input schema - pub fn input(&self) -> &Arc { - &self.input - } - - /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { - &self.expr - } -} - -#[async_trait] -impl ExecutionPlan for ExternalSortExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.input.schema() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - if self.preserve_partitioning { - self.input.output_partitioning() - } else { - Partitioning::UnknownPartitioning(1) - } - } - - fn required_child_distribution(&self) -> Distribution { - if self.preserve_partitioning { - Distribution::UnspecifiedDistribution - } else { - Distribution::SinglePartition - } - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn with_new_children( - &self, - children: Vec>, - ) -> Result> { - match children.len() { - 1 => Ok(Arc::new(ExternalSortExec::try_new( - self.expr.clone(), - children[0].clone(), - )?)), - _ => Err(DataFusionError::Internal( - "SortExec wrong number of children".to_string(), - )), - } - } - - async fn execute( - &self, - partition: usize, - runtime: Arc, - ) -> Result { - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != 
self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), - )); - } - } - - let _baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let input = self.input.execute(partition, runtime.clone()).await?; - - external_sort(input, partition, self.expr.clone(), runtime).await - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortExec: [{}]", expr.join(",")) - } - } - } - - fn statistics(&self) -> Statistics { - self.input.statistics() - } -} - -async fn external_sort( - mut input: SendableRecordBatchStream, - partition_id: usize, - expr: Vec, - runtime: Arc, -) -> Result { - let schema = input.schema(); - let sorter = Arc::new(ExternalSorter::new( - partition_id, - schema.clone(), - expr, - runtime.clone(), - )); - runtime.register_consumer(&(sorter.clone() as Arc)); - - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; - } - - let result = sorter.sort().await; - runtime.drop_consumer(sorter.id()); - result -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::datasource::object_store::local::LocalFileSystem; - use crate::execution::runtime_env::RuntimeConfig; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::expressions::col; - use crate::physical_plan::{ - collect, - file_format::{CsvExec, FileScanConfig}, - }; - use crate::test; - use crate::test_util; - use arrow::array::*; - use arrow::compute::SortOptions; - use arrow::datatypes::*; - - async fn sort_with_runtime(runtime: Arc) -> Result> { - let schema = test_util::aggr_test_schema(); - let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); - - let sort_exec = Arc::new(ExternalSortExec::try_new( - vec![ - // c1 string column - PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }, - // c2 uin32 column - PhysicalSortExpr { - expr: col("c2", &schema)?, - options: SortOptions::default(), - }, - // c7 uin8 column - PhysicalSortExpr { - expr: col("c7", &schema)?, - options: SortOptions::default(), - }, - ], - Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), - )?); - - collect(sort_exec, runtime).await - } - - #[tokio::test] - async fn test_in_mem_sort() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_spill() -> Result<()> { - let config = RuntimeConfig::new() - 
.with_memory_fraction(1.0) - // trigger spill there will be 4 batches with 5.5KB for each - .with_max_execution_memory(12288); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_multi_output_batch() -> Result<()> { - let config = RuntimeConfig::new().with_batch_size(26); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 4); - - let columns_b1 = result[0].columns(); - let columns_b3 = result[3].columns(); - - let c1 = as_string_array(&columns_b1[0]); - let c13 = as_string_array(&columns_b3[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c13.value(c13.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns_b1[1]); - let c23 = as_primitive_array::(&columns_b3[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c23.value(c23.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns_b1[6]); - let c73 = as_primitive_array::(&columns_b3[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c73.value(c73.len() - 1), 254,); - - Ok(()) - } -} diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index e7caf53309e7..1bb880f855ac 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -19,7 +19,6 @@ use crate::error; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::common::batch_byte_size; use crate::physical_plan::{PhysicalExpr, SendableRecordBatchStream}; use arrow::array::{ArrayRef, DynComparator}; use arrow::compute::SortOptions; @@ -252,14 +251,12 @@ impl SortedStream { enum StreamWrapper { Receiver(mpsc::Receiver>), Stream(Option), - SingleBatch(Option), } impl StreamWrapper { fn mem_used(&self) -> usize { match &self { StreamWrapper::Stream(Some(s)) => s.mem_used, - StreamWrapper::SingleBatch(Some(b)) => batch_byte_size(b), _ => 0, } } @@ -287,9 +284,6 @@ impl Stream for StreamWrapper { Poll::Pending => Poll::Pending, } } - StreamWrapper::SingleBatch(ref mut ob) => { - Poll::Ready(Ok(ob.take()).transpose()) - } } } } @@ -299,7 +293,6 @@ impl FusedStream for StreamWrapper { match self { StreamWrapper::Receiver(receiver) => receiver.is_terminated(), StreamWrapper::Stream(stream) => stream.is_none(), - StreamWrapper::SingleBatch(b) => b.is_none(), } } } diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 7569afe9752d..1d70b51d6698 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -33,8 +33,8 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStrea use crate::physical_plan::sorts::SortedStream; use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, + common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, }; use 
arrow::array::ArrayRef; pub use arrow::compute::SortOptions; @@ -58,30 +58,15 @@ use std::time::Duration; use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; use tokio::task; -/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). +/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). /// /// The basic architecture of the algorithm: -/// -/// let spills = vec![]; -/// let in_mem_batches = vec![]; -/// while (input.has_next()) { -/// let batch = input.next(); -/// // no enough memory available, spill first. -/// if exec_memory_available < size_of(batch) { -/// let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..)); -/// let tmp_file = spill_write(ordered_stream); -/// spills.push(tmp_file); -/// } -/// // sort the batch while it's probably still in cache and buffer it. -/// let sorted = sort_by_key(batch); -/// in_mem_batches.push(sorted); -/// } -/// -/// let partial_ordered_streams = vec![]; -/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..)); -/// partial_ordered_streams.push(in_mem_stream); -/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream)); -/// let result = sort_preserving_merge(partial_ordered_streams); +/// 1. get a non-empty new batch from input +/// 2. check with the memory manager if we could buffer the batch in memory +/// 2.1 if memory sufficient, then buffer batch in memory, go to 1. +/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. +/// buffer the batch in memory, go to 1. +/// 3. when input is exhausted, merge all in memory batches and spills to get a total order. struct ExternalSorter { id: MemoryConsumerId, schema: SchemaRef, @@ -91,7 +76,6 @@ struct ExternalSorter { expr: Vec, runtime: Arc, metrics: AggregatedMetricsSet, - inner_metrics: BaselineMetrics, used: AtomicUsize, spilled_bytes: AtomicUsize, spilled_count: AtomicUsize, @@ -105,7 +89,6 @@ impl ExternalSorter { metrics: AggregatedMetricsSet, runtime: Arc, ) -> Self { - let inner_metrics = metrics.new_intermediate_baseline(partition_id); Self { id: MemoryConsumerId::new(partition_id), schema, @@ -114,7 +97,6 @@ impl ExternalSorter { expr, runtime, metrics, - inner_metrics, used: AtomicUsize::new(0), spilled_bytes: AtomicUsize::new(0), spilled_count: AtomicUsize::new(0), @@ -126,13 +108,8 @@ impl ExternalSorter { let size = batch_byte_size(&input); self.try_grow(size).await?; self.used.fetch_add(size, Ordering::SeqCst); - // sort each batch as it's inserted, more probably to be cache-resident - let elapsed_compute = self.inner_metrics.elapsed_compute().clone(); - let timer = elapsed_compute.timer(); - let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?; - timer.done(); let mut in_mem_batches = self.in_mem_batches.lock().await; - in_mem_batches.push(sorted_batch); + in_mem_batches.push(input); } Ok(()) } @@ -150,15 +127,16 @@ impl ExternalSorter { if self.spilled_before().await { let baseline_metrics = self.metrics.new_intermediate_baseline(partition); let mut streams: Vec = vec![]; - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - baseline_metrics, - self.runtime.clone(), - ) - .await?; - streams.push(SortedStream::new(in_mem_stream, self.used())); + if in_mem_batches.len() > 0 { + let in_mem_stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + 
baseline_metrics, + ) + .await?; + streams.push(SortedStream::new(in_mem_stream, self.used())); + } let mut spills = self.spills.lock().await; @@ -178,16 +156,17 @@ impl ExternalSorter { ) .await, )) - } else { + } else if in_mem_batches.len() > 0 { let baseline_metrics = self.metrics.new_final_baseline(partition); in_mem_partial_sort( &mut *in_mem_batches, self.schema.clone(), &self.expr, baseline_metrics, - self.runtime.clone(), ) .await + } else { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } } @@ -257,7 +236,6 @@ impl MemoryConsumer for ExternalSorter { self.schema.clone(), &*self.expr, baseline_metrics, - self.runtime.clone(), ) .await; @@ -278,34 +256,38 @@ impl MemoryConsumer for ExternalSorter { } } -/// consume the `sorted_bathes` and do in_mem_sort +/// consume the non-empty `sorted_bathes` and do in_mem_sort async fn in_mem_partial_sort( - sorted_bathes: &mut Vec, + buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], baseline_metrics: BaselineMetrics, - runtime: Arc, ) -> Result { - if sorted_bathes.len() == 1 { - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(sorted_bathes.pop().unwrap())], - baseline_metrics, - ))) - } else { - let batches = sorted_bathes.drain(..).collect(); - assert_eq!(sorted_bathes.len(), 0); - Ok(Box::pin( - SortPreservingMergeStream::new_from_batches( - batches, - schema, - expressions, - baseline_metrics, - runtime, - ) - .await, - )) - } + assert_ne!(buffered_batches.len(), 0); + + let result = { + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = baseline_metrics.elapsed_compute().timer(); + + let pre_sort = if buffered_batches.len() == 1 { + buffered_batches.pop() + } else { + let batches = buffered_batches.drain(..).collect::>(); + // combine all record batches into one for each column + common::combine_batches(&batches, schema.clone())? + }; + + pre_sort + .map(|batch| sort_batch(batch, schema.clone(), expressions)) + .transpose()? 
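+        // `pre_sort` is `Some` at this point: `buffered_batches` is asserted
+        // non-empty above and (assuming `common::combine_batches` yields
+        // `None` only for an empty input) the combine step always produces a
+        // batch, which is what the `unwrap` below relies on.
+        // Sorting the single combined batch once replaces the earlier
+        // per-batch sort on insert followed by a heap-based merge.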
+ }; + + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(result.unwrap())], + baseline_metrics, + ))) } async fn spill_partial_sorted_stream( @@ -386,7 +368,7 @@ pub struct SortExec { input: Arc, /// Sort expressions expr: Vec, - /// Containing all metrics set created for sort, such as all sets for `sort_merge_join`s + /// Containing all metrics set created during sort all_metrics: AggregatedMetricsSet, /// Preserve partitions of input plan preserve_partitioning: bool, @@ -563,7 +545,7 @@ impl ExecutionPlan for SortExec { let input = self.input.execute(partition, runtime.clone()).await?; - external_sort( + do_sort( input, partition, self.expr.clone(), @@ -636,7 +618,7 @@ fn sort_batch( ) } -async fn external_sort( +async fn do_sort( mut input: SendableRecordBatchStream, partition_id: usize, expr: Vec, @@ -781,35 +763,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_multi_output_batch() -> Result<()> { - let config = RuntimeConfig::new().with_batch_size(26); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let result = sort_with_runtime(runtime).await?; - - assert_eq!(result.len(), 4); - - let columns_b1 = result[0].columns(); - let columns_b3 = result[3].columns(); - - let c1 = as_string_array(&columns_b1[0]); - let c13 = as_string_array(&columns_b3[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c13.value(c13.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns_b1[1]); - let c23 = as_primitive_array::(&columns_b3[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c23.value(c23.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns_b1[6]); - let c73 = as_primitive_array::(&columns_b3[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c73.value(c73.len() - 1), 254,); - - Ok(()) - } - #[tokio::test] async fn test_sort_metadata() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 9c1544f2ab60..1564640cb87e 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -388,44 +388,6 @@ impl SortPreservingMergeStream { } } - pub(crate) async fn new_from_batches( - batches: Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - baseline_metrics: BaselineMetrics, - runtime: Arc, - ) -> Self { - let batch_count = batches.len(); - let wrappers = batches - .into_iter() - .map(|s| StreamWrapper::SingleBatch(Some(s))) - .collect(); - - let cursors = (0..batch_count) - .into_iter() - .map(|_| VecDeque::new()) - .collect(); - - // We are not registering this into runtime since it's only used within - // an external sorter, already a registered memory consumer. 
- let streams = Arc::new(MergingStreams::new(0, wrappers, runtime.clone())); - - SortPreservingMergeStream { - schema, - cursors, - streams, - _drop_helper: AbortOnDropMany(vec![]), - column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), - sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()), - baseline_metrics, - aborted: false, - in_progress: vec![], - next_batch_index: 0, - min_heap: BinaryHeap::with_capacity(batch_count), - runtime, - } - } - /// If the stream at the given index is not exhausted, and the last cursor for the /// stream is finished, poll the stream for the next RecordBatch and create a new /// cursor for the stream from the returned result @@ -480,15 +442,6 @@ impl SortPreservingMergeStream { Poll::Ready(Ok(())) } - /// Returns the cursor of the next stream to pull a row from, or None - /// if all cursors for all streams are exhausted - fn next_cursor(&mut self) -> Result>> { - match self.min_heap.pop() { - None => Ok(None), - Some(cursor) => Ok(Some(cursor)), - } - } - /// Drains the in_progress row indexes, and builds a new RecordBatch from them /// /// Will then drop any cursors for which all rows have been yielded to the output @@ -620,8 +573,8 @@ impl SortPreservingMergeStream { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - match self.next_cursor() { - Ok(Some(cursor)) => { + match self.min_heap.pop() { + Some(cursor) => { let stream_idx = cursor.stream_idx; let cursor_idx = self.cursors[stream_idx].len() - 1; let row_idx = cursor.advance(); @@ -656,14 +609,8 @@ impl SortPreservingMergeStream { } } } - Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None), - Ok(None) => return Poll::Ready(Some(self.build_record_batch())), - Err(e) => { - self.aborted = true; - return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new( - e, - ))))); - } + None if self.in_progress.is_empty() => return Poll::Ready(None), + None => return Poll::Ready(Some(self.build_record_batch())), } } } From d28b96c613028b63624e2018dfd3cf2cc27a5251 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 19 Jan 2022 19:05:28 +0800 Subject: [PATCH 6/8] Resolve comments on async and doc --- datafusion/src/physical_plan/sorts/sort.rs | 40 ++++++++++--------- .../sorts/sort_preserving_merge.rs | 28 ++++++------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 1d70b51d6698..efc974d7d99e 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -133,29 +133,25 @@ impl ExternalSorter { self.schema.clone(), &self.expr, baseline_metrics, - ) - .await?; + )?; streams.push(SortedStream::new(in_mem_stream, self.used())); } let mut spills = self.spills.lock().await; for spill in spills.drain(..) 
{ - let stream = read_spill_as_stream(spill, self.schema.clone()).await?; + let stream = read_spill_as_stream(spill, self.schema.clone())?; streams.push(SortedStream::new(stream, 0)); } let baseline_metrics = self.metrics.new_final_baseline(partition); - Ok(Box::pin( - SortPreservingMergeStream::new_from_streams( - streams, - self.schema.clone(), - &self.expr, - baseline_metrics, - partition, - self.runtime.clone(), - ) - .await, - )) + Ok(Box::pin(SortPreservingMergeStream::new_from_streams( + streams, + self.schema.clone(), + &self.expr, + baseline_metrics, + partition, + self.runtime.clone(), + ))) } else if in_mem_batches.len() > 0 { let baseline_metrics = self.metrics.new_final_baseline(partition); in_mem_partial_sort( @@ -164,7 +160,6 @@ impl ExternalSorter { &self.expr, baseline_metrics, ) - .await } else { Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } @@ -236,8 +231,7 @@ impl MemoryConsumer for ExternalSorter { self.schema.clone(), &*self.expr, baseline_metrics, - ) - .await; + ); let total_size = spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) @@ -257,7 +251,7 @@ impl MemoryConsumer for ExternalSorter { } /// consume the non-empty `sorted_bathes` and do in_mem_sort -async fn in_mem_partial_sort( +fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], @@ -312,7 +306,7 @@ async fn spill_partial_sorted_stream( } } -async fn read_spill_as_stream( +fn read_spill_as_stream( path: String, schema: SchemaRef, ) -> Result { @@ -375,6 +369,12 @@ pub struct SortExec { } #[derive(Debug, Clone)] +/// Aggregates all metrics during a complex operation, which is composed of multiple stages and +/// each stage reports its statistics separately. +/// Give sort as an example, when the dataset is more significant than available memory, it will report +/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`. +/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation), +/// and which are intermediate metrics that we only account for elapsed_compute time. struct AggregatedMetricsSet { intermediate: Arc>>, final_: Arc>>, @@ -402,6 +402,7 @@ impl AggregatedMetricsSet { result } + /// We should accumulate all times from all stages' reports for the total time consumption. fn merge_compute_time(&self, dest: &Time) { let time1 = self .intermediate @@ -429,6 +430,7 @@ impl AggregatedMetricsSet { dest.add_duration(Duration::from_nanos(time2)); } + /// We should only care about output from the final stage metrics. 
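+    /// Rows produced by intermediate stages are consumed again by the final
+    /// merge, so adding their counts here would overstate the operator's
+    /// actual output.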
fn merge_output_count(&self, dest: &Count) { let count = self .final_ diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 1564640cb87e..189a9fb336d6 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -167,18 +167,15 @@ impl ExecutionPlan for SortPreservingMergeExec { }) .unzip(); - Ok(Box::pin( - SortPreservingMergeStream::new_from_receivers( - receivers, - AbortOnDropMany(join_handles), - self.schema(), - &self.expr, - baseline_metrics, - partition, - runtime.clone(), - ) - .await, - )) + Ok(Box::pin(SortPreservingMergeStream::new_from_receivers( + receivers, + AbortOnDropMany(join_handles), + self.schema(), + &self.expr, + baseline_metrics, + partition, + runtime, + ))) } } } @@ -318,7 +315,7 @@ impl Drop for SortPreservingMergeStream { impl SortPreservingMergeStream { #[allow(clippy::too_many_arguments)] - pub(crate) async fn new_from_receivers( + pub(crate) fn new_from_receivers( receivers: Vec>>, _drop_helper: AbortOnDropMany<()>, schema: SchemaRef, @@ -352,7 +349,7 @@ impl SortPreservingMergeStream { } } - pub(crate) async fn new_from_streams( + pub(crate) fn new_from_streams( streams: Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], @@ -1237,8 +1234,7 @@ mod tests { baseline_metrics, 0, runtime.clone(), - ) - .await; + ); let mut merged = common::collect(Box::pin(merge_stream)).await.unwrap(); From 5b35c2ade790d356c687382f050c67344df7096f Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 20 Jan 2022 00:48:29 +0800 Subject: [PATCH 7/8] Update sort to avoid deadlock during spilling --- datafusion/src/physical_plan/sorts/sort.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index efc974d7d99e..7c020198f8d8 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -290,13 +290,13 @@ async fn spill_partial_sorted_stream( schema: SchemaRef, ) -> Result { let (sender, receiver) = tokio::sync::mpsc::channel(2); + let path_clone = path.clone(); + let res = + task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; while let Some(item) = in_mem_stream.next().await { sender.send(Some(item)).await.ok(); } sender.send(None).await.ok(); - let path_clone = path.clone(); - let res = - task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; match res { Ok(r) => r, Err(e) => Err(DataFusionError::Execution(format!( From 6a458c67b5d11cf9dfa09281289c4d2da9484a2d Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 20 Jan 2022 11:56:42 +0800 Subject: [PATCH 8/8] Fix spill hanging --- datafusion/src/physical_plan/sorts/sort.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 7c020198f8d8..c3a138e373c3 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -291,13 +291,12 @@ async fn spill_partial_sorted_stream( ) -> Result { let (sender, receiver) = tokio::sync::mpsc::channel(2); let path_clone = path.clone(); - let res = - task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await; + let handle = task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)); while let Some(item) = 
in_mem_stream.next().await {
-        sender.send(Some(item)).await.ok();
+        sender.send(item).await.ok();
     }
-    sender.send(None).await.ok();
-    match res {
+    drop(sender);
+    match handle.await {
         Ok(r) => r,
         Err(e) => Err(DataFusionError::Execution(format!(
             "Error occurred while spilling {}",
@@ -328,12 +327,12 @@ fn read_spill_as_stream(
 }
 
 fn write_sorted(
-    mut receiver: TKReceiver<Option<ArrowResult<RecordBatch>>>,
+    mut receiver: TKReceiver<ArrowResult<RecordBatch>>,
     path: String,
     schema: SchemaRef,
 ) -> Result {
     let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    while let Some(Some(batch)) = receiver.blocking_recv() {
+    while let Some(batch) = receiver.blocking_recv() {
         writer.write(&batch?)?;
     }
     writer.finish()?;
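
The last two commits rework the ordering in `spill_partial_sorted_stream` so that the blocking `write_sorted` task is spawned first, the producer loop then feeds the bounded channel, and only afterwards is the sender dropped and the join handle awaited; awaiting the writer before producing, or producing into a full channel before the writer exists, hangs. The sketch below is a simplified, self-contained illustration of that pattern and is not part of the patch: plain `i64` values stand in for the `ArrowResult<RecordBatch>` items, and the function and variable names are placeholders.

    use tokio::sync::mpsc;
    use tokio::task;

    #[tokio::main]
    async fn main() {
        // Bounded channel, mirroring the capacity-2 channel used for spilling.
        let (sender, mut receiver) = mpsc::channel::<i64>(2);

        // Spawn the blocking consumer *before* producing anything. If the
        // producer loop ran first, it would park on `send` as soon as the
        // channel filled up, and the consumer would never start: a deadlock.
        let handle = task::spawn_blocking(move || {
            let mut total = 0i64;
            // `blocking_recv` returns `None` once every `Sender` is dropped,
            // so no explicit end-of-stream sentinel is needed.
            while let Some(item) = receiver.blocking_recv() {
                total += item;
            }
            total
        });

        for item in 0..10 {
            sender.send(item).await.expect("receiver should be alive");
        }
        // Dropping the sender closes the channel and lets the consumer finish.
        drop(sender);

        let total = handle.await.expect("blocking task should not panic");
        assert_eq!(total, 45);
    }

Closing the channel via `drop(sender)` rather than a `None` sentinel is also what lets `write_sorted` accept a plain `TKReceiver<ArrowResult<RecordBatch>>` and loop on `while let Some(batch) = receiver.blocking_recv()`, as in the final hunk above.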