diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 055968c2049a..823096c6f4df 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -45,7 +45,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession.WaitAllMessages(); - for(const auto& info : readSession.ReceivedMessages) { + for(const auto& info : readSession.Impl->ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -92,14 +92,14 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession.WaitAndAssertPartitions({0}, "We are reading only one partition because offset is not commited"); readSession.Run(); - readSession.AutoCommit = true; + readSession.Impl->AutoCommit = true; readSession.Commit(); readSession.WaitAndAssertPartitions({0, 1, 2}, "We are reading all partitions because offset is commited"); readSession.Run(); readSession.WaitAllMessages(); - for(const auto& info : readSession.ReceivedMessages) { + for(const auto& info : readSession.Impl->ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -111,7 +111,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } } - UNIT_ASSERT_C(readSession.EndedPartitionEvents.empty(), "Old SDK is not support EndPartitionEvent"); + UNIT_ASSERT_C(readSession.Impl->EndedPartitionEvents.empty(), "Old SDK is not support EndPartitionEvent"); writeSession->Close(TDuration::Seconds(1)); readSession.Close(); @@ -140,7 +140,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession.WaitAllMessages(); - for(const auto& info : readSession.ReceivedMessages) { + for(const auto& info : readSession.Impl->ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -152,8 +152,8 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } } - UNIT_ASSERT_VALUES_EQUAL_C(1, readSession.EndedPartitionEvents.size(), "Only one partition was ended"); - auto& ev = readSession.EndedPartitionEvents.front(); + UNIT_ASSERT_VALUES_EQUAL_C(1, readSession.Impl->EndedPartitionEvents.size(), "Only one partition was ended"); + auto& ev = readSession.Impl->EndedPartitionEvents.front(); UNIT_ASSERT_VALUES_EQUAL_C(std::vector{}, ev.GetAdjacentPartitionIds(), "There isn`t adjacent partitions after split"); std::vector children = {1, 2}; UNIT_ASSERT_VALUES_EQUAL_C(children, ev.GetChildPartitionIds(), ""); @@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { Cerr << ">>>>> All messages received" << Endl; - for(const auto& info : readSession.ReceivedMessages) { + for(const auto& info : readSession.Impl->ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -267,7 +267,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession.WaitAllMessages(); - for(const auto& info : readSession.ReceivedMessages) { + for(const auto& info : readSession.Impl->ReceivedMessages) { if (info.Data == TString("message_1.1")) { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo); @@ -283,13 +283,13 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } if (autoscaleAwareSDK) { - UNIT_ASSERT_VALUES_EQUAL_C(2, readSession.EndedPartitionEvents.size(), "Two partition was ended which was merged"); - for (auto& ev : readSession.EndedPartitionEvents) { + UNIT_ASSERT_VALUES_EQUAL_C(2, readSession.Impl->EndedPartitionEvents.size(), "Two partition was ended which was merged"); + for (auto& ev : readSession.Impl->EndedPartitionEvents) { UNIT_ASSERT(ev.GetAdjacentPartitionIds() == std::vector{0} || ev.GetAdjacentPartitionIds() == std::vector{1}); UNIT_ASSERT_VALUES_EQUAL_C(std::vector{2}, ev.GetChildPartitionIds(), ""); } } else { - UNIT_ASSERT_VALUES_EQUAL_C(0, readSession.EndedPartitionEvents.size(), "OLD SDK"); + UNIT_ASSERT_VALUES_EQUAL_C(0, readSession.Impl->EndedPartitionEvents.size(), "OLD SDK"); } @@ -353,7 +353,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession.WaitAndAssertPartitions({0}, "Must secondary read for check read from end"); readSession.WaitAndAssertPartitions({}, "Partition must be released for secondary read because start not from the end of partition after 2 seconds"); - readSession.Offsets[0] = 1; + readSession.SetOffset(0, 1); readSession.WaitAndAssertPartitions({0, 1, 2}, "Must read from all partitions because had been read from the end of partition"); readSession.Close(); @@ -393,12 +393,12 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { SplitPartition(setup, ++txId, 0, "a"); TTestReadSession readSession1("Session-0", client, Max(), false, {0, 1, 2}, false); - readSession1.Offsets[0] = 1; + readSession1.SetOffset(0, 1); readSession1.WaitAndAssertPartitions({0, 1, 2}, "Must read all exists partitions because read the partition 0 from offset 1"); - readSession1.Offsets[0] = 0; + readSession1.SetOffset(0, 0); TTestReadSession readSession2("Session-1", client, Max(), false, {0}, false); - readSession2.Offsets[0] = 0; + readSession2.SetOffset(0, 0); readSession2.WaitAndAssertPartitions({0}, "Must read partition 0 because it defined in the readSession"); readSession2.Run(); diff --git a/ydb/core/persqueue/ut/balancing_ut.cpp b/ydb/core/persqueue/ut/balancing_ut.cpp index 9caf286a0aeb..2a308ddc5a39 100644 --- a/ydb/core/persqueue/ut/balancing_ut.cpp +++ b/ydb/core/persqueue/ut/balancing_ut.cpp @@ -71,8 +71,8 @@ Y_UNIT_TEST_SUITE(Balancing) { readSession3.WaitAndAssertPartitions({0}, "The reading session should read partitions 0 and 1 because it clearly required them to be read."); readSession2.WaitAndAssertPartitions({1}, "The reading session should read partitions 0 and 1 because it clearly required them to be read."); - auto p0 = readSession0.Partitions; - p0.insert(readSession1.Partitions.begin(), readSession1.Partitions.end()); + auto p0 = readSession0.Impl->Partitions; + p0.insert(readSession1.Impl->Partitions.begin(), readSession1.Impl->Partitions.end()); UNIT_ASSERT_VALUES_EQUAL_C(8, p0.size(), "Must read all partitions but " << p0); } diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp index 4a063414504e..26a7f240ec8e 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp @@ -119,12 +119,10 @@ std::shared_ptr CreateWriteSession(TTopicClient& cl } -TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set partitions, bool autoscalingSupport) - : Name(name) - , AutoCommit(autoCommit) - , Semaphore(name.c_str(), SemCount) { +TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set partitions, bool autoscalingSupport) { + Impl = std::make_shared(name, autoCommit); - Acquire(); + Impl->Acquire(); auto readSettings = TReadSessionSettings() .ConsumerName(TEST_CONSUMER) @@ -135,81 +133,82 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si } readSettings.EventHandlers_.SimpleDataHandlers( - [&, expectedMessagesCount] + [impl=Impl, expectedMessagesCount] (TReadSessionEvent::TDataReceivedEvent& ev) mutable { auto& messages = ev.GetMessages(); for (size_t i = 0u; i < messages.size(); ++i) { auto& message = messages[i]; - Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId() + Cerr << ">>>>> " << impl->Name << " Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId() << ", message=" << message.GetData() << ", seqNo=" << message.GetSeqNo() << ", offset=" << message.GetOffset() << Endl << Flush; - ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), + impl->ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(), message.GetSeqNo(), message.GetOffset(), message.GetData(), message, - AutoCommit}); + impl->AutoCommit}); - if (AutoCommit) { + if (impl->AutoCommit) { message.Commit(); } } - if (ReceivedMessages.size() == expectedMessagesCount) { - DataPromise.SetValue(ReceivedMessages); + if (impl->ReceivedMessages.size() == expectedMessagesCount) { + impl->DataPromise.SetValue(impl->ReceivedMessages); } }); readSettings.EventHandlers_.StartPartitionSessionHandler( - [&] + [impl=Impl] (TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); - Modify([&](std::set& s) { s.insert(partitionId); }); - if (Offsets.contains(partitionId)) { - Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl << Flush; - ev.Confirm(Offsets[partitionId], TMaybe()); + auto offset = impl->GetOffset(partitionId); + impl->Modify([&](std::set& s) { s.insert(partitionId); }); + if (offset) { + Cerr << ">>>>> " << impl->Name << " Start reading partition " << partitionId << " from offset " << offset.value() << Endl << Flush; + ev.Confirm(offset.value(), TMaybe()); } else { - Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush; ev.Confirm(); } }); readSettings.EventHandlers_.StopPartitionSessionHandler( - [&] + [impl=Impl] (TReadSessionEvent::TStopPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); - Modify([&](std::set& s) { s.erase(partitionId); }); - Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl << Flush; + impl->Modify([&](std::set& s) { s.erase(partitionId); }); + Cerr << ">>>>> " << impl->Name << " Stop reading partition " << partitionId << Endl << Flush; ev.Confirm(); }); readSettings.EventHandlers_.PartitionSessionClosedHandler( - [&] + [impl=Impl] (TReadSessionEvent::TPartitionSessionClosedEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); - Modify([&](std::set& s) { s.erase(partitionId); }); - Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl << Flush; + impl->Modify([&](std::set& s) { s.erase(partitionId); }); + Cerr << ">>>>> " << impl->Name << " Stop (closed) reading partition " << partitionId << Endl << Flush; }); readSettings.EventHandlers_.SessionClosedHandler( - [Name=name] + [impl=Impl] (const TSessionClosedEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush; }); readSettings.EventHandlers_.EndPartitionSessionHandler( - [&] + [impl=Impl] (TReadSessionEvent::TEndPartitionSessionEvent& ev) mutable { - Cerr << ">>>>> " << Name << " Received TEndPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; + Cerr << ">>>>> " << impl->Name << " Received TEndPartitionSessionEvent message " << ev.DebugString() << Endl << Flush; auto partitionId = ev.GetPartitionSession()->GetPartitionId(); - EndedPartitions.insert(partitionId); - EndedPartitionEvents.push_back(ev); + impl->EndedPartitions.insert(partitionId); + impl->EndedPartitionEvents.push_back(ev); }); @@ -217,11 +216,12 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si } void TTestReadSession::WaitAllMessages() { - DataPromise.GetFuture().GetValue(TDuration::Seconds(5)); + Impl->DataPromise.GetFuture().GetValue(TDuration::Seconds(5)); } void TTestReadSession::Commit() { - for (auto& m : ReceivedMessages) { + Cerr << ">>>>> " << Impl->Name << " Commit all received messages" << Endl << Flush; + for (auto& m : Impl->ReceivedMessages) { if (!m.Commited) { m.Msg.Commit(); m.Commited = true; @@ -229,17 +229,17 @@ void TTestReadSession::Commit() { } } -void TTestReadSession::Acquire() { +void TTestReadSession::TImpl::Acquire() { Cerr << ">>>>> " << Name << " Acquire()" << Endl << Flush; Semaphore.Acquire(); } -void TTestReadSession::Release() { +void TTestReadSession::TImpl::Release() { Cerr << ">>>>> " << Name << " Release()" << Endl << Flush; Semaphore.Release(); } -NThreading::TFuture> TTestReadSession::Wait(std::set partitions, const TString& message) { +NThreading::TFuture> TTestReadSession::TImpl::Wait(std::set partitions, const TString& message) { Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl << Flush; with_lock (Lock) { @@ -256,37 +256,37 @@ NThreading::TFuture> TTestReadSession::Wait(std::set pa void TTestReadSession::Assert(const std::set& expected, NThreading::TFuture> f, const TString& message) { auto actual = f.HasValue() ? f.GetValueSync() : GetPartitions(); - Cerr << ">>>>> " << Name << " Partitions " << actual << " received #2" << Endl << Flush; + Cerr << ">>>>> " << Impl->Name << " Partitions " << actual << " received #2" << Endl << Flush; UNIT_ASSERT_VALUES_EQUAL_C(expected, actual, message); - Release(); + Impl->Release(); } void TTestReadSession::WaitAndAssertPartitions(std::set partitions, const TString& message) { - auto f = Wait(partitions, message); + auto f = Impl->Wait(partitions, message); f.Wait(TDuration::Seconds(60)); Assert(partitions, f, message); } void TTestReadSession::Run() { - ExpectedPartitions = std::nullopt; - Semaphore.TryAcquire(); - Release(); + Impl->ExpectedPartitions = std::nullopt; + Impl->Semaphore.TryAcquire(); + Impl->Release(); } void TTestReadSession::Close() { Run(); - Cerr << ">>>>> " << Name << " Closing reading session " << Endl << Flush; + Cerr << ">>>>> " << Impl->Name << " Closing reading session " << Endl << Flush; Session->Close(); Session.reset(); } std::set TTestReadSession::GetPartitions() { - with_lock (Lock) { - return Partitions; + with_lock (Impl->Lock) { + return Impl->Partitions; } } -void TTestReadSession::Modify(std::function&)> modifier) { +void TTestReadSession::TImpl::Modify(std::function&)> modifier) { bool found = false; with_lock (Lock) { @@ -304,5 +304,24 @@ void TTestReadSession::Modify(std::function&)> modifier) } } +std::optional TTestReadSession::TImpl::GetOffset(ui32 partitionId) const { + with_lock (Lock) { + auto it = Offsets.find(partitionId); + if (it == Offsets.end()) { + return std::nullopt; + } + return it->second; + } +} + +void TTestReadSession::SetOffset(ui32 partitionId, std::optional offset) { + with_lock (Impl->Lock) { + if (offset) { + Impl->Offsets[partitionId] = offset.value(); + } else { + Impl->Offsets.erase(partitionId); + } + } +} } diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h index 36d530f2bd4c..ad660b7fc929 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h @@ -40,32 +40,11 @@ struct TTestReadSession { bool Commited; }; - TString Name; - std::unordered_map Offsets; - - bool AutoCommit; - - std::shared_ptr Session; - - NThreading::TPromise> DataPromise = NThreading::NewPromise>(); - NThreading::TPromise> PartitionsPromise = NThreading::NewPromise>(); - - std::vector ReceivedMessages; - std::set Partitions; - std::optional> ExpectedPartitions; - - std::set EndedPartitions; - std::vector EndedPartitionEvents; - - TMutex Lock; - TSemaphore Semaphore; - static constexpr size_t SemCount = 1; TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max(), bool autoCommit = true, std::set partitions = {}, bool autoscalingSupport = true); void WaitAllMessages(); - NThreading::TFuture> Wait(std::set partitions, const TString& message); void Assert(const std::set& expected, NThreading::TFuture> f, const TString& message); void WaitAndAssertPartitions(std::set partitions, const TString& message); @@ -76,12 +55,45 @@ struct TTestReadSession { void Close(); std::set GetPartitions(); + void SetOffset(ui32 partitionId, std::optional offset); + + struct TImpl { + + TImpl(const TString& name, bool autoCommit) + : Name(name) + , AutoCommit(autoCommit) + , Semaphore(name.c_str(), SemCount) {} + + TString Name; + std::unordered_map Offsets; -private: - void Acquire(); - void Release(); + bool AutoCommit; + + NThreading::TPromise> DataPromise = NThreading::NewPromise>(); + NThreading::TPromise> PartitionsPromise = NThreading::NewPromise>(); + + std::vector ReceivedMessages; + std::set Partitions; + std::optional> ExpectedPartitions; + + std::set EndedPartitions; + std::vector EndedPartitionEvents; + + TMutex Lock; + TSemaphore Semaphore; + + std::optional GetOffset(ui32 partitionId) const; + void Modify(std::function&)> modifier); + + void Acquire(); + void Release(); + + NThreading::TFuture> Wait(std::set partitions, const TString& message); + }; + + std::shared_ptr Session; + std::shared_ptr Impl; - void Modify(std::function&)> modifier); }; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp index 9dc04106fb59..c5d9c3e774c6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp @@ -688,7 +688,7 @@ namespace NYdb::NTopic::NTests { ReadSession.WaitAllMessages(); - for (const auto& info : ReadSession.ReceivedMessages) { + for (const auto& info : ReadSession.Impl->ReceivedMessages) { if (info.Data == "message_1.1") { UNIT_ASSERT_EQUAL(0, info.PartitionId); UNIT_ASSERT_EQUAL(2, info.SeqNo);