From c39b18a5f43fcb09dd4011792cd02b9b420e4d89 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 3 Dec 2024 19:28:04 +0100 Subject: [PATCH] Fork offering results to stream --- .../zio/kafka/consumer/internal/Runloop.scala | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index b4ceddbb2..8255d6836 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -129,26 +129,28 @@ private[consumer] final class Runloop private ( else { for { consumerGroupMetadata <- getConsumerGroupMetadataIfAny - _ <- ZIO.foreachParDiscard(streams) { streamControl => - val tp = streamControl.tp - val records = polledRecords.records(tp) - if (records.isEmpty) { - streamControl.offerRecords(Chunk.empty) - } else { - val builder = ChunkBuilder.make[Record](records.size()) - val iterator = records.iterator() - while (iterator.hasNext) { - val consumerRecord = iterator.next() - builder += - CommittableRecord[Array[Byte], Array[Byte]]( - record = consumerRecord, - commitHandle = committer.commit, - consumerGroupMetadata = consumerGroupMetadata - ) + _ <- ZIO + .foreachParDiscard(streams) { streamControl => + val tp = streamControl.tp + val records = polledRecords.records(tp) + if (records.isEmpty) { + streamControl.offerRecords(Chunk.empty) + } else { + val builder = ChunkBuilder.make[Record](records.size()) + val iterator = records.iterator() + while (iterator.hasNext) { + val consumerRecord = iterator.next() + builder += + CommittableRecord[Array[Byte], Array[Byte]]( + record = consumerRecord, + commitHandle = committer.commit, + consumerGroupMetadata = consumerGroupMetadata + ) + } + streamControl.offerRecords(builder.result()) } - streamControl.offerRecords(builder.result()) } - } + .fork } yield fulfillResult } }