Add ChunkReader::get_bytes #2478

Merged 4 commits on Aug 17, 2022
8 changes: 6 additions & 2 deletions parquet/src/arrow/async_reader.rs
@@ -628,10 +628,14 @@ impl ChunkReader for ColumnChunkData {
     type T = bytes::buf::Reader<Bytes>;
 
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        Ok(self.get_bytes(start, length)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         match &self {
             ColumnChunkData::Sparse { data, .. } => data
                 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.slice(0..length).reader())
+                .map(|idx| data[idx].1.slice(0..length))
                 .map_err(|_| {
                     ParquetError::General(format!(
                         "Invalid offset in sparse column chunk data: {}",
@@ -641,7 +645,7 @@ impl ChunkReader for ColumnChunkData {
             ColumnChunkData::Dense { offset, data } => {
                 let start = start as usize - *offset;
                 let end = start + length;
-                Ok(data.slice(start..end).reader())
+                Ok(data.slice(start..end))
             }
         }
     }
15 changes: 2 additions & 13 deletions parquet/src/file/footer.rs
@@ -62,19 +62,8 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
         ));
     }
 
-    let mut metadata = Vec::with_capacity(metadata_len);
-
-    let read = chunk_reader
-        .get_read(file_size - footer_metadata_len as u64, metadata_len)?
-        .read_to_end(&mut metadata)?;
-
-    if read != metadata_len {
-        return Err(eof_err!(
-            "Expected to read {} bytes of metadata, got {}",
-            metadata_len,
-            read
-        ));
-    }
+    let metadata =
+        chunk_reader.get_bytes(file_size - footer_metadata_len as u64, metadata_len)?;
 
     decode_metadata(&metadata)
 }
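For illustration, here is a minimal sketch of how the updated footer path can be exercised against an in-memory file. It assumes the public parse_metadata entry point and relies on Bytes implementing ChunkReader (see the serialized_reader.rs change below), so the footer is sliced out of the existing buffer rather than copied into a fresh Vec; the helper function name is hypothetical.

use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::footer::parse_metadata;

// Hypothetical helper: `file_bytes` is assumed to hold a complete Parquet file.
fn footer_row_groups(file_bytes: Bytes) -> Result<usize> {
    // `Bytes` implements `ChunkReader`, so parse_metadata's call to
    // `get_bytes` slices the footer out of the existing allocation.
    let metadata = parse_metadata(&file_bytes)?;
    Ok(metadata.num_row_groups())
}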
17 changes: 17 additions & 0 deletions parquet/src/file/reader.rs
@@ -18,6 +18,7 @@
 //! Contains file reader API and provides methods to access file metadata, row group
 //! readers to read individual column chunks, or access record iterator.
 
+use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};
 
 use crate::column::page::PageIterator;
@@ -48,6 +49,22 @@ pub trait ChunkReader: Length + Send + Sync {
     /// Get a serially readable slice of the current reader
     /// This should fail if the slice exceeds the current bounds
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+
+    /// Get a range as bytes
+    /// This should fail if the exact number of bytes cannot be read
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        let mut buffer = Vec::with_capacity(length);
+        let read = self.get_read(start, length)?.read_to_end(&mut buffer)?;
+
+        if read != length {
+            return Err(eof_err!(
+                "Expected to read {} bytes, read only {}",
+                length,
+                read
+            ));
+        }
+        Ok(buffer.into())
+    }
 }
 
 // ----------------------------------------------------------------------
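To make the new trait method concrete, the following is a minimal sketch of an implementation backed by an in-memory Bytes buffer that overrides the default get_bytes so no intermediate copy is made, mirroring the pattern this PR applies to Bytes and ColumnChunkData. The InMemoryReader type is hypothetical and not part of the PR.

use bytes::{Buf, Bytes};
use parquet::errors::Result;
use parquet::file::reader::{ChunkReader, Length};

/// Hypothetical reader over an in-memory buffer.
struct InMemoryReader {
    data: Bytes,
}

impl Length for InMemoryReader {
    fn len(&self) -> u64 {
        self.data.len() as u64
    }
}

impl ChunkReader for InMemoryReader {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        // Route through get_bytes so both entry points share the slicing logic.
        Ok(self.get_bytes(start, length)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let start = start as usize;
        // Bytes::slice is reference-counted, so this does not copy the data.
        Ok(self.data.slice(start..start + length))
    }
}

Implementations that wrap an actual file would typically keep the default get_bytes, which falls back to get_read plus a buffered read as shown in the trait diff above.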
25 changes: 8 additions & 17 deletions parquet/src/file/serialized_reader.rs
@@ -79,8 +79,12 @@ impl ChunkReader for Bytes {
     type T = bytes::buf::Reader<Bytes>;
 
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        Ok(self.get_bytes(start, length)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         let start = start as usize;
-        Ok(self.slice(start..start + length).reader())
+        Ok(self.slice(start..start + length))
     }
 }
 
@@ -623,26 +627,13 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
                     let page_len = front.compressed_page_size as usize;
 
-                    // TODO: Add ChunkReader get_bytes to potentially avoid copy
-                    let mut buffer = Vec::with_capacity(page_len);
-                    let read = self
-                        .reader
-                        .get_read(front.offset as u64, page_len)?
-                        .read_to_end(&mut buffer)?;
-
-                    if read != page_len {
-                        return Err(eof_err!(
-                            "Expected to read {} bytes of page, read only {}",
-                            page_len,
-                            read
-                        ));
-                    }
+                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
@tustvold (Contributor, Author) commented on Aug 17, 2022

We can only do this when we have an offset index, as we need to know the size of the page to read. There is a question over whether we could just eagerly fetch the entire column chunk when there is no offset index; that needs some investigation. It would drastically simplify a lot of the code (it would eliminate FileSource).

Edit: Updated #1163 (comment)

@Ted-Jiang (Member) commented on Aug 18, 2022

Cool! For example, when we call skip_rows multiple times within one page, eagerly fetching the entire column into memory like this should help.

A Member commented:
If a page is small enough, say 1 MB, I guess there is no drawback to eagerly fetching the entire column. Looking forward to the investigation results! 👍


-                    let mut cursor = Cursor::new(buffer);
+                    let mut cursor = Cursor::new(buffer.as_ref());
                     let header = read_page_header(&mut cursor)?;
                     let offset = cursor.position();
 
-                    let bytes = Bytes::from(cursor.into_inner()).slice(offset as usize..);
+                    let bytes = buffer.slice(offset as usize..);
                     decode_page(
                         header,
                         bytes.into(),
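As background on this last hunk: the page header is now parsed from a borrowed view (buffer.as_ref()) and the page body is taken with Bytes::slice, which only adjusts offsets on a shared, reference-counted allocation. A standalone sketch (not part of the PR) of that slicing behaviour:

use bytes::Bytes;

fn main() {
    let buffer = Bytes::from(vec![0u8; 1024]);

    // slice() returns a new `Bytes` handle over the same allocation;
    // only the reference count and start/length bookkeeping change.
    let tail = buffer.slice(16..);

    assert_eq!(tail.len(), 1008);
    // Both handles point into the same underlying memory.
    assert_eq!(buffer.as_ptr().wrapping_add(16), tail.as_ptr());
}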