Drop embedded-kafka testkit support #1229

Merged: 8 commits, Oct 16, 2020
3 changes: 1 addition & 2 deletions .scala-steward.conf
@@ -4,8 +4,7 @@ updates.ignore = [
{ groupId = "com.typesafe.akka" }
]
updates.pin = [
{ groupId = "org.apache.kafka", artifactId="kafka-clients", version="2.4." }
{ groupId = "io.github.embeddedkafka", version="2.4." }
{ groupId = "org.apache.kafka", artifactId="kafka-clients", version="2.6." }
# To be updated in tandem with upstream Akka
{ groupId = "com.fasterxml.jackson.core", version="2.10." }
{ groupId = "org.scalatest", artifactId = "scalatest", version = "3.1." }
22 changes: 3 additions & 19 deletions build.sbt
@@ -17,8 +17,6 @@ val kafkaVersion = "2.6.0"
// TODO Jackson is now a provided dependency of kafka-clients
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
val jacksonVersion = "2.10.5"
val embeddedKafkaVersion = "2.6.0"
val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
val scalatestVersion = "3.1.4"
val testcontainersVersion = "1.14.3"
val slf4jVersion = "1.7.30"
@@ -239,10 +237,7 @@ lazy val testkit = project
"org.testcontainers" % "kafka" % testcontainersVersion % Provided,
"org.scalatest" %% "scalatest" % scalatestVersion % Provided,
"junit" % "junit" % "4.12" % Provided,
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided,
"org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12"),
"org.apache.commons" % "commons-compress" % "1.20" % Provided, // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
embeddedKafka % Provided exclude ("log4j", "log4j")
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided
) ++ silencer,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
@@ -299,8 +294,7 @@ lazy val tests = project
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "3.5.13" % Test,
embeddedKafka % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
"org.mockito" % "mockito-core" % "3.5.13" % Test
) ++ silencer ++ {
scalaBinaryVersion.value match {
case "2.13" =>
@@ -313,16 +307,7 @@
}
},
resolvers ++= Seq(
"Confluent Maven Repo" at "https://packages.confluent.io/maven/",
// required to bring in com.github.everit-org.json-schema:org.everit.json.schema:1.12.1
// $ sbt "tests/test:whatDependsOn com.github.everit-org.json-schema org.everit.json.schema 1.12.1"
//[info] com.github.everit-org.json-schema:org.everit.json.schema:1.12.1
//[info] +-io.confluent:kafka-json-schema-provider:5.5.0
//[info] +-io.confluent:kafka-schema-registry:5.5.0
//[info] +-io.github.embeddedkafka:embedded-kafka-schema-registry_2.12:5.5.0.1
//[info] +-com.typesafe.akka:akka-stream-kafka-tests_2.12:2.0.2+26-ddcdbcb8+20200526-1427 [S]
//[info]
"Jitpack" at "https://jitpack.io"
"Confluent Maven Repo" at "https://packages.confluent.io/maven/"
),
publish / skip := true,
whitesourceIgnore := true,
@@ -353,7 +338,6 @@ lazy val docs = project
Paradox / siteSubdirName := s"docs/alpakka-kafka/${projectInfoVersion.value}",
paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
paradoxProperties ++= Map(
"embeddedKafka.version" -> embeddedKafkaVersion,
"confluent.version" -> confluentAvroSerializerVersion,
"scalatest.version" -> scalatestVersion,
"scaladoc.akka.kafka.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
18 changes: 9 additions & 9 deletions docs/src/main/paradox/producer.md
@@ -59,7 +59,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #settings }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #settings }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #settings }

In addition to programmatic construction, the @apidoc[ProducerSettings$] can also be created from configuration (`application.conf`).
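
As a rough illustration of the two construction styles, the sketch below shows both; it is not taken from the project's snippet sources, and the bootstrap address is a placeholder:

```java
// Sketch: creating ProducerSettings programmatically and from config.
// Assumes akka-stream-kafka on the classpath; "localhost:9092" is a placeholder.
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import com.typesafe.config.Config;
import org.apache.kafka.common.serialization.StringSerializer;

public class SettingsSketch {
  static ProducerSettings<String, String> fromSystem(ActorSystem system) {
    // Reads defaults from the "akka.kafka.producer" section of application.conf
    return ProducerSettings
        .create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
  }

  static ProducerSettings<String, String> fromConfig(ActorSystem system) {
    // An explicitly resolved config section may be passed instead
    Config config = system.settings().config().getConfig("akka.kafka.producer");
    return ProducerSettings
        .create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
  }
}
```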

@@ -87,7 +87,7 @@ Scala
The materialized value of the sink is a `Future[Done]` which is completed with `Done` when the stream completes, or with an exception in case an error occurs.

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #plainSink }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #plainSink }
The materialized value of the sink is a `CompletionStage<Done>` which is completed with `Done` when the stream completes, or with an exception in case an error occurs.
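
A minimal Java sketch of wiring a source of records into `Producer.plainSink`; the topic name and bootstrap address are placeholders, not taken from the project's snippets:

```java
// Sketch: streaming 1..100 into a Kafka topic with Producer.plainSink.
// Assumes akka-stream-kafka on the classpath; topic and address are placeholders.
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainSinkSketch {
  public static CompletionStage<Done> produce(ActorSystem system) {
    ProducerSettings<String, String> settings =
        ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
            .withBootstrapServers("localhost:9092"); // placeholder address

    // The CompletionStage completes with Done when the stream finishes,
    // or exceptionally if producing fails.
    return Source.range(1, 100)
        .map(Object::toString)
        .map(value -> new ProducerRecord<String, String>("my-topic", value))
        .runWith(Producer.plainSink(settings), system);
  }
}
```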


@@ -104,7 +104,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #singleMessage }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #singleMessage }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #singleMessage }


For flows the @apidoc[ProducerMessage.Message]s continue as @apidoc[akka.kafka.ProducerMessage.Result] elements containing:
@@ -122,7 +122,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #multiMessage }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #multiMessage }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #multiMessage }

For flows the @apidoc[ProducerMessage.MultiMessage]s continue as @apidoc[akka.kafka.ProducerMessage.MultiResult] elements containing:

@@ -141,7 +141,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #passThroughMessage }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #passThroughMessage }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #passThroughMessage }


For flows the @apidoc[ProducerMessage.PassThroughMessage]s continue as @apidoc[ProducerMessage.PassThroughResult] elements containing the `passThrough` data.
@@ -157,7 +157,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #flow }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #flow }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #flow }


## Connecting a Producer to a Consumer
@@ -182,15 +182,15 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #producer }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #producer }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #producer }

The @javadoc[KafkaProducer](org.apache.kafka.clients.producer.KafkaProducer) instance (or @scala[Future]@java[CompletionStage]) is passed as a parameter to @apidoc[ProducerSettings] using the methods `withProducer` and `withProducerFactory`.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #plainSinkWithProducer }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #plainSinkWithProducer }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #plainSinkWithProducer }


## Accessing KafkaProducer metrics
@@ -201,7 +201,7 @@ Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #producerMetrics }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #producerMetrics }
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #producerMetrics }

@@@ index

4 changes: 2 additions & 2 deletions docs/src/main/paradox/testing-testcontainers.md
@@ -67,10 +67,10 @@ The Testcontainers dependency must be added to your project explicitly.
The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.

Java JUnit 4
: @@snip [snip](/tests/src/test/java/docs/javadsl/AssignmentWithTestcontainersTest.java) { #testkit }
: @@snip [snip](/tests/src/test/java/docs/javadsl/AssignmentTest.java) { #testkit }

Java JUnit 5
: @@snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #testkit }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ProducerTest.java) { #testkit }
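
For orientation, a JUnit 5 skeleton along these lines might look as follows. This is a sketch under the assumption that `TestcontainersKafkaTest` exposes a constructor taking the actor system and materializer, and a no-argument `createTopic()` helper, in this release; the linked snippet is the authoritative form:

```java
// Sketch: JUnit 5 test class backed by a Testcontainers Kafka broker.
// Constructor and helper names should be checked against the snippet above.
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create("MyKafkaTest");

  MyKafkaTest() {
    super(system, Materializer.createMaterializer(system));
  }

  @Test
  void aTest() {
    // createTopic() creates a unique topic on the containerized broker
    String topic = createTopic();
    // ... run producers and consumers against the cluster here ...
  }

  @AfterAll
  void shutdown() {
    TestKit.shutdownActorSystem(system);
  }
}
```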


## Testing with a Docker Kafka cluster from Scala code
144 changes: 7 additions & 137 deletions docs/src/main/paradox/testing.md
@@ -5,15 +5,6 @@ project.description: Alpakka Kafka provides a Testkit with support for running l

To simplify testing of streaming integrations with Alpakka Kafka, it provides the **Alpakka Kafka testkit**. It provides help for

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4 and will be removed in the next minor release.**

**Use @ref:[testcontainers (Docker)](testing-testcontainers.md) instead.**

@@@

* ~~@ref:[Using an embedded Kafka broker](#testing-with-an-embedded-kafka-server)~~
* @ref:[Using Docker to launch a local Kafka cluster with testcontainers](testing-testcontainers.md)
* @ref:[Mocking the Alpakka Kafka Consumers and Producers](#mocking-the-consumer-or-producer)

@@ -35,144 +26,23 @@ To simplify testing of streaming integrations with Alpakka Kafka, it provides th
Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Alpakka Kafka testkit's direct dependencies and the second tab shows all libraries it depends on transitively.
We've overridden the `commons-compress` library to use a version with [fewer known security vulnerabilities](https://commons.apache.org/proper/commons-compress/security-reports.html).

@@dependencies { projectId="testkit" }

## Running Kafka with your tests

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4**

@@@

The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster. There are two main options:

1. ~~@ref:[Embedded Kafka](#testing-with-an-embedded-kafka-server)~~
2. @ref:[Testcontainers (Docker)](testing-testcontainers.md)
The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster using @ref:[Testcontainers (Docker)](testing-testcontainers.md).

The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers.
To use them, mix these types into your test classes or implement them.
See the documentation for each for more details.

| Type | Test Framework | Runtime Mode | Cluster | Schema Registry | Lang | Lifetime |
|---------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|---------------------|-------------|---------------------|--------------|--------------------------|
| ~~@ref:[`akka.kafka.testkit.javadsl.EmbeddedKafkaTest`](#testing-with-avro-and-schema-registry-from-java-code)~~ | ~~JUnit 5~~ | ~~Embedded Kafka~~ | ~~No~~ | ~~Yes~~ | ~~Java~~ | ~~All tests, Per class~~ |
| ~~@ref:[`akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test`](#testing-with-avro-and-schema-registry-from-java-code)~~ | ~~JUnit 4~~ | ~~Embedded Kafka~~ | ~~No~~ | ~~Yes~~ | ~~Java~~ | ~~All tests, Per class~~ |
| ~~@ref:[`akka.kafka.testkit.scaladsl.EmbeddedKafkaLike`](#testing-with-avro-and-schema-registry-from-scala-code)~~ | ~~ScalaTest~~ | ~~Embedded Kafka~~ | ~~No~~ | ~~Yes~~ | ~~Scala~~ | ~~Per class~~ |
| @ref:[`akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-java-code) | JUnit 4 | Testcontainers | Yes | Yes | Java | All tests, Per class |
| @ref:[`akka.kafka.testkit.javadsl.TestcontainersKafkaTest`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-java-code) | JUnit 5 | Testcontainers | Yes | Yes | Java | All tests, Per class |
| @ref:[`akka.kafka.testkit.scaladsl.TestcontainersKafkaLike`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-scala-code) | ScalaTest | Testcontainers | Yes | Yes | Scala | All tests |
| @ref:[`akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-scala-code) | ScalaTest | Testcontainers | Yes | Yes | Scala | Per class |

## Testing with an embedded Kafka server

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4**

@@@

To test the Alpakka Kafka connector the [Embedded Kafka library](https://github.com/embeddedkafka/embedded-kafka) is an important tool as it helps to easily start and stop Kafka brokers from test cases.

Add the Embedded Kafka to your test dependencies:

@@dependency [Maven,sbt,Gradle] {
group=io.github.embeddedkafka
artifact=embedded-kafka_2.12
version=$embeddedKafka.version$
scope=test
}

@@@ note

As Kafka uses Scala internally, only the Scala versions supported by Kafka can be used together with Embedded Kafka. To be independent of Kafka's supported Scala versions, run @ref:[Kafka in a Docker container](testing-testcontainers.md).

The helpers for running Embedded Kafka are available for **Scala 2.12 and Scala 2.13**.

@@@

The testkit contains helper classes used by the tests in the Alpakka Kafka connector and may be used for other testing, as well.


### Testing with Avro and Schema Registry

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4**

@@@

If you need to run tests using [Confluent's Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html), you might include [embedded-kafka-schema-registry](https://github.com/embeddedkafka/embedded-kafka-schema-registry) instead.


### Testing with Avro and Schema Registry from Java code

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4**

@@@

Test classes may extend @scaladoc[EmbeddedKafkaTest](akka.kafka.testkit.javadsl.EmbeddedKafkaTest) (JUnit 5) or @scaladoc[EmbeddedKafkaJunit4Test](akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test) (JUnit 4) to automatically start and stop an embedded Kafka broker.

Furthermore it provides

* preconfigured consumer settings (`ConsumerSettings<String, String> consumerDefaults`),
* preconfigured producer settings (`ProducerSettings<String, String> producerDefaults`),
* unique topic creation (`createTopic(int number, int partitions, int replication)`), and
* `CompletionStage` value extraction helper (`<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)`).

The example below shows skeleton test classes for JUnit 4 and JUnit 5.

Java JUnit 4
: @@snip [snip](/tests/src/test/java/docs/javadsl/AssignmentTest.java) { #testkit }

Java JUnit 5
: @@snip [snip](/tests/src/test/java/docs/javadsl/ProducerExampleTest.java) { #testkit }

The JUnit test base classes run the @javadoc[assertAllStagesStopped](akka.stream.testkit.javadsl.StreamTestKit#assertAllStagesStopped(akka.stream.Materializer)) check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.


### Testing with Avro and Schema Registry from Scala code

@@@ note

**Embedded Kafka testkit support has been deprecated since 2.0.4**

@@@

The @scaladoc[KafkaSpec](akka.kafka.testkit.scaladsl.KafkaSpec) class offers access to

* preconfigured consumer settings (`consumerDefaults: ConsumerSettings[String, String]`),
* preconfigured producer settings (`producerDefaults: ProducerSettings[String, String]`),
* unique topic creation (`createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)`),
* an implicit `LoggingAdapter` for use with the `log()` operator, and
* other goodies.

@scaladoc[EmbeddedKafkaLike](akka.kafka.testkit.scaladsl.EmbeddedKafkaLike) extends @scaladoc[KafkaSpec](akka.kafka.testkit.scaladsl.KafkaSpec) to add automatic starting and stopping of the embedded Kafka broker.

Some Alpakka Kafka tests implemented in Scala use [Scalatest](http://www.scalatest.org/) with the mix-ins shown below. You need to add Scalatest explicitly in your test dependencies (this release of Alpakka Kafka uses Scalatest $scalatest.version$.)

@@dependency [Maven,sbt,Gradle] {
group=org.scalatest
artifact=scalatest
version=$scalatest.version$
scope=test
}

Scala
: @@snip [snip](/tests/src/test/scala/akka/kafka/scaladsl/SpecBase.scala) { #testkit }

By mixing in @scaladoc[EmbeddedKafkaLike](akka.kafka.testkit.scaladsl.EmbeddedKafkaLike) an embedded Kafka instance will be started before the tests in this test class execute and shut down after all tests in this test class are finished.

Scala
: @@snip [snip](/tests/src/test/scala/akka/kafka/scaladsl/EmbeddedKafkaSampleSpec.scala) { #embeddedkafka }

Test classes can extend this `EmbeddedKafkaSpecBase` class to automatically start and stop a Kafka broker to test with. To use a non-default Kafka broker configuration, override `createKafkaConfig` as shown above.

To ensure proper shutdown of all stages in every test, wrap your test code in @scaladoc[assertAllStagesStopped](akka.stream.testkit.scaladsl.StreamTestKit$). This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.
| Type | Test Framework | Cluster | Lang | Lifetime |
|---------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|-------------|--------------|--------------------------|
| @ref:[`akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-java-code) | JUnit 4 | Yes | Java | All tests, Per class |
| @ref:[`akka.kafka.testkit.javadsl.TestcontainersKafkaTest`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-java-code) | JUnit 5 | Yes | Java | All tests, Per class |
| @ref:[`akka.kafka.testkit.scaladsl.TestcontainersKafkaLike`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-scala-code) | ScalaTest | Yes | Scala | All tests |
| @ref:[`akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike`](testing-testcontainers.md#testing-with-a-docker-kafka-cluster-from-scala-code) | ScalaTest | Yes | Scala | Per class |

## Alternative testing libraries
