From 3b29c8213522e1fcf5476abb836c681c5b25d394 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 15 Aug 2022 21:42:01 +0200 Subject: [PATCH] Exposed parquet indexed page filtering to `FileReader` (#1216) --- benches/read_parquet.rs | 2 +- examples/parquet_read.rs | 2 +- examples/parquet_read_async.rs | 11 +- src/io/ipc/read/mod.rs | 1 + src/io/parquet/read/file.rs | 46 ++- src/io/parquet/read/indexes/binary.rs | 6 +- src/io/parquet/read/indexes/boolean.rs | 6 +- .../parquet/read/indexes/fixed_len_binary.rs | 6 +- src/io/parquet/read/indexes/mod.rs | 378 ++++++++++++++---- src/io/parquet/read/indexes/primitive.rs | 24 +- src/io/parquet/read/mod.rs | 3 +- src/io/parquet/read/row_group.rs | 142 +++++-- src/io/parquet/read/statistics/dictionary.rs | 3 +- tests/it/io/parquet/mod.rs | 20 +- tests/it/io/parquet/read.rs | 6 +- tests/it/io/parquet/read_indexes.rs | 68 +++- tests/it/io/parquet/write_async.rs | 7 +- 17 files changed, 546 insertions(+), 185 deletions(-) diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index cce4875a4de..0f445fbb01b 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> { let schema = schema.filter(|index, _| index == column); - let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None); + let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None, None); for maybe_chunk in reader { let columns = maybe_chunk?; diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index e0e5a220e8a..a296c73c249 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -35,7 +35,7 @@ fn main() -> Result<(), Error> { .collect(); // we can then read the row groups into chunks - let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None); + let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None); let start = SystemTime::now(); for maybe_chunk in chunks { diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs index 3f1abc4faf6..8056b853cca 100644 --- a/examples/parquet_read_async.rs +++ b/examples/parquet_read_async.rs @@ -35,8 +35,15 @@ async fn main() -> Result<()> { for row_group in &metadata.row_groups { // A row group is consumed in two steps: the first step is to read the (compressed) // columns into memory, which is IO-bounded. - let column_chunks = - read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?; + let column_chunks = read::read_columns_many_async( + factory, + row_group, + schema.fields.clone(), + None, + None, + None, + ) + .await?; // the second step is to iterate over the columns in chunks. // this operation is CPU-bounded and should be sent to a separate thread pool (e.g. 
`tokio_rayon`) to not block diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 88f1deaf3a7..1da43b0c2bd 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -29,6 +29,7 @@ pub mod stream_async; pub mod file_async; pub(crate) use common::first_dict_field; +#[cfg(feature = "io_flight")] pub(crate) use common::{read_dictionary, read_record_batch}; pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata}; pub use reader::FileReader; diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs index ff8944d38cf..391544685a6 100644 --- a/src/io/parquet/read/file.rs +++ b/src/io/parquet/read/file.rs @@ -1,5 +1,7 @@ use std::io::{Read, Seek}; +use parquet2::indexes::FilteredPage; + use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; @@ -29,8 +31,10 @@ impl FileReader { schema: Schema, chunk_size: Option, limit: Option, + page_indexes: Option>>>>, ) -> Self { - let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit); + let row_groups = + RowGroupReader::new(reader, schema, row_groups, chunk_size, limit, page_indexes); Self { row_groups, @@ -105,9 +109,10 @@ impl Iterator for FileReader { pub struct RowGroupReader { reader: R, schema: Schema, - row_groups: std::iter::Enumerate>, + row_groups: std::vec::IntoIter, chunk_size: Option, remaining_rows: usize, + page_indexes: Option>>>>, } impl RowGroupReader { @@ -118,13 +123,18 @@ impl RowGroupReader { row_groups: Vec, chunk_size: Option, limit: Option, + page_indexes: Option>>>>, ) -> Self { + if let Some(pages) = &page_indexes { + assert_eq!(pages.len(), row_groups.len()) + } Self { reader, schema, - row_groups: row_groups.into_iter().enumerate(), + row_groups: row_groups.into_iter(), chunk_size, remaining_rows: limit.unwrap_or(usize::MAX), + page_indexes: page_indexes.map(|pages| pages.into_iter()), } } @@ -138,26 +148,42 @@ impl RowGroupReader { return Ok(None); } - let row_group = if let Some((_, row_group)) = self.row_groups.next() { + let row_group = if let Some(row_group) = self.row_groups.next() { row_group } else { return Ok(None); }; + let pages = self.page_indexes.as_mut().and_then(|iter| iter.next()); + + // the number of rows depends on whether indexes are selected or not. 
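Aside, not part of the diff: when page indexes are passed, the number of rows to deserialize is the number of *selected* rows rather than `row_group.num_rows()`. The computation that follows sums the selected row-interval lengths over the filtered pages of the first column of the first field; the same logic over a single column's pages, as a standalone sketch:

    fn selected_num_rows(pages: &[parquet2::indexes::FilteredPage]) -> usize {
        pages
            .iter()
            .map(|page| {
                // each filtered page carries the row intervals selected from it
                page.selected_rows
                    .iter()
                    .map(|interval| interval.length)
                    .sum::<usize>()
            })
            .sum()
    }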
+ let num_rows = pages + .as_ref() + .map(|x| { + // first field, first column within that field + x[0][0] + .iter() + .map(|page| { + page.selected_rows + .iter() + .map(|interval| interval.length) + .sum::() + }) + .sum() + }) + .unwrap_or_else(|| row_group.num_rows()); + let column_chunks = read_columns_many( &mut self.reader, &row_group, self.schema.fields.clone(), self.chunk_size, Some(self.remaining_rows), + pages, )?; - let result = RowGroupDeserializer::new( - column_chunks, - row_group.num_rows(), - Some(self.remaining_rows), - ); - self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows()); + let result = RowGroupDeserializer::new(column_chunks, num_rows, Some(self.remaining_rows)); + self.remaining_rows = self.remaining_rows.saturating_sub(num_rows); Ok(Some(result)) } } diff --git a/src/io/parquet/read/indexes/binary.rs b/src/io/parquet/read/indexes/binary.rs index 608a0c065bd..2f7e40e3208 100644 --- a/src/io/parquet/read/indexes/binary.rs +++ b/src/io/parquet/read/indexes/binary.rs @@ -7,13 +7,13 @@ use crate::{ trusted_len::TrustedLen, }; -use super::ColumnIndex; +use super::ColumnPageStatistics; pub fn deserialize( indexes: &[PageIndex>], data_type: &DataType, -) -> Result { - Ok(ColumnIndex { +) -> Result { + Ok(ColumnPageStatistics { min: deserialize_binary_iter(indexes.iter().map(|index| index.min.as_ref()), data_type)?, max: deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type)?, null_count: PrimitiveArray::from_trusted_len_iter( diff --git a/src/io/parquet/read/indexes/boolean.rs b/src/io/parquet/read/indexes/boolean.rs index 501c9e63a64..f26a2f1b7ab 100644 --- a/src/io/parquet/read/indexes/boolean.rs +++ b/src/io/parquet/read/indexes/boolean.rs @@ -2,10 +2,10 @@ use parquet2::indexes::PageIndex; use crate::array::{BooleanArray, PrimitiveArray}; -use super::ColumnIndex; +use super::ColumnPageStatistics; -pub fn deserialize(indexes: &[PageIndex]) -> ColumnIndex { - ColumnIndex { +pub fn deserialize(indexes: &[PageIndex]) -> ColumnPageStatistics { + ColumnPageStatistics { min: Box::new(BooleanArray::from_trusted_len_iter( indexes.iter().map(|index| index.min), )), diff --git a/src/io/parquet/read/indexes/fixed_len_binary.rs b/src/io/parquet/read/indexes/fixed_len_binary.rs index c4499814d12..12ef77667ba 100644 --- a/src/io/parquet/read/indexes/fixed_len_binary.rs +++ b/src/io/parquet/read/indexes/fixed_len_binary.rs @@ -6,10 +6,10 @@ use crate::{ trusted_len::TrustedLen, }; -use super::ColumnIndex; +use super::ColumnPageStatistics; -pub fn deserialize(indexes: &[PageIndex>], data_type: DataType) -> ColumnIndex { - ColumnIndex { +pub fn deserialize(indexes: &[PageIndex>], data_type: DataType) -> ColumnPageStatistics { + ColumnPageStatistics { min: deserialize_binary_iter( indexes.iter().map(|index| index.min.as_ref()), data_type.clone(), diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index 02aa7c75ca1..e890f27d69d 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -1,8 +1,11 @@ +//! 
API to perform page-level filtering (also known as indexes) +use parquet2::error::Error as ParquetError; use parquet2::indexes::{ - BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex, + select_pages, BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex, + PageLocation, }; -use parquet2::metadata::ColumnChunkMetaData; -use parquet2::read::read_columns_indexes as _read_columns_indexes; +use parquet2::metadata::{ColumnChunkMetaData, RowGroupMetaData}; +use parquet2::read::{read_columns_indexes as _read_columns_indexes, read_pages_locations}; use parquet2::schema::types::PhysicalType as ParquetPhysicalType; mod binary; @@ -10,132 +13,339 @@ mod boolean; mod fixed_len_binary; mod primitive; +use std::collections::VecDeque; use std::io::{Read, Seek}; -use crate::datatypes::Field; +use crate::array::UInt64Array; +use crate::datatypes::{Field, PrimitiveType}; use crate::{ - array::{Array, UInt64Array}, - datatypes::DataType, + array::Array, + datatypes::{DataType, PhysicalType}, error::Error, }; -/// Arrow-deserialized [`ColumnIndex`] containing the minimum and maximum value -/// of every page from the column. -/// # Invariants -/// The minimum and maximum are guaranteed to have the same logical type. +use super::get_field_pages; + +pub use parquet2::indexes::{FilteredPage, Interval}; + +/// Page statistics of an Arrow field. +#[derive(Debug, PartialEq)] +pub enum FieldPageStatistics { + /// Variant used for fields with a single parquet column (e.g. primitives, dictionaries, list) + Single(ColumnPageStatistics), + /// Variant used for fields with multiple parquet columns (e.g. Struct, Map) + Multiple(Vec), +} + +impl From for FieldPageStatistics { + fn from(column: ColumnPageStatistics) -> Self { + Self::Single(column) + } +} + +/// [`ColumnPageStatistics`] contains the minimum, maximum, and null_count +/// of each page of a parquet column, as an [`Array`]. +/// This struct has the following invariants: +/// * `min`, `max` and `null_count` have the same length (equal to the number of pages in the column) +/// * `min`, `max` and `null_count` are guaranteed to be non-null +/// * `min` and `max` have the same logical type #[derive(Debug, PartialEq)] -pub struct ColumnIndex { +pub struct ColumnPageStatistics { /// The minimum values in the pages pub min: Box, /// The maximum values in the pages pub max: Box, - /// The number of null values in the pages + /// The number of null values in the pages. pub null_count: UInt64Array, } -impl ColumnIndex { - /// The [`DataType`] of the column index. - pub fn data_type(&self) -> &DataType { - self.min.data_type() - } -} - /// Given a sequence of [`ParquetIndex`] representing the page indexes of each column in the -/// parquet file, returns the page-level statistics as arrow's arrays, as a vector of [`ColumnIndex`]. +/// parquet file, returns the page-level statistics as a [`FieldPageStatistics`]. /// /// This function maps timestamps, decimal types, etc. accordingly. /// # Implementation -/// This function is CPU-bounded but `O(P)` where `P` is the total number of pages in all columns. +/// This function is CPU-bounded `O(P)` where `P` is the total number of pages on all columns. /// # Error /// This function errors iff the value is not deserializable to arrow (e.g. 
invalid utf-8) fn deserialize( - indexes: &[Box], - data_types: Vec, -) -> Result, Error> { - indexes - .iter() - .zip(data_types.into_iter()) - .map(|(index, data_type)| match index.physical_type() { - ParquetPhysicalType::Boolean => { - let index = index.as_any().downcast_ref::().unwrap(); - Ok(boolean::deserialize(&index.indexes)) + indexes: &mut VecDeque<&Box>, + data_type: DataType, +) -> Result { + match data_type.to_physical_type() { + PhysicalType::Boolean => { + let index = indexes + .pop_front() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + Ok(boolean::deserialize(&index.indexes).into()) + } + PhysicalType::Primitive(PrimitiveType::Int128) => { + let index = indexes.pop_front().unwrap(); + match index.physical_type() { + ParquetPhysicalType::Int32 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_i32(&index.indexes, data_type).into()) + } + parquet2::schema::types::PhysicalType::Int64 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok( + primitive::deserialize_i64( + &index.indexes, + &index.primitive_type, + data_type, + ) + .into(), + ) + } + parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => { + let index = index.as_any().downcast_ref::().unwrap(); + Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into()) + } + other => Err(Error::nyi(format!( + "Deserialize {other:?} to arrow's int64" + ))), } - ParquetPhysicalType::Int32 => { - let index = index.as_any().downcast_ref::>().unwrap(); - Ok(primitive::deserialize_i32(&index.indexes, data_type)) + } + PhysicalType::Primitive(PrimitiveType::UInt8) + | PhysicalType::Primitive(PrimitiveType::UInt16) + | PhysicalType::Primitive(PrimitiveType::UInt32) + | PhysicalType::Primitive(PrimitiveType::Int32) => { + let index = indexes + .pop_front() + .unwrap() + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(primitive::deserialize_i32(&index.indexes, data_type).into()) + } + PhysicalType::Primitive(PrimitiveType::Int64) => { + let index = indexes.pop_front().unwrap(); + match index.physical_type() { + ParquetPhysicalType::Int64 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok( + primitive::deserialize_i64( + &index.indexes, + &index.primitive_type, + data_type, + ) + .into(), + ) + } + parquet2::schema::types::PhysicalType::Int96 => { + let index = index + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(primitive::deserialize_i96(&index.indexes, data_type).into()) + } + other => Err(Error::nyi(format!( + "Deserialize {other:?} to arrow's int64" + ))), } - ParquetPhysicalType::Int64 => { - let index = index.as_any().downcast_ref::>().unwrap(); - Ok(primitive::deserialize_i64( - &index.indexes, - &index.primitive_type, - data_type, - )) + } + PhysicalType::Primitive(PrimitiveType::Float32) => { + let index = indexes + .pop_front() + .unwrap() + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(primitive::deserialize_id(&index.indexes, data_type).into()) + } + PhysicalType::Primitive(PrimitiveType::Float64) => { + let index = indexes + .pop_front() + .unwrap() + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(primitive::deserialize_id(&index.indexes, data_type).into()) + } + PhysicalType::Binary + | PhysicalType::LargeBinary + | PhysicalType::Utf8 + | PhysicalType::LargeUtf8 => { + let index = indexes + .pop_front() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + binary::deserialize(&index.indexes, &data_type).map(|x| x.into()) + } + PhysicalType::FixedSizeBinary => { + let index = indexes + .pop_front() + .unwrap() + 
.as_any() + .downcast_ref::() + .unwrap(); + Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into()) + } + PhysicalType::Dictionary(_) => { + if let DataType::Dictionary(_, inner, _) = data_type.to_logical_type() { + deserialize(indexes, (**inner).clone()) + } else { + unreachable!() } - ParquetPhysicalType::Int96 => { - let index = index - .as_any() - .downcast_ref::>() - .unwrap(); - Ok(primitive::deserialize_i96(&index.indexes, data_type)) + } + PhysicalType::List => { + if let DataType::List(inner) = data_type.to_logical_type() { + deserialize(indexes, inner.data_type.clone()) + } else { + unreachable!() } - ParquetPhysicalType::Float => { - let index = index.as_any().downcast_ref::>().unwrap(); - Ok(primitive::deserialize_id(&index.indexes, data_type)) + } + PhysicalType::LargeList => { + if let DataType::LargeList(inner) = data_type.to_logical_type() { + deserialize(indexes, inner.data_type.clone()) + } else { + unreachable!() } - ParquetPhysicalType::Double => { - let index = index.as_any().downcast_ref::>().unwrap(); - Ok(primitive::deserialize_id(&index.indexes, data_type)) - } - ParquetPhysicalType::ByteArray => { - let index = index.as_any().downcast_ref::().unwrap(); - binary::deserialize(&index.indexes, &data_type) - } - ParquetPhysicalType::FixedLenByteArray(_) => { - let index = index.as_any().downcast_ref::().unwrap(); - Ok(fixed_len_binary::deserialize(&index.indexes, data_type)) - } - }) - .collect() -} + } + PhysicalType::Struct => { + let children_fields = if let DataType::Struct(children) = data_type.to_logical_type() { + children + } else { + unreachable!() + }; + let children = children_fields + .iter() + .map(|child| deserialize(indexes, child.data_type.clone())) + .collect::, Error>>()?; -// recursive function to get the corresponding leaf data_types corresponding to the -// parquet columns -fn populate_dt(data_type: &DataType, container: &mut Vec) { - match data_type.to_logical_type() { - DataType::List(inner) => populate_dt(&inner.data_type, container), - DataType::LargeList(inner) => populate_dt(&inner.data_type, container), - DataType::Dictionary(_, inner, _) => populate_dt(inner, container), - DataType::Struct(fields) => fields - .iter() - .for_each(|f| populate_dt(&f.data_type, container)), - _ => container.push(data_type.clone()), + Ok(FieldPageStatistics::Multiple(children)) + } + + other => Err(Error::nyi(format!( + "Deserialize into arrow's {other:?} page index" + ))), } } +/// Checks whether the row group have page index information (page statistics) +pub fn has_indexes(row_group: &RowGroupMetaData) -> bool { + row_group + .columns() + .iter() + .all(|chunk| chunk.column_chunk().column_index_offset.is_some()) +} + /// Reads the column indexes from the reader assuming a valid set of derived Arrow fields /// for all parquet the columns in the file. /// +/// It returns one [`FieldPageStatistics`] per field in `fields` +/// /// This function is expected to be used to filter out parquet pages. /// /// # Implementation /// This function is IO-bounded and calls `reader.read_exact` exactly once. /// # Error /// Errors iff the indexes can't be read or their deserialization to arrow is incorrect (e.g. 
invalid utf-8) -pub fn read_columns_indexes( +fn read_columns_indexes( reader: &mut R, chunks: &[ColumnChunkMetaData], fields: &[Field], -) -> Result, Error> { +) -> Result, Error> { let indexes = _read_columns_indexes(reader, chunks)?; - // map arrow fields to the corresponding columns in parquet taking into account - // that fields may be nested but parquet column indexes are only leaf columns - let mut data_types = vec![]; fields .iter() - .map(|f| &f.data_type) - .for_each(|d| populate_dt(d, &mut data_types)); + .map(|field| { + let indexes = get_field_pages(chunks, &indexes, &field.name); + let mut indexes = indexes.into_iter().collect(); - deserialize(&indexes, data_types) + deserialize(&mut indexes, field.data_type.clone()) + }) + .collect() +} + +/// Returns the set of (row) intervals of the pages. +fn compute_page_row_intervals( + locations: &[PageLocation], + num_rows: usize, +) -> Result, ParquetError> { + if locations.is_empty() { + return Ok(vec![]); + }; + + let last = (|| { + let start: usize = locations.last().unwrap().first_row_index.try_into()?; + let length = num_rows - start; + Result::<_, ParquetError>::Ok(Interval::new(start, length)) + })(); + + let pages_lengths = locations + .windows(2) + .map(|x| { + let start = usize::try_from(x[0].first_row_index)?; + let length = usize::try_from(x[1].first_row_index - x[0].first_row_index)?; + Ok(Interval::new(start, length)) + }) + .chain(std::iter::once(last)); + pages_lengths.collect() +} + +/// Reads all page locations and index locations (IO-bounded) and uses `predicate` to compute +/// the set of [`FilteredPage`] that fulfill the predicate. +/// +/// The non-trivial argument of this function is `predicate`, that controls which pages are selected. +/// Its signature contains 2 arguments: +/// * 0th argument (indexes): contains one [`ColumnPageStatistics`] (page statistics) per field. +/// Use it to evaluate the predicate against +/// * 1th argument (intervals): contains one [`Vec>`] (row positions) per field. +/// For each field, the outermost vector corresponds to each parquet column: +/// a primitive field contains 1 column, a struct field with 2 primitive fields contain 2 columns. +/// The inner `Vec` contains one [`Interval`] per page: its length equals the length of [`ColumnPageStatistics`]. +/// It returns a single [`Vec`] denoting the set of intervals that the predicate selects (over all columns). +/// +/// This returns one item per `field`. For each field, there is one item per column (for non-nested types it returns one column) +/// and finally [`Vec`], that corresponds to the set of selected pages. 
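Aside, not part of the diff: a predicate compatible with the signature below (using the `FieldPageStatistics` and `Interval` types from this module). It ignores the page statistics and keeps only the second page of the first column of the first field, reproducing the selection exercised by the `read_indexes` test further down in this patch:

    let predicate = |_stats: &[FieldPageStatistics], intervals: &[Vec<Vec<Interval>>]| -> Vec<Interval> {
        let first_field_column = &intervals[0][0]; // row intervals, one per page of that column
        let selection = [false, true, false]; // keep only the second page (the column has 3 pages here)
        first_field_column
            .iter()
            .zip(selection)
            .filter_map(|(interval, keep)| keep.then(|| *interval))
            .collect()
    };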
+pub fn read_filtered_pages< + R: Read + Seek, + F: Fn(&[FieldPageStatistics], &[Vec>]) -> Vec, +>( + reader: &mut R, + row_group: &RowGroupMetaData, + fields: &[Field], + predicate: F, + //is_intersection: bool, +) -> Result>>, Error> { + let num_rows = row_group.num_rows(); + + // one vec per column + let locations = read_pages_locations(reader, row_group.columns())?; + // one Vec> per field (non-nested contain a single entry on the first column) + let locations = fields + .iter() + .map(|field| get_field_pages(row_group.columns(), &locations, &field.name)) + .collect::>(); + + // one ColumnPageStatistics per field + let indexes = read_columns_indexes(reader, row_group.columns(), fields)?; + + let intervals = locations + .iter() + .map(|locations| { + locations + .iter() + .map(|locations| Ok(compute_page_row_intervals(locations, num_rows)?)) + .collect::, Error>>() + }) + .collect::, Error>>()?; + + let intervals = predicate(&indexes, &intervals); + + locations + .into_iter() + .map(|locations| { + locations + .into_iter() + .map(|locations| Ok(select_pages(&intervals, locations, num_rows)?)) + .collect::, Error>>() + }) + .collect() } diff --git a/src/io/parquet/read/indexes/primitive.rs b/src/io/parquet/read/indexes/primitive.rs index 103d67bbcf1..636661f5223 100644 --- a/src/io/parquet/read/indexes/primitive.rs +++ b/src/io/parquet/read/indexes/primitive.rs @@ -7,7 +7,7 @@ use crate::datatypes::{DataType, TimeUnit}; use crate::trusted_len::TrustedLen; use crate::types::NativeType; -use super::ColumnIndex; +use super::ColumnPageStatistics; #[inline] fn deserialize_int32>>( @@ -143,8 +143,8 @@ fn deserialize_id_s>>( Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)) } -pub fn deserialize_i32(indexes: &[PageIndex], data_type: DataType) -> ColumnIndex { - ColumnIndex { +pub fn deserialize_i32(indexes: &[PageIndex], data_type: DataType) -> ColumnPageStatistics { + ColumnPageStatistics { min: deserialize_int32(indexes.iter().map(|index| index.min), data_type.clone()), max: deserialize_int32(indexes.iter().map(|index| index.max), data_type), null_count: PrimitiveArray::from_trusted_len_iter( @@ -159,8 +159,8 @@ pub fn deserialize_i64( indexes: &[PageIndex], primitive_type: &PrimitiveType, data_type: DataType, -) -> ColumnIndex { - ColumnIndex { +) -> ColumnPageStatistics { + ColumnPageStatistics { min: deserialize_int64( indexes.iter().map(|index| index.min), primitive_type, @@ -179,8 +179,11 @@ pub fn deserialize_i64( } } -pub fn deserialize_i96(indexes: &[PageIndex<[u32; 3]>], data_type: DataType) -> ColumnIndex { - ColumnIndex { +pub fn deserialize_i96( + indexes: &[PageIndex<[u32; 3]>], + data_type: DataType, +) -> ColumnPageStatistics { + ColumnPageStatistics { min: deserialize_int96(indexes.iter().map(|index| index.min), data_type.clone()), max: deserialize_int96(indexes.iter().map(|index| index.max), data_type), null_count: PrimitiveArray::from_trusted_len_iter( @@ -191,8 +194,11 @@ pub fn deserialize_i96(indexes: &[PageIndex<[u32; 3]>], data_type: DataType) -> } } -pub fn deserialize_id(indexes: &[PageIndex], data_type: DataType) -> ColumnIndex { - ColumnIndex { +pub fn deserialize_id( + indexes: &[PageIndex], + data_type: DataType, +) -> ColumnPageStatistics { + ColumnPageStatistics { min: deserialize_id_s(indexes.iter().map(|index| index.min), data_type.clone()), max: deserialize_id_s(indexes.iter().map(|index| index.max), data_type), null_count: PrimitiveArray::from_trusted_len_iter( diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 
9276c303f69..be0ddc9ac56 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -3,7 +3,7 @@ mod deserialize; mod file; -mod indexes; +pub mod indexes; mod row_group; pub mod schema; pub mod statistics; @@ -37,7 +37,6 @@ use crate::{array::Array, error::Result}; pub use deserialize::{column_iter_to_arrays, get_page_iterator}; pub use file::{FileReader, RowGroupReader}; -pub use indexes::{read_columns_indexes, ColumnIndex}; pub use row_group::*; pub use schema::{infer_schema, FileMetaData}; diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index d2e7f0059d3..ed4d724b639 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -5,8 +5,9 @@ use futures::{ AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, }; use parquet2::{ + indexes::FilteredPage, metadata::ColumnChunkMetaData, - read::{BasicDecompressor, PageReader}, + read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader}, }; use crate::{ @@ -91,6 +92,21 @@ pub fn get_field_columns<'a>( .collect() } +/// Returns all [`ColumnChunkMetaData`] associated to `field_name`. +/// For non-nested parquet types, this returns a single column +pub fn get_field_pages<'a, T>( + columns: &'a [ColumnChunkMetaData], + items: &'a [T], + field_name: &str, +) -> Vec<&'a T> { + columns + .iter() + .zip(items) + .filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name) + .map(|(_, item)| item) + .collect() +} + /// Reads all columns that are part of the parquet field `field_name` /// # Implementation /// This operation is IO-bounded `O(C)` where C is the number of columns associated to @@ -167,6 +183,12 @@ pub async fn read_columns_async< try_join_all(futures).await } +type Pages = Box< + dyn Iterator> + + Sync + + Send, +>; + /// Converts a vector of columns associated with the parquet field whose name is [`Field`] /// to an iterator of [`Array`], [`ArrayIter`] of chunk size `chunk_size`. 
pub fn to_deserializer<'a>( @@ -174,26 +196,59 @@ pub fn to_deserializer<'a>( field: Field, num_rows: usize, chunk_size: Option, + pages: Option>>, ) -> Result> { let chunk_size = chunk_size.map(|c| c.min(num_rows)); - let (columns, types): (Vec<_>, Vec<_>) = columns - .into_iter() - .map(|(column_meta, chunk)| { - let len = chunk.len(); - let pages = PageReader::new( - std::io::Cursor::new(chunk), - column_meta, - std::sync::Arc::new(|_, _| true), - vec![], - len * 2 + 1024, - ); - ( - BasicDecompressor::new(pages, vec![]), - &column_meta.descriptor().descriptor.primitive_type, - ) - }) - .unzip(); + let (columns, types) = if let Some(pages) = pages { + let (columns, types): (Vec<_>, Vec<_>) = columns + .into_iter() + .zip(pages.into_iter()) + .map(|((column_meta, chunk), mut pages)| { + // de-offset the start, since we read in chunks (and offset is from start of file) + let mut meta: PageMetaData = column_meta.into(); + pages + .iter_mut() + .for_each(|page| page.start -= meta.column_start); + meta.column_start = 0; + let pages = IndexedPageReader::new_with_page_meta( + std::io::Cursor::new(chunk), + meta, + pages, + vec![], + vec![], + ); + let pages = Box::new(pages) as Pages; + ( + BasicDecompressor::new(pages, vec![]), + &column_meta.descriptor().descriptor.primitive_type, + ) + }) + .unzip(); + + (columns, types) + } else { + let (columns, types): (Vec<_>, Vec<_>) = columns + .into_iter() + .map(|(column_meta, chunk)| { + let len = chunk.len(); + let pages = PageReader::new( + std::io::Cursor::new(chunk), + column_meta, + std::sync::Arc::new(|_, _| true), + vec![], + len * 2 + 1024, + ); + let pages = Box::new(pages) as Pages; + ( + BasicDecompressor::new(pages, vec![]), + &column_meta.descriptor().descriptor.primitive_type, + ) + }) + .unzip(); + + (columns, types) + }; column_iter_to_arrays(columns, types, field, chunk_size, num_rows) } @@ -214,6 +269,7 @@ pub fn read_columns_many<'a, R: Read + Seek>( fields: Vec, chunk_size: Option, limit: Option, + pages: Option>>>, ) -> Result>> { let num_rows = row_group.num_rows(); let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows); @@ -225,19 +281,27 @@ pub fn read_columns_many<'a, R: Read + Seek>( .map(|field| read_columns(reader, row_group.columns(), &field.name)) .collect::>>()?; - field_columns - .into_iter() - .zip(fields.into_iter()) - .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size)) - .collect() + if let Some(pages) = pages { + field_columns + .into_iter() + .zip(fields) + .zip(pages) + .map(|((columns, field), pages)| { + to_deserializer(columns, field, num_rows, chunk_size, Some(pages)) + }) + .collect() + } else { + field_columns + .into_iter() + .zip(fields.into_iter()) + .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None)) + .collect() + } } /// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose /// name matches `fields`'s names. /// -/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group - -/// it reads all the columns to memory from the row group associated to the requested fields. -/// /// # Implementation /// This operation is IO-bounded `O(C)` where C is the number of columns in the row group - /// it reads all the columns to memory from the row group associated to the requested fields. 
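Aside, not part of the diff: assuming the generic parameters elided above, the new page-filter arguments nest as follows, outermost to innermost. These aliases are illustrative only:

    use parquet2::indexes::FilteredPage;

    /// pages selected for one parquet column
    type ColumnPages = Vec<FilteredPage>;
    /// one entry per parquet column of a field (non-nested fields have exactly one):
    /// the `pages` argument of `to_deserializer`
    type FieldPages = Vec<ColumnPages>;
    /// one entry per field: the `pages` argument of `read_columns_many` and
    /// `read_columns_many_async`, and what `indexes::read_filtered_pages` returns for one row group
    type RowGroupPages = Vec<FieldPages>;
    /// one entry per row group: the `page_indexes` argument of `FileReader::new`
    type FilePages = Vec<RowGroupPages>;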
@@ -252,16 +316,32 @@ pub async fn read_columns_many_async< row_group: &RowGroupMetaData, fields: Vec, chunk_size: Option, + limit: Option, + pages: Option>>>, ) -> Result>> { + let num_rows = row_group.num_rows(); + let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows); + let futures = fields .iter() .map(|field| read_columns_async(factory.clone(), row_group.columns(), &field.name)); let field_columns = try_join_all(futures).await?; - field_columns - .into_iter() - .zip(fields.into_iter()) - .map(|(columns, field)| to_deserializer(columns, field, row_group.num_rows(), chunk_size)) - .collect() + if let Some(pages) = pages { + field_columns + .into_iter() + .zip(fields) + .zip(pages) + .map(|((columns, field), pages)| { + to_deserializer(columns, field, num_rows, chunk_size, Some(pages)) + }) + .collect() + } else { + field_columns + .into_iter() + .zip(fields.into_iter()) + .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None)) + .collect() + } } diff --git a/src/io/parquet/read/statistics/dictionary.rs b/src/io/parquet/read/statistics/dictionary.rs index a4bc349741f..3f644ad7da1 100644 --- a/src/io/parquet/read/statistics/dictionary.rs +++ b/src/io/parquet/read/statistics/dictionary.rs @@ -40,7 +40,8 @@ impl MutableArray for DynMutableDictionary { let inner = self.inner.as_box(); match self.data_type.to_physical_type() { PhysicalType::Dictionary(key) => match_integer_type!(key, |$T| { - let keys = PrimitiveArray::<$T>::from_iter((0..inner.len() as $T).map(Some)); + let keys: Vec<$T> = (0..inner.len() as $T).collect(); + let keys = PrimitiveArray::<$T>::from_vec(keys); Box::new(DictionaryArray::<$T>::try_new(self.data_type.clone(), keys, inner).unwrap()) }), _ => todo!(), diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 04d838e96a1..b1bab1b84ae 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -26,12 +26,17 @@ pub fn read_column(mut reader: R, column: &str) -> Result(mut reader: R, column: &str) -> Result) -> Result>>()?; @@ -1559,7 +1565,7 @@ fn filter_chunk() -> Result<()> { .map(|(_, row_group)| row_group) .collect(); - let reader = p_read::FileReader::new(reader, row_groups, schema, None, None); + let reader = p_read::FileReader::new(reader, row_groups, schema, None, None, None); let new_chunks = reader.collect::>>()?; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index efb3943da94..233560398f9 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -500,7 +500,7 @@ fn all_types() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata.row_groups, schema, None, None); + let reader = FileReader::new(reader, metadata.row_groups, schema, None, None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); @@ -542,7 +542,7 @@ fn all_types_chunked() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; // chunk it in 5 (so, (5,3)) - let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); + let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None); let batches = reader.collect::>>()?; assert_eq!(batches.len(), 2); @@ -605,7 +605,7 @@ fn invalid_utf8() -> Result<()> { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; - let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None); + let 
reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None); let error = reader.collect::>>().unwrap_err(); assert!( diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index 328a826eaf0..a9776fdb5a9 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -1,9 +1,9 @@ use std::io::Cursor; +use arrow2::chunk::Chunk; use arrow2::error::Error; +use arrow2::io::parquet::read::indexes; use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*}; -use parquet2::indexes::{compute_rows, select_pages}; -use parquet2::read::IndexedPageReader; /// Returns 2 sets of pages with different the same number of rows distributed un-evenly fn pages( @@ -103,32 +103,56 @@ fn read_with_indexes( let schema = infer_schema(&metadata)?; - let row_group = &metadata.row_groups[0]; + // row group-based filtering can be done here + let row_groups = metadata.row_groups; - let pages = read_pages_locations(&mut reader, row_group.columns())?; - - // say we concluded from the indexes that we only needed the "6" from the first column, so second page. - let _indexes = read_columns_indexes(&mut reader, row_group.columns(), &schema.fields)?; - let intervals = compute_rows(&[false, true, false], &pages[0], row_group.num_rows())?; - - // based on the intervals from c1, we compute which pages from the second column are required: - let pages = select_pages(&intervals, &pages[1], row_group.num_rows())?; + // one per row group + let pages = row_groups + .iter() + .map(|row_group| { + assert!(indexes::has_indexes(row_group)); + + indexes::read_filtered_pages(&mut reader, row_group, &schema.fields, |_, intervals| { + let first_field = &intervals[0]; + let first_field_column = &first_field[0]; + assert_eq!(first_field_column.len(), 3); + let selection = [false, true, false]; + + first_field_column + .iter() + .zip(selection) + .filter_map(|(i, is_selected)| is_selected.then(|| *i)) + .collect() + }) + }) + .collect::>>()?; - // and read them: - let c1 = &metadata.row_groups[0].columns()[1]; + // apply projection pushdown + let schema = schema.filter(|index, _| index == 1); + let pages = pages + .into_iter() + .map(|pages| { + pages + .into_iter() + .enumerate() + .filter(|(index, _)| *index == 1) + .map(|(_, pages)| pages) + .collect::>() + }) + .collect::>(); - let pages = IndexedPageReader::new(reader, c1, pages, vec![], vec![]); - let pages = BasicDecompressor::new(pages, vec![]); + let expected = Chunk::new(vec![expected]); - let arrays = column_iter_to_arrays( - vec![pages], - vec![&c1.descriptor().descriptor.primitive_type], - schema.fields[1].clone(), + let chunks = FileReader::new( + reader, + row_groups, + schema, + Some(1024 * 8 * 8), None, - row_group.num_rows(), - )?; + Some(pages), + ); - let arrays = arrays.collect::>>()?; + let arrays = chunks.collect::>>()?; assert_eq!(arrays, vec![expected]); Ok(()) diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs index 000c39ca64c..3e1ba14fe9c 100644 --- a/tests/it/io/parquet/write_async.rs +++ b/tests/it/io/parquet/write_async.rs @@ -60,9 +60,10 @@ async fn test_parquet_async_roundtrip() { let mut out = vec![]; for group in &metadata.row_groups { - let column_chunks = read_columns_many_async(factory, group, schema.fields.clone(), None) - .await - .unwrap(); + let column_chunks = + read_columns_many_async(factory, group, schema.fields.clone(), None, None, None) + .await + .unwrap(); let chunks = 
RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
         let mut chunks = chunks.collect::<Result<Vec<_>>>().unwrap();
         out.append(&mut chunks);
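Putting it together, an illustrative end-to-end sketch (not part of the diff; `"example.parquet"` is a placeholder path and the pass-through predicate keeps every page): compute the filtered pages per row group with `indexes::read_filtered_pages` and hand them to `FileReader::new`, mirroring the `read_indexes` test above.

    use std::fs::File;

    use arrow2::error::Result;
    use arrow2::io::parquet::read::{self, indexes};

    fn main() -> Result<()> {
        let mut reader = File::open("example.parquet")?;
        let metadata = read::read_metadata(&mut reader)?;
        let schema = read::infer_schema(&metadata)?;
        let row_groups = metadata.row_groups;

        // one set of filtered pages (field / column / page) per row group
        let pages = row_groups
            .iter()
            .map(|row_group| {
                indexes::read_filtered_pages(&mut reader, row_group, &schema.fields, |_stats, intervals| {
                    // a real predicate would inspect `_stats` (page min/max/null_count)
                    // to prune pages; here every page of the first column of the first
                    // field is kept
                    intervals[0][0].clone()
                })
            })
            .collect::<Result<Vec<_>>>()?;

        // the selected pages flow through `FileReader` -> `read_columns_many` -> `to_deserializer`
        let chunks = read::FileReader::new(reader, row_groups, schema, None, None, Some(pages));
        for maybe_chunk in chunks {
            let chunk = maybe_chunk?;
            println!("rows: {}", chunk.len());
        }
        Ok(())
    }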