diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index 4dcc0a519..5918e3a00 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -8,9 +8,7 @@ import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
 import zio.stream.{ ZPipeline, ZStream }
 
-import java.util.concurrent.atomic.AtomicInteger
 import scala.jdk.CollectionConverters._
-import scala.util.control.NonFatal
 
 trait Producer {
 
@@ -220,13 +218,12 @@ object Producer {
     settings: ProducerSettings
   ): ZIO[Scope, Throwable, Producer] =
     for {
-      runtime <- ZIO.runtime[Any]
       sendQueue <-
         Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
          settings.sendBufferSize
        )
-      producer = new ProducerLive(javaProducer, runtime, sendQueue)
-      _ <- ZIO.blocking(producer.sendFromQueue).forkScoped
+      producer = new ProducerLive(javaProducer, sendQueue)
+      _ <- producer.sendFromQueue.forkScoped
     } yield producer
 
   /**
@@ -362,7 +359,6 @@ object Producer {
 
 private[producer] final class ProducerLive(
   private[producer] val p: JProducer[Array[Byte], Array[Byte]],
-  runtime: Runtime[Any],
   sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
 ) extends Producer {
 
@@ -439,7 +435,6 @@ private[producer] final class ProducerLive(
       .foreach(records)(serialize(_, keySerializer, valueSerializer))
       .flatMap(produceChunkAsync)
 
-  // noinspection YieldingZIOEffectInspection
   override def produceChunkAsyncWithFailures(
     records: Chunk[ByteRecord]
   ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
@@ -448,7 +443,9 @@
       for {
         done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
         _    <- sendQueue.offer((records, done))
-      } yield done.await
+      } yield
+        // noinspection YieldingZIOEffectInspection
+        done.await
     }
 
   override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] =
@@ -459,52 +456,63 @@
   override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)
 
   /**
-   * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
-   * therefore this stream is run on the blocking thread pool
+   * Currently sending has the following characteristics:
+   *   - You can submit many chunks; they get buffered in the send queue.
+   *   - A chunk is only sent after the previous chunk completes ("completes" meaning that the callback for each of
+   *     its records has been invoked).
+   *   - The records in a chunk are sent in one go, in order. Records for the same partition have a high chance of
+   *     landing in the same batch (which is good for compression).
+   *   - Record ordering is retained and guaranteed between chunks.
+   *   - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been enabled
+   *     (see https://kafka.apache.org/documentation/#producerconfigs_retries).
    */
   val sendFromQueue: ZIO[Any, Nothing, Any] =
-    ZStream
-      .fromQueueWithShutdown(sendQueue)
-      .mapZIO { case (serializedRecords, done) =>
-        ZIO.succeed {
-          try {
-            val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-            val res: Array[Either[Throwable, RecordMetadata]] =
-              new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-            val count: AtomicInteger = new AtomicInteger
-            val length = serializedRecords.length
-
-            while (it.hasNext) {
-              val (rec, idx): (ByteRecord, Int) = it.next()
-
-              val _ = p.send(
-                rec,
-                (metadata: RecordMetadata, err: Exception) =>
-                  Unsafe.unsafe { implicit u =>
-                    exec {
-                      if (err != null) res(idx) = Left(err)
-                      else res(idx) = Right(metadata)
-
-                      if (count.incrementAndGet == length) {
-                        exec {
-                          runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
-                        }
-                      }
-                    }
-                  }
-              )
-            }
-          } catch {
-            case NonFatal(e) =>
-              Unsafe.unsafe { implicit u =>
-                exec {
-                  runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
-                }
-              }
-          }
-        }
-      }
-      .runDrain
+    ZIO.runtime[Any].flatMap { runtime =>
+      // Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost,
+      // therefore this stream is run on the blocking thread pool.
+      ZIO.blocking {
+        ZStream
+          .fromQueueWithShutdown(sendQueue)
+          .mapZIO { case (serializedRecords, done) =>
+            sendChunk(runtime, serializedRecords, done)
+          }
+          .runDrain
+      }
+    }
+
+  private def sendChunk(
+    runtime: Runtime[Any],
+    serializedRecords: Chunk[ByteRecord],
+    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
+  ): ZIO[Any, Nothing, Unit] =
+    for {
+      promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
+      _        <- done.completeWith(ZIO.foreach(promises.reverse)(_.await.either).map(_.reverse))
+    } yield ()
+
+  private def sendRecord(
+    runtime: Runtime[Any]
+  )(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
+    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
+      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
+    }
+
+    for {
+      done <- Promise.make[Throwable, RecordMetadata]
+      _ <- ZIO
+             .attempt[Any] {
+               p.send(
+                 record,
+                 (metadata: RecordMetadata, err: Exception) =>
+                   unsafeRun {
+                     if (err == null) done.succeed(metadata)
+                     else done.fail(err)
+                   }
+               )
+             }
+             .catchAll(err => done.fail(err))
+    } yield done
+  }
 
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
@@ -516,6 +524,4 @@
       key   <- keySerializer.serialize(r.topic, r.headers, r.key())
       value <- valueSerializer.serialize(r.topic, r.headers, r.value())
     } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
-  /** Used to prevent warnings about not using the result of an expression. */
-  @inline private def exec[A](f: => A): Unit = { val _ = f }
 }
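The heart of this change is the bridge between the Java producer's callback-based `send` and ZIO: `sendRecord` allocates a `Promise`, completes it from inside the Kafka callback via the captured `Runtime`, and `sendChunk` awaits all promises (in reverse order; since callbacks tend to fire in send order, by the time the last promise completes the remaining awaits mostly resolve immediately). Below is a minimal, self-contained sketch of just that bridge pattern. It is not part of the patch, and `fakeSend` is a hypothetical stand-in for `KafkaProducer#send`:

```scala
import zio._

object CallbackBridgeExample extends ZIOAppDefault {

  // Hypothetical stand-in for KafkaProducer#send: invokes the callback
  // asynchronously, on a thread ZIO knows nothing about.
  def fakeSend(record: String)(callback: (String, Exception) => Unit): Unit =
    new Thread(() => callback(s"metadata-for-$record", null)).start()

  // Mirrors `sendRecord`: returns a promise that is completed from the callback.
  def send(runtime: Runtime[Any])(record: String): UIO[Promise[Throwable, String]] = {
    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
      // The callback runs on a foreign thread, so completing the promise
      // requires `unsafe.run` on a runtime that was captured beforehand.
      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
    }

    for {
      done <- Promise.make[Throwable, String]
      _ <- ZIO
             .attempt(fakeSend(record) { (metadata, err) =>
               unsafeRun(if (err == null) done.succeed(metadata) else done.fail(err))
             })
             .catchAll(err => done.fail(err)) // `send` itself may also throw synchronously
    } yield done
  }

  override def run =
    for {
      runtime  <- ZIO.runtime[Any]
      promise  <- send(runtime)("hello")
      metadata <- promise.await
      _        <- Console.printLine(s"completed with: $metadata")
    } yield ()
}
```

The final `.catchAll(err => done.fail(err))` mirrors the patch: a synchronous exception from `send` fails that record's promise instead of killing the send fiber.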
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 173f43100..c27f90633 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -96,12 +96,11 @@ object TransactionalProducer {
                      )(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
       _         <- ZIO.attemptBlocking(rawProducer.initTransactions())
       semaphore <- Semaphore.make(1)
-      runtime   <- ZIO.runtime[Any]
       sendQueue <-
         Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
          settings.producerSettings.sendBufferSize
        )
-      live = new ProducerLive(rawProducer, runtime, sendQueue)
-      _ <- ZIO.blocking(live.sendFromQueue).forkScoped
+      live = new ProducerLive(rawProducer, sendQueue)
+      _ <- live.sendFromQueue.forkScoped
     } yield new LiveTransactionalProducer(live, semaphore)
   }
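For callers, the queueing behaviour documented in the new scaladoc surfaces as a nested effect: the outer `UIO` completes as soon as the chunk is buffered in the send queue, the inner one once the broker has acknowledged (or rejected) every record. A usage sketch, assuming a `Producer` is available in the environment (the helper name is hypothetical):

```scala
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.Producer

object ProduceChunkExample {

  // Hypothetical helper illustrating the two completion stages of
  // `produceChunkAsyncWithFailures`.
  def produceAndAwait(
    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
  ): URIO[Producer, Chunk[Either[Throwable, RecordMetadata]]] =
    for {
      producer <- ZIO.service[Producer]
      // Completes once the chunk is accepted into the send queue; the queue
      // is bounded (`sendBufferSize`), so this applies back-pressure.
      ack <- producer.produceChunkAsyncWithFailures(records)
      // Completes once the callback for every record has fired; per-record
      // failures come back as `Left` instead of failing the whole effect.
      results <- ack
    } yield results
}
```

This is the `WithFailures` variant, which keeps the error channel `Nothing` and reports failures per record; plain `produceChunkAsync` instead fails its inner effect on the first record error.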