Use min ISR config and producer send consistency guarantees
seglo committed Oct 19, 2019
1 parent 5633dac commit 2a42967
Showing 5 changed files with 76 additions and 22 deletions.
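At its core, the change pairs a `min.insync.replicas` setting on the test topics with `acks=all` on the producers, so a send is only acknowledged once at least two replicas hold the record. A minimal sketch of that pairing using the plain Kafka client APIs (topic name, partition count, and bootstrap address are illustrative, not taken from the commit):

```scala
import java.util.{Arrays, Properties}
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.StringSerializer

import scala.collection.JavaConverters._

object MinIsrSketch extends App {
  val bootstrap = "localhost:9092" // illustrative address

  // A topic with 3 replicas that rejects acks=all writes unless
  // at least 2 replicas are in sync.
  val adminProps = new Properties()
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
  val admin = AdminClient.create(adminProps)
  admin
    .createTopics(
      Arrays.asList(
        new NewTopic("failover-test", 4, 3.toShort)
          .configs(Map(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2").asJava)
      )
    )
    .all()
    .get(10, TimeUnit.SECONDS)

  // A producer that only treats a send as successful once every
  // in-sync replica has acknowledged the record.
  val producerProps = new Properties()
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
  producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
  val producer = new KafkaProducer(producerProps, new StringSerializer, new StringSerializer)
  producer.send(new ProducerRecord("failover-test", "key", "value")).get()
  producer.close()
  admin.close()
}
```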
@@ -5,11 +5,12 @@

package akka.kafka.testkit.internal

import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Properties}

import scala.collection.JavaConverters._

import akka.actor.ActorSystem
import akka.kafka.testkit.KafkaTestkitSettings
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings}
@@ -115,36 +116,56 @@ trait KafkaTestKit {
}

/**
* Create a topic with a default suffix, single partition and a replication factor of one.
* Create a topic with a default suffix, single partition, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(): String = createTopic(0, 1, 1, Map[String, String]())

/**
* Create a topic with a given suffix, single partition, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int): String = createTopic(suffix, 1, 1, Map[String, String]())

/**
* Create a topic with a given suffix, partition number, a replication factor of one, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(): String = createTopic(0, 1, 1)
def createTopic(suffix: Int, partitions: Int): String =
createTopic(suffix, partitions, 1, Map[String, String]())

/**
* Create a topic with a given suffix, single partition and a replication factor of one.
* Create a topic with given suffix, partition number, replication factor, and no topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int): String = createTopic(suffix, 1, 1)
def createTopic(suffix: Int, partitions: Int, replication: Int): String =
createTopic(suffix, partitions, replication, Map[String, String]())

/**
* Create a topic with a given suffix, partition number and a replication factor of one.
* Java API
* Create a topic with given suffix, partition number, replication factor, and topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int, partitions: Int): String = createTopic(suffix, partitions, 1)
def createTopic(suffix: Int, partitions: Int, replication: Int, config: java.util.Map[String, String]): String =
createTopic(suffix, partitions, replication, config.asScala)

/**
* Create a topic with given suffix, partition number and replication factor.
* Create a topic with given suffix, partition number, replication factor, and topic configuration.
*
* This method will block and return only when the topic has been successfully created.
*/
def createTopic(suffix: Int, partitions: Int, replication: Int): String = {
def createTopic(suffix: Int,
partitions: Int,
replication: Int,
config: scala.collection.Map[String, String]): String = {
val topicName = createTopicName(suffix)
val configs = new util.HashMap[String, String]()
val createResult = adminClient.createTopics(
Arrays.asList(new NewTopic(topicName, partitions, replication.toShort).configs(configs))
Arrays.asList(new NewTopic(topicName, partitions, replication.toShort).configs(config.asJava))
)
createResult.all().get(10, TimeUnit.SECONDS)
topicName
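With the new configuration parameter, a spec can attach per-topic settings at creation time. A hypothetical call in the style of the failover specs below (partition count arbitrary; `TopicConfig` comes from `org.apache.kafka.common.config`):

```scala
// 4 partitions, 3 replicas, and at least 2 replicas in sync before
// an acks=all write is acknowledged.
val topic = createTopic(0, partitions = 4, replication = 3,
  config = Map(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2"))
```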
@@ -7,6 +7,7 @@ package akka.kafka.testkit.internal

import akka.kafka.testkit.scaladsl.{KafkaSpec, ScalatestKafkaSpec}
import org.testcontainers.containers.GenericContainer
//import org.testcontainers.containers.output.Slf4jLogConsumer

import scala.collection.JavaConverters._

@@ -42,6 +43,7 @@ object TestcontainersKafka {
}

trait Spec extends KafkaSpec {
//private val logConsumer = new Slf4jLogConsumer(log)
private var cluster: KafkaContainerCluster = _
private var kafkaBootstrapServersInternal: String = _
private var kafkaPortInternal: Int = -1
@@ -89,6 +91,7 @@
configureKafka(brokerContainers)
configureZooKeeper(zookeeperContainer)
cluster.startAll()
//logContainers()
kafkaBootstrapServersInternal = cluster.getBootstrapServers
kafkaPortInternal =
kafkaBootstrapServersInternal.substring(kafkaBootstrapServersInternal.lastIndexOf(":") + 1).toInt
@@ -102,6 +105,11 @@
kafkaPortInternal = -1
cluster = null
}

// private def logContainers(): Unit = {
// brokerContainers.foreach(_.followOutput(logConsumer))
// zookeeperContainer.followOutput(logConsumer)
// }
}

private class SpecBase extends ScalatestKafkaSpec(-1) with Spec
3 changes: 3 additions & 0 deletions tests/src/it/resources/logback-test.xml
@@ -23,6 +23,9 @@
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>

<logger name="com.github.dockerjava" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="FILE" />
<appender-ref ref="STDOUT" />
24 changes: 18 additions & 6 deletions tests/src/it/scala/akka/kafka/PartitionedSourceFailoverSpec.scala
@@ -8,12 +8,14 @@ import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
import org.testcontainers.containers.GenericContainer

import scala.concurrent.Future
import scala.concurrent.duration._

class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassLike with WordSpecLike with ScalaFutures with Matchers {
@@ -48,14 +50,17 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
_.nodes().get().size == testcontainersSettings.numBrokers
}

val topic = createTopic(0, partitions, replication = 3)
val topic = createTopic(0, partitions, replication = 3, Map(
// require at least two replicas to be in sync before acknowledging a produced record
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2"
))
val groupId = createGroupId(0)

val consumerConfig = consumerDefaults
.withGroupId(groupId)
.withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") // default was 5 * 60 * 1000 (five minutes)

val control = Consumer.plainPartitionedSource(consumerConfig, Subscriptions.topics(topic))
val control: DrainingControl[Long] = Consumer.plainPartitionedSource(consumerConfig, Subscriptions.topics(topic))
.groupBy(partitions, _._1)
.mapAsync(8) { case (tp, source) =>
log.info(s"Sub-source for ${tp}")
@@ -78,7 +83,12 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
!_.members().isEmpty
}

val result = Source(0L until totalMessages)
val producerConfig = producerDefaults.withProperties(
// wait for acknowledgement from all in-sync replicas (at least min.insync.replicas = 2); the default acks=1 only waits for the leader
ProducerConfig.ACKS_CONFIG -> "all"
)

val result: Future[Done] = Source(0L until totalMessages)
.map(logSentMessages)
.map { number =>
if (number == totalMessages / 2) {
@@ -88,10 +98,12 @@ class PartitionedSourceFailoverSpec extends SpecBase with TestcontainersKafkaPer
number
}
.map(number => new ProducerRecord(topic, (number % partitions).toInt, DefaultKey, number.toString))
.runWith(Producer.plainSink(producerDefaults))
.runWith(Producer.plainSink(producerConfig))

result.futureValue shouldBe Done
control.drainAndShutdown().futureValue shouldBe totalMessages
log.info("Actual messages received [{}], total messages sent [{}]", control.drainAndShutdown().futureValue, totalMessages)
// assert that we receive at least the number of messages we sent, there could be more due to retries
assert(control.drainAndShutdown().futureValue >= totalMessages)
}
}
}
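The switch from an exact `shouldBe totalMessages` to `>=` matches the guarantee under test: `acks=all` is at-least-once, because a broker that fails after replicating a record but before the acknowledgement reaches the client forces the producer to resend it. A sketch of the stricter alternative this commit does not adopt, the idempotent producer, which lets brokers de-duplicate those resends (`producerDefaults` as in the spec above):

```scala
// Hypothetical variant: idempotence de-duplicates producer retries broker-side,
// so each record is written once even across resends. Consumers can still see
// replays after a rebalance, so consumed counts remain at-least-once.
val idempotentProducerConfig = producerDefaults.withProperties(
  ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> "true",
  ProducerConfig.ACKS_CONFIG -> "all" // required by, and implied with, idempotence
)
```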
20 changes: 15 additions & 5 deletions tests/src/it/scala/akka/kafka/PlainSourceFailoverSpec.scala
@@ -6,7 +6,8 @@ import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
import org.testcontainers.containers.GenericContainer
@@ -22,7 +23,6 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
)

"plain source" should {

"not lose any messages when a Kafka node dies" in assertAllStagesStopped {
val broker2: GenericContainer[_] = brokerContainers(1)
val broker2ContainerId: String = broker2.getContainerId
@@ -36,7 +36,10 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
_.nodes().get().size == testcontainersSettings.numBrokers
}

val topic = createTopic(suffix = 0, partitions, replication = 3)
val topic = createTopic(0, partitions, replication = 3, Map(
// require at least two replicas to be in sync before acknowledging a produced record
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG -> "2"
))
val groupId = createGroupId(0)

val consumerConfig = consumerDefaults
@@ -56,6 +59,11 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
case singleConsumer :: Nil => singleConsumer.assignment.topicPartitions.size == partitions
}

val producerConfig = producerDefaults.withProperties(
// wait for acknowledgement from all in-sync replicas (at least min.insync.replicas = 2); the default acks=1 only waits for the leader
ProducerConfig.ACKS_CONFIG -> "all"
)

val result = Source(1 to totalMessages)
.map { i =>
if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
@@ -69,10 +77,12 @@ class PlainSourceFailoverSpec extends SpecBase with TestcontainersKafkaPerClassL
}
number
}
.runWith(Producer.plainSink(producerDefaults))
.runWith(Producer.plainSink(producerConfig))

result.futureValue
consumer.futureValue shouldBe totalMessages
log.info("Actual messages received [{}], total messages sent [{}]", consumer.futureValue, totalMessages)
// assert that we receive at least the number of messages we sent, there could be more due to retries
assert(consumer.futureValue >= totalMessages)
}
}
}
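For context, the `consumer` that `consumer.futureValue` waits on is collapsed out of the diff above; it amounts to counting records until at least `totalMessages` have been seen. A rough sketch under the same spec scaffolding (`consumerConfig`, `topic`, and `totalMessages` as defined in the spec; the real spec also logs received messages):

```scala
// Count records as they arrive and complete the stream once the
// expected number has been seen.
val consumer = Consumer
  .plainSource(consumerConfig, Subscriptions.topics(topic))
  .scan(0L)((count, _) => count + 1)
  .takeWhile(_ < totalMessages, inclusive = true)
  .runWith(Sink.last)
```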
