Skip to content

Commit

Permalink
ARROW-13782: [C++] Add skip_nulls/min_count to tdigest/mode/quantile
Browse files Browse the repository at this point in the history
Closes #11061 from lidavidm/arrow-13782

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
lidavidm committed Sep 3, 2021
1 parent 1440d5a commit a45fc3f
Show file tree
Hide file tree
Showing 13 changed files with 462 additions and 91 deletions.
46 changes: 33 additions & 13 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,23 @@ static auto kScalarAggregateOptionsType = GetFunctionOptionsType<ScalarAggregate
DataMember("min_count", &ScalarAggregateOptions::min_count));
static auto kCountOptionsType =
GetFunctionOptionsType<CountOptions>(DataMember("mode", &CountOptions::mode));
static auto kModeOptionsType =
GetFunctionOptionsType<ModeOptions>(DataMember("n", &ModeOptions::n));
static auto kModeOptionsType = GetFunctionOptionsType<ModeOptions>(
DataMember("n", &ModeOptions::n), DataMember("skip_nulls", &ModeOptions::skip_nulls),
DataMember("min_count", &ModeOptions::min_count));
static auto kVarianceOptionsType = GetFunctionOptionsType<VarianceOptions>(
DataMember("ddof", &VarianceOptions::ddof),
DataMember("skip_nulls", &VarianceOptions::skip_nulls),
DataMember("min_count", &VarianceOptions::min_count));
static auto kQuantileOptionsType = GetFunctionOptionsType<QuantileOptions>(
DataMember("q", &QuantileOptions::q),
DataMember("interpolation", &QuantileOptions::interpolation));
DataMember("interpolation", &QuantileOptions::interpolation),
DataMember("skip_nulls", &QuantileOptions::skip_nulls),
DataMember("min_count", &QuantileOptions::min_count));
static auto kTDigestOptionsType = GetFunctionOptionsType<TDigestOptions>(
DataMember("q", &TDigestOptions::q), DataMember("delta", &TDigestOptions::delta),
DataMember("buffer_size", &TDigestOptions::buffer_size));
DataMember("buffer_size", &TDigestOptions::buffer_size),
DataMember("skip_nulls", &TDigestOptions::skip_nulls),
DataMember("min_count", &TDigestOptions::min_count));
static auto kIndexOptionsType =
GetFunctionOptionsType<IndexOptions>(DataMember("value", &IndexOptions::value));
} // namespace
Expand All @@ -112,7 +117,11 @@ CountOptions::CountOptions(CountMode mode)
: FunctionOptions(internal::kCountOptionsType), mode(mode) {}
constexpr char CountOptions::kTypeName[];

ModeOptions::ModeOptions(int64_t n) : FunctionOptions(internal::kModeOptionsType), n(n) {}
ModeOptions::ModeOptions(int64_t n, bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kModeOptionsType),
n{n},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char ModeOptions::kTypeName[];

VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
Expand All @@ -122,27 +131,38 @@ VarianceOptions::VarianceOptions(int ddof, bool skip_nulls, uint32_t min_count)
min_count(min_count) {}
constexpr char VarianceOptions::kTypeName[];

QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation)
QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{q},
interpolation{interpolation} {}
QuantileOptions::QuantileOptions(std::vector<double> q, enum Interpolation interpolation)
interpolation{interpolation},
skip_nulls{skip_nulls},
min_count{min_count} {}
QuantileOptions::QuantileOptions(std::vector<double> q, enum Interpolation interpolation,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kQuantileOptionsType),
q{std::move(q)},
interpolation{interpolation} {}
interpolation{interpolation},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char QuantileOptions::kTypeName[];

TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size)
TDigestOptions::TDigestOptions(double q, uint32_t delta, uint32_t buffer_size,
bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{q},
delta{delta},
buffer_size{buffer_size} {}
buffer_size{buffer_size},
skip_nulls{skip_nulls},
min_count{min_count} {}
TDigestOptions::TDigestOptions(std::vector<double> q, uint32_t delta,
uint32_t buffer_size)
uint32_t buffer_size, bool skip_nulls, uint32_t min_count)
: FunctionOptions(internal::kTDigestOptionsType),
q{std::move(q)},
delta{delta},
buffer_size{buffer_size} {}
buffer_size{buffer_size},
skip_nulls{skip_nulls},
min_count{min_count} {}
constexpr char TDigestOptions::kTypeName[];

