Skip to content

Commit

Permalink
Fork offering results to stream
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Dec 3, 2024
1 parent e689977 commit c39b18a
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,28 @@ private[consumer] final class Runloop private (
else {
for {
consumerGroupMetadata <- getConsumerGroupMetadataIfAny
_ <- ZIO.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) {
streamControl.offerRecords(Chunk.empty)
} else {
val builder = ChunkBuilder.make[Record](records.size())
val iterator = records.iterator()
while (iterator.hasNext) {
val consumerRecord = iterator.next()
builder +=
CommittableRecord[Array[Byte], Array[Byte]](
record = consumerRecord,
commitHandle = committer.commit,
consumerGroupMetadata = consumerGroupMetadata
)
_ <- ZIO
.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) {
streamControl.offerRecords(Chunk.empty)
} else {
val builder = ChunkBuilder.make[Record](records.size())
val iterator = records.iterator()
while (iterator.hasNext) {
val consumerRecord = iterator.next()
builder +=
CommittableRecord[Array[Byte], Array[Byte]](
record = consumerRecord,
commitHandle = committer.commit,
consumerGroupMetadata = consumerGroupMetadata
)
}
streamControl.offerRecords(builder.result())
}
streamControl.offerRecords(builder.result())
}
}
.fork
} yield fulfillResult
}
}
Expand Down

0 comments on commit c39b18a

Please sign in to comment.