refactor(rust): Remove NotifyReceiver from new-streaming parquet source (#18540)
nameexhaustion authored Sep 4, 2024
1 parent b1f15c3 commit af8738c
Showing 3 changed files with 108 additions and 181 deletions.
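The change replaces the notify-then-try_send handoff between the metadata fetcher and the row-group data fetcher (the removed `notify_channel` / `NotifyReceiver` pair) with the crate's single-slot `connector`, whose awaited `send` already gives one-item-in-flight backpressure. As a rough illustration only — this sketch uses tokio's bounded mpsc channel rather than polars-stream's `connector`, and the `FileMetadata` type here is made up — the resulting producer/consumer shape looks like this:

```rust
use tokio::sync::mpsc;

// Hypothetical payload standing in for the (path_index, row_offset, byte_source,
// metadata, max_row_group_height) tuple sent between the tasks in the real code.
#[derive(Debug)]
struct FileMetadata {
    path_index: usize,
    num_rows: usize,
}

#[tokio::main]
async fn main() {
    // Capacity 1: the producer can be at most one item ahead of the consumer,
    // so an awaited `send` replaces the explicit notify-before-send round trip.
    let (tx, mut rx) = mpsc::channel::<FileMetadata>(1);

    let producer = tokio::spawn(async move {
        for path_index in 0..4 {
            let md = FileMetadata { path_index, num_rows: 1000 };
            // `send().await` parks until the slot is free; `Err` means the
            // receiver side was dropped, so producing can stop.
            if tx.send(md).await.is_err() {
                break;
            }
        }
    });

    // `recv()` yields `None` once the producer finishes and the channel drains.
    while let Some(md) = rx.recv().await {
        println!("got metadata for file {} ({} rows)", md.path_index, md.num_rows);
    }

    producer.await.unwrap();
}
```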
232 changes: 108 additions & 124 deletions crates/polars-stream/src/nodes/parquet_source.rs
@@ -37,7 +37,6 @@ use crate::async_executor::{self};
use crate::async_primitives::connector::connector;
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::morsel::get_ideal_morsel_size;
use crate::utils::notify_channel::{notify_channel, NotifyReceiver};
use crate::utils::task_handles_ext;

type AsyncTaskData = Option<(
@@ -367,8 +366,6 @@ impl ParquetSourceNode {
assert_eq!(self.physical_predicate.is_some(), self.predicate.is_some());
let predicate = self.physical_predicate.clone();
let memory_prefetch_func = self.memory_prefetch_func;
let (start_tx, start_rx) = tokio::sync::oneshot::channel();
self.morsel_stream_starter = Some(start_tx);

let mut row_group_data_fetcher = RowGroupDataFetcher {
metadata_rx,
@@ -397,23 +394,6 @@ impl ParquetSourceNode {
// that under heavy CPU load scenarios the I/O throughput drops due to this task not being
// scheduled we can change it to be a high priority task.
let morsel_stream_task_handle = async_executor::spawn(TaskPriority::Low, async move {
if start_rx.await.is_err() {
drop(row_group_data_fetcher);
return metadata_task_handle.await.unwrap();
}

if verbose {
eprintln!("[ParquetSource]: Starting row group data fetch")
}

// We must `recv()` from the `NotifyReceiver` before awaiting on the
// `normalized_slice_oneshot_rx`, as in the negative offset case the slice resolution
// only runs after the first notify.
if !row_group_data_fetcher.init_next_file_state().await {
drop(row_group_data_fetcher);
return metadata_task_handle.await.unwrap();
};

let slice_range = {
let Ok(slice) = normalized_slice_oneshot_rx.await else {
// If we are here then the producer probably errored.
@@ -518,16 +498,16 @@ impl ParquetSourceNode {
break;
},
Err(SendError::Closed(v)) => {
// The port assigned to this wait group has been closed, so we will not
// The channel assigned to this wait group has been closed, so we will not
// add it back to the list of wait groups, and we will try to send this
// across another port.
// across another channel.
df = v.0
},
Err(SendError::Full(_)) => unreachable!(),
}

let Some(v) = wait_groups.next().await else {
// All ports have closed
// All channels have closed
break 'main;
};

@@ -553,10 +533,16 @@ impl ParquetSourceNode {
/// we can find a way to re-use it.
#[allow(clippy::type_complexity)]
fn init_metadata_fetcher(
&self,
&mut self,
) -> (
tokio::sync::oneshot::Receiver<Option<(usize, usize)>>,
NotifyReceiver<(usize, usize, Arc<DynByteSource>, FileMetaData, usize)>,
crate::async_primitives::connector::Receiver<(
usize,
usize,
Arc<DynByteSource>,
FileMetaData,
usize,
)>,
task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
) {
let verbose = self.verbose;
@@ -572,7 +558,7 @@ impl ParquetSourceNode {

let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
tokio::sync::oneshot::channel();
let (metadata_tx, mut metadata_notify_rx, metadata_rx) = notify_channel();
let (mut metadata_tx, metadata_rx) = connector();

let byte_source_builder = self.byte_source_builder.clone();

@@ -672,6 +658,9 @@ impl ParquetSourceNode {
let metadata_prefetch_size = self.config.metadata_prefetch_size;
let metadata_decode_ahead_size = self.config.metadata_decode_ahead_size;

let (start_tx, start_rx) = tokio::sync::oneshot::channel();
self.morsel_stream_starter = Some(start_tx);

let metadata_task_handle = if self
.file_options
.slice
@@ -706,86 +695,88 @@ impl ParquetSourceNode {
let current_row_offset_ref = &mut 0usize;
let current_path_index_ref = &mut 0usize;

'main: while metadata_notify_rx.recv().await.is_some() {
loop {
let current_path_index = *current_path_index_ref;
*current_path_index_ref += 1;
if start_rx.await.is_err() {
return Ok(());
}

let Some(v) = metadata_stream.next().await else {
break 'main;
};
if verbose {
eprintln!("[ParquetSource]: Starting data fetch")
}

let (path_index, byte_source, metadata, file_max_row_group_height) = v
.map_err(|err| {
err.wrap_msg(|msg| {
format!(
"error at path (index: {}, path: {}): {}",
current_path_index,
paths[current_path_index].to_str().unwrap(),
msg
)
})
})?;

assert_eq!(path_index, current_path_index);

let current_row_offset = *current_row_offset_ref;
*current_row_offset_ref =
current_row_offset.saturating_add(metadata.num_rows);

if let Some(slice_range) = slice_range.clone() {
match SplitSlicePosition::split_slice_at_file(
current_row_offset,
metadata.num_rows,
slice_range,
) {
SplitSlicePosition::Before => {
if verbose {
eprintln!(
"[ParquetSource]: Slice pushdown: \
Skipped file at index {} ({} rows)",
current_path_index, metadata.num_rows
);
}
continue;
},
SplitSlicePosition::After => unreachable!(),
SplitSlicePosition::Overlapping(..) => {},
};
};
loop {
let current_path_index = *current_path_index_ref;
*current_path_index_ref += 1;

{
use tokio::sync::mpsc::error::*;
match metadata_tx.try_send((
path_index,
current_row_offset,
byte_source,
metadata,
file_max_row_group_height,
)) {
Err(TrySendError::Closed(_)) => break 'main,
Ok(_) => {},
Err(TrySendError::Full(_)) => unreachable!(),
}
}
let Some(v) = metadata_stream.next().await else {
break;
};

if let Some(slice_range) = slice_range.as_ref() {
if *current_row_offset_ref >= slice_range.end {
let (path_index, byte_source, metadata, file_max_row_group_height) = v
.map_err(|err| {
err.wrap_msg(|msg| {
format!(
"error at path (index: {}, path: {}): {}",
current_path_index,
paths[current_path_index].to_str().unwrap(),
msg
)
})
})?;

assert_eq!(path_index, current_path_index);

let current_row_offset = *current_row_offset_ref;
*current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);

if let Some(slice_range) = slice_range.clone() {
match SplitSlicePosition::split_slice_at_file(
current_row_offset,
metadata.num_rows,
slice_range,
) {
SplitSlicePosition::Before => {
if verbose {
eprintln!(
"[ParquetSource]: Slice pushdown: \
Stopped reading at file at index {} \
(remaining {} files will not be read)",
current_path_index,
paths.len() - current_path_index - 1,
Skipped file at index {} ({} rows)",
current_path_index, metadata.num_rows
);
}
break 'main;
}
continue;
},
SplitSlicePosition::After => unreachable!(),
SplitSlicePosition::Overlapping(..) => {},
};
};

if metadata_tx
.send((
path_index,
current_row_offset,
byte_source,
metadata,
file_max_row_group_height,
))
.await
.is_err()
{
break;
}

if let Some(slice_range) = slice_range.as_ref() {
if *current_row_offset_ref >= slice_range.end {
if verbose {
eprintln!(
"[ParquetSource]: Slice pushdown: \
Stopped reading at file at index {} \
(remaining {} files will not be read)",
current_path_index,
paths.len() - current_path_index - 1,
);
}
break;
}
};
}

Ok(())
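The slice-pushdown branches in the loop above (skip files entirely before the slice, read overlapping files, stop once the running row offset passes the slice end) can be pictured as a small standalone classifier. This is only a sketch with hypothetical names (`FileSlicePosition`, `split_slice_at_file`), not polars' `SplitSlicePosition` implementation:

```rust
/// Hypothetical stand-in: classify where a file's rows fall relative to a
/// requested global row slice.
#[derive(Debug, PartialEq)]
enum FileSlicePosition {
    /// File ends before the slice starts; it can be skipped entirely.
    Before,
    /// File overlaps the slice; `(local_offset, len)` is the in-file sub-range to read.
    Overlapping(usize, usize),
    /// File starts at or after the slice end; reading can stop here.
    After,
}

fn split_slice_at_file(
    file_row_offset: usize, // global row offset of the file's first row
    file_num_rows: usize,
    slice: std::ops::Range<usize>, // requested global row range
) -> FileSlicePosition {
    let file_end = file_row_offset + file_num_rows;
    if file_end <= slice.start {
        FileSlicePosition::Before
    } else if file_row_offset >= slice.end {
        FileSlicePosition::After
    } else {
        let local_start = slice.start.saturating_sub(file_row_offset);
        let local_end = slice.end.min(file_end) - file_row_offset;
        FileSlicePosition::Overlapping(local_start, local_end - local_start)
    }
}

fn main() {
    // Files of 1000 rows each, requesting global rows 1500..2500.
    assert_eq!(split_slice_at_file(0, 1000, 1500..2500), FileSlicePosition::Before);
    assert_eq!(
        split_slice_at_file(1000, 1000, 1500..2500),
        FileSlicePosition::Overlapping(500, 500)
    );
    assert_eq!(split_slice_at_file(3000, 1000, 1500..2500), FileSlicePosition::After);
}
```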
@@ -843,13 +834,14 @@ impl ParquetSourceNode {
let path_count = self.paths.len();

io_runtime.spawn(async move {
// Wait for the first morsel request before we call `init_negative_slice_and_metadata`
// This also means the receiver must `recv()` once before awaiting on the
// `normalized_slice_oneshot_rx` to avoid hanging.
if metadata_notify_rx.recv().await.is_none() {
if start_rx.await.is_err() {
return Ok(());
}

if verbose {
eprintln!("[ParquetSource]: Starting data fetch (negative slice)")
}

let (slice_range, processed_metadata_rev, cum_rows) =
async_executor::AbortOnDropHandle::new(async_executor::spawn(
TaskPriority::Low,
@@ -873,21 +865,12 @@ impl ParquetSourceNode {
}
}

let mut metadata_iter = processed_metadata_rev.into_iter().rev();
let metadata_iter = processed_metadata_rev.into_iter().rev();
let current_row_offset_ref = &mut 0usize;

// do-while: We already consumed a notify above.
loop {
let Some((
current_path_index,
byte_source,
metadata,
file_max_row_group_height,
)) = metadata_iter.next()
else {
break;
};

for (current_path_index, byte_source, metadata, file_max_row_group_height) in
metadata_iter
{
let current_row_offset = *current_row_offset_ref;
*current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);

@@ -900,19 +883,18 @@ impl ParquetSourceNode {
SplitSlicePosition::Overlapping(..)
));

{
use tokio::sync::mpsc::error::*;
match metadata_tx.try_send((
if metadata_tx
.send((
current_path_index,
current_row_offset,
byte_source,
metadata,
file_max_row_group_height,
)) {
Err(TrySendError::Closed(_)) => break,
Ok(v) => v,
Err(TrySendError::Full(_)) => unreachable!(),
}
))
.await
.is_err()
{
break;
}

if *current_row_offset_ref >= slice_range.end {
@@ -927,10 +909,6 @@ struct RowGroupData {
}
break;
}

if metadata_notify_rx.recv().await.is_none() {
break;
}
}

Ok(())
@@ -1040,7 +1018,13 @@ struct RowGroupData {
}

struct RowGroupDataFetcher {
metadata_rx: NotifyReceiver<(usize, usize, Arc<DynByteSource>, FileMetaData, usize)>,
metadata_rx: crate::async_primitives::connector::Receiver<(
usize,
usize,
Arc<DynByteSource>,
FileMetaData,
usize,
)>,
use_statistics: bool,
verbose: bool,
reader_schema: Arc<ArrowSchema>,
@@ -1085,7 +1069,7 @@ impl RowGroupDataFetcher {
}

async fn init_next_file_state(&mut self) -> bool {
let Some((path_index, row_offset, byte_source, metadata, file_max_row_group_height)) =
let Ok((path_index, row_offset, byte_source, metadata, file_max_row_group_height)) =
self.metadata_rx.recv().await
else {
return false;
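The `start_tx`/`start_rx` oneshot now gates the metadata task directly: it does no I/O until the first morsel request fires the starter, and if the sender is dropped without ever starting, the task simply returns. A minimal sketch of that gating pattern with tokio's oneshot channel (the fetch body is a placeholder):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (start_tx, start_rx) = oneshot::channel::<()>();

    let task = tokio::spawn(async move {
        // Park until the node requests the first morsel. An `Err` means the
        // sender was dropped without starting, so exit without doing any work.
        if start_rx.await.is_err() {
            return;
        }
        // ... fetch metadata / row groups here (placeholder) ...
        println!("data fetch started");
    });

    // Elsewhere, the first morsel request fires the starter. Ignoring the send
    // result mirrors "it is fine if the task already went away".
    let _ = start_tx.send(());

    task.await.unwrap();
}
```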
1 change: 0 additions & 1 deletion crates/polars-stream/src/utils/mod.rs
@@ -1,5 +1,4 @@
pub mod in_memory_linearize;
pub mod late_materialized_df;
pub mod linearizer;
pub mod notify_channel;
pub mod task_handles_ext;