Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CallbackStack based on new "pack locking" strategy #3943

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6e2e87d
Add test for `CallbackStack#pack` race condition
armanbilge Jan 9, 2024
632ca06
wip pack lock
armanbilge Jan 9, 2024
dccd380
Implement `Fiber#join` via `asyncCheckAttempt`
armanbilge Jan 10, 2024
250592e
Merge remote-tracking branch 'origin/fix/fiber-join-async-check-attempt'
armanbilge Jan 10, 2024
64cf1b6
checkpoint pack lock
armanbilge Jan 10, 2024
7f38b40
Fix recursion when head is removed
armanbilge Jan 10, 2024
1d8224f
If CASing head fails, pack tail anyway
armanbilge Jan 10, 2024
ca8f1a3
More aggressive self-removal on failed CAS
armanbilge Jan 10, 2024
1e54754
Fix NPE
armanbilge Jan 10, 2024
ede4637
Fix spec on JS, new race condition bug on JVM
armanbilge Jan 10, 2024
55ba5c6
Actually fix spec on JS
armanbilge Jan 10, 2024
e970713
Passthrough `removed`
armanbilge Jan 10, 2024
d448086
Return `removed` more
armanbilge Jan 10, 2024
13115ff
Tidy up the tests
armanbilge Jan 10, 2024
7f16898
Organize imports
armanbilge Jan 10, 2024
ea20d1d
Make `next` a `private[this]`
armanbilge Jan 10, 2024
e9301e6
Fix NPE, add test
armanbilge Jan 10, 2024
9c12425
Hush MiMa
armanbilge Jan 10, 2024
9121e45
Fix unused warning
armanbilge Jan 10, 2024
4563927
More hushing
armanbilge Jan 10, 2024
82c686f
Workaround weird unused warnings
armanbilge Jan 10, 2024
9e3579b
Even more hushing
armanbilge Jan 11, 2024
35aa9a1
Increase concurrency in `CallbackStackSpec`
armanbilge Jan 14, 2024
a18138e
Add comment/ref about data race in `CallbackStack#apply`
armanbilge Jan 14, 2024
5b3ad16
Use `try`/`finally` to acquire/release pack lock
armanbilge Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,11 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.unsafe.WorkerThread.sleep"),
// #3787, internal utility that was no longer needed
ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"),
// #3943, refactored internal private CallbackStack data structure
ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.CallbackStack.currentHandle")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down Expand Up @@ -815,7 +819,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.CallbackStack"),
// introduced by #3642, which optimized the BatchingMacrotaskExecutor
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$")
"cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$"),
// #3943, refactored internal private CallbackStack data structure
ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*")
)
},
mimaBinaryIssueFilters ++= {
Expand Down
24 changes: 14 additions & 10 deletions core/js/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ package cats.effect

import scala.scalajs.js

import CallbackStack.Handle

private trait CallbackStack[A] extends js.Object

private final class CallbackStackOps[A](private val callbacks: js.Array[A => Unit])
extends AnyVal {

@inline def push(next: A => Unit): CallbackStack[A] = {
@inline def push(next: A => Unit): Handle[A] = {
callbacks.push(next)
callbacks.asInstanceOf[CallbackStack[A]]
callbacks.length - 1
}

@inline def unsafeSetCallback(cb: A => Unit): Unit = {
Expand All @@ -36,29 +38,31 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni
* Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true
* iff *any* callbacks were invoked.
*/
@inline def apply(oc: A, invoked: Boolean): Boolean =
@inline def apply(oc: A): Boolean =
callbacks
.asInstanceOf[js.Dynamic]
.reduceRight( // skips deleted indices, but there can still be nulls
(acc: Boolean, cb: A => Unit) =>
if (cb ne null) { cb(oc); true }
else acc,
invoked)
false)
.asInstanceOf[Boolean]

/**
* Removes the current callback from the queue.
* Removes the callback referenced by a handle. Returns `true` if the data structure was
* cleaned up immediately, `false` if a subsequent call to [[pack]] is required.
*/
@inline def clearCurrent(handle: Int): Unit =
@inline def clearHandle(handle: Handle[A]): Boolean = {
// deleting an index from a js.Array makes it sparse (aka "holey"), so no memory leak
js.special.delete(callbacks, handle)

@inline def currentHandle(): CallbackStack.Handle = callbacks.length - 1
true
}

@inline def clear(): Unit =
callbacks.length = 0 // javascript is crazy!

@inline def pack(bound: Int): Int = bound
@inline def pack(bound: Int): Int =
bound - bound // aka 0, but so bound is not unused ...
}

private object CallbackStack {
Expand All @@ -68,5 +72,5 @@ private object CallbackStack {
@inline implicit def ops[A](stack: CallbackStack[A]): CallbackStackOps[A] =
new CallbackStackOps(stack.asInstanceOf[js.Array[A => Unit]])

type Handle = Int
type Handle[A] = Int
}
209 changes: 150 additions & 59 deletions core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@ package cats.effect

import scala.annotation.tailrec

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import CallbackStack.Handle
import CallbackStack.Node

private final class CallbackStack[A](private[this] var callback: A => Unit)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
extends AtomicReference[CallbackStack[A]] {
extends AtomicReference[Node[A]] {
head =>

private[this] val allowedToPack = new AtomicBoolean(true)

def push(next: A => Unit): CallbackStack[A] = {
val attempt = new CallbackStack(next)
/**
* Pushes a callback to the top of the stack. Returns a handle that can be used with
* [[clearHandle]] to clear the callback.
*/
def push(cb: A => Unit): Handle[A] = {
val newHead = new Node(cb)

@tailrec
def loop(): CallbackStack[A] = {
val cur = get()
attempt.lazySet(cur)
def loop(): Handle[A] = {
val currentHead = head.get()
newHead.setNext(currentHead)

if (!compareAndSet(cur, attempt))
if (!head.compareAndSet(currentHead, newHead))
loop()
else
attempt
newHead
}

loop()
Expand All @@ -48,35 +58,46 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
* Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true
* iff *any* callbacks were invoked.
*/
@tailrec
def apply(oc: A, invoked: Boolean): Boolean = {
val cb = callback
def apply(a: A): Boolean = {
// TODO should we read allowedToPack for memory effect?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As explained above, it's not strictly necessary, but if we did read allowedToPack then at least some of the packing would be guaranteed to be visible. But I'm not sure if this additional synchronization is worth it: the modifications may already be visible fortuitously and we still need to be prepared for concurrent packing anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I didn't consider concurrent pack and apply. If there were any concerns regarding correctness (boils down to: is it ok for pack to rewrite the edges pointing to already published data without synchronization?), could even consider spinning on the allowedToPack lock and own the list during apply? That might even amortize because apply won't take the packed "detour"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could even consider spinning on the allowedToPack lock and own the list during apply

Sam and I considered this idea but decided against it. We could be spinning for an unbounded time while we wait for pack to complete iterating over a very long list. Or what if we lose the race and another pack acquires the lock? For sure, these are worst case pathologies, but since owning the lock is not required to safely iterate the list, spinning until we acquire doesn't seem worth the risk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say yes, read allowedToPack here, because: what will we do here? We'll call all the cbs in the list, right? Which are cont's resumes, so when we call them, each will do 1-2 CASes. If we can avoid calling some of them (the cleared ones) by doing a volatile read, that seems like a win. (Of course we might already not call them, because we might already see that they're null... so I'm not sure actually...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really interesting point :) it's essentially gambling:

  • we do a volatile read now, but get a pretty good guarantee we won't do unnecessary CASes
  • we take our chances and if we are lucky we got away with doing the minimum synchronization necessary

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleared ones are nulled out though, so we shouldn't have any penalty there besides the node read, or am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleared ones are nulled out though

Yes, but currently there is no guarantee that this is published either. They are ordinary writes/reads.


val invoked2 = if (cb != null) {
cb(oc)
val cb = callback
var invoked = if (cb != null) {
cb(a)
true
} else {
invoked
false
}
var currentNode = head.get()

val next = get()
if (next != null)
next(oc, invoked2)
else
invoked2
while (currentNode ne null) {
val cb = currentNode.getCallback()
if (cb != null) {
cb(a)
invoked = true
}
currentNode = currentNode.getNext()
}

invoked
}

/**
* Removes the current callback from the queue.
* Removes the callback referenced by a handle. Returns `true` if the data structure was
* cleaned up immediately, `false` if a subsequent call to [[pack]] is required.
*/
def clearCurrent(handle: CallbackStack.Handle): Unit = {
val _ = handle
callback = null
def clearHandle(handle: CallbackStack.Handle[A]): Boolean = {
handle.clear()
false
}

def currentHandle(): CallbackStack.Handle = 0

def clear(): Unit = lazySet(null)
/**
* Nulls all references in this callback stack.
*/
def clear(): Unit = {
callback = null
head.lazySet(null)
}
Comment on lines +94 to +100
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need to be more aggressive about iterating the list and nulling all the callbacks and nexts. The concern is if someone holds onto a Handle for cancelation, they could actually be keeping the entire structure in memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a valid concern. Maybe we could do that during apply? It's a little strange, because apply and clear are separate, but a call to clear always follows apply, doesn't it?

Copy link
Member Author

@armanbilge armanbilge Jan 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little strange, because apply and clear are separate, but a call to clear always follows apply, doesn't it?

I agree. I'd be happy to see clear go away and we do the clearing as part of apply.

Having a separate clear seems important here. This code surprised me.

try {
if (!callbacks(oc, false) && runtime.config.reportUnhandledFiberErrors) {
oc match {
case Outcome.Errored(e) => currentCtx.reportFailure(e)
case _ => ()
}
}
} finally {
callbacks.clear() /* avoid leaks */
}

I think the only callback that can throw is one installed at construction e.g. via IO#unsafeRunAsync. But it seems better to try/catch where the callback is defined, and only install non-throwing callbacks in IOFiber. It especially strikes me as odd if the other callbacks are not evaluated, because the first one threw.

However, these changes seem involved enough to warrant their own PR ...


/**
* It is intended that `bound` be tracked externally and incremented on each clear(). Whenever
Expand Down Expand Up @@ -106,51 +127,121 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
* (amortized). This still biases the optimizations towards the head of the list, but ensures
* that packing will still inevitably reach all of the garbage cells.
*/
def pack(bound: Int): Int = {
// the first cell is always retained
val got = get()
if (got ne null)
got.packInternal(bound, 0, this)
else
def pack(bound: Int): Int =
if (allowedToPack.compareAndSet(true, false)) {
val got = head.get()
val rtn =
if (got ne null)
got.packHead(bound, 0, this)
else
0
allowedToPack.set(true)
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
rtn
} else {
0
}

override def toString(): String = s"CallbackStack($callback, ${get()})"

}

private object CallbackStack {
def apply[A](callback: A => Unit): CallbackStack[A] =
new CallbackStack(callback)

sealed abstract class Handle[A] {
private[CallbackStack] def clear(): Unit
}

@tailrec
private def packInternal(bound: Int, removed: Int, parent: CallbackStack[A]): Int = {
if (callback == null) {
val child = get()
private[CallbackStack] final class Node[A](
private[this] var callback: A => Unit
) extends Handle[A] {
private[this] var next: Node[A] = _

def getCallback(): A => Unit = callback

def getNext(): Node[A] = next

// doing this cas here ultimately deoptimizes contiguous empty chunks
if (!parent.compareAndSet(this, child)) {
// if we're contending with another pack(), just bail and let them continue
removed
def setNext(next: Node[A]): Unit = {
this.next = next
}

def clear(): Unit = {
callback = null
}

/**
* Packs this head node
*/
@tailrec
def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = {
val next = this.next // local copy

if (callback == null) {
if (root.compareAndSet(this, next)) {
if (next == null) {
// bottomed out
removed + 1
} else {
// note this can cause the bound to go negative, which is fine
next.packHead(bound - 1, removed + 1, root)
}
} else {
val prev = root.get()
if ((prev != null) && (prev.getNext() eq this)) {
// prev is our new parent, we are its tail
this.packTail(bound, removed, prev)
} else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail
next.packTail(bound - 1, removed, this)
} else {
removed
}
}
} else {
if (child == null) {
if (next == null) {
// bottomed out
removed
} else {
if (bound > 0)
next.packTail(bound - 1, removed, this)
else
removed
}
}
}

/**
* Packs this non-head node
*/
@tailrec
private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = {
val next = this.next // local copy

if (callback == null) {
// We own the pack lock, so it is safe to write `next`. It will be published to subsequent packs via the lock.
// Concurrent readers ie `CallbackStack#apply` may read a stale value for `next` still pointing to this node.
// This is okay b/c the new `next` (this node's tail) is still reachable via the old `next` (this node).
prev.setNext(next)
if (next == null) {
// bottomed out
removed + 1
} else {
// note this can cause the bound to go negative, which is fine
child.packInternal(bound - 1, removed + 1, parent)
next.packTail(bound - 1, removed + 1, prev)
}
}
} else {
val child = get()
if (child == null) {
// bottomed out
removed
} else {
if (bound > 0)
child.packInternal(bound - 1, removed, this)
else
if (next == null) {
// bottomed out
removed
} else {
if (bound > 0)
next.packTail(bound - 1, removed, this)
else
removed
}
}
}
}
}

private object CallbackStack {
def apply[A](cb: A => Unit): CallbackStack[A] =
new CallbackStack(cb)

type Handle = Byte
override def toString(): String = s"Node($callback, $next)"
}
}
17 changes: 9 additions & 8 deletions core/shared/src/main/scala/cats/effect/IODeferred.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ private final class IODeferred[A] extends Deferred[IO, A] {
private[this] val initial: IO[A] = {
val await = IO.asyncCheckAttempt[A] { cb =>
IO {
val stack = callbacks.push(cb)
val handle = stack.currentHandle()
val handle = callbacks.push(cb)

def clear(): Unit = {
stack.clearCurrent(handle)
val clearCount = clearCounter.incrementAndGet()
if ((clearCount & (clearCount - 1)) == 0) // power of 2
clearCounter.addAndGet(-callbacks.pack(clearCount))
()
val removed = callbacks.clearHandle(handle)
if (!removed) {
val clearCount = clearCounter.incrementAndGet()
if ((clearCount & (clearCount - 1)) == 0) // power of 2
clearCounter.addAndGet(-callbacks.pack(clearCount))
()
}
}

val back = cell.get()
Expand Down Expand Up @@ -59,7 +60,7 @@ private final class IODeferred[A] extends Deferred[IO, A] {

def complete(a: A): IO[Boolean] = IO {
if (cell.compareAndSet(initial, IO.pure(a))) {
val _ = callbacks(Right(a), false)
val _ = callbacks(Right(a))
callbacks.clear() // avoid leaks
true
} else {
Expand Down
Loading
Loading