Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass ProducerSettings to stage #952

Merged
merged 1 commit into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ akka.kafka.producer {
# Duration to wait for `KafkaProducer.close` to finish.
close-timeout = 60s

# Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
# when the producer instance is shared across multiple producer stages.
close-on-producer-stop = true

# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
Expand Down
61 changes: 46 additions & 15 deletions core/src/main/scala/akka/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object ProducerSettings {
"Value serializer should be defined or declared in configuration"
)
val closeTimeout = config.getDuration("close-timeout").asScala
val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
Expand All @@ -69,11 +70,12 @@ object ProducerSettings {
keySerializer,
valueSerializer,
closeTimeout,
closeOnProducerStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync = None,
ProducerSettings.createKafkaProducer
producerFactorySync = None
)
}

Expand Down Expand Up @@ -156,7 +158,6 @@ object ProducerSettings {
new KafkaProducer[K, V](settings.getProperties,
settings.keySerializerOpt.orNull,
settings.valueSerializerOpt.orNull)

}

/**
Expand All @@ -172,13 +173,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val keySerializerOpt: Option[Serializer[K]],
val valueSerializerOpt: Option[Serializer[V]],
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val parallelism: Int,
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactory: ProducerSettings[K, V] => Producer[K, V]
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]
) {

@deprecated(
"Use createKafkaProducer(), createKafkaProducerAsync(), or createKafkaProducerCompletionStage() to get a new KafkaProducer",
"1.1.1"
)
def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()

/**
* A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
*/
Expand Down Expand Up @@ -217,18 +225,25 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
copy(properties = properties.updated(key, value))

/**
* Duration to wait for `KafkaConsumer.close` to finish.
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout)

/**
* Java API:
* Duration to wait for `KafkaConsumer.close` to finish.
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout.asScala)

/**
* Call `KafkaProducer.close` on the [[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
* receives a shutdown signal.
*/
def withCloseProducerOnStop(closeProducerOnStop: Boolean): ProducerSettings[K, V] =
copy(closeProducerOnStop = closeProducerOnStop)

/**
* Tuning parameter of how many sends that can run in parallel.
*/
Expand Down Expand Up @@ -275,12 +290,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
): ProducerSettings[K, V] =
copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).toScala))

/**
* Replaces the default Kafka producer creation logic with an external producer. This will also set
* `closeProducerOnStop = false` by default.
*/
def withProducer(
producer: Producer[K, V]
): ProducerSettings[K, V] = copy(producerFactorySync = Some(_ => producer), closeProducerOnStop = false)

/**
* Replaces the default Kafka producer creation logic.
*/
def withProducerFactory(
factory: ProducerSettings[K, V] => Producer[K, V]
): ProducerSettings[K, V] = copy(producerFactory = factory)
): ProducerSettings[K, V] = copy(producerFactorySync = Some(factory))

/**
* Get the Kafka producer settings as map.
Expand All @@ -292,33 +315,36 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
closeProducerOnStop: Boolean = closeProducerOnStop,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
producerFactory: ProducerSettings[K, V] => Producer[K, V] = producerFactory
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
closeProducerOnStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync,
producerFactory)
producerFactorySync)

override def toString: String =
"akka.kafka.ProducerSettings(" +
s"properties=${properties.mkString(",")}," +
s"keySerializer=$keySerializerOpt," +
s"valueSerializer=$valueSerializerOpt," +
s"closeTimeout=${closeTimeout.toCoarsest}," +
s"closeProducerOnStop=$closeProducerOnStop," +
s"parallelism=$parallelism," +
s"dispatcher=$dispatcher," +
s"eosCommitInterval=${eosCommitInterval.toCoarsest}" +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}" +
")"
s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"

/**
* Applies `enrichAsync` to complement these settings from asynchronous sources.
Expand All @@ -339,7 +365,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
"Asynchronous settings enrichment is set via `withEnrichAsync` or `withEnrichCompletionStage`, you must use `createKafkaProducerAsync` or `createKafkaProducerCompletionStage` to apply it"
)
} else {
producerFactory.apply(this)
producerFactorySync match {
case Some(factory) => factory.apply(this)
case _ => ProducerSettings.createKafkaProducer(this)
}
}

/**
Expand All @@ -349,7 +378,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* (without blocking for `enriched`).
*/
def createKafkaProducerAsync()(implicit executionContext: ExecutionContext): Future[Producer[K, V]] =
enriched.map(producerFactory)
producerFactorySync match {
case Some(factory) => enriched.map(factory)
case _ => enriched.map(ProducerSettings.createKafkaProducer)
}

