diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 2a33bbb5e..9affeb40e 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -22,6 +22,10 @@ akka.kafka.producer {
# Duration to wait for `KafkaProducer.close` to finish.
close-timeout = 60s
+ # Call `KafkaProducer.close` when the stream is shut down. Override this to false
+ # when the producer instance is shared across multiple producer stages.
+ close-on-producer-stop = true
+
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
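Context, not part of the patch: a minimal sketch of the code-level equivalent of `close-on-producer-stop = false`, for when one producer instance backs several stages (the bootstrap address and system name are illustrative):

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

object SharedProducerSettingsSketch extends App {
  implicit val system: ActorSystem = ActorSystem("producer-sample")

  // Stages built from these settings will NOT close the producer when they
  // stop, matching `close-on-producer-stop = false` in configuration.
  val sharedSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
      .withCloseProducerOnStop(false)
}
```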
diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala
index 36368081f..e6313b460 100644
--- a/core/src/main/scala/akka/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala
@@ -61,6 +61,7 @@ object ProducerSettings {
"Value serializer should be defined or declared in configuration"
)
val closeTimeout = config.getDuration("close-timeout").asScala
+ val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
@@ -69,11 +70,12 @@ object ProducerSettings {
keySerializer,
valueSerializer,
closeTimeout,
+ closeOnProducerStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync = None,
- ProducerSettings.createKafkaProducer
+ producerFactorySync = None
)
}
@@ -156,7 +158,6 @@ object ProducerSettings {
new KafkaProducer[K, V](settings.getProperties,
settings.keySerializerOpt.orNull,
settings.valueSerializerOpt.orNull)
-
}
/**
@@ -172,13 +173,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val keySerializerOpt: Option[Serializer[K]],
val valueSerializerOpt: Option[Serializer[V]],
val closeTimeout: FiniteDuration,
+ val closeProducerOnStop: Boolean,
val parallelism: Int,
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
- val producerFactory: ProducerSettings[K, V] => Producer[K, V]
+ val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]
) {
+ @deprecated(
+ "Use createKafkaProducer(), createKafkaProducerAsync(), or createKafkaProducerCompletionStage() to get a new KafkaProducer",
+ "1.1.1"
+ )
+ def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()
+
/**
* A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
*/
@@ -217,18 +225,25 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
copy(properties = properties.updated(key, value))
/**
- * Duration to wait for `KafkaConsumer.close` to finish.
+ * Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout)
/**
* Java API:
- * Duration to wait for `KafkaConsumer.close` to finish.
+ * Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout.asScala)
+ /**
+ * Call `KafkaProducer.close` on the [[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
+ * receives a shutdown signal.
+ */
+ def withCloseProducerOnStop(closeProducerOnStop: Boolean): ProducerSettings[K, V] =
+ copy(closeProducerOnStop = closeProducerOnStop)
+
/**
* Tuning parameter of how many sends that can run in parallel.
*/
@@ -275,12 +290,20 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
): ProducerSettings[K, V] =
copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).toScala))
+ /**
+ * Replaces the default Kafka producer creation logic with an external producer. This also sets
+ * `closeProducerOnStop = false`; use `withCloseProducerOnStop` to override it.
+ */
+ def withProducer(
+ producer: Producer[K, V]
+ ): ProducerSettings[K, V] = copy(producerFactorySync = Some(_ => producer), closeProducerOnStop = false)
+
/**
* Replaces the default Kafka producer creation logic.
*/
def withProducerFactory(
factory: ProducerSettings[K, V] => Producer[K, V]
- ): ProducerSettings[K, V] = copy(producerFactory = factory)
+ ): ProducerSettings[K, V] = copy(producerFactorySync = Some(factory))
/**
* Get the Kafka producer settings as map.
@@ -292,21 +315,23 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
+ closeProducerOnStop: Boolean = closeProducerOnStop,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
- producerFactory: ProducerSettings[K, V] => Producer[K, V] = producerFactory
+ producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
+ closeProducerOnStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync,
- producerFactory)
+ producerFactorySync)
override def toString: String =
"akka.kafka.ProducerSettings(" +
@@ -314,11 +339,12 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
s"keySerializer=$keySerializerOpt," +
s"valueSerializer=$valueSerializerOpt," +
s"closeTimeout=${closeTimeout.toCoarsest}," +
+ s"closeProducerOnStop=$closeProducerOnStop," +
s"parallelism=$parallelism," +
s"dispatcher=$dispatcher," +
- s"eosCommitInterval=${eosCommitInterval.toCoarsest}" +
- s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}" +
- ")"
+ s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
+ s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
+ s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"
/**
* Applies `enrichAsync` to complement these settings from asynchronous sources.
@@ -339,7 +365,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
"Asynchronous settings enrichment is set via `withEnrichAsync` or `withEnrichCompletionStage`, you must use `createKafkaProducerAsync` or `createKafkaProducerCompletionStage` to apply it"
)
} else {
- producerFactory.apply(this)
+ producerFactorySync match {
+ case Some(factory) => factory.apply(this)
+ case _ => ProducerSettings.createKafkaProducer(this)
+ }
}
/**
@@ -349,7 +378,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* (without blocking for `enriched`).
*/
def createKafkaProducerAsync()(implicit executionContext: ExecutionContext): Future[Producer[K, V]] =
- enriched.map(producerFactory)
+ producerFactorySync match {
+ case Some(factory) => enriched.map(factory)
+ case _ => enriched.map(ProducerSettings.createKafkaProducer)
+ }
/**
* Java API.
@@ -360,6 +392,5 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* @param executor Executor for asynchronous producer creation
*/
def createKafkaProducerCompletionStage(executor: Executor): CompletionStage[Producer[K, V]] =
- enriched.map(producerFactory)(ExecutionContext.fromExecutor(executor)).toJava
-
+ createKafkaProducerAsync()(ExecutionContext.fromExecutor(executor)).toJava
}
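To summarize the resolution order introduced above — a sketch, not part of the patch, assuming a `settings: ProducerSettings[K, V]` without asynchronous enrichment:

```scala
// With producerFactorySync unset, createKafkaProducer() falls back to the
// companion's default factory and builds a fresh KafkaProducer.
val freshProducer = settings.createKafkaProducer()

// withProducer pins every stage to this one instance and, per the change
// above, also switches closeProducerOnStop to false.
val shared = settings.withProducer(freshProducer)
assert(shared.createKafkaProducer() eq freshProducer)
assert(!shared.closeProducerOnStop)

// withProducerFactory only replaces the creation logic; it deliberately
// leaves closeProducerOnStop untouched.
val perStage = settings.withProducerFactory(s => ProducerSettings.createKafkaProducer(s))
```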
diff --git a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
index b34b439e7..bb44a09f8 100644
--- a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
@@ -11,6 +11,7 @@ import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerMessage._
+import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
@@ -19,7 +20,6 @@ import akka.stream.stage._
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -28,14 +28,12 @@ import scala.util.{Failure, Success, Try}
*/
@InternalApi
private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
- val closeTimeout: FiniteDuration,
- val closeProducerOnStop: Boolean,
- val producerProvider: ExecutionContext => Future[Producer[K, V]]
+ val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new DefaultProducerStageLogic(this, producerProvider, inheritedAttributes)
+ new DefaultProducerStageLogic(this, inheritedAttributes)
}
/**
@@ -45,7 +43,6 @@ private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT
*/
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
- producerFactory: ExecutionContext => Future[Producer[K, V]],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
@@ -68,7 +65,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override def preStart(): Unit = {
super.preStart()
- val producerFuture = producerFactory(materializer.executionContext)
+ val producerFuture = stage.settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
@@ -190,7 +187,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
else
decider(exception) match {
case Supervision.Stop =>
- if (stage.closeProducerOnStop) {
+ if (stage.settings.closeProducerOnStop) {
producer.close(0, TimeUnit.MILLISECONDS)
}
failStageCb.invoke(exception)
@@ -205,11 +202,11 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override def postStop(): Unit = {
log.debug("Stage completed")
- if (stage.closeProducerOnStop && producer != null) {
+ if (stage.settings.closeProducerOnStop && producer != null) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
- producer.close(stage.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
+ producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
diff --git a/core/src/main/scala/akka/kafka/internal/ProducerStage.scala b/core/src/main/scala/akka/kafka/internal/ProducerStage.scala
index 1f9fdc912..7f651bc54 100644
--- a/core/src/main/scala/akka/kafka/internal/ProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/ProducerStage.scala
@@ -9,11 +9,10 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.annotation.InternalApi
import akka.kafka.ProducerMessage._
+import akka.kafka.ProducerSettings
import akka.stream._
-import org.apache.kafka.clients.producer.Producer
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration._
+import scala.concurrent.Future
/**
* INTERNAL API
@@ -22,9 +21,7 @@ import scala.concurrent.duration._
*/
@InternalApi
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]] {
- val closeTimeout: FiniteDuration
- val closeProducerOnStop: Boolean
- val producerProvider: ExecutionContext => Future[Producer[K, V]]
+ val settings: ProducerSettings[K, V]
val in: Inlet[IN] = Inlet[IN]("messages")
val out: Outlet[Future[OUT]] = Outlet[Future[OUT]]("result")
diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
index 4df96bcaa..00268d6e7 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
@@ -4,20 +4,20 @@
*/
package akka.kafka.internal
+
import akka.Done
import akka.annotation.InternalApi
-import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker}
import akka.kafka.ProducerMessage.{Envelope, Results}
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
-import akka.stream.{Attributes, FlowShape}
+import akka.kafka.{ConsumerMessage, ProducerSettings}
import akka.stream.stage._
+import akka.stream.{Attributes, FlowShape}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -26,15 +26,12 @@ import scala.jdk.CollectionConverters._
*/
@InternalApi
private[kafka] final class TransactionalProducerStage[K, V, P](
- val closeTimeout: FiniteDuration,
- val closeProducerOnStop: Boolean,
- val producerProvider: ExecutionContext => Future[Producer[K, V]],
- commitInterval: FiniteDuration
+ val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TransactionalProducerStageLogic(this, producerProvider, inheritedAttributes, commitInterval)
+ new TransactionalProducerStageLogic(this, inheritedAttributes)
}
/** Internal API */
@@ -100,12 +97,8 @@ private object TransactionalProducerStage {
*/
private final class TransactionalProducerStageLogic[K, V, P](
stage: TransactionalProducerStage[K, V, P],
- producerFactory: ExecutionContext => Future[Producer[K, V]],
- inheritedAttributes: Attributes,
- commitInterval: FiniteDuration
-) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage,
- producerFactory,
- inheritedAttributes)
+ inheritedAttributes: Attributes
+) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
@@ -128,7 +121,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
initTransactions()
beginTransaction()
resumeDemand()
- scheduleOnce(commitSchedulerKey, commitInterval)
+ scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}
// suspend demand until a Producer has been created
@@ -158,7 +151,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
case _ =>
- scheduleOnce(commitSchedulerKey, commitInterval)
+ scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
}
}
@@ -201,7 +194,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
- _ => scheduleOnce(commitSchedulerKey, commitInterval)
+ _ => scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
)
}
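Since the commit interval now travels inside the settings instead of through the stage constructor, tuning it remains a plain settings call — a sketch assuming a `producerSettings` value in scope:

```scala
import scala.concurrent.duration._

// eos-commit-interval now reaches TransactionalProducerStageLogic via
// stage.settings.eosCommitInterval
val txSettings = producerSettings.withEosCommitInterval(100.millis)
```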
diff --git a/core/src/main/scala/akka/kafka/javadsl/Committer.scala b/core/src/main/scala/akka/kafka/javadsl/Committer.scala
index 5fd460ef2..42467d231 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Committer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Committer.scala
@@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.{Done, NotUsed}
-import akka.kafka.ConsumerMessage.{Committable, CommittableOffset, CommittableOffsetBatch}
+import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.kafka.{scaladsl, CommitterSettings}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink}
diff --git a/core/src/main/scala/akka/kafka/javadsl/Producer.scala b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
index c251c2a70..acac4e5a2 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
@@ -41,7 +41,10 @@ object Producer {
* partition number, and an optional key and value.
*
* Supports sharing a Kafka Producer instance.
+ *
+ * @deprecated Pass in an external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since 1.1.1
*/
+ @Deprecated
def plainSink[K, V](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]
@@ -51,23 +54,6 @@ object Producer {
.mapMaterializedValue(_.toJava)
.asJava
- /**
- * Create a sink for publishing records to Kafka topics.
- *
- * The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which the record is being sent, an optional
- * partition number, and an optional key and value.
- *
- * Supports sharing a Kafka Producer instance provided by a `CompletionStage`
- */
- def plainSink[K, V](
- settings: ProducerSettings[K, V],
- producer: CompletionStage[org.apache.kafka.clients.producer.Producer[K, V]]
- ): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
- scaladsl.Producer
- .plainSink(settings, producer.toScala)
- .mapMaterializedValue(_.toJava)
- .asJava
-
/**
* Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
* from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
@@ -166,7 +152,10 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*
* Uses a shared Kafka Producer instance.
+ *
+ * @deprecated Pass in an external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since 1.1.1
*/
+ @Deprecated
def committableSink[K, V, IN <: Envelope[K, V, ConsumerMessage.Committable]](
producerSettings: ProducerSettings[K, V],
committerSettings: CommitterSettings,
@@ -222,7 +211,10 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*
* Uses a shared Kafka Producer instance.
+ *
+ * @deprecated Pass in an external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since 1.1.1
*/
+ @Deprecated
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
def committableSinkWithOffsetContext[K, V, IN <: Envelope[K, V, _], C <: Committable](
producerSettings: ProducerSettings[K, V],
@@ -341,7 +333,10 @@ object Producer {
* be committed later in the flow.
*
* Supports sharing a Kafka Producer instance.
+ *
+ * @deprecated Pass in an external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since 1.1.1
*/
+ @Deprecated
def flexiFlow[K, V, PassThrough](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]
@@ -351,32 +346,6 @@ object Producer {
.asJava
.asInstanceOf[Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed]]
- /**
- * Create a flow to conditionally publish records to Kafka topics and then pass it on.
- *
- * It publishes records to Kafka topics conditionally:
- *
- * - [[akka.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[akka.kafka.ProducerMessage.Result Result]]
- *
- * - [[akka.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[akka.kafka.ProducerMessage.MultiResult MultiResult]]
- *
- * - [[akka.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[akka.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
- *
- * The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
- * or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
- * be committed later in the flow.
- *
- * Supports sharing a Kafka Producer instance provided by a `CompletionStage`.
- */
- def flexiFlow[K, V, PassThrough](
- settings: ProducerSettings[K, V],
- producer: CompletionStage[org.apache.kafka.clients.producer.Producer[K, V]]
- ): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] =
- scaladsl.Producer
- .flexiFlow(settings, producer.toScala)
- .asJava
- .asInstanceOf[Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed]]
-
/**
* API MAY CHANGE
*
@@ -395,7 +364,10 @@ object Producer {
* Supports sharing a Kafka Producer instance.
*
* @tparam C the flow context type
+ *
+ * @deprecated Pass in an external or shared producer using `ProducerSettings.withProducerFactory` or `ProducerSettings.withProducer`, since 1.1.1
*/
+ @Deprecated
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
def flowWithContext[K, V, C](
settings: ProducerSettings[K, V],
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Committer.scala b/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
index 8e5ef8a16..3f71980c5 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
@@ -9,7 +9,7 @@ import akka.dispatch.ExecutionContexts
import akka.annotation.ApiMayChange
import akka.{Done, NotUsed}
import akka.kafka.CommitterSettings
-import akka.kafka.ConsumerMessage.{Committable, CommittableOffset, CommittableOffsetBatch}
+import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink}
import scala.concurrent.Future
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
index d4ff8bec8..00a290bc7 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
@@ -15,7 +15,7 @@ import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.kafka.clients.producer.ProducerRecord
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.Future
/**
* Akka Stream connector for publishing messages to Kafka topics.
@@ -42,22 +42,13 @@ object Producer {
*
* Supports sharing a Kafka Producer instance.
*/
+ @deprecated(
+ "Pass in external or shared producer using ProducerSettings.withProducerFactory or ProducerSettings.withProducer",
+ "1.1.1"
+ )
def plainSink[K, V](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]
- ): Sink[ProducerRecord[K, V], Future[Done]] = plainSink(settings, Future.successful(producer))
-
- /**
- * Create a sink for publishing records to Kafka topics.
- *
- * The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which the record is being sent, an optional
- * partition number, and an optional key and value.
- *
- * Supports sharing a Kafka Producer instance provided by a future.
- */
- def plainSink[K, V](
- settings: ProducerSettings[K, V],
- producer: Future[org.apache.kafka.clients.producer.Producer[K, V]]
): Sink[ProducerRecord[K, V], Future[Done]] =
Flow[ProducerRecord[K, V]]
.map(Message(_, NotUsed))
@@ -161,7 +152,7 @@ object Producer {
committerSettings: CommitterSettings,
producer: org.apache.kafka.clients.producer.Producer[K, V]
): Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] =
- flexiFlow[K, V, ConsumerMessage.Committable](producerSettings, producer)
+ flexiFlow[K, V, ConsumerMessage.Committable](producerSettings.withProducer(producer))
.map(_.passThrough)
.toMat(Committer.sink(committerSettings))(Keep.right)
@@ -238,9 +229,7 @@ object Producer {
val flow = Flow
.fromGraph(
new DefaultProducerStage[K, V, PassThrough, Message[K, V, PassThrough], Result[K, V, PassThrough]](
- settings.closeTimeout,
- closeProducerOnStop = true,
- producerProvider = (ec: ExecutionContext) => settings.createKafkaProducerAsync()(ec)
+ settings
)
)
.mapAsync(settings.parallelism)(identity)
@@ -269,9 +258,7 @@ object Producer {
val flow = Flow
.fromGraph(
new DefaultProducerStage[K, V, PassThrough, Envelope[K, V, PassThrough], Results[K, V, PassThrough]](
- settings.closeTimeout,
- closeProducerOnStop = true,
- producerProvider = (ec: ExecutionContext) => settings.createKafkaProducerAsync()(ec)
+ settings
)
)
.mapAsync(settings.parallelism)(identity)
@@ -321,12 +308,13 @@ object Producer {
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]
): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
+ val settingsWithProducer = settings
+ .withProducer(producer)
+ .withCloseProducerOnStop(false)
val flow = Flow
.fromGraph(
new DefaultProducerStage[K, V, PassThrough, Message[K, V, PassThrough], Result[K, V, PassThrough]](
- closeTimeout = settings.closeTimeout,
- closeProducerOnStop = false,
- producerProvider = (_: ExecutionContext) => Future.successful(producer)
+ settingsWithProducer
)
)
.mapAsync(settings.parallelism)(identity)
@@ -351,39 +339,21 @@ object Producer {
*
* Supports sharing a Kafka Producer instance.
*/
+ @deprecated(
+ "Pass in external or shared producer using ProducerSettings.withProducerFactory or ProducerSettings.withProducer",
+ "1.1.1"
+ )
def flexiFlow[K, V, PassThrough](
settings: ProducerSettings[K, V],
producer: org.apache.kafka.clients.producer.Producer[K, V]
- ): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] =
- flexiFlow(settings, Future.successful(producer))
-
- /**
- * Create a flow to conditionally publish records to Kafka topics and then pass it on.
- *
- * It publishes records to Kafka topics conditionally:
- *
- * - [[akka.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and continues in the stream as [[akka.kafka.ProducerMessage.Result Result]]
- *
- * - [[akka.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and continues in the stream as [[akka.kafka.ProducerMessage.MultiResult MultiResult]]
- *
- * - [[akka.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[akka.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
- *
- * The messages support the possibility to pass through arbitrary data, which can for example be a [[ConsumerMessage.CommittableOffset CommittableOffset]]
- * or [[ConsumerMessage.CommittableOffsetBatch CommittableOffsetBatch]] that can
- * be committed later in the flow.
- *
- * Supports sharing a Kafka Producer instance provided from a future.
- */
- def flexiFlow[K, V, PassThrough](
- settings: ProducerSettings[K, V],
- producer: Future[org.apache.kafka.clients.producer.Producer[K, V]]
): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] = {
+ val settingsWithProducer = settings
+ .withProducer(producer)
+ .withCloseProducerOnStop(false)
val flow = Flow
.fromGraph(
new DefaultProducerStage[K, V, PassThrough, Envelope[K, V, PassThrough], Results[K, V, PassThrough]](
- closeTimeout = settings.closeTimeout,
- closeProducerOnStop = false,
- producerProvider = (_: ExecutionContext) => producer
+ settingsWithProducer
)
)
.mapAsync(settings.parallelism)(identity)
@@ -410,6 +380,10 @@ object Producer {
*
* @tparam C the flow context type
*/
+ @deprecated(
+ "Pass in external or shared producer using ProducerSettings.withProducerFactory or ProducerSettings.withProducer",
+ "1.1.1"
+ )
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
def flowWithContext[K, V, C](
settings: ProducerSettings[K, V],
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala
index 0e7d2f841..a02c4caf5 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala
@@ -17,7 +17,7 @@ import akka.{Done, NotUsed}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerConfig
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.Future
/**
* Akka Stream connector to support transactions between Kafka topics.
@@ -86,6 +86,7 @@ object Transactional {
transactionalId: String
): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = {
require(transactionalId != null && transactionalId.length > 0, "You must define a Transactional id.")
+ require(settings.producerFactorySync.isEmpty, "You cannot use a shared or external producer factory.")
val txSettings = settings.withProperties(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString,
@@ -95,12 +96,7 @@ object Transactional {
val flow = Flow
.fromGraph(
- new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](
- txSettings.closeTimeout,
- closeProducerOnStop = true,
- producerProvider = (ec: ExecutionContext) => txSettings.createKafkaProducerAsync()(ec),
- settings.eosCommitInterval
- )
+ new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](txSettings)
)
.mapAsync(txSettings.parallelism)(identity)
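Illustrative only (`settings` and `sharedProducer` assumed in scope): the new `require` rejects settings that carry an external producer as soon as the transactional flow is created:

```scala
// Throws IllegalArgumentException:
// "requirement failed: You cannot use a shared or external producer factory."
Transactional.flow(settings.withProducer(sharedProducer), transactionalId = "tx-1")
```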
diff --git a/docs/src/main/paradox/producer.md b/docs/src/main/paradox/producer.md
index 4f3e3ef67..a65313cb1 100644
--- a/docs/src/main/paradox/producer.md
+++ b/docs/src/main/paradox/producer.md
@@ -15,33 +15,34 @@ Alpakka Kafka offers producer flows and sinks that connect to Kafka and write da
These factory methods are part of the @scala[@scaladoc[Producer API](akka.kafka.scaladsl.Producer$)]@java[@scaladoc[Producer API](akka.kafka.javadsl.Producer$)].
-| Shared producer | Factory method | Stream element type | Pass-through | Context |
-|-----------------|-------------------|---------------------|--------------|---------|
-| Available | `plainSink` | `ProducerRecord` | N/A | N/A |
-| Available | `flexiFlow` | `Envelope` | Any | N/A |
-| Available | `flowWithContext` | `Envelope` | No | Any |
+| Factory method | May use shared producer | Stream element type | Pass-through | Context |
+|-------------------|-------------------------|---------------------|--------------|---------|
+| `plainSink` | Yes | `ProducerRecord` | N/A | N/A |
+| `flexiFlow` | Yes | `Envelope` | Any | N/A |
+| `flowWithContext` | Yes | `Envelope` | No | Any |
### Committing producer sinks
These producers produce messages to Kafka and commit the offsets of incoming messages regularly.
-| Shared producer | Factory method | Stream element type | Pass-through | Context |
-|-----------------|------------------------------------|---------------------|---------------|---------------|
-| Available | `committableSink` | `Envelope` | `Committable` | N/A |
-| Available | `committableSinkWithOffsetContext` | `Envelope` | Any | `Committable` |
+| Factory method | May use shared producer | Stream element type | Pass-through | Context |
+|------------------------------------|-------------------------|---------------------|---------------|---------------|
+| `committableSink` | Yes | `Envelope` | `Committable` | N/A |
+| `committableSinkWithOffsetContext` | Yes | `Envelope` | Any | `Committable` |
For details about the batched committing see @ref:[Consumer: Offset Storage in Kafka - committing](consumer.md#offset-storage-in-kafka-committing).
### Transactional producers
These factory methods are part of the @scala[@scaladoc[Transactional API](akka.kafka.scaladsl.Transactional$)]@java[@scaladoc[Transactional API](akka.kafka.javadsl.Transactional$)]. For details see @ref[Transactions](transactions.md).
+Alpakka Kafka must manage the producer when using transactions.
-| Shared producer | Factory method | Stream element type | Pass-through |
-|-----------------|-------------------------|---------------------|--------------|
-| No | `sink` | `Envelope` | N/A |
-| No | `flow` | `Envelope` | No |
-| No | `sinkWithOffsetContext` | `Envelope` | N/A |
-| No | `flowWithOffsetContext` | `Envelope` | No |
+| Factory method | May use shared producer | Stream element type | Pass-through |
+|-------------------------|-------------------------|---------------------|--------------|
+| `sink` | No | `Envelope` | N/A |
+| `flow` | No | `Envelope` | No |
+| `sinkWithOffsetContext` | No | `Envelope` | N/A |
+| `flowWithOffsetContext` | No | `Envelope` | No |
## Settings
@@ -171,6 +172,7 @@ Java
## Sharing the KafkaProducer instance
The underlying `KafkaProducer` (@javadoc[Kafka API](org.apache.kafka.clients.producer.KafkaProducer)) is thread safe and sharing a single producer instance across streams will generally be faster than having multiple instances.
+You cannot share a `KafkaProducer` with the Transactional flows and sinks.
To create a `KafkaProducer` from the Kafka connector settings described [above](#settings), the `ProducerSettings` contains the factory methods @scala[`createKafkaProducerAsync`]@java[`createKafkaProducerCompletionStage`] and `createKafkaProducer` (blocking for asynchronous enriching).
@@ -180,7 +182,7 @@ Scala
Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerExampleTest.java) { #producer }
-The `KafkaProducer` instance (or @scala[Future]@java[CompletionStage]) is passed as a parameter to the `Producer` factory methods.
+The `KafkaProducer` instance is passed as a parameter to `ProducerSettings` (@scaladoc[API](akka.kafka.ProducerSettings)) using the methods `withProducer` and `withProducerFactory`.
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #plainSinkWithProducer }
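In code the documented pattern now reads roughly as follows, mirroring the updated `ProducerExample.scala` snippet (sketch; `producerSettings` assumed in scope):

```scala
// create a single producer and share it via the settings
val kafkaProducer = producerSettings.createKafkaProducer()
val settingsWithProducer = producerSettings.withProducer(kafkaProducer)

// any number of streams may run sinks or flows built from these settings
val sink = Producer.plainSink(settingsWithProducer)

// the stages no longer close the shared producer, so close it after use
kafkaProducer.close()
```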
diff --git a/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala b/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala
index f4c55f84d..5e9b04a56 100644
--- a/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala
+++ b/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -146,7 +146,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
// using a hash of the key. If neither key nor partition is present a partition
// will be assigned in a round-robin fashion.
.map(n => new ProducerRecord(topic, partition, DefaultKey, n))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
/**
* Produce messages to topic using specified range and return
@@ -162,7 +162,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
.map {
case (n, ts) => new ProducerRecord(topic, partition0, ts, DefaultKey, n.toString)
}
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
/**
* Produce batches over several topics.
diff --git a/tests/src/it/resources/logback-test.xml b/tests/src/it/resources/logback-test.xml
index 3962e8b15..742a2ebf1 100644
--- a/tests/src/it/resources/logback-test.xml
+++ b/tests/src/it/resources/logback-test.xml
@@ -14,6 +14,7 @@
+
diff --git a/tests/src/test/java/docs/javadsl/ProducerExampleTest.java b/tests/src/test/java/docs/javadsl/ProducerExampleTest.java
index 702d3acf5..6a24f6e3c 100644
--- a/tests/src/test/java/docs/javadsl/ProducerExampleTest.java
+++ b/tests/src/test/java/docs/javadsl/ProducerExampleTest.java
@@ -105,14 +105,17 @@ void plainSink() throws Exception {
void plainSinkWithSharedProducer() throws Exception {
String topic = createTopic();
// #plainSinkWithProducer
-    final CompletionStage<org.apache.kafka.clients.producer.Producer<String, String>>
-        kafkaProducer = producerSettings.createKafkaProducerCompletionStage(executor);
+    // create a producer
+    final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer =
+        producerSettings.createKafkaProducer();
+    final ProducerSettings<String, String> settingsWithProducer =
+        producerSettings.withProducer(kafkaProducer);
    CompletionStage<Done> done =
Source.range(1, 100)
.map(number -> number.toString())
.map(value -> new ProducerRecord<>(topic, value))
- .runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);
+ .runWith(Producer.plainSink(settingsWithProducer), materializer);
// #plainSinkWithProducer
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
@@ -126,7 +129,7 @@ void plainSinkWithSharedProducer() throws Exception {
// #plainSinkWithProducer
// close the producer after use
- system.registerOnTermination(() -> kafkaProducer.thenAccept(p -> p.close()));
+ kafkaProducer.close();
// #plainSinkWithProducer
}
diff --git a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala
index e2c6c017c..b507dc2b0 100644
--- a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala
@@ -86,43 +86,57 @@ class ProducerSpec(_system: ActorSystem)
}
val settings =
- ProducerSettings(system, new StringSerializer, new StringSerializer).withEosCommitInterval(10.milliseconds)
+ ProducerSettings(system, new StringSerializer, new StringSerializer)
+ .withEosCommitInterval(10.milliseconds)
def testProducerFlow[P](mock: ProducerMock[K, V],
- closeOnStop: Boolean = true): Flow[Message[K, V, P], Result[K, V, P], NotUsed] =
+ closeOnStop: Boolean = true): Flow[Message[K, V, P], Result[K, V, P], NotUsed] = {
+ val pSettings = settings.withProducer(mock.mock).withCloseProducerOnStop(closeOnStop)
Flow
.fromGraph(
- new DefaultProducerStage[K, V, P, Message[K, V, P], Result[K, V, P]](settings.closeTimeout,
- closeOnStop,
- _ => Future.successful(mock.mock))
+ new DefaultProducerStage[K, V, P, Message[K, V, P], Result[K, V, P]](pSettings)
)
.mapAsync(1)(identity)
+ }
- def testTransactionProducerFlow[P](mock: ProducerMock[K, V],
- closeOnStop: Boolean = true): Flow[Envelope[K, V, P], Results[K, V, P], NotUsed] =
+ def testTransactionProducerFlow[P](
+ mock: ProducerMock[K, V],
+ closeOnStop: Boolean = true
+ ): Flow[Envelope[K, V, P], Results[K, V, P], NotUsed] = {
+ val pSettings = settings.withProducerFactory(_ => mock.mock).withCloseProducerOnStop(closeOnStop)
Flow
.fromGraph(
- new TransactionalProducerStage[K, V, P](settings.closeTimeout,
- closeOnStop,
- _ => Future.successful(mock.mock),
- settings.eosCommitInterval)
+ new TransactionalProducerStage[K, V, P](pSettings)
)
.mapAsync(1)(identity)
+ }
- "Producer" should "not send messages when source is empty" in {
+ "Producer" should "send one message and shutdown the producer gracefully" in {
assertAllStagesStopped {
- val client = new ProducerMock[K, V](ProducerMock.handlers.fail)
+ val input = recordAndMetadata(1)
+
+ val client = {
+ val inputMap = Map(input)
+ new ProducerMock[K, V](ProducerMock.handlers.delayedMap(100.millis)(x => Try { inputMap(x) }))
+ }
+ val committer = new CommittedMarkerMock
- val probe = Source
- .empty[Msg]
+ val (source, sink) = TestSource
+ .probe[TxMsg]
.via(testProducerFlow(client))
- .runWith(TestSink.probe)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
- probe
- .request(1)
- .expectComplete()
+ val txMsg = toTxMessage(input, committer.mock)
+ source.sendNext(txMsg)
+ sink.requestNext()
+
+ // we must wait for the producer to be asynchronously assigned before observing interactions with the mock
+ awaitAssert(client.verifySend(times(1)))
+
+ source.sendComplete()
+ sink.expectComplete()
- client.verifySend(never())
client.verifyClosed()
client.verifyNoMoreInteractions()
}
@@ -134,7 +148,7 @@ class ProducerSpec(_system: ActorSystem)
val mockProducer = new MockProducer[String, String](true, null, null)
- val fut: Future[Done] = Source(input).runWith(Producer.plainSink(settings, mockProducer))
+ val fut: Future[Done] = Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer)))
Await.result(fut, Duration.apply("1 second"))
mockProducer.close()
@@ -357,18 +371,29 @@ class ProducerSpec(_system: ActorSystem)
it should "initialize and begin a transaction when first run" in {
assertAllStagesStopped {
- val client = new ProducerMock[K, V](ProducerMock.handlers.fail)
+ val input = recordAndMetadata(1)
- val probe = Source
- .empty[Msg]
+ val client = {
+ val inputMap = Map(input)
+ new ProducerMock[K, V](ProducerMock.handlers.delayedMap(100.millis)(x => Try { inputMap(x) }))
+ }
+ val committer = new CommittedMarkerMock
+
+ val (source, sink) = TestSource
+ .probe[TxMsg]
.via(testTransactionProducerFlow(client))
- .runWith(TestSink.probe)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
- probe
- .request(1)
- .expectComplete()
+ val txMsg = toTxMessage(input, committer.mock)
+ source.sendNext(txMsg)
+ sink.requestNext()
+
+ // we must wait for the producer to be asynchronously assigned before observing interactions with the mock
+ awaitAssert(client.verifyTxInitialized())
- client.verifyTxInitialized()
+ source.sendComplete()
+ sink.expectComplete()
}
}
diff --git a/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala
index ef4999427..32899278c 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala
@@ -113,7 +113,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
),
NotUsed)
}
- .via(Producer.flexiFlow(producerDefaults, testProducer))
+ .via(Producer.flexiFlow(producerDefaults.withProducer(testProducer)))
.runWith(Sink.ignore)
// Subscribe to the topic (without demand)
@@ -197,7 +197,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
),
NotUsed)
}
- .via(Producer.flexiFlow(producerDefaults, testProducer))
+ .via(Producer.flexiFlow(producerDefaults.withProducer(testProducer)))
.runWith(Sink.ignore)
// Subscribe to the topic (without demand)
@@ -571,5 +571,5 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
immutable.Seq(new ProducerRecord(topic, partition0, DefaultKey, n),
new ProducerRecord(topic, partition1, DefaultKey, n))
)
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
}
diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala
index f28a57402..6a9ca09ad 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala
@@ -85,7 +85,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside
def createAndRunProducer(elements: immutable.Iterable[Long]) =
Source(elements)
.map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
val control = createAndRunConsumer(subscription1)
diff --git a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala
index df49b95c0..064c6841b 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala
@@ -119,7 +119,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with
val producer = Source(1L to totalMessages)
.map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
producer.futureValue shouldBe Done
sleep(2.seconds)
@@ -200,7 +200,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with
number
}
.map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
producer.futureValue shouldBe Done
@@ -286,7 +286,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with
number
}
.map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
producer.futureValue shouldBe Done
@@ -480,7 +480,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with
awaitProduce(
Source(1L to totalMessages)
.map(n => new ProducerRecord(topic, (n % partitions).toInt, DefaultKey, n.toString))
- .runWith(Producer.plainSink(producerDefaults, testProducer))
+ .runWith(Producer.plainSink(producerDefaults.withProducer(testProducer)))
)
eventually {
exceptionTriggered.get() shouldBe true
diff --git a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
index f8a600e19..b6ec7377d 100644
--- a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
@@ -71,12 +71,14 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
val producerSettings = producerDefaults
val topic = createTopic()
// #plainSinkWithProducer
- val kafkaProducer = producerSettings.createKafkaProducerAsync()
+ // create a producer
+ val kafkaProducer = producerSettings.createKafkaProducer()
+ val settingsWithProducer = producerSettings.withProducer(kafkaProducer)
val done = Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String](topic, value))
- .runWith(Producer.plainSink(producerSettings, kafkaProducer))
+ .runWith(Producer.plainSink(settingsWithProducer))
// #plainSinkWithProducer
val (control2, result) = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
@@ -89,7 +91,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
// #plainSinkWithProducer
// close the producer after use
- system.registerOnTermination(kafkaProducer.foreach(p => p.close()))
+ kafkaProducer.close()
// #plainSinkWithProducer
}