Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AggregateNode::isPreGrouped() API #7763

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
void AggregationNode::addDetails(std::stringstream& stream) const {
stream << stepName(step_) << " ";

if (isPreGrouped()) {
stream << "STREAMING ";
}

if (!groupingKeys_.empty()) {
stream << "[";
addFields(stream, groupingKeys_);
Expand Down
13 changes: 13 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,19 @@ class AggregationNode : public PlanNode {
return preGroupedKeys_;
}

bool isPreGrouped() const {
return !preGroupedKeys_.empty() &&
std::equal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not semantically equal to aggregationNode->preGroupedKeys().size() == aggregationNode->groupingKeys().size(), for example if they have different orders, previous returns true but this one returns false. Not sure if this can happen. cc @ulysses-you @mbasmanova

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waitinfuture This is a good point. preGroupedKeys should be a subset of groupingKeys, but there is not requirements for the order of keys in preGroupedKeys to match the order of keys in groupingKeys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it depends on how to develop with this interface. If the developer knows the input data are pre-grouped, then just set all grouping keys as preGroupedKeys (that's what Gluten did). Before, it is a bit flaky by checking grouping key size, e.g., developer may set atrribute which is not a grouping key.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the safe way is to check every key in preGroupedKeys_ is in groupingKeys_, but not necessarily with the same position.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, a better way is to require that preGroupedKeys is a subset of groupingKeys and keys appear in the same order. We can add a check to the constructor to enforce that.

preGroupedKeys_.begin(),
preGroupedKeys_.end(),
groupingKeys_.begin(),
groupingKeys_.end(),
[](const FieldAccessTypedExprPtr& x,
const FieldAccessTypedExprPtr& y) -> bool {
return (*x == *y);
});
}

const std::vector<std::string>& aggregateNames() const {
return aggregateNames_;
}
Expand Down
4 changes: 1 addition & 3 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,7 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
} else if (
auto aggregationNode =
std::dynamic_pointer_cast<const core::AggregationNode>(planNode)) {
if (!aggregationNode->preGroupedKeys().empty() &&
aggregationNode->preGroupedKeys().size() ==
aggregationNode->groupingKeys().size()) {
if (aggregationNode->isPreGrouped()) {
operators.push_back(std::make_unique<StreamingAggregation>(
id, ctx.get(), aggregationNode));
} else {
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/PlanNodeToStringTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ TEST_F(PlanNodeToStringTest, aggregation) {
ASSERT_EQ(
"-- Aggregation[SINGLE [c0, group_id] sum_c1 := sum(ROW[\"c1\"]) global group IDs: [ 1, 2 ] Group Id key: group_id] -> c0:SMALLINT, group_id:BIGINT, sum_c1:BIGINT\n",
plan->toString(true, false));

plan = PlanBuilder()
.values({data_})
.partialStreamingAggregation({"c0"}, {"sum(c1) AS a"})
.planNode();
ASSERT_EQ(
"-- Aggregation[PARTIAL STREAMING [c0] a := sum(ROW[\"c1\"])] -> c0:SMALLINT, a:BIGINT\n",
plan->toString(true, false));
}

TEST_F(PlanNodeToStringTest, groupId) {
Expand Down