From 6e2e87d57c7f7db7295dd7cdb5a8f2af9963c0a9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 9 Jan 2024 01:20:06 +0000 Subject: [PATCH 01/24] Add test for `CallbackStack#pack` race condition Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStackSpec.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 806f2eb40b..c5afd5fa10 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -26,6 +26,31 @@ class CallbackStackSpec extends BaseSpec { pushed.clearCurrent(handle) stack.pack(1) must beEqualTo(1) } + + "handle race conditions in pack" in real { + IO { + val stack = CallbackStack[Unit](null) + locally { + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + pushed.clearCurrent(handle) + } + val clear = { + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + IO(pushed.clearCurrent(handle)) + } + (stack, clear) + }.flatMap { + case (stack, clear) => + val pack = IO(stack.pack(1)) + pack.both(clear *> pack).map { + case (x, y) => + (x + y) must beEqualTo(2) + } + }.replicateA_(1000) + .as(ok) + } } } From 632ca06617537ae6f0f96d9eb3fe5d1b52946679 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 9 Jan 2024 16:01:02 +0000 Subject: [PATCH 02/24] wip pack lock Co-authored-by: Sam Pillsworth Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStack.scala | 99 +++++++++++++++++-- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 0a23745921..404e8ffd36 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -18,17 +18,94 @@ package cats.effect import scala.annotation.tailrec +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import CallbackStack.Node + +private final class CallbackStack[A](private[this] var callback: A => Unit) + extends AtomicReference[Node[A]] { + head => + + private[this] val allowedToPack = new AtomicBoolean(true) + + def push(cb: A => Unit): Node[A] = { + val newHead = new Node(cb) + + @tailrec + def loop(): CallbackStack[A] = { + val currentHead = head.get() + newHead.next = currentHead + + if (!head.compareAndSet(currentHead, newHead)) + loop() + else + newHead + } + + loop() + } + + def unsafeSetCallback(cb: A => Unit): Unit = { + callback = cb + } + + /** + * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true + * iff *any* callbacks were invoked. + */ + def apply(a: A): Boolean = { + while (!allowedToPack.compareAndSet(true, false)) { + // spinloop + } + + val cb = callback + var invoked = if (cb != null) { + cb(a) + true + } else { + false + } + var currentNode = head.get() + + while (currentNode ne null) { + val cb = currentNode.getCallback() + if (cb != null) { + cb(a) + invoked = true + } + currentNode = currentNode.next + } + + invoked + } +} + +private object CallbackStack { + private[CallbackStack] final class Node[A]( + private[this] var callback: A => Unit, + ) { + var next: Node[A] = _ + + def getCallback(): A => Unit = callback + + def clear(): Unit = { + callback = null + } + } +} + private final class CallbackStack[A](private[this] var callback: A => Unit) extends AtomicReference[CallbackStack[A]] { + val allowedToPack = new AtomicBoolean(true) + def push(next: A => Unit): CallbackStack[A] = { val attempt = new CallbackStack(next) @tailrec def loop(): CallbackStack[A] = { - val cur = get() + val cur = head.get() attempt.lazySet(cur) if (!compareAndSet(cur, attempt)) @@ -106,14 +183,20 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * (amortized). This still biases the optimizations towards the head of the list, but ensures * that packing will still inevitably reach all of the garbage cells. */ - def pack(bound: Int): Int = { - // the first cell is always retained - val got = get() - if (got ne null) - got.packInternal(bound, 0, this) - else + def pack(bound: Int): Int = + if (allowedToPack.compareAndSet(true, false)) { + // the first cell is always retained + val got = get() + val rtn = + if (got ne null) + got.packInternal(bound, 0, this) + else + 0 + allowedToPack.set(true) + rtn + } else { 0 - } + } @tailrec private def packInternal(bound: Int, removed: Int, parent: CallbackStack[A]): Int = { From dccd38098191c6a8f664ea46d9431b7bdc463ce9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 15:55:16 +0000 Subject: [PATCH 03/24] Implement `Fiber#join` via `asyncCheckAttempt` Co-authored-by: Sam Pillsworth --- .../src/main/scala/cats/effect/IOFiber.scala | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index f0c63cefeb..807a3a6b50 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -160,15 +160,21 @@ private final class IOFiber[A]( } /* this is swapped for an `IO.pure(outcome)` when we complete */ - private[this] var _join: IO[OutcomeIO[A]] = IO.async { cb => + private[this] var _join: IO[OutcomeIO[A]] = IO.asyncCheckAttempt { cb => IO { - val stack = registerListener(oc => cb(Right(oc))) + if (outcome == null) { + val back = callbacks.push(oc => cb(Right(oc))) - if (stack eq null) - Some(IO.unit) /* we were already invoked, so no `CallbackStack` needs to be managed */ - else { - val handle = stack.currentHandle() - Some(IO(stack.clearCurrent(handle))) + /* double-check */ + if (outcome != null) { + back.clearCurrent(back.currentHandle()) + Right(outcome) + } else { + val handle = back.currentHandle() + Left(Some(IO(back.clearCurrent(handle)))) + } + } else { + Right(outcome) } } } @@ -1168,26 +1174,6 @@ private final class IOFiber[A]( callbacks.unsafeSetCallback(cb) } - /* can return null, meaning that no CallbackStack needs to be later invalidated */ - private[this] def registerListener( - listener: OutcomeIO[A] => Unit): CallbackStack[OutcomeIO[A]] = { - if (outcome == null) { - val back = callbacks.push(listener) - - /* double-check */ - if (outcome != null) { - back.clearCurrent(back.currentHandle()) - listener(outcome) /* the implementation of async saves us from double-calls */ - null - } else { - back - } - } else { - listener(outcome) - null - } - } - @tailrec private[this] def succeeded(result: Any, depth: Int): IO[Any] = (ByteStack.pop(conts): @switch) match { From 64cf1b632f466abcb0be09869dca46cd8eac3cd3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:00:02 +0000 Subject: [PATCH 04/24] checkpoint pack lock Co-authored-by: Sam Pillsworth Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStack.scala | 16 +- .../scala/cats/effect/CallbackStack.scala | 183 ++++++++---------- .../main/scala/cats/effect/IODeferred.scala | 7 +- .../src/main/scala/cats/effect/IOFiber.scala | 9 +- 4 files changed, 98 insertions(+), 117 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index faf744cbfb..a42042462c 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -18,14 +18,16 @@ package cats.effect import scala.scalajs.js +import CallbackStack.Handle + private trait CallbackStack[A] extends js.Object private final class CallbackStackOps[A](private val callbacks: js.Array[A => Unit]) extends AnyVal { - @inline def push(next: A => Unit): CallbackStack[A] = { + @inline def push(next: A => Unit): Handle[A] = { callbacks.push(next) - callbacks.asInstanceOf[CallbackStack[A]] + callbacks.length - 1 } @inline def unsafeSetCallback(cb: A => Unit): Unit = { @@ -36,25 +38,23 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true * iff *any* callbacks were invoked. */ - @inline def apply(oc: A, invoked: Boolean): Boolean = + @inline def apply(oc: A): Boolean = callbacks .asInstanceOf[js.Dynamic] .reduceRight( // skips deleted indices, but there can still be nulls (acc: Boolean, cb: A => Unit) => if (cb ne null) { cb(oc); true } else acc, - invoked) + false) .asInstanceOf[Boolean] /** * Removes the current callback from the queue. */ - @inline def clearCurrent(handle: Int): Unit = + @inline def clearHandle(handle: Handle[A]): Unit = // deleting an index from a js.Array makes it sparse (aka "holey"), so no memory leak js.special.delete(callbacks, handle) - @inline def currentHandle(): CallbackStack.Handle = callbacks.length - 1 - @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! @@ -68,5 +68,5 @@ private object CallbackStack { @inline implicit def ops[A](stack: CallbackStack[A]): CallbackStackOps[A] = new CallbackStackOps(stack.asInstanceOf[js.Array[A => Unit]]) - type Handle = Int + type Handle[A] = Int } diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 404e8ffd36..1c268b66c8 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -21,6 +21,7 @@ import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import CallbackStack.Handle import CallbackStack.Node private final class CallbackStack[A](private[this] var callback: A => Unit) @@ -29,11 +30,15 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) private[this] val allowedToPack = new AtomicBoolean(true) - def push(cb: A => Unit): Node[A] = { + /** + * Pushes a callback to the top of the stack. Returns a handle that can be used with + * [[clearHandle]] to clear the callback. + */ + def push(cb: A => Unit): Handle[A] = { val newHead = new Node(cb) @tailrec - def loop(): CallbackStack[A] = { + def loop(): Handle[A] = { val currentHead = head.get() newHead.next = currentHead @@ -55,9 +60,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * iff *any* callbacks were invoked. */ def apply(a: A): Boolean = { - while (!allowedToPack.compareAndSet(true, false)) { - // spinloop - } + // TODO should we read allowedToPack for memory effect? val cb = callback var invoked = if (cb != null) { @@ -79,82 +82,22 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) invoked } -} - -private object CallbackStack { - private[CallbackStack] final class Node[A]( - private[this] var callback: A => Unit, - ) { - var next: Node[A] = _ - - def getCallback(): A => Unit = callback - - def clear(): Unit = { - callback = null - } - } -} - -private final class CallbackStack[A](private[this] var callback: A => Unit) - extends AtomicReference[CallbackStack[A]] { - - val allowedToPack = new AtomicBoolean(true) - - def push(next: A => Unit): CallbackStack[A] = { - val attempt = new CallbackStack(next) - - @tailrec - def loop(): CallbackStack[A] = { - val cur = head.get() - attempt.lazySet(cur) - - if (!compareAndSet(cur, attempt)) - loop() - else - attempt - } - - loop() - } - - def unsafeSetCallback(cb: A => Unit): Unit = { - callback = cb - } /** - * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true - * iff *any* callbacks were invoked. + * Removes the callback referenced by a handle. */ - @tailrec - def apply(oc: A, invoked: Boolean): Boolean = { - val cb = callback - - val invoked2 = if (cb != null) { - cb(oc) - true - } else { - invoked - } - - val next = get() - if (next != null) - next(oc, invoked2) - else - invoked2 + def clearHandle(handle: CallbackStack.Handle[A]): Unit = { + handle.clear() } /** - * Removes the current callback from the queue. + * Nulls all references in this callback stack. */ - def clearCurrent(handle: CallbackStack.Handle): Unit = { - val _ = handle + def clear(): Unit = { callback = null + head.lazySet(null) } - def currentHandle(): CallbackStack.Handle = 0 - - def clear(): Unit = lazySet(null) - /** * It is intended that `bound` be tracked externally and incremented on each clear(). Whenever * pack is called, the number of empty cells removed from the stack is produced. It is @@ -185,11 +128,10 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) */ def pack(bound: Int): Int = if (allowedToPack.compareAndSet(true, false)) { - // the first cell is always retained - val got = get() + val got = head.get() val rtn = if (got ne null) - got.packInternal(bound, 0, this) + got.packHead(bound, this) else 0 allowedToPack.set(true) @@ -198,42 +140,83 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) 0 } - @tailrec - private def packInternal(bound: Int, removed: Int, parent: CallbackStack[A]): Int = { - if (callback == null) { - val child = get() +} + +private object CallbackStack { + def apply[A](callback: A => Unit): CallbackStack[A] = + new CallbackStack(callback) - // doing this cas here ultimately deoptimizes contiguous empty chunks - if (!parent.compareAndSet(this, child)) { - // if we're contending with another pack(), just bail and let them continue - removed + sealed abstract class Handle[A] { + private[CallbackStack] def clear(): Unit + } + + private[CallbackStack] final class Node[A]( + private[this] var callback: A => Unit + ) extends Handle[A] { + var next: Node[A] = _ + + def getCallback(): A => Unit = callback + + def clear(): Unit = { + callback = null + } + + @tailrec + def packHead(bound: Int, root: CallbackStack[A]): Int = { + val next = this.next // local copy + + if (callback == null) { + if (root.compareAndSet(this, next)) { + if (next == null) { + // bottomed out + 1 + } else { + // note this can cause the bound to go negative, which is fine + next.packTail(bound - 1, 1, this) + } + } else { // get the new top of the stack and start over + root.get().packHead(bound, root) + } } else { - if (child == null) { + if (next == null) { + // bottomed out + 0 + } else { + if (bound > 0) + next.packTail(bound - 1, 0, this) + else + 0 + } + } + } + + @tailrec + private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = { + val next = this.next // local copy + + if (callback == null) { + // We own the pack lock, so it is safe to write `next`. It will be published to subsequent packs via the lock. + // Concurrent readers ie `CallbackStack#apply` may read a stale value for `next` still pointing to this node. + // This is okay b/c the new `next` (the tail) is still reachable via the old `next` (this node). + prev.next = next + if (next == null) { // bottomed out removed + 1 } else { // note this can cause the bound to go negative, which is fine - child.packInternal(bound - 1, removed + 1, parent) + next.packTail(bound - 1, removed + 1, prev) } - } - } else { - val child = get() - if (child == null) { - // bottomed out - removed } else { - if (bound > 0) - child.packInternal(bound - 1, removed, this) - else + if (next == null) { + // bottomed out removed + } else { + if (bound > 0) + next.packTail(bound - 1, removed, this) + else + removed + } } } } } - -private object CallbackStack { - def apply[A](cb: A => Unit): CallbackStack[A] = - new CallbackStack(cb) - - type Handle = Byte -} diff --git a/core/shared/src/main/scala/cats/effect/IODeferred.scala b/core/shared/src/main/scala/cats/effect/IODeferred.scala index 217af8360a..d586807ef0 100644 --- a/core/shared/src/main/scala/cats/effect/IODeferred.scala +++ b/core/shared/src/main/scala/cats/effect/IODeferred.scala @@ -23,11 +23,10 @@ private final class IODeferred[A] extends Deferred[IO, A] { private[this] val initial: IO[A] = { val await = IO.asyncCheckAttempt[A] { cb => IO { - val stack = callbacks.push(cb) - val handle = stack.currentHandle() + val handle = callbacks.push(cb) def clear(): Unit = { - stack.clearCurrent(handle) + callbacks.clearHandle(handle) val clearCount = clearCounter.incrementAndGet() if ((clearCount & (clearCount - 1)) == 0) // power of 2 clearCounter.addAndGet(-callbacks.pack(clearCount)) @@ -59,7 +58,7 @@ private final class IODeferred[A] extends Deferred[IO, A] { def complete(a: A): IO[Boolean] = IO { if (cell.compareAndSet(initial, IO.pure(a))) { - val _ = callbacks(Right(a), false) + val _ = callbacks(Right(a)) callbacks.clear() // avoid leaks true } else { diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 807a3a6b50..465329dfee 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -163,15 +163,14 @@ private final class IOFiber[A]( private[this] var _join: IO[OutcomeIO[A]] = IO.asyncCheckAttempt { cb => IO { if (outcome == null) { - val back = callbacks.push(oc => cb(Right(oc))) + val handle = callbacks.push(oc => cb(Right(oc))) /* double-check */ if (outcome != null) { - back.clearCurrent(back.currentHandle()) + callbacks.clearHandle(handle) Right(outcome) } else { - val handle = back.currentHandle() - Left(Some(IO(back.clearCurrent(handle)))) + Left(Some(IO(callbacks.clearHandle(handle)))) } } else { Right(outcome) @@ -1061,7 +1060,7 @@ private final class IOFiber[A]( outcome = oc try { - if (!callbacks(oc, false) && runtime.config.reportUnhandledFiberErrors) { + if (!callbacks(oc) && runtime.config.reportUnhandledFiberErrors) { oc match { case Outcome.Errored(e) => currentCtx.reportFailure(e) case _ => () From 7f38b40710fa72214bc7b007a1ea2d75c3450934 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:34:16 +0000 Subject: [PATCH 05/24] Fix recursion when head is removed Co-authored-by: Sam Pillsworth --- .../scala/cats/effect/CallbackStack.scala | 10 ++++---- .../scala/cats/effect/CallbackStackSpec.scala | 25 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 1c268b66c8..b623e07bcb 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -131,7 +131,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) val got = head.get() val rtn = if (got ne null) - got.packHead(bound, this) + got.packHead(bound, 0, this) else 0 allowedToPack.set(true) @@ -162,20 +162,20 @@ private object CallbackStack { } @tailrec - def packHead(bound: Int, root: CallbackStack[A]): Int = { + def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = { val next = this.next // local copy if (callback == null) { if (root.compareAndSet(this, next)) { if (next == null) { // bottomed out - 1 + removed + 1 } else { // note this can cause the bound to go negative, which is fine - next.packTail(bound - 1, 1, this) + root.get().packHead(bound - 1, removed + 1, root) } } else { // get the new top of the stack and start over - root.get().packHead(bound, root) + root.get().packHead(bound, removed, root) } } else { if (next == null) { diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index c5afd5fa10..1f64b3b5b0 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,14 +16,15 @@ package cats.effect +import cats.syntax.all._ + class CallbackStackSpec extends BaseSpec { "CallbackStack" should { "correctly report the number removed" in { val stack = CallbackStack[Unit](null) - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - pushed.clearCurrent(handle) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) stack.pack(1) must beEqualTo(1) } @@ -31,22 +32,22 @@ class CallbackStackSpec extends BaseSpec { IO { val stack = CallbackStack[Unit](null) locally { - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - pushed.clearCurrent(handle) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) } val clear = { - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - IO(pushed.clearCurrent(handle)) + val handle = stack.push(_ => ()) + IO(stack.clearHandle(handle)) } (stack, clear) }.flatMap { case (stack, clear) => val pack = IO(stack.pack(1)) - pack.both(clear *> pack).map { - case (x, y) => - (x + y) must beEqualTo(2) + (pack.both(clear *> pack), pack).mapN { + case ((x, y), z) => + if ((x + y + z) != 2) + println((x, y, z)) + (x + y + z) must beEqualTo(2) } }.replicateA_(1000) .as(ok) From 1d8224f5c9228564cd11aaf9ad3ea21bb431240d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:48:09 +0000 Subject: [PATCH 06/24] If CASing head fails, pack tail anyway Co-authored-by: Sam Pillsworth --- .../src/main/scala/cats/effect/CallbackStack.scala | 6 +++--- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index b623e07bcb..422892a7db 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -172,10 +172,10 @@ private object CallbackStack { removed + 1 } else { // note this can cause the bound to go negative, which is fine - root.get().packHead(bound - 1, removed + 1, root) + next.packHead(bound - 1, removed + 1, root) } - } else { // get the new top of the stack and start over - root.get().packHead(bound, removed, root) + } else { // we were unable to remove ourselves, but we can still pack our tail + packTail(bound - 1, removed, this) } } else { if (next == null) { diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 1f64b3b5b0..ed7950ecbc 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -45,8 +45,6 @@ class CallbackStackSpec extends BaseSpec { val pack = IO(stack.pack(1)) (pack.both(clear *> pack), pack).mapN { case ((x, y), z) => - if ((x + y + z) != 2) - println((x, y, z)) (x + y + z) must beEqualTo(2) } }.replicateA_(1000) From ca8f1a32bea6072b1a5f3f3156314809ca741f8a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:55:52 +0000 Subject: [PATCH 07/24] More aggressive self-removal on failed CAS Co-authored-by: Sam Pillsworth --- .../main/scala/cats/effect/CallbackStack.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 422892a7db..cf26ebeee7 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -161,6 +161,9 @@ private object CallbackStack { callback = null } + /** + * Packs this head node + */ @tailrec def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = { val next = this.next // local copy @@ -174,8 +177,13 @@ private object CallbackStack { // note this can cause the bound to go negative, which is fine next.packHead(bound - 1, removed + 1, root) } - } else { // we were unable to remove ourselves, but we can still pack our tail - packTail(bound - 1, removed, this) + } else { + val prev = root.get() + if (prev.next eq this) { // prev is our new parent, we are its tail + this.packTail(bound, removed, prev) + } else { // we were unable to remove ourselves, but we can still pack our tail + next.packTail(bound - 1, removed, this) + } } } else { if (next == null) { @@ -190,6 +198,9 @@ private object CallbackStack { } } + /** + * Packs this non-head node + */ @tailrec private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = { val next = this.next // local copy From 1e54754b1398b32d5b34136a739a46fc19a252b1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 19:02:52 +0000 Subject: [PATCH 08/24] Fix NPE --- .../jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index cf26ebeee7..bfedce7eb5 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -181,8 +181,10 @@ private object CallbackStack { val prev = root.get() if (prev.next eq this) { // prev is our new parent, we are its tail this.packTail(bound, removed, prev) - } else { // we were unable to remove ourselves, but we can still pack our tail + } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) + } else { + removed } } } else { From ede463702cf5ad9e784ea64f2f1af70f820fbbc5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 19:47:40 +0000 Subject: [PATCH 09/24] Fix spec on JS, new race condition bug on JVM --- .../scala/cats/effect/CallbackStack.scala | 9 ++-- .../scala/cats/effect/CallbackStack.scala | 10 ++++- .../main/scala/cats/effect/IODeferred.scala | 12 +++--- .../src/main/scala/cats/effect/IOFiber.scala | 2 +- .../scala/cats/effect/CallbackStackSpec.scala | 42 +++++++++---------- 5 files changed, 41 insertions(+), 34 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index a42042462c..51b3cde4ca 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -49,16 +49,19 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni .asInstanceOf[Boolean] /** - * Removes the current callback from the queue. + * Removes the callback referenced by a handle. Returns `true` if the data structure was + * cleaned up immediately, `false` if a subsequent call to [[pack]] is required. */ - @inline def clearHandle(handle: Handle[A]): Unit = + @inline def clearHandle(handle: Handle[A]): Boolean = { // deleting an index from a js.Array makes it sparse (aka "holey"), so no memory leak js.special.delete(callbacks, handle) + true + } @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = bound + @inline def pack(bound: Int): Int = 0 } private object CallbackStack { diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index bfedce7eb5..48d73ce291 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -84,10 +84,12 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) } /** - * Removes the callback referenced by a handle. + * Removes the callback referenced by a handle. Returns `true` if the data structure was + * cleaned up immediately, `false` if a subsequent call to [[pack]] is required. */ - def clearHandle(handle: CallbackStack.Handle[A]): Unit = { + def clearHandle(handle: CallbackStack.Handle[A]): Boolean = { handle.clear() + false } /** @@ -140,6 +142,8 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) 0 } + override def toString(): String = s"CallbackStack($callback, ${get()})" + } private object CallbackStack { @@ -231,5 +235,7 @@ private object CallbackStack { } } } + + override def toString(): String = s"Node($callback, $next)" } } diff --git a/core/shared/src/main/scala/cats/effect/IODeferred.scala b/core/shared/src/main/scala/cats/effect/IODeferred.scala index d586807ef0..33424e95bc 100644 --- a/core/shared/src/main/scala/cats/effect/IODeferred.scala +++ b/core/shared/src/main/scala/cats/effect/IODeferred.scala @@ -26,11 +26,13 @@ private final class IODeferred[A] extends Deferred[IO, A] { val handle = callbacks.push(cb) def clear(): Unit = { - callbacks.clearHandle(handle) - val clearCount = clearCounter.incrementAndGet() - if ((clearCount & (clearCount - 1)) == 0) // power of 2 - clearCounter.addAndGet(-callbacks.pack(clearCount)) - () + val removed = callbacks.clearHandle(handle) + if (!removed) { + val clearCount = clearCounter.incrementAndGet() + if ((clearCount & (clearCount - 1)) == 0) // power of 2 + clearCounter.addAndGet(-callbacks.pack(clearCount)) + () + } } val back = cell.get() diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 465329dfee..c7036009dd 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -170,7 +170,7 @@ private final class IOFiber[A]( callbacks.clearHandle(handle) Right(outcome) } else { - Left(Some(IO(callbacks.clearHandle(handle)))) + Left(Some(IO { callbacks.clearHandle(handle); () })) } } else { Right(outcome) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index ed7950ecbc..e28007f3a1 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,39 +16,35 @@ package cats.effect -import cats.syntax.all._ - class CallbackStackSpec extends BaseSpec { "CallbackStack" should { "correctly report the number removed" in { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) - stack.clearHandle(handle) + stack.push(_ => ()) + stack.clearHandle(handle) must beFalse stack.pack(1) must beEqualTo(1) } "handle race conditions in pack" in real { - IO { - val stack = CallbackStack[Unit](null) - locally { - val handle = stack.push(_ => ()) - stack.clearHandle(handle) - } - val clear = { - val handle = stack.push(_ => ()) - IO(stack.clearHandle(handle)) - } - (stack, clear) - }.flatMap { - case (stack, clear) => - val pack = IO(stack.pack(1)) - (pack.both(clear *> pack), pack).mapN { - case ((x, y), z) => - (x + y + z) must beEqualTo(2) - } - }.replicateA_(1000) - .as(ok) + + IO(CallbackStack[Unit](null)).flatMap { stack => + val pushClearPack = for { + handle <- IO(stack.push(_ => ())) + removed <- IO(stack.clearHandle(handle)) + packed <- IO(stack.pack(1)) + } yield (if (removed) 1 else 0) + packed + + pushClearPack + .both(pushClearPack) + .productL(IO(stack.toString).flatMap(IO.println)) + .product(IO(stack.pack(1))) + .debug() + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + .replicateA_(1000) + .as(ok) + } } } From 55ba5c6fdb7b50bbfedd2fad512d22cf7cb05bc9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 20:02:50 +0000 Subject: [PATCH 10/24] Actually fix spec on JS --- tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index e28007f3a1..0affa5271e 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -23,7 +23,7 @@ class CallbackStackSpec extends BaseSpec { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) stack.push(_ => ()) - stack.clearHandle(handle) must beFalse + stack.clearHandle(handle) stack.pack(1) must beEqualTo(1) } From e970713ede93ca4ca825e737985611e7403026ed Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:00:52 +0000 Subject: [PATCH 11/24] Passthrough `removed` --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 48d73ce291..b86115c768 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -197,7 +197,7 @@ private object CallbackStack { 0 } else { if (bound > 0) - next.packTail(bound - 1, 0, this) + next.packTail(bound - 1, removed, this) else 0 } From d448086b574d63e4ec385339f16216db100fcd0f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:06:16 +0000 Subject: [PATCH 12/24] Return `removed` more --- .../scala/cats/effect/CallbackStack.scala | 4 ++-- .../scala/cats/effect/CallbackStackSpec.scala | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index b86115c768..844faff696 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -194,12 +194,12 @@ private object CallbackStack { } else { if (next == null) { // bottomed out - 0 + removed } else { if (bound > 0) next.packTail(bound - 1, removed, this) else - 0 + removed } } } diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 0affa5271e..5de88b6875 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -30,18 +30,23 @@ class CallbackStackSpec extends BaseSpec { "handle race conditions in pack" in real { IO(CallbackStack[Unit](null)).flatMap { stack => - val pushClearPack = for { - handle <- IO(stack.push(_ => ())) + def pushClearPack(handle: CallbackStack.Handle[Unit]) = for { removed <- IO(stack.clearHandle(handle)) packed <- IO(stack.pack(1)) } yield (if (removed) 1 else 0) + packed - pushClearPack - .both(pushClearPack) - .productL(IO(stack.toString).flatMap(IO.println)) - .product(IO(stack.pack(1))) - .debug() - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + IO(stack.push(_ => ())) + .product(IO(stack.push(_ => ()))) + .flatMap { + case (handle1, handle2) => + // IO(stack.clearHandle(handle1)) *> + pushClearPack(handle1) + .both(pushClearPack(handle2)) + .productL(IO(stack.toString).flatMap(IO.println)) + .product(IO(stack.pack(1))) + .debug() + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + } .replicateA_(1000) .as(ok) } From 13115ff7e8bc486002f27d7e3a4a31859e3cbfcb Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:09:30 +0000 Subject: [PATCH 13/24] Tidy up the tests --- .../scala/cats/effect/CallbackStackSpec.scala | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 5de88b6875..24b8a8532d 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -23,30 +23,26 @@ class CallbackStackSpec extends BaseSpec { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) stack.push(_ => ()) - stack.clearHandle(handle) - stack.pack(1) must beEqualTo(1) + val removed = stack.clearHandle(handle) + if (removed) + stack.pack(1) must beEqualTo(0) + else + stack.pack(1) must beEqualTo(1) } "handle race conditions in pack" in real { IO(CallbackStack[Unit](null)).flatMap { stack => - def pushClearPack(handle: CallbackStack.Handle[Unit]) = for { + val pushClearPack = for { + handle <- IO(stack.push(_ => ())) removed <- IO(stack.clearHandle(handle)) packed <- IO(stack.pack(1)) } yield (if (removed) 1 else 0) + packed - IO(stack.push(_ => ())) - .product(IO(stack.push(_ => ()))) - .flatMap { - case (handle1, handle2) => - // IO(stack.clearHandle(handle1)) *> - pushClearPack(handle1) - .both(pushClearPack(handle2)) - .productL(IO(stack.toString).flatMap(IO.println)) - .product(IO(stack.pack(1))) - .debug() - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } - } + pushClearPack + .both(pushClearPack) + .product(IO(stack.pack(1))) + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } .replicateA_(1000) .as(ok) } From 7f168982480ff6cbb63ae9795997717ab12923a1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:12:34 +0000 Subject: [PATCH 14/24] Organize imports --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 844faff696..f56951f794 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -18,8 +18,7 @@ package cats.effect import scala.annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import CallbackStack.Handle import CallbackStack.Node From ea20d1d9a3bc77fa33be298908bfcee4981b9c4d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:17:10 +0000 Subject: [PATCH 15/24] Make `next` a `private[this]` --- .../main/scala/cats/effect/CallbackStack.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index f56951f794..c9499121ea 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -39,7 +39,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) @tailrec def loop(): Handle[A] = { val currentHead = head.get() - newHead.next = currentHead + newHead.setNext(currentHead) if (!head.compareAndSet(currentHead, newHead)) loop() @@ -76,7 +76,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) cb(a) invoked = true } - currentNode = currentNode.next + currentNode = currentNode.getNext() } invoked @@ -156,10 +156,16 @@ private object CallbackStack { private[CallbackStack] final class Node[A]( private[this] var callback: A => Unit ) extends Handle[A] { - var next: Node[A] = _ + private[this] var next: Node[A] = _ def getCallback(): A => Unit = callback + def getNext(): Node[A] = next + + def setNext(next: Node[A]): Unit = { + this.next = next + } + def clear(): Unit = { callback = null } @@ -182,7 +188,7 @@ private object CallbackStack { } } else { val prev = root.get() - if (prev.next eq this) { // prev is our new parent, we are its tail + if (prev.getNext() eq this) { // prev is our new parent, we are its tail this.packTail(bound, removed, prev) } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) @@ -213,8 +219,8 @@ private object CallbackStack { if (callback == null) { // We own the pack lock, so it is safe to write `next`. It will be published to subsequent packs via the lock. // Concurrent readers ie `CallbackStack#apply` may read a stale value for `next` still pointing to this node. - // This is okay b/c the new `next` (the tail) is still reachable via the old `next` (this node). - prev.next = next + // This is okay b/c the new `next` (this node's tail) is still reachable via the old `next` (this node). + prev.setNext(next) if (next == null) { // bottomed out removed + 1 From e9301e6c1d9fb484c54891282578eddcc52495a6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:41:29 +0000 Subject: [PATCH 16/24] Fix NPE, add test --- .../src/main/scala/cats/effect/CallbackStack.scala | 3 ++- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index c9499121ea..f28ebb1bcd 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -188,7 +188,8 @@ private object CallbackStack { } } else { val prev = root.get() - if (prev.getNext() eq this) { // prev is our new parent, we are its tail + if ((prev != null) && (prev.getNext() eq this)) { + // prev is our new parent, we are its tail this.packTail(bound, removed, prev) } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 24b8a8532d..54a8df1b99 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -47,6 +47,15 @@ class CallbackStackSpec extends BaseSpec { .as(ok) } } + + "pack runs concurrently with clear" in real { + IO { + val stack = CallbackStack[Unit](null) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) + stack + }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).replicateA_(1000).as(ok) + } } } From 9c12425ab9f6de2c61926d93e633bd3ee6d84115 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:55:06 +0000 Subject: [PATCH 17/24] Hush MiMa --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 3b6d213387..0a36f1d147 100644 --- a/build.sbt +++ b/build.sbt @@ -658,7 +658,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) "cats.effect.unsafe.WorkerThread.sleep"), // #3787, internal utility that was no longer needed ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"), - ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"), + // #3943, refactored internal private CallbackStack data structure + ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions From 9121e45de02e92ca6e60804cfa880266d613d2cc Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:59:45 +0000 Subject: [PATCH 18/24] Fix unused warning --- core/js/src/main/scala/cats/effect/CallbackStack.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index 51b3cde4ca..4f74f1f859 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -61,7 +61,10 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = 0 + @inline def pack(bound: Int): Int = { + val _ = bound + 0 + } } private object CallbackStack { From 4563927a9a2b975e42dba84a23b8ca137a9149ae Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 23:13:54 +0000 Subject: [PATCH 19/24] More hushing --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 0a36f1d147..d0f3bd561e 100644 --- a/build.sbt +++ b/build.sbt @@ -660,7 +660,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"), ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"), // #3943, refactored internal private CallbackStack data structure - ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "cats.effect.CallbackStack.currentHandle") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions From 82c686f13674e56e34ab3a09f49e37387d3d0f7f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 23:39:50 +0000 Subject: [PATCH 20/24] Workaround weird unused warnings --- core/js/src/main/scala/cats/effect/CallbackStack.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index 4f74f1f859..b76eee490f 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -61,10 +61,8 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = { - val _ = bound - 0 - } + @inline def pack(bound: Int): Int = + bound - bound // aka 0, but so bound is not unused ... } private object CallbackStack { From 9e3579b7827b9570196c75345b36f542611a2da8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 11 Jan 2024 00:00:50 +0000 Subject: [PATCH 21/24] Even more hushing --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index d0f3bd561e..c7cd2a5cc2 100644 --- a/build.sbt +++ b/build.sbt @@ -819,7 +819,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.CallbackStack"), // introduced by #3642, which optimized the BatchingMacrotaskExecutor ProblemFilters.exclude[MissingClassProblem]( - "cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$") + "cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$"), + // #3943, refactored internal private CallbackStack data structure + ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*") ) }, mimaBinaryIssueFilters ++= { From 35aa9a1ee6998783fcae9825ad7b14b080f82977 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 14 Jan 2024 00:22:13 +0000 Subject: [PATCH 22/24] Increase concurrency in `CallbackStackSpec` --- .../scala/cats/effect/CallbackStackSpec.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 54a8df1b99..c65a6bf07e 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,7 +16,9 @@ package cats.effect -class CallbackStackSpec extends BaseSpec { +import cats.syntax.all._ + +class CallbackStackSpec extends BaseSpec with DetectPlatform { "CallbackStack" should { "correctly report the number removed" in { @@ -25,9 +27,9 @@ class CallbackStackSpec extends BaseSpec { stack.push(_ => ()) val removed = stack.clearHandle(handle) if (removed) - stack.pack(1) must beEqualTo(0) + stack.pack(1) mustEqual 0 else - stack.pack(1) must beEqualTo(1) + stack.pack(1) mustEqual 1 } "handle race conditions in pack" in real { @@ -40,10 +42,10 @@ class CallbackStackSpec extends BaseSpec { } yield (if (removed) 1 else 0) + packed pushClearPack - .both(pushClearPack) + .parReplicateA(3000) .product(IO(stack.pack(1))) - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } - .replicateA_(1000) + .flatMap { case (xs, y) => IO((xs.sum + y) mustEqual 3000) } + .replicateA_(if (isJS || isNative) 1 else 1000) .as(ok) } } @@ -54,7 +56,7 @@ class CallbackStackSpec extends BaseSpec { val handle = stack.push(_ => ()) stack.clearHandle(handle) stack - }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).replicateA_(1000).as(ok) + }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).parReplicateA_(1000).as(ok) } } From a18138e359b12035388a5429ee0fda075bd6e3e5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 14 Jan 2024 00:25:46 +0000 Subject: [PATCH 23/24] Add comment/ref about data race in `CallbackStack#apply` --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index f28ebb1bcd..d59b7e30a3 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -59,7 +59,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * iff *any* callbacks were invoked. */ def apply(a: A): Boolean = { - // TODO should we read allowedToPack for memory effect? + // see also note about data races in Node#packTail val cb = callback var invoked = if (cb != null) { From 5b3ad16a78b1fbb01b9effd0bf2c89be2168555e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 00:37:51 +0000 Subject: [PATCH 24/24] Use `try`/`finally` to acquire/release pack lock --- .../src/main/scala/cats/effect/CallbackStack.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index d59b7e30a3..bdb01fd269 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -129,14 +129,15 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) */ def pack(bound: Int): Int = if (allowedToPack.compareAndSet(true, false)) { - val got = head.get() - val rtn = + try { + val got = head.get() if (got ne null) got.packHead(bound, 0, this) else 0 - allowedToPack.set(true) - rtn + } finally { + allowedToPack.set(true) + } } else { 0 }