Skip to content

Commit

Permalink
address comments and ensure compability
Browse files Browse the repository at this point in the history
  • Loading branch information
yyang52 committed Jun 8, 2023
1 parent ea0d84c commit 369d045
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 149 deletions.
4 changes: 1 addition & 3 deletions cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ int main(int argc, char** argv) {
const int level = level_str.empty() ? arrow::util::kUseDefaultCompressionLevel
: std::stoi(level_str);
const auto type = arrow::util::Codec::GetCompressionType(name).ValueOrDie();
auto codec = arrow::util::Codec::Create(
type, std::make_shared<arrow::util::CodecOptions>(level))
.ValueOrDie();
auto codec = arrow::util::Codec::Create(type, level).ValueOrDie();
std::cout << "Compression method: " << name;
if (!level_str.empty()) {
std::cout << ", level " << level;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,7 @@ Status WriteTable(const Table& table, io::OutputStream* dst,
ipc_options.allow_64bit = true;
ARROW_ASSIGN_OR_RAISE(
ipc_options.codec,
util::Codec::Create(properties.compression, std::make_shared<util::CodecOptions>(
properties.compression_level)));
util::Codec::Create(properties.compression, properties.compression_level));

std::shared_ptr<RecordBatchWriter> writer;
ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options));
Expand Down
86 changes: 7 additions & 79 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
return codec->default_compression_level();
}

