From 81b3f90e75a4cdbc3a6d84dd5386b29e653a6bab Mon Sep 17 00:00:00 2001
From: Enno Runne <458526+ennru@users.noreply.github.com>
Date: Fri, 22 Nov 2019 10:49:11 +0100
Subject: [PATCH] Review feedback; DRY deferred producer creation

---
 .../internal/CommittingProducerSinkStage.scala     | 88 ++++++++-----------
 .../kafka/internal/DefaultProducerStage.scala      | 60 +++----------
 .../kafka/internal/DeferredProducer.scala          | 76 ++++++++++++++++
 .../internal/TransactionalProducerStage.scala      | 10 +--
 .../internal/CommittingProducerSinkSpec.scala      |  9 +-
 5 files changed, 132 insertions(+), 111 deletions(-)
 create mode 100644 core/src/main/scala/akka/kafka/internal/DeferredProducer.scala

diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala
index 09c7d5ebf..ab0eb1a24 100644
--- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala
@@ -5,12 +5,10 @@
 package akka.kafka.internal
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.util.concurrent.atomic.AtomicInteger
 
 import akka.Done
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
 import akka.kafka.ProducerMessage._
 import akka.kafka.{CommitDelivery, CommitterSettings, ProducerSettings}
@@ -18,10 +16,9 @@ import akka.stream.ActorAttributes.SupervisionStrategy
 import akka.stream.Supervision.Decider
 import akka.stream.stage._
 import akka.stream.{Attributes, Inlet, SinkShape, Supervision}
-import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
+import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
 
 import scala.concurrent.{Future, Promise}
-import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -50,7 +47,10 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
     stage: CommittingProducerSinkStage[K, V, IN],
     inheritedAttributes: Attributes
 ) extends TimerGraphStageLogic(stage.shape)
-    with StageLogging {
+    with StageLogging
+    with DeferredProducer[K, V] {
+
+  import CommittingProducerSinkStage._
 
   /** The promise behind the materialized future. */
   final val streamCompletion = Promise[Done]
@@ -58,19 +58,14 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
   private lazy val decider: Decider =
     inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
 
-  /** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */
-  private var producer: Producer[K, V] = _
-
   override protected def logSource: Class[_] = classOf[CommittingProducerSinkStage[_, _, _]]
 
-  private val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable](closeAndFailStage)
+  override protected val producerSettings: ProducerSettings[K, V] = stage.producerSettings
+
+  override protected val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable](closeAndFailStage)
 
   private def closeAndFailStage(ex: Throwable): Unit = {
-    if (producer != null) {
-      // Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
-      // https://github.com/akka/alpakka-kafka/pull/318
-      producer.close(0L, TimeUnit.MILLISECONDS)
-    }
+    closeProducerImmediately()
     failStage(ex)
     streamCompletion.failure(ex)
   }
@@ -82,32 +77,12 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
   }
 
   /** When the producer is set up, the sink pulls and schedules the first commit. */
-  private def assignProducer(p: Producer[K, V]): Unit = {
-    producer = p
+  override protected def producerAssigned(): Unit = {
     tryPull(stage.in)
     scheduleCommit()
     log.debug("CommittingProducerSink initialized")
   }
 
-  private def resolveProducer(): Unit = {
-    val producerFuture = stage.producerSettings.createKafkaProducerAsync()(materializer.executionContext)
-    producerFuture.value match {
-      case Some(Success(p)) => assignProducer(p)
-      case Some(Failure(e)) => failStage(e)
-      case None =>
-        val assign = getAsyncCallback(assignProducer)
-        producerFuture
-          .transform(
-            producer => assign.invoke(producer),
-            e => {
-              log.error(e, "producer creation failed")
-              closeAndFailStageCb.invoke(e)
-              e
-            }
-          )(ExecutionContexts.sameThreadExecutionContext)
-    }
-  }
-
   // ---- Producing
 
   /** Counter for number of outstanding messages that are sent, but didn't get the callback, yet. */
   private var awaitingProduceResult = 0L
 
@@ -182,17 +157,17 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
     scheduleOnce(CommittingProducerSinkStage.CommitNow, stage.committerSettings.maxInterval)
 
   override protected def onTimer(timerKey: Any): Unit = timerKey match {
-    case CommittingProducerSinkStage.CommitNow => commit("interval")
+    case CommittingProducerSinkStage.CommitNow => commit(Interval)
   }
 
   private def collectOffset(count: Int, offset: Committable): Unit = {
     awaitingProduceResult -= count
     offsetBatch = offsetBatch.updated(offset)
-    if (offsetBatch.batchSize >= stage.committerSettings.maxBatch) commit("batch size")
-    else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit("upstream closed")
+    if (offsetBatch.batchSize >= stage.committerSettings.maxBatch) commit(BatchSize)
+    else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit(UpstreamClosed)
   }
 
-  private def commit(triggeredBy: String): Unit = {
+  private def commit(triggeredBy: TriggeredBy): Unit = {
     if (offsetBatch.batchSize != 0) {
       log.debug("commit triggered by {} (awaitingProduceResult={} awaitingCommitResult={})",
                 triggeredBy,
@@ -200,7 +175,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
                 awaitingCommitResult)
       val batchSize = offsetBatch.batchSize
       offsetBatch
-        .commitScaladsl()
+        .commitInternal()
         .onComplete(t => commitResultCB.invoke(batchSize -> t))(materializer.executionContext)
       offsetBatch = CommittableOffsetBatch.empty
     }
@@ -240,7 +215,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
       completeStage()
       streamCompletion.success(Done)
     } else {
-      commit("upstream finish")
+      commit(UpstreamFinish)
       setKeepGoing(true)
       upstreamCompletionState = Some(Success(Done))
     }
@@ -249,7 +224,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
     if (awaitingCommitResult == 0) {
       closeAndFailStage(ex)
     } else {
-      commit("upstream failure")
+      commit(UpstreamFailure)
      setKeepGoing(true)
       upstreamCompletionState = Some(Failure(ex))
     }
@@ -279,18 +254,25 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
     super.postStop()
   }
 
-  private def closeProducer(): Unit =
-    if (producer != null && stage.producerSettings.closeProducerOnStop) {
-      try {
-        // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
-        producer.flush()
-        producer.close(stage.producerSettings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
-      } catch {
-        case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
-      }
-    }
 }
 
 private object CommittingProducerSinkStage {
   val CommitNow = "commit"
+
+  sealed trait TriggeredBy
+  case object BatchSize extends TriggeredBy {
+    override def toString: String = "batch size"
+  }
+  case object Interval extends TriggeredBy {
+    override def toString: String = "interval"
+  }
+  case object UpstreamClosed extends TriggeredBy {
+    override def toString: String = "upstream closed"
+  }
+  case object UpstreamFinish extends TriggeredBy {
+    override def toString: String = "upstream finish"
+  }
+  case object UpstreamFailure extends TriggeredBy {
+    override def toString: String = "upstream failure"
+  }
 }
diff --git a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
index 50c96429e..3a1d275fa 100644
--- a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
@@ -4,23 +4,21 @@
  */
 
 package akka.kafka.internal
 
-import java.util.concurrent.TimeUnit
+
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.Done
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ProducerMessage._
 import akka.kafka.ProducerSettings
 import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
 import akka.stream.ActorAttributes.SupervisionStrategy
 import akka.stream.Supervision.Decider
-import akka.stream.{Attributes, FlowShape, Supervision}
 import akka.stream.stage._
-import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
+import akka.stream.{Attributes, FlowShape, Supervision}
+import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
 
 import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -46,47 +44,25 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
     inheritedAttributes: Attributes
 ) extends TimerGraphStageLogic(stage.shape)
     with StageLogging
+    with DeferredProducer[K, V]
     with MessageCallback[K, V, P]
     with ProducerCompletionState {
 
   private lazy val decider: Decider =
     inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
 
   protected val awaitingConfirmation = new AtomicInteger(0)
-  protected var producer: Producer[K, V] = _
 
   private var inIsClosed = false
   private var completionState: Option[Try[Done]] = None
 
   override protected def logSource: Class[_] = classOf[DefaultProducerStage[_, _, _, _, _]]
 
+  final override val producerSettings: ProducerSettings[K, V] = stage.settings
+
   override def preStart(): Unit = {
     super.preStart()
     resolveProducer()
   }
 
-  protected def assignProducer(p: Producer[K, V]): Unit = {
-    producer = p
-    resumeDemand()
-  }
-
-  private def resolveProducer(): Unit = {
-    val producerFuture = stage.settings.createKafkaProducerAsync()(materializer.executionContext)
-    producerFuture.value match {
-      case Some(Success(p)) => assignProducer(p)
-      case Some(Failure(e)) => failStage(e)
-      case None =>
-        val assign = getAsyncCallback(assignProducer)
-        producerFuture
-          .transform(
-            producer => assign.invoke(producer),
-            e => {
-              log.error(e, "producer creation failed")
-              failStageCb.invoke(e)
-              e
-            }
-          )(ExecutionContexts.sameThreadExecutionContext)
-    }
-  }
-
   def checkForCompletion(): Unit =
     if (isClosed(stage.in) && awaitingConfirmation.get == 0) {
       completionState match {
@@ -104,17 +80,15 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
     checkForCompletion()
   }
 
-  val failStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { ex =>
-    if (producer != null) {
-      // Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
-      // https://github.com/akka/alpakka-kafka/pull/318
-      producer.close(0L, TimeUnit.MILLISECONDS)
-    }
+  override protected val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { ex =>
+    closeProducerImmediately()
     failStage(ex)
   }
 
   def postSend(msg: Envelope[K, V, P]) = ()
 
+  override protected def producerAssigned(): Unit = resumeDemand()
+
   protected def resumeDemand(tryToPull: Boolean = true): Unit = {
     setHandler(stage.out, new OutHandler {
       override def onPull(): Unit = tryPull(stage.in)
@@ -196,7 +170,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
       if (exception == null) onSuccess(metadata)
       else
         decider(exception) match {
-          case Supervision.Stop => failStageCb.invoke(exception)
+          case Supervision.Stop => closeAndFailStageCb.invoke(exception)
           case _ => promise.failure(exception)
         }
       if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed)
@@ -210,16 +184,4 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
     super.postStop()
   }
 
-  private def closeProducer(): Unit =
-    if (stage.settings.closeProducerOnStop && producer != null) {
-      try {
-        // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
-        producer.flush()
-        producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
-        log.debug("Producer closed")
-      } catch {
-        case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
-      }
-    }
-
 }
diff --git a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
new file mode 100644
index 000000000..252bb0374
--- /dev/null
+++ b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2014 - 2016 Softwaremill
+ * Copyright (C) 2016 - 2019 Lightbend Inc.
+ */
+
+package akka.kafka.internal
+
+import java.util.concurrent.TimeUnit
+
+import akka.annotation.InternalApi
+import akka.dispatch.ExecutionContexts
+import akka.kafka.ProducerSettings
+import akka.stream.stage._
+import org.apache.kafka.clients.producer.Producer
+
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[kafka] trait DeferredProducer[K, V] {
+  self: GraphStageLogic with StageLogging =>
+
+  /** The Kafka producer may be created lazily: `resolveProducer()`, called from the stage's `preStart`, assigns it via `assignProducer`. */
+  protected var producer: Producer[K, V] = _
+
+  protected def producerSettings: ProducerSettings[K, V]
+  protected def producerAssigned(): Unit
+  protected def closeAndFailStageCb: AsyncCallback[Throwable]
+
+  private def assignProducer(p: Producer[K, V]): Unit = {
+    producer = p
+    producerAssigned()
+  }
+
+  final protected def resolveProducer(): Unit = {
+    val producerFuture = producerSettings.createKafkaProducerAsync()(materializer.executionContext)
+    producerFuture.value match {
+      case Some(Success(p)) => assignProducer(p)
+      case Some(Failure(e)) => failStage(e)
+      case None =>
+        val assign = getAsyncCallback(assignProducer)
+        producerFuture
+          .transform(
+            producer => assign.invoke(producer),
+            e => {
+              log.error(e, "producer creation failed")
+              closeAndFailStageCb.invoke(e)
+              e
+            }
+          )(ExecutionContexts.sameThreadExecutionContext)
+    }
+  }
+
+  protected def closeProducerImmediately(): Unit =
+    if (producer != null) {
+      // Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
+      // https://github.com/akka/alpakka-kafka/pull/318
+      producer.close(0L, TimeUnit.MILLISECONDS)
+    }
+
+  protected def closeProducer(): Unit =
+    if (producerSettings.closeProducerOnStop && producer != null) {
+      try {
+        // we do not need to check whether the producer was already closed in the send callback, as `flush()` and `close()` are effectively no-ops in that case
+        producer.flush()
+        producer.close(producerSettings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
+        log.debug("Producer closed")
+      } catch {
+        case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
+      }
+    }
+
+}
diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
index 00268d6e7..6aec520ae 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
@@ -14,7 +14,6 @@ import akka.kafka.{ConsumerMessage, ProducerSettings}
 import akka.stream.stage._
 import akka.stream.{Attributes, FlowShape}
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.common.TopicPartition
 
 import scala.concurrent.Future
@@ -116,12 +115,11 @@ private final class TransactionalProducerStageLogic[K, V, P](
   override def preStart(): Unit =
     super.preStart()
 
-  override protected def assignProducer(p: Producer[K, V]): Unit = {
-    producer = p
+  override protected def producerAssigned(): Unit = {
     initTransactions()
     beginTransaction()
     resumeDemand()
-    scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
+    scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
   }
 
   // suspend demand until a Producer has been created
@@ -151,7 +149,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
         suspendDemand()
         scheduleOnce(commitSchedulerKey, messageDrainInterval)
       case _ =>
-        scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
+        scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
     }
   }
 
@@ -194,7 +192,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
 
   val onInternalCommitAckCb: AsyncCallback[Unit] = {
     getAsyncCallback[Unit](
-      _ => scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
+      _ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
     )
   }
diff --git a/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala
index e42503873..dbd8d2a86 100644
--- a/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala
+++ b/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala
@@ -109,7 +109,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
-    val committerSettings = CommitterSettings(system).withMaxBatch(2L)
+    val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
 
     val control = Source(elements)
       .concat(Source.maybe) // keep the source alive
@@ -146,8 +146,9 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
-    val commitInterval = 5.seconds
-    val committerSettings = CommitterSettings(system).withMaxInterval(commitInterval)
+    // choose a commit interval large enough that stream completion triggers the commit before the interval elapses
+    val largeCommitInterval = 30.seconds
+    val committerSettings = CommitterSettings(system).withMaxInterval(largeCommitInterval)
 
     val control = Source(elements)
       .viaMat(ConsumerControlFactory.controlFlow())(Keep.right)
@@ -161,6 +162,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       .mapMaterializedValue(DrainingControl.apply)
       .run()
 
+    // expect the commit to reach the consumer actor within 1 second
     val commitMsg = consumer.actor.expectMsgClass(1.second, classOf[Internal.Commit])
     commitMsg.tp shouldBe new TopicPartition(topic, partition)
     commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 2)
@@ -261,6 +263,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 2")
     )
 
+    // this producer does not auto-complete messages
     val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
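
Note: as a sketch of how a stage reuses the extracted trait, the following
hypothetical GraphStageLogic wires up the three members that DeferredProducer
requires. It is not part of the patch; `PassThroughSinkLogic` and its shape are
illustrative only, and it assumes the akka.kafka.internal package so that the
private[kafka] trait is accessible.

package akka.kafka.internal

import akka.kafka.ProducerSettings
import akka.stream.SinkShape
import akka.stream.stage.{AsyncCallback, GraphStageLogic, InHandler, StageLogging}
import org.apache.kafka.clients.producer.ProducerRecord

private final class PassThroughSinkLogic[K, V](
    shape: SinkShape[ProducerRecord[K, V]],
    settings: ProducerSettings[K, V]
) extends GraphStageLogic(shape)
    with StageLogging
    with DeferredProducer[K, V] {

  // Tell the trait which settings to build the Kafka producer from.
  override protected val producerSettings: ProducerSettings[K, V] = settings

  // Mirrors the stages above: discard unsent records, then fail the stage.
  override protected val closeAndFailStageCb: AsyncCallback[Throwable] =
    getAsyncCallback[Throwable] { ex =>
      closeProducerImmediately()
      failStage(ex)
    }

  // Demand stays suspended until the (possibly asynchronous) creation finishes.
  override protected def producerAssigned(): Unit = tryPull(shape.in)

  override def preStart(): Unit = {
    super.preStart()
    resolveProducer() // assigns synchronously when the future is already completed
  }

  setHandler(shape.in, new InHandler {
    override def onPush(): Unit = {
      producer.send(grab(shape.in)) // `producer` was assigned by the trait
      tryPull(shape.in)
    }
  })

  override def postStop(): Unit = {
    closeProducer() // flush + close, honoring closeProducerOnStop and closeTimeout
    super.postStop()
  }
}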
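
Note: the last test above constructs MockProducer with autoComplete = false, so
sends stay pending until the test releases them. A hypothetical fragment (topic
and values illustrative only) showing how such a test drives completion:

import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val metadata = producer.send(new ProducerRecord("topic", "key", "value"))

assert(!metadata.isDone) // the send callback has not fired yet
producer.completeNext() // completes the oldest pending send, firing its callback
assert(metadata.isDone)
// producer.errorNext(new RuntimeException("boom")) would fail the next send instead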