Skip to content

Commit

Permalink
When no messages are available for the partition, oldest and newest a…
Browse files Browse the repository at this point in the history
…re the same.
  • Loading branch information
wvanbergen committed Mar 21, 2015
1 parent 0ad57d2 commit 28bd4ba
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
12 changes: 9 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,17 @@ func (client *client) GetOffsetRange(topic string, partitionID int32) (int64, in
if block.Err != ErrNoError {
return -1, -1, block.Err
}
if len(block.Offsets) != 2 {

if len(block.Offsets) == 1 {
// No messages in this partition (anymore), so oldest and newest are the same
return block.Offsets[0], block.Offsets[0], nil
} else if len(block.Offsets) == 2 {
// The first offset is the log head, the second one is the first available.
return block.Offsets[1], block.Offsets[0], nil
} else
Logger.Printf("Incomplete GetOffsetRange response: %+v\n", block.Offsets)
return -1, -1, ErrIncompleteResponse
}

return block.Offsets[1], block.Offsets[0], nil
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
Expand Down
11 changes: 11 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,17 @@ func TestClientGetOffsetRange(t *testing.T) {
t.Errorf("Expected oldest 155 and newest 2345, got %d and %d", oldest, newest)
}

noMessagesOffsetResponse := new(OffsetResponse)
noMessagesOffsetResponse.AddTopicPartition("my_topic", 0, 2345)
leader.Returns(noMessagesOffsetResponse)

oldest, newest, err := client.GetOffsetRange("my_topic", 0)
if err != nil {
t.Error(err)
} else if oldest != 2345 || newest != 2345 {
t.Errorf("Expected oldest and newest to be 2345, got %d and %d", oldest, newest)
}

safeClose(t, client)
leader.Close()
seedBroker.Close()
Expand Down
8 changes: 4 additions & 4 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func init() {

kafkaShouldBeAvailable = os.Getenv("CI") != ""

if os.Getenv("DEBUG") == "true" {
if os.Getenv("LOGGER") == "true" {
Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
}
Expand Down Expand Up @@ -199,12 +199,12 @@ func TestFuncClientGetOffsetRange(t *testing.T) {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

oldest, newest, err := c.GetOffsetRange("many_partitions", 1)
oldest, newest, err := c.GetOffsetRange("single_partition", 0)
if err != nil {
t.Error("Expected no error, got", err)
}
if oldest >= newest {
t.Errorf("The oldest available offset (%d) should be smaller than the newest (%d)", oldest, newest)
if oldest > newest {
t.Errorf("The oldest available offset (%d) should not be greater than the newest (%d)", oldest, newest)
}

safeClose(t, c)
Expand Down

0 comments on commit 28bd4ba

Please sign in to comment.