Skip to content

Commit

Permalink
GH-41579: [C++][Python][Parquet] Support reading/writing key-value me…
Browse files Browse the repository at this point in the history
…tadata from/to ColumnChunkMetaData (#41580)

### Rationale for this change
The Parquet standard allows reading/writing key-value metadata from/to ColumnChunkMetaData, but there is currently no way to do that with Parquet C++.

### What changes are included in this PR?
Support reading/writing key-value metadata from/to ColumnChunkMetaData with Parquet C++ reader/writer. Support reading key-value metadata from ColumnChunkMetaData with pyarrow.parquet.

### Are these changes tested?
Yes, unit tests are added.

### Are there any user-facing changes?
Yes.
- Users can read or write key-value metadata for column chunks with Parquet C++.
- Users can read key-value metadata for column chunks with PyArrow.
- The parquet-reader tool prints key-value metadata in column chunks when the `--print-key-value-metadata` option is used.

* GitHub Issue: #41579

Lead-authored-by: Chungmin Lee <chungminlee@microsoft.com>
Co-authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
  • Loading branch information
clee704 and mapleFU committed Aug 15, 2024
1 parent 2e434da commit 2767dc5
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 26 deletions.
24 changes: 24 additions & 0 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/float16.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/type_traits.h"
Expand Down Expand Up @@ -832,6 +833,9 @@ class ColumnWriterImpl {
void FlushBufferedDataPages();

ColumnChunkMetaDataBuilder* metadata_;
// Key-value metadata to attach to the column chunk being written.
// It is nullptr if no KeyValueMetadata has been set.
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
const ColumnDescriptor* descr_;
// scratch buffer if validity bits need to be recalculated.
std::shared_ptr<ResizableBuffer> bits_buffer_;
Expand Down Expand Up @@ -1100,6 +1104,7 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}

Expand Down Expand Up @@ -1397,6 +1402,25 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return pages_change_on_record_boundaries_;
}

// Adds `key_value_metadata` to the metadata stored for this column chunk.
//
// The first non-null argument is stored as-is; later non-null arguments are
// merged into the stored metadata. A null argument leaves the stored metadata
// unchanged.
//
// Throws ParquetException if the writer is already closed.
void AddKeyValueMetadata(
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
  if (closed_) {
    throw ParquetException("Cannot add key-value metadata to closed column");
  }
  // Nothing to add: keep whatever is currently stored (possibly nothing).
  if (key_value_metadata == nullptr) {
    return;
  }
  // First metadata seen: share the caller's object instead of copying it.
  if (key_value_metadata_ == nullptr) {
    key_value_metadata_ = key_value_metadata;
    return;
  }
  key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}

// Discards all key-value metadata stored so far for this column chunk.
//
// Throws ParquetException if the writer is already closed.
void ResetKeyValueMetadata() override {
  if (closed_) {
    // The message names the operation that actually failed (reset, not add).
    throw ParquetException("Cannot reset key-value metadata of closed column");
  }
  key_value_metadata_ = nullptr;
}

private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstring>
#include <memory>

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -181,6 +182,17 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;

/// \brief Add key-value metadata to the ColumnChunk.
///
/// May be called more than once before Close(); each call adds to the
/// metadata supplied by earlier calls.
/// \param[in] key_value_metadata the metadata to add.
/// \note This will overwrite any existing metadata with the same key.
/// \throw ParquetException if Close() has been called.
virtual void AddKeyValueMetadata(
const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;

/// \brief Reset the ColumnChunk key-value metadata.
///
/// Discards the key-value metadata added so far through
/// AddKeyValueMetadata().
/// \throw ParquetException if Close() has been called.
virtual void ResetKeyValueMetadata() = 0;

/// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
/// error status if the array data type is not compatible with the concrete
/// writer type.
Expand Down
69 changes: 69 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/config.h"
#include "arrow/util/key_value_metadata.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
Expand All @@ -51,6 +53,9 @@ using schema::PrimitiveNode;

namespace test {

using ::testing::IsNull;
using ::testing::NotNull;

// The default size used in most tests.
const int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
Expand Down Expand Up @@ -385,6 +390,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->encoding_stats();
}

std::shared_ptr<const KeyValueMetadata> metadata_key_value_metadata() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
return metadata_accessor->key_value_metadata();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -1705,5 +1719,60 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
}
}

using TestInt32Writer = TestPrimitiveWriter<Int32Type>;

