Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CommittingProducerSink: Fix count on failure #1043

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,20 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
collectOffset(0, msg.passThrough)
}

private val sendFailureCb: AsyncCallback[Throwable] = getAsyncCallback[Throwable] { exception =>
decider(exception) match {
case Supervision.Stop => closeAndFailStage(exception)
case _ => collectOffsetIgnore(exception)
}
private val sendFailureCb: AsyncCallback[(Int, Throwable)] = getAsyncCallback[(Int, Throwable)] {
case (count, exception) =>
decider(exception) match {
case Supervision.Stop => closeAndFailStage(exception)
case _ => collectOffsetIgnore(count, exception)
}
}

/** send-callback for a single message. */
private final class SendCallback(offset: Committable) extends Callback {

override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
if (exception == null) collectOffsetCb.invoke(offset)
else sendFailureCb.invoke(exception)
else sendFailureCb.invoke(1 -> exception)
}

/** send-callback for a multi-message. */
Expand All @@ -133,7 +134,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
if (exception == null) {
if (counter.decrementAndGet() == 0) collectOffsetMultiCb.invoke(count -> offset)
} else sendFailureCb.invoke(exception)
} else sendFailureCb.invoke(count -> exception)
}

// ---- Committing
Expand All @@ -149,9 +150,10 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
collectOffset(count, offset)
}

private def collectOffsetIgnore(exception: Throwable): Unit = {
private def collectOffsetIgnore(count: Int, exception: Throwable): Unit = {
log.warning("ignoring send failure {}", exception)
awaitingCommitResult -= 1
awaitingProduceResult -= count
}

private def scheduleCommit(): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,56 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
control.drainAndShutdown().futureValue shouldBe Done
}

it should "choose to ignore producer errors and shut down cleanly" in assertAllStagesStopped {
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)

val elements = immutable.Seq(
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
val largeCommitInterval = 30.seconds
val committerSettings = CommitterSettings(system).withMaxInterval(largeCommitInterval)

val control = Source(elements)
.viaMat(ConsumerControlFactory.controlFlow())(Keep.right)
.map { msg =>
ProducerMessage.single(
new ProducerRecord("targetTopic", msg.record.key, msg.record.value),
msg.committableOffset
)
}
.toMat(
Producer
.committableSink(producerSettings, committerSettings)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()

// fail the first message
while (!producer.errorNext(new RuntimeException("let producing fail"))) {}
consumer.actor.expectNoMessage(100.millis)

// second message succeeds and its offset gets committed
while (!producer.completeNext()) {}

// expect the commit to reach the actor within 1 second because the source completed, which should trigger commit
val commitMsg = consumer.actor.expectMsgClass(1.second, classOf[Internal.Commit])
commitMsg.tp shouldBe new TopicPartition(topic, partition)
commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 2)
consumer.actor.reply(Done)

eventually {
producer.history.asScala should have size (2)
}
control.drainAndShutdown().futureValue shouldBe Done
}

it should "fail for commit timeout" in assertAllStagesStopped {
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)

Expand Down