Skip to content

Commit

Permalink
PARQUET-501: Add OutputStream abstract interface, refactor encoding c…
Browse files Browse the repository at this point in the history
…ode paths

I also did a bit of tidying / reorganization and giving interfaces more descriptive names.

Author: Wes McKinney <wes@cloudera.com>

Closes apache#46 from wesm/PARQUET-501 and squashes the following commits:

491aa89 [Wes McKinney] * Add a basic OutputStream abstract interface and an InMemoryOutputStream   implementation for testing. * Refactor to use OutputStream on data encoding paths, reduce some code   duplication in column-reader-test. * Collect all input/output classes into util/input.* and util/output.*. * Use int64_t in InputStream::Peek/Read.

Change-Id: I0f29001dc8f23e3176e109ae9909a6539f285364
  • Loading branch information
wesm authored and julienledem committed Feb 11, 2016
1 parent b268bb8 commit bc47477
Show file tree
Hide file tree
Showing 18 changed files with 435 additions and 229 deletions.
55 changes: 14 additions & 41 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"

#include "parquet/util/output.h"
#include "parquet/util/test-common.h"

using std::string;
Expand Down Expand Up @@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test {
vector<shared_ptr<Page> > pages_;
};

template <typename T>
static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
if (end < start) {
return vector<T>(0);
}

vector<T> out(end - start);
for (size_t i = start; i < end; ++i) {
out[i - start] = values[i];
}
return out;
}


TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
pages_.push_back(page_builder.Finish());
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
{}, 0, &buffer);
pages_.push_back(page);

// TODO: simplify this
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
ColumnDescriptor descr(type, 0, 0);
InitReader(&descr);
Expand All @@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
ASSERT_TRUE(vector_equal(result, values));
}


TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
{}, 0, &buffer);

pages_.push_back(page_builder.Finish());
pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
ColumnDescriptor descr(type, 1, 0);
Expand Down Expand Up @@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};

size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
def_levels, 2, rep_levels, 1, &buffer);

pages_.push_back(page_builder.Finish());
pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/parquet/column/serialized-page.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "parquet/exception.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input_stream.h"

using parquet::PageType;

Expand Down Expand Up @@ -52,7 +51,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (true) {
int bytes_read = 0;
int64_t bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
if (bytes_read == 0) {
return std::shared_ptr<Page>(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column/serialized-page.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

#include "parquet/column/page.h"
#include "parquet/compression/codec.h"
#include "parquet/util/input_stream.h"
#include "parquet/util/input.h"
#include "parquet/thrift/parquet_types.h"

namespace parquet_cpp {
Expand Down
100 changes: 52 additions & 48 deletions cpp/src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,30 @@ class MockPageReader : public PageReader {
size_t page_index_;
};

// TODO(wesm): this is only used for testing for now

static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
static constexpr int INIT_BUFFER_SIZE = 1024;
// TODO(wesm): this is only used for testing for now. Refactor to form part of
// primary file write path

template <int TYPE>
class DataPageBuilder {
public:
typedef typename type_traits<TYPE>::value_type T;

// The passed vector is the owner of the page's data
explicit DataPageBuilder(std::vector<uint8_t>* out) :
out_(out),
buffer_size_(0),
// This class writes data and metadata to the passed inputs
explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) :
sink_(sink),
header_(header),
num_values_(0),
have_def_levels_(false),
have_rep_levels_(false),
have_values_(false) {
out_->resize(INIT_BUFFER_SIZE);
buffer_capacity_ = INIT_BUFFER_SIZE;
}

void AppendDefLevels(const std::vector<int16_t>& levels,
int16_t max_level, parquet::Encoding::type encoding) {
AppendLevels(levels, max_level, encoding);

num_values_ = std::max(levels.size(), num_values_);
header_.__set_definition_level_encoding(encoding);
header_->__set_definition_level_encoding(encoding);
have_def_levels_ = true;
}

Expand All @@ -88,7 +84,7 @@ class DataPageBuilder {
AppendLevels(levels, max_level, encoding);

num_values_ = std::max(levels.size(), num_values_);
header_.__set_repetition_level_encoding(encoding);
header_->__set_repetition_level_encoding(encoding);
have_rep_levels_ = true;
}

Expand All @@ -98,53 +94,31 @@ class DataPageBuilder {
ParquetException::NYI("only plain encoding currently implemented");
}
size_t bytes_to_encode = values.size() * sizeof(T);
Reserve(bytes_to_encode);

PlainEncoder<TYPE> encoder(nullptr);
size_t nbytes = encoder.Encode(&values[0], values.size(), Head());
// In case for some reason it's fewer than bytes_to_encode
buffer_size_ += nbytes;
encoder.Encode(&values[0], values.size(), sink_);

num_values_ = std::max(values.size(), num_values_);
header_.__set_encoding(encoding);
header_->__set_encoding(encoding);
have_values_ = true;
}

std::shared_ptr<Page> Finish() {
void Finish() {
if (!have_values_) {
throw ParquetException("A data page must at least contain values");
}
header_.__set_num_values(num_values_);
return std::make_shared<DataPage>(&(*out_)[0], buffer_size_, header_);
header_->__set_num_values(num_values_);
}

private:
std::vector<uint8_t>* out_;

size_t buffer_size_;
size_t buffer_capacity_;

parquet::DataPageHeader header_;
InMemoryOutputStream* sink_;
parquet::DataPageHeader* header_;

size_t num_values_;

bool have_def_levels_;
bool have_rep_levels_;
bool have_values_;

void Reserve(size_t nbytes) {
while ((nbytes + buffer_size_) > buffer_capacity_) {
// TODO(wesm): limit to one reserve when this loop runs more than once
size_t new_capacity = 2 * buffer_capacity_;
out_->resize(new_capacity);
buffer_capacity_ = new_capacity;
}
}

uint8_t* Head() {
return &(*out_)[buffer_size_];
}

// Used internally for both repetition and definition levels
void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
parquet::Encoding::type encoding) {
Expand All @@ -153,25 +127,55 @@ class DataPageBuilder {
}

// TODO: compute a more precise maximum size for the encoded levels
std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);

std::vector<uint8_t> encode_buffer(levels.size() * 4);

// We encode into separate memory from the output stream because the
// RLE-encoded bytes have to be preceded in the stream by their absolute
// size.
LevelEncoder encoder;
encoder.Init(encoding, max_level, levels.size(),
encode_buffer.data(), encode_buffer.size());

encoder.Encode(levels.size(), levels.data());

uint32_t rle_bytes = encoder.len();
size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
Reserve(levels_footprint);

*reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
buffer_size_ += levels_footprint;
sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t));
sink_->Write(encode_buffer.data(), rle_bytes);
}
};

