Skip to content

Commit

Permalink
WIP-
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Oct 16, 2019
1 parent 98a4b96 commit b850a7b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 33 deletions.
50 changes: 25 additions & 25 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ jobs:
- env: CMD="mimaReportBinaryIssues"
name: "Check binary compatibility. Run locally with: sbt mimaReportBinaryIssues"

- stage: test
env: CMD="++2.11.12 test"
name: "Run tests with Scala 2.11 and AdoptOpenJDK 8"
if: type != cron
- env: CMD="++2.12.9 test"
name: "Run tests with Scala 2.12 and AdoptOpenJDK 8"
- env: CMD="++2.13.0 test"
name: "Run tests with Scala 2.13 and AdoptOpenJDK 8"

- env:
- JDK="adopt@~1.11.0-4"
- _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
- CMD="++2.11.12 test"
name: "Run tests with Scala 2.11 and AdoptOpenJDK 11"
if: type != cron
- env:
- JDK="adopt@~1.11.0-4"
- _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
- CMD="++2.12.9 test"
name: "Run tests with Scala 2.12 and AdoptOpenJDK 11"
- env:
- JDK="adopt@~1.11.0-4"
- _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
- CMD="++2.13.0 test"
name: "Run tests with Scala 2.13 and AdoptOpenJDK 11"
# - stage: test
# env: CMD="++2.11.12 test"
# name: "Run tests with Scala 2.11 and AdoptOpenJDK 8"
# if: type != cron
# - env: CMD="++2.12.9 test"
# name: "Run tests with Scala 2.12 and AdoptOpenJDK 8"
# - env: CMD="++2.13.0 test"
# name: "Run tests with Scala 2.13 and AdoptOpenJDK 8"
#
# - env:
# - JDK="adopt@~1.11.0-4"
# - _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
# - CMD="++2.11.12 test"
# name: "Run tests with Scala 2.11 and AdoptOpenJDK 11"
# if: type != cron
# - env:
# - JDK="adopt@~1.11.0-4"
# - _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
# - CMD="++2.12.9 test"
# name: "Run tests with Scala 2.12 and AdoptOpenJDK 11"
# - env:
# - JDK="adopt@~1.11.0-4"
# - _JAVA_OPTIONS="-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler"
# - CMD="++2.13.0 test"
# name: "Run tests with Scala 2.13 and AdoptOpenJDK 11"

- stage: integration
env: CMD="dockerComposeTestAll"
Expand Down
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ val silencer = {
val embeddedKafkaSchemaRegistry = "5.1.2"

val kafkaScale = settingKey[Int]("Number of kafka docker containers")
val kafkaInternalTopicsRf = settingKey[Int]("Replication factor of internal Kafka topics (must be <= kafkaScale)")

resolvers in ThisBuild ++= Seq(
// for Embedded Kafka
Expand Down Expand Up @@ -124,6 +125,8 @@ val commonSettings = Def.settings(
// -q Suppress stdout for successful tests.
// -s Try to decode Scala names in stack traces and test names.
testOptions += Tests.Argument(jupiterTestFramework, "-a", "-v", "-q", "-s"),
kafkaScale := 1,
kafkaInternalTopicsRf := { if (kafkaScale.value - 1 == 0) 1 else kafkaScale.value - 1 },
scalafmtOnCompile := true,
headerLicense := Some(
HeaderLicense.Custom(
Expand Down Expand Up @@ -327,6 +330,7 @@ lazy val tests = project
import com.github.ehsanyou.sbt.docker.compose.commands.test._
DockerComposeTestCmd(DockerComposeTest.ItTest)
.withEnvVar("KAFKA_SCALE", kafkaScale.value.toString)
.withEnvVar("KAFKA_INTERNAL_TOPICS_RF", kafkaInternalTopicsRf.value.toString)
}
)

Expand Down Expand Up @@ -408,6 +412,7 @@ lazy val benchmarks = project
import com.github.ehsanyou.sbt.docker.compose.commands.test._
DockerComposeTestCmd(DockerComposeTest.ItTest)
.withEnvVar("KAFKA_SCALE", kafkaScale.value.toString)
.withEnvVar("KAFKA_INTERNAL_TOPICS_RF", kafkaInternalTopicsRf.value.toString)
},
dockerfile in docker := {
val artifact: File = assembly.value
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ services:
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 1 # default was 300 (5 minutes)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${KAFKA_SCALE:-1}
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${KAFKA_INTERNAL_TOPICS_RF:-1}
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: ${KAFKA_INTERNAL_TOPICS_RF:-1}
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: ${KAFKA_INTERNAL_TOPICS_RF:-1}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PartitionedSourceFailoverSpec extends ScalatestKafkaSpec(PartitionedSource

val docker = new DefaultDockerClient("unix:///var/run/docker.sock")

val gracefulShutdownTimeoutSeconds = 30
implicit val pc = PatienceConfig(30.seconds, 1.second)

final val logSentMessages: Long => Long = i => {
Expand Down Expand Up @@ -87,8 +88,11 @@ class PartitionedSourceFailoverSpec extends ScalatestKafkaSpec(PartitionedSource
.map(logSentMessages)
.map { number =>
if (number == totalMessages / 2) {
// make sure to have enough brokers available to fulfill replication factor of internal kafka topics
// internal topic replication factor set in build.sbt `Setting`: `kafkaInternalTopicsRf`
log.warn(s"Stopping one Kafka container [$Kafka2ContainerId] after [$number] messages")
docker.stopContainer(Kafka2ContainerId, 0)
docker.stopContainer(Kafka2ContainerId, gracefulShutdownTimeoutSeconds)
docker.waitContainer(Kafka2ContainerId)
}
number
}
Expand All @@ -99,5 +103,5 @@ class PartitionedSourceFailoverSpec extends ScalatestKafkaSpec(PartitionedSource
sleep(2.seconds)
control.drainAndShutdown().futureValue shouldBe totalMessages
}
}
}
}
14 changes: 9 additions & 5 deletions tests/src/it/scala/akka/kafka/PlainSourceFailoverSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class PlainSourceFailoverSpec extends ScalatestKafkaSpec(PlainSourceFailoverSpec

val docker = new DefaultDockerClient("unix:///var/run/docker.sock")

val gracefulShutdownTimeoutSeconds = 30
implicit val pc = PatienceConfig(30.seconds, 100.millis)

"plain source" should {
Expand Down Expand Up @@ -65,12 +66,15 @@ class PlainSourceFailoverSpec extends ScalatestKafkaSpec(PlainSourceFailoverSpec
i.toString
}
.map(number => new ProducerRecord(topic, partition0, DefaultKey, number))
.map { c =>
if (c.value().toInt == totalMessages / 2) {
log.info("Stopping one Kafka container")
docker.stopContainer(Kafka2ContainerId, 0)
.map { number =>
if (number.value().toInt == totalMessages / 2) {
// make sure to have enough brokers available to fulfill replication factor of internal kafka topics
// internal topic replication factor set in build.sbt `Setting`: `kafkaInternalTopicsRf`
log.warn(s"Stopping one Kafka container [$Kafka2ContainerId] after [$number] messages")
docker.stopContainer(Kafka2ContainerId, gracefulShutdownTimeoutSeconds)
docker.waitContainer(Kafka2ContainerId)
}
c
number
}
.runWith(Producer.plainSink(producerDefaults))

Expand Down

0 comments on commit b850a7b

Please sign in to comment.