Alternative producer implementation (again) #1311
Changes from 3 commits
@@ -8,9 +8,7 @@ import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

trait Producer {
@@ -220,13 +218,12 @@ object Producer {
    settings: ProducerSettings
  ): ZIO[Scope, Throwable, Producer] =
    for {
      runtime   <- ZIO.runtime[Any]
      sendQueue <-
        Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
          settings.sendBufferSize
        )
      producer = new ProducerLive(javaProducer, runtime, sendQueue)
      _ <- ZIO.blocking(producer.sendFromQueue).forkScoped
      producer = new ProducerLive(javaProducer, sendQueue)
      _ <- producer.sendFromQueue.forkScoped
    } yield producer

  /**
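For orientation, here is a minimal usage sketch of the scoped construction this hunk touches, assuming the public `Producer.make` ends up in this code path; the broker address and topic name (`localhost:9092`, `example-topic`) are placeholders, not taken from the PR.

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde.Serde

object ProducerUsageSketch {
  // Placeholder broker address; adjust for a real environment.
  val settings: ProducerSettings = ProducerSettings(List("localhost:9092"))

  val program: ZIO[Scope, Throwable, RecordMetadata] =
    for {
      // Producer.make builds the send queue and forks the sendFromQueue loop
      // as a fiber that lives as long as the enclosing scope.
      producer <- Producer.make(settings)
      metadata <- producer.produce("example-topic", "key", "value", Serde.string, Serde.string)
    } yield metadata
}
```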
@@ -362,7 +359,6 @@ object Producer {

private[producer] final class ProducerLive(
  private[producer] val p: JProducer[Array[Byte], Array[Byte]],
  runtime: Runtime[Any],
  sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
) extends Producer {
@@ -439,7 +435,6 @@ private[producer] final class ProducerLive(
      .foreach(records)(serialize(_, keySerializer, valueSerializer))
      .flatMap(produceChunkAsync)

  // noinspection YieldingZIOEffectInspection
  override def produceChunkAsyncWithFailures(
    records: Chunk[ByteRecord]
  ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
@@ -448,7 +443,9 @@ private[producer] final class ProducerLive(
    for {
      done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
      _    <- sendQueue.offer((records, done))
    } yield done.await
    } yield
      // noinspection YieldingZIOEffectInspection
      done.await
  }

  override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] =
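As a reading aid for the scaladoc and the send-loop changes below, a small hypothetical sketch of how the two-level effect returned by `produceChunkAsyncWithFailures` is consumed: the outer effect completes once the chunk is buffered in the send queue, the inner effect once every record's callback has reported. The topic name and the `produceTwo` helper are made up.

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer

object ChunkProduceSketch {
  // Hypothetical helper: takes an already-constructed Producer.
  def produceTwo(producer: Producer): UIO[Unit] = {
    val records = Chunk(
      new ProducerRecord("example-topic", "k1".getBytes, "v1".getBytes),
      new ProducerRecord("example-topic", "k2".getBytes, "v2".getBytes)
    )
    for {
      // Outer effect: completes as soon as the chunk is accepted into the send queue.
      ack     <- producer.produceChunkAsyncWithFailures(records)
      // Inner effect: completes once the broker acknowledged (or failed) every record.
      results <- ack
      _       <- ZIO.foreachDiscard(results) {
                   case Right(metadata) => ZIO.logInfo(s"acked at offset ${metadata.offset()}")
                   case Left(err)       => ZIO.logWarning(s"failed: ${err.getMessage}")
                 }
    } yield ()
  }
}
```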
@@ -459,52 +456,68 @@ private[producer] final class ProducerLive(
  override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)

  /**
   * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
   * therefore this stream is run on the blocking thread pool
   * Currently sending has the following characteristics:
   *   - You can submit many chunks, they get buffered in the send queue.
   *   - A chunk only gets sent after the previous chunk completes (completes means that the callbacks for each
   *     record were invoked).
   *   - The records in a chunk are sent in one go, in order. Records for the same partition have a high chance
   *     that they land in the same batch (which is good for compression).
   *   - Record ordering is retained and guaranteed between chunks.
   *   - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been
   *     enabled (see https://kafka.apache.org/documentation/#producerconfigs_retries).
   */
  val sendFromQueue: ZIO[Any, Nothing, Any] =
    ZStream
      .fromQueueWithShutdown(sendQueue)
      .mapZIO { case (serializedRecords, done) =>
        ZIO.succeed {
          try {
            val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
            val res: Array[Either[Throwable, RecordMetadata]] =
              new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
            val count: AtomicInteger = new AtomicInteger
            val length = serializedRecords.length

            while (it.hasNext) {
              val (rec, idx): (ByteRecord, Int) = it.next()

              val _ = p.send(
                rec,
                (metadata: RecordMetadata, err: Exception) =>
                  Unsafe.unsafe { implicit u =>
                    exec {
                      if (err != null) res(idx) = Left(err)
                      else res(idx) = Right(metadata)

                      if (count.incrementAndGet == length) {
                        exec {
                          runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
                        }
                      }
                    }
                  }
              )
Comment on lines -480 to -495

Hey @svroonland, @erikvanoosten, I think I found the cause of the regression: This

Yes, that makes perfect sense, we totally missed that. Agreed that we need to keep it fast or make it faster; we will definitely make sure that any changes we make to this piece of code do not affect the performance in a negative way.

Thanks @ytalashko. With that information a fix is easy. I'll push it shortly.

Fix: 9149479 🤞

Cool :) What does it look like compared to 2.8.2?

In the graph above we see that this PR is a little bit slower than master. However, there is a lot of variance between runs, so it is too early to tell whether this is a significant regression. But for sure, this PR is now much better than 2.8.1!

2.8.2 is not in the graph, but it should be similar to the first 3 runs in the graph.

    ZIO.runtime[Any].flatMap { runtime =>
      // Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost,
      // therefore this stream is run on the blocking thread pool.
      ZIO.blocking {
        ZIO.scoped {
          ZStream
            .fromQueueWithShutdown(sendQueue)
            .mapZIO { case (serializedRecords, done) =>
              sendChunk(runtime, serializedRecords, done)
            }
          } catch {
            case NonFatal(e) =>
              Unsafe.unsafe { implicit u =>
                exec {
                  runtime.unsafe.run(done.succeed(Chunk.fill(serializedRecords.size)(Left(e)))).getOrThrowFiberFailure()
                }
              }
          }
          .runDrain
        }
      }
            .runDrain
        }
      }

  private def sendChunk(
    runtime: Runtime[Any],
    serializedRecords: Chunk[ByteRecord],
    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
  ): ZIO[Scope, Nothing, Unit] =
    for {
      promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
      _ <- ZIO
             .foreach(promises.reverse)(_.await.either)
             .flatMap(results => done.succeed(results.reverse))
             .forkScoped
    } yield ()
  private def sendRecord(
    runtime: Runtime[Any]
  )(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
    def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
      val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
    }

    for {
      done <- Promise.make[Throwable, RecordMetadata]
      _ <- ZIO
             .attempt[Any] {
               p.send(
                 record,
                 (metadata: RecordMetadata, err: Exception) =>
                   unsafeRun {
                     if (err == null) done.succeed(metadata)
Instead of creating this many promises, alternatively we could create a Ref or even TRef and complete a single promise when the count is expected. That would be the ZIO variant of the AtomicInteger we had before.

Nice idea. We'd still need an additional fiber though. Another idea is that we use what is on master but continue calling send for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)

This sounds really nice

Maybe we can retry the

Yeah, this is something missing right now in master, would be great to have it

This can lead to broken order of produced messages, maybe it is better not to produce new records from the chunk if some failed, wdyt?
                     else done.fail(err)
                   }
               )
             }
             .catchAll(err => done.fail(err))
    } yield done
  }

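To make the alternative from the thread above concrete, here is a rough sketch (not part of this PR) of the Ref-based variant: per-record results are collected in a `Ref` and a single promise per chunk is completed when the last Kafka callback reports, mirroring the `AtomicInteger` bookkeeping of the old implementation. The `sendChunkWithRef` helper and its wiring are hypothetical.

```scala
import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, ProducerRecord, RecordMetadata }
import zio._

object RefBasedSendSketch {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

  // One shared promise per chunk; a Ref collects per-record results and the
  // promise is completed when the last callback reports.
  def sendChunkWithRef(
    runtime: Runtime[Any],
    p: KafkaProducer[Array[Byte], Array[Byte]],
    records: Chunk[ByteRecord],
    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
  ): UIO[Unit] =
    for {
      results <- Ref.make(Map.empty[Int, Either[Throwable, RecordMetadata]])
      _ <- ZIO.foreachDiscard(records.zipWithIndex) { case (rec, idx) =>
             ZIO.succeed {
               p.send(
                 rec,
                 new Callback {
                   override def onCompletion(metadata: RecordMetadata, err: Exception): Unit = {
                     val update =
                       results
                         .updateAndGet(_ + (idx -> (if (err == null) Right(metadata) else Left(err))))
                         .flatMap { acc =>
                           // Complete the single promise once every record has reported.
                           ZIO.when(acc.size == records.length)(
                             done.succeed(Chunk.fromIterable(records.indices.map(acc)))
                           )
                         }
                     val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(update))
                   }
                 }
               )
             }
           }
    } yield ()
}
```

Whether this actually beats one promise per record would still need the kind of benchmarking discussed earlier in this PR.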
  private def serialize[R, K, V](
    r: ProducerRecord[K, V],

@@ -516,6 +529,4 @@ private[producer] final class ProducerLive(
      value <- valueSerializer.serialize(r.topic, r.headers, r.value())
    } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)

  /** Used to prevent warnings about not using the result of an expression. */
  @inline private def exec[A](f: => A): Unit = { val _ = f }
}
This is no longer true, is it?

I wrote that before we learned better, it has never been true.