Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Apr 8, 2024
1 parent 5ae8fd6 commit f0f0e94
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
}

NeedRebalance = true;
size_t partitionToReleaseIndex = 0;
size_t i = 0;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
partitionToReleaseIndex = partIndex;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
}

Expand Down

0 comments on commit f0f0e94

Please sign in to comment.