// A writer that never receives key-value metadata must produce column chunk
// metadata without any.
TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) {
  auto column_writer = this->BuildWriter();
  column_writer->Close();

  auto written_metadata = metadata_key_value_metadata();
  ASSERT_THAT(written_metadata, IsNull());
}

// Metadata added in several calls is merged, and a later value for an
// existing key replaces the earlier one.
TEST_F(TestInt32Writer, WriteKeyValueMetadata) {
  auto column_writer = this->BuildWriter();

  auto initial_entries = KeyValueMetadata::Make({"hello", "bye"}, {"world", "earth"});
  // Reuses the key "bye" so that it overwrites the previous value.
  auto overriding_entries = KeyValueMetadata::Make({"bye"}, {"moon"});
  column_writer->AddKeyValueMetadata(initial_entries);
  column_writer->AddKeyValueMetadata(overriding_entries);
  column_writer->Close();

  auto written_metadata = metadata_key_value_metadata();
  ASSERT_THAT(written_metadata, NotNull());
  ASSERT_EQ(2, written_metadata->size());

  ASSERT_OK_AND_ASSIGN(auto hello_value, written_metadata->Get("hello"));
  ASSERT_EQ("world", hello_value);
  ASSERT_OK_AND_ASSIGN(auto bye_value, written_metadata->Get("bye"));
  ASSERT_EQ("moon", bye_value);
}

// Resetting after adding metadata must leave the column chunk without any
// key-value metadata.
TEST_F(TestInt32Writer, ResetKeyValueMetadata) {
  auto column_writer = this->BuildWriter();

  auto discarded_entries = KeyValueMetadata::Make({"hello"}, {"world"});
  column_writer->AddKeyValueMetadata(discarded_entries);
  column_writer->ResetKeyValueMetadata();
  column_writer->Close();

  auto written_metadata = metadata_key_value_metadata();
  ASSERT_THAT(written_metadata, IsNull());
}

// Round-trips column chunk key-value metadata through a complete Parquet file:
// it is written with ParquetFileWriter and read back with ParquetFileReader.
TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
  auto sink = CreateOutputStream();
  {
    // The writers live in their own scope so they are gone before the sink is
    // finished below.
    auto root_group =
        std::dynamic_pointer_cast<schema::GroupNode>(schema_.schema_root());
    auto file_writer = ParquetFileWriter::Open(sink, root_group);
    auto row_group_writer = file_writer->AppendRowGroup();
    auto column_writer = row_group_writer->NextColumn();
    column_writer->AddKeyValueMetadata(KeyValueMetadata::Make({"foo"}, {"bar"}));
    file_writer->Close();
  }
  ASSERT_OK_AND_ASSIGN(auto file_contents, sink->Finish());

  auto input = std::make_shared<::arrow::io::BufferReader>(file_contents);
  auto file_reader = ParquetFileReader::Open(input);
  auto row_group_metadata = file_reader->metadata()->RowGroup(0);
  auto column_chunk_metadata = row_group_metadata->ColumnChunk(0);
  auto read_metadata = column_chunk_metadata->key_value_metadata();

  ASSERT_THAT(read_metadata, NotNull());
  ASSERT_EQ(1U, read_metadata->size());
  ASSERT_OK_AND_ASSIGN(auto foo_value, read_metadata->Get("foo"));
  ASSERT_EQ("bar", foo_value);
}

} // namespace test
} // namespace parquet
84 changes: 65 additions & 19 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,39 @@ std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_d
throw ParquetException("Can't decode page statistics for selected column type");
}

// Get KeyValueMetadata from parquet Thrift RowGroup or ColumnChunk metadata.
//
// Returns nullptr if the metadata is not set.
template <typename Metadata>
std::shared_ptr<KeyValueMetadata> FromThriftKeyValueMetadata(const Metadata& source) {
std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (source.__isset.key_value_metadata) {
std::vector<std::string> keys;
std::vector<std::string> values;
keys.reserve(source.key_value_metadata.size());
values.reserve(source.key_value_metadata.size());
for (const auto& it : source.key_value_metadata) {
keys.push_back(it.key);
values.push_back(it.value);
}
metadata = std::make_shared<KeyValueMetadata>(std::move(keys), std::move(values));
}
return metadata;
}

template <typename Metadata>
void ToThriftKeyValueMetadata(const KeyValueMetadata& source, Metadata* metadata) {
std::vector<format::KeyValue> key_value_metadata;
key_value_metadata.reserve(static_cast<size_t>(source.size()));
for (int64_t i = 0; i < source.size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(source.key(i));
kv_pair.__set_value(source.value(i));
key_value_metadata.emplace_back(std::move(kv_pair));
}
metadata->__set_key_value_metadata(std::move(key_value_metadata));
}

