From 3d23f0ac08daf43800aff5a9539c3f0d75395409 Mon Sep 17 00:00:00 2001 From: Maxim Vladimirsky Date: Mon, 11 Apr 2016 11:30:10 -0700 Subject: [PATCH] Adjust partition offset if out of bounds (fixes #56) --- CHANGELOG.md | 4 +++ consumer/dumb_consumer.go | 25 ++++++++--------- consumer/dumb_consumer_functional_test.go | 21 --------------- consumer/dumb_consumer_test.go | 24 +++++++++++------ consumer/smart_consumer.go | 15 ++++++++--- consumer/smart_consumer_test.go | 33 +++++++++++++++++++++++ producer/producer_test.go | 20 +++++++------- service/service_test.go | 32 +++++++++++----------- testhelpers/kafkahelper/kafkahelper.go | 22 ++++++++++++++- 9 files changed, 124 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b483a095..ff00a176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +#### Version TBD (TBD) + +* [#56](https://github.com/mailgun/kafka-pixy/issues/56) Invalid stored offset makes consumer panic + #### Version 0.10.1 (2015-12-21) * [#49](https://github.com/mailgun/kafka-pixy/pull/49) Topic consumption stops while ownership retained. diff --git a/consumer/dumb_consumer.go b/consumer/dumb_consumer.go index 06144253..bc71c4fe 100644 --- a/consumer/dumb_consumer.go +++ b/consumer/dumb_consumer.go @@ -45,14 +45,17 @@ func (ce ConsumerErrors) Error() string { // Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of // scope. -// -// Sarama's Consumer type does not currently support automatic consumer group rebalancing and offset tracking, -// however the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. We plan -// to properly integrate this functionality at a later date. type Consumer interface { - // ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. 
It will - // return an error if this Consumer is already consuming on the given topic/partition. Offset can be a - // literal offset, or OffsetNewest or OffsetOldest + // ConsumePartition creates a PartitionConsumer on the given topic/partition + // with the given offset. It will return an error if this Consumer is + // already consuming on the given topic/partition. Offset can be a + // literal offset, or OffsetNewest or OffsetOldest. + // + // If offset is smaller than the oldest offset then the oldest offset is + // returned. If offset is larger than the newest offset then the newest + // offset is returned. If offset is either sarama.OffsetNewest or + // sarama.OffsetOldest constant, then the actual offset value is returned. + // Otherwise offset is returned. ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, int64, error) // Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed. @@ -181,14 +184,12 @@ func (c *consumer) chooseStartingOffset(topic string, partition int32, offset in } switch { - case offset == sarama.OffsetNewest: + case offset == sarama.OffsetNewest || offset > newestOffset: return newestOffset, nil - case offset == sarama.OffsetOldest: + case offset == sarama.OffsetOldest || offset < oldestOffset: return oldestOffset, nil - case offset >= oldestOffset && offset <= newestOffset: - return offset, nil default: - return 0, sarama.ErrOffsetOutOfRange + return offset, nil } } diff --git a/consumer/dumb_consumer_functional_test.go b/consumer/dumb_consumer_functional_test.go index 7b883d76..8d4bddb5 100644 --- a/consumer/dumb_consumer_functional_test.go +++ b/consumer/dumb_consumer_functional_test.go @@ -2,33 +2,12 @@ package consumer import ( "fmt" - "math" "testing" "time" "github.com/Shopify/sarama" ) -func TestFuncConsumerOffsetOutOfRange(t *testing.T) { - setupFunctionalTest(t) - defer teardownFunctionalTest(t) - - consumer, err := NewConsumer(kafkaBrokers, nil) 
- if err != nil { - t.Fatal(err) - } - - if _, _, err := consumer.ConsumePartition("test.1", 0, -10); err != sarama.ErrOffsetOutOfRange { - t.Error("Expected ErrOffsetOutOfRange, got:", err) - } - - if _, _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != sarama.ErrOffsetOutOfRange { - t.Error("Expected ErrOffsetOutOfRange, got:", err) - } - - safeClose(t, consumer) -} - func TestConsumerHighWaterMarkOffset(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) diff --git a/consumer/dumb_consumer_test.go b/consumer/dumb_consumer_test.go index 0c7fecb4..204530ee 100644 --- a/consumer/dumb_consumer_test.go +++ b/consumer/dumb_consumer_test.go @@ -785,8 +785,8 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("my_topic", 0, sarama.OffsetNewest, 1234). - SetOffset("my_topic", 0, sarama.OffsetOldest, 2345), + SetOffset("my_topic", 0, sarama.OffsetNewest, 2000). 
+ SetOffset("my_topic", 0, sarama.OffsetOldest, 1000), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -795,15 +795,23 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { } // When/Then - if _, _, err := master.ConsumePartition("my_topic", 0, 0); err != sarama.ErrOffsetOutOfRange { - t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + pc, offset, err := master.ConsumePartition("my_topic", 0, 0) + if err != nil { + t.Fatal(err) } - if _, _, err := master.ConsumePartition("my_topic", 0, 3456); err != sarama.ErrOffsetOutOfRange { - t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + if offset != 1000 { + t.Fatal("Should return 1000, got:", offset) } - if _, _, err := master.ConsumePartition("my_topic", 0, -3); err != sarama.ErrOffsetOutOfRange { - t.Fatal("Should return ErrOffsetOutOfRange, got:", err) + safeClose(t, pc) + + pc, offset, err = master.ConsumePartition("my_topic", 0, 3456) + if err != nil { + t.Fatal(err) } + if offset != 2000 { + t.Fatal("Should return 2000, got:", offset) + } + safeClose(t, pc) safeClose(t, master) broker0.Close() diff --git a/consumer/smart_consumer.go b/consumer/smart_consumer.go index c07c031b..85659505 100644 --- a/consumer/smart_consumer.go +++ b/consumer/smart_consumer.go @@ -275,7 +275,9 @@ func (ec *exclusiveConsumer) run() { om, err := ec.offsetMgrFactory.NewOffsetManager(ec.group, ec.topic, ec.partition) if err != nil { - panic(fmt.Errorf("<%s> failed to spawn partition manager: err=(%s)", ec.contextID, err)) + // Must never happen. + log.Errorf("<%s> failed to spawn offset manager: err=(%s)", ec.contextID, err) + return } defer om.Stop() @@ -289,11 +291,16 @@ func (ec *exclusiveConsumer) run() { pc, concreteOffset, err := ec.dumbConsumer.ConsumePartition(ec.topic, ec.partition, initialOffset.Offset) if err != nil { - panic(fmt.Errorf("<%s> failed to start partition consumer: offset=%d, err=(%s)", ec.contextID, initialOffset.Offset, err)) + // Must never happen. 
+ log.Errorf("<%s> failed to start partition consumer: offset=%d, err=(%s)", ec.contextID, initialOffset.Offset, err) + return } defer pc.Close() - log.Infof("<%s> initialized: initialOffset=%d, concreteOffset=%d", - ec.contextID, initialOffset.Offset, concreteOffset) + if initialOffset.Offset != concreteOffset { + log.Errorf("<%s> invalid initial offset: stored=%d, adjusted=%d", + ec.contextID, initialOffset.Offset, concreteOffset) + } + log.Infof("<%s> initialized: offset=%d", ec.contextID, concreteOffset) var lastSubmittedOffset, lastCommittedOffset int64 diff --git a/consumer/smart_consumer_test.go b/consumer/smart_consumer_test.go index dedfce51..3c941e10 100644 --- a/consumer/smart_consumer_test.go +++ b/consumer/smart_consumer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/mailgun/kafka-pixy/offsetmgr" "github.com/mailgun/kafka-pixy/testhelpers" "github.com/mailgun/kafka-pixy/testhelpers/kafkahelper" "github.com/mailgun/log" @@ -37,6 +38,38 @@ func (s *SmartConsumerSuite) TearDownSuite(c *C) { s.kh.Close() } +// If initial offset stored in Kafka is greater than the newest offset for a +// partition, then the first message consumed from the partition is the next one +// posted to it. 
+func (s *SmartConsumerSuite) TestInitialOffsetTooLarge(c *C) { + oldestOffsets := s.kh.GetOldestOffsets("test.1") + newestOffsets := s.kh.GetNewestOffsets("test.1") + log.Infof("*** test.1 offsets: oldest=%v, newest=%v", oldestOffsets, newestOffsets) + + omf := offsetmgr.NewFactory(s.kh.Client()) + defer omf.Stop() + om, err := omf.NewOffsetManager("group-1", "test.1", 0) + c.Assert(err, IsNil) + om.SubmitOffset(newestOffsets[0]+100, "") + om.Stop() + + sc, err := Spawn(testhelpers.NewTestConfig("group-1")) + c.Assert(err, IsNil) + + // When + _, err = sc.Consume("group-1", "test.1") + + // Then + c.Assert(err, FitsTypeOf, ErrRequestTimeout(fmt.Errorf(""))) + + produced := s.kh.PutMessages("offset-too-large", "test.1", map[string]int{"key": 1}) + consumed := s.consume(c, sc, "group-1", "test.1", 1) + c.Assert(consumed["key"][0].Offset, Equals, newestOffsets[0]) + assertMsg(c, consumed["key"][0], produced["key"][0]) + + sc.Stop() +} + // If a topic has only one partition then the consumer will retrieve messages // in the order they were produced. 
func (s *SmartConsumerSuite) TestSinglePartitionTopic(c *C) { diff --git a/producer/producer_test.go b/producer/producer_test.go index a5f18ed7..30c6fc03 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -54,12 +54,12 @@ func (s *ProducerSuite) TestStartAndStop(c *C) { func (s *ProducerSuite) TestProduce(c *C) { // Given p, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When _, err := p.Produce("test.4", sarama.StringEncoder("1"), sarama.StringEncoder("Foo")) // Then c.Assert(err, IsNil) - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]+1) // Cleanup p.Stop() @@ -81,7 +81,7 @@ func (s *ProducerSuite) TestProduceInvalidTopic(c *C) { func (s *ProducerSuite) TestAsyncProduce(c *C) { // Given p, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 10; i++ { p.AsyncProduce("test.4", sarama.StringEncoder("1"), sarama.StringEncoder(strconv.Itoa(i))) @@ -91,7 +91,7 @@ func (s *ProducerSuite) TestAsyncProduce(c *C) { p.AsyncProduce("test.4", sarama.StringEncoder("5"), sarama.StringEncoder(strconv.Itoa(i))) } p.Stop() - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(s.failedMessages(), DeepEquals, []string{}) c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]+20) @@ -106,13 +106,13 @@ func (s *ProducerSuite) TestAsyncProduce(c *C) { func (s *ProducerSuite) TestAsyncProduceNilKey(c *C) { // Given p, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 100; i++ { p.AsyncProduce("test.4", nil, sarama.StringEncoder(strconv.Itoa(i))) } p.Stop() - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then 
c.Assert(s.failedMessages(), DeepEquals, []string{}) delta0 := offsetsAfter[0] - offsetsBefore[0] @@ -129,14 +129,14 @@ func (s *ProducerSuite) TestTooSmallShutdownTimeout(c *C) { // Given s.cfg.Producer.ShutdownTimeout = 0 p, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 100; i++ { v := sarama.StringEncoder(strconv.Itoa(i)) p.AsyncProduce("test.4", v, v) } p.Stop() - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(s.failedMessages(), DeepEquals, []string{}) delta := int64(0) @@ -151,13 +151,13 @@ func (s *ProducerSuite) TestTooSmallShutdownTimeout(c *C) { func (s *ProducerSuite) TestAsyncProduceEmptyKey(c *C) { // Given p, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 10; i++ { p.AsyncProduce("test.4", sarama.StringEncoder(""), sarama.StringEncoder(strconv.Itoa(i))) } p.Stop() - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(s.failedMessages(), DeepEquals, []string{}) c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]) diff --git a/service/service_test.go b/service/service_test.go index b25f7644..86d64ee5 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -97,7 +97,7 @@ func (s *ServiceSuite) TestInvalidKafkaPeers(c *C) { func (s *ServiceSuite) TestProduce(c *C) { // Given svc, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 10; i++ { @@ -113,7 +113,7 @@ func (s *ServiceSuite) TestProduce(c *C) { "text/plain", strings.NewReader(strconv.Itoa(i))) } svc.Stop() // Have to stop before getOffsets - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]+20) @@ -128,7 
+128,7 @@ func (s *ServiceSuite) TestProduce(c *C) { func (s *ServiceSuite) TestProduceNilKey(c *C) { // Given svc, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 100; i++ { @@ -136,7 +136,7 @@ func (s *ServiceSuite) TestProduceNilKey(c *C) { "text/plain", strings.NewReader(strconv.Itoa(i))) } svc.Stop() // Have to stop before getOffsets - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then delta0 := offsetsAfter[0] - offsetsBefore[0] @@ -151,7 +151,7 @@ func (s *ServiceSuite) TestProduceNilKey(c *C) { // submitted to a particular partition determined by the empty key hash. func (s *ServiceSuite) TestProduceEmptyKey(c *C) { svc, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When for i := 0; i < 10; i++ { @@ -159,7 +159,7 @@ func (s *ServiceSuite) TestProduceEmptyKey(c *C) { "text/plain", strings.NewReader(strconv.Itoa(i))) } svc.Stop() // Have to stop before getOffsets - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]) @@ -171,7 +171,7 @@ func (s *ServiceSuite) TestProduceEmptyKey(c *C) { // Utf8 messages are submitted without a problem. 
func (s *ServiceSuite) TestUtf8Message(c *C) { svc, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When s.unixClient.Post("http://_/topics/test.4/messages?key=foo", @@ -179,7 +179,7 @@ func (s *ServiceSuite) TestUtf8Message(c *C) { svc.Stop() // Have to stop before getOffsets // Then - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") msgs := s.kh.GetMessages("test.4", offsetsBefore, offsetsAfter) c.Assert(msgs, DeepEquals, [][]string{[]string(nil), {"Превед Медвед"}, []string(nil), []string(nil)}) @@ -202,7 +202,7 @@ func (s *ServiceSuite) TestTCPDoesNotWork(c *C) { // API is served on a TCP socket if it is explicitly configured. func (s *ServiceSuite) TestBothAPI(c *C) { - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") s.cfg.TCPAddr = "127.0.0.1:55501" svc, err := Spawn(s.cfg) c.Assert(err, IsNil) @@ -217,7 +217,7 @@ func (s *ServiceSuite) TestBothAPI(c *C) { svc.Stop() // Have to stop before getOffsets c.Assert(err1, IsNil) c.Assert(err2, IsNil) - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") msgs := s.kh.GetMessages("test.4", offsetsBefore, offsetsAfter) c.Assert(msgs, DeepEquals, [][]string{[]string(nil), {"Превед", "Kitty"}, []string(nil), []string(nil)}) @@ -242,7 +242,7 @@ func (s *ServiceSuite) TestStoppedServerCall(c *C) { // Messages that have maximum possible size indeed go through. Note that we // assume that the broker's limit is the same as the producer's one or higher. 
func (s *ServiceSuite) TestLargestMessage(c *C) { - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") maxMsgSize := sarama.NewConfig().Producer.MaxMessageBytes - ProdMsgMetadataSize([]byte("foo")) msg := GenMessage(maxMsgSize) s.cfg.TCPAddr = "127.0.0.1:55501" @@ -255,7 +255,7 @@ func (s *ServiceSuite) TestLargestMessage(c *C) { // Then c.Assert(r.StatusCode, Equals, http.StatusOK) c.Assert(ParseJSONBody(c, r), DeepEquals, map[string]interface{}{}) - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") messages := s.kh.GetMessages("test.4", offsetsBefore, offsetsAfter) readMsg := messages[1][0] c.Assert(readMsg, Equals, msg) @@ -265,7 +265,7 @@ func (s *ServiceSuite) TestLargestMessage(c *C) { // dropped. Note that we assume that the broker's limit is the same as the // producer's one or higher. func (s *ServiceSuite) TestMessageTooLarge(c *C) { - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") maxMsgSize := sarama.NewConfig().Producer.MaxMessageBytes - ProdMsgMetadataSize([]byte("foo")) + 1 msg := GenMessage(maxMsgSize) s.cfg.TCPAddr = "127.0.0.1:55501" @@ -278,20 +278,20 @@ func (s *ServiceSuite) TestMessageTooLarge(c *C) { // Then c.Assert(r.StatusCode, Equals, http.StatusOK) c.Assert(ParseJSONBody(c, r), DeepEquals, map[string]interface{}{}) - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") c.Assert(offsetsAfter, DeepEquals, offsetsBefore) } func (s *ServiceSuite) TestSyncProduce(c *C) { // Given svc, _ := Spawn(s.cfg) - offsetsBefore := s.kh.GetOffsets("test.4") + offsetsBefore := s.kh.GetNewestOffsets("test.4") // When r, err := s.unixClient.Post("http://_/topics/test.4/messages?key=1&sync", "text/plain", strings.NewReader("Foo")) svc.Stop() // Have to stop before getOffsets - offsetsAfter := s.kh.GetOffsets("test.4") + offsetsAfter := s.kh.GetNewestOffsets("test.4") // Then c.Assert(err, 
IsNil) diff --git a/testhelpers/kafkahelper/kafkahelper.go b/testhelpers/kafkahelper/kafkahelper.go index f99b9ddd..6c779bc2 100644 --- a/testhelpers/kafkahelper/kafkahelper.go +++ b/testhelpers/kafkahelper/kafkahelper.go @@ -40,13 +40,17 @@ func New(c *C) *T { return kh } +func (kh *T) Client() sarama.Client { + return kh.client +} + func (kh *T) Close() { kh.producer.Close() kh.consumer.Close() kh.client.Close() } -func (kh *T) GetOffsets(topic string) []int64 { +func (kh *T) GetNewestOffsets(topic string) []int64 { offsets := []int64{} partitions, err := kh.client.Partitions(topic) if err != nil { @@ -62,6 +66,22 @@ func (kh *T) GetOffsets(topic string) []int64 { return offsets } +func (kh *T) GetOldestOffsets(topic string) []int64 { + offsets := []int64{} + partitions, err := kh.client.Partitions(topic) + if err != nil { + panic(err) + } + for _, p := range partitions { + offset, err := kh.client.GetOffset(topic, p, sarama.OffsetOldest) + if err != nil { + panic(err) + } + offsets = append(offsets, offset) + } + return offsets +} + func (kh *T) GetMessages(topic string, begin, end []int64) [][]string { writtenMsgs := make([][]string, len(begin)) for i := range begin {