Skip to content

Commit

Permalink
Small improvements to the Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jul 10, 2024
1 parent c717359 commit ab193d6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 47 deletions.
85 changes: 41 additions & 44 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,12 @@ object Producer {
settings: ProducerSettings
): ZIO[Scope, Throwable, Producer] =
for {
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, runtime, sendQueue)
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
producer = new ProducerLive(javaProducer, sendQueue)
_ <- producer.sendFromQueue.forkScoped
} yield producer

/**
Expand Down Expand Up @@ -362,7 +361,6 @@ object Producer {

private[producer] final class ProducerLive(
private[producer] val p: JProducer[Array[Byte], Array[Byte]],
runtime: Runtime[Any],
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
) extends Producer {

Expand Down Expand Up @@ -460,51 +458,52 @@ private[producer] final class ProducerLive(

/**
* Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
* therefore this stream is run on a the blocking thread pool
* therefore this stream is run on the blocking thread pool.
*/
val sendFromQueue: ZIO[Any, Nothing, Any] =
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
ZIO.succeed {
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicLong = new AtomicLong
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

val _ = p.send(
rec,
(metadata: RecordMetadata, err: Exception) =>
Unsafe.unsafe { implicit u =>
exec {
if (err != null) res(idx) = Left(err)
else res(idx) = Right(metadata)

if (count.incrementAndGet == length) {
exec {
runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
}
}
}
}
)
}
} catch {
case NonFatal(e) =>
Unsafe.unsafe { implicit u =>
exec {
runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
ZIO.blocking {
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
sendChunk(serializedRecords)
.flatMap(done.succeed(_))
}
.runDrain
}

/**
 * Sends all records in `serializedRecords` to the underlying Java producer and collects the
 * per-record outcomes, preserving input order in the result.
 *
 * The returned effect never fails: each record's outcome is reported individually as an
 * `Either` (`Left` for a send error, `Right` for the broker-acknowledged metadata).
 *
 * NOTE(review): `p.send` itself may block (metadata refresh, full buffer) — presumably the
 * caller runs this on the blocking pool; confirm at the call site.
 */
private def sendChunk(
serializedRecords: Chunk[ByteRecord]
): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
ZIO
// Register all sends and complete the effect from the last Kafka callback to fire.
.async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
// One result slot per record, written by Kafka's I/O thread(s) from the send callbacks.
// The index keeps results aligned with the input order even though callbacks may
// complete out of order.
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
// Counts completed callbacks; the `incrementAndGet` also publishes the earlier
// writes to `res` to the thread that observes the final count.
val count: AtomicLong = new AtomicLong
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

// Since we might be sending to multiple partitions, the callbacks
// are _not_ necessarily called in order.
p.send(
rec,
(metadata: RecordMetadata, err: Exception) => {
// Kafka's contract: exactly one of `metadata` / `err` is non-null.
res(idx) = Either.cond(err == null, metadata, err)

// Only the callback that completes the final record resumes the effect.
if (count.incrementAndGet == length) {
callback(ZIO.succeed(Chunk.fromArray(res)))
}
}
)
}
} catch {
// A synchronous failure from `p.send` (e.g. serialization/interrupt) aborts the loop;
// report that single error for every record in the chunk.
// NOTE(review): records already handed to `send` before the failure will still get
// their callbacks invoked, but those results are discarded — `callback` is only
// honored once by ZIO.async.
case NonFatal(e) =>
callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
}
}
.runDrain

private def serialize[R, K, V](
r: ProducerRecord[K, V],
Expand All @@ -516,6 +515,4 @@ private[producer] final class ProducerLive(
value <- valueSerializer.serialize(r.topic, r.headers, r.value())
} yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)

/** Used to prevent warnings about not using the result of an expression. */
@inline private def exec[A](f: => A): Unit = { val _ = f }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,11 @@ object TransactionalProducer {
)(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live = new ProducerLive(rawProducer, runtime, sendQueue)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
live = new ProducerLive(rawProducer, sendQueue)
_ <- live.sendFromQueue.forkScoped
} yield new LiveTransactionalProducer(live, semaphore)
}

0 comments on commit ab193d6

Please sign in to comment.