Skip to content

Commit

Permalink
Transfer scheme history to new partitions (#9959)
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm authored Nov 5, 2024
1 parent 3283fdc commit 2707851
Show file tree
Hide file tree
Showing 33 changed files with 492 additions and 177 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-S
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.ChangeSchemaAndSplit
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
Expand Down
77 changes: 69 additions & 8 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {

void Execute() {
TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4);

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

{
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
Expand Down Expand Up @@ -403,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
}

Y_UNIT_TEST(TableReshardingModuloN) {
TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
TShardingTypeTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
}

class TAsyncReshardingTest: public TReshardingTest {
Expand Down Expand Up @@ -435,11 +432,36 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
TReshardingTest::CheckCount(NumRows);
}

void AddManyColumns() {
auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` ";
for (int i = 0; i < 10000; i++) {
alterQuery << " ADD COLUMN col_" << i << " Int8";
if (i < 10000 - 1) {
alterQuery << ", ";
}
}

auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

void RestartAllShards() {
for (i64 id : CSController->GetShardActualIds()) {
Kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(new TEvents::TEvPoisonPill(), id, false));
}
}

void ChangeSchema() {
auto alterQuery =
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=level, "
"`SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, "
"`COMPRESSION.TYPE`=`zstd`);";
const char* alterQuery;
if (HasNewCol) {
alterQuery = "ALTER TABLESTORE `/Root/olapStore` DROP COLUMN new_col";
} else {
alterQuery = "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_col Int8";
}
HasNewCol = !HasNewCol;

auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();

Expand All @@ -454,6 +476,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
ui64 LastPathId = 1000000;
ui64 LastTs = 300000000;
ui64 NumRows = 0;
ui64 HasNewCol = false;
};

Y_UNIT_TEST(UpsertWhileSplitTest) {
Expand Down Expand Up @@ -498,6 +521,44 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
tester.StartResharding("SPLIT");
tester.WaitResharding();

tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(MultipleSchemaVersions) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

for (int i = 0; i < 3; i++) {
tester.AddBatch(1);
tester.ChangeSchema();
}

tester.StartResharding("SPLIT");
tester.WaitResharding();

tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(HugeSchemeHistory) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddManyColumns();

for (int i = 0; i < 100; i++) {
tester.AddBatch(1);
tester.ChangeSchema();
}

tester.StartResharding("SPLIT");
tester.WaitResharding();

tester.RestartAllShards();

tester.CheckCount();
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,5 @@ enum ETxTypes {
TXTYPE_GC_START = 34 [(TxTypeOpts) = {Name: "TxGarbageCollectionStart"}];
TXTYPE_APPLY_NORMALIZER = 35 [(TxTypeOpts) = {Name: "TxApplyNormalizer"}];
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
}
Loading

0 comments on commit 2707851

Please sign in to comment.