template <int TYPE, typename T>
static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
const std::vector<int16_t>& def_levels, int16_t max_def_level,
const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
std::vector<uint8_t>* out_buffer) {
size_t num_values = values.size();

InMemoryOutputStream page_stream;
parquet::DataPageHeader page_header;

test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header);

if (!rep_levels.empty()) {
page_builder.AppendRepLevels(rep_levels, max_rep_level,
parquet::Encoding::RLE);
}

if (!def_levels.empty()) {
page_builder.AppendDefLevels(def_levels, max_def_level,
parquet::Encoding::RLE);
}

page_builder.AppendValues(values, parquet::Encoding::PLAIN);
page_builder.Finish();

// Hand off the data stream to the passed std::vector
page_stream.Transfer(out_buffer);

return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header);
}


} // namespace test

} // namespace parquet_cpp
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/parquet/encodings/encodings.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "parquet/exception.h"
#include "parquet/types.h"

#include "parquet/util/output.h"
#include "parquet/util/rle-encoding.h"
#include "parquet/util/bit-stream-utils.inline.h"

Expand Down Expand Up @@ -82,14 +83,9 @@ class Encoder {

virtual ~Encoder() {}

// TODO(wesm): use an output stream

// Subclasses should override the ones they support
//
// @returns: the number of bytes written to dst
virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
virtual void Encode(const T* src, int num_values, OutputStream* dst) {
throw ParquetException("Encoder does not implement this type.");
return 0;
}

const parquet::Encoding::type encoding() const { return encoding_; }
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/parquet/encodings/plain-encoding-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) {
PlainEncoder<Type::BOOLEAN> encoder(nullptr);
PlainDecoder<Type::BOOLEAN> decoder(nullptr);

std::vector<uint8_t> encode_buffer(nbytes);
InMemoryOutputStream dst;
encoder.Encode(draws, nvalues, &dst);

size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]);
ASSERT_EQ(nbytes, encoded_bytes);
std::vector<uint8_t> encode_buffer;
dst.Transfer(&encode_buffer);

ASSERT_EQ(nbytes, encode_buffer.size());

std::vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];

decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes);
decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);

Expand Down
Loading

0 comments on commit bc47477

Please sign in to comment.