Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration test for scan rows with selection #2158

Merged
merged 9 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
self.record_reader.skip_records(num_records, self.pages.as_mut())
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
self.record_reader.skip_records(num_records, self.pages.as_mut())
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
8 changes: 7 additions & 1 deletion parquet/src/arrow/array_reader/complex_object_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,13 @@ where
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
None => Ok(0),
None => {
if self.next_column_reader()? {
self.column_reader.as_mut().unwrap().skip_records(num_records)
}else {
Ok(0)
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
self.record_reader.skip_records(num_records, self.pages.as_mut())
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
self.record_reader.skip_records(num_records, self.pages.as_mut())
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
221 changes: 219 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;

/// Arrow reader api.
Expand Down Expand Up @@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
chunk_reader: R,
options: ArrowReaderOptions,
) -> Result<Self> {
let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
let file_reader = if options.selection.is_some() {
let options = ReadOptionsBuilder::new().with_page_index().build();
Arc::new(SerializedFileReader::new_with_options(
chunk_reader,
options,
)?)
} else {
Arc::new(SerializedFileReader::new(chunk_reader)?)
};
Ok(Self::new_with_options(file_reader, options))
}

Expand Down Expand Up @@ -298,9 +307,14 @@ impl Iterator for ParquetRecordBatchReader {
continue;
}

// try to read record
let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) => {
selection.push_front(RowSelection::skip(remaining));
// if the page row count is less than batch_size we must set the batch size to the page row count.
// this check avoids an infinite loop
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix wrong logic: the remaining records still need to be read

if remaining != 0 {
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
selection.push_front(RowSelection::select(remaining));
}
self.batch_size
}
None => front.row_count,
Expand Down Expand Up @@ -390,6 +404,7 @@ mod tests {

use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
ParquetRecordBatchReader, RowSelection,
};
use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
Expand Down Expand Up @@ -1586,4 +1601,206 @@ mod tests {
test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}

