Skip to content

Commit

Permalink
Merge 06d6024 into c85f845
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike authored Aug 13, 2024
2 parents c85f845 + 06d6024 commit 28681fb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
10 changes: 3 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1300,13 +1300,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const auto& input = stage.GetInputs(inputIndex);

// Current assumptions:
// 1. `Broadcast` can not be the 1st stage input unless it's a single input
// 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
if (inputIndex == 0) {
if (stage.InputsSize() > 1) {
YQL_ENSURE(input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kBroadcast);
}
} else {
// 1. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
// 2. Stages where 1st input is `Broadcast` are not partitioned.
if (inputIndex > 0) {
switch (input.GetTypeCase()) {
case NKqpProto::TKqpPhyConnection::kBroadcast:
case NKqpProto::TKqpPhyConnection::kHashShuffle:
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3955,6 +3955,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 1);
}

Y_UNIT_TEST(MultipleBroadcastJoin) {
TKikimrSettings kisettings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
kisettings.SetAppConfig(appConfig);

TKikimrRunner kikimr(kisettings);

auto db = kikimr.GetTableClient();
auto client = kikimr.GetQueryClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto session = db.CreateSession().GetValueSync().GetSession();
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
create table demo_ba(id text, some text, ref1 text, ref2 text, primary key(id));
create table demo_ref1(id text, code text, some text, primary key(id), index ix_code global on (code));
create table demo_ref2(id text, code text, some text, primary key(id), index ix_code global on (code));
)").GetValueSync());
}

auto query = R"(
select ba_0.id, ba_0.some,
r_1.id, r_1.some, r_1.code,
r_2.id, r_2.some, r_2.code
from demo_ba ba_0
left join demo_ref1 r_1 on r_1.id=ba_0.ref1
left join demo_ref2 r_2 on r_2.code=ba_0.ref2
where ba_0.id in ("ba#10"u,"ba#20"u,"ba#30"u,"ba#40"u,"ba#50"u,"ba#60"u,"ba#70"u,"ba#80"u,"ba#90"u,"ba#100"u);
)";

auto settings = NYdb::NQuery::TExecuteQuerySettings()
.Syntax(NYdb::NQuery::ESyntax::YqlV1)
.ConcurrentResultSets(false);
{
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
//CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
//CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
}
{
auto it = client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
Cerr << StreamResultToYson(it);
}

}

Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) {
TKikimrSettings settings;
Expand Down

0 comments on commit 28681fb

Please sign in to comment.