Skip to content

Commit

Permalink
CommittingProducerSink: Fix count on failure (#1043)
Browse files Browse the repository at this point in the history
Correct the updating of the count of outstanding produce requests
when a producer failure occurs in CommittingProducerSinkStage.
  • Loading branch information
gabrielreid authored Feb 9, 2020
1 parent cf91fb3 commit a3a7940
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,20 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
collectOffset(0, msg.passThrough)
}

private val sendFailureCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { exception =>
decider(exception) match {
case Supervision.Stop => closeAndFailStage(exception)
case _ => collectOffsetIgnore(exception)
}
private val sendFailureCb: AsyncCallback[(Int, Throwable)] = getAsyncCallback[(Int, Throwable)] {
case (count, exception) =>
decider(exception) match {
case Supervision.Stop => closeAndFailStage(exception)
case _ => collectOffsetIgnore(count, exception)
}
}

/** send-callback for a single message. */
private final class SendCallback(offset: Committable) extends Callback {

override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
if (exception == null) collectOffsetCb.invoke(offset)
else sendFailureCb.invoke(exception)
else sendFailureCb.invoke(1 -> exception)
}

/** send-callback for a multi-message. */
Expand All @@ -133,7 +134,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
if (exception == null) {
if (counter.decrementAndGet() == 0) collectOffsetMultiCb.invoke(count -> offset)
} else sendFailureCb.invoke(exception)
} else sendFailureCb.invoke(count -> exception)
}

// ---- Committing
Expand All @@ -149,9 +150,10 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
collectOffset(count, offset)
}

private def collectOffsetIgnore(exception: Throwable): Unit = {
private def collectOffsetIgnore(count: Int, exception: Throwable): Unit = {
log.warning("ignoring send failure {}", exception)
awaitingCommitResult -= 1
awaitingProduceResult -= count
}

private def scheduleCommit(): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,56 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
control.drainAndShutdown().futureValue shouldBe Done
}

it should "choose to ignore producer errors and shut down cleanly" in assertAllStagesStopped {
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)

val elements = immutable.Seq(
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
val largeCommitInterval = 30.seconds
val committerSettings = CommitterSettings(system).withMaxInterval(largeCommitInterval)

val control = Source(elements)
.viaMat(ConsumerControlFactory.controlFlow())(Keep.right)
.map { msg =>
ProducerMessage.single(
new ProducerRecord("targetTopic", msg.record.key, msg.record.value),
msg.committableOffset
)
}
.toMat(
Producer
.committableSink(producerSettings, committerSettings)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()

// fail the first message
while (!producer.errorNext(new RuntimeException("let producing fail"))) {}
consumer.actor.expectNoMessage(100.millis)

// second message succeeds and its offset gets committed
while (!producer.completeNext()) {}

// expect the commit to reach the actor within 1 second because the source completed, which should trigger commit
val commitMsg = consumer.actor.expectMsgClass(1.second, classOf[Internal.Commit])
commitMsg.tp shouldBe new TopicPartition(topic, partition)
commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 2)
consumer.actor.reply(Done)

eventually {
producer.history.asScala should have size (2)
}
control.drainAndShutdown().futureValue shouldBe Done
}

it should "fail for commit timeout" in assertAllStagesStopped {
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)

Expand Down

0 comments on commit a3a7940

Please sign in to comment.