
Use testcontainers for multi-broker integration tests and benchmarks #939

Merged
Use min ISR config and producer send consistency guarantees
seglo committed Oct 19, 2019
commit dd8908ab16ea72b8c3731b8390288165a2de06c2
@@ -5,11 +5,12 @@

package akka.kafka.testkit.internal

import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Properties}

import scala.collection.JavaConverters._

import akka.actor.ActorSystem
import akka.kafka.testkit.KafkaTestkitSettings
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings}
@@ -115,36 +116,56 @@ trait KafkaTestKit {
}

/**
* Create a topic with a default suffix, single partition and a replication factor of one.
* Create a topic with a default suffix, single partition, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(): String = createTopic(0, 1, 1, Map[String, String]())

/**
* Create a topic with a given suffix, a single partition, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int): String = createTopic(suffix, 1, 1, Map[String, String]())

/**
* Create a topic with a given suffix, partition number, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(): String = createTopic(0, 1, 1)
def createTopic(suffix: Int, partitions: Int): String =
createTopic(suffix, partitions, 1, Map[String, String]())

/**
* Create a topic with a given suffix, a single partition and a replication factor of one.
* Create a topic with given suffix, partition number, replication factor, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int): String = createTopic(suffix, 1, 1)
def createTopic(suffix: Int, partitions: Int, replication: Int): String =
createTopic(suffix, partitions, replication, Map[String, String]())

/**
* Create a topic with a given suffix, partition number and a replication factor of one.
* JAVA API
* Create a topic with given suffix, partition number, replication factor, and topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int, partitions: Int): String = createTopic(suffix, partitions, 1)
def createTopic(suffix: Int, partitions: Int, replication: Int, config: java.util.Map[String, String]): String =
createTopic(suffix, partitions, replication, config.asScala)

/**
* Create a topic with given suffix, partition number and replication factor.
* Create a topic with given suffix, partition number, replication factor, and topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int, partitions: Int, replication: Int): String = {
def createTopic(suffix: Int,
partitions: Int,
replication: Int,
config: scala.collection.Map[String, String]): String = {
val topicName = createTopicName(suffix)
val configs = new util.HashMap[String, String]()
val createResult = adminClient.createTopics(
Arrays.asList(new NewTopic(topicName, partitions, replication.toShort).configs(configs))
Arrays.asList(new NewTopic(topicName, partitions, replication.toShort).configs(config.asJava))
)
createResult.all().get(10, TimeUnit.SECONDS)
topicName
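
For reference, a minimal usage sketch of the new configurable overload (the suffix, partition count, and min-ISR value here are illustrative, not taken from this diff):

```scala
import org.apache.kafka.common.config.TopicConfig

// Create a 4-partition topic, replicated across 3 brokers, that requires
// at least 2 in-sync replicas before a produced record is acknowledged.
val topic: String = createTopic(
  suffix = 1,
  partitions = 4,
  replication = 3,
  Map(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2")
)
```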
@@ -7,6 +7,7 @@ package akka.kafka.testkit.internal

import akka.kafka.testkit.scaladsl.{KafkaSpec, ScalatestKafkaSpec}
import org.testcontainers.containers.GenericContainer
//import org.testcontainers.containers.output.Slf4jLogConsumer

import scala.collection.JavaConverters._

@@ -42,6 +43,7 @@ object TestcontainersKafka {
}

trait Spec extends KafkaSpec {
//private val logConsumer = new Slf4jLogConsumer(log)
private var cluster: KafkaContainerCluster = _
private var kafkaBootstrapServersInternal: String = _
private var kafkaPortInternal: Int = -1
@@ -89,6 +91,7 @@ object TestcontainersKafka {
configureKafka(brokerContainers)
configureZooKeeper(zookeeperContainer)
cluster.startAll()
//logContainers()
kafkaBootstrapServersInternal = cluster.getBootstrapServers
kafkaPortInternal =
kafkaBootstrapServersInternal.substring(kafkaBootstrapServersInternal.lastIndexOf(":") + 1).toInt
@@ -102,6 +105,11 @@ object TestcontainersKafka {
kafkaPortInternal = -1
cluster = null
}

// private def logContainers(): Unit = {
// brokerContainers.foreach(_.followOutput(logConsumer))
// zookeeperContainer.followOutput(logConsumer)
// }
}

private class SpecBase extends ScalatestKafkaSpec(-1) with Spec
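
If the log forwarding that is commented out above is ever switched on, it would wire up roughly like this (a sketch, assuming an SLF4J `Logger` named `log` and broker/ZooKeeper container references like those used in this trait):

```scala
import org.slf4j.Logger
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer

// Forward each container's stdout/stderr into the test's SLF4J logger.
def followContainerOutput(log: Logger,
                          brokers: Vector[GenericContainer[_]],
                          zookeeper: GenericContainer[_]): Unit = {
  val logConsumer = new Slf4jLogConsumer(log)
  brokers.foreach(_.followOutput(logConsumer))
  zookeeper.followOutput(logConsumer)
}
```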
3 changes: 3 additions & 0 deletions tests/src/it/resources/logback-test.xml
@@ -23,6 +23,9 @@
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>

<logger name="com.github.dockerjava" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="FILE" />
<appender-ref ref="STDOUT" />
26 changes: 19 additions & 7 deletions tests/src/it/scala/akka/kafka/PartitionedSourceFailoverSpec.scala
@@ -8,16 +8,18 @@ import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
import org.testcontainers.containers.GenericContainer

import scala.concurrent.Future
import scala.concurrent.duration._

class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike with WordSpecLike with ScalaFutures with Matchers {
implicit val pc = PatienceConfig(30.seconds, 1.second)
implicit val pc = PatienceConfig(45.seconds, 1.second)

final val logSentMessages: Long => Long = i => {
if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
@@ -48,14 +50,17 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
_.nodes().get().size == testcontainersSettings.numBrokers
}

val topic = createTopic(0, partitions, replication = 3)
val topic = createTopic(0, partitions, replication = 3, Map(
// require at least two replicas to be in sync before a produced record is acknowledged (enforced when the producer uses acks=all)
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2"
))
val groupId = createGroupId(0)

val consumerConfig = consumerDefaults
.withGroupId(groupId)
.withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") // default was 5 * 60 * 1000 (five minutes)

val control = Consumer.plainPartitionedSource(consumerConfig, Subscriptions.topics(topic))
val control: DrainingControl[Long] = Consumer.plainPartitionedSource(consumerConfig, Subscriptions.topics(topic))
.groupBy(partitions, _._1)
.mapAsync(8) { case (tp, source) =>
log.info(s"Sub-source for ${tp}")
@@ -78,7 +83,12 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
!_.members().isEmpty
}

val result = Source(0L until totalMessages)
val producerConfig = producerDefaults.withProperties(
// wait for the full in-sync replica set to acknowledge the record; with min.insync.replicas=2 that means at least two brokers (the default is acks=1)
ProducerConfig.ACKS_CONFIG -> "all"
)

val result: Future[Done] = Source(0L until totalMessages)
.map(logSentMessages)
.map { number =>
if (number == totalMessages / 2) {
@@ -88,10 +98,12 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
number
}
.map(number => new ProducerRecord(topic, (number % partitions).toInt, DefaultKey, number.toString))
.runWith(Producer.plainSink(producerDefaults))
.runWith(Producer.plainSink(producerConfig))

result.futureValue shouldBe Done
control.drainAndShutdown().futureValue shouldBe totalMessages
log.info("Actual messages received [{}], total messages sent [{}]", control.drainAndShutdown().futureValue, totalMessages)
// assert that we receive at least the number of messages we sent, there could be more due to retries
assert(control.drainAndShutdown().futureValue >= totalMessages)
}
}
}
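
The durability guarantee exercised by this test comes from pairing the topic-level min.insync.replicas with producer-level acks=all; a minimal sketch of building such producer settings from scratch (the `system` and `bootstrapServers` values here are assumed, not part of this diff):

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

def durableProducerSettings(system: ActorSystem,
                            bootstrapServers: String): ProducerSettings[String, String] =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
    // With acks=all the leader waits for the full in-sync replica set, which
    // min.insync.replicas=2 on the topic constrains to at least two brokers.
    .withProperty(ProducerConfig.ACKS_CONFIG, "all")
```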
22 changes: 16 additions & 6 deletions tests/src/it/scala/akka/kafka/PlainSourceFailoverSpec.scala
@@ -6,23 +6,23 @@ import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
import org.testcontainers.containers.GenericContainer

import scala.concurrent.duration._

class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike with WordSpecLike with ScalaFutures with Matchers {
implicit val pc = PatienceConfig(30.seconds, 100.millis)
implicit val pc = PatienceConfig(45.seconds, 100.millis)

override def testcontainersSettings = TestcontainersKafkaSettings(
numBrokers = 3,
internalTopicsReplicationFactor = 2
)

"plain source" should {

"not lose any messages when a Kafka node dies" in assertAllStagesStopped {
val broker2: GenericContainer[_] = brokerContainers(1)
val broker2ContainerId: String = broker2.getContainerId
@@ -36,7 +36,10 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
_.nodes().get().size == testcontainersSettings.numBrokers
}

val topic = createTopic(suffix = 0, partitions, replication = 3)
val topic = createTopic(0, partitions, replication = 3, Map(
// require at least two replicas to be in sync before a produced record is acknowledged (enforced when the producer uses acks=all)
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2"
))
val groupId = createGroupId(0)

val consumerConfig = consumerDefaults
@@ -56,6 +59,11 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == partitions
}

val producerConfig = producerDefaults.withProperties(
// wait for the full in-sync replica set to acknowledge the record; with min.insync.replicas=2 that means at least two brokers (the default is acks=1)
ProducerConfig.ACKS_CONFIG -> "all"
)

val result = Source(1 to totalMessages)
.map { i =>
if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
@@ -69,10 +77,12 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
}
number
}
.runWith(Producer.plainSink(producerDefaults))
.runWith(Producer.plainSink(producerConfig))

result.futureValue
consumer.futureValue shouldBe totalMessages
log.info("Actual messages received [{}], total messages sent [{}]", consumer.futureValue, totalMessages)
// assert that we receive at least the number of messages we sent, there could be more due to retries
assert(consumer.futureValue >= totalMessages)
}
}
}
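
Because acks=all makes producer retries (and hence duplicates) possible, both specs assert a lower bound rather than an exact count. If an exact count were ever needed, duplicates could be filtered out before counting; a sketch of such a stage (the `dedupe` name is hypothetical, and the in-memory set is only appropriate for bounded test data):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Pass each value through at most once so producer retries don't inflate the count.
def dedupe[T]: Flow[T, T, NotUsed] =
  Flow[T].statefulMapConcat { () =>
    val seen = scala.collection.mutable.Set.empty[T]
    value => if (seen.add(value)) value :: Nil else Nil
  }
```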