Skip to content

Commit

Permalink
ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#…
Browse files Browse the repository at this point in the history
…13275)

In fringe cases, users may have Parquet files where deserializing exceeds our default Thrift size limits.


Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou authored Jun 7, 2022
1 parent 6a33061 commit dd33c75
Show file tree
Hide file tree
Showing 17 changed files with 407 additions and 151 deletions.
1 change: 0 additions & 1 deletion cpp/cmake_modules/SetupCxxFlags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wunused-result")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ parquet::ReaderProperties MakeReaderProperties(
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
properties.file_decryption_properties(
parquet_scan_options->reader_properties->file_decryption_properties());
properties.set_thrift_string_size_limit(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
parquet_scan_options->reader_properties->thrift_container_size_limit());
return properties;
}

Expand Down
31 changes: 22 additions & 9 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,15 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
class SerializedPageReader : public PageReader {
public:
SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
Compression::type codec, ::arrow::MemoryPool* pool,
Compression::type codec, const ReaderProperties& properties,
const CryptoContext* crypto_ctx)
: stream_(std::move(stream)),
decompression_buffer_(AllocateBuffer(pool, 0)),
: properties_(properties),
stream_(std::move(stream)),
decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
page_ordinal_(0),
seen_num_rows_(0),
total_num_rows_(total_num_rows),
decryption_buffer_(AllocateBuffer(pool, 0)) {
decryption_buffer_(AllocateBuffer(properties_.memory_pool(), 0)) {
if (crypto_ctx != nullptr) {
crypto_ctx_ = *crypto_ctx;
InitDecryption();
Expand All @@ -254,6 +255,7 @@ class SerializedPageReader : public PageReader {
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);

const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;

format::PageHeader current_page_header_;
Expand Down Expand Up @@ -326,9 +328,10 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
}

std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);

// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with

while (seen_num_rows_ < total_num_rows_) {
uint32_t header_size = 0;
uint32_t allowed_page_size = kDefaultPageHeaderSize;
Expand All @@ -349,8 +352,9 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
data_page_header_aad_);
}
DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
&current_page_header_, crypto_ctx_.meta_decryptor);
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
&header_size, &current_page_header_,
crypto_ctx_.meta_decryptor);
break;
} catch (std::exception& e) {
// Failed to deserialize. Double the allowed page header size and try again
Expand Down Expand Up @@ -508,13 +512,22 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(

} // namespace

// Creates a page reader over `stream`, configured from `properties`.
//
// All arguments are forwarded to SerializedPageReader, and the result is
// returned through the PageReader interface. A null `ctx` is accepted: the
// SerializedPageReader constructor only sets up decryption when it is non-null.
std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
                                             int64_t total_num_rows,
                                             Compression::type codec,
                                             const ReaderProperties& properties,
                                             const CryptoContext* ctx) {
  std::unique_ptr<PageReader> reader(new SerializedPageReader(
      std::move(stream), total_num_rows, codec, properties, ctx));
  return reader;
}

std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
int64_t total_num_rows,
Compression::type codec,
::arrow::MemoryPool* pool,
const CryptoContext* ctx) {
return std::unique_ptr<PageReader>(
new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
return std::unique_ptr<PageReader>(new SerializedPageReader(
std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx));
}

namespace {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "parquet/exception.h"
#include "parquet/level_conversion.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"

Expand Down Expand Up @@ -106,6 +107,10 @@ class PARQUET_EXPORT PageReader {
std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
const CryptoContext* ctx = NULLPTR);
// Overload of Open() that takes a ReaderProperties object instead of a
// MemoryPool. `ctx` is optional and defaults to NULLPTR.
static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream,
int64_t total_num_rows, Compression::type codec,
const ReaderProperties& properties,
const CryptoContext* ctx = NULLPTR);

// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
ApplicationVersion app_version(this->writer_properties_->created_by());
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
auto metadata_accessor = ColumnChunkMetaData::Make(
metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
return metadata_accessor->is_stats_set();
}

Expand All @@ -286,8 +286,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
ApplicationVersion app_version(this->writer_properties_->created_by());
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
auto metadata_accessor = ColumnChunkMetaData::Make(
metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
auto encoded_stats = metadata_accessor->statistics()->Encode();
return {encoded_stats.has_min, encoded_stats.has_max};
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ uint32_t SerializedFile::ParseUnencryptedFileMetadata(
}
uint32_t read_metadata_len = metadata_len;
// The encrypted read path falls through to here, so pass in the decryptor
file_metadata_ =
FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_);
file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len,
properties_, file_decryptor_);
return read_metadata_len;
}

Expand Down
Loading

0 comments on commit dd33c75

Please sign in to comment.