Skip to content

Commit

Permalink
apacheGH-44079: [C++][Parquet] Remove deprecated APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 12, 2024
1 parent 85fc3eb commit 9eff241
Show file tree
Hide file tree
Showing 9 changed files with 10 additions and 352 deletions.
20 changes: 0 additions & 20 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,6 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
return Status::OK();
}

Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::unique_ptr<FileWriter>* writer) {
ARROW_ASSIGN_OR_RAISE(
*writer, Open(std::move(schema), pool, std::move(sink), std::move(properties),
default_arrow_writer_properties()));
return Status::OK();
}

Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const ArrowWriterProperties& properties,
std::shared_ptr<const KeyValueMetadata>* out) {
Expand Down Expand Up @@ -560,16 +550,6 @@ Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* poo
return Status::OK();
}

Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
std::unique_ptr<FileWriter>* writer) {
ARROW_ASSIGN_OR_RAISE(*writer, Open(std::move(schema), pool, std::move(sink),
std::move(properties), arrow_properties));
return Status::OK();
}

Result<std::unique_ptr<FileWriter>> FileWriter::Open(
const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
Expand Down
12 changes: 0 additions & 12 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,6 @@ class PARQUET_EXPORT FileWriter {
std::shared_ptr<ArrowWriterProperties> arrow_properties =
default_arrow_writer_properties());

ARROW_DEPRECATED("Deprecated in 11.0.0. Use Result-returning variants instead.")
static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::unique_ptr<FileWriter>* writer);
ARROW_DEPRECATED("Deprecated in 11.0.0. Use Result-returning variants instead.")
static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
std::unique_ptr<FileWriter>* writer);

/// Return the Arrow schema to be written to.
virtual std::shared_ptr<::arrow::Schema> schema() const = 0;

Expand Down
106 changes: 0 additions & 106 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,6 @@ constexpr int64_t kMinLevelBatchSize = 1024;
// Both RecordReader and the ColumnReader use this for skipping.
constexpr int64_t kSkipScratchBatchSize = 1024;

inline bool HasSpacedValues(const ColumnDescriptor* descr) {
if (descr->max_repetition_level() > 0) {
// repeated+flat case
return !descr->schema_node()->is_required();
} else {
// non-repeated+nested case
// Find if a node forces nulls in the lowest level along the hierarchy
const schema::Node* node = descr->schema_node().get();
while (node) {
if (node->is_optional()) {
return true;
}
node = node->parent();
}
return false;
}
}

// Throws exception if number_decoded does not match expected.
inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) {
if (ARROW_PREDICT_FALSE(number_decoded != expected)) {
Expand Down Expand Up @@ -979,11 +961,6 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, int64_t* values_read) override;

int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
int64_t* levels_read, int64_t* values_read,
int64_t* null_count) override;

int64_t Skip(int64_t num_values_to_skip) override;

Type::type type() const override { return this->descr_->physical_type(); }
Expand Down Expand Up @@ -1153,89 +1130,6 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
return total_values;
}

template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count_out) {
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*levels_read = 0;
*values_read = 0;
*null_count_out = 0;
return 0;
}

// Number of non-null values to read
int64_t total_values;
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0) {
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}

const bool has_spaced_values = HasSpacedValues(this->descr_);
int64_t null_count = 0;
if (!has_spaced_values) {
int64_t values_to_read =
std::count(def_levels, def_levels + num_def_levels, this->max_def_level_);
total_values = this->ReadValues(values_to_read, values);
::arrow::bit_util::SetBitsTo(valid_bits, valid_bits_offset,
/*length=*/total_values,
/*bits_are_set=*/true);
*values_read = total_values;
} else {
internal::LevelInfo info;
info.repeated_ancestor_def_level = this->max_def_level_ - 1;
info.def_level = this->max_def_level_;
info.rep_level = this->max_rep_level_;
internal::ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = num_def_levels;
validity_io.valid_bits = valid_bits;
validity_io.valid_bits_offset = valid_bits_offset;
validity_io.null_count = null_count;
validity_io.values_read = *values_read;

internal::DefLevelsToBitmap(def_levels, num_def_levels, info, &validity_io);
null_count = validity_io.null_count;
*values_read = validity_io.values_read;

total_values =
this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
}
*levels_read = num_def_levels;
*null_count_out = null_count;

} else {
// Required field, read all values
total_values = this->ReadValues(batch_size, values);
::arrow::bit_util::SetBitsTo(valid_bits, valid_bits_offset,
/*length=*/total_values,
/*bits_are_set=*/true);
*null_count_out = 0;
*values_read = total_values;
*levels_read = total_values;
}

this->ConsumeBufferedValues(*levels_read);
return total_values;
}

