Skip to content

Commit

Permalink
Merge pull request #1383 from dnwe/handle-throttled-fetchresponse
Browse files Browse the repository at this point in the history
bug: handle and log throttled FetchResponses
  • Loading branch information
bai authored Jun 3, 2019
2 parents 41f00bc + b5630e5 commit 49e70e7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
8 changes: 8 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
}

// If request was throttled and empty we log and return without error
if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
Logger.Printf(
"consumer/broker/%d FetchResponse throttled %v\n",
child.broker.broker.ID(), response.ThrottleTime)
return nil, nil
}

block := response.GetBlock(child.topic, child.partition)
if block == nil {
return nil, ErrIncompleteResponse
Expand Down
47 changes: 47 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"os"
"os/signal"
"reflect"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1240,3 +1241,49 @@ ConsumerLoop:

log.Printf("Consumed: %d\n", consumed)
}

func Test_partitionConsumer_parseResponse(t *testing.T) {
type args struct {
response *FetchResponse
}
tests := []struct {
name string
args args
want []*ConsumerMessage
wantErr bool
}{
{
name: "empty but throttled FetchResponse is not considered an error",
args: args{
response: &FetchResponse{
ThrottleTime: time.Millisecond,
},
},
},
{
name: "empty FetchResponse is considered an incomplete response by default",
args: args{
response: &FetchResponse{},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
child := &partitionConsumer{
broker: &brokerConsumer{
broker: &Broker{},
},
conf: &Config{},
}
got, err := child.parseResponse(tt.args.response)
if (err != nil) != tt.wantErr {
t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 49e70e7

Please sign in to comment.