Skip to content

Commit

Permalink
ARROW-13965: [C++] dynamic_casts in parquet TypedColumnWriterImpl imp…
Browse files Browse the repository at this point in the history
…acting performance

This PR adds observer pointers to TypedColumnWriterImpl to avoid repeated dynamic_casts upon writing many small batches.  This can lead to significant performance gains.  As an example, using stream_reader_writer from examples/parquet/parquet_stream_api modified to write 20M rows, an improvement of over 30% was seen in write times.
```
original:
52.919u 2.146s 0:55.81 98.6%	0+0k 0+2646928io 0pf+0w
53.328u 2.327s 0:56.38 98.6%	0+0k 0+2646928io 0pf+0w
53.048u 2.341s 0:56.06 98.7%	0+0k 0+2646928io 0pf+0w

patched:
35.367u 2.012s 0:37.88 98.6%	0+0k 0+2646928io 0pf+0w
35.133u 1.717s 0:37.36 98.6%	0+0k 0+2646928io 0pf+0w
35.228u 1.950s 0:37.61 98.8%	0+0k 0+2646928io 0pf+0w
```

Closes apache#11131 from etseidl/downcast

Lead-authored-by: seidl <seidl2@llnl.gov>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: etseidl <etseidl@users.noreply.github.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
3 people committed Sep 16, 2021
1 parent 1130d18 commit 5260fd5
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
properties) {
current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
properties->memory_pool());
// We have to dynamic_cast as some compilers don't want to static_cast
// through virtual inheritance.
current_value_encoder_ = dynamic_cast<TypedEncoder<DType>*>(current_encoder_.get());
// Will be null if not using dictionary, but that's ok
current_dict_encoder_ = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());

if (properties->statistics_enabled(descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
Expand Down Expand Up @@ -1159,15 +1164,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
ArrowWriteContext* context, bool maybe_parent_nulls);

void WriteDictionaryPage() override {
// We have to dynamic cast here because of TypedEncoder<Type> as
// some compilers don't want to cast through virtual inheritance
auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
DCHECK(dict_encoder);
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
dict_encoder->WriteDict(buffer->mutable_data());

DictionaryPage page(buffer, dict_encoder->num_entries(),
DCHECK(current_dict_encoder_);
std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(
properties_->memory_pool(), current_dict_encoder_->dict_encoded_size());
current_dict_encoder_->WriteDict(buffer->mutable_data());

DictionaryPage page(buffer, current_dict_encoder_->num_entries(),
properties_->dictionary_page_encoding());
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
Expand Down Expand Up @@ -1207,6 +1209,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
std::unique_ptr<Encoder> current_encoder_;
// Downcasted observers of current_encoder_.
// The downcast is performed once as opposed to at every use since
// dynamic_cast is so expensive, and static_cast is not available due
// to virtual inheritance.
ValueEncoderType* current_value_encoder_;
DictEncoder<DType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;

Expand Down Expand Up @@ -1358,6 +1366,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// Only PLAIN encoding is supported for fallback in V1
current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
properties_->memory_pool());
current_value_encoder_ = dynamic_cast<ValueEncoderType*>(current_encoder_.get());
current_dict_encoder_ = nullptr; // not using dict
encoding_ = Encoding::PLAIN;
}
}
Expand All @@ -1375,17 +1385,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return;
}

// We have to dynamic cast here because TypedEncoder<Type> as some compilers
// don't want to cast through virtual inheritance
auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
if (current_dict_encoder_->dict_encoded_size() >=
properties_->dictionary_pagesize_limit()) {
FallbackToPlainEncoding();
}
}

void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
dynamic_cast<ValueEncoderType*>(current_encoder_.get())
->Put(values, static_cast<int>(num_values));
current_value_encoder_->Put(values, static_cast<int>(num_values));
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
Expand All @@ -1394,12 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
const uint8_t* valid_bits, int64_t valid_bits_offset) {
if (num_values != num_spaced_values) {
dynamic_cast<ValueEncoderType*>(current_encoder_.get())
->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
valid_bits_offset);
current_value_encoder_->PutSpaced(values, static_cast<int>(num_spaced_values),
valid_bits, valid_bits_offset);
} else {
dynamic_cast<ValueEncoderType*>(current_encoder_.get())
->Put(values, static_cast<int>(num_values));
current_value_encoder_->Put(values, static_cast<int>(num_values));
}
if (page_statistics_ != nullptr) {
const int64_t num_nulls = num_spaced_values - num_values;
Expand Down

0 comments on commit 5260fd5

Please sign in to comment.