diff --git a/velox/dwio/common/CacheInputStream.cpp b/velox/dwio/common/CacheInputStream.cpp
index 659dc88119b6..db942c99f084 100644
--- a/velox/dwio/common/CacheInputStream.cpp
+++ b/velox/dwio/common/CacheInputStream.cpp
@@ -358,6 +358,8 @@ void CacheInputStream::loadPosition() {
   }
 
   const auto nextLoadRegion = nextQuantizedLoadRegion(position_);
+  // There is no need to update the metric in the loadData method because
+  // loadSync is always executed regardless and updates the metric.
   loadSync(nextLoadRegion);
 }
 
diff --git a/velox/dwio/common/DirectBufferedInput.cpp b/velox/dwio/common/DirectBufferedInput.cpp
index 8fb6e6fb0b0c..fe48166ac868 100644
--- a/velox/dwio/common/DirectBufferedInput.cpp
+++ b/velox/dwio/common/DirectBufferedInput.cpp
@@ -272,11 +272,19 @@ std::vector<folly::Range<char*>> DirectCoalescedLoad::loadData(bool prefetch) {
       buffers.push_back(folly::Range<char*>(request.tinyData.data(), region.length));
     }
     lastEnd = region.offset + request.loadSize;
-    size += std::min<int32_t>(loadQuantum_, region.length);
+    size += request.loadSize;
+  }
+
+  uint64_t usecs = 0;
+  {
+    MicrosecondTimer timer(&usecs);
+    input_->read(buffers, requests_[0].region.offset, LogType::FILE);
   }
-  input_->read(buffers, requests_[0].region.offset, LogType::FILE);
   ioStats_->read().increment(size);
+  ioStats_->incRawBytesRead(size - overread);
+  ioStats_->incTotalScanTime(usecs * 1'000);
+  ioStats_->queryThreadIoLatency().increment(usecs);
   ioStats_->incRawOverreadBytes(overread);
   if (prefetch) {
     ioStats_->prefetch().increment(size);
   }
diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp
index 0f2582c0c478..3d8c8a13f636 100644
--- a/velox/dwio/common/DirectInputStream.cpp
+++ b/velox/dwio/common/DirectInputStream.cpp
@@ -190,6 +190,9 @@ void DirectInputStream::loadPosition() {
     loadedRegion_.length = (offsetInRegion_ + loadQuantum_ <= region_.length)
         ? loadQuantum_
         : (region_.length - offsetInRegion_);
+
+    // Since the loadSync method updates the metric, but it is conditionally
+    // executed, we also need to update the metric in the loadData method.
     loadSync();
   }
 
diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp
index e8e8044346e9..e9b5a748945c 100644
--- a/velox/exec/tests/TableScanTest.cpp
+++ b/velox/exec/tests/TableScanTest.cpp
@@ -272,6 +272,44 @@ TEST_F(TableScanTest, allColumns) {
   ASSERT_TRUE(it->second.dynamicFilterStats.empty());
 }
 
+TEST_F(TableScanTest, directBufferInputRawInputBytes) {
+  auto vectors = makeVectors(10, 1'000);
+  auto filePath = TempFilePath::create();
+  writeToFile(filePath->getPath(), vectors);
+  createDuckDbTable(vectors);
+
+  auto plan = PlanBuilder(pool_.get())
+                  .startTableScan()
+                  .outputType(rowType_)
+                  .endTableScan()
+                  .planNode();
+
+  std::unordered_map<std::string, std::string> config;
+  std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
+      connectorConfigs = {};
+  auto queryCtx = core::QueryCtx::create(
+      executor_.get(),
+      core::QueryConfig(std::move(config)),
+      connectorConfigs,
+      nullptr);
+
+  auto task = AssertQueryBuilder(duckDbQueryRunner_)
+                  .plan(plan)
+                  .splits(makeHiveConnectorSplits({filePath}))
+                  .queryCtx(queryCtx)
+                  .assertResults("SELECT * FROM tmp");
+
+  // Verify that the table scan node reports raw input bytes and the IO
+  // timing runtime stats populated by DirectBufferedInput.
+  auto planStats = toPlanStats(task->taskStats());
+  auto scanNodeId = plan->id();
+  auto it = planStats.find(scanNodeId);
+  ASSERT_TRUE(it != planStats.end());
+  ASSERT_GT(it->second.rawInputBytes, 0);
+  EXPECT_GT(getTableScanRuntimeStats(task)["totalScanTime"].sum, 0);
+  EXPECT_GT(getTableScanRuntimeStats(task)["queryThreadIoLatency"].sum, 0);
+}
+
 TEST_F(TableScanTest, connectorStats) {
   auto hiveConnector =
       std::dynamic_pointer_cast<connector::hive::HiveConnector>(