Skip to content

Commit

Permalink
Combine low selectivity vectors generated by join filter
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Sep 19, 2024
1 parent fad7750 commit e1e89f4
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 40 deletions.
91 changes: 67 additions & 24 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,8 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
auto* outputTableRows =
initBuffer<char*>(outputTableRows_, outputTableRowsCapacity_, pool());

int accumulatedNumOutput = 0;

for (;;) {
int numOut = 0;

Expand Down Expand Up @@ -1054,12 +1056,18 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
// to process and the NoMatchDetector isn't carrying forward a row that
// still needs to be written to the output.
if (!numOut && !noMatchDetector_.hasLastMissedRow()) {
if (accumulatedNumOutput > 0) {
numOut = accumulatedNumOutput;
fillOutput(numOut);
input_ = nullptr;
return output_;
}
input_ = nullptr;
return nullptr;
}
VELOX_CHECK_LE(numOut, outputBatchSize);

numOut = evalFilter(numOut);
int numOutputBeforeFilter = numOut;
numOut = evalFilter(numOut, accumulatedNumOutput);

if (numOut == 0) {
continue;
Expand All @@ -1079,6 +1087,30 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
return nullptr;
}

if (accumulatedNumOutput > 0 ||
(filter_ != nullptr && numOut < numOutputBeforeFilter / 10 &&
!emptyBuildSide)) {
accumulatedNumOutput += numOut;
if (!resultIter_->atEnd() &&
accumulatedNumOutput < outputTableRowsCapacity_ &&
accumulatedNumOutput < operatorCtx_->driverCtx()
->queryConfig()
.preferredOutputBatchBytes() &&
outputBatchSize - numOut > 0) {
mapping = folly::Range(
outputRowMapping_->asMutable<vector_size_t>() +
accumulatedNumOutput,
outputTableRowsCapacity_ - accumulatedNumOutput);
outputTableRows =
outputTableRows_->asMutable<char*>() + accumulatedNumOutput;
outputBatchSize = outputBatchSize - numOut;
continue;
}
}
if (accumulatedNumOutput > 0) {
numOut = accumulatedNumOutput;
}

fillOutput(numOut);

if (isLeftSemiOrAntiJoinNoFilter || emptyBuildSide) {
Expand All @@ -1104,7 +1136,18 @@ bool HashProbe::maybeReadSpillOutput() {
return true;
}

RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
RowVectorPtr HashProbe::createFilterInput(
vector_size_t size,
vector_size_t offset) {
BufferPtr outputRowMapping = outputRowMapping_;
if (offset > 0) {
outputRowMapping = BaseVector::sliceBuffer(
*INTEGER(),
outputRowMapping_,
offset,
outputTableRowsCapacity_ - offset,
pool());
}
std::vector<VectorPtr> filterColumns(filterInputType_->size());
for (auto projection : filterInputProjections_) {
if (projectedInputColumns_.find(projection.inputChannel) !=
Expand All @@ -1121,12 +1164,12 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
}

filterColumns[projection.outputChannel] = wrapChild(
size, outputRowMapping_, input_->childAt(projection.inputChannel));
size, outputRowMapping, input_->childAt(projection.inputChannel));
}

extractColumns(
table_.get(),
folly::Range<char* const*>(outputTableRows_->as<char*>(), size),
folly::Range<char* const*>(outputTableRows_->as<char*>() + offset, size),
filterTableProjections_,
pool(),
filterInputType_->children(),
Expand All @@ -1139,7 +1182,8 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
void HashProbe::prepareFilterRowsForNullAwareJoin(
RowVectorPtr& filterInput,
vector_size_t numRows,
bool filterPropagateNulls) {
bool filterPropagateNulls,
vector_size_t* rawMapping) {
VELOX_CHECK_LE(numRows, kBatchSize);
if (filterTableInput_ == nullptr) {
filterTableInput_ =
Expand Down Expand Up @@ -1179,7 +1223,6 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
// with null join key column(s) as we can apply filtering after they cross
// join with the table rows later.
if (!nonNullInputRows_.isAllSelected()) {
auto* rawMapping = outputRowMapping_->asMutable<vector_size_t>();
for (int i = 0; i < numRows; ++i) {
if (filterInputRows_.isValid(i) &&
!nonNullInputRows_.isValid(rawMapping[i])) {
Expand Down Expand Up @@ -1269,10 +1312,8 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(

SelectivityVector HashProbe::evalFilterForNullAwareJoin(
vector_size_t numRows,
bool filterPropagateNulls) {
auto* rawOutputProbeRowMapping =
outputRowMapping_->asMutable<vector_size_t>();

bool filterPropagateNulls,
vector_size_t* rawMapping) {
// Subset of probe-side rows with a match that passed the filter.
SelectivityVector filterPassedRows(input_->size(), false);

Expand All @@ -1291,7 +1332,7 @@ SelectivityVector HashProbe::evalFilterForNullAwareJoin(
continue;
}

const auto probeRow = rawOutputProbeRowMapping[i];
const auto probeRow = rawMapping[i];
if (nonNullInputRows_.isValid(probeRow)) {
if (filterPassed(i)) {
filterPassedRows.setValid(probeRow, true);
Expand Down Expand Up @@ -1323,15 +1364,15 @@ SelectivityVector HashProbe::evalFilterForNullAwareJoin(
return filterPassedRows;
}

int32_t HashProbe::evalFilter(int32_t numRows) {
int32_t HashProbe::evalFilter(int32_t numRows, int32_t offset) {
if (!filter_) {
return numRows;
}

const bool filterPropagateNulls = filter_->expr(0)->propagatesNulls();
auto* rawOutputProbeRowMapping =
outputRowMapping_->asMutable<vector_size_t>();
auto* outputTableRows = outputTableRows_->asMutable<char*>();
outputRowMapping_->asMutable<vector_size_t>() + offset;
auto* outputTableRows = outputTableRows_->asMutable<char*>() + offset;

filterInputRows_.resizeFill(numRows);

Expand All @@ -1349,11 +1390,11 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
filterInputRows_.updateBounds();
}

RowVectorPtr filterInput = createFilterInput(numRows);
RowVectorPtr filterInput = createFilterInput(numRows, offset);

if (nullAware_) {
prepareFilterRowsForNullAwareJoin(
filterInput, numRows, filterPropagateNulls);
filterInput, numRows, filterPropagateNulls, rawOutputProbeRowMapping);
}

EvalCtx evalCtx(operatorCtx_->execCtx(), filter_.get(), filterInput.get());
Expand Down Expand Up @@ -1431,21 +1472,23 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
static const char* kPassed = "passed";

if (nullAware_) {
leftSemiProjectIsNull_.resize(numRows);
leftSemiProjectIsNull_.clearAll();
leftSemiProjectIsNull_.resize(numRows + offset, false);
if (offset == 0) {
leftSemiProjectIsNull_.clearAll();
}

auto addLast = [&](auto row, std::optional<bool> passed) {
if (passed.has_value()) {
outputTableRows[numPassed] =
passed.value() ? const_cast<char*>(kPassed) : nullptr;
} else {
leftSemiProjectIsNull_.setValid(numPassed, true);
leftSemiProjectIsNull_.setValid(numPassed + offset, true);
}
rawOutputProbeRowMapping[numPassed++] = row;
};

auto passedRows =
evalFilterForNullAwareJoin(numRows, filterPropagateNulls);
auto passedRows = evalFilterForNullAwareJoin(
numRows, filterPropagateNulls, rawOutputProbeRowMapping);
for (auto i = 0; i < numRows; ++i) {
// filterPassed(i) -> TRUE
// else passed -> NULL
Expand Down Expand Up @@ -1481,8 +1524,8 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
rawOutputProbeRowMapping[numPassed++] = row;
};
if (nullAware_) {
auto passedRows =
evalFilterForNullAwareJoin(numRows, filterPropagateNulls);
auto passedRows = evalFilterForNullAwareJoin(
numRows, filterPropagateNulls, rawOutputProbeRowMapping);
for (auto i = 0; i < numRows; ++i) {
auto probeRow = rawOutputProbeRowMapping[i];
bool passed = passedRows.isValid(probeRow);
Expand Down
10 changes: 6 additions & 4 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class HashProbe : public Operator {

// Applies 'filter_' to 'outputTableRows_' and updates 'outputRowMapping_'.
// Returns the number of passing rows.
vector_size_t evalFilter(vector_size_t numRows);
vector_size_t evalFilter(vector_size_t numRows, vector_size_t offset);

inline bool filterPassed(vector_size_t row) {
return filterInputRows_.isValid(row) &&
Expand All @@ -141,7 +141,7 @@ class HashProbe : public Operator {
// Create a temporary input vector to be passed to the filter. This ensures it
// gets destroyed in case it's wrapping an unloaded vector which eventually
// needs to be wrapped in fillOutput().
RowVectorPtr createFilterInput(vector_size_t size);
RowVectorPtr createFilterInput(vector_size_t size, vector_size_t offset);

// Prepare filter row selectivity for null-aware join. 'numRows'
// specifies the number of rows in 'filterInputRows_' to process. If
Expand All @@ -150,12 +150,14 @@ class HashProbe : public Operator {
void prepareFilterRowsForNullAwareJoin(
RowVectorPtr& filterInput,
vector_size_t numRows,
bool filterPropagateNulls);
bool filterPropagateNulls,
vector_size_t* rawMapping);

// Evaluate the filter for null-aware anti or left semi project join.
SelectivityVector evalFilterForNullAwareJoin(
vector_size_t numRows,
bool filterPropagateNulls);
bool filterPropagateNulls,
vector_size_t* rawMapping);

// Combine the selected probe-side rows with all or null-join-key (depending
// on the iterator) build side rows and evaluate the filter. Mark probe rows
Expand Down
91 changes: 91 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8092,4 +8092,95 @@ TEST_F(HashJoinTest, nanKeys) {
makeFlatVector<int64_t>({1, 2})});
facebook::velox::test::assertEqualVectors(expected, result);
}

// Verifies that the small output vectors left over after a low-selectivity
// join filter are combined, so that the HashProbe operator reports exactly one
// output vector. The check is run for the plan as built and again with the
// join sides flipped.
TEST_F(HashJoinTest, combineSmallVectorsAfterFilter) {
  // Verify low selectivity / small vectors are combined to 1 vector.

  // A single probe batch of 1'000 rows: t0 = row, t1 = row * 10.
  auto probeVectors = makeBatches(1, [&](auto /*unused*/) {
    return makeRowVector(
        {"t0", "t1"},
        {
            makeFlatVector<int32_t>(1'000, [](auto row) { return row; }),
            makeFlatVector<int64_t>(1'000, [](auto row) { return row * 10; }),
        });
  });

  // Three build batches of 1'000 rows each. Within a batch every key value is
  // repeated for 5 consecutive rows: u0 = -100 + row / 5, u1 = u0 * 10.
  auto buildVectors = makeBatches(3, [&](auto /*unused*/) {
    return makeRowVector(
        {"u0", "u1"},
        {
            makeFlatVector<int32_t>(
                1'000, [](auto row) { return -100 + (row / 5); }),
            makeFlatVector<int64_t>(
                1'000, [](auto row) { return -1000 + (row / 5) * 10; }),
        });
  });

  std::shared_ptr<TempFilePath> probeFile = TempFilePath::create();
  writeToFile(probeFile->getPath(), probeVectors);

  std::shared_ptr<TempFilePath> buildFile = TempFilePath::create();
  writeToFile(buildFile->getPath(), buildVectors);

  createDuckDbTable("t", probeVectors);
  createDuckDbTable("u", buildVectors);

  core::PlanNodeId probeScanId;
  core::PlanNodeId buildScanId;
  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto plan = PlanBuilder(planNodeIdGenerator)
                  .tableScan(asRowType(probeVectors[0]->type()))
                  .capturePlanNodeId(probeScanId)
                  .hashJoin(
                      {"t0"},
                      {"u0"},
                      PlanBuilder(planNodeIdGenerator)
                          .tableScan(asRowType(buildVectors[0]->type()))
                          .capturePlanNodeId(buildScanId)
                          .planNode(),
                      "(t1 + u1) % 3 = 0",
                      {"t0", "t1", "match"},
                      core::JoinType::kLeftSemiProject)
                  .planNode();
  SplitInput splitInput = {
      {probeScanId,
       {exec::Split(makeHiveConnectorSplit(probeFile->getPath()))}},
      {buildScanId,
       {exec::Split(makeHiveConnectorSplit(buildFile->getPath()))}},
  };

  // Both runs are checked against the same reference result.
  const std::string referenceQuery =
      "SELECT t0, t1, t0 IN (SELECT u0 FROM u WHERE (t1 + u1) % 3 = 0) FROM t";

  // Asserts that every HashProbe operator in the task produced exactly one
  // output vector. Stats are walked by const reference so that the operator
  // stats structs are not copied on each iteration.
  const auto verifySingleOutputVector =
      [](const std::shared_ptr<Task>& task, bool /*unused*/) {
        const auto stats = task->taskStats();
        for (const auto& pipeline : stats.pipelineStats) {
          for (const auto& op : pipeline.operatorStats) {
            if (op.operatorType == "HashProbe") {
              ASSERT_EQ(op.outputVectors, 1);
            }
          }
        }
      };

  // Runs 'planNode' over the shared splits and applies the shared checks.
  const auto runAndVerify = [&](const auto& planNode) {
    HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
        .planNode(planNode)
        .inputSplits(splitInput)
        .checkSpillStats(false)
        .referenceQuery(referenceQuery)
        .verifier(verifySingleOutputVector)
        .run();
  };

  runAndVerify(plan);
  runAndVerify(flipJoinSides(plan));
}
} // namespace
24 changes: 12 additions & 12 deletions velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,18 @@ class BaseVector {
storageByteCount_ = std::nullopt;
}

// Slices a buffer whose contents are interpreted as values of the given type,
// starting at 'offset' and covering 'length'.
// NOTE(review): 'offset' and 'length' appear to be counted in values of the
// given type rather than in bytes -- confirm against the implementation.
//
// For boolean type, if 'offset' is not a multiple of 8, returns a shifted
// copy; otherwise returns a BufferView into the original buffer (with shared
// ownership of the original buffer).
// NOTE(review): the memory pool is presumably used to allocate the shifted
// copy -- confirm against the implementation.
static BufferPtr sliceBuffer(
const Type&,
const BufferPtr&,
vector_size_t offset,
vector_size_t length,
memory::MemoryPool*);

protected:
// Returns a brief summary of the vector. The default implementation includes
// encoding, type, number of rows and number of nulls.
Expand All @@ -882,18 +894,6 @@ class BaseVector {
ensureNullsCapacity(length_, true);
}

// Slice a buffer with specific type.
//
// For boolean type and if the offset is not multiple of 8, return a shifted
// copy; otherwise return a BufferView into the original buffer (with shared
// ownership of original buffer).
static BufferPtr sliceBuffer(
const Type&,
const BufferPtr&,
vector_size_t offset,
vector_size_t length,
memory::MemoryPool*);

BufferPtr sliceNulls(vector_size_t offset, vector_size_t length) const {
return sliceBuffer(*BOOLEAN(), nulls_, offset, length, pool_);
}
Expand Down

0 comments on commit e1e89f4

Please sign in to comment.