From aee55716dfebcedda78b28010c16a4c3f70bca47 Mon Sep 17 00:00:00 2001 From: niksaveliev Date: Mon, 8 Apr 2024 10:23:26 +0500 Subject: [PATCH] Fix kafka read session verify (#3522) --- .../kafka_proxy/actors/kafka_read_session_actor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index cd3f6dd7d674..4719b02d143e 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo return; } - for (auto& topicToPartitions: NewPartitionsToLockOnTime) { - auto& partitions = topicToPartitions.second; + for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) { for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) { if (partitionsIt->LockOn <= ctx.Now()) { - TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId); + TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId); NeedRebalance = true; partitionsIt = partitions.erase(partitionsIt); } else { @@ -408,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy } void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) { + NewPartitionsToLockOnTime.clear(); + for (auto& [topicName, topicInfo] : TopicsInfo) { if (topicInfo.TabletID == tabletId) { auto partitionsIt = TopicPartitions.find(topicName); @@ -579,8 +580,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size(); auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName()); - Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end()); - Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount); + Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount); for (ui32 c = 0; c < record.GetCount(); ++c) { // if some partition not locked yet, then release it without rebalance