Result<std::unique_ptr<Codec>> Codec::Create(
Compression::type codec_type, const std::shared_ptr<CodecOptions>& codec_options) {
Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
const CodecOptions& codec_options) {
if (!IsAvailable(codec_type)) {
if (codec_type == Compression::LZO) {
return Status::NotImplemented("LZO codec not implemented");
Expand All @@ -151,7 +151,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(
"' not built");
}

auto compression_level = codec_options->compression_level_;
auto compression_level = codec_options.compression_level_;
if (compression_level != kUseDefaultCompressionLevel &&
!SupportsCompressionLevel(codec_type)) {
return Status::Invalid("Codec '", GetCodecAsString(codec_type),
Expand All @@ -169,8 +169,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(
break;
case Compression::GZIP: {
#ifdef ARROW_WITH_ZLIB
std::shared_ptr<GZipCodecOptions> opt =
std::dynamic_pointer_cast<GZipCodecOptions>(codec_options);
auto opt = dynamic_cast<const GZipCodecOptions*>(&codec_options);
codec = internal::MakeGZipCodec(compression_level,
opt ? opt->gzip_format : GZipFormat::GZIP,
opt ? opt->window_bits : kGZipDefaultWindowBits);
Expand All @@ -179,8 +178,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(
}
case Compression::BROTLI: {
#ifdef ARROW_WITH_BROTLI
std::shared_ptr<BrotliCodecOptions> opt =
std::dynamic_pointer_cast<BrotliCodecOptions>(codec_options);
auto opt = dynamic_cast<const BrotliCodecOptions*>(&codec_options);
codec = internal::MakeBrotliCodec(
compression_level, opt ? opt->window_bits : kBrotliDefaultWindowBits);
#endif
Expand Down Expand Up @@ -220,80 +218,10 @@ Result<std::unique_ptr<Codec>> Codec::Create(
return std::move(codec);
}

// Deprecated and use CodecOptions to create Codec instead
// use compression level to create Codec
Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
if (!IsAvailable(codec_type)) {
if (codec_type == Compression::LZO) {
return Status::NotImplemented("LZO codec not implemented");
}

auto name = GetCodecAsString(codec_type);
if (name == "unknown") {
return Status::Invalid("Unrecognized codec");
}

return Status::NotImplemented("Support for codec '", GetCodecAsString(codec_type),
"' not built");
}

if (compression_level != kUseDefaultCompressionLevel &&
!SupportsCompressionLevel(codec_type)) {
return Status::Invalid("Codec '", GetCodecAsString(codec_type),
"' doesn't support setting a compression level.");
}

std::unique_ptr<Codec> codec;
switch (codec_type) {
case Compression::UNCOMPRESSED:
return nullptr;
case Compression::SNAPPY:
#ifdef ARROW_WITH_SNAPPY
codec = internal::MakeSnappyCodec();
#endif
break;
case Compression::GZIP:
#ifdef ARROW_WITH_ZLIB
codec = internal::MakeGZipCodec(compression_level);
#endif
break;
case Compression::BROTLI:
#ifdef ARROW_WITH_BROTLI
codec = internal::MakeBrotliCodec(compression_level);
#endif
break;
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4RawCodec(compression_level);
#endif
break;
case Compression::LZ4_FRAME:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4FrameCodec(compression_level);
#endif
break;
case Compression::LZ4_HADOOP:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4HadoopRawCodec();
#endif
break;
case Compression::ZSTD:
#ifdef ARROW_WITH_ZSTD
codec = internal::MakeZSTDCodec(compression_level);
#endif
break;
case Compression::BZ2:
#ifdef ARROW_WITH_BZ2
codec = internal::MakeBZ2Codec(compression_level);
#endif
break;
default:
break;
}

DCHECK_NE(codec, nullptr);
RETURN_NOT_OK(codec->Init());
return std::move(codec);
return Codec::Create(codec_type, CodecOptions(compression_level));
}

bool Codec::IsAvailable(Compression::type codec_type) {
Expand Down
18 changes: 5 additions & 13 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,8 @@ struct GZipFormat {

constexpr int kGZipDefaultWindowBits = 15;

class GZipCodecOptions : public CodecOptions {
class ARROW_EXPORT GZipCodecOptions : public CodecOptions {
public:
~GZipCodecOptions() = default;

GZipFormat::type gzip_format = GZipFormat::GZIP;
int window_bits = kGZipDefaultWindowBits;
};
Expand All @@ -144,10 +142,8 @@ class GZipCodecOptions : public CodecOptions {

constexpr int kBrotliDefaultWindowBits = 22;

class BrotliCodecOptions : public CodecOptions {
class ARROW_EXPORT BrotliCodecOptions : public CodecOptions {
public:
~BrotliCodecOptions() = default;

int window_bits = kBrotliDefaultWindowBits;
};

Expand All @@ -166,15 +162,11 @@ class ARROW_EXPORT Codec {
/// \brief Return compression type for name (all lower case)
static Result<Compression::type> GetCompressionType(const std::string& name);

/// \brief Create a codec for the given compression algorithm
static Result<std::unique_ptr<Codec>> Create(
Compression::type codec,
const std::shared_ptr<CodecOptions>& codec_options =
std::make_shared<CodecOptions>(kUseDefaultCompressionLevel));
/// \brief Create a codec for the given compression algorithm with CodecOptions
static Result<std::unique_ptr<Codec>> Create(Compression::type codec,
const CodecOptions& codec_options = {});

/// \brief Create a codec for the given compression algorithm
/// \deprecated and left for backwards compatibility.
ARROW_DEPRECATED("Use CodecOptions to create Codec")
static Result<std::unique_ptr<Codec>> Create(Compression::type codec,
int compression_level);

Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ class BrotliCodec : public Codec {
}

Status Init() override {
if (window_bits_ < BROTLI_MIN_WINDOW_BITS || window_bits_ > BROTLI_MAX_WINDOW_BITS) {
return Status::Invalid("window_bits should be within 10 ~ 24");
}
return Status::Invalid("window_bits should be between ", BROTLI_MIN_WINDOW_BITS,
" and ", BROTLI_MAX_WINDOW_BITS);
return Status::OK();
}

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/util/compression_lz4.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

#include "arrow/util/compression_internal.h"

#include <lz4.h>
#include <lz4frame.h>
#include <lz4hc.h>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>

#include <lz4.h>
#include <lz4frame.h>
#include <lz4hc.h>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
continue;
}
const auto level = combination.level;
const auto codec_options = std::make_shared<arrow::util::CodecOptions>(level);
const auto codec_options = arrow::util::CodecOptions(level);
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
Expand Down Expand Up @@ -424,10 +424,10 @@ TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = std::make_shared<arrow::util::GZipCodecOptions>();
codec_options->compression_level_ = combination.level;
codec_options->gzip_format = combination.format;
codec_options->window_bits = combination.window_bits;
auto codec_options = arrow::util::GZipCodecOptions();
codec_options.compression_level_ = combination.level;
codec_options.gzip_format = combination.format;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
Expand Down Expand Up @@ -458,9 +458,9 @@ TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = std::make_shared<arrow::util::BrotliCodecOptions>();
codec_options->compression_level_ = combination.level;
codec_options->window_bits = combination.window_bits;
auto codec_options = arrow::util::BrotliCodecOptions();
codec_options.compression_level_ = combination.level;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/arrow/util/compression_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

#include "arrow/util/compression_internal.h"

#include <zconf.h>
#include <zlib.h>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>

#include <zconf.h>
#include <zlib.h>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
Expand All @@ -43,7 +44,7 @@ namespace {
// there.

// Maximum window size
constexpr int WINDOW_BITS = 15;
constexpr int kGZipMaxWindowBits = 15;

// Minimum window size
constexpr int kGZipMinWindowBits = 9;
Expand All @@ -57,8 +58,7 @@ constexpr int DETECT_CODEC = 32;
constexpr int kGZipMinCompressionLevel = 1;
constexpr int kGZipMaxCompressionLevel = 9;

int CompressionWindowBitsForFormat(GZipFormat::type format, int input_window_bits) {
int window_bits = input_window_bits;
int CompressionWindowBitsForFormat(GZipFormat::type format, int window_bits) {
switch (format) {
case GZipFormat::DEFLATE:
window_bits = -window_bits;
Expand All @@ -72,12 +72,12 @@ int CompressionWindowBitsForFormat(GZipFormat::type format, int input_window_bit
return window_bits;
}

int DecompressionWindowBitsForFormat(GZipFormat::type format, int input_window_bits) {
int DecompressionWindowBitsForFormat(GZipFormat::type format, int window_bits) {
if (format == GZipFormat::DEFLATE) {
return -input_window_bits;
return -window_bits;
} else {
/* If not deflate, autodetect format from header */
return input_window_bits | DETECT_CODEC;
return window_bits | DETECT_CODEC;
}
}

Expand Down Expand Up @@ -468,8 +468,9 @@ class GZipCodec : public Codec {
}

Status Init() override {
if (window_bits_ < kGZipMinWindowBits || window_bits_ > WINDOW_BITS) {
return Status::Invalid("window_bits should be within 9 ~ 15");
if (window_bits_ < kGZipMinWindowBits || window_bits_ > kGZipMaxWindowBits) {
return Status::Invalid("window_bits should be between ", kGZipMinWindowBits,
" and ", kGZipMaxWindowBits);
}
const Status init_compressor_status = InitCompressor();
if (!init_compressor_status.ok()) {
Expand Down
21 changes: 17 additions & 4 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
class SerializedPageWriter : public PageWriter {
public:
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
const std::shared_ptr<CodecOptions>& codec_options,
const CodecOptions& codec_options,
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal,
int16_t column_chunk_ordinal, bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
Expand Down Expand Up @@ -611,7 +611,7 @@ class SerializedPageWriter : public PageWriter {
class BufferedPageWriter : public PageWriter {
public:
BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
const std::shared_ptr<CodecOptions>& codec_options,
const CodecOptions& codec_options,
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal,
int16_t current_column_ordinal, bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
Expand Down Expand Up @@ -691,19 +691,32 @@ std::unique_ptr<PageWriter> PageWriter::Open(
OffsetIndexBuilder* offset_index_builder) {
if (buffered_row_group) {
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
std::move(sink), codec, codec_options, metadata, row_group_ordinal,
std::move(sink), codec, *codec_options.get(), metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor), column_index_builder,
offset_index_builder));
} else {
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
std::move(sink), codec, codec_options, metadata, row_group_ordinal,
std::move(sink), codec, *codec_options.get(), metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
std::move(meta_encryptor), std::move(data_encryptor), column_index_builder,
offset_index_builder));
}
}

std::unique_ptr<PageWriter> PageWriter::Open(
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
std::shared_ptr<Encryptor> data_encryptor, bool page_write_checksum_enabled,
ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) {
return PageWriter::Open(sink, codec, std::make_shared<CodecOptions>(compression_level),
metadata, row_group_ordinal, column_chunk_ordinal, pool,
buffered_row_group, meta_encryptor, data_encryptor,
page_write_checksum_enabled, column_index_builder,
offset_index_builder);
}
// ----------------------------------------------------------------------
// ColumnWriter

Expand Down
Loading

0 comments on commit 369d045

Please sign in to comment.