Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
durban committed Feb 17, 2024
1 parent e99a1c3 commit c7c297a
Show file tree
Hide file tree
Showing 32 changed files with 189 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[effect] sealed abstract class WorkStealingThreadPool[P] private ()
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
Expand Down
19 changes: 10 additions & 9 deletions core/js/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import scala.scalajs.js

import CallbackStack.Handle

private trait CallbackStack[A] extends js.Object
private trait CallbackStack[A, B] extends js.Object

private final class CallbackStackOps[A](private val callbacks: js.Array[A => Unit])
private final class CallbackStackOps[A, B](private val callbacks: js.Array[A => B])
extends AnyVal {

@inline def push(next: A => Unit): Handle[A] = {
@inline def push(next: A => B): Handle[A] = {
callbacks.push(next)
callbacks.length - 1
}

@inline def unsafeSetCallback(cb: A => Unit): Unit = {
@inline def unsafeSetCallback(cb: A => B): Unit = {
callbacks(callbacks.length - 1) = cb
}

Expand All @@ -42,7 +42,7 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni
callbacks
.asInstanceOf[js.Dynamic]
.reduceRight( // skips deleted indices, but there can still be nulls
(acc: Boolean, cb: A => Unit) =>
(acc: Boolean, cb: A => B) =>
if (cb ne null) { cb(oc); true }
else acc,
false)
Expand All @@ -66,11 +66,12 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni
}

private object CallbackStack {
@inline def of[A](cb: A => Unit): CallbackStack[A] =
js.Array(cb).asInstanceOf[CallbackStack[A]]

@inline implicit def ops[A](stack: CallbackStack[A]): CallbackStackOps[A] =
new CallbackStackOps(stack.asInstanceOf[js.Array[A => Unit]])
@inline def of[A, B](cb: A => B): CallbackStack[A, B] =
js.Array(cb).asInstanceOf[CallbackStack[A, B]]

@inline implicit def ops[A, B](stack: CallbackStack[A, B]): CallbackStackOps[A, B] =
new CallbackStackOps(stack.asInstanceOf[js.Array[A => B]])

type Handle[A] = Int
}
28 changes: 13 additions & 15 deletions core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import CallbackStack.Handle
import CallbackStack.Node
import Platform.static

private final class CallbackStack[A](private[this] var callback: A => Unit)
extends AtomicReference[Node[A]] {
private final class CallbackStack[A, B](private[this] var callback: A => B)
extends AtomicReference[Node[A, B]] {
head =>

private[this] val allowedToPack = new AtomicBoolean(true)
Expand All @@ -34,11 +34,11 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
* Pushes a callback to the top of the stack. Returns a handle that can be used with
* [[clearHandle]] to clear the callback.
*/
def push(cb: A => Unit): Handle[A] = {
def push(cb: A => B): Handle[A, B] = {
val newHead = new Node(cb)

@tailrec
def loop(): Handle[A] = {
def loop(): Handle[A, B] = {
val currentHead = head.get()
newHead.setNext(currentHead)

Expand All @@ -51,7 +51,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
loop()
}

def unsafeSetCallback(cb: A => Unit): Unit = {
def unsafeSetCallback(cb: A => B): Unit = {
callback = cb
}

Expand Down Expand Up @@ -148,19 +148,19 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
}

private object CallbackStack {
@static def of[A](cb: A => Unit): CallbackStack[A] =
@static def of[A](cb: A => B): CallbackStack[A] =
new CallbackStack(cb)

sealed abstract class Handle[A] {
sealed abstract class Handle[A, B] {
private[CallbackStack] def clear(): Unit
}

private[CallbackStack] final class Node[A](
private[this] var callback: A => Unit
) extends Handle[A] {
private[CallbackStack] final class Node[A, B](
private[this] var callback: A => B
) extends Handle[A, B] {
private[this] var next: Node[A] = _

def getCallback(): A => Unit = callback
def getCallback(): A => B = callback

def getNext(): Node[A] = next

Expand All @@ -176,7 +176,7 @@ private object CallbackStack {
* Packs this head node
*/
@tailrec
def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = {
def packHead(bound: Int, removed: Int, root: CallbackStack[A, B]): Int = {
val next = this.next // local copy

if (callback == null) {
Expand Down Expand Up @@ -216,7 +216,7 @@ private object CallbackStack {
* Packs this non-head node
*/
@tailrec
private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = {
private def packTail(bound: Int, removed: Int, prev: Node[A, B]): Int = {
val next = this.next // local copy

if (callback == null) {
Expand All @@ -243,7 +243,5 @@ private object CallbackStack {
}
}
}

override def toString(): String = s"Node($callback, $next)"
}
}
3 changes: 2 additions & 1 deletion core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
IO.async[Any] { nextCb =>
for {
done <- IO(new AtomicBoolean(false))
cb <- IO(new AtomicReference[Either[Throwable, Unit] => Unit](null))
cb <- IO(new AtomicReference[Either[Throwable, Unit] => Boolean](null))

canInterrupt <- IO(new juc.Semaphore(0))
manyDone <- IO(new AtomicBoolean(false))
Expand Down Expand Up @@ -100,6 +100,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
val cb0 = cb.getAndSet(null)
if (cb0 != null) {
cb0(RightUnit)
()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
}

cb(Right(node))
} catch { case ex if NonFatal(ex) => cb(Left(ex)) }
()
} catch { case ex if NonFatal(ex) => cb(Left(ex)); () }
}
}.map { node =>
Some {
Expand Down Expand Up @@ -163,7 +164,7 @@ object SelectorSystem {

private final class CallbackNode(
var interest: Int,
var callback: Either[Throwable, Int] => Unit,
var callback: Either[Throwable, Int] => Boolean,
var next: CallbackNode
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
*/

private[this] type Callback =
Right[Nothing, Unit] => Unit
Right[Nothing, Unit] => Boolean

/**
* Base nodes (which form the base list) store the payload.
Expand Down Expand Up @@ -137,7 +137,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
private[unsafe] final def insertTlr(
now: Long,
delay: Long,
callback: Right[Nothing, Unit] => Unit
callback: Right[Nothing, Unit] => Boolean
): Runnable = {
insert(now, delay, callback, ThreadLocalRandom.current())
}
Expand All @@ -159,7 +159,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
final def insert(
now: Long,
delay: Long,
callback: Right[Nothing, Unit] => Unit,
callback: Right[Nothing, Unit] => Boolean,
tlr: ThreadLocalRandom
): Function0[Unit] with Runnable = {
require(delay >= 0L)
Expand Down Expand Up @@ -198,7 +198,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
* @param now
* the current time as returned by `System.nanoTime`
*/
final def pollFirstIfTriggered(now: Long): Right[Nothing, Unit] => Unit = {
final def pollFirstIfTriggered(now: Long): Right[Nothing, Unit] => Boolean = {
doRemoveFirstNodeIfTriggered(now)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ private[effect] final class WorkStealingThreadPool[P](
*/
def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread[_]]) {
val worker = thread.asInstanceOf[WorkerThread[P]]
Expand All @@ -677,7 +677,7 @@ private[effect] final class WorkStealingThreadPool[P](
*/
private[this] final def sleepExternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val random = ThreadLocalRandom.current()
val idx = random.nextInt(threadCount)
val tsl = sleepers(idx)
Expand All @@ -692,7 +692,7 @@ private[effect] final class WorkStealingThreadPool[P](
}

override def sleep(delay: FiniteDuration, task: Runnable): Runnable = {
sleepInternal(delay, _ => task.run())
sleepInternal(delay, { _ => task.run(); false })
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private final class WorkerThread[P](

def sleep(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
// take the opportunity to update the current time, just in case other timers can benefit
val _now = System.nanoTime()
now = _now
Expand Down
12 changes: 8 additions & 4 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ object EpollSystem extends PollingSystem {
) extends FileDescriptorPollHandle {

private[this] var readReadyCounter = 0
private[this] var readCallback: Either[Throwable, Int] => Unit = null
private[this] var readCallback: Either[Throwable, Int] => Boolean = null

private[this] var writeReadyCounter = 0
private[this] var writeCallback: Either[Throwable, Int] => Unit = null
private[this] var writeCallback: Either[Throwable, Int] => Boolean = null

def notify(events: Int): Unit = {
if ((events & EPOLLIN) != 0) {
Expand All @@ -112,7 +112,10 @@ object EpollSystem extends PollingSystem {
writeReadyCounter = counter
val cb = writeCallback
writeCallback = null
if (cb ne null) cb(Right(counter))
if (cb ne null) {
cb(Right(counter))
()
}
}
}

Expand Down Expand Up @@ -226,7 +229,7 @@ object EpollSystem extends PollingSystem {
reads: Boolean,
writes: Boolean,
handle: PollHandle,
cb: Either[Throwable, (PollHandle, IO[Unit])] => Unit
cb: Either[Throwable, (PollHandle, IO[Unit])] => Boolean
): Unit = {
val event = stackalloc[epoll_event]()
event.events =
Expand All @@ -247,6 +250,7 @@ object EpollSystem extends PollingSystem {
}

cb(result)
()
}

@alwaysinline private[this] def toPtr(handle: PollHandle): Ptr[Byte] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ object KqueueSystem extends PollingSystem {
access { kqueue =>
kqueue.evSet(fd, EVFILT_READ, EV_ADD.toUShort, kqcb)
cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_READ)))))
()
}
}

Expand All @@ -124,6 +125,7 @@ object KqueueSystem extends PollingSystem {
access { kqueue =>
kqueue.evSet(fd, EVFILT_WRITE, EV_ADD.toUShort, kqcb)
cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_WRITE)))))
()
}
}
}
Expand All @@ -140,13 +142,13 @@ object KqueueSystem extends PollingSystem {
changelistArray.atUnsafe(0).asInstanceOf[Ptr[kevent64_s]]
private[this] var changeCount = 0

private[this] val callbacks = new LongMap[Either[Throwable, Unit] => Unit]()
private[this] val callbacks = new LongMap[Either[Throwable, Unit] => Boolean]()

private[KqueueSystem] def evSet(
ident: Int,
filter: Short,
flags: CUnsignedShort,
cb: Either[Throwable, Unit] => Unit
cb: Either[Throwable, Unit] => Boolean
): Unit = {
val change = changelist + changeCount.toLong

Expand Down
16 changes: 8 additions & 8 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* The given function `k` will be invoked during evaluation of the `IO` to:
* - check if result is already available;
* - "schedule" the asynchronous callback, where the callback of type `Either[Throwable, A]
* \=> Unit` is the parameter passed to that function. Only the ''first'' invocation of
* \=> Boolean` is the parameter passed to that function. Only the ''first'' invocation of
* the callback will be effective! All subsequent invocations will be silently dropped.
*
* The process of registering the callback itself is suspended in `IO` (the outer `IO` of
Expand Down Expand Up @@ -1221,7 +1221,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* [[async]] for a simplified variant without an option for immediate result
*/
def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] = {
k: (Either[Throwable, A] => Boolean) => IO[Either[Option[IO[Unit]], A]]): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
Expand All @@ -1241,7 +1241,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* Suspends an asynchronous side effect in `IO`.
*
* The given function `k` will be invoked during evaluation of the `IO` to "schedule" the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Unit` is the
* asynchronous callback, where the callback of type `Either[Throwable, A] => Boolean` is the
* parameter passed to that function. Only the ''first'' invocation of the callback will be
* effective! All subsequent invocations will be silently dropped.
*
Expand Down Expand Up @@ -1280,7 +1280,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation
*/
def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] = {
def async[A](k: (Either[Throwable, A] => Boolean) => IO[Option[IO[Unit]]]): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll =>
Expand Down Expand Up @@ -1333,7 +1333,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* [[asyncCheckAttempt]] for more generic version providing an optional immediate result of
* computation and a finalizer
*/
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
def async_[A](k: (Either[Throwable, A] => Boolean) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable(_ => lift(IO.delay(k(resume))).flatMap(_ => get))
Expand Down Expand Up @@ -1808,13 +1808,13 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
private[this] final class IOAsync extends kernel.Async[IO] with StackSafeMonad[IO] {

override def asyncCheckAttempt[A](
k: (Either[Throwable, A] => Unit) => IO[Either[Option[IO[Unit]], A]]): IO[A] =
k: (Either[Throwable, A] => Boolean) => IO[Either[Option[IO[Unit]], A]]): IO[A] =
IO.asyncCheckAttempt(k)

override def async[A](k: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]): IO[A] =
override def async[A](k: (Either[Throwable, A] => Boolean) => IO[Option[IO[Unit]]]): IO[A] =
IO.async(k)

override def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
override def async_[A](k: (Either[Throwable, A] => Boolean) => Unit): IO[A] =
IO.async_(k)

override def as[A, B](ioa: IO[A], b: B): IO[B] =
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IODeferred.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private final class IODeferred[A] extends Deferred[IO, A] {
}

private[this] val cell = new AtomicReference(initial)
private[this] val callbacks = CallbackStack.of[Right[Nothing, A]](null)
private[this] val callbacks = CallbackStack.of[Right[Nothing, A], Boolean](null)
private[this] val clearCounter = new AtomicInteger

def complete(a: A): IO[Boolean] = IO {
Expand Down
Loading

0 comments on commit c7c297a

Please sign in to comment.