From b5630e543b4c61fb6ed744c6c0d155ee5d02e4a6 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 29 May 2019 12:56:29 +0100 Subject: [PATCH] bug: handle and log throttled FetchResponses Kafka returns a throttled FetchResponse with an empty array of topics. Previously Sarama treated this as an ErrIncompleteResponse and would abandon and re-create the subscription instead of retrying the FetchRequest. --- consumer.go | 8 ++++++++ consumer_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/consumer.go b/consumer.go index 461026eb6..4718a7dc9 100644 --- a/consumer.go +++ b/consumer.go @@ -561,6 +561,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry) } + // If request was throttled and empty we log and return without error + if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 { + Logger.Printf( + "consumer/broker/%d FetchResponse throttled %v\n", + child.broker.broker.ID(), response.ThrottleTime) + return nil, nil + } + block := response.GetBlock(child.topic, child.partition) if block == nil { return nil, ErrIncompleteResponse diff --git a/consumer_test.go b/consumer_test.go index c7b1cc1e0..4107dfabf 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -4,6 +4,7 @@ import ( "log" "os" "os/signal" + "reflect" "sync" "sync/atomic" "testing" @@ -1240,3 +1241,49 @@ ConsumerLoop: log.Printf("Consumed: %d\n", consumed) } + +func Test_partitionConsumer_parseResponse(t *testing.T) { + type args struct { + response *FetchResponse + } + tests := []struct { + name string + args args + want []*ConsumerMessage + wantErr bool + }{ + { + name: "empty but throttled FetchResponse is not considered an error", + args: args{ + response: &FetchResponse{ + ThrottleTime: time.Millisecond, + }, + }, + }, + { + name: "empty FetchResponse is considered an incomplete response by default", + args: args{ + response: &FetchResponse{}, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + child := &partitionConsumer{ + broker: &brokerConsumer{ + broker: &Broker{}, + }, + conf: &Config{}, + } + got, err := child.parseResponse(tt.args.response) + if (err != nil) != tt.wantErr { + t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want) + } + }) + } +}