diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 645f5f7ae..874fe8e3a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -220,13 +220,12 @@ object Producer { settings: ProducerSettings ): ZIO[Scope, Throwable, Producer] = for { - runtime <- ZIO.runtime[Any] sendQueue <- Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]( settings.sendBufferSize ) - producer = new ProducerLive(javaProducer, runtime, sendQueue) - _ <- ZIO.blocking(producer.sendFromQueue).forkScoped + producer = new ProducerLive(javaProducer, sendQueue) + _ <- producer.sendFromQueue.forkScoped } yield producer /** @@ -362,7 +361,6 @@ object Producer { private[producer] final class ProducerLive( private[producer] val p: JProducer[Array[Byte], Array[Byte]], - runtime: Runtime[Any], sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])] ) extends Producer { @@ -460,51 +458,52 @@ private[producer] final class ProducerLive( /** * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost, - * therefore this stream is run on a the blocking thread pool + * therefore this stream is run on the blocking thread pool. */ val sendFromQueue: ZIO[Any, Nothing, Any] = - ZStream - .fromQueueWithShutdown(sendQueue) - .mapZIO { case (serializedRecords, done) => - ZIO.succeed { - try { - val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex - val res: Array[Either[Throwable, RecordMetadata]] = - new Array[Either[Throwable, RecordMetadata]](serializedRecords.length) - val count: AtomicLong = new AtomicLong - val length = serializedRecords.length - - while (it.hasNext) { - val (rec, idx): (ByteRecord, Int) = it.next() - - val _ = p.send( - rec, - (metadata: RecordMetadata, err: Exception) => - Unsafe.unsafe { implicit u => - exec { - if (err != null) res(idx) = Left(err) - else res(idx) = Right(metadata) - - if (count.incrementAndGet == length) { - exec { - runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure() - } - } - } - } - ) - } - } catch { - case NonFatal(e) => - Unsafe.unsafe { implicit u => - exec { - runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure() + ZIO.blocking { + ZStream + .fromQueueWithShutdown(sendQueue) + .mapZIO { case (serializedRecords, done) => + sendChunk(serializedRecords) + .flatMap(done.succeed(_)) + } + .runDrain + } + + private def sendChunk( + serializedRecords: Chunk[ByteRecord] + ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] = + ZIO + .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback => + try { + val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex + val res: Array[Either[Throwable, RecordMetadata]] = + new Array[Either[Throwable, RecordMetadata]](serializedRecords.length) + val count: AtomicLong = new AtomicLong + val length = serializedRecords.length + + while (it.hasNext) { + val (rec, idx): (ByteRecord, Int) = it.next() + + // Since we might be sending to multiple partitions, the callbacks + // are _not_ necessarily called in order. + p.send( + rec, + (metadata: RecordMetadata, err: Exception) => { + res(idx) = Either.cond(err == null, metadata, err) + + if (count.incrementAndGet == length) { + callback(ZIO.succeed(Chunk.fromArray(res))) } } + ) } + } catch { + case NonFatal(e) => + callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e)))) } } - .runDrain private def serialize[R, K, V]( r: ProducerRecord[K, V], @@ -516,6 +515,4 @@ private[producer] final class ProducerLive( value <- valueSerializer.serialize(r.topic, r.headers, r.value()) } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers) - /** Used to prevent warnings about not using the result of an expression. */ - @inline private def exec[A](f: => A): Unit = { val _ = f } } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 173f43100..c27f90633 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -96,12 +96,11 @@ object TransactionalProducer { )(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie) _ <- ZIO.attemptBlocking(rawProducer.initTransactions()) semaphore <- Semaphore.make(1) - runtime <- ZIO.runtime[Any] sendQueue <- Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]( settings.producerSettings.sendBufferSize ) - live = new ProducerLive(rawProducer, runtime, sendQueue) - _ <- ZIO.blocking(live.sendFromQueue).forkScoped + live = new ProducerLive(rawProducer, sendQueue) + _ <- live.sendFromQueue.forkScoped } yield new LiveTransactionalProducer(live, semaphore) }