feat(rust): optimize column load of row groups with predicate(pola-rs…
bchalk101 authored and Boruch Chalk committed Nov 12, 2024
1 parent 4fb7cd1 commit cd9f21b
Showing 26 changed files with 495 additions and 129 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files (Cargo.lock) are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -83,6 +83,7 @@ strength_reduce = "0.2"
strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
tokio-stream = "0.1.15"
tokio-util = "0.7.8"
unicode-reverse = "1.0.8"
url = "2.4"
13 changes: 13 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
@@ -638,6 +638,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
}

fn columns(&self) -> Vec<PlSmallStr> {
let mut arena: Arena<AExpr> = Arena::new();
let _ = to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
let mut columns = vec![];
for _ in 0..arena.len() {
let node = arena.pop().unwrap();
if let AExpr::Column(s) = node {
columns.push(s)
}
}
columns
}
}

pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
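For context: the new `columns()` method on `PhysicalIoHelper` above lowers the wrapped expression into an `AExpr` arena and keeps every `Column` node it finds, so the reader can tell which columns a predicate actually touches. A minimal, self-contained sketch of the same idea, using a hypothetical toy expression type rather than the real polars `Arena`/`AExpr`/`to_aexpr` API:

#[derive(Debug)]
enum Expr {
    Column(String),
    Literal(i64),
    BinaryExpr {
        left: Box<Expr>,
        op: &'static str,
        right: Box<Expr>,
    },
}

// Walk the expression tree and collect the names of all referenced columns.
fn collect_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        Expr::Literal(_) => {},
        Expr::BinaryExpr { left, right, .. } => {
            collect_columns(left, out);
            collect_columns(right, out);
        },
    }
}

fn main() {
    // (a > 1) AND (b < 10) refers to the columns "a" and "b".
    let predicate = Expr::BinaryExpr {
        left: Box::new(Expr::BinaryExpr {
            left: Box::new(Expr::Column("a".into())),
            op: ">",
            right: Box::new(Expr::Literal(1)),
        }),
        op: "and",
        right: Box::new(Expr::BinaryExpr {
            left: Box::new(Expr::Column("b".into())),
            op: "<",
            right: Box::new(Expr::Literal(10)),
        }),
    };
    println!("predicate: {predicate:?}");

    let mut columns = Vec::new();
    collect_columns(&predicate, &mut columns);
    assert_eq!(columns, vec!["a".to_string(), "b".to_string()]);
}

The real implementation walks the flattened arena in a loop instead of recursing, but the result is the same: the set of column names the predicate depends on.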
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/ipc_stream.rs
@@ -248,7 +248,7 @@ where
);

ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
let df = chunk_df_for_writing(df, 512 * 512)?;
let df = chunk_df_for_writing(df, 10)?;
let iter = df.iter_chunks(self.compat_level, true);

for batch in iter {
23 changes: 2 additions & 21 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -13,7 +13,6 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;

use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
@@ -25,6 +24,7 @@ type DownloadedRowGroup = PlHashMap<u64, Bytes>;
type QueuePayload = (usize, DownloadedRowGroup);
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;

#[derive(Debug, Clone)]
pub struct ParquetObjectStore {
store: PolarsObjectStore,
path: ObjectPath,
@@ -242,27 +242,8 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];

let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
prefetched.insert(i, Default::default());
}

should_be_read.then(|| (i, rg.clone()))
})
.collect::<Vec<_>>()
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let msg_limit = get_rg_prefetch_size();

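Note that the deleted block above used `read_this_row_group` to compare each row group's statistics against the predicate and skip non-matching row groups before downloading them; after this change the object-store fetcher simply enumerates whatever row-group list it is given, and any pruning is expected to happen in the caller that builds that list. A rough, self-contained sketch of the min/max pruning idea, with hypothetical stand-in types rather than the real parquet statistics API:

// Hypothetical, simplified statistics for one i64 column of a row group.
struct RowGroupStats {
    min: i64,
    max: i64,
}

// For an equality predicate `column == value`: the row group can only contain
// matching rows if `value` falls inside its [min, max] range.
fn row_group_can_match(stats: &RowGroupStats, value: i64) -> bool {
    stats.min <= value && value <= stats.max
}

fn main() {
    let row_groups = vec![
        RowGroupStats { min: 0, max: 9 },
        RowGroupStats { min: 10, max: 19 },
        RowGroupStats { min: 20, max: 29 },
    ];

    // Keep only the indices of row groups that might contain a row where column == 25.
    let selected: Vec<usize> = row_groups
        .iter()
        .enumerate()
        .filter(|&(_, stats)| row_group_can_match(stats, 25))
        .map(|(i, _)| i)
        .collect();

    assert_eq!(selected, vec![2]);
    println!("row groups worth downloading: {selected:?}");
}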
59 changes: 38 additions & 21 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -114,7 +114,7 @@ pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
sorting_map
}

fn column_idx_to_series(
pub fn column_idx_to_series(
column_i: usize,
// The metadata belonging to this column
field_md: &[&ColumnChunkMetadata],
@@ -204,6 +204,7 @@ fn rg_to_dfs(
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetadata,
row_groups: &Vec<RowGroupMetadata>,
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -243,6 +244,7 @@
row_group_start,
row_group_end,
file_metadata,
row_groups,
schema,
live_variables,
predicate,
@@ -262,7 +264,7 @@
row_group_start,
row_group_end,
slice,
file_metadata,
row_groups,
schema,
predicate,
row_index,
@@ -277,7 +279,7 @@
row_group_end,
previous_row_count,
slice,
file_metadata,
row_groups,
schema,
predicate,
row_index,
@@ -307,6 +309,7 @@ fn rg_to_dfs_prefiltered(
row_group_start: usize,
row_group_end: usize,
file_metadata: &FileMetadata,
row_groups: &[RowGroupMetadata],
schema: &ArrowSchemaRef,
live_variables: Vec<PlSmallStr>,
predicate: &dyn PhysicalIoExpr,
@@ -324,7 +327,7 @@
None => Vec::new(),
Some(_) => (row_group_start..row_group_end)
.map(|index| {
let md = &file_metadata.row_groups[index];
let md = &row_groups[index];

let current_offset = row_offset;
let current_row_count = md.num_rows() as IdxSize;
@@ -379,7 +382,7 @@
(row_group_start..row_group_end)
.into_par_iter()
.map(|rg_idx| {
let md = &file_metadata.row_groups[rg_idx];
let md = &row_groups[rg_idx];

if use_statistics {
match read_this_row_group(Some(predicate), md, schema) {
@@ -615,7 +618,7 @@ fn rg_to_dfs_optionally_par_over_columns(
row_group_start: usize,
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetadata,
row_groups: &[RowGroupMetadata],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -626,20 +629,18 @@
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

let mut n_rows_processed: usize = (0..row_group_start)
.map(|i| file_metadata.row_groups[i].num_rows())
.sum();
let mut n_rows_processed: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum();
let slice_end = slice.0 + slice.1;

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
let md = &row_groups[rg_idx];

let rg_slice =
split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
&& !read_this_row_group(predicate, &row_groups[rg_idx], schema)?
{
*previous_row_count += rg_slice.1 as IdxSize;
continue;
@@ -750,7 +751,7 @@ fn rg_to_dfs_par_over_rg(
row_group_end: usize,
previous_row_count: &mut IdxSize,
slice: (usize, usize),
file_metadata: &FileMetadata,
row_groups: &[RowGroupMetadata],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -759,16 +760,16 @@
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
// compute the limits per row group and the row count offsets
let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);
let mut row_groups_iter = Vec::with_capacity(row_group_end - row_group_start);

let mut n_rows_processed: usize = (0..row_group_start)
.map(|i| file_metadata.row_groups[i].num_rows())
.map(|i| row_groups[i].num_rows())
.sum();
let slice_end = slice.0 + slice.1;

for i in row_group_start..row_group_end {
let row_count_start = *previous_row_count;
let rg_md = &file_metadata.row_groups[i];
let rg_md = &row_groups[i];
let rg_slice =
split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end);
*previous_row_count = previous_row_count
@@ -779,15 +780,19 @@
continue;
}

row_groups.push((rg_md, rg_slice, row_count_start));
row_groups_iter.push((i, rg_md, rg_slice, row_count_start));
}

let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
row_groups

row_groups_iter
.into_par_iter()
.map(|(md, slice, row_count_start)| {
.enumerate()
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
let md = &row_groups[iter_idx];

if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
return Ok(None);
}
@@ -859,6 +864,7 @@ pub fn read_parquet<R: MmapBytesReader>(
projection: Option<&[usize]>,
reader_schema: &ArrowSchemaRef,
metadata: Option<FileMetadataRef>,
row_groups: Option<Vec<RowGroupMetadata>>,
predicate: Option<&dyn PhysicalIoExpr>,
mut parallel: ParallelStrategy,
row_index: Option<RowIndex>,
@@ -878,6 +884,7 @@
let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
let row_groups = row_groups.unwrap_or_else(|| file_metadata.row_groups.clone());
let n_row_groups = file_metadata.row_groups.len();

// if there are multiple row groups and categorical data
@@ -942,6 +949,7 @@
n_row_groups,
slice,
&file_metadata,
&row_groups,
reader_schema,
predicate,
row_index.clone(),
@@ -1007,7 +1015,10 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
}

impl RowGroupFetcher {
async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
pub async fn fetch_row_groups(
&mut self,
_row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
@@ -1067,6 +1078,7 @@ pub struct BatchedParquetReader {
projection: Arc<[usize]>,
schema: ArrowSchemaRef,
metadata: FileMetadataRef,
row_groups: Vec<RowGroupMetadata>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
rows_read: IdxSize,
@@ -1087,6 +1099,7 @@ impl BatchedParquetReader {
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetadataRef,
row_groups: Vec<RowGroupMetadata>,
schema: ArrowSchemaRef,
slice: (usize, usize),
projection: Option<Vec<usize>>,
@@ -1098,7 +1111,7 @@
include_file_path: Option<(PlSmallStr, Arc<str>)>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
let n_row_groups = row_groups.len();
let projection = projection
.map(Arc::from)
.unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
@@ -1124,6 +1137,7 @@ impl BatchedParquetReader {
projection,
schema,
metadata,
row_groups,
row_index,
rows_read: 0,
predicate,
@@ -1174,7 +1188,7 @@ impl BatchedParquetReader {
self.row_group_offset,
self.row_group_offset + n,
self.slice,
&self.metadata.row_groups,
&self.row_groups,
);

let store = self
Expand All @@ -1190,6 +1204,7 @@ impl BatchedParquetReader {
row_group_range.end,
self.slice,
&self.metadata,
&self.row_groups,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
@@ -1213,6 +1228,7 @@
let predicate = self.predicate.clone();
let schema = self.schema.clone();
let metadata = self.metadata.clone();
let row_groups = self.row_groups.clone();
let parallel = self.parallel;
let projection = self.projection.clone();
let use_statistics = self.use_statistics;
@@ -1227,6 +1243,7 @@
row_group_range.end,
slice,
&metadata,
&row_groups,
&schema,
predicate.as_deref(),
row_index,
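Taken together, these signature changes thread an explicit row-group list through the read path: `read_parquet` now accepts an optional `row_groups` argument (falling back to `file_metadata.row_groups` when `None`), and `BatchedParquetReader::new` takes the list directly, so a caller can pre-filter row groups before the reader ever touches them. A schematic sketch of that calling pattern with hypothetical stand-in types, not the real polars signatures:

// Hypothetical stand-ins for the real metadata types.
#[derive(Clone)]
struct RowGroupMetadata {
    num_rows: usize,
}

struct FileMetadata {
    row_groups: Vec<RowGroupMetadata>,
}

// Sketch of the new shape: the caller may pass a pre-filtered row-group list;
// when it does not, the reader falls back to every row group in the file.
fn read_parquet_sketch(
    file_metadata: &FileMetadata,
    row_groups: Option<Vec<RowGroupMetadata>>,
) -> usize {
    let row_groups = row_groups.unwrap_or_else(|| file_metadata.row_groups.clone());
    // The reader only ever touches the row groups it was handed.
    row_groups.iter().map(|rg| rg.num_rows).sum()
}

fn main() {
    let metadata = FileMetadata {
        row_groups: vec![
            RowGroupMetadata { num_rows: 100 },
            RowGroupMetadata { num_rows: 200 },
            RowGroupMetadata { num_rows: 300 },
        ],
    };

    // No pre-filtering: every row group is read.
    assert_eq!(read_parquet_sketch(&metadata, None), 600);

    // The caller pre-filtered the list (e.g. via predicate statistics) down to one row group.
    let filtered = vec![metadata.row_groups[2].clone()];
    assert_eq!(read_parquet_sketch(&metadata, Some(filtered)), 300);
}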
(Diffs for the remaining changed files are not rendered here.)
