Small improvements to the Producer #1272

Merged · 3 commits · Jul 14, 2024
zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala (85 changes: 41 additions & 44 deletions)
@@ -220,13 +220,12 @@ object Producer {
     settings: ProducerSettings
   ): ZIO[Scope, Throwable, Producer] =
     for {
-      runtime <- ZIO.runtime[Any]
       sendQueue <-
         Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
           settings.sendBufferSize
         )
-      producer = new ProducerLive(javaProducer, runtime, sendQueue)
-      _ <- ZIO.blocking(producer.sendFromQueue).forkScoped
+      producer = new ProducerLive(javaProducer, sendQueue)
+      _ <- producer.sendFromQueue.forkScoped
     } yield producer
 
   /**
@@ -362,7 +361,6 @@ object Producer {
 
 private[producer] final class ProducerLive(
   private[producer] val p: JProducer[Array[Byte], Array[Byte]],
-  runtime: Runtime[Any],
   sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
 ) extends Producer {
 
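Note on the two hunks above: the `Runtime[Any]` no longer needs to be threaded into `ProducerLive` because completion is signalled through `ZIO.async` (next hunk), while the send loop's lifetime stays tied to the enclosing `Scope` through `forkScoped`. A minimal, self-contained sketch of that lifetime pattern, using plain ZIO with illustrative names (not the zio-kafka API):

```scala
import zio._

// Sketch only: a background loop started with forkScoped is interrupted
// automatically when the enclosing Scope closes, so no Runtime or manual
// shutdown hook is required. SendLoopSketch and the String queue are made up.
object SendLoopSketch extends ZIOAppDefault {
  def run =
    ZIO.scoped {
      for {
        queue <- Queue.bounded[String](16)
        // the loop fiber lives exactly as long as the Scope
        _ <- ZIO.blocking(queue.take.flatMap(r => Console.printLine(s"sending $r")).forever).forkScoped
        _ <- queue.offer("record-1") *> queue.offer("record-2")
        _ <- ZIO.sleep(100.millis) // let the loop drain before the Scope closes
      } yield ()
    }
}
```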
@@ -460,51 +458,52 @@ private[producer] final class ProducerLive(
 
   /**
    * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
-   * therefore this stream is run on a the blocking thread pool
+   * therefore this stream is run on the blocking thread pool.
    */
   val sendFromQueue: ZIO[Any, Nothing, Any] =
-    ZStream
-      .fromQueueWithShutdown(sendQueue)
-      .mapZIO { case (serializedRecords, done) =>
-        ZIO.succeed {
-          try {
-            val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
-            val res: Array[Either[Throwable, RecordMetadata]] =
-              new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
-            val count: AtomicLong = new AtomicLong
-            val length = serializedRecords.length
-
-            while (it.hasNext) {
-              val (rec, idx): (ByteRecord, Int) = it.next()
-
-              val _ = p.send(
-                rec,
-                (metadata: RecordMetadata, err: Exception) =>
-                  Unsafe.unsafe { implicit u =>
-                    exec {
-                      if (err != null) res(idx) = Left(err)
-                      else res(idx) = Right(metadata)
-
-                      if (count.incrementAndGet == length) {
-                        exec {
-                          runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
-                        }
-                      }
-                    }
-                  }
-              )
-            }
-          } catch {
-            case NonFatal(e) =>
-              Unsafe.unsafe { implicit u =>
-                exec {
-                  runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
-                }
-              }
-          }
-        }
-      }
-      .runDrain
+    ZIO.blocking {
+      ZStream
+        .fromQueueWithShutdown(sendQueue)
+        .mapZIO { case (serializedRecords, done) =>
+          sendChunk(serializedRecords)
+            .flatMap(done.succeed(_))
+        }
+        .runDrain
+    }
+
+  private def sendChunk(
+    serializedRecords: Chunk[ByteRecord]
+  ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
+    ZIO
+      .async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
+        try {
+          val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
+          val res: Array[Either[Throwable, RecordMetadata]] =
+            new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
+          val count: AtomicLong = new AtomicLong
+          val length = serializedRecords.length
+
+          while (it.hasNext) {
+            val (rec, idx): (ByteRecord, Int) = it.next()
+
+            // Since we might be sending to multiple partitions, the callbacks
+            // are _not_ necessarily called in order.
+            val _ = p.send(
+              rec,
+              (metadata: RecordMetadata, err: Exception) => {
+                res(idx) = Either.cond(err == null, metadata, err)
+
+                if (count.incrementAndGet == length) {
+                  callback(ZIO.succeed(Chunk.fromArray(res)))
+                }
+              }
+            )
+          }
+        } catch {
+          case NonFatal(e) =>
+            callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
+        }
+      }
 
   private def serialize[R, K, V](
     r: ProducerRecord[K, V],
@@ -516,6 +515,4 @@ private[producer] final class ProducerLive(
       value <- valueSerializer.serialize(r.topic, r.headers, r.value())
     } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
 
-  /** Used to prevent warnings about not using the result of an expression. */
-  @inline private def exec[A](f: => A): Unit = { val _ = f }
 }
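For reference, a hedged sketch of the technique `sendChunk` uses: `ZIO.async` registers a callback once, results are written into a pre-sized array by index because the broker callbacks may complete out of order, and the effect completes when an `AtomicLong` counter reaches the batch size. `CallbackApi` and `sendBatch` below are hypothetical stand-ins, not the Kafka client API:

```scala
import java.util.concurrent.atomic.AtomicLong
import zio._

// Sketch only: CallbackApi stands in for a callback-based client such as
// KafkaProducer#send. Assumes a non-empty batch, like the producer's send path.
object AsyncBatchSketch {
  trait CallbackApi {
    def send(payload: String)(onDone: Either[Throwable, Long] => Unit): Unit
  }

  def sendBatch(api: CallbackApi, payloads: Chunk[String]): UIO[Chunk[Either[Throwable, Long]]] =
    ZIO.async[Any, Nothing, Chunk[Either[Throwable, Long]]] { callback =>
      val results = new Array[Either[Throwable, Long]](payloads.length)
      val count   = new AtomicLong

      payloads.zipWithIndex.foreach { case (payload, idx) =>
        api.send(payload) { result =>
          results(idx) = result // slots keep submission order even if callbacks race
          if (count.incrementAndGet() == payloads.length)
            callback(ZIO.succeed(Chunk.fromArray(results))) // complete exactly once
        }
      }
    }
}
```

Completing the effect from the client's callback thread is what lets `ProducerLive` drop the `Runtime`, `Unsafe.unsafe`, and the `exec` helper.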
@@ -96,12 +96,11 @@ object TransactionalProducer {
       )(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
       _ <- ZIO.attemptBlocking(rawProducer.initTransactions())
       semaphore <- Semaphore.make(1)
-      runtime <- ZIO.runtime[Any]
       sendQueue <-
         Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
           settings.producerSettings.sendBufferSize
         )
-      live = new ProducerLive(rawProducer, runtime, sendQueue)
-      _ <- ZIO.blocking(live.sendFromQueue).forkScoped
+      live = new ProducerLive(rawProducer, sendQueue)
+      _ <- live.sendFromQueue.forkScoped
     } yield new LiveTransactionalProducer(live, semaphore)
   }