From b268bb8e1fc7b1beb2879d8e887ea7fd1f751135 Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Wed, 10 Feb 2016 20:43:44 -0800 Subject: [PATCH] PARQUET-169: Implement support for bulk reading and writing rep/def levels Added a LevelDecoder and LevelEncoder class to read and write batches of def/rep levels. Added tests to verify the functionality. Author: Deepak Majeti Closes #30 from majetideepak/master and squashes the following commits: 18d7e51 [Deepak Majeti] fixed argument order of asserts inside test cases 5e0000b [Deepak Majeti] PARQUET-169: Implement support for reading repetition and definition levels Change-Id: Ie3ed4ac5c5ceabe60b095d3b5eab45941bd71698 --- cpp/src/parquet/column/CMakeLists.txt | 2 + cpp/src/parquet/column/column-reader-test.cc | 59 ++++++- cpp/src/parquet/column/levels-test.cc | 129 +++++++++++++++ cpp/src/parquet/column/levels.h | 156 +++++++++++++++++++ cpp/src/parquet/column/page.h | 8 + cpp/src/parquet/column/reader.cc | 67 +++----- cpp/src/parquet/column/reader.h | 17 +- cpp/src/parquet/column/test-util.h | 19 +-- cpp/src/parquet/reader-test.cc | 1 - cpp/src/parquet/reader.cc | 6 +- 10 files changed, 393 insertions(+), 71 deletions(-) create mode 100644 cpp/src/parquet/column/levels-test.cc create mode 100644 cpp/src/parquet/column/levels.h diff --git a/cpp/src/parquet/column/CMakeLists.txt b/cpp/src/parquet/column/CMakeLists.txt index 7eb334ecc89fc..423f54498edc8 100644 --- a/cpp/src/parquet/column/CMakeLists.txt +++ b/cpp/src/parquet/column/CMakeLists.txt @@ -18,9 +18,11 @@ # Headers: top level install(FILES page.h + levels.h reader.h serialized-page.h scanner.h DESTINATION include/parquet/column) ADD_PARQUET_TEST(column-reader-test) +ADD_PARQUET_TEST(levels-test) diff --git a/cpp/src/parquet/column/column-reader-test.cc b/cpp/src/parquet/column/column-reader-test.cc index 920ae562560de..0d4aea16b3dd3 100644 --- a/cpp/src/parquet/column/column-reader-test.cc +++ b/cpp/src/parquet/column/column-reader-test.cc @@ -124,9 +124,6 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { Int32Reader* reader = static_cast(reader_.get()); - std::vector vexpected; - std::vector dexpected; - size_t values_read = 0; size_t batch_actual = 0; @@ -157,6 +154,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { ASSERT_EQ(0, values_read); } +TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { + vector values = {1, 2, 3, 4, 5}; + vector def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1}; + vector rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1}; + + size_t num_values = values.size(); + parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; + + vector page1; + test::DataPageBuilder page_builder(&page1); + + // Definition levels precede the values + page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE); + page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE); + page_builder.AppendValues(values, parquet::Encoding::PLAIN); + + pages_.push_back(page_builder.Finish()); + + NodePtr type = schema::Int32("a", Repetition::REPEATED); + ColumnDescriptor descr(type, 2, 1); + InitReader(&descr); + + Int32Reader* reader = static_cast(reader_.get()); + + size_t values_read = 0; + size_t batch_actual = 0; + + vector vresult(3, -1); + vector dresult(5, -1); + vector rresult(5, -1); + + batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(3, values_read); + + ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); + ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5))); + + batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(2, values_read); + + ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); + ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10))); + + // EOS, pass all nullptrs to check for improper writes. Do not segfault / + // core dump + batch_actual = reader->ReadBatch(5, nullptr, nullptr, + nullptr, &values_read); + ASSERT_EQ(0, batch_actual); + ASSERT_EQ(0, values_read); +} } // namespace test } // namespace parquet_cpp diff --git a/cpp/src/parquet/column/levels-test.cc b/cpp/src/parquet/column/levels-test.cc new file mode 100644 index 0000000000000..99cc21ee4c21f --- /dev/null +++ b/cpp/src/parquet/column/levels-test.cc @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include + +#include "parquet/thrift/parquet_types.h" +#include "parquet/column/levels.h" + +using std::string; + +namespace parquet_cpp { + +class TestLevels : public ::testing::Test { + public: + int GenerateLevels(int min_repeat_factor, int max_repeat_factor, + int max_level, std::vector& input_levels) { + int total_count = 0; + // for each repetition count upto max_repeat_factor + for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { + // repeat count increase by a factor of 2 for every iteration + int repeat_count = (1 << repeat); + // generate levels for repetition count upto the maximum level + int value = 0; + int bwidth = 0; + while (value <= max_level) { + for (int i = 0; i < repeat_count; i++) { + input_levels[total_count++] = value; + } + value = (2 << bwidth) - 1; + bwidth++; + } + } + return total_count; + } + + void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level, + std::vector& input_levels) { + LevelEncoder encoder; + LevelDecoder decoder; + int levels_count = 0; + std::vector output_levels; + std::vector bytes; + int num_levels = input_levels.size(); + output_levels.resize(num_levels); + bytes.resize(2 * num_levels); + ASSERT_EQ(num_levels, output_levels.size()); + ASSERT_EQ(2 * num_levels, bytes.size()); + // start encoding and decoding + if (encoding == parquet::Encoding::RLE) { + // leave space to write the rle length value + encoder.Init(encoding, max_level, num_levels, + bytes.data() + sizeof(uint32_t), bytes.size()); + + levels_count = encoder.Encode(num_levels, input_levels.data()); + (reinterpret_cast(bytes.data()))[0] = encoder.len(); + + } else { + encoder.Init(encoding, max_level, num_levels, + bytes.data(), bytes.size()); + levels_count = encoder.Encode(num_levels, input_levels.data()); + } + + ASSERT_EQ(num_levels, levels_count); + + decoder.Init(encoding, max_level, num_levels, bytes.data()); + levels_count = decoder.Decode(num_levels, output_levels.data()); + + ASSERT_EQ(num_levels, levels_count); + + for (int i = 0; i < num_levels; i++) { + EXPECT_EQ(input_levels[i], output_levels[i]); + } + } +}; + +// test levels with maximum bit-width from 1 to 8 +// increase the repetition count for each iteration by a factor of 2 +TEST_F(TestLevels, TestEncodeDecodeLevels) { + int min_repeat_factor = 0; + int max_repeat_factor = 7; // 128 + int max_bit_width = 8; + std::vector input_levels; + parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE, + parquet::Encoding::BIT_PACKED}; + + // for each encoding + for (int encode = 0; encode < 2; encode++) { + parquet::Encoding::type encoding = encodings[encode]; + // BIT_PACKED requires a sequence of atleast 8 + if (encoding == parquet::Encoding::BIT_PACKED) min_repeat_factor = 3; + + // for each maximum bit-width + for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { + int num_levels_per_width = ((2 << max_repeat_factor) - (1 << min_repeat_factor)); + int num_levels = (bit_width + 1) * num_levels_per_width; + input_levels.resize(num_levels); + ASSERT_EQ(num_levels, input_levels.size()); + + // find the maximum level for the current bit_width + int max_level = (1 << bit_width) - 1; + // Generate levels + int total_count = GenerateLevels(min_repeat_factor, max_repeat_factor, + max_level, input_levels); + ASSERT_EQ(num_levels, total_count); + VerifyLevelsEncoding(encoding, max_level, input_levels); + } + } +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/column/levels.h b/cpp/src/parquet/column/levels.h new file mode 100644 index 0000000000000..405622302cb66 --- /dev/null +++ b/cpp/src/parquet/column/levels.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_COLUMN_LEVELS_H +#define PARQUET_COLUMN_LEVELS_H + +#include "parquet/exception.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/encodings/encodings.h" +#include "parquet/util/rle-encoding.h" + +namespace parquet_cpp { + +class LevelEncoder { + public: + LevelEncoder() {} + + // Initialize the LevelEncoder. + void Init(parquet::Encoding::type encoding, int16_t max_level, + int num_buffered_values, uint8_t* data, int data_size) { + bit_width_ = BitUtil::Log2(max_level + 1); + encoding_ = encoding; + switch (encoding) { + case parquet::Encoding::RLE: { + rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); + break; + } + case parquet::Encoding::BIT_PACKED: { + int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); + bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); + break; + } + default: + throw ParquetException("Unknown encoding type for levels."); + } + } + + // Encodes a batch of levels from an array and returns the number of levels encoded + size_t Encode(size_t batch_size, const int16_t* levels) { + size_t num_encoded = 0; + if (!rle_encoder_ && !bit_packed_encoder_) { + throw ParquetException("Level encoders are not initialized."); + } + + if (encoding_ == parquet::Encoding::RLE) { + for (size_t i = 0; i < batch_size; ++i) { + if (!rle_encoder_->Put(*(levels + i))) { + break; + } + ++num_encoded; + } + rle_encoder_->Flush(); + rle_length_ = rle_encoder_->len(); + } else { + for (size_t i = 0; i < batch_size; ++i) { + if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { + break; + } + ++num_encoded; + } + bit_packed_encoder_->Flush(); + } + return num_encoded; + } + + int32_t len() { + assert(encoding_ == parquet::Encoding::RLE); + return rle_length_; + } + + private: + int bit_width_; + int rle_length_; + parquet::Encoding::type encoding_; + std::unique_ptr rle_encoder_; + std::unique_ptr bit_packed_encoder_; +}; + + +class LevelDecoder { + public: + LevelDecoder() {} + + // Initialize the LevelDecoder and return the number of bytes consumed + size_t Init(parquet::Encoding::type encoding, int16_t max_level, + int num_buffered_values, const uint8_t* data) { + uint32_t num_bytes = 0; + uint32_t total_bytes = 0; + bit_width_ = BitUtil::Log2(max_level + 1); + encoding_ = encoding; + switch (encoding) { + case parquet::Encoding::RLE: { + num_bytes = *reinterpret_cast(data); + const uint8_t* decoder_data = data + sizeof(uint32_t); + rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); + return sizeof(uint32_t) + num_bytes; + } + case parquet::Encoding::BIT_PACKED: { + num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); + bit_packed_decoder_.reset(new BitReader(data, num_bytes)); + return num_bytes; + } + default: + throw ParquetException("Unknown encoding type for levels."); + } + return -1; + } + + // Decodes a batch of levels into an array and returns the number of levels decoded + size_t Decode(size_t batch_size, int16_t* levels) { + size_t num_decoded = 0; + if (!rle_decoder_ && !bit_packed_decoder_) { + throw ParquetException("Level decoders are not initialized."); + } + + if (encoding_ == parquet::Encoding::RLE) { + for (size_t i = 0; i < batch_size; ++i) { + if (!rle_decoder_->Get(levels + i)) { + break; + } + ++num_decoded; + } + } else { + for (size_t i = 0; i < batch_size; ++i) { + if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { + break; + } + ++num_decoded; + } + } + return num_decoded; + } + + private: + int bit_width_; + parquet::Encoding::type encoding_; + std::unique_ptr rle_decoder_; + std::unique_ptr bit_packed_decoder_; +}; + +} // namespace parquet_cpp +#endif // PARQUET_COLUMN_LEVELS_H diff --git a/cpp/src/parquet/column/page.h b/cpp/src/parquet/column/page.h index 46f5d624e700c..f2740b61fac58 100644 --- a/cpp/src/parquet/column/page.h +++ b/cpp/src/parquet/column/page.h @@ -84,6 +84,14 @@ class DataPage : public Page { return header_.encoding; } + parquet::Encoding::type repetition_level_encoding() const { + return header_.repetition_level_encoding; + } + + parquet::Encoding::type definition_level_encoding() const { + return header_.definition_level_encoding; + } + private: parquet::DataPageHeader header_; }; diff --git a/cpp/src/parquet/column/reader.cc b/cpp/src/parquet/column/reader.cc index 0fe7a6ef2b74a..878bd4ff33a89 100644 --- a/cpp/src/parquet/column/reader.cc +++ b/cpp/src/parquet/column/reader.cc @@ -59,18 +59,6 @@ void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { current_decoder_ = decoders_[encoding].get(); } - -static size_t InitializeLevelDecoder(const uint8_t* buffer, - int16_t max_level, std::unique_ptr& decoder) { - int num_definition_bytes = *reinterpret_cast(buffer); - - decoder.reset(new RleDecoder(buffer + sizeof(uint32_t), - num_definition_bytes, - BitUtil::NumRequiredBits(max_level))); - - return sizeof(uint32_t) + num_definition_bytes; -} - // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) { @@ -109,24 +97,30 @@ bool TypedColumnReader::ReadNewPage() { // the page size to determine the number of bytes in the encoded data. size_t data_size = page->size(); - max_definition_level_ = descr_->max_definition_level(); - - // Read definition levels. - if (max_definition_level_ > 0) { - // Temporary hack until schema resolution implemented - - size_t def_levels_bytes = InitializeLevelDecoder(buffer, - max_definition_level_, definition_level_decoder_); - + int16_t max_definition_level = descr_->max_definition_level(); + int16_t max_repetition_level = descr_->max_repetition_level(); + //Data page Layout: Repetition Levels - Definition Levels - encoded values. + //Levels are encoded as rle or bit-packed. + //Init repetition levels + if (max_repetition_level > 0) { + size_t rep_levels_bytes = repetition_level_decoder_.Init( + page->repetition_level_encoding(), + max_repetition_level, num_buffered_values_, buffer); + buffer += rep_levels_bytes; + data_size -= rep_levels_bytes; + } + //TODO figure a way to set max_definition_level_ to 0 + //if the initial value is invalid + + //Init definition levels + if (max_definition_level > 0) { + size_t def_levels_bytes = definition_level_decoder_.Init( + page->definition_level_encoding(), + max_definition_level, num_buffered_values_, buffer); buffer += def_levels_bytes; data_size -= def_levels_bytes; - } else { - // REQUIRED field - max_definition_level_ = 0; } - // TODO: repetition levels - // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. parquet::Encoding::type encoding = page->encoding(); @@ -172,31 +166,18 @@ bool TypedColumnReader::ReadNewPage() { // ---------------------------------------------------------------------- // Batch read APIs -static size_t DecodeMany(RleDecoder* decoder, int16_t* levels, size_t batch_size) { - size_t num_decoded = 0; - - // TODO(wesm): Push this decoding down into RleDecoder itself - for (size_t i = 0; i < batch_size; ++i) { - if (!decoder->Get(levels + i)) { - break; - } - ++num_decoded; - } - return num_decoded; -} - size_t ColumnReader::ReadDefinitionLevels(size_t batch_size, int16_t* levels) { - if (!definition_level_decoder_) { + if (descr_->max_definition_level() == 0) { return 0; } - return DecodeMany(definition_level_decoder_.get(), levels, batch_size); + return definition_level_decoder_.Decode(batch_size, levels); } size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) { - if (!repetition_level_decoder_) { + if (descr_->max_repetition_level() == 0) { return 0; } - return DecodeMany(repetition_level_decoder_.get(), levels, batch_size); + return repetition_level_decoder_.Decode(batch_size, levels); } // ---------------------------------------------------------------------- diff --git a/cpp/src/parquet/column/reader.h b/cpp/src/parquet/column/reader.h index 0d10f0fe4475d..4585de8125fea 100644 --- a/cpp/src/parquet/column/reader.h +++ b/cpp/src/parquet/column/reader.h @@ -33,9 +33,11 @@ #include "parquet/encodings/encodings.h" #include "parquet/schema/descriptor.h" #include "parquet/util/rle-encoding.h" +#include "parquet/column/levels.h" namespace parquet_cpp { + class Codec; class Scanner; @@ -85,13 +87,10 @@ class ColumnReader { std::shared_ptr current_page_; // Not set if full schema for this field has no optional or repeated elements - std::unique_ptr definition_level_decoder_; + LevelDecoder definition_level_decoder_; // Not set for flat schemas. - std::unique_ptr repetition_level_decoder_; - - // Temporarily storing this to assist with batch reading - int16_t max_definition_level_; + LevelDecoder repetition_level_decoder_; // The total number of values stored in the data page. This is the maximum of // the number of encoded definition levels or encoded values. For @@ -182,13 +181,12 @@ inline size_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_le size_t values_to_read = 0; // If the field is required and non-repeated, there are no definition levels - if (definition_level_decoder_) { + if (descr_->max_definition_level() > 0) { num_def_levels = ReadDefinitionLevels(batch_size, def_levels); - // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. for (size_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == max_definition_level_) { + if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; } } @@ -198,9 +196,8 @@ inline size_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_le } // Not present for non-repeated fields - if (repetition_level_decoder_) { + if (descr_->max_repetition_level() > 0) { num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); - if (num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } diff --git a/cpp/src/parquet/column/test-util.h b/cpp/src/parquet/column/test-util.h index 9aa4e5a190307..8861134dffe2e 100644 --- a/cpp/src/parquet/column/test-util.h +++ b/cpp/src/parquet/column/test-util.h @@ -155,22 +155,19 @@ class DataPageBuilder { // TODO: compute a more precise maximum size for the encoded levels std::vector encode_buffer(DEFAULT_DATA_PAGE_SIZE); - RleEncoder encoder(&encode_buffer[0], encode_buffer.size(), - BitUtil::NumRequiredBits(max_level)); - - // TODO(wesm): push down vector encoding - for (int16_t level : levels) { - if (!encoder.Put(level)) { - throw ParquetException("out of space"); - } - } - uint32_t rle_bytes = encoder.Flush(); + LevelEncoder encoder; + encoder.Init(encoding, max_level, levels.size(), + encode_buffer.data(), encode_buffer.size()); + + encoder.Encode(levels.size(), levels.data()); + + uint32_t rle_bytes = encoder.len(); size_t levels_footprint = sizeof(uint32_t) + rle_bytes; Reserve(levels_footprint); *reinterpret_cast(Head()) = rle_bytes; - memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes); + memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes); buffer_size_ += levels_footprint; } }; diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index 49f25f0b3c074..ffc882c002d7a 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -89,7 +89,6 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) { // column 0, id std::shared_ptr scanner(new Int32Scanner(group->Column(0))); - int32_t val; bool is_null; for (size_t i = 0; i < 8; ++i) { diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc index d3bc0a66075b6..2f30ebfe28c58 100644 --- a/cpp/src/parquet/reader.cc +++ b/cpp/src/parquet/reader.cc @@ -242,10 +242,10 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { << ")" << std::endl; } - for (int i = 0; i < num_row_groups(); ++i) { - stream << "--- Row Group " << i << " ---\n"; + for (int r = 0; r < num_row_groups(); ++r) { + stream << "--- Row Group " << r << " ---\n"; - RowGroupReader* group_reader = RowGroup(i); + RowGroupReader* group_reader = RowGroup(r); // Print column metadata size_t num_columns = group_reader->num_columns();