Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable merges for indexImplTables partitions when build is in progress #10166

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,19 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
subDomainInfo->EffectiveStoragePools(),
shardInfo->BindedChannels);

const auto pathElement = Self->PathsById[pathId];
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxStoreTableStats.PersistSingleStats: main stats from"
<< " datashardId(TabletID)=" << datashardId << " maps to shardIdx: " << shardIdx
<< ", pathId: " << pathId << ", pathId map=" << Self->PathsById[pathId]->Name
<< ", pathId: " << pathId << ", pathId map=" << pathElement->Name
<< ", is column=" << isColumnTable << ", is olap=" << isOlapStore);

const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping);

LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Add stats from shard with datashardId(TabletID)=" << datashardId
"Add stats from shard with datashardId(TabletID)=" << datashardId
<< ", pathId " << pathId.LocalPathId
<< ": RowCount " << newStats.RowCount
<< ": RowCount " << newStats.RowCount
<< ", DataSize " << newStats.DataSize
<< (newStats.HasBorrowedData ? ", with borrowed parts" : ""));

Expand Down Expand Up @@ -404,11 +405,14 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds());
}

const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get();
const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId);

const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TVector<TShardIdx> shardsToMerge;
if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)) {
if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady)
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)
) {
TTxId txId = Self->GetCachedTxId(ctx);

if (!txId) {
Expand Down
21 changes: 14 additions & 7 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <ydb/core/util/pb.h>
#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

#include <library/cpp/testing/unittest/registar.h>

Expand Down Expand Up @@ -1679,12 +1680,18 @@ namespace NSchemeShardUT_Private {
*index.mutable_data_columns() = {cfg.DataColumns.begin(), cfg.DataColumns.end()};

switch (cfg.IndexType) {
case NKikimrSchemeOp::EIndexTypeGlobal:
*index.mutable_global_index() = Ydb::Table::GlobalIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync:
*index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
break;
case NKikimrSchemeOp::EIndexTypeGlobal: {
auto& settings = *index.mutable_global_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
case NKikimrSchemeOp::EIndexTypeGlobalAsync: {
auto& settings = *index.mutable_global_async_index()->mutable_settings();
if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(settings);
}
} break;
case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: {
auto& settings = *index.mutable_global_vector_kmeans_tree_index();
settings = Ydb::Table::GlobalVectorKMeansTreeIndex();
Expand Down Expand Up @@ -2011,7 +2018,7 @@ namespace NSchemeShardUT_Private {
Runtime.SendToPipe(shardData.ShardId, sender, proposal);
TAutoPtr<IEventHandle> handle;
auto event = Runtime.GrabEdgeEventIf<TEvDataShard::TEvProposeTransactionResult>(handle,
[=](const TEvDataShard::TEvProposeTransactionResult& event) {
[this, shardData](const TEvDataShard::TEvProposeTransactionResult& event) {
return event.GetTxId() == TxId && event.GetOrigin() == shardData.ShardId;
});
activeZone = true;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
template<bool OPT1, bool OPT2> \
void N(NUnitTest::TTestContext&)

namespace NYdb::NTable {
struct TGlobalIndexSettings;
}

namespace NSchemeShardUT_Private {
using namespace NKikimr;

Expand Down Expand Up @@ -361,6 +365,7 @@ namespace NSchemeShardUT_Private {
NKikimrSchemeOp::EIndexType IndexType = NKikimrSchemeOp::EIndexTypeGlobal;
TVector<TString> IndexColumns;
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please evaluate the following design choice.

I have decided to use a structure defined in the public API to describe a TBuildIndexConfig in SchemeShard's unit tests.

Pros:

  • We don't have to repeat the same code in the unit test helpers.

Cons:

  • Using public API for unit test helpers might limit us greatly. We would have to change the public description of an index every time we decide to test some new functionality in the SchemeShard tests.
    • Counterargument: ut helpers already use a protobuf from the public API to describe an index to be created. It is only natural to use the SDK to work conveniently with the public protobuf. The dependency on the public API, and the limitations that come with it, would not be lifted even if we didn't use the SDK here.

};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PEERDIR(
ydb/public/lib/scheme_types
ydb/library/yql/public/issue
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_table
)

SRCS(
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
Expand Down Expand Up @@ -574,5 +575,5 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady),
NLs::IndexKeys({"indexed"}),
});
}
}
}
153 changes: 153 additions & 0 deletions ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/testlib/actors/wait_events.h>
#include <ydb/core/testlib/tablet_helpers.h>

#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/metering/metering.h>

#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;
Expand Down Expand Up @@ -781,6 +785,155 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {

}

// Verifies that SchemeShard does NOT merge partitions of an index impl table
// while the index build is still in progress (index state != Ready), but does
// merge them once the index becomes Ready. Splits remain allowed throughout.
Y_UNIT_TEST(MergeIndexTableShardsOnlyWhenReady) {
TTestBasicRuntime runtime;

// Background compaction off and per-shard stats batching disabled so that
// every TEvPeriodicTableStats arrives at SchemeShard individually and promptly.
TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
opts.DisableStatsBatching(true);
TTestEnv env(runtime, opts);

// Make datashards report table stats immediately (no reporting interval).
NDataShard::gDbStatsReportInterval = TDuration::Seconds(0);

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// Pre-split the future index impl table at keys 10, 20, 30 -> 4 partitions,
// so there is something for the merge logic to (not) merge.
Ydb::Table::GlobalIndexSettings settings;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
partition_at_keys {
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 10 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 20 } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UINT64 } } } } }
value { items { uint64_value: 30 } }
}
}
)", &settings));

// Hold back the final ApplyIndexBuild transaction: this keeps the index in
// a not-Ready state for as long as the blocker is active.
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexApplicationBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild;
});

