From 72353adebc62f09b49f27c2ca071e0810193b594 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sat, 6 Apr 2024 09:50:46 +0000 Subject: [PATCH 1/2] Fix verify --- ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index cd3f6dd7d674..fd458b5d2ffb 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo return; } - for (auto& topicToPartitions: NewPartitionsToLockOnTime) { - auto& partitions = topicToPartitions.second; + for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) { for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) { if (partitionsIt->LockOn <= ctx.Now()) { - TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId); + TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId); NeedRebalance = true; partitionsIt = partitions.erase(partitionsIt); } else { @@ -579,8 +578,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size(); auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName()); - Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end()); - Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount); + Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount); for (ui32 c = 0; c < record.GetCount(); ++c) { // if some partition not locked yet, then release it without rebalance From 64508649e6557c0c7f49b99db963bfc9dda803ff Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sat, 6 Apr 2024 10:01:07 +0000 Subject: [PATCH 2/2] Fix --- ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index fd458b5d2ffb..4719b02d143e 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -407,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy } void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) { + NewPartitionsToLockOnTime.clear(); + for (auto& [topicName, topicInfo] : TopicsInfo) { if (topicInfo.TabletID == tabletId) { auto partitionsIt = TopicPartitions.find(topicName);