From c8b3e2e53f39b8fdb8f8d4572d19cf84a3d2c833 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 15 Apr 2015 22:24:15 -0400 Subject: [PATCH 1/2] consumer: shut down on OffsetOutOfRange The old behaviour was to redispatch it (so it would go to another broker if necessary) and then retry with the same offset, which was a rather useless thing to do unless the offset had somehow ended up slightly ahead of the available messages (which is unlikely - it is far more likely to fall behind). Instead, simply shut down the PartitionConsumer so the user gets an error (if they're subscribed) and their messages channel closes. They then get to choose whether to give up and switch to a different offset, yell for a human, or whatever. --- consumer.go | 5 +++++ consumer_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/consumer.go b/consumer.go index 25635c026..85a2a67b4 100644 --- a/consumer.go +++ b/consumer.go @@ -516,6 +516,11 @@ func (w *brokerConsumer) subscriptionConsumer() { for child := range w.subscriptions { if err := child.handleResponse(response); err != nil { switch err { + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // so shut it down and force the user to choose what to do + child.AsyncClose() + fallthrough default: child.sendError(err) fallthrough diff --git a/consumer_test.go b/consumer_test.go index 3b899a801..0611c6e23 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -101,6 +101,46 @@ func TestConsumerLatestOffset(t *testing.T) { } } +func TestConsumerShutsDownOutOfRange(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + offsetResponseNewest := new(OffsetResponse) + offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) + leader.Returns(offsetResponseNewest) + + offsetResponseOldest := new(OffsetResponse) + offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) + leader.Returns(offsetResponseOldest) + + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) + leader.Returns(fetchResponse) + + master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + seedBroker.Close() + + consumer, err := master.ConsumePartition("my_topic", 0, 101) + if err != nil { + t.Fatal(err) + } + + if _, ok := <-consumer.Messages(); ok { + t.Error("Expected the consumer to shut down") + } + + leader.Close() + safeClose(t, master) +} + func TestConsumerFunnyOffsets(t *testing.T) { // for topics that are compressed and/or compacted (different things!) we have to be // able to handle receiving offsets that are non-sequential (though still strictly increasing) and From d82cd1f99d0286e7c76f5d3e8cfa9e4d27514877 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 16 Apr 2015 14:00:54 +0000 Subject: [PATCH 2/2] Update godoc --- consumer.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/consumer.go b/consumer.go index 85a2a67b4..99897b950 100644 --- a/consumer.go +++ b/consumer.go @@ -211,11 +211,12 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) { // when it passes out of scope. // // The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range -// loop. The PartitionConsumer will under no circumstances stop by itself once it is started, it will -// just keep retrying if it encounters errors. By default, it logs these errors to sarama.Logger; -// if you want to handle errors yourself, set your config's Consumer.Return.Errors to true, and read -// from the Errors channel as well, using a select statement or in a separate goroutine. Check out -// the examples of Consumer to see examples of these different approaches. +// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported +// as out of range by the brokers. In this case you should decide what you want to do (try a different offset, +// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. +// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set +// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement +// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches. type PartitionConsumer interface { // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,