From 29db1816548bd9de950ecd5322c690dfb4fe9713 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Wed, 17 Jul 2024 21:17:56 +0200
Subject: [PATCH 1/2] Alternative producer implementation

Refactoring of the producer so that it handles errors per record.
---
 .../scala/zio/kafka/producer/Producer.scala   | 93 +++++++++++--------
 1 file changed, 53 insertions(+), 40 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index 939f36d31..ce983f5fd 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,6 +1,8 @@
 package zio.kafka.producer
 
+import org.apache.kafka.clients.producer
 import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
+import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio._
@@ -9,6 +11,7 @@ import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
 import zio.stream.{ ZPipeline, ZStream }
 
 import java.util.concurrent.atomic.AtomicLong
+import scala.collection.immutable.BitSet
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
@@ -457,53 +460,63 @@ private[producer] final class ProducerLive(
   override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)
 
   /**
-   * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
-   * therefore this stream is run on the blocking thread pool.
+   * Currently, sending has the following characteristics:
+   *   - You can submit many chunks; they get buffered in the send queue.
+   *   - A chunk is only sent after the previous chunk completes, that is, after the callback for each of its
+   *     records has been invoked.
+   *   - The records in a chunk are sent in one go, in order. Records for the same partition have a high chance of
+   *     landing in the same batch (which is good for compression).
+   *   - Record ordering is retained and guaranteed between chunks.
+   *   - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been enabled
+   *     (see https://kafka.apache.org/documentation/#producerconfigs_retries).
    */
   val sendFromQueue: ZIO[Any, Nothing, Any] =
-    ZIO.blocking {
-      ZStream
-        .fromQueueWithShutdown(sendQueue)
-        .mapZIO { case (serializedRecords, done) =>
-          sendChunk(serializedRecords)
-            .flatMap(done.succeed(_))
-        }
-        .runDrain
+    ZIO.runtime[Any].flatMap { runtime =>
+      // Calls to `send` may block when updating metadata or when communication with the broker is (temporarily)
+      // lost, therefore this stream is run on the blocking thread pool.
+      ZIO.blocking {
+        ZStream
+          .fromQueueWithShutdown(sendQueue)
+          .mapZIO { case (serializedRecords, done) =>
+            sendChunk(runtime, serializedRecords)
+              .flatMap(done.succeed(_))
+          }
+          .runDrain
+      }
     }
 
   private def sendChunk(
+    runtime: Runtime[Any],
     serializedRecords: Chunk[ByteRecord]
   ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
-    ZIO
-      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
-        try {
-          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-          val res: Array[Either[Throwable, RecordMetadata]] =
-            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-          val count: AtomicLong = new AtomicLong
-          val length            = serializedRecords.length
-
-          while (it.hasNext) {
-            val (rec, idx): (ByteRecord, Int) = it.next()
-
-            // Since we might be sending to multiple partitions, the callbacks
-            // are _not_ necessarily called in order.
-            val _ = p.send(
-              rec,
-              (metadata: RecordMetadata, err: Exception) => {
-                res(idx) = Either.cond(err == null, metadata, err)
-
-                if (count.incrementAndGet == length) {
-                  callback(ZIO.succeed(Chunk.fromArray(res)))
-                }
-              }
-            )
-          }
-        } catch {
-          case NonFatal(e) =>
-            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
-        }
-      }
+    for {
+      promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
+      results  <- ZIO.foreach(promises)(_.await.either)
+    } yield results
+
+  private def sendRecord(
+    runtime: Runtime[Any]
+  )(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
+    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
+      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
+    }
+
+    for {
+      done <- Promise.make[Throwable, RecordMetadata]
+      _ <- ZIO
+             .attempt[Any] {
+               p.send(
+                 record,
+                 (metadata: RecordMetadata, err: Exception) =>
+                   unsafeRun {
+                     if (err == null) done.succeed(metadata)
+                     else done.fail(err)
+                   }
+               )
+             }
+             .catchAll(err => done.fail(err))
+    } yield done
+  }
 
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
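
Note: the heart of patch 1 is the bridge from KafkaProducer's callback-based
`send` to a ZIO `Promise`, so that every record gets its own success or
failure value. Below is a minimal, self-contained sketch of that pattern; the
`CallbackApi` trait, the `CallbackBridge` object, and all other names are
illustrative stand-ins, not part of the patch:

    import zio._

    // Illustrative stand-in for a callback-based Java API such as KafkaProducer.send.
    trait CallbackApi {
      def send(value: String, onDone: (String, Exception) => Unit): Unit
    }

    object CallbackBridge {
      // Adapt one callback-based call into a ZIO Promise, mirroring `sendRecord`
      // in patch 1. The callback fires on a foreign (Kafka I/O) thread, so
      // completing the promise requires re-entering the ZIO runtime via `Unsafe`;
      // the `unsafeRun` helper in the patch does exactly this.
      def sendAsPromise(api: CallbackApi, runtime: Runtime[Any])(
        value: String
      ): UIO[Promise[Throwable, String]] =
        for {
          done <- Promise.make[Throwable, String]
          _ <- ZIO
                 .attempt {
                   api.send(
                     value,
                     (result: String, err: Exception) => {
                       val _ = Unsafe.unsafe { implicit u =>
                         runtime.unsafe.run {
                           if (err == null) done.succeed(result) else done.fail(err)
                         }
                       }
                     }
                   )
                 }
                 // If `send` itself throws, fail only this record's promise
                 // instead of aborting the whole chunk.
                 .catchAll(err => done.fail(err))
        } yield done
    }

Awaiting each promise with `.await.either`, as `sendChunk` does above, then
yields one `Either[Throwable, RecordMetadata]` per record, which is why a
single failing record no longer fails the whole chunk.
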
From f1d8ee36e86e1344bc28a7de11fe8a08a5f58671 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Thu, 18 Jul 2024 09:36:45 +0200
Subject: [PATCH 2/2] Ran `sbt fix fmt`

---
 zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
index ce983f5fd..e4dc34496 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,8 +1,6 @@
 package zio.kafka.producer
 
-import org.apache.kafka.clients.producer
 import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
-import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio._
@@ -10,10 +8,7 @@ import zio.kafka.serde.Serializer
 import zio.kafka.utils.SslHelper
 import zio.stream.{ ZPipeline, ZStream }
 
-import java.util.concurrent.atomic.AtomicLong
-import scala.collection.immutable.BitSet
 import scala.jdk.CollectionConverters._
-import scala.util.control.NonFatal
 
 trait Producer {
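
Note: for callers, the visible effect of this refactoring is that a single
failing record no longer fails the entire chunk. Below is a sketch of how a
caller can inspect per-record outcomes, assuming zio-kafka's
`produceChunkAsyncWithFailures` variant of the producer API; verify the exact
name and signature against your zio-kafka version. `PerRecordResults` and
`sendAndReport` are illustrative names:

    import org.apache.kafka.clients.producer.ProducerRecord
    import zio._
    import zio.kafka.producer.Producer

    object PerRecordResults {
      // Send a chunk and report every record's outcome individually.
      // Assumes `produceChunkAsyncWithFailures` exists in your zio-kafka version.
      def sendAndReport(
        producer: Producer,
        records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
      ): UIO[Unit] =
        for {
          // The outer effect completes once the chunk is enqueued; the inner
          // effect completes once every record's callback has fired.
          acknowledged <- producer.produceChunkAsyncWithFailures(records)
          results      <- acknowledged
          _ <- ZIO.foreachDiscard(records.zip(results)) {
                 case (_, Right(metadata)) =>
                   ZIO.logDebug(s"delivered to ${metadata.topic}-${metadata.partition}@${metadata.offset}")
                 case (rec, Left(err)) =>
                   // Only this record failed; others in the chunk may have succeeded.
                   ZIO.logWarning(s"delivery to topic ${rec.topic} failed: $err")
               }
        } yield ()
    }
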