Skip to content
This repository was archived by the owner on Nov 22, 2024. It is now read-only.

Kafka external sharding strategy #418

Merged
merged 58 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0380f89
working on implementing external sharding
May 16, 2020
03e27bd
kafka sharding implemented but untested
May 16, 2020
c3789e3
merged dependencies
Jun 17, 2020
9662794
still working
Jun 23, 2020
f42cb02
Merge branch 'master' of github.com:nolangrace/cloudflow into kafka-s…
Jul 1, 2020
6c4e811
update shardedSourceWithCommittableContext to return Source.future
Jul 1, 2020
59b884a
add cluster sharding typed dependency
Jul 2, 2020
8823432
Akka Kafka Sharding working and new Source.future implementation working
Jul 2, 2020
bd6dd98
restructure akka kafka sharding implementation and connected car example
Jul 2, 2020
46aa0d2
fix generics for akka kafka sharding
Jul 3, 2020
95dd77a
clean up akka actor
Jul 3, 2020
85f7a3a
merged with master
Jul 3, 2020
80535c7
remove accidental changes to call-record-aggregator
Jul 3, 2020
9dda8ee
update shardedPlainSource
Jul 3, 2020
1ef1cd0
add javaapi for plain and committable kafka sharded source
Jul 3, 2020
dbb55e2
add scaladoc for shardedSourceWithCommittableContext and shardedPlain…
Jul 3, 2020
fda5823
implement akka sharded source test functions
Jul 5, 2020
ccb81a3
fix new line
Jul 5, 2020
36d632e
create Cluster and shardedSource Documentation
Jul 7, 2020
b89442b
implement clusterSharding(system.toTyped) helper method in AkkaStream…
Jul 7, 2020
378a056
remove old ClusterSharding helper from AkkStreamletContextImpl
Jul 7, 2020
e8a12fc
upgrade alpakkakafka to 2.0.3+13-0455a136
Jul 8, 2020
3aec21c
Merge branch 'master' of github.com:lightbend/cloudflow into kafka-sh…
Jul 8, 2020
0132abf
resolve conflict
Jul 8, 2020
fec8a2d
change kafkaSharding to use messageExtractor rather than messageExtra…
Jul 8, 2020
ae51d52
update kafka sharded docs
Jul 8, 2020
ee8764e
remove entityIdExtractor from sharded source Testkit
Jul 8, 2020
b0ef068
add api may change annotation to sharded sources
Jul 21, 2020
b6ddeca
experimental warning added to the top of the cluster and external sha…
Jul 21, 2020
cba01a0
update alpakka kafka to 2.0.4
Jul 22, 2020
ae50240
set connected car to cloudflow 2.1.0-snapshot
Jul 22, 2020
3f996cb
update connected car version
Jul 24, 2020
bc59c86
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
09d174f
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
51489b4
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
75a239d
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
71b555d
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
736586a
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
af64fdf
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
cc02d17
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
2b7d77b
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
0541895
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ef99370
change sharded source signature to use NotUsed
Aug 12, 2020
879fe7b
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
c0e1a15
fix TextContext and remove implicit execution context from AkkaStream…
Aug 12, 2020
bc962ca
Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into…
Aug 12, 2020
868cd6f
remove alpakka shapshots from connected car example
Aug 12, 2020
77af7a4
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
dad2d09
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ad6c144
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ef9adb2
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
8defbb9
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
27a40b1
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
bcdee41
add sharded when logging info for sharded sources
Aug 12, 2020
ae9d9df
fix bug in shardedSourceWithContext Test kit
Aug 12, 2020
f098bb4
Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into…
Aug 12, 2020
9fd6931
added timeout config to sharded sources
Aug 15, 2020
c69e023
Fix logging
Aug 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.concurrent._
import akka.NotUsed
import akka.actor._
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage._
import akka.stream._
Expand Down Expand Up @@ -172,9 +172,9 @@ private[testkit] case class TestContext(
def metricTags(): Map[String, String] =
Map()

override private[akkastream] def shardedSourceWithCommittableContext[T, E](inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String) = ???
override private[akkastream] def shardedSourceWithCommittableContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String) = ???

override private[akkastream] def shardedPlainSource[T, E](inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package cloudflow.akkastream
import scala.concurrent.Future
import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.kafka.ConsumerMessage.{ Committable, CommittableOffset }
import akka.kafka.CommitterSettings
import akka.kafka.cluster.sharding.KafkaClusterSharding
Expand All @@ -39,11 +39,11 @@ trait AkkaStreamletContext extends StreamletContext {
inlet: CodecInlet[T]
): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T]

private[akkastream] def shardedSourceWithCommittableContext[T, E](
private[akkastream] def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String
): Source[ShardedSourceEnvelope[T, E], _]
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _]

