-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PartitionConsumer.Close() hangs frequently #325
Comments
Could you provide the complete goroutine trace? The |
Here's the relevant bits:
|
Based on that stack trace the part that's stuck is goroutine 69, which is trying to return a message to the user. However, the |
Captured one more, including the message-consuming goroutine :
|
The new stack trace on the surface looks really weird. Goroutine 56 is blocked trying to write to a channel, and goroutine 66 is apparently blocked trying to read from that same channel. The only possible explanation is if they're not the same channel, because they belong to different PartitionConsumers. After some time digging through the sarama-cluster code, this actually makes sense. It owns multiple PartitionConsumers, but on shutdown it stops reading from all of them, then does effectively: for c := range myPartitionConsumers {
c.Close()
} Since you're no longer reading from any of the PartitionConsumers, the internal consumer code blocks waiting to write a message out to e.g. partition 3, while you're trying to close partition 1. This deadlocks. This is a Sarama design bug (the partitions should not be quite so tightly locked together), but it's not immediately obvious to me what the best fix is internally. Fortunately, there is a very easy workaround: just launch all your wg := sync.WaitGroup{}
for c := range myPartitionConsumers {
wg.Add(1)
go func(c *PartitionConsumer) {
defer wg.Done()
c.Close()
}(c)
}
wg.Wait() Concurrency is hard :( |
Here's one (sort of hacky?) theoretical fix for this on Sarama's end: if at any point the brokerConsumer spends more than 10ms waiting to write to a channel, spawn that write out into a goroutine and unsubscribe the relevant PartitionConsumer so that the rest of them can keep going... |
Yes, you are right, the goroutine trace was based on two partition
consumers running in parallel.
I am not sure if your solution would work, but I will try. Alternatively, I
could create a Consumer instance for each partition, with exactly one
PartitionConsumer each. Then, the partition consumers would not share any
dependencies, right? Not elegant, but effective.
Concurrency is hard, but there is a little too much magic in the latest
sarama - in my opinion. Channels are great, but having too many loops is
not very intuitive and quite hard to follow, especially if (private)
channels are shared between multiple objects. Again, I can't say I spent
hours working through the consumer code, but I tried to debug the issue and
had quite a hard time to figure out relative basics.
|
That would work, but would defeat the entire purpose of the new consumer, which was to share the network connection and win a substantial efficiency gain in doing so. We have a wiki page that gives an architectural overview of the new consumer: https://github.com/Shopify/sarama/wiki/Consumer. It is slightly out of date (this has now landed in master) but the architectural info is correct. |
What I do in my very similar consumergroup library is to have a This approach seems to avoid the deadlocks you are seeing. |
Breaking out of the loop isn't the problem, that bit works just fine. Your
|
Pushed the WaitGroup workaround https://github.com/bsm/sarama-cluster/blob/ad1454f/consumer.go#L386 and it seems to work, except for this case https://travis-ci.org/bsm/sarama-cluster/jobs/53535191. I couldn't reproduce it locally though |
That travis failure is the result of calling |
I updated my consumergroup code to the latest sarama and I am not seeing issues like this. Check out this branch if you're interested: wvanbergen/kafka#37 |
@eapache not sure, PartitionConsumers are closed synchronously and exclusively in @wvanbergen all the problems are related to races under non-trivial conditions. I've never seen any issues in my integration tests either and that's why I specifically added a 'fuzzing' test (https://github.com/bsm/sarama-cluster/blob/master/fuzzing_test.go) which has - so far - been a pretty good at uncovering unexpected behaviour. As you can also see, the test failed only intermittently https://travis-ci.org/bsm/sarama-cluster/builds/53535189 and only when run with |
@dim the panic is "panic: close of closed channel"; the channel being closed is |
@eapache yep, thanks, that's what I thought too and - after more digging - I think I can see an edge case in sarama-cluster where the logic could go wrong. will fix on my end. |
Skip it until the bug is fixed. Also add a missing error check in the test above.
I have a difficult time to recreate the exact issue, since it is intermittent, but it happens frequently enough to be a problem. Calls to PartitionConsumer.Close() tend to hang in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L341 because the errors channel is never closed by the dispatcher in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L260. When I dump my goroutines, I can see that it's hanging in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L231. Does
AsyncClose()
need an additionalclient.trigger <- none{}
?The text was updated successfully, but these errors were encountered: