Skip to content

Commit

Permalink
fix(kqp): use safe key cast for stream lookup join (ydb-platform#4326)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored and MrLolthe1st committed May 28, 2024
1 parent 8773d99 commit 8c71b1e
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 97 deletions.
27 changes: 18 additions & 9 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
leftJoinKeys.emplace(leftKey);
}

const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
&& supportedStreamJoinKinds.contains(join.JoinType().Value())
&& !indexName;

auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
.Name("leftRowArg")
.Done();
Expand Down Expand Up @@ -677,10 +682,19 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}
if (canCast) {
DBG("------ cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
member = Build<TCoConvert>(ctx, join.Pos())
.Input(member)
.Type().Build(rightDataType->GetName())
.Done().Ptr();

if (useStreamIndexLookupJoin) {
// For stream lookup join we should cast keys before join
member = Build<TCoSafeCast>(ctx, join.Pos())
.Value(member)
.Type(ExpandType(join.Pos(), *rightType, ctx))
.Done().Ptr();
} else {
member = Build<TCoConvert>(ctx, join.Pos())
.Input(member)
.Type().Build(rightDataType->GetName())
.Done().Ptr();
}
} else {
DBG("------ can not cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
return {};
Expand All @@ -704,11 +718,6 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
return {};
}

const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
&& supportedStreamJoinKinds.contains(join.JoinType().Value())
&& !indexName;

bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& !join.LeftInput().Maybe<TCoParameter>()
&& !IsParameterToListOfStructsRepack(join.LeftInput())
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,18 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
break;
}

UnprocessedRows.pop_front();
auto hasNulls = [](const TOwnedCellVec& cellVec) {
for (const auto& cell : cellVec) {
if (cell.IsNull()) {
return true;
}
}

if (!joinKey.data()->IsNull()) { // don't use nulls as lookup keys, because null != null
return false;
};

UnprocessedRows.pop_front();
if (!hasNulls(joinKey)) { // don't use nulls as lookup keys, because null != null
std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
if (joinKey.size() < KeyColumns.size()) {
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
Expand Down Expand Up @@ -730,7 +739,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) {
auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name);
YQL_ENSURE(it != ReadColumns.end());
joinKeyCells[joinKeyIdx] = row[std::distance(ReadColumns.begin(), it)];
joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder] = row[std::distance(ReadColumns.begin(), it)];
}

auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
Expand Down
Loading

0 comments on commit 8c71b1e

Please sign in to comment.