From 9ea473e9261eeba8471fcfa0a79265d69a3520a9 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 28 Aug 2024 18:16:18 +0200 Subject: [PATCH] perf: Parquet do not copy uncompressed pages --- crates/polars-parquet/src/parquet/page/mod.rs | 14 -- .../src/parquet/read/compression.rs | 123 +++++++++--------- 2 files changed, 60 insertions(+), 77 deletions(-) diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index 128f1af03c14..9ffe7e3a9e51 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -258,13 +258,6 @@ pub enum CompressedPage { } impl CompressedPage { - pub(crate) fn buffer(&self) -> &[u8] { - match self { - CompressedPage::Data(page) => &page.buffer, - CompressedPage::Dict(page) => &page.buffer, - } - } - pub(crate) fn buffer_mut(&mut self) -> &mut Vec { match self { CompressedPage::Data(page) => page.buffer.to_mut(), @@ -292,13 +285,6 @@ impl CompressedPage { CompressedPage::Dict(_) => Some(0), } } - - pub(crate) fn uncompressed_size(&self) -> usize { - match self { - CompressedPage::Data(page) => page.uncompressed_page_size, - CompressedPage::Dict(page) => page.uncompressed_page_size, - } - } } /// An uncompressed, encoded dictionary page. diff --git a/crates/polars-parquet/src/parquet/read/compression.rs b/crates/polars-parquet/src/parquet/read/compression.rs index a3d2db312ada..a79989c39e26 100644 --- a/crates/polars-parquet/src/parquet/read/compression.rs +++ b/crates/polars-parquet/src/parquet/read/compression.rs @@ -54,75 +54,72 @@ fn decompress_v2( Ok(()) } -/// decompresses a [`CompressedDataPage`] into `buffer`. -/// If the page is un-compressed, `buffer` is swapped instead. -/// Returns whether the page was decompressed. -pub fn decompress_buffer( - compressed_page: &mut CompressedPage, - buffer: &mut Vec, -) -> ParquetResult { - if compressed_page.compression() != Compression::Uncompressed { - // prepare the compression buffer - let read_size = compressed_page.uncompressed_size(); - - if read_size > buffer.capacity() { - // dealloc and ignore region, replacing it by a new region. - // This won't reallocate - it frees and calls `alloc_zeroed` - *buffer = vec![0; read_size]; - } else if read_size > buffer.len() { - // fill what we need with zeros so that we can use them in `Read`. - // This won't reallocate - buffer.resize(read_size, 0); - } else { - buffer.truncate(read_size); - } - match compressed_page { - CompressedPage::Data(compressed_page) => match compressed_page.header() { - DataPageHeader::V1(_) => { - decompress_v1(&compressed_page.buffer, compressed_page.compression, buffer)? - }, - DataPageHeader::V2(header) => decompress_v2( - &compressed_page.buffer, - header, - compressed_page.compression, - buffer, - )?, - }, - CompressedPage::Dict(page) => decompress_v1(&page.buffer, page.compression(), buffer)?, - } - Ok(true) - } else { - // page.buffer is already decompressed => swap it with `buffer`, making `page.buffer` the - // decompression buffer and `buffer` the decompressed buffer - std::mem::swap(&mut compressed_page.buffer().to_vec(), buffer); - Ok(false) - } -} - -fn create_page(compressed_page: CompressedPage, buffer: Vec) -> Page { - match compressed_page { - CompressedPage::Data(page) => Page::Data(DataPage::new_read( +/// Decompresses the page, using `buffer` for decompression. +/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved. +/// Else, decompression took place. +pub fn decompress(compressed_page: CompressedPage, buffer: &mut Vec) -> ParquetResult { + Ok(match (compressed_page.compression(), compressed_page) { + (Compression::Uncompressed, CompressedPage::Data(page)) => Page::Data(DataPage::new_read( page.header, - CowBuffer::Owned(buffer), + page.buffer, page.descriptor, )), - CompressedPage::Dict(page) => Page::Dict(DictPage { - buffer: CowBuffer::Owned(buffer), + (_, CompressedPage::Data(page)) => { + // prepare the compression buffer + let read_size = page.uncompressed_size(); + + if read_size > buffer.capacity() { + // dealloc and ignore region, replacing it by a new region. + // This won't reallocate - it frees and calls `alloc_zeroed` + *buffer = vec![0; read_size]; + } else if read_size > buffer.len() { + // fill what we need with zeros so that we can use them in `Read`. + // This won't reallocate + buffer.resize(read_size, 0); + } else { + buffer.truncate(read_size); + } + + match page.header() { + DataPageHeader::V1(_) => decompress_v1(&page.buffer, page.compression, buffer)?, + DataPageHeader::V2(header) => { + decompress_v2(&page.buffer, header, page.compression, buffer)? + }, + } + let buffer = CowBuffer::Owned(std::mem::take(buffer)); + + Page::Data(DataPage::new_read(page.header, buffer, page.descriptor)) + }, + (Compression::Uncompressed, CompressedPage::Dict(page)) => Page::Dict(DictPage { + buffer: page.buffer, num_values: page.num_values, is_sorted: page.is_sorted, }), - } -} - -/// Decompresses the page, using `buffer` for decompression. -/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved. -/// Else, decompression took place. -pub fn decompress( - mut compressed_page: CompressedPage, - buffer: &mut Vec, -) -> ParquetResult { - decompress_buffer(&mut compressed_page, buffer)?; - Ok(create_page(compressed_page, std::mem::take(buffer))) + (_, CompressedPage::Dict(page)) => { + // prepare the compression buffer + let read_size = page.uncompressed_page_size; + + if read_size > buffer.capacity() { + // dealloc and ignore region, replacing it by a new region. + // This won't reallocate - it frees and calls `alloc_zeroed` + *buffer = vec![0; read_size]; + } else if read_size > buffer.len() { + // fill what we need with zeros so that we can use them in `Read`. + // This won't reallocate + buffer.resize(read_size, 0); + } else { + buffer.truncate(read_size); + } + decompress_v1(&page.buffer, page.compression(), buffer)?; + let buffer = CowBuffer::Owned(std::mem::take(buffer)); + + Page::Dict(DictPage { + buffer, + num_values: page.num_values, + is_sorted: page.is_sorted, + }) + }, + }) } type _Decompressor = streaming_decompression::Decompressor<