diff --git a/Cargo.lock b/Cargo.lock index dff964bd90f7..688f54df080c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3225,7 +3225,9 @@ version = "0.42.0" dependencies = [ "ahash", "bitflags", + "crossbeam-channel", "futures", + "glob", "memchr", "once_cell", "polars-arrow", @@ -3368,6 +3370,7 @@ dependencies = [ "chrono", "chrono-tz", "ciborium", + "crossbeam-channel", "either", "futures", "hashbrown", diff --git a/Cargo.toml b/Cargo.toml index a83488882fb6..f3315c6b0891 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ strength_reduce = "0.2" strum_macros = "0.26" thiserror = "1" tokio = "1.26" +tokio-stream = "0.1.15" tokio-util = "0.7.8" unicode-reverse = "1.0.8" url = "2.4" diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index b66920de7ab9..86c8a6aab91e 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -623,6 +623,19 @@ impl PhysicalIoExpr for PhysicalIoHelper { fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> { self.expr.as_stats_evaluator() } + + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s) + } + } + columns + } } pub fn phys_expr_to_io_expr(expr: Arc) -> Arc { diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 545f19168f9f..3e784383c587 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -248,7 +248,7 @@ where ); ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?; - let df = chunk_df_for_writing(df, 512 * 512)?; + let df = chunk_df_for_writing(df, 10)?; let iter = df.iter_chunks(self.compat_level, true); for batch in iter { diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index a06b3f88a0dd..a5fa4be4f3cc 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -13,7 +13,6 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use super::mmap::ColumnStore; -use super::predicates::read_this_row_group; use crate::cloud::{ build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore, }; @@ -26,6 +25,7 @@ type DownloadedRowGroup = Vec<(u64, Bytes)>; type QueuePayload = (usize, DownloadedRowGroup); type QueueSend = Arc>>; +#[derive(Debug, Clone)] pub struct ParquetObjectStore { store: PolarsObjectStore, path: ObjectPath, @@ -272,37 +272,8 @@ impl FetchRowGroupsFromObjectStore { .collect() }); - let mut prefetched: PlHashMap = PlHashMap::new(); - - let mut row_groups = if let Some(pred) = predicate.as_deref() { - row_group_range - .filter_map(|i| { - let rg = &row_groups[i]; - - // TODO! - // Optimize this. Now we partition the predicate columns twice. (later on reading as well) - // I think we must add metadata context where we can cache and amortize the partitioning. 
- let mut part_md = PartitionedColumnChunkMD::new(rg); - let live = pred.live_variables(); - part_md.set_partitions( - live.as_ref() - .map(|vars| vars.iter().map(|s| s.as_ref()).collect::>()) - .as_ref(), - ); - let should_be_read = - matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true)); - - // Already add the row groups that will be skipped to the prefetched data. - if !should_be_read { - prefetched.insert(i, Default::default()); - } + let mut row_groups = row_groups.iter().cloned().enumerate().collect::>(); - should_be_read.then(|| (i, rg.clone())) - }) - .collect::>() - } else { - row_groups.iter().cloned().enumerate().collect() - }; let reader = Arc::new(reader); let msg_limit = get_rg_prefetch_size(); diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 096e7d8170c8..383cbaca8adc 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -61,7 +61,7 @@ fn assert_dtypes(data_type: &ArrowDataType) { } } -fn column_idx_to_series( +pub fn column_idx_to_series( column_i: usize, // The metadata belonging to this column field_md: &[&ColumnChunkMetaData], @@ -144,6 +144,7 @@ fn rg_to_dfs( row_group_end: usize, slice: (usize, usize), file_metadata: &FileMetaData, + row_groups: &Vec, schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -177,6 +178,7 @@ fn rg_to_dfs( row_group_start, row_group_end, file_metadata, + row_groups, schema, live_variables, predicate, @@ -196,7 +198,7 @@ fn rg_to_dfs( row_group_start, row_group_end, slice, - file_metadata, + row_groups, schema, predicate, row_index, @@ -211,7 +213,7 @@ fn rg_to_dfs( row_group_end, previous_row_count, slice, - file_metadata, + row_groups, schema, predicate, row_index, @@ -247,6 +249,7 @@ fn rg_to_dfs_prefiltered( row_group_start: usize, row_group_end: usize, file_metadata: &FileMetaData, + row_groups: &[RowGroupMetaData], schema: &ArrowSchemaRef, live_variables: Vec, predicate: &dyn PhysicalIoExpr, @@ -529,7 +532,7 @@ fn rg_to_dfs_optionally_par_over_columns( row_group_start: usize, row_group_end: usize, slice: (usize, usize), - file_metadata: &FileMetaData, + row_groups: &[RowGroupMetaData], schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -540,13 +543,11 @@ fn rg_to_dfs_optionally_par_over_columns( ) -> PolarsResult> { let mut dfs = Vec::with_capacity(row_group_end - row_group_start); - let mut n_rows_processed: usize = (0..row_group_start) - .map(|i| file_metadata.row_groups[i].num_rows()) - .sum(); + let mut n_rows_processed: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum(); let slice_end = slice.0 + slice.1; for rg_idx in row_group_start..row_group_end { - let md = &file_metadata.row_groups[rg_idx]; + let md = &row_groups[rg_idx]; // Set partitioned fields to prevent quadratic behavior. 
let projected_columns = projected_columns_set(schema, projection); @@ -636,7 +637,7 @@ fn rg_to_dfs_par_over_rg( row_group_end: usize, previous_row_count: &mut IdxSize, slice: (usize, usize), - file_metadata: &FileMetaData, + row_groups: &[RowGroupMetaData], schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -645,16 +646,16 @@ fn rg_to_dfs_par_over_rg( hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { // compute the limits per row group and the row count offsets - let mut row_groups = Vec::with_capacity(row_group_end - row_group_start); + let mut row_groups_iter = Vec::with_capacity(row_group_end - row_group_start); let mut n_rows_processed: usize = (0..row_group_start) - .map(|i| file_metadata.row_groups[i].num_rows()) + .map(|i| row_groups[i].num_rows()) .sum(); let slice_end = slice.0 + slice.1; for i in row_group_start..row_group_end { let row_count_start = *previous_row_count; - let rg_md = &file_metadata.row_groups[i]; + let rg_md = &row_groups[i]; let rg_slice = split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end); *previous_row_count = previous_row_count @@ -665,7 +666,7 @@ fn rg_to_dfs_par_over_rg( continue; } - row_groups.push((i, rg_md, rg_slice, row_count_start)); + row_groups_iter.push((i, rg_md, rg_slice, row_count_start)); } let dfs = POOL.install(|| { @@ -673,7 +674,7 @@ fn rg_to_dfs_par_over_rg( // Ensure all row groups are partitioned. let part_mds = { let projected_columns = projected_columns_set(schema, projection); - row_groups + row_groups_iter .par_iter() .map(|(_, rg, _, _)| { let mut ccmd = PartitionedColumnChunkMD::new(rg); @@ -683,7 +684,7 @@ fn rg_to_dfs_par_over_rg( .collect::>() }; - row_groups + row_groups_iter .into_par_iter() .enumerate() .map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| { @@ -747,6 +748,7 @@ pub fn read_parquet( projection: Option<&[usize]>, reader_schema: &ArrowSchemaRef, metadata: Option, + row_groups: Option>, predicate: Option<&dyn PhysicalIoExpr>, mut parallel: ParallelStrategy, row_index: Option, @@ -766,6 +768,7 @@ pub fn read_parquet( let file_metadata = metadata .map(Ok) .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?; + let row_groups = row_groups.unwrap_or_else(|| file_metadata.row_groups.clone()); let n_row_groups = file_metadata.row_groups.len(); // if there are multiple row groups and categorical data @@ -820,6 +823,7 @@ pub fn read_parquet( n_row_groups, slice, &file_metadata, + &row_groups, reader_schema, predicate, row_index.clone(), @@ -887,7 +891,10 @@ impl From for RowGroupFetcher { } impl RowGroupFetcher { - async fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult { + pub async fn fetch_row_groups( + &mut self, + _row_groups: Range, + ) -> PolarsResult { match self { RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups), #[cfg(feature = "cloud")] @@ -947,6 +954,7 @@ pub struct BatchedParquetReader { projection: Arc<[usize]>, schema: ArrowSchemaRef, metadata: FileMetaDataRef, + row_groups: Vec, predicate: Option>, row_index: Option, rows_read: IdxSize, @@ -967,6 +975,7 @@ impl BatchedParquetReader { pub fn new( row_group_fetcher: RowGroupFetcher, metadata: FileMetaDataRef, + row_groups: Vec, schema: ArrowSchemaRef, slice: (usize, usize), projection: Option>, @@ -978,7 +987,7 @@ impl BatchedParquetReader { include_file_path: Option<(PlSmallStr, Arc)>, mut parallel: ParallelStrategy, ) -> PolarsResult { - let n_row_groups = metadata.row_groups.len(); + let n_row_groups = row_groups.len(); let 
projection = projection .map(Arc::from) .unwrap_or_else(|| (0usize..schema.len()).collect::>()); @@ -1004,6 +1013,7 @@ impl BatchedParquetReader { projection, schema, metadata, + row_groups, row_index, rows_read: 0, predicate, @@ -1054,7 +1064,7 @@ impl BatchedParquetReader { self.row_group_offset, self.row_group_offset + n, self.slice, - &self.metadata.row_groups, + &self.row_groups, ); let store = self @@ -1070,6 +1080,7 @@ impl BatchedParquetReader { row_group_range.end, self.slice, &self.metadata, + &self.row_groups, &self.schema, self.predicate.as_deref(), self.row_index.clone(), @@ -1093,6 +1104,7 @@ impl BatchedParquetReader { let predicate = self.predicate.clone(); let schema = self.schema.clone(); let metadata = self.metadata.clone(); + let row_groups = self.row_groups.clone(); let parallel = self.parallel; let projection = self.projection.clone(); let use_statistics = self.use_statistics; @@ -1107,6 +1119,7 @@ impl BatchedParquetReader { row_group_range.end, slice, &metadata, + &row_groups, &schema, predicate.as_deref(), row_index, diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index f5b52437dd82..3dad97ef59d9 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -2,24 +2,31 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use arrow::datatypes::ArrowSchemaRef; +use polars_core::config::verbose; use polars_core::prelude::*; #[cfg(feature = "cloud")] use polars_core::utils::accumulate_dataframes_vertical_unchecked; -use polars_parquet::read; +use polars_parquet::read::{self, RowGroupMetaData}; #[cfg(feature = "cloud")] use super::async_impl::FetchRowGroupsFromObjectStore; #[cfg(feature = "cloud")] use super::async_impl::ParquetObjectStore; +use super::mmap::ColumnStore; +use super::predicates::read_this_row_group; pub use super::read_impl::BatchedParquetReader; -use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader}; +use super::read_impl::{ + column_idx_to_series, compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader, RowGroupFetcher +}; #[cfg(feature = "cloud")] use super::utils::materialize_empty_df; #[cfg(feature = "cloud")] use crate::cloud::CloudOptions; +use crate::hive::materialize_hive_partitions; use crate::mmap::MmapBytesReader; use crate::parquet::metadata::FileMetaDataRef; -use crate::predicates::PhysicalIoExpr; +use crate::parquet::read::metadata::PartitionedColumnChunkMD; +use crate::predicates::{apply_predicate, PhysicalIoExpr}; use crate::prelude::*; use crate::RowIndex; @@ -157,6 +164,7 @@ impl ParquetReader { pub fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.get_metadata()?.clone(); let schema = self.schema()?; + let row_groups = metadata.row_groups.clone(); // XXX: Can a parquet file starts at an offset? 
self.reader.seek(SeekFrom::Start(0))?; @@ -164,6 +172,7 @@ impl ParquetReader { BatchedParquetReader::new( row_group_fetcher, metadata, + row_groups, schema, self.slice, self.projection, @@ -219,6 +228,7 @@ impl SerReader for ParquetReader { self.projection.as_deref(), &schema, Some(metadata), + None, self.predicate.as_deref(), self.parallel, self.row_index, @@ -262,6 +272,7 @@ pub struct ParquetAsyncReader { include_file_path: Option<(PlSmallStr, Arc)>, schema: Option, parallel: ParallelStrategy, + row_groups: Option>, } #[cfg(feature = "cloud")] @@ -283,6 +294,7 @@ impl ParquetAsyncReader { include_file_path: None, schema: None, parallel: Default::default(), + row_groups: None, }) } @@ -328,6 +340,17 @@ impl ParquetAsyncReader { self } + pub async fn num_rows_with_predicate(&mut self) -> PolarsResult { + let row_sizes = self + .prune_row_groups(self.reader.clone()) + .await? + .iter() + .map(|(row_size, _md)| *row_size) + .collect::>(); + + Ok(row_sizes.iter().sum()) + } + pub fn with_row_index(mut self, row_index: Option) -> Self { self.row_index = row_index; self @@ -379,24 +402,30 @@ impl ParquetAsyncReader { Some(schema) => schema, None => self.schema().await?, }; + let mut row_groups = self + .row_groups + .clone() + .unwrap_or(metadata.row_groups.clone()); + + if self.slice.1 != 0 { + self.slice = (0, usize::MAX); + } else { + row_groups = vec![]; + } // row group fetched deals with projection let row_group_fetcher = FetchRowGroupsFromObjectStore::new( self.reader, schema.clone(), self.projection.as_deref(), self.predicate.clone(), - compute_row_group_range( - 0, - metadata.row_groups.len(), - self.slice, - &metadata.row_groups, - ), - &metadata.row_groups, + compute_row_group_range(0, metadata.row_groups.len(), self.slice, &row_groups), + &row_groups, )? .into(); BatchedParquetReader::new( row_group_fetcher, metadata, + row_groups, schema, self.slice, self.projection, @@ -446,4 +475,115 @@ impl ParquetAsyncReader { } Ok(df) } + + #[cfg(feature = "cloud")] + async fn prune_row_groups( + &mut self, + reader: ParquetObjectStore, + ) -> PolarsResult> { + + + let metadata = self.reader.get_metadata().await?.clone(); + let schema = &self.schema().await?; + + let predicate = self.predicate.clone(); + let projection = self.projection.as_deref(); + let hive_partition_columns = self.hive_partition_columns.as_deref(); + + let predicate_columns = predicate.clone().unwrap().columns(); + let predicate_projection = materialize_projection( + Some(&predicate_columns), + &Schema::from_arrow_schema(&schema.clone()), + hive_partition_columns, + false, + ); + + let mut predicate_row_group_fetcher: RowGroupFetcher = FetchRowGroupsFromObjectStore::new( + reader, + schema.clone(), + projection, + predicate.clone(), + compute_row_group_range( + 0, + metadata.row_groups.len(), + (0, usize::MAX), + &metadata.row_groups, + ), + &metadata.row_groups, + )? 
+ .into(); + + let predicate_store: ColumnStore = predicate_row_group_fetcher + .fetch_row_groups(0..metadata.row_groups.len()) + .await?; + + let mut remaining_rows = self.slice.1; + + let row_groups = metadata.row_groups.clone(); + let final_row_groups = row_groups + .iter() + .map(|md| { + let mut part_md = PartitionedColumnChunkMD::new(md); + let fields = predicate_columns.iter().map(|column| column.as_str()).collect::>(); + part_md.set_partitions(Some(&fields)); + + if !read_this_row_group(predicate.clone().as_deref(), &part_md, schema).unwrap() + || remaining_rows == 0 + { + return (0, md); + } + + let mut columns: Vec = vec![]; + + + for column_i in predicate_projection.as_ref().unwrap() { + let name = &schema.fields[*column_i].name; + let field_md = part_md.get_partitions(name).unwrap(); + let column = column_idx_to_series( + *column_i, + field_md.as_slice(), + None, + &schema.clone(), + &predicate_store, + ) + .unwrap(); + columns.push(column) + } + + let mut df = unsafe { DataFrame::new_no_checks(columns) }; + let reader_schema = schema.as_ref(); + + materialize_hive_partitions( + &mut df, + reader_schema, + hive_partition_columns, + md.num_rows(), + ); + apply_predicate(&mut df, predicate.as_deref(), false).unwrap(); + + let row_count = df.height(); + + remaining_rows = remaining_rows.saturating_sub(row_count); + + (row_count, md) + }) + .filter(|(row_count, _md)| *row_count != 0) + .map(|(row_count, md)| (row_count, md.clone())) + .collect::>(); + if verbose() { + eprintln!( + "reduced the number of row groups in pruning by {}", + row_groups.len() - final_row_groups.len() + ) + } + let row_groups = Some( + final_row_groups + .clone() + .into_iter() + .map(|(_row_countm, row_group_metadata)| row_group_metadata) + .collect::>(), + ); + self.row_groups = row_groups; + Ok(final_row_groups) + } } diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 8acfc304a1a8..2b6229bf63c0 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -16,6 +16,8 @@ pub trait PhysicalIoExpr: Send + Sync { fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { None } + + fn columns(&self) -> Vec; } pub trait StatsEvaluator { diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 03fcc0d8b2c8..80c940163da4 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -25,6 +25,8 @@ polars-utils = { workspace = true } ahash = { workspace = true } bitflags = { workspace = true } +crossbeam-channel = { workspace = true } +glob = { version = "0.3" } memchr = { workspace = true } once_cell = { workspace = true } pyo3 = { workspace = true, optional = true } diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index d0125dd0bfbc..dda8e0fe2a96 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -19,6 +19,7 @@ use std::path::Path; use std::sync::{Arc, Mutex}; pub use anonymous_scan::*; +use crossbeam_channel::{bounded, Receiver}; #[cfg(feature = "csv")] pub use csv::*; #[cfg(not(target_arch = "wasm32"))] @@ -31,6 +32,7 @@ pub use ndjson::*; #[cfg(feature = "parquet")] pub use parquet::*; use polars_core::prelude::*; +use polars_core::POOL; use polars_expr::{create_physical_expr, ExpressionConversionState}; use polars_io::RowIndex; use polars_mem_engine::{create_physical_plan, Executor}; @@ -883,6 +885,30 @@ impl LazyFrame { ) } + pub fn sink_to_batches(mut self) -> Result, PolarsError> { + 
self.opt_state.set(OptFlags::STREAMING, true); + let morsels_per_sink = POOL.current_num_threads(); + let backpressure = morsels_per_sink * 4; + let (sender, receiver) = bounded(backpressure); + self.logical_plan = DslPlan::Sink { + input: Arc::new(self.logical_plan), + payload: SinkType::Batch { + sender: BatchSender { id: 0, sender }, + }, + }; + + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: format!("cannot run the whole query in a streaming order") + ); + POOL.spawn(move || { + let _ = physical_plan.execute(&mut state).unwrap(); + }); + + Ok(receiver) + } + #[cfg(any( feature = "ipc", feature = "parquet", diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 777f769866d0..7a8f2db51449 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -33,6 +33,19 @@ impl PhysicalIoExpr for Wrap { fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.0.as_stats_evaluator() } + + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + let _ = to_aexpr(self.0.as_expression().unwrap().clone(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s) + } + } + columns + } } impl PhysicalPipedExpr for Wrap { fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index bd3d87ff8832..0fcde6c3b234 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -343,6 +343,7 @@ impl ParquetExec { .await?; let num_rows = reader.num_rows().await?; + PolarsResult::Ok((num_rows, reader)) }); let readers_and_metadata = futures::future::try_join_all(iter).await?; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index fc7a29435c77..d92fbce78af7 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -217,6 +217,9 @@ fn create_physical_plan_impl( SinkType::Cloud { .. } => { polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.") }, + SinkType::Batch { .. 
} => { + polars_bail!(InvalidOperation: "batch sink not supported in the standard engine") + }, }, Union { inputs, options } => { let inputs = inputs diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 4e81e8531bac..bce2b4c78e2f 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -35,6 +35,10 @@ impl PhysicalIoExpr for Len { fn live_variables(&self) -> Option> { Some(vec![]) } + + fn columns(&self) -> Vec { + unimplemented!() + } } impl PhysicalPipedExpr for Len { fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs new file mode 100644 index 000000000000..58fd05e037a5 --- /dev/null +++ b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs @@ -0,0 +1,51 @@ +use std::any::Any; + +use crossbeam_channel::Sender; +use polars_core::prelude::*; + +use crate::operators::{ + chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, +}; + +#[derive(Clone)] +pub struct BatchSink { + sender: Sender, +} + +impl BatchSink { + pub fn new(sender: Sender) -> PolarsResult { + Ok(Self { sender }) + } +} + +impl Sink for BatchSink { + fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult { + let df: DataFrame = chunks_to_df_unchecked(vec![chunk]); + let result = self.sender.send(df); + match result { + Ok(..) => Ok(SinkResult::CanHaveMoreInput), + Err(..) => Ok(SinkResult::Finished), + } + } + + fn combine(&mut self, _other: &mut dyn Sink) { + // Nothing to do + } + + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } + + fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult { + let _ = self.sender.send(Default::default()); + Ok(FinalizedSink::Finished(Default::default())) + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn fmt(&self) -> &str { + "batch_sink" + } +} diff --git a/crates/polars-pipe/src/executors/sinks/output/mod.rs b/crates/polars-pipe/src/executors/sinks/output/mod.rs index 602e525fa853..a2f8cc9b25f2 100644 --- a/crates/polars-pipe/src/executors/sinks/output/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/output/mod.rs @@ -1,3 +1,4 @@ +mod batch_sink; #[cfg(feature = "csv")] mod csv; #[cfg(any( @@ -14,6 +15,7 @@ mod json; #[cfg(feature = "parquet")] mod parquet; +pub use batch_sink::*; #[cfg(feature = "csv")] pub use csv::*; #[cfg(feature = "ipc")] diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index cd0cb58f3574..cccf14861d57 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -37,6 +37,7 @@ pub struct ParquetSource { processed_rows: AtomicUsize, iter: Range, paths: Arc>, + total_files_read: usize, options: ParquetOptions, file_options: FileScanOptions, #[allow(dead_code)] @@ -48,6 +49,7 @@ pub struct ParquetSource { run_async: bool, prefetch_size: usize, predicate: Option>, + rows_left_to_read: usize, } impl ParquetSource { @@ -184,64 +186,71 @@ impl ParquetSource { /// requires `self.processed_rows` to be incremented in the correct order), as it does not /// coordinate to increment the row offset in a properly ordered manner. 
#[cfg(feature = "async")] - async fn init_reader_async(&self, index: usize) -> PolarsResult { - use std::sync::atomic::Ordering; - - let metadata = self.metadata.clone(); + async fn init_reader_async( + &self, + index: usize, + n_rows: usize, + ) -> PolarsResult<(ParquetAsyncReader, usize)> { + let metadata: Option> = self.metadata.clone(); let predicate = self.predicate.clone(); let cloud_options = self.cloud_options.clone(); let (path, options, file_options, projection, chunk_size, hive_partitions) = self.prepare_init_reader(index)?; - let batched_reader = { + let reader = { let uri = path.to_string_lossy(); - - let mut async_reader = - ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata) - .await? - .with_row_index(file_options.row_index) - .with_projection(projection) - .check_schema( - self.file_info - .reader_schema - .as_ref() - .unwrap() - .as_ref() - .unwrap_left(), - ) - .await? - .with_predicate(predicate.clone()) - .use_statistics(options.use_statistics) - .with_hive_partition_columns(hive_partitions) - .with_include_file_path( - self.file_options - .include_file_paths - .as_ref() - .map(|x| (x.clone(), Arc::from(path.to_str().unwrap()))), - ); - - let n_rows_this_file = async_reader.num_rows().await?; - let current_row_offset = self - .processed_rows - .fetch_add(n_rows_this_file, Ordering::Relaxed); - - let slice = file_options.slice.map(|slice| { - assert!(slice.0 >= 0); - let slice_start = slice.0 as usize; - let slice_end = slice_start + slice.1; - split_slice_at_file( - &mut current_row_offset.clone(), - n_rows_this_file, - slice_start, - slice_end, + ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata) + .await? + .read_parallel(options.parallel) + .with_row_index(file_options.row_index) + .with_projection(projection) + .with_predicate(predicate.clone()) + .with_hive_partition_columns(hive_partitions) + .use_statistics(options.use_statistics) + .with_slice(Some((0, n_rows))) + .with_include_file_path( + self.file_options + .include_file_paths + .as_ref() + .map(|x| (x.clone(), Arc::from(path.to_str().unwrap()))), ) - }); - - async_reader.with_slice(slice).batched(chunk_size).await? }; + Ok((reader, chunk_size)) + } + + #[cfg(feature = "async")] + async fn init_batch_reader( + &self, + reader: ParquetAsyncReader, + chunk_size: usize, + ) -> PolarsResult { + let batched_reader = reader + .check_schema( + self.file_info + .reader_schema + .as_ref() + .unwrap() + .as_ref() + .unwrap_left(), + ) + .await? 
+ .batched(chunk_size) + .await?; Ok(batched_reader) } + #[cfg(feature = "async")] + async fn num_rows_per_reader(&self, reader: &mut ParquetAsyncReader) -> PolarsResult { + let predicate = self.predicate.clone(); + let num_rows; + if predicate.is_some() { + num_rows = reader.num_rows_with_predicate().await; + } else { + num_rows = reader.num_rows().await; + } + num_rows + } + #[allow(unused_variables)] #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -264,6 +273,7 @@ impl ParquetSource { eprintln!("POLARS PREFETCH_SIZE: {}", prefetch_size) } let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async(); + let rows_left_to_read = file_options.slice.unwrap_or((0, usize::MAX)).1; let mut source = ParquetSource { batched_readers: VecDeque::new(), @@ -274,6 +284,7 @@ impl ParquetSource { file_options, iter, paths, + total_files_read: 0, cloud_options, metadata, file_info, @@ -282,6 +293,7 @@ impl ParquetSource { run_async, prefetch_size, predicate, + rows_left_to_read, }; // Already start downloading when we deal with cloud urls. if run_async { @@ -297,41 +309,66 @@ impl ParquetSource { // // It is important we do this for a reasonable batch size, that's why we start this when we // have just 2 readers left. - if self.run_async { - #[cfg(not(feature = "async"))] - panic!("activate 'async' feature"); - - #[cfg(feature = "async")] - { - if self.batched_readers.len() <= 2 || self.batched_readers.is_empty() { - let range = 0..self.prefetch_size - self.batched_readers.len(); - let range = range - .zip(&mut self.iter) - .map(|(_, index)| index) - .collect::>(); - let init_iter = range.into_iter().map(|index| self.init_reader_async(index)); - - let batched_readers = if self.file_options.slice.is_some() { - polars_io::pl_async::get_runtime().block_on_potential_spawn(async { - futures::stream::iter(init_iter) - .then(|x| x) - .try_collect() - .await - })? - } else { - polars_io::pl_async::get_runtime().block_on_potential_spawn(async { - futures::future::try_join_all(init_iter).await - })? 
- }; - - for r in batched_readers { - self.finish_init_reader(r)?; - } + if self.batched_readers.is_empty() + && self.rows_left_to_read != 0 + && self.total_files_read != self.paths.len() + { + if self.run_async { + let range = 0..self.prefetch_size - self.batched_readers.len(); + + let range = range + .zip(&mut self.iter) + .map(|(_, index)| index) + .collect::>(); + + let readers = range + .clone() + .into_iter() + .map(|index| self.init_reader_async(index, self.rows_left_to_read)); + let mut readers = polars_io::pl_async::get_runtime() + .block_on(async { futures::future::try_join_all(readers).await })?; + + let num_rows_to_read = readers + .iter_mut() + .map(|(reader, _chunk_size)| self.num_rows_per_reader(reader)); + + let num_rows_to_read = polars_io::pl_async::get_runtime() + .block_on(async { futures::future::try_join_all(num_rows_to_read).await })?; + + let num_rows_to_read = num_rows_to_read + .into_iter() + .zip(readers) + .map(|(rows_per_reader, (reader, chunk_size))| { + self.total_files_read += 1; + if self.rows_left_to_read == 0 { + return (reader, chunk_size, 0); + } + self.rows_left_to_read = + self.rows_left_to_read.saturating_sub(rows_per_reader); + (reader, chunk_size, rows_per_reader) + }) + .filter(|(_reader, _chunk_size, rows_per_reader)| *rows_per_reader != 0) + .collect::>(); + + let init_iter = + num_rows_to_read + .into_iter() + .map(|(reader, chunk_size, _num_rows)| { + self.init_batch_reader(reader, chunk_size) + }); + + let batched_readers = + polars_io::pl_async::get_runtime().block_on_potential_spawn(async { + futures::future::try_join_all(init_iter).await + })?; + + for r in batched_readers { + self.finish_init_reader(r)?; + } + } else { + for _ in 0..self.prefetch_size - self.batched_readers.len() { + self.init_next_reader_sync()? } - } - } else { - for _ in 0..self.prefetch_size - self.batched_readers.len() { - self.init_next_reader_sync()? } } Ok(()) @@ -343,7 +380,9 @@ impl Source for ParquetSource { self.prefetch_files()?; let Some(mut reader) = self.batched_readers.pop_front() else { - // If there was no new reader, we depleted all of them and are finished. + if self.total_files_read != self.paths.len() && self.rows_left_to_read != 0 { + return self.get_batches(_context); + } return Ok(SourceResult::Finished); }; diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 1e6f93eac9df..f89c42e61e36 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -138,6 +138,18 @@ where fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.p.as_stats_evaluator() } + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + let _ = to_aexpr(self.p.expression(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s) + } + } + columns + } } PolarsResult::Ok(Arc::new(Wrap { p }) as Arc) @@ -181,6 +193,9 @@ where SinkType::Memory => { Box::new(OrderedSink::new(input_schema.into_owned())) as Box }, + SinkType::Batch { sender } => { + Box::new(BatchSink::new(sender.sender.clone())?) as Box + }, #[allow(unused_variables)] SinkType::File { path, file_type, .. 
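Note: the `fn columns()` body added here for the pipeline `Wrap` is the same arena walk added earlier in this diff for `PhysicalIoHelper` and for the streaming `Wrap`. A minimal sketch of the shared helper it could be factored into, assuming the `to_aexpr`/`Arena`/`AExpr` calls used in those hunks and a `PlSmallStr` element type for the returned column names (the helper name is hypothetical, not part of this diff):

    // Hypothetical helper: lower an expression into an AExpr arena and keep only
    // the Column nodes, i.e. the column names the predicate reads.
    fn live_columns(expr: Expr) -> Vec<PlSmallStr> {
        let mut arena: Arena<AExpr> = Arena::new();
        let _ = to_aexpr(expr, &mut arena);
        let mut columns = Vec::new();
        // Every node produced by to_aexpr is popped again; only AExpr::Column carries a name.
        for _ in 0..arena.len() {
            if let AExpr::Column(name) = arena.pop().unwrap() {
                columns.push(name);
            }
        }
        columns
    }

Each of the three `PhysicalIoExpr` impls could then delegate to this single function instead of repeating the loop.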
diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index b37b9b445f10..c8cfc0aed632 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -29,6 +29,7 @@ bytemuck = { workspace = true } chrono = { workspace = true, optional = true } chrono-tz = { workspace = true, optional = true } ciborium = { workspace = true, optional = true } +crossbeam-channel = { workspace = true } either = { workspace = true } futures = { workspace = true, optional = true } hashbrown = { workspace = true } diff --git a/crates/polars-plan/src/plans/ir/dot.rs b/crates/polars-plan/src/plans/ir/dot.rs index 69e3a69733c5..094fdf2b0e2a 100644 --- a/crates/polars-plan/src/plans/ir/dot.rs +++ b/crates/polars-plan/src/plans/ir/dot.rs @@ -318,6 +318,7 @@ impl<'a> IRDotDisplay<'a> { write_label(f, id, |f| { f.write_str(match payload { SinkType::Memory => "SINK (MEMORY)", + SinkType::Batch { .. } => "SINK (BATCH)", SinkType::File { .. } => "SINK (FILE)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (CLOUD)", diff --git a/crates/polars-plan/src/plans/ir/format.rs b/crates/polars-plan/src/plans/ir/format.rs index 45fa8efa117e..dfd6b55d38c5 100644 --- a/crates/polars-plan/src/plans/ir/format.rs +++ b/crates/polars-plan/src/plans/ir/format.rs @@ -367,6 +367,7 @@ impl<'a> IRDisplay<'a> { Sink { input, payload, .. } => { let name = match payload { SinkType::Memory => "SINK (memory)", + SinkType::Batch { .. } => "SINK (batch)", SinkType::File { .. } => "SINK (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", diff --git a/crates/polars-plan/src/plans/ir/schema.rs b/crates/polars-plan/src/plans/ir/schema.rs index 1586463a8c0f..783470584f48 100644 --- a/crates/polars-plan/src/plans/ir/schema.rs +++ b/crates/polars-plan/src/plans/ir/schema.rs @@ -38,6 +38,7 @@ impl IR { ExtContext { .. } => "ext_context", Sink { payload, .. } => match payload { SinkType::Memory => "sink (memory)", + SinkType::Batch { .. } => "sink (batch)", SinkType::File { .. } => "sink (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "sink (cloud)", diff --git a/crates/polars-plan/src/plans/ir/tree_format.rs b/crates/polars-plan/src/plans/ir/tree_format.rs index 59cf0aa60e52..9b8e6d799112 100644 --- a/crates/polars-plan/src/plans/ir/tree_format.rs +++ b/crates/polars-plan/src/plans/ir/tree_format.rs @@ -358,6 +358,7 @@ impl<'a> TreeFmtNode<'a> { match payload { SinkType::Memory => "SINK (memory)", SinkType::File { .. } => "SINK (file)", + SinkType::Batch { .. } => "SINK (batch)", #[cfg(feature = "cloud")] SinkType::Cloud { .. 
} => "SINK (cloud)", }, diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs index 078acbae7177..5c7d696a9856 100644 --- a/crates/polars-plan/src/plans/options.rs +++ b/crates/polars-plan/src/plans/options.rs @@ -1,8 +1,10 @@ +use std::hash::{Hash, Hasher}; #[cfg(feature = "json")] use std::num::NonZeroUsize; use std::path::PathBuf; use bitflags::bitflags; +use crossbeam_channel::{bounded, Sender}; use polars_core::prelude::*; use polars_core::utils::SuperTypeOptions; #[cfg(feature = "csv")] @@ -293,10 +295,41 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +#[derive(Clone, Debug)] +pub struct BatchSender { + pub id: u32, + pub sender: Sender, +} + +impl Default for BatchSender { + fn default() -> Self { + let (sender, _receiver) = bounded(1); + Self { id: 0, sender } + } +} + +impl PartialEq for BatchSender { + fn eq(&self, other: &Self) -> bool { + self.sender.same_channel(&other.sender) + } +} + +impl Eq for BatchSender {} + +impl Hash for BatchSender { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SinkType { Memory, + Batch { + #[serde(skip)] + sender: BatchSender, + }, File { path: Arc, file_type: FileType, diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 9ec82b991f39..0f60194b5ff2 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -1903,3 +1903,17 @@ def test_row_index_projection_pushdown_18463( df.select("index").slice(1, 1).collect(), df.collect().select("index").slice(1, 1), ) +def test_skip_full_load_of_rgs_using_predicate( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capfd: Any +) -> None: + monkeypatch.setenv("POLARS_VERBOSE", "1") + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + df = pl.DataFrame( + {"a": pl.arange(0, 10, eager=True), "b": pl.arange(0, 10, eager=True)} + ) + root = tmp_path / "test_rg_skip.parquet" + df.write_parquet(root, use_pyarrow=True, row_group_size=2) + + q = pl.scan_parquet(root, parallel="row_groups") + assert q.filter(pl.col("a").gt(6)).collect().shape == (3, 2) + assert "reduced the number of row groups in pruning by 3" in capfd.readouterr().err