// MetaData Accessor

// ColumnCryptoMetaData
Expand Down Expand Up @@ -233,6 +266,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
encoding_stats.count});
}
possible_stats_ = nullptr;
InitKeyValueMetadata();
}

bool Equals(const ColumnChunkMetaDataImpl& other) const {
Expand Down Expand Up @@ -343,7 +377,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return std::nullopt;
}

// Returns the key-value metadata held by this column chunk metadata object.
// The result is a reference to the stored member, not a copy.
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const {
return key_value_metadata_;
}

private:
// Fills key_value_metadata_ from the Thrift column metadata. The member ends
// up nullptr when the Thrift key_value_metadata field is not set.
void InitKeyValueMetadata() {
key_value_metadata_ = FromThriftKeyValueMetadata(*column_metadata_);
}

mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
std::vector<PageEncodingStats> encoding_stats_;
Expand All @@ -353,6 +395,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const ColumnDescriptor* descr_;
const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
Expand Down Expand Up @@ -471,6 +514,11 @@ bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
return impl_->Equals(*other.impl_);
}

// Returns the key-value metadata of this column chunk by forwarding to the
// implementation object.
const std::shared_ptr<const KeyValueMetadata>& ColumnChunkMetaData::key_value_metadata()
const {
return impl_->key_value_metadata();
}

// row-group metadata
class RowGroupMetaData::RowGroupMetaDataImpl {
public:
Expand Down Expand Up @@ -913,7 +961,7 @@ class FileMetaData::FileMetaDataImpl {
std::vector<parquet::ColumnOrder> column_orders;
if (metadata_->__isset.column_orders) {
column_orders.reserve(metadata_->column_orders.size());
for (auto column_order : metadata_->column_orders) {
for (auto& column_order : metadata_->column_orders) {
if (column_order.__isset.TYPE_ORDER) {
column_orders.push_back(ColumnOrder::type_defined_);
} else {
Expand All @@ -928,14 +976,7 @@ class FileMetaData::FileMetaDataImpl {
}

void InitKeyValueMetadata() {
std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (metadata_->__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : metadata_->key_value_metadata) {
metadata->Append(it.key, it.value);
}
}
key_value_metadata_ = std::move(metadata);
key_value_metadata_ = FromThriftKeyValueMetadata(*metadata_);
}
};

Expand Down Expand Up @@ -1590,6 +1631,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings));
column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats));

if (key_value_metadata_) {
ToThriftKeyValueMetadata(*key_value_metadata_, &column_chunk_->meta_data);
}

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
// column is encrypted
Expand Down Expand Up @@ -1656,6 +1701,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
return column_chunk_->meta_data.total_compressed_size;
}

// Stores the key-value metadata for this column chunk, replacing any value
// stored by an earlier call. The argument is taken by value and moved into
// the member.
void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
key_value_metadata_ = std::move(key_value_metadata);
}

private:
void Init(format::ColumnChunk* column_chunk) {
column_chunk_ = column_chunk;
Expand All @@ -1670,6 +1719,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
const ColumnDescriptor* column_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
Expand Down Expand Up @@ -1727,6 +1777,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result)
impl_->SetStatistics(result);
}

// Sets the key-value metadata for this column chunk by forwarding the
// argument to the implementation object.
void ColumnChunkMetaDataBuilder::SetKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
impl_->SetKeyValueMetadata(std::move(key_value_metadata));
}

int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}
Expand Down Expand Up @@ -1925,16 +1980,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(
static_cast<size_t>(key_value_metadata_->size()));
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
metadata_->key_value_metadata.push_back(std::move(kv_pair));
}
metadata_->__isset.key_value_metadata = true;
ToThriftKeyValueMetadata(*key_value_metadata_, metadata_.get());
}

int32_t file_version = 0;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const;
std::optional<IndexLocation> GetColumnIndexLocation() const;
std::optional<IndexLocation> GetOffsetIndexLocation() const;
// Key-value metadata of this column chunk; nullptr if none is set.
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;

private:
explicit ColumnChunkMetaData(
Expand Down Expand Up @@ -466,8 +467,12 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column chunk
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);

// column metadata
void SetStatistics(const EncodedStatistics& stats);

// Set the key-value metadata for this column chunk, replacing any value set
// by an earlier call.
void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata);

// get the column descriptor
const ColumnDescriptor* descr() const;

Expand Down
Loading

0 comments on commit 2767dc5

Please sign in to comment.