IndexOptions::IndexOptions(std::shared_ptr<Scalar> value)
Expand Down
29 changes: 24 additions & 5 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,16 @@ class ARROW_EXPORT CountOptions : public FunctionOptions {
/// By default, returns the most common value and count.
class ARROW_EXPORT ModeOptions : public FunctionOptions {
public:
explicit ModeOptions(int64_t n = 1);
explicit ModeOptions(int64_t n = 1, bool skip_nulls = true, uint32_t min_count = 0);
constexpr static char const kTypeName[] = "ModeOptions";
static ModeOptions Defaults() { return ModeOptions{}; }

int64_t n = 1;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If less than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control Delta Degrees of Freedom (ddof) of Variance and Stddev kernel
Expand Down Expand Up @@ -121,17 +126,24 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
MIDPOINT,
};

explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR);
explicit QuantileOptions(double q = 0.5, enum Interpolation interpolation = LINEAR,
bool skip_nulls = true, uint32_t min_count = 0);

explicit QuantileOptions(std::vector<double> q,
enum Interpolation interpolation = LINEAR);
enum Interpolation interpolation = LINEAR,
bool skip_nulls = true, uint32_t min_count = 0);

constexpr static char const kTypeName[] = "QuantileOptions";
static QuantileOptions Defaults() { return QuantileOptions{}; }

/// quantile must be between 0 and 1 inclusive
std::vector<double> q;
enum Interpolation interpolation;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If less than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control TDigest approximate quantile kernel behavior
Expand All @@ -140,9 +152,11 @@ class ARROW_EXPORT QuantileOptions : public FunctionOptions {
class ARROW_EXPORT TDigestOptions : public FunctionOptions {
public:
explicit TDigestOptions(double q = 0.5, uint32_t delta = 100,
uint32_t buffer_size = 500);
uint32_t buffer_size = 500, bool skip_nulls = true,
uint32_t min_count = 0);
explicit TDigestOptions(std::vector<double> q, uint32_t delta = 100,
uint32_t buffer_size = 500);
uint32_t buffer_size = 500, bool skip_nulls = true,
uint32_t min_count = 0);
constexpr static char const kTypeName[] = "TDigestOptions";
static TDigestOptions Defaults() { return TDigestOptions{}; }

Expand All @@ -152,6 +166,11 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions {
uint32_t delta;
/// input buffer size, default 500
uint32_t buffer_size;
/// If true (the default), null values are ignored. Otherwise, if any value is null,
/// emit null.
bool skip_nulls;
/// If less than this many non-null values are observed, emit null.
uint32_t min_count;
};

/// \brief Control Index kernel behavior
Expand Down
35 changes: 31 additions & 4 deletions cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ struct CountModer {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// count values in all chunks, ignore nulls
const Datum& datum = batch[0];

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

CountValues<CType>(this->counts.data(), datum, this->min);

// generator to emit next value:count pair
Expand All @@ -154,9 +161,16 @@ struct CountModer {
template <>
struct CountModer<BooleanType> {
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
return PrepareOutput<BooleanType>(/*n=*/0, ctx, out).status();
}

int64_t counts[2]{};

const Datum& datum = batch[0];
for (const auto& array : datum.chunks()) {
if (array->length() > array->null_count()) {
const int64_t true_count =
Expand All @@ -167,7 +181,6 @@ struct CountModer<BooleanType> {
}
}

const ModeOptions& options = ModeState::Get(ctx);
const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
const int64_t n = std::min(options.n, distinct_values);

Expand Down Expand Up @@ -198,12 +211,19 @@ struct SortModer {
using Allocator = arrow::stl::allocator<CType>;

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && datum.null_count() > 0) ||
(in_length < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

// copy all chunks to a buffer, ignore nulls and nans
std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));

uint64_t nan_count = 0;
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
Expand Down Expand Up @@ -305,6 +325,13 @@ struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
template <typename T>
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && !scalar.is_valid) ||
(static_cast<uint32_t>(scalar.is_valid) < options.min_count)) {
return PrepareOutput<T>(/*n=*/0, ctx, out).status();
}

