From 5fe51e65c85864b710e6e81543fd6dc287fb2b6c Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 13 Feb 2024 16:42:06 -0500 Subject: [PATCH] Report rawInputPositions stat for MergeExchange --- velox/exec/MergeSource.cpp | 1 + velox/exec/tests/MultiFragmentTest.cpp | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index 36904506f8bd..08bb79b8acac 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -167,6 +167,7 @@ class MergeExchangeSource : public MergeSource { auto lockedStats = mergeExchange_->stats().wlock(); lockedStats->addInputVector(data->estimateFlatSize(), data->size()); + lockedStats->rawInputPositions += data->size(); } // Since VectorStreamGroup::read() may cause inputStream to be at end, diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 52f539138dea..32957ad8154e 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -404,8 +404,10 @@ TEST_F(MultiFragmentTest, mergeExchange) { } auto finalSortTaskId = makeTaskId("orderby", tasks.size()); + core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() .mergeExchange(outputType, {"c0"}) + .capturePlanNodeId(mergeExchangeId) .partitionedOutput({}, 1) .planNode(); @@ -421,6 +423,15 @@ TEST_F(MultiFragmentTest, mergeExchange) { for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); } + + const auto finalSortStats = toPlanStats(task->taskStats()); + const auto& mergeExchangeStats = finalSortStats.at(mergeExchangeId); + + EXPECT_EQ(20'000, mergeExchangeStats.inputRows); + EXPECT_EQ(20'000, mergeExchangeStats.rawInputRows); + + EXPECT_LT(0, mergeExchangeStats.inputBytes); + EXPECT_LT(0, mergeExchangeStats.rawInputBytes); } // Test reordering and dropping columns in PartitionedOutput operator.