#[test]
fn test_scan_row_with_selection() {
    // Read `alltypes_tiny_pages_plain.parquet` (7300 rows in total) twice:
    // once sequentially (the expected data) and once through a reader driven
    // by an alternating read/skip `RowSelection`, then check that the batches
    // produced under selection match the corresponding sequential batches.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
    let test_file = File::open(&path).unwrap();

    // total row count 7300
    // 1. test selection len more than one page row count
    let batch_size = 1000;
    let expected_data = create_expect_batch(&test_file, batch_size);

    let selections = create_test_selection(batch_size, 7300, false);
    let skip_reader = create_skip_reader(&test_file, batch_size, selections);
    let mut total_row_count = 0;
    let mut index = 0;
    for batch in skip_reader {
        let batch = batch.unwrap();
        // selection starts with a read, so we see the even-numbered
        // sequential batches: 0, 2, 4, ...
        assert_eq!(batch, expected_data.get(index).unwrap().clone());
        index += 2;
        let num = batch.num_rows();
        // 7300 is not a multiple of 1000, so the final batch has 300 rows
        assert!(num == batch_size || num == 300);
        total_row_count += num;
    }
    assert_eq!(total_row_count, 4000);

    let selections = create_test_selection(batch_size, 7300, true);
    let skip_reader = create_skip_reader(&test_file, batch_size, selections);
    let mut total_row_count = 0;
    let mut index = 1;
    for batch in skip_reader {
        let batch = batch.unwrap();
        // selection starts with a skip, so we see the odd-numbered
        // sequential batches: 1, 3, 5, ...
        assert_eq!(batch, expected_data.get(index).unwrap().clone());
        index += 2;
        let num = batch.num_rows();
        // the last batch will be 300
        assert!(num == batch_size || num == 300);
        total_row_count += num;
    }
    assert_eq!(total_row_count, 3300);

    // 2. test selection len less than one page row count
    let batch_size = 20;
    let expected_data = create_expect_batch(&test_file, batch_size);
    let selections = create_test_selection(batch_size, 7300, false);

    let skip_reader = create_skip_reader(&test_file, batch_size, selections);
    let mut total_row_count = 0;
    let mut index = 0;
    for batch in skip_reader {
        let batch = batch.unwrap();
        assert_eq!(batch, expected_data.get(index).unwrap().clone());
        index += 2;
        let num = batch.num_rows();
        // 7300 divides evenly by 20, so every batch is full
        assert_eq!(num, batch_size);
        total_row_count += num;
    }
    assert_eq!(total_row_count, 3660);

    let selections = create_test_selection(batch_size, 7300, true);
    let skip_reader = create_skip_reader(&test_file, batch_size, selections);
    let mut total_row_count = 0;
    let mut index = 1;
    for batch in skip_reader {
        let batch = batch.unwrap();
        assert_eq!(batch, expected_data.get(index).unwrap().clone());
        index += 2;
        let num = batch.num_rows();
        assert_eq!(num, batch_size);
        total_row_count += num;
    }
    assert_eq!(total_row_count, 3640);

    // 3. test selection_len less than batch_size
    let batch_size = 20;
    let selection_len = 5;
    let expected_data_batch = create_expect_batch(&test_file, batch_size);
    let expected_data_selection = create_expect_batch(&test_file, selection_len);
    let selections = create_test_selection(selection_len, 7300, false);
    let skip_reader = create_skip_reader(&test_file, batch_size, selections);

    let mut total_row_count = 0;

    for batch in skip_reader {
        let batch = batch.unwrap();
        let num = batch.num_rows();
        assert!(num == batch_size || num == selection_len);
        if num == batch_size {
            assert_eq!(
                batch,
                expected_data_batch
                    .get(total_row_count / batch_size)
                    .unwrap()
                    .clone()
            );
            total_row_count += batch_size;
        } else if num == selection_len {
            assert_eq!(
                batch,
                expected_data_selection
                    .get(total_row_count / selection_len)
                    .unwrap()
                    .clone()
            );
            total_row_count += selection_len;
        }
        // add skip offset: each read selection is followed by a skipped
        // selection of the same length
        total_row_count += selection_len;
    }

    // 4. test selection_len more than batch_size
    // If batch_size < selection_len will divide selection(50, read) ->
    // selection(20, read), selection(20, read), selection(10, read)
    let batch_size = 20;
    let selection_len = 50;
    let another_batch_size = 10;
    let expected_data_batch = create_expect_batch(&test_file, batch_size);
    let expected_data_batch2 = create_expect_batch(&test_file, another_batch_size);
    let selections = create_test_selection(selection_len, 7300, false);
    let skip_reader = create_skip_reader(&test_file, batch_size, selections);

    let mut total_row_count = 0;

    for batch in skip_reader {
        let batch = batch.unwrap();
        let num = batch.num_rows();
        assert!(num == batch_size || num == another_batch_size);
        if num == batch_size {
            assert_eq!(
                batch,
                expected_data_batch
                    .get(total_row_count / batch_size)
                    .unwrap()
                    .clone()
            );
            total_row_count += batch_size;
        } else if num == another_batch_size {
            assert_eq!(
                batch,
                expected_data_batch2
                    .get(total_row_count / another_batch_size)
                    .unwrap()
                    .clone()
            );
            total_row_count += 10;
            // add skip offset: the 10-row tail ends a 50-row read
            // selection, which is followed by a 50-row skip
            total_row_count += selection_len;
        }
    }

    /// Build a `ParquetRecordBatchReader` over `test_file` that applies the
    /// given row `selections` and emits batches of `batch_size` rows.
    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: Vec<RowSelection>,
    ) -> ParquetRecordBatchReader {
        let arrow_reader_options =
            ArrowReaderOptions::new().with_row_selection(selections);

        let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(
            test_file.try_clone().unwrap(),
            arrow_reader_options,
        )
        .unwrap();
        skip_arrow_reader.get_record_reader(batch_size).unwrap()
    }

    /// Build alternating read/skip selections of `step_len` rows each,
    /// covering `total_len` rows in total. `skip_first` chooses whether the
    /// first selection skips or reads; the last selection holds whatever
    /// remainder is left when `total_len` is not a multiple of `step_len`.
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> Vec<RowSelection> {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelection {
                row_count: step,
                skip,
            });
            remaining -= step;
            // alternate read/skip
            skip = !skip;
        }
        vec
    }

    /// Read the whole file sequentially with the given `batch_size` and
    /// collect every batch; used as the expected data for the selection
    /// readers above.
    fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec<RecordBatch> {
        let mut serial_arrow_reader =
            ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
        let serial_reader =
            serial_arrow_reader.get_record_reader(batch_size).unwrap();
        let mut expected_data = vec![];
        for batch in serial_reader {
            expected_data.push(batch.unwrap());
        }
        expected_data
    }
}
}
18 changes: 16 additions & 2 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::arrow::record_reader::{
buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
};
use crate::column::page::PageIterator;
use crate::column::{
page::PageReader,
reader::{
Expand Down Expand Up @@ -184,11 +185,24 @@ where
/// # Returns
///
/// Number of records skipped
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
pub fn skip_records(
&mut self,
num_records: usize,
pages: &mut dyn PageIterator,
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<usize> {
// First need to clear the buffer
let end_of_column = match self.column_reader.as_mut() {
Some(reader) => !reader.has_next()?,
None => return Ok(0),
None => {
// If we skip records before any read operation,
// we need to set `column_reader` via `set_page_reader`
if let Some(page_reader) = pages.next() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix skipping before any read operation: `column_reader` needs to be set

self.set_page_reader(page_reader?)?;
false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong, as it will now only mark end_of_column when it reaches the end of the file, instead of the end of a column chunk within a row group. This will break record delimiting for repeated fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold i move it out to

 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        if self.record_reader.column_reader().is_none() {
            // If we skip records before all read operation
            // we need set `column_reader` by `set_page_reader`
            if let Some(page_reader) = self.pages.next() {
                self.record_reader.set_page_reader(page_reader?)?;
            } else {
                return Ok(0);
            }
        }
        self.record_reader.skip_records(num_records)
    }

I think in this situation, the column_reader is only none when we skip the first page without reading any record. Related to #2171: if
we create it in the column chunk, then we will remove this check.

} else {
return Ok(0);
}
}
};

let (buffered_records, buffered_values) =
Expand Down
13 changes: 10 additions & 3 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ where
let mut remaining = num_records;
while remaining != 0 {
if self.num_buffered_values == self.num_decoded_values {
let metadata = match self.page_reader.peek_next_page()? {
let mut metadata = match self.page_reader.peek_next_page()? {
None => return Ok(num_records - remaining),
Some(metadata) => metadata,
};
Expand All @@ -312,13 +312,20 @@ where

// If page has less rows than the remaining records to
// be skipped, skip entire page
if metadata.num_rows < remaining {
while metadata.num_rows < remaining {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary, there is already an outer while loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because first add below

// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
  self.read_new_page()?;

if we still use if, we may read needless page header

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This while loop should result in the same behaviour as the previous continue??

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... it's an useless loop

self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
continue;
metadata = match self.page_reader.peek_next_page()? {
None => return Ok(num_records - remaining),
Some(metadata) => metadata,
};
}
// because self.num_buffered_values == self.num_decoded_values means
// we need to read a new page and set up the decoders for levels
self.read_new_page()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could check the return type of this, and short-circuit if it returns false?

}

// start skip values in page level
let to_read = remaining
.min((self.num_buffered_values - self.num_decoded_values) as usize);

Expand Down