// Capture the txId of the CreateIndexBuild operation so we can wait for the
// index (and its impl table partitions) to be initialized.
ui64 indexInitializationTx = 0;
TWaitForFirstEvent<TEvSchemeShard::TEvModifySchemeTransaction> indexInitializationWaiter(runtime,
[&indexInitializationTx](const auto& ev){
const auto& record = ev->Get()->Record;
if (record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild) {
indexInitializationTx = record.GetTxId();
return true;
}
return false;
}
);

// Start building a global index with the pre-split partitioning from above.
const ui64 buildIndexTx = ++txId;
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
"ByValue", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
});

indexInitializationWaiter.Wait();
UNIT_ASSERT_VALUES_UNEQUAL(indexInitializationTx, 0);
env.TestWaitNotification(runtime, indexInitializationTx);

// The ApplyIndexBuild blocker keeps the index in the WriteOnly (not Ready) state.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::PathExist,
NLs::IndexState(NKikimrSchemeOp::EIndexStateWriteOnly)
});

// Collects the datashard tablet ids of the index impl table's partitions
// on every describe; reused again at the end of the test.
TVector<ui64> indexShards;
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& partitions = record.GetPathDescription().GetTablePartitions();
indexShards.clear();
indexShards.reserve(partitions.size());
for (const auto& partition : partitions) {
indexShards.emplace_back(partition.GetDatashardId());
}
};
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
NLs::PathExist,
NLs::PartitionCount(4),
shardCollector
});
UNIT_ASSERT_VALUES_EQUAL(indexShards.size(), 4);

{
// make sure no shards are merged while the index is not Ready
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> mergeBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions;
});

{
// wait for all index shards to send statistics (stats are what trigger merge decisions)
THashSet<ui64> shardsWithStats;
using TEvType = TEvDataShard::TEvPeriodicTableStats;
auto statsObserver = runtime.AddObserver<TEvType>([&shardsWithStats](const TEvType::TPtr& ev) {
shardsWithStats.emplace(ev->Get()->Record.GetDatashardId());
});

runtime.WaitFor("all index shards to send statistics", [&]{
return AllOf(indexShards, [&shardsWithStats](ui64 indexShard) {
return shardsWithStats.contains(indexShard);
});
});
}

// we expect to not have observed any attempts to merge
UNIT_ASSERT(mergeBlocker.empty());

// wait for 1 minute to ensure that no merges have been started by SchemeShard
env.SimulateSleep(runtime, TDuration::Minutes(1));
UNIT_ASSERT(mergeBlocker.empty());
}

// splits are allowed even if the index is not ready
TestSplitTable(runtime, ++txId, "/MyRoot/Table/ByValue/indexImplTable", Sprintf(R"(
SourceTabletId: %lu
SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5 } } } }
)",
indexShards.front()
)
);
env.TestWaitNotification(runtime, txId);

// Release the held ApplyIndexBuild transaction: the index transitions to Ready,
// which should re-enable merges of the impl table partitions.
indexApplicationBlocker.Stop().Unblock();
env.TestWaitNotification(runtime, buildIndexTx);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue"), {
NLs::IndexState(NKikimrSchemeOp::EIndexStateReady)
});

// wait until all index impl table shards are merged into one
while (true) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
shardCollector
});
if (indexShards.size() > 1) {
// If a merge happens, old shards are deleted and replaced with a new one.
// That is why we need to wait for *all* the current shards to be deleted.
env.TestWaitTabletDeletion(runtime, indexShards);
} else {
break;
}
}
}

Y_UNIT_TEST(DropIndex) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
64 changes: 64 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tablet_flat/util_fmt_cell.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>

Expand Down Expand Up @@ -277,6 +278,69 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
// test requires more txids than cached at start
}

// Verifies that once an index is created together with its table (and is therefore
// Ready from the start), SchemeShard merges the pre-split index impl table
// partitions back into a single shard.
Y_UNIT_TEST(MergeIndexTableShards) {
TTestBasicRuntime runtime;

// Background compaction off so that merge activity is driven purely by table stats.
TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
TTestEnv env(runtime, opts);

ui64 txId = 100;

// Hold back periodic table stats until the table and index are fully created,
// so no merge decisions are made mid-setup.
TBlockEvents<TEvDataShard::TEvPeriodicTableStats> statsBlocker(runtime);

// Create a table with a global index whose impl table is pre-split
// at "A", "B", "C" -> 4 partitions.
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
}
IndexDescription {
Name: "ByValue"
KeyColumnNames: ["value"]
IndexImplTableDescriptions {
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "A" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "B" } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "C" } } } }
}
}
)"
);
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true),
{ NLs::PartitionCount(4) }
);

// Let the buffered stats through: SchemeShard can now consider merges.
statsBlocker.Stop().Unblock();

// Collects the datashard tablet ids of the index impl table's partitions
// on every describe.
TVector<ui64> indexShards;
auto shardCollector = [&indexShards](const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& partitions = record.GetPathDescription().GetTablePartitions();
indexShards.clear();
indexShards.reserve(partitions.size());
for (const auto& partition : partitions) {
indexShards.emplace_back(partition.GetDatashardId());
}
};

// wait until all index impl table shards are merged into one
while (true) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/ByValue/indexImplTable", true), {
shardCollector
});
if (indexShards.size() > 1) {
// If a merge happens, old shards are deleted and replaced with a new one.
// That is why we need to wait for *all* the current shards to be deleted.
env.TestWaitTabletDeletion(runtime, indexShards);
} else {
break;
}
}
}

Y_UNIT_TEST(AutoMergeInOne) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
Expand Down
Loading