Skip to content

Commit

Permalink
Fix kafka read session verify (#3522)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Apr 8, 2024
1 parent 1acf95f commit aee5571
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
return;
}

for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
auto& partitions = topicToPartitions.second;
for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) {
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
if (partitionsIt->LockOn <= ctx.Now()) {
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId);
NeedRebalance = true;
partitionsIt = partitions.erase(partitionsIt);
} else {
Expand Down Expand Up @@ -408,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy
}

void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
NewPartitionsToLockOnTime.clear();

for (auto& [topicName, topicInfo] : TopicsInfo) {
if (topicInfo.TabletID == tabletId) {
auto partitionsIt = TopicPartitions.find(topicName);
Expand Down Expand Up @@ -579,8 +580,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();

auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount);
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);

for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
Expand Down

0 comments on commit aee5571

Please sign in to comment.