Skip to content

Commit

Permalink
Don't close the dying channel right away
Browse files Browse the repository at this point in the history
Now that we have internal goroutines writing to it, we can't close it
arbitrarily or we risk a panic writing to a closed channel. Instead just write
nil, and have the receiver close it (since it knows nobody else is writing at
the moment).
  • Loading branch information
eapache committed Apr 29, 2015
1 parent 99e8c77 commit 41464b3
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,11 @@ func (child *partitionConsumer) Errors() <-chan *ConsumerError {
}

func (child *partitionConsumer) AsyncClose() {
// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
// also just close itself)
close(child.dying)
child.dying <- nil
}

func (child *partitionConsumer) Close() error {
Expand Down Expand Up @@ -403,9 +403,9 @@ func (child *partitionConsumer) responseFeeder() {
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
child.AsyncClose()
child.sendError(err)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.broker.broker.ID(), err)
child.sendError(err)
child.AsyncClose()
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dying <- err
Expand Down Expand Up @@ -601,6 +601,7 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC
case err := <-child.dying:
if err == nil {
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.dying)
close(child.trigger)
} else {
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
Expand Down

0 comments on commit 41464b3

Please sign in to comment.