Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-13782: [C++] Add skip_nulls/min_count to tdigest/mode/quantile #11061

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,23 @@ static auto kScalarAggregateOptionsType = GetFunctionOptionsType<ScalarAggregate
DataMember("min_count", &ScalarAggregateOptions::min_count));
static auto kCountOptionsType =
GetFunctionOptionsType<CountOptions>(DataMember("mode", &CountOptions::mode));
static auto kModeOptionsType =
GetFunctionOptionsType<ModeOptions>(DataMember("n", &ModeOptions::n));
static auto kModeOptionsType = GetFunctionOptionsType<ModeOptions>(
DataMember("n", &ModeOptions::n), DataMember("skip_nulls", &ModeOptions::skip_nulls),
DataMember("min_count", &ModeOptions::min_count));
static auto kVarianceOptionsType = GetFunctionOptionsType<VarianceOptions>(
DataMember("ddof", &VarianceOptions::ddof),
DataMember("skip_nulls", &VarianceOptions::skip_nulls),
DataMember("min_count", &VarianceOptions::min_count));
static auto kQuantileOptionsType = GetFunctionOptionsType<QuantileOptions>(
DataMember("q", &QuantileOptions::q),
DataMember("interpolation", &QuantileOptions::interpolation));
DataMember("interpolation", &QuantileOptions::interpolation),
DataMember("skip_nulls", &QuantileOptions::skip_nulls),
DataMember("min_count", &QuantileOptions::min_count));
static auto kTDigestOptionsType = GetFunctionOptionsType<TDigestOptions>(
DataMember("q", &TDigestOptions::q), DataMember("delta", &TDigestOptions::delta),
DataMember("buffer_size", &TDigestOptions::buffer_size));
DataMember("buffer_size", &TDigestOptions::buffer_size),
DataMember("skip_nulls", &TDigestOptions::skip_nulls),
DataMember("min_count", &TDigestOptions::min_count));
static auto kIndexOptionsType =
GetFunctionOptionsType<IndexOptions>(DataMember("value", &IndexOptions::value));
} // namespace
Expand All @@ -112,7 +117,11 @@ CountOptions::CountOptions(CountMode mode)
: FunctionOptions(internal::kCountOptionsType), mode(mode) {}
constexpr char CountOptions::kTypeName[];

ModeOptions::ModeOptions(int64_t n) : FunctionOptions(internal::kModeOptionsType), n(n) {}
ModeOptions::ModeOptions(int64_t n, bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kModeOptionsType),
n{n},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char ModeOptions::kTypeName[];

VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
Expand All @@ -122,27 +131,38 @@ VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
min_count(min_count) {}
constexpr char VarianceOptions::kTypeName[];

QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation)
QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{q},
interpolation{interpolation} {}
QuantileOptions::QuantileOptions(std::vector<double> q, enum Interpolation interpolation)
interpolation{interpolation},
skip_nulls{skip_nulls},
min_count{min_count} {}
QuantileOptions::QuantileOptions(std::vector<double> q, enum Interpolation interpolation,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{std::move(q)},
interpolation{interpolation} {}
interpolation{interpolation},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char QuantileOptions::kTypeName[];

TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size)
TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{q},
delta{delta},
buffer_size{buffer_size} {}
buffer_size{buffer_size},
skip_nulls{skip_nulls},
min_count{min_count} {}
TDigestOptions::TDigestOptions(std::vector<double> q, uint32_t delta,
uint32_t buffer_size)
uint32_t buffer_size, bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{std::move(q)},
delta{delta},
buffer_size{buffer_size} {}
buffer_size{buffer_size},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char TDigestOptions::kTypeName[];

IndexOptions::IndexOptions(std::shared_ptr<Scalar> value)
Expand Down
29 changes: 24 additions & 5 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,16 @@ class ARROW_EXPORT CountOptions : public FunctionOptions {
/// By default, returns the most common value and count.
class ARROW_EXPORT ModeOptions : public FunctionOptions {
public:
explicit ModeOptions(int64_t n = 1);
explicit ModeOptions(int64_t n = 1, bool skip_nulls = true, uint32_t min_count = 0);
constexpr static char const kTypeName[] = "ModeOptions";
static ModeOptions Defaults() { return ModeOptions{}; }

int64_t n = 1;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If fewer than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel
Expand Down Expand Up @@ -121,17 +126,24 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
MIDPOINT,
};

explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR);
explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR,
bool skip_nulls = true, uint32_t min_count = 0);

explicit QuantileOptions(std::vector<double> q,
enum Interpolation interpolation = LINEAR);
enum Interpolation interpolation = LINEAR,
bool skip_nulls = true, uint32_t min_count = 0);

constexpr static char const kTypeName[] = "QuantileOptions";
static QuantileOptions Defaults() { return QuantileOptions{}; }

/// quantile must be between 0 and 1 inclusive
std::vector<double> q;
enum Interpolation interpolation;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If fewer than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control TDigest approximate quantile kernel behavior
Expand All @@ -140,9 +152,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
class ARROW_EXPORT TDigestOptions : public FunctionOptions {
public:
explicit TDigestOptions(double q = 0.5, uint32_t delta = 100,
uint32_t buffer_size = 500);
uint32_t buffer_size = 500, bool skip_nulls = true,
uint32_t min_count = 0);
explicit TDigestOptions(std::vector<double> q, uint32_t delta = 100,
uint32_t buffer_size = 500);
uint32_t buffer_size = 500, bool skip_nulls = true,
uint32_t min_count = 0);
constexpr static char const kTypeName[] = "TDigestOptions";
static TDigestOptions Defaults() { return TDigestOptions{}; }

Expand All @@ -152,6 +166,11 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions {
uint32_t delta;
/// input buffer size, default 500
uint32_t buffer_size;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If fewer than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control Index kernel behavior
Expand Down
35 changes: 31 additions & 4 deletions cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ struct CountModer {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// count values in all chunks, ignore nulls
const Datum& datum = batch[0];

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

CountValues<CType>(this->counts.data(), datum, this->min);

// generator to emit next value:count pair
Expand All @@ -154,9 +161,16 @@ struct CountModer {
template <>
struct CountModer<BooleanType> {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
return PrepareOutput<BooleanType>(/*n=*/0, ctx, out).status();
}

int64_t counts[2]{};

const Datum& datum = batch[0];
for (const auto& array : datum.chunks()) {
if (array->length() > array->null_count()) {
const int64_t true_count =
Expand All @@ -167,7 +181,6 @@ struct CountModer<BooleanType> {
}
}

const ModeOptions& options = ModeState::Get(ctx);
const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
const int64_t n = std::min(options.n, distinct_values);

Expand Down Expand Up @@ -198,12 +211,19 @@ struct SortModer {
using Allocator = arrow::stl::allocator<CType>;

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(in_length < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

// copy all chunks to a buffer, ignore nulls and nans
std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));

uint64_t nan_count = 0;
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
Expand Down Expand Up @@ -305,6 +325,13 @@ struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
template <typename T>
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && !scalar.is_valid) ||
(static_cast<uint32_t>(scalar.is_valid) < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

if (scalar.is_valid) {
bool called = false;
return Finalize<T>(ctx, out, [&]() {
Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,18 @@ struct SortQuantiler {

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const QuantileOptions& options = QuantileState::Get(ctx);
const Datum& datum = batch[0];

// copy all chunks to a buffer, ignore nulls and nans
std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
int64_t in_length = 0;
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
in_length = 0;
} else {
in_length = datum.length() - datum.null_count();
}

const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
Expand Down Expand Up @@ -232,7 +238,11 @@ struct CountQuantiler {

// count values in all chunks, ignore nulls
const Datum& datum = batch[0];
int64_t in_length = CountValues<CType>(this->counts.data(), datum, this->min);
int64_t in_length = 0;
if ((options.skip_nulls || (!options.skip_nulls && datum.null_count() == 0)) &&
(datum.length() - datum.null_count() >= options.min_count)) {
in_length = CountValues<CType>(this->counts.data(), datum, this->min);
}

// prepare out array
int64_t out_length = options.q.size();
Expand Down Expand Up @@ -394,7 +404,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
ArrayData* output = out->mutable_array();
if (!scalar.is_valid) {
if (!scalar.is_valid || options.min_count > 1) {
output->length = 0;
output->null_count = 0;
return Status::OK();
Expand Down
30 changes: 25 additions & 5 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ struct TDigestImpl : public ScalarAggregator {
using CType = typename ArrowType::c_type;

explicit TDigestImpl(const TDigestOptions& options)
: q{options.q}, tdigest{options.delta, options.buffer_size} {}
: options{options},
tdigest{options.delta, options.buffer_size},
count{0},
all_valid{true} {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
if (!this->all_valid) return Status::OK();
if (!options.skip_nulls && batch[0].null_count() > 0) {
this->all_valid = false;
return Status::OK();
}
if (batch[0].is_array()) {
const ArrayData& data = *batch[0].array();
const CType* values = data.GetValues<CType>(1);

if (data.length > data.GetNullCount()) {
this->count += data.length - data.GetNullCount();
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
Expand All @@ -55,6 +64,7 @@ struct TDigestImpl : public ScalarAggregator {
} else {
const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar());
if (batch[0].scalar()->is_valid) {
this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(value);
}
Expand All @@ -64,13 +74,21 @@ struct TDigestImpl : public ScalarAggregator {
}

Status MergeFrom(KernelContext*, KernelState&& src) override {
auto& other = checked_cast<ThisType&>(src);
const auto& other = checked_cast<const ThisType&>(src);
if (!this->all_valid || !other.all_valid) {
this->all_valid = false;
return Status::OK();
}
this->tdigest.Merge(other.tdigest);
this->count += other.count;
return Status::OK();
}

Status Finalize(KernelContext* ctx, Datum* out) override {
const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size();
const int64_t out_length =
(this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count)
? 0
: options.q.size();
auto out_data = ArrayData::Make(float64(), out_length, 0);
out_data->buffers.resize(2, nullptr);

Expand All @@ -79,16 +97,18 @@ struct TDigestImpl : public ScalarAggregator {
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
out_buffer[i] = this->tdigest.Quantile(this->q[i]);
out_buffer[i] = this->tdigest.Quantile(this->options.q[i]);
}
}

*out = Datum(std::move(out_data));
return Status::OK();
}

const std::vector<double> q;
const TDigestOptions options;
TDigest tdigest;
int64_t count;
bool all_valid;
};

struct TDigestInitState {
Expand Down
Loading