Skip to content

Commit

Permalink
add CodecOptions to customize the compression parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
yyang52 committed Jun 5, 2023
1 parent 388f3a8 commit 1e5c232
Show file tree
Hide file tree
Showing 22 changed files with 387 additions and 131 deletions.
33 changes: 33 additions & 0 deletions cpp/examples/arrow/parquet_read_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,44 @@ arrow::Status WriteInBatches(std::string path_to_file) {
return arrow::Status::OK();
}

/// Write the example table to `path_to_file` as a Parquet file compressed with
/// GZIP, using a customized compression level and window size.
///
/// Returns a non-OK status if the table cannot be produced, the output file
/// cannot be opened, or the write itself fails.
arrow::Status WriteWithCodecOptions(std::string path_to_file) {
  using parquet::ArrowWriterProperties;
  using parquet::WriterProperties;

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());

  // Tune the GZIP codec: maximum compression level plus a reduced window.
  // The default window bits is 15; 12 is used here to illustrate a scenario
  // where the history buffer size is limited.
  auto gzip_options = std::make_shared<arrow::util::GZipCodecOptions>();
  gzip_options->compression_level_ = 9;
  gzip_options->window_bits = 12;

  // Select the compression codec and attach the customized options.
  WriterProperties::Builder writer_builder;
  writer_builder.compression(arrow::Compression::GZIP);
  writer_builder.codec_options(gzip_options);
  std::shared_ptr<WriterProperties> props = writer_builder.build();

  // Store the Arrow schema in the file so it reads back into Arrow more easily.
  ArrowWriterProperties::Builder arrow_builder;
  arrow_builder.store_schema();
  std::shared_ptr<ArrowWriterProperties> arrow_props = arrow_builder.build();

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::FileOutputStream> outfile,
                        arrow::io::FileOutputStream::Open(path_to_file));

  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
                                                 outfile, /*chunk_size=*/3, props,
                                                 arrow_props));
  return arrow::Status::OK();
}

