Alternative producer implementation (again) #1311

Closed (wants to merge 4 commits)
Changes from 3 commits
113 changes: 62 additions & 51 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -8,9 +8,7 @@ import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

trait Producer {

@@ -220,13 +218,12 @@ object Producer {
settings: ProducerSettings
): ZIO[Scope, Throwable, Producer] =
for {
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, runtime, sendQueue)
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
producer = new ProducerLive(javaProducer, sendQueue)
_ <- producer.sendFromQueue.forkScoped
} yield producer
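
For context, a hedged sketch of the scoped lifecycle this sets up; the broker address, topic name, and serdes below are assumptions and not taken from this PR. The producer and the send-queue fiber forked above both live until the surrounding scope closes.

object ProducerUsageSketch {
  import org.apache.kafka.clients.producer.RecordMetadata
  import zio._
  import zio.kafka.producer.{ Producer, ProducerSettings }
  import zio.kafka.serde.Serde

  // Closing the scope shuts down the background send fiber and the underlying Java producer.
  val produceOne: ZIO[Scope, Throwable, RecordMetadata] =
    for {
      producer <- Producer.make(ProducerSettings(List("localhost:9092")))
      metadata <- producer.produce("my-topic", "key", "value", Serde.string, Serde.string)
    } yield metadata
}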

/**
@@ -362,7 +359,6 @@ object Producer {

private[producer] final class ProducerLive(
private[producer] val p: JProducer[Array[Byte], Array[Byte]],
runtime: Runtime[Any],
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
) extends Producer {

@@ -439,7 +435,6 @@ private[producer] final class ProducerLive(
.foreach(records)(serialize(_, keySerializer, valueSerializer))
.flatMap(produceChunkAsync)

// noinspection YieldingZIOEffectInspection
override def produceChunkAsyncWithFailures(
records: Chunk[ByteRecord]
): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
@@ -448,7 +443,9 @@
for {
done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
_ <- sendQueue.offer((records, done))
} yield done.await
} yield
// noinspection YieldingZIOEffectInspection
done.await
}
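
For reference, the intended call pattern for this two-level effect, as a minimal sketch; the producer value and the already-serialized records are assumed to be in scope. The outer UIO completes as soon as the chunk is accepted onto the send queue, the inner UIO completes once a callback has been received for every record in the chunk.

object ProduceChunkSketch {
  import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
  import zio._
  import zio.kafka.producer.Producer

  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

  def sendAndAwait(
    producer: Producer,
    records: Chunk[ByteRecord]
  ): UIO[Chunk[Either[Throwable, RecordMetadata]]] =
    for {
      acknowledged <- producer.produceChunkAsyncWithFailures(records) // completes when the chunk is queued
      results      <- acknowledged                                    // completes when all callbacks have fired
    } yield results
}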

override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] =
@@ -459,52 +456,68 @@
override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)

/**
* Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
* therefore this stream is run on the blocking thread pool
* Currently sending has the following characteristics:
* - You can submit many chunks, they get buffered in the send queue.
* - A chunk only gets sent after the previous chunk completes (completes means that the callbacks for each record
*   were invoked).

Collaborator: This is no longer true, is it?

Collaborator Author: I wrote that before we learned better, it has never been true.

* - The records in a chunk are sent in one go, in order. Records for the same partition have a high chance that
* they land in the same batch (which is good for compression).
* - Record ordering is retained and guaranteed between chunks.
* - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been enabled
* (see https://kafka.apache.org/documentation/#producerconfigs_retries).
*/
val sendFromQueue: ZIO[Any, Nothing, Any] =
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
ZIO.succeed {
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicInteger = new AtomicInteger
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

val _ = p.send(
rec,
(metadata: RecordMetadata, err: Exception) =>
Unsafe.unsafe { implicit u =>
exec {
if (err != null) res(idx) = Left(err)
else res(idx) = Right(metadata)

if (count.incrementAndGet == length) {
exec {
runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
}
}
}
}
)
Comment on lines -480 to -495

Contributor (@ytalashko, Aug 25, 2024): Hey @svroonland, @erikvanoosten, I think I found the cause of the regression: p.send is a method for asynchronous record sending, i.e. it does not block until the record reaches the broker; it reports completion through a callback (or the Java future it returns). So in the current implementation (on the master branch, NOT in this PR), the sendFromQueue stream does not actually await the chunk being sent to the broker; it only dispatches the chunk, and the actual awaiting is done by the caller of produce, produceChunk, ... (because done.succeed is called in the p.send callback). Which is great, because it handles parallel sending well. The changes in this PR make the sendFromQueue stream actually await each chunk being sent to the broker, which breaks the ability to truly handle parallel sending, because sending is now performed chunk by chunk.

This sendFromQueue implementation (from the master branch) really seems fast and readable (at least to me). I would love it if we could keep its internals, as it seems to be fast (again), which is super important for such a piece of code. I mean, if there is a nice API for users, it really doesn't matter whether the internals are "ugly" or not; the performance and readability of those internals are of higher value (per me). Wdyt?
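
To make the contrast described above concrete, a simplified sketch of the two stream shapes; the names and helper signatures are illustrative and are taken from neither master nor this PR.

object SendQueueShapes {
  import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
  import zio._
  import zio.stream.ZStream

  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]
  type Results    = Chunk[Either[Throwable, RecordMetadata]]

  // The shape the comment above attributes to this PR's changes: the mapZIO effect awaits all
  // broker callbacks, so the next chunk is not dispatched until the previous one is fully done.
  def chunkByChunk(
    sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Results])],
    sendAndAwait: Chunk[ByteRecord] => UIO[Results]
  ): UIO[Unit] =
    ZStream
      .fromQueueWithShutdown(sendQueue)
      .mapZIO { case (records, done) => sendAndAwait(records).flatMap(done.succeed) }
      .runDrain

  // The shape the comment above attributes to master: the mapZIO effect only hands the records
  // to the Kafka client; `done` is completed later, from the send callbacks, so the stream keeps
  // pulling new chunks while earlier ones are still in flight.
  def pipelined(
    sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Results])],
    dispatch: (Chunk[ByteRecord], Promise[Nothing, Results]) => UIO[Unit]
  ): UIO[Unit] =
    ZStream
      .fromQueueWithShutdown(sendQueue)
      .mapZIO { case (records, done) => dispatch(records, done) }
      .runDrain
}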

Collaborator: Yes, that makes perfect sense, we totally missed that.

Agreed that we need to keep it fast or make it faster; we will definitely make sure that any changes we make to this piece of code do not affect performance in a negative way.

Collaborator Author: Thanks @ytalashko. With that information a fix is easy. I'll push it shortly.

Collaborator Author: Fix: 9149479 🤞

Collaborator Author: Yes, it works! The last 2 builds in this graph are from this PR. The last one has the fix, the one before does not.

[image: benchmark results graph]

Collaborator: Cool :) What does it look like compared to 2.8.2?

Collaborator Author: In the graph above we see that this PR is a little bit slower than master. However, there is a lot of variance between runs, so it is too early to tell whether this is a significant regression. But for sure, this PR is now much better than 2.8.1!

Collaborator Author:
> Cool :) What does it look like compared to 2.8.2?

2.8.2 is not in the graph, but it should be similar to the first 3 runs in the graph.
        } catch {
          case NonFatal(e) =>
            Unsafe.unsafe { implicit u =>
              exec {
                runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
              }
            }
        }
      }
    }
    .runDrain

    ZIO.runtime[Any].flatMap { runtime =>
      // Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost,
      // therefore this stream is run on the blocking thread pool.
      ZIO.blocking {
        ZIO.scoped {
          ZStream
            .fromQueueWithShutdown(sendQueue)
            .mapZIO { case (serializedRecords, done) =>
              sendChunk(runtime, serializedRecords, done)
            }
            .runDrain
        }
      }
    }

private def sendChunk(
runtime: Runtime[Any],
serializedRecords: Chunk[ByteRecord],
done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
): ZIO[Scope, Nothing, Unit] =
for {
promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
_ <- ZIO
.foreach(promises.reverse)(_.await.either)
.flatMap(results => done.succeed(results.reverse))
.forkScoped
} yield ()

private def sendRecord(
runtime: Runtime[Any]
)(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
}

for {
done <- Promise.make[Throwable, RecordMetadata]
_ <- ZIO
.attempt[Any] {
p.send(
record,
(metadata: RecordMetadata, err: Exception) =>
unsafeRun {
if (err == null) done.succeed(metadata)
Collaborator: Instead of creating this many promises, we could alternatively create a Ref or even a TRef and complete a single promise when the count reaches the expected number. That would be the ZIO variant of the AtomicInteger we had before.

Collaborator Author: Nice idea. We'd still need an additional fiber though.

Another idea is that we use what is on master but continue calling send for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)

Contributor:
> Instead of creating this many promises, we could alternatively create a Ref or even a TRef and complete a single promise when the count reaches the expected number. That would be the ZIO variant of the AtomicInteger we had before.

This sounds really nice.

Contributor:
> Another idea is that we use what is on master but continue calling send for the following records, even when there is an error.

Maybe we can retry the send call errors (something mentioned in #1311 (comment)).

> That way we also get accurate error reporting for each record.

Yeah, this is something missing right now in master; it would be great to have it.

> continue calling send for the following records, even when there is an error.

This can lead to broken ordering of the produced messages; maybe it is better not to produce further records from the chunk if one fails, wdyt?
else done.fail(err)
}
)
}
.catchAll(err => done.fail(err))
} yield done
}
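
To make the Ref-based alternative from the thread above concrete, a hedged sketch; the method is illustrative only and assumes it would sit inside ProducerLive next to sendChunk, so that the p field and the file's existing imports are available. A single results cell plus a countdown complete one promise for the whole chunk from the last callback, which is the ZIO counterpart of the removed AtomicInteger.

  private def sendChunkViaSingleRef(
    runtime: Runtime[Any],
    serializedRecords: Chunk[ByteRecord],
    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
  ): ZIO[Any, Nothing, Unit] = {
    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
    }

    for {
      // one pre-filled slot per record; each callback overwrites its own slot
      results   <- Ref.make(Vector.fill[Either[Throwable, RecordMetadata]](serializedRecords.size)(Left(new IllegalStateException("callback not invoked"))))
      remaining <- Ref.make(serializedRecords.size)
      _ <- ZIO
             .attempt {
               serializedRecords.zipWithIndex.foreach { case (record, idx) =>
                 val _ = p.send(
                   record,
                   (metadata: RecordMetadata, err: Exception) =>
                     unsafeRun {
                       val result = if (err == null) Right(metadata) else Left(err)
                       results.update(_.updated(idx, result)) *>
                         remaining.updateAndGet(_ - 1).flatMap { left =>
                           // the last callback completes the single promise for the whole chunk
                           ZIO.when(left == 0)(results.get.flatMap(r => done.succeed(Chunk.fromIterable(r))))
                         }
                     }
                 )
               }
             }
             .catchAll(e => done.succeed(Chunk.fill(serializedRecords.size)(Left(e))).unit)
    } yield ()
  }

Compared to the per-record promises above, this allocates fewer promises and needs no extra awaiting fiber, but it still has to run ZIO effects from inside the Kafka callback via unsafeRun.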

private def serialize[R, K, V](
r: ProducerRecord[K, V],
@@ -516,6 +529,4 @@
value <- valueSerializer.serialize(r.topic, r.headers, r.value())
} yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)

/** Used to prevent warnings about not using the result of an expression. */
@inline private def exec[A](f: => A): Unit = { val _ = f }
}
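
Related to the ordering bullet in the scaladoc above (and the ordering concern raised in the review thread), a hedged sketch of standard Kafka producer settings that keep per-partition ordering intact even when retries are enabled; the broker address is an assumption and none of this is part of the PR.

object OrderingSafeSettingsSketch {
  import zio.kafka.producer.ProducerSettings

  // With idempotence enabled, the broker preserves per-partition order for retried
  // batches as long as max.in.flight.requests.per.connection is at most 5.
  val settings: ProducerSettings =
    ProducerSettings(List("localhost:9092"))
      .withProperty("enable.idempotence", "true")
      .withProperty("max.in.flight.requests.per.connection", "5")
}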
@@ -96,12 +96,11 @@ object TransactionalProducer {
)(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live = new ProducerLive(rawProducer, runtime, sendQueue)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
live = new ProducerLive(rawProducer, sendQueue)
_ <- live.sendFromQueue.forkScoped
} yield new LiveTransactionalProducer(live, semaphore)
}