Skip to content

Commit

Permalink
Fix bug in records broadcasting (#5513)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 12, 2024
1 parent ebb5270 commit 651efd4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void TBaseChangeSender::SendRecords() {
EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) {
if (Senders.contains(partitionId)) {
auto& sender = Senders.at(partitionId);
sender.Prepared.push_back(std::move(it->second));
sender.Prepared.push_back(it->second);
if (!sender.ActorId) {
Y_ABORT_UNLESS(!sender.Ready);
registrations.insert(partitionId);
Expand Down
40 changes: 40 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <library/cpp/json/json_reader.h>
#include <library/cpp/json/json_writer.h>

#include <util/generic/algorithm.h>
#include <util/generic/size_literals.h>
#include <util/string/join.h>
#include <util/string/printf.h>
Expand Down Expand Up @@ -3147,6 +3148,45 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2));

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));

TVector<TVector<std::pair<TString, TString>>> records(2); // partition to records
while (true) {
for (ui32 i = 0; i < records.size(); ++i) {
records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i);
}

if (AllOf(records, [](const auto& x) { return !x.empty(); })) {
break;
}

SimulateSleep(server, TDuration::Seconds(1));
}

UNIT_ASSERT(records.size() > 1);
UNIT_ASSERT(!records[0].empty());
AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})");

for (ui32 i = 1; i < records.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second);
}
}

Y_UNIT_TEST(InitialScanAndResolvedTimestamps) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down

0 comments on commit 651efd4

Please sign in to comment.