Alternative producer implementation #1285

Merged · 2 commits · Jul 18, 2024 · Changes shown from 1 commit
93 changes: 53 additions & 40 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,6 +1,8 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer
import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio._
@@ -9,6 +11,7 @@ import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.BitSet
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

@@ -457,53 +460,63 @@ private[producer] final class ProducerLive(
override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)

/**
* Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
* therefore this stream is run on the blocking thread pool.
* Currently sending has the following characteristics:
* - You can submit many chunks, they get buffered in the send queue.
* - A chunk only gets sent after the previous chunk completes (completes means that the callbacks for each record
* were invoked).
* - The records in a chunk are sent in one go, in order. Records for the same partition have a high chance of
* landing in the same batch (which is good for compression).
* - Record ordering is retained and guaranteed between chunks.
* - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been enabled
* (see https://kafka.apache.org/documentation/#producerconfigs_retries).
*/
val sendFromQueue: ZIO[Any, Nothing, Any] =
ZIO.blocking {
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
sendChunk(serializedRecords)
.flatMap(done.succeed(_))
}
.runDrain
ZIO.runtime[Any].flatMap { runtime =>
// Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost,
// therefore this stream is run on the blocking thread pool.
ZIO.blocking {
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
sendChunk(runtime, serializedRecords)
.flatMap(done.succeed(_))
}
.runDrain
}
}
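
The doc comment above describes a handshake: each submitted chunk is paired with a `done` promise that `sendFromQueue` fulfills once the callback for every record in the chunk has fired. Here is a minimal caller-side sketch of that protocol, assuming the queue element type implied by `done.succeed(_)`; the names `SendQueueSketch`, `produceChunk` and `Results` are illustrative, not from this PR.

// Caller-side sketch of the send-queue protocol (illustrative, not PR code).
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._

object SendQueueSketch {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]
  type Results    = Chunk[Either[Throwable, RecordMetadata]]

  // Each chunk is paired with a promise; `sendFromQueue` completes the promise
  // once the callback for every record in the chunk has been invoked.
  def produceChunk(
    sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Results])],
    records: Chunk[ByteRecord]
  ): UIO[Results] =
    for {
      done    <- Promise.make[Nothing, Results]
      _       <- sendQueue.offer((records, done))
      results <- done.await // resolves only after the whole chunk was sent
    } yield results
}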

private def sendChunk(
runtime: Runtime[Any],
serializedRecords: Chunk[ByteRecord]
): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] =
ZIO
.async[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] { callback =>
try {
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicLong = new AtomicLong
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()

// Since we might be sending to multiple partitions, the callbacks
// are _not_ necessarily called in order.
val _ = p.send(
rec,
(metadata: RecordMetadata, err: Exception) => {
res(idx) = Either.cond(err == null, metadata, err)

if (count.incrementAndGet == length) {
callback(ZIO.succeed(Chunk.fromArray(res)))
}
}
)
}
} catch {
case NonFatal(e) =>
callback(ZIO.succeed(Chunk.fill(serializedRecords.size)(Left(e))))
}
}
for {
promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime))
results <- ZIO.foreach(promises)(_.await.either)
} yield results
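
Because the promises are awaited in submission order, the results chunk lines up index-for-index with the input records even though the underlying Kafka callbacks may fire in any order, and `await.either` turns each failed promise back into a `Left`. A small self-contained illustration of that pattern (demo code, not zio-kafka):

// Illustration only: promises awaited in submission order keep results aligned
// with the inputs, even when the promises complete out of order.
import zio._

object AwaitInOrderDemo extends ZIOAppDefault {
  val run =
    for {
      p1 <- Promise.make[Throwable, Int]
      p2 <- Promise.make[Throwable, Int]
      _  <- p2.succeed(2) *> p1.fail(new RuntimeException("boom")) // completes out of order
      rs <- ZIO.foreach(Chunk(p1, p2))(_.await.either)             // awaited in order
      _  <- Console.printLine(rs) // Chunk(Left(java.lang.RuntimeException: boom), Right(2))
    } yield ()
}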

private def sendRecord(
runtime: Runtime[Any]
)(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = {
def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = {
val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f))
}

for {
done <- Promise.make[Throwable, RecordMetadata]
_ <- ZIO
.attempt[Any] {
p.send(
record,
(metadata: RecordMetadata, err: Exception) =>
unsafeRun {
if (err == null) done.succeed(metadata)
else done.fail(err)
Review comment:

if we change this to done.fail((record,err)) we could implement a retry strategy in 494

Collaborator Author:

I like the idea. But are you sure you want to do the retries record by record?

@domdorn (Jul 18, 2024):

I think that is the only way to avoid sending messages again that have previously been accepted, i.e. to prevent double delivery?

Collaborator Author:

The other option is to do something after line 489. There you can records.zip(results) and filter those that failed. The failed records can then be retried together. The challenge is mostly to write elegant code that merges the original results with the retry results.
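
A rough sketch of that chunk-level retry idea, assuming a `sendChunk`-like function is available: pair each record with its result, retry only the failures as one chunk, and splice the retry outcomes back into their original positions. The helper `retryFailed` and its merging logic are hypothetical, not part of this PR.

// Hypothetical helper sketching the retry idea from this thread.
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._

object RetryFailedSketch {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]
  type Result     = Either[Throwable, RecordMetadata]

  def retryFailed(
    sendChunk: Chunk[ByteRecord] => UIO[Chunk[Result]]
  )(records: Chunk[ByteRecord], results: Chunk[Result]): UIO[Chunk[Result]] = {
    // Keep each failed record together with its original index.
    val failed = records.zip(results).zipWithIndex.collect {
      case ((record, Left(_)), idx) => (record, idx)
    }
    if (failed.isEmpty) ZIO.succeed(results)
    else
      sendChunk(failed.map(_._1)).map { retried =>
        // Overwrite each failed slot with its retry outcome.
        failed.map(_._2).zip(retried).foldLeft(results) { case (acc, (idx, r)) =>
          acc.updated(idx, r)
        }
      }
  }
}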

}
)
}
.catchAll(err => done.fail(err))
} yield done
}
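
The `unsafeRun` helper above is an instance of a general bridge pattern: a Java-style callback completes a ZIO Promise by running an effect on a captured Runtime. A generic sketch of that bridge (the name `fromCallback` is illustrative, not from the PR):

// Generic form of the callback bridge used by `sendRecord`.
import zio._

object CallbackBridge {
  def fromCallback[A](register: (A => Unit) => Unit): UIO[Promise[Nothing, A]] =
    for {
      runtime <- ZIO.runtime[Any]
      promise <- Promise.make[Nothing, A]
      _ <- ZIO.succeed {
             register { a =>
               // Runs on the callback's (non-ZIO) thread, hence the Unsafe block.
               Unsafe.unsafe { implicit u =>
                 runtime.unsafe.run(promise.succeed(a)).getOrThrowFiberFailure()
               }
             }
           }
    } yield promise
}

ZIO's built-in ZIO.async (used by the replaced implementation above) packages the same idea; the promise-based variant makes per-record completion explicit.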

private def serialize[R, K, V](
r: ProducerRecord[K, V],