/**
* Java API.
Expand All @@ -360,6 +392,5 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* @param executor Executor for asynchronous producer creation
*/
def createKafkaProducerCompletionStage(executor: Executor): CompletionStage[Producer[K, V]] =
enriched.map(producerFactory)(ExecutionContext.fromExecutor(executor)).toJava

createKafkaProducerAsync()(ExecutionContext.fromExecutor(executor)).toJava
}
17 changes: 7 additions & 10 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
Expand All @@ -19,7 +20,6 @@ import akka.stream.stage._
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

Expand All @@ -28,14 +28,12 @@ import scala.util.{Failure, Success, Try}
*/
@InternalApi
private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val producerProvider: ExecutionContext => Future[Producer[K, V]]
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new DefaultProducerStageLogic(this, producerProvider, inheritedAttributes)
new DefaultProducerStageLogic(this, inheritedAttributes)
}

/**
Expand All @@ -45,7 +43,6 @@ private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT
*/
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
producerFactory: ExecutionContext => Future[Producer[K, V]],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
Expand All @@ -68,7 +65,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:

override def preStart(): Unit = {
super.preStart()
val producerFuture = producerFactory(materializer.executionContext)
val producerFuture = stage.settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
Expand Down Expand Up @@ -190,7 +187,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
else
decider(exception) match {
case Supervision.Stop =>
if (stage.closeProducerOnStop) {
if (stage.settings.closeProducerOnStop) {
producer.close(0, TimeUnit.MILLISECONDS)
}
failStageCb.invoke(exception)
Expand All @@ -205,11 +202,11 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override def postStop(): Unit = {
log.debug("Stage completed")

if (stage.closeProducerOnStop && producer != null) {
if (stage.settings.closeProducerOnStop && producer != null) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/scala/akka/kafka/internal/ProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import java.util.concurrent.atomic.AtomicInteger

import akka.annotation.InternalApi
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
import akka.stream._
import org.apache.kafka.clients.producer.Producer

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.Future

/**
* INTERNAL API
Expand All @@ -22,9 +21,7 @@ import scala.concurrent.duration._
*/
@InternalApi
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]] {
val closeTimeout: FiniteDuration
val closeProducerOnStop: Boolean
val producerProvider: ExecutionContext => Future[Producer[K, V]]
val settings: ProducerSettings[K, V]

val in: Inlet[IN] = Inlet[IN]("messages")
val out: Outlet[Future[OUT]] = Outlet[Future[OUT]]("result")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
*/

package akka.kafka.internal

import akka.Done
import akka.annotation.InternalApi
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker}
import akka.kafka.ProducerMessage.{Envelope, Results}
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.{Attributes, FlowShape}
import akka.kafka.{ConsumerMessage, ProducerSettings}
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

Expand All @@ -26,15 +26,12 @@ import scala.jdk.CollectionConverters._
*/
@InternalApi
private[kafka] final class TransactionalProducerStage[K, V, P](
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val producerProvider: ExecutionContext => Future[Producer[K, V]],
commitInterval: FiniteDuration
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TransactionalProducerStageLogic(this, producerProvider, inheritedAttributes, commitInterval)
new TransactionalProducerStageLogic(this, inheritedAttributes)
}

/** Internal API */
Expand Down Expand Up @@ -100,12 +97,8 @@ private object TransactionalProducerStage {
*/
private final class TransactionalProducerStageLogic[K, V, P](
stage: TransactionalProducerStage[K, V, P],
producerFactory: ExecutionContext => Future[Producer[K, V]],
inheritedAttributes: Attributes,
commitInterval: FiniteDuration
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage,
producerFactory,
inheritedAttributes)
inheritedAttributes: Attributes
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
Expand All @@ -128,7 +121,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
initTransactions()
beginTransaction()
resumeDemand()
scheduleOnce(commitSchedulerKey, commitInterval)
scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}

// suspend demand until a Producer has been created
Expand Down Expand Up @@ -158,7 +151,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
case _ =>
scheduleOnce(commitSchedulerKey, commitInterval)
scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}
}

Expand Down Expand Up @@ -201,7 +194,7 @@ private final class TransactionalProducerStageLogic[K, V, P](

val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
_ => scheduleOnce(commitSchedulerKey, commitInterval)
_ => scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Committer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.{Done, NotUsed}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffset, CommittableOffsetBatch}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.kafka.{scaladsl, CommitterSettings}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink}

Expand Down
Loading