Skip to content

Commit

Permalink
Divide partitions amount all consumers. See wvanbergen#61
Browse files Browse the repository at this point in the history
  • Loading branch information
nemosupremo committed Jul 5, 2015
1 parent e236a65 commit 8928a13
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions consumergroup/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,30 @@ func dividePartitionsBetweenConsumers(consumers kazoo.ConsumergroupInstanceList,

plen := len(partitions)
clen := len(consumers)
if clen == 0 {
return result
}

sort.Sort(partitions)
sort.Sort(consumers)

n := plen / clen
if plen%clen > 0 {
n++
}
m := plen % clen
p := 0
for i, consumer := range consumers {
first := i * n
if first > plen {
first = plen
first := p
last := first + n
if m > 0 && i > m {
last++
}

last := (i + 1) * n
if last > plen {
last = plen
}

for _, pl := range partitions[first:last] {
result[consumer.ID] = append(result[consumer.ID], pl.partition)
}
p = last
}

return result
Expand Down

0 comments on commit 8928a13

Please sign in to comment.