Skip to content

Commit

Permalink
Pass ProducerSettings to stage (#952)
Browse files Browse the repository at this point in the history
* Move producer factories to settings.
* Deprecate flow/sink overloads that take producers
* Block use of producer factories with transactional flows and sinks
* Deprecate ProducerSettings.producerFactory
* Make ProducerSpec compatible with async producer assignment
  • Loading branch information
seglo authored Oct 29, 2019
1 parent 1da2090 commit f6fe343
Show file tree
Hide file tree
Showing 19 changed files with 210 additions and 213 deletions.
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ akka.kafka.producer {
# Duration to wait for `KafkaProducer.close` to finish.
close-timeout = 60s

# Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
# when the producer instance is shared across multiple producer stages.
close-on-producer-stop = true

# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
Expand Down
61 changes: 46 additions & 15 deletions core/src/main/scala/akka/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object ProducerSettings {
"Value serializer should be defined or declared in configuration"
)
val closeTimeout = config.getDuration("close-timeout").asScala
val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
Expand All @@ -69,11 +70,12 @@ object ProducerSettings {
keySerializer,
valueSerializer,
closeTimeout,
closeOnProducerStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync = None,
ProducerSettings.createKafkaProducer
producerFactorySync = None
)
}

Expand Down Expand Up @@ -156,7 +158,6 @@ object ProducerSettings {
new KafkaProducer[K, V](settings.getProperties,
settings.keySerializerOpt.orNull,
settings.valueSerializerOpt.orNull)

}

/**
Expand All @@ -172,13 +173,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val keySerializerOpt: Option[Serializer[K]],
val valueSerializerOpt: Option[Serializer[V]],
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val parallelism: Int,
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactory: ProducerSettings[K, V] => Producer[K, V]
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]
) {

@deprecated(
"Use createKafkaProducer(), createKafkaProducerAsync(), or createKafkaProducerCompletionStage() to get a new KafkaProducer",
"1.1.1"
)
def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()

/**
* A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
*/
Expand Down Expand Up @@ -217,18 +225,25 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
copy(properties = properties.updated(key, value))

/**
* Duration to wait for `KafkaConsumer.close` to finish.
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout)

/**
* Java API:
* Duration to wait for `KafkaConsumer.close` to finish.
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout.asScala)

/**
* Call `KafkaProducer.close` on the [[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
* receives a shutdown signal.
*/
def withCloseProducerOnStop(closeProducerOnStop: Boolean): ProducerSettings[K, V] =
copy(closeProducerOnStop = closeProducerOnStop)

/**
* Tuning parameter of how many sends that can run in parallel.
*/
Expand Down Expand Up @@ -275,12 +290,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
): ProducerSettings[K, V] =
copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).toScala))

/**
* Replaces the default Kafka producer creation logic with an external producer. This will also set
* `closeProducerOnStop = false` by default.
*/
def withProducer(
producer: Producer[K, V]
): ProducerSettings[K, V] = copy(producerFactorySync = Some(_ => producer), closeProducerOnStop = false)

/**
* Replaces the default Kafka producer creation logic.
*/
def withProducerFactory(
factory: ProducerSettings[K, V] => Producer[K, V]
): ProducerSettings[K, V] = copy(producerFactory = factory)
): ProducerSettings[K, V] = copy(producerFactorySync = Some(factory))

/**
* Get the Kafka producer settings as map.
Expand All @@ -292,33 +315,36 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
closeProducerOnStop: Boolean = closeProducerOnStop,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
producerFactory: ProducerSettings[K, V] => Producer[K, V] = producerFactory
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
closeProducerOnStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync,
producerFactory)
producerFactorySync)

override def toString: String =
"akka.kafka.ProducerSettings(" +
s"properties=${properties.mkString(",")}," +
s"keySerializer=$keySerializerOpt," +
s"valueSerializer=$valueSerializerOpt," +
s"closeTimeout=${closeTimeout.toCoarsest}," +
s"closeProducerOnStop=$closeProducerOnStop," +
s"parallelism=$parallelism," +
s"dispatcher=$dispatcher," +
s"eosCommitInterval=${eosCommitInterval.toCoarsest}" +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}" +
")"
s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"

/**
* Applies `enrichAsync` to complement these settings from asynchronous sources.
Expand All @@ -339,7 +365,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
"Asynchronous settings enrichment is set via `withEnrichAsync` or `withEnrichCompletionStage`, you must use `createKafkaProducerAsync` or `createKafkaProducerCompletionStage` to apply it"
)
} else {
producerFactory.apply(this)
producerFactorySync match {
case Some(factory) => factory.apply(this)
case _ => ProducerSettings.createKafkaProducer(this)
}
}

/**
Expand All @@ -349,7 +378,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* (without blocking for `enriched`).
*/
def createKafkaProducerAsync()(implicit executionContext: ExecutionContext): Future[Producer[K, V]] =
enriched.map(producerFactory)
producerFactorySync match {
case Some(factory) => enriched.map(factory)
case _ => enriched.map(ProducerSettings.createKafkaProducer)
}

