Skip to content

Commit

Permalink
Add AggregateNode::isPreGrouped() API (#7763)
Browse files Browse the repository at this point in the history
Summary:
Add AggregateNode::isPreGrouped() convenience API to easily tell whether
aggregation can be executed in streaming mode.

Also add "STREAMING" label to the output of AggregateNode.toString() if
applicable.

Pull Request resolved: #7763

Reviewed By: Yuhta

Differential Revision: D51613554

Pulled By: mbasmanova

fbshipit-source-id: b06dff7cce17fed32145101639faa405386bfbac
  • Loading branch information
ulysses-you authored and facebook-github-bot committed Nov 28, 2023
1 parent a9846b9 commit 93478ae
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 3 deletions.
4 changes: 4 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
void AggregationNode::addDetails(std::stringstream& stream) const {
stream << stepName(step_) << " ";

if (isPreGrouped()) {
stream << "STREAMING ";
}

if (!groupingKeys_.empty()) {
stream << "[";
addFields(stream, groupingKeys_);
Expand Down
13 changes: 13 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,19 @@ class AggregationNode : public PlanNode {
return preGroupedKeys_;
}

bool isPreGrouped() const {
return !preGroupedKeys_.empty() &&
std::equal(
preGroupedKeys_.begin(),
preGroupedKeys_.end(),
groupingKeys_.begin(),
groupingKeys_.end(),
[](const FieldAccessTypedExprPtr& x,
const FieldAccessTypedExprPtr& y) -> bool {
return (*x == *y);
});
}

const std::vector<std::string>& aggregateNames() const {
return aggregateNames_;
}
Expand Down
4 changes: 1 addition & 3 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,7 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
} else if (
auto aggregationNode =
std::dynamic_pointer_cast<const core::AggregationNode>(planNode)) {
if (!aggregationNode->preGroupedKeys().empty() &&
aggregationNode->preGroupedKeys().size() ==
aggregationNode->groupingKeys().size()) {
if (aggregationNode->isPreGrouped()) {
operators.push_back(std::make_unique<StreamingAggregation>(
id, ctx.get(), aggregationNode));
} else {
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/PlanNodeToStringTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ TEST_F(PlanNodeToStringTest, aggregation) {
ASSERT_EQ(
"-- Aggregation[SINGLE [c0, group_id] sum_c1 := sum(ROW[\"c1\"]) global group IDs: [ 1, 2 ] Group Id key: group_id] -> c0:SMALLINT, group_id:BIGINT, sum_c1:BIGINT\n",
plan->toString(true, false));

plan = PlanBuilder()
.values({data_})
.partialStreamingAggregation({"c0"}, {"sum(c1) AS a"})
.planNode();
ASSERT_EQ(
"-- Aggregation[PARTIAL STREAMING [c0] a := sum(ROW[\"c1\"])] -> c0:SMALLINT, a:BIGINT\n",
plan->toString(true, false));
}

TEST_F(PlanNodeToStringTest, groupId) {
Expand Down

0 comments on commit 93478ae

Please sign in to comment.