template <typename DType>
void TypedColumnReaderImpl<DType>::InitScratchForSkip() {
if (this->scratch_for_skip_ == nullptr) {
Expand Down
42 changes: 0 additions & 42 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,48 +219,6 @@ class TypedColumnReader : public ColumnReader {
virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, int64_t* values_read) = 0;

/// Read a batch of repetition levels, definition levels, and values from the
/// column and leave spaces for null entries on the lowest level in the values
/// buffer.
///
/// In comparison to ReadBatch the length of repetition and definition levels
/// is the same as of the number of values read for max_definition_level == 1.
/// In the case of max_definition_level > 1, the repetition and definition
/// levels are larger than the values but the values include the null entries
/// with definition_level == (max_definition_level - 1).
///
/// To fully exhaust a row group, you must read batches until the number of
/// values read reaches the number of stored values according to the metadata.
///
/// @param batch_size the number of levels to read
/// @param[out] def_levels The Parquet definition levels, output has
/// the length levels_read.
/// @param[out] rep_levels The Parquet repetition levels, output has
/// the length levels_read.
/// @param[out] values The values in the lowest nested level including
/// spacing for nulls on the lowest levels; output has the length
/// values_read.
/// @param[out] valid_bits Memory allocated for a bitmap that indicates if
/// the row is null or on the maximum definition level. For performance
/// reasons the underlying buffer should be able to store 1 bit more than
/// required. If this requires an additional byte, this byte is only read
/// but never written to.
/// @param valid_bits_offset The offset in bits of the valid_bits where the
/// first relevant bit resides.
/// @param[out] levels_read The number of repetition/definition levels that were read.
/// @param[out] values_read The number of values read, this includes all
/// non-null entries as well as all null-entries on the lowest level
/// (i.e. definition_level == max_definition_level - 1)
/// @param[out] null_count The number of nulls on the lowest levels.
/// (i.e. (values_read - null_count) is total number of non-null entries)
///
/// \deprecated Since 4.0.0
ARROW_DEPRECATED("Doesn't handle nesting correctly and unused outside of unit tests.")
virtual int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values, uint8_t* valid_bits,
int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count) = 0;

// Skip reading values. This method will work for both repeated and
// non-repeated fields. Note that this method is skipping values and not
// records. This distinction is important for repeated fields, meaning that
Expand Down
68 changes: 0 additions & 68 deletions cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,58 +125,6 @@ class TestPrimitiveReader : public ::testing::Test {
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}
void CheckResultsSpaced() {
std::vector<int32_t> vresult(num_levels_, -1);
std::vector<int16_t> dresult(num_levels_, -1);
std::vector<int16_t> rresult(num_levels_, -1);
std::vector<uint8_t> valid_bits(num_levels_, 255);
int total_values_read = 0;
int batch_actual = 0;
int levels_actual = 0;
int64_t null_count = -1;
int64_t levels_read = 0;
int64_t values_read;

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
int32_t batch_size = 8;
int batch = 0;
// This will cover both the cases
// 1) batch_size < page_size (multiple ReadBatch from a single page)
// 2) batch_size > page_size (BatchRead limits to a single page)
do {
ARROW_SUPPRESS_DEPRECATION_WARNING
batch = static_cast<int>(reader->ReadBatchSpaced(
batch_size, dresult.data() + levels_actual, rresult.data() + levels_actual,
vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
&levels_read, &values_read, &null_count));
ARROW_UNSUPPRESS_DEPRECATION_WARNING
total_values_read += batch - static_cast<int>(null_count);
batch_actual += batch;
levels_actual += static_cast<int>(levels_read);
batch_size = std::min(1 << 24, std::max(batch_size * 2, 4096));
} while ((batch > 0) || (levels_read > 0));

ASSERT_EQ(num_levels_, levels_actual);
ASSERT_EQ(num_values_, total_values_read);
if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
ASSERT_TRUE(vector_equal_with_def_levels(values_, dresult, max_def_level_,
max_rep_level_, vresult));
} else {
ASSERT_TRUE(vector_equal(values_, vresult));
}
if (max_rep_level_ > 0) {
ASSERT_TRUE(vector_equal(rep_levels_, rresult));
}
// catch improper writes at EOS
ARROW_SUPPRESS_DEPRECATION_WARNING
batch_actual = static_cast<int>(
reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr, valid_bits.data(), 0,
&levels_read, &values_read, &null_count));
ARROW_UNSUPPRESS_DEPRECATION_WARNING
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, null_count);
}

void Clear() {
values_.clear();
Expand All @@ -194,14 +142,6 @@ class TestPrimitiveReader : public ::testing::Test {
InitReader(d);
CheckResults();
Clear();

num_values_ =
MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
values_, data_buffer_, pages_, Encoding::PLAIN);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResultsSpaced();
Clear();
}

void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
Expand All @@ -212,14 +152,6 @@ class TestPrimitiveReader : public ::testing::Test {
InitReader(d);
CheckResults();
Clear();

num_values_ =
MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResultsSpaced();
Clear();
}

protected:
Expand Down
12 changes: 0 additions & 12 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,18 +718,6 @@ std::unique_ptr<PageWriter> PageWriter::Open(
}
}

std::unique_ptr<PageWriter> PageWriter::Open(
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
std::shared_ptr<Encryptor> data_encryptor, bool page_write_checksum_enabled,
ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) {
return PageWriter::Open(sink, codec, metadata, row_group_ordinal, column_chunk_ordinal,
pool, buffered_row_group, meta_encryptor, data_encryptor,
page_write_checksum_enabled, column_index_builder,
offset_index_builder, CodecOptions{compression_level});
}
// ----------------------------------------------------------------------
// ColumnWriter

Expand Down
15 changes: 0 additions & 15 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,6 @@ class PARQUET_EXPORT PageWriter {
OffsetIndexBuilder* offset_index_builder = NULLPTR,
const CodecOptions& codec_options = CodecOptions{});

ARROW_DEPRECATED("Deprecated in 13.0.0. Use CodecOptions-taking overload instead.")
static std::unique_ptr<PageWriter> Open(
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool buffered_row_group = false,
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
bool page_write_checksum_enabled = false,
// column_index_builder MUST outlive the PageWriter
ColumnIndexBuilder* column_index_builder = NULLPTR,
// offset_index_builder MUST outlive the PageWriter
OffsetIndexBuilder* offset_index_builder = NULLPTR);

// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
// page limit
Expand Down
Loading

0 comments on commit 9eff241

Please sign in to comment.