if (scalar.is_valid) {
bool called = false;
return Finalize<T>(ctx, out, [&]() {
Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,18 @@ struct SortQuantiler {

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const QuantileOptions& options = QuantileState::Get(ctx);
const Datum& datum = batch[0];

// copy all chunks to a buffer, ignore nulls and nans
std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
int64_t in_length = 0;
if ((!options.skip_nulls && datum.null_count() > 0) ||
(datum.length() - datum.null_count() < options.min_count)) {
in_length = 0;
} else {
in_length = datum.length() - datum.null_count();
}

const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
if (in_length > 0) {
in_buffer.resize(in_length);
CopyNonNullValues(datum, in_buffer.data());
Expand Down Expand Up @@ -232,7 +238,11 @@ struct CountQuantiler {

// count values in all chunks, ignore nulls
const Datum& datum = batch[0];
int64_t in_length = CountValues<CType>(this->counts.data(), datum, this->min);
int64_t in_length = 0;
if ((options.skip_nulls || (!options.skip_nulls && datum.null_count() == 0)) &&
(datum.length() - datum.null_count() >= options.min_count)) {
in_length = CountValues<CType>(this->counts.data(), datum, this->min);
}

// prepare out array
int64_t out_length = options.q.size();
Expand Down Expand Up @@ -394,7 +404,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
ArrayData* output = out->mutable_array();
if (!scalar.is_valid) {
if (!scalar.is_valid || options.min_count > 1) {
output->length = 0;
output->null_count = 0;
return Status::OK();
Expand Down
30 changes: 25 additions & 5 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ struct TDigestImpl : public ScalarAggregator {
using CType = typename ArrowType::c_type;

explicit TDigestImpl(const TDigestOptions& options)
: q{options.q}, tdigest{options.delta, options.buffer_size} {}
: options{options},
tdigest{options.delta, options.buffer_size},
count{0},
all_valid{true} {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
if (!this->all_valid) return Status::OK();
if (!options.skip_nulls && batch[0].null_count() > 0) {
this->all_valid = false;
return Status::OK();
}
if (batch[0].is_array()) {
const ArrayData& data = *batch[0].array();
const CType* values = data.GetValues<CType>(1);

if (data.length > data.GetNullCount()) {
this->count += data.length - data.GetNullCount();
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
Expand All @@ -55,6 +64,7 @@ struct TDigestImpl : public ScalarAggregator {
} else {
const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar());
if (batch[0].scalar()->is_valid) {
this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(value);
}
Expand All @@ -64,13 +74,21 @@ struct TDigestImpl : public ScalarAggregator {
}

Status MergeFrom(KernelContext*, KernelState&& src) override {
auto& other = checked_cast<ThisType&>(src);
const auto& other = checked_cast<const ThisType&>(src);
if (!this->all_valid || !other.all_valid) {
this->all_valid = false;
return Status::OK();
}
this->tdigest.Merge(other.tdigest);
this->count += other.count;
return Status::OK();
}

Status Finalize(KernelContext* ctx, Datum* out) override {
const int64_t out_length = this->tdigest.is_empty() ? 0 : this->q.size();
const int64_t out_length =
(this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count)
? 0
: options.q.size();
auto out_data = ArrayData::Make(float64(), out_length, 0);
out_data->buffers.resize(2, nullptr);

Expand All @@ -79,16 +97,18 @@ struct TDigestImpl : public ScalarAggregator {
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
out_buffer[i] = this->tdigest.Quantile(this->q[i]);
out_buffer[i] = this->tdigest.Quantile(this->options.q[i]);
}
}

*out = Datum(std::move(out_data));
return Status::OK();
}

const std::vector<double> q;
const TDigestOptions options;
TDigest tdigest;
int64_t count;
bool all_valid;
};

struct TDigestInitState {
Expand Down
Loading

0 comments on commit a45fc3f

Please sign in to comment.