Skip to content

Commit

Permalink
apacheGH-35287: [C++][Parquet] Add CodecOptions to customize the comp…
Browse files Browse the repository at this point in the history
…ression parameter (apache#35886)

### Rationale for this change

Based on apache#35287, we'd like to add a `CodecOptions` class so that more compression parameters (such as `window_bits`) can be customized when creating the Codec for the Parquet writer.

Authored-by: Yang Yang [yang10.yang@intel.com](mailto:yang10.yang@intel.com)
Co-authored-by: Rambacher, Mark [mark.rambacher@intel.com](mailto:mark.rambacher@intel.com)

### What changes are included in this PR?

Add `CodecOptions` and use it in place of the bare `compression_level` argument when creating the Codec. The design follows the previous discussions.
### Are these changes tested?

Yes
### Are there any user-facing changes?

Yes, when a user creates the `WriterProperties`.

* Closes: apache#35287

Lead-authored-by: yyang52 <yang10.yang@intel.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
yyang52 and pitrou authored Jul 11, 2023
1 parent c09b8c8 commit a9035cb
Show file tree
Hide file tree
Showing 17 changed files with 442 additions and 106 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Future;

namespace util {
class Codec;
class CodecOptions;
} // namespace util

class Buffer;
Expand Down
24 changes: 19 additions & 5 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
}

Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
const CodecOptions& codec_options) {
if (!IsAvailable(codec_type)) {
if (codec_type == Compression::LZO) {
return Status::NotImplemented("LZO codec not implemented");
Expand All @@ -151,6 +151,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
"' not built");
}

auto compression_level = codec_options.compression_level;
if (compression_level != kUseDefaultCompressionLevel &&
!SupportsCompressionLevel(codec_type)) {
return Status::Invalid("Codec '", GetCodecAsString(codec_type),
Expand All @@ -166,16 +167,23 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeSnappyCodec();
#endif
break;
case Compression::GZIP:
case Compression::GZIP: {
#ifdef ARROW_WITH_ZLIB
codec = internal::MakeGZipCodec(compression_level);
auto opt = dynamic_cast<const GZipCodecOptions*>(&codec_options);
codec = internal::MakeGZipCodec(compression_level,
opt ? opt->gzip_format : GZipFormat::GZIP,
opt ? opt->window_bits : std::nullopt);
#endif
break;
case Compression::BROTLI:
}
case Compression::BROTLI: {
#ifdef ARROW_WITH_BROTLI
codec = internal::MakeBrotliCodec(compression_level);
auto opt = dynamic_cast<const BrotliCodecOptions*>(&codec_options);
codec = internal::MakeBrotliCodec(compression_level,
opt ? opt->window_bits : std::nullopt);
#endif
break;
}
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4RawCodec(compression_level);
Expand Down Expand Up @@ -210,6 +218,12 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
return std::move(codec);
}

// Convenience overload for callers that only care about the compression
// level: wrap the level in a plain CodecOptions and delegate to the
// options-based factory.
Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
                                             int compression_level) {
  const CodecOptions level_only_options{compression_level};
  return Create(codec_type, level_only_options);
}

bool Codec::IsAvailable(Compression::type codec_type) {
switch (codec_type) {
case Compression::UNCOMPRESSED:
Expand Down
43 changes: 41 additions & 2 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>

#include "arrow/result.h"
Expand Down Expand Up @@ -107,6 +108,40 @@ class ARROW_EXPORT Decompressor {
// XXX add methods for buffer size heuristics?
};

/// \brief Compression codec options
///
/// Base class for the options accepted by Codec::Create(). It carries only the
/// compression level; GZipCodecOptions and BrotliCodecOptions derive from it
/// to add codec-specific settings.
class ARROW_EXPORT CodecOptions {
public:
/// \brief Construct options requesting the given compression level
///
/// The constructor is explicit, so a bare int never converts implicitly to
/// CodecOptions.
/// \param[in] compression_level level to request; defaults to
/// kUseDefaultCompressionLevel
explicit CodecOptions(int compression_level = kUseDefaultCompressionLevel)
: compression_level(compression_level) {}

// The virtual destructor makes the type polymorphic, which Codec::Create()
// relies on when it dynamic_casts to a codec-specific subclass.
virtual ~CodecOptions() = default;

/// Requested compression level.
int compression_level;
};

// ----------------------------------------------------------------------
// GZip codec options implementation

/// \brief Stream format selector for the GZip codec
///
/// Used as the type of GZipCodecOptions::gzip_format, whose default is GZIP.
// NOTE(review): the enumerators presumably select the zlib, raw deflate and
// gzip stream framings respectively -- confirm against the GZip codec
// implementation, which is not visible here.
enum class GZipFormat {
ZLIB,
DEFLATE,
GZIP,
};

/// \brief Codec options specific to the GZip codec
///
/// Codec::Create() reads these fields only when the options object passed for
/// Compression::GZIP is a GZipCodecOptions; for any other options type it
/// falls back to GZipFormat::GZIP and an unset window_bits.
class ARROW_EXPORT GZipCodecOptions : public CodecOptions {
public:
/// Stream format to use; GZIP unless overridden.
GZipFormat gzip_format = GZipFormat::GZIP;
/// Optional window size in bits; unset (std::nullopt) by default.
// NOTE(review): the value substituted when unset and the accepted range are
// decided by the GZip codec implementation -- confirm there.
std::optional<int> window_bits;
};

// ----------------------------------------------------------------------
// brotli codec options implementation

/// \brief Codec options specific to the Brotli codec
///
/// Codec::Create() reads window_bits only when the options object passed for
/// Compression::BROTLI is a BrotliCodecOptions; otherwise it passes an unset
/// window size to the Brotli codec factory.
class ARROW_EXPORT BrotliCodecOptions : public CodecOptions {
public:
/// Optional window size in bits; unset (std::nullopt) by default.
/// When unset, MakeBrotliCodec() substitutes BROTLI_DEFAULT_WINDOW. Values
/// outside [BROTLI_MIN_WINDOW_BITS, BROTLI_MAX_WINDOW_BITS] are rejected by
/// the Brotli codec's Init().
std::optional<int> window_bits;
};

/// \brief Compression codec
class ARROW_EXPORT Codec {
public:
Expand All @@ -122,9 +157,13 @@ class ARROW_EXPORT Codec {
/// \brief Return compression type for name (all lower case)
static Result<Compression::type> GetCompressionType(const std::string& name);

/// \brief Create a codec for the given compression algorithm
/// \brief Create a codec for the given compression algorithm with CodecOptions
static Result<std::unique_ptr<Codec>> Create(
Compression::type codec, int compression_level = kUseDefaultCompressionLevel);
Compression::type codec, const CodecOptions& codec_options = CodecOptions{});

/// \brief Create a codec for the given compression algorithm
static Result<std::unique_ptr<Codec>> Create(Compression::type codec,
int compression_level);

/// \brief Return true if support for indicated codec has been enabled
static bool IsAvailable(Compression::type codec);
Expand Down
36 changes: 26 additions & 10 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class BrotliDecompressor : public Decompressor {

class BrotliCompressor : public Compressor {
public:
explicit BrotliCompressor(int compression_level)
: compression_level_(compression_level) {}
explicit BrotliCompressor(int compression_level, int window_bits)
: compression_level_(compression_level), window_bits_(window_bits) {}

~BrotliCompressor() override {
if (state_ != nullptr) {
Expand All @@ -109,6 +109,9 @@ class BrotliCompressor : public Compressor {
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) {
return BrotliError("Brotli set compression level failed");
}
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_LGWIN, window_bits_)) {
return BrotliError("Brotli set window size failed");
}
return Status::OK();
}

Expand Down Expand Up @@ -166,17 +169,19 @@ class BrotliCompressor : public Compressor {

private:
const int compression_level_;
const int window_bits_;
};

// ----------------------------------------------------------------------
// Brotli codec implementation

class BrotliCodec : public Codec {
public:
explicit BrotliCodec(int compression_level)
explicit BrotliCodec(int compression_level, int window_bits)
: compression_level_(compression_level == kUseDefaultCompressionLevel
? kBrotliDefaultCompressionLevel
: compression_level) {}
: compression_level),
window_bits_(window_bits) {}

Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
Expand All @@ -201,16 +206,16 @@ class BrotliCodec : public Codec {
DCHECK_GE(input_len, 0);
DCHECK_GE(output_buffer_len, 0);
std::size_t output_size = static_cast<size_t>(output_buffer_len);
if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW,
BROTLI_DEFAULT_MODE, static_cast<size_t>(input_len), input,
&output_size, output_buffer) == BROTLI_FALSE) {
if (BrotliEncoderCompress(compression_level_, window_bits_, BROTLI_DEFAULT_MODE,
static_cast<size_t>(input_len), input, &output_size,
output_buffer) == BROTLI_FALSE) {
return Status::IOError("Brotli compression failure.");
}
return output_size;
}

Result<std::shared_ptr<Compressor>> MakeCompressor() override {
auto ptr = std::make_shared<BrotliCompressor>(compression_level_);
auto ptr = std::make_shared<BrotliCompressor>(compression_level_, window_bits_);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand All @@ -221,6 +226,14 @@ class BrotliCodec : public Codec {
return ptr;
}

// Validate the configured window size before the codec is handed out.
// Returns Status::Invalid when window_bits_ lies outside the inclusive range
// [BROTLI_MIN_WINDOW_BITS, BROTLI_MAX_WINDOW_BITS], Status::OK otherwise.
Status Init() override {
  const bool window_bits_in_range = window_bits_ >= BROTLI_MIN_WINDOW_BITS &&
                                    window_bits_ <= BROTLI_MAX_WINDOW_BITS;
  if (window_bits_in_range) {
    return Status::OK();
  }
  return Status::Invalid("Brotli window_bits should be between ",
                         BROTLI_MIN_WINDOW_BITS, " and ", BROTLI_MAX_WINDOW_BITS);
}

Compression::type compression_type() const override { return Compression::BROTLI; }

int compression_level() const override { return compression_level_; }
Expand All @@ -232,12 +245,15 @@ class BrotliCodec : public Codec {

private:
const int compression_level_;
const int window_bits_;
};

} // namespace

std::unique_ptr<Codec> MakeBrotliCodec(int compression_level) {
return std::make_unique<BrotliCodec>(compression_level);
std::unique_ptr<Codec> MakeBrotliCodec(int compression_level,
std::optional<int> window_bits) {
return std::make_unique<BrotliCodec>(compression_level,
window_bits.value_or(BROTLI_DEFAULT_WINDOW));
}

} // namespace internal
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/util/compression_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,20 @@ constexpr int kBrotliDefaultCompressionLevel = 8;

// Brotli codec.
std::unique_ptr<Codec> MakeBrotliCodec(
int compression_level = kBrotliDefaultCompressionLevel);
int compression_level = kBrotliDefaultCompressionLevel,
std::optional<int> window_bits = std::nullopt);

// BZ2 codec.
constexpr int kBZ2DefaultCompressionLevel = 9;

std::unique_ptr<Codec> MakeBZ2Codec(int compression_level = kBZ2DefaultCompressionLevel);

// GZip
constexpr int kGZipDefaultCompressionLevel = 9;

struct GZipFormat {
enum type {
ZLIB,
DEFLATE,
GZIP,
};
};

std::unique_ptr<Codec> MakeGZipCodec(int compression_level = kGZipDefaultCompressionLevel,
GZipFormat::type format = GZipFormat::GZIP);
GZipFormat format = GZipFormat::GZIP,
std::optional<int> window_bits = std::nullopt);

// Snappy
std::unique_ptr<Codec> MakeSnappyCodec();
Expand Down
76 changes: 74 additions & 2 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,81 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
continue;
}
const auto level = combination.level;
const auto codec_options = arrow::util::CodecOptions(level);
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, level);
auto result2 = Codec::Create(compression, level);
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
CheckCodecRoundtrip(*result1, *result2, data);
}
}
}

TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
  // Only GZIP and Brotli currently have dedicated option classes, because
  // they expose parameters beyond the compression level. Other codecs can be
  // created with a plain CodecOptions; more specific option classes can be
  // added if needed.
  struct GZipCase {
    int level;
    GZipFormat format;
    int window_bits;
    bool expect_success;
  };
  constexpr GZipCase kCases[] = {
      {2, GZipFormat::ZLIB, 12, true},
      {9, GZipFormat::GZIP, 9, true},
      {9, GZipFormat::GZIP, 20, false},
      {5, GZipFormat::DEFLATE, -12, false},
      {-992, GZipFormat::GZIP, 15, false},
  };

  std::vector<uint8_t> data = MakeRandomData(2000);
  for (const auto& test_case : kCases) {
    if (!Codec::IsAvailable(Compression::GZIP)) {
      // Support for this codec hasn't been built
      continue;
    }

    arrow::util::GZipCodecOptions options;
    options.compression_level = test_case.level;
    options.gzip_format = test_case.format;
    options.window_bits = test_case.window_bits;

    // Two independent codecs are created so the roundtrip helper can use both.
    auto first = Codec::Create(Compression::GZIP, options);
    auto second = Codec::Create(Compression::GZIP, options);
    ASSERT_EQ(test_case.expect_success, first.ok());
    ASSERT_EQ(test_case.expect_success, second.ok());
    if (test_case.expect_success) {
      CheckCodecRoundtrip(*first, *second, data);
    }
  }
}

TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) {
// for now only GZIP & Brotli codec options supported, since it has specific parameters
// to be customized, other codecs could directly go with CodecOptions, could add more
// specific codec options if needed.
struct CombinationOption {
int level;
int window_bits;
bool expect_success;
};
constexpr CombinationOption combinations[] = {
{8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}};

std::vector<uint8_t> data = MakeRandomData(2000);
for (const auto& combination : combinations) {
const auto compression = Compression::BROTLI;
if (!Codec::IsAvailable(compression)) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = arrow::util::BrotliCodecOptions();
codec_options.compression_level = combination.level;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
Expand Down
Loading

0 comments on commit a9035cb

Please sign in to comment.