Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce PollResult, PollingSystem#processReadyEvents #4230

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,37 @@ abstract class PollingSystem {
def closePoller(poller: Poller): Unit

/**
* Blocks the thread until an event is polled, the timeout expires, or interrupted.
*
* @param poller
* the thread-local [[Poller]] used to poll events.
*
* @param nanos
* the maximum duration for which to block, where `nanos == -1` indicates to block
* indefinitely.
*
* @param reportFailure
* callback that handles any failures that occur during polling.
* @return
* whether any events are ready. e.g. if the method returned due to timeout, this should be
* `false`. If `true`, [[processReadyEvents]] must be called before calling any other method
* on this poller.
*/
def poll(poller: Poller, nanos: Long): Boolean

/**
* Processes ready events e.g. collects their results and resumes the corresponding tasks.
* This method should only be called after [[poll]] and only if it returned `true`.
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
*
* @param poller
* the thread-local [[Poller]] with ready events
*
* @return
* whether any events were polled. e.g. if the method returned due to timeout, this should
* be `false`.
* whether any of the ready events caused tasks to be rescheduled on the runtime
*/
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean
def processReadyEvents(poller: Poller): Boolean
djspiewak marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return
* whether poll should be called again (i.e., there are more events to be polled)
* whether [[poll]] should be called again (i.e., there are more events to be polled)
*/
def needsPoll(poller: Poller): Boolean

Expand Down
88 changes: 46 additions & 42 deletions core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,60 +41,64 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
def closePoller(poller: Poller): Unit =
poller.selector.close()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
def poll(poller: Poller, nanos: Long): Boolean = {
val millis = if (nanos >= 0) nanos / 1000000 else -1
val selector = poller.selector

if (millis == 0) selector.selectNow()
else if (millis > 0) selector.select(millis)
else selector.select()

if (selector.isOpen()) { // closing selector interrupts select
var polled = false

val ready = selector.selectedKeys().iterator()
while (ready.hasNext()) {
val key = ready.next()
ready.remove()

var readyOps = 0
var error: Throwable = null
try {
readyOps = key.readyOps()
// reset interest in triggered ops
key.interestOps(key.interestOps() & ~readyOps)
} catch {
case ex if NonFatal(ex) =>
error = ex
readyOps = -1 // interest all waiters
}
// closing selector interrupts select
selector.isOpen() && !selector.selectedKeys().isEmpty()
}

val value = if (error ne null) Left(error) else Right(readyOps)

val callbacks = key.attachment().asInstanceOf[Callbacks]
val iter = callbacks.iterator()
while (iter.hasNext()) {
val node = iter.next()

if ((node.interest & readyOps) != 0) { // drop this node and execute callback
node.remove()
val cb = node.callback
if (cb != null) {
cb(value)
polled = true
if (error ne null) poller.countSucceededOperation(readyOps)
else poller.countErroredOperation(node.interest)
} else {
poller.countCanceledOperation(node.interest)
}
def processReadyEvents(poller: Poller): Boolean = {
val selector = poller.selector
var fibersRescheduled = false

val ready = selector.selectedKeys().iterator()
while (ready.hasNext()) {
val key = ready.next()
ready.remove()

var readyOps = 0
var error: Throwable = null
try {
readyOps = key.readyOps()
// reset interest in triggered ops
key.interestOps(key.interestOps() & ~readyOps)
} catch {
case ex if NonFatal(ex) =>
error = ex
readyOps = -1 // interest all waiters
}

val value = if (error ne null) Left(error) else Right(readyOps)

val callbacks = key.attachment().asInstanceOf[Callbacks]
val iter = callbacks.iterator()
while (iter.hasNext()) {
val node = iter.next()

if ((node.interest & readyOps) != 0) { // drop this node and execute callback
node.remove()
val cb = node.callback
if (cb != null) {
cb(value)
fibersRescheduled = true
if (error ne null) poller.countSucceededOperation(readyOps)
else poller.countErroredOperation(node.interest)
} else {
poller.countCanceledOperation(node.interest)
}
}

()
}

polled
} else false
()
}

fibersRescheduled
}

def needsPoll(poller: Poller): Boolean =
Expand Down
4 changes: 3 additions & 1 deletion core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object SleepSystem extends PollingSystem {

def closePoller(Poller: Poller): Unit = ()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
def poll(poller: Poller, nanos: Long): Boolean = {
if (nanos < 0)
LockSupport.park()
else if (nanos > 0)
Expand All @@ -44,6 +44,8 @@ object SleepSystem extends PollingSystem {
false
}

def processReadyEvents(poller: Poller): Boolean = false

def needsPoll(poller: Poller): Boolean = false

def interrupt(targetThread: Thread, targetPoller: Poller): Unit =
Expand Down
14 changes: 10 additions & 4 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ private[effect] final class WorkerThread[P <: AnyRef](
val self = this
random = ThreadLocalRandom.current()
val rnd = random
val reportFailure = pool.reportFailure(_)

/*
* A counter (modulo `ExternalQueueTicks`) which represents the
Expand Down Expand Up @@ -428,7 +427,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// Park the thread until further notice.
val start = System.nanoTime()
metrics.incrementPolledCount()
val polled = system.poll(_poller, -1, reportFailure)
val polled = system.poll(_poller, -1)
now = System.nanoTime() // update now
metrics.addIdleTime(now - start)

Expand All @@ -438,6 +437,8 @@ private[effect] final class WorkerThread[P <: AnyRef](
} else if (polled) {
if (parked.getAndSet(false))
pool.doneSleeping()
// TODO, if no tasks scheduled could fastpath back to park?
val _ = system.processReadyEvents(_poller)
return true
} else if (!parked.get()) { // Spurious wakeup check.
return false
Expand All @@ -464,7 +465,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (nanos > 0L) {
val start = now
metrics.incrementPolledCount()
val polled = system.poll(_poller, nanos, reportFailure)
val polled = system.poll(_poller, nanos)
// we already parked and time passed, so update time again
// it doesn't matter if we timed out or were awakened, the update is free-ish
now = System.nanoTime()
Expand All @@ -480,6 +481,9 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
if (polled) { // TODO, if no tasks scheduled and no timers could fastpath back to park?
val _ = system.processReadyEvents(_poller)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as below.

}
true
} else { // we were either awakened spuriously or intentionally
if (parked.get()) // awakened spuriously, re-check next sleeper
Expand Down Expand Up @@ -579,7 +583,9 @@ private[effect] final class WorkerThread[P <: AnyRef](
sleepers.packIfNeeded()
// give the polling system a chance to discover events
metrics.incrementPolledCount()
system.poll(_poller, 0, reportFailure)
if (system.needsPoll(_poller) && system.poll(_poller, 0)) {
val _ = system.processReadyEvents(_poller)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
}

// Obtain a fiber or batch of fibers from the external queue.
val element = external.poll(rnd)
Expand Down
67 changes: 38 additions & 29 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.scalanative.meta.LinktimeInfo
import scala.scalanative.posix.errno._
import scala.scalanative.posix.string._
import scala.scalanative.posix.unistd
import scala.scalanative.runtime._
import scala.scalanative.runtime.{Array => _, _}
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

Expand Down Expand Up @@ -60,9 +60,12 @@ object EpollSystem extends PollingSystem {

def closePoller(poller: Poller): Unit = poller.close()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
def poll(poller: Poller, nanos: Long): Boolean =
poller.poll(nanos)

def processReadyEvents(poller: Poller): Boolean =
poller.processReadyEvents()

def needsPoll(poller: Poller): Boolean = poller.needsPoll()

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()
Expand Down Expand Up @@ -182,44 +185,50 @@ object EpollSystem extends PollingSystem {
private[this] val handles: Set[PollHandle] =
Collections.newSetFromMap(new IdentityHashMap)

private[this] val eventsArray = new Array[Byte](sizeof[epoll_event].toInt * MaxEvents)
@inline private[this] def events = eventsArray.atUnsafe(0).asInstanceOf[Ptr[epoll_event]]
private[this] var readyEventCount: Int = 0

private[EpollSystem] def close(): Unit =
if (unistd.close(epfd) != 0)
throw new IOException(fromCString(strerror(errno)))

private[EpollSystem] def poll(timeout: Long): Boolean = {

val events = stackalloc[epoll_event](MaxEvents.toULong)
var polled = false

@tailrec
def processEvents(timeout: Int): Unit = {

val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout)
val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt
val rtn = epoll_wait(epfd, events, MaxEvents, timeoutMillis)
if (rtn >= 0) {
readyEventCount = rtn
rtn > 0
} else if (errno == EINTR) { // spurious wake-up by signal
false
} else {
throw new IOException(fromCString(strerror(errno)))
}
}

if (triggeredEvents >= 0) {
polled = true
@tailrec
private[EpollSystem] def processReadyEvents(): Boolean = {
var i = 0
while (i < readyEventCount) {
val event = events + i.toLong
val handle = fromPtr(event.data)
handle.notify(event.events.toInt)
i += 1
}

var i = 0
while (i < triggeredEvents) {
val event = events + i.toLong
val handle = fromPtr(event.data)
handle.notify(event.events.toInt)
i += 1
}
} else if (errno != EINTR) { // spurious wake-up by signal
if (readyEventCount >= MaxEvents) { // drain the ready list
val rtn = epoll_wait(epfd, events, MaxEvents, 0)
if (rtn >= 0) {
readyEventCount = rtn
processReadyEvents()
} else {
throw new IOException(fromCString(strerror(errno)))
}

if (triggeredEvents >= MaxEvents)
processEvents(0) // drain the ready list
else
()
} else {
readyEventCount = 0
true
}

val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt
processEvents(timeoutMillis)

polled
}

private[EpollSystem] def needsPoll(): Boolean = !handles.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ private[effect] final class EventLoopExecutorScheduler[P](
* the Scala Native global `ExecutionContext` which is currently hard-coded into every
* test framework, including MUnit, specs2, and Weaver.
*/
if (system.needsPoll(poller) || timeout != -1)
system.poll(poller, timeout, reportFailure)
else ()
if (system.needsPoll(poller) || timeout != -1) {
if (system.poll(poller, timeout)) {
val _ = system.processReadyEvents(poller)
}
}

continue = !executeQueue.isEmpty() || !sleepQueue.isEmpty() || system.needsPoll(poller)
}
Expand Down
Loading
Loading