diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 1c2fd2dea6307..0ce08502921f3 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -161,7 +161,8 @@ bool IsNan(const Scalar& value) {
 }
 
 std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
-    const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
+    const FieldRef& field_ref, const SchemaField& schema_field,
+    const parquet::RowGroupMetaData& metadata) {
   // For the remaining of this function, failure to extract/parse statistics
   // are ignored by returning nullptr. The goal is two fold. First
   // avoid an optimization which breaks the computation. Second, allow the
@@ -180,7 +181,8 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
     return std::nullopt;
   }
 
-  return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
+  return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref,
+                                                             *statistics);
 }
 
 void AddColumnIndices(const SchemaField& schema_field,
@@ -360,8 +362,9 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
 }  // namespace
 
 std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
-    const Field& field, const parquet::Statistics& statistics) {
-  auto field_expr = compute::field_ref(field.name());
+    const Field& field, const FieldRef& field_ref,
+    const parquet::Statistics& statistics) {
+  auto field_expr = compute::field_ref(field_ref);
 
   // Optimize for corner case where all values are nulls
   if (statistics.num_values() == 0 && statistics.null_count() > 0) {
@@ -418,6 +421,13 @@ std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpr
   return std::nullopt;
 }
 
+std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
+    const Field& field, const parquet::Statistics& statistics) {
+  const auto field_name = field.name();
+  return EvaluateStatisticsAsExpression(field, FieldRef(std::move(field_name)),
+                                        statistics);
+}
+
 ParquetFileFormat::ParquetFileFormat()
     : FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}
 
@@ -810,7 +820,7 @@ Status ParquetFileFragment::SetMetadata(
   manifest_ = std::move(manifest);
 
   statistics_expressions_.resize(row_groups_->size(), compute::literal(true));
-  statistics_expressions_complete_.resize(physical_schema_->num_fields(), false);
+  statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false);
 
   for (int row_group : *row_groups_) {
     // Ensure RowGroups are indexing valid RowGroups before augmenting.
@@ -900,16 +910,25 @@ Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_));
 
     if (match.empty()) continue;
-    if (statistics_expressions_complete_[match[0]]) continue;
-    statistics_expressions_complete_[match[0]] = true;
+    const SchemaField* schema_field = &manifest_->schema_fields[match[0]];
+
+    for (size_t i = 1; i < match.indices().size(); ++i) {
+      if (schema_field->field->type()->id() != Type::STRUCT) {
+        return Status::Invalid("nested paths only supported for structs");
+      }
+      schema_field = &schema_field->children[match[i]];
+    }
+
+    if (!schema_field->is_leaf()) continue;
+    if (statistics_expressions_complete_[schema_field->column_index]) continue;
+    statistics_expressions_complete_[schema_field->column_index] = true;
 
-    const SchemaField& schema_field = manifest_->schema_fields[match[0]];
     int i = 0;
     for (int row_group : *row_groups_) {
       auto row_group_metadata = metadata_->RowGroup(row_group);
 
-      if (auto minmax =
-              ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) {
+      if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field,
+                                                          *row_group_metadata)) {
         FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
         ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
                               statistics_expressions_[i].Bind(*physical_schema_));
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index f527ce5d70ae0..1e81a34fb3cf0 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -177,6 +177,10 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
   static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
       const Field& field, const parquet::Statistics& statistics);
 
+  static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
+      const Field& field, const FieldRef& field_ref,
+      const parquet::Statistics& statistics);
+
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       compute::Expression partition_expression,
@@ -207,7 +211,11 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
   /// or std::nullopt if all row groups are selected.
   std::optional<std::vector<int>> row_groups_;
 
+  // the expressions (combined for all columns for which statistics have been
+  // processed) are stored per row group
   std::vector<compute::Expression> statistics_expressions_;
+  // statistics completion status is tracked per Parquet schema column index
+  // (i.e. not per Arrow schema field index)
   std::vector<bool> statistics_expressions_complete_;
   std::shared_ptr<parquet::FileMetaData> metadata_;
   std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 84d4342a25e20..76cd0af3b835f 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -655,6 +655,12 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) {
   CountRowGroupsInFragment(fragment, {5, 6},
                            and_(greater_equal(field_ref("i64"), literal(6)),
                                 less(field_ref("i64"), literal(8))));
+
+  // nested field reference
+  CountRowGroupsInFragment(fragment, {0, 1, 2, 3, 4},
+                           less(field_ref(FieldRef("struct", "i32")), literal(6)));
+  CountRowGroupsInFragment(fragment, {1},
+                           equal(field_ref(FieldRef("struct", "str")), literal("2")));
 }
 
 TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) {
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e2bb4400c8bde..ae2146c0bdaee 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1648,6 +1648,42 @@ def test_fragments_parquet_subset_invalid(tempdir):
         fragment.subset()
 
 
+@pytest.mark.parquet
+def test_fragments_parquet_subset_with_nested_fields(tempdir):
+    # ensure row group filtering with nested field works
+    f1 = pa.array([0, 1, 2, 3])
+    f21 = pa.array([0.1, 0.2, 0.3, 0.4])
+    f22 = pa.array([1, 2, 3, 4])
+    f2 = pa.StructArray.from_arrays([f21, f22], names=["f21", "f22"])
+    struct_col = pa.StructArray.from_arrays([f1, f2], names=["f1", "f2"])
+    table = pa.table({"col": struct_col})
+    pq.write_table(table, tempdir / "data_struct.parquet", row_group_size=2)
+
+    dataset = ds.dataset(tempdir / "data_struct.parquet", format="parquet")
+    fragment = list(dataset.get_fragments())[0]
+    assert fragment.num_row_groups == 2
+
+    subfrag = fragment.subset(ds.field("col", "f1") > 2)
+    assert subfrag.num_row_groups == 1
+    subfrag = fragment.subset(ds.field("col", "f1") > 5)
+    assert subfrag.num_row_groups == 0
+
+    subfrag = fragment.subset(ds.field("col", "f2", "f21") > 0)
+    assert subfrag.num_row_groups == 2
+    subfrag = fragment.subset(ds.field("col", "f2", "f22") <= 2)
+    assert subfrag.num_row_groups == 1
+
+    # nonexisting field ref
+    with pytest.raises(pa.ArrowInvalid, match="No match for FieldRef.Nested"):
+        fragment.subset(ds.field("col", "f3") > 0)
+
+    # comparison with struct field is not implemented
+    with pytest.raises(
+        NotImplementedError, match="Function 'greater' has no kernel matching"
+    ):
+        fragment.subset(ds.field("col", "f2") > 0)
+
+
 @pytest.mark.pandas
 @pytest.mark.parquet
 def test_fragments_repr(tempdir, dataset):
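
For context, a minimal sketch of the user-visible effect of this change, assuming a pyarrow build that includes it (the file name and values here are illustrative, not part of the test suite):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# A struct column whose four rows land in two row groups of two rows each.
table = pa.table({"col": pa.StructArray.from_arrays(
    [pa.array([0, 1, 2, 3])], names=["f1"])})
pq.write_table(table, "data_struct.parquet", row_group_size=2)

dataset = ds.dataset("data_struct.parquet", format="parquet")

# With this change, the nested reference ("col", "f1") is resolved down to
# its Parquet leaf column, so row-group statistics can prune the first row
# group instead of reading it; previously only top-level refs could prune.
result = dataset.to_table(filter=ds.field("col", "f1") > 2)
assert result.num_rows == 1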