Skip to content

Commit

Permalink
Merge pull request #437 from Shopify/consumer-async-partitions
Browse files Browse the repository at this point in the history
Consumer async partitions
  • Loading branch information
eapache committed Apr 29, 2015
2 parents 7abda07 + 3bd5a52 commit 1bd758a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 43 deletions.
80 changes: 51 additions & 29 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
trigger: make(chan none, 1),
dying: make(chan none),
dying: make(chan error, 1),
fetchSize: c.conf.Consumer.Fetch.Default,
}

Expand All @@ -144,6 +145,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
}

go withRecover(child.dispatcher)
go withRecover(child.responseFeeder)

child.broker = c.refBrokerConsumer(leader)
child.broker.input <- child
Expand Down Expand Up @@ -259,10 +261,12 @@ type partitionConsumer struct {
topic string
partition int32

broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
trigger, dying chan none
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse
trigger chan none
dying chan error

fetchSize int32
offset int64
Expand Down Expand Up @@ -306,8 +310,7 @@ func (child *partitionConsumer) dispatcher() {
child.consumer.unrefBrokerConsumer(child.broker)
}
child.consumer.removeChild(child)
close(child.messages)
close(child.errors)
close(child.feeder)
}

func (child *partitionConsumer) dispatch() error {
Expand Down Expand Up @@ -361,11 +364,11 @@ func (child *partitionConsumer) Errors() <-chan *ConsumerError {
}

func (child *partitionConsumer) AsyncClose() {
// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'feeder' channel, which
// in turn causes the responseFeeder to exit its loop and close the 'messages' and 'errors' channels
// (alternatively, if the child is already at the dispatcher for some reason, that will also just close itself)
close(child.dying)
child.dying <- nil
}

func (child *partitionConsumer) Close() error {
Expand All @@ -392,6 +395,33 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
}

// responseFeeder drains the child's feeder channel, passing every fetch
// response to handleResponse and routing any resulting error. It marks the
// broker's acks WaitGroup as done once per response, and when the feeder
// channel is closed it closes the child's messages and errors channels.
func (child *partitionConsumer) responseFeeder() {
	for resp := range child.feeder {
		if err := child.handleResponse(resp); err != nil {
			switch err {
			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
				// Not fatal, but the partition has to be redispatched;
				// the error is handed over on the dying channel without
				// being reported to the user.
				child.dying <- err
			case ErrOffsetOutOfRange:
				// Retrying is pointless, as it would fail in exactly the
				// same way. Report the error and shut the child down so
				// that the user has to decide what to do next.
				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
				child.sendError(err)
				child.AsyncClose()
			default:
				// Unrecognised error: report it to the user, then ask
				// for a redispatch via the dying channel.
				child.sendError(err)
				child.dying <- err
			}
		}

		// Acknowledge this response whatever the outcome was; the broker
		// consumer calls acks.Wait() after handing a response to its
		// subscriptions.
		child.broker.acks.Done()
	}

	// The feeder channel has been closed, so no further responses will
	// arrive and the user-facing channels can be closed as well.
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
Expand Down Expand Up @@ -468,6 +498,7 @@ type brokerConsumer struct {
newSubscriptions chan []*partitionConsumer
wait chan none
subscriptions map[*partitionConsumer]none
acks sync.WaitGroup
refs int
}

Expand Down Expand Up @@ -550,25 +581,11 @@ func (bc *brokerConsumer) subscriptionConsumer() {
return
}

bc.acks.Add(len(bc.subscriptions))
for child := range bc.subscriptions {
if err := child.handleResponse(response); err != nil {
switch err {
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
child.AsyncClose()
fallthrough
default:
child.sendError(err)
fallthrough
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.trigger <- none{}
delete(bc.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
}
}
child.feeder <- response
}
bc.acks.Wait()
}
}

Expand All @@ -581,10 +598,15 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC

for child := range bc.subscriptions {
select {
case <-child.dying:
close(child.trigger)
case err := <-child.dying:
if err == nil {
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
} else {
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
child.trigger <- none{}
}
delete(bc.subscriptions, child)
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
default:
}
}
Expand Down
20 changes: 6 additions & 14 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestConsumerOffsetManual(t *testing.T) {
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

for i := 0; i <= 10; i++ {
for i := 0; i < 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
leader.Returns(fetchResponse)
Expand Down Expand Up @@ -326,27 +326,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
leader0.Returns(fetchResponse)
time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

// leader0 provides last message on partition 1
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader0.Returns(fetchResponse)

// leader1 provides last message on partition 0
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader1.Returns(fetchResponse)

wg.Wait()
leader1.Close()
leader0.Close()
wg.Wait()
seedBroker.Close()
safeClose(t, master)
}

func TestConsumerInterleavedClose(t *testing.T) {
t.Skip("Enable once bug #325 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand Down Expand Up @@ -379,6 +368,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
time.Sleep(50 * time.Millisecond)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
Expand All @@ -392,7 +382,9 @@ func TestConsumerInterleavedClose(t *testing.T) {
if err != nil {
t.Fatal(err)
}
<-c0.Messages()

fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

Expand Down

0 comments on commit 1bd758a

Please sign in to comment.