Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Kafka external sharding strategy #418

Merged
merged 58 commits into from
Aug 17, 2020

Conversation

nolangrace
Copy link

What changes were proposed in this pull request?

Implemented shardedSourceWithCommittableContext and shardedPlainSource

to provide cloudflow with the features illustrated here
https://doc.akka.io/docs/alpakka-kafka/current/cluster-sharding.html

Why are the changes needed?

Fewer potential network hops when leveraging stateful streaming in cloudflow

Does this PR introduce any user-facing change?

2 new source types

How was this patch tested?

Entirely untested at the moment

@nolangrace

This comment has been minimized.

@nolangrace
Copy link
Author

@RayRoestenburg I have implemented the pattern we talked about. It's still a little more complex than I would like but I think it looks much better

    val source = shardedSourceWithCommittableContext(in,
                                      typeKey: EntityTypeKey[ConnectedCarERecordWrapper],
                                      (msg: ConnectedCarERecordWrapper) => msg.record.carId+""
                                    )


    val clusterSharding = ClusterSharding(system.toTyped)


    def runnableGraph = source.map(kafkaEnvelope => {

      clusterSharding.init(
        Entity(typeKey)(createBehavior = _ => ConnectedCarActor())
          .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
          .withMessageExtractor(kafkaEnvelope.kafkaShardingExtractor)
          .withSettings(ClusterShardingSettings(system.toTyped)))


      kafkaEnvelope.source
        .via(flow)
        .to(committableSink(out))
        .run()
    }).to(Sink.ignore)

@RayRoestenburg
Copy link
Contributor

@nolangrace I made some comments, already looks a lot better, hopefully we can simplify it even further.

@nolangrace
Copy link
Author

nolangrace commented Jul 2, 2020

val in    = AvroInlet[ConnectedCarERecord]("in")

...

    val typeKey = EntityTypeKey[ConnectedCarERecordWrapper]("Car")

    val messageExtractor = (msg: ConnectedCarERecordWrapper) => msg.record.carId+""
    val entity = Entity(typeKey)(createBehavior = entityContext => ConnectedCarActor(entityContext.entityId))

    val source:SourceWithContext[
      ConnectedCarERecord,
      CommittableOffset, _] = shardedSourceWithCommittableContext(in, entity, messageExtractor)

    val clusterSharding = ClusterSharding(system.toTyped)

    def runnableGraph = source.via(flow).to(committableSink(out))

@RayRoestenburg all great recommendations this looks much better now. Much more like a standard AkkaStreamlet

Copy link
Contributor

@RayRoestenburg RayRoestenburg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking a lot better! API looks good.
The change in call-record-aggregator plugins is probably a random mistake.
It would also be good if shardedSourceWithContext in TestContext just falls back to normal sourceWithCommittableContext, so users can just test their streamlet, since it is all local. Or do you have other ideas for this?
After that, some testing, Java API, scaladocs, and it will be good to go! (which reminds me at some point would be great to add docs for how to use Akka Cluster in Streamlets.)

Copy link
Contributor

@RayRoestenburg RayRoestenburg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple more comments, please have a look @nolangrace

Nolan Grace and others added 14 commits August 12, 2020 14:54
…reamletLogic.scala

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
@nolangrace
Copy link
Author

@RayRoestenburg I have fixed the suggestions you made. I also retested manually using runLocal and deploying to a gke cluster. Its looking good from my point of view

.sourceWithOffsetContext(consumerSettings, subscription)
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue
.asSource
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have the control available in the materialized value.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, I agree but maybe something that should be implemented in a separate PR because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg what are your thoughts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ennru @nolangrace yes leave as is in this PR, and add an issue to change the signature to expose the control.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added #638 . The idea has always been that cloudflow takes care of draining etc, but it's better to provide it than to provide NotUsed.

Co-authored-by: Enno <458526+ennru@users.noreply.github.com>
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants