Skip to content

Commit

Permalink
Pause a partition when its stream is ended
Browse files Browse the repository at this point in the history
During a graceful shutdown the partition stream is ended. The runloop however, is still happily fetching records for that stream. These are then put in the stream's queue even though they will never be read. This is taking away network and CPU at a moment where the application should focus on doing a clean shutdown quickly. It could even cause an OOM for application that are tuned for the case where processing happens almost immediately.

With this change, when a stream is ended:
- the partition is paused,
- already fetched new records are not added to the stream's queue.

Also: less logging at start of a poll.
  • Loading branch information
erikvanoosten committed Apr 13, 2024
1 parent 17f5b63 commit 1218204
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ final class PartitionStreamControl private (

/** Offer new data for the stream to process. Should be called on every poll, also when `data.isEmpty` */
private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
if (data.isEmpty) {
queueInfoRef.update(_.withEmptyPoll)
} else {
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
}
ZIO
.whenZIO(isRunning) {
if (data.isEmpty) {
queueInfoRef.update(_.withEmptyPoll)
} else {
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
}
}
.unit

def queueSize: UIO[Int] = queueInfoRef.get.map(_.size)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,12 @@ private[consumer] final class Runloop private (

private def handlePoll(state: State): Task[State] = {
for {
partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams)
runningStreamsBeforePoll <- ZIO.filter(state.assignedStreams)(_.isRunning)
partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(runningStreamsBeforePoll)
_ <- ZIO.logDebug(
s"Starting poll with ${state.pendingRequests.size} pending requests and" +
s" ${state.pendingCommits.size} pending commits," +
s" resuming $partitionsToFetch partitions"
s" resuming ${partitionsToFetch.size} out of ${state.assignedStreams.size} partitions"
)
_ <- currentStateRef.set(state)
pollResult <-
Expand Down

0 comments on commit 1218204

Please sign in to comment.