Skip to content

Commit

Permalink
Merge f0f0e94 into 9acd579
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Apr 8, 2024
2 parents 9acd579 + f0f0e94 commit 335a12f
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
return;
}

for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
auto& partitions = topicToPartitions.second;
for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) {
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
if (partitionsIt->LockOn <= ctx.Now()) {
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId);
NeedRebalance = true;
partitionsIt = partitions.erase(partitionsIt);
} else {
Expand Down Expand Up @@ -408,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy
}

void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
NewPartitionsToLockOnTime.clear();

for (auto& [topicName, topicInfo] : TopicsInfo) {
if (topicInfo.TabletID == tabletId) {
auto partitionsIt = TopicPartitions.find(topicName);
Expand Down Expand Up @@ -579,8 +580,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();

auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount);
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);

for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
Expand All @@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
}

NeedRebalance = true;
size_t partitionToReleaseIndex = 0;
size_t i = 0;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
partitionToReleaseIndex = partIndex;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
}

Expand Down

0 comments on commit 335a12f

Please sign in to comment.