// Runs every read/write example in sequence against the same `path_to_file`.
// Each step is wrapped in ARROW_RETURN_NOT_OK, so the first failing example
// stops the sequence and its status is returned to the caller.
// Because all writers receive the same path, each write step targets the file
// written by the previous one.
arrow::Status RunExamples(std::string path_to_file) {
ARROW_RETURN_NOT_OK(WriteFullFile(path_to_file));
ARROW_RETURN_NOT_OK(ReadFullFile(path_to_file));
ARROW_RETURN_NOT_OK(WriteInBatches(path_to_file));
ARROW_RETURN_NOT_OK(ReadInBatches(path_to_file));
// Last step: write once more, this time with customized GZIP codec options.
ARROW_RETURN_NOT_OK(WriteWithCodecOptions(path_to_file));
return arrow::Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ int main(int argc, char** argv) {
const int level = level_str.empty() ? arrow::util::kUseDefaultCompressionLevel
: std::stoi(level_str);
const auto type = arrow::util::Codec::GetCompressionType(name).ValueOrDie();
auto codec = arrow::util::Codec::Create(type, level).ValueOrDie();
auto codec = arrow::util::Codec::Create(
type, std::make_shared<arrow::util::CodecOptions>(level))
.ValueOrDie();
std::cout << "Compression method: " << name;
if (!level_str.empty()) {
std::cout << ", level " << level;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,8 @@ Status WriteTable(const Table& table, io::OutputStream* dst,
ipc_options.allow_64bit = true;
ARROW_ASSIGN_OR_RAISE(
ipc_options.codec,
util::Codec::Create(properties.compression, properties.compression_level));
util::Codec::Create(properties.compression, std::make_shared<util::CodecOptions>(
properties.compression_level)));

std::shared_ptr<RecordBatchWriter> writer;
ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options));
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Future;

namespace util {
class Codec;
class CodecOptions;
} // namespace util

class Buffer;
Expand Down
22 changes: 16 additions & 6 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
return codec->default_compression_level();
}

Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
Result<std::unique_ptr<Codec>> Codec::Create(
Compression::type codec_type, const std::shared_ptr<CodecOptions>& codec_options) {
if (!IsAvailable(codec_type)) {
if (codec_type == Compression::LZO) {
return Status::NotImplemented("LZO codec not implemented");
Expand All @@ -151,6 +151,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
"' not built");
}

auto compression_level = codec_options->compression_level_;
if (compression_level != kUseDefaultCompressionLevel &&
!SupportsCompressionLevel(codec_type)) {
return Status::Invalid("Codec '", GetCodecAsString(codec_type),
Expand All @@ -166,16 +167,25 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeSnappyCodec();
#endif
break;
case Compression::GZIP:
case Compression::GZIP: {
#ifdef ARROW_WITH_ZLIB
codec = internal::MakeGZipCodec(compression_level);
std::shared_ptr<GZipCodecOptions> opt =
std::dynamic_pointer_cast<GZipCodecOptions>(codec_options);
codec = internal::MakeGZipCodec(compression_level,
opt ? opt->gzip_format : GZipFormat::GZIP,
opt ? opt->window_bits : kGZipDefaultWindowBits);
#endif
break;
case Compression::BROTLI:
}
case Compression::BROTLI: {
#ifdef ARROW_WITH_BROTLI
codec = internal::MakeBrotliCodec(compression_level);
std::shared_ptr<BrotliCodecOptions> opt =
std::dynamic_pointer_cast<BrotliCodecOptions>(codec_options);
codec = internal::MakeBrotliCodec(
compression_level, opt ? opt->window_bits : kBrotliDefaultWindowBits);
#endif
break;
}
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4RawCodec(compression_level);
Expand Down
48 changes: 47 additions & 1 deletion cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,50 @@ class ARROW_EXPORT Decompressor {
// XXX add methods for buffer size heuristics?
};

/// \brief Compression codec options
///
/// Base class for the options handed to Codec::Create(). It carries the one
/// setting shared by all codecs, the compression level. Codec-specific
/// settings live in subclasses, which Codec::Create() detects with
/// std::dynamic_pointer_cast; the virtual destructor makes the type
/// polymorphic so that cast is possible.
class ARROW_EXPORT CodecOptions {
 public:
  /// \param compression_level requested compression level; defaults to
  /// kUseDefaultCompressionLevel.
  ///
  /// Marked explicit so that a bare integer is never silently converted into
  /// a CodecOptions object.
  explicit CodecOptions(int compression_level = kUseDefaultCompressionLevel)
      : compression_level_(compression_level) {}
  virtual ~CodecOptions() = default;

  /// Requested compression level (kUseDefaultCompressionLevel when unset).
  int compression_level_;
};

// ----------------------------------------------------------------------
// gzip codec options implementation

// Scoping struct for the GZip codec's format selector; values are spelled
// GZipFormat::ZLIB, GZipFormat::DEFLATE and GZipFormat::GZIP, and the
// enumeration type itself is GZipFormat::type.
// The chosen value is forwarded to internal::MakeGZipCodec() as its `format`
// argument, with GZIP being the default used by GZipCodecOptions.
// NOTE(review): presumably these select the zlib-wrapped, raw-deflate and
// gzip-wrapped stream layouts respectively -- confirm against the codec
// implementation.
struct GZipFormat {
enum type {
ZLIB,
DEFLATE,
GZIP,
};
};

constexpr int kGZipDefaultWindowBits = 15;

class GZipCodecOptions : public CodecOptions {
public:
~GZipCodecOptions() = default;

GZipFormat::type gzip_format = GZipFormat::GZIP;
int window_bits = kGZipDefaultWindowBits;
};

// ----------------------------------------------------------------------
// brotli codec options implementation

constexpr int kBrotliDefaultWindowBits = 22;

class BrotliCodecOptions : public CodecOptions {
public:
~BrotliCodecOptions() = default;

int window_bits = kBrotliDefaultWindowBits;
};

/// \brief Compression codec
class ARROW_EXPORT Codec {
public:
Expand All @@ -124,7 +168,9 @@ class ARROW_EXPORT Codec {

/// \brief Create a codec for the given compression algorithm
static Result<std::unique_ptr<Codec>> Create(
Compression::type codec, int compression_level = kUseDefaultCompressionLevel);
Compression::type codec,
const std::shared_ptr<CodecOptions>& codec_options =
std::make_shared<CodecOptions>(kUseDefaultCompressionLevel));

/// \brief Return true if support for indicated codec has been enabled
static bool IsAvailable(Compression::type codec);
Expand Down
29 changes: 19 additions & 10 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class BrotliDecompressor : public Decompressor {

class BrotliCompressor : public Compressor {
public:
explicit BrotliCompressor(int compression_level)
: compression_level_(compression_level) {}
explicit BrotliCompressor(int compression_level, int window_bits)
: compression_level_(compression_level), window_bits_(window_bits) {}

~BrotliCompressor() override {
if (state_ != nullptr) {
Expand All @@ -109,6 +109,12 @@ class BrotliCompressor : public Compressor {
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) {
return BrotliError("Brotli set compression level failed");
}
if (window_bits_ < BROTLI_MIN_WINDOW_BITS || window_bits_ > BROTLI_MAX_WINDOW_BITS) {
return Status::Invalid("window_bits should be within 10 ~ 24");
}
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_LGWIN, window_bits_)) {
return BrotliError("Brotli set window size failed");
}
return Status::OK();
}

Expand Down Expand Up @@ -166,15 +172,17 @@ class BrotliCompressor : public Compressor {

private:
const int compression_level_;
const int window_bits_;
};

// ----------------------------------------------------------------------
// Brotli codec implementation

class BrotliCodec : public Codec {
public:
explicit BrotliCodec(int compression_level)
: compression_level_(compression_level == kUseDefaultCompressionLevel
explicit BrotliCodec(int compression_level, int window_bits)
: window_bits_(window_bits),
compression_level_(compression_level == kUseDefaultCompressionLevel
? kBrotliDefaultCompressionLevel
: compression_level) {}

Expand All @@ -201,16 +209,16 @@ class BrotliCodec : public Codec {
DCHECK_GE(input_len, 0);
DCHECK_GE(output_buffer_len, 0);
std::size_t output_size = static_cast<size_t>(output_buffer_len);
if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW,
BROTLI_DEFAULT_MODE, static_cast<size_t>(input_len), input,
&output_size, output_buffer) == BROTLI_FALSE) {
if (BrotliEncoderCompress(compression_level_, window_bits_, BROTLI_DEFAULT_MODE,
static_cast<size_t>(input_len), input, &output_size,
output_buffer) == BROTLI_FALSE) {
return Status::IOError("Brotli compression failure.");
}
return output_size;
}

Result<std::shared_ptr<Compressor>> MakeCompressor() override {
auto ptr = std::make_shared<BrotliCompressor>(compression_level_);
auto ptr = std::make_shared<BrotliCompressor>(compression_level_, window_bits_);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand All @@ -232,12 +240,13 @@ class BrotliCodec : public Codec {

private:
const int compression_level_;
const int window_bits_;
};

} // namespace

std::unique_ptr<Codec> MakeBrotliCodec(int compression_level) {
return std::make_unique<BrotliCodec>(compression_level);
// Factory for the Brotli codec: constructs a BrotliCodec from the given
// compression level and window bits and returns it through the generic Codec
// interface. Neither argument is validated here; both are passed straight to
// the BrotliCodec constructor.
std::unique_ptr<Codec> MakeBrotliCodec(int compression_level, int window_bits) {
return std::make_unique<BrotliCodec>(compression_level, window_bits);
}

} // namespace internal
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/util/compression_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,20 @@ constexpr int kBrotliDefaultCompressionLevel = 8;

// Brotli codec.
std::unique_ptr<Codec> MakeBrotliCodec(
int compression_level = kBrotliDefaultCompressionLevel);
int compression_level = kBrotliDefaultCompressionLevel,
int window_bits = kBrotliDefaultWindowBits);

// BZ2 codec.
constexpr int kBZ2DefaultCompressionLevel = 9;

std::unique_ptr<Codec> MakeBZ2Codec(int compression_level = kBZ2DefaultCompressionLevel);

// GZip
constexpr int kGZipDefaultCompressionLevel = 9;

struct GZipFormat {
enum type {
ZLIB,
DEFLATE,
GZIP,
};
};

std::unique_ptr<Codec> MakeGZipCodec(int compression_level = kGZipDefaultCompressionLevel,
GZipFormat::type format = GZipFormat::GZIP);
GZipFormat::type format = GZipFormat::GZIP,
int window_bits = kGZipDefaultWindowBits);

// Snappy
std::unique_ptr<Codec> MakeSnappyCodec();
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/util/compression_lz4.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

#include "arrow/util/compression_internal.h"

#include <cstdint>
#include <cstring>
#include <memory>

#include <lz4.h>
#include <lz4frame.h>
#include <lz4hc.h>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"
Expand Down
76 changes: 74 additions & 2 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,81 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
continue;
}
const auto level = combination.level;
const auto codec_options = std::make_shared<arrow::util::CodecOptions>(level);
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, level);
auto result2 = Codec::Create(compression, level);
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
CheckCodecRoundtrip(*result1, *result2, data);
}
}
}

TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
  // Only the GZIP and Brotli codecs currently have dedicated option classes,
  // because they expose extra tunable parameters. Other codecs can simply use
  // the base CodecOptions; more specific option classes can be added if needed.
  struct GZipCase {
    int level;
    GZipFormat::type format;
    int window_bits;
    bool expect_success;
  };
  constexpr GZipCase kCases[] = {{2, GZipFormat::ZLIB, 12, true},
                                 {9, GZipFormat::GZIP, 9, true},
                                 {9, GZipFormat::GZIP, 20, false},
                                 {5, GZipFormat::DEFLATE, -12, false},
                                 {-992, GZipFormat::GZIP, 15, false}};

  std::vector<uint8_t> data = MakeRandomData(2000);

  if (!Codec::IsAvailable(Compression::GZIP)) {
    // Support for this codec hasn't been built, so there is nothing to check.
    return;
  }

  for (const auto& test_case : kCases) {
    auto gzip_options = std::make_shared<arrow::util::GZipCodecOptions>();
    gzip_options->compression_level_ = test_case.level;
    gzip_options->gzip_format = test_case.format;
    gzip_options->window_bits = test_case.window_bits;

    // Two independent codec instances are created so the roundtrip can
    // compress with one and decompress with the other.
    auto first = Codec::Create(Compression::GZIP, gzip_options);
    auto second = Codec::Create(Compression::GZIP, gzip_options);
    ASSERT_EQ(test_case.expect_success, first.ok());
    ASSERT_EQ(test_case.expect_success, second.ok());
    if (!test_case.expect_success) {
      continue;
    }
    CheckCodecRoundtrip(*first, *second, data);
  }
}

TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) {
// for now only GZIP & Brotli codec options supported, since it has specific parameters
// to be customized, other codecs could directly go with CodecOptions, could add more
// specific codec options if needed.
struct CombinationOption {
int level;
int window_bits;
bool expect_success;
};
constexpr CombinationOption combinations[] = {
{8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}};

std::vector<uint8_t> data = MakeRandomData(2000);
for (const auto& combination : combinations) {
const auto compression = Compression::BROTLI;
if (!Codec::IsAvailable(compression)) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = std::make_shared<arrow::util::BrotliCodecOptions>();
codec_options->compression_level_ = combination.level;
codec_options->window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
Expand Down
Loading

0 comments on commit 1e5c232

Please sign in to comment.