Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Kafka external sharding strategy #418

Merged
merged 58 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0380f89
working on implementing external sharding
May 16, 2020
03e27bd
kafka sharding implemented but untested
May 16, 2020
c3789e3
merged dependencies
Jun 17, 2020
9662794
still working
Jun 23, 2020
f42cb02
Merge branch 'master' of github.com:nolangrace/cloudflow into kafka-s…
Jul 1, 2020
6c4e811
update shardedSourceWithCommittableContext to return Source.future
Jul 1, 2020
59b884a
add cluster sharding typed dependency
Jul 2, 2020
8823432
Akka Kafka Sharding working and new Source.future implementation working
Jul 2, 2020
bd6dd98
restructure akka kafka sharding implementation and connected car example
Jul 2, 2020
46aa0d2
fix generics for akka kafka sharding
Jul 3, 2020
95dd77a
clean up akka actor
Jul 3, 2020
85f7a3a
merged with master
Jul 3, 2020
80535c7
remove accidental changes to call-record-aggregator
Jul 3, 2020
9dda8ee
update shardedPlainSource
Jul 3, 2020
1ef1cd0
add javaapi for plain and committable kafka sharded source
Jul 3, 2020
dbb55e2
add scaladoc for shardedSourceWithCommittableContext and shardedPlain…
Jul 3, 2020
fda5823
implement akka sharded source test functions
Jul 5, 2020
ccb81a3
fix new line
Jul 5, 2020
36d632e
create Cluster and shardedSource Documentation
Jul 7, 2020
b89442b
implement clusterSharding(system.toTyped) helper method in AkkaStream…
Jul 7, 2020
378a056
remove old ClusterSharding helper from AkkStreamletContextImpl
Jul 7, 2020
e8a12fc
upgrade alpakkakafka to 2.0.3+13-0455a136
Jul 8, 2020
3aec21c
Merge branch 'master' of github.com:lightbend/cloudflow into kafka-sh…
Jul 8, 2020
0132abf
resolve conflict
Jul 8, 2020
fec8a2d
change kafkaSharding to use messageExtractor rather than messageExtra…
Jul 8, 2020
ae51d52
update kafka sharded docs
Jul 8, 2020
ee8764e
remove entityIdExtractor from sharded source Testkit
Jul 8, 2020
b0ef068
add api may change annotation to sharded sources
Jul 21, 2020
b6ddeca
experimental warning added to the top of the cluster and external sha…
Jul 21, 2020
cba01a0
update alpakka kafka to 2.0.4
Jul 22, 2020
ae50240
set connected car to cloudflow 2.1.0-snapshot
Jul 22, 2020
3f996cb
update connected car version
Jul 24, 2020
bc59c86
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
09d174f
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
51489b4
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
75a239d
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
71b555d
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
736586a
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
af64fdf
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
cc02d17
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
2b7d77b
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
0541895
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ef99370
change sharded source signature to use NotUsed
Aug 12, 2020
879fe7b
Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt…
Aug 12, 2020
c0e1a15
fix TextContext and remove implicit execution context from AkkaStream…
Aug 12, 2020
bc962ca
Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into…
Aug 12, 2020
868cd6f
remove alpakka shapshots from connected car example
Aug 12, 2020
77af7a4
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
dad2d09
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ad6c144
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
ef9adb2
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
8defbb9
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
27a40b1
Update docs/shared-content-source/docs/modules/develop/pages/clusteri…
Aug 12, 2020
bcdee41
add sharded when logging info for sharded sources
Aug 12, 2020
ae9d9df
fix bug in shardedSourceWithContext Test kit
Aug 12, 2020
f098bb4
Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into…
Aug 12, 2020
9fd6931
added timeout config to sharded sources
Aug 15, 2020
c69e023
Fix logging
Aug 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ lazy val akkastream =
libraryDependencies ++= Vector(
AkkaStream,
AkkaStreamKafka,
AkkaStreamKafaSharding,
AkkaShardingTyped,
AkkaCluster,
AkkaManagement,
AkkaHttp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ package cloudflow.akkastream.testkit
import java.util.concurrent.atomic.AtomicReference

import scala.concurrent._

import akka.NotUsed
import akka.actor._
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage._
import akka.stream._
import akka.stream.scaladsl._
import com.typesafe.config._

import cloudflow.akkastream._
import cloudflow.streamlets._

import scala.concurrent.duration.{ DurationInt, FiniteDuration }

private[testkit] abstract class Completed

private[testkit] case class TestContext(
Expand Down Expand Up @@ -73,6 +75,25 @@ private[testkit] case class TestContext(
)
.getOrElse(throw TestContextException(inlet.name, s"Bad test context, could not find source for inlet ${inlet.name}"))

def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
kafkaTimeout: FiniteDuration = 10.seconds
): SourceWithContext[T, CommittableOffset, Future[NotUsed]] = {
ClusterSharding(system.toTyped).init(shardEntity)

Source
.futureSource(
Future {
sourceWithContext(inlet).asSource
.asInstanceOf[Source[(T, CommittableOffset), NotUsed]]
}(system.dispatcher)
)
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }

}

