Skip to content

Commit

Permalink
Merge pull request #4230 from armanbilge/issue/4229
Browse files Browse the repository at this point in the history
Introduce `PollResult`, `PollingSystem#processReadyEvents`
  • Loading branch information
djspiewak authored Jan 9, 2025
2 parents 187a74f + 7b4d9e7 commit 6b1a0bd
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,36 @@ abstract class PollingSystem {
def closePoller(poller: Poller): Unit

/**
* Blocks the thread until an event is polled, the timeout expires, or interrupted.
*
* @param poller
* the thread-local [[Poller]] used to poll events.
*
* @param nanos
* the maximum duration for which to block, where `nanos == -1` indicates to block
* indefinitely.
*
* @param reportFailure
* callback that handles any failures that occur during polling.
* @return
* whether any ready events were polled and should be handled with [[processReadyEvents]].
* If result is incomplete, then [[poll]] should be called again after
* [[processReadyEvents]].
*/
def poll(poller: Poller, nanos: Long): PollResult

/**
* Processes ready events e.g. collects their results and resumes the corresponding tasks.
*
* @param poller
* the thread-local [[Poller]] with ready events
*
* @return
* whether any events were polled. e.g. if the method returned due to timeout, this should
* be `false`.
* whether any of the ready events caused tasks to be rescheduled on the runtime
*/
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean
def processReadyEvents(poller: Poller): Boolean

/**
* @return
* whether poll should be called again (i.e., there are more events to be polled)
* whether [[poll]] should be called again (i.e., there are more events to be polled)
*/
def needsPoll(poller: Poller): Boolean

Expand Down Expand Up @@ -137,3 +148,23 @@ object PollingSystem {
type Poller = P
}
}

sealed abstract class PollResult
object PollResult {

/**
* Polled all of the available ready events.
*/
case object Complete extends PollResult

/**
* Polled some, but not all, of the available ready events. Poll should be called again to
* reap additional ready events.
*/
case object Incomplete extends PollResult

/**
* The poll was interrupted or timed out before any events became ready.
*/
case object Interrupted extends PollResult
}
91 changes: 49 additions & 42 deletions core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,60 +41,67 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
def closePoller(poller: Poller): Unit =
poller.selector.close()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
val millis = if (nanos >= 0) nanos / 1000000 else -1
val selector = poller.selector

if (millis == 0) selector.selectNow()
else if (millis > 0) selector.select(millis)
else selector.select()

if (selector.isOpen()) { // closing selector interrupts select
var polled = false

val ready = selector.selectedKeys().iterator()
while (ready.hasNext()) {
val key = ready.next()
ready.remove()

var readyOps = 0
var error: Throwable = null
try {
readyOps = key.readyOps()
// reset interest in triggered ops
key.interestOps(key.interestOps() & ~readyOps)
} catch {
case ex if NonFatal(ex) =>
error = ex
readyOps = -1 // interest all waiters
}
// closing selector interrupts select
if (selector.isOpen() && !selector.selectedKeys().isEmpty())
PollResult.Complete
else
PollResult.Interrupted
}

val value = if (error ne null) Left(error) else Right(readyOps)

val callbacks = key.attachment().asInstanceOf[Callbacks]
val iter = callbacks.iterator()
while (iter.hasNext()) {
val node = iter.next()

if ((node.interest & readyOps) != 0) { // drop this node and execute callback
node.remove()
val cb = node.callback
if (cb != null) {
cb(value)
polled = true
if (error ne null) poller.countSucceededOperation(readyOps)
else poller.countErroredOperation(node.interest)
} else {
poller.countCanceledOperation(node.interest)
}
def processReadyEvents(poller: Poller): Boolean = {
val selector = poller.selector
var fibersRescheduled = false

val ready = selector.selectedKeys().iterator()
while (ready.hasNext()) {
val key = ready.next()
ready.remove()

var readyOps = 0
var error: Throwable = null
try {
readyOps = key.readyOps()
// reset interest in triggered ops
key.interestOps(key.interestOps() & ~readyOps)
} catch {
case ex if NonFatal(ex) =>
error = ex
readyOps = -1 // interest all waiters
}

val value = if (error ne null) Left(error) else Right(readyOps)

val callbacks = key.attachment().asInstanceOf[Callbacks]
val iter = callbacks.iterator()
while (iter.hasNext()) {
val node = iter.next()

if ((node.interest & readyOps) != 0) { // drop this node and execute callback
node.remove()
val cb = node.callback
if (cb != null) {
cb(value)
fibersRescheduled = true
if (error ne null) poller.countSucceededOperation(readyOps)
else poller.countErroredOperation(node.interest)
} else {
poller.countCanceledOperation(node.interest)
}
}

()
}

polled
} else false
()
}

fibersRescheduled
}

def needsPoll(poller: Poller): Boolean =
Expand Down
6 changes: 4 additions & 2 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ object SleepSystem extends PollingSystem {

def closePoller(Poller: Poller): Unit = ()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
if (nanos < 0)
LockSupport.park()
else if (nanos > 0)
LockSupport.parkNanos(nanos)
else
()
false
PollResult.Interrupted
}

def processReadyEvents(poller: Poller): Boolean = false

def needsPoll(poller: Poller): Boolean = false

def interrupt(targetThread: Thread, targetPoller: Poller): Unit =
Expand Down
27 changes: 22 additions & 5 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ private[effect] final class WorkerThread[P <: AnyRef](
val self = this
random = ThreadLocalRandom.current()
val rnd = random
val reportFailure = pool.reportFailure(_)

/*
* A counter (modulo `ExternalQueueTicks`) which represents the
Expand Down Expand Up @@ -422,22 +421,34 @@ private[effect] final class WorkerThread[P <: AnyRef](
}
}

@tailrec
def drainReadyEvents(result: PollResult, acc: Boolean): Boolean =
if (result ne PollResult.Interrupted) {
val tasksScheduled = system.processReadyEvents(_poller) | acc
if (result eq PollResult.Complete) tasksScheduled
else drainReadyEvents(system.poll(_poller, 0), tasksScheduled)
} else {
acc
}

// returns true if polled event, false if unparked
def parkLoop(): Boolean = {
while (!done.get()) {
// Park the thread until further notice.
val start = System.nanoTime()
metrics.incrementPolledCount()
val polled = system.poll(_poller, -1, reportFailure)
val pollResult = system.poll(_poller, -1)
now = System.nanoTime() // update now
metrics.addIdleTime(now - start)

// the only way we can be interrupted here is if it happened *externally* (probably sbt)
if (isInterrupted()) {
pool.shutdown()
} else if (polled) {
} else if (pollResult ne PollResult.Interrupted) {
if (parked.getAndSet(false))
pool.doneSleeping()
// TODO, if no tasks scheduled could fastpath back to park?
val _ = drainReadyEvents(pollResult, false)
return true
} else if (!parked.get()) { // Spurious wakeup check.
return false
Expand All @@ -464,7 +475,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (nanos > 0L) {
val start = now
metrics.incrementPolledCount()
val polled = system.poll(_poller, nanos, reportFailure)
val pollResult = system.poll(_poller, nanos)
// we already parked and time passed, so update time again
// it doesn't matter if we timed out or were awakened, the update is free-ish
now = System.nanoTime()
Expand All @@ -475,11 +486,15 @@ private[effect] final class WorkerThread[P <: AnyRef](
false // we know `done` is `true`
} else {
// no matter why we woke up, there may be timers or events ready
val polled = pollResult ne PollResult.Interrupted
if (polled || (triggerTime - now <= 0)) {
// we timed out or polled an event
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
if (polled) { // TODO, if no tasks scheduled and no timers could fastpath back to park?
val _ = drainReadyEvents(pollResult, false)
}
true
} else { // we were either awakened spuriously or intentionally
if (parked.get()) // awakened spuriously, re-check next sleeper
Expand Down Expand Up @@ -579,7 +594,9 @@ private[effect] final class WorkerThread[P <: AnyRef](
sleepers.packIfNeeded()
// give the polling system a chance to discover events
metrics.incrementPolledCount()
system.poll(_poller, 0, reportFailure)
if (system.needsPoll(_poller)) {
val _ = drainReadyEvents(system.poll(_poller, 0), false)
}

// Obtain a fiber or batch of fibers from the external queue.
val element = external.poll(rnd)
Expand Down
66 changes: 32 additions & 34 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ import cats.syntax.all._

import org.typelevel.scalaccompat.annotation._

import scala.annotation.tailrec
import scala.scalanative.annotation.alwaysinline
import scala.scalanative.libc.errno._
import scala.scalanative.meta.LinktimeInfo
import scala.scalanative.posix.errno._
import scala.scalanative.posix.string._
import scala.scalanative.posix.unistd
import scala.scalanative.runtime._
import scala.scalanative.runtime.{Array => _, _}
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

Expand Down Expand Up @@ -60,9 +59,12 @@ object EpollSystem extends PollingSystem {

def closePoller(poller: Poller): Unit = poller.close()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
def poll(poller: Poller, nanos: Long): PollResult =
poller.poll(nanos)

def processReadyEvents(poller: Poller): Boolean =
poller.processReadyEvents()

def needsPoll(poller: Poller): Boolean = poller.needsPoll()

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()
Expand Down Expand Up @@ -182,44 +184,40 @@ object EpollSystem extends PollingSystem {
private[this] val handles: Set[PollHandle] =
Collections.newSetFromMap(new IdentityHashMap)

private[this] val eventsArray = new Array[Byte](sizeof[epoll_event].toInt * MaxEvents)
@inline private[this] def events = eventsArray.atUnsafe(0).asInstanceOf[Ptr[epoll_event]]
private[this] var readyEventCount: Int = 0

private[EpollSystem] def close(): Unit =
if (unistd.close(epfd) != 0)
throw new IOException(fromCString(strerror(errno)))

private[EpollSystem] def poll(timeout: Long): Boolean = {

val events = stackalloc[epoll_event](MaxEvents.toULong)
var polled = false

@tailrec
def processEvents(timeout: Int): Unit = {

val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout)

if (triggeredEvents >= 0) {
polled = true

var i = 0
while (i < triggeredEvents) {
val event = events + i.toLong
val handle = fromPtr(event.data)
handle.notify(event.events.toInt)
i += 1
}
} else if (errno != EINTR) { // spurious wake-up by signal
throw new IOException(fromCString(strerror(errno)))
}

if (triggeredEvents >= MaxEvents)
processEvents(0) // drain the ready list
else
()
}
private[EpollSystem] def poll(timeout: Long): PollResult = {

val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt
processEvents(timeoutMillis)
val rtn = epoll_wait(epfd, events, MaxEvents, timeoutMillis)
if (rtn >= 0) {
readyEventCount = rtn
if (rtn > 0) {
if (rtn < MaxEvents) PollResult.Complete else PollResult.Incomplete
} else PollResult.Interrupted
} else if (errno == EINTR) { // spurious wake-up by signal
PollResult.Interrupted
} else {
throw new IOException(fromCString(strerror(errno)))
}
}

polled
private[EpollSystem] def processReadyEvents(): Boolean = {
var i = 0
while (i < readyEventCount) {
val event = events + i.toLong
val handle = fromPtr(event.data)
handle.notify(event.events.toInt)
i += 1
}
readyEventCount = 0
true
}

private[EpollSystem] def needsPoll(): Boolean = !handles.isEmpty()
Expand Down
Loading

0 comments on commit 6b1a0bd

Please sign in to comment.