[C++] Compute: Why some ScalarAggregator handles scalar ExecSpan like single value, other like groups value? #43768

Closed
mapleFU opened this issue Aug 20, 2024 · 10 comments

Comments

@mapleFU
Member

mapleFU commented Aug 20, 2024

Describe the enhancement requested

struct CountImpl : public ScalarAggregator {
  Status Consume(KernelContext*, const ExecSpan& batch) override {
    if (options.mode == CountOptions::ALL) {
      this->non_nulls += batch.length;
    } else if (batch[0].is_array()) {
      const ArraySpan& input = batch[0].array;
      const int64_t nulls = input.GetNullCount();
      this->nulls += nulls;
      this->non_nulls += input.length - nulls;
    } else {
      const Scalar& input = *batch[0].scalar;
      this->nulls += !input.is_valid * batch.length;
      this->non_nulls += input.is_valid * batch.length;
    }
    return Status::OK();
  }
};

CountImpl treats a scalar input as batch.length repeated values; however, other kernels behave differently:

struct BooleanAnyImpl : public ScalarAggregator {
  explicit BooleanAnyImpl(ScalarAggregateOptions options) : options(std::move(options)) {}

  Status Consume(KernelContext*, const ExecSpan& batch) override {
    // short-circuit if seen a True already
    if (this->any == true && this->count >= options.min_count) {
      return Status::OK();
    }
    if (batch[0].is_scalar()) {
      const Scalar& scalar = *batch[0].scalar;
      this->has_nulls = !scalar.is_valid;
      this->any = scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
      this->count += scalar.is_valid;
      return Status::OK();
    }
    const ArraySpan& data = batch[0].array;
    this->has_nulls = data.GetNullCount() > 0;
    this->count += data.length - data.GetNullCount();
    arrow::internal::OptionalBinaryBitBlockCounter counter(
        data.buffers[0].data, data.offset, data.buffers[1].data, data.offset,
        data.length);
    int64_t position = 0;
    while (position < data.length) {
      const auto block = counter.NextAndBlock();
      if (block.popcount > 0) {
        this->any = true;
        break;
      }
      position += block.length;
    }
    return Status::OK();
  }

This one treats the scalar as a single value.

Is this by design? Did I miss something?
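
To make the difference concrete, here is a minimal standalone sketch (not Arrow code). `ScalarSpan`, `CountLike` and `AnyLike` are hypothetical stand-ins for a scalar-only ExecSpan and the scalar branches of the two kernels quoted above; only the bookkeeping is modelled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for an ExecSpan whose only column is a scalar.
struct ScalarSpan {
  bool is_valid;   // validity of the scalar
  bool value;      // boolean payload (ignored by the count model)
  int64_t length;  // logical number of rows in the batch
};

// Models CountImpl's scalar branch: the scalar stands for `length` values.
struct CountLike {
  int64_t nulls = 0;
  int64_t non_nulls = 0;

  void Consume(const ScalarSpan& batch) {
    assert(batch.length >= 0);
    nulls += !batch.is_valid * batch.length;
    non_nulls += batch.is_valid * batch.length;
  }
};

// Models BooleanAnyImpl's scalar branch as quoted: the scalar counts once,
// regardless of the batch length.
struct AnyLike {
  bool any = false;
  bool has_nulls = false;
  int64_t count = 0;

  void Consume(const ScalarSpan& batch) {
    has_nulls = !batch.is_valid;
    any = batch.is_valid && batch.value;
    count += batch.is_valid;
  }
};
```

Feeding the same `ScalarSpan{true, true, 4}` to both models leaves `CountLike::non_nulls` at 4 but `AnyLike::count` at 1, which is the inconsistency being asked about.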

Component(s)

C++

@mapleFU
Member Author

mapleFU commented Aug 20, 2024

@felipecrv @pitrou @zanmato1984 Would you mind taking a look? I'm new to this module.

@felipecrv
Contributor

It's weird because COUNT is weird in SQL.

 SELECT a, COUNT(5) FROM test1 GROUP BY a;

will return the value of column a for each group and COUNT(<any literal here>) will count the size of the group.

But

SELECT a, any(TRUE) FROM test1 GROUP BY a;

will always produce TRUE and never bother looking at the values in the grouped values from the table.

I think it would be worth having a comment in the CountImpl branch that handles scalars by ignoring the scalar value.

@mapleFU
Member Author

mapleFU commented Aug 21, 2024

@felipecrv I respect this finding; however, we currently also have this->count += data.length - data.GetNullCount(); in boolean any, which counts the non-null elements. Wouldn't that count be wrong when a scalar is treated as a single value?

What I expected (maybe I'm totally wrong!):

diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 1fbcd6a24..d8a19277b 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -532,13 +532,13 @@ struct BooleanAnyImpl : public ScalarAggregator {
     }
     if (batch[0].is_scalar()) {
       const Scalar& scalar = *batch[0].scalar;
-      this->has_nulls = !scalar.is_valid;
-      this->any = scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
-      this->count += scalar.is_valid;
+      this->has_nulls |= !scalar.is_valid;
+      this->any |= scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
+      this->count += scalar.is_valid * batch.length;
       return Status::OK();
     }
     const ArraySpan& data = batch[0].array;
-    this->has_nulls = data.GetNullCount() > 0;
+    this->has_nulls |= data.GetNullCount() > 0;
     this->count += data.length - data.GetNullCount();
     arrow::internal::OptionalBinaryBitBlockCounter counter(
         data.buffers[0].data, data.offset, data.buffers[1].data, data.offset,
@@ -603,9 +603,11 @@ struct BooleanAllImpl : public ScalarAggregator {
     }
     if (batch[0].is_scalar()) {
       const Scalar& scalar = *batch[0].scalar;
-      this->has_nulls = !scalar.is_valid;
-      this->count += scalar.is_valid;
-      this->all = !scalar.is_valid || checked_cast<const BooleanScalar&>(scalar).value;
+      this->has_nulls |= !scalar.is_valid;
+      this->count += scalar.is_valid * batch.length;
+      if (scalar.is_valid && !checked_cast<const BooleanScalar&>(scalar).value) {
+        this->all = false;
+      }
       return Status::OK();
     }
     const ArraySpan& data = batch[0].array;
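
The switch from `=` to `|=` in this diff matters as soon as one kernel state consumes more than one batch. A minimal sketch, assuming a hypothetical `NullTracker` that models only the has_nulls flag (it is not Arrow code):

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of the has_nulls bookkeeping across several batches.
struct NullTracker {
  bool overwritten = false;  // updated with `=`, as in the lines being removed
  bool accumulated = false;  // updated with `|=`, as in the lines being added

  // `null_count` plays the role of data.GetNullCount() for one batch.
  void Consume(int64_t null_count) {
    assert(null_count >= 0);
    overwritten = null_count > 0;   // forgets what earlier batches reported
    accumulated |= null_count > 0;  // remembers any null seen so far
  }
};
```

After consuming a batch with two nulls followed by a batch with none, `overwritten` is back to false while `accumulated` stays true.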

@zanmato1984
Contributor

I guess your main concern is that counting a valid scalar only once (i.e., this->count += scalar.is_valid), as opposed to batch.length times (i.e., this->count += scalar.is_valid * batch.length as you proposed), would undercount and produce a wrong result, given how this count is used:

class ARROW_EXPORT ScalarAggregateOptions : public FunctionOptions {
  ...
  /// If less than this many non-null values are observed, emit null.
  uint32_t min_count;
};

    if ((!options.skip_nulls && !this->any && this->has_nulls) ||
        this->count < options.min_count) {
      out->value = std::make_shared<BooleanScalar>();
    }

For example, assume a batch contains 3 rows and the min_count is 2. According to the current logic, any(false) would have count 1 (as opposed to the batch length 3) and emit null (count == 1 < min_count == 2). I think this is wrong.
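
The arithmetic of this example can be checked with a small standalone sketch. `AnyState`, `AnyResult` and `FinalizeAny` are hypothetical names; the condition is copied from the finalize snippet quoted above, and skip_nulls is assumed to be true since the example does not specify it.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of what an "any" aggregation holds at finalize time.
struct AnyState {
  bool any = false;
  bool has_nulls = false;
  int64_t count = 0;  // number of non-null values observed
};

// kNull stands for the null BooleanScalar emitted by the quoted code.
enum class AnyResult { kNull, kFalse, kTrue };

// Mirrors the quoted finalize condition.
AnyResult FinalizeAny(const AnyState& state, bool skip_nulls, uint32_t min_count) {
  assert(state.count >= 0);
  if ((!skip_nulls && !state.any && state.has_nulls) || state.count < min_count) {
    return AnyResult::kNull;
  }
  return state.any ? AnyResult::kTrue : AnyResult::kFalse;
}
```

With min_count == 2, a state whose count is 1 finalizes to `kNull`, while the same state with count 3 (the batch length) finalizes to `kFalse`.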

@felipecrv What do you think? Thanks.

@mapleFU
Member Author

mapleFU commented Aug 21, 2024

template <typename ArgType>
struct IndexImpl : public ScalarAggregator {
  using ArgValue = typename internal::GetViewType<ArgType>::T;
  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
    // short-circuit
    if (index >= 0 || !options.value->is_valid) {
      return Status::OK();
    }

    const ArgValue desired = internal::UnboxScalar<ArgType>::Unbox(*options.value);

    if (batch[0].is_scalar()) {
      seen = batch.length;
      if (batch[0].scalar->is_valid) {
        const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar);
        if (v == desired) {
          index = 0;
          return Status::Cancelled("Found");
        }
      }
      return Status::OK();
    }

    const ArraySpan& input = batch[0].array;
    seen = input.length;
    int64_t i = 0;

    ARROW_UNUSED(internal::VisitArrayValuesInline<ArgType>(
        input,
        [&](ArgValue v) -> Status {
          if (v == desired) {
            index = i;
            return Status::Cancelled("Found");
          } else {
            ++i;
            return Status::OK();
          }
        },
        [&]() -> Status {
          ++i;
          return Status::OK();
        }));

    return Status::OK();
  }

Besides, the "index" kernel even assigns seen from the input length instead of accumulating it, as if it only ever handles a single input.

Edit: Oh, I understand this now; it seems to be OK.

@mapleFU
Member Author

mapleFU commented Aug 21, 2024

Edit: I tried writing tests for this and found that it's actually bug-free:

  1. An ExecBatch is treated as having size == 1 when all inputs are scalars (constants).
  2. So the size is always 1; this is also handled by PromoteExecSpanScalars.

So the length is actually always 1 here.
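
The rule in item 1 can be sketched as follows. This is only a model of the behaviour described in this comment: `Arg` and `InferBatchLength` are hypothetical names, and the real logic inside Arrow may differ in its details.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical argument descriptor: either an array with a length or a scalar.
struct Arg {
  bool is_scalar;
  int64_t array_length;  // ignored when is_scalar is true
};

// The batch length comes from the array arguments; a batch made only of
// scalars gets length 1.
int64_t InferBatchLength(const std::vector<Arg>& args) {
  int64_t length = -1;  // -1 means "no array seen yet"
  for (const Arg& arg : args) {
    if (arg.is_scalar) continue;
    // All array arguments are assumed to share one length in this sketch.
    assert(length == -1 || length == arg.array_length);
    length = arg.array_length;
  }
  return length == -1 ? 1 : length;
}
```

Under this rule a direct kernel call on a lone scalar always sees a length of 1, so counting the scalar once and counting it batch.length times give the same answer.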

@zanmato1984
Contributor

Edit: I tried writing tests for this and found that it's actually bug-free:

  1. An ExecBatch is treated as having size == 1 when all inputs are scalars (constants).
  2. So the size is always 1; this is also handled by PromoteExecSpanScalars.

So the length is actually always 1 here.

You are right when only compute kernels are involved. In that case, you can assume that when the argument of any is a scalar, the batch length must be 1.

However, this might not be the case in a more complex context, e.g. Acero. Here is a concrete test that reproduces the expected bug (explained at the end):

TEST(ScalarAggregate, BuggyAny) {
  std::shared_ptr<Schema> in_schema = schema({field("not_used", int32())});
  std::vector<ExecBatch> batches{
      ExecBatchFromJSON({int32()}, "[[42], [42], [42], [42]]")};

  std::vector<Aggregate> aggregates = {
      Aggregate("any",
                std::make_shared<compute::ScalarAggregateOptions>(/*skip_nulls=*/false,
                                                                  /*min_count=*/2),
                FieldRef("literal_true"))};

  Declaration plan = Declaration::Sequence(
      {{"exec_batch_source", ExecBatchSourceNodeOptions(in_schema, std::move(batches))},
       {"project", ProjectNodeOptions({literal(true)}, {"literal_true"})},
       {"aggregate", AggregateNodeOptions(aggregates)}});

  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches,
                       DeclarationToExecBatches(plan));

  std::cout << out_batches.batches[0].values[0].ToString() << std::endl;
}

Output:

Scalar(null)

Explanation: One source node with 1 batch of 4 rows (the contents don't matter), followed by a projection node which outputs only the literal true (also 4 rows). The tricky part is what this projection node emits: a batch of logically 4 rows that consists of a single scalar column. When this batch is eventually ingested by the subsequent aggregation node, which calls any on this scalar column with min_count being 2, boom.
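
The same outcome can be reproduced without Acero in a standalone model. `ScalarBatch` and `AnyModel` are hypothetical names; the `scale_by_length` flag switches between the original counting (once per batch) and the counting proposed earlier in this thread (once per logical row). Only the count differs between the two modes in this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the batch the projection emits: one boolean
// scalar column plus a logical row count.
struct ScalarBatch {
  bool is_valid;
  bool value;
  int64_t length;
};

// Hypothetical model of the "any" state for scalar inputs.
struct AnyModel {
  bool scale_by_length;
  bool any = false;
  int64_t count = 0;

  explicit AnyModel(bool scale) : scale_by_length(scale) {}

  void Consume(const ScalarBatch& batch) {
    assert(batch.length >= 0);
    any |= batch.is_valid && batch.value;
    count += batch.is_valid * (scale_by_length ? batch.length : 1);
  }

  // True when the min_count check would turn the result into null.
  bool IsNull(uint32_t min_count) const { return count < min_count; }
};
```

For `ScalarBatch{true, true, 4}` and min_count == 2, the once-per-batch model ends with count 1 and reports null, matching the `Scalar(null)` output above, while the scaled model ends with count 4 and keeps the true result.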

@mapleFU
Member Author

mapleFU commented Aug 22, 2024

Aha, boom! I'll recheck this.

So this is due to a different use case in arrow::compute?

@mapleFU
Member Author

mapleFU commented Aug 23, 2024

So this is because Acero calls the kernel's consume API directly, building the ExecSpan with the original batch.length:

Status ScalarAggregateNode::DoConsume(const ExecSpan& batch, size_t thread_index) {
  for (size_t i = 0; i < kernels_.size(); ++i) {
    arrow::util::tracing::Span span;
    START_COMPUTE_SPAN(span, aggs_[i].function,
                       {{"function.name", aggs_[i].function},
                        {"function.options",
                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
                        {"function.kind", std::string(kind_name()) + "::Consume"}});
    KernelContext batch_ctx{plan()->query_context()->exec_context()};
    DCHECK_LT(thread_index, states_[i].size());
    batch_ctx.SetState(states_[i][thread_index].get());

    std::vector<ExecValue> column_values;
    for (const int field : target_fieldsets_[i]) {
      column_values.push_back(batch.values[field]);
    }
    ExecSpan column_batch{std::move(column_values), batch.length};
    RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, column_batch));
  }
  return Status::OK();
}

I've got this, thanks!

mapleFU added a commit that referenced this issue Sep 2, 2024
…put with length in Acero (#43799)

### Rationale for this change

See #43768

### What changes are included in this PR?

Fix the case when boolean_{any|all} meets constant input with length in Acero

### Are these changes tested?

Yes

### Are there any user-facing changes?

no

* GitHub Issue: #43768

Lead-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: mwish <1506118561@qq.com>
Co-authored-by: Rossi Sun <zanmato1984@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
@mapleFU mapleFU added this to the 18.0.0 milestone Sep 2, 2024
@mapleFU
Member Author

mapleFU commented Sep 2, 2024

Issue resolved by pull request 43799
#43799

@mapleFU mapleFU closed this as completed Sep 2, 2024