From fee0240099a16c1c8bcade2bd96accc91f66f83a Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 12 Jul 2024 12:28:59 +0300 Subject: [PATCH] Continue to emit resolved timestamps after merge (#6594) --- .../persqueue/partition_sourcemanager.cpp | 3 +- ydb/core/persqueue/sourceid.h | 4 + .../datashard_ut_change_exchange.cpp | 88 +++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index f81b3a7da3b2..c9214300384a 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() { } bool TPartitionSourceManager::TModificationBatch::HasModifications() const { - return !SourceIdWriter.GetSourceIdsToWrite().empty(); + return !SourceIdWriter.GetSourceIdsToWrite().empty() + || !SourceIdWriter.GetSourceIdsToDelete().empty(); } void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) { diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 992e1271c847..e20a97db6acd 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -85,6 +85,10 @@ class TSourceIdWriter { return Registrations; } + const THashSet& GetSourceIdsToDelete() const { + return Deregistrations; + } + template void RegisterSourceId(const TString& sourceId, Args&&... args) { Registrations[sourceId] = TSourceIdInfo(std::forward(args)...); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 31e6d1b7494a..9d16bd2587d1 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3600,6 +3600,94 @@ Y_UNIT_TEST_SUITE(Cdc) { MustNotLoseSchemaSnapshot(true); } + Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + SetSplitMergePartCountLimit(&runtime, -1); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + Cerr << "... prepare" << Endl; + { + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + }); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2)); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + + auto initialTabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(initialTabletIds.size(), 2); + + std::vector> blockedSplitRequests; + auto blockSplitRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdSplitMessageGroup()) { + blockedSplitRequests.emplace_back(ev.Release()); + } + }); + + Cerr << "... merge table" << Endl; + const auto mergeTxId = AsyncMergeTable(server, edgeActor, "/Root/Table", initialTabletIds); + WaitFor(runtime, [&]{ return blockedSplitRequests.size() == initialTabletIds.size(); }, "blocked split requests"); + blockSplitRequests.Remove(); + + std::vector> blockedRegisterRequests; + auto blockRegisterRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdRegisterMessageGroup()) { + blockedRegisterRequests.emplace_back(ev.Release()); + } + }); + + ui32 splitResponses = 0; + auto countSplitResponses = runtime.AddObserver([&](auto& ev) { + ++splitResponses; + }); + + Cerr << "... release split requests" << Endl; + for (auto& ev : std::exchange(blockedSplitRequests, {})) { + runtime.Send(ev.release(), 0, true); + WaitFor(runtime, [prev = splitResponses, &splitResponses]{ return splitResponses > prev; }, "split response"); + } + + Cerr << "... reboot pq tablet" << Endl; + RebootTablet(runtime, ResolvePqTablet(runtime, edgeActor, "/Root/Table/Stream", 0), edgeActor); + countSplitResponses.Remove(); + + Cerr << "... release register requests" << Endl; + blockRegisterRequests.Remove(); + for (auto& ev : std::exchange(blockedRegisterRequests, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... wait for merge tx notification" << Endl; + WaitTxNotification(server, edgeActor, mergeTxId); + + Cerr << "... wait for final heartbeat" << Endl; + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + } // Cdc } // NKikimr