diff --git a/crates/polars-stream/src/nodes/parquet_source.rs b/crates/polars-stream/src/nodes/parquet_source.rs
index 55e6e97e578d..a55a569c7697 100644
--- a/crates/polars-stream/src/nodes/parquet_source.rs
+++ b/crates/polars-stream/src/nodes/parquet_source.rs
@@ -37,7 +37,6 @@ use crate::async_executor::{self};
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
 use crate::morsel::get_ideal_morsel_size;
-use crate::utils::notify_channel::{notify_channel, NotifyReceiver};
 use crate::utils::task_handles_ext;

 type AsyncTaskData = Option<(
@@ -367,8 +366,6 @@ impl ParquetSourceNode {
         assert_eq!(self.physical_predicate.is_some(), self.predicate.is_some());
         let predicate = self.physical_predicate.clone();
         let memory_prefetch_func = self.memory_prefetch_func;
-        let (start_tx, start_rx) = tokio::sync::oneshot::channel();
-        self.morsel_stream_starter = Some(start_tx);

         let mut row_group_data_fetcher = RowGroupDataFetcher {
             metadata_rx,
@@ -397,23 +394,6 @@ impl ParquetSourceNode {
         // that under heavy CPU load scenarios the I/O throughput drops due to this task not being
         // scheduled we can change it to be a high priority task.
         let morsel_stream_task_handle = async_executor::spawn(TaskPriority::Low, async move {
-            if start_rx.await.is_err() {
-                drop(row_group_data_fetcher);
-                return metadata_task_handle.await.unwrap();
-            }
-
-            if verbose {
-                eprintln!("[ParquetSource]: Starting row group data fetch")
-            }
-
-            // We must `recv()` from the `NotifyReceiver` before awaiting on the
-            // `normalized_slice_oneshot_rx`, as in the negative offset case the slice resolution
-            // only runs after the first notify.
-            if !row_group_data_fetcher.init_next_file_state().await {
-                drop(row_group_data_fetcher);
-                return metadata_task_handle.await.unwrap();
-            };
-
             let slice_range = {
                 let Ok(slice) = normalized_slice_oneshot_rx.await else {
                     // If we are here then the producer probably errored.
@@ -518,16 +498,16 @@ impl ParquetSourceNode {
                            break;
                        },
                        Err(SendError::Closed(v)) => {
-                            // The port assigned to this wait group has been closed, so we will not
+                            // The channel assigned to this wait group has been closed, so we will not
                            // add it back to the list of wait groups, and we will try to send this
-                            // across another port.
+                            // across another channel.
                            df = v.0
                        },
                        Err(SendError::Full(_)) => unreachable!(),
                    }

                    let Some(v) = wait_groups.next().await else {
-                        // All ports have closed
+                        // All channels have closed
                        break 'main;
                    };

@@ -553,10 +533,16 @@ impl ParquetSourceNode {
     /// we can find a way to re-use it.
     #[allow(clippy::type_complexity)]
     fn init_metadata_fetcher(
-        &self,
+        &mut self,
     ) -> (
         tokio::sync::oneshot::Receiver>,
-        NotifyReceiver<(usize, usize, Arc, FileMetaData, usize)>,
+        crate::async_primitives::connector::Receiver<(
+            usize,
+            usize,
+            Arc,
+            FileMetaData,
+            usize,
+        )>,
         task_handles_ext::AbortOnDropHandle>,
     ) {
         let verbose = self.verbose;
@@ -572,7 +558,7 @@ impl ParquetSourceNode {
         let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
             tokio::sync::oneshot::channel();

-        let (metadata_tx, mut metadata_notify_rx, metadata_rx) = notify_channel();
+        let (mut metadata_tx, metadata_rx) = connector();

         let byte_source_builder = self.byte_source_builder.clone();

@@ -672,6 +658,9 @@ impl ParquetSourceNode {
         let metadata_prefetch_size = self.config.metadata_prefetch_size;
         let metadata_decode_ahead_size = self.config.metadata_decode_ahead_size;

+        let (start_tx, start_rx) = tokio::sync::oneshot::channel();
+        self.morsel_stream_starter = Some(start_tx);
+
         let metadata_task_handle = if self
             .file_options
             .slice
@@ -706,86 +695,88 @@ impl ParquetSourceNode {
                 let current_row_offset_ref = &mut 0usize;
                 let current_path_index_ref = &mut 0usize;

-                'main: while metadata_notify_rx.recv().await.is_some() {
-                    loop {
-                        let current_path_index = *current_path_index_ref;
-                        *current_path_index_ref += 1;
+                if start_rx.await.is_err() {
+                    return Ok(());
+                }

-                        let Some(v) = metadata_stream.next().await else {
-                            break 'main;
-                        };
+                if verbose {
+                    eprintln!("[ParquetSource]: Starting data fetch")
+                }

-                        let (path_index, byte_source, metadata, file_max_row_group_height) = v
-                            .map_err(|err| {
-                                err.wrap_msg(|msg| {
-                                    format!(
-                                        "error at path (index: {}, path: {}): {}",
-                                        current_path_index,
-                                        paths[current_path_index].to_str().unwrap(),
-                                        msg
-                                    )
-                                })
-                            })?;
-
-                        assert_eq!(path_index, current_path_index);
-
-                        let current_row_offset = *current_row_offset_ref;
-                        *current_row_offset_ref =
-                            current_row_offset.saturating_add(metadata.num_rows);
-
-                        if let Some(slice_range) = slice_range.clone() {
-                            match SplitSlicePosition::split_slice_at_file(
-                                current_row_offset,
-                                metadata.num_rows,
-                                slice_range,
-                            ) {
-                                SplitSlicePosition::Before => {
-                                    if verbose {
-                                        eprintln!(
-                                            "[ParquetSource]: Slice pushdown: \
-                                            Skipped file at index {} ({} rows)",
-                                            current_path_index, metadata.num_rows
-                                        );
-                                    }
-                                    continue;
-                                },
-                                SplitSlicePosition::After => unreachable!(),
-                                SplitSlicePosition::Overlapping(..) => {},
-                            };
-                        };
+                loop {
+                    let current_path_index = *current_path_index_ref;
+                    *current_path_index_ref += 1;

-                        {
-                            use tokio::sync::mpsc::error::*;
-                            match metadata_tx.try_send((
-                                path_index,
-                                current_row_offset,
-                                byte_source,
-                                metadata,
-                                file_max_row_group_height,
-                            )) {
-                                Err(TrySendError::Closed(_)) => break 'main,
-                                Ok(_) => {},
-                                Err(TrySendError::Full(_)) => unreachable!(),
-                            }
-                        }
+                    let Some(v) = metadata_stream.next().await else {
+                        break;
+                    };

-                        if let Some(slice_range) = slice_range.as_ref() {
-                            if *current_row_offset_ref >= slice_range.end {
+                    let (path_index, byte_source, metadata, file_max_row_group_height) = v
+                        .map_err(|err| {
+                            err.wrap_msg(|msg| {
+                                format!(
+                                    "error at path (index: {}, path: {}): {}",
+                                    current_path_index,
+                                    paths[current_path_index].to_str().unwrap(),
+                                    msg
+                                )
+                            })
+                        })?;
+
+                    assert_eq!(path_index, current_path_index);
+
+                    let current_row_offset = *current_row_offset_ref;
+                    *current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);
+
+                    if let Some(slice_range) = slice_range.clone() {
+                        match SplitSlicePosition::split_slice_at_file(
+                            current_row_offset,
+                            metadata.num_rows,
+                            slice_range,
+                        ) {
+                            SplitSlicePosition::Before => {
                                if verbose {
                                    eprintln!(
                                        "[ParquetSource]: Slice pushdown: \
-                                        Stopped reading at file at index {} \
-                                        (remaining {} files will not be read)",
-                                        current_path_index,
-                                        paths.len() - current_path_index - 1,
+                                        Skipped file at index {} ({} rows)",
+                                        current_path_index, metadata.num_rows
                                    );
                                }
-                                break 'main;
-                            }
+                                continue;
+                            },
+                            SplitSlicePosition::After => unreachable!(),
+                            SplitSlicePosition::Overlapping(..) => {},
                        };
+                    };

+                    if metadata_tx
+                        .send((
+                            path_index,
+                            current_row_offset,
+                            byte_source,
+                            metadata,
+                            file_max_row_group_height,
+                        ))
+                        .await
+                        .is_err()
+                    {
                        break;
                    }
+
+                    if let Some(slice_range) = slice_range.as_ref() {
+                        if *current_row_offset_ref >= slice_range.end {
+                            if verbose {
+                                eprintln!(
+                                    "[ParquetSource]: Slice pushdown: \
+                                    Stopped reading at file at index {} \
+                                    (remaining {} files will not be read)",
+                                    current_path_index,
+                                    paths.len() - current_path_index - 1,
+                                );
+                            }
+                            break;
+                        }
+                    };
                }

                Ok(())
@@ -843,13 +834,14 @@ impl ParquetSourceNode {
             let path_count = self.paths.len();

             io_runtime.spawn(async move {
-                // Wait for the first morsel request before we call `init_negative_slice_and_metadata`
-                // This also means the receiver must `recv()` once before awaiting on the
-                // `normalized_slice_oneshot_rx` to avoid hanging.
-                if metadata_notify_rx.recv().await.is_none() {
+                if start_rx.await.is_err() {
                    return Ok(());
                }

+                if verbose {
+                    eprintln!("[ParquetSource]: Starting data fetch (negative slice)")
+                }
+
                let (slice_range, processed_metadata_rev, cum_rows) =
                    async_executor::AbortOnDropHandle::new(async_executor::spawn(
                        TaskPriority::Low,
@@ -873,21 +865,12 @@ impl ParquetSourceNode {
                    }
                }

-                let mut metadata_iter = processed_metadata_rev.into_iter().rev();
+                let metadata_iter = processed_metadata_rev.into_iter().rev();
                let current_row_offset_ref = &mut 0usize;

-                // do-while: We already consumed a notify above.
-                loop {
-                    let Some((
-                        current_path_index,
-                        byte_source,
-                        metadata,
-                        file_max_row_group_height,
-                    )) = metadata_iter.next()
-                    else {
-                        break;
-                    };
-
+                for (current_path_index, byte_source, metadata, file_max_row_group_height) in
+                    metadata_iter
+                {
                    let current_row_offset = *current_row_offset_ref;
                    *current_row_offset_ref =
                        current_row_offset.saturating_add(metadata.num_rows);
@@ -900,19 +883,18 @@ impl ParquetSourceNode {
                        SplitSlicePosition::Overlapping(..)
                    ));

-                    {
-                        use tokio::sync::mpsc::error::*;
-                        match metadata_tx.try_send((
+                    if metadata_tx
+                        .send((
                            current_path_index,
                            current_row_offset,
                            byte_source,
                            metadata,
                            file_max_row_group_height,
-                        )) {
-                            Err(TrySendError::Closed(_)) => break,
-                            Ok(v) => v,
-                            Err(TrySendError::Full(_)) => unreachable!(),
-                        }
+                        ))
+                        .await
+                        .is_err()
+                    {
+                        break;
                    }

                    if *current_row_offset_ref >= slice_range.end {
@@ -927,10 +909,6 @@ impl ParquetSourceNode {
                        }
                        break;
                    }
-
-                    if metadata_notify_rx.recv().await.is_none() {
-                        break;
-                    }
                }

                Ok(())
@@ -1040,7 +1018,13 @@ struct RowGroupData {
 }

 struct RowGroupDataFetcher {
-    metadata_rx: NotifyReceiver<(usize, usize, Arc, FileMetaData, usize)>,
+    metadata_rx: crate::async_primitives::connector::Receiver<(
+        usize,
+        usize,
+        Arc,
+        FileMetaData,
+        usize,
+    )>,
     use_statistics: bool,
     verbose: bool,
     reader_schema: Arc,
@@ -1085,7 +1069,7 @@ impl RowGroupDataFetcher {
     }

     async fn init_next_file_state(&mut self) -> bool {
-        let Some((path_index, row_offset, byte_source, metadata, file_max_row_group_height)) =
+        let Ok((path_index, row_offset, byte_source, metadata, file_max_row_group_height)) =
             self.metadata_rx.recv().await
         else {
             return false;
diff --git a/crates/polars-stream/src/utils/mod.rs b/crates/polars-stream/src/utils/mod.rs
index f8d0d74ff027..4d16cd5499e3 100644
--- a/crates/polars-stream/src/utils/mod.rs
+++ b/crates/polars-stream/src/utils/mod.rs
@@ -1,5 +1,4 @@
 pub mod in_memory_linearize;
 pub mod late_materialized_df;
 pub mod linearizer;
-pub mod notify_channel;
 pub mod task_handles_ext;
diff --git a/crates/polars-stream/src/utils/notify_channel.rs b/crates/polars-stream/src/utils/notify_channel.rs
deleted file mode 100644
index 5aaef03ddc61..000000000000
--- a/crates/polars-stream/src/utils/notify_channel.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-
-/// Receiver that calls `notify()` before `recv()`
-pub struct NotifyReceiver<T> {
-    receiver: Receiver<T>,
-    /// We use a channel for notify because it lets the sender know when the receiver has been
-    /// dropped.
-    notify: Sender<()>,
-}
-
-impl<T> NotifyReceiver<T> {
-    pub async fn recv(&mut self) -> Option<T> {
-        match self.notify.try_send(()) {
-            Err(TrySendError::Closed(_)) => None,
-            Ok(_) => self.receiver.recv().await,
-            v @ Err(TrySendError::Full(_)) => {
-                v.unwrap();
-                unreachable!();
-            },
-        }
-    }
-}
-
-/// The notify allows us to make the producer only produce values when requested. Otherwise it would
-/// produce a new value as soon as the previous value was consumed (as there would be channel
-/// capacity).
-pub fn notify_channel<T>() -> (Sender<T>, Receiver<()>, NotifyReceiver<T>) {
-    let (tx, rx) = channel::<T>(1);
-    let (notify_tx, notify_rx) = channel(1);
-
-    (
-        tx,
-        notify_rx,
-        NotifyReceiver {
-            receiver: rx,
-            notify: notify_tx,
-        },
-    )
-}
-
-mod tests {
-
-    #[test]
-    fn test_notify_channel() {
-        use futures::FutureExt;
-
-        use super::notify_channel;
-        let (tx, mut notify, mut rx) = notify_channel();
-        assert!(notify.recv().now_or_never().is_none());
-        assert!(rx.recv().now_or_never().is_none());
-        assert_eq!(notify.recv().now_or_never().unwrap(), Some(()));
-        assert!(tx.try_send(()).is_ok());
-        assert!(rx.recv().now_or_never().is_some());
-    }
-}
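Context on the pattern being removed: `notify_channel` made production strictly demand-driven, since the consumer had to signal on a capacity-1 notify channel before the producer built the next value, and a dropped consumer showed up as a failed notify send. The replacement leans on the fact that an awaited send over a bounded single-slot channel already provides backpressure and shutdown detection, while the new `start_rx` oneshot gates when production begins at all. The sketch below illustrates that backpressure behaviour with a plain capacity-1 `tokio::sync::mpsc` channel; it is an analogy only, not the crate's `connector()` primitive, and the item type and counts are invented for the example.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Hypothetical item type and count, chosen for the example only.
    let (tx, mut rx) = mpsc::channel::<usize>(1);

    let producer = tokio::spawn(async move {
        for i in 0..5usize {
            // `send().await` completes only once the single buffer slot is free,
            // so the producer can run at most one item ahead of the consumer.
            // A dropped receiver makes the send fail, which stops production,
            // the same shutdown signal the notify channel previously provided.
            if tx.send(i).await.is_err() {
                break;
            }
            println!("produced {i}");
        }
    });

    while let Some(v) = rx.recv().await {
        println!("consumed {v}");
    }

    producer.await.unwrap();
}
```

The difference from the deleted utility is the one-item lookahead a buffered slot allows; judging from the diff, that trade-off is accepted here in exchange for dropping the extra notify channel and its `recv()`-before-await ordering requirement.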