Skip to content

Commit

Permalink
fix(kqp): allow tuple of structs as input for DqReplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Jun 3, 2024
1 parent 45c8bad commit cdfd409
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 53 deletions.
4 changes: 4 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,10 @@ NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBas

const auto& idxLookupJoin = node.Cast<TKqlIndexLookupJoin>();

if (!idxLookupJoin.Input().Maybe<TDqCnUnionAll>()) {
return node;
}

return Build<TDqCnUnionAll>(ctx, node.Pos())
.Output()
.Stage<TDqStage>()
Expand Down
107 changes: 55 additions & 52 deletions ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ create table `/Root/test/ds/store_sales`

}

static TKikimrRunner GetKikimrWithJoinSettings(){
static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false){
TVector<NKikimrKqp::TKqpSetting> settings;

NKikimrKqp::TKqpSetting setting;
Expand All @@ -741,7 +741,10 @@ static TKikimrRunner GetKikimrWithJoinSettings(){
//setting.SetValue("grace");
//settings.push_back(setting);

return TKikimrRunner(settings);
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookupJoin);
auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
return TKikimrRunner(serverSettings);
}

class TChainConstructor {
Expand Down Expand Up @@ -811,9 +814,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
chain.JoinTables();
}

Y_UNIT_TEST(FiveWayJoin) {
Y_UNIT_TEST_TWIN(FiveWayJoin, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -848,9 +851,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FourWayJoinLeftFirst) {
Y_UNIT_TEST_TWIN(FourWayJoinLeftFirst, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -885,9 +888,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithPreds) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithPreds, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -923,9 +926,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithComplexPreds) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithComplexPreds, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -961,9 +964,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithComplexPreds2) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithComplexPreds2, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -999,9 +1002,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithPredsAndEquiv) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithPredsAndEquiv, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1037,9 +1040,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FourWayJoinWithPredsAndEquivAndLeft) {
Y_UNIT_TEST_TWIN(FourWayJoinWithPredsAndEquivAndLeft, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1075,9 +1078,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithConstantFold) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithConstantFold, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1113,9 +1116,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(FiveWayJoinWithConstantFoldOpt) {
Y_UNIT_TEST_TWIN(FiveWayJoinWithConstantFoldOpt, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1151,9 +1154,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(DatetimeConstantFold) {
Y_UNIT_TEST_TWIN(DatetimeConstantFold, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand All @@ -1177,9 +1180,9 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
}
}

Y_UNIT_TEST(TPCH2) {
Y_UNIT_TEST_TWIN(TPCH2, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1268,9 +1271,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCH9) {
Y_UNIT_TEST_TWIN(TPCH9, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1346,9 +1349,9 @@ order by
}
}

Y_UNIT_TEST(TPCH3) {
Y_UNIT_TEST_TWIN(TPCH3, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1419,9 +1422,9 @@ limit 10;
}
}

Y_UNIT_TEST(TPCH21) {
Y_UNIT_TEST_TWIN(TPCH21, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1489,9 +1492,9 @@ limit 100;)");
}
}

Y_UNIT_TEST(TPCH5) {
Y_UNIT_TEST_TWIN(TPCH5, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1614,9 +1617,9 @@ order by
}
}

Y_UNIT_TEST(TPCH10) {
Y_UNIT_TEST_TWIN(TPCH10, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1724,9 +1727,9 @@ limit 20;
}
}

Y_UNIT_TEST(TPCH11) {
Y_UNIT_TEST_TWIN(TPCH11, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1807,9 +1810,9 @@ order by
}
}

Y_UNIT_TEST(TPCDS16) {
Y_UNIT_TEST_TWIN(TPCDS16, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1863,9 +1866,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS61) {
Y_UNIT_TEST_TWIN(TPCDS61, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1931,9 +1934,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS92) {
Y_UNIT_TEST_TWIN(TPCDS92, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -1989,9 +1992,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS94) {
Y_UNIT_TEST_TWIN(TPCDS94, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -2042,9 +2045,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS95) {
Y_UNIT_TEST_TWIN(TPCDS95, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -2097,9 +2100,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS96) {
Y_UNIT_TEST_TWIN(TPCDS96, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -2136,9 +2139,9 @@ limit 100;
}
}

Y_UNIT_TEST(TPCDS88) {
Y_UNIT_TEST_TWIN(TPCDS88, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -2253,9 +2256,9 @@ from
}
}

Y_UNIT_TEST(TPCDS90) {
Y_UNIT_TEST_TWIN(TPCDS90, StreamLookupJoin) {

auto kikimr = GetKikimrWithJoinSettings();
auto kikimr = GetKikimrWithJoinSettings(StreamLookupJoin);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down
18 changes: 17 additions & 1 deletion ydb/library/yql/dq/type_ann/dq_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,23 @@ TStatus AnnotateDqReplicate(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsurePersistableType(replicateInput->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
if (!EnsureStructType(replicateInput->Pos(), *inputItemType, ctx)) {

if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) {
if (!EnsureTupleTypeSize(replicateInput->Pos(), inputItemType, 2, ctx)) {
return TStatus::Error;
}

auto inputTupleType = inputItemType->Cast<TTupleExprType>();
if (!EnsureStructType(replicateInput->Pos(), *inputTupleType->GetItems()[0], ctx)) {
return TStatus::Error;
}

bool isOptional = false;
const TStructExprType* structType = nullptr;
if (!EnsureStructOrOptionalStructType(replicateInput->Pos(), *inputTupleType->GetItems()[1], isOptional, structType, ctx)) {
return TStatus::Error;
}
} else if (!EnsureStructType(replicateInput->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
const TTypeAnnotationNode* lambdaInputFlowType = ctx.MakeType<TFlowExprType>(inputItemType);
Expand Down

0 comments on commit cdfd409

Please sign in to comment.