diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index eea271306e25..19c877dffc2c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2422,6 +2422,76 @@ mod tests { assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]); } + // This test is to ensure backward compatibility; it tests 2 files containing the LZ4 CompressionCodec + // but compressed with different algorithms: LZ4_HADOOP and LZ4_RAW. + // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses + // the LZ4_HADOOP algorithm for compression. + // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses + // the LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with + // older parquet-cpp versions. + // + // For more information, check: https://github.com/apache/arrow-rs/issues/2988 + #[test] + fn test_read_lz4_hadoop_fallback() { + for file in [ + "hadoop_lz4_compressed.parquet", + "non_hadoop_lz4_compressed.parquet", + ] { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/{}", testdata, file); + let file = File::open(&path).unwrap(); + let expected_rows = 4; + + let batches = ParquetRecordBatchReader::try_new(file, expected_rows) + .unwrap() + .collect::<Result<Vec<_>, _>>() + .unwrap(); + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + + assert_eq!(batch.num_columns(), 3); + assert_eq!(batch.num_rows(), expected_rows); + + let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap(); + assert_eq!( + a.values(), + &[1593604800, 1593604800, 1593604801, 1593604801] + ); + + let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap(); + let b: Vec<_> = b.iter().flatten().collect(); + assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]); + + let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap(); + assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]); + } + } + + #[test] + fn test_read_lz4_hadoop_large() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/hadoop_lz4_compressed_larger.parquet", testdata); + let file = File::open(&path).unwrap(); + let expected_rows = 10000; + + let batches = ParquetRecordBatchReader::try_new(file, expected_rows) + .unwrap() + .collect::<Result<Vec<_>, _>>() + .unwrap(); + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.num_rows(), expected_rows); + + let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap(); + let a: Vec<_> = a.iter().flatten().collect(); + assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"); + assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843"); + assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"); + assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf"); + } + #[test] #[cfg(feature = "snap")] fn test_read_nested_lists() { diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 7415d9aad0a7..3cdf04f5494c 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -24,7 +24,7 @@ use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; use crate::column::writer::encoder::{ ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues, }; -use crate::compression::{create_codec, Codec}; +use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; use crate::data_type::private::ParquetValueType; use crate::data_type::*; use 
crate::encodings::levels::LevelEncoder; @@ -221,7 +221,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { page_writer: Box<dyn PageWriter + 'a>, ) -> Self { let codec = props.compression(descr.path()); - let compressor = create_codec(codec).unwrap(); + let codec_options = CodecOptionsBuilder::default().build(); + let compressor = create_codec(codec, &codec_options).unwrap(); let encoder = E::try_new(&descr, props.as_ref()).unwrap(); let statistics_enabled = props.statistics_enabled(descr.path()); @@ -1107,7 +1108,8 @@ mod tests { }; use crate::file::writer::TrackedWrite; use crate::file::{ - properties::WriterProperties, reader::SerializedPageReader, + properties::{ReaderProperties, WriterProperties}, + reader::SerializedPageReader, writer::SerializedPageWriter, }; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; @@ -1674,11 +1676,15 @@ mod tests { assert_eq!(stats.null_count(), 0); assert!(stats.distinct_count().is_none()); - let reader = SerializedPageReader::new( + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let reader = SerializedPageReader::new_with_properties( Arc::new(Bytes::from(buf)), &r.metadata, r.rows_written as usize, None, + Arc::new(props), ) .unwrap(); @@ -1714,11 +1720,15 @@ mod tests { let r = writer.close().unwrap(); assert!(r.metadata.statistics().is_none()); - let reader = SerializedPageReader::new( + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let reader = SerializedPageReader::new_with_properties( Arc::new(Bytes::from(buf)), &r.metadata, r.rows_written as usize, None, + Arc::new(props), ) .unwrap(); @@ -1842,12 +1852,16 @@ mod tests { let r = writer.close().unwrap(); // Read pages and check the sequence + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); let mut page_reader = Box::new( - SerializedPageReader::new( + SerializedPageReader::new_with_properties( Arc::new(file), &r.metadata, r.rows_written as usize, None, + Arc::new(props), ) .unwrap(), ); @@ -2210,12 +2224,16 @@ mod tests { assert_eq!(values_written, values.len()); let result = writer.close().unwrap(); + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); let page_reader = Box::new( - SerializedPageReader::new( + SerializedPageReader::new_with_properties( Arc::new(file), &result.metadata, result.rows_written as usize, None, + Arc::new(props), ) .unwrap(), ); diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index 310dbd34f1f6..bba14f94e2eb 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -26,9 +26,12 @@ # Example ```no_run -use parquet::{basic::Compression, compression::create_codec}; +use parquet::{basic::Compression, compression::{create_codec, CodecOptionsBuilder}}; -let mut codec = match create_codec(Compression::SNAPPY) { +let codec_options = CodecOptionsBuilder::default() + .set_backward_compatible_lz4(false) + .build(); +let mut codec = match create_codec(Compression::SNAPPY, &codec_options) { Ok(Some(codec)) => codec, _ => panic!(), }; @@ -71,10 +74,60 @@ pub trait Codec: Send { ) -> Result<usize>; } +/// Struct to hold `Codec` creation options. +#[derive(Debug, PartialEq, Eq)] +pub struct CodecOptions { + /// Whether or not to fall back to older LZ4 implementations on LZ4_HADOOP error. 
+ backward_compatible_lz4: bool, +} + +impl Default for CodecOptions { + fn default() -> Self { + CodecOptionsBuilder::default().build() + } +} + +pub struct CodecOptionsBuilder { + /// Whether or not to fall back to older LZ4 implementations on LZ4_HADOOP error. + backward_compatible_lz4: bool, +} + +impl Default for CodecOptionsBuilder { + fn default() -> Self { + Self { + backward_compatible_lz4: true, + } + } +} + +impl CodecOptionsBuilder { + /// Enable/disable backward compatible LZ4. + /// + /// If backward compatible LZ4 is enabled, on an LZ4_HADOOP error it will fall back + /// to older LZ4 algorithms: LZ4_FRAME, for backward compatibility + /// with files generated by older versions of this library, and LZ4_RAW, for backward + /// compatibility with files generated by older versions of parquet-cpp. + /// + /// If backward compatible LZ4 is disabled, an LZ4_HADOOP error is returned as-is. + pub fn set_backward_compatible_lz4(mut self, value: bool) -> CodecOptionsBuilder { + self.backward_compatible_lz4 = value; + self + } + + pub fn build(self) -> CodecOptions { + CodecOptions { + backward_compatible_lz4: self.backward_compatible_lz4, + } + } +} + /// Given the compression type `codec`, returns a codec used to compress and decompress /// bytes for the compression type. /// This returns `None` if the codec type is `UNCOMPRESSED`. -pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> { +pub fn create_codec( + codec: CodecType, + options: &CodecOptions, +) -> Result<Option<Box<dyn Codec>>> { match codec { #[cfg(any(feature = "brotli", test))] CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))), @@ -83,7 +136,9 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> { #[cfg(any(feature = "snap", test))] CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))), #[cfg(any(feature = "lz4", test))] - CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))), + CodecType::LZ4 => Ok(Some(Box::new(LZ4HadoopCodec::new( + options.backward_compatible_lz4, + )))), #[cfg(any(feature = "zstd", test))] CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))), #[cfg(any(feature = "lz4", test))] @@ -348,6 +403,7 @@ pub use zstd_codec::*; #[cfg(any(feature = "lz4", test))] mod lz4_raw_codec { use crate::compression::Codec; + use crate::errors::ParquetError; use crate::errors::Result; /// Codec for LZ4 Raw compression algorithm. @@ -360,12 +416,6 @@ mod lz4_raw_codec { } } - // Compute max LZ4 uncompress size. 
- // Check https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation - fn max_uncompressed_size(compressed_size: usize) -> usize { - (compressed_size << 8) - compressed_size - 2526 - } - impl Codec for LZ4RawCodec { fn decompress( &mut self, @@ -374,8 +424,14 @@ impl Codec for LZ4RawCodec { uncompress_size: Option<usize>, ) -> Result<usize> { let offset = output_buf.len(); - let required_len = - uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len())); + let required_len = match uncompress_size { + Some(uncompress_size) => uncompress_size, + None => { + return Err(ParquetError::General( + "LZ4RawCodec unsupported without uncompress_size".into(), + )) + } + }; output_buf.resize(offset + required_len, 0); match lz4::block::decompress_to_buffer( input_buf, @@ -383,8 +439,10 @@ impl Codec for LZ4RawCodec { &mut output_buf[offset..], ) { Ok(n) => { - if n < required_len { - output_buf.truncate(offset + n); + if n != required_len { + return Err(ParquetError::General( + "LZ4RawCodec uncompress_size is not the expected one".into(), + )); } Ok(n) } @@ -414,6 +472,190 @@ impl Codec for LZ4RawCodec { #[cfg(any(feature = "lz4", test))] pub use lz4_raw_codec::*; +#[cfg(any(feature = "lz4", test))] +mod lz4_hadoop_codec { + use crate::compression::lz4_codec::LZ4Codec; + use crate::compression::lz4_raw_codec::LZ4RawCodec; + use crate::compression::Codec; + use crate::errors::{ParquetError, Result}; + use std::io; + + /// Size of u32 type. + const SIZE_U32: usize = std::mem::size_of::<u32>(); + + /// Length of the LZ4_HADOOP prefix. + const PREFIX_LEN: usize = SIZE_U32 * 2; + + /// Codec for LZ4 Hadoop compression algorithm. + pub struct LZ4HadoopCodec { + /// Whether or not to fall back to other LZ4 implementations on error. + /// Fallback is done to be backward compatible with older versions of this + /// library and older versions of parquet-cpp. + backward_compatible_lz4: bool, + } + + impl LZ4HadoopCodec { + /// Creates new LZ4 Hadoop compression codec. + pub(crate) fn new(backward_compatible_lz4: bool) -> Self { + Self { + backward_compatible_lz4, + } + } + } + + /// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec. + /// Adapted from pola-rs [compression.rs:try_decompress_hadoop](https://pola-rs.github.io/polars/src/parquet2/compression.rs.html#225) + /// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474). + /// Returns error if decompression failed. + fn try_decompress_hadoop( + input_buf: &[u8], + output_buf: &mut [u8], + ) -> io::Result<usize> { + // Parquet files written with the Hadoop Lz4Codec use their own framing. 
+ // The input buffer can contain an arbitrary number of "frames", each + // with the following structure: + // - bytes 0..3: big-endian uint32_t representing the frame decompressed size + // - bytes 4..7: big-endian uint32_t representing the frame compressed size + // - bytes 8...: frame compressed data + // + // The Hadoop Lz4Codec source code can be found here: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc + let mut input_len = input_buf.len(); + let mut input = input_buf; + let mut read_bytes = 0; + let mut output_len = output_buf.len(); + let mut output: &mut [u8] = output_buf; + while input_len >= PREFIX_LEN { + let mut bytes = [0; SIZE_U32]; + bytes.copy_from_slice(&input[0..4]); + let expected_decompressed_size = u32::from_be_bytes(bytes); + let mut bytes = [0; SIZE_U32]; + bytes.copy_from_slice(&input[4..8]); + let expected_compressed_size = u32::from_be_bytes(bytes); + input = &input[PREFIX_LEN..]; + input_len -= PREFIX_LEN; + + if input_len < expected_compressed_size as usize { + return Err(io::Error::new( + io::ErrorKind::Other, + "Not enough bytes for Hadoop frame", + )); + } + + if output_len < expected_decompressed_size as usize { + return Err(io::Error::new( + io::ErrorKind::Other, + "Not enough bytes to hold advertised output", + )); + } + let decompressed_size = lz4::block::decompress_to_buffer( + &input[..expected_compressed_size as usize], + Some(output_len as i32), + output, + )?; + if decompressed_size != expected_decompressed_size as usize { + return Err(io::Error::new( + io::ErrorKind::Other, + "Unexpected decompressed size", + )); + } + input_len -= expected_compressed_size as usize; + output_len -= expected_decompressed_size as usize; + read_bytes += expected_decompressed_size as usize; + if input_len > expected_compressed_size as usize { + input = &input[expected_compressed_size as usize..]; + output = &mut output[expected_decompressed_size as usize..]; + } else { + break; + } + } + if input_len == 0 { + Ok(read_bytes) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "Not all input are consumed", + )) + } + } + + impl Codec for LZ4HadoopCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + uncompress_size: Option<usize>, + ) -> Result<usize> { + let output_len = output_buf.len(); + let required_len = match uncompress_size { + Some(n) => n, + None => { + return Err(ParquetError::General( + "LZ4HadoopCodec unsupported without uncompress_size".into(), + )) + } + }; + output_buf.resize(output_len + required_len, 0); + match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) { + Ok(n) => { + if n != required_len { + return Err(ParquetError::General( + "LZ4HadoopCodec uncompress_size is not the expected one" + .into(), + )); + } + Ok(n) + } + Err(e) if !self.backward_compatible_lz4 => Err(e.into()), + // Fallback done to be backward compatible with older versions of this + // library and older versions of parquet-cpp. + Err(_) => { + // Truncate any inserted element before trying the next algorithm. + output_buf.truncate(output_len); + match LZ4Codec::new().decompress( + input_buf, + output_buf, + uncompress_size, + ) { + Ok(n) => Ok(n), + Err(_) => { + // Truncate any inserted element before trying the next algorithm. 
+ output_buf.truncate(output_len); + LZ4RawCodec::new().decompress( + input_buf, + output_buf, + uncompress_size, + ) + } + } + } + } + } + + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + // Allocate memory to store the LZ4_HADOOP prefix. + let offset = output_buf.len(); + output_buf.resize(offset + PREFIX_LEN, 0); + + // Append LZ4_RAW compressed bytes after prefix. + LZ4RawCodec::new().compress(input_buf, output_buf)?; + + // Prepend decompressed size and compressed size in big endian to be compatible + // with LZ4_HADOOP. + let output_buf = &mut output_buf[offset..]; + let compressed_size = output_buf.len() - PREFIX_LEN; + let compressed_size = compressed_size as u32; + let uncompressed_size = input_buf.len() as u32; + output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes()); + output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes()); + + Ok(()) + } + } +} +#[cfg(any(feature = "lz4", test))] +pub use lz4_hadoop_codec::*; + #[cfg(test)] mod tests { use super::*; @@ -421,8 +663,11 @@ mod tests { use crate::util::test_common::rand_gen::random_bytes; fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) { - let mut c1 = create_codec(c).unwrap().unwrap(); - let mut c2 = create_codec(c).unwrap().unwrap(); + let codec_options = CodecOptionsBuilder::default() + .set_backward_compatible_lz4(false) + .build(); + let mut c1 = create_codec(c, &codec_options).unwrap().unwrap(); + let mut c2 = create_codec(c, &codec_options).unwrap().unwrap(); // Compress with c1 let mut compressed = Vec::new(); @@ -473,42 +718,53 @@ mod tests { assert_eq!(&decompressed[..4], prefix); } - fn test_codec(c: CodecType) { + fn test_codec_with_size(c: CodecType) { let sizes = vec![100, 10000, 100000]; for size in sizes { let data = random_bytes(size); - test_roundtrip(c, &data, None); test_roundtrip(c, &data, Some(data.len())); } } + fn test_codec_without_size(c: CodecType) { + let sizes = vec![100, 10000, 100000]; + for size in sizes { + let data = random_bytes(size); + test_roundtrip(c, &data, None); + } + } + #[test] fn test_codec_snappy() { - test_codec(CodecType::SNAPPY); + test_codec_with_size(CodecType::SNAPPY); + test_codec_without_size(CodecType::SNAPPY); } #[test] fn test_codec_gzip() { - test_codec(CodecType::GZIP); + test_codec_with_size(CodecType::GZIP); + test_codec_without_size(CodecType::GZIP); } #[test] fn test_codec_brotli() { - test_codec(CodecType::BROTLI); + test_codec_with_size(CodecType::BROTLI); + test_codec_without_size(CodecType::BROTLI); } #[test] fn test_codec_lz4() { - test_codec(CodecType::LZ4); + test_codec_with_size(CodecType::LZ4); } #[test] fn test_codec_zstd() { - test_codec(CodecType::ZSTD); + test_codec_with_size(CodecType::ZSTD); + test_codec_without_size(CodecType::ZSTD); } #[test] fn test_codec_lz4_raw() { - test_codec(CodecType::LZ4_RAW); + test_codec_with_size(CodecType::LZ4_RAW); } } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 11fb13b4bd68..dc9feb4ce185 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -47,10 +47,27 @@ //! Some(Encoding::PLAIN) //! ); //! ``` +//! +//! Reader properties. +//! +//! # Usage +//! +//! ```rust +//! use parquet::file::properties::ReaderProperties; +//! +//! // Create properties with default configuration. +//! let props = ReaderProperties::builder().build(); +//! +//! // Use properties builder to set certain options and assemble the configuration. +//! 
let props = ReaderProperties::builder() +//! .set_backward_compatible_lz4(false) +//! .build(); +//! ``` use std::{collections::HashMap, sync::Arc}; use crate::basic::{Compression, Encoding}; +use crate::compression::{CodecOptions, CodecOptionsBuilder}; use crate::file::metadata::KeyValue; use crate::schema::types::ColumnPath; @@ -560,6 +577,66 @@ impl ColumnProperties { } } +/// Reference counted reader properties. +pub type ReaderPropertiesPtr = Arc<ReaderProperties>; + +/// Reader properties. +/// +/// All properties are immutable and `Send` + `Sync`. +/// Use [`ReaderPropertiesBuilder`] to assemble these properties. +pub struct ReaderProperties { + codec_options: CodecOptions, +} + +impl ReaderProperties { + /// Returns builder for reader properties with default values. + pub fn builder() -> ReaderPropertiesBuilder { + ReaderPropertiesBuilder::with_defaults() + } + + /// Returns codec options. + pub(crate) fn codec_options(&self) -> &CodecOptions { + &self.codec_options + } +} + +/// Reader properties builder. +pub struct ReaderPropertiesBuilder { + codec_options_builder: CodecOptionsBuilder, +} + +/// Reader properties builder. +impl ReaderPropertiesBuilder { + /// Returns default state of the builder. + fn with_defaults() -> Self { + Self { + codec_options_builder: CodecOptionsBuilder::default(), + } + } + + /// Finalizes the configuration and returns immutable reader properties struct. + pub fn build(self) -> ReaderProperties { + ReaderProperties { + codec_options: self.codec_options_builder.build(), + } + } + + /// Enable/disable backward compatible LZ4. + /// + /// If backward compatible LZ4 is enabled, on an LZ4_HADOOP error it will fall back + /// to older LZ4 algorithms: LZ4_FRAME, for backward compatibility + /// with files generated by older versions of this library, and LZ4_RAW, for backward + /// compatibility with files generated by older versions of parquet-cpp. + /// + /// If backward compatible LZ4 is disabled, an LZ4_HADOOP error is returned as-is. 
+ pub fn set_backward_compatible_lz4(mut self, value: bool) -> Self { + self.codec_options_builder = self + .codec_options_builder + .set_backward_compatible_lz4(value); + self + } +} + #[cfg(test)] mod tests { use super::*; @@ -747,4 +824,28 @@ mod tests { DEFAULT_DICTIONARY_ENABLED ); } + + #[test] + fn test_reader_properties_default_settings() { + let props = ReaderProperties::builder().build(); + + let codec_options = CodecOptionsBuilder::default() + .set_backward_compatible_lz4(true) + .build(); + + assert_eq!(props.codec_options(), &codec_options); + } + + #[test] + fn test_reader_properties_builder() { + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + + let codec_options = CodecOptionsBuilder::default() + .set_backward_compatible_lz4(false) + .build(); + + assert_eq!(props.codec_options(), &codec_options); + } } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 854ae1ef6d34..2b3c7d139148 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -31,7 +31,13 @@ use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::page_index::index_reader; -use crate::file::{footer, metadata::*, reader::*, statistics}; +use crate::file::{ + footer, + metadata::*, + properties::{ReaderProperties, ReaderPropertiesPtr}, + reader::*, + statistics, +}; use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::Type as SchemaType; @@ -139,6 +145,7 @@ impl<R: 'static + ChunkReader> IntoIterator for SerializedFileReader<R> { pub struct SerializedFileReader<R: ChunkReader> { chunk_reader: Arc<R>, metadata: Arc<ParquetMetaData>, + props: ReaderPropertiesPtr, } /// A predicate for filtering row groups, invoked with the metadata and index @@ -153,6 +160,7 @@ pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>; pub struct ReadOptionsBuilder { predicates: Vec<ReadGroupPredicate>, enable_page_index: bool, + props: Option<ReaderProperties>, } impl ReadOptionsBuilder { @@ -186,11 +194,21 @@ impl ReadOptionsBuilder { self } + /// Set the `ReaderProperties` configuration. + pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self { + self.props = Some(properties); + self + } + /// Seal the builder and return the read options pub fn build(self) -> ReadOptions { + let props = self + .props + .unwrap_or_else(|| ReaderProperties::builder().build()); ReadOptions { predicates: self.predicates, enable_page_index: self.enable_page_index, + props, } } } @@ -202,6 +220,7 @@ impl ReadOptionsBuilder { pub struct ReadOptions { predicates: Vec<ReadGroupPredicate>, enable_page_index: bool, + props: ReaderProperties, } impl<R: 'static + ChunkReader> SerializedFileReader<R> { /// Creates file reader from a Parquet file. /// Returns error if Parquet file does not exist or is corrupt. 
pub fn new(chunk_reader: R) -> Result<Self> { let metadata = footer::parse_metadata(&chunk_reader)?; + let props = Arc::new(ReaderProperties::builder().build()); Ok(Self { chunk_reader: Arc::new(chunk_reader), metadata: Arc::new(metadata), + props, }) } @@ -257,6 +278,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> { Some(columns_indexes), Some(offset_indexes), )), + props: Arc::new(options.props), }) } else { Ok(Self { @@ -265,6 +287,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> { metadata.file_metadata().clone(), filtered_row_groups, )), + props: Arc::new(options.props), }) } } @@ -298,10 +321,12 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> { fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> { let row_group_metadata = self.metadata.row_group(i); // Row groups should be processed sequentially. + let props = Arc::clone(&self.props); let f = Arc::clone(&self.chunk_reader); - Ok(Box::new(SerializedRowGroupReader::new( + Ok(Box::new(SerializedRowGroupReader::new_with_properties( f, row_group_metadata, + props, ))) } @@ -314,14 +339,20 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> { pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc<R>, metadata: &'a RowGroupMetaData, + props: ReaderPropertiesPtr, } impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { - /// Creates new row group reader from a file and row group metadata. - fn new(chunk_reader: Arc<R>, metadata: &'a RowGroupMetaData) -> Self { + /// Creates new row group reader from a file, row group metadata and custom config. + fn new_with_properties( + chunk_reader: Arc<R>, + metadata: &'a RowGroupMetaData, + props: ReaderPropertiesPtr, + ) -> Self { Self { chunk_reader, metadata, + props, } } } @@ -345,11 +376,13 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> { .as_ref() .map(|x| x[i].clone()); - Ok(Box::new(SerializedPageReader::new( + let props = Arc::clone(&self.props); + Ok(Box::new(SerializedPageReader::new_with_properties( Arc::clone(&self.chunk_reader), col, self.metadata.num_rows() as usize, page_locations, + props, )?)) } @@ -531,7 +564,25 @@ impl<R: ChunkReader> SerializedPageReader<R> { total_rows: usize, page_locations: Option<Vec<PageLocation>>, ) -> Result<Self> { - let decompressor = create_codec(meta.compression())?; + let props = Arc::new(ReaderProperties::builder().build()); + SerializedPageReader::new_with_properties( + reader, + meta, + total_rows, + page_locations, + props, + ) + } + + /// Creates a new serialized page reader with custom properties. 
+ pub fn new_with_properties( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + total_rows: usize, + page_locations: Option<Vec<PageLocation>>, + props: ReaderPropertiesPtr, + ) -> Result<Self> { + let decompressor = create_codec(meta.compression(), props.codec_options())?; let (start, len) = meta.byte_range(); let state = match page_locations { diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index dbbc38461677..528f72494190 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -646,10 +646,10 @@ mod tests { use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; use crate::column::page::PageReader; - use crate::compression::{create_codec, Codec}; + use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; use crate::data_type::Int32Type; use crate::file::{ - properties::{WriterProperties, WriterVersion}, + properties::{ReaderProperties, WriterProperties, WriterVersion}, reader::{FileReader, SerializedFileReader, SerializedPageReader}, statistics::{from_thrift, to_thrift, Statistics}, }; @@ -947,7 +947,10 @@ mod tests { fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) { let mut compressed_pages = vec![]; let mut total_num_values = 0i64; - let mut compressor = create_codec(codec).unwrap(); + let codec_options = CodecOptionsBuilder::default() + .set_backward_compatible_lz4(false) + .build(); + let mut compressor = create_codec(codec, &codec_options).unwrap(); for page in pages { let uncompressed_len = page.buffer().len(); @@ -1056,11 +1059,15 @@ mod tests { .build() .unwrap(); - let mut page_reader = SerializedPageReader::new( + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = SerializedPageReader::new_with_properties( Arc::new(reader), &meta, total_num_values as usize, None, + Arc::new(props), ) .unwrap(); diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index e43456eb6f40..5744de35e337 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -24,7 +24,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; use parquet::arrow::ArrowWriter; use parquet::basic::{Encoding, PageType}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{ReaderProperties, WriterProperties}; use parquet::file::reader::SerializedPageReader; use std::sync::Arc; @@ -129,11 +129,15 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { .enumerate(); for (idx, (column, column_layout)) in iter { - let page_reader = SerializedPageReader::new( + let properties = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let page_reader = SerializedPageReader::new_with_properties( Arc::new(file_reader.clone()), column, row_group.num_rows() as usize, None, + Arc::new(properties), ) .unwrap();
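For context, here is a minimal usage sketch (not part of the patch) of how the new reader-side option is wired through the public API: the `ReaderProperties` added in this change feed into `ReadOptions`, which configure the `SerializedFileReader` and, through it, page decompression. It assumes the pre-existing `ReadOptionsBuilder::new` and `SerializedFileReader::new_with_options` constructors from this crate and uses the `hadoop_lz4_compressed.parquet` test file referenced in the tests above.

```rust
use std::fs::File;

use parquet::file::properties::ReaderProperties;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() {
    // Disable the LZ4_FRAME/LZ4_RAW fallback so a corrupt LZ4_HADOOP page
    // surfaces as an error instead of being retried with the older codecs.
    let props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();

    // Thread the properties through the read options into the file reader.
    let options = ReadOptionsBuilder::new()
        .with_reader_properties(props)
        .build();

    let file = File::open("hadoop_lz4_compressed.parquet").unwrap();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    // Page readers created from each row group now use the configured CodecOptions.
    println!("row groups: {}", reader.metadata().num_row_groups());
}
```

Readers that do not set any properties keep the default `backward_compatible_lz4 = true`, which preserves the fallback behavior described in the codec documentation above.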