diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 6d7bdfa6cf90c..1216fe27d4e70 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -85,18 +85,23 @@ static auto kScalarAggregateOptionsType = GetFunctionOptionsType(DataMember("mode", &CountOptions::mode)); -static auto kModeOptionsType = - GetFunctionOptionsType(DataMember("n", &ModeOptions::n)); +static auto kModeOptionsType = GetFunctionOptionsType( + DataMember("n", &ModeOptions::n), DataMember("skip_nulls", &ModeOptions::skip_nulls), + DataMember("min_count", &ModeOptions::min_count)); static auto kVarianceOptionsType = GetFunctionOptionsType( DataMember("ddof", &VarianceOptions::ddof), DataMember("skip_nulls", &VarianceOptions::skip_nulls), DataMember("min_count", &VarianceOptions::min_count)); static auto kQuantileOptionsType = GetFunctionOptionsType( DataMember("q", &QuantileOptions::q), - DataMember("interpolation", &QuantileOptions::interpolation)); + DataMember("interpolation", &QuantileOptions::interpolation), + DataMember("skip_nulls", &QuantileOptions::skip_nulls), + DataMember("min_count", &QuantileOptions::min_count)); static auto kTDigestOptionsType = GetFunctionOptionsType( DataMember("q", &TDigestOptions::q), DataMember("delta", &TDigestOptions::delta), - DataMember("buffer_size", &TDigestOptions::buffer_size)); + DataMember("buffer_size", &TDigestOptions::buffer_size), + DataMember("skip_nulls", &TDigestOptions::skip_nulls), + DataMember("min_count", &TDigestOptions::min_count)); static auto kIndexOptionsType = GetFunctionOptionsType(DataMember("value", &IndexOptions::value)); } // namespace @@ -112,7 +117,11 @@ CountOptions::CountOptions(CountMode mode) : FunctionOptions(internal::kCountOptionsType), mode(mode) {} constexpr char CountOptions::kTypeName[]; -ModeOptions::ModeOptions(int64_t n) : FunctionOptions(internal::kModeOptionsType), n(n) {} +ModeOptions::ModeOptions(int64_t n, bool skip_nulls, 
uint32_t min_count) + : FunctionOptions(internal::kModeOptionsType), + n{n}, + skip_nulls{skip_nulls}, + min_count{min_count} {} constexpr char ModeOptions::kTypeName[]; VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count) @@ -122,27 +131,38 @@ VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count) min_count(min_count) {} constexpr char VarianceOptions::kTypeName[]; -QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation) +QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation, + bool skip_nulls, uint32_t min_count) : FunctionOptions(internal::kQuantileOptionsType), q{q}, - interpolation{interpolation} {} -QuantileOptions::QuantileOptions(std::vector q, enum Interpolation interpolation) + interpolation{interpolation}, + skip_nulls{skip_nulls}, + min_count{min_count} {} +QuantileOptions::QuantileOptions(std::vector q, enum Interpolation interpolation, + bool skip_nulls, uint32_t min_count) : FunctionOptions(internal::kQuantileOptionsType), q{std::move(q)}, - interpolation{interpolation} {} + interpolation{interpolation}, + skip_nulls{skip_nulls}, + min_count{min_count} {} constexpr char QuantileOptions::kTypeName[]; -TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size) +TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size, + bool skip_nulls, uint32_t min_count) : FunctionOptions(internal::kTDigestOptionsType), q{q}, delta{delta}, - buffer_size{buffer_size} {} + buffer_size{buffer_size}, + skip_nulls{skip_nulls}, + min_count{min_count} {} TDigestOptions::TDigestOptions(std::vector q, uint32_t delta, - uint32_t buffer_size) + uint32_t buffer_size, bool skip_nulls, uint32_t min_count) : FunctionOptions(internal::kTDigestOptionsType), q{std::move(q)}, delta{delta}, - buffer_size{buffer_size} {} + buffer_size{buffer_size}, + skip_nulls{skip_nulls}, + min_count{min_count} {} constexpr char TDigestOptions::kTypeName[]; 
IndexOptions::IndexOptions(std::shared_ptr value) diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 8c27da497653c..c8df81773d4c5 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -82,11 +82,16 @@ class ARROW_EXPORT CountOptions : public FunctionOptions { /// By default, returns the most common value and count. class ARROW_EXPORT ModeOptions : public FunctionOptions { public: - explicit ModeOptions(int64_t n = 1); + explicit ModeOptions(int64_t n = 1, bool skip_nulls = true, uint32_t min_count = 0); constexpr static char const kTypeName[] = "ModeOptions"; static ModeOptions Defaults() { return ModeOptions{}; } int64_t n = 1; + /// If true (the default), null values are ignored. Otherwise, if any value is null, + /// emit null. + bool skip_nulls; + /// If less than this many non-null values are observed, emit null. + uint32_t min_count; }; /// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel @@ -121,10 +126,12 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions { MIDPOINT, }; - explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR); + explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR, + bool skip_nulls = true, uint32_t min_count = 0); explicit QuantileOptions(std::vector q, - enum Interpolation interpolation = LINEAR); + enum Interpolation interpolation = LINEAR, + bool skip_nulls = true, uint32_t min_count = 0); constexpr static char const kTypeName[] = "QuantileOptions"; static QuantileOptions Defaults() { return QuantileOptions{}; } @@ -132,6 +139,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions { /// quantile must be between 0 and 1 inclusive std::vector q; enum Interpolation interpolation; + /// If true (the default), null values are ignored. Otherwise, if any value is null, + /// emit null. 
+ bool skip_nulls; + /// If less than this many non-null values are observed, emit null. + uint32_t min_count; }; /// \brief Control TDigest approximate quantile kernel behavior @@ -140,9 +152,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions { class ARROW_EXPORT TDigestOptions : public FunctionOptions { public: explicit TDigestOptions(double q = 0.5, uint32_t delta = 100, - uint32_t buffer_size = 500); + uint32_t buffer_size = 500, bool skip_nulls = true, + uint32_t min_count = 0); explicit TDigestOptions(std::vector q, uint32_t delta = 100, - uint32_t buffer_size = 500); + uint32_t buffer_size = 500, bool skip_nulls = true, + uint32_t min_count = 0); constexpr static char const kTypeName[] = "TDigestOptions"; static TDigestOptions Defaults() { return TDigestOptions{}; } @@ -152,6 +166,11 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions { uint32_t delta; /// input buffer size, default 500 uint32_t buffer_size; + /// If true (the default), null values are ignored. Otherwise, if any value is null, + /// emit null. + bool skip_nulls; + /// If less than this many non-null values are observed, emit null. 
+ uint32_t min_count; }; /// \brief Control Index kernel behavior diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index 6ad0eeb64564c..f225f6bf569c3 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -130,6 +130,13 @@ struct CountModer { Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { // count values in all chunks, ignore nulls const Datum& datum = batch[0]; + + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && datum.null_count() > 0) || + (datum.length() - datum.null_count() < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, out).status(); + } + CountValues(this->counts.data(), datum, this->min); // generator to emit next value:count pair @@ -154,9 +161,16 @@ struct CountModer { template <> struct CountModer { Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const Datum& datum = batch[0]; + + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && datum.null_count() > 0) || + (datum.length() - datum.null_count() < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, out).status(); + } + int64_t counts[2]{}; - const Datum& datum = batch[0]; for (const auto& array : datum.chunks()) { if (array->length() > array->null_count()) { const int64_t true_count = @@ -167,7 +181,6 @@ struct CountModer { } } - const ModeOptions& options = ModeState::Get(ctx); const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0); const int64_t n = std::min(options.n, distinct_values); @@ -198,12 +211,19 @@ struct SortModer { using Allocator = arrow::stl::allocator; Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const Datum& datum = batch[0]; + const int64_t in_length = datum.length() - datum.null_count(); + + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && datum.null_count() > 0) || 
+ (in_length < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, out).status(); + } + // copy all chunks to a buffer, ignore nulls and nans std::vector in_buffer(Allocator(ctx->memory_pool())); uint64_t nan_count = 0; - const Datum& datum = batch[0]; - const int64_t in_length = datum.length() - datum.null_count(); if (in_length > 0) { in_buffer.resize(in_length); CopyNonNullValues(datum, in_buffer.data()); @@ -305,6 +325,13 @@ struct Moder::value>> { template Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) { using CType = typename T::c_type; + + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && !scalar.is_valid) || + (static_cast(scalar.is_valid) < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, out).status(); + } + if (scalar.is_valid) { bool called = false; return Finalize(ctx, out, [&]() { diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index 7d2ffe0770cbe..bfd97f813e569 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -79,12 +79,18 @@ struct SortQuantiler { Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const QuantileOptions& options = QuantileState::Get(ctx); + const Datum& datum = batch[0]; // copy all chunks to a buffer, ignore nulls and nans std::vector in_buffer(Allocator(ctx->memory_pool())); + int64_t in_length = 0; + if ((!options.skip_nulls && datum.null_count() > 0) || + (datum.length() - datum.null_count() < options.min_count)) { + in_length = 0; + } else { + in_length = datum.length() - datum.null_count(); + } - const Datum& datum = batch[0]; - const int64_t in_length = datum.length() - datum.null_count(); if (in_length > 0) { in_buffer.resize(in_length); CopyNonNullValues(datum, in_buffer.data()); @@ -232,7 +238,11 @@ struct CountQuantiler { // count values in all chunks, ignore nulls const Datum& datum = 
batch[0]; - int64_t in_length = CountValues(this->counts.data(), datum, this->min); + int64_t in_length = 0; /* stays 0 (empty result) unless the null and min_count checks below pass */ + if ((options.skip_nulls || datum.null_count() == 0) && + (datum.length() - datum.null_count() >= options.min_count)) { + in_length = CountValues(this->counts.data(), datum, this->min); + } // prepare out array int64_t out_length = options.q.size(); @@ -394,7 +404,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options, const Scalar& scalar, Datum* out) { using CType = typename T::c_type; ArrayData* output = out->mutable_array(); - if (!scalar.is_valid) { + if (!scalar.is_valid || options.min_count > 1) { output->length = 0; output->null_count = 0; return Status::OK(); diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index be8d66c4c247e..3b616c664a9ff 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -37,14 +37,23 @@ struct TDigestImpl : public ScalarAggregator { using CType = typename ArrowType::c_type; explicit TDigestImpl(const TDigestOptions& options) - : q{options.q}, tdigest{options.delta, options.buffer_size} {} + : options{options}, + tdigest{options.delta, options.buffer_size}, + count{0}, + all_valid{true} {} Status Consume(KernelContext*, const ExecBatch& batch) override { + if (!this->all_valid) return Status::OK(); + if (!options.skip_nulls && batch[0].null_count() > 0) { + this->all_valid = false; + return Status::OK(); + } if (batch[0].is_array()) { const ArrayData& data = *batch[0].array(); const CType* values = data.GetValues(1); if (data.length > data.GetNullCount()) { + this->count += data.length - data.GetNullCount(); VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { @@ -55,6 +64,7 @@ struct TDigestImpl : public ScalarAggregator { } else { const CType value = 
UnboxScalar::Unbox(*batch[0].scalar()); if (batch[0].scalar()->is_valid) { + this->count += 1; for (int64_t i = 0; i < batch.length; i++) { this->tdigest.NanAdd(value); } @@ -64,13 +74,21 @@ struct TDigestImpl : public ScalarAggregator { } Status MergeFrom(KernelContext*, KernelState&& src) override { - auto& other = checked_cast(src); + const auto& other = checked_cast(src); + if (!this->all_valid || !other.all_valid) { + this->all_valid = false; + return Status::OK(); + } this->tdigest.Merge(other.tdigest); + this->count += other.count; return Status::OK(); } Status Finalize(KernelContext* ctx, Datum* out) override { - const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size(); + const int64_t out_length = + (this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count) + ? 0 + : options.q.size(); auto out_data = ArrayData::Make(float64(), out_length, 0); out_data->buffers.resize(2, nullptr); @@ -79,7 +97,7 @@ struct TDigestImpl : public ScalarAggregator { ctx->Allocate(out_length * sizeof(double))); double* out_buffer = out_data->template GetMutableValues(1); for (int64_t i = 0; i < out_length; ++i) { - out_buffer[i] = this->tdigest.Quantile(this->q[i]); + out_buffer[i] = this->tdigest.Quantile(this->options.q[i]); } } @@ -87,8 +105,10 @@ struct TDigestImpl : public ScalarAggregator { return Status::OK(); } - const std::vector q; + const TDigestOptions options; TDigest tdigest; + int64_t count; + bool all_valid; }; struct TDigestInitState { diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index eb73e703b6eda..587e203318499 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1954,10 +1954,10 @@ class TestPrimitiveModeKernel : public ::testing::Test { using Traits = TypeTraits; using CType = typename ArrowType::c_type; - void AssertModesAre(const Datum& array, const int n, + void AssertModesAre(const 
Datum& array, const ModeOptions options, const std::vector& expected_modes, const std::vector& expected_counts) { - ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n})); + ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); ValidateOutput(out); const StructArray out_array(out.array()); ASSERT_EQ(out_array.length(), expected_modes.size()); @@ -1978,11 +1978,18 @@ class TestPrimitiveModeKernel : public ::testing::Test { const std::vector& expected_modes, const std::vector& expected_counts) { auto array = ArrayFromJSON(type_singleton(), json); - AssertModesAre(array, n, expected_modes, expected_counts); + AssertModesAre(array, ModeOptions(n), expected_modes, expected_counts); + } + + void AssertModesAre(const std::string& json, const ModeOptions options, + const std::vector& expected_modes, + const std::vector& expected_counts) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertModesAre(array, options, expected_modes, expected_counts); } void AssertModeIs(const Datum& array, CType expected_mode, int64_t expected_count) { - AssertModesAre(array, 1, {expected_mode}, {expected_count}); + AssertModesAre(array, ModeOptions(1), {expected_mode}, {expected_count}); } void AssertModeIs(const std::string& json, CType expected_mode, @@ -1997,8 +2004,8 @@ class TestPrimitiveModeKernel : public ::testing::Test { AssertModeIs(chunked, expected_mode, expected_count); } - void AssertModesEmpty(const Datum& array, int n) { - ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n})); + void AssertModesEmpty(const Datum& array, ModeOptions options) { + ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); auto out_array = out.make_array(); ValidateOutput(*out_array); ASSERT_EQ(out.array()->length, 0); @@ -2006,12 +2013,17 @@ class TestPrimitiveModeKernel : public ::testing::Test { void AssertModesEmpty(const std::string& json, int n = 1) { auto array = ArrayFromJSON(type_singleton(), json); - AssertModesEmpty(array, n); + AssertModesEmpty(array, 
ModeOptions(n)); } void AssertModesEmpty(const std::vector& json, int n = 1) { auto chunked = ChunkedArrayFromJSON(type_singleton(), json); - AssertModesEmpty(chunked, n); + AssertModesEmpty(chunked, ModeOptions(n)); + } + + void AssertModesEmpty(const std::string& json, ModeOptions options) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertModesEmpty(array, options); } std::shared_ptr type_singleton() { return Traits::type_singleton(); } @@ -2049,13 +2061,37 @@ TEST_F(TestBooleanModeKernel, Basics) { {true, false}, {3, 2}); this->AssertModesEmpty({"[null, null]", "[]", "[null]"}, 4); - auto ty = struct_({field("mode", boolean()), field("count", int64())}); - Datum mode_true = ArrayFromJSON(ty, "[[true, 1]]"); - Datum mode_false = ArrayFromJSON(ty, "[[false, 1]]"); - Datum mode_empty = ArrayFromJSON(ty, "[]"); - EXPECT_THAT(Mode(Datum(true)), ResultWith(mode_true)); - EXPECT_THAT(Mode(Datum(false)), ResultWith(mode_false)); - EXPECT_THAT(Mode(MakeNullScalar(boolean())), ResultWith(mode_empty)); + auto in_ty = boolean(); + this->AssertModesAre("[true, false, false, null]", ModeOptions(/*n=*/1), {false}, {2}); + this->AssertModesEmpty("[true, false, false, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesAre("[true, false, false, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), + {false}, {2}); + this->AssertModesEmpty("[false, false, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3)); + this->AssertModesAre("[true, false, false]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), + {false}, {2}); + this->AssertModesEmpty("[true, false, false, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesEmpty("[true, false]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesAre(ScalarFromJSON(in_ty, "true"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {true}, {1}); + this->AssertModesEmpty(ScalarFromJSON(in_ty, 
"null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "true"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "true"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + + this->AssertModesAre(ScalarFromJSON(in_ty, "true"), ModeOptions(/*n=*/1), {true}, {1}); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1)); } TYPED_TEST_SUITE(TestIntegerModeKernel, IntegralArrowTypes); @@ -2077,10 +2113,35 @@ TYPED_TEST(TestIntegerModeKernel, Basics) { this->AssertModesEmpty("[null, null, null]", 10); auto in_ty = this->type_singleton(); - auto ty = struct_({field("mode", in_ty), field("count", int64())}); - EXPECT_THAT(Mode(*MakeScalar(in_ty, 5)), - ResultWith(Datum(ArrayFromJSON(ty, "[[5, 1]]")))); - EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]")))); + + this->AssertModesAre("[1, 2, 2, null]", ModeOptions(/*n=*/1), {2}, {2}); + this->AssertModesEmpty("[1, 2, 2, null]", ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesAre("[1, 2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {2}, + {2}); + this->AssertModesEmpty("[2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3)); + this->AssertModesAre( + "[1, 2, 2]", ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {2}, {2}); + this->AssertModesEmpty("[1, 2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesEmpty("[1, 2]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesAre(ScalarFromJSON(in_ty, "1"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {1}, {1}); + 
this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + + this->AssertModesAre(ScalarFromJSON(in_ty, "5"), ModeOptions(/*n=*/1), {5}, {1}); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1)); } TYPED_TEST_SUITE(TestFloatingModeKernel, RealArrowTypes); @@ -2108,10 +2169,35 @@ TYPED_TEST(TestFloatingModeKernel, Floats) { this->AssertModesAre("[NaN, NaN, 1, null, 1, 2, 2]", 3, {1, 2, NAN}, {2, 2, 2}); auto in_ty = this->type_singleton(); - auto ty = struct_({field("mode", in_ty), field("count", int64())}); - EXPECT_THAT(Mode(*MakeScalar(in_ty, 5.0)), - ResultWith(Datum(ArrayFromJSON(ty, "[[5.0, 1]]")))); - EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]")))); + + this->AssertModesAre("[1, 2, 2, null]", ModeOptions(/*n=*/1), {2}, {2}); + this->AssertModesEmpty("[1, 2, 2, null]", ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesAre("[1, 2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {2}, + {2}); + this->AssertModesEmpty("[2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3)); + this->AssertModesAre( + "[1, 2, 2]", ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {2}, {2}); + this->AssertModesEmpty("[1, 2, 2, null]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesEmpty("[1, 2]", + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3)); + this->AssertModesAre(ScalarFromJSON(in_ty, "1"), 
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {1}, {1}); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2)); + + this->AssertModesAre(ScalarFromJSON(in_ty, "5"), ModeOptions(/*n=*/1), {5}, {1}); + this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1)); } TEST_F(TestInt8ModeKernelValueRange, Basics) { @@ -2672,6 +2758,36 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) { this->AssertQuantilesEmpty({"[null, null]", "[]", "[null]"}, {0.3, 0.4}); auto ty = this->type_singleton(); + + QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/false, + /*min_count=*/0); + QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/true, + /*min_count=*/3); + QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR, + /*skip_nulls=*/false, /*min_count=*/3); + auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]")); + auto empty = ResultWith(ArrayFromJSON(float64(), "[]")); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls), not_empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 
4, 5]"), min_count), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), min_count), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls_min_count), + not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls_min_count), + empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count), empty); + for (const auto interpolation : this->interpolations_) { QuantileOptions options({0.0, 0.5, 1.0}, interpolation); auto expected_ty = (interpolation == QuantileOptions::LINEAR || @@ -2718,6 +2834,36 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) { this->AssertQuantilesEmpty({"[NaN, NaN]", "[]", "[null]"}, {0.3, 0.4}); auto ty = this->type_singleton(); + + QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/false, + /*min_count=*/0); + QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/true, + /*min_count=*/3); + QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR, + /*skip_nulls=*/false, /*min_count=*/3); + auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]")); + auto empty = ResultWith(ArrayFromJSON(float64(), "[]")); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty); + 
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls), not_empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), min_count), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), min_count), not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls_min_count), + not_empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls_min_count), + empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls_min_count), empty); + EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count), empty); + for (const auto interpolation : this->interpolations_) { QuantileOptions options({0.0, 0.5, 1.0}, interpolation); auto expected_ty = (interpolation == QuantileOptions::LINEAR || @@ -3015,5 +3161,44 @@ TEST(TestTDigestKernel, Scalar) { } } +TEST(TestTDigestKernel, Options) { + auto ty = float64(); + TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500, + /*skip_nulls=*/false, /*min_count=*/0); + TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500, + /*skip_nulls=*/true, /*min_count=*/3); + TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500, + /*skip_nulls=*/false, /*min_count=*/3); + + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls), + ResultWith(ArrayFromJSON(ty, "[2.0]"))); + 
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls), + ResultWith(ArrayFromJSON(ty, "[1.0]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls), + ResultWith(ArrayFromJSON(ty, "[]"))); + + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count), + ResultWith(ArrayFromJSON(ty, "[2.0]"))); + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count), + ResultWith(ArrayFromJSON(ty, "[2.0]"))); + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); + EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls_min_count), + ResultWith(ArrayFromJSON(ty, "[]"))); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 3ea692857cf20..23bb73f2a7ffd 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1593,6 +1593,8 @@ struct GroupedTDigestImpl : public GroupedAggregator { options_ = *checked_cast(options); ctx_ = ctx; pool_ = ctx->memory_pool(); + counts_ = TypedBufferBuilder(pool_); + no_nulls_ = TypedBufferBuilder(pool_); return Status::OK(); } @@ -1602,12 +1604,21 @@ struct GroupedTDigestImpl : public 
GroupedAggregator { for (int64_t i = 0; i < added_groups; i++) { tdigests_.emplace_back(options_.delta, options_.buffer_size); } + RETURN_NOT_OK(counts_.Append(added_groups, 0)); + RETURN_NOT_OK(no_nulls_.Append(added_groups, true)); /* grow by the newly added groups only, so both builders stay the same length as tdigests_ */ return Status::OK(); } Status Consume(const ExecBatch& batch) override { - VisitGroupedValuesNonNull( - batch, [&](uint32_t g, CType value) { tdigests_[g].NanAdd(value); }); + int64_t* counts = counts_.mutable_data(); + uint8_t* no_nulls = no_nulls_.mutable_data(); + VisitGroupedValues( + batch, + [&](uint32_t g, CType value) { + tdigests_[g].NanAdd(value); + counts[g]++; + }, + [&](uint32_t g) { BitUtil::SetBitTo(no_nulls, g, false); }); return Status::OK(); } @@ -1615,15 +1626,26 @@ struct GroupedTDigestImpl : public GroupedAggregator { const ArrayData& group_id_mapping) override { auto other = checked_cast(&raw_other); + int64_t* counts = counts_.mutable_data(); + uint8_t* no_nulls = no_nulls_.mutable_data(); + + const int64_t* other_counts = other->counts_.data(); + const uint8_t* other_no_nulls = other->no_nulls_.data(); /* validity bits of the state being merged in (indexed by other_g), not our own bitmap */ + auto g = group_id_mapping.GetValues(1); for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { tdigests_[*g].Merge(other->tdigests_[other_g]); + counts[*g] += other_counts[other_g]; + BitUtil::SetBitTo( + no_nulls, *g, + BitUtil::GetBit(no_nulls, *g) && BitUtil::GetBit(other_no_nulls, other_g)); } return Status::OK(); } Result Finalize() override { + const int64_t* counts = counts_.data(); std::shared_ptr null_bitmap; ARROW_ASSIGN_OR_RAISE( std::shared_ptr values, @@ -1633,7 +1655,7 @@ struct GroupedTDigestImpl : public GroupedAggregator { double* results = reinterpret_cast(values->mutable_data()); for (int64_t i = 0; static_cast(i) < tdigests_.size(); ++i) { - if (!tdigests_[i].is_empty()) { + if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count) { for (int64_t j = 0; j < slot_length; j++) { results[i * slot_length + j] = tdigests_[i].Quantile(options_.q[j]); } @@ -1649,6 
+1671,18 @@ struct GroupedTDigestImpl : public GroupedAggregator { std::fill(&results[i * slot_length], &results[(i + 1) * slot_length], 0.0); } + if (!options_.skip_nulls) { + null_count = kUnknownNullCount; + if (null_bitmap) { + arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0, + no_nulls_.data(), /*right_offset=*/0, + static_cast(tdigests_.size()), + /*out_offset=*/0, null_bitmap->mutable_data()); + } else { + ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish()); + } + } + auto child = ArrayData::Make(float64(), tdigests_.size() * options_.q.size(), {nullptr, std::move(values)}, /*null_count=*/0); return ArrayData::Make(out_type(), tdigests_.size(), {std::move(null_bitmap)}, @@ -1661,6 +1695,8 @@ struct GroupedTDigestImpl : public GroupedAggregator { TDigestOptions options_; std::vector tdigests_; + TypedBufferBuilder counts_; + TypedBufferBuilder no_nulls_; ExecContext* ctx_; MemoryPool* pool_; }; diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 32e8efa0ab8de..df13bd569ea24 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -1086,27 +1086,40 @@ TEST(GroupBy, VarianceAndStddev) { TEST(GroupBy, TDigest) { auto batch = RecordBatchFromJSON( schema({field("argument", float64()), field("key", int64())}), R"([ - [1, 1], - [null, 1], - [0, 2], - [null, 3], - [4, null], - [3, 1], - [0, 2], - [-1, 2], - [1, null], - [NaN, 3] + [1, 1], + [null, 1], + [0, 2], + [null, 3], + [1, 4], + [4, null], + [3, 1], + [0, 2], + [-1, 2], + [1, null], + [NaN, 3], + [1, 4], + [1, 4], + [null, 4] ])"); TDigestOptions options1(std::vector{0.5, 0.9, 0.99}); TDigestOptions options2(std::vector{0.5, 0.9, 0.99}, /*delta=*/50, /*buffer_size=*/1024); + TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500, + /*skip_nulls=*/false, /*min_count=*/0); + TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, 
/*buffer_size=*/500, + /*skip_nulls=*/true, /*min_count=*/3); + TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500, + /*skip_nulls=*/false, /*min_count=*/3); ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, internal::GroupBy( { batch->GetColumnByName("argument"), batch->GetColumnByName("argument"), batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), }, { batch->GetColumnByName("key"), @@ -1115,6 +1128,9 @@ TEST(GroupBy, TDigest) { {"hash_tdigest", nullptr}, {"hash_tdigest", &options1}, {"hash_tdigest", &options2}, + {"hash_tdigest", &keep_nulls}, + {"hash_tdigest", &min_count}, + {"hash_tdigest", &keep_nulls_min_count}, })); AssertDatumsApproxEqual( @@ -1122,13 +1138,17 @@ TEST(GroupBy, TDigest) { field("hash_tdigest", fixed_size_list(float64(), 1)), field("hash_tdigest", fixed_size_list(float64(), 3)), field("hash_tdigest", fixed_size_list(float64(), 3)), + field("hash_tdigest", fixed_size_list(float64(), 1)), + field("hash_tdigest", fixed_size_list(float64(), 1)), + field("hash_tdigest", fixed_size_list(float64(), 1)), field("key_0", int64()), }), R"([ - [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], 1], - [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], 2], - [null, null, null, 3], - [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], null] + [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], null, null, null, 1], + [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0], [0.0], [0.0], 2], + [null, null, null, null, null, null, 3], + [[1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0], null, [1.0], null, 4], + [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], [1.0], null, null, null] ])"), aggregated_and_grouped, /*verbose=*/true); diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 39bb5315f7aec..29c579f85a985 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -924,13 +924,13 @@ class IndexOptions(_IndexOptions): cdef class 
_ModeOptions(FunctionOptions): - def _set_options(self, n): - self.wrapped.reset(new CModeOptions(n)) + def _set_options(self, n, skip_nulls, min_count): + self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count)) class ModeOptions(_ModeOptions): - def __init__(self, n=1): - self._set_options(n) + def __init__(self, n=1, skip_nulls=True, min_count=0): + self._set_options(n, skip_nulls, min_count) cdef class _SetLookupOptions(FunctionOptions): @@ -1096,7 +1096,7 @@ class SortOptions(_SortOptions): cdef class _QuantileOptions(FunctionOptions): - def _set_options(self, quantiles, interp): + def _set_options(self, quantiles, interp, skip_nulls, min_count): interp_dict = { 'linear': CQuantileInterp_LINEAR, 'lower': CQuantileInterp_LOWER, @@ -1109,24 +1109,29 @@ cdef class _QuantileOptions(FunctionOptions): '{!r} is not a valid interpolation' .format(interp)) self.wrapped.reset( - new CQuantileOptions(quantiles, interp_dict[interp])) + new CQuantileOptions(quantiles, interp_dict[interp], + skip_nulls, min_count)) class QuantileOptions(_QuantileOptions): - def __init__(self, *, q=0.5, interpolation='linear'): + def __init__(self, *, q=0.5, interpolation='linear', + skip_nulls=True, min_count=0): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] - self._set_options(q, interpolation) + self._set_options(q, interpolation, skip_nulls, min_count) cdef class _TDigestOptions(FunctionOptions): - def _set_options(self, quantiles, delta, buffer_size): + def _set_options(self, quantiles, delta, buffer_size, + skip_nulls, min_count): self.wrapped.reset( - new CTDigestOptions(quantiles, delta, buffer_size)) + new CTDigestOptions(quantiles, delta, buffer_size, + skip_nulls, min_count)) class TDigestOptions(_TDigestOptions): - def __init__(self, *, q=0.5, delta=100, buffer_size=500): + def __init__(self, *, q=0.5, delta=100, buffer_size=500, + skip_nulls=True, min_count=0): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] - self._set_options(q, delta, buffer_size) + 
self._set_options(q, delta, buffer_size, skip_nulls, min_count) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 6f8b9fa3dae43..4bc4034f64c7d 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -445,7 +445,7 @@ def match_substring_regex(array, pattern, *, ignore_case=False): MatchSubstringOptions(pattern, ignore_case)) -def mode(array, n=1): +def mode(array, n=1, skip_nulls=True, min_count=0): """ Return top-n most common values and number of times they occur in a passed numerical (chunked) array, in descending order of occurance. If there are @@ -454,6 +454,12 @@ def mode(array, n=1): Parameters ---------- array : pyarrow.Array or pyarrow.ChunkedArray + skip_nulls : bool, default True + If True, ignore nulls in the input. Else return an empty array + if any input is null. + min_count : int, default 0 + If there are fewer than this many values in the input, return + an empty array. Returns ------- @@ -470,7 +476,7 @@ def mode(array, n=1): >>> modes[1] """ - options = ModeOptions(n=n) + options = ModeOptions(n=n, skip_nulls=skip_nulls, min_count=min_count) return call_function("mode", [array], options) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4f9f4184b2d36..29351e0b648d7 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1993,8 +1993,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: cdef cppclass CModeOptions \ "arrow::compute::ModeOptions"(CFunctionOptions): - CModeOptions(int64_t n) + CModeOptions(int64_t n, c_bool skip_nulls, uint32_t min_count) int64_t n + c_bool skip_nulls + uint32_t min_count cdef cppclass CIndexOptions \ "arrow::compute::IndexOptions"(CFunctionOptions): @@ -2041,17 +2043,23 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: cdef cppclass CQuantileOptions \ "arrow::compute::QuantileOptions"(CFunctionOptions): - CQuantileOptions(vector[double] q, 
CQuantileInterp interpolation) + CQuantileOptions(vector[double] q, CQuantileInterp interpolation, + c_bool skip_nulls, uint32_t min_count) vector[double] q CQuantileInterp interpolation + c_bool skip_nulls + uint32_t min_count cdef cppclass CTDigestOptions \ "arrow::compute::TDigestOptions"(CFunctionOptions): CTDigestOptions(vector[double] q, - unsigned int delta, unsigned int buffer_size) + unsigned int delta, unsigned int buffer_size, + c_bool skip_nulls, uint32_t min_count) vector[double] q unsigned int delta unsigned int buffer_size + c_bool skip_nulls + uint32_t min_count enum DatumType" arrow::Datum::type": DatumType_NONE" arrow::Datum::NONE" diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index c6a106fbebdc4..bbef46f247710 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -306,6 +306,14 @@ def test_mode_array(): arr = pa.array([], type='int64') assert len(pc.mode(arr)) == 0 + arr = pa.array([1, 1, 3, 4, 3, None], type='int64') + mode = pc.mode(arr, skip_nulls=False) + assert len(mode) == 0 + mode = pc.mode(arr, min_count=6) + assert len(mode) == 0 + mode = pc.mode(arr, skip_nulls=False, min_count=5) + assert len(mode) == 0 + def test_mode_chunked_array(): # ARROW-9917 @@ -650,7 +658,8 @@ def test_generated_signatures(): "options=None, skip_nulls=True, min_count=1)") sig = inspect.signature(pc.quantile) assert str(sig) == ("(array, *, memory_pool=None, " - "options=None, q=0.5, interpolation='linear')") + "options=None, q=0.5, interpolation='linear', " + "skip_nulls=True, min_count=0)") sig = inspect.signature(pc.binary_join_element_wise) assert str(sig) == ("(*strings, memory_pool=None, options=None, " "null_handling='emit_null', null_replacement='')") diff --git a/r/src/compute.cpp b/r/src/compute.cpp index e84f70016a5ef..7d17f111d7488 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -225,6 +225,12 @@ std::shared_ptr make_compute_options( cpp11::as_cpp( 
interpolation); } + if (!Rf_isNull(options["na.min_count"])) { + out->min_count = cpp11::as_cpp(options["na.min_count"]); + } + if (!Rf_isNull(options["na.rm"])) { + out->skip_nulls = cpp11::as_cpp(options["na.rm"]); + } return out; } @@ -376,8 +382,8 @@ std::shared_ptr make_compute_options( using Options = arrow::compute::VarianceOptions; auto out = std::make_shared(); out->ddof = cpp11::as_cpp(options["ddof"]); - if (!Rf_isNull(options["na.min_count"])) { - out->min_count = cpp11::as_cpp(options["na.min_count"]); + if (!Rf_isNull(options["min_count"])) { + out->min_count = cpp11::as_cpp(options["min_count"]); } if (!Rf_isNull(options["na.rm"])) { out->skip_nulls = cpp11::as_cpp(options["na.rm"]);