private def flowWithCommittableContext[T](outlet: CodecOutlet[T]): cloudflow.akkastream.scaladsl.FlowWithCommittableContext[T, T] = {
val flow = Flow[T]

Expand Down Expand Up @@ -109,6 +130,19 @@ private[testkit] case class TestContext(

def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): Source[T, NotUsed] =
sourceWithCommittableContext[T](inlet).asSource.map(_._1).mapMaterializedValue(_ ⇒ NotUsed)

def shardedPlainSource[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
resetPosition: ResetPosition = Latest,
kafkaTimeout: FiniteDuration = 10.seconds): Source[T, Future[NotUsed]] = {
ClusterSharding(system.toTyped).init(shardEntity)
Source.futureSource(
Future {
plainSource(inlet, resetPosition)
}(system.dispatcher)
)
}

def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed] = sinkRef[T](outlet).sink.contramap { el ⇒
(el, TestCommittableOffset())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
package cloudflow.akkastream

import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.kafka.ConsumerMessage.{ Committable, CommittableOffset }
import akka.kafka.CommitterSettings
import akka.stream.scaladsl._
import cloudflow.streamlets._

import scala.concurrent.duration.{ DurationInt, FiniteDuration }

/**
* Runtime context for [[AkkaStreamlet]]s, which provides means to create [[akka.stream.scaladsl.Source Source]]s and [[akka.stream.scaladsl.Sink Sink]]s respectively
* for [[cloudflow.streamlets.CodecInlet CodeInlet]]s and [[cloudflow.streamlets.CodecOutlet CodeOutlet]]s.
Expand All @@ -38,11 +40,23 @@ trait AkkaStreamletContext extends StreamletContext {
inlet: CodecInlet[T]
): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T]

private[akkastream] def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
kafkaTimeout: FiniteDuration = 10.seconds
): SourceWithContext[T, CommittableOffset, Future[NotUsed]]

@deprecated("Use `sourceWithCommittableContext` instead.", "1.3.4")
private[akkastream] def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T]

private[akkastream] def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): Source[T, NotUsed]
private[akkastream] def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed]
private[akkastream] def shardedPlainSource[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
resetPosition: ResetPosition = Latest,
kafkaTimeout: FiniteDuration = 10.seconds
): Source[T, Future[NotUsed]]

private[akkastream] def committableSink[T](outlet: CodecOutlet[T], committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
private[akkastream] def committableSink[T](committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ import java.nio.file.{ Files, Paths }

import scala.concurrent._
import scala.util._

import akka._
import akka.actor.ActorSystem
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
import akka.kafka._
import akka.kafka.ConsumerMessage._
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.kafka.scaladsl._
import akka.stream._
import akka.stream.scaladsl._

import com.typesafe.config._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._

import cloudflow.streamlets._

import scala.concurrent.duration.{ DurationInt, FiniteDuration }

/**
* Implementation of the StreamletContext trait.
*/
Expand Down Expand Up @@ -91,6 +92,71 @@ final class AkkaStreamletContextImpl(
override def sourceWithCommittableContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T] =
sourceWithContext[T](inlet)

private[akkastream] def shardedSourceWithContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
kafkaTimeout: FiniteDuration = 10.seconds
): SourceWithContext[T, CommittableOffset, Future[NotUsed]] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

system.log.info(s"Creating sharded committable source for group: $gId topic: ${topic.name}")

val messageExtractor: Future[KafkaClusterSharding.KafkaShardingMessageExtractor[M]] =
KafkaClusterSharding(system).messageExtractor(
topic = topic.name,
timeout = kafkaTimeout,
settings = consumerSettings
)

Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have the control available in the materialized value.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, I agree but maybe something that should be implemented in a separate PR because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg what are your thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ennru @nolangrace yes leave as is in this PR, and add an issue to change the signature to expose the control.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added #638 . The idea has always been that cloudflow takes care of draining etc, but it's better to provide it than to provide NotUsed.

.via(handleTermination)
.map {
case (record, committableOffset) ⇒ inlet.codec.decode(record.value) -> committableOffset
}
}(system.dispatcher)
}
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }
}

override def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
kafkaTimeout: FiniteDuration = 10.seconds
): SourceWithContext[T, CommittableOffset, Future[NotUsed]] =
shardedSourceWithContext(inlet, shardEntity)

@deprecated("Use sourceWithCommittableContext", "1.3.4")
override def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T] =
sourceWithContext[T](inlet)
Expand Down Expand Up @@ -156,6 +222,58 @@ final class AkkaStreamletContextImpl(
}
}

def shardedPlainSource[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
resetPosition: ResetPosition = Latest,
kafkaTimeout: FiniteDuration = 10.seconds): Source[T, Future[NotUsed]] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetPosition.autoOffsetReset)
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

system.log.info(s"Creating sharded plain source for group: $gId topic: ${topic.name}")

val messageExtractor: Future[KafkaClusterSharding.KafkaShardingMessageExtractor[M]] =
KafkaClusterSharding(system).messageExtractor(
topic = topic.name,
timeout = kafkaTimeout,
settings = consumerSettings
)

Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.plainSource(consumerSettings, subscription)
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
.via(handleTermination)
.map { record ⇒
inlet.codec.decode(record.value)
}
}(system.dispatcher)
}
}

def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed] = {
val topic = findTopicForPort(outlet)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
Expand Down
Loading