diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 696ac0e409b6..010c1d6e5849 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -38,6 +38,8 @@ class TFixture : public NUnitTest::TBaseFixture { void Write(const TString& message, NTable::TTransaction* tx = nullptr); size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; } + + void WaitForEvent(); }; void SetUp(NUnitTest::TTestContext&) override; @@ -595,26 +597,31 @@ auto TFixture::GetTopicReadSession(const TString& topicPath, void TFixture::TTopicWriteSessionContext::WaitForContinuationToken() { while (!ContinuationToken.Defined()) { - Session->WaitEvent().Wait(); - for (auto& event : Session->GetEvents()) { - if (auto* e = std::get_if(&event)) { - ContinuationToken = std::move(e->ContinuationToken); - } else if (auto* e = std::get_if(&event)) { - for (auto& ack : e->Acks) { - switch (ack.State) { - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: - ++WrittenAckCount; - break; - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: - ++WrittenInTxAckCount; - break; - default: - break; - } + WaitForEvent(); + } +} + +void TFixture::TTopicWriteSessionContext::WaitForEvent() +{ + Session->WaitEvent().Wait(); + for (auto& event : Session->GetEvents()) { + if (auto* e = std::get_if(&event)) { + ContinuationToken = std::move(e->ContinuationToken); + } else if (auto* e = std::get_if(&event)) { + for (auto& ack : e->Acks) { + switch (ack.State) { + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: + ++WrittenAckCount; + break; + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: + ++WrittenInTxAckCount; + break; + default: + break; } - } else if (auto* e = std::get_if(&event)) { - UNIT_FAIL(""); } + } else if (auto* e = std::get_if(&event)) { + UNIT_FAIL(""); } } } @@ -713,27 +720,7 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup UNIT_ASSERT(context.AckCount() <= context.WriteCount); while (context.AckCount() < context.WriteCount) { - context.Session->WaitEvent().Wait(); - for (auto& event : context.Session->GetEvents()) { - if (auto* e = std::get_if(&event)) { - context.ContinuationToken = std::move(e->ContinuationToken); - } else if (auto* e = std::get_if(&event)) { - for (auto& ack : e->Acks) { - switch (ack.State) { - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: - ++context.WrittenAckCount; - break; - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: - ++context.WrittenInTxAckCount; - break; - default: - break; - } - } - } else if (auto* e = std::get_if(&event)) { - UNIT_FAIL(""); - } - } + context.WaitForEvent(); } UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount);