Skip to content

Commit

Permalink
Revert "simplify reading metadata"
Browse files Browse the repository at this point in the history
This reverts commit c0432e6.
  • Loading branch information
alamb committed Oct 1, 2024
1 parent c0432e6 commit 21e8dc2
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
metadata::*,
Expand Down Expand Up @@ -191,22 +192,37 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let mut metadata_builder = ParquetMetaDataReader::new()
.with_page_indexes(options.enable_page_index)
.parse_and_finish(&chunk_reader)?
.into_builder();

let mut predicates = options.predicates;

// Filter row groups based on the predicates
if !predicates.is_empty() {
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
if predicates
.iter_mut()
.all(|predicate| predicate(&rg_meta, i))
{
metadata_builder = metadata_builder.add_row_group(rg_meta);
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
keep = false;
break;
}
}
if keep {
metadata_builder = metadata_builder.add_row_group(rg_meta);
}
}

if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in metadata_builder.row_groups().iter() {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
metadata_builder = metadata_builder
.set_column_index(Some(columns_indexes))
.set_offset_index(Some(offset_indexes));
}

Ok(Self {
Expand Down

0 comments on commit 21e8dc2

Please sign in to comment.