From a5768298edabdfce64324c52b0411ecb5b7c24fa Mon Sep 17 00:00:00 2001 From: jp0317 Date: Wed, 12 Jul 2023 18:12:24 +0000 Subject: [PATCH 1/4] use bit vector to store prebuffered column chunks --- cpp/src/parquet/file_reader.cc | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index fc30ddb43f29c..be14ee870a822 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -179,7 +179,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, int64_t source_size, FileMetaData* file_metadata, int row_group_number, const ReaderProperties& props, - std::unordered_set prebuffered_column_chunks, + std::vector prebuffered_column_chunks, std::shared_ptr file_decryptor = nullptr) : source_(std::move(source)), cached_source_(std::move(cached_source)), @@ -203,8 +203,8 @@ class SerializedRowGroup : public RowGroupReader::Contents { ::arrow::io::ReadRange col_range = ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr stream; - if (cached_source_ && - prebuffered_column_chunks_.find(i) != prebuffered_column_chunks_.end()) { + if (cached_source_ && !prebuffered_column_chunks_.empty() && + prebuffered_column_chunks_[i]) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. 
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); @@ -272,7 +272,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; - const std::unordered_set prebuffered_column_chunks_; + const std::vector prebuffered_column_chunks_; std::shared_ptr file_decryptor_; }; @@ -302,7 +302,7 @@ class SerializedFile : public ParquetFileReader::Contents { } std::shared_ptr GetRowGroup(int i) override { - std::unordered_set prebuffered_column_chunks; + std::vector prebuffered_column_chunks; // Avoid updating the map as this function can be called concurrently. The map can // only be updated within Prebuffer(). auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); @@ -366,9 +366,10 @@ class SerializedFile : public ParquetFileReader::Contents { std::vector<::arrow::io::ReadRange> ranges; prebuffered_column_chunks_.clear(); for (int row : row_groups) { - std::unordered_set& prebuffered = prebuffered_column_chunks_[row]; + std::vector& prebuffered = prebuffered_column_chunks_[row]; + prebuffered.resize(file_metadata_->num_columns(), false); for (int col : column_indices) { - prebuffered.insert(col); + prebuffered[col] = true; ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } @@ -579,7 +580,7 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr page_index_reader_; std::unique_ptr bloom_filter_reader_; // Maps a row group to its column chunks that are cached via Prebuffer(). 
- std::unordered_map> prebuffered_column_chunks_; + std::unordered_map> prebuffered_column_chunks_; std::shared_ptr file_decryptor_; // \return The true length of the metadata in bytes From fd1ae97647a4891da5e5c1c885c1cfbbcc695a8b Mon Sep 17 00:00:00 2001 From: jp0317 Date: Thu, 13 Jul 2023 18:42:35 +0000 Subject: [PATCH 2/4] use AllocateBitmap --- .../parquet/arrow/arrow_reader_writer_test.cc | 2 +- cpp/src/parquet/file_reader.cc | 31 +++++++++++-------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 69827d5c464b9..2408fb9198fb1 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2415,7 +2415,7 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) { // Pre-buffer 3 columns in the 2nd row group. const std::vector row_groups = {1}; - const std::vector column_indices = {0, 1, 4}; + const std::vector column_indices = {0, 3}; reader->parquet_reader()->PreBuffer(row_groups, column_indices, ::arrow::io::IOContext(), ::arrow::io::CacheOptions::Defaults()); diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index be14ee870a822..dab5f37df549b 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -29,6 +29,7 @@ #include "arrow/io/caching.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" +#include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/int_util_overflow.h" @@ -179,7 +180,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, int64_t source_size, FileMetaData* file_metadata, int row_group_number, const ReaderProperties& props, - std::vector prebuffered_column_chunks, + std::shared_ptr prebuffered_column_chunks_bitmap, std::shared_ptr file_decryptor = nullptr) 
: source_(std::move(source)), cached_source_(std::move(cached_source)), @@ -187,7 +188,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { file_metadata_(file_metadata), properties_(props), row_group_ordinal_(row_group_number), - prebuffered_column_chunks_(std::move(prebuffered_column_chunks)), + prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)), file_decryptor_(file_decryptor) { row_group_metadata_ = file_metadata->RowGroup(row_group_number); } @@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents { ::arrow::io::ReadRange col_range = ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr stream; - if (cached_source_ && !prebuffered_column_chunks_.empty() && - prebuffered_column_chunks_[i]) { + if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr && + ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->mutable_data(), i)) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); @@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; - const std::vector prebuffered_column_chunks_; + std::shared_ptr prebuffered_column_chunks_bitmap_; std::shared_ptr file_decryptor_; }; @@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents { } std::shared_ptr GetRowGroup(int i) override { - std::vector prebuffered_column_chunks; + std::shared_ptr prebuffered_column_chunks_bitmap; // Avoid updating the map as this function can be called concurrently. The map can // only be updated within Prebuffer(). 
auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { - prebuffered_column_chunks = prebuffered_column_chunks_iter->second; + prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; } std::unique_ptr contents = std::make_unique( source_, cached_source_, source_size_, file_metadata_.get(), i, properties_, - std::move(prebuffered_column_chunks), file_decryptor_); + std::move(prebuffered_column_chunks_bitmap), file_decryptor_); return std::make_shared(std::move(contents)); } @@ -366,10 +367,13 @@ class SerializedFile : public ParquetFileReader::Contents { std::vector<::arrow::io::ReadRange> ranges; prebuffered_column_chunks_.clear(); for (int row : row_groups) { - std::vector& prebuffered = prebuffered_column_chunks_[row]; - prebuffered.resize(file_metadata_->num_columns(), false); + std::shared_ptr& col_bitmap = prebuffered_column_chunks_[row]; + int num_cols = file_metadata_->num_columns(); + PARQUET_THROW_NOT_OK( + AllocateBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); + ::arrow::bit_util::ClearBitmap(col_bitmap->mutable_data(), 0, num_cols); for (int col : column_indices) { - prebuffered[col] = true; + ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col); ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } @@ -579,8 +583,9 @@ class SerializedFile : public ParquetFileReader::Contents { ReaderProperties properties_; std::shared_ptr page_index_reader_; std::unique_ptr bloom_filter_reader_; - // Maps a row group to its column chunks that are cached via Prebuffer(). - std::unordered_map> prebuffered_column_chunks_; + // Maps a row group to a bitmap (stored in the Buffer) that marks its column chunks + // cached via Prebuffer(). 
+ std::unordered_map> prebuffered_column_chunks_; std::shared_ptr file_decryptor_; // \return The true length of the metadata in bytes From 04adeb246d4e4870619d2ea017ca62793fce7b4c Mon Sep 17 00:00:00 2001 From: jp0317 Date: Sun, 16 Jul 2023 18:24:14 +0000 Subject: [PATCH 3/4] use memset to clear bit map and reword a few comments --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +- cpp/src/parquet/file_reader.cc | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 2408fb9198fb1..8585b1ccf11aa 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2413,7 +2413,7 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) { ASSERT_EQ(2, reader->num_row_groups()); - // Pre-buffer 3 columns in the 2nd row group. + // Pre-buffer column 0 and column 3 in the 2nd row group. const std::vector row_groups = {1}; const std::vector column_indices = {0, 3}; reader->parquet_reader()->PreBuffer(row_groups, column_indices, diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index dab5f37df549b..c922639518f8a 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -273,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; - std::shared_ptr prebuffered_column_chunks_bitmap_; + const std::shared_ptr prebuffered_column_chunks_bitmap_; std::shared_ptr file_decryptor_; }; @@ -304,8 +304,8 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr GetRowGroup(int i) override { std::shared_ptr prebuffered_column_chunks_bitmap; - // Avoid updating the map as this function can be called concurrently. The map can - // only be updated within Prebuffer(). 
+ // Avoid updating the bitmap as this function can be called concurrently. The bitmap + // can only be updated within Prebuffer(). auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; @@ -371,7 +371,7 @@ class SerializedFile : public ParquetFileReader::Contents { int num_cols = file_metadata_->num_columns(); PARQUET_THROW_NOT_OK( AllocateBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); - ::arrow::bit_util::ClearBitmap(col_bitmap->mutable_data(), 0, num_cols); + memset(col_bitmap->mutable_data(), 0, col_bitmap->size()); for (int col : column_indices) { ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col); ranges.push_back( @@ -583,8 +583,8 @@ class SerializedFile : public ParquetFileReader::Contents { ReaderProperties properties_; std::shared_ptr page_index_reader_; std::unique_ptr bloom_filter_reader_; - // Maps a row group to a bitmap (stored in the Buffer) that marks its column chunks - // cached via Prebuffer(). + // Maps each row group ordinal to the prebuffer status of its column chunks, stored + // as a bitmap buffer. 
std::unordered_map> prebuffered_column_chunks_; std::shared_ptr file_decryptor_; From 8c1482b70f61f77a3cc608fa0d86c06990cedb4a Mon Sep 17 00:00:00 2001 From: Jinpeng Date: Tue, 18 Jul 2023 23:27:03 -0400 Subject: [PATCH 4/4] Update cpp/src/parquet/file_reader.cc replace mutable_data() with data() when checking bitmap in Buffer Co-authored-by: Gang Wu --- cpp/src/parquet/file_reader.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index c922639518f8a..adda9a027bded 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -205,7 +205,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr stream; if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr && - ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->mutable_data(), i)) { + ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); @@ -370,8 +370,7 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr& col_bitmap = prebuffered_column_chunks_[row]; int num_cols = file_metadata_->num_columns(); PARQUET_THROW_NOT_OK( - AllocateBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); - memset(col_bitmap->mutable_data(), 0, col_bitmap->size()); + AllocateEmptyBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); for (int col : column_indices) { ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col); ranges.push_back(