Skip to content

Commit

Permalink
Continue to emit resolved timestamps after merge (ydb-platform#6594)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 12, 2024
1 parent 28e1d6d commit 967ced7
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
}

bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
return !SourceIdWriter.GetSourceIdsToWrite().empty();
return !SourceIdWriter.GetSourceIdsToWrite().empty()
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
}

void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class TSourceIdWriter {
return Registrations;
}

const THashSet<TString>& GetSourceIdsToDelete() const {
return Deregistrations;
}

template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3434,6 +3434,94 @@ Y_UNIT_TEST_SUITE(Cdc) {
MustNotLoseSchemaSnapshot(true);
}

Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
SetSplitMergePartCountLimit(&runtime, -1);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));

Cerr << "... prepare" << Endl;
{
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
});

auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2));
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
R"({"resolved":"***"})",
});
}

auto initialTabletIds = GetTableShards(server, edgeActor, "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(initialTabletIds.size(), 2);

std::vector<std::unique_ptr<IEventHandle>> blockedSplitRequests;
auto blockSplitRequests = runtime.AddObserver<TEvPersQueue::TEvRequest>([&](auto& ev) {
if (ev->Get()->Record.GetPartitionRequest().HasCmdSplitMessageGroup()) {
blockedSplitRequests.emplace_back(ev.Release());
}
});

Cerr << "... merge table" << Endl;
const auto mergeTxId = AsyncMergeTable(server, edgeActor, "/Root/Table", initialTabletIds);
WaitFor(runtime, [&]{ return blockedSplitRequests.size() == initialTabletIds.size(); }, "blocked split requests");
blockSplitRequests.Remove();

std::vector<std::unique_ptr<IEventHandle>> blockedRegisterRequests;
auto blockRegisterRequests = runtime.AddObserver<TEvPersQueue::TEvRequest>([&](auto& ev) {
if (ev->Get()->Record.GetPartitionRequest().HasCmdRegisterMessageGroup()) {
blockedRegisterRequests.emplace_back(ev.Release());
}
});

ui32 splitResponses = 0;
auto countSplitResponses = runtime.AddObserver<TEvPersQueue::TEvResponse>([&](auto& ev) {
++splitResponses;
});

Cerr << "... release split requests" << Endl;
for (auto& ev : std::exchange(blockedSplitRequests, {})) {
runtime.Send(ev.release(), 0, true);
WaitFor(runtime, [prev = splitResponses, &splitResponses]{ return splitResponses > prev; }, "split response");
}

Cerr << "... reboot pq tablet" << Endl;
RebootTablet(runtime, ResolvePqTablet(runtime, edgeActor, "/Root/Table/Stream", 0), edgeActor);
countSplitResponses.Remove();

Cerr << "... release register requests" << Endl;
blockRegisterRequests.Remove();
for (auto& ev : std::exchange(blockedRegisterRequests, {})) {
runtime.Send(ev.release(), 0, true);
}

Cerr << "... wait for merge tx notification" << Endl;
WaitTxNotification(server, edgeActor, mergeTxId);

Cerr << "... wait for final heartbeat" << Endl;
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
R"({"resolved":"***"})",
R"({"resolved":"***"})",
});
}

} // Cdc

} // NKikimr
Expand Down

0 comments on commit 967ced7

Please sign in to comment.