From 1e5c232fdf63b834ea0a93e0f82a7f6d3c5fb54b Mon Sep 17 00:00:00 2001
From: yyang52
Date: Mon, 8 May 2023 06:08:23 +0000
Subject: [PATCH] add CodecOptions to customize the compression parameter

---
 cpp/examples/arrow/parquet_read_write.cc  | 33 ++++++++++
 cpp/src/arrow/flight/flight_benchmark.cc  |  4 +-
 cpp/src/arrow/ipc/feather.cc              |  3 +-
 cpp/src/arrow/type_fwd.h                  |  1 +
 cpp/src/arrow/util/compression.cc         | 22 +++++--
 cpp/src/arrow/util/compression.h          | 48 +++++++++++++-
 cpp/src/arrow/util/compression_brotli.cc  | 29 ++++++---
 cpp/src/arrow/util/compression_internal.h | 15 ++---
 cpp/src/arrow/util/compression_lz4.cc     |  8 +--
 cpp/src/arrow/util/compression_test.cc    | 76 ++++++++++++++++++++++-
 cpp/src/arrow/util/compression_zlib.cc    | 52 ++++++++++------
 cpp/src/parquet/column_io_benchmark.cc    |  4 +-
 cpp/src/parquet/column_writer.cc          | 33 +++++-----
 cpp/src/parquet/column_writer.h           |  6 +-
 cpp/src/parquet/column_writer_test.cc     | 58 ++++++++++++++---
 cpp/src/parquet/file_writer.cc            |  6 +-
 cpp/src/parquet/platform.h                |  1 +
 cpp/src/parquet/properties.h              | 75 +++++++++++-----------
 cpp/src/parquet/properties_test.cc        | 31 +++++++++
 cpp/src/parquet/types.cc                  |  7 ++-
 cpp/src/parquet/types.h                   |  3 +-
 r/src/compression.cpp                     |  3 +-
 22 files changed, 387 insertions(+), 131 deletions(-)

diff --git a/cpp/examples/arrow/parquet_read_write.cc b/cpp/examples/arrow/parquet_read_write.cc
index 3b8b4c2212b75..9a59c3db97de9 100644
--- a/cpp/examples/arrow/parquet_read_write.cc
+++ b/cpp/examples/arrow/parquet_read_write.cc
@@ -165,11 +165,44 @@ arrow::Status WriteInBatches(std::string path_to_file) {
   return arrow::Status::OK();
 }
 
+arrow::Status WriteWithCodecOptions(std::string path_to_file) {
+  using parquet::ArrowWriterProperties;
+  using parquet::WriterProperties;
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());
+
+  // Customize the codec options with a compression level and window bits.
+  // The default window_bits value is 15; here we use a smaller value of 12 for
+  // scenarios with a limited history buffer size.
+  auto codec_options = std::make_shared<arrow::util::GZipCodecOptions>();
+  codec_options->compression_level_ = 9;
+  codec_options->window_bits = 12;
+
+  // Choose compression
+  std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
+                                                .compression(arrow::Compression::GZIP)
+                                                ->codec_options(codec_options)
+                                                ->build();
+
+  // Opt to store Arrow schema for easier reads back into Arrow
+  std::shared_ptr<ArrowWriterProperties> arrow_props =
+      ArrowWriterProperties::Builder().store_schema()->build();
+
+  std::shared_ptr<arrow::io::FileOutputStream> outfile;
+  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
+
+  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(),
+                                                 arrow::default_memory_pool(), outfile,
+                                                 /*chunk_size=*/3, props, arrow_props));
+  return arrow::Status::OK();
+}
+
 arrow::Status RunExamples(std::string path_to_file) {
   ARROW_RETURN_NOT_OK(WriteFullFile(path_to_file));
   ARROW_RETURN_NOT_OK(ReadFullFile(path_to_file));
   ARROW_RETURN_NOT_OK(WriteInBatches(path_to_file));
   ARROW_RETURN_NOT_OK(ReadInBatches(path_to_file));
+  ARROW_RETURN_NOT_OK(WriteWithCodecOptions(path_to_file));
   return arrow::Status::OK();
 }
 
diff --git a/cpp/src/arrow/flight/flight_benchmark.cc b/cpp/src/arrow/flight/flight_benchmark.cc
index f53b1c6dcea30..9ce96e43d7bb7 100644
--- a/cpp/src/arrow/flight/flight_benchmark.cc
+++ b/cpp/src/arrow/flight/flight_benchmark.cc
@@ -432,7 +432,9 @@ int main(int argc, char** argv) {
   const int level = level_str.empty() ?
arrow::util::kUseDefaultCompressionLevel : std::stoi(level_str); const auto type = arrow::util::Codec::GetCompressionType(name).ValueOrDie(); - auto codec = arrow::util::Codec::Create(type, level).ValueOrDie(); + auto codec = arrow::util::Codec::Create( + type, std::make_shared(level)) + .ValueOrDie(); std::cout << "Compression method: " << name; if (!level_str.empty()) { std::cout << ", level " << level; diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index b6d3a3d7d8cbb..7ca32f9a07520 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -815,7 +815,8 @@ Status WriteTable(const Table& table, io::OutputStream* dst, ipc_options.allow_64bit = true; ARROW_ASSIGN_OR_RAISE( ipc_options.codec, - util::Codec::Create(properties.compression, properties.compression_level)); + util::Codec::Create(properties.compression, std::make_shared( + properties.compression_level))); std::shared_ptr writer; ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options)); diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index 657abbaecc42b..2217802a06aeb 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -45,6 +45,7 @@ class Future; namespace util { class Codec; +class CodecOptions; } // namespace util class Buffer; diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index c67cb4539bc8f..7be5e28e0397f 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -135,8 +135,8 @@ Result Codec::DefaultCompressionLevel(Compression::type codec_type) { return codec->default_compression_level(); } -Result> Codec::Create(Compression::type codec_type, - int compression_level) { +Result> Codec::Create( + Compression::type codec_type, const std::shared_ptr& codec_options) { if (!IsAvailable(codec_type)) { if (codec_type == Compression::LZO) { return Status::NotImplemented("LZO codec not implemented"); @@ -151,6 +151,7 @@ Result> Codec::Create(Compression::type codec_type, "' not built"); } + auto compression_level = codec_options->compression_level_; if (compression_level != kUseDefaultCompressionLevel && !SupportsCompressionLevel(codec_type)) { return Status::Invalid("Codec '", GetCodecAsString(codec_type), @@ -166,16 +167,25 @@ Result> Codec::Create(Compression::type codec_type, codec = internal::MakeSnappyCodec(); #endif break; - case Compression::GZIP: + case Compression::GZIP: { #ifdef ARROW_WITH_ZLIB - codec = internal::MakeGZipCodec(compression_level); + std::shared_ptr opt = + std::dynamic_pointer_cast(codec_options); + codec = internal::MakeGZipCodec(compression_level, + opt ? opt->gzip_format : GZipFormat::GZIP, + opt ? opt->window_bits : kGZipDefaultWindowBits); #endif break; - case Compression::BROTLI: + } + case Compression::BROTLI: { #ifdef ARROW_WITH_BROTLI - codec = internal::MakeBrotliCodec(compression_level); + std::shared_ptr opt = + std::dynamic_pointer_cast(codec_options); + codec = internal::MakeBrotliCodec( + compression_level, opt ? opt->window_bits : kBrotliDefaultWindowBits); #endif break; + } case Compression::LZ4: #ifdef ARROW_WITH_LZ4 codec = internal::MakeLz4RawCodec(compression_level); diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index f0d359d195c80..de596a833980a 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -107,6 +107,50 @@ class ARROW_EXPORT Decompressor { // XXX add methods for buffer size heuristics? 
}; +/// \brief Compression codec options +class ARROW_EXPORT CodecOptions { + public: + CodecOptions(int compression_level = kUseDefaultCompressionLevel) { + compression_level_ = compression_level; + } + virtual ~CodecOptions() = default; + + int compression_level_; +}; + +// ---------------------------------------------------------------------- +// gzip codec options implementation + +struct GZipFormat { + enum type { + ZLIB, + DEFLATE, + GZIP, + }; +}; + +constexpr int kGZipDefaultWindowBits = 15; + +class GZipCodecOptions : public CodecOptions { + public: + ~GZipCodecOptions() = default; + + GZipFormat::type gzip_format = GZipFormat::GZIP; + int window_bits = kGZipDefaultWindowBits; +}; + +// ---------------------------------------------------------------------- +// brotli codec options implementation + +constexpr int kBrotliDefaultWindowBits = 22; + +class BrotliCodecOptions : public CodecOptions { + public: + ~BrotliCodecOptions() = default; + + int window_bits = kBrotliDefaultWindowBits; +}; + /// \brief Compression codec class ARROW_EXPORT Codec { public: @@ -124,7 +168,9 @@ class ARROW_EXPORT Codec { /// \brief Create a codec for the given compression algorithm static Result> Create( - Compression::type codec, int compression_level = kUseDefaultCompressionLevel); + Compression::type codec, + const std::shared_ptr& codec_options = + std::make_shared(kUseDefaultCompressionLevel)); /// \brief Return true if support for indicated codec has been enabled static bool IsAvailable(Compression::type codec); diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 0ee69281c9fa0..155747b5c7f77 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -92,8 +92,8 @@ class BrotliDecompressor : public Decompressor { class BrotliCompressor : public Compressor { public: - explicit BrotliCompressor(int compression_level) - : compression_level_(compression_level) {} + explicit BrotliCompressor(int compression_level, int window_bits) + : compression_level_(compression_level), window_bits_(window_bits) {} ~BrotliCompressor() override { if (state_ != nullptr) { @@ -109,6 +109,12 @@ class BrotliCompressor : public Compressor { if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) { return BrotliError("Brotli set compression level failed"); } + if (window_bits_ < BROTLI_MIN_WINDOW_BITS || window_bits_ > BROTLI_MAX_WINDOW_BITS) { + return Status::Invalid("window_bits should be within 10 ~ 24"); + } + if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_LGWIN, window_bits_)) { + return BrotliError("Brotli set window size failed"); + } return Status::OK(); } @@ -166,6 +172,7 @@ class BrotliCompressor : public Compressor { private: const int compression_level_; + const int window_bits_; }; // ---------------------------------------------------------------------- @@ -173,8 +180,9 @@ class BrotliCompressor : public Compressor { class BrotliCodec : public Codec { public: - explicit BrotliCodec(int compression_level) - : compression_level_(compression_level == kUseDefaultCompressionLevel + explicit BrotliCodec(int compression_level, int window_bits) + : window_bits_(window_bits), + compression_level_(compression_level == kUseDefaultCompressionLevel ? 
kBrotliDefaultCompressionLevel : compression_level) {} @@ -201,16 +209,16 @@ class BrotliCodec : public Codec { DCHECK_GE(input_len, 0); DCHECK_GE(output_buffer_len, 0); std::size_t output_size = static_cast(output_buffer_len); - if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW, - BROTLI_DEFAULT_MODE, static_cast(input_len), input, - &output_size, output_buffer) == BROTLI_FALSE) { + if (BrotliEncoderCompress(compression_level_, window_bits_, BROTLI_DEFAULT_MODE, + static_cast(input_len), input, &output_size, + output_buffer) == BROTLI_FALSE) { return Status::IOError("Brotli compression failure."); } return output_size; } Result> MakeCompressor() override { - auto ptr = std::make_shared(compression_level_); + auto ptr = std::make_shared(compression_level_, window_bits_); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -232,12 +240,13 @@ class BrotliCodec : public Codec { private: const int compression_level_; + const int window_bits_; }; } // namespace -std::unique_ptr MakeBrotliCodec(int compression_level) { - return std::make_unique(compression_level); +std::unique_ptr MakeBrotliCodec(int compression_level, int window_bits) { + return std::make_unique(compression_level, window_bits); } } // namespace internal diff --git a/cpp/src/arrow/util/compression_internal.h b/cpp/src/arrow/util/compression_internal.h index d4cdca117da0c..50a969fc842d3 100644 --- a/cpp/src/arrow/util/compression_internal.h +++ b/cpp/src/arrow/util/compression_internal.h @@ -35,25 +35,20 @@ constexpr int kBrotliDefaultCompressionLevel = 8; // Brotli codec. std::unique_ptr MakeBrotliCodec( - int compression_level = kBrotliDefaultCompressionLevel); + int compression_level = kBrotliDefaultCompressionLevel, + int window_bits = kBrotliDefaultWindowBits); // BZ2 codec. 
constexpr int kBZ2DefaultCompressionLevel = 9; + std::unique_ptr MakeBZ2Codec(int compression_level = kBZ2DefaultCompressionLevel); // GZip constexpr int kGZipDefaultCompressionLevel = 9; -struct GZipFormat { - enum type { - ZLIB, - DEFLATE, - GZIP, - }; -}; - std::unique_ptr MakeGZipCodec(int compression_level = kGZipDefaultCompressionLevel, - GZipFormat::type format = GZipFormat::GZIP); + GZipFormat::type format = GZipFormat::GZIP, + int window_bits = kGZipDefaultWindowBits); // Snappy std::unique_ptr MakeSnappyCodec(); diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index 17e013c13ee0b..de5f0fcc40af1 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -17,13 +17,13 @@ #include "arrow/util/compression_internal.h" -#include -#include -#include - #include #include #include +#include +#include +#include +#include #include "arrow/result.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 761e883ec7e83..017b9eb7f7b10 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -389,9 +389,81 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) { continue; } const auto level = combination.level; + const auto codec_options = std::make_shared(level); const auto expect_success = combination.expect_success; - auto result1 = Codec::Create(compression, level); - auto result2 = Codec::Create(compression, level); + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); + ASSERT_EQ(expect_success, result1.ok()); + ASSERT_EQ(expect_success, result2.ok()); + if (expect_success) { + CheckCodecRoundtrip(*result1, *result2, data); + } + } +} + +TEST(TestCodecMisc, SpecifyCodecOptionsGZip) { + // for now only GZIP & Brotli codec options supported, since it has specific parameters + // to be customized, other codecs could directly go with CodecOptions, could add more + // specific codec options if needed. + struct CombinationOption { + int level; + GZipFormat::type format; + int window_bits; + bool expect_success; + }; + constexpr CombinationOption combinations[] = {{2, GZipFormat::ZLIB, 12, true}, + {9, GZipFormat::GZIP, 9, true}, + {9, GZipFormat::GZIP, 20, false}, + {5, GZipFormat::DEFLATE, -12, false}, + {-992, GZipFormat::GZIP, 15, false}}; + + std::vector data = MakeRandomData(2000); + for (const auto& combination : combinations) { + const auto compression = Compression::GZIP; + if (!Codec::IsAvailable(compression)) { + // Support for this codec hasn't been built + continue; + } + auto codec_options = std::make_shared(); + codec_options->compression_level_ = combination.level; + codec_options->gzip_format = combination.format; + codec_options->window_bits = combination.window_bits; + const auto expect_success = combination.expect_success; + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); + ASSERT_EQ(expect_success, result1.ok()); + ASSERT_EQ(expect_success, result2.ok()); + if (expect_success) { + CheckCodecRoundtrip(*result1, *result2, data); + } + } +} + +TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) { + // for now only GZIP & Brotli codec options supported, since it has specific parameters + // to be customized, other codecs could directly go with CodecOptions, could add more + // specific codec options if needed. 
+ struct CombinationOption { + int level; + int window_bits; + bool expect_success; + }; + constexpr CombinationOption combinations[] = { + {8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}}; + + std::vector data = MakeRandomData(2000); + for (const auto& combination : combinations) { + const auto compression = Compression::BROTLI; + if (!Codec::IsAvailable(compression)) { + // Support for this codec hasn't been built + continue; + } + auto codec_options = std::make_shared(); + codec_options->compression_level_ = combination.level; + codec_options->window_bits = combination.window_bits; + const auto expect_success = combination.expect_success; + auto result1 = Codec::Create(compression, codec_options); + auto result2 = Codec::Create(compression, codec_options); ASSERT_EQ(expect_success, result1.ok()); ASSERT_EQ(expect_success, result2.ok()); if (expect_success) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index 6dcc5153abd4e..7aab6759118e0 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -17,15 +17,14 @@ #include "arrow/util/compression_internal.h" +#include +#include #include #include #include #include #include -#include -#include - #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/logging.h" @@ -46,6 +45,9 @@ namespace { // Maximum window size constexpr int WINDOW_BITS = 15; +// Minimum window size +constexpr int kGZipMinWindowBits = 9; + // Output Gzip. constexpr int GZIP_CODEC = 16; @@ -55,8 +57,8 @@ constexpr int DETECT_CODEC = 32; constexpr int kGZipMinCompressionLevel = 1; constexpr int kGZipMaxCompressionLevel = 9; -int CompressionWindowBitsForFormat(GZipFormat::type format) { - int window_bits = WINDOW_BITS; +int CompressionWindowBitsForFormat(GZipFormat::type format, int input_window_bits) { + int window_bits = input_window_bits; switch (format) { case GZipFormat::DEFLATE: window_bits = -window_bits; @@ -70,12 +72,12 @@ int CompressionWindowBitsForFormat(GZipFormat::type format) { return window_bits; } -int DecompressionWindowBitsForFormat(GZipFormat::type format) { +int DecompressionWindowBitsForFormat(GZipFormat::type format, int input_window_bits) { if (format == GZipFormat::DEFLATE) { - return -WINDOW_BITS; + return -input_window_bits; } else { /* If not deflate, autodetect format from header */ - return WINDOW_BITS | DETECT_CODEC; + return input_window_bits | DETECT_CODEC; } } @@ -88,8 +90,11 @@ Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { class GZipDecompressor : public Decompressor { public: - explicit GZipDecompressor(GZipFormat::type format) - : format_(format), initialized_(false), finished_(false) {} + explicit GZipDecompressor(GZipFormat::type format, int window_bits) + : format_(format), + window_bits_(window_bits), + initialized_(false), + finished_(false) {} ~GZipDecompressor() override { if (initialized_) { @@ -103,7 +108,7 @@ class GZipDecompressor : public Decompressor { finished_ = false; int ret; - int window_bits = DecompressionWindowBitsForFormat(format_); + int window_bits = DecompressionWindowBitsForFormat(format_, window_bits_); if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { return ZlibError("zlib inflateInit failed: "); } else { @@ -162,6 +167,7 @@ class GZipDecompressor : public Decompressor { z_stream stream_; GZipFormat::type format_; + int window_bits_; bool initialized_; bool finished_; }; @@ -180,13 +186,13 @@ class GZipCompressor : public Compressor { } } - 
Status Init(GZipFormat::type format) { + Status Init(GZipFormat::type format, int input_window_bits) { DCHECK(!initialized_); memset(&stream_, 0, sizeof(stream_)); int ret; // Initialize to run specified format - int window_bits = CompressionWindowBitsForFormat(format); + int window_bits = CompressionWindowBitsForFormat(format, input_window_bits); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibError("zlib deflateInit failed: "); @@ -300,8 +306,9 @@ class GZipCompressor : public Compressor { class GZipCodec : public Codec { public: - explicit GZipCodec(int compression_level, GZipFormat::type format) + explicit GZipCodec(int compression_level, GZipFormat::type format, int window_bits) : format_(format), + window_bits_(window_bits), compressor_initialized_(false), decompressor_initialized_(false) { compression_level_ = compression_level == kUseDefaultCompressionLevel @@ -316,12 +323,12 @@ class GZipCodec : public Codec { Result> MakeCompressor() override { auto ptr = std::make_shared(compression_level_); - RETURN_NOT_OK(ptr->Init(format_)); + RETURN_NOT_OK(ptr->Init(format_, window_bits_)); return ptr; } Result> MakeDecompressor() override { - auto ptr = std::make_shared(format_); + auto ptr = std::make_shared(format_, window_bits_); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -332,7 +339,7 @@ class GZipCodec : public Codec { int ret; // Initialize to run specified format - int window_bits = CompressionWindowBitsForFormat(format_); + int window_bits = CompressionWindowBitsForFormat(format_, window_bits_); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg); @@ -354,7 +361,7 @@ class GZipCodec : public Codec { int ret; // Initialize to run either deflate or zlib/gzip format - int window_bits = DecompressionWindowBitsForFormat(format_); + int window_bits = DecompressionWindowBitsForFormat(format_, window_bits_); if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { return ZlibErrorPrefix("zlib inflateInit failed: ", stream_.msg); } @@ -461,6 +468,9 @@ class GZipCodec : public Codec { } Status Init() override { + if (window_bits_ < kGZipMinWindowBits || window_bits_ > WINDOW_BITS) { + return Status::Invalid("window_bits should be within 9 ~ 15"); + } const Status init_compressor_status = InitCompressor(); if (!init_compressor_status.ok()) { return init_compressor_status; @@ -494,12 +504,14 @@ class GZipCodec : public Codec { bool compressor_initialized_; bool decompressor_initialized_; int compression_level_; + int window_bits_; }; } // namespace -std::unique_ptr MakeGZipCodec(int compression_level, GZipFormat::type format) { - return std::make_unique(compression_level, format); +std::unique_ptr MakeGZipCodec(int compression_level, GZipFormat::type format, + int window_bits) { + return std::make_unique(compression_level, format, window_bits); } } // namespace internal diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 6ee579bec9a69..3f779468f2881 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -40,8 +40,8 @@ std::shared_ptr BuildWriter(int64_t output_size, ColumnDescriptor* schema, const WriterProperties* properties, Compression::type codec) { - std::unique_ptr pager = - PageWriter::Open(dst, codec, Codec::UseDefaultCompressionLevel(), metadata); + 
std::unique_ptr pager = + PageWriter::Open(dst, codec, std::make_shared(), metadata); std::shared_ptr writer = ColumnWriter::Make(metadata, std::move(pager), properties); return std::static_pointer_cast(writer); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 2892335227771..5ade20a17eeb0 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -36,7 +36,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/compression.h" + #include "arrow/util/crc32.h" #include "arrow/util/endian.h" #include "arrow/util/logging.h" @@ -249,9 +249,9 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) { class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t column_chunk_ordinal, - bool use_page_checksum_verification, + const std::shared_ptr& codec_options, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t column_chunk_ordinal, bool use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, std::shared_ptr data_encryptor = nullptr, @@ -277,7 +277,7 @@ class SerializedPageWriter : public PageWriter { if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) { InitEncryption(); } - compressor_ = GetCodec(codec, compression_level); + compressor_ = GetCodec(codec, codec_options); thrift_serializer_ = std::make_unique(); } @@ -611,9 +611,9 @@ class SerializedPageWriter : public PageWriter { class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t current_column_ordinal, - bool use_page_checksum_verification, + const std::shared_ptr& codec_options, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t current_column_ordinal, bool use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, std::shared_ptr data_encryptor = nullptr, @@ -622,7 +622,7 @@ class BufferedPageWriter : public PageWriter { : final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) { in_memory_sink_ = CreateOutputStream(pool); pager_ = std::make_unique( - in_memory_sink_, codec, compression_level, metadata, row_group_ordinal, + in_memory_sink_, codec, codec_options, metadata, row_group_ordinal, current_column_ordinal, use_page_checksum_verification, pool, std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, offset_index_builder); @@ -683,20 +683,21 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr PageWriter::Open( std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool, - bool buffered_row_group, std::shared_ptr meta_encryptor, - std::shared_ptr data_encryptor, bool page_write_checksum_enabled, - ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) { + const std::shared_ptr& codec_options, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, + int16_t column_chunk_ordinal, MemoryPool* pool, bool buffered_row_group, + std::shared_ptr 
meta_encryptor, std::shared_ptr data_encryptor, + bool page_write_checksum_enabled, ColumnIndexBuilder* column_index_builder, + OffsetIndexBuilder* offset_index_builder) { if (buffered_row_group) { return std::unique_ptr(new BufferedPageWriter( - std::move(sink), codec, compression_level, metadata, row_group_ordinal, + std::move(sink), codec, codec_options, metadata, row_group_ordinal, column_chunk_ordinal, page_write_checksum_enabled, pool, std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, offset_index_builder)); } else { return std::unique_ptr(new SerializedPageWriter( - std::move(sink), codec, compression_level, metadata, row_group_ordinal, + std::move(sink), codec, codec_options, metadata, row_group_ordinal, column_chunk_ordinal, page_write_checksum_enabled, pool, std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, offset_index_builder)); diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 792b108ac8835..c6852fe86dadb 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/util/compression.h" #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/types.h" @@ -87,8 +88,9 @@ class PARQUET_EXPORT PageWriter { static std::unique_ptr Open( std::shared_ptr sink, Compression::type codec, - int compression_level, ColumnChunkMetaDataBuilder* metadata, - int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1, + const std::shared_ptr& codec_options, + ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1, + int16_t column_chunk_ordinal = -1, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), bool buffered_row_group = false, std::shared_ptr header_encryptor = NULLPTR, diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 8b133967e8c99..4b13f81b379f5 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -118,8 +118,9 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); std::unique_ptr pager = PageWriter::Open( - sink_, column_properties.compression(), Codec::UseDefaultCompressionLevel(), - metadata_.get(), /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, + sink_, column_properties.compression(), std::make_shared(), + metadata_.get(), + /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, ::arrow::default_memory_pool(), /* buffered_row_group */ false, /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, enable_checksum); std::shared_ptr writer = @@ -162,6 +163,25 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); } + void TestRequiredWithCodecOptions( + Encoding::type encoding, Compression::type compression, bool enable_dictionary, + bool enable_statistics, int64_t num_rows = SMALL_SIZE, + int compression_level = Codec::UseDefaultCompressionLevel(), + ::arrow::util::GZipFormat::type format = ::arrow::util::GZipFormat::GZIP, + int window_bits = ::arrow::util::kGZipDefaultWindowBits, + bool enable_checksum = false) { + this->GenerateData(num_rows); + + auto codec_options = + std::make_shared<::arrow::util::GZipCodecOptions>(compression_level); + codec_options->gzip_format = format; + codec_options->window_bits = window_bits; + this->WriteRequiredWithCodecOptions(encoding, compression, 
enable_dictionary, + enable_statistics, codec_options, num_rows, + enable_checksum); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows, enable_checksum)); + } + void TestDictionaryFallbackEncoding(ParquetVersion::type version) { this->GenerateData(VERY_LARGE_SIZE); ColumnProperties column_properties; @@ -237,7 +257,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { bool enable_checksum) { ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); - column_properties.set_compression_level(compression_level); + column_properties.set_codec_options( + std::make_shared(compression_level)); std::shared_ptr> writer = this->BuildWriter( num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); @@ -255,7 +276,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { bit_util::BytesForBits(static_cast(this->values_.size())) + 1, 255); ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); - column_properties.set_compression_level(compression_level); + column_properties.set_codec_options( + std::make_shared(compression_level)); std::shared_ptr> writer = this->BuildWriter( num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, @@ -265,6 +287,22 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { writer->Close(); } + void WriteRequiredWithCodecOptions(Encoding::type encoding, + Compression::type compression, + bool enable_dictionary, bool enable_statistics, + const std::shared_ptr& codec_options, + int64_t num_rows, bool enable_checksum) { + ColumnProperties column_properties(encoding, compression, enable_dictionary, + enable_statistics); + column_properties.set_codec_options(codec_options); + std::shared_ptr> writer = this->BuildWriter( + num_rows, column_properties, ParquetVersion::PARQUET_1_0, enable_checksum); + writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); + // The behaviour should be independent from the number of Close() calls + writer->Close(); + writer->Close(); + } + void ReadAndCompare(Compression::type compression, int64_t num_rows, bool page_checksum_verify) { this->SetupValuesOut(num_rows); @@ -521,6 +559,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE); } + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCodecOptions) { + this->TestRequiredWithCodecOptions(Encoding::PLAIN, Compression::GZIP, false, false, + LARGE_SIZE, 10, ::arrow::util::GZipFormat::GZIP, 12); +} #endif #ifdef ARROW_WITH_LZ4 @@ -815,9 +858,8 @@ TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { auto props = WriterProperties::Builder().build(); auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); - std::unique_ptr pager = - PageWriter::Open(sink, Compression::UNCOMPRESSED, - Codec::UseDefaultCompressionLevel(), metadata.get()); + std::unique_ptr pager = PageWriter::Open( + sink, Compression::UNCOMPRESSED, std::make_shared(), metadata.get()); std::shared_ptr writer = ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); auto typed_writer = std::static_pointer_cast>(writer); @@ -1348,7 +1390,7 @@ class ColumnWriterTestSizeEstimated : public ::testing::Test { schema_descriptor_->Column(0)); std::unique_ptr 
pager = PageWriter::Open( - sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(), + sink_, compression, std::make_shared(), metadata_.get(), /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1, ::arrow::default_memory_pool(), /* buffered_row_group */ buffered, /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR, diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 57067bc533f5a..20cb6426750fb 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -153,8 +153,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents { ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal) : nullptr; std::unique_ptr pager = PageWriter::Open( - sink_, properties_->compression(path), properties_->compression_level(path), - col_meta, row_group_ordinal_, static_cast(column_ordinal), + sink_, properties_->compression(path), properties_->codec_options(path), col_meta, + row_group_ordinal_, static_cast(column_ordinal), properties_->memory_pool(), false, meta_encryptor, data_encryptor, properties_->page_checksum_enabled(), ci_builder, oi_builder); column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); @@ -290,7 +290,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents { ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal) : nullptr; std::unique_ptr pager = PageWriter::Open( - sink_, properties_->compression(path), properties_->compression_level(path), + sink_, properties_->compression(path), properties_->codec_options(path), col_meta, static_cast(row_group_ordinal_), static_cast(column_ordinal), properties_->memory_pool(), buffered_row_group_, meta_encryptor, data_encryptor, diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h index 00a193f144a18..b085e57cd9918 100644 --- a/cpp/src/parquet/platform.h +++ b/cpp/src/parquet/platform.h @@ -87,6 +87,7 @@ namespace parquet { using Buffer = ::arrow::Buffer; using Codec = ::arrow::util::Codec; +using CodecOptions = ::arrow::util::CodecOptions; using Compression = ::arrow::Compression; using MemoryPool = ::arrow::MemoryPool; using MutableBuffer = ::arrow::MutableBuffer; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index f38dd17482b1f..8a171dba41866 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -153,7 +153,7 @@ class PARQUET_EXPORT ColumnProperties { dictionary_enabled_(dictionary_enabled), statistics_enabled_(statistics_enabled), max_stats_size_(max_stats_size), - compression_level_(Codec::UseDefaultCompressionLevel()), + codec_options_(std::make_shared()), page_index_enabled_(DEFAULT_IS_PAGE_INDEX_ENABLED) {} void set_encoding(Encoding::type encoding) { encoding_ = encoding; } @@ -172,8 +172,8 @@ class PARQUET_EXPORT ColumnProperties { max_stats_size_ = max_stats_size; } - void set_compression_level(int compression_level) { - compression_level_ = compression_level; + void set_codec_options(const std::shared_ptr& codec_options) { + codec_options_ = codec_options; } void set_page_index_enabled(bool page_index_enabled) { @@ -190,7 +190,7 @@ class PARQUET_EXPORT ColumnProperties { size_t max_statistics_size() const { return max_stats_size_; } - int compression_level() const { return compression_level_; } + const std::shared_ptr& codec_options() const { return codec_options_; } bool page_index_enabled() const { return page_index_enabled_; } @@ -200,7 +200,7 @@ class PARQUET_EXPORT ColumnProperties { bool dictionary_enabled_; bool statistics_enabled_; 
   size_t max_stats_size_;
-  int compression_level_;
+  std::shared_ptr<CodecOptions> codec_options_;
   bool page_index_enabled_;
 };
 
@@ -382,9 +382,13 @@ class PARQUET_EXPORT WriterProperties {
     return this->compression(path->ToDotString(), codec);
   }
 
-  /// \brief Specify the default compression level for the compressor in
-  /// every column. In case a column does not have an explicitly specified
-  /// compression level, the default one would be used.
+  /// \brief Specify the default codec options for the compressor in
+  /// every column. Previously only the compression level could be customized;
+  /// with CodecOptions, users can set additional codec-specific properties.
+  ///
+  /// The compression level can be set through codec_options->compression_level_. If a
+  /// column does not have an explicitly specified compression level, the default
+  /// one is used.
   ///
   /// The provided compression level is compressor specific. The user would
   /// have to familiarize oneself with the available levels for the selected
   /// compressor. If the compressor does not allow for selecting different
   /// compression levels, calling this function would not have any effect.
   /// Parquet and Arrow do not validate the passed compression level. If no
   /// level is selected by the user or if the special
   /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
   /// compression level.
-  Builder* compression_level(int compression_level) {
-    default_column_properties_.set_compression_level(compression_level);
+  ///
+  /// For the GZip codec, window_bits and the format can be set through
+  /// GZipCodecOptions. For other codecs, CodecOptions currently only carries the
+  /// compression level; more codec-specific option classes may be added later.
+  Builder* codec_options(
+      const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
+    default_column_properties_.set_codec_options(codec_options);
     return this;
   }
 
-  /// \brief Specify a compression level for the compressor for the column
+  /// \brief Specify the codec options for the compressor for the column
   /// described by path.
-  ///
-  /// The provided compression level is compressor specific. The user would
-  /// have to familiarize oneself with the available levels for the selected
-  /// compressor. If the compressor does not allow for selecting different
-  /// compression levels, calling this function would not have any effect.
-  /// Parquet and Arrow do not validate the passed compression level. If no
-  /// level is selected by the user or if the special
-  /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
-  /// compression level.
-  Builder* compression_level(const std::string& path, int compression_level) {
-    codecs_compression_level_[path] = compression_level;
+  Builder* codec_options(
+      const std::string& path,
+      const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
+    codec_options_[path] = codec_options;
     return this;
   }
 
-  /// \brief Specify a compression level for the compressor for the column
+  /// \brief Specify the codec options for the compressor for the column
   /// described by path.
-  ///
-  /// The provided compression level is compressor specific. The user would
-  /// have to familiarize oneself with the available levels for the selected
-  /// compressor. If the compressor does not allow for selecting different
-  /// compression levels, calling this function would not have any effect.
-  /// Parquet and Arrow do not validate the passed compression level. If no
-  /// level is selected by the user or if the special
-  /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
-  /// compression level.
- Builder* compression_level(const std::shared_ptr& path, - int compression_level) { - return this->compression_level(path->ToDotString(), compression_level); + Builder* codec_options( + const std::shared_ptr& path, + const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) { + return this->codec_options(path->ToDotString(), codec_options); } /// Define the file encryption properties. @@ -565,8 +559,8 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : encodings_) get(item.first).set_encoding(item.second); for (const auto& item : codecs_) get(item.first).set_compression(item.second); - for (const auto& item : codecs_compression_level_) - get(item.first).set_compression_level(item.second); + for (const auto& item : codec_options_) + get(item.first).set_codec_options(item.second); for (const auto& item : dictionary_enabled_) get(item.first).set_dictionary_enabled(item.second); for (const auto& item : statistics_enabled_) @@ -599,7 +593,7 @@ class PARQUET_EXPORT WriterProperties { ColumnProperties default_column_properties_; std::unordered_map encodings_; std::unordered_map codecs_; - std::unordered_map codecs_compression_level_; + std::unordered_map> codec_options_; std::unordered_map dictionary_enabled_; std::unordered_map statistics_enabled_; std::unordered_map page_index_enabled_; @@ -658,8 +652,9 @@ class PARQUET_EXPORT WriterProperties { return column_properties(path).compression(); } - int compression_level(const std::shared_ptr& path) const { - return column_properties(path).compression_level(); + const std::shared_ptr codec_options( + const std::shared_ptr& path) const { + return column_properties(path).codec_options(); } bool dictionary_enabled(const std::shared_ptr& path) const { diff --git a/cpp/src/parquet/properties_test.cc b/cpp/src/parquet/properties_test.cc index 5fd182f679c98..cb7f38a827562 100644 --- a/cpp/src/parquet/properties_test.cc +++ b/cpp/src/parquet/properties_test.cc @@ -70,6 +70,37 @@ TEST(TestWriterProperties, AdvancedHandling) { ASSERT_EQ(ParquetDataPageVersion::V2, props->data_page_version()); } +TEST(TestWriterProperties, SetCodecOptions) { + WriterProperties::Builder builder; + builder.compression("gzip", Compression::GZIP); + builder.compression("zstd", Compression::ZSTD); + builder.compression("brotli", Compression::BROTLI); + auto gzip_codec_options = std::make_shared<::arrow::util::GZipCodecOptions>(); + gzip_codec_options->compression_level_ = 9; + gzip_codec_options->window_bits = 12; + builder.codec_options("gzip", gzip_codec_options); + auto codec_options = std::make_shared(); + builder.codec_options(codec_options); + auto brotli_codec_options = std::make_shared<::arrow::util::BrotliCodecOptions>(); + brotli_codec_options->compression_level_ = 11; + brotli_codec_options->window_bits = 20; + builder.codec_options("brotli", brotli_codec_options); + std::shared_ptr props = builder.build(); + + ASSERT_EQ(9, + props->codec_options(ColumnPath::FromDotString("gzip"))->compression_level_); + ASSERT_EQ(12, std::dynamic_pointer_cast<::arrow::util::GZipCodecOptions>( + props->codec_options(ColumnPath::FromDotString("gzip"))) + ->window_bits); + ASSERT_EQ(Codec::UseDefaultCompressionLevel(), + props->codec_options(ColumnPath::FromDotString("zstd"))->compression_level_); + ASSERT_EQ( + 11, props->codec_options(ColumnPath::FromDotString("brotli"))->compression_level_); + ASSERT_EQ(20, std::dynamic_pointer_cast<::arrow::util::BrotliCodecOptions>( + props->codec_options(ColumnPath::FromDotString("brotli"))) + ->window_bits); +} + 
TEST(TestReaderProperties, GetStreamInsufficientData) { // ARROW-6058 std::string data = "shorter than expected"; diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 532fd4c3d7b43..59705ec586e5d 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -51,10 +51,11 @@ bool IsCodecSupported(Compression::type codec) { } std::unique_ptr GetCodec(Compression::type codec) { - return GetCodec(codec, Codec::UseDefaultCompressionLevel()); + return GetCodec(codec, std::make_shared()); } -std::unique_ptr GetCodec(Compression::type codec, int compression_level) { +std::unique_ptr GetCodec(Compression::type codec, + const std::shared_ptr& codec_options) { std::unique_ptr result; if (codec == Compression::LZO) { throw ParquetException( @@ -69,7 +70,7 @@ std::unique_ptr GetCodec(Compression::type codec, int compression_level) throw ParquetException(ss.str()); } - PARQUET_ASSIGN_OR_THROW(result, Codec::Create(codec, compression_level)); + PARQUET_ASSIGN_OR_THROW(result, Codec::Create(codec, codec_options)); return result; } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 6ec6870d3a04c..0a40fcd8e2140 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -488,7 +488,8 @@ PARQUET_EXPORT std::unique_ptr GetCodec(Compression::type codec); PARQUET_EXPORT -std::unique_ptr GetCodec(Compression::type codec, int compression_level); +std::unique_ptr GetCodec(Compression::type codec, + const std::shared_ptr& codec_options); struct ParquetCipher { enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 }; diff --git a/r/src/compression.cpp b/r/src/compression.cpp index 148c6e14002f5..1ef3713edbdc5 100644 --- a/r/src/compression.cpp +++ b/r/src/compression.cpp @@ -23,7 +23,8 @@ // [[arrow::export]] std::shared_ptr util___Codec__Create(arrow::Compression::type codec, R_xlen_t compression_level) { - return ValueOrStop(arrow::util::Codec::Create(codec, compression_level)); + return ValueOrStop(arrow::util::Codec::Create( + codec, std::make_shared(CodecOptions(compression_level)))); } // [[arrow::export]]
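
Usage sketch: the snippet below illustrates how the overloads introduced in this patch
might be called from user code. It is an illustrative example, not code from the patch
itself; it only reuses names added by the diff (GZipCodecOptions, the CodecOptions
overload of Codec::Create, and WriterProperties::Builder::codec_options), and exact
namespaces and header paths should be checked against the merged sources.

    #include <memory>

    #include "arrow/util/compression.h"
    #include "parquet/properties.h"

    // Build a standalone GZip codec with a custom compression level and window size.
    arrow::Result<std::unique_ptr<arrow::util::Codec>> MakeTunedGzipCodec() {
      auto options = std::make_shared<arrow::util::GZipCodecOptions>();
      options->compression_level_ = 6;                       // zlib accepts levels 1-9
      options->window_bits = 12;                             // GZipCodec::Init checks 9-15
      options->gzip_format = arrow::util::GZipFormat::GZIP;
      return arrow::util::Codec::Create(arrow::Compression::GZIP, options);
    }

    // Route the same options through the Parquet writer properties, so every column
    // chunk is compressed with the customized window size.
    std::shared_ptr<parquet::WriterProperties> MakeTunedWriterProperties() {
      auto options = std::make_shared<arrow::util::GZipCodecOptions>();
      options->window_bits = 12;
      return parquet::WriterProperties::Builder()
          .compression(arrow::Compression::GZIP)
          ->codec_options(options)
          ->build();
    }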