From 04935a5ec551864b59bce17d004da5e6198eb96a Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 11 Jul 2024 19:56:30 +0300 Subject: [PATCH] Fix bug with broadcasting records --- .../change_sender_common_ops.h | 2 +- .../ut_cdc_stream_reboots.cpp | 168 +++++++++++------- 2 files changed, 109 insertions(+), 61 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 8c9f45a1c698..823b208f5323 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -336,7 +336,7 @@ class TBaseChangeSender { Y_ABORT_UNLESS(it != Broadcasting.end()); auto& broadcast = it->second; - if (broadcast.Partitions.contains(partitionId)) { + if (broadcast.CompletedPartitions.contains(partitionId)) { return false; } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 42ecc9f6397d..7444ec4dac15 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -556,68 +556,77 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState, + const google::protobuf::RepeatedPtrField& tablePartitions, + const google::protobuf::RepeatedPtrField& topicPartitions) + { + for (const auto& topicPartition : topicPartitions) { + auto request = MakeHolder(); + { + auto& record = *request->Record.MutablePartitionRequest(); + record.SetPartition(topicPartition.GetPartitionId()); + auto& cmd = *record.MutableCmdGetMaxSeqNo(); + for (const auto& tablePartition : tablePartitions) { + cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); + } + } + + const auto& sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release()); + + auto response = runtime.GrabEdgeEvent(sender); + { + const auto& record = response->Get()->Record.GetPartitionResponse(); + const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); + + UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size()); + for (const auto& item: result) { + if (item.GetState() != expectedState) { + return false; + } + } + } + } + + return true; + } + struct TItem { TString Path; - ui32 nPartitions; + ui32 ExpectedPartitionCount; }; - void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) { + void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic, + const google::protobuf::RepeatedPtrField* initialTablePartitions = nullptr) + { auto tableDesc = DescribePath(runtime, table.Path, true, true); const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); - UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount); auto topicDesc = DescribePrivatePath(runtime, topic.Path); const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount); while (true) { runtime.SimulateSleep(TDuration::Seconds(1)); - bool done = true; - - for (ui32 i = 0; i < topic.nPartitions; ++i) { - auto request = MakeHolder(); - { - auto& record = *request->Record.MutablePartitionRequest(); - record.SetPartition(topicPartitions[i].GetPartitionId()); - auto& cmd = *record.MutableCmdGetMaxSeqNo(); - for (const auto& tablePartition : tablePartitions) { - cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); - } - } - - const auto& sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release()); - - auto response = runtime.GrabEdgeEvent(sender); - { - const auto& record = response->Get()->Record.GetPartitionResponse(); - const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); - - UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions); - for (const auto& item: result) { - done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED; - if (!done) { - break; - } - } - } - - if (!done) { - break; - } - } - - if (done) { + if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) { break; } } + + if (initialTablePartitions) { + UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions)); + } } - Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + template + void SplitTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -625,15 +634,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { KeyColumnNames: ["key"] )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -651,16 +654,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max())}, {TCell::Make(Max())}); - CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); + CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } - Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + + template + void MergeTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -674,15 +704,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -696,11 +720,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max())}, {TCell::Make(Max())}); - CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); + CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } + Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {