Skip to content

Commit

Permalink
Merge pull request #1086 from adwinsky/fix-consumer-block-when-receiv…
Browse files Browse the repository at this point in the history
…ing-invalid-fetch-response

Unblock consumer when receiving invalid FetchResponse (response did not contain all the expected topic/partition blocks)
  • Loading branch information
eapache authored Jun 14, 2018
2 parents 35324cf + d3f8616 commit e19e955
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
child.offset = offset + 1
}
if len(messages) == 0 {
return nil, ErrIncompleteResponse
child.offset += 1
}
return messages, nil
}
Expand Down
51 changes: 51 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,57 @@ func TestConsumerExtraOffsets(t *testing.T) {
}
}

// In some situations broker may return a block containing only
// messages older then requested, even though there would be
// more messages if higher offset was requested.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 4}
fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)

fetchResponse2 := &FetchResponse{Version: 4}
fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

cfg := NewConfig()
cfg.Consumer.Return.Errors = true
cfg.Version = V1_1_0_0

broker0 := NewMockBroker(t, 0)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 2)
if err != nil {
t.Fatal(err)
}

select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, 1000000)
case err := <-consumer.Errors():
t.Fatal(err)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 4}
Expand Down

0 comments on commit e19e955

Please sign in to comment.