This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Kafka external sharding strategy #418

Merged · 58 commits · Aug 17, 2020
Changes from 41 commits

Commits (58)
0380f89 working on implementing external sharding (May 16, 2020)
03e27bd kafka sharding implemented but untested (May 16, 2020)
c3789e3 merged dependencies (Jun 17, 2020)
9662794 still working (Jun 23, 2020)
f42cb02 Merge branch 'master' of github.com:nolangrace/cloudflow into kafka-s… (Jul 1, 2020)
6c4e811 update shardedSourceWithCommittableContext to return Source.future (Jul 1, 2020)
59b884a add cluster sharding typed dependency (Jul 2, 2020)
8823432 Akka Kafka Sharding working and new Source.future implementation working (Jul 2, 2020)
bd6dd98 restructure akka kafka sharding implementation and connected car example (Jul 2, 2020)
46aa0d2 fix generics for akka kafka sharding (Jul 3, 2020)
95dd77a clean up akka actor (Jul 3, 2020)
85f7a3a merged with master (Jul 3, 2020)
80535c7 remove accidental changes to call-record-aggregator (Jul 3, 2020)
9dda8ee update shardedPlainSource (Jul 3, 2020)
1ef1cd0 add javaapi for plain and committable kafka sharded source (Jul 3, 2020)
dbb55e2 add scaladoc for shardedSourceWithCommittableContext and shardedPlain… (Jul 3, 2020)
fda5823 implement akka sharded source test functions (Jul 5, 2020)
ccb81a3 fix new line (Jul 5, 2020)
36d632e create Cluster and shardedSource Documentation (Jul 7, 2020)
b89442b implement clusterSharding(system.toTyped) helper method in AkkaStream… (Jul 7, 2020)
378a056 remove old ClusterSharding helper from AkkaStreamletContextImpl (Jul 7, 2020)
e8a12fc upgrade alpakka kafka to 2.0.3+13-0455a136 (Jul 8, 2020)
3aec21c Merge branch 'master' of github.com:lightbend/cloudflow into kafka-sh… (Jul 8, 2020)
0132abf resolve conflict (Jul 8, 2020)
fec8a2d change kafkaSharding to use messageExtractor rather than messageExtra… (Jul 8, 2020)
ae51d52 update kafka sharded docs (Jul 8, 2020)
ee8764e remove entityIdExtractor from sharded source Testkit (Jul 8, 2020)
b0ef068 add api may change annotation to sharded sources (Jul 21, 2020)
b6ddeca experimental warning added to the top of the cluster and external sha… (Jul 21, 2020)
cba01a0 update alpakka kafka to 2.0.4 (Jul 22, 2020)
ae50240 set connected car to cloudflow 2.1.0-snapshot (Jul 22, 2020)
3f996cb update connected car version (Jul 24, 2020)
bc59c86 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
09d174f Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
51489b4 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
75a239d Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
71b555d Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
736586a Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
af64fdf Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
cc02d17 Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
2b7d77b Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
0541895 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ef99370 change sharded source signature to use NotUsed (Aug 12, 2020)
879fe7b Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
c0e1a15 fix TestContext and remove implicit execution context from AkkaStream… (Aug 12, 2020)
bc962ca Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into… (Aug 12, 2020)
868cd6f remove alpakka snapshots from connected car example (Aug 12, 2020)
77af7a4 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
dad2d09 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ad6c144 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ef9adb2 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
8defbb9 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
27a40b1 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
bcdee41 add sharded when logging info for sharded sources (Aug 12, 2020)
ae9d9df fix bug in shardedSourceWithContext Test kit (Aug 12, 2020)
f098bb4 Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into… (Aug 12, 2020)
9fd6931 added timeout config to sharded sources (Aug 15, 2020)
c69e023 Fix logging (Aug 17, 2020)
2 changes: 2 additions & 0 deletions core/build.sbt
@@ -84,6 +84,8 @@ lazy val akkastream =
libraryDependencies ++= Vector(
AkkaStream,
AkkaStreamKafka,
AkkaStreamKafkaSharding,
AkkaShardingTyped,
AkkaCluster,
AkkaManagement,
AkkaHttp,
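The two new keys wire Akka's typed Cluster Sharding module and the Alpakka Kafka cluster-sharding add-on into the akka streamlet build. Going by the `core/project/Dependencies.scala` change at the bottom of this diff, they resolve to roughly the following coordinates, shown here as a plain-sbt sketch:

```scala
// Equivalent raw coordinates for the two new dependency keys
// (versions taken from Version.Akka and Version.AlpakkaKafka in this PR).
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed"        % "2.6.6",
  "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % "2.0.4"
)
```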
@@ -19,15 +19,16 @@ package cloudflow.akkastream.testkit
import java.util.concurrent.atomic.AtomicReference

import scala.concurrent._

import akka.NotUsed
import akka.actor._
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage._
import akka.stream._
import akka.stream.scaladsl._
import com.typesafe.config._

import cloudflow.akkastream._
import cloudflow.streamlets._

@@ -73,6 +74,11 @@ private[testkit] case class TestContext(
)
.getOrElse(throw TestContextException(inlet.name, s"Bad test context, could not find source for inlet ${inlet.name}"))

def shardedSourceWithCommittableContext[T, M, E](inlet: CodecInlet[T], shardEntity: Entity[M, E]) = {
ClusterSharding(system.toTyped).init(shardEntity)
sourceWithContext(inlet)
}

private def flowWithCommittableContext[T](outlet: CodecOutlet[T]): cloudflow.akkastream.scaladsl.FlowWithCommittableContext[T, T] = {
val flow = Flow[T]

@@ -109,6 +115,12 @@

def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): Source[T, NotUsed] =
sourceWithCommittableContext[T](inlet).asSource.map(_._1).mapMaterializedValue(_ ⇒ NotUsed)

def shardedPlainSource[T, M, E](inlet: CodecInlet[T], shardEntity: Entity[M, E], resetPosition: ResetPosition = Latest): Source[T, _] = {
ClusterSharding(system.toTyped).init(shardEntity)
plainSource(inlet, resetPosition)
}

def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed] = sinkRef[T](outlet).sink.contramap { el ⇒
(el, TestCommittableOffset())
}
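Both testkit variants only initialize Cluster Sharding for the supplied entity and then delegate to the existing in-memory test sources, so tests need no Kafka broker. For reference, the `Entity[M, E]` argument is built with the standard Akka typed sharding API; a minimal sketch with a hypothetical per-device protocol (none of these names appear in the PR):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }

// Hypothetical per-device protocol, for illustration only.
sealed trait DeviceCommand
final case class RecordReading(deviceId: String, value: Double) extends DeviceCommand

val typeKey: EntityTypeKey[DeviceCommand] = EntityTypeKey[DeviceCommand]("Device")

def deviceBehavior(deviceId: String): Behavior[DeviceCommand] =
  Behaviors.receiveMessage { case RecordReading(_, value) =>
    // per-device state would be kept here
    Behaviors.same
  }

// Yields an Entity[DeviceCommand, ShardingEnvelope[DeviceCommand]], matching
// the Entity[M, E] parameter of the sharded sources above.
val shardEntity: Entity[DeviceCommand, ShardingEnvelope[DeviceCommand]] =
  Entity(typeKey)(entityContext => deviceBehavior(entityContext.entityId))
```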
@@ -17,11 +17,12 @@
package cloudflow.akkastream

import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.kafka.ConsumerMessage.{ Committable, CommittableOffset }
import akka.kafka.CommitterSettings
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.stream.scaladsl._
import cloudflow.streamlets._

@@ -38,11 +39,21 @@ trait AkkaStreamletContext extends StreamletContext {
inlet: CodecInlet[T]
): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T]

private[akkastream] def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E]
): SourceWithContext[T, CommittableOffset, _]

@deprecated("Use `sourceWithCommittableContext` instead.", "1.3.4")
private[akkastream] def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T]

private[akkastream] def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): Source[T, NotUsed]
private[akkastream] def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed]
private[akkastream] def shardedPlainSource[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
resetPosition: ResetPosition = Latest
): Source[T, _]

private[akkastream] def committableSink[T](outlet: CodecOutlet[T], committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
private[akkastream] def committableSink[T](committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
@@ -21,21 +21,21 @@ import java.nio.file.{ Files, Paths }

import scala.concurrent._
import scala.util._

import akka._
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
import akka.kafka._
import akka.kafka.ConsumerMessage._
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.kafka.scaladsl._
import akka.stream._
import akka.stream.scaladsl._

import com.typesafe.config._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._

import cloudflow.streamlets._

/**
@@ -45,7 +45,8 @@ final class AkkaStreamletContextImpl(
private[cloudflow] override val streamletDefinition: StreamletDefinition,
sys: ActorSystem
) extends AkkaStreamletContext {
implicit val system: ActorSystem = sys
implicit val system: ActorSystem = sys
implicit val ec: ExecutionContext = sys.dispatcher

override def config: Config = streamletDefinition.config

@@ -91,6 +92,68 @@ final class AkkaStreamletContextImpl(
override def sourceWithCommittableContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T] =
sourceWithContext[T](inlet)

private[akkastream] def shardedSourceWithContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E]): SourceWithContext[T, CommittableOffset, _] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

system.log.info(s"Creating committable source for group: $gId topic: ${topic.name}")

import scala.concurrent.duration._
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingMessageExtractor[M]] =
KafkaClusterSharding(system).messageExtractor(
topic = topic.name,
timeout = 10.seconds,
settings = consumerSettings
)

Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
Review thread on this line:

Member: It would be great to have the control available in the materialized value.

Author: Yup, I agree, but maybe something that should be implemented in a separate PR, because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg, what are your thoughts?

Contributor: @ennru @nolangrace yes, leave as is in this PR, and add an issue to change the signature to expose the control.

Contributor: I've added #638. The idea has always been that cloudflow takes care of draining etc., but it's better to provide it than to provide NotUsed.

.via(handleTermination)
.map {
case (record, committableOffset) ⇒ inlet.codec.decode(record.value) -> committableOffset
}
}
}
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }
}

override def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E]
): SourceWithContext[T, CommittableOffset, _] =
shardedSourceWithContext(inlet, shardEntity)

@deprecated("Use sourceWithCommittableContext", "1.3.4")
override def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T] =
sourceWithContext[T](inlet)
@@ -156,6 +219,56 @@
}
}

def shardedPlainSource[T, M, E](inlet: CodecInlet[T], shardEntity: Entity[M, E], resetPosition: ResetPosition = Latest): Source[T, _] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetPosition.autoOffsetReset)
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

system.log.info(s"Creating committable source for group: $gId topic: ${topic.name}")

import scala.concurrent.duration._
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingMessageExtractor[M]] =
KafkaClusterSharding(system).messageExtractor(
topic = topic.name,
timeout = 10.seconds,
settings = consumerSettings
)

Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.plainSource(consumerSettings, subscription)
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
.via(handleTermination)
.map { record ⇒
inlet.codec.decode(record.value)
}
}
}
}

def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed] = {
val topic = findTopicForPort(outlet)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
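On the `mapMaterializedValue(_ ⇒ NotUsed)` TODO discussed in the review thread above (now tracked as #638): a minimal sketch of what exposing the consumer control could look like, using the standard Alpakka Kafka `DrainingControl` pattern. The `consumerSettings` and `subscription` values stand in for the ones built inside these methods, and an implicit materializer and execution context are assumed in scope:

```scala
import akka.Done
import akka.kafka.CommitterSettings
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Keep

// Keep the consumer Control instead of throwing it away: DrainingControl
// pairs the Control with the stream completion so the source can be drained
// and shut down cleanly.
val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, subscription) // assumed from the surrounding method
    .map { msg =>
      // decode / process the record here, then emit its offset for committing
      msg.committableOffset
    }
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

// On shutdown: stop polling, wait for in-flight commits, then shut down
// (needs an implicit ExecutionContext, e.g. the system dispatcher).
control.drainAndShutdown()
```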
@@ -20,13 +20,13 @@ import java.nio.file.Path

import akka.NotUsed
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
import akka.stream.scaladsl._

import akka.kafka._
import akka.kafka.ConsumerMessage._

import com.typesafe.config.Config

import cloudflow.streamlets._
import cloudflow.akkastream.scaladsl._

@@ -89,6 +89,12 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
*/
def getExecutionContext() = executionContext

/**
* Helper method to make it easier to start typed cluster sharding
* with a classic actor system
*/
def clusterSharding() = ClusterSharding(system.toTyped)

/**
* This source emits `T` records together with the offset position as context, thus making it possible
* to commit offset positions to Kafka (as received through the `inlet`).
@@ -132,6 +138,36 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
def getSourceWithCommittableContext[T](inlet: CodecInlet[T]): akka.stream.javadsl.SourceWithContext[T, Committable, _] =
context.sourceWithCommittableContext(inlet).asJava

/**
* This source is designed to function the same as [[sourceWithCommittableContext]]
* while also leveraging Akka Kafka Cluster Sharding for stateful streaming.
*
* This source emits `T` records together with the committable context, thus making it possible
* to commit offset positions to Kafka using `committableSink(outlet: CodecOutlet[T])`.
*
* This source must be used with Akka Cluster. It starts Akka Cluster Sharding
* using the supplied `shardEntity` and configures the Kafka external shard allocation
* strategy to co-locate Kafka partition consumption with Akka Cluster shards.
*
* @param inlet the inlet to consume messages from. The inlet specifies a [[cloudflow.streamlets.Codec]] that is used to deserialize the records read from the underlying transport.
* @param shardEntity specifies the settings for the shard region to be started
**/
@ApiMayChange
def shardedSourceWithCommittableContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E]): SourceWithContext[T, CommittableOffset, _] =
context.shardedSourceWithCommittableContext(inlet, shardEntity)

/**
* Java API
* @see [[shardedSourceWithCommittableContext]]
*/
@ApiMayChange
def getShardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E]
): akka.stream.javadsl.SourceWithContext[T, Committable, _] =
context.shardedSourceWithCommittableContext(inlet, shardEntity).asJava

/**
* The `plainSource` emits `T` records (as received through the `inlet`).
*
@@ -152,6 +188,41 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
def getPlainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): akka.stream.javadsl.Source[T, NotUsed] =
plainSource(inlet, resetPosition).asJava

/**
* This source is designed to function the same as [[plainSource]]
* while also leveraging Akka Kafka Cluster Sharding for stateful streaming.
*
* The `plainSource` emits `T` records (as received through the `inlet`).
*
* It has no support for committing offsets to Kafka.
*
* This source must be used with Akka Cluster. It starts Akka Cluster Sharding
* using the supplied `shardEntity` and configures the Kafka external shard allocation
* strategy to co-locate Kafka partition consumption with Akka Cluster shards.
*
* @param inlet the inlet to consume messages from. The inlet specifies a [[cloudflow.streamlets.Codec]] that is used to deserialize the records read from the underlying transport.
* @param shardEntity specifies the settings for the shard region to be started
**/
@ApiMayChange
def shardedPlainSource[T, M, E](inlet: CodecInlet[T], shardEntity: Entity[M, E], resetPosition: ResetPosition = Latest): Source[T, _] =
context.shardedPlainSource(inlet, shardEntity, resetPosition)

/**
* Java API
*/
@ApiMayChange
def getShardedPlainSource[T, M, E](inlet: CodecInlet[T], shardEntity: Entity[M, E]): akka.stream.javadsl.Source[T, _] =
shardedPlainSource(inlet, shardEntity, Latest).asJava

/**
* Java API
*/
@ApiMayChange
def getShardedPlainSource[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
resetPosition: ResetPosition = Latest): akka.stream.javadsl.Source[T, _] =
shardedPlainSource(inlet, shardEntity, resetPosition).asJava

/**
* Creates a sink for publishing `T` records to the outlet. The records are partitioned according to the `partitioner` of the `outlet`.
* The `outlet` specifies a [[cloudflow.streamlets.Codec]] that will be used to serialize the records that are written to Kafka.
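Putting the new `AkkaStreamletLogic` API together, a sketch of a streamlet that consumes a sharded source and routes each record to its entity via the new `clusterSharding()` helper. The `Telemetry` Avro class, its `deviceId` field, and the entity behavior are hypothetical stand-ins, not part of this PR:

```scala
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._

// Hypothetical sharded streamlet; Telemetry is an assumed Avro-generated
// class whose deviceId field serves as the entity id.
class ShardedTelemetryProcessor extends AkkaStreamlet with Clustering {
  val in    = AvroInlet[Telemetry]("in")
  val out   = AvroOutlet[Telemetry]("out")
  val shape = StreamletShape(in).withOutlets(out)

  override def createLogic = new RunnableGraphStreamletLogic() {
    val typeKey = EntityTypeKey[Telemetry]("Telemetry")

    // The sharded source starts this entity and wires in the Kafka external
    // shard allocation strategy and message extractor.
    val entity = Entity(typeKey)(_ => Behaviors.ignore[Telemetry])

    def runnableGraph =
      shardedSourceWithCommittableContext(in, entity)
        .map { t =>
          // route the record to its co-located entity via the new helper
          clusterSharding().entityRefFor(typeKey, t.deviceId) ! t
          t
        }
        .to(committableSink(out))
  }
}
```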
@@ -16,8 +16,10 @@

package cloudflow.akkastream

import akka.annotation.ApiMayChange
import cloudflow.streamlets._

@ApiMayChange
trait Clustering { this: AkkaStreamlet ⇒
protected[cloudflow] override def attributes = Set(AkkaClusterAttribute) ++ customAttributes
}
4 changes: 3 additions & 1 deletion core/project/Dependencies.scala
@@ -6,7 +6,7 @@ object Version {
val Akka = "2.6.6"
val AkkaHttp = "10.1.12"
val AkkaMgmt = "1.0.8"
val AlpakkaKafka = "2.0.3"
val AlpakkaKafka = "2.0.4"
val Scala = "2.12.11"
val Spark = "2.4.5"
val Flink = "1.10.0"
@@ -22,10 +22,12 @@ object Library {
val AkkaHttpJackson = "com.typesafe.akka" %% "akka-http-jackson" % Version.AkkaHttp
val AkkaHttpSprayJson = "com.typesafe.akka" %% "akka-http-spray-json" % Version.AkkaHttp
val AkkaActor = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % Version.Akka
val AkkaStream = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreamContrib = "com.typesafe.akka" %% "akka-stream-contrib" % "0.10"
val AkkaStreamKafka = ("com.typesafe.akka" %% "akka-stream-kafka" % Version.AlpakkaKafka) .exclude("com.fasterxml.jackson.core","jackson-databind") .exclude("com.fasterxml.jackson.module", "jackson-module-scala")
val AkkaStreamKafkaSharding = "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % Version.AlpakkaKafka
val AkkaStreamKafkaTestkit = ("com.typesafe.akka" %% "akka-stream-kafka-testkit" % Version.AlpakkaKafka) .exclude("com.typesafe.akka", "akka-stream-testkit")
val AkkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % Version.Akka