/**
* Java API.
Expand All @@ -360,6 +392,5 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* @param executor Executor for asynchronous producer creation
*/
def createKafkaProducerCompletionStage(executor: Executor): CompletionStage[Producer[K, V]] =
enriched.map(producerFactory)(ExecutionContext.fromExecutor(executor)).toJava

createKafkaProducerAsync()(ExecutionContext.fromExecutor(executor)).toJava
}
17 changes: 7 additions & 10 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
Expand All @@ -19,7 +20,6 @@ import akka.stream.stage._
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

Expand All @@ -28,14 +28,12 @@ import scala.util.{Failure, Success, Try}
*/
@InternalApi
private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val producerProvider: ExecutionContext => Future[Producer[K, V]]
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new DefaultProducerStageLogic(this, producerProvider, inheritedAttributes)
new DefaultProducerStageLogic(this, inheritedAttributes)
}

/**
Expand All @@ -45,7 +43,6 @@ private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT
*/
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
producerFactory: ExecutionContext => Future[Producer[K, V]],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
Expand All @@ -68,7 +65,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:

override def preStart(): Unit = {
super.preStart()
val producerFuture = producerFactory(materializer.executionContext)
val producerFuture = stage.settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
Expand Down Expand Up @@ -190,7 +187,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
else
decider(exception) match {
case Supervision.Stop =>
if (stage.closeProducerOnStop) {
if (stage.settings.closeProducerOnStop) {
producer.close(0, TimeUnit.MILLISECONDS)
}
failStageCb.invoke(exception)
Expand All @@ -205,11 +202,11 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override def postStop(): Unit = {
log.debug("Stage completed")

if (stage.closeProducerOnStop && producer != null) {
if (stage.settings.closeProducerOnStop && producer != null) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/scala/akka/kafka/internal/ProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import java.util.concurrent.atomic.AtomicInteger

import akka.annotation.InternalApi
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
import akka.stream._
import org.apache.kafka.clients.producer.Producer

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.Future

/**
* INTERNAL API
Expand All @@ -22,9 +21,7 @@ import scala.concurrent.duration._
*/
@InternalApi
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]] {
val closeTimeout: FiniteDuration
val closeProducerOnStop: Boolean
val producerProvider: ExecutionContext => Future[Producer[K, V]]
val settings: ProducerSettings[K, V]

val in: Inlet[IN] = Inlet[IN]("messages")
val out: Outlet[Future[OUT]] = Outlet[Future[OUT]]("result")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
*/

package akka.kafka.internal

import akka.Done
import akka.annotation.InternalApi
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker}
import akka.kafka.ProducerMessage.{Envelope, Results}
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.{Attributes, FlowShape}
import akka.kafka.{ConsumerMessage, ProducerSettings}
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

Expand All @@ -26,15 +26,12 @@ import scala.jdk.CollectionConverters._
*/
@InternalApi
private[kafka] final class TransactionalProducerStage[K, V, P](
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val producerProvider: ExecutionContext => Future[Producer[K, V]],
commitInterval: FiniteDuration
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TransactionalProducerStageLogic(this, producerProvider, inheritedAttributes, commitInterval)
new TransactionalProducerStageLogic(this, inheritedAttributes)
}

/** Internal API */
Expand Down Expand Up @@ -100,12 +97,8 @@ private object TransactionalProducerStage {
*/
private final class TransactionalProducerStageLogic[K, V, P](
stage: TransactionalProducerStage[K, V, P],
producerFactory: ExecutionContext => Future[Producer[K, V]],
inheritedAttributes: Attributes,
commitInterval: FiniteDuration
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage,
producerFactory,
inheritedAttributes)
inheritedAttributes: Attributes
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
Expand All @@ -128,7 +121,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
initTransactions()
beginTransaction()
resumeDemand()
scheduleOnce(commitSchedulerKey, commitInterval)
scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}

// suspend demand until a Producer has been created
Expand Down Expand Up @@ -158,7 +151,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
case _ =>
scheduleOnce(commitSchedulerKey, commitInterval)
scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}
}

Expand Down Expand Up @@ -201,7 +194,7 @@ private final class TransactionalProducerStageLogic[K, V, P](

val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
_ => scheduleOnce(commitSchedulerKey, commitInterval)
_ => scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Committer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.{Done, NotUsed}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffset, CommittableOffsetBatch}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.kafka.{scaladsl, CommitterSettings}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink}

Expand Down
Loading

0 comments on commit f6fe343

Please sign in to comment.