Commit

Change naming
ennru committed Oct 21, 2019
1 parent ce2972e commit 7d3e6f7
Showing 1 changed file with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala
@@ -13,6 +13,7 @@ import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import akka.stream.testkit.scaladsl.TestSink
 import akka.testkit.TestProbe
 import akka.{Done, NotUsed}
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.scalatest._
@@ -37,67 +38,71 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside {
 val topic1 = createTopic(topicSuffix, partitions = 2)
 val group1 = createGroupId(1)
 val consumerSettings = consumerDefaults
-  .withProperty("max.poll.records", "1")
+  .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
   .withGroupId(group1)

 produceToTwoPartitions(topic1, count).futureValue shouldBe Done

 // Subscribe to the topic (without demand)
-val rebalanceActor1 = TestProbe()
-val subscription1 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref)
+val probe1rebalanceActor = TestProbe()
+val probe1subscription = Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
 val (control1, probe1) = Consumer
-  .plainSource(consumerSettings, subscription1)
+  .plainSource(consumerSettings, probe1subscription)
   .toMat(TestSink.probe)(Keep.both)
   .run()

 // Await initial partition assignment
-rebalanceActor1.expectMsgClass(classOf[TopicPartitionsRevoked])
-rebalanceActor1.expectMsg(
-  TopicPartitionsAssigned(subscription1,
+probe1rebalanceActor.expectMsgClass(classOf[TopicPartitionsRevoked])
+probe1rebalanceActor.expectMsg(
+  TopicPartitionsAssigned(probe1subscription,
     Set(new TopicPartition(topic1, partition0), new TopicPartition(topic1, partition1)))
 )

 // request all messages
 probe1.request(count * 2L)

 // Subscribe to the topic (without demand)
-val rebalanceActor2 = TestProbe()
-val subscription2 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref)
+val prove2rebalanceActor = TestProbe()
+val probe2subscription = Subscriptions.topics(topic1).withRebalanceListener(prove2rebalanceActor.ref)
 val (control2, probe2) = Consumer
-  .plainSource(consumerSettings, subscription2)
+  .plainSource(consumerSettings, probe2subscription)
   .toMat(TestSink.probe)(Keep.both)
   .run()

 // Await a revoke to both consumers
-rebalanceActor1.expectMsg(
-  TopicPartitionsRevoked(subscription1,
+probe1rebalanceActor.expectMsg(
+  TopicPartitionsRevoked(probe1subscription,
     Set(new TopicPartition(topic1, partition0), new TopicPartition(topic1, partition1)))
 )
-rebalanceActor2.expectMsgClass(classOf[TopicPartitionsRevoked])
+prove2rebalanceActor.expectMsgClass(classOf[TopicPartitionsRevoked])

 // the rebalance finishes
-rebalanceActor1.expectMsg(TopicPartitionsAssigned(subscription1, Set(new TopicPartition(topic1, partition0))))
-rebalanceActor2.expectMsg(TopicPartitionsAssigned(subscription2, Set(new TopicPartition(topic1, partition1))))
+probe1rebalanceActor.expectMsg(
+  TopicPartitionsAssigned(probe1subscription, Set(new TopicPartition(topic1, partition0)))
+)
+prove2rebalanceActor.expectMsg(
+  TopicPartitionsAssigned(probe2subscription, Set(new TopicPartition(topic1, partition1)))
+)

-val messages1 = probe1.expectNextN(count + 1L)
+val probe1messages = probe1.expectNextN(count + 1L)
 probe2.request(count.toLong)
-val messages2 = probe2.expectNextN(count.toLong)
+val probe2messages = probe2.expectNextN(count.toLong)

-val messages1p0 = messages1.filter(_.partition() == partition0)
-val messages1p1 = messages1.filter(_.partition() == partition1)
+val probe1messages0 = probe1messages.filter(_.partition() == partition0)
+val probe1messages1 = probe1messages.filter(_.partition() == partition1)

-if (messages1p0.size == 1) { // this depending on which partition gets issued "first" during poll
-  // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued
-  messages1p0 should have size (1)
-  messages1p1 should have size (count.toLong)
+if (probe1messages0.size == 1) { // this depending on which partition gets issued "first" during poll
+  // this is the problematic message: even though partition 0 is balanced away the enqueued messages are issued
+  probe1messages0 should have size (1)
+  probe1messages1 should have size (count.toLong)

-  messages2 should have size (count.toLong)
+  probe2messages should have size (count.toLong)
 } else {
-  // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued
-  messages1p0 should have size (count.toLong)
-  messages1p1 should have size (1)
+  // this is the problematic message: even though partition 0 is balanced away the enqueued messages are issued
+  probe1messages0 should have size (count.toLong)
+  probe1messages1 should have size (1)

-  messages2 should have size (count.toLong)
+  probe2messages should have size (count.toLong)
 }

 probe1.cancel()
