diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 1452a4295817..254ab91d0af8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -264,18 +264,19 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, subDomainInfo->EffectiveStoragePools(), shardInfo->BindedChannels); + const auto pathElement = Self->PathsById[pathId]; LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxStoreTableStats.PersistSingleStats: main stats from" << " datashardId(TabletID)=" << datashardId << " maps to shardIdx: " << shardIdx - << ", pathId: " << pathId << ", pathId map=" << Self->PathsById[pathId]->Name + << ", pathId: " << pathId << ", pathId map=" << pathElement->Name << ", is column=" << isColumnTable << ", is olap=" << isOlapStore); const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping); LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Add stats from shard with datashardId(TabletID)=" << datashardId + "Add stats from shard with datashardId(TabletID)=" << datashardId << ", pathId " << pathId.LocalPathId - << ": RowCount " << newStats.RowCount + << ": RowCount " << newStats.RowCount << ", DataSize " << newStats.DataSize << (newStats.HasBorrowedData ? 
", with borrowed parts" : "")); @@ -404,11 +405,14 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds()); } + const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get(); const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId); const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings(); TVector shardsToMerge; - if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)) { + if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady) + && table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex) + ) { TTxId txId = Self->GetCachedTxId(ctx); if (!txId) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index c4b2763d8f70..7fe813667928 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -16,6 +16,7 @@ #include #include +#include #include @@ -1679,12 +1680,18 @@ namespace NSchemeShardUT_Private { *index.mutable_data_columns() = {cfg.DataColumns.begin(), cfg.DataColumns.end()}; switch (cfg.IndexType) { - case NKikimrSchemeOp::EIndexTypeGlobal: - *index.mutable_global_index() = Ydb::Table::GlobalIndex(); - break; - case NKikimrSchemeOp::EIndexTypeGlobalAsync: - *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex(); - break; + case NKikimrSchemeOp::EIndexTypeGlobal: { + auto& settings = *index.mutable_global_index()->mutable_settings(); + if (cfg.GlobalIndexSettings) { + cfg.GlobalIndexSettings[0].SerializeTo(settings); + } + } break; + case NKikimrSchemeOp::EIndexTypeGlobalAsync: { + auto& settings = *index.mutable_global_async_index()->mutable_settings(); + if (cfg.GlobalIndexSettings) { + 
cfg.GlobalIndexSettings[0].SerializeTo(settings); + } + } break; case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: { auto& settings = *index.mutable_global_vector_kmeans_tree_index(); settings = Ydb::Table::GlobalVectorKMeansTreeIndex(); @@ -2011,7 +2018,7 @@ namespace NSchemeShardUT_Private { Runtime.SendToPipe(shardData.ShardId, sender, proposal); TAutoPtr handle; auto event = Runtime.GrabEdgeEventIf(handle, - [=](const TEvDataShard::TEvProposeTransactionResult& event) { + [this, shardData](const TEvDataShard::TEvProposeTransactionResult& event) { return event.GetTxId() == TxId && event.GetOrigin() == shardData.ShardId; }); activeZone = true; diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 6b6b338ea79f..eb4feee35b56 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -62,6 +62,10 @@ template \ void N(NUnitTest::TTestContext&) +namespace NYdb::NTable { + struct TGlobalIndexSettings; +} + namespace NSchemeShardUT_Private { using namespace NKikimr; @@ -361,6 +365,7 @@ namespace NSchemeShardUT_Private { NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeGlobal; TVector IndexColumns; TVector DataColumns; + TVector GlobalIndexSettings = {}; }; std::unique_ptr CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ya.make b/ydb/core/tx/schemeshard/ut_helpers/ya.make index 353849a5f51a..5eb4bf14ca88 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ya.make +++ b/ydb/core/tx/schemeshard/ut_helpers/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/public/lib/scheme_types ydb/library/yql/public/issue ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_table ) SRCS( diff --git a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp index 
e96f9a0ac11d..7337b0a8a71c 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp @@ -5,6 +5,7 @@ #include #include #include +#include using namespace NKikimr; using namespace NSchemeShard; @@ -574,5 +575,5 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) { NLs::IndexState(NKikimrSchemeOp::EIndexStateReady), NLs::IndexKeys({"indexed"}), }); - } + } } diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp index 4512e02ac601..5cf2ef6cb5c1 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp @@ -2,11 +2,15 @@ #include #include #include +#include +#include #include #include #include +#include + using namespace NKikimr; using namespace NSchemeShard; using namespace NSchemeShardUT_Private; @@ -781,6 +785,155 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { } + Y_UNIT_TEST(MergeIndexTableShardsOnlyWhenReady) { + TTestBasicRuntime runtime; + + TTestEnvOptions opts; + opts.EnableBackgroundCompaction(false); + opts.DisableStatsBatching(true); + TTestEnv env(runtime, opts); + + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + Ydb::Table::GlobalIndexSettings settings; + UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"( + partition_at_keys { + split_points { + type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } } + value { items { uint64_value: 10 } } + } + split_points { + type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } } + value { items { uint64_value: 20 } } + } + split_points { + type { tuple_type { elements { optional_type { item { type_id: 
UINT64 } } } } } + value { items { uint64_value: 30 } } + } + } + )", &settings)); + + TBlockEvents indexApplicationBlocker(runtime, [](const auto& ev) { + const auto& modifyScheme = ev->Get()->Record.GetTransaction(0); + return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild; + }); + + ui64 indexInitializationTx = 0; + TWaitForFirstEvent indexInitializationWaiter(runtime, + [&indexInitializationTx](const auto& ev){ + const auto& record = ev->Get()->Record; + if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild) { + indexInitializationTx = record.GetTxId(); + return true; + } + return false; + } + ); + + const ui64 buildIndexTx = ++txId; + TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{ + "ByValue", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {}, + { NYdb::NTable::TGlobalIndexSettings::FromProto(settings) } + }); + + indexInitializationWaiter.Wait(); + UNIT_ASSERT_VALUES_UNEQUAL(indexInitializationTx, 0); + env.TestWaitNotification(runtime, indexInitializationTx); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), { + NLs::PathExist, + NLs::IndexState(NKikimrSchemeOp::EIndexStateWriteOnly) + }); + + TVector indexShards; + auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess); + const auto& partitions = record.GetPathDescription().GetTablePartitions(); + indexShards.clear(); + indexShards.reserve(partitions.size()); + for (const auto& partition : partitions) { + indexShards.emplace_back(partition.GetDatashardId()); + } + }; + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), { + NLs::PathExist, + NLs::PartitionCount(4), + shardCollector + }); + UNIT_ASSERT_VALUES_EQUAL(indexShards.size(), 4); + + { + // make sure no shards are merged + 
TBlockEvents mergeBlocker(runtime, [](const auto& ev) { + const auto& modifyScheme = ev->Get()->Record.GetTransaction(0); + return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions; + }); + + { + // wait for all index shards to send statistics + THashSet shardsWithStats; + using TEvType = TEvDataShard::TEvPeriodicTableStats; + auto statsObserver = runtime.AddObserver([&shardsWithStats](const TEvType::TPtr& ev) { + shardsWithStats.emplace(ev->Get()->Record.GetDatashardId()); + }); + + runtime.WaitFor("all index shards to send statistics", [&]{ + return AllOf(indexShards, [&shardsWithStats](ui64 indexShard) { + return shardsWithStats.contains(indexShard); + }); + }); + } + + // we expect to not have observed any attempts to merge + UNIT_ASSERT(mergeBlocker.empty()); + + // wait for 1 minute to ensure that no merges have been started by SchemeShard + env.SimulateSleep(runtime, TDuration::Minutes(1)); + UNIT_ASSERT(mergeBlocker.empty()); + } + + // splits are allowed even if the index is not ready + TestSplitTable(runtime, ++txId, "/MyRoot/Table/ByValue/indexImplTable", Sprintf(R"( + SourceTabletId: %lu + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5 } } } } + )", + indexShards.front() + ) + ); + env.TestWaitNotification(runtime, txId); + + indexApplicationBlocker.Stop().Unblock(); + env.TestWaitNotification(runtime, buildIndexTx); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), { + NLs::IndexState(NKikimrSchemeOp::EIndexStateReady) + }); + + // wait until all index impl table shards are merged into one + while (true) { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), { + shardCollector + }); + if (indexShards.size() > 1) { + // If a merge happens, old shards are deleted and replaced with a new one. + // That is why we need to wait for * all * the shards to be deleted. 
+ env.TestWaitTabletDeletion(runtime, indexShards); + } else { + break; + } + } + } + Y_UNIT_TEST(DropIndex) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp index 2ea65bb9caea..947da178f4b4 100644 --- a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp +++ b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -277,6 +278,69 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) { // test requires more txids than cached at start } + Y_UNIT_TEST(MergeIndexTableShards) { + TTestBasicRuntime runtime; + + TTestEnvOptions opts; + opts.EnableBackgroundCompaction(false); + TTestEnv env(runtime, opts); + + ui64 txId = 100; + + TBlockEvents statsBlocker(runtime); + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ByValue" + KeyColumnNames: ["value"] + IndexImplTableDescriptions { + SplitBoundary { KeyPrefix { Tuple { Optional { Text: "A" } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Text: "B" } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Text: "C" } } } } + } + } + )" + ); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), + { NLs::PartitionCount(4) } + ); + + statsBlocker.Stop().Unblock(); + + TVector indexShards; + auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess); + const auto& partitions = record.GetPathDescription().GetTablePartitions(); + indexShards.clear(); + indexShards.reserve(partitions.size()); + for (const auto& partition : partitions) { + 
indexShards.emplace_back(partition.GetDatashardId()); + } + }; + + // wait until all index impl table shards are merged into one + while (true) { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), { + shardCollector + }); + if (indexShards.size() > 1) { + // If a merge happens, old shards are deleted and replaced with a new one. + // That is why we need to wait for *all* the shards to be deleted. + env.TestWaitTabletDeletion(runtime, indexShards); + } else { + break; + } + } + } + Y_UNIT_TEST(AutoMergeInOne) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {