DRY up integration tests
seglo committed Oct 20, 2019
1 parent e151a42 commit 5548ab1
Showing 9 changed files with 71 additions and 72 deletions.
18 changes: 0 additions & 18 deletions benchmarks/src/it/scala/akka/kafka/benchmarks/TestTester.scala

This file was deleted.

5 changes: 3 additions & 2 deletions build.sbt
@@ -287,7 +287,7 @@ lazy val tests = project
}
} ++
Seq( // integration test dependencies
-),
+) ++ silencer,
resolvers += "Confluent Maven Repo" at "https://packages.confluent.io/maven/",
publish / skip := true,
whitesourceIgnore := true,
@@ -312,7 +312,8 @@ lazy val tests = project
"SerializationTest.java" ||
"TransactionsExampleTest.java"
} else (Test / unmanagedSources / excludeFilter).value
-}
+},
+scalacOptions += "-P:silencer:globalFilters=scala.jdk.CollectionConverters"
)
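
The `silencer` value appended to the dependencies above is defined elsewhere in build.sbt and not shown in this diff. A minimal sketch of such a definition for the ghik silencer compiler plugin — version and exact coordinates are assumptions, not from this commit:

// Hypothetical sketch: the real definition lives elsewhere in build.sbt.
// The silencer plugin suppresses warnings matched by -P:silencer:globalFilters,
// here the scala.jdk.CollectionConverters deprecation noise.
val silencerVersion = "1.4.4"
val silencer = Seq(
  compilerPlugin("com.github.ghik" %% "silencer-plugin" % silencerVersion),
  "com.github.ghik" %% "silencer-lib" % silencerVersion % Provided
)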

lazy val docs = project
@@ -61,6 +61,7 @@ public KafkaContainer(
// TODO Only for backward compatibility
withNetwork(Network.newNetwork());
withNetworkAliases("kafka-" + Base58.randomString(6));
withNetworkAliases("broker-" + brokerId);
withExposedPorts(exposedPort);

// Use two listeners with different names, it will force Kafka to communicate with itself via
13 changes: 6 additions & 7 deletions testkit/src/main/resources/reference.conf
@@ -11,20 +11,19 @@ akka.kafka.testkit {

testcontainers {

-# define this to select a different Kafka version be choosing the desired version of Confluent Platform:
-#
-# Available Docker images - https://hub.docker.com/r/confluentinc/cp-kafka/tags
-# Kafka versions in Confluent Platform - https://docs.confluent.io/current/installation/versions-interoperability.html
+# define this to select a different Kafka version by choosing the desired version of Confluent Platform
+# available Docker images: https://hub.docker.com/r/confluentinc/cp-kafka/tags
+# Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
confluent-platform-version = "5.2.1"

# the number of Kafka brokers to include in a test cluster
num-brokers = 1

-# set this to use a replication factor greater than 1 for internal Kafka topics such as Consumer Offsets and
-# Transaction log. this replication factor must be less than or equal to `num-brokers`
+# set this to use a replication factor for internal Kafka topics such as Consumer Offsets and Transaction log.
+# this replication factor must be less than or equal to `num-brokers`
internal-topics-replication-factor = 1

-# set this to use a start port other than `9093`. when `num-brokers` greater than 1 than exposed ports will start
+# set this to use a start port other than `9093`. when `num-brokers` is greater than 1 then exposed ports will start
# at this port number and increment by one for each subsequent broker.
start-port = 9093
}
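
These testcontainers defaults can also be overridden in code, which is what the failover specs in this commit do. A minimal sketch, assuming only the setters that appear in this diff and an ActorSystem supplied by the surrounding test:

import akka.actor.ActorSystem
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings

// Programmatic equivalent of tuning the reference.conf keys above.
def settingsFor(system: ActorSystem): KafkaTestkitTestcontainersSettings =
  KafkaTestkitTestcontainersSettings(system)
    .withNumBrokers(3)                      // num-brokers
    .withInternalTopicsReplicationFactor(2) // internal-topics-replication-factor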
@@ -88,10 +88,11 @@ class KafkaTestkitTestcontainersSettings private (val confluentPlatformVersion:
configureZooKeeper)

override def toString: String =
"KafkaTestkitTestcontainersSettings(" +
s"confluentPlatformVersion=$confluentPlatformVersion," +
s"numBrokers=$numBrokers," +
s"internalTopicsReplicationFactor=$internalTopicsReplicationFactor," +
s"startPort=$startPort"
s"startPort=$startPort)"
}

object KafkaTestkitTestcontainersSettings {
@@ -28,6 +28,8 @@ object TestcontainersKafka {
* Override this to select a different Kafka version be choosing the desired version of Confluent Platform:
* [[https://hub.docker.com/r/confluentinc/cp-kafka/tags Available Docker images]],
* [[https://docs.confluent.io/current/installation/versions-interoperability.html Kafka versions in Confluent Platform]]
+ *
+ * Deprecated: set Confluent Platform version in [[KafkaTestkitTestcontainersSettings]]
*/
@deprecated("Use testcontainersSettings instead.", "1.1.1")
def confluentPlatformVersion: String = ConfluentPlatformVersion
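
A hedged migration sketch for the deprecation above; `withConfluentPlatformVersion` is assumed here by analogy with the other setters in this commit and does not appear in this diff, and `system` is an ActorSystem from the surrounding test:

// Hypothetical: pick the Kafka version through the settings object instead
// of overriding the deprecated confluentPlatformVersion member.
val settings = KafkaTestkitTestcontainersSettings(system)
  .withConfluentPlatformVersion("5.2.1")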
41 changes: 41 additions & 0 deletions tests/src/it/scala/akka/kafka/IntegrationTests.scala
@@ -0,0 +1,41 @@
package akka.kafka

import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.testcontainers.containers.GenericContainer

import scala.collection.JavaConverters._

object IntegrationTests {
  val MessageLogInterval = 500L

  def logSentMessages()(implicit log: Logger): Flow[Long, Long, NotUsed] = Flow[Long].map { i =>
    if (i % MessageLogInterval == 0) log.info(s"Sent [$i] messages so far.")
    i
  }

  def logReceivedMessages()(implicit log: Logger): Flow[Long, Long, NotUsed] = Flow[Long].map { i =>
    if (i % MessageLogInterval == 0) log.info(s"Received [$i] messages so far.")
    i
  }

  def logReceivedMessages(tp: TopicPartition)(implicit log: Logger): Flow[Long, Long, NotUsed] = Flow[Long].map { i =>
    if (i % MessageLogInterval == 0) log.info(s"$tp: Received [$i] messages so far.")
    i
  }

  def stopRandomBroker(brokers: Vector[GenericContainer[_]], msgCount: Long)(implicit log: Logger): Unit = {
    val broker: GenericContainer[_] = brokers(scala.util.Random.nextInt(brokers.length))
    val id = broker.getContainerId
    val networkAliases = broker.getNetworkAliases.asScala.mkString(",")
    log.warn(
      s"Stopping one Kafka container with network aliases [$networkAliases], container id [$id], after [$msgCount] messages"
    )
    broker.stop()
  }
}
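
A self-contained usage sketch of the helpers above (the ActorSystem, materializer, and logger here are stand-ins; real specs get `log` and `brokerContainers` from the testkit traits, as the diffs below show):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.Future

object IntegrationTestsSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val log: Logger = LoggerFactory.getLogger("IntegrationTestsSketch")

  val totalMessages = 10 * 1000L

  // Logs progress every MessageLogInterval (500) elements while counting up.
  val lastSent: Future[Long] =
    Source(1L to totalMessages)
      .via(IntegrationTests.logSentMessages())
      .runWith(Sink.last)
}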
31 changes: 7 additions & 24 deletions tests/src/it/scala/akka/kafka/PartitionedSourceFailoverSpec.scala
@@ -8,37 +8,22 @@ import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
-import org.testcontainers.containers.GenericContainer

import scala.concurrent.Future
import scala.concurrent.duration._

class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike with WordSpecLike with ScalaFutures with Matchers {
implicit val pc = PatienceConfig(45.seconds, 1.second)

final val logSentMessages: Long => Long = i => {
if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
i
}

final def logReceivedMessages(tp: TopicPartition): Long => Long = i => {
if (i % 1000 == 0) log.info(s"$tp: Received [$i] messages so far.")
i
}

override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
.withNumBrokers(3)
.withInternalTopicsReplicationFactor(2)

"partitioned source" should {
"not lose any messages when a Kafka node dies" in assertAllStagesStopped {
-val broker2: GenericContainer[_] = brokerContainers(1)
-val broker2ContainerId: String = broker2.getContainerId

val totalMessages = 1000 * 10L
val partitions = 4

@@ -64,14 +49,13 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike
log.info(s"Sub-source for ${tp}")
source
.scan(0L)((c, _) => c + 1)
-.map(logReceivedMessages(tp))
+.via(IntegrationTests.logReceivedMessages(tp)(log))
// shutdown substream after receiving at least its share of the total messages
.takeWhile(count => count < (totalMessages / partitions), inclusive = true)
.runWith(Sink.last)
.map { res =>
log.info(s"$tp: Received [$res] messages in total.")
res
}
}
.mergeSubstreams
// sum of sums. sum the last results of substreams.
.scan(0L)((c, subValue) => c + subValue)
.takeWhile(count => count < totalMessages, inclusive = true)
.runWith(Sink.last)
@@ -85,12 +69,11 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike
ProducerConfig.ACKS_CONFIG -> "all"
)

-val result: Future[Done] = Source(0L until totalMessages)
-.map(logSentMessages)
+val result: Future[Done] = Source(1L to totalMessages)
+.via(IntegrationTests.logSentMessages()(log))
.map { number =>
if (number == totalMessages / 2) {
log.warn(s"Stopping one Kafka container [$broker2ContainerId] after [$number] messages")
broker2.stop()
IntegrationTests.stopRandomBroker(brokerContainers, number)(log)
}
number
}
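
The consumer side of this spec relies on a counting pattern worth calling out: `scan` keeps a running count, and `takeWhile(..., inclusive = true)` also emits the first element that reaches the target before completing the stream. A self-contained sketch of just that pattern (not from the commit):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object TakeWhileSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val totalMessages = 10L

  // scan emits 0, 1, 2, ...; inclusive = true also passes along the first
  // count that fails the predicate (== totalMessages), so Sink.last
  // completes with exactly totalMessages.
  val lastCount: Future[Long] =
    Source.repeat("record")
      .scan(0L)((c, _) => c + 1)
      .takeWhile(count => count < totalMessages, inclusive = true)
      .runWith(Sink.last)
}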
29 changes: 9 additions & 20 deletions tests/src/it/scala/akka/kafka/PlainSourceFailoverSpec.scala
@@ -10,7 +10,6 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
-import org.testcontainers.containers.GenericContainer

import scala.concurrent.Future
import scala.concurrent.duration._
@@ -24,10 +23,7 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike

"plain source" should {
"not lose any messages when a Kafka node dies" in assertAllStagesStopped {
-val broker2: GenericContainer[_] = brokerContainers(1)
-val broker2ContainerId: String = broker2.getContainerId
-
-val totalMessages = 1000 * 10
+val totalMessages = 1000 * 10L
val partitions = 1

// TODO: This is probably not necessary anymore since the testcontainer setup blocks until all brokers are online.
@@ -46,12 +42,9 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike
.withGroupId(groupId)
.withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") // default was 5 * 60 * 1000 (five minutes)

-val consumerMatValue: Future[Int] = Consumer.plainSource(consumerConfig, Subscriptions.topics(topic))
-.scan(0)((c, _) => c + 1)
-.map { i =>
-if (i % 1000 == 0) log.info(s"Received [$i] messages so far.")
-i
-}
+val consumerMatValue: Future[Long] = Consumer.plainSource(consumerConfig, Subscriptions.topics(topic))
+.scan(0L)((c, _) => c + 1)
+.via(IntegrationTests.logReceivedMessages()(log))
.takeWhile(count => count < totalMessages, inclusive = true)
.runWith(Sink.last)

@@ -64,19 +57,15 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike
ProducerConfig.ACKS_CONFIG -> "all"
)

-val result = Source(1 to totalMessages)
-.map { i =>
-if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
-i.toString
-}
-.map(number => new ProducerRecord(topic, partition0, DefaultKey, number))
+val result = Source(0L to totalMessages)
+.via(IntegrationTests.logSentMessages()(log))
.map { number =>
-if (number.value().toInt == totalMessages / 2) {
-log.warn(s"Stopping one Kafka container [$broker2ContainerId] after [$number] messages")
-broker2.stop()
+if (number == totalMessages / 2) {
+IntegrationTests.stopRandomBroker(brokerContainers, number)(log)
}
number
}
+.map(number => new ProducerRecord(topic, partition0, DefaultKey, number.toString))
.runWith(Producer.plainSink(producerConfig))

result.futureValue
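
This spec also shortens the consumer's metadata refresh (the `METADATA_MAX_AGE_CONFIG` override above) so the client notices a dead broker quickly instead of waiting out the five-minute default. A minimal sketch of that tweak; the `defaults` parameter stands in for the testkit's consumer defaults:

import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig

// Lower metadata.max.age.ms from the 5 minute default to 100 ms so cluster
// metadata is refreshed promptly during broker failover.
def failoverConsumer(defaults: ConsumerSettings[String, String],
                     groupId: String): ConsumerSettings[String, String] =
  defaults
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")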
