From e3133f8c932dd7bad29c6747fba04acdd4153767 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar Date: Wed, 9 Oct 2024 14:06:42 +0300 Subject: [PATCH] Avoid removing empty message (#10167) (#10233) --- .../deprecated/persqueue_v0/grpc_pq_actor.h | 2 +- .../persqueue_v0/grpc_pq_read_actor.cpp | 29 +++++--------- ydb/services/persqueue_v1/actors/helpers.cpp | 38 +++++++++---------- ydb/services/persqueue_v1/actors/helpers.h | 4 +- .../actors/read_session_actor.cpp | 4 +- 5 files changed, 32 insertions(+), 45 deletions(-) diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index d97a4e6aaaa9..6fef53f8ad4e 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -726,7 +726,7 @@ class TReadSessionActor : public TActorBootstrapped { static ui32 NormalizeMaxReadSize(ui32 sourceValue); static ui32 NormalizeMaxReadPartitionsCount(ui32 sourceValue); - static bool RemoveEmptyMessages(NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are nonempty messages + static bool HasMessages(const NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are any messages private: IReadSessionHandlerRef Handler; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index 79a61c72dc6e..a930e737d189 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -1527,7 +1527,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0); i64 diff = formedResponse->Response.ByteSize(); - const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.MutableBatchedData()); + const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData()); if (hasMessages) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid); formedResponse->Response.MutableBatchedData()->SetCookie(ReadIdToResponse); @@ -1760,26 +1760,15 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) { } } -bool TReadSessionActor::RemoveEmptyMessages(TReadResponse::TBatchedData& data) { - bool hasNonEmptyMessages = false; - auto isMessageEmpty = [&](TReadResponse::TBatchedData::TMessageData& message) -> bool { - if (message.GetData().empty()) { - return true; - } else { - hasNonEmptyMessages = true; - return false; +bool TReadSessionActor::HasMessages(const TReadResponse::TBatchedData& data) { + for (const auto& partData : data.GetPartitionData()) { + for (const auto& batch : partData.GetBatch()) { + if (batch.MessageDataSize() > 0) { + return true; + } } - }; - auto batchRemover = [&](TReadResponse::TBatchedData::TBatch& batch) -> bool { - NProtoBuf::RemoveRepeatedFieldItemIf(batch.MutableMessageData(), isMessageEmpty); - return batch.MessageDataSize() == 0; - }; - auto partitionDataRemover = [&](TReadResponse::TBatchedData::TPartitionData& partition) -> bool { - NProtoBuf::RemoveRepeatedFieldItemIf(partition.MutableBatch(), batchRemover); - return partition.BatchSize() == 0; - }; - NProtoBuf::RemoveRepeatedFieldItemIf(data.MutablePartitionData(), partitionDataRemover); - return hasNonEmptyMessages; + } + return false; } diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp index b020d327986c..99b0febae676 100644 --- a/ydb/services/persqueue_v1/actors/helpers.cpp +++ b/ydb/services/persqueue_v1/actors/helpers.cpp @@ -4,29 +4,27 @@ namespace NKikimr::NGRpcProxy::V1 { -bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) { - auto batchRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch) -> bool { - return batch.message_data_size() == 0; - }; - auto partitionDataRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool { - NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover); - return partition.batches_size() == 0; - }; - NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover); - return !data.partition_data().empty(); +bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) { + for (const auto& partData : data.partition_data()) { + for (const auto& batch : partData.batches()) { + if (batch.message_data_size() > 0) { + return true; + } + } + } + return false; } // TODO: remove after refactor -bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) { - auto batchRemover = [&](Topic::StreamReadMessage::ReadResponse::Batch& batch) -> bool { - return batch.message_data_size() == 0; - }; - auto partitionDataRemover = [&](Topic::StreamReadMessage::ReadResponse::PartitionData& partition) -> bool { - NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover); - return partition.batches_size() == 0; - }; - NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover); - return !data.partition_data().empty(); +bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) { + for (const auto& partData : data.partition_data()) { + for (const auto& batch : partData.batches()) { + if (batch.message_data_size() > 0) { + return true; + } + } + } + return false; } } diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h index a7b4ddb92e30..5fca2a34d467 100644 --- a/ydb/services/persqueue_v1/actors/helpers.h +++ b/ydb/services/persqueue_v1/actors/helpers.h @@ -13,8 +13,8 @@ static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering using namespace Ydb; -bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data); +bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data); -bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data); +bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data); } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index d40a4f1324d7..aa1d4fd28bb3 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1947,9 +1947,9 @@ ui64 TReadSessionActor::PrepareResponse(typename TFormedRe formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize(); if constexpr (UseMigrationProtocol) { - formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); + formedResponse->HasMessages = HasMessages(formedResponse->Response.data_batch()); } else { - formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); + formedResponse->HasMessages = HasMessages(formedResponse->Response.read_response()); } return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;