Skip to content

Commit

Permalink
Merge pull request #387 from Shopify/move-handle-response
Browse files Browse the repository at this point in the history
Put `handleResponse` method on partitionConsumer
  • Loading branch information
eapache committed Mar 23, 2015
2 parents 94a77f7 + e71403d commit efedcc1
Showing 1 changed file with 67 additions and 67 deletions.
134 changes: 67 additions & 67 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,72 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
}

if len(block.MsgSet.Messages) == 0 {
// We got no messages. If we got a trailing one then we need to ask for more data.
// Otherwise we just poll again and wait for one to be produced...
if block.MsgSet.PartialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
}

return nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default

incomplete := false
atLeastOne := false
prelude := true
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
if prelude && msg.Offset < child.offset {
continue
}
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
child.offset = msg.Offset + 1
} else {
incomplete = true
}
}

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
}
return nil
}

// brokerConsumer

type brokerConsumer struct {
Expand Down Expand Up @@ -442,7 +508,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
}

for child := range w.subscriptions {
if err := w.handleResponse(child, response); err != nil {
if err := child.handleResponse(response); err != nil {
switch err {
default:
child.sendError(err)
Expand Down Expand Up @@ -505,69 +571,3 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {

return w.broker.Fetch(request)
}

func (w *brokerConsumer) handleResponse(child *partitionConsumer, response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
}

if len(block.MsgSet.Messages) == 0 {
// We got no messages. If we got a trailing one then we need to ask for more data.
// Otherwise we just poll again and wait for one to be produced...
if block.MsgSet.PartialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
}

return nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default

incomplete := false
atLeastOne := false
prelude := true
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
if prelude && msg.Offset < child.offset {
continue
}
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
child.offset = msg.Offset + 1
} else {
incomplete = true
}
}

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
}
return nil
}

0 comments on commit efedcc1

Please sign in to comment.