diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 74cba669f5ae..3c8cbd9c8531 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -744,6 +744,10 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBas const auto& idxLookupJoin = node.Cast(); + if (!idxLookupJoin.Input().Maybe()) { + return node; + } + return Build(ctx, node.Pos()) .Output() .Stage() diff --git a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp index a3167ef544ce..ac341172146f 100644 --- a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp @@ -724,7 +724,7 @@ create table `/Root/test/ds/store_sales` } -static TKikimrRunner GetKikimrWithJoinSettings(){ +static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false){ TVector settings; NKikimrKqp::TKqpSetting setting; @@ -741,7 +741,10 @@ static TKikimrRunner GetKikimrWithJoinSettings(){ //setting.SetValue("grace"); //settings.push_back(setting); - return TKikimrRunner(settings); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookupJoin); + auto serverSettings = TKikimrSettings().SetAppConfig(appConfig); + return TKikimrRunner(serverSettings); } class TChainConstructor { @@ -811,9 +814,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { chain.JoinTables(); } - Y_UNIT_TEST(FiveWayJoin) { + Y_UNIT_TEST_TWIN(FiveWayJoin, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -848,9 +851,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FourWayJoinLeftFirst) { + Y_UNIT_TEST_TWIN(FourWayJoinLeftFirst, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -885,9 +888,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithPreds) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithPreds, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -923,9 +926,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithComplexPreds) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithComplexPreds, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -961,9 +964,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithComplexPreds2) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithComplexPreds2, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -999,9 +1002,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithPredsAndEquiv) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithPredsAndEquiv, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1037,9 +1040,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FourWayJoinWithPredsAndEquivAndLeft) { + Y_UNIT_TEST_TWIN(FourWayJoinWithPredsAndEquivAndLeft, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1075,9 +1078,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithConstantFold) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithConstantFold, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1113,9 +1116,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(FiveWayJoinWithConstantFoldOpt) { + Y_UNIT_TEST_TWIN(FiveWayJoinWithConstantFoldOpt, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1151,9 +1154,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(DatetimeConstantFold) { + Y_UNIT_TEST_TWIN(DatetimeConstantFold, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1177,9 +1180,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { } } - Y_UNIT_TEST(TPCH2) { + Y_UNIT_TEST_TWIN(TPCH2, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1268,9 +1271,9 @@ limit 100; } } - Y_UNIT_TEST(TPCH9) { + Y_UNIT_TEST_TWIN(TPCH9, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1346,9 +1349,9 @@ order by } } - Y_UNIT_TEST(TPCH3) { + Y_UNIT_TEST_TWIN(TPCH3, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1419,9 +1422,9 @@ limit 10; } } - Y_UNIT_TEST(TPCH21) { + Y_UNIT_TEST_TWIN(TPCH21, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1489,9 +1492,9 @@ limit 100;)"); } } - Y_UNIT_TEST(TPCH5) { + Y_UNIT_TEST_TWIN(TPCH5, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1614,9 +1617,9 @@ order by } } - Y_UNIT_TEST(TPCH10) { + Y_UNIT_TEST_TWIN(TPCH10, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1724,9 +1727,9 @@ limit 20; } } - Y_UNIT_TEST(TPCH11) { + Y_UNIT_TEST_TWIN(TPCH11, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1807,9 +1810,9 @@ order by } } - Y_UNIT_TEST(TPCDS16) { + Y_UNIT_TEST_TWIN(TPCDS16, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1863,9 +1866,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS61) { + Y_UNIT_TEST_TWIN(TPCDS61, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1931,9 +1934,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS92) { + Y_UNIT_TEST_TWIN(TPCDS92, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1989,9 +1992,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS94) { + Y_UNIT_TEST_TWIN(TPCDS94, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2042,9 +2045,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS95) { + Y_UNIT_TEST_TWIN(TPCDS95, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2097,9 +2100,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS96) { + Y_UNIT_TEST_TWIN(TPCDS96, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2136,9 +2139,9 @@ limit 100; } } - Y_UNIT_TEST(TPCDS88) { + Y_UNIT_TEST_TWIN(TPCDS88, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -2253,9 +2256,9 @@ from } } - Y_UNIT_TEST(TPCDS90) { + Y_UNIT_TEST_TWIN(TPCDS90, StreamLookupJoin) { - auto kikimr = GetKikimrWithJoinSettings(); + auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index f3867b4e9010..0df76f8b32de 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -778,7 +778,23 @@ TStatus AnnotateDqReplicate(const TExprNode::TPtr& input, TExprContext& ctx) { if (!EnsurePersistableType(replicateInput->Pos(), *inputItemType, ctx)) { return TStatus::Error; } - if (!EnsureStructType(replicateInput->Pos(), *inputItemType, ctx)) { + + if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) { + if (!EnsureTupleTypeSize(replicateInput->Pos(), inputItemType, 2, ctx)) { + return TStatus::Error; + } + + auto inputTupleType = inputItemType->Cast(); + if (!EnsureStructType(replicateInput->Pos(), *inputTupleType->GetItems()[0], ctx)) { + return TStatus::Error; + } + + bool isOptional = false; + const TStructExprType* structType = nullptr; + if (!EnsureStructOrOptionalStructType(replicateInput->Pos(), *inputTupleType->GetItems()[1], isOptional, structType, ctx)) { + return TStatus::Error; + } + } else if (!EnsureStructType(replicateInput->Pos(), *inputItemType, ctx)) { return TStatus::Error; } const TTypeAnnotationNode* lambdaInputFlowType = ctx.MakeType(inputItemType);