From 47f3e492be144ceb7c2f43943e98be818802800e Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 2 Oct 2024 08:00:49 +0000 Subject: [PATCH 1/4] fix topic_session after load tests --- .../fq/libs/row_dispatcher/topic_session.cpp | 12 ++++++++++- .../row_dispatcher/ut/topic_session_ut.cpp | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 9623806ee87a..1ed11ef5c97d 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -92,6 +92,7 @@ struct TEvPrivate { ui64 PrintStatePeriodSec = 60; ui64 MaxBatchSizeBytes = 10000000; +ui64 MaxHandledEvents = 1000; TVector GetVector(const google::protobuf::RepeatedPtrField& value) { return {value.begin(), value.end()}; @@ -227,6 +228,7 @@ class TTopicSession : public TActorBootstrapped { cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady); IgnoreFunc(NFq::TEvPrivate::TEvCreateSession); + IgnoreFunc(NFq::TEvPrivate::TEvDataParsed); IgnoreFunc(NFq::TEvPrivate::TEvDataAfterFilteration); IgnoreFunc(NFq::TEvPrivate::TEvStatus); IgnoreFunc(NFq::TEvPrivate::TEvDataFiltered); @@ -442,6 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { + ui64 counter = MaxHandledEvents; while (true) { if (!ReadSession) { return; @@ -455,6 +458,9 @@ void TTopicSession::HandleNewEvents() { break; } std::visit(TTopicEventProcessor{*this, LogPrefix}, *event); + if (!counter--) { + break; + } } } @@ -475,7 +481,6 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE LOG_ROW_DISPATCHER_TRACE("Data received: " << message.DebugString(true)); TString item = message.GetData(); - item.Detach(); Self.SendToParsing(message.GetOffset(), item); Self.LastMessageOffset = message.GetOffset(); } @@ -540,6 +545,11 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) { } } + if (ClientsWithoutPredicate.size() == Clients.size()) { + Cerr << "ClientsWithoutPredicate" << Endl; + return; + } + try { Parser->Push(offset, message); } catch (const std::exception& e) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index ba24378e0a35..65c24fcb85f1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -171,6 +171,27 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } + Y_UNIT_TEST_F(TwoSessionWithoutPredicate, TFixture) { + const TString topicName = "twowithoutpredicate"; + PQCreateStream(topicName); + Init(topicName); + auto source1 = BuildSource(topicName, true); + auto source2 = BuildSource(topicName, true); + StartSession(ReadActorId1, source1); + StartSession(ReadActorId2, source2); + + const std::vector data = { Json1 }; + PQWrite(data, topicName); + ExpectNewDataArrived({ReadActorId1, ReadActorId2}); + Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch())); + Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch())); + ExpectMessageBatch(ReadActorId1, { Json1 }); + ExpectMessageBatch(ReadActorId2, { Json1 }); + + StopSession(ReadActorId1, source1); + StopSession(ReadActorId2, source2); + } + Y_UNIT_TEST_F(SessionWithPredicateAndSessionWithoutPredicate, TFixture) { const TString topicName = "topic2"; PQCreateStream(topicName); From b5aeefb7041f2d65f34249dfe707fe7467828a2c Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 2 Oct 2024 09:34:41 +0000 Subject: [PATCH 2/4] remove cerr --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 1ed11ef5c97d..69848a149558 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -444,8 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - ui64 counter = MaxHandledEvents; - while (true) { + for (ui64 counter = 0; i < MaxHandledEvents; ++i { if (!ReadSession) { return; } @@ -458,9 +457,6 @@ void TTopicSession::HandleNewEvents() { break; } std::visit(TTopicEventProcessor{*this, LogPrefix}, *event); - if (!counter--) { - break; - } } } @@ -546,7 +542,6 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) { } if (ClientsWithoutPredicate.size() == Clients.size()) { - Cerr << "ClientsWithoutPredicate" << Endl; return; } From 490b7d5e2d024505611b1071fd072818cc030040 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 2 Oct 2024 09:48:22 +0000 Subject: [PATCH 3/4] fix build --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 69848a149558..04ee306c42bb 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -444,7 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - for (ui64 counter = 0; i < MaxHandledEvents; ++i { + for (ui64 i = 0; i < MaxHandledEvents; ++i { if (!ReadSession) { return; } From a90d06649928538df29f38168a7a3845f6367676 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 2 Oct 2024 10:27:39 +0000 Subject: [PATCH 4/4] fix build --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 04ee306c42bb..a1dc73bfef63 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -444,7 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - for (ui64 i = 0; i < MaxHandledEvents; ++i { + for (ui64 i = 0; i < MaxHandledEvents; ++i) { if (!ReadSession) { return; }