Skip to content

Commit

Permalink
Use AtomicInteger instead of AtomicLong inside Producer.sendFromQueue…
Browse files Browse the repository at this point in the history
… implementation (#1315)

Minor change (alignment), in case we gonna keep current
`Producer.sendFromQueue` implementation
(#1311 (comment)),
otherwise, this PR can be closed.
  • Loading branch information
ytalashko authored Aug 26, 2024
1 parent 41d059d commit 0231c5d
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

Expand Down Expand Up @@ -460,7 +460,7 @@ private[producer] final class ProducerLive(

/**
* Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost,
* therefore this stream is run on a the blocking thread pool
* therefore this stream is run on the blocking thread pool
*/
val sendFromQueue: ZIO[Any, Nothing, Any] =
ZStream
Expand All @@ -471,8 +471,8 @@ private[producer] final class ProducerLive(
val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex
val res: Array[Either[Throwable, RecordMetadata]] =
new Array[Either[Throwable, RecordMetadata]](serializedRecords.length)
val count: AtomicLong = new AtomicLong
val length = serializedRecords.length
val count: AtomicInteger = new AtomicInteger
val length = serializedRecords.length

while (it.hasNext) {
val (rec, idx): (ByteRecord, Int) = it.next()
Expand Down

0 comments on commit 0231c5d

Please sign in to comment.