Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't make lookups on predicate pushdown stage #1560

Merged
merged 5 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx);
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx, *getParents());
DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down
80 changes: 77 additions & 3 deletions ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,61 @@

#include <ydb/core/kqp/opt/kqp_opt_impl.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/library/yql/core/yql_opt_utils.h>

namespace {

// Checks whether the predicate of the filtering `flatMap` reads only key
// columns of `tableDesc`.
//
// Returns true only when all of the following hold:
//   * the lambda is a filter according to NYql::IsFilterFlatMap;
//   * the lambda argument (with Optional removed) is a struct;
//   * every field collected from the predicate has a key column index in
//     `tableDesc`.
// On success the collected field names are appended to `extraColumns`; on
// failure `extraColumns` is left untouched.
// `parentsMap` is only forwarded to HaveFieldsSubset.
bool CanPushFlatMap(const NYql::NNodes::TCoFlatMapBase& flatMap, const NYql::TKikimrTableDescription& tableDesc, const NYql::TParentsMap& parentsMap, TVector<TString> & extraColumns) {
Copy link
Collaborator

@ulya-sidorina ulya-sidorina Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Подобная функция уже есть в kqp_opt_log_indexes.cpp, может вынесем в хэлперы?

auto flatMapLambda = flatMap.Lambda();
// Only filter-shaped lambdas are considered.
if (!NYql::IsFilterFlatMap(flatMapLambda)) {
return false;
}

const auto & flatMapLambdaArgument = flatMapLambda.Args().Arg(0).Ref();
auto flatMapLambdaConditional = flatMapLambda.Body().Cast<NYql::NNodes::TCoConditionalValueBase>();

// Collect into lambdaSubset the fields of the lambda argument that the predicate uses.
// NOTE(review): exact semantics of HaveFieldsSubset (and of its trailing `true` flag)
// are defined outside this view -- confirm against its declaration.
TSet<TString> lambdaSubset;
auto isSubSet = HaveFieldsSubset(flatMapLambdaConditional.Predicate().Ptr(), flatMapLambdaArgument, lambdaSubset, parentsMap, true);
auto argType = NYql::RemoveOptionalType(flatMapLambdaArgument.GetTypeAnn());
if (argType->GetKind() != NYql::ETypeAnnotationKind::Struct) {
return false;
}
// helper doesn't accept if all columns are used
// So a negative answer is tolerated only when the collected set has exactly
// as many entries as the struct has members.
if (!isSubSet && lambdaSubset.size() != argType->Cast<NYql::TStructExprType>()->GetSize()) {
return false;
}

// Every column the predicate touches must resolve to a key column index;
// presumably an empty result means the column is not part of the primary key.
for (auto & lambdaColumn : lambdaSubset) {
auto columnIndex = tableDesc.GetKeyColumnIndex(lambdaColumn);
if (!columnIndex) {
return false;
}
}

// Report the columns the predicate needs to the caller.
extraColumns.insert(extraColumns.end(), lambdaSubset.begin(), lambdaSubset.end());
return true;
}

}

namespace NKikimr::NKqp::NOpt {

using namespace NYql;
using namespace NYql::NNodes;

TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx) {
TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap) {
if (!node.Maybe<TKqlDeleteRows>()) {
return node;
}

auto deleteRows = node.Cast<TKqlDeleteRows>();

TMaybeNode<TCoFlatMap> filter;

TMaybeNode<TKqlLookupTableBase> lookup;
TMaybeNode<TKqlReadTable> read;
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
TMaybeNode<TKqlReadTableRanges> readranges;

if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
Expand All @@ -27,7 +66,15 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
read = deleteRows.Input().Cast<TKqlReadTable>();
} else {
return node;
TMaybeNode<TExprBase> input = deleteRows.Input();
if (input.Maybe<TCoFlatMap>()) {
filter = deleteRows.Input().Cast<TCoFlatMap>();
input = filter.Input();
}
readranges = input.Maybe<TKqlReadTableRanges>();
if (!readranges) {
return node;
}
}

TMaybeNode<TExprBase> deleteInput;
Expand Down Expand Up @@ -90,7 +137,34 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
.Add(structMembers)
.Build()
.Done();
}
} else if (readranges) {
if (deleteRows.Table().Raw() != readranges.Cast().Table().Raw()) {
return node;
}

if (!readranges.Cast().PrefixPointsExpr()) {
return node;
}

const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readranges.Cast().Table().Path().Value());
auto hint = TKqpReadTableExplainPrompt::Parse(readranges.Cast().ExplainPrompt());
if (hint.PointPrefixLen != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

if (filter) {
TVector<TString> extraColumns;
if (!CanPushFlatMap(filter.Cast(), tableDesc, parentsMap, extraColumns)) {
return node;
}
deleteInput = Build<TCoFlatMap>(ctx, node.Pos())
.Lambda(filter.Lambda().Cast())
.Input(readranges.PrefixPointsExpr().Cast())
.Done();
} else {
deleteInput = readranges.PrefixPointsExpr();
}
}

