-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass ProducerSettings to stage #952
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes a lot of sense together with the changes for Akka Discovery.
Especially that we can reduce the number of Producer factory methods.
This requires an update of
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
7e72595
to
98203c2
Compare
98203c2
to
662f6b9
Compare
@@ -86,6 +86,8 @@ object Transactional { | |||
transactionalId: String | |||
): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = { | |||
require(transactionalId != null && transactionalId.length > 0, "You must define a Transactional id.") | |||
require(settings.producerFactory.isDefined, "You cannot use a shared or external producer factory.") | |||
require(settings.producerFactoryAsync.isDefined, "You cannot use a shared or external async producer factory.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this won't work with Akka Discovery? Isn't it OK as long as it creates a new instance? Guarding against withProducer
is the important bit, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should work with Akka Discovery. The internal producerFactorySync
and producerFactoryAsync
values are optional and only set when the user uses a withProducer
method. They're not used internally to create a producer by default. We'll know for sure once the tests aren't blocked by other issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you're right. I mixed up the enrichAsync
and producerFactoryAsync
.
A feature we tried to unlock with the producer factory was the possibility to wrap the producer with the OpenTracing decorator.
35d78e7
to
666e84d
Compare
Ready for a final review. |
@@ -349,7 +399,11 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( | |||
* (without blocking for `enriched`). | |||
*/ | |||
def createKafkaProducerAsync()(implicit executionContext: ExecutionContext): Future[Producer[K, V]] = | |||
enriched.map(producerFactory) | |||
(producerFactoryAsync, producerFactorySync) match { | |||
case (Some(asyncFactory), _) => asyncFactory(executionContext) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use case for the asyncFactory
?
Why doesn't it get the enriched settings passed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied this from the Transactional.flow
implementation that's been been removed in this PR.
The "async" producer factory was passed to TransactionalProducerStage
: https://github.com/akka/alpakka-kafka/pull/952/files#diff-832423d99d7759061130b358267d97a3L101
I don't know if we need this. We no longer require it internally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems irregular to me, remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I [re-]discovered the documentation example used to point to this overload that takes a Future[KafkaProducer[K,V]]
that was added with the discovery PR (https://github.com/akka/alpakka-kafka/pull/952/files#diff-832205ad71b0f8522d293275289adc35L60). It seems like the only use for this is if you were to create a KafkaProducer
asynchronously externally from Alpakka Kafka, but I'm not sure why we need to support that.
I'll revert the documentation to showing how to share an external KafkaProducer
synchronously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's because ProducerSettings
exposes createKafkaProducerAsync
and createKafkaProducerCompletionStage
. I'm using this internally for the transactions PR. Maybe it would be useful to share to the user if they want to get an asynchronous producer for some other purpose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll hold off removing this for now.
I squashed the commits in this branch to make rebasing easier for the transactions PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, thank you for digging. Yes, I added those to allow passing the enriched producer to the factory methods with a shared producer. With this solution for passing the producer factory, we don't need those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
134674a
to
990480c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove the async factories, either pre-merge or after.
LGTM.
* Move producer factories to settings. * Deprecate flow/sink overloads that take producers * Block use of producer factories with transactional flows and sinks * Deprecate ProducerSettings.producerFactory * Invert withProducer checks on Transactional.flow * Make ProducerSpec compatible with async producer assignment
990480c
to
afae626
Compare
Purpose
Pass the
ProducerSettings
object directly into producer stages so that the stage has full access to the producer factory helper methods. It also saves on the number of parameters that need to be passed to these stages.I also took the opportunity to add more producer factory options to
ProducerSettings
so that it's not necessary to maintain overloads to pass external or sharedKafkaProducer
instances toProducer
flows and sinks. I deprecated methods that take these overloads.References
Changes