This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Kafka external sharding strategy #418

Merged: 58 commits, Aug 17, 2020

Commits (the diff below shows changes from 9 of the 58 commits):
0380f89 working on implementing external sharding (May 16, 2020)
03e27bd kafka sharding implemented but untested (May 16, 2020)
c3789e3 merged dependencies (Jun 17, 2020)
9662794 still working (Jun 23, 2020)
f42cb02 Merge branch 'master' of github.com:nolangrace/cloudflow into kafka-s… (Jul 1, 2020)
6c4e811 update shardedSourceWithCommittableContext to return Source.future (Jul 1, 2020)
59b884a add cluster sharding typed dependency (Jul 2, 2020)
8823432 Akka Kafka Sharding working and new Source.future implementation working (Jul 2, 2020)
bd6dd98 restructure akka kafka sharding implementation and connected car example (Jul 2, 2020)
46aa0d2 fix generics for akka kafka sharding (Jul 3, 2020)
95dd77a clean up akka actor (Jul 3, 2020)
85f7a3a merged with master (Jul 3, 2020)
80535c7 remove accidental changes to call-record-aggregator (Jul 3, 2020)
9dda8ee update shardedPlainSource (Jul 3, 2020)
1ef1cd0 add javaapi for plain and committable kafka sharded source (Jul 3, 2020)
dbb55e2 add scaladoc for shardedSourceWithCommittableContext and shardedPlain… (Jul 3, 2020)
fda5823 implement akka sharded source test functions (Jul 5, 2020)
ccb81a3 fix new line (Jul 5, 2020)
36d632e create Cluster and shardedSource Documentation (Jul 7, 2020)
b89442b implement clusterSharding(system.toTyped) helper method in AkkaStream… (Jul 7, 2020)
378a056 remove old ClusterSharding helper from AkkStreamletContextImpl (Jul 7, 2020)
e8a12fc upgrade alpakkakafka to 2.0.3+13-0455a136 (Jul 8, 2020)
3aec21c Merge branch 'master' of github.com:lightbend/cloudflow into kafka-sh… (Jul 8, 2020)
0132abf resolve conflict (Jul 8, 2020)
fec8a2d change kafkaSharding to use messageExtractor rather than messageExtra… (Jul 8, 2020)
ae51d52 update kafka sharded docs (Jul 8, 2020)
ee8764e remove entityIdExtractor from sharded source Testkit (Jul 8, 2020)
b0ef068 add api may change annotation to sharded sources (Jul 21, 2020)
b6ddeca experimental warning added to the top of the cluster and external sha… (Jul 21, 2020)
cba01a0 update alpakka kafka to 2.0.4 (Jul 22, 2020)
ae50240 set connected car to cloudflow 2.1.0-snapshot (Jul 22, 2020)
3f996cb update connected car version (Jul 24, 2020)
bc59c86 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
09d174f Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
51489b4 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
75a239d Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
71b555d Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
736586a Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
af64fdf Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
cc02d17 Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
2b7d77b Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
0541895 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ef99370 change sharded source signature to use NotUsed (Aug 12, 2020)
879fe7b Update core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaSt… (Aug 12, 2020)
c0e1a15 fix TextContext and remove implicit execution context from AkkaStream… (Aug 12, 2020)
bc962ca Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into… (Aug 12, 2020)
868cd6f remove alpakka shapshots from connected car example (Aug 12, 2020)
77af7a4 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
dad2d09 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ad6c144 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
ef9adb2 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
8defbb9 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
27a40b1 Update docs/shared-content-source/docs/modules/develop/pages/clusteri… (Aug 12, 2020)
bcdee41 add sharded when logging info for sharded sources (Aug 12, 2020)
ae9d9df fix bug in shardedSourceWithContext Test kit (Aug 12, 2020)
f098bb4 Merge branch 'kafka-sharding' of github.com:nolangrace/cloudflow into… (Aug 12, 2020)
9fd6931 added timeout config to sharded sources (Aug 15, 2020)
c69e023 Fix logging (Aug 17, 2020)
core/build.sbt (2 additions, 0 deletions)

@@ -85,6 +85,8 @@ lazy val akkastream =
libraryDependencies ++= Vector(
AkkaStream,
AkkaStreamKafka,
AkkaStreamKafkaSharding,
AkkaShardingTyped,
AkkaCluster,
AkkaManagement,
AkkaHttp,

@@ -19,15 +19,14 @@ package cloudflow.akkastream.testkit
import java.util.concurrent.atomic.AtomicReference

import scala.concurrent._

import akka.NotUsed
import akka.actor._
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage._
import akka.stream._
import akka.stream.scaladsl._
import com.typesafe.config._

import cloudflow.akkastream._
import cloudflow.streamlets._

@@ -172,6 +171,16 @@ private[testkit] case class TestContext(

def metricTags(): Map[String, String] =
Map()

override private[akkastream] def shardedSourceWithCommittableContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String) = ???

override private[akkastream] def shardedPlainSource[T, E](inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String,
resetPosition: ResetPosition) =
???
}

case class TestContextException(portName: String, msg: String) extends RuntimeException(msg)

@@ -17,11 +17,12 @@
package cloudflow.akkastream

import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.kafka.ConsumerMessage.{ Committable, CommittableOffset }
import akka.kafka.CommitterSettings
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.stream.scaladsl._
import cloudflow.streamlets._

@@ -38,11 +39,23 @@ trait AkkaStreamletContext extends StreamletContext {
inlet: CodecInlet[T]
): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T]

private[akkastream] def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _]

@deprecated("Use `sourceWithCommittableContext` instead.", "1.3.4")
private[akkastream] def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T]

private[akkastream] def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition): Source[T, NotUsed]
private[akkastream] def plainSink[T](outlet: CodecOutlet[T]): Sink[T, NotUsed]
private[akkastream] def shardedPlainSource[T, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String,
resetPosition: ResetPosition
): (Source[T, NotUsed], Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E]])

private[akkastream] def committableSink[T](outlet: CodecOutlet[T], committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
private[akkastream] def committableSink[T](committerSettings: CommitterSettings): Sink[(T, Committable), NotUsed]
@@ -116,3 +129,6 @@ case object Earliest extends ResetPosition {
case object Latest extends ResetPosition {
val autoOffsetReset = "latest"
}

case class ShardedSourceEnvelope[T, E](source: SourceWithContext[T, CommittableOffset, _],
kafkaShardingExtractor: KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E])

@@ -21,21 +21,21 @@ import java.nio.file.{ Files, Paths }

import scala.concurrent._
import scala.util._

import akka._
import akka.actor.ActorSystem
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import akka.kafka._
import akka.kafka.ConsumerMessage._
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.kafka.scaladsl._
import akka.stream._
import akka.stream.scaladsl._

import com.typesafe.config._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._

import cloudflow.streamlets._

@@ -45,7 +45,8 @@ final class AkkaStreamletContextImpl(
private[cloudflow] override val streamletDefinition: StreamletDefinition,
sys: ActorSystem
) extends AkkaStreamletContext {
implicit val system: ActorSystem = sys
implicit val ec: ExecutionContext = sys.dispatcher

override def config: Config = streamletDefinition.config

@@ -91,6 +92,71 @@ final class AkkaStreamletContextImpl(
override def sourceWithCommittableContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithCommittableContext[T] =
sourceWithContext[T](inlet)

private[akkastream] def shardedSourceWithContext[T, M, E](inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String): SourceWithContext[T, CommittableOffset, _] = {
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(shardEntity.typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

system.log.info(s"Creating committable source for group: $gId topic: ${topic.name}")

import scala.concurrent.duration._
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[M]] =
KafkaClusterSharding(system).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = topic.name,
entityIdExtractor = entityIdExtractor,
settings = consumerSettings
)

Source
.futureSource {
messageExtractor.map { m =>
ClusterSharding(system.toTyped).init(
shardEntity
.withAllocationStrategy(
shardEntity.allocationStrategy
.getOrElse(new ExternalShardAllocationStrategy(system, shardEntity.typeKey.name))
)
.withMessageExtractor(m)
)

Consumer
.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
[Inline review discussion]
Member: It would be great to have the control available in the materialized value.
Author: Yup, I agree, but maybe something that should be implemented in a separate PR, because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg what are your thoughts?
Contributor: @ennru @nolangrace yes, leave as is in this PR, and add an issue to change the signature to expose the control.
Contributor: I've added #638. The idea has always been that cloudflow takes care of draining etc., but it's better to provide it than to provide NotUsed.
(A sketch of what exposing the control could look like follows this method.)

.via(handleTermination)
.map {
case (record, committableOffset) ⇒ inlet.codec.decode(record.value) -> committableOffset
}
}
}
.asSourceWithContext { case (_, committableOffset) ⇒ committableOffset }
.map { case (record, _) ⇒ record }
}
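
Following up on the review thread above: a minimal sketch, not part of this PR, of what exposing the consumer control (tracked in #638) could look like. It assumes the same consumerSettings and subscription values as in shardedSourceWithContext; Consumer.DrainingControl is the Alpakka Kafka idiom for combining the control with the stream's completion signal.

// Sketch only: materialize a DrainingControl instead of discarding it as NotUsed.
import akka.Done
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{ Keep, Sink }

val control: Consumer.DrainingControl[Done] =
  Consumer
    .sourceWithOffsetContext(consumerSettings, subscription)
    .asSource
    .map { case (record, _) => inlet.codec.decode(record.value) }
    .toMat(Sink.ignore)(Keep.both)
    .mapMaterializedValue(Consumer.DrainingControl.apply)
    .run()

// On shutdown, control.drainAndShutdown() stops polling, waits for in-flight
// messages to be processed, then closes the consumer (needs an ExecutionContext).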

override def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _] =
shardedSourceWithContext[T, M, E](inlet, shardEntity, entityIdExtractor)

@deprecated("Use sourceWithCommittableContext", "1.3.4")
override def sourceWithOffsetContext[T](inlet: CodecInlet[T]): cloudflow.akkastream.scaladsl.SourceWithOffsetContext[T] =
sourceWithContext[T](inlet)
@@ -174,6 +240,49 @@ final class AkkaStreamletContextImpl(
.mapMaterializedValue(_ ⇒ NotUsed)
}

def shardedPlainSource[T, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String,
resetPosition: ResetPosition = Latest
): (Source[T, NotUsed], Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E]]) = {
// TODO clean this up, lots of copied code; refactor.
val topic = findTopicForPort(inlet)
val gId = topic.groupId(streamletDefinition.appId, streamletRef, inlet)
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(topic.bootstrapServers.getOrElse(internalKafkaBootstrapServers))
.withGroupId(gId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetPosition.autoOffsetReset)
.withProperties(topic.kafkaConsumerProperties)

val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system).rebalanceListener(typeKey)

import akka.actor.typed.scaladsl.adapter._
val subscription = Subscriptions
.topics(topic.name)
.withRebalanceListener(rebalanceListener.toClassic)

val consumer = Consumer
.plainSource(consumerSettings, subscription)
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
.via(handleTermination)
.map { record ⇒
inlet.codec.decode(record.value)
}

import scala.concurrent.duration._
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E]] =
KafkaClusterSharding(system).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = "user-topic",
entityIdExtractor = entityIdExtractor,
settings = consumerSettings
)

(consumer, messageExtractor)
}
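
Unlike the committable variant above, shardedPlainSource hands the messageExtractor future back to the caller, who initializes sharding once Kafka's partition metadata has been resolved. A hypothetical caller sketch follows; CarMessage, carBehavior, and the "Car" type key are illustrative names, not part of this PR:

// Hypothetical caller of shardedPlainSource (illustrative types and names).
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }

val typeKey = EntityTypeKey[CarMessage]("Car")
val (source, extractorF) =
  shardedPlainSource(inlet, typeKey, (m: CarMessage) => m.carId.toString)

extractorF.foreach { extractor =>
  // Align shard allocation with Kafka partitions before records start flowing.
  ClusterSharding(system.toTyped).init(
    Entity(typeKey)(_ => carBehavior())
      .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
      .withMessageExtractor(extractor)
  )
}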

def sinkRef[T](outlet: CodecOutlet[T]): WritableSinkRef[T] = {
val topic = findTopicForPort(outlet)


@@ -20,16 +20,17 @@ import java.nio.file.Path

import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.stream.scaladsl._

import akka.kafka._
import akka.kafka.ConsumerMessage._

import akka.kafka.cluster.sharding.KafkaClusterSharding
import com.typesafe.config.Config

import cloudflow.streamlets._
import cloudflow.akkastream.scaladsl._

import scala.concurrent.Future

/**
* Provides an entry-point for defining the behavior of an AkkaStreamlet.
* Override the `run` method to implement the specific logic / code that should be executed once the streamlet is deployed
@@ -118,6 +119,13 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
def sourceWithCommittableContext[T](inlet: CodecInlet[T]): SourceWithCommittableContext[T] =
context.sourceWithCommittableContext(inlet)

def shardedSourceWithCommittableContext[T, M, E](
inlet: CodecInlet[T],
shardEntity: Entity[M, E],
entityIdExtractor: M => String
): SourceWithContext[T, CommittableOffset, _] =
context.shardedSourceWithCommittableContext(inlet, shardEntity, entityIdExtractor)

/**
* Java API
*/
@@ -141,6 +149,14 @@ abstract class AkkaStreamletLogic(implicit val context: AkkaStreamletContext) ex
def plainSource[T](inlet: CodecInlet[T], resetPosition: ResetPosition = Latest): akka.stream.scaladsl.Source[T, NotUsed] =
context.plainSource(inlet, resetPosition)

def shardedPlainSource[T, E](
inlet: CodecInlet[T],
typeKey: EntityTypeKey[E],
entityIdExtractor: E => String,
resetPosition: ResetPosition = Latest
): (Source[T, NotUsed], Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[E]]) =
context.shardedPlainSource(inlet, typeKey, entityIdExtractor, resetPosition)

/**
* Java API
*/

core/project/Dependencies.scala (2 additions, 0 deletions)

@@ -22,10 +22,12 @@ object Library {
val AkkaHttpJackson = "com.typesafe.akka" %% "akka-http-jackson" % Version.AkkaHttp
val AkkaHttpSprayJson = "com.typesafe.akka" %% "akka-http-spray-json" % Version.AkkaHttp
val AkkaActor = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % Version.Akka
val AkkaStream = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreamContrib = "com.typesafe.akka" %% "akka-stream-contrib" % "0.10"
val AkkaStreamKafka = ("com.typesafe.akka" %% "akka-stream-kafka" % Version.AlpakkaKafka) .exclude("com.fasterxml.jackson.core","jackson-databind") .exclude("com.fasterxml.jackson.module", "jackson-module-scala")
val AkkaStreamKafkaSharding = "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % Version.AlpakkaKafka
val AkkaStreamKafkaTestkit = ("com.typesafe.akka" %% "akka-stream-kafka-testkit" % Version.AlpakkaKafka) .exclude("com.typesafe.akka", "akka-stream-testkit")
val AkkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % Version.Akka


examples/call-record-aggregator/.sbtopts (1 addition, 0 deletions)

@@ -0,0 +1 @@
-mem 2048

examples/call-record-aggregator/project/plugins.sbt (1 addition, 1 deletion)

@@ -1 +1 @@
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.2.1")
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC18")

@@ -0,0 +1,4 @@
akka {
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
}

@@ -1,49 +1,30 @@
Removed (classic cluster-sharding actor):

package connectedcar.actors

import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.cluster.sharding.ShardRegion
import connectedcar.data.{ ConnectedCarAgg, ConnectedCarERecord }

import scala.concurrent.ExecutionContext

object ConnectedCarActor {
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg: ConnectedCarERecord ⇒ (msg.carId.toString, msg)
  }

  private val numberOfShards = 100

  val extractShardId: ShardRegion.ExtractShardId = {
    case msg: ConnectedCarERecord ⇒ (msg.carId % numberOfShards).toString
  }
}

class ConnectedCarActor extends Actor with ActorLogging {

  val carId: String = "Car-" + self.path.name
  var driverName: String = null
  var currentSpeed = 0.0
  var averageSpeed = 0.0
  var numberOfRecords = 0

  var treeActor: ActorRef = null
  implicit val ec: ExecutionContext = context.dispatcher

  override def receive: Receive = {
    case record: ConnectedCarERecord ⇒ {
      if (numberOfRecords == 0) {
        driverName = record.driver
        averageSpeed = record.speed
      } else {
        averageSpeed = ((averageSpeed * numberOfRecords) + record.speed) / (numberOfRecords + 1)
      }

      numberOfRecords += 1
      currentSpeed = record.speed

      log.info("Updated CarId: " + carId + " Driver Name: " + driverName + " CarSpeed: " + currentSpeed + " From Actor:" + sender().path)

      sender() ! ConnectedCarAgg(record.carId, record.driver, averageSpeed, numberOfRecords)
    }
  }
}

Added (typed behavior):

package connectedcar.actors

import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

import connectedcar.data.{ ConnectedCarAgg, ConnectedCarERecord }

case class ConnectedCarERecordWrapper(record: ConnectedCarERecord, sender: ActorRef[ConnectedCarAgg])

object ConnectedCarActor {
  def apply(): Behavior[ConnectedCarERecordWrapper] = {
    def updated(numberOfRecords: Int,
                driverName: String,
                carId: Int,
                averageSpeed: Double,
                currentSpeed: Double): Behavior[ConnectedCarERecordWrapper] =
      Behaviors.receive { (context, message) =>
        context.log.info("Updated CarId: " + carId + " Driver Name: " + driverName +
          " CarSpeed: " + currentSpeed + " From Actor:" + message.sender.path)

        val newAverage         = ((averageSpeed * numberOfRecords) + message.record.speed) / (numberOfRecords + 1)
        val newNumberOfRecords = numberOfRecords + 1

        // reply with the updated aggregate
        message.sender ! ConnectedCarAgg(message.record.carId, message.record.driver, newAverage, newNumberOfRecords)

        updated(newNumberOfRecords, message.record.driver, message.record.carId, newAverage, message.record.speed)
      }

    updated(0, "", 0, 0, 0.0)
  }
}
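
To see how the pieces fit together, here is a rough sketch of a streamlet that wires the typed actor to the new sharded source, in the spirit of this PR's connected-car example. The port names, the "ConnectedCarActor" type key, the 3-second ask timeout, and the clusterSharding() helper name (from commit b89442b) are assumptions, not the PR's verbatim code:

package connectedcar.streamlets

import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.util.Timeout
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }
import connectedcar.actors.{ ConnectedCarActor, ConnectedCarERecordWrapper }
import connectedcar.data.{ ConnectedCarAgg, ConnectedCarERecord }
import scala.concurrent.duration._

class ConnectedCarCluster extends AkkaStreamlet {
  val in    = AvroInlet[ConnectedCarERecord]("in")
  val out   = AvroOutlet[ConnectedCarAgg]("out", _.driver.toString)
  val shape = StreamletShape(in).withOutlets(out)

  override def createLogic = new RunnableGraphStreamletLogic() {
    val typeKey = EntityTypeKey[ConnectedCarERecordWrapper]("ConnectedCarActor")
    val entity  = Entity(typeKey)(_ => ConnectedCarActor())

    // The sharded source initializes cluster sharding (aligned with Kafka
    // partitions) before the consumer starts emitting records.
    val source = shardedSourceWithCommittableContext(in, entity, _.record.carId.toString)

    val sharding = clusterSharding() // assumed helper, added in commit b89442b
    implicit val timeout: Timeout = 3.seconds

    def runnableGraph =
      source
        .mapAsync(1) { record =>
          sharding
            .entityRefFor(typeKey, record.carId.toString)
            .ask[ConnectedCarAgg](ref => ConnectedCarERecordWrapper(record, ref))
        }
        .to(committableSink(out))
  }
}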

@@ -13,7 +13,7 @@ object CarDataPrinter extends AkkaStreamlet {
override def createLogic() = new RunnableGraphStreamletLogic() {
val flow = FlowWithCommittableContext[ConnectedCarAgg]
.map { record ⇒
log.info("CarId: " + record.carId)
println("CarId: " + record.carId)
}

def runnableGraph =