From e1d10561d31aa3ac2c9c3f13a7f14ab4e22148a2 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 3 Sep 2021 08:43:28 -0400
Subject: [PATCH] ARROW-13782: [C++] Add skip_nulls/min_count to
tdigest/mode/quantile
Closes #11061 from lidavidm/arrow-13782
Authored-by: David Li
Signed-off-by: David Li
---
cpp/src/arrow/compute/api_aggregate.cc | 46 +++-
cpp/src/arrow/compute/api_aggregate.h | 29 ++-
.../arrow/compute/kernels/aggregate_mode.cc | 35 ++-
.../compute/kernels/aggregate_quantile.cc | 18 +-
.../compute/kernels/aggregate_tdigest.cc | 30 ++-
.../arrow/compute/kernels/aggregate_test.cc | 231 ++++++++++++++++--
.../arrow/compute/kernels/hash_aggregate.cc | 42 +++-
.../compute/kernels/hash_aggregate_test.cc | 48 ++--
python/pyarrow/_compute.pyx | 29 ++-
python/pyarrow/compute.py | 10 +-
python/pyarrow/includes/libarrow.pxd | 14 +-
python/pyarrow/tests/test_compute.py | 11 +-
r/src/compute.cpp | 10 +-
13 files changed, 462 insertions(+), 91 deletions(-)
diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc
index 6d7bdfa6cf90c..1216fe27d4e70 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -85,18 +85,23 @@ static auto kScalarAggregateOptionsType = GetFunctionOptionsType(DataMember("mode", &CountOptions::mode));
-static auto kModeOptionsType =
- GetFunctionOptionsType(DataMember("n", &ModeOptions::n));
+static auto kModeOptionsType = GetFunctionOptionsType(
+ DataMember("n", &ModeOptions::n), DataMember("skip_nulls", &ModeOptions::skip_nulls),
+ DataMember("min_count", &ModeOptions::min_count));
static auto kVarianceOptionsType = GetFunctionOptionsType(
DataMember("ddof", &VarianceOptions::ddof),
DataMember("skip_nulls", &VarianceOptions::skip_nulls),
DataMember("min_count", &VarianceOptions::min_count));
static auto kQuantileOptionsType = GetFunctionOptionsType(
DataMember("q", &QuantileOptions::q),
- DataMember("interpolation", &QuantileOptions::interpolation));
+ DataMember("interpolation", &QuantileOptions::interpolation),
+ DataMember("skip_nulls", &QuantileOptions::skip_nulls),
+ DataMember("min_count", &QuantileOptions::min_count));
static auto kTDigestOptionsType = GetFunctionOptionsType(
DataMember("q", &TDigestOptions::q), DataMember("delta", &TDigestOptions::delta),
- DataMember("buffer_size", &TDigestOptions::buffer_size));
+ DataMember("buffer_size", &TDigestOptions::buffer_size),
+ DataMember("skip_nulls", &TDigestOptions::skip_nulls),
+ DataMember("min_count", &TDigestOptions::min_count));
static auto kIndexOptionsType =
GetFunctionOptionsType(DataMember("value", &IndexOptions::value));
} // namespace
@@ -112,7 +117,11 @@ CountOptions::CountOptions(CountMode mode)
: FunctionOptions(internal::kCountOptionsType), mode(mode) {}
constexpr char CountOptions::kTypeName[];
-ModeOptions::ModeOptions(int64_t n) : FunctionOptions(internal::kModeOptionsType), n(n) {}
+ModeOptions::ModeOptions(int64_t n, bool skip_nulls, uint32_t min_count)
+ : FunctionOptions(internal::kModeOptionsType),
+ n{n},
+ skip_nulls{skip_nulls},
+ min_count{min_count} {}
constexpr char ModeOptions::kTypeName[];
VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
@@ -122,27 +131,38 @@ VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
min_count(min_count) {}
constexpr char VarianceOptions::kTypeName[];
-QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation)
+QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation,
+ bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{q},
- interpolation{interpolation} {}
-QuantileOptions::QuantileOptions(std::vector q, enum Interpolation interpolation)
+ interpolation{interpolation},
+ skip_nulls{skip_nulls},
+ min_count{min_count} {}
+QuantileOptions::QuantileOptions(std::vector q, enum Interpolation interpolation,
+ bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{std::move(q)},
- interpolation{interpolation} {}
+ interpolation{interpolation},
+ skip_nulls{skip_nulls},
+ min_count{min_count} {}
constexpr char QuantileOptions::kTypeName[];
-TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size)
+TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size,
+ bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{q},
delta{delta},
- buffer_size{buffer_size} {}
+ buffer_size{buffer_size},
+ skip_nulls{skip_nulls},
+ min_count{min_count} {}
TDigestOptions::TDigestOptions(std::vector q, uint32_t delta,
- uint32_t buffer_size)
+ uint32_t buffer_size, bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{std::move(q)},
delta{delta},
- buffer_size{buffer_size} {}
+ buffer_size{buffer_size},
+ skip_nulls{skip_nulls},
+ min_count{min_count} {}
constexpr char TDigestOptions::kTypeName[];
IndexOptions::IndexOptions(std::shared_ptr value)
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index 8c27da497653c..c8df81773d4c5 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -82,11 +82,16 @@ class ARROW_EXPORT CountOptions : public FunctionOptions {
/// By default, returns the most common value and count.
class ARROW_EXPORT ModeOptions : public FunctionOptions {
public:
- explicit ModeOptions(int64_t n = 1);
+ explicit ModeOptions(int64_t n = 1, bool skip_nulls = true, uint32_t min_count = 0);
constexpr static char const kTypeName[] = "ModeOptions";
static ModeOptions Defaults() { return ModeOptions{}; }
int64_t n = 1;
+ /// If true (the default), null values are ignored. Otherwise, if any value is null,
+ /// emit null.
+ bool skip_nulls;
+ /// If less than this many non-null values are observed, emit null.
+ uint32_t min_count;
};
/// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel
@@ -121,10 +126,12 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
MIDPOINT,
};
- explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR);
+ explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR,
+ bool skip_nulls = true, uint32_t min_count = 0);
explicit QuantileOptions(std::vector q,
- enum Interpolation interpolation = LINEAR);
+ enum Interpolation interpolation = LINEAR,
+ bool skip_nulls = true, uint32_t min_count = 0);
constexpr static char const kTypeName[] = "QuantileOptions";
static QuantileOptions Defaults() { return QuantileOptions{}; }
@@ -132,6 +139,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
/// quantile must be between 0 and 1 inclusive
std::vector q;
enum Interpolation interpolation;
+ /// If true (the default), null values are ignored. Otherwise, if any value is null,
+ /// emit null.
+ bool skip_nulls;
+ /// If less than this many non-null values are observed, emit null.
+ uint32_t min_count;
};
/// \brief Control TDigest approximate quantile kernel behavior
@@ -140,9 +152,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
class ARROW_EXPORT TDigestOptions : public FunctionOptions {
public:
explicit TDigestOptions(double q = 0.5, uint32_t delta = 100,
- uint32_t buffer_size = 500);
+ uint32_t buffer_size = 500, bool skip_nulls = true,
+ uint32_t min_count = 0);
explicit TDigestOptions(std::vector q, uint32_t delta = 100,
- uint32_t buffer_size = 500);
+ uint32_t buffer_size = 500, bool skip_nulls = true,
+ uint32_t min_count = 0);
constexpr static char const kTypeName[] = "TDigestOptions";
static TDigestOptions Defaults() { return TDigestOptions{}; }
@@ -152,6 +166,11 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions {
uint32_t delta;
/// input buffer size, default 500
uint32_t buffer_size;
+ /// If true (the default), null values are ignored. Otherwise, if any value is null,
+ /// emit null.
+ bool skip_nulls;
+ /// If less than this many non-null values are observed, emit null.
+ uint32_t min_count;
};
/// \brief Control Index kernel behavior
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index 6ad0eeb64564c..f225f6bf569c3 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -130,6 +130,13 @@ struct CountModer {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// count values in all chunks, ignore nulls
const Datum& datum = batch[0];
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ if ((!options.skip_nulls && datum.null_count() > 0) ||
+ (datum.length() - datum.null_count() < options.min_count)) {
+ return PrepareOutput(/*n=*/0, ctx, out).status();
+ }
+
CountValues(this->counts.data(), datum, this->min);
// generator to emit next value:count pair
@@ -154,9 +161,16 @@ struct CountModer {
template <>
struct CountModer {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const Datum& datum = batch[0];
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ if ((!options.skip_nulls && datum.null_count() > 0) ||
+ (datum.length() - datum.null_count() < options.min_count)) {
+ return PrepareOutput(/*n=*/0, ctx, out).status();
+ }
+
int64_t counts[2]{};
- const Datum& datum = batch[0];
for (const auto& array : datum.chunks()) {
if (array->length() > array->null_count()) {
const int64_t true_count =
@@ -167,7 +181,6 @@ struct CountModer {
}
}
- const ModeOptions& options = ModeState::Get(ctx);
const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
const int64_t n = std::min(options.n, distinct_values);
@@ -198,12 +211,19 @@ struct SortModer {
using Allocator = arrow::stl::allocator;
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ if ((!options.skip_nulls && datum.null_count() > 0) ||
+ (in_length < options.min_count)) {
+ return PrepareOutput(/*n=*/0, ctx, out).status();
+ }
+
// copy all chunks to a buffer, ignore nulls and nans
std::vector in_buffer(Allocator(ctx->memory_pool()));
uint64_t nan_count = 0;
- const Datum& datum = batch[0];
- const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
@@ -305,6 +325,13 @@ struct Moder::value>> {
template
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ if ((!options.skip_nulls && !scalar.is_valid) ||
+ (static_cast(scalar.is_valid) < options.min_count)) {
+ return PrepareOutput(/*n=*/0, ctx, out).status();
+ }
+
if (scalar.is_valid) {
bool called = false;
return Finalize(ctx, out, [&]() {
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index 7d2ffe0770cbe..bfd97f813e569 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -79,12 +79,18 @@ struct SortQuantiler {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const QuantileOptions& options = QuantileState::Get(ctx);
+ const Datum& datum = batch[0];
// copy all chunks to a buffer, ignore nulls and nans
std::vector in_buffer(Allocator(ctx->memory_pool()));
+ int64_t in_length = 0;
+ if ((!options.skip_nulls && datum.null_count() > 0) ||
+ (datum.length() - datum.null_count() < options.min_count)) {
+ in_length = 0;
+ } else {
+ in_length = datum.length() - datum.null_count();
+ }
- const Datum& datum = batch[0];
- const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
@@ -232,7 +238,11 @@ struct CountQuantiler {
// count values in all chunks, ignore nulls
const Datum& datum = batch[0];
- int64_t in_length = CountValues(this->counts.data(), datum, this->min);
+ int64_t in_length = 0;
+ if ((options.skip_nulls || (!options.skip_nulls && datum.null_count() == 0)) &&
+ (datum.length() - datum.null_count() >= options.min_count)) {
+ in_length = CountValues(this->counts.data(), datum, this->min);
+ }
// prepare out array
int64_t out_length = options.q.size();
@@ -394,7 +404,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
ArrayData* output = out->mutable_array();
- if (!scalar.is_valid) {
+ if (!scalar.is_valid || options.min_count > 1) {
output->length = 0;
output->null_count = 0;
return Status::OK();
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index be8d66c4c247e..3b616c664a9ff 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -37,14 +37,23 @@ struct TDigestImpl : public ScalarAggregator {
using CType = typename ArrowType::c_type;
explicit TDigestImpl(const TDigestOptions& options)
- : q{options.q}, tdigest{options.delta, options.buffer_size} {}
+ : options{options},
+ tdigest{options.delta, options.buffer_size},
+ count{0},
+ all_valid{true} {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (!this->all_valid) return Status::OK();
+ if (!options.skip_nulls && batch[0].null_count() > 0) {
+ this->all_valid = false;
+ return Status::OK();
+ }
if (batch[0].is_array()) {
const ArrayData& data = *batch[0].array();
const CType* values = data.GetValues(1);
if (data.length > data.GetNullCount()) {
+ this->count += data.length - data.GetNullCount();
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
@@ -55,6 +64,7 @@ struct TDigestImpl : public ScalarAggregator {
} else {
const CType value = UnboxScalar::Unbox(*batch[0].scalar());
if (batch[0].scalar()->is_valid) {
+ this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(value);
}
@@ -64,13 +74,21 @@ struct TDigestImpl : public ScalarAggregator {
}
Status MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast(src);
+ const auto& other = checked_cast(src);
+ if (!this->all_valid || !other.all_valid) {
+ this->all_valid = false;
+ return Status::OK();
+ }
this->tdigest.Merge(other.tdigest);
+ this->count += other.count;
return Status::OK();
}
Status Finalize(KernelContext* ctx, Datum* out) override {
- const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size();
+ const int64_t out_length =
+ (this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count)
+ ? 0
+ : options.q.size();
auto out_data = ArrayData::Make(float64(), out_length, 0);
out_data->buffers.resize(2, nullptr);
@@ -79,7 +97,7 @@ struct TDigestImpl : public ScalarAggregator {
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues(1);
for (int64_t i = 0; i < out_length; ++i) {
- out_buffer[i] = this->tdigest.Quantile(this->q[i]);
+ out_buffer[i] = this->tdigest.Quantile(this->options.q[i]);
}
}
@@ -87,8 +105,10 @@ struct TDigestImpl : public ScalarAggregator {
return Status::OK();
}
- const std::vector q;
+ const TDigestOptions options;
TDigest tdigest;
+ int64_t count;
+ bool all_valid;
};
struct TDigestInitState {
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index eb73e703b6eda..587e203318499 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -1954,10 +1954,10 @@ class TestPrimitiveModeKernel : public ::testing::Test {
using Traits = TypeTraits;
using CType = typename ArrowType::c_type;
- void AssertModesAre(const Datum& array, const int n,
+ void AssertModesAre(const Datum& array, const ModeOptions options,
const std::vector& expected_modes,
const std::vector& expected_counts) {
- ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n}));
+ ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options));
ValidateOutput(out);
const StructArray out_array(out.array());
ASSERT_EQ(out_array.length(), expected_modes.size());
@@ -1978,11 +1978,18 @@ class TestPrimitiveModeKernel : public ::testing::Test {
const std::vector& expected_modes,
const std::vector& expected_counts) {
auto array = ArrayFromJSON(type_singleton(), json);
- AssertModesAre(array, n, expected_modes, expected_counts);
+ AssertModesAre(array, ModeOptions(n), expected_modes, expected_counts);
+ }
+
+ void AssertModesAre(const std::string& json, const ModeOptions options,
+ const std::vector& expected_modes,
+ const std::vector& expected_counts) {
+ auto array = ArrayFromJSON(type_singleton(), json);
+ AssertModesAre(array, options, expected_modes, expected_counts);
}
void AssertModeIs(const Datum& array, CType expected_mode, int64_t expected_count) {
- AssertModesAre(array, 1, {expected_mode}, {expected_count});
+ AssertModesAre(array, ModeOptions(1), {expected_mode}, {expected_count});
}
void AssertModeIs(const std::string& json, CType expected_mode,
@@ -1997,8 +2004,8 @@ class TestPrimitiveModeKernel : public ::testing::Test {
AssertModeIs(chunked, expected_mode, expected_count);
}
- void AssertModesEmpty(const Datum& array, int n) {
- ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n}));
+ void AssertModesEmpty(const Datum& array, ModeOptions options) {
+ ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options));
auto out_array = out.make_array();
ValidateOutput(*out_array);
ASSERT_EQ(out.array()->length, 0);
@@ -2006,12 +2013,17 @@ class TestPrimitiveModeKernel : public ::testing::Test {
void AssertModesEmpty(const std::string& json, int n = 1) {
auto array = ArrayFromJSON(type_singleton(), json);
- AssertModesEmpty(array, n);
+ AssertModesEmpty(array, ModeOptions(n));
}
void AssertModesEmpty(const std::vector& json, int n = 1) {
auto chunked = ChunkedArrayFromJSON(type_singleton(), json);
- AssertModesEmpty(chunked, n);
+ AssertModesEmpty(chunked, ModeOptions(n));
+ }
+
+ void AssertModesEmpty(const std::string& json, ModeOptions options) {
+ auto array = ArrayFromJSON(type_singleton(), json);
+ AssertModesEmpty(array, options);
}
std::shared_ptr type_singleton() { return Traits::type_singleton(); }
@@ -2049,13 +2061,37 @@ TEST_F(TestBooleanModeKernel, Basics) {
{true, false}, {3, 2});
this->AssertModesEmpty({"[null, null]", "[]", "[null]"}, 4);
- auto ty = struct_({field("mode", boolean()), field("count", int64())});
- Datum mode_true = ArrayFromJSON(ty, "[[true, 1]]");
- Datum mode_false = ArrayFromJSON(ty, "[[false, 1]]");
- Datum mode_empty = ArrayFromJSON(ty, "[]");
- EXPECT_THAT(Mode(Datum(true)), ResultWith(mode_true));
- EXPECT_THAT(Mode(Datum(false)), ResultWith(mode_false));
- EXPECT_THAT(Mode(MakeNullScalar(boolean())), ResultWith(mode_empty));
+ auto in_ty = boolean();
+ this->AssertModesAre("[true, false, false, null]", ModeOptions(/*n=*/1), {false}, {2});
+ this->AssertModesEmpty("[true, false, false, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesAre("[true, false, false, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3),
+ {false}, {2});
+ this->AssertModesEmpty("[false, false, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3));
+ this->AssertModesAre("[true, false, false]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3),
+ {false}, {2});
+ this->AssertModesEmpty("[true, false, false, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesEmpty("[true, false]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesAre(ScalarFromJSON(in_ty, "true"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {true}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "true"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "true"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+
+ this->AssertModesAre(ScalarFromJSON(in_ty, "true"), ModeOptions(/*n=*/1), {true}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1));
}
TYPED_TEST_SUITE(TestIntegerModeKernel, IntegralArrowTypes);
@@ -2077,10 +2113,35 @@ TYPED_TEST(TestIntegerModeKernel, Basics) {
this->AssertModesEmpty("[null, null, null]", 10);
auto in_ty = this->type_singleton();
- auto ty = struct_({field("mode", in_ty), field("count", int64())});
- EXPECT_THAT(Mode(*MakeScalar(in_ty, 5)),
- ResultWith(Datum(ArrayFromJSON(ty, "[[5, 1]]"))));
- EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]"))));
+
+ this->AssertModesAre("[1, 2, 2, null]", ModeOptions(/*n=*/1), {2}, {2});
+ this->AssertModesEmpty("[1, 2, 2, null]", ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesAre("[1, 2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {2},
+ {2});
+ this->AssertModesEmpty("[2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3));
+ this->AssertModesAre(
+ "[1, 2, 2]", ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {2}, {2});
+ this->AssertModesEmpty("[1, 2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesEmpty("[1, 2]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesAre(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {1}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+
+ this->AssertModesAre(ScalarFromJSON(in_ty, "5"), ModeOptions(/*n=*/1), {5}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1));
}
TYPED_TEST_SUITE(TestFloatingModeKernel, RealArrowTypes);
@@ -2108,10 +2169,35 @@ TYPED_TEST(TestFloatingModeKernel, Floats) {
this->AssertModesAre("[NaN, NaN, 1, null, 1, 2, 2]", 3, {1, 2, NAN}, {2, 2, 2});
auto in_ty = this->type_singleton();
- auto ty = struct_({field("mode", in_ty), field("count", int64())});
- EXPECT_THAT(Mode(*MakeScalar(in_ty, 5.0)),
- ResultWith(Datum(ArrayFromJSON(ty, "[[5.0, 1]]"))));
- EXPECT_THAT(Mode(MakeNullScalar(in_ty)), ResultWith(Datum(ArrayFromJSON(ty, "[]"))));
+
+ this->AssertModesAre("[1, 2, 2, null]", ModeOptions(/*n=*/1), {2}, {2});
+ this->AssertModesEmpty("[1, 2, 2, null]", ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesAre("[1, 2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {2},
+ {2});
+ this->AssertModesEmpty("[2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3));
+ this->AssertModesAre(
+ "[1, 2, 2]", ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {2}, {2});
+ this->AssertModesEmpty("[1, 2, 2, null]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesEmpty("[1, 2]",
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3));
+ this->AssertModesAre(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {1}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "1"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2));
+
+ this->AssertModesAre(ScalarFromJSON(in_ty, "5"), ModeOptions(/*n=*/1), {5}, {1});
+ this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1));
}
TEST_F(TestInt8ModeKernelValueRange, Basics) {
@@ -2672,6 +2758,36 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
this->AssertQuantilesEmpty({"[null, null]", "[]", "[null]"}, {0.3, 0.4});
auto ty = this->type_singleton();
+
+ QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/false,
+ /*min_count=*/0);
+ QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/true,
+ /*min_count=*/3);
+ QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
+ /*skip_nulls=*/false, /*min_count=*/3);
+ auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
+ auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), min_count), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), min_count), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls_min_count),
+ not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls_min_count),
+ empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count), empty);
+
for (const auto interpolation : this->interpolations_) {
QuantileOptions options({0.0, 0.5, 1.0}, interpolation);
auto expected_ty = (interpolation == QuantileOptions::LINEAR ||
@@ -2718,6 +2834,36 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
this->AssertQuantilesEmpty({"[NaN, NaN]", "[]", "[null]"}, {0.3, 0.4});
auto ty = this->type_singleton();
+
+ QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/false,
+ /*min_count=*/0);
+ QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR, /*skip_nulls=*/true,
+ /*min_count=*/3);
+ QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
+ /*skip_nulls=*/false, /*min_count=*/3);
+ auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
+ auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls), not_empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), min_count), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), min_count), not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls_min_count),
+ not_empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls_min_count),
+ empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5, null]"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "3"), keep_nulls_min_count), empty);
+ EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count), empty);
+
for (const auto interpolation : this->interpolations_) {
QuantileOptions options({0.0, 0.5, 1.0}, interpolation);
auto expected_ty = (interpolation == QuantileOptions::LINEAR ||
@@ -3015,5 +3161,44 @@ TEST(TestTDigestKernel, Scalar) {
}
}
+TEST(TestTDigestKernel, Options) {
+ auto ty = float64();
+ TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/0);
+ TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/true, /*min_count=*/3);
+ TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/3);
+
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls),
+ ResultWith(ArrayFromJSON(ty, "[2.0]")));
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls),
+ ResultWith(ArrayFromJSON(ty, "[1.0]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count),
+ ResultWith(ArrayFromJSON(ty, "[2.0]")));
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count),
+ ResultWith(ArrayFromJSON(ty, "[2.0]")));
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+ EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls_min_count),
+ ResultWith(ArrayFromJSON(ty, "[]")));
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 3ea692857cf20..23bb73f2a7ffd 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -1593,6 +1593,8 @@ struct GroupedTDigestImpl : public GroupedAggregator {
options_ = *checked_cast(options);
ctx_ = ctx;
pool_ = ctx->memory_pool();
+ counts_ = TypedBufferBuilder(pool_);
+ no_nulls_ = TypedBufferBuilder(pool_);
return Status::OK();
}
@@ -1602,12 +1604,21 @@ struct GroupedTDigestImpl : public GroupedAggregator {
for (int64_t i = 0; i < added_groups; i++) {
tdigests_.emplace_back(options_.delta, options_.buffer_size);
}
+ RETURN_NOT_OK(counts_.Append(new_num_groups, 0));
+ RETURN_NOT_OK(no_nulls_.Append(new_num_groups, true));
return Status::OK();
}
Status Consume(const ExecBatch& batch) override {
- VisitGroupedValuesNonNull(
- batch, [&](uint32_t g, CType value) { tdigests_[g].NanAdd(value); });
+ int64_t* counts = counts_.mutable_data();
+ uint8_t* no_nulls = no_nulls_.mutable_data();
+ VisitGroupedValues(
+ batch,
+ [&](uint32_t g, CType value) {
+ tdigests_[g].NanAdd(value);
+ counts[g]++;
+ },
+ [&](uint32_t g) { BitUtil::SetBitTo(no_nulls, g, false); });
return Status::OK();
}
@@ -1615,15 +1626,26 @@ struct GroupedTDigestImpl : public GroupedAggregator {
const ArrayData& group_id_mapping) override {
auto other = checked_cast(&raw_other);
+ int64_t* counts = counts_.mutable_data();
+ uint8_t* no_nulls = no_nulls_.mutable_data();
+
+ const int64_t* other_counts = other->counts_.data();
+ const uint8_t* other_no_nulls = no_nulls_.mutable_data();
+
auto g = group_id_mapping.GetValues(1);
for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
tdigests_[*g].Merge(other->tdigests_[other_g]);
+ counts[*g] += other_counts[other_g];
+ BitUtil::SetBitTo(
+ no_nulls, *g,
+ BitUtil::GetBit(no_nulls, *g) && BitUtil::GetBit(other_no_nulls, other_g));
}
return Status::OK();
}
Result Finalize() override {
+ const int64_t* counts = counts_.data();
std::shared_ptr null_bitmap;
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr values,
@@ -1633,7 +1655,7 @@ struct GroupedTDigestImpl : public GroupedAggregator {
double* results = reinterpret_cast(values->mutable_data());
for (int64_t i = 0; static_cast(i) < tdigests_.size(); ++i) {
- if (!tdigests_[i].is_empty()) {
+ if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count) {
for (int64_t j = 0; j < slot_length; j++) {
results[i * slot_length + j] = tdigests_[i].Quantile(options_.q[j]);
}
@@ -1649,6 +1671,18 @@ struct GroupedTDigestImpl : public GroupedAggregator {
std::fill(&results[i * slot_length], &results[(i + 1) * slot_length], 0.0);
}
+ if (!options_.skip_nulls) {
+ null_count = kUnknownNullCount;
+ if (null_bitmap) {
+ arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
+ no_nulls_.data(), /*right_offset=*/0,
+ static_cast(tdigests_.size()),
+ /*out_offset=*/0, null_bitmap->mutable_data());
+ } else {
+ ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish());
+ }
+ }
+
auto child = ArrayData::Make(float64(), tdigests_.size() * options_.q.size(),
{nullptr, std::move(values)}, /*null_count=*/0);
return ArrayData::Make(out_type(), tdigests_.size(), {std::move(null_bitmap)},
@@ -1661,6 +1695,8 @@ struct GroupedTDigestImpl : public GroupedAggregator {
TDigestOptions options_;
std::vector tdigests_;
+ TypedBufferBuilder counts_;
+ TypedBufferBuilder no_nulls_;
ExecContext* ctx_;
MemoryPool* pool_;
};
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 32e8efa0ab8de..df13bd569ea24 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -1086,27 +1086,40 @@ TEST(GroupBy, VarianceAndStddev) {
TEST(GroupBy, TDigest) {
auto batch = RecordBatchFromJSON(
schema({field("argument", float64()), field("key", int64())}), R"([
- [1, 1],
- [null, 1],
- [0, 2],
- [null, 3],
- [4, null],
- [3, 1],
- [0, 2],
- [-1, 2],
- [1, null],
- [NaN, 3]
+ [1, 1],
+ [null, 1],
+ [0, 2],
+ [null, 3],
+ [1, 4],
+ [4, null],
+ [3, 1],
+ [0, 2],
+ [-1, 2],
+ [1, null],
+ [NaN, 3],
+ [1, 4],
+ [1, 4],
+ [null, 4]
])");
TDigestOptions options1(std::vector{0.5, 0.9, 0.99});
TDigestOptions options2(std::vector{0.5, 0.9, 0.99}, /*delta=*/50,
/*buffer_size=*/1024);
+ TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/0);
+ TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/true, /*min_count=*/3);
+ TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
{
batch->GetColumnByName("argument"),
batch->GetColumnByName("argument"),
batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
+ batch->GetColumnByName("argument"),
},
{
batch->GetColumnByName("key"),
@@ -1115,6 +1128,9 @@ TEST(GroupBy, TDigest) {
{"hash_tdigest", nullptr},
{"hash_tdigest", &options1},
{"hash_tdigest", &options2},
+ {"hash_tdigest", &keep_nulls},
+ {"hash_tdigest", &min_count},
+ {"hash_tdigest", &keep_nulls_min_count},
}));
AssertDatumsApproxEqual(
@@ -1122,13 +1138,17 @@ TEST(GroupBy, TDigest) {
field("hash_tdigest", fixed_size_list(float64(), 1)),
field("hash_tdigest", fixed_size_list(float64(), 3)),
field("hash_tdigest", fixed_size_list(float64(), 3)),
+ field("hash_tdigest", fixed_size_list(float64(), 1)),
+ field("hash_tdigest", fixed_size_list(float64(), 1)),
+ field("hash_tdigest", fixed_size_list(float64(), 1)),
field("key_0", int64()),
}),
R"([
- [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], 1],
- [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], 2],
- [null, null, null, 3],
- [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], null]
+ [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], null, null, null, 1],
+ [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0], [0.0], [0.0], 2],
+ [null, null, null, null, null, null, 3],
+ [[1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0], null, [1.0], null, 4],
+ [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], [1.0], null, null, null]
])"),
aggregated_and_grouped,
/*verbose=*/true);
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index 39bb5315f7aec..29c579f85a985 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -924,13 +924,13 @@ class IndexOptions(_IndexOptions):
cdef class _ModeOptions(FunctionOptions):
- def _set_options(self, n):
- self.wrapped.reset(new CModeOptions(n))
+ def _set_options(self, n, skip_nulls, min_count):
+ self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count))
class ModeOptions(_ModeOptions):
- def __init__(self, n=1):
- self._set_options(n)
+ def __init__(self, n=1, skip_nulls=True, min_count=0):
+ self._set_options(n, skip_nulls, min_count)
cdef class _SetLookupOptions(FunctionOptions):
@@ -1096,7 +1096,7 @@ class SortOptions(_SortOptions):
cdef class _QuantileOptions(FunctionOptions):
- def _set_options(self, quantiles, interp):
+ def _set_options(self, quantiles, interp, skip_nulls, min_count):
interp_dict = {
'linear': CQuantileInterp_LINEAR,
'lower': CQuantileInterp_LOWER,
@@ -1109,24 +1109,29 @@ cdef class _QuantileOptions(FunctionOptions):
'{!r} is not a valid interpolation'
.format(interp))
self.wrapped.reset(
- new CQuantileOptions(quantiles, interp_dict[interp]))
+ new CQuantileOptions(quantiles, interp_dict[interp],
+ skip_nulls, min_count))
class QuantileOptions(_QuantileOptions):
- def __init__(self, *, q=0.5, interpolation='linear'):
+ def __init__(self, *, q=0.5, interpolation='linear',
+ skip_nulls=True, min_count=0):
if not isinstance(q, (list, tuple, np.ndarray)):
q = [q]
- self._set_options(q, interpolation)
+ self._set_options(q, interpolation, skip_nulls, min_count)
cdef class _TDigestOptions(FunctionOptions):
- def _set_options(self, quantiles, delta, buffer_size):
+ def _set_options(self, quantiles, delta, buffer_size,
+ skip_nulls, min_count):
self.wrapped.reset(
- new CTDigestOptions(quantiles, delta, buffer_size))
+ new CTDigestOptions(quantiles, delta, buffer_size,
+ skip_nulls, min_count))
class TDigestOptions(_TDigestOptions):
- def __init__(self, *, q=0.5, delta=100, buffer_size=500):
+ def __init__(self, *, q=0.5, delta=100, buffer_size=500,
+ skip_nulls=True, min_count=0):
if not isinstance(q, (list, tuple, np.ndarray)):
q = [q]
- self._set_options(q, delta, buffer_size)
+ self._set_options(q, delta, buffer_size, skip_nulls, min_count)
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index 6f8b9fa3dae43..4bc4034f64c7d 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -445,7 +445,7 @@ def match_substring_regex(array, pattern, *, ignore_case=False):
MatchSubstringOptions(pattern, ignore_case))
-def mode(array, n=1):
+def mode(array, n=1, skip_nulls=True, min_count=0):
"""
Return top-n most common values and number of times they occur in a passed
numerical (chunked) array, in descending order of occurance. If there are
@@ -454,6 +454,12 @@ def mode(array, n=1):
Parameters
----------
array : pyarrow.Array or pyarrow.ChunkedArray
+ skip_nulls : bool, default True
+ If True, ignore nulls in the input. Else return an empty array
+ if any input is null.
+ min_count : int, default 0
+ If there are fewer than this many values in the input, return
+ an empty array.
Returns
-------
@@ -470,7 +476,7 @@ def mode(array, n=1):
>>> modes[1]
"""
- options = ModeOptions(n=n)
+ options = ModeOptions(n=n, skip_nulls=skip_nulls, min_count=min_count)
return call_function("mode", [array], options)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 4f9f4184b2d36..29351e0b648d7 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1993,8 +1993,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
cdef cppclass CModeOptions \
"arrow::compute::ModeOptions"(CFunctionOptions):
- CModeOptions(int64_t n)
+ CModeOptions(int64_t n, c_bool skip_nulls, uint32_t min_count)
int64_t n
+ c_bool skip_nulls
+ uint32_t min_count
cdef cppclass CIndexOptions \
"arrow::compute::IndexOptions"(CFunctionOptions):
@@ -2041,17 +2043,23 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
cdef cppclass CQuantileOptions \
"arrow::compute::QuantileOptions"(CFunctionOptions):
- CQuantileOptions(vector[double] q, CQuantileInterp interpolation)
+ CQuantileOptions(vector[double] q, CQuantileInterp interpolation,
+ c_bool skip_nulls, uint32_t min_count)
vector[double] q
CQuantileInterp interpolation
+ c_bool skip_nulls
+ uint32_t min_count
cdef cppclass CTDigestOptions \
"arrow::compute::TDigestOptions"(CFunctionOptions):
CTDigestOptions(vector[double] q,
- unsigned int delta, unsigned int buffer_size)
+ unsigned int delta, unsigned int buffer_size,
+ c_bool skip_nulls, uint32_t min_count)
vector[double] q
unsigned int delta
unsigned int buffer_size
+ c_bool skip_nulls
+ uint32_t min_count
enum DatumType" arrow::Datum::type":
DatumType_NONE" arrow::Datum::NONE"
diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py
index c6a106fbebdc4..bbef46f247710 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -306,6 +306,14 @@ def test_mode_array():
arr = pa.array([], type='int64')
assert len(pc.mode(arr)) == 0
+ arr = pa.array([1, 1, 3, 4, 3, None], type='int64')
+ mode = pc.mode(arr, skip_nulls=False)
+ assert len(mode) == 0
+ mode = pc.mode(arr, min_count=6)
+ assert len(mode) == 0
+ mode = pc.mode(arr, skip_nulls=False, min_count=5)
+ assert len(mode) == 0
+
def test_mode_chunked_array():
# ARROW-9917
@@ -650,7 +658,8 @@ def test_generated_signatures():
"options=None, skip_nulls=True, min_count=1)")
sig = inspect.signature(pc.quantile)
assert str(sig) == ("(array, *, memory_pool=None, "
- "options=None, q=0.5, interpolation='linear')")
+ "options=None, q=0.5, interpolation='linear', "
+ "skip_nulls=True, min_count=0)")
sig = inspect.signature(pc.binary_join_element_wise)
assert str(sig) == ("(*strings, memory_pool=None, options=None, "
"null_handling='emit_null', null_replacement='')")
diff --git a/r/src/compute.cpp b/r/src/compute.cpp
index e84f70016a5ef..7d17f111d7488 100644
--- a/r/src/compute.cpp
+++ b/r/src/compute.cpp
@@ -225,6 +225,12 @@ std::shared_ptr make_compute_options(
cpp11::as_cpp(
interpolation);
}
+ if (!Rf_isNull(options["na.min_count"])) {
+ out->min_count = cpp11::as_cpp(options["na.min_count"]);
+ }
+ if (!Rf_isNull(options["na.rm"])) {
+ out->skip_nulls = cpp11::as_cpp(options["na.rm"]);
+ }
return out;
}
@@ -376,8 +382,8 @@ std::shared_ptr make_compute_options(
using Options = arrow::compute::VarianceOptions;
auto out = std::make_shared();
out->ddof = cpp11::as_cpp(options["ddof"]);
- if (!Rf_isNull(options["na.min_count"])) {
- out->min_count = cpp11::as_cpp(options["na.min_count"]);
+ if (!Rf_isNull(options["min_count"])) {
+ out->min_count = cpp11::as_cpp(options["min_count"]);
}
if (!Rf_isNull(options["na.rm"])) {
out->skip_nulls = cpp11::as_cpp(options["na.rm"]);