Skip to content

Commit

Permalink
[*] refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Aug 14, 2024
1 parent e9ccd40 commit dc06f8a
Showing 1 changed file with 26 additions and 39 deletions.
65 changes: 26 additions & 39 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class TFixture : public NUnitTest::TBaseFixture {
void Write(const TString& message, NTable::TTransaction* tx = nullptr);

size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; }

void WaitForEvent();
};

void SetUp(NUnitTest::TTestContext&) override;
Expand Down Expand Up @@ -595,26 +597,31 @@ auto TFixture::GetTopicReadSession(const TString& topicPath,
void TFixture::TTopicWriteSessionContext::WaitForContinuationToken()
{
while (!ContinuationToken.Defined()) {
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++WrittenInTxAckCount;
break;
default:
break;
}
WaitForEvent();
}
}

void TFixture::TTopicWriteSessionContext::WaitForEvent()
{
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++WrittenInTxAckCount;
break;
default:
break;
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
}
Expand Down Expand Up @@ -713,27 +720,7 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup
UNIT_ASSERT(context.AckCount() <= context.WriteCount);

while (context.AckCount() < context.WriteCount) {
context.Session->WaitEvent().Wait();
for (auto& event : context.Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
context.ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++context.WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++context.WrittenInTxAckCount;
break;
default:
break;
}
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
context.WaitForEvent();
}

UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount);
Expand Down

0 comments on commit dc06f8a

Please sign in to comment.