diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index e76e4f2e33cd..653e5b93011a 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -14,12 +14,7 @@ using namespace NYql::NNodes; namespace { -struct TRowsAndKeysResult { - TDqPhyPrecompute RowsPrecompute; - TDqPhyPrecompute KeysPrecompute; -}; - -TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseResult, +TDqStage ExtractRowsAndKeysStage(const TCondenseInputResult& condenseResult, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) { TCoArgument rowsListArg(ctx.NewArgument(pos, "rows_list")); @@ -40,7 +35,7 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes .Build() .Done(); - auto computeStage = Build(ctx, pos) + return Build(ctx, pos) .Inputs() .Add(condenseResult.StageInputs) .Build() @@ -67,29 +62,6 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes .Build() .Settings().Build() .Done(); - - auto rowsPrecompute = Build(ctx, pos) - .Connection() - .Output() - .Stage(computeStage) - .Index().Build("0") - .Build() - .Build() - .Done(); - - auto keysPrecompute = Build(ctx, pos) - .Connection() - .Output() - .Stage(computeStage) - .Index().Build("1") - .Build() - .Build() - .Done(); - - return TRowsAndKeysResult { - .RowsPrecompute = rowsPrecompute, - .KeysPrecompute = keysPrecompute - }; } // Return set of data columns need to be save during index update @@ -344,10 +316,10 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput .Done(); } -TMaybe RewriteInputForConstraint(const TExprBase& inputRows, const THashSet inputColumns, - const THashSet& checkDefaults, - const TKikimrTableDescription& table, const TSecondaryIndexes& indexes, - TPositionHandle pos, TExprContext& ctx) +TMaybe>> +RewriteInputForConstraint(const TExprBase& inputRows, const THashSet inputColumns, + const THashSet& checkDefaults, const TKikimrTableDescription& table, + const TSecondaryIndexes& indexes, TPositionHandle pos, TExprContext& ctx) { auto condenseResult = CondenseInput(inputRows, ctx); if (!condenseResult) { @@ -384,6 +356,8 @@ TMaybe RewriteInputForConstraint(const TExprBase& inputRow missedKeyInput.clear(); } + TMaybeNode precomputeTableLookupDict; + if (!missedKeyInput.empty() || !checkDefaults.empty()) { TVector columns; @@ -413,11 +387,6 @@ TMaybe RewriteInputForConstraint(const TExprBase& inputRow } for (const auto& x : missedKeyInput) { - auto atom = Build(ctx, pos) - .Value(x) - .Done(); - columns.emplace_back(atom); - auto columnType = table.GetColumnType(TString(x)); YQL_ENSURE(columnType); @@ -431,19 +400,29 @@ TMaybe RewriteInputForConstraint(const TExprBase& inputRow .Done()); } - for (const auto& x : mainPk) { - auto atom = Build(ctx, pos) - .Value(x) - .Done(); - columns.emplace_back(atom); + const THashSet indexKeyColumns = CreateKeyColumnSetToRead(indexes); + const THashSet indexDataColumns = CreateDataColumnSetToRead(indexes); + + for (const auto& x : indexKeyColumns) { + columns.push_back(Build(ctx, pos).Value(x).Done()); } - for(const auto& x: checkDefaults) { + for (const auto& x : indexDataColumns) { + // Handle the case of multiple indexes + // one of them has 'foo' as data column but for another one foo is just indexed column + if (indexKeyColumns.contains(x)) + continue; + columns.push_back(Build(ctx, pos).Value(x).Done()); + } + + for (const auto& x : mainPk) { + if (indexKeyColumns.contains(x)) + continue; columns.push_back(Build(ctx, pos).Value(x).Done()); } auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx); - auto precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true); + precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true); TVector keyLookupTuples; for (const auto& key : mainPk) { @@ -512,7 +491,15 @@ TMaybe RewriteInputForConstraint(const TExprBase& inputRow auto helper = CreateUpsertUniqBuildHelper(table, inputColumns, usedIndexes, pos, ctx); if (helper->GetChecksNum() == 0) { - return condenseResult; + // Return result of read stage only in case of uniq index + // We do not want to change plan for non uniq index for a while + if (hasUniqIndex) { + return std::make_pair> + (std::move(*condenseResult), std::move(precomputeTableLookupDict)); + } else { + return std::make_pair> + (std::move(*condenseResult), {}); + } } auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); @@ -590,11 +577,19 @@ TMaybe RewriteInputForConstraint(const TExprBase& inputRow stageInputs.insert(stageInputs.end(), uniquePrecomputes.begin(), uniquePrecomputes.end()); stageInputs.emplace_back(noExistingKeysPrecompute); - return TCondenseInputResult { + auto res = TCondenseInputResult { .Stream = body, .StageInputs = std::move(stageInputs), .StageArgs = std::move(stageArgs) }; + + if (hasUniqIndex) { + return std::make_pair>(std::move(res), + std::move(precomputeTableLookupDict)); + } else { + return std::make_pair>(std::move(res), + {}); + } } } // namespace @@ -633,9 +628,7 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, return {}; } - auto condenseInputResult = DeduplicateInput(checkedInput.GetRef(), table, ctx); - - auto inputRowsAndKeys = PrecomputeRowsAndKeys(condenseInputResult, table, pos, ctx); + auto condenseInputResult = DeduplicateInput(checkedInput->first, table, ctx); // For UPSERT check that indexes is not empty YQL_ENSURE(mode == TKqpPhyUpsertIndexMode::UpdateOn || indexes); @@ -643,14 +636,42 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, THashSet indexDataColumns = CreateDataColumnSetToRead(indexes); THashSet indexKeyColumns = CreateKeyColumnSetToRead(indexes); - auto lookupDict = PrecomputeTableLookupDict(inputRowsAndKeys.KeysPrecompute, table, indexDataColumns, indexKeyColumns, pos, ctx); - if (!lookupDict) { - return {}; + TMaybeNode lookupDict; + TMaybeNode rowsPrecompute; + + if (checkedInput->second) { + rowsPrecompute = PrecomputeCondenseInputResult(condenseInputResult, pos, ctx); + // In case of uniq index use main table read stage from checking uniq constraint + lookupDict = checkedInput->second; + } else { + auto inputRowsAndKeysStage = ExtractRowsAndKeysStage(condenseInputResult, table, pos, ctx); + rowsPrecompute = Build(ctx, pos) + .Connection() + .Output() + .Stage(inputRowsAndKeysStage) + .Index().Build("0") + .Build() + .Build() + .Done(); + + auto keysPrecompute = Build(ctx, pos) + .Connection() + .Output() + .Stage(inputRowsAndKeysStage) + .Index().Build("1") + .Build() + .Build() + .Done(); + + lookupDict = PrecomputeTableLookupDict(keysPrecompute, table, indexDataColumns, indexKeyColumns, pos, ctx); + if (!lookupDict) { + return {}; + } } TExprBase tableUpsertRows = (mode == TKqpPhyUpsertIndexMode::UpdateOn) - ? MakeNonexistingRowsFilter(inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(), pk, pos, ctx) - : inputRowsAndKeys.RowsPrecompute; + ? MakeNonexistingRowsFilter(rowsPrecompute.Cast(), lookupDict.Cast(), pk, pos, ctx) + : rowsPrecompute.Cast(); auto tableUpsert = Build(ctx, pos) .Table(BuildTableMeta(table, pos, ctx)) @@ -780,7 +801,7 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "recalc_dict_arg_" + indexDesc->Name)); auto reComputeDictStage = Build(ctx, pos) .Inputs() - .Add(inputRowsAndKeys.RowsPrecompute) // input rows + .Add(rowsPrecompute.Cast()) // input rows .Add(lookupDict.Cast()) // dict contains loockuped from table rows .Build() .Program() @@ -875,9 +896,9 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, if (needIndexTableUpdate) { auto upsertIndexRows = optUpsert - ? MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed, + ? MakeUpsertIndexRows(mode, rowsPrecompute.Cast(), lookupDictRecomputed, inputColumnsSet, indexTableColumns, table, pos, ctx, true) - : MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(), + : MakeUpsertIndexRows(mode, rowsPrecompute.Cast(), lookupDict.Cast(), inputColumnsSet, indexTableColumns, table, pos, ctx, false); auto indexUpsert = Build(ctx, pos) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 89d94c0adc83..3215bf2717be 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -923,7 +923,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); } - auto uniqExtraStages = uniq ? 6 : 0; + auto uniqExtraStages = uniq ? 5 : 0; { // Upsert - add new row const TString query2 = Q1_(R"( @@ -941,12 +941,14 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), uniqExtraStages + 5); // One read from main table - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).reads().rows(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).reads().rows(), 0); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 2).table_access().size(), 0); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 3).table_access().size(), 0); @@ -988,9 +990,9 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), uniqExtraStages + 5); // One read from main table - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).reads().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 2).table_access().size(), 0); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 3).table_access().size(), 0);