Correctly record the RawBytesRead and scan time metric in DirectBufferedInput (#9801)

Summary:
Follow-up to #8545.

Pull Request resolved: #9801

Reviewed By: Yuhta

Differential Revision: D58596780

Pulled By: kagamiori

fbshipit-source-id: ff2b15b57a3214ae76db29b8791e3a1a84512e0a
JkSelf authored and facebook-github-bot committed Jun 17, 2024
1 parent 2b94eaa commit 6b32339
Showing 4 changed files with 53 additions and 2 deletions.
2 changes: 2 additions & 0 deletions velox/dwio/common/CacheInputStream.cpp
@@ -358,6 +358,8 @@ void CacheInputStream::loadPosition() {
   }

   const auto nextLoadRegion = nextQuantizedLoadRegion(position_);
+  // There is no need to update the metrics in loadData() because loadSync()
+  // is always executed and updates them.
   loadSync(nextLoadRegion);
 }

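The added comment encodes the accounting rule behind this fix: I/O metrics are recorded at the single call every cache load passes through, so nothing is counted twice. A minimal standalone sketch of that rule, using hypothetical names (IoStatsSketch, CacheStreamSketch) rather than Velox's types:

#include <cstdint>

// Hypothetical types for illustration only; not Velox code.
struct IoStatsSketch {
  uint64_t rawBytesRead{0};
};

struct CacheStreamSketch {
  IoStatsSketch stats;

  // The one choke point for every load; metrics are recorded here and only
  // here, so callers must not record them again.
  void loadSync(uint64_t bytes) {
    stats.rawBytesRead += bytes;
  }

  // Always delegates to loadSync(), so it records nothing itself.
  void loadPosition(uint64_t bytes) {
    loadSync(bytes);
  }
};
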
12 changes: 10 additions & 2 deletions velox/dwio/common/DirectBufferedInput.cpp
@@ -272,11 +272,19 @@ std::vector<cache::CachePin> DirectCoalescedLoad::loadData(bool prefetch) {
       buffers.push_back(folly::Range(request.tinyData.data(), region.length));
     }
     lastEnd = region.offset + request.loadSize;
-    size += std::min<int32_t>(loadQuantum_, region.length);
+    size += request.loadSize;
   }
+
+  uint64_t usecs = 0;
+  {
+    MicrosecondTimer timer(&usecs);
+    input_->read(buffers, requests_[0].region.offset, LogType::FILE);
+  }
+
-  input_->read(buffers, requests_[0].region.offset, LogType::FILE);
   ioStats_->read().increment(size);
   ioStats_->incRawBytesRead(size - overread);
+  ioStats_->incTotalScanTime(usecs * 1'000);
+  ioStats_->queryThreadIoLatency().increment(usecs);
   ioStats_->incRawOverreadBytes(overread);
   if (prefetch) {
     ioStats_->prefetch().increment(size);
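The braces around the read scope the timer to just the I/O call, so usecs captures only time spent in input_->read(); the usecs * 1'000 conversion suggests incTotalScanTime() expects nanoseconds. A minimal sketch of the same scoped-timer pattern, assuming only the standard library (Velox's own MicrosecondTimer in the real code may differ in detail):

#include <chrono>
#include <cstdint>

// RAII timer: adds the elapsed microseconds of its enclosing scope to *out
// when it is destroyed. Illustrative only; not Velox's MicrosecondTimer.
class ScopedMicrosecondTimer {
 public:
  explicit ScopedMicrosecondTimer(uint64_t* out)
      : out_(out), start_(std::chrono::steady_clock::now()) {}

  ~ScopedMicrosecondTimer() {
    *out_ += std::chrono::duration_cast<std::chrono::microseconds>(
                 std::chrono::steady_clock::now() - start_)
                 .count();
  }

 private:
  uint64_t* const out_;
  const std::chrono::steady_clock::time_point start_;
};

// Usage mirrors the diff: only the timed call sits inside the block.
// uint64_t usecs = 0;
// {
//   ScopedMicrosecondTimer timer(&usecs);
//   doRead();
// }
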
3 changes: 3 additions & 0 deletions velox/dwio/common/DirectInputStream.cpp
@@ -190,6 +190,9 @@ void DirectInputStream::loadPosition() {
   loadedRegion_.length = (offsetInRegion_ + loadQuantum_ <= region_.length)
       ? loadQuantum_
       : (region_.length - offsetInRegion_);
+
+  // loadSync() updates the metrics, but it is only conditionally executed,
+  // so we also need to update the metrics in the loadData() method.
   loadSync();
 }

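The conditional above is a load-quantum clamp: each load covers at most loadQuantum_ bytes and is truncated at the end of the region. An equivalent standalone form, with local names that are not Velox's:

#include <algorithm>
#include <cstdint>

// Length of the next load: at most loadQuantum bytes, truncated at the end
// of the region. Equivalent to the conditional in the diff above whenever
// offsetInRegion <= regionLength.
uint64_t nextLoadLength(
    uint64_t offsetInRegion,
    uint64_t regionLength,
    uint64_t loadQuantum) {
  return std::min(loadQuantum, regionLength - offsetInRegion);
}
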
38 changes: 38 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
@@ -272,6 +272,44 @@ TEST_F(TableScanTest, allColumns) {
   ASSERT_TRUE(it->second.dynamicFilterStats.empty());
 }

+TEST_F(TableScanTest, directBufferInputRawInputBytes) {
+  auto vectors = makeVectors(10, 1'000);
+  auto filePath = TempFilePath::create();
+  writeToFile(filePath->getPath(), vectors);
+  createDuckDbTable(vectors);
+
+  auto plan = PlanBuilder(pool_.get())
+                  .startTableScan()
+                  .outputType(rowType_)
+                  .endTableScan()
+                  .planNode();
+
+  std::unordered_map<std::string, std::string> config;
+  std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs =
+      {};
+  auto queryCtx = core::QueryCtx::create(
+      executor_.get(),
+      core::QueryConfig(std::move(config)),
+      connectorConfigs,
+      nullptr);
+
+  auto task = AssertQueryBuilder(duckDbQueryRunner_)
+                  .plan(plan)
+                  .splits(makeHiveConnectorSplits({filePath}))
+                  .queryCtx(queryCtx)
+                  .assertResults("SELECT * FROM tmp");
+
+  // Check that the raw input bytes and the scan time runtime metrics are
+  // reported for the table scan node.
+  auto planStats = toPlanStats(task->taskStats());
+  auto scanNodeId = plan->id();
+  auto it = planStats.find(scanNodeId);
+  ASSERT_TRUE(it != planStats.end());
+  ASSERT_GT(it->second.rawInputBytes, 0);
+  EXPECT_GT(getTableScanRuntimeStats(task)["totalScanTime"].sum, 0);
+  EXPECT_GT(getTableScanRuntimeStats(task)["queryThreadIoLatency"].sum, 0);
+}
+
 TEST_F(TableScanTest, connectorStats) {
   auto hiveConnector =
       std::dynamic_pointer_cast<connector::hive::HiveConnector>(
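getTableScanRuntimeStats() is a test helper. A hedged sketch of what such a helper plausibly does — walking the task's operator stats and merging the runtime stats of the TableScan operators — follows; it assumes Velox's TaskStats::pipelineStats, OperatorStats::runtimeStats, and RuntimeMetric::merge, and is an illustration, not the helper's actual implementation:

#include <memory>
#include <string>
#include <unordered_map>

#include "velox/exec/Task.h" // Assumed location of exec::Task and TaskStats.

using facebook::velox::RuntimeMetric;
namespace exec = facebook::velox::exec;

// Hypothetical helper: collect the runtime stats of all TableScan operators
// in a task. Sketch only; the real getTableScanRuntimeStats() may differ.
std::unordered_map<std::string, RuntimeMetric> tableScanRuntimeStats(
    const std::shared_ptr<exec::Task>& task) {
  std::unordered_map<std::string, RuntimeMetric> stats;
  for (const auto& pipeline : task->taskStats().pipelineStats) {
    for (const auto& op : pipeline.operatorStats) {
      if (op.operatorType == "TableScan") {
        for (const auto& [name, metric] : op.runtimeStats) {
          stats[name].merge(metric);
        }
      }
    }
  }
  return stats;
}
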

