Adjust partition offset if out of bounds (fixes #56)
horkhe committed Apr 11, 2016
1 parent 05cc42b commit 3d23f0a
Showing 9 changed files with 124 additions and 72 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+#### Version TBD (TBD)
+
+* [#56](https://github.com/mailgun/kafka-pixy/issues/56) Invalid stored offset makes consumer panic
+
 #### Version 0.10.1 (2015-12-21)
 
 * [#49](https://github.com/mailgun/kafka-pixy/pull/49) Topic consumption stops while ownership retained.
25 changes: 13 additions & 12 deletions consumer/dumb_consumer.go
@@ -45,14 +45,17 @@ func (ce ConsumerErrors) Error() string {
 // Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope.
-//
-// Sarama's Consumer type does not currently support automatic consumer group rebalancing and offset tracking,
-// however the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. We plan
-// to properly integrate this functionality at a later date.
 type Consumer interface {
-	// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
-	// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
-	// literal offset, or OffsetNewest or OffsetOldest
+	// ConsumePartition creates a PartitionConsumer on the given topic/partition
+	// with the given offset. It will return an error if this Consumer is
+	// already consuming on the given topic/partition. Offset can be a
+	// literal offset, or OffsetNewest or OffsetOldest.
+	//
+	// If offset is smaller than the oldest offset, then the oldest offset is
+	// returned. If offset is larger than the newest offset, then the newest
+	// offset is returned. If offset is either the sarama.OffsetNewest or
+	// sarama.OffsetOldest constant, then the corresponding concrete offset is
+	// returned. Otherwise offset is returned unchanged.
 	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, int64, error)
 
 	// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
@@ -181,14 +184,12 @@ func (c *consumer) chooseStartingOffset(topic string, partition int32, offset in
 	}
 
 	switch {
-	case offset == sarama.OffsetNewest:
+	case offset == sarama.OffsetNewest || offset > newestOffset:
 		return newestOffset, nil
-	case offset == sarama.OffsetOldest:
+	case offset == sarama.OffsetOldest || offset < oldestOffset:
 		return oldestOffset, nil
-	case offset >= oldestOffset && offset <= newestOffset:
-		return offset, nil
 	default:
-		return 0, sarama.ErrOffsetOutOfRange
+		return offset, nil
 	}
 }
 
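The clamping rule introduced above can be read in isolation. Below is a minimal, self-contained sketch of the behavior the updated doc comment describes; the sentinel constants and the `clampOffset` helper are hypothetical stand-ins for `sarama.OffsetNewest`/`sarama.OffsetOldest` and the unexported `chooseStartingOffset` method, not the library's API:

```go
package main

import "fmt"

// Stand-ins for the sarama sentinels (sarama.OffsetNewest is -1,
// sarama.OffsetOldest is -2).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// clampOffset mirrors the rule above: a sentinel maps to the corresponding
// bound, and a literal offset outside [oldest, newest] is pulled back to
// the nearest bound instead of failing with ErrOffsetOutOfRange.
func clampOffset(offset, oldest, newest int64) int64 {
	switch {
	case offset == offsetNewest || offset > newest:
		return newest
	case offset == offsetOldest || offset < oldest:
		return oldest
	default:
		return offset
	}
}

func main() {
	oldest, newest := int64(1000), int64(2000)
	fmt.Println(clampOffset(0, oldest, newest))    // 1000 (too small, clamped up)
	fmt.Println(clampOffset(3456, oldest, newest)) // 2000 (too large, clamped down)
	fmt.Println(clampOffset(1500, oldest, newest)) // 1500 (in range, unchanged)
}
```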
21 changes: 0 additions & 21 deletions consumer/dumb_consumer_functional_test.go
@@ -2,33 +2,12 @@ package consumer
 
 import (
 	"fmt"
-	"math"
 	"testing"
 	"time"
 
 	"github.com/Shopify/sarama"
 )
 
-func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
-	setupFunctionalTest(t)
-	defer teardownFunctionalTest(t)
-
-	consumer, err := NewConsumer(kafkaBrokers, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if _, _, err := consumer.ConsumePartition("test.1", 0, -10); err != sarama.ErrOffsetOutOfRange {
-		t.Error("Expected ErrOffsetOutOfRange, got:", err)
-	}
-
-	if _, _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != sarama.ErrOffsetOutOfRange {
-		t.Error("Expected ErrOffsetOutOfRange, got:", err)
-	}
-
-	safeClose(t, consumer)
-}
-
 func TestConsumerHighWaterMarkOffset(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
24 changes: 16 additions & 8 deletions consumer/dumb_consumer_test.go
@@ -785,8 +785,8 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
 			SetBroker(broker0.Addr(), broker0.BrokerID()).
 			SetLeader("my_topic", 0, broker0.BrokerID()),
 		"OffsetRequest": sarama.NewMockOffsetResponse(t).
-			SetOffset("my_topic", 0, sarama.OffsetNewest, 1234).
-			SetOffset("my_topic", 0, sarama.OffsetOldest, 2345),
+			SetOffset("my_topic", 0, sarama.OffsetNewest, 2000).
+			SetOffset("my_topic", 0, sarama.OffsetOldest, 1000),
 	})
 
 	master, err := NewConsumer([]string{broker0.Addr()}, nil)
@@ -795,15 +795,23 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
 	}
 
 	// When/Then
-	if _, _, err := master.ConsumePartition("my_topic", 0, 0); err != sarama.ErrOffsetOutOfRange {
-		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	pc, offset, err := master.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
 	}
-	if _, _, err := master.ConsumePartition("my_topic", 0, 3456); err != sarama.ErrOffsetOutOfRange {
-		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	if offset != 1000 {
+		t.Fatal("Should return 1000, got:", offset)
 	}
-	if _, _, err := master.ConsumePartition("my_topic", 0, -3); err != sarama.ErrOffsetOutOfRange {
-		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	safeClose(t, pc)
+
+	pc, offset, err = master.ConsumePartition("my_topic", 0, 3456)
+	if err != nil {
+		t.Fatal(err)
 	}
+	if offset != 2000 {
+		t.Fatal("Should return 2000, got:", offset)
+	}
+	safeClose(t, pc)
 
 	safeClose(t, master)
 	broker0.Close()
15 changes: 11 additions & 4 deletions consumer/smart_consumer.go
@@ -275,7 +275,9 @@ func (ec *exclusiveConsumer) run() {
 
 	om, err := ec.offsetMgrFactory.NewOffsetManager(ec.group, ec.topic, ec.partition)
 	if err != nil {
-		panic(fmt.Errorf("<%s> failed to spawn partition manager: err=(%s)", ec.contextID, err))
+		// Must never happen.
+		log.Errorf("<%s> failed to spawn offset manager: err=(%s)", ec.contextID, err)
+		return
 	}
 	defer om.Stop()
 
@@ -289,11 +291,16 @@
 
 	pc, concreteOffset, err := ec.dumbConsumer.ConsumePartition(ec.topic, ec.partition, initialOffset.Offset)
 	if err != nil {
-		panic(fmt.Errorf("<%s> failed to start partition consumer: offset=%d, err=(%s)", ec.contextID, initialOffset.Offset, err))
+		// Must never happen.
+		log.Errorf("<%s> failed to start partition consumer: offset=%d, err=(%s)", ec.contextID, initialOffset.Offset, err)
+		return
 	}
 	defer pc.Close()
-	log.Infof("<%s> initialized: initialOffset=%d, concreteOffset=%d",
-		ec.contextID, initialOffset.Offset, concreteOffset)
+	if initialOffset.Offset != concreteOffset {
+		log.Errorf("<%s> invalid initial offset: stored=%d, adjusted=%d",
+			ec.contextID, initialOffset.Offset, concreteOffset)
+	}
+	log.Infof("<%s> initialized: offset=%d", ec.contextID, concreteOffset)
 
 	var lastSubmittedOffset, lastCommittedOffset int64
 
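A note on the panic-to-log change above: run() executes on its own goroutine, and a panic in any goroutine terminates the whole process, so logging the "must never happen" failures and returning confines the damage to a single partition. Here is a minimal sketch of that pattern under hypothetical names (startResource stands in for setup calls like NewOffsetManager or ConsumePartition); it is an illustration of the design choice, not code from this repository:

```go
package main

import (
	"errors"
	"log"
	"sync"
)

// startResource is a hypothetical stand-in for setup that "must never"
// fail at this point. It returns a stop function on success.
func startResource(id int) (func(), error) {
	if id == 2 {
		return nil, errors.New("broker metadata not available")
	}
	return func() { log.Printf("worker %d: resource stopped", id) }, nil
}

func main() {
	var wg sync.WaitGroup
	for id := 0; id < 3; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			stop, err := startResource(id)
			if err != nil {
				// Must never happen. Log and abandon this worker only;
				// a panic here would bring down the entire process.
				log.Printf("worker %d: failed to start resource: %v", id, err)
				return
			}
			defer stop()
			log.Printf("worker %d: running", id)
		}(id)
	}
	wg.Wait()
}
```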
33 changes: 33 additions & 0 deletions consumer/smart_consumer_test.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
+	"github.com/mailgun/kafka-pixy/offsetmgr"
 	"github.com/mailgun/kafka-pixy/testhelpers"
 	"github.com/mailgun/kafka-pixy/testhelpers/kafkahelper"
 	"github.com/mailgun/log"
Expand Down Expand Up @@ -37,6 +38,38 @@ func (s *SmartConsumerSuite) TearDownSuite(c *C) {
s.kh.Close()
}

// If initial offset stored in Kafka is greater then the newest offset for a
// partition, then the first message consumed from the partition is the next one
// posted to it.
func (s *SmartConsumerSuite) TestInitialOffsetTooLarge(c *C) {
oldestOffsets := s.kh.GetOldestOffsets("test.1")
newestOffsets := s.kh.GetNewestOffsets("test.1")
log.Infof("*** test.1 offsets: oldest=%v, newest=%v", oldestOffsets, newestOffsets)

omf := offsetmgr.NewFactory(s.kh.Client())
defer omf.Stop()
om, err := omf.NewOffsetManager("group-1", "test.1", 0)
c.Assert(err, IsNil)
om.SubmitOffset(newestOffsets[0]+100, "")
om.Stop()

sc, err := Spawn(testhelpers.NewTestConfig("group-1"))
c.Assert(err, IsNil)

// When
_, err = sc.Consume("group-1", "test.1")

// Then
c.Assert(err, FitsTypeOf, ErrRequestTimeout(fmt.Errorf("")))

produced := s.kh.PutMessages("offset-too-large", "test.1", map[string]int{"key": 1})
consumed := s.consume(c, sc, "group-1", "test.1", 1)
c.Assert(consumed["key"][0].Offset, Equals, newestOffsets[0])
assertMsg(c, consumed["key"][0], produced["key"][0])

sc.Stop()
}

// If a topic has only one partition then the consumer will retrieve messages
// in the order they were produced.
func (s *SmartConsumerSuite) TestSinglePartitionTopic(c *C) {
20 changes: 10 additions & 10 deletions producer/producer_test.go
@@ -54,12 +54,12 @@ func (s *ProducerSuite) TestStartAndStop(c *C) {
 func (s *ProducerSuite) TestProduce(c *C) {
 	// Given
 	p, _ := Spawn(s.cfg)
-	offsetsBefore := s.kh.GetOffsets("test.4")
+	offsetsBefore := s.kh.GetNewestOffsets("test.4")
 	// When
 	_, err := p.Produce("test.4", sarama.StringEncoder("1"), sarama.StringEncoder("Foo"))
 	// Then
 	c.Assert(err, IsNil)
-	offsetsAfter := s.kh.GetOffsets("test.4")
+	offsetsAfter := s.kh.GetNewestOffsets("test.4")
 	c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]+1)
 	// Cleanup
 	p.Stop()
@@ -81,7 +81,7 @@ func (s *ProducerSuite) TestProduceInvalidTopic(c *C) {
 func (s *ProducerSuite) TestAsyncProduce(c *C) {
 	// Given
 	p, _ := Spawn(s.cfg)
-	offsetsBefore := s.kh.GetOffsets("test.4")
+	offsetsBefore := s.kh.GetNewestOffsets("test.4")
 	// When
 	for i := 0; i < 10; i++ {
 		p.AsyncProduce("test.4", sarama.StringEncoder("1"), sarama.StringEncoder(strconv.Itoa(i)))
@@ -91,7 +91,7 @@ func (s *ProducerSuite) TestAsyncProduce(c *C) {
 		p.AsyncProduce("test.4", sarama.StringEncoder("5"), sarama.StringEncoder(strconv.Itoa(i)))
 	}
 	p.Stop()
-	offsetsAfter := s.kh.GetOffsets("test.4")
+	offsetsAfter := s.kh.GetNewestOffsets("test.4")
 	// Then
 	c.Assert(s.failedMessages(), DeepEquals, []string{})
 	c.Assert(offsetsAfter[0], Equals, offsetsBefore[0]+20)
@@ -106,13 +106,13 @@ func (s *ProducerSuite) TestAsyncProduce(c *C) {
 func (s *ProducerSuite) TestAsyncProduceNilKey(c *C) {
 	// Given
 	p, _ := Spawn(s.cfg)
-	offsetsBefore := s.kh.GetOffsets("test.4")
+	offsetsBefore := s.kh.GetNewestOffsets("test.4")
 	// When
 	for i := 0; i < 100; i++ {
 		p.AsyncProduce("test.4", nil, sarama.StringEncoder(strconv.Itoa(i)))
 	}
 	p.Stop()
-	offsetsAfter := s.kh.GetOffsets("test.4")
+	offsetsAfter := s.kh.GetNewestOffsets("test.4")
 	// Then
 	c.Assert(s.failedMessages(), DeepEquals, []string{})
 	delta0 := offsetsAfter[0] - offsetsBefore[0]
@@ -129,14 +129,14 @@ func (s *ProducerSuite) TestTooSmallShutdownTimeout(c *C) {
 	// Given
 	s.cfg.Producer.ShutdownTimeout = 0
 	p, _ := Spawn(s.cfg)
-	offsetsBefore := s.kh.GetOffsets("test.4")
+	offsetsBefore := s.kh.GetNewestOffsets("test.4")
 	// When
 	for i := 0; i < 100; i++ {
 		v := sarama.StringEncoder(strconv.Itoa(i))
 		p.AsyncProduce("test.4", v, v)
 	}
 	p.Stop()
-	offsetsAfter := s.kh.GetOffsets("test.4")
+	offsetsAfter := s.kh.GetNewestOffsets("test.4")
 	// Then
 	c.Assert(s.failedMessages(), DeepEquals, []string{})
 	delta := int64(0)
@@ -151,13 +151,13 @@ func (s *ProducerSuite) TestTooSmallShutdownTimeout(c *C) {
 func (s *ProducerSuite) TestAsyncProduceEmptyKey(c *C) {
 	// Given
 	p, _ := Spawn(s.cfg)
-	offsetsBefore := s.kh.GetOffsets("test.4")
+	offsetsBefore := s.kh.GetNewestOffsets("test.4")
 	// When
 	for i := 0; i < 10; i++ {
 		p.AsyncProduce("test.4", sarama.StringEncoder(""), sarama.StringEncoder(strconv.Itoa(i)))
 	}
 	p.Stop()
-	offsetsAfter := s.kh.GetOffsets("test.4")
+	offsetsAfter := s.kh.GetNewestOffsets("test.4")
 	// Then
 	c.Assert(s.failedMessages(), DeepEquals, []string{})
 	c.Assert(offsetsAfter[0], Equals, offsetsBefore[0])
(Diffs for the remaining 2 of the 9 changed files were not loaded.)