feat(rust): optimize column load of row groups with predicate (#13608)
bchalk101 authored and Boruch Chalk committed Sep 5, 2024
1 parent e4746a5 commit 5823b25
Showing 25 changed files with 493 additions and 142 deletions.
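
Judging from the commit title and the visible hunks, the idea is to let callers hand the Parquet readers a pre-filtered row-group list, and to let an I/O predicate report which columns it touches, instead of every stage re-deriving this from file_metadata.row_groups. As a self-contained illustration of statistics-based row-group pruning, the general technique behind such a change, here is a minimal Rust sketch; the types and names (RowGroup, ColumnStats, GtPredicate, prune_row_groups) are hypothetical stand-ins, not the Polars APIs touched by this commit.

// Hypothetical sketch of statistics-based row-group pruning; these types are
// illustrative stand-ins and not the Polars internals changed in this commit.
use std::collections::HashMap;

#[derive(Clone)]
struct ColumnStats { min: i64, max: i64 }

#[derive(Clone)]
struct RowGroup {
    num_rows: usize,
    // per-column min/max statistics from the Parquet footer
    stats: HashMap<String, ColumnStats>,
}

// The simplest predicate that statistics can decide: `column > value`.
struct GtPredicate { column: String, value: i64 }

impl GtPredicate {
    // Columns the predicate touches; only their statistics (and later only
    // their column chunks) need to be loaded to evaluate it.
    fn live_columns(&self) -> Vec<&str> { vec![self.column.as_str()] }

    // `false` only when no row in the group can possibly satisfy the predicate.
    fn might_match(&self, rg: &RowGroup) -> bool {
        rg.stats.get(&self.column).map_or(true, |s| s.max > self.value)
    }
}

// Keep the original indices so a reader can still address the groups inside
// the file, but drop every group the predicate rules out.
fn prune_row_groups(all: &[RowGroup], pred: &GtPredicate) -> Vec<(usize, RowGroup)> {
    all.iter()
        .cloned()
        .enumerate()
        .filter(|(_, rg)| pred.might_match(rg))
        .collect()
}

fn main() {
    let rg = |min, max| RowGroup {
        num_rows: 100,
        stats: HashMap::from([("x".to_string(), ColumnStats { min, max })]),
    };
    let groups = vec![rg(0, 5), rg(10, 20)];
    let pred = GtPredicate { column: "x".into(), value: 7 };
    assert_eq!(pred.live_columns(), vec!["x"]);
    // Only the second group survives: its max (20) can exceed 7.
    assert_eq!(prune_row_groups(&groups, &pred).len(), 1);
}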
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -88,6 +88,7 @@ strength_reduce = "0.2"
strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
+tokio-stream = "0.1.15"
tokio-util = "0.7.8"
unicode-reverse = "1.0.8"
url = "2.4"
13 changes: 13 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
@@ -623,6 +623,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
}

+fn columns(&self) -> Vec<PlSmallStr> {
+let mut arena: Arena<AExpr> = Arena::new();
+let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
+let mut columns = vec![];
+for _ in 0..arena.len() {
+let node = arena.pop().unwrap();
+if let AExpr::Column(s) = node {
+columns.push(s)
+}
+}
+columns
+}
}

pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
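
The columns() method added above lowers the physical expression into an Arena<AExpr> via to_aexpr and then pops every node, keeping the names of the AExpr::Column nodes. A stand-alone toy illustration of that flatten-then-scan pattern follows; the Expr and ANode types are illustrative only, not the Polars AExpr or Arena.

// Toy expression types used only to illustrate the arena-walk pattern above;
// they are not the Polars AExpr/Arena types.
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

// Arena node: children are referenced by index into the arena.
enum ANode {
    Column(String),
    Literal(i64),
    Gt(usize, usize),
}

// Push every sub-expression into the arena and return the root's index,
// mirroring what to_aexpr does for the real expression tree.
fn to_anode(expr: &Expr, arena: &mut Vec<ANode>) -> usize {
    let node = match expr {
        Expr::Column(name) => ANode::Column(name.clone()),
        Expr::Literal(v) => ANode::Literal(*v),
        Expr::Gt(l, r) => {
            let l = to_anode(l, arena);
            let r = to_anode(r, arena);
            ANode::Gt(l, r)
        },
    };
    arena.push(node);
    arena.len() - 1
}

// Collect the distinct column names the expression refers to by scanning the
// flattened arena for Column nodes.
fn columns(expr: &Expr) -> Vec<String> {
    let mut arena = Vec::new();
    let _root = to_anode(expr, &mut arena);
    let mut cols = Vec::new();
    for node in &arena {
        if let ANode::Column(name) = node {
            if !cols.contains(name) {
                cols.push(name.clone());
            }
        }
    }
    cols
}

fn main() {
    // (x > 3), encoded as Gt(Column("x"), Literal(3))
    let e = Expr::Gt(Box::new(Expr::Column("x".into())), Box::new(Expr::Literal(3)));
    assert_eq!(columns(&e), vec!["x".to_string()]);
}

The sketch scans the arena rather than draining it with pop, and deduplicates names; for the purpose of collecting column references the two traversals yield the same set.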
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/ipc_stream.rs
@@ -248,7 +248,7 @@ where
);

ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
-let df = chunk_df_for_writing(df, 512 * 512)?;
+let df = chunk_df_for_writing(df, 10)?;
let iter = df.iter_chunks(self.compat_level, true);

for batch in iter {
33 changes: 2 additions & 31 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -13,7 +13,6 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;

use super::mmap::ColumnStore;
-use super::predicates::read_this_row_group;
use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
@@ -26,6 +25,7 @@ type DownloadedRowGroup = Vec<(u64, Bytes)>;
type QueuePayload = (usize, DownloadedRowGroup);
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;

+#[derive(Debug, Clone)]
pub struct ParquetObjectStore {
store: PolarsObjectStore,
path: ObjectPath,
@@ -272,37 +272,8 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

-let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
-
-let mut row_groups = if let Some(pred) = predicate.as_deref() {
-row_group_range
-.filter_map(|i| {
-let rg = &row_groups[i];
-
-// TODO!
-// Optimize this. Now we partition the predicate columns twice. (later on reading as well)
-// I think we must add metadata context where we can cache and amortize the partitioning.
-let mut part_md = PartitionedColumnChunkMD::new(rg);
-let live = pred.live_variables();
-part_md.set_partitions(
-live.as_ref()
-.map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
-.as_ref(),
-);
-let should_be_read =
-matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));
-
-// Already add the row groups that will be skipped to the prefetched data.
-if !should_be_read {
-prefetched.insert(i, Default::default());
-}
+let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();

-should_be_read.then(|| (i, rg.clone()))
-})
-.collect::<Vec<_>>()
-} else {
-row_groups.iter().cloned().enumerate().collect()
-};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

49 changes: 31 additions & 18 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -61,7 +61,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
}
}

-fn column_idx_to_series(
+pub fn column_idx_to_series(
column_i: usize,
// The metadata belonging to this column
field_md: &[&ColumnChunkMetaData],
@@ -144,6 +144,7 @@ fn rg_to_dfs(
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetaData,
+row_groups: &Vec<RowGroupMetaData>,
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -177,6 +178,7 @@
row_group_start,
row_group_end,
file_metadata,
+row_groups,
schema,
live_variables,
predicate,
@@ -196,7 +198,7 @@
row_group_start,
row_group_end,
slice,
-file_metadata,
+row_groups,
schema,
predicate,
row_index,
@@ -211,7 +213,7 @@
row_group_end,
previous_row_count,
slice,
-file_metadata,
+row_groups,
schema,
predicate,
row_index,
@@ -247,6 +249,7 @@ fn rg_to_dfs_prefiltered(
row_group_start: usize,
row_group_end: usize,
file_metadata: &FileMetaData,
+row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
live_variables: Vec<PlSmallStr>,
predicate: &dyn PhysicalIoExpr,
@@ -529,7 +532,7 @@ fn rg_to_dfs_optionally_par_over_columns(
row_group_start: usize,
row_group_end: usize,
slice: (usize, usize),
-file_metadata: &FileMetaData,
+row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -540,13 +543,11 @@
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

-let mut n_rows_processed: usize = (0..row_group_start)
-.map(|i| file_metadata.row_groups[i].num_rows())
-.sum();
+let mut n_rows_processed: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum();
let slice_end = slice.0 + slice.1;

for rg_idx in row_group_start..row_group_end {
-let md = &file_metadata.row_groups[rg_idx];
+let md = &row_groups[rg_idx];

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
@@ -636,7 +637,7 @@ fn rg_to_dfs_par_over_rg(
row_group_end: usize,
previous_row_count: &mut IdxSize,
slice: (usize, usize),
-file_metadata: &FileMetaData,
+row_groups: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -645,16 +646,16 @@
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
// compute the limits per row group and the row count offsets
-let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
+let mut row_groups_iter = Vec::with_capacity(row_group_end - row_group_start);

let mut n_rows_processed: usize = (0..row_group_start)
-.map(|i| file_metadata.row_groups[i].num_rows())
+.map(|i| row_groups[i].num_rows())
.sum();
let slice_end = slice.0 + slice.1;

for i in row_group_start..row_group_end {
let row_count_start = *previous_row_count;
-let rg_md = &file_metadata.row_groups[i];
+let rg_md = &row_groups[i];
let rg_slice =
split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end);
*previous_row_count = previous_row_count
@@ -665,15 +666,15 @@
continue;
}

-row_groups.push((i, rg_md, rg_slice, row_count_start));
+row_groups_iter.push((i, rg_md, rg_slice, row_count_start));
}

let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
let part_mds = {
let projected_columns = projected_columns_set(schema, projection);
-row_groups
+row_groups_iter
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = PartitionedColumnChunkMD::new(rg);
@@ -683,7 +684,7 @@
.collect::<Vec<_>>()
};

-row_groups
+row_groups_iter
.into_par_iter()
.enumerate()
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
@@ -747,6 +748,7 @@ pub fn read_parquet<R: MmapBytesReader>(
projection: Option<&[usize]>,
reader_schema: &ArrowSchemaRef,
metadata: Option<FileMetaDataRef>,
+row_groups: Option<Vec<RowGroupMetaData>>,
predicate: Option<&dyn PhysicalIoExpr>,
mut parallel: ParallelStrategy,
row_index: Option<RowIndex>,
@@ -766,6 +768,7 @@
let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
+let row_groups = row_groups.unwrap_or_else(|| file_metadata.row_groups.clone());
let n_row_groups = file_metadata.row_groups.len();

// if there are multiple row groups and categorical data
@@ -820,6 +823,7 @@
n_row_groups,
slice,
&file_metadata,
+&row_groups,
reader_schema,
predicate,
row_index.clone(),
@@ -887,7 +891,10 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
}

impl RowGroupFetcher {
-async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
+pub async fn fetch_row_groups(
+&mut self,
+_row_groups: Range<usize>,
+) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
@@ -947,6 +954,7 @@ pub struct BatchedParquetReader {
projection: Arc<[usize]>,
schema: ArrowSchemaRef,
metadata: FileMetaDataRef,
+row_groups: Vec<RowGroupMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
rows_read: IdxSize,
@@ -967,6 +975,7 @@
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetaDataRef,
+row_groups: Vec<RowGroupMetaData>,
schema: ArrowSchemaRef,
slice: (usize, usize),
projection: Option<Vec<usize>>,
@@ -978,7 +987,7 @@
include_file_path: Option<(PlSmallStr, Arc<str>)>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
-let n_row_groups = metadata.row_groups.len();
+let n_row_groups = row_groups.len();
let projection = projection
.map(Arc::from)
.unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
@@ -1004,6 +1013,7 @@
projection,
schema,
metadata,
+row_groups,
row_index,
rows_read: 0,
predicate,
@@ -1054,7 +1064,7 @@
self.row_group_offset,
self.row_group_offset + n,
self.slice,
-&self.metadata.row_groups,
+&self.row_groups,
);

let store = self
Expand All @@ -1070,6 +1080,7 @@ impl BatchedParquetReader {
row_group_range.end,
self.slice,
&self.metadata,
+&self.row_groups,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
@@ -1093,6 +1104,7 @@
let predicate = self.predicate.clone();
let schema = self.schema.clone();
let metadata = self.metadata.clone();
+let row_groups = self.row_groups.clone();
let parallel = self.parallel;
let projection = self.projection.clone();
let use_statistics = self.use_statistics;
@@ -1107,6 +1119,7 @@
row_group_range.end,
slice,
&metadata,
+&row_groups,
&schema,
predicate.as_deref(),
row_index,
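
The read_impl.rs changes above are largely mechanical: each rg_to_dfs* helper and BatchedParquetReader now receives the (possibly pruned) row-group list as an explicit parameter instead of indexing file_metadata.row_groups, and read_parquet falls back to the full list when no pre-filtered one is supplied. Below is a reduced sketch of that parameter-threading pattern with stand-in types, not the Polars FileMetaData/RowGroupMetaData structs.

// Stand-in types for illustration only.
struct RowGroupMeta { num_rows: usize }
struct FileMeta { row_groups: Vec<RowGroupMeta> }

// Before: the helper always indexes the full footer metadata, so upstream
// pruning cannot shrink the set it walks.
fn total_rows_before(file: &FileMeta, start: usize, end: usize) -> usize {
    (start..end).map(|i| file.row_groups[i].num_rows).sum()
}

// After: the caller decides which row groups are in play (for example the
// survivors of predicate pruning) and passes exactly that slice down.
fn total_rows_after(row_groups: &[RowGroupMeta], start: usize, end: usize) -> usize {
    (start..end).map(|i| row_groups[i].num_rows).sum()
}

fn main() {
    let file = FileMeta {
        row_groups: vec![RowGroupMeta { num_rows: 100 }, RowGroupMeta { num_rows: 50 }],
    };
    // Pruning upstream kept only the second group; the "after" form sees just that.
    let pruned = vec![RowGroupMeta { num_rows: 50 }];
    assert_eq!(total_rows_before(&file, 0, 2), 150);
    assert_eq!(total_rows_after(&pruned, 0, 1), 50);
}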
