Commit: clean

guizmaii committed Jun 25, 2023
1 parent 93e94e8 commit a6d8f27
Showing 3 changed files with 57 additions and 50 deletions.
2 changes: 1 addition & 1 deletion zio-kafka-test/src/test/resources/logback.xml
@@ -14,7 +14,7 @@
<logger name="com.yammer.metrics" level="ERROR"/>
<logger name="kafka" level="ERROR" />

<root level="INFO">
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
67 changes: 36 additions & 31 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -877,11 +877,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
): ZIO[Scope & Kafka, Throwable, Unit] =
for {
consumedMessagesCounter <- Ref.make(0)
-        _ <- consumedMessagesCounter.get
-               .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
-               .repeat(Schedule.fixed(1.second))
-               .forkScoped
+        logProducedMessageCount =
+          consumedMessagesCounter.get.flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
         streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None)
+        batchCounter <- Ref.make(0)
transactionalConsumer <-
Consumer
.partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
@@ -892,33 +891,39 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned")
_ <- consumerCreated.succeed(())
partitionStreams = assignedPartitions.map(_._2)
-            s <- ZStream
-                   .mergeAllUnbounded(64)(partitionStreams: _*)
-                   .mapChunksZIO { records =>
-                     ZIO.scoped {
-                       for {
-                         transaction <- transactionalProducer.createTransaction
-                         _ <- ZIO.logDebug("==== Producing chunk batch ====")
-                         _ <- transaction.produceChunkBatch(
-                                records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
-                                Serde.string,
-                                Serde.string,
-                                OffsetBatch(records)
-                              )
-                         _ <- ZIO.logDebug("==== Done producing chunk batch ====")
-                         _ <- consumedMessagesCounter.update(_ + records.size)
-                       } yield Chunk.empty
-                     }.uninterruptible
-                   }
-                   .runDrain
-                   .ensuring {
-                     for {
-                       _ <- streamCompleteOnRebalanceRef.set(None)
-                       _ <- p.succeed(())
-                       c <- consumedMessagesCounter.get
-                       _ <- ZIO.logDebug(s"Consumed $c messages")
-                     } yield ()
-                   }
+            s <-
+              ZStream
+                .mergeAllUnbounded(64)(partitionStreams: _*)
+                .mapChunksZIO { records =>
+                  ZIO.scoped {
+                    for {
+                      batchCount  <- batchCounter.getAndUpdate(_ + 1)
+                      transaction <- transactionalProducer.createTransaction
+                      _ <-
+                        ZIO.logDebug(s"==== Producing chunk batch $batchCount - ${records.size} records ====")
+                      _ <- transaction.produceChunkBatch(
+                             records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
+                             Serde.string,
+                             Serde.string,
+                             OffsetBatch(records)
+                           )
+                      _ <- ZIO.logDebug(
+                             s"==== Done producing chunk batch $batchCount - ${records.size} records ===="
+                           )
+                      _ <- consumedMessagesCounter.update(_ + records.size)
+                      _ <- logProducedMessageCount
+                    } yield Chunk.empty
+                  }.uninterruptible
+                }
+                .runDrain
+                .ensuring {
+                  for {
+                    _ <- streamCompleteOnRebalanceRef.set(None)
+                    _ <- p.succeed(())
+                    c <- consumedMessagesCounter.get
+                    _ <- ZIO.logDebug(s"Consumed $c messages")
+                  } yield ()
+                }
} yield s
}
.runDrain
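
Aside: the test above exercises zio-kafka's transactional pipeline end to end. Each chunk of consumed records is republished to toTopic inside a transaction, and the consumed offsets ride along in OffsetBatch(records) so they commit atomically with the produced records. Below is a minimal hedged sketch of that pattern outside the test harness; the function name and topic parameters are illustrative, and Consumer.plainStream (the simpler, non-partitioned consuming API) is an assumption of the sketch, not part of this commit.

// A hedged sketch, not part of this commit: copy records from one topic to
// another transactionally, committing consumed offsets with the transaction.
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch, Subscription }
import zio.kafka.producer.TransactionalProducer
import zio.kafka.serde.Serde

def copyTransactionally(
  transactionalProducer: TransactionalProducer,
  fromTopic: String,
  toTopic: String
): ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
    .mapChunksZIO { records =>
      ZIO.scoped {
        for {
          transaction <- transactionalProducer.createTransaction
          _ <- transaction.produceChunkBatch(
                 records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
                 Serde.string,
                 Serde.string,
                 OffsetBatch(records) // offsets commit atomically with the batch
               )
        } yield Chunk.empty
      }.uninterruptible // a rebalance must not interrupt a transaction mid-flight
    }
    .runDrain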
38 changes: 20 additions & 18 deletions zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -2,12 +2,14 @@ package zio.kafka.producer

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata }
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidGroupIdException
import org.apache.kafka.common.serialization.ByteArraySerializer
import zio.Cause.Fail
import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch }

+import java.util
import scala.jdk.CollectionConverters._

trait TransactionalProducer {
@@ -25,27 +27,27 @@ object TransactionalProducer {
private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction())

private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Consumer, Throwable, Unit] = {
-    def sendOffsetsToTransaction: ZIO[Consumer, Throwable, Unit] =
-      for {
-        consumerGroupMetadata <-
-          ZIO.serviceWithZIO[Consumer](
-            _.consumerGroupMetadata.flatMap {
-              case Some(metadata) => ZIO.succeed(metadata)
-              case None =>
-                ZIO.fail(
-                  new InvalidGroupIdException(
-                    "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration."
-                  )
-                )
-            }
-          )
+    val sendOffsetsToTransaction: ZIO[Consumer, Throwable, Unit] = {
+      @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
+        ZIO.fail(
+          new InvalidGroupIdException(
+            "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration."
+          )
+        )

-        offsets = offsetBatch.offsets.map { case (topicPartition, offset) =>
-                    topicPartition -> new OffsetAndMetadata(offset + 1)
-                  }.asJava
+      ZIO.serviceWithZIO[Consumer](
+        _.consumerGroupMetadata.flatMap {
+          case None => invalidGroupIdException
+          case Some(metadata) =>
+            val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
+              offsetBatch.offsets.map { case (topicPartition, offset) =>
+                topicPartition -> new OffsetAndMetadata(offset + 1)
+              }.asJava

-        _ <- ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata))
-      } yield ()
+            ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, metadata))
+        }
+      )
+    }

sendOffsetsToTransaction.when(offsetBatch.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
}
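
Two details of the rewritten method are worth calling out. Offsets are committed as offset + 1 because Kafka stores the position of the next record a group should read, and the consumer's group metadata is only available when a group.id is configured, hence the InvalidGroupIdException guard. A small illustration of the offset arithmetic (topic name and values are made up):

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// If the last record consumed from partition 0 sat at offset 41, the
// transaction must commit 42: the next offset for the group to read.
val partition    = new TopicPartition("some-topic", 0) // illustrative
val lastConsumed = 41L
val toCommit     = new OffsetAndMetadata(lastConsumed + 1) // commits 42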
