diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index e10a3f33da3b0..a8a27139d11bc 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -45,6 +45,7 @@ class Future; namespace util { class Codec; +class CodecOptions; } // namespace util class Buffer; diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index c67cb4539bc8f..5ad17e993f153 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -136,7 +136,7 @@ Result Codec::DefaultCompressionLevel(Compression::type codec_type) { } Result> Codec::Create(Compression::type codec_type, - int compression_level) { + const CodecOptions& codec_options) { if (!IsAvailable(codec_type)) { if (codec_type == Compression::LZO) { return Status::NotImplemented("LZO codec not implemented"); @@ -151,6 +151,7 @@ Result> Codec::Create(Compression::type codec_type, "' not built"); } + auto compression_level = codec_options.compression_level; if (compression_level != kUseDefaultCompressionLevel && !SupportsCompressionLevel(codec_type)) { return Status::Invalid("Codec '", GetCodecAsString(codec_type), @@ -166,16 +167,23 @@ Result> Codec::Create(Compression::type codec_type, codec = internal::MakeSnappyCodec(); #endif break; - case Compression::GZIP: + case Compression::GZIP: { #ifdef ARROW_WITH_ZLIB - codec = internal::MakeGZipCodec(compression_level); + auto opt = dynamic_cast(&codec_options); + codec = internal::MakeGZipCodec(compression_level, + opt ? opt->gzip_format : GZipFormat::GZIP, + opt ? opt->window_bits : std::nullopt); #endif break; - case Compression::BROTLI: + } + case Compression::BROTLI: { #ifdef ARROW_WITH_BROTLI - codec = internal::MakeBrotliCodec(compression_level); + auto opt = dynamic_cast(&codec_options); + codec = internal::MakeBrotliCodec(compression_level, + opt ? 
opt->window_bits : std::nullopt); #endif break; + } case Compression::LZ4: #ifdef ARROW_WITH_LZ4 codec = internal::MakeLz4RawCodec(compression_level); @@ -210,6 +218,12 @@ Result> Codec::Create(Compression::type codec_type, return std::move(codec); } +// use compression level to create Codec +Result> Codec::Create(Compression::type codec_type, + int compression_level) { + return Codec::Create(codec_type, CodecOptions{compression_level}); +} + bool Codec::IsAvailable(Compression::type codec_type) { switch (codec_type) { case Compression::UNCOMPRESSED: diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index f0d359d195c80..f7bf4d5e12d02 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "arrow/result.h" @@ -107,6 +108,40 @@ class ARROW_EXPORT Decompressor { // XXX add methods for buffer size heuristics? }; +/// \brief Compression codec options +class ARROW_EXPORT CodecOptions { + public: + explicit CodecOptions(int compression_level = kUseDefaultCompressionLevel) + : compression_level(compression_level) {} + + virtual ~CodecOptions() = default; + + int compression_level; +}; + +// ---------------------------------------------------------------------- +// GZip codec options implementation + +enum class GZipFormat { + ZLIB, + DEFLATE, + GZIP, +}; + +class ARROW_EXPORT GZipCodecOptions : public CodecOptions { + public: + GZipFormat gzip_format = GZipFormat::GZIP; + std::optional window_bits; +}; + +// ---------------------------------------------------------------------- +// brotli codec options implementation + +class ARROW_EXPORT BrotliCodecOptions : public CodecOptions { + public: + std::optional window_bits; +}; + /// \brief Compression codec class ARROW_EXPORT Codec { public: @@ -122,9 +157,13 @@ class ARROW_EXPORT Codec { /// \brief Return compression type for name (all lower case) static Result GetCompressionType(const 
std::string& name); - /// \brief Create a codec for the given compression algorithm + /// \brief Create a codec for the given compression algorithm with CodecOptions static Result> Create( - Compression::type codec, int compression_level = kUseDefaultCompressionLevel); + Compression::type codec, const CodecOptions& codec_options = CodecOptions{}); + + /// \brief Create a codec for the given compression algorithm + static Result> Create(Compression::type codec, + int compression_level); /// \brief Return true if support for indicated codec has been enabled static bool IsAvailable(Compression::type codec); diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 0ee69281c9fa0..5025f595022f1 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -92,8 +92,8 @@ class BrotliDecompressor : public Decompressor { class BrotliCompressor : public Compressor { public: - explicit BrotliCompressor(int compression_level) - : compression_level_(compression_level) {} + explicit BrotliCompressor(int compression_level, int window_bits) + : compression_level_(compression_level), window_bits_(window_bits) {} ~BrotliCompressor() override { if (state_ != nullptr) { @@ -109,6 +109,9 @@ class BrotliCompressor : public Compressor { if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) { return BrotliError("Brotli set compression level failed"); } + if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_LGWIN, window_bits_)) { + return BrotliError("Brotli set window size failed"); + } return Status::OK(); } @@ -166,6 +169,7 @@ class BrotliCompressor : public Compressor { private: const int compression_level_; + const int window_bits_; }; // ---------------------------------------------------------------------- @@ -173,10 +177,11 @@ class BrotliCompressor : public Compressor { class BrotliCodec : public Codec { public: - explicit BrotliCodec(int compression_level) + 
explicit BrotliCodec(int compression_level, int window_bits) : compression_level_(compression_level == kUseDefaultCompressionLevel ? kBrotliDefaultCompressionLevel - : compression_level) {} + : compression_level), + window_bits_(window_bits) {} Result Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override { @@ -201,16 +206,16 @@ class BrotliCodec : public Codec { DCHECK_GE(input_len, 0); DCHECK_GE(output_buffer_len, 0); std::size_t output_size = static_cast(output_buffer_len); - if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW, - BROTLI_DEFAULT_MODE, static_cast(input_len), input, - &output_size, output_buffer) == BROTLI_FALSE) { + if (BrotliEncoderCompress(compression_level_, window_bits_, BROTLI_DEFAULT_MODE, + static_cast(input_len), input, &output_size, + output_buffer) == BROTLI_FALSE) { return Status::IOError("Brotli compression failure."); } return output_size; } Result> MakeCompressor() override { - auto ptr = std::make_shared(compression_level_); + auto ptr = std::make_shared(compression_level_, window_bits_); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -221,6 +226,14 @@ class BrotliCodec : public Codec { return ptr; } + Status Init() override { + if (window_bits_ < BROTLI_MIN_WINDOW_BITS || window_bits_ > BROTLI_MAX_WINDOW_BITS) { + return Status::Invalid("Brotli window_bits should be between ", + BROTLI_MIN_WINDOW_BITS, " and ", BROTLI_MAX_WINDOW_BITS); + } + return Status::OK(); + } + Compression::type compression_type() const override { return Compression::BROTLI; } int compression_level() const override { return compression_level_; } @@ -232,12 +245,15 @@ class BrotliCodec : public Codec { private: const int compression_level_; + const int window_bits_; }; } // namespace -std::unique_ptr MakeBrotliCodec(int compression_level) { - return std::make_unique(compression_level); +std::unique_ptr MakeBrotliCodec(int compression_level, + std::optional window_bits) { + return 
std::make_unique(compression_level, + window_bits.value_or(BROTLI_DEFAULT_WINDOW)); } } // namespace internal diff --git a/cpp/src/arrow/util/compression_internal.h b/cpp/src/arrow/util/compression_internal.h index d4cdca117da0c..ab2cf6d98b632 100644 --- a/cpp/src/arrow/util/compression_internal.h +++ b/cpp/src/arrow/util/compression_internal.h @@ -35,25 +35,20 @@ constexpr int kBrotliDefaultCompressionLevel = 8; // Brotli codec. std::unique_ptr MakeBrotliCodec( - int compression_level = kBrotliDefaultCompressionLevel); + int compression_level = kBrotliDefaultCompressionLevel, + std::optional window_bits = std::nullopt); // BZ2 codec. constexpr int kBZ2DefaultCompressionLevel = 9; + std::unique_ptr MakeBZ2Codec(int compression_level = kBZ2DefaultCompressionLevel); // GZip constexpr int kGZipDefaultCompressionLevel = 9; -struct GZipFormat { - enum type { - ZLIB, - DEFLATE, - GZIP, - }; -}; - std::unique_ptr MakeGZipCodec(int compression_level = kGZipDefaultCompressionLevel, - GZipFormat::type format = GZipFormat::GZIP); + GZipFormat format = GZipFormat::GZIP, + std::optional window_bits = std::nullopt); // Snappy std::unique_ptr MakeSnappyCodec(); diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 761e883ec7e83..8f2a7f052ccb6 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -389,9 +389,81 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) { continue; } const auto level = combination.level; + const auto codec_options = arrow::util::CodecOptions(level); const auto expect_success = combination.expect_success; - auto result1 = Codec::Create(compression, level); - auto result2 = Codec::Create(compression, level); + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); + ASSERT_EQ(expect_success, result1.ok()); + ASSERT_EQ(expect_success, result2.ok()); + if (expect_success) { + CheckCodecRoundtrip(*result1, 
*result2, data); + } + } +} + +TEST(TestCodecMisc, SpecifyCodecOptionsGZip) { + // for now only GZIP & Brotli codec options supported, since it has specific parameters + // to be customized, other codecs could directly go with CodecOptions, could add more + // specific codec options if needed. + struct CombinationOption { + int level; + GZipFormat format; + int window_bits; + bool expect_success; + }; + constexpr CombinationOption combinations[] = {{2, GZipFormat::ZLIB, 12, true}, + {9, GZipFormat::GZIP, 9, true}, + {9, GZipFormat::GZIP, 20, false}, + {5, GZipFormat::DEFLATE, -12, false}, + {-992, GZipFormat::GZIP, 15, false}}; + + std::vector data = MakeRandomData(2000); + for (const auto& combination : combinations) { + const auto compression = Compression::GZIP; + if (!Codec::IsAvailable(compression)) { + // Support for this codec hasn't been built + continue; + } + auto codec_options = arrow::util::GZipCodecOptions(); + codec_options.compression_level = combination.level; + codec_options.gzip_format = combination.format; + codec_options.window_bits = combination.window_bits; + const auto expect_success = combination.expect_success; + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); + ASSERT_EQ(expect_success, result1.ok()); + ASSERT_EQ(expect_success, result2.ok()); + if (expect_success) { + CheckCodecRoundtrip(*result1, *result2, data); + } + } +} + +TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) { + // for now only GZIP & Brotli codec options supported, since it has specific parameters + // to be customized, other codecs could directly go with CodecOptions, could add more + // specific codec options if needed. 
+ struct CombinationOption { + int level; + int window_bits; + bool expect_success; + }; + constexpr CombinationOption combinations[] = { + {8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}}; + + std::vector data = MakeRandomData(2000); + for (const auto& combination : combinations) { + const auto compression = Compression::BROTLI; + if (!Codec::IsAvailable(compression)) { + // Support for this codec hasn't been built + continue; + } + auto codec_options = arrow::util::BrotliCodecOptions(); + codec_options.compression_level = combination.level; + codec_options.window_bits = combination.window_bits; + const auto expect_success = combination.expect_success; + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); ASSERT_EQ(expect_success, result1.ok()); ASSERT_EQ(expect_success, result2.ok()); if (expect_success) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 6dcc5153abd4e..2b38bdceab15b 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -44,7 +44,13 @@ namespace { // there. // Maximum window size -constexpr int WINDOW_BITS = 15; +constexpr int kGZipMaxWindowBits = 15; + +// Minimum window size +constexpr int kGZipMinWindowBits = 9; + +// Default window size +constexpr int kGZipDefaultWindowBits = 15; // Output Gzip. 
constexpr int GZIP_CODEC = 16; @@ -55,8 +61,7 @@ constexpr int DETECT_CODEC = 32; constexpr int kGZipMinCompressionLevel = 1; constexpr int kGZipMaxCompressionLevel = 9; -int CompressionWindowBitsForFormat(GZipFormat::type format) { - int window_bits = WINDOW_BITS; +int CompressionWindowBitsForFormat(GZipFormat format, int window_bits) { switch (format) { case GZipFormat::DEFLATE: window_bits = -window_bits; @@ -70,12 +75,12 @@ int CompressionWindowBitsForFormat(GZipFormat::type format) { return window_bits; } -int DecompressionWindowBitsForFormat(GZipFormat::type format) { +int DecompressionWindowBitsForFormat(GZipFormat format, int window_bits) { if (format == GZipFormat::DEFLATE) { - return -WINDOW_BITS; + return -window_bits; } else { /* If not deflate, autodetect format from header */ - return WINDOW_BITS | DETECT_CODEC; + return window_bits | DETECT_CODEC; } } @@ -88,8 +93,11 @@ Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { class GZipDecompressor : public Decompressor { public: - explicit GZipDecompressor(GZipFormat::type format) - : format_(format), initialized_(false), finished_(false) {} + explicit GZipDecompressor(GZipFormat format, int window_bits) + : format_(format), + window_bits_(window_bits), + initialized_(false), + finished_(false) {} ~GZipDecompressor() override { if (initialized_) { @@ -103,7 +111,7 @@ class GZipDecompressor : public Decompressor { finished_ = false; int ret; - int window_bits = DecompressionWindowBitsForFormat(format_); + int window_bits = DecompressionWindowBitsForFormat(format_, window_bits_); if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { return ZlibError("zlib inflateInit failed: "); } else { @@ -161,7 +169,8 @@ class GZipDecompressor : public Decompressor { } z_stream stream_; - GZipFormat::type format_; + GZipFormat format_; + int window_bits_; bool initialized_; bool finished_; }; @@ -180,13 +189,13 @@ class GZipCompressor : public Compressor { } } - Status Init(GZipFormat::type format) { 
+ Status Init(GZipFormat format, int input_window_bits) { DCHECK(!initialized_); memset(&stream_, 0, sizeof(stream_)); int ret; // Initialize to run specified format - int window_bits = CompressionWindowBitsForFormat(format); + int window_bits = CompressionWindowBitsForFormat(format, input_window_bits); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibError("zlib deflateInit failed: "); @@ -300,8 +309,9 @@ class GZipCompressor : public Compressor { class GZipCodec : public Codec { public: - explicit GZipCodec(int compression_level, GZipFormat::type format) + explicit GZipCodec(int compression_level, GZipFormat format, int window_bits) : format_(format), + window_bits_(window_bits), compressor_initialized_(false), decompressor_initialized_(false) { compression_level_ = compression_level == kUseDefaultCompressionLevel @@ -316,12 +326,12 @@ class GZipCodec : public Codec { Result> MakeCompressor() override { auto ptr = std::make_shared(compression_level_); - RETURN_NOT_OK(ptr->Init(format_)); + RETURN_NOT_OK(ptr->Init(format_, window_bits_)); return ptr; } Result> MakeDecompressor() override { - auto ptr = std::make_shared(format_); + auto ptr = std::make_shared(format_, window_bits_); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -332,7 +342,7 @@ class GZipCodec : public Codec { int ret; // Initialize to run specified format - int window_bits = CompressionWindowBitsForFormat(format_); + int window_bits = CompressionWindowBitsForFormat(format_, window_bits_); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg); @@ -354,7 +364,7 @@ class GZipCodec : public Codec { int ret; // Initialize to run either deflate or zlib/gzip format - int window_bits = DecompressionWindowBitsForFormat(format_); + int window_bits = 
DecompressionWindowBitsForFormat(format_, window_bits_); if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { return ZlibErrorPrefix("zlib inflateInit failed: ", stream_.msg); } @@ -461,6 +471,10 @@ class GZipCodec : public Codec { } Status Init() override { + if (window_bits_ < kGZipMinWindowBits || window_bits_ > kGZipMaxWindowBits) { + return Status::Invalid("GZip window_bits should be between ", kGZipMinWindowBits, + " and ", kGZipMaxWindowBits); + } const Status init_compressor_status = InitCompressor(); if (!init_compressor_status.ok()) { return init_compressor_status; @@ -482,7 +496,7 @@ class GZipCodec : public Codec { // Realistically, this will always be GZIP, but we leave the option open to // configure - GZipFormat::type format_; + GZipFormat format_; // These variables are mutually exclusive. When the codec is in "compressor" // state, compressor_initialized_ is true while decompressor_initialized_ is @@ -491,6 +505,7 @@ class GZipCodec : public Codec { // Indeed, this is slightly hacky, but the alternative is having separate // Compressor and Decompressor classes. 
If this ever becomes an issue, we can // perform the refactoring then + int window_bits_; bool compressor_initialized_; bool decompressor_initialized_; int compression_level_; @@ -498,8 +513,10 @@ class GZipCodec : public Codec { } // namespace -std::unique_ptr MakeGZipCodec(int compression_level, GZipFormat::type format) { - return std::make_unique(compression_level, format); +std::unique_ptr MakeGZipCodec(int compression_level, GZipFormat format, + std::optional window_bits) { + return std::make_unique(compression_level, format, + window_bits.value_or(kGZipDefaultWindowBits)); } } // namespace internal diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 6ee579bec9a69..48e434a342e72 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -40,8 +40,7 @@ std::shared_ptr BuildWriter(int64_t output_size, ColumnDescriptor* schema, const WriterProperties* properties, Compression::type codec) { - std::unique_ptr pager = - PageWriter::Open(dst, codec, Codec::UseDefaultCompressionLevel(), metadata); + std::unique_ptr pager = PageWriter::Open(dst, codec, metadata); std::shared_ptr writer = ColumnWriter::Make(metadata, std::move(pager), properties); return std::static_pointer_cast(writer); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 33e9f8f6658ae..e34420b9f6e79 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -249,14 +249,14 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) { class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t column_chunk_ordinal, - bool use_page_checksum_verification, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t column_chunk_ordinal, bool 
use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, std::shared_ptr data_encryptor = nullptr, ColumnIndexBuilder* column_index_builder = nullptr, - OffsetIndexBuilder* offset_index_builder = nullptr) + OffsetIndexBuilder* offset_index_builder = nullptr, + const CodecOptions& codec_options = CodecOptions{}) : sink_(std::move(sink)), metadata_(metadata), pool_(pool), @@ -277,7 +277,7 @@ class SerializedPageWriter : public PageWriter { if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) { InitEncryption(); } - compressor_ = GetCodec(codec, compression_level); + compressor_ = GetCodec(codec, codec_options); thrift_serializer_ = std::make_unique(); } @@ -618,21 +618,21 @@ class SerializedPageWriter : public PageWriter { class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t current_column_ordinal, - bool use_page_checksum_verification, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t current_column_ordinal, bool use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, std::shared_ptr data_encryptor = nullptr, ColumnIndexBuilder* column_index_builder = nullptr, - OffsetIndexBuilder* offset_index_builder = nullptr) + OffsetIndexBuilder* offset_index_builder = nullptr, + const CodecOptions& codec_options = CodecOptions{}) : final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) { in_memory_sink_ = CreateOutputStream(pool); pager_ = std::make_unique( - in_memory_sink_, codec, compression_level, metadata, row_group_ordinal, - current_column_ordinal, use_page_checksum_verification, pool, - std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, - offset_index_builder); + in_memory_sink_, 
codec, metadata, row_group_ordinal, current_column_ordinal, + use_page_checksum_verification, pool, std::move(meta_encryptor), + std::move(data_encryptor), column_index_builder, offset_index_builder, + codec_options); } int64_t WriteDictionaryPage(const DictionaryPage& page) override { @@ -690,26 +690,38 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr PageWriter::Open( std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool, - bool buffered_row_group, std::shared_ptr meta_encryptor, - std::shared_ptr data_encryptor, bool page_write_checksum_enabled, - ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) { + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t column_chunk_ordinal, MemoryPool* pool, bool buffered_row_group, + std::shared_ptr meta_encryptor, std::shared_ptr data_encryptor, + bool page_write_checksum_enabled, ColumnIndexBuilder* column_index_builder, + OffsetIndexBuilder* offset_index_builder, const CodecOptions& codec_options) { if (buffered_row_group) { return std::unique_ptr(new BufferedPageWriter( - std::move(sink), codec, compression_level, metadata, row_group_ordinal, - column_chunk_ordinal, page_write_checksum_enabled, pool, - std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, - offset_index_builder)); + std::move(sink), codec, metadata, row_group_ordinal, column_chunk_ordinal, + page_write_checksum_enabled, pool, std::move(meta_encryptor), + std::move(data_encryptor), column_index_builder, offset_index_builder, + codec_options)); } else { return std::unique_ptr(new SerializedPageWriter( - std::move(sink), codec, compression_level, metadata, row_group_ordinal, - column_chunk_ordinal, page_write_checksum_enabled, pool, - std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, - offset_index_builder)); + 
std::move(sink), codec, metadata, row_group_ordinal, column_chunk_ordinal, + page_write_checksum_enabled, pool, std::move(meta_encryptor), + std::move(data_encryptor), column_index_builder, offset_index_builder, + codec_options)); } } +std::unique_ptr PageWriter::Open( + std::shared_ptr sink, Compression::type codec, + int compression_level, ColumnChunkMetaDataBuilder* metadata, + int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool, + bool buffered_row_group, std::shared_ptr meta_encryptor, + std::shared_ptr data_encryptor, bool page_write_checksum_enabled, + ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) { + return PageWriter::Open(std::move(sink), codec, metadata, row_group_ordinal, column_chunk_ordinal, + pool, buffered_row_group, std::move(meta_encryptor), std::move(data_encryptor), + page_write_checksum_enabled, column_index_builder, + offset_index_builder, CodecOptions{compression_level}); +} // ---------------------------------------------------------------------- // ColumnWriter diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 792b108ac8835..88a42acc2f706 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/util/compression.h" #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/types.h" @@ -35,6 +36,7 @@ class BitWriter; namespace util { class RleEncoder; +class CodecOptions; } // namespace util } // namespace arrow @@ -85,6 +87,22 @@ class PARQUET_EXPORT PageWriter { public: virtual ~PageWriter() {} + static std::unique_ptr Open( + std::shared_ptr sink, Compression::type codec, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1, + int16_t column_chunk_ordinal = -1, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), + bool buffered_row_group = false, + std::shared_ptr header_encryptor = NULLPTR, + std::shared_ptr data_encryptor = NULLPTR, + bool 
page_write_checksum_enabled = false, + // column_index_builder MUST outlive the PageWriter + ColumnIndexBuilder* column_index_builder = NULLPTR, + // offset_index_builder MUST outlive the PageWriter + OffsetIndexBuilder* offset_index_builder = NULLPTR, + const CodecOptions& codec_options = CodecOptions{}); + + ARROW_DEPRECATED("Deprecated in 13.0.0. Use CodecOptions-taking overload instead.") static std::unique_ptr Open( std::shared_ptr sink, Compression::type codec, int compression_level, ColumnChunkMetaDataBuilder* metadata, diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index af9876370ee42..58199c402bd7a 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -118,8 +118,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); std::unique_ptr pager = PageWriter::Open( - sink_, column_properties.compression(), Codec::UseDefaultCompressionLevel(), - metadata_.get(), /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, + sink_, column_properties.compression(), metadata_.get(), + /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, ::arrow::default_memory_pool(), /* buffered_row_group */ false, /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, enable_checksum); std::shared_ptr writer = @@ -162,6 +162,20 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); } + void TestRequiredWithCodecOptions(Encoding::type encoding, + Compression::type compression, bool enable_dictionary, + bool enable_statistics, int64_t num_rows = SMALL_SIZE, + const std::shared_ptr& codec_options = + std::make_shared(), + bool enable_checksum = false) { + this->GenerateData(num_rows); + + this->WriteRequiredWithCodecOptions(encoding, compression, enable_dictionary, + enable_statistics, codec_options, num_rows, + 
enable_checksum); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); + } + void TestDictionaryFallbackEncoding(ParquetVersion::type version) { this->GenerateData(VERY_LARGE_SIZE); ColumnProperties column_properties; @@ -238,7 +252,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { bool enable_checksum) { ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); - column_properties.set_compression_level(compression_level); + column_properties.set_codec_options( + std::make_shared(compression_level)); std::shared_ptr> writer = this->BuildWriter( num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); @@ -256,7 +271,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { bit_util::BytesForBits(static_cast(this->values_.size())) + 1, 255); ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); - column_properties.set_compression_level(compression_level); + column_properties.set_codec_options( + std::make_shared(compression_level)); std::shared_ptr> writer = this->BuildWriter( num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, @@ -266,6 +282,22 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { writer->Close(); } + void WriteRequiredWithCodecOptions(Encoding::type encoding, + Compression::type compression, + bool enable_dictionary, bool enable_statistics, + const std::shared_ptr& codec_options, + int64_t num_rows, bool enable_checksum) { + ColumnProperties column_properties(encoding, compression, enable_dictionary, + enable_statistics); + column_properties.set_codec_options(codec_options); + std::shared_ptr> writer = this->BuildWriter( + num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); + 
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); + // The behaviour should be independent from the number of Close() calls + writer->Close(); + writer->Close(); + } + void ReadAndCompare(Compression::type compression, int64_t num_rows, bool page_checksum_verify) { this->SetupValuesOut(num_rows); @@ -522,6 +554,14 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE); } + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCodecOptions) { + auto codec_options = std::make_shared<::arrow::util::GZipCodecOptions>(); + codec_options->gzip_format = ::arrow::util::GZipFormat::GZIP; + codec_options->window_bits = 12; + this->TestRequiredWithCodecOptions(Encoding::PLAIN, Compression::GZIP, false, false, + LARGE_SIZE, codec_options); +} #endif #ifdef ARROW_WITH_LZ4 @@ -818,8 +858,7 @@ TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); std::unique_ptr pager = - PageWriter::Open(sink, Compression::UNCOMPRESSED, - Codec::UseDefaultCompressionLevel(), metadata.get()); + PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); std::shared_ptr writer = ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); auto typed_writer = std::static_pointer_cast>(writer); @@ -1350,7 +1389,7 @@ class ColumnWriterTestSizeEstimated : public ::testing::Test { schema_descriptor_->Column(0)); std::unique_ptr pager = PageWriter::Open( - sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(), + sink_, compression, metadata_.get(), /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, ::arrow::default_memory_pool(), /* buffered_row_group */ buffered, /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 42ce7591cb7a6..ef86e742362b8 100644 --- 
a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -153,11 +153,24 @@ class RowGroupSerializer : public RowGroupWriter::Contents { auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path) ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal) : nullptr; - std::unique_ptr pager = PageWriter::Open( - sink_, properties_->compression(path), properties_->compression_level(path), - col_meta, row_group_ordinal_, static_cast(column_ordinal), - properties_->memory_pool(), false, meta_encryptor, data_encryptor, - properties_->page_checksum_enabled(), ci_builder, oi_builder); + auto codec_options = properties_->codec_options(path) + ? properties_->codec_options(path).get() + : nullptr; + + std::unique_ptr pager; + if (!codec_options) { + pager = PageWriter::Open(sink_, properties_->compression(path), col_meta, + row_group_ordinal_, static_cast(column_ordinal), + properties_->memory_pool(), false, meta_encryptor, + data_encryptor, properties_->page_checksum_enabled(), + ci_builder, oi_builder, CodecOptions()); + } else { + pager = PageWriter::Open(sink_, properties_->compression(path), col_meta, + row_group_ordinal_, static_cast(column_ordinal), + properties_->memory_pool(), false, meta_encryptor, + data_encryptor, properties_->page_checksum_enabled(), + ci_builder, oi_builder, *codec_options); + } column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); return column_writers_[0].get(); } @@ -291,12 +304,24 @@ class RowGroupSerializer : public RowGroupWriter::Contents { auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path) ? 
page_index_builder_->GetOffsetIndexBuilder(column_ordinal) : nullptr; - std::unique_ptr pager = PageWriter::Open( - sink_, properties_->compression(path), properties_->compression_level(path), - col_meta, static_cast(row_group_ordinal_), - static_cast(column_ordinal), properties_->memory_pool(), - buffered_row_group_, meta_encryptor, data_encryptor, - properties_->page_checksum_enabled(), ci_builder, oi_builder); + auto codec_options = properties_->codec_options(path) + ? (properties_->codec_options(path)).get() + : nullptr; + + std::unique_ptr pager; + if (!codec_options) { + pager = PageWriter::Open(sink_, properties_->compression(path), col_meta, + row_group_ordinal_, static_cast(column_ordinal), + properties_->memory_pool(), false, meta_encryptor, + data_encryptor, properties_->page_checksum_enabled(), + ci_builder, oi_builder, CodecOptions()); + } else { + pager = PageWriter::Open(sink_, properties_->compression(path), col_meta, + row_group_ordinal_, static_cast(column_ordinal), + properties_->memory_pool(), false, meta_encryptor, + data_encryptor, properties_->page_checksum_enabled(), + ci_builder, oi_builder, *codec_options); + } column_writers_.push_back( ColumnWriter::Make(col_meta, std::move(pager), properties_)); } diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h index 00a193f144a18..b085e57cd9918 100644 --- a/cpp/src/parquet/platform.h +++ b/cpp/src/parquet/platform.h @@ -87,6 +87,7 @@ namespace parquet { using Buffer = ::arrow::Buffer; using Codec = ::arrow::util::Codec; +using CodecOptions = ::arrow::util::CodecOptions; using Compression = ::arrow::Compression; using MemoryPool = ::arrow::MemoryPool; using MutableBuffer = ::arrow::MutableBuffer; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index c195ab80791ca..bd7eb9dc7abd6 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -153,7 +153,6 @@ class PARQUET_EXPORT ColumnProperties { dictionary_enabled_(dictionary_enabled), 
statistics_enabled_(statistics_enabled), max_stats_size_(max_stats_size), - compression_level_(Codec::UseDefaultCompressionLevel()), page_index_enabled_(page_index_enabled) {} void set_encoding(Encoding::type encoding) { encoding_ = encoding; } @@ -173,7 +172,14 @@ class PARQUET_EXPORT ColumnProperties { } void set_compression_level(int compression_level) { - compression_level_ = compression_level; + if (!codec_options_) { + codec_options_ = std::make_shared(); + } + codec_options_->compression_level = compression_level; + } + + void set_codec_options(const std::shared_ptr& codec_options) { + codec_options_ = codec_options; } void set_page_index_enabled(bool page_index_enabled) { @@ -190,7 +196,9 @@ class PARQUET_EXPORT ColumnProperties { size_t max_statistics_size() const { return max_stats_size_; } - int compression_level() const { return compression_level_; } + int compression_level() const { return codec_options_->compression_level; } + + const std::shared_ptr& codec_options() const { return codec_options_; } bool page_index_enabled() const { return page_index_enabled_; } @@ -200,7 +208,7 @@ class PARQUET_EXPORT ColumnProperties { bool dictionary_enabled_; bool statistics_enabled_; size_t max_stats_size_; - int compression_level_; + std::shared_ptr codec_options_; bool page_index_enabled_; }; @@ -394,6 +402,9 @@ class PARQUET_EXPORT WriterProperties { /// level is selected by the user or if the special /// std::numeric_limits::min() value is passed, then Arrow selects the /// compression level. + /// + /// If other compressor-specific options need to be set in addition to the compression + /// level, use the codec_options method. Builder* compression_level(int compression_level) { default_column_properties_.set_compression_level(compression_level); return this; @@ -411,7 +422,10 @@ class PARQUET_EXPORT WriterProperties { /// std::numeric_limits::min() value is passed, then Arrow selects the /// compression level. 
Builder* compression_level(const std::string& path, int compression_level) { - codecs_compression_level_[path] = compression_level; + if (!codec_options_[path]) { + codec_options_[path] = std::make_shared(); + } + codec_options_[path]->compression_level = compression_level; return this; } @@ -431,6 +445,34 @@ class PARQUET_EXPORT WriterProperties { return this->compression_level(path->ToDotString(), compression_level); } + /// \brief Specify the default codec options for the compressor in + /// every column. + /// + /// The codec options allow configuring the compression level as well + /// as other codec-specific options. + Builder* codec_options( + const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) { + default_column_properties_.set_codec_options(codec_options); + return this; + } + + /// \brief Specify the codec options for the compressor for the column + /// described by path. + Builder* codec_options( + const std::string& path, + const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) { + codec_options_[path] = codec_options; + return this; + } + + /// \brief Specify the codec options for the compressor for the column + /// described by path. + Builder* codec_options( + const std::shared_ptr& path, + const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) { + return this->codec_options(path->ToDotString(), codec_options); + } + /// Define the file encryption properties. /// Default NULL. 
Builder* encryption( @@ -579,8 +621,8 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : encodings_) get(item.first).set_encoding(item.second); for (const auto& item : codecs_) get(item.first).set_compression(item.second); - for (const auto& item : codecs_compression_level_) - get(item.first).set_compression_level(item.second); + for (const auto& item : codec_options_) + get(item.first).set_codec_options(item.second); for (const auto& item : dictionary_enabled_) get(item.first).set_dictionary_enabled(item.second); for (const auto& item : statistics_enabled_) @@ -617,7 +659,7 @@ class PARQUET_EXPORT WriterProperties { ColumnProperties default_column_properties_; std::unordered_map encodings_; std::unordered_map codecs_; - std::unordered_map codecs_compression_level_; + std::unordered_map> codec_options_; std::unordered_map dictionary_enabled_; std::unordered_map statistics_enabled_; std::unordered_map page_index_enabled_; @@ -680,6 +722,11 @@ class PARQUET_EXPORT WriterProperties { return column_properties(path).compression_level(); } + const std::shared_ptr codec_options( + const std::shared_ptr& path) const { + return column_properties(path).codec_options(); + } + bool dictionary_enabled(const std::shared_ptr& path) const { return column_properties(path).dictionary_enabled(); } diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc index 2ba1b8a604604..96c3a63b831eb 100644 --- a/cpp/src/parquet/properties_test.cc +++ b/cpp/src/parquet/properties_test.cc @@ -70,6 +70,37 @@ TEST(TestWriterProperties, AdvancedHandling) { ASSERT_EQ(ParquetDataPageVersion::V2, props->data_page_version()); } +TEST(TestWriterProperties, SetCodecOptions) { + WriterProperties::Builder builder; + builder.compression("gzip", Compression::GZIP); + builder.compression("zstd", Compression::ZSTD); + builder.compression("brotli", Compression::BROTLI); + auto gzip_codec_options = std::make_shared<::arrow::util::GZipCodecOptions>(); + 
gzip_codec_options->compression_level = 5; + gzip_codec_options->window_bits = 12; + builder.codec_options("gzip", gzip_codec_options); + auto codec_options = std::make_shared(); + builder.codec_options(codec_options); + auto brotli_codec_options = std::make_shared<::arrow::util::BrotliCodecOptions>(); + brotli_codec_options->compression_level = 11; + brotli_codec_options->window_bits = 20; + builder.codec_options("brotli", brotli_codec_options); + std::shared_ptr props = builder.build(); + + ASSERT_EQ(5, + props->codec_options(ColumnPath::FromDotString("gzip"))->compression_level); + ASSERT_EQ(12, std::dynamic_pointer_cast<::arrow::util::GZipCodecOptions>( + props->codec_options(ColumnPath::FromDotString("gzip"))) + ->window_bits); + ASSERT_EQ(Codec::UseDefaultCompressionLevel(), + props->codec_options(ColumnPath::FromDotString("zstd"))->compression_level); + ASSERT_EQ(11, + props->codec_options(ColumnPath::FromDotString("brotli"))->compression_level); + ASSERT_EQ(20, std::dynamic_pointer_cast<::arrow::util::BrotliCodecOptions>( + props->codec_options(ColumnPath::FromDotString("brotli"))) + ->window_bits); +} + TEST(TestReaderProperties, GetStreamInsufficientData) { // ARROW-6058 std::string data = "shorter than expected"; diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 28f472aaf9dd8..3127b60e5d1ae 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -51,10 +51,11 @@ bool IsCodecSupported(Compression::type codec) { } std::unique_ptr GetCodec(Compression::type codec) { - return GetCodec(codec, Codec::UseDefaultCompressionLevel()); + return GetCodec(codec, CodecOptions()); } -std::unique_ptr GetCodec(Compression::type codec, int compression_level) { +std::unique_ptr GetCodec(Compression::type codec, + const CodecOptions& codec_options) { std::unique_ptr result; if (codec == Compression::LZO) { throw ParquetException( @@ -69,10 +70,15 @@ std::unique_ptr GetCodec(Compression::type codec, int compression_level) throw 
ParquetException(ss.str()); } - PARQUET_ASSIGN_OR_THROW(result, Codec::Create(codec, compression_level)); + PARQUET_ASSIGN_OR_THROW(result, Codec::Create(codec, codec_options)); return result; } +// use compression level to create Codec +std::unique_ptr GetCodec(Compression::type codec, int compression_level) { + return GetCodec(codec, CodecOptions{compression_level}); +} + bool PageCanUseChecksum(PageType::type pageType) { switch (pageType) { case PageType::type::DATA_PAGE: diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index d4d6a73f147fc..f35384b8df1ef 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -487,6 +487,10 @@ bool IsCodecSupported(Compression::type codec); PARQUET_EXPORT std::unique_ptr GetCodec(Compression::type codec); +PARQUET_EXPORT +std::unique_ptr GetCodec(Compression::type codec, + const CodecOptions& codec_options); + PARQUET_EXPORT std::unique_ptr GetCodec(Compression::type codec, int compression_level);