diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala index d387caad1..282d05c5e 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala @@ -111,11 +111,12 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, collectOffset(0, msg.passThrough) } - private val sendFailureCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { exception => - decider(exception) match { - case Supervision.Stop => closeAndFailStage(exception) - case _ => collectOffsetIgnore(exception) - } + private val sendFailureCb: AsyncCallback[(Int, Throwable)] = getAsyncCallback[(Int, Throwable)] { + case (count, exception) => + decider(exception) match { + case Supervision.Stop => closeAndFailStage(exception) + case _ => collectOffsetIgnore(count, exception) + } } /** send-callback for a single message. */ @@ -123,7 +124,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = if (exception == null) collectOffsetCb.invoke(offset) - else sendFailureCb.invoke(exception) + else sendFailureCb.invoke(1 -> exception) } /** send-callback for a multi-message. */ @@ -133,7 +134,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = if (exception == null) { if (counter.decrementAndGet() == 0) collectOffsetMultiCb.invoke(count -> offset) - } else sendFailureCb.invoke(exception) + } else sendFailureCb.invoke(count -> exception) } // ---- Committing @@ -149,9 +150,10 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, collectOffset(count, offset) } - private def collectOffsetIgnore(exception: Throwable): Unit = { + private def collectOffsetIgnore(count: Int, exception: Throwable): Unit = { log.warning("ignoring send failure {}", exception) awaitingCommitResult -= 1 + awaitingProduceResult -= count } private def scheduleCommit(): Unit = diff --git a/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala index c8d15c78b..4b1d11276 100644 --- a/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala @@ -424,6 +424,56 @@ class CommittingProducerSinkSpec(_system: ActorSystem) control.drainAndShutdown().futureValue shouldBe Done } + it should "choose to ignore producer errors and shut down cleanly" in assertAllStagesStopped { + val consumer = FakeConsumer(groupId, topic, startOffset = 1616L) + + val elements = immutable.Seq( + consumer.message(partition, "value 1"), + consumer.message(partition, "value 2") + ) + + val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer) + val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) + .withProducer(producer) + // choose a large commit interval so that completion happens before + val largeCommitInterval = 30.seconds + val committerSettings = CommitterSettings(system).withMaxInterval(largeCommitInterval) + + val control = Source(elements) + .viaMat(ConsumerControlFactory.controlFlow())(Keep.right) + .map { msg => + ProducerMessage.single( + new ProducerRecord("targetTopic", msg.record.key, msg.record.value), + msg.committableOffset + ) + } + .toMat( + Producer + .committableSink(producerSettings, committerSettings) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + )(Keep.both) + .mapMaterializedValue(DrainingControl.apply) + .run() + + // fail the first message + while (!producer.errorNext(new RuntimeException("let producing fail"))) {} + consumer.actor.expectNoMessage(100.millis) + + // second message succeeds and its offset gets committed + while (!producer.completeNext()) {} + + // expect the commit to reach the actor within 1 second because the source completed, which should trigger commit + val commitMsg = consumer.actor.expectMsgClass(1.second, classOf[Internal.Commit]) + commitMsg.tp shouldBe new TopicPartition(topic, partition) + commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 2) + consumer.actor.reply(Done) + + eventually { + producer.history.asScala should have size (2) + } + control.drainAndShutdown().futureValue shouldBe Done + } + it should "fail for commit timeout" in assertAllStagesStopped { val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)