diff --git a/consumer.go b/consumer.go index 461026eb6..4b29ab2a8 100644 --- a/consumer.go +++ b/consumer.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "fmt" + "math" "sync" "sync/atomic" "time" @@ -591,6 +592,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu child.offset++ // skip this one so we can keep processing future messages } else { child.fetchSize *= 2 + // check int32 overflow + if child.fetchSize < 0 { + child.fetchSize = math.MaxInt32 + } if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max { child.fetchSize = child.conf.Consumer.Fetch.Max } diff --git a/fetch_response_test.go b/fetch_response_test.go index 917027644..58a098754 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -86,6 +86,37 @@ var ( 0x06, 0x08, 0x09, 0x0A, 0x04, 0x0B, 0x0C} + partialFetchResponse = []byte{ + 0x00, 0x00, 0x00, 0x00, // ThrottleTime + 0x00, 0x00, 0x00, 0x01, // Number of Topics + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Number of Partitions + 0x00, 0x00, 0x00, 0x05, // Partition + 0x00, 0x00, // Error + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset + 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions + 0x00, 0x00, 0x00, 0x40, // Records length + + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x46, + 0x00, 0x00, 0x00, 0x00, + 0x02, + 0xDB, 0x47, 0x14, 0xC9, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // record + 0x28, + 0x00, + 0x00, + } + oneMessageFetchResponseV4 = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime 0x00, 0x00, 0x00, 0x01, // Number of Topics @@ -277,6 +308,45 @@ func TestOneRecordFetchResponse(t *testing.T) { } } +func TestPartailFetchResponse(t *testing.T) { + response := FetchResponse{} + testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4) + + if len(response.Blocks) != 1 { + t.Fatal("Decoding produced incorrect number of topic blocks.") + } + + if len(response.Blocks["topic"]) != 1 { + t.Fatal("Decoding produced incorrect number of partition blocks for topic.") + } + + block := response.GetBlock("topic", 5) + if block == nil { + t.Fatal("GetBlock didn't return block.") + } + if block.Err != ErrNoError { + t.Error("Decoding didn't produce correct error code.") + } + if block.HighWaterMarkOffset != 0x10101010 { + t.Error("Decoding didn't produce correct high water mark offset.") + } + partial, err := block.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !partial { + t.Error("Decoding not a partial trailing record") + } + + n, err := block.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 0 { + t.Fatal("Decoding produced incorrect number of records.") + } +} + func TestOneMessageFetchResponseV4(t *testing.T) { response := FetchResponse{} testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)