diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index 89cfb4e5af30..3d189ee6b4d1 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -197,7 +197,7 @@ void TBaseChangeSender::SendRecords() { EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) { if (Senders.contains(partitionId)) { auto& sender = Senders.at(partitionId); - sender.Prepared.push_back(std::move(it->second)); + sender.Prepared.push_back(it->second); if (!sender.ActorId) { Y_ABORT_UNLESS(!sender.Ready); registrations.insert(partitionId); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 0be548fc63df..da7f5f397cb2 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -3147,6 +3148,45 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2)); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + TVector>> records(2); // partition to records + while (true) { + for (ui32 i = 0; i < records.size(); ++i) { + records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i); + } + + if (AllOf(records, [](const auto& x) { return !x.empty(); })) { + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + + UNIT_ASSERT(records.size() > 1); + UNIT_ASSERT(!records[0].empty()); + AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})"); + + for (ui32 i = 1; i < records.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second); + } + } + Y_UNIT_TEST(InitialScanAndResolvedTimestamps) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())