[C++] Compute: Why some ScalarAggregator handles scalar ExecSpan like single value, other like groups value? #43768
Comments
@felipecrv @pitrou @zanmato1984 Would you mind taking a look? I'm new to this module.
It's weird because SELECT a, COUNT(5) FROM test1 GROUP BY a; will return the value of column But SELECT a, any(TRUE) FROM test1 GROUP BY a; will always produce I think it would be worth having a comment in the |
@felipecrv I respect this finding; however, currently we also have a: What I expected (maybe I'm totally wrong!):
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 1fbcd6a24..d8a19277b 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -532,13 +532,13 @@ struct BooleanAnyImpl : public ScalarAggregator {
}
if (batch[0].is_scalar()) {
const Scalar& scalar = *batch[0].scalar;
- this->has_nulls = !scalar.is_valid;
- this->any = scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
- this->count += scalar.is_valid;
+ this->has_nulls |= !scalar.is_valid;
+ this->any |= scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
+ this->count += scalar.is_valid * batch.length;
return Status::OK();
}
const ArraySpan& data = batch[0].array;
- this->has_nulls = data.GetNullCount() > 0;
+ this->has_nulls |= data.GetNullCount() > 0;
this->count += data.length - data.GetNullCount();
arrow::internal::OptionalBinaryBitBlockCounter counter(
data.buffers[0].data, data.offset, data.buffers[1].data, data.offset,
@@ -603,9 +603,11 @@ struct BooleanAllImpl : public ScalarAggregator {
}
if (batch[0].is_scalar()) {
const Scalar& scalar = *batch[0].scalar;
- this->has_nulls = !scalar.is_valid;
- this->count += scalar.is_valid;
- this->all = !scalar.is_valid || checked_cast<const BooleanScalar&>(scalar).value;
+ this->has_nulls |= !scalar.is_valid;
+ this->count += scalar.is_valid * batch.length;
+ if (scalar.is_valid && !checked_cast<const BooleanScalar&>(scalar).value) {
+ this->all = false;
+ }
return Status::OK();
}
const ArraySpan& data = batch[0].array;
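The intent of the diff above can be sketched outside Arrow. This is a minimal toy model, not the kernel itself: `ToyAnyState` and `ConsumeScalar` are hypothetical names, and a null scalar is modelled with `std::nullopt`. It shows the two ideas in the proposal: accumulate flags with `|=` instead of overwriting them, and add one count per logical row of a constant input.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Toy stand-in for the state kept by a boolean "any" aggregator.
// Not Arrow code: all names here are hypothetical.
struct ToyAnyState {
  bool any = false;
  bool has_nulls = false;
  int64_t count = 0;  // number of valid (non-null) rows seen so far

  // Consume a constant column: `value` repeated `length` times.
  // std::nullopt models a null scalar.
  void ConsumeScalar(std::optional<bool> value, int64_t length) {
    assert(length >= 0);
    has_nulls |= !value.has_value();          // accumulate, never overwrite
    any |= value.value_or(false);             // a null never sets `any`
    count += value.has_value() ? length : 0;  // one count per logical row
  }
};
```

With plain assignment instead of `|=`, a later non-null batch would erase the fact that an earlier batch contained nulls; with `count += 1`, a constant input of four rows would be counted as a single row.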
I guess your major concern is that counting a valid scalar only once (i.e.,
For example, assume a batch contains 3 rows and the @felipecrv What do you think? Thanks. |
template <typename ArgType>
struct IndexImpl : public ScalarAggregator {
  using ArgValue = typename internal::GetViewType<ArgType>::T;

  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
    // short-circuit
    if (index >= 0 || !options.value->is_valid) {
      return Status::OK();
    }

    const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);

    if (batch[0].is_scalar()) {
      seen = batch.length;
      if (batch[0].scalar->is_valid) {
        const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar);
        if (v == desired) {
          index = 0;
          return Status::Cancelled("Found");
        }
      }
      return Status::OK();
    }

    const ArraySpan& input = batch[0].array;
    seen = input.length;
    int64_t i = 0;

    ARROW_UNUSED(internal::VisitArrayValuesInline<ArgType>(
        input,
        [&](ArgValue v) -> Status {
          if (v == desired) {
            index = i;
            return Status::Cancelled("Found");
          } else {
            ++i;
            return Status::OK();
          }
        },
        [&]() -> Status {
          ++i;
          return Status::OK();
        }));

    return Status::OK();
  }
Besides, "index" kernel even set
Edit: oh i understand this, seems this is ok
Edit: I've tried writing tests here and found it's actually bug-free:
So, actually it's always 1 here.
You are right in the context where only compute kernels are involved. In this case, you can assume that when the argument of
However, this might not be the case in a more complex context, e.g. Acero. Here is a concrete test that reproduces the expected bug (explained at the end):
TEST(ScalarAggregate, BuggyAny) {
  std::shared_ptr<Schema> in_schema = schema({field("not_used", int32())});
  std::vector<ExecBatch> batches{
      ExecBatchFromJSON({int32()}, "[[42], [42], [42], [42]]")};

  std::vector<Aggregate> aggregates = {
      Aggregate("any",
                std::make_shared<compute::ScalarAggregateOptions>(/*skip_nulls=*/false,
                                                                  /*min_count=*/2),
                FieldRef("literal_true"))};

  Declaration plan = Declaration::Sequence(
      {{"exec_batch_source", ExecBatchSourceNodeOptions(in_schema, std::move(batches))},
       {"project", ProjectNodeOptions({literal(true)}, {"literal_true"})},
       {"aggregate", AggregateNodeOptions(aggregates)}});

  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches,
                       DeclarationToExecBatches(plan));
  std::cout << out_batches.batches[0].values[0].ToString() << std::endl;
}
Output:
Explanation: One source node with 1 batch of 4 rows (contents don't matter), followed by a projection node which outputs literal
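To see why `min_count` matters in this scenario, here is a small self-contained sketch. It is only a toy model under stated assumptions: `ToyFinalizeAny`, `CountScalarOnce`, and `CountScalarPerRow` are hypothetical helpers, and the finalize rule is simplified to "emit null when fewer than `min_count` valid rows were seen".

```cpp
#include <cstdint>
#include <optional>

// Hypothetical, simplified finalize rule for a toy "any" aggregate:
// emit null (std::nullopt) when fewer than `min_count` valid rows were seen.
std::optional<bool> ToyFinalizeAny(bool any, int64_t count, int64_t min_count) {
  if (count < min_count) return std::nullopt;
  return any;
}

// Count contributed by one batch whose only column is a valid constant.
// Counting once ignores the batch length; counting per row honours it.
int64_t CountScalarOnce(int64_t /*batch_length*/) { return 1; }
int64_t CountScalarPerRow(int64_t batch_length) { return batch_length; }
```

In this model, a literal `true` spanning a 4-row batch with `min_count = 2` finalizes to null when the scalar is counted once (1 < 2), and to `true` when it is counted once per row (4 >= 2).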
Aha, boom! I'll recheck this. So this is due to a different use case in arrow::compute?
So this is because Acero calls:
Status ScalarAggregateNode::DoConsume(const ExecSpan& batch, size_t thread_index) {
  for (size_t i = 0; i < kernels_.size(); ++i) {
    arrow::util::tracing::Span span;
    START_COMPUTE_SPAN(span, aggs_[i].function,
                       {{"function.name", aggs_[i].function},
                        {"function.options",
                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
                        {"function.kind", std::string(kind_name()) + "::Consume"}});
    KernelContext batch_ctx{plan()->query_context()->exec_context()};
    DCHECK_LT(thread_index, states_[i].size());
    batch_ctx.SetState(states_[i][thread_index].get());

    std::vector<ExecValue> column_values;
    for (const int field : target_fieldsets_[i]) {
      column_values.push_back(batch.values[field]);
    }
    ExecSpan column_batch{std::move(column_values), batch.length};
    RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, column_batch));
  }
  return Status::OK();
}
I've got this, thanks!
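The key point in the snippet above is that the span handed to the kernel carries `batch.length` even when a column is a single scalar. A rough sketch of that idea, using hypothetical toy types (`ToyColumn`, `ToyBatch`, `ValidRows`) rather than Arrow's `ExecValue` and `ExecSpan`:

```cpp
#include <cstdint>
#include <optional>
#include <variant>
#include <vector>

// Toy model of a column inside a batch: either one constant (possibly null)
// or one value per row. Hypothetical types, not Arrow's ExecValue/ExecSpan.
using ToyScalar = std::optional<bool>;
using ToyArray = std::vector<std::optional<bool>>;
using ToyColumn = std::variant<ToyScalar, ToyArray>;

struct ToyBatch {
  ToyColumn column;
  int64_t length;  // number of logical rows, even when `column` is a scalar
};

// Number of valid rows the batch logically contains.
int64_t ValidRows(const ToyBatch& batch) {
  if (const auto* scalar = std::get_if<ToyScalar>(&batch.column)) {
    // A valid constant stands for `length` identical rows.
    return scalar->has_value() ? batch.length : 0;
  }
  int64_t valid = 0;
  for (const auto& v : std::get<ToyArray>(batch.column)) {
    valid += v.has_value() ? 1 : 0;
  }
  return valid;
}
```

Under this model, a scalar `true` with length 4 and an array of four `true` values report the same number of valid rows, which is the equivalence an aggregate kernel would need to preserve when it receives a constant column from a projection.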
…put with length in Acero (#43799) ### Rationale for this change See #43768 ### What changes are included in this PR? Fix the case when boolean_{any|all} meets constant input with length in Acero ### Are these changes tested? Yes ### Are there any user-facing changes? no * GitHub Issue: #43768 Lead-authored-by: mwish <maplewish117@gmail.com> Co-authored-by: mwish <1506118561@qq.com> Co-authored-by: Rossi Sun <zanmato1984@gmail.com> Signed-off-by: mwish <maplewish117@gmail.com>
Issue resolved by pull request 43799
Describe the enhancement requested
CountImpl regards a scalar as batch.length copies of the value; however, other kernels behave differently:
This takes it as a single value.
Is this by design? Did I miss something?
Component(s)
C++