Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass ProducerSettings to stage #952

Merged
merged 1 commit into from
Oct 29, 2019

Conversation

seglo
Copy link
Contributor

@seglo seglo commented Oct 24, 2019

Purpose

Pass the ProducerSettings object directly into producer stages so that the stage has full access to the producer factory helper methods. It also saves on the number of parameters that need to be passed to these stages.

I also took the opportunity to add more producer factory options to ProducerSettings so that it's not necessary to maintain overloads to pass external or shared KafkaProducer instances to Producer flows and sinks. I deprecated methods that take these overloads.

References

Changes

  • Pass ProducerSettings to stage.
  • Move producer factories to settings
  • Deprecate flow/sink overloads that take producers

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes a lot of sense together with the changes for Akka Discovery.
Especially that we can reduce the number of Producer factory methods.

This requires an update of

@seglo seglo added this to the 2.0.0 milestone Oct 25, 2019
@seglo seglo force-pushed the seglo/pass-settings-to-producer branch from 7e72595 to 98203c2 Compare October 25, 2019 20:32
@seglo seglo force-pushed the seglo/pass-settings-to-producer branch from 98203c2 to 662f6b9 Compare October 25, 2019 20:56
core/src/main/resources/reference.conf Outdated Show resolved Hide resolved
core/src/main/scala/akka/kafka/scaladsl/Producer.scala Outdated Show resolved Hide resolved
core/src/main/scala/akka/kafka/scaladsl/Producer.scala Outdated Show resolved Hide resolved
@@ -86,6 +86,8 @@ object Transactional {
transactionalId: String
): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = {
require(transactionalId != null && transactionalId.length > 0, "You must define a Transactional id.")
require(settings.producerFactory.isDefined, "You cannot use a shared or external producer factory.")
require(settings.producerFactoryAsync.isDefined, "You cannot use a shared or external async producer factory.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this won't work with Akka Discovery? Isn't it OK as long as it creates a new instance? Guarding against withProducer is the important bit, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work with Akka Discovery. The internal producerFactorySync and producerFactoryAsync values are optional and only set when the user uses a withProducer method. They're not used internally to create a producer by default. We'll know for sure once the tests aren't blocked by other issues.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right. I mixed up the enrichAsync and producerFactoryAsync.
A feature we tried to unlock with the producer factory was the possibility to wrap the producer with the OpenTracing decorator.

@seglo seglo force-pushed the seglo/pass-settings-to-producer branch from 35d78e7 to 666e84d Compare October 26, 2019 21:19
@seglo seglo requested a review from ennru October 28, 2019 18:59
@seglo
Copy link
Contributor Author

seglo commented Oct 28, 2019

Ready for a final review.

@@ -349,7 +399,11 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* (without blocking for `enriched`).
*/
def createKafkaProducerAsync()(implicit executionContext: ExecutionContext): Future[Producer[K, V]] =
enriched.map(producerFactory)
(producerFactoryAsync, producerFactorySync) match {
case (Some(asyncFactory), _) => asyncFactory(executionContext)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for the asyncFactory?
Why doesn't it get the enriched settings passed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this from the Transactional.flow implementation that's been been removed in this PR.
The "async" producer factory was passed to TransactionalProducerStage: https://github.com/akka/alpakka-kafka/pull/952/files#diff-832423d99d7759061130b358267d97a3L101

I don't know if we need this. We no longer require it internally.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems irregular to me, remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I [re-]discovered the documentation example used to point to this overload that takes a Future[KafkaProducer[K,V]] that was added with the discovery PR (https://github.com/akka/alpakka-kafka/pull/952/files#diff-832205ad71b0f8522d293275289adc35L60). It seems like the only use for this is if you were to create a KafkaProducer asynchronously externally from Alpakka Kafka, but I'm not sure why we need to support that.

I'll revert the documentation to showing how to share an external KafkaProducer synchronously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's because ProducerSettings exposes createKafkaProducerAsync and createKafkaProducerCompletionStage. I'm using this internally for the transactions PR. Maybe it would be useful to share to the user if they want to get an asynchronous producer for some other purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll hold off removing this for now.

I squashed the commits in this branch to make rebasing easier for the transactions PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thank you for digging. Yes, I added those to allow passing the enriched producer to the factory methods with a shared producer. With this solution for passing the producer factory, we don't need those.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

@seglo seglo force-pushed the seglo/pass-settings-to-producer branch from 134674a to 990480c Compare October 29, 2019 20:00
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the async factories, either pre-merge or after.
LGTM.

* Move producer factories to settings.
* Deprecate flow/sink overloads that take producers
* Block use of producer factories with transactional flows and sinks
* Deprecate ProducerSettings.producerFactory
* Invert withProducer checks on Transactional.flow
* Make ProducerSpec compatible with async producer assignment
@seglo seglo force-pushed the seglo/pass-settings-to-producer branch from 990480c to afae626 Compare October 29, 2019 21:04
@seglo seglo merged commit f6fe343 into akka:master Oct 29, 2019
@seglo seglo deleted the seglo/pass-settings-to-producer branch October 29, 2019 23:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants