Skip to content

Commit

Permalink
fetch size overflow (#1376)
Browse files Browse the repository at this point in the history
* updated fetch size overflow check

* Add test case for fetching partial trailing message
  • Loading branch information
andrewhao888 authored and varun06 committed Jul 2, 2019
1 parent c82066c commit c50148e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -599,6 +600,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
// check int32 overflow
if child.fetchSize < 0 {
child.fetchSize = math.MaxInt32
}
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
Expand Down
70 changes: 70 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,37 @@ var (
0x06, 0x08, 0x09, 0x0A,
0x04, 0x0B, 0x0C}

partialFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x00, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0x00, 0x00, 0x00, 0x40, // Records length

0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x46,
0x00, 0x00, 0x00, 0x00,
0x02,
0xDB, 0x47, 0x14, 0xC9,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// record
0x28,
0x00,
0x00,
}

oneMessageFetchResponseV4 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -277,6 +308,45 @@ func TestOneRecordFetchResponse(t *testing.T) {
}
}

func TestPartailFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrNoError {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !partial {
t.Error("Decoding not a partial trailing record")
}

n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 0 {
t.Fatal("Decoding produced incorrect number of records.")
}
}

func TestOneMessageFetchResponseV4(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
Expand Down

0 comments on commit c50148e

Please sign in to comment.