Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop Akka 2.5 dependency #1209

Merged
merged 4 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ script:
jobs:
include:
- stage: check
env: CMD="verifyCodeStyle ; mimaReportBinaryIssues"
name: "Code style check and binary-compatibility check. Run locally with: sbt 'verifyCodeStyle ; mimaReportBinaryIssues'"
- env: CMD="Test/compile ; It/compile"
name: "Compile all code with fatal warnings for Scala 2.13. Run locally with: env CI=true sbt 'clean ; Test/compile ; It/compile'"
- env: CMD="+clusterSharding/Test/compile"
name: "Compile cluster-sharding module for all supported Scala versions. Run locally with: env CI=true sbt '+clusterSharding/Test/compile'"
env: CMD="verifyCodeStyle"
name: "Code style check. Run locally with: sbt verifyCodeStyle"
- env: CMD=";++2.13.2 Test/compile ;++2.13.2 It/compile"
name: "Compile all code with fatal warnings for Scala 2.13. Run locally with: env CI=true sbt ';++2.13.2 Test/compile ;++2.13.2 It/compile'"
- env: CMD="verifyDocs"
name: "Create all API docs for artifacts/website and all reference docs. Run locally with: sbt verifyDocs"

Expand Down Expand Up @@ -59,8 +57,6 @@ jobs:
- stage: publish
env: CMD="+publish"
name: "Publish artifacts for all Scala versions"
- env: CMD="+clusterSharding/publish"
name: "Publish akka-stream-kafka-cluster-sharding for all its supported Scala versions"
- script: openssl aes-256-cbc -K $encrypted_d80875c2ae41_key -iv $encrypted_d80875c2ae41_iv -in .travis/travis_alpakka_kafka_rsa.enc -out .travis/id_rsa -d && eval "$(ssh-agent -s)" && chmod 600 .travis/id_rsa && ssh-add .travis/id_rsa && sbt -jvm-opts .jvmopts-travis '++2.13.2 docs/publishRsync'
name: "Publish API and reference documentation"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging with InflightMetrics
val control = fixture.source
.mapAsync(1) { m =>
meter.mark()
m.committableOffset.commitInternal().map(_ => m)(ExecutionContexts.sameThreadExecutionContext)
m.committableOffset.commitInternal().map(_ => m)(ExecutionContexts.parasitic)
}
.toMat(Sink.foreach { msg =>
if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount - 1)
Expand Down
40 changes: 12 additions & 28 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,24 @@ val Nightly = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron")

val Scala212 = "2.12.11"
val Scala213 = "2.13.2"
val akkaVersion26 = "2.6.6"
val akkaVersion = if (Nightly) akkaVersion26 else "2.5.31"
val AkkaBinaryVersion25 = "2.5"
val AkkaBinaryVersion26 = "2.6"
val AkkaBinaryVersion = if (Nightly) AkkaBinaryVersion26 else AkkaBinaryVersion25

val AkkaBinaryVersionForDocs = "2.6"
val KafkaVersionForDocs = "26"

val akkaVersion = "2.6.10"
val kafkaVersion = "2.6.0"
// TODO Jackson is now a provided dependency of kafka-clients
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
val jacksonVersion = "2.10.5"
val embeddedKafkaVersion = "2.6.0"
val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
val kafkaVersionForDocs = "26"
val scalatestVersion = "3.1.4"
val testcontainersVersion = "1.14.3"
val slf4jVersion = "1.7.30"
// this depends on Kafka, and should be upgraded to the latest version
// that depends on the same Kafka version as is defined above
val confluentAvroSerializerVersion = "6.0.0"
val scalapb = "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.8"

val kafkaBrokerWithoutSlf4jLog4j = "org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12")

val confluentLibsExclusionRules = Seq(
Expand Down Expand Up @@ -204,7 +201,7 @@ lazy val `alpakka-kafka` =
| run a single benchmark backed by Docker containers
""".stripMargin
)
.aggregate(core, testkit, tests, benchmarks, docs)
.aggregate(core, testkit, clusterSharding, tests, benchmarks, docs)

lazy val core = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -253,12 +250,6 @@ lazy val testkit = project
)
)

/**
* TODO: Once Akka 2.5 is dropped:
* - add to `alpakka-kafka` aggregate project
* - move `ClusterShardingExample` to `tests` project
* - remove all akka26 paradox properties
*/
lazy val clusterSharding = project
.in(file("./cluster-sharding"))
.dependsOn(core)
Expand All @@ -269,7 +260,7 @@ lazy val clusterSharding = project
name := "akka-stream-kafka-cluster-sharding",
AutomaticModuleName.settings("akka.stream.alpakka.kafka.cluster.sharding"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion26
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
) ++ silencer,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
Expand All @@ -278,7 +269,7 @@ lazy val clusterSharding = project
)

lazy val tests = project
.dependsOn(core, testkit)
.dependsOn(core, testkit, clusterSharding)
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(MimaPlugin, SitePlugin)
.configs(IntegrationTest.extend(Test))
Expand Down Expand Up @@ -368,22 +359,15 @@ lazy val docs = project
"javadoc.akka.kafka.base_url" -> "",
// Akka
"akka.version" -> akkaVersion,
"extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion/%s",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion/",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/$AkkaBinaryVersion/",
"extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersionForDocs/%s",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersionForDocs/",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/$AkkaBinaryVersionForDocs/",
"javadoc.akka.link_style" -> "direct",
"extref.akka-management.base_url" -> s"https://doc.akka.io/docs/akka-management/current/%s",
// Akka 2.6. These can be removed when we drop Akka 2.5 support.
"akka.version26" -> akkaVersion26,
"extref.akka26.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
"scaladoc.akka.actor.typed.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion26/",
"extref.akka.actor.typed.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
"scaladoc.akka.cluster.sharding.typed.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion26/",
"extref.akka.cluster.sharding.typed.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
// Kafka
"kafka.version" -> kafkaVersion,
"extref.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/%s",
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/javadoc/",
"extref.kafka.base_url" -> s"https://kafka.apache.org/$KafkaVersionForDocs/%s",
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$KafkaVersionForDocs/javadoc/",
"javadoc.org.apache.kafka.link_style" -> "frames",
// Java
"extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s",
Expand Down
20 changes: 10 additions & 10 deletions docs/src/main/paradox/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ project.description: Alpakka Kafka provides a module to use Kafka with Akka Clus
---
# Akka Cluster Sharding

Akka Cluster allows the user to use an @extref[external shard allocation](akka26:/typed/cluster-sharding.html#external-shard-allocation) strategy in order to give the user more control over how many shards are created and what cluster nodes they are assigned to.
Akka Cluster allows the user to use an @extref[external shard allocation](akka:/typed/cluster-sharding.html#external-shard-allocation) strategy in order to give the user more control over how many shards are created and what cluster nodes they are assigned to.
If you consume Kafka messages into your Akka Cluster application then it's possible to run an Alpakka Kafka Consumer on each cluster node and co-locate Kafka partitions with Akka Cluster shards.
When partitions and shards are co-located together then there is less chance that a message must be transmitted over the network by the Akka Cluster Shard Coordinator to a destination user sharded entity.

This module directly depends on `akka-cluster-sharding-typed` version 2.6.6 or later.
This module directly depends on `akka-cluster-sharding-typed` and requires Akka version 2.6.6 or later.

@@project-info{ projectId="clusterSharding" }

Expand Down Expand Up @@ -48,26 +48,26 @@ In the following example we asynchronously request an extractor that does not us
Given a user entity.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }

Create a `MessageExtractor`.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }

Setup Akka Typed Cluster Sharding.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }

## Rebalance Listener

Expand All @@ -89,7 +89,7 @@ The same message type is used by separate Alpakka Kafka consumers, but the messa
Create the rebalance listener using the extension and pass it into an Alpakka Kafka @scaladoc[Subscription](akka.kafka.Subscription).

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ public abstract class BaseKafkaTest extends KafkaTestKitClass {

public final Logger log = LoggerFactory.getLogger(getClass());

protected final Materializer materializer;

protected BaseKafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) {
protected BaseKafkaTest(ActorSystem system, String bootstrapServers) {
super(system, bootstrapServers);
this.materializer = materializer;
}

@Override
Expand All @@ -67,7 +64,7 @@ protected CompletionStage<Done> produceString(String topic, int messageCount, in
return Source.fromIterator(() -> IntStream.range(0, messageCount).iterator())
.map(Object::toString)
.map(n -> new ProducerRecord<String, String>(topic, partition, DefaultKey(), n))
.runWith(Producer.plainSink(producerDefaults()), materializer);
.runWith(Producer.plainSink(producerDefaults()), system());
}

protected CompletionStage<Done> produceString(String topic, String message) {
Expand All @@ -83,8 +80,7 @@ protected final <K, V> CompletionStage<Done> produce(
Pair<K, V>... messages) {
return Source.from(Arrays.asList(messages))
.map(pair -> new ProducerRecord<>(topic, pair.first(), pair.second()))
.runWith(
Producer.plainSink(producerDefaults(keySerializer, valueSerializer)), materializer);
.runWith(Producer.plainSink(producerDefaults(keySerializer, valueSerializer)), system());
}

protected Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumeString(
Expand All @@ -101,7 +97,7 @@ protected <K, V> Consumer.DrainingControl<List<ConsumerRecord<K, V>>> consume(
Subscriptions.topics(topic))
.take(take)
.toMat(Sink.seq(), Consumer::createDrainingControl)
.run(materializer);
.run(system());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ private static EmbeddedKafkaConfig embeddedKafkaConfig(
protected final int kafkaPort;
protected final int replicationFactor;

public EmbeddedKafkaJunit4Test(
ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) {
super(system, materializer, "localhost:" + kafkaPort);
public EmbeddedKafkaJunit4Test(ActorSystem system, int kafkaPort, int replicationFactor) {
super(system, "localhost:" + kafkaPort);
this.kafkaPort = kafkaPort;
this.replicationFactor = replicationFactor;
}

protected EmbeddedKafkaJunit4Test(ActorSystem system, Materializer materializer, int kafkaPort) {
this(system, materializer, kafkaPort, 1);
protected EmbeddedKafkaJunit4Test(ActorSystem system, int kafkaPort) {
this(system, kafkaPort, 1);
}

protected static void startEmbeddedKafka(int kafkaPort, int replicationFactor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package akka.kafka.testkit.javadsl;

import akka.actor.ActorSystem;
import akka.stream.Materializer;
import net.manub.embeddedkafka.EmbeddedKafka$;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import net.manub.embeddedkafka.EmbeddedKafkaConfig$;
Expand Down Expand Up @@ -38,15 +37,14 @@ private static EmbeddedKafkaConfig embeddedKafkaConfig(
protected final int kafkaPort;
protected final int replicationFactor;

protected EmbeddedKafkaTest(
ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) {
super(system, materializer, "localhost:" + kafkaPort);
protected EmbeddedKafkaTest(ActorSystem system, int kafkaPort, int replicationFactor) {
super(system, "localhost:" + kafkaPort);
this.kafkaPort = kafkaPort;
this.replicationFactor = replicationFactor;
}

protected EmbeddedKafkaTest(ActorSystem system, Materializer materializer, int kafkaPort) {
this(system, materializer, kafkaPort, 1);
protected EmbeddedKafkaTest(ActorSystem system, int kafkaPort) {
this(system, kafkaPort, 1);
}

protected void startEmbeddedKafka(int kafkaPort, int replicationFactor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
/** JUnit 4 base-class with some convenience for accessing a Kafka broker. */
public abstract class KafkaJunit4Test extends BaseKafkaTest {

protected KafkaJunit4Test(
ActorSystem system, Materializer materializer, String bootstrapServers) {
super(system, materializer, bootstrapServers);
protected KafkaJunit4Test(ActorSystem system, String bootstrapServers) {
super(system, bootstrapServers);
}

@Before
Expand All @@ -33,6 +32,6 @@ public void cleanUpAdmin() {
public void checkForStageLeaks() {
// you might need to configure `stop-timeout` in your `application.conf`
// as the default of 30s will fail this
StreamTestKit.assertAllStagesStopped(materializer);
StreamTestKit.assertAllStagesStopped(Materializer.matFromSystem(system()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
*/
public abstract class KafkaTest extends BaseKafkaTest {

protected KafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) {
super(system, materializer, bootstrapServers);
protected KafkaTest(ActorSystem system, String bootstrapServers) {
super(system, bootstrapServers);
}

@BeforeAll
Expand All @@ -38,6 +38,6 @@ public void cleanUpAdmin() {
public void checkForStageLeaks() {
// you might need to configure `stop-timeout` in your `application.conf`
// as the default of 30s will fail this
StreamTestKit.assertAllStagesStopped(materializer);
StreamTestKit.assertAllStagesStopped(Materializer.matFromSystem(system()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import akka.actor.ActorSystem;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.internal.TestcontainersKafka;
import akka.stream.Materializer;
import org.junit.After;
import org.junit.Before;

Expand All @@ -24,20 +23,19 @@ public abstract class TestcontainersKafkaJunit4Test extends KafkaJunit4Test {
private static final KafkaTestkitTestcontainersSettings settings =
TestcontainersKafka.Singleton().testcontainersSettings();

protected TestcontainersKafkaJunit4Test(ActorSystem system, Materializer materializer) {
super(system, materializer, startKafka(settings));
protected TestcontainersKafkaJunit4Test(ActorSystem system) {
super(system, startKafka(settings));
}

/** @deprecated Use constructor with `testcontainersSettings` instead. since 2.0.0 */
@Deprecated
protected TestcontainersKafkaJunit4Test(
ActorSystem system, Materializer materializer, String confluentPlatformVersion) {
super(system, materializer, startKafka(confluentPlatformVersion));
protected TestcontainersKafkaJunit4Test(ActorSystem system, String confluentPlatformVersion) {
super(system, startKafka(confluentPlatformVersion));
}

protected TestcontainersKafkaJunit4Test(
ActorSystem system, Materializer materializer, KafkaTestkitTestcontainersSettings settings) {
super(system, materializer, startKafka(settings));
ActorSystem system, KafkaTestkitTestcontainersSettings settings) {
super(system, startKafka(settings));
}

/** @deprecated Use method with `testcontainersSettings` instead. since 2.0.0 */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ public abstract class TestcontainersKafkaTest extends KafkaTest {
public static final KafkaTestkitTestcontainersSettings settings =
TestcontainersKafka.Singleton().testcontainersSettings();

protected TestcontainersKafkaTest(ActorSystem system, Materializer materializer) {
super(system, materializer, startKafka(settings));
protected TestcontainersKafkaTest(ActorSystem system) {
super(system, startKafka(settings));
}

protected TestcontainersKafkaTest(
ActorSystem system, Materializer materializer, KafkaTestkitTestcontainersSettings settings) {
super(system, materializer, startKafka(settings));
ActorSystem system, KafkaTestkitTestcontainersSettings settings) {
super(system, startKafka(settings));
}

/** @deprecated Use constructor with `testcontainersSettings` instead. since 2.0.0 */
@Deprecated
protected TestcontainersKafkaTest(
ActorSystem system, Materializer materializer, String confluentPlatformVersion) {
super(system, materializer, startKafka(confluentPlatformVersion));
super(system, startKafka(confluentPlatformVersion));
}

/** @deprecated Use method with `testcontainersSettings` instead. since 2.0.0 */
Expand Down
Loading