YQL_ENSURE(deleteInput);

Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
.Done();
}
}
} else if (buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size()) {
YQL_ENSURE(prefixPointsExpr);
residualLambda = pointsExtractionResult.PrunedLambda;
buildLookup(prefixPointsExpr, input);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ NYql::NNodes::TExprBase KqpRewriteTakeOverIndexRead(const NYql::NNodes::TExprBas
const TKqpOptimizeContext& kqpCtx, const NYql::TParentsMap& parentsMap);

NYql::NNodes::TExprBase KqpDeleteOverLookup(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
const TKqpOptimizeContext &kqpCtx);
const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap);

NYql::NNodes::TExprBase KqpExcessUpsertInputColumns(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx);

Expand Down
76 changes: 0 additions & 76 deletions ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,82 +359,6 @@ Y_UNIT_TEST(ComplexRange) {
2);
}

// Selects rows from /Root/SimpleKey with `Key IN AsList(...)`, where one list
// element is a computed expression (100 + 3); the expected YSON holds the rows
// for keys 100, 102 and 103, ordered by Key.
// NOTE(review): Test(query, expected) is defined outside this view; presumably it
// executes the query and compares the result with the expected YSON -- confirm.
Y_UNIT_TEST(SqlIn) {
Test(
R"(
SELECT * FROM `/Root/SimpleKey`
WHERE Key IN AsList(100, 102, (100 + 3))
ORDER BY Key;
)",
R"([
[[100];["Value20"]];[[102];["Value22"]];[[103];["Value23"]]
])");
}

// Selects rows from /Root/SimpleKey with an OR chain of equality predicates on
// Key, including `Key = null`; the expected YSON holds only the rows for keys
// 100, 102 and 103, i.e. the null comparison contributes no row.
// NOTE(review): Test(query, expected) is defined outside this view; presumably it
// executes the query and compares the result with the expected YSON -- confirm.
Y_UNIT_TEST(BasicLookup) {
Test(
R"(
SELECT * FROM `/Root/SimpleKey`
WHERE Key = 100 or Key = 102 or Key = 103 or Key = null;
)",
R"([
[[100];["Value20"]];[[102];["Value22"]];[[103];["Value23"]]
])");
}

// Same shape as an OR chain of equalities on Key, but with an explicit column
// list and one operand written as a computed expression (100 + 3); the expected
// YSON holds the rows for keys 100, 102 and 103.
// NOTE(review): Test(query, expected) is defined outside this view; presumably it
// executes the query and compares the result with the expected YSON -- confirm.
Y_UNIT_TEST(ComplexLookup) {
Test(
R"(
SELECT Key, Value FROM `/Root/SimpleKey`
WHERE Key = 100 or Key = 102 or Key = (100 + 3);
)",
R"([
[[100];["Value20"]];[[102];["Value22"]];[[103];["Value23"]]
])");
}

// Selects rows from /Root/ComplexKey with a tuple `(Key, Fk) IN AsList(...)`,
// where one tuple contains a computed value (102 + 1) and the list ends with a
// trailing comma; the expected YSON holds the rows (1,101), (2,102), (2,103),
// ordered by Key, Fk.
// NOTE(review): presumably (Key, Fk) is the table's composite primary key -- the
// schema is not visible here. Test(query, expected) is also defined outside this view.
Y_UNIT_TEST(SqlInComplexKey) {
Test(
R"(
SELECT Key, Fk, Value FROM `/Root/ComplexKey`
WHERE (Key, Fk) IN AsList(
(1, 101),
(2, 102),
(2, 102 + 1),
)
ORDER BY Key, Fk;
)",
R"([
[[1];[101];["Value1"]];[[2];[102];["Value1"]];[[2];[103];["Value3"]]
])");
}

// Selects rows from /Root/ComplexKey with an OR of conjunctions over Key and Fk;
// the literals appear on either side of `=` (e.g. `2 = Key`). The expected YSON
// holds the rows (1,101), (2,102), (2,103).
// NOTE(review): presumably (Key, Fk) is the table's composite primary key -- the
// schema is not visible here. Test(query, expected) is also defined outside this view.
Y_UNIT_TEST(BasicLookupComplexKey) {
Test(
R"(
SELECT Key, Fk, Value FROM `/Root/ComplexKey`
WHERE (Key = 1 and Fk = 101) OR
(2 = Key and 102 = Fk) OR
(2 = Key and 103 = Fk);
)",
R"([
[[1];[101];["Value1"]];[[2];[102];["Value1"]];[[2];[103];["Value3"]]
])");
}

