Skip to content

Commit

Permalink
fix test (ydb-platform#4535)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored and MrLolthe1st committed May 28, 2024
1 parent 7aaa08d commit 671b79d
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 92 deletions.
32 changes: 16 additions & 16 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

readSession.WaitAllMessages();

for(const auto& info : readSession.ReceivedMessages) {
for(const auto& info : readSession.Impl->ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
Expand Down Expand Up @@ -92,14 +92,14 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

readSession.WaitAndAssertPartitions({0}, "We are reading only one partition because offset is not commited");
readSession.Run();
readSession.AutoCommit = true;
readSession.Impl->AutoCommit = true;
readSession.Commit();
readSession.WaitAndAssertPartitions({0, 1, 2}, "We are reading all partitions because offset is commited");
readSession.Run();

readSession.WaitAllMessages();

for(const auto& info : readSession.ReceivedMessages) {
for(const auto& info : readSession.Impl->ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
Expand All @@ -111,7 +111,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
}

UNIT_ASSERT_C(readSession.EndedPartitionEvents.empty(), "Old SDK is not support EndPartitionEvent");
UNIT_ASSERT_C(readSession.Impl->EndedPartitionEvents.empty(), "Old SDK is not support EndPartitionEvent");

writeSession->Close(TDuration::Seconds(1));
readSession.Close();
Expand Down Expand Up @@ -140,7 +140,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

readSession.WaitAllMessages();

for(const auto& info : readSession.ReceivedMessages) {
for(const auto& info : readSession.Impl->ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
Expand All @@ -152,8 +152,8 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
}

UNIT_ASSERT_VALUES_EQUAL_C(1, readSession.EndedPartitionEvents.size(), "Only one partition was ended");
auto& ev = readSession.EndedPartitionEvents.front();
UNIT_ASSERT_VALUES_EQUAL_C(1, readSession.Impl->EndedPartitionEvents.size(), "Only one partition was ended");
auto& ev = readSession.Impl->EndedPartitionEvents.front();
UNIT_ASSERT_VALUES_EQUAL_C(std::vector<ui32>{}, ev.GetAdjacentPartitionIds(), "There isn`t adjacent partitions after split");
std::vector<ui32> children = {1, 2};
UNIT_ASSERT_VALUES_EQUAL_C(children, ev.GetChildPartitionIds(), "");
Expand Down Expand Up @@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

Cerr << ">>>>> All messages received" << Endl;

for(const auto& info : readSession.ReceivedMessages) {
for(const auto& info : readSession.Impl->ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
Expand Down Expand Up @@ -267,7 +267,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

readSession.WaitAllMessages();

for(const auto& info : readSession.ReceivedMessages) {
for(const auto& info : readSession.Impl->ReceivedMessages) {
if (info.Data == TString("message_1.1")) {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
Expand All @@ -283,13 +283,13 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}

if (autoscaleAwareSDK) {
UNIT_ASSERT_VALUES_EQUAL_C(2, readSession.EndedPartitionEvents.size(), "Two partition was ended which was merged");
for (auto& ev : readSession.EndedPartitionEvents) {
UNIT_ASSERT_VALUES_EQUAL_C(2, readSession.Impl->EndedPartitionEvents.size(), "Two partition was ended which was merged");
for (auto& ev : readSession.Impl->EndedPartitionEvents) {
UNIT_ASSERT(ev.GetAdjacentPartitionIds() == std::vector<ui32>{0} || ev.GetAdjacentPartitionIds() == std::vector<ui32>{1});
UNIT_ASSERT_VALUES_EQUAL_C(std::vector<ui32>{2}, ev.GetChildPartitionIds(), "");
}
} else {
UNIT_ASSERT_VALUES_EQUAL_C(0, readSession.EndedPartitionEvents.size(), "OLD SDK");
UNIT_ASSERT_VALUES_EQUAL_C(0, readSession.Impl->EndedPartitionEvents.size(), "OLD SDK");
}


Expand Down Expand Up @@ -353,7 +353,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
readSession.WaitAndAssertPartitions({0}, "Must secondary read for check read from end");
readSession.WaitAndAssertPartitions({}, "Partition must be released for secondary read because start not from the end of partition after 2 seconds");

readSession.Offsets[0] = 1;
readSession.SetOffset(0, 1);
readSession.WaitAndAssertPartitions({0, 1, 2}, "Must read from all partitions because had been read from the end of partition");

readSession.Close();
Expand Down Expand Up @@ -393,12 +393,12 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
SplitPartition(setup, ++txId, 0, "a");

TTestReadSession readSession1("Session-0", client, Max<size_t>(), false, {0, 1, 2}, false);
readSession1.Offsets[0] = 1;
readSession1.SetOffset(0, 1);
readSession1.WaitAndAssertPartitions({0, 1, 2}, "Must read all exists partitions because read the partition 0 from offset 1");
readSession1.Offsets[0] = 0;
readSession1.SetOffset(0, 0);

TTestReadSession readSession2("Session-1", client, Max<size_t>(), false, {0}, false);
readSession2.Offsets[0] = 0;
readSession2.SetOffset(0, 0);

readSession2.WaitAndAssertPartitions({0}, "Must read partition 0 because it defined in the readSession");
readSession2.Run();
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/balancing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ Y_UNIT_TEST_SUITE(Balancing) {
readSession3.WaitAndAssertPartitions({0}, "The reading session should read partitions 0 and 1 because it clearly required them to be read.");
readSession2.WaitAndAssertPartitions({1}, "The reading session should read partitions 0 and 1 because it clearly required them to be read.");

auto p0 = readSession0.Partitions;
p0.insert(readSession1.Partitions.begin(), readSession1.Partitions.end());
auto p0 = readSession0.Impl->Partitions;
p0.insert(readSession1.Impl->Partitions.begin(), readSession1.Impl->Partitions.end());
UNIT_ASSERT_VALUES_EQUAL_C(8, p0.size(), "Must read all partitions but " << p0);
}

Expand Down
115 changes: 67 additions & 48 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,10 @@ std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& cl
}


TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoscalingSupport)
: Name(name)
, AutoCommit(autoCommit)
, Semaphore(name.c_str(), SemCount) {
TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoscalingSupport) {
Impl = std::make_shared<TImpl>(name, autoCommit);

Acquire();
Impl->Acquire();

auto readSettings = TReadSessionSettings()
.ConsumerName(TEST_CONSUMER)
Expand All @@ -135,111 +133,113 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
}

readSettings.EventHandlers_.SimpleDataHandlers(
[&, expectedMessagesCount]
[impl=Impl, expectedMessagesCount]
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
auto& messages = ev.GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
auto& message = messages[i];

Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId()
Cerr << ">>>>> " << impl->Name << " Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId()
<< ", message=" << message.GetData()
<< ", seqNo=" << message.GetSeqNo()
<< ", offset=" << message.GetOffset()
<< Endl << Flush;
ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
impl->ReceivedMessages.push_back({message.GetPartitionSession()->GetPartitionId(),
message.GetSeqNo(),
message.GetOffset(),
message.GetData(),
message,
AutoCommit});
impl->AutoCommit});

if (AutoCommit) {
if (impl->AutoCommit) {
message.Commit();
}
}

if (ReceivedMessages.size() == expectedMessagesCount) {
DataPromise.SetValue(ReceivedMessages);
if (impl->ReceivedMessages.size() == expectedMessagesCount) {
impl->DataPromise.SetValue(impl->ReceivedMessages);
}
});

readSettings.EventHandlers_.StartPartitionSessionHandler(
[&]
[impl=Impl]
(TReadSessionEvent::TStartPartitionSessionEvent& ev) mutable {
Cerr << ">>>>> " << Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Received TStartPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.insert(partitionId); });
if (Offsets.contains(partitionId)) {
Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " from offset " << Offsets[partitionId] << Endl << Flush;
ev.Confirm(Offsets[partitionId], TMaybe<ui64>());
auto offset = impl->GetOffset(partitionId);
impl->Modify([&](std::set<size_t>& s) { s.insert(partitionId); });
if (offset) {
Cerr << ">>>>> " << impl->Name << " Start reading partition " << partitionId << " from offset " << offset.value() << Endl << Flush;
ev.Confirm(offset.value(), TMaybe<ui64>());
} else {
Cerr << ">>>>> " << Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Start reading partition " << partitionId << " without offset" << Endl << Flush;
ev.Confirm();
}
});

readSettings.EventHandlers_.StopPartitionSessionHandler(
[&]
[impl=Impl]
(TReadSessionEvent::TStopPartitionSessionEvent& ev) mutable {
Cerr << ">>>>> " << Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Received TStopPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
Cerr << ">>>>> " << Name << " Stop reading partition " << partitionId << Endl << Flush;
impl->Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
Cerr << ">>>>> " << impl->Name << " Stop reading partition " << partitionId << Endl << Flush;
ev.Confirm();
});

readSettings.EventHandlers_.PartitionSessionClosedHandler(
[&]
[impl=Impl]
(TReadSessionEvent::TPartitionSessionClosedEvent& ev) mutable {
Cerr << ">>>>> " << Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Received TPartitionSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
Cerr << ">>>>> " << Name << " Stop (closed) reading partition " << partitionId << Endl << Flush;
impl->Modify([&](std::set<size_t>& s) { s.erase(partitionId); });
Cerr << ">>>>> " << impl->Name << " Stop (closed) reading partition " << partitionId << Endl << Flush;
});

readSettings.EventHandlers_.SessionClosedHandler(
[Name=name]
[impl=Impl]
(const TSessionClosedEvent& ev) mutable {
Cerr << ">>>>> " << Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Received TSessionClosedEvent message " << ev.DebugString() << Endl << Flush;
});

readSettings.EventHandlers_.EndPartitionSessionHandler(
[&]
[impl=Impl]
(TReadSessionEvent::TEndPartitionSessionEvent& ev) mutable {
Cerr << ">>>>> " << Name << " Received TEndPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
Cerr << ">>>>> " << impl->Name << " Received TEndPartitionSessionEvent message " << ev.DebugString() << Endl << Flush;
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
EndedPartitions.insert(partitionId);
EndedPartitionEvents.push_back(ev);
impl->EndedPartitions.insert(partitionId);
impl->EndedPartitionEvents.push_back(ev);
});


Session = client.CreateReadSession(readSettings);
}

void TTestReadSession::WaitAllMessages() {
DataPromise.GetFuture().GetValue(TDuration::Seconds(5));
Impl->DataPromise.GetFuture().GetValue(TDuration::Seconds(5));
}

void TTestReadSession::Commit() {
for (auto& m : ReceivedMessages) {
Cerr << ">>>>> " << Impl->Name << " Commit all received messages" << Endl << Flush;
for (auto& m : Impl->ReceivedMessages) {
if (!m.Commited) {
m.Msg.Commit();
m.Commited = true;
}
}
}

void TTestReadSession::Acquire() {
void TTestReadSession::TImpl::Acquire() {
Cerr << ">>>>> " << Name << " Acquire()" << Endl << Flush;
Semaphore.Acquire();
}

void TTestReadSession::Release() {
void TTestReadSession::TImpl::Release() {
Cerr << ">>>>> " << Name << " Release()" << Endl << Flush;
Semaphore.Release();
}

NThreading::TFuture<std::set<size_t>> TTestReadSession::Wait(std::set<size_t> partitions, const TString& message) {
NThreading::TFuture<std::set<size_t>> TTestReadSession::TImpl::Wait(std::set<size_t> partitions, const TString& message) {
Cerr << ">>>>> " << Name << " Wait partitions " << partitions << " " << message << Endl << Flush;

with_lock (Lock) {
Expand All @@ -256,37 +256,37 @@ NThreading::TFuture<std::set<size_t>> TTestReadSession::Wait(std::set<size_t> pa

void TTestReadSession::Assert(const std::set<size_t>& expected, NThreading::TFuture<std::set<size_t>> f, const TString& message) {
auto actual = f.HasValue() ? f.GetValueSync() : GetPartitions();
Cerr << ">>>>> " << Name << " Partitions " << actual << " received #2" << Endl << Flush;
Cerr << ">>>>> " << Impl->Name << " Partitions " << actual << " received #2" << Endl << Flush;
UNIT_ASSERT_VALUES_EQUAL_C(expected, actual, message);
Release();
Impl->Release();
}

void TTestReadSession::WaitAndAssertPartitions(std::set<size_t> partitions, const TString& message) {
auto f = Wait(partitions, message);
auto f = Impl->Wait(partitions, message);
f.Wait(TDuration::Seconds(60));
Assert(partitions, f, message);
}

void TTestReadSession::Run() {
ExpectedPartitions = std::nullopt;
Semaphore.TryAcquire();
Release();
Impl->ExpectedPartitions = std::nullopt;
Impl->Semaphore.TryAcquire();
Impl->Release();
}

void TTestReadSession::Close() {
Run();
Cerr << ">>>>> " << Name << " Closing reading session " << Endl << Flush;
Cerr << ">>>>> " << Impl->Name << " Closing reading session " << Endl << Flush;
Session->Close();
Session.reset();
}

std::set<size_t> TTestReadSession::GetPartitions() {
with_lock (Lock) {
return Partitions;
with_lock (Impl->Lock) {
return Impl->Partitions;
}
}

void TTestReadSession::Modify(std::function<void (std::set<size_t>&)> modifier) {
void TTestReadSession::TImpl::Modify(std::function<void (std::set<size_t>&)> modifier) {
bool found = false;

with_lock (Lock) {
Expand All @@ -304,5 +304,24 @@ void TTestReadSession::Modify(std::function<void (std::set<size_t>&)> modifier)
}
}

std::optional<ui64> TTestReadSession::TImpl::GetOffset(ui32 partitionId) const {
with_lock (Lock) {
auto it = Offsets.find(partitionId);
if (it == Offsets.end()) {
return std::nullopt;
}
return it->second;
}
}

void TTestReadSession::SetOffset(ui32 partitionId, std::optional<ui64> offset) {
with_lock (Impl->Lock) {
if (offset) {
Impl->Offsets[partitionId] = offset.value();
} else {
Impl->Offsets.erase(partitionId);
}
}
}

}
Loading

0 comments on commit 671b79d

Please sign in to comment.