From 022bb1e39ce55f3d1cccefd2513310529b1ae83f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 20 May 2024 08:09:05 +0000 Subject: [PATCH] Remove unused field and deprecate other --- ydb/core/protos/pqconfig.proto | 4 +-- .../actors/read_session_actor.cpp | 32 ++++++------------- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index c00f8057905c..001dfb82fec5 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -620,11 +620,11 @@ message TReleasePartition { optional uint64 Generation = 2; optional string Session = 3; optional string ClientId = 4; - optional uint32 Count = 5; + optional uint32 Count = 5 [deprecated = true]; // Remove at 2025-1 optional NActorsProto.TActorId PipeClient = 6; optional uint32 Group = 7; optional string Path = 8; - repeated uint32 Partition = 9; + reserved 9; // repeated uint32 Partition = 9 } message TPartitionReleased { diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 7365081a5ace..5da8f49f37e4 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1445,15 +1445,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePar } }; - std::unordered_set partitionsForRealese; - for (ui32 p : record.GetPartition()) { - partitionsForRealese.insert(p); - } - if (group) { - partitionsForRealese.insert(group - 1); - } - - if (partitionsForRealese.empty()) { + if (!group) { // Release partitions by count for (ui32 c = 0; c < record.GetCount(); ++c) { if (Partitions.empty()) { @@ -1484,32 +1476,28 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePar doRelease(jt); } } else { + ui32 partitionId = group - 1; + bool found = false; + // Release partitions by partition id LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " gone release" - << ": partitions# " << JoinRange(", ", partitionsForRealese.begin(), partitionsForRealese.end())); + << ": partition# " << partitionId); for (auto it = Partitions.begin(); it != Partitions.end(); ++it) { auto& partitionInfo = it->second; - if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName()) { - auto pt = partitionsForRealese.find(partitionInfo.Partition.Partition); - if (pt == partitionsForRealese.end()) { - continue; - } - + if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName() && partitionId == partitionInfo.Partition.Partition) { if (!partitionInfo.Releasing) { doRelease(it); } - partitionsForRealese.erase(pt); - if (partitionsForRealese.empty()) { - break; - } + found = true; + break; } } - if (!partitionsForRealese.empty()) { + if (!found) { return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR, - TStringBuilder() << "internal error: releasing unknown partitions " << JoinRange(", ", partitionsForRealese.begin(), partitionsForRealese.end()), + TStringBuilder() << "internal error: releasing unknown partition " << partitionId, ctx); } }