PartitionedStream hanging after stream consumption started #1107
Comments
Tricky stuff. Since 2.4 zio-kafka polls the consumer continuously. This gives a huge performance improvement, but it also means that the consumer is in use almost all of the time. I am not really sure what we can do about this.
There is continuous polling, yes, but the Semaphore used to guarantee exclusive access to the underlying Apache Kafka consumer is requested anew for every poll. So other calls like Consumer.endOffsets should get a chance to acquire it between polls. I can't think of a reason why that would not be the case; we'd have to try to reproduce the issue.
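To make the contention concrete, here is a minimal ZIO 2 sketch (my illustration, not zio-kafka code) of a tight loop and a one-off task competing for the same single-permit semaphore; with an unfair semaphore there is no guarantee the one-off acquire is served before the loop re-acquires:

import zio._

object SemaphoreStarvationDemo extends ZIOAppDefault {
  val run =
    for {
      sem    <- Semaphore.make(permits = 1)
      // Stand-in for the poll loop: re-acquires the single permit in a
      // tight loop, holding it briefly each time.
      poller <- sem.withPermit(ZIO.unit).forever.fork
      // Stand-in for a one-off call such as endOffsets: it needs the same
      // permit once. An unfair semaphore gives no ordering guarantee, so
      // this acquire can be delayed behind the loop.
      start  <- Clock.nanoTime
      _      <- sem.withPermit(ZIO.unit)
      end    <- Clock.nanoTime
      _      <- Console.printLine(s"one-off acquire waited ${(end - start) / 1000000} ms")
      _      <- poller.interrupt
    } yield ()
}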
Fair semaphores are not very performant, so most implementations are not fair. I could not find out whether ZIO's semaphore is fair.
Fair enough (pun intended) 😛
As a workaround, perhaps something like this works to first get the committed and end offsets before starting a partition stream:

Consumer
  .partitionedStream(???)
  .mapZIO { case (tp, streamIn) =>
    ZIO.succeed(tp) zip ZIO.succeed(streamIn) zip Consumer.committed(Set(tp)) zip Consumer.endOffsets(Set(tp))
  }
  .flatMapPar(nrPartitions) { case (tp, streamIn, committed, endOffsets) =>
    streamIn
      .debug // etc.
  }
The issue can occur with partition rebalances. This means the polling is still active, and I need to recompute my starting state with endOffsets/committed offsets for the newly assigned partitions. I think your workaround could work at first start, but not during a rebalance. To complete my issue: after a few minutes (5-10 min) the consumer ends up with this log:
I am running my program on kube. At this point, the consumer group doesn't have any assigned partitions on the Kafka cluster, and the process has not finished, so kube doesn't restart the pod. The consumer seems to hang forever.
@erikvanoosten, what do you think about sending a new …
Just got confirmation that ZIO's semaphore is not fair. @flavienbert IMHO it will be easier (and more predictable) to make the semaphore that protects the consumer fair, that is, to give users access in the order in which they arrive.
Agree with you. Are you handling it? Or do you want me to take care of it?
Please, go ahead! 🙏
Replace the semaphore that is used to control access to the java consumer with a reentrant lock that has fairness enabled. This prevents liveness problems for callers that need to use the consumer, e.g. to fetch committed offsets. Fixes #1107.
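A minimal sketch of that approach (illustrative only; the name ConsumerGuard and its shape are my own, not necessarily the real zio-kafka internals), assuming the java consumer is only ever touched while the fair lock is held:

import java.util.concurrent.locks.ReentrantLock
import zio._

// Guards the (not thread-safe) java consumer. new ReentrantLock(true)
// enables fairness: waiting threads acquire the lock roughly in arrival
// order, so the continuous poll loop can no longer indefinitely overtake
// a one-off call such as fetching committed offsets.
final class ConsumerGuard(lock: ReentrantLock) {
  def withConsumer[A](use: => A): Task[A] =
    ZIO.attemptBlocking {
      lock.lock()
      try use
      finally lock.unlock()
    }
}

object ConsumerGuard {
  val make: UIO[ConsumerGuard] =
    ZIO.succeed(new ConsumerGuard(new ReentrantLock(true))) // fair = true
}

The fairness flag trades some throughput for ordering, which matches the earlier remark that fair synchronization primitives are less performant.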
Hi, I noticed a bug in v2.6.0 that is not in v2.3.4. I am using a Consumer.partitionedStream, and for each partition I need to get the committed and end offsets with the Consumer.endOffsets/Consumer.committed methods. I have 12 partitions, so I use mapZIOParUnordered with a parallelism of 12 to process all my partitions. I noticed that as soon as the first stream starts for one partition, the Consumer.endOffsets and Consumer.committed calls for the other partitions take far too long. It looks like the polling of the consumer blocks other fibers from completing the Consumer.endOffsets and Consumer.committed tasks. The result, after a few minutes, is that all the partitions are unassigned forever, but the process is not killed.
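For reference, a minimal sketch of the kind of pipeline described above (topic name, serdes, and record handling are assumed; a Consumer layer must still be provided to run it):

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object PartitionedPipelineSketch {
  val nrPartitions = 12

  // Per partition: fetch the committed and end offsets first, then consume.
  // Under the bug described above, committed/endOffsets can hang once the
  // first partition stream starts being consumed.
  val pipeline: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .partitionedStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
      .mapZIOParUnordered(nrPartitions) { case (tp, partitionStream) =>
        for {
          committed  <- Consumer.committed(Set(tp))
          endOffsets <- Consumer.endOffsets(Set(tp))
          _          <- ZIO.debug(s"$tp: committed=$committed end=$endOffsets")
          _          <- partitionStream
                          .mapZIO(record => record.offset.commit)
                          .runDrain
        } yield ()
      }
      .runDrain
}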