From 645591bca2c1f466e82ae3b044a9457fe14b50d9 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sun, 19 Jan 2025 14:59:35 +0100
Subject: [PATCH] Producer fails faster, Retry publish after Auth error (#1437)

1. When the producer fails to send a message from a batch of messages, it stops
   sending the remaining messages of that batch after the first error. This
   behavior is now extended to also stop after a failure reported by Kafka's
   send callback.
2. Introduce the producer setting `authErrorRetrySchedule`: a schedule on which
   sending is retried after auth errors (`AuthorizationException` and
   `AuthenticationException`). Auth errors can occur on some slow brokers.

Both changes have been made possible by a new test framework in which we have
precise control over the order of callbacks.

This change is not binary compatible due to changes in `ProducerSettings`.

Also:
- clean up `Producer` scaladocs
- add more producer tests

---------

Co-authored-by: Steven Vroonland
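For illustration, the new setting can be configured roughly as follows (a
sketch; the broker address and schedule values are arbitrary, not part of this
patch):

{{{
import zio._
import zio.kafka.producer.ProducerSettings

// Retry sends that fail with AuthorizationException/AuthenticationException
// up to 5 times, spaced by 500 ms. The default, `Schedule.stop`, keeps the
// old fail-fast behavior.
val settings: ProducerSettings =
  ProducerSettings(List("localhost:9092"))
    .withAuthErrorRetrySchedule(Schedule.recurs(5) && Schedule.spaced(500.millis))
}}}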
---
 .../producer/AsyncProducerTestSupport.scala   | 282 ++++++++++++
 .../zio/kafka/producer/ProducerSpec.scala     | 418 ++++++++++--------
 .../scala/zio/kafka/producer/Producer.scala   | 179 ++++++--
 .../zio/kafka/producer/ProducerSettings.scala |  39 +-
 .../producer/TransactionalProducer.scala      |   4 +-
 .../TransactionalProducerSettings.scala       |  17 +-
 6 files changed, 719 insertions(+), 220 deletions(-)
 create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/producer/AsyncProducerTestSupport.scala

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/AsyncProducerTestSupport.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/AsyncProducerTestSupport.scala
new file mode 100644
index 000000000..94599c323
--- /dev/null
+++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/AsyncProducerTestSupport.scala
@@ -0,0 +1,282 @@
+package zio.kafka.producer
+
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
+import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, RecordMetadata }
+import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition, Uuid }
+import org.apache.kafka.clients.producer.{ Producer => KafkaProducer }
+import zio._
+
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.{ List => JList, Map => JMap }
+import java.util.concurrent.{ CompletableFuture, Future => JFuture }
+import scala.collection.mutable
+
+/**
+ * A test framework for unit testing users of the async `send(ProducerRecord, Callback)` method of the Kafka producer,
+ * featuring precise control over when the callback is called.
+ *
+ * This was built because the MockProducer from the Kafka library does not provide callback control.
+ *
+ * Use it as follows:
+ * {{{
+ *   val sendException     = new RuntimeException("fail from send")
+ *   val callbackException = new RuntimeException("fail from callback")
+ *   val mockBehavior = AsyncProducerTestSupport.newMockBehavior[Array[Byte], Array[Byte]]()
+ *     .sendSucceed()                     // send 0
+ *     .sendFail(sendException)           // send fails immediately (no number)
+ *     .sendSucceed()                     // send 1
+ *     .callbackSucceed()                 // callback for send 0
+ *     .callbackFail(callbackException)   // callback for send 1
+ *     ////// Out of order callbacks:
+ *     .sendSucceed()                     // send 2
+ *     .sendSucceed()                     // send 3
+ *     .sendSucceed()                     // send 4
+ *     .callbackSucceed(3)                // out of order callback of send 3
+ *     .callbackSucceed()                 // callback of send 2
+ *     .callbackSucceed()                 // callback of send 4
+ *     ////// Basic checks
+ *     .callbackSucceed()                 // Fails because all sends already have a callback
+ *     .callbackSucceed(1)                // Fails because send 1 already has a callback
+ * }}}
+ *
+ * When all mock behaviors have been given, you can call the `run` method and use the given mock producer as if it
+ * were a regular Kafka producer.
+ *
+ * {{{
+ *   mockBehavior.run { mockProducer =>
+ *     // use the producer, make assertions, etc.
+ *   }
+ * }}}
+ *
+ * The 'send' invocations are expected in the order they are given. When 'producer.send' is invoked while a callback
+ * mock behavior is pending, the 'send' operation is blocked until the callback is done.
+ *
+ * Get a history of all sent records (all 'send' attempts) with `mockBehavior.history`.
+ *
+ * Current limitations:
+ *   - the given mock producer is not thread-safe; only one thread can use it at a time
+ *   - the future that is returned from the `send` method never completes
+ *   - the metadata that is returned is always the same; it contains no information
+ */
+object AsyncProducerTestSupport {
+
+  // Developer notes:
+  // `n` is used as the callback index (and therefore as the index of the `sendSucceed` mock behavior)
+  //
+  // While the behavior methods are invoked, and with a final check in `run`, we check that every callback behavior
+  // (either a `callbackSucceed` or a `callbackFail`) is coupled to exactly 1 `sendSucceed` behavior. Too few or too
+  // many callbacks result in an exception.
+  //
+  // Method `run` works with 2 fibers that run concurrently:
+  // - a coordinator fiber that loops over the behaviors, awaiting `send`s and invoking callbacks
+  // - the test code that can use `mockProducer.send`
+  // The two coordinate via several promises:
+  // - the test code awaits a start-promise that signals that the `send` may proceed
+  // - the coordinator awaits the callback-promise, which signals that `send` was invoked and provides the callback
+  //   that can later be invoked
+
+  trait AsyncProducerTestSupportBehavior[K, V] {
+    def sendSucceed(): AsyncProducerTestSupportBehavior[K, V]
+    def sendFail(e: Throwable): AsyncProducerTestSupportBehavior[K, V]
+    def callbackSucceed(): AsyncProducerTestSupportBehavior[K, V]
+    def callbackSucceed(n: Int): AsyncProducerTestSupportBehavior[K, V]
+    def callbackFail(e: Exception): AsyncProducerTestSupportBehavior[K, V]
+    def callbackFail(n: Int, e: Exception): AsyncProducerTestSupportBehavior[K, V]
+    def run[R, A](testCode: KafkaProducer[K, V] => ZIO[R, Throwable, A]): ZIO[R, Throwable, A]
+    def history: Chunk[ProducerRecord[K, V]]
+  }
+
+  private sealed trait MockBehavior
+  private final case class SendSucceed(n: Int) extends MockBehavior
+  private final case class SendFail(e: Throwable) extends MockBehavior {
+    override def toString: String = s"SendFail(${e.getClass.getSimpleName})"
+  }
+  private final case class CallbackSucceed(n: Int) extends MockBehavior
+  private final case class CallbackFail(n: Int, e: Exception) extends MockBehavior {
+    override def toString: String = s"CallbackFail($n, ${e.getClass.getSimpleName})"
+  }
+
+  def newMockBehavior[K, V](): AsyncProducerTestSupportBehavior[K, V] = new AsyncProducerTestSupportBehavior[K, V] {
+    private val behaviorBuilder: ChunkBuilder[MockBehavior]  = Chunk.newBuilder
+    private val callbacksAvailable: mutable.Set[Int]         = mutable.BitSet.empty
+    private var callbackCount: Int                           = 0
+    private val _history: ChunkBuilder[ProducerRecord[K, V]] = Chunk.newBuilder
+
+    override def sendSucceed(): AsyncProducerTestSupportBehavior[K, V] = {
+      behaviorBuilder += SendSucceed(callbackCount)
+      callbacksAvailable += callbackCount
+      callbackCount += 1
+      this
+    }
+    override def sendFail(e: Throwable): AsyncProducerTestSupportBehavior[K, V] = {
+      behaviorBuilder += SendFail(e)
+      this
+    }
+    private def addCallback(n: Option[Int], op: Int => MockBehavior): AsyncProducerTestSupportBehavior[K, V] = {
+      n match {
+        case Some(n) =>
+          if (callbacksAvailable.contains(n)) {
+            callbacksAvailable -= n
+            behaviorBuilder += op(n)
+          } else {
+            throw new AssertionError(
+              s"Callback mock behavior for send $n cannot be added because that send expectation does not exist, or it already has a callback"
+            )
+          }
+        case None =>
+          throw new AssertionError(
+            s"Callback mock behavior cannot be added because all send expectations already have a callback"
+          )
+      }
+      this
+    }
+    override def callbackSucceed(): AsyncProducerTestSupportBehavior[K, V] =
+      addCallback(callbacksAvailable.minOption, CallbackSucceed(_))
+    override def callbackSucceed(n: Int): AsyncProducerTestSupportBehavior[K, V] =
+      addCallback(Some(n), CallbackSucceed(_))
+    override def callbackFail(e: Exception): AsyncProducerTestSupportBehavior[K, V] =
+      addCallback(callbacksAvailable.minOption, CallbackFail(_, e))
+    override def callbackFail(n: Int, e: Exception): AsyncProducerTestSupportBehavior[K, V] =
+      addCallback(Some(n), CallbackFail(_, e))
+
+    override def run[R, A](testCode: KafkaProducer[K, V] => ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
+      if
(callbacksAvailable.nonEmpty) { + throw new AssertionError(s"Missing ${callbacksAvailable.size} callback mock behaviors") + } + + final case class SendExpectation( + mockBehavior: MockBehavior, // For scala 3 change to: `SendSucceed | SendFail` + startPromise: Promise[Nothing, Unit], + callbackPromise: Promise[Nothing, Callback] + ) + + def fromOptionOrDie[A1](value: => Option[A1]): ZIO[Any, Nothing, A1] = + ZIO + .fromOption(value) + .orDieWith(_ => new AssertionError("Bug in AsyncProducerTestSupport")) + + def callbackPromiseForN(sendExpectations: Seq[SendExpectation], n: Int): Option[Promise[Nothing, Callback]] = + sendExpectations.collectFirst { + case SendExpectation(SendSucceed(n1), _, callbackPromise) if n1 == n => callbackPromise + } + + val behaviors = behaviorBuilder.result() + for { + sendExpectations <- ZIO.collect(behaviors) { + case behavior @ (SendSucceed(_) | SendFail(_)) => + for { + startPromise <- Promise.make[Nothing, Unit] + callbackPromise <- Promise.make[Nothing, Callback] + } yield SendExpectation(behavior, startPromise, callbackPromise) + case _ => ZIO.fail(None) + } + runtime <- ZIO.runtime[Any] + mockProducer = new NotSupportedProducer[K, V] { + private val currentSendIndex = new AtomicInteger(0) + override def send( + record: ProducerRecord[K, V], + callback: Callback + ): JFuture[RecordMetadata] = { + _history += record + val sendIndex = currentSendIndex.getAndIncrement() + if (sendIndex >= sendExpectations.size) + throw new AssertionError(s"No mock behavior defined for send $sendIndex") + val sendExpectation = sendExpectations(sendIndex) + Unsafe.unsafe { implicit u => + runtime.unsafe.run { + for { + _ <- sendExpectation.startPromise.await + _ <- sendExpectation.callbackPromise.succeed(callback) + } yield () + } + .getOrThrowFiberFailure() + } + (sendExpectation.mockBehavior: @unchecked) match { + // return a dummy future, it is never completed + case _: SendSucceed => new CompletableFuture[RecordMetadata]() + case SendFail(e) => throw e + } + } + } + sei = sendExpectations.iterator + handleBehaviors = + ZIO.foreach(behaviors) { + case mb @ (SendSucceed(_) | SendFail(_)) => + for { + sendOperation <- fromOptionOrDie(sei.nextOption()) + _ <- sendOperation.startPromise.succeed(()) + _ <- ZIO + .raceFirst( + sendOperation.callbackPromise.await, + Seq(ZIO.logInfo(s"Still expecting mock behavior $mb").delay(3.seconds).forever) + ) + .timeoutFail(new AssertionError(s"Timed out waiting for mock behavior $mb"))(1.minute) + } yield () + case CallbackSucceed(n) => + for { + callbackPromise <- fromOptionOrDie(callbackPromiseForN(sendExpectations, n)) + callback <- callbackPromise.await + // return dummy metadata + metadata = new RecordMetadata(new TopicPartition("", 0), 0, 0, 0, 0, 0) + _ <- ZIO.attempt(callback.onCompletion(metadata, null)) + } yield () + case CallbackFail(n, e) => + for { + callbackPromise <- fromOptionOrDie(callbackPromiseForN(sendExpectations, n)) + callback <- callbackPromise.await + _ <- ZIO.attempt(callback.onCompletion(null, e)) + } yield () + } + result <- handleBehaviors &> testCode(mockProducer) + } yield result + } + + override def history: Chunk[ProducerRecord[K, V]] = _history.result() + } +} + +/** + * A [[KafkaProducer]] that does not support any operation. 
+ * + * @tparam K + * key type + * @tparam V + * value type + */ +class NotSupportedProducer[K, V] extends KafkaProducer[K, V] { + override def initTransactions(): Unit = throw new UnsupportedOperationException() + + override def beginTransaction(): Unit = throw new UnsupportedOperationException() + + override def sendOffsetsToTransaction( + offsets: JMap[TopicPartition, OffsetAndMetadata], + consumerGroupId: String + ): Unit = throw new UnsupportedOperationException() + + override def sendOffsetsToTransaction( + offsets: JMap[TopicPartition, OffsetAndMetadata], + groupMetadata: ConsumerGroupMetadata + ): Unit = throw new UnsupportedOperationException() + + override def commitTransaction(): Unit = throw new UnsupportedOperationException() + + override def abortTransaction(): Unit = throw new UnsupportedOperationException() + + override def send(record: ProducerRecord[K, V]): JFuture[RecordMetadata] = throw new UnsupportedOperationException() + + override def send(record: ProducerRecord[K, V], callback: Callback): JFuture[RecordMetadata] = + throw new UnsupportedOperationException() + + override def flush(): Unit = throw new UnsupportedOperationException() + + override def partitionsFor(topic: String): JList[PartitionInfo] = throw new UnsupportedOperationException() + + override def metrics(): JMap[MetricName, _ <: Metric] = throw new UnsupportedOperationException() + + override def clientInstanceId(timeout: Duration): Uuid = throw new UnsupportedOperationException() + + override def close(): Unit = throw new UnsupportedOperationException() + + override def close(timeout: Duration): Unit = throw new UnsupportedOperationException() +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala index 5fe6fa203..60a490219 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala @@ -1,221 +1,289 @@ package zio.kafka.producer -import org.apache.kafka.clients.producer -import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord, RecordMetadata } -import org.apache.kafka.common.KafkaException +import org.apache.kafka.clients.producer.{ Producer => KafkaProducer, ProducerRecord, RecordMetadata } import org.apache.kafka.common.errors.AuthenticationException -import org.apache.kafka.common.serialization.ByteArraySerializer import zio._ -import zio.test.TestAspect.withLiveClock +import zio.test.TestAspect.{ flaky, withLiveClock } import zio.test._ -import java.util.concurrent.Future -import java.util.concurrent.atomic.AtomicBoolean - object ProducerSpec extends ZIOSpecDefault { - - private object TestKeyValueSerializer extends ByteArraySerializer - - private class BinaryMockProducer(autoComplete: Boolean) - extends MockProducer[Array[Byte], Array[Byte]]( - autoComplete, - TestKeyValueSerializer, - TestKeyValueSerializer - ) { - - private val nextSendAllowed = new AtomicBoolean(autoComplete) - - override def send( - record: ProducerRecord[Array[Byte], Array[Byte]], - callback: producer.Callback - ): Future[RecordMetadata] = { - awaitSendAllowed() - val sendResult = super.send(record, callback) - nextSendAllowed.set(autoComplete) - - sendResult - } - - def allowNextSendAndAwaitSendCompletion(): Unit = { - allowNextSend() - awaitSendCompletion() - } - - def allowNextSend(): Unit = - nextSendAllowed.set(true) - - def awaitSendAllowed(): Unit = - awaitSendCondition(true) - - def awaitSendCompletion(): Unit = 
- awaitSendCondition(false) - - private def awaitSendCondition(expectedCondition: Boolean): Unit = { - var awaitingSendCondition = true - while (awaitingSendCondition) - awaitingSendCondition = expectedCondition != nextSendAllowed.get() - } - - } + private val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + private val testAuthenticationExceptionMessage = "test authentication exception" + private val authException = new AuthenticationException(testAuthenticationExceptionMessage) override def spec: Spec[TestEnvironment with Scope, Any] = suite("Producer")( suite("produceChunkAsyncWithFailures")( test("successfully produces chunk of records") { - withProducer() { (_, producer) => - val recordsToSend = Chunk( - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord() - ) - for { - results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten - } yield assertTrue( + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackSucceed() + .callbackSucceed() + .callbackSucceed() + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend) + } yield { + val history = mockBehavior.history + assertTrue( results.length == recordsToSend.length, - results.forall(_.isRight) + results.forall(_.isRight), + history.size == 5 ) } }, - test("omits sending further records in chunk in case the first send call fails") { - withProducer() { (mockJavaProducer, producer) => - val recordsToSend = Chunk( - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord() + test("successfully produces chunk of records, with callbacks in an arbitrary order") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .callbackSucceed(1) + .sendSucceed() + .callbackSucceed(0) + .sendSucceed() + .callbackSucceed(2) + .sendSucceed() + .callbackSucceed(4) + .callbackSucceed(3) + for { + results <- runTest(mockBehavior, recordsToSend) + } yield { + val history = mockBehavior.history + assertTrue( + results.length == recordsToSend.length, + results.forall(_.isRight), + history.size == 5 ) - val testAuthenticationExceptionMessage = "test authentication exception" - mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) - for { - results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten - } yield assertTrue( + } + }, + test("omits sending further records after failure from send") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendFail(authException) + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend) + } yield { + val history = mockBehavior.history + assertTrue( results.length == recordsToSend.length, - results.head.isLeft, - results.head.left.forall(_.getMessage == testAuthenticationExceptionMessage), - results.tail.forall(_ == Left(Producer.PublishOmittedException)) + results.head.isRight, + results(1).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results.drop(2).forall(_ == Left(Producer.PublishOmittedException)), + history.size == 2 ) } }, - test("provides correct results in case last send call fails") 
{ - withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => - val recordsToSend = Chunk( - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord() + test("omits sending further records after exception from callback") { + // This test is flaky because there is a race between the producer reading and the callback setting the + // `previousSendCallsSucceed` flag. Most often the producer is faster and is therefore already sending the + // next record. In this test we assume the producer is faster, hence we expect one more record after the + // callback failure. + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .callbackFail(authException) + .sendSucceed() + .callbackSucceed() + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend) + } yield { + val history = mockBehavior.history + assertTrue( + results.length == recordsToSend.length, + results(0).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results(1).isRight, + results(2).isRight, + results.drop(3).forall(_ == Left(Producer.PublishOmittedException)), + history.size == 3 ) - val testAuthenticationExceptionMessage = "test authentication exception" - val mockJavaProducerBehaviour = ZIO.succeed { - // Send calls behaviours - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) - mockJavaProducer.allowNextSend() - // Send callbacks behaviours - mockJavaProducer.completeNext() - mockJavaProducer.completeNext() - mockJavaProducer.completeNext() - mockJavaProducer.completeNext() - } - for { - _ <- mockJavaProducerBehaviour.forkScoped - results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten - } yield assertTrue( + } + } @@ flaky(3), + test("retries send after an AuthenticationException from send") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendFail(authException) // send fails immediately + .callbackSucceed() // send 0 ok + // Sending record 1 failed with AuthError, sending records 2, 3, and 4 was skipped + // All 4 are retried: + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackSucceed() + .callbackSucceed() + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend, Schedule.recurs(1)) + } yield { + val history = mockBehavior.history + assertTrue( results.length == recordsToSend.length, - results.init.forall(_.isRight), - results.last.isLeft, - results.last.left.forall(_.getMessage == testAuthenticationExceptionMessage) + results.forall(_.isRight), + history.size == 6 ) } }, - test("omits sending further records in chunk and provides correct results in case middle send call fails") { - withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => - val recordsToSend = Chunk( - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord() - ) - val testAuthenticationExceptionMessage = "test authentication exception" - val mockJavaProducerBehaviour = ZIO.succeed { - // Send calls behaviours - 
mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) - mockJavaProducer.allowNextSend() - // Send callbacks behaviours - mockJavaProducer.completeNext() - mockJavaProducer.completeNext() - } - for { - _ <- mockJavaProducerBehaviour.forkScoped - results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten - } yield assertTrue( + test("retries send after an AuthenticationException from callback") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackFail(authException) + .callbackSucceed() + .callbackFail(authException) + .callbackSucceed() + // Sending record 0, 2 and 4 -> okay + // Sending record 1 and 3 failed with AuthError + // Only 1 and 3 are retried: + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend, Schedule.recurs(1)) + } yield { + val history = mockBehavior.history + assertTrue( results.length == recordsToSend.length, - results(0).isRight, - results(1).isRight, - results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), - results(3) == Left(Producer.PublishOmittedException), - results(4) == Left(Producer.PublishOmittedException) + results.forall(_.isRight), + history.size == 7 ) } }, - test( - "omits sending further records in chunk and provides correct results in case second publication to broker fails along with middle send call failure" - ) { - withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => - val recordsToSend = Chunk( - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord(), - makeProducerRecord() + test("does not retry send after another Exception from send") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendFail(new RuntimeException()) + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend, Schedule.recurs(1)) + } yield { + val history = mockBehavior.history + assertTrue( + results.length == recordsToSend.length, + results.head.isRight, + results.tail.forall(_.isLeft), + history.size == 2 ) - val testAuthenticationExceptionMessage = "test authentication exception" - val testKafkaExceptionMessage = "unexpected broker exception" - val mockJavaProducerBehaviour = ZIO.succeed { - // Send calls behaviours - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.allowNextSendAndAwaitSendCompletion() - mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) - mockJavaProducer.allowNextSend() - // Send callbacks behaviours - mockJavaProducer.completeNext() - mockJavaProducer.errorNext(new KafkaException(testKafkaExceptionMessage)) - } - for { - _ <- mockJavaProducerBehaviour.forkScoped - results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten - } yield assertTrue( + } + }, + test("does not retry send after another Exception from callback, even when there is also an AuthException") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackFail(authException) + 
.callbackSucceed() + .callbackFail(new RuntimeException()) + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend, Schedule.recurs(1)) + } yield { + val history = mockBehavior.history + assertTrue( results.length == recordsToSend.length, results(0).isRight, results(1).isLeft, - results(1).left.forall(_.getMessage == testKafkaExceptionMessage), - results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), - results(3) == Left(Producer.PublishOmittedException), - results(4) == Left(Producer.PublishOmittedException) + results(2).isRight, + results(3).isLeft, + results(4).isRight, + history.size == 5 + ) + } + }, + test("does multiple send retries after an AuthenticationException") { + val mockBehavior = AsyncProducerTestSupport + .newMockBehavior[Array[Byte], Array[Byte]]() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .sendSucceed() + .callbackSucceed() + .callbackFail(authException) + .callbackSucceed() + .callbackFail(authException) + .callbackSucceed() + // Sending record 0, 2 and 4 -> okay + // Sending record 1 and 3 failed with AuthError + // Only 1 and 3 are retried: + .sendSucceed() + .sendSucceed() + .callbackFail(authException) + .callbackFail(authException) + // Retried again + .sendSucceed() + .sendSucceed() + .callbackFail(authException) + .callbackSucceed() + // Retried again + .sendSucceed() + .callbackSucceed() + for { + results <- runTest(mockBehavior, recordsToSend, Schedule.forever) + } yield { + val history = mockBehavior.history + assertTrue( + results.length == recordsToSend.length, + results.forall(_.isRight), + history.size == 10 ) } } ) ) @@ withLiveClock - private def withProducer(autoCompleteProducerRequests: Boolean = true)( - producerTest: (BinaryMockProducer, Producer) => ZIO[Scope, Throwable, TestResult] - ): ZIO[Scope, Throwable, TestResult] = + private def withProducer[A]( + mockJavaProducer: KafkaProducer[Array[Byte], Array[Byte]], + authErrorRetrySchedule: Schedule[Any, Throwable, Any] + )( + producerTest: Producer => ZIO[Scope, Throwable, A] + ): ZIO[Any, Throwable, A] = ZIO.scoped { - val mockJavaProducer = new BinaryMockProducer(autoCompleteProducerRequests) + val producerSettings = ProducerSettings() + .withAuthErrorRetrySchedule(authErrorRetrySchedule) Producer - .fromJavaProducer(mockJavaProducer, ProducerSettings()) - .flatMap(producerTest(mockJavaProducer, _)) + .fromJavaProducer(mockJavaProducer, producerSettings) + .flatMap(producerTest(_)) + } + + private def runTest( + mockBehavior: AsyncProducerTestSupport.AsyncProducerTestSupportBehavior[Array[Byte], Array[Byte]], + recordsToSend: Chunk[ProducerRecord[Array[Byte], Array[Byte]]], + authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.stop + ): ZIO[Any, Throwable, Chunk[Either[Throwable, RecordMetadata]]] = + mockBehavior.run { mockProducer => + withProducer(mockProducer, authErrorRetrySchedule) { producer => + producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } } private def makeProducerRecord( diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index bc77c6469..0da3def5a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -1,6 +1,7 @@ package zio.kafka.producer import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata } +import org.apache.kafka.common.errors.{ AuthenticationException, 
AuthorizationException }
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio._
@@ -15,16 +16,18 @@ import scala.util.control.{ NoStackTrace, NonFatal }
 trait Producer {
 
   /**
-   * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
-   * that allows to avoid round-trip-time penalty for each record.
+   * Produces a single record and awaits broker acknowledgement.
+   *
+   * Use `produceAsync` to avoid the round-trip-time penalty for each record.
    */
   def produce(
     record: ProducerRecord[Array[Byte], Array[Byte]]
   ): Task[RecordMetadata]
 
   /**
-   * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
-   * that allows to avoid round-trip-time penalty for each record.
+   * Produces a single record and awaits broker acknowledgement.
+   *
+   * Use `produceAsync` to avoid the round-trip-time penalty for each record.
    */
   def produce[R, K, V](
     record: ProducerRecord[K, V],
@@ -33,8 +36,9 @@ trait Producer {
   ): RIO[R, RecordMetadata]
 
   /**
-   * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version
-   * that allows to avoid round-trip-time penalty for each record.
+   * Produces a single record and awaits broker acknowledgement.
+   *
+   * Use `produceAsync` to avoid the round-trip-time penalty for each record.
    */
   def produce[R, K, V](
     topic: String,
@@ -61,7 +65,7 @@ trait Producer {
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
-   * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement.
+   * throughput.
    */
   def produceAsync(
     record: ProducerRecord[Array[Byte], Array[Byte]]
@@ -75,7 +79,7 @@ trait Producer {
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
-   * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement.
+   * throughput.
    */
   def produceAsync[R, K, V](
     record: ProducerRecord[K, V],
@@ -91,7 +95,7 @@ trait Producer {
    *
    * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records
    * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
-   * throughput. See [[produce[R,K,V](topic*]] for version that awaits broker acknowledgement.
+   * throughput.
    */
   def produceAsync[R, K, V](
     topic: String,
@@ -102,8 +106,9 @@ trait Producer {
   ): RIO[R, Task[RecordMetadata]]
 
   /**
-   * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time
-   * penalty for each chunk.
+   * Produces a chunk of records.
+   *
+   * Use `produceChunkAsync` to avoid the round-trip-time penalty for each chunk.
    *
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
@@ -113,8 +118,9 @@ trait Producer {
   ): Task[Chunk[RecordMetadata]]
 
   /**
-   * Produces a chunk of records.
See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time
-   * penalty for each chunk.
+   * Produces a chunk of records.
+   *
+   * Use `produceChunkAsync` to avoid the round-trip-time penalty for each chunk.
    *
    * When publishing any of the records fails, the whole batch fails even though some records might have been published.
    * Use [[produceChunkAsyncWithFailures]] to get results per record.
@@ -241,10 +247,10 @@ object Producer {
     for {
       runtime <- ZIO.runtime[Any]
       sendQueue <-
-        Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
+        Queue.bounded[(Chunk[ByteRecord], Chunk[Either[Throwable, RecordMetadata]] => UIO[Unit])](
           settings.sendBufferSize
         )
-      producer = new ProducerLive(javaProducer, runtime, sendQueue)
+      producer = new ProducerLive(settings, javaProducer, runtime, sendQueue)
       _ <- ZIO.blocking(producer.sendFromQueue).forkScoped
     } yield producer
@@ -380,11 +386,16 @@ object Producer {
 }
 
 private[producer] final class ProducerLive(
+  settings: ProducerSettings,
   private[producer] val p: JProducer[Array[Byte], Array[Byte]],
   runtime: Runtime[Any],
-  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
+  sendQueue: Queue[(Chunk[ByteRecord], Chunk[Either[Throwable, RecordMetadata]] => UIO[Unit])]
 ) extends Producer {
 
+  private val leftPublishOmitted = Left(Producer.PublishOmittedException)
+  private val retryAfterAuthException: Throwable =
+    new RuntimeException("Authentication error, retry?") with NoStackTrace
+
   override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
     produceAsync(record).flatten
@@ -406,10 +417,36 @@ private[producer] final class ProducerLive(
 
   // noinspection YieldingZIOEffectInspection
   override def produceAsync(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[Task[RecordMetadata]] =
-    for {
-      done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
-      _    <- sendQueue.offer((Chunk.single(record), done))
-    } yield done.await.flatMap(result => ZIO.fromEither(result.head))
+    ZIO.suspendSucceed {
+      def loop(
+        done: Promise[Throwable, RecordMetadata],
+        driver: Schedule.Driver[Any, Any, Throwable, Any]
+      ): ZIO[Any, Nothing, Any] = {
+        val continuation: Chunk[Either[Throwable, RecordMetadata]] => UIO[Unit] = { results =>
+          // Since we're sending only 1 record, we know `results` has 1 item
+          results.head match {
+            case Right(recordMetaData) =>
+              done.succeed(recordMetaData).unit
+            case Left(error @ (_: AuthorizationException | _: AuthenticationException)) =>
+              driver
+                .next(retryAfterAuthException)
+                .foldZIO(
+                  _ => done.fail(error).unit, // the schedule says "no"
+                  _ => loop(done, driver).unit // start retry
+                )
+            case Left(error) =>
+              done.fail(error).unit
+          }
+        }
+
+        sendQueue.offer(Chunk.single(record) -> continuation)
+      }
+
+      for {
+        done <- Promise.make[Throwable, RecordMetadata]
+        _    <- settings.authErrorRetrySchedule.driver.flatMap(d => loop(done, d))
+      } yield done.await
+    }
 
   override def produceAsync[R, K, V](
     record: ProducerRecord[K, V],
@@ -464,10 +501,82 @@ private[producer] final class ProducerLive(
   ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
     if (records.isEmpty) ZIO.succeed(ZIO.succeed(Chunk.empty))
     else {
-      for {
-        done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
-        _    <- sendQueue.offer((records, done))
-      } yield done.await
+      val totalRecordCount = records.size
+
+      lazy val finalResults: Array[Either[Throwable, RecordMetadata]] =
+        Array.fill(totalRecordCount)(leftPublishOmitted)
+
+      def storeResults(recordIndices: Seq[Int], results: Chunk[Either[Throwable, RecordMetadata]]): Unit =
+        (recordIndices lazyZip results).foreach { case (index, result) => finalResults(index) = result }
+
+      def complete(done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]): UIO[Unit] =
+        done.succeed(Chunk.fromArray(finalResults)).unit
+
+      ZIO.suspendSucceed {
+        def loop(
+          recordIndices: Seq[Int],
+          records: Chunk[ByteRecord],
+          done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]],
+          driver: Schedule.Driver[Any, Any, Throwable, Any]
+        ): ZIO[Any, Nothing, Any] = {
+          def retryFailedRecords(results: Chunk[Either[Throwable, RecordMetadata]]): UIO[Unit] = {
+            // Note that if we get here, all `Left`s can be retried. Also, we know there is at least 1 `Left`.
+            val toRetry: Seq[(Int, ByteRecord)] =
+              (recordIndices lazyZip records lazyZip results).flatMap {
+                case (_, _, Right(_))     => Seq.empty
+                case (i, record, Left(_)) => Seq((i, record))
+              }
+            val (retryIndices, retryRecords) = toRetry.unzip
+            ZIO.logInfo(
+              s"Retrying publish of ${retryRecords.size} (of ${records.size}) records after AuthorizationException/AuthenticationException"
+            ) *>
+              loop(retryIndices, Chunk.from(retryRecords), done, driver).unit
+          }
+
+          val continuation: Chunk[Either[Throwable, RecordMetadata]] => UIO[Unit] = { results =>
+            if (results.forall(_.isRight)) {
+              // All records were successfully published, we're done
+              if (recordIndices.size == totalRecordCount) {
+                // Optimization (not allocating `finalResults`) for when everything goes well the first time
+                done.succeed(results).unit
+              } else {
+                // Copy results of this attempt to the final results
+                storeResults(recordIndices, results)
+                complete(done)
+              }
+            } else {
+              // Copy results of this attempt to the final results
+              storeResults(recordIndices, results)
+              // There are some failures, let's see if we can retry some records.
+              // We only retry after an auth error. Any other error and we give up.
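+              // Records that ended in `PublishOmittedException` are retryable as well: they were
+              // never sent because an earlier record in the chunk failed.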
+ val hasFatalError = results.exists { + case Right(_) | + Left(_: AuthorizationException | _: AuthenticationException | Producer.PublishOmittedException) => + false + case _ => true + } + if (hasFatalError) { + complete(done) + } else { + // Ask the schedule if we should retry + driver + .next(retryAfterAuthException) + .foldZIO( + _ => complete(done), // Schedule says no + _ => retryFailedRecords(results) + ) + } + } + } + + sendQueue.offer(records -> continuation) + } + + for { + done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]] + _ <- settings.authErrorRetrySchedule.driver.flatMap(d => loop(records.indices, records, done, d)) + } yield done.await + } } override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] = @@ -484,7 +593,7 @@ private[producer] final class ProducerLive( val sendFromQueue: ZIO[Any, Nothing, Any] = ZStream .fromQueueWithShutdown(sendQueue) - .mapZIO { case (serializedRecords, done) => + .mapZIO { case (serializedRecords, continuation) => ZIO.succeed { val recordsLength = serializedRecords.length val sentRecordsCounter = new AtomicInteger(0) @@ -503,23 +612,29 @@ private[producer] final class ProducerLive( val sentResultsChunk = Chunk.fromArray(sentResults) Unsafe.unsafe { implicit u => - val _ = runtime.unsafe.run(done.succeed(sentResultsChunk)) + val _ = runtime.unsafe.run(continuation(sentResultsChunk)) } } } - var previousSendCallsSucceed = true + // Must be volatile so that reads in this thread always see the latest value, even when the callback sets it + // from another thread. + @volatile var previousSendCallsSucceed = true recordsIterator.foreach { case (record: ByteRecord, recordIndex: Int) => if (previousSendCallsSucceed) { try { val _ = p.send( record, - (metadata: RecordMetadata, err: Exception) => - insertSentResult( - recordIndex, - if (err eq null) Right(metadata) else Left(err) - ) + (metadata: RecordMetadata, err: Exception) => { + val sendResult = if (err eq null) { + Right(metadata) + } else { + previousSendCallsSucceed = false + Left(err) + } + insertSentResult(recordIndex, sendResult) + } ) } catch { case NonFatal(err) => diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala index 06f185bda..b0d1c27f7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala @@ -4,9 +4,20 @@ import org.apache.kafka.clients.producer.ProducerConfig import zio._ import zio.kafka.security.KafkaCredentialStore +/** + * Settings for the Producer. + * + * To stay source compatible with future releases, you are recommended to construct the settings as follows: + * {{{ + * ProducerSettings(bootstrapServers) + * .withCloseTimeout(30.seconds) + * .... etc. + * }}} + */ final case class ProducerSettings( closeTimeout: Duration = 30.seconds, sendBufferSize: Int = 4096, + authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.stop, properties: Map[String, AnyRef] = Map.empty ) { def driverSettings: Map[String, AnyRef] = properties @@ -32,7 +43,33 @@ final case class ProducerSettings( def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings = withProperties(credentialsStore.properties) - def withSendBufferSize(sendBufferSize: Int) = copy(sendBufferSize = sendBufferSize) + /** + * @param sendBufferSize + * The maximum number of record chunks that can queue up while waiting for the underlying producer to become + * available. 
+   */
+  def withSendBufferSize(sendBufferSize: Int): ProducerSettings =
+    copy(sendBufferSize = sendBufferSize)
+
+  /**
+   * @param authErrorRetrySchedule
+   *   The schedule at which the producer will retry producing, even when producing fails with an
+   *   [[org.apache.kafka.common.errors.AuthorizationException]] or
+   *   [[org.apache.kafka.common.errors.AuthenticationException]].
+   *
+   *   This setting helps when producing fails because authorization or authentication in the broker is too slow.
+   *
+   *   For example, to retry 5 times, spaced by 500ms, you can set this to
+   *   {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}}
+   *
+   *   The default is `Schedule.stop`, which fails the producer on the first auth error.
+   *
+   *   ⚠️ Retrying can cause records to be produced in a different order than the order in which they were given to
+   *   zio-kafka.
+   */
+  def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ProducerSettings =
+    copy(authErrorRetrySchedule = authErrorRetrySchedule)
+
 }
 
 object ProducerSettings {
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 1056bd29a..8bf5c7834 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -97,10 +97,10 @@ object TransactionalProducer {
       semaphore <- Semaphore.make(1)
       runtime   <- ZIO.runtime[Any]
       sendQueue <-
-        Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
+        Queue.bounded[(Chunk[ByteRecord], Chunk[Either[Throwable, RecordMetadata]] => UIO[Unit])](
           settings.producerSettings.sendBufferSize
         )
-      live = new ProducerLive(rawProducer, runtime, sendQueue)
+      live = new ProducerLive(settings.producerSettings, rawProducer, runtime, sendQueue)
       _ <- ZIO.blocking(live.sendFromQueue).forkScoped
     } yield new LiveTransactionalProducer(live, semaphore)
   }
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala
index 85b302553..cd36d609a 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala
@@ -13,11 +13,8 @@ object TransactionalProducerSettings {
   def apply(bootstrapServers: List[String], transactionalId: String): TransactionalProducerSettings =
     TransactionalProducerSettings(
-      ProducerSettings(
-        30.seconds,
-        4096,
-        Map(ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId)
-      ).withBootstrapServers(bootstrapServers)
+      ProducerSettings(bootstrapServers)
+        .withProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
     )
 
   def apply(
@@ -28,10 +25,10 @@ object TransactionalProducerSettings {
     sendBufferSize: Int
   ): TransactionalProducerSettings =
     TransactionalProducerSettings(
-      ProducerSettings(
-        closeTimeout,
-        sendBufferSize,
-        properties.updated(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-      ).withBootstrapServers(bootstrapServers)
+      ProducerSettings(bootstrapServers)
+        .withCloseTimeout(closeTimeout)
+        .withSendBufferSize(sendBufferSize)
+        .withProperties(properties)
+        .withProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    )
 }
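
A minimal end-to-end sketch of producing with the new retry setting, assuming
the existing scoped `Producer.make` constructor; the broker address, topic, and
schedule values are illustrative:

{{{
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }

val settings =
  ProducerSettings(List("localhost:9092"))
    .withAuthErrorRetrySchedule(Schedule.recurs(5) && Schedule.spaced(500.millis))

val records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]] =
  Chunk(new ProducerRecord("some-topic", "key".getBytes, "value".getBytes))

// Per-record results: `Right(metadata)` on success, `Left(error)` on failure.
// Records that were skipped because an earlier record in the chunk failed end
// in `Left(Producer.PublishOmittedException)`.
val results: Task[Chunk[Either[Throwable, RecordMetadata]]] =
  ZIO.scoped {
    Producer.make(settings).flatMap { producer =>
      producer.produceChunkAsyncWithFailures(records).flatten
    }
  }
}}}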