diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 69827d5c464b9..8585b1ccf11aa 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2413,9 +2413,9 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) { ASSERT_EQ(2, reader->num_row_groups()); - // Pre-buffer 3 columns in the 2nd row group. + // Pre-buffer column 0 and column 3 in the 2nd row group. const std::vector row_groups = {1}; - const std::vector column_indices = {0, 1, 4}; + const std::vector column_indices = {0, 3}; reader->parquet_reader()->PreBuffer(row_groups, column_indices, ::arrow::io::IOContext(), ::arrow::io::CacheOptions::Defaults()); diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index fc30ddb43f29c..adda9a027bded 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -29,6 +29,7 @@ #include "arrow/io/caching.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" +#include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/int_util_overflow.h" @@ -179,7 +180,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, int64_t source_size, FileMetaData* file_metadata, int row_group_number, const ReaderProperties& props, - std::unordered_set prebuffered_column_chunks, + std::shared_ptr prebuffered_column_chunks_bitmap, std::shared_ptr file_decryptor = nullptr) : source_(std::move(source)), cached_source_(std::move(cached_source)), @@ -187,7 +188,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { file_metadata_(file_metadata), properties_(props), row_group_ordinal_(row_group_number), - prebuffered_column_chunks_(std::move(prebuffered_column_chunks)), + prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)), 
file_decryptor_(file_decryptor) { row_group_metadata_ = file_metadata->RowGroup(row_group_number); } @@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents { ::arrow::io::ReadRange col_range = ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr stream; - if (cached_source_ && - prebuffered_column_chunks_.find(i) != prebuffered_column_chunks_.end()) { + if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr && + ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); @@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; - const std::unordered_set prebuffered_column_chunks_; + const std::shared_ptr prebuffered_column_chunks_bitmap_; std::shared_ptr file_decryptor_; }; @@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents { } std::shared_ptr GetRowGroup(int i) override { - std::unordered_set prebuffered_column_chunks; - // Avoid updating the map as this function can be called concurrently. The map can - // only be updated within Prebuffer(). + std::shared_ptr prebuffered_column_chunks_bitmap; + // Avoid updating the bitmap as this function can be called concurrently. The bitmap + // can only be updated within Prebuffer(). 
auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { - prebuffered_column_chunks = prebuffered_column_chunks_iter->second; + prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; } std::unique_ptr contents = std::make_unique( source_, cached_source_, source_size_, file_metadata_.get(), i, properties_, - std::move(prebuffered_column_chunks), file_decryptor_); + std::move(prebuffered_column_chunks_bitmap), file_decryptor_); return std::make_shared(std::move(contents)); } @@ -366,9 +367,12 @@ class SerializedFile : public ParquetFileReader::Contents { std::vector<::arrow::io::ReadRange> ranges; prebuffered_column_chunks_.clear(); for (int row : row_groups) { - std::unordered_set& prebuffered = prebuffered_column_chunks_[row]; + std::shared_ptr& col_bitmap = prebuffered_column_chunks_[row]; + int num_cols = file_metadata_->num_columns(); + PARQUET_THROW_NOT_OK( + AllocateEmptyBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); for (int col : column_indices) { - prebuffered.insert(col); + ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col); ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } @@ -578,8 +582,9 @@ class SerializedFile : public ParquetFileReader::Contents { ReaderProperties properties_; std::shared_ptr page_index_reader_; std::unique_ptr bloom_filter_reader_; - // Maps a row group to its column chunks that are cached via Prebuffer(). - std::unordered_map> prebuffered_column_chunks_; + // Maps a row group ordinal to the prebuffer status of its column chunks, stored as a + // bitmap buffer. + std::unordered_map> prebuffered_column_chunks_; std::shared_ptr file_decryptor_; // \return The true length of the metadata in bytes