Skip to content

Commit

Permalink
Fix/simplify/enable tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Apr 29, 2015
1 parent fc6e499 commit 9772a2c
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestConsumerOffsetManual(t *testing.T) {
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

for i := 0; i <= 10; i++ {
for i := 0; i < 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
leader.Returns(fetchResponse)
Expand Down Expand Up @@ -326,27 +326,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
leader0.Returns(fetchResponse)
time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

// leader0 provides last message on partition 1
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader0.Returns(fetchResponse)

// leader1 provides last message on partition 0
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader1.Returns(fetchResponse)

wg.Wait()
leader1.Close()
leader0.Close()
wg.Wait()
seedBroker.Close()
safeClose(t, master)
}

func TestConsumerInterleavedClose(t *testing.T) {
t.Skip("Enable once bug #325 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand Down Expand Up @@ -379,6 +368,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
time.Sleep(50 * time.Millisecond)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
Expand All @@ -392,7 +382,9 @@ func TestConsumerInterleavedClose(t *testing.T) {
if err != nil {
t.Fatal(err)
}
<-c0.Messages()

fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

Expand Down

0 comments on commit 9772a2c

Please sign in to comment.