
Commit

Disable merges for indexImplTables partitions when build is in progress (#10166)
jepett0 authored Oct 11, 2024
1 parent 0da7943 · commit 5d66195
Showing 7 changed files with 247 additions and 12 deletions.
12 changes: 8 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -264,18 +264,19 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
subDomainInfo->EffectiveStoragePools(),
shardInfo->BindedChannels);

const auto pathElement = Self->PathsById[pathId];
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxStoreTableStats.PersistSingleStats: main stats from"
<< " datashardId(TabletID)=" << datashardId << " maps to shardIdx: " << shardIdx
<< ", pathId: " << pathId << ", pathId map=" << Self->PathsById[pathId]->Name
<< ", pathId: " << pathId << ", pathId map=" << pathElement->Name
<< ", is column=" << isColumnTable << ", is olap=" << isOlapStore);

const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping);

LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Add stats from shard with datashardId(TabletID)=" << datashardId
"Add stats from shard with datashardId(TabletID)=" << datashardId
<< ", pathId " << pathId.LocalPathId
<< ": RowCount " << newStats.RowCount
<< ": RowCount " << newStats.RowCount
<< ", DataSize " << newStats.DataSize
<< (newStats.HasBorrowedData ? ", with borrowed parts" : ""));

@@ -404,11 +405,14 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds());
}

const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get();
const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId);

const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TVector<TShardIdx> shardsToMerge;
if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)) {
if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady)
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)
) {
TTxId txId = Self->GetCachedTxId(ctx);

if (!txId) {
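For orientation, here is a minimal standalone sketch of the gating condition this file introduces: partition merges for an index impl table are considered only when the parent index is Ready (or the table has no parent index at all). The type and function names below are simplified assumptions, not the real SchemeShard API.

#include <cstdint>

// Simplified stand-ins for the SchemeShard types (assumptions for illustration only).
enum class EIndexState : uint8_t { Invalid, Ready, NotReady, WriteOnly };

struct TIndexInfo {
    EIndexState State = EIndexState::Invalid;
};

// Mirrors the shape of the new condition in PersistSingleStats: only consider
// merging partitions of an index impl table when the table has no parent index
// at all, or when that index has finished building and is Ready.
bool MayConsiderMerge(const TIndexInfo* parentIndex, bool canMergeBySizeAndLoad) {
    const bool indexReadyOrAbsent = !parentIndex || parentIndex->State == EIndexState::Ready;
    return indexReadyOrAbsent && canMergeBySizeAndLoad;
}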
21 changes: 14 additions & 7 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -16,6 +16,7 @@

#include <ydb/core/util/pb.h>
#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

#include <library/cpp/testing/unittest/registar.h>

@@ -1679,12 +1680,18 @@ namespace NSchemeShardUT_Private {
*index.mutable_data_columns() = {cfg.DataColumns.begin(), cfg.DataColumns.end()};

switch (cfg.IndexType) {
case NKikimrSchemeOp::EIndexTypeGlobal:
*index.mutable_global_index() = Ydb::Table::GlobalIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync:
*index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobal: {
auto& settings = *index.mutable_global_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync: {
auto& settings = *index.mutable_global_async_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: {
auto& settings = *index.mutable_global_vector_kmeans_tree_index();
settings = Ydb::Table::GlobalVectorKMeansTreeIndex();
@@ -2011,7 +2018,7 @@ namespace NSchemeShardUT_Private {
Runtime.SendToPipe(shardData.ShardId, sender, proposal);
TAutoPtr<IEventHandle> handle;
auto event = Runtime.GrabEdgeEventIf<TEvDataShard::TEvProposeTransactionResult>(handle,
[=](const TEvDataShard::TEvProposeTransactionResult& event) {
[this, shardData](const TEvDataShard::TEvProposeTransactionResult& event) {
return event.GetTxId() == TxId && event.GetOrigin() == shardData.ShardId;
});
activeZone = true;
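A side note on the last hunk: the predicate passed to GrabEdgeEventIf now lists its captures explicitly instead of using [=], which also copies `this` implicitly and is deprecated since C++20. A minimal sketch of the same idea, with made-up names:

#include <cstdint>
#include <functional>

struct TShardData { uint64_t ShardId = 0; };

struct TFixture {
    uint64_t TxId = 0;

    // Hypothetical helper that builds the same kind of predicate as the test above:
    // [this, shardData] names the captured state explicitly, whereas [=] would
    // also capture `this` implicitly (deprecated since C++20).
    std::function<bool(uint64_t, uint64_t)> MakeMatcher(TShardData shardData) const {
        return [this, shardData](uint64_t txId, uint64_t origin) {
            return txId == TxId && origin == shardData.ShardId;
        };
    }
};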
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -62,6 +62,10 @@
template<bool OPT1, bool OPT2> \
void N(NUnitTest::TTestContext&)

namespace NYdb::NTable {
struct TGlobalIndexSettings;
}

namespace NSchemeShardUT_Private {
using namespace NKikimr;

@@ -361,6 +365,7 @@ namespace NSchemeShardUT_Private {
NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeGlobal;
TVector<TString> IndexColumns;
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
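The forward declaration is enough here because TGlobalIndexSettings only appears as the element type of a TVector member, so the test header avoids pulling in the full SDK header; the .cpp files that actually use the field include it directly. A generic sketch of the pattern (file and type names below are illustrative, not the YDB headers):

// consumer.h -- a sketch of the forward-declaration pattern used above (illustrative names).
#pragma once

#include <vector>

namespace NSdk {
    struct THeavySettings;  // forward declaration; the definition lives in the SDK header
}

struct TRequestConfig {
    // Declaring a std::vector of an incomplete element type is fine here;
    // only the .cpp files that actually touch the field need the full definition.
    std::vector<NSdk::THeavySettings> Settings;
};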
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ya.make
@@ -22,6 +22,7 @@ PEERDIR(
ydb/public/lib/scheme_types
ydb/library/yql/public/issue
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_table
)

SRCS(
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
@@ -574,5 +575,5 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady),
NLs::IndexKeys({"indexed"}),
});
}
}
}
153 changes: 153 additions & 0 deletions ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp
@@ -2,11 +2,15 @@
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/testlib/actors/wait_events.h>
#include <ydb/core/testlib/tablet_helpers.h>

#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/metering/metering.h>

#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;
@@ -781,6 +785,155 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {

}

Y_UNIT_TEST(MergeIndexTableShardsOnlyWhenReady) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
opts.DisableStatsBatching(true);
TTestEnv env(runtime, opts);

NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

Ydb::Table::GlobalIndexSettings settings;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
partition_at_keys {
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 10 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 20 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 30 } }
}
}
)", &settings));

TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexApplicationBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild;
});

ui64 indexInitializationTx = 0;
TWaitForFirstEvent<TEvSchemeShard::TEvModifySchemeTransaction> indexInitializationWaiter(runtime,
[&indexInitializationTx](const auto& ev){
const auto& record = ev->Get()->Record;
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild) {
indexInitializationTx = record.GetTxId();
return true;
}
return false;
}
);

const ui64 buildIndexTx = ++txId;
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
"ByValue", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
});

indexInitializationWaiter.Wait();
UNIT_ASSERT_VALUES_UNEQUAL(indexInitializationTx, 0);
env.TestWaitNotification(runtime, indexInitializationTx);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::PathExist,
NLs::IndexState(NKikimrSchemeOp::EIndexStateWriteOnly)
});

TVector<ui64> indexShards;
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& partitions = record.GetPathDescription().GetTablePartitions();
indexShards.clear();
indexShards.reserve(partitions.size());
for (const auto& partition : partitions) {
indexShards.emplace_back(partition.GetDatashardId());
}
};
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
NLs::PathExist,
NLs::PartitionCount(4),
shardCollector
});
UNIT_ASSERT_VALUES_EQUAL(indexShards.size(), 4);

{
// make sure no shards are merged
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> mergeBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions;
});

{
// wait for all index shards to send statistics
THashSet<ui64> shardsWithStats;
using TEvType = TEvDataShard::TEvPeriodicTableStats;
auto statsObserver = runtime.AddObserver<TEvType>([&shardsWithStats](const TEvType::TPtr& ev) {
shardsWithStats.emplace(ev->Get()->Record.GetDatashardId());
});

runtime.WaitFor("all index shards to send statistics", [&]{
return AllOf(indexShards, [&shardsWithStats](ui64 indexShard) {
return shardsWithStats.contains(indexShard);
});
});
}

// we expect to not have observed any attempts to merge
UNIT_ASSERT(mergeBlocker.empty());

// wait for 1 minute to ensure that no merges have been started by SchemeShard
env.SimulateSleep(runtime, TDuration::Minutes(1));
UNIT_ASSERT(mergeBlocker.empty());
}

// splits are allowed even if the index is not ready
TestSplitTable(runtime, ++txId, "/MyRoot/Table/ByValue/indexImplTable", Sprintf(R"(
SourceTabletId: %lu
SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5 } } } }
)",
indexShards.front()
)
);
env.TestWaitNotification(runtime, txId);

indexApplicationBlocker.Stop().Unblock();
env.TestWaitNotification(runtime, buildIndexTx);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady)
});

// wait until all index impl table shards are merged into one
while (true) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
shardCollector
});
if (indexShards.size() > 1) {
// If a merge happens, old shards are deleted and replaced with a new one.
// That is why we need to wait for * all * the shards to be deleted.
env.TestWaitTabletDeletion(runtime, indexShards);
} else {
break;
}
}
}

Y_UNIT_TEST(DropIndex) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
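Both this new test and the one added in ut_split_merge.cpp finish with the same describe-then-wait loop that runs until the index impl table has collapsed into a single shard. A framework-independent sketch of that loop (callback names are illustrative assumptions, not the test framework API):

#include <cstdint>
#include <functional>
#include <vector>

// Generic form of the "wait until merged into one shard" loop from the test:
// re-describe the index impl table, and while more than one shard is listed,
// wait for all currently known shards to be deleted (a merge replaces the old
// shards with a brand-new one) before describing again.
void WaitUntilSingleShard(
        const std::function<std::vector<uint64_t>()>& describeShards,           // hypothetical callback
        const std::function<void(const std::vector<uint64_t>&)>& waitDeletion)  // hypothetical callback
{
    for (;;) {
        const std::vector<uint64_t> shards = describeShards();
        if (shards.size() <= 1) {
            break;
        }
        waitDeletion(shards);
    }
}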
64 changes: 64 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
@@ -1,4 +1,5 @@
#include <ydb/core/tablet_flat/util_fmt_cell.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>

@@ -277,6 +278,69 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
// test requires more txids than cached at start
}

Y_UNIT_TEST(MergeIndexTableShards) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
TTestEnv env(runtime, opts);

ui64 txId = 100;

TBlockEvents<TEvDataShard::TEvPeriodicTableStats> statsBlocker(runtime);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
}
IndexDescription {
Name: "ByValue"
KeyColumnNames: ["value"]
IndexImplTableDescriptions {
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "A" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "B" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "C" } } } }
}
}
)"
);
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true),
{ NLs::PartitionCount(4) }
);

statsBlocker.Stop().Unblock();

TVector<ui64> indexShards;
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& partitions = record.GetPathDescription().GetTablePartitions();
indexShards.clear();
indexShards.reserve(partitions.size());
for (const auto& partition : partitions) {
indexShards.emplace_back(partition.GetDatashardId());
}
};

// wait until all index impl table shards are merged into one
while (true) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
shardCollector
});
if (indexShards.size() > 1) {
// If a merge happens, old shards are deleted and replaced with a new one.
// That is why we need to wait for * all * the shards to be deleted.
env.TestWaitTabletDeletion(runtime, indexShards);
} else {
break;
}
}
}

Y_UNIT_TEST(AutoMergeInOne) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
