Review feedback; DRY deferred producer creation
ennru committed Nov 22, 2019
1 parent 2a0bec0 commit 81b3f90
Showing 5 changed files with 132 additions and 111 deletions.
@@ -5,23 +5,20 @@

package akka.kafka.internal

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.kafka.ProducerMessage._
import akka.kafka.{CommitDelivery, CommitterSettings, ProducerSettings}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.stage._
import akka.stream.{Attributes, Inlet, SinkShape, Supervision}
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
@@ -50,27 +47,25 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
stage: CommittingProducerSinkStage[K, V, IN],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging {
with StageLogging
with DeferredProducer[K, V] {

import CommittingProducerSinkStage._

/** The promise behind the materialized future. */
final val streamCompletion = Promise[Done]

private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)

/** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */
private var producer: Producer[K, V] = _

override protected def logSource: Class[_] = classOf[CommittingProducerSinkStage[_, _, _]]

private val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable](closeAndFailStage)
override protected val producerSettings: ProducerSettings[K, V] = stage.producerSettings

override protected val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable](closeAndFailStage)

private def closeAndFailStage(ex: Throwable): Unit = {
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
// https://github.com/akka/alpakka-kafka/pull/318
producer.close(0L, TimeUnit.MILLISECONDS)
}
closeProducerImmediately()
failStage(ex)
streamCompletion.failure(ex)
}
@@ -82,32 +77,12 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
}

/** When the producer is set up, the sink pulls and schedules the first commit. */
private def assignProducer(p: Producer[K, V]): Unit = {
producer = p
override protected def producerAssigned(): Unit = {
tryPull(stage.in)
scheduleCommit()
log.debug("CommittingProducerSink initialized")
}

private def resolveProducer(): Unit = {
val producerFuture = stage.producerSettings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
case None =>
val assign = getAsyncCallback(assignProducer)
producerFuture
.transform(
producer => assign.invoke(producer),
e => {
log.error(e, "producer creation failed")
closeAndFailStageCb.invoke(e)
e
}
)(ExecutionContexts.sameThreadExecutionContext)
}
}

// ---- Producing
/** Counter for number of outstanding messages that are sent, but didn't get the callback, yet. */
private var awaitingProduceResult = 0L
@@ -182,25 +157,25 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
scheduleOnce(CommittingProducerSinkStage.CommitNow, stage.committerSettings.maxInterval)

override protected def onTimer(timerKey: Any): Unit = timerKey match {
case CommittingProducerSinkStage.CommitNow => commit("interval")
case CommittingProducerSinkStage.CommitNow => commit(Interval)
}

private def collectOffset(count: Int, offset: Committable): Unit = {
awaitingProduceResult -= count
offsetBatch = offsetBatch.updated(offset)
if (offsetBatch.batchSize >= stage.committerSettings.maxBatch) commit("batch size")
else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit("upstream closed")
if (offsetBatch.batchSize >= stage.committerSettings.maxBatch) commit(BatchSize)
else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit(UpstreamClosed)
}

private def commit(triggeredBy: String): Unit = {
private def commit(triggeredBy: TriggerdBy): Unit = {
if (offsetBatch.batchSize != 0) {
log.debug("commit triggered by {} (awaitingProduceResult={} awaitingCommitResult={})",
triggeredBy,
awaitingProduceResult,
awaitingCommitResult)
val batchSize = offsetBatch.batchSize
offsetBatch
.commitScaladsl()
.commitInternal()
.onComplete(t => commitResultCB.invoke(batchSize -> t))(materializer.executionContext)
offsetBatch = CommittableOffsetBatch.empty
}
@@ -240,7 +215,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
completeStage()
streamCompletion.success(Done)
} else {
commit("upstream finish")
commit(UpstreamFinish)
setKeepGoing(true)
upstreamCompletionState = Some(Success(Done))
}
@@ -249,7 +224,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
if (awaitingCommitResult == 0) {
closeAndFailStage(ex)
} else {
commit("upstream failure")
commit(UpstreamFailure)
setKeepGoing(true)
upstreamCompletionState = Some(Failure(ex))
}
@@ -279,18 +254,25 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
super.postStop()
}

private def closeProducer(): Unit =
if (producer != null && stage.producerSettings.closeProducerOnStop) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.producerSettings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
}
}

private object CommittingProducerSinkStage {
val CommitNow = "commit"

sealed trait TriggerdBy
case object BatchSize extends TriggerdBy {
override def toString: String = "batch size"
}
case object Interval extends TriggerdBy {
override def toString: String = "interval"
}
case object UpstreamClosed extends TriggerdBy {
override def toString: String = "upstream closed"
}
case object UpstreamFinish extends TriggerdBy {
override def toString: String = "upstream finish"
}
case object UpstreamFailure extends TriggerdBy {
override def toString: String = "upstream failure"
}
}
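
The stringly-typed trigger labels are replaced here by a sealed hierarchy, so `commit` accepts only known triggers while the `toString` overrides keep the log wording identical. A standalone sketch of the pattern (the object and names below are illustrative, not the commit's internals):

// Self-contained illustration of the trigger-as-ADT pattern: the compiler
// rejects unknown triggers, while string interpolation in the log message
// still produces the same human-readable labels as before.
object TriggerExample extends App {
  sealed trait TriggeredBy
  case object BatchSize extends TriggeredBy {
    override def toString: String = "batch size"
  }
  case object Interval extends TriggeredBy {
    override def toString: String = "interval"
  }

  private def commit(triggeredBy: TriggeredBy): Unit =
    println(s"commit triggered by $triggeredBy")

  commit(BatchSize) // prints: commit triggered by batch size
  commit(Interval)  // prints: commit triggered by interval
  // commit("batch size") // would not compile: String is not a TriggeredBy
}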
60 changes: 11 additions & 49 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
@@ -4,23 +4,21 @@
*/

package akka.kafka.internal
import java.util.concurrent.TimeUnit

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.{Attributes, FlowShape, Supervision}
import akka.stream.stage._
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
import akka.stream.{Attributes, FlowShape, Supervision}
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
@@ -46,47 +44,25 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
with DeferredProducer[K, V]
with MessageCallback[K, V, P]
with ProducerCompletionState {

private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
protected val awaitingConfirmation = new AtomicInteger(0)
protected var producer: Producer[K, V] = _
private var inIsClosed = false
private var completionState: Option[Try[Done]] = None

override protected def logSource: Class[_] = classOf[DefaultProducerStage[_, _, _, _, _]]

final override val producerSettings: ProducerSettings[K, V] = stage.settings

override def preStart(): Unit = {
super.preStart()
resolveProducer()
}

protected def assignProducer(p: Producer[K, V]): Unit = {
producer = p
resumeDemand()
}

private def resolveProducer(): Unit = {
val producerFuture = stage.settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
case None =>
val assign = getAsyncCallback(assignProducer)
producerFuture
.transform(
producer => assign.invoke(producer),
e => {
log.error(e, "producer creation failed")
failStageCb.invoke(e)
e
}
)(ExecutionContexts.sameThreadExecutionContext)
}
}

def checkForCompletion(): Unit =
if (isClosed(stage.in) && awaitingConfirmation.get == 0) {
completionState match {
@@ -104,17 +80,15 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
checkForCompletion()
}

val failStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { ex =>
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
// https://github.com/akka/alpakka-kafka/pull/318
producer.close(0L, TimeUnit.MILLISECONDS)
}
override protected val closeAndFailStageCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { ex =>
closeProducerImmediately()
failStage(ex)
}

def postSend(msg: Envelope[K, V, P]) = ()

override protected def producerAssigned(): Unit = resumeDemand()

protected def resumeDemand(tryToPull: Boolean = true): Unit = {
setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
@@ -196,7 +170,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
if (exception == null) onSuccess(metadata)
else
decider(exception) match {
case Supervision.Stop => failStageCb.invoke(exception)
case Supervision.Stop => closeAndFailStageCb.invoke(exception)
case _ => promise.failure(exception)
}
if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed)
@@ -210,16 +184,4 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
super.postStop()
}

private def closeProducer(): Unit =
if (stage.settings.closeProducerOnStop && producer != null) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
}

}
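
For context, the `decider` consulted in the send callback above comes from the stream's `ActorAttributes.supervisionStrategy` attribute. A hedged usage sketch against the public Alpakka Kafka API (topic name and bootstrap servers are placeholders; with a non-Stop decision only that message's result future fails, so whether the stream keeps running depends on how results are consumed downstream):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.serialization.StringSerializer

object SupervisionExample extends App {
  implicit val system: ActorSystem = ActorSystem("producer-supervision")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Resume on oversized records; any other send failure stops the stage
  // (Stop is routed through closeAndFailStageCb in the callback above).
  val decider: Supervision.Decider = {
    case _: RecordTooLargeException => Supervision.Resume
    case _                          => Supervision.Stop
  }

  Source(List("a", "b", "c"))
    .map(value => new ProducerRecord[String, String]("topic1", value))
    .runWith(
      Producer
        .plainSink(producerSettings)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
    )
}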
76 changes: 76 additions & 0 deletions core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
@@ -0,0 +1,76 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.internal

import java.util.concurrent.TimeUnit

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerSettings
import akka.stream.stage._
import org.apache.kafka.clients.producer.Producer

import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
* INTERNAL API
*/
@InternalApi
private[kafka] trait DeferredProducer[K, V] {
self: GraphStageLogic with StageLogging =>

/** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */
protected var producer: Producer[K, V] = _

protected def producerSettings: ProducerSettings[K, V]
protected def producerAssigned(): Unit
protected def closeAndFailStageCb: AsyncCallback[Throwable]

private def assignProducer(p: Producer[K, V]): Unit = {
producer = p
producerAssigned()
}

final protected def resolveProducer(): Unit = {
val producerFuture = producerSettings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
case None =>
val assign = getAsyncCallback(assignProducer)
producerFuture
.transform(
producer => assign.invoke(producer),
e => {
log.error(e, "producer creation failed")
closeAndFailStageCb.invoke(e)
e
}
)(ExecutionContexts.sameThreadExecutionContext)
}
}

protected def closeProducerImmediately(): Unit =
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
// https://github.com/akka/alpakka-kafka/pull/318
producer.close(0L, TimeUnit.MILLISECONDS)
}

protected def closeProducer(): Unit =
if (producerSettings.closeProducerOnStop && producer != null) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(producerSettings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
}

}
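
A minimal sketch of how a stage logic consumes this trait, in the spirit of the two stages refactored above (the class below is hypothetical, for illustration only; since `DeferredProducer` is INTERNAL API, such a logic can only live inside `akka.kafka.internal`):

package akka.kafka.internal

import akka.kafka.ProducerSettings
import akka.stream.SinkShape
import akka.stream.stage._
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical consumer of DeferredProducer: provides the settings and the
// fail callback, starts resolution in preStart, and begins pulling only
// once producerAssigned() signals that `producer` is ready.
private final class ExampleProducerSinkLogic[K, V](
    shape: SinkShape[ProducerRecord[K, V]],
    settings: ProducerSettings[K, V]
) extends GraphStageLogic(shape)
    with StageLogging
    with DeferredProducer[K, V] {

  override protected val producerSettings: ProducerSettings[K, V] = settings

  // On fatal errors, discard unsent records (zero-timeout close) and fail.
  override protected val closeAndFailStageCb: AsyncCallback[Throwable] =
    getAsyncCallback[Throwable] { ex =>
      closeProducerImmediately()
      failStage(ex)
    }

  // Runs synchronously if the producer future was already completed,
  // otherwise via an async callback once it resolves.
  override protected def producerAssigned(): Unit = tryPull(shape.in)

  override def preStart(): Unit = {
    super.preStart()
    resolveProducer()
  }

  setHandler(shape.in, new InHandler {
    override def onPush(): Unit = {
      producer.send(grab(shape.in)) // fire-and-forget for brevity
      tryPull(shape.in)
    }
  })

  override def postStop(): Unit = {
    closeProducer() // flush + close, honoring closeProducerOnStop/closeTimeout
    super.postStop()
  }
}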