diff --git a/consumer.go b/consumer.go index 25635c026..99897b950 100644 --- a/consumer.go +++ b/consumer.go @@ -211,11 +211,12 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) { // when it passes out of scope. // // The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range -// loop. The PartitionConsumer will under no circumstances stop by itself once it is started, it will -// just keep retrying if it encounters errors. By default, it logs these errors to sarama.Logger; -// if you want to handle errors yourself, set your config's Consumer.Return.Errors to true, and read -// from the Errors channel as well, using a select statement or in a separate goroutine. Check out -// the examples of Consumer to see examples of these different approaches. +// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported +// as out of range by the brokers. In this case you should decide what you want to do (try a different offset, +// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. +// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set +// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement +// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches. type PartitionConsumer interface { // AsyncClose initiates a shutdown of the PartitionConsumer. 
This method will return immediately, @@ -516,6 +517,11 @@ func (w *brokerConsumer) subscriptionConsumer() { for child := range w.subscriptions { if err := child.handleResponse(response); err != nil { switch err { + case ErrOffsetOutOfRange: + // there's no point in retrying this; it will just fail the same way again, + // so shut it down and force the user to choose what to do + child.AsyncClose() + fallthrough default: child.sendError(err) fallthrough diff --git a/consumer_test.go b/consumer_test.go index 3b899a801..0611c6e23 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -101,6 +101,46 @@ func TestConsumerLatestOffset(t *testing.T) { } } +func TestConsumerShutsDownOutOfRange(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest) + + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) + leader.Returns(fetchResponse) + + master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + seedBroker.Close() + + consumer, err := master.ConsumePartition("my_topic", 0, 101) + if err != nil { + t.Fatal(err) + } + + if _, ok := <-consumer.Messages(); ok { + t.Error("Expected the consumer to shut down") + } + + leader.Close() + safeClose(t, master) +} + func TestConsumerFunnyOffsets(t *testing.T) { // for topics that are compressed and/or compacted (different things!)
we have to be // able to handle receiving offsets that are non-sequential (though still strictly increasing) and