Skip to content

Commit

Permalink
Address some lints
Browse files Browse the repository at this point in the history
Summary:
Address 3 lints:
1. Use std::vector::at() instead of std::vector::operator[] in the unit test.
2. Add directive to ignore 'bugprone-use-after-move' lint in Task.cpp.
3. Avoid extra copy of shared_ptr in the split preload functor.

Reviewed By: amitkdutta, pranjalssh

Differential Revision: D54180033

fbshipit-source-id: 5d26ed28dd0f482a24ba949a49b34ebc0f00bb71
  • Loading branch information
Sergey Pershin authored and facebook-github-bot committed Feb 26, 2024
1 parent 52ad7f5 commit 929fd37
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
10 changes: 6 additions & 4 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,14 @@ void TableScan::checkPreload() {
maxSplitPreloadPerDriver_;
if (!splitPreloader_) {
splitPreloader_ =
[executor, this](std::shared_ptr<connector::ConnectorSplit> split) {
[executor,
this](const std::shared_ptr<connector::ConnectorSplit>& split) {
preload(split);

executor->add([taskHolder = operatorCtx_->task(), split]() mutable {
split->dataSource->prepare();
split.reset();
executor->add([taskHolder = operatorCtx_->task(),
connectorSplit = split]() mutable {
connectorSplit->dataSource->prepare();
connectorSplit.reset();
});
};
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class TableScan : public SourceOperator {
// callback can schedule preloads on an executor. These preloads may
// outlive the Task and therefore need to capture a shared_ptr to
// it.
std::function<void(std::shared_ptr<connector::ConnectorSplit>)>
std::function<void(const std::shared_ptr<connector::ConnectorSplit>&)>
splitPreloader_{nullptr};

// Count of splits that started background preload.
Expand Down
12 changes: 6 additions & 6 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,8 @@ bool Task::addSplitWithSequence(
}

if (!isTaskRunning) {
// Safe because 'split' is moved away above only if 'isTaskRunning'.
// @lint-ignore CLANGTIDY bugprone-use-after-move
addRemoteSplit(planNodeId, split);
}

Expand All @@ -1195,6 +1197,7 @@ void Task::addSplit(const core::PlanNodeId& planNodeId, exec::Split&& split) {

if (!isTaskRunning) {
// Safe because 'split' is moved away above only if 'isTaskRunning'.
// @lint-ignore CLANGTIDY bugprone-use-after-move
addRemoteSplit(planNodeId, split);
}
}
Expand Down Expand Up @@ -1365,8 +1368,7 @@ BlockingReason Task::getSplitOrFuture(
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
const ConnectorSplitPreloadFunc& preload) {
std::lock_guard<std::timed_mutex> l(mutex_);
auto& splitsState = getPlanNodeSplitsStateLocked(planNodeId);
return getSplitOrFutureLocked(
Expand All @@ -1384,8 +1386,7 @@ BlockingReason Task::getSplitOrFutureLocked(
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
const ConnectorSplitPreloadFunc& preload) {
if (splitsStore.splits.empty()) {
if (splitsStore.noMoreSplits) {
return BlockingReason::kNotBlocked;
Expand All @@ -1405,8 +1406,7 @@ exec::Split Task::getSplitLocked(
bool forTableScan,
SplitsStore& splitsStore,
int32_t maxPreloadSplits,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
const ConnectorSplitPreloadFunc& preload) {
int32_t readySplitIndex = -1;
if (maxPreloadSplits) {
for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits;
Expand Down
15 changes: 8 additions & 7 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class OutputBufferManager;

class HashJoinBridge;
class NestedLoopJoinBridge;

using ConnectorSplitPreloadFunc =
std::function<void(const std::shared_ptr<connector::ConnectorSplit>&)>;

class Task : public std::enable_shared_from_this<Task> {
public:
/// Creates a task to execute a plan fragment, but doesn't start execution
Expand Down Expand Up @@ -360,8 +364,7 @@ class Task : public std::enable_shared_from_this<Task> {
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits = 0,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload = nullptr);
const ConnectorSplitPreloadFunc& preload = nullptr);

void splitFinished(bool fromTableScan, int64_t splitWeight);

Expand Down Expand Up @@ -791,18 +794,16 @@ class Task : public std::enable_shared_from_this<Task> {
SplitsStore& splitsStore,
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits = 0,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload = nullptr);
int32_t maxPreloadSplits,
const ConnectorSplitPreloadFunc& preload);

/// Returns next split from the store. The caller must ensure the store is not
/// empty.
exec::Split getSplitLocked(
bool forTableScan,
SplitsStore& splitsStore,
int32_t maxPreloadSplits,
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload);
const ConnectorSplitPreloadFunc& preload);

// Creates for the given split group and fills up the 'SplitGroupState'
// structure, which stores inter-operator state (local exchange, bridges).
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ TEST_F(TableScanTest, tableScanSplitsAndWeights) {
const auto filePaths = makeFilePaths(numSplits);
auto vectors = makeVectors(numSplits, 100);
for (auto i = 0; i < numSplits; i++) {
writeToFile(filePaths[i]->path, vectors[i]);
writeToFile(filePaths.at(i)->path, vectors.at(i));
}

// Set the table scan operators wait twice:
Expand Down Expand Up @@ -1462,7 +1462,7 @@ TEST_F(TableScanTest, tableScanSplitsAndWeights) {
for (auto fileIndex = 0; fileIndex < numSplits; ++fileIndex) {
const int64_t splitWeight = fileIndex * 10 + 1;
totalSplitWeights += splitWeight;
auto split = makeHiveSplit(filePaths[fileIndex]->path, splitWeight);
auto split = makeHiveSplit(filePaths.at(fileIndex)->path, splitWeight);
task->addSplit(scanNodeId, std::move(split));
}
task->noMoreSplits(scanNodeId);
Expand Down

0 comments on commit 929fd37

Please sign in to comment.