Skip to content

Commit

Permalink
Report rawInputPositions stat for MergeExchange
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Feb 13, 2024
1 parent 42b10d9 commit 5fe51e6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
1 change: 1 addition & 0 deletions velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class MergeExchangeSource : public MergeSource {

auto lockedStats = mergeExchange_->stats().wlock();
lockedStats->addInputVector(data->estimateFlatSize(), data->size());
lockedStats->rawInputPositions += data->size();
}

// Since VectorStreamGroup::read() may cause inputStream to be at end,
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,10 @@ TEST_F(MultiFragmentTest, mergeExchange) {
}

auto finalSortTaskId = makeTaskId("orderby", tasks.size());
core::PlanNodeId mergeExchangeId;
auto finalSortPlan = PlanBuilder()
.mergeExchange(outputType, {"c0"})
.capturePlanNodeId(mergeExchangeId)
.partitionedOutput({}, 1)
.planNode();

Expand All @@ -421,6 +423,15 @@ TEST_F(MultiFragmentTest, mergeExchange) {
for (auto& task : tasks) {
ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId();
}

const auto finalSortStats = toPlanStats(task->taskStats());
const auto& mergeExchangeStats = finalSortStats.at(mergeExchangeId);

EXPECT_EQ(20'000, mergeExchangeStats.inputRows);
EXPECT_EQ(20'000, mergeExchangeStats.rawInputRows);

EXPECT_LT(0, mergeExchangeStats.inputBytes);
EXPECT_LT(0, mergeExchangeStats.rawInputBytes);
}

// Test reordering and dropping columns in PartitionedOutput operator.
Expand Down

0 comments on commit 5fe51e6

Please sign in to comment.