From 7d3e6f7dbc1c27fd2adb8a6d4f8acb5b7d27e576 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Mon, 21 Oct 2019 12:19:26 +0200 Subject: [PATCH] Change naming --- .../akka/kafka/scaladsl/RebalanceSpec.scala | 61 ++++++++++--------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala index 4979345e3..a27557e74 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala @@ -13,6 +13,7 @@ import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestProbe import akka.{Done, NotUsed} +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.scalatest._ @@ -37,23 +38,23 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val topic1 = createTopic(topicSuffix, partitions = 2) val group1 = createGroupId(1) val consumerSettings = consumerDefaults - .withProperty("max.poll.records", "1") + .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1") .withGroupId(group1) produceToTwoPartitions(topic1, count).futureValue shouldBe Done // Subscribe to the topic (without demand) - val rebalanceActor1 = TestProbe() - val subscription1 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref) + val probe1rebalanceActor = TestProbe() + val probe1subscription = Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref) val (control1, probe1) = Consumer - .plainSource(consumerSettings, subscription1) + .plainSource(consumerSettings, probe1subscription) .toMat(TestSink.probe)(Keep.both) .run() // Await initial partition assignment - rebalanceActor1.expectMsgClass(classOf[TopicPartitionsRevoked]) - rebalanceActor1.expectMsg( - TopicPartitionsAssigned(subscription1, + probe1rebalanceActor.expectMsgClass(classOf[TopicPartitionsRevoked]) + probe1rebalanceActor.expectMsg( + TopicPartitionsAssigned(probe1subscription, Set(new TopicPartition(topic1, partition0), new TopicPartition(topic1, partition1))) ) @@ -61,43 +62,47 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { probe1.request(count * 2L) // Subscribe to the topic (without demand) - val rebalanceActor2 = TestProbe() - val subscription2 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref) + val prove2rebalanceActor = TestProbe() + val probe2subscription = Subscriptions.topics(topic1).withRebalanceListener(prove2rebalanceActor.ref) val (control2, probe2) = Consumer - .plainSource(consumerSettings, subscription2) + .plainSource(consumerSettings, probe2subscription) .toMat(TestSink.probe)(Keep.both) .run() // Await a revoke to both consumers - rebalanceActor1.expectMsg( - TopicPartitionsRevoked(subscription1, + probe1rebalanceActor.expectMsg( + TopicPartitionsRevoked(probe1subscription, Set(new TopicPartition(topic1, partition0), new TopicPartition(topic1, partition1))) ) - rebalanceActor2.expectMsgClass(classOf[TopicPartitionsRevoked]) + prove2rebalanceActor.expectMsgClass(classOf[TopicPartitionsRevoked]) // the rebalance finishes - rebalanceActor1.expectMsg(TopicPartitionsAssigned(subscription1, Set(new TopicPartition(topic1, partition0)))) - rebalanceActor2.expectMsg(TopicPartitionsAssigned(subscription2, Set(new TopicPartition(topic1, partition1)))) + probe1rebalanceActor.expectMsg( + TopicPartitionsAssigned(probe1subscription, Set(new TopicPartition(topic1, partition0))) + ) + prove2rebalanceActor.expectMsg( + TopicPartitionsAssigned(probe2subscription, Set(new TopicPartition(topic1, partition1))) + ) - val messages1 = probe1.expectNextN(count + 1L) + val probe1messages = probe1.expectNextN(count + 1L) probe2.request(count.toLong) - val messages2 = probe2.expectNextN(count.toLong) + val probe2messages = probe2.expectNextN(count.toLong) - val messages1p0 = messages1.filter(_.partition() == partition0) - val messages1p1 = messages1.filter(_.partition() == partition1) + val probe1messages0 = probe1messages.filter(_.partition() == partition0) + val probe1messages1 = probe1messages.filter(_.partition() == partition1) - if (messages1p0.size == 1) { // this depending on which partition gets issued "first" during poll - // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued - messages1p0 should have size (1) - messages1p1 should have size (count.toLong) + if (probe1messages0.size == 1) { // this depending on which partition gets issued "first" during poll + // this is the problematic message: even though partition 0 is balanced away the enqueued messages are issued + probe1messages0 should have size (1) + probe1messages1 should have size (count.toLong) - messages2 should have size (count.toLong) + probe2messages should have size (count.toLong) } else { - // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued - messages1p0 should have size (count.toLong) - messages1p1 should have size (1) + // this is the problematic message: even though partition 0 is balanced away the enqueued messages are issued + probe1messages0 should have size (count.toLong) + probe1messages1 should have size (1) - messages2 should have size (count.toLong) + probe2messages should have size (count.toLong) } probe1.cancel()