Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Parquet do not copy uncompressed pages #18441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions crates/polars-parquet/src/parquet/page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,6 @@ pub enum CompressedPage {
}

impl CompressedPage {
/// Returns the wrapped page's `buffer` as a byte slice.
///
/// Works for both variants: the `Data` and `Dict` arms each borrow the `buffer`
/// field of the page they hold. Nothing is copied; the slice borrows from `self`.
pub(crate) fn buffer(&self) -> &[u8] {
match self {
CompressedPage::Data(page) => &page.buffer,
CompressedPage::Dict(page) => &page.buffer,
}
}

pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
match self {
CompressedPage::Data(page) => page.buffer.to_mut(),
Expand Down Expand Up @@ -292,13 +285,6 @@ impl CompressedPage {
CompressedPage::Dict(_) => Some(0),
}
}

/// Returns the `uncompressed_page_size` stored on the wrapped page.
///
/// Works for both variants: the `Data` and `Dict` arms each read the field of the
/// page they hold.
// NOTE(review): presumably the size in bytes of the page once decompressed — confirm
// against where `uncompressed_page_size` is populated.
pub(crate) fn uncompressed_size(&self) -> usize {
match self {
CompressedPage::Data(page) => page.uncompressed_page_size,
CompressedPage::Dict(page) => page.uncompressed_page_size,
}
}
}

/// An uncompressed, encoded dictionary page.
Expand Down
123 changes: 60 additions & 63 deletions crates/polars-parquet/src/parquet/read/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,75 +54,72 @@ fn decompress_v2(
Ok(())
}

/// Decompresses a [`CompressedPage`] into `buffer`.
///
/// * Compressed page: `buffer` is resized to exactly `compressed_page.uncompressed_size()`
///   bytes and passed to `decompress_v1` / `decompress_v2` as the output buffer.
///   Returns `Ok(true)`.
/// * Uncompressed page: `buffer` is replaced by a freshly allocated copy of the page's
///   bytes (see the comment on that branch). Returns `Ok(false)`.
///
/// # Errors
///
/// Propagates any error returned by `decompress_v1` or `decompress_v2`.
pub fn decompress_buffer(
compressed_page: &mut CompressedPage,
buffer: &mut Vec<u8>,
) -> ParquetResult<bool> {
if compressed_page.compression() != Compression::Uncompressed {
// prepare the compression buffer: after this `if`/`else` chain,
// `buffer.len() == read_size` in every branch.
let read_size = compressed_page.uncompressed_size();

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate: `read_size` fits in the existing capacity.
buffer.resize(read_size, 0);
} else {
// already long enough: only shorten the length, the capacity is kept.
buffer.truncate(read_size);
}
// Dispatch on the page kind; data pages additionally dispatch on the header version.
match compressed_page {
CompressedPage::Data(compressed_page) => match compressed_page.header() {
DataPageHeader::V1(_) => {
decompress_v1(&compressed_page.buffer, compressed_page.compression, buffer)?
},
DataPageHeader::V2(header) => decompress_v2(
&compressed_page.buffer,
header,
compressed_page.compression,
buffer,
)?,
},
// Dictionary pages go through the same routine as V1 data pages.
CompressedPage::Dict(page) => decompress_v1(&page.buffer, page.compression(), buffer)?,
}
Ok(true)
} else {
// The page is already uncompressed. `buffer()` only lends a `&[u8]`, so `to_vec()`
// copies the page's bytes into a temporary `Vec`; the swap moves that copy into
// `buffer`, and the previous contents of `buffer` are dropped with the temporary.
// NOTE(review): this copies the whole page and leaves the page's own buffer
// untouched — it does not hand the page's allocation over to `buffer`.
std::mem::swap(&mut compressed_page.buffer().to_vec(), buffer);
Ok(false)
}
}

fn create_page(compressed_page: CompressedPage, buffer: Vec<u8>) -> Page {
match compressed_page {
CompressedPage::Data(page) => Page::Data(DataPage::new_read(
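/// Decompresses the page, using `buffer` as scratch space for decompression.
/// If the page is uncompressed, its own buffer is moved into the returned [`Page`] and `buffer` is left untouched.
/// Otherwise the page is decompressed into `buffer`, whose contents are then moved into the returned [`Page`], leaving `buffer` empty.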
pub fn decompress(compressed_page: CompressedPage, buffer: &mut Vec<u8>) -> ParquetResult<Page> {
Ok(match (compressed_page.compression(), compressed_page) {
(Compression::Uncompressed, CompressedPage::Data(page)) => Page::Data(DataPage::new_read(
page.header,
CowBuffer::Owned(buffer),
page.buffer,
page.descriptor,
)),
CompressedPage::Dict(page) => Page::Dict(DictPage {
buffer: CowBuffer::Owned(buffer),
(_, CompressedPage::Data(page)) => {
// prepare the compression buffer
let read_size = page.uncompressed_size();

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}

match page.header() {
DataPageHeader::V1(_) => decompress_v1(&page.buffer, page.compression, buffer)?,
DataPageHeader::V2(header) => {
decompress_v2(&page.buffer, header, page.compression, buffer)?
},
}
let buffer = CowBuffer::Owned(std::mem::take(buffer));

Page::Data(DataPage::new_read(page.header, buffer, page.descriptor))
},
(Compression::Uncompressed, CompressedPage::Dict(page)) => Page::Dict(DictPage {
buffer: page.buffer,
num_values: page.num_values,
is_sorted: page.is_sorted,
}),
}
}

/// Decompresses the page, using `buffer` for decompression.
/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved.
/// Else, decompression took place.
pub fn decompress(
mut compressed_page: CompressedPage,
buffer: &mut Vec<u8>,
) -> ParquetResult<Page> {
decompress_buffer(&mut compressed_page, buffer)?;
Ok(create_page(compressed_page, std::mem::take(buffer)))
(_, CompressedPage::Dict(page)) => {
// prepare the compression buffer
let read_size = page.uncompressed_page_size;

if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}
decompress_v1(&page.buffer, page.compression(), buffer)?;
let buffer = CowBuffer::Owned(std::mem::take(buffer));

Page::Dict(DictPage {
buffer,
num_values: page.num_values,
is_sorted: page.is_sorted,
})
},
})
}

type _Decompressor<I> = streaming_decompression::Decompressor<
Expand Down