Skip to content

Commit

Permalink
Merge pull request #4187 from iRevive/timer-heap-metrics
Browse files Browse the repository at this point in the history
Add `TimerHeapMetrics`
  • Loading branch information
djspiewak authored Dec 19, 2024
2 parents a917c50 + 5048008 commit aa5fbbd
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 30 deletions.
117 changes: 87 additions & 30 deletions core/jvm/src/main/scala/cats/effect/unsafe/TimerHeap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private final class TimerHeap extends AtomicInteger {

// And this is how many of those externally canceled nodes were already removed.
// We track this separately so we can increment on owner thread without overhead of the atomic.
private[this] var removedCanceledCounter = 0
private[this] var removedCanceledCounter: Int = 0

// The index 0 is not used; the root is at index 1.
// This is standard practice in binary heaps, to simplify arithmetics.
Expand All @@ -70,6 +70,13 @@ private final class TimerHeap extends AtomicInteger {

private[this] val RightUnit = Right(())

// metrics
private[this] var totalScheduled: Long = 0L
private[this] var totalExecuted: Long = 0L
private[this] var totalCanceled: Long = 0L

private def incrementTotalCanceled() = totalCanceled += 1

/**
* only called by owner thread
*/
Expand All @@ -81,8 +88,12 @@ private final class TimerHeap extends AtomicInteger {
if (root.isDeleted()) { // DOA. Remove it and loop.

removeAt(1)
if (root.isCanceled())
if (root.isCanceled()) {
removedCanceledCounter += 1
totalCanceled += 1
} else {
totalExecuted += 1
}

peekFirstTriggerTime() // loop

Expand Down Expand Up @@ -133,11 +144,20 @@ private final class TimerHeap extends AtomicInteger {
heap(size) = null
size -= 1

if (root.isCanceled())
if (root.isCanceled()) {
removedCanceledCounter += 1
totalCanceled += 1
} else {
totalExecuted += 1
}

val back = root.getAndClear()
if (rootExpired && (back ne null)) back else loop()
if (rootExpired && (back ne null)) {
totalExecuted += 1
back
} else {
loop()
}
} else null
} else null

Expand Down Expand Up @@ -178,34 +198,45 @@ private final class TimerHeap extends AtomicInteger {
delay: Long,
callback: Right[Nothing, Unit] => Unit,
out: Array[Right[Nothing, Unit] => Unit]
): Function0[Unit] with Runnable = if (size > 0) {
val heap = this.heap // local copy
val triggerTime = computeTriggerTime(now, delay)
): Function0[Unit] with Runnable = {
totalScheduled += 1

val root = heap(1)
val rootDeleted = root.isDeleted()
val rootExpired = !rootDeleted && isExpired(root, now)
if (rootDeleted || rootExpired) { // see if we can just replace the root
root.index = -1
if (root.isCanceled()) removedCanceledCounter += 1
if (rootExpired) out(0) = root.getAndClear()
val node = new Node(triggerTime, callback, 1)
heap(1) = node
fixDown(1)
node
} else { // insert at the end
val heap = growIfNeeded() // new heap array if it grew
if (size > 0) {
val heap = this.heap // local copy
val triggerTime = computeTriggerTime(now, delay)

val root = heap(1)
val rootDeleted = root.isDeleted()
val rootExpired = !rootDeleted && isExpired(root, now)
if (rootDeleted || rootExpired) { // see if we can just replace the root
root.index = -1
if (root.isCanceled()) {
removedCanceledCounter += 1
totalCanceled += 1
} else {
totalExecuted += 1
if (rootExpired) {
out(0) = root.getAndClear()
}
}
val node = new Node(triggerTime, callback, 1)
heap(1) = node
fixDown(1)
node
} else { // insert at the end
val heap = growIfNeeded() // new heap array if it grew
size += 1
val node = new Node(triggerTime, callback, size)
heap(size) = node
fixUp(size)
node
}
} else {
val node = new Node(now + delay, callback, 1)
this.heap(1) = node
size += 1
val node = new Node(triggerTime, callback, size)
heap(size) = node
fixUp(size)
node
}
} else {
val node = new Node(now + delay, callback, 1)
this.heap(1) = node
size += 1
node
}

/**
Expand Down Expand Up @@ -241,6 +272,29 @@ private final class TimerHeap extends AtomicInteger {
}
}

def totalTimersScheduled(): Long = totalScheduled

def totalTimersExecuted(): Long = totalExecuted

def totalTimersCanceled(): Long = totalCanceled

/**
* Returns the current number of the outstanding timers.
*/
def outstandingTimers(): Int = size

/**
* Returns the next due to fire.
*/
def nextTimerDue(): Option[Long] = {
val root = heap(1)
if (root ne null) {
val now = System.nanoTime()
val when = root.triggerTime - now
Some(when)
} else None
}

private[this] def pack(removeCount: Int): Unit = {
val heap = this.heap // local copy

Expand All @@ -254,6 +308,7 @@ private final class TimerHeap extends AtomicInteger {
if (heap(i).isCanceled()) {
removeAt(i)
r += 1
totalCanceled += 1
// Don't increment i, the new i may be canceled too.
} else {
i += 1
Expand Down Expand Up @@ -460,8 +515,10 @@ private final class TimerHeap extends AtomicInteger {
val worker = thread.asInstanceOf[WorkerThread[_]]
val heap = TimerHeap.this
if (worker.ownsTimers(heap)) {
// remove only if we are still in the heap
if (index >= 0) heap.removeAt(index)
if (index >= 0) { // remove only if we are still in the heap
heap.removeAt(index)
}
heap.incrementTotalCanceled()
} else { // otherwise this heap will need packing
// it is okay to increment more than once if invoked multiple times
// but it will undermine the packIfNeeded short-circuit optimization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ sealed trait WorkStealingPoolMetrics {
* The list of queue-specific metrics of the work-stealing thread pool.
*/
def localQueues: List[WorkStealingPoolMetrics.LocalQueueMetrics]

/**
* The list of timer-specific metrics of the work-stealing thread pool.
*/
def timers: List[WorkStealingPoolMetrics.TimerHeapMetrics]
}

object WorkStealingPoolMetrics {
Expand Down Expand Up @@ -156,6 +161,61 @@ object WorkStealingPoolMetrics {

}

sealed trait TimerHeapMetrics {

/**
* The index of the TimerHeap.
*/
def index: Int

/**
* The current number of the outstanding timers, that remain to be executed.
*
* @note
* the value may differ between invocations
*/
def timersOutstandingCount(): Int

/**
* The total number of the successfully executed timers.
*
* @note
* the value may differ between invocations
*/
def totalTimersExecutedCount(): Long

/**
* The total number of the scheduled timers.
*
* @note
* the value may differ between invocations
*/
def totalTimersScheduledCount(): Long

/**
* The total number of the canceled timers.
*
* @note
* the value may differ between invocations
*/
def totalTimersCanceledCount(): Long

/**
* Returns the time in nanoseconds till the next due to fire.
*
* The negative number could indicate that the worker thread is overwhelmed by
* (long-running) tasks and not able to check/trigger timers frequently enough. The
* indication is similar to the starvation checker.
*
* Returns `None` when there is no upcoming timer.
*
* @note
* the value may differ between invocations
*/
def nextTimerDue(): Option[Long]

}

private[metrics] def apply(ec: ExecutionContext): Option[WorkStealingPoolMetrics] =
ec match {
case wstp: WorkStealingThreadPool[_] =>
Expand All @@ -170,6 +230,11 @@ object WorkStealingPoolMetrics {
wstp.localQueues.toList.zipWithIndex.map {
case (queue, idx) => localQueueMetrics(queue, idx)
}

val timers: List[TimerHeapMetrics] =
wstp.sleepers.toList.zipWithIndex.map {
case (timerHeap, idx) => timerHeapMetrics(timerHeap, idx)
}
}

Some(metrics)
Expand Down Expand Up @@ -197,4 +262,14 @@ object WorkStealingPoolMetrics {
def successfulStealAttemptCount(): Long = queue.getSuccessfulStealAttemptCount()
def stolenFiberCount(): Long = queue.getStolenFiberCount()
}

private def timerHeapMetrics(timerHeap: TimerHeap, idx: Int): TimerHeapMetrics =
new TimerHeapMetrics {
def index: Int = idx
def nextTimerDue(): Option[Long] = timerHeap.nextTimerDue()
def timersOutstandingCount(): Int = timerHeap.outstandingTimers()
def totalTimersExecutedCount(): Long = timerHeap.totalTimersExecuted()
def totalTimersScheduledCount(): Long = timerHeap.totalTimersScheduled()
def totalTimersCanceledCount(): Long = timerHeap.totalTimersCanceled()
}
}

0 comments on commit aa5fbbd

Please sign in to comment.