Make runloop command queue unbounded (#986)
The Runloop can deadlock when it is subscribed to a large number of partitions.

- The Runloop command queue is a bounded queue with a fixed size of 1024.
- The Runloop stream takes commands from the queue, processes them, and offers a new `Poll` command back to the queue (when applicable).
- The `.offer` suspends the fiber when the command queue is already full. This, in turn, freezes the entire stream. The underlying Kafka consumer will eventually (after `max.poll.interval.ms`) be dropped from the group without any notification to the ZStream.
- To fix this, we change the command queue from bounded to unbounded, so that adding the `Poll` command never suspends the fiber.
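
The difference in `.offer` behaviour described above can be reproduced in isolation. The following is a minimal sketch, assuming ZIO 2 on the classpath; `Command`, `Poll`, `offerOneMore` and the capacity of 4 are hypothetical stand-ins for illustration and are not the zio-kafka types. It fills each queue to capacity and then attempts one more offer under a timeout, so a suspended offer shows up as `None` instead of hanging the program.

```scala
import zio._

object QueueOfferSketch extends ZIOAppDefault {

  // Hypothetical stand-in for zio-kafka's RunloopCommand (not the real type).
  sealed trait Command
  case object Poll extends Command

  // Puts `capacity` commands on the queue, then attempts one extra offer,
  // mimicking the runloop re-enqueueing `Poll` while the queue is full.
  // Yields None if the extra offer is still suspended after 200 ms.
  def offerOneMore(queue: Queue[Command], capacity: Int): UIO[Option[Boolean]] =
    queue.offerAll(Chunk.fill(capacity)(Poll)) *>
      queue.offer(Poll).timeout(200.millis)

  // First element: result for a bounded queue; second: for an unbounded queue.
  val demo: UIO[(Option[Boolean], Option[Boolean])] =
    for {
      bounded   <- Queue.bounded[Command](4)
      unbounded <- Queue.unbounded[Command]
      b         <- offerOneMore(bounded, 4)
      u         <- offerOneMore(unbounded, 4)
    } yield (b, u)

  val run =
    demo.flatMap { case (b, u) =>
      Console.printLine(s"bounded offer: $b, unbounded offer: $u")
    }
}
```

With the bounded queue the extra offer should time out, because nothing is taking from the queue; in the Runloop the fiber that would take from the queue is the same one that is suspended in `.offer`, which is why the stream freezes. With the unbounded queue the offer should complete immediately.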
yaarix authored Jul 19, 2023
1 parent c9e37d2 commit 4a89a7a
Showing 1 changed file with 1 addition and 4 deletions.
@@ -554,9 +554,6 @@ private[consumer] object Runloop {
     ) extends RebalanceEvent
   }

-  // Internal parameters, should not be necessary to tune
-  private final val commandQueueSize: Int = 1024
-
   def make(
     hasGroupId: Boolean,
     consumer: ConsumerAccess,
@@ -572,7 +569,7 @@ private[consumer] object Runloop {
   ): URIO[Scope, Runloop] =
     for {
       _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
-      commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown)
+      commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown)
       lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None)
       initialState = State.initial
       currentStateRef <- Ref.make(initialState)