// Selects rows from /Root/ComplexKey with an OR of conjunctions over Key and Fk,
// where one comparison uses a computed left-hand side (`102 + 1 = Fk`). The
// expected YSON holds the rows (1,101), (2,102), (2,103).
// NOTE(review): presumably (Key, Fk) is the table's composite primary key -- the
// schema is not visible here. Test(query, expected) is also defined outside this view.
Y_UNIT_TEST(ComplexLookupComplexKey) {
Test(
R"(
SELECT Key, Fk, Value FROM `/Root/ComplexKey`
WHERE (Key = 1 and Fk = 101) OR
(2 = Key and 102 = Fk) OR
(2 = Key and 102 + 1 = Fk);
)",
R"([
[[1];[101];["Value1"]];[[2];[102];["Value1"]];[[2];[103];["Value3"]]
])");
}

Y_UNIT_TEST(PointJoin) {
Test(
R"(
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,25 +204,23 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM `/Root/Test` WHERE Group = $group AND Name = $name;
)";

auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
} else {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
}

auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
.AddParam("$name").OptionalString("Paul").Build()
.Build();

NYdb::NTable::TExecDataQuerySettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);

auto result = session.ExecuteDataQuery(query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params).ExtractValueSync();
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[300u];["None"];[1u];["Paul"]]])", FormatResultSetYson(result.GetResultSet(0)));

AssertTableStats(result, "/Root/Test", {
.ExpectedReads = 1,
});

params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
.Build();
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3882,6 +3882,9 @@ Y_UNIT_TEST_SUITE(KqpPg) {
auto fullScan = FindPlanNodeByKv(plan, "Node Type", "Filter-TableFullScan");
UNIT_ASSERT_C(!fullScan.IsDefined(), "got fullscan, expected lookup");
auto lookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
if (!lookup.IsDefined()) {
lookup = FindPlanNodeByKv(plan, "Node Type", "TableRangeScan");
}
UNIT_ASSERT_C(lookup.IsDefined(), "no Table Lookup in plan");
}
{
Expand Down
19 changes: 7 additions & 12 deletions ydb/core/kqp/ut/query/kqp_explain_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,18 +493,23 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
NJson::ReadJsonTree(result.GetPlan(), &plan, true);
UNIT_ASSERT(ValidatePlanNodeIds(plan));

Cerr << "Plan " << result.GetPlan() << Endl;

auto node = FindPlanNodeByKv(plan, "Name", "TableRangeScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");


if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
} else {
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
}

UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
if (node.IsDefined()) {
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
}
}

Y_UNIT_TEST(FewEffects) {
Expand Down Expand Up @@ -536,16 +541,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(fullScansCount, 1);

auto rangeScansCount = CountPlanNodesByKv(plan, "Node Type", "TableRangeScan");
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);

ui32 lookupsCount = 0;
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
} else {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
}

UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1);
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 2);

/* check tables section */
const auto& tableInfo = plan.GetMapSafe().at("tables").GetArraySafe()[0].GetMapSafe();
Expand All @@ -565,7 +561,6 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(counter["MultiErase"], deletesCount);
UNIT_ASSERT_VALUES_EQUAL(counter["FullScan"], fullScansCount);
UNIT_ASSERT_VALUES_EQUAL(counter["Scan"], rangeScansCount);
UNIT_ASSERT_VALUES_EQUAL(counter["Lookup"], lookupsCount);
}

Y_UNIT_TEST(ExplainDataQueryWithParams) {
Expand Down
21 changes: 6 additions & 15 deletions ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,12 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("has no snapshot at");
}), result.GetIssues().ToString());

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
} else {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("stale snapshot");
}), result.GetIssues().ToString());

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
}
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("has no snapshot at");
}), result.GetIssues().ToString());

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);

caught = true;
break;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,8 @@ Y_UNIT_TEST_SUITE(KqpScripting) {

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
UNIT_ASSERT(stats.query_phases(1).table_access().empty());
for (const auto& phase : stats.query_phases()) {
if (phase.table_access().size()) {
if (phase.table_access(0).name() == "/Root/EightShard") {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;

std::string canon;
if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup() || server->GetSettings().AppConfig->GetTableServiceConfig().GetPredicateExtract20()) {
auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);

Expand Down
Loading
Loading