Skip to content

Commit

Permalink
Distribute records sequentially
Browse files Browse the repository at this point in the history
It is running on the runloops single thread executor anyway!
  • Loading branch information
erikvanoosten committed Dec 4, 2024
1 parent 29ff1c2 commit e4bbd3d
Showing 1 changed file with 1 addition and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[consumer] final class Runloop private (
for {
consumerGroupMetadata <- getConsumerGroupMetadataIfAny
_ <- ZIO
.foreachParDiscard(streams) { streamControl =>
.foreachDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) {
Expand All @@ -150,8 +150,6 @@ private[consumer] final class Runloop private (
streamControl.offerRecords(builder.result())
}
}
.fork
.onExecutor(topLevelExecutor)
} yield fulfillResult
}
}
Expand Down

0 comments on commit e4bbd3d

Please sign in to comment.