From 0da0d6abdfa632ec400a7da3b2cb09b861f464e2 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 3 Oct 2024 09:49:00 +0300 Subject: [PATCH] YQ-3583 Improvements after load tests / to stable (#10019) --- .../fq/libs/row_dispatcher/topic_session.cpp | 9 ++++++-- .../row_dispatcher/ut/topic_session_ut.cpp | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 9623806ee87a..a1dc73bfef63 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -92,6 +92,7 @@ struct TEvPrivate { ui64 PrintStatePeriodSec = 60; ui64 MaxBatchSizeBytes = 10000000; +ui64 MaxHandledEvents = 1000; TVector GetVector(const google::protobuf::RepeatedPtrField& value) { return {value.begin(), value.end()}; @@ -227,6 +228,7 @@ class TTopicSession : public TActorBootstrapped { cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady); IgnoreFunc(NFq::TEvPrivate::TEvCreateSession); + IgnoreFunc(NFq::TEvPrivate::TEvDataParsed); IgnoreFunc(NFq::TEvPrivate::TEvDataAfterFilteration); IgnoreFunc(NFq::TEvPrivate::TEvStatus); IgnoreFunc(NFq::TEvPrivate::TEvDataFiltered); @@ -442,7 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - while (true) { + for (ui64 i = 0; i < MaxHandledEvents; ++i) { if (!ReadSession) { return; } @@ -475,7 +477,6 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE LOG_ROW_DISPATCHER_TRACE("Data received: " << message.DebugString(true)); TString item = message.GetData(); - item.Detach(); Self.SendToParsing(message.GetOffset(), item); Self.LastMessageOffset = message.GetOffset(); } @@ -540,6 +541,10 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) { } } + if (ClientsWithoutPredicate.size() == Clients.size()) { + return; + } + try { Parser->Push(offset, message); } catch (const std::exception& e) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index ba24378e0a35..65c24fcb85f1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -171,6 +171,27 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } + Y_UNIT_TEST_F(TwoSessionWithoutPredicate, TFixture) { + const TString topicName = "twowithoutpredicate"; + PQCreateStream(topicName); + Init(topicName); + auto source1 = BuildSource(topicName, true); + auto source2 = BuildSource(topicName, true); + StartSession(ReadActorId1, source1); + StartSession(ReadActorId2, source2); + + const std::vector data = { Json1 }; + PQWrite(data, topicName); + ExpectNewDataArrived({ReadActorId1, ReadActorId2}); + Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch())); + Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch())); + ExpectMessageBatch(ReadActorId1, { Json1 }); + ExpectMessageBatch(ReadActorId2, { Json1 }); + + StopSession(ReadActorId1, source1); + StopSession(ReadActorId2, source2); + } + Y_UNIT_TEST_F(SessionWithPredicateAndSessionWithoutPredicate, TFixture) { const TString topicName = "topic2"; PQCreateStream(topicName);