-
Notifications
You must be signed in to change notification settings - Fork 90
Conversation
This comment has been minimized.
This comment has been minimized.
@RayRoestenburg I have implemented the pattern we talked about. It's still a little more complex than I would like but I think it looks much better
|
core/cloudflow-akka-testkit/src/main/scala/cloudflow/akkastream/testkit/TestContext.scala
Outdated
Show resolved
Hide resolved
...kka-connected-car-streamlet/src/main/scala/connectedcar/streamlets/ConnectedCarCluster.scala
Outdated
Show resolved
Hide resolved
...kka-connected-car-streamlet/src/main/scala/connectedcar/streamlets/ConnectedCarCluster.scala
Outdated
Show resolved
Hide resolved
...kka-connected-car-streamlet/src/main/scala/connectedcar/streamlets/ConnectedCarCluster.scala
Outdated
Show resolved
Hide resolved
...kka-connected-car-streamlet/src/main/scala/connectedcar/streamlets/ConnectedCarCluster.scala
Outdated
Show resolved
Hide resolved
@nolangrace I made some comments, already looks a lot better, hopefully we can simplify it even further. |
@RayRoestenburg all great recommendations this looks much better now. Much more like a standard AkkaStreamlet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking a lot better! API looks good.
The change in call-record-aggregator plugins is probably a random mistake.
It would also be good if shardedSourceWithContext
in TestContext just falls back to normal sourceWithCommittableContext
, so users can just test their streamlet, since it is all local. Or do you have other ideas for this?
After that, some testing, Java API, scaladocs, and it will be good to go! (which reminds me at some point would be great to add docs for how to use Akka Cluster in Streamlets.)
core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContext.scala
Outdated
Show resolved
Hide resolved
core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContextImpl.scala
Outdated
Show resolved
Hide resolved
core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletLogic.scala
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
docs/shared-content-source/docs/modules/develop/pages/clustering-akka-streamlet.adoc
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple more comments, please have a look @nolangrace
…reamletLogic.scala Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…ng-akka-streamlet.adoc Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
@RayRoestenburg I have fixed the suggestions you made. I also retested manually using runLocal and deploying to a gke cluster. Its looking good from my point of view |
.sourceWithOffsetContext(consumerSettings, subscription) | ||
// TODO clean this up, once SourceWithContext has mapError and mapMaterializedValue | ||
.asSource | ||
.mapMaterializedValue(_ ⇒ NotUsed) // TODO we should likely use control to gracefully stop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to have the control available in the materialized value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, I agree but maybe something that should be implemented in a separate PR because all of the existing cloudflow sources are implemented in the same way. @RayRoestenburg what are your thoughts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ennru @nolangrace yes leave as is in this PR, and add an issue to change the signature to expose the control.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added #638 . The idea has always been that cloudflow takes care of draining etc, but it's better to provide it than to provide NotUsed.
...ding/akka-connected-car-streamlet/src/main/scala/connectedcar/actors/ConnectedCarActor.scala
Outdated
Show resolved
Hide resolved
Co-authored-by: Enno <458526+ennru@users.noreply.github.com>
What changes were proposed in this pull request?
Implemented shardedSourceWithCommittableContext and shardedPlainSource
to provide cloudflow with the features illustrated here
https://doc.akka.io/docs/alpakka-kafka/current/cluster-sharding.html
Why are the changes needed?
Fewer potential network hops when leveraging stateful streaming in cloudflow
Does this PR introduce any user-facing change?
2 new source types
How was this patch tested?
Entirely untested at the moment