@deprecated("Use `sourceWithCommittableContext` instead.", "1.3.4")
private[akkastream] def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import scala.concurrent._
import scala.util._
import akka._
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import akka.kafka._
import akka.kafka.ConsumerMessage._
import akka.kafka.cluster.sharding.KafkaClusterSharding
Expand Down Expand Up @@ -90,9 +92,9 @@ final class AkkaStreamletContextImpl(
override def sourceWithCommittableContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T] =
sourceWithContext[T](inlet)

private[akkastream] def shardedSourceWithContext[T, E](inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String): Source[ShardedSourceEnvelope[T, E], _] = {
private[akkastream] def shardedSourceWithContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String): SourceWithContext[T, CommittableOffset, _] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)

Expand All @@ -103,7 +105,7 @@ final class AkkaStreamletContextImpl(
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(typeKey)
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
Expand All @@ -112,40 +114,48 @@ final class AkkaStreamletContextImpl(

system.log.info(s"Creating committable source for group: $gId topic: ${topic.name}")

val consumer = Consumer
.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
.via(handleTermination)
.map {
case (record, committableOffset) ⇒ inlet.codec.decode(record.value) -> committableOffset
}
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }

import scala.concurrent.duration._
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E]] =
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[M]] =
KafkaClusterSharding(system).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = topic.name,
entityIdExtractor = entityIdExtractor,
settings = consumerSettings
)

Source.future {
messageExtractor.map { messageExtractor =>
ShardedSourceEnvelope(consumer, messageExtractor)
Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have the control available in the materialized value.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, I agree but maybe something that should be implemented in a separate PR because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg what are your thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ennru @nolangrace yes leave as is in this PR, and add an issue to change the signature to expose the control.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added #638 . The idea has always been that cloudflow takes care of draining etc, but it's better to provide it than to provide NotUsed.

.via(handleTermination)
.map {
case (record, committableOffset) ⇒ inlet.codec.decode(record.value) -> committableOffset
}
}
}
}
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }
}

override def shardedSourceWithCommittableContext[T, E](
override def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String
): Source[ShardedSourceEnvelope[T, E], _] =
shardedSourceWithContext[T, E](inlet, typeKey, entityIdExtractor)
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _] =
shardedSourceWithContext[T, M, E](inlet, shardEntity, entityIdExtractor)

@deprecated("Use sourceWithCommittableContext", "1.3.4")
override def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.file.Path

import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.stream.scaladsl._
import akka.kafka._
import akka.kafka.ConsumerMessage._
Expand Down Expand Up @@ -119,12 +119,12 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
def sourceWithCommittableContext[T](inlet: CodecInlet[T]): SourceWithCommittableContext[T] =
context.sourceWithCommittableContext(inlet)

def shardedSourceWithCommittableContext[T, E](
def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String
): Source[ShardedSourceEnvelope[T, E], _] =
context.shardedSourceWithCommittableContext(inlet, typeKey, entityIdExtractor)
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _] =
context.shardedSourceWithCommittableContext(inlet, shardEntity, entityIdExtractor)

/**
* Java API
Expand Down
2 changes: 1 addition & 1 deletion examples/call-record-aggregator/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.2.1")
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC18")
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,14 @@ object ConnectedCarCluster extends AkkaStreamlet with Clustering {
override def createLogic = new RunnableGraphStreamletLogic() {
val typeKey = EntityTypeKey[ConnectedCarERecordWrapper]("Car")

val source = shardedSourceWithCommittableContext(in,
typeKey: EntityTypeKey[ConnectedCarERecordWrapper],
(msg: ConnectedCarERecordWrapper) => msg.record.carId+""
)
val messageExtractor = (msg: ConnectedCarERecordWrapper) => msg.record.carId+""
val entity = Entity(typeKey)(createBehavior = entityContext => ConnectedCarActor())

val clusterSharding = ClusterSharding(system.toTyped)

def runnableGraph = source.map(kafkaEnvelope => {
val source = shardedSourceWithCommittableContext(in, entity, messageExtractor)

clusterSharding.init(
Entity(typeKey)(createBehavior = _ => ConnectedCarActor())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
.withMessageExtractor(kafkaEnvelope.kafkaShardingExtractor)
.withSettings(ClusterShardingSettings(system.toTyped)))
val clusterSharding = ClusterSharding(system.toTyped)

kafkaEnvelope.source
.via(flow)
.to(committableSink(out))
.run()
}).to(Sink.ignore)
def runnableGraph = source.via(flow).to(Sink.ignore)

implicit val timeout: Timeout = 3.seconds
def flow =
Expand Down