perf: Add upfront partitioning in ColumnChunkMetadata (#18584)
nameexhaustion authored Sep 7, 2024
1 parent 80769d2 commit de344d6
Showing 24 changed files with 252 additions and 343 deletions.
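The gist of the change: previously a row group's column chunks were re-partitioned by top-level field name at each use site via the PartitionedColumnChunkMD helper (the old TODO removed below notes the work was even done twice). After this commit the partitioning happens once, up front, inside the row-group metadata, and call sites ask for it through columns_under_root_iter. A minimal sketch of that idea, using simplified stand-in types rather than the actual polars-parquet definitions:

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Simplified stand-in for one parquet column chunk's metadata.
pub struct ColumnChunkMetadata {
    /// e.g. ["s", "a"] for field `a` nested inside struct column `s`.
    pub path_in_schema: Vec<String>,
    pub byte_range: Range<u64>,
}

/// Row-group metadata that partitions its column chunks by root field name
/// once, at construction, instead of rescanning on every lookup.
pub struct RowGroupMetaData {
    columns: Vec<ColumnChunkMetadata>,
    column_lookup: HashMap<String, Vec<usize>>,
}

impl RowGroupMetaData {
    pub fn new(columns: Vec<ColumnChunkMetadata>) -> Self {
        let mut column_lookup: HashMap<String, Vec<usize>> = HashMap::new();
        for (i, col) in columns.iter().enumerate() {
            // Group by the first path segment: the top-level field name.
            column_lookup
                .entry(col.path_in_schema[0].clone())
                .or_default()
                .push(i);
        }
        Self { columns, column_lookup }
    }

    /// All column chunks under one top-level field (a struct column owns
    /// several chunks): one hash lookup, no scan over all columns.
    pub fn columns_under_root_iter<'a>(
        &'a self,
        name: &str,
    ) -> impl Iterator<Item = &'a ColumnChunkMetadata> + 'a {
        self.column_lookup
            .get(name)
            .map(|idxs| idxs.as_slice())
            .unwrap_or(&[])
            .iter()
            .map(move |&i| &self.columns[i])
    }
}
```

The lookup map is paid for once when the metadata is parsed; every subsequent projection, predicate evaluation, and statistics pass reuses it.
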
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

60 changes: 21 additions & 39 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -18,7 +18,6 @@ use crate::cloud::{
     build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
 };
 use crate::parquet::metadata::FileMetaDataRef;
-use crate::parquet::read::metadata::PartitionedColumnChunkMD;
 use crate::pl_async::get_runtime;
 use crate::predicates::PhysicalIoExpr;
@@ -178,16 +177,12 @@ async fn download_projection(
     let mut ranges = Vec::with_capacity(fields.len());
     let mut offsets = Vec::with_capacity(fields.len());
     fields.iter().for_each(|name| {
-        let columns = row_group.columns();
-
         // A single column can have multiple matches (structs).
-        let iter = columns.iter().filter_map(|meta| {
-            if meta.descriptor().path_in_schema[0] == name {
-                let (offset, len) = meta.byte_range();
-                Some((offset, offset as usize..(offset + len) as usize))
-            } else {
-                None
-            }
+        let iter = row_group.columns_under_root_iter(name).map(|meta| {
+            let byte_range = meta.byte_range();
+            let offset = byte_range.start;
+            let byte_range = byte_range.start as usize..byte_range.end as usize;
+            (offset, byte_range)
         });
 
         for (offset, range) in iter {
@@ -215,33 +210,30 @@ async fn download_row_group(
     sender: QueueSend,
     rg_index: usize,
 ) -> bool {
-    if rg.columns().is_empty() {
+    if rg.n_columns() == 0 {
         return true;
     }
-    let offset = rg.columns().iter().map(|c| c.byte_range().0).min().unwrap();
-    let (max_offset, len) = rg
-        .columns()
-        .iter()
-        .map(|c| c.byte_range())
-        .max_by_key(|k| k.0)
-        .unwrap();
+
+    let full_byte_range = rg.full_byte_range();
+    let full_byte_range = full_byte_range.start as usize..full_byte_range.end as usize;
 
     let result = async_reader
-        .get_range(offset as usize, (max_offset - offset + len) as usize)
+        .get_range(
+            full_byte_range.start,
+            full_byte_range.end - full_byte_range.start,
+        )
         .await
         .map(|bytes| {
-            let base_offset = offset;
             (
                 rg_index,
-                rg.columns()
-                    .iter()
-                    .map(|c| {
-                        let (offset, len) = c.byte_range();
-                        let slice_offset = offset - base_offset;
-
+                rg.byte_ranges_iter()
+                    .map(|range| {
                         (
-                            offset,
-                            bytes.slice(slice_offset as usize..(slice_offset + len) as usize),
+                            range.start,
+                            bytes.slice(
+                                range.start as usize - full_byte_range.start
+                                    ..range.end as usize - full_byte_range.start,
+                            ),
                         )
                     })
                     .collect::<DownloadedRowGroup>(),
@@ -279,18 +271,8 @@ impl FetchRowGroupsFromObjectStore {
             .filter_map(|i| {
                 let rg = &row_groups[i];
 
-                // TODO!
-                // Optimize this. Now we partition the predicate columns twice. (later on reading as well)
-                // I think we must add metadata context where we can cache and amortize the partitioning.
-                let mut part_md = PartitionedColumnChunkMD::new(rg);
-                let live = pred.live_variables();
-                part_md.set_partitions(
-                    live.as_ref()
-                        .map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
-                        .as_ref(),
-                );
                 let should_be_read =
-                    matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));
+                    matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
 
                 // Already add the row groups that will be skipped to the prefetched data.
                 if !should_be_read {
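With the metadata partitioned up front, download_row_group also gets simpler: rg.full_byte_range() replaces the min/max scan over column chunks, a single get_range fetches the whole row group, and each column is sliced out of the shared buffer by rebasing its absolute byte range. A hedged sketch of just that rebasing arithmetic (the helper name and types are illustrative, not from the crate):

```rust
use std::ops::Range;

/// Rebase a column chunk's absolute byte range onto a buffer holding the
/// row group's full byte range, and return that column's bytes.
/// Assumes `bytes.len() == (full.end - full.start) as usize`.
fn column_bytes<'a>(bytes: &'a [u8], full: &Range<u64>, col: &Range<u64>) -> &'a [u8] {
    assert!(col.start >= full.start && col.end <= full.end);
    &bytes[(col.start - full.start) as usize..(col.end - full.start) as usize]
}

fn main() {
    // Row group spans file bytes 100..110; one column chunk sits at 104..107.
    let buffer: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    assert_eq!(column_bytes(&buffer, &(100..110), &(104..107)), &[4, 5, 6]);
}
```
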
57 changes: 0 additions & 57 deletions crates/polars-io/src/parquet/read/metadata.rs

This file was deleted.

23 changes: 13 additions & 10 deletions crates/polars-io/src/parquet/read/mmap.rs
@@ -6,7 +6,7 @@ use bytes::Bytes;
 use polars_core::datatypes::PlHashMap;
 use polars_error::PolarsResult;
 use polars_parquet::read::{
-    column_iter_to_arrays, BasicDecompressor, ColumnChunkMetaData, Filter, PageReader,
+    column_iter_to_arrays, BasicDecompressor, ColumnChunkMetadata, Filter, PageReader,
 };
 use polars_utils::mmap::{MemReader, MemSlice};
@@ -31,8 +31,8 @@ pub enum ColumnStore
 /// For cloud files the relevant memory regions should have been prefetched.
 pub(super) fn mmap_columns<'a>(
     store: &'a ColumnStore,
-    field_columns: &'a [&ColumnChunkMetaData],
-) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
+    field_columns: &'a [&ColumnChunkMetadata],
+) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
     field_columns
         .iter()
         .map(|meta| _mmap_single_column(store, meta))
@@ -41,16 +41,19 @@ pub(super) fn mmap_columns<'a>(
 
 fn _mmap_single_column<'a>(
     store: &'a ColumnStore,
-    meta: &'a ColumnChunkMetaData,
-) -> (&'a ColumnChunkMetaData, MemSlice) {
-    let (start, len) = meta.byte_range();
+    meta: &'a ColumnChunkMetadata,
+) -> (&'a ColumnChunkMetadata, MemSlice) {
+    let byte_range = meta.byte_range();
     let chunk = match store {
-        ColumnStore::Local(mem_slice) => mem_slice.slice((start as usize)..(start + len) as usize),
+        ColumnStore::Local(mem_slice) => {
+            mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
+        },
         #[cfg(all(feature = "async", feature = "parquet"))]
         ColumnStore::Fetched(fetched) => {
-            let entry = fetched.get(&start).unwrap_or_else(|| {
+            let entry = fetched.get(&byte_range.start).unwrap_or_else(|| {
                 panic!(
-                    "mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
+                    "mmap_columns: column with start {} must be prefetched in ColumnStore.\n",
+                    byte_range.start
                 )
             });
             MemSlice::from_bytes(entry.clone())
@@ -62,7 +65,7 @@ fn _mmap_single_column<'a>(
 // similar to arrow2 serializer, except this accepts a slice instead of a vec.
 // this allows us to memory map
 pub fn to_deserializer(
-    columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
+    columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
     field: Field,
     filter: Option<Filter>,
 ) -> PolarsResult<Box<dyn Array>> {
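The changes in this file are mechanical: the ColumnChunkMetaData type is now spelled ColumnChunkMetadata, and byte_range() returns a std::ops::Range<u64> instead of an (offset, length) pair, so call sites no longer repeat the offset + len arithmetic. A small sketch of the two shapes (the helper names are illustrative, not part of the crate):

```rust
use std::ops::Range;

/// Old shape: an (offset, len) pair. New shape: offset..offset + len.
fn tuple_to_range((offset, len): (u64, u64)) -> Range<u64> {
    offset..offset + len
}

/// Buffer slicing needs usize bounds; the cast now happens in one place.
fn to_usize_range(r: Range<u64>) -> Range<usize> {
    r.start as usize..r.end as usize
}

fn main() {
    assert_eq!(tuple_to_range((10, 5)), 10..15);
    assert_eq!(to_usize_range(10..15), 10..15);
}
```
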
2 changes: 0 additions & 2 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -16,7 +16,6 @@
 #[cfg(feature = "cloud")]
 mod async_impl;
-mod metadata;
 mod mmap;
 mod options;
 mod predicates;
@@ -40,7 +39,6 @@ pub use reader::{BatchedParquetReader, ParquetReader};
 pub use utils::materialize_empty_df;
 
 pub mod _internal {
-    pub use super::metadata::PartitionedColumnChunkMD;
     pub use super::mmap::to_deserializer;
     pub use super::predicates::read_this_row_group;
 }
24 changes: 13 additions & 11 deletions crates/polars-io/src/parquet/read/predicates.rs
@@ -1,7 +1,7 @@
 use polars_core::prelude::*;
 use polars_parquet::read::statistics::{deserialize, Statistics};
+use polars_parquet::read::RowGroupMetaData;
 
-use crate::parquet::read::metadata::PartitionedColumnChunkMD;
 use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
 
 impl ColumnStats {
@@ -17,18 +17,20 @@ impl ColumnStats {
 
 /// Collect the statistics in a row-group
 pub(crate) fn collect_statistics(
-    part_md: &PartitionedColumnChunkMD,
+    md: &RowGroupMetaData,
     schema: &ArrowSchema,
 ) -> PolarsResult<Option<BatchStats>> {
+    // TODO! fix this performance. This is a full sequential scan.
     let stats = schema
         .iter_values()
-        .map(|field| match part_md.get_partitions(&field.name) {
-            Some(md) => {
-                let st = deserialize(field, &md)?;
-                Ok(ColumnStats::from_arrow_stats(st, field))
-            },
-            None => Ok(ColumnStats::new(field.into(), None, None, None)),
+        .map(|field| {
+            let iter = md.columns_under_root_iter(&field.name);
+
+            Ok(if iter.len() == 0 {
+                ColumnStats::new(field.into(), None, None, None)
+            } else {
+                ColumnStats::from_arrow_stats(deserialize(field, iter)?, field)
+            })
         })
         .collect::<PolarsResult<Vec<_>>>()?;

@@ -39,18 +41,18 @@ pub(crate) fn collect_statistics(
     Ok(Some(BatchStats::new(
         Arc::new(Schema::from_arrow_schema(schema)),
         stats,
-        Some(part_md.num_rows()),
+        Some(md.num_rows()),
     )))
 }
 
 pub fn read_this_row_group(
     predicate: Option<&dyn PhysicalIoExpr>,
-    part_md: &PartitionedColumnChunkMD,
+    md: &RowGroupMetaData,
     schema: &ArrowSchema,
 ) -> PolarsResult<bool> {
     if let Some(pred) = predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(part_md, schema)? {
+            if let Some(stats) = collect_statistics(md, schema)? {
                 let should_read = pred.should_read(&stats);
                 // a parquet file may not have statistics of all columns
                 if matches!(should_read, Ok(false)) {
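read_this_row_group now takes RowGroupMetaData directly: statistics are collected per schema field from the pre-partitioned column chunks, and the predicate's stats evaluator may prove that a row group holds no matching rows. A simplified sketch of that pruning logic, reduced to one i64 column with optional min/max statistics (not the actual polars API):

```rust
/// Minimal illustration of statistics-based row-group pruning for a single
/// i64 column whose row groups may carry min/max statistics.
struct RowGroupStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// For the predicate `col == value`: read the row group unless the
/// statistics *prove* no row can match. Missing statistics mean "must read",
/// mirroring the comment above that files may lack statistics for a column.
fn should_read(stats: &RowGroupStats, value: i64) -> bool {
    match (stats.min, stats.max) {
        (Some(min), Some(max)) => min <= value && value <= max,
        _ => true, // no statistics: cannot prune
    }
}

fn main() {
    let rg = RowGroupStats { min: Some(10), max: Some(20) };
    assert!(!should_read(&rg, 5)); // provably empty: skip the row group
    assert!(should_read(&rg, 15)); // could match: download and decode it
}
```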