
Refactor CallbackStack based on new "pack locking" strategy #3943

Merged

Conversation

armanbilge
Member

@armanbilge armanbilge commented Jan 10, 2024

Fixes #3940. Very fun collaboration with @samspills but credit to @mernst-github for proposing the idea of guarding pack in #3940 (comment).

The new callback stack is made up of two data types:

  1. CallbackStack.Node, a singly-linked list of callbacks
  2. CallbackStack itself, which holds:
    • an AtomicReference to the head Node, and
    • an AtomicBoolean lock allowedToPack, used for guarding pack

Pushing to the stack is a straightforward CAS of the head.
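The shape described above can be sketched roughly as follows. This is an illustrative simplification, not the actual cats-effect code: the names `Node`, `Stack`, and `push` are placeholders, and the real `CallbackStack` differs in detail.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical sketch of the two data types described above.
final class Node[A](@volatile var callback: A => Unit) {
  @volatile var next: Node[A] = null
}

final class Stack[A] {
  // pointer to the head Node
  val head = new AtomicReference[Node[A]](null)
  // the "pack lock": true means no pack is currently running
  val allowedToPack = new AtomicBoolean(true)

  // Pushing is a straightforward CAS loop on the head pointer.
  def push(cb: A => Unit): Node[A] = {
    val node = new Node(cb)
    var done = false
    while (!done) {
      val h = head.get()
      node.next = h
      done = head.compareAndSet(h, node)
    }
    node
  }
}
```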

pack is the only method that modifies the list.

  • the AtomicBoolean lock guarantees that only one pack operation is running at a time, and is also used to publish the modifications to subsequent packs
  • if a call to pack is unable to acquire the lock, it gives up immediately
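The acquire-or-give-up pattern in the two bullets above can be sketched like this (illustrative only; `PackLock` and `tryPack` are hypothetical names, not the actual implementation):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of the give-up-if-contended locking pattern.
final class PackLock {
  private val allowedToPack = new AtomicBoolean(true)

  // Runs `doPack` only if we win the lock; a loser returns false
  // immediately instead of spinning, since packing is an optimization.
  def tryPack(doPack: () => Unit): Boolean =
    if (allowedToPack.compareAndSet(true, false)) {
      // The volatile write in set(true) releases the lock and also
      // publishes our modifications to the next successful packer.
      try doPack()
      finally allowedToPack.set(true)
      true
    } else false
}
```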

Invoking the callbacks is not guarded by the lock. This means that the modifications by pack may not be visible, and in fact packing can even occur concurrently while iterating over the list. This is actually okay:

  • the list is only ever modified to remove Nodes, since additions happen by CASing the pointer to the head Node
  • thus, if while iterating over the list we follow a stale pointer to a next Node that was actually removed, we take a "detour" but do eventually visit all the Nodes in the list
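The lock-free traversal described above can be sketched as follows (hypothetical names; a node cleared by a concurrent pack has a null callback slot, so visiting it via a stale link is a harmless detour):

```scala
// Sketch of invoking callbacks without holding the pack lock.
final class CbNode[A](@volatile var callback: A => Unit) {
  @volatile var next: CbNode[A] = null
}

def invokeAll[A](head: CbNode[A], a: A): Boolean = {
  var invoked = false
  var cur = head
  while (cur != null) {
    val cb = cur.callback
    if (cb != null) { // null means the node was cleared/removed
      cb(a)
      invoked = true
    }
    cur = cur.next // possibly a stale link: a removed node is skipped, not lost
  }
  invoked
}
```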

armanbilge and others added 16 commits January 9, 2024 01:20
Co-authored-by: Matthias Ernst <mernst-github@mernst.org>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Co-authored-by: Matthias Ernst <mernst-github@mernst.org>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Co-authored-by: Matthias Ernst <mernst-github@mernst.org>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Co-authored-by: Sam Pillsworth <sam@blerf.ca>
Comment on lines 61 to 62
def apply(a: A): Boolean = {
// TODO should we read allowedToPack for memory effect?
Member Author

As explained above, it's not strictly necessary, but if we did read allowedToPack then at least some of the packing would be guaranteed to be visible. But I'm not sure if this additional synchronization is worth it: the modifications may already be visible fortuitously and we still need to be prepared for concurrent packing anyway.
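The read under discussion could look something like this (hypothetical sketch: a volatile read of `allowedToPack` at the start of `apply` would establish happens-before with the release write of the most recently completed pack, making at least that pack's clears visible):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical: the volatile read is performed purely for its memory
// ordering effect; the returned value is deliberately ignored.
def applyWithMemoryEffect(allowedToPack: AtomicBoolean)(invoke: () => Boolean): Boolean = {
  allowedToPack.get() // acquire the effects of the last completed pack
  invoke()
}
```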

Contributor

Good point, I didn't consider concurrent pack and apply. If there were any concerns regarding correctness (boils down to: is it ok for pack to rewrite the edges pointing to already published data without synchronization?), could even consider spinning on the allowedToPack lock and own the list during apply? That might even amortize because apply won't take the packed "detour"?

Member Author

could even consider spinning on the allowedToPack lock and own the list during apply

Sam and I considered this idea but decided against it. We could be spinning for an unbounded time while we wait for pack to complete iterating over a very long list. Or what if we lose the race and another pack acquires the lock? For sure, these are worst case pathologies, but since owning the lock is not required to safely iterate the list, spinning until we acquire doesn't seem worth the risk.

Contributor

I'd say yes, read allowedToPack here, because: what will we do here? We'll call all the cbs in the list, right? Which are cont's resumes, so when we call them, each will do 1-2 CASes. If we can avoid calling some of them (the cleared ones) by doing a volatile read, that seems like a win. (Of course we might already not call them, because we might already see that they're null... so I'm not sure actually...)

Member Author

That's a really interesting point :) it's essentially gambling:

  • we do a volatile read now, but get a pretty good guarantee we won't do unnecessary CASes, or
  • we take our chances, and if we are lucky we get away with doing the minimum synchronization necessary

Member

The cleared ones are nulled out though, so we shouldn't have any penalty there besides the node read, or am I missing something?

Member Author

The cleared ones are nulled out though

Yes, but currently there is no guarantee that this is published either. They are ordinary writes/reads.

Comment on lines +94 to +100
/**
* Nulls all references in this callback stack.
*/
def clear(): Unit = {
callback = null
head.lazySet(null)
}
Member Author

I wonder if we need to be more aggressive about iterating the list and nulling all the callbacks and nexts. The concern is if someone holds onto a Handle for cancelation, they could actually be keeping the entire structure in memory.
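A more aggressive clear along those lines might look like this (hypothetical sketch; `ClearNode` and `deepClear` are illustrative names, not proposed code):

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical "deep clear": walk the list and null every callback and
// link, so that a retained cancelation Handle (a reference to one Node)
// cannot keep the whole structure reachable.
final class ClearNode[A](@volatile var callback: A => Unit) {
  @volatile var next: ClearNode[A] = null
}

def deepClear[A](head: AtomicReference[ClearNode[A]]): Unit = {
  var cur = head.getAndSet(null) // detach the whole list first
  while (cur != null) {
    val nxt = cur.next
    cur.callback = null // drop the callback reference
    cur.next = null     // unlink, so no node pins its successors
    cur = nxt
  }
}
```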

Contributor

I think that's a valid concern. Maybe we could do that during apply? It's a little strange, because apply and clear are separate, but a call to clear always follows apply, doesn't it?

Member Author

@armanbilge armanbilge Jan 14, 2024

It's a little strange, because apply and clear are separate, but a call to clear always follows apply, doesn't it?

I agree. I'd be happy to see clear go away and we do the clearing as part of apply.

Having a separate clear seems important here. This code surprised me.

try {
if (!callbacks(oc, false) && runtime.config.reportUnhandledFiberErrors) {
oc match {
case Outcome.Errored(e) => currentCtx.reportFailure(e)
case _ => ()
}
}
} finally {
callbacks.clear() /* avoid leaks */
}

I think the only callback that can throw is one installed at construction, e.g. via IO#unsafeRunAsync. But it seems better to try/catch where the callback is defined, and only install non-throwing callbacks in IOFiber. It especially strikes me as odd that the remaining callbacks are not evaluated because the first one threw.
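The suggested alternative could be sketched like this (hypothetical; `nonThrowing` is an illustrative name, not existing cats-effect API):

```scala
// Hypothetical: wrap a user-supplied callback at its definition site so
// that installed callbacks never throw, and one throwing callback cannot
// prevent the remaining callbacks from running.
def nonThrowing[A](reportFailure: Throwable => Unit)(cb: A => Unit): A => Unit =
  a =>
    try cb(a)
    catch { case t: Throwable => reportFailure(t) }
```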

However, these changes seem involved enough to warrant their own PR ...

Contributor

@mernst-github mernst-github left a comment

Looks great! Just a couple of thoughts:


Member

@djspiewak djspiewak left a comment

Love this. Some trivial changes and a couple simple questions but otherwise this is great.

@djspiewak
Member

Looks like a perfect wash (within the margin of error) in terms of performance. Good to go on that front!

Before

[info] Benchmark                                             (count)  (cpuTokens)   (size)   Mode  Cnt      Score     Error    Units
[info] DeepBindBenchmark.async                                   N/A          N/A    10000  thrpt   10   2935.967 ± 102.359    ops/s
[info] DeepBindBenchmark.delay                                   N/A          N/A    10000  thrpt   10   9742.384 ±  42.545    ops/s
[info] DeepBindBenchmark.pure                                    N/A          N/A    10000  thrpt   10  11626.005 ± 170.858    ops/s
[info] DeferredBenchmark.cancel                                   10          N/A      N/A  thrpt   10    104.204 ±   0.456    ops/s
[info] DeferredBenchmark.cancel                                  100          N/A      N/A  thrpt   10     13.537 ±   0.074    ops/s
[info] DeferredBenchmark.cancel                                 1000          N/A      N/A  thrpt   10      1.312 ±   0.008    ops/s
[info] DeferredBenchmark.complete                                 10          N/A      N/A  thrpt   10    110.424 ±   0.836    ops/s
[info] DeferredBenchmark.complete                                100          N/A      N/A  thrpt   10     26.084 ±   0.109    ops/s
[info] DeferredBenchmark.complete                               1000          N/A      N/A  thrpt   10      5.073 ±   0.287    ops/s
[info] DeferredBenchmark.getAfter                                 10          N/A      N/A  thrpt   10   7059.728 ±  19.859    ops/s
[info] DeferredBenchmark.getAfter                                100          N/A      N/A  thrpt   10   2033.584 ±  10.383    ops/s
[info] DeferredBenchmark.getAfter                               1000          N/A      N/A  thrpt   10    247.342 ±   0.501    ops/s
[info] DeferredBenchmark.getBefore                                10          N/A      N/A  thrpt   10   3994.309 ±  15.618    ops/s
[info] DeferredBenchmark.getBefore                               100          N/A      N/A  thrpt   10    798.325 ±   1.832    ops/s
[info] DeferredBenchmark.getBefore                              1000          N/A      N/A  thrpt   10     81.027 ±   0.163    ops/s
[info] ParallelBenchmark.parTraverse                             N/A        10000     1000  thrpt   10    878.708 ±  25.632    ops/s
[info] ParallelBenchmark.traverse                                N/A        10000     1000  thrpt   10     70.062 ±   0.050    ops/s
[info] ShallowBindBenchmark.async                                N/A          N/A    10000  thrpt   10   2157.575 ±   3.120    ops/s
[info] ShallowBindBenchmark.delay                                N/A          N/A    10000  thrpt   10  10052.708 ±  54.888    ops/s
[info] ShallowBindBenchmark.pure                                 N/A          N/A    10000  thrpt   10  10539.875 ± 135.470    ops/s
[info] WorkStealingBenchmark.alloc                               N/A          N/A  1000000  thrpt   10     14.151 ±   0.111  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark      N/A          N/A  1000000  thrpt   10     50.544 ±   0.753  ops/min
[info] WorkStealingBenchmark.runnableScheduling                  N/A          N/A  1000000  thrpt   10   2958.068 ±  15.379  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal       N/A          N/A  1000000  thrpt   10   2195.662 ±   5.718  ops/min
[info] WorkStealingBenchmark.scheduling                          N/A          N/A  1000000  thrpt   10     53.854 ±   2.195  ops/min

After

[info] Benchmark                                             (count)  (cpuTokens)   (size)   Mode  Cnt      Score     Error    Units
[info] DeepBindBenchmark.async                                   N/A          N/A    10000  thrpt   10   3068.624 ±  16.441    ops/s
[info] DeepBindBenchmark.delay                                   N/A          N/A    10000  thrpt   10   9853.126 ± 124.913    ops/s
[info] DeepBindBenchmark.pure                                    N/A          N/A    10000  thrpt   10  11664.990 ±  94.674    ops/s
[info] DeferredBenchmark.cancel                                   10          N/A      N/A  thrpt   10    107.015 ±   0.686    ops/s
[info] DeferredBenchmark.cancel                                  100          N/A      N/A  thrpt   10     14.352 ±   0.083    ops/s
[info] DeferredBenchmark.cancel                                 1000          N/A      N/A  thrpt   10      1.326 ±   0.016    ops/s
[info] DeferredBenchmark.complete                                 10          N/A      N/A  thrpt   10    110.982 ±   1.658    ops/s
[info] DeferredBenchmark.complete                                100          N/A      N/A  thrpt   10     26.111 ±   0.183    ops/s
[info] DeferredBenchmark.complete                               1000          N/A      N/A  thrpt   10      5.129 ±   0.271    ops/s
[info] DeferredBenchmark.getAfter                                 10          N/A      N/A  thrpt   10   6990.249 ±  47.301    ops/s
[info] DeferredBenchmark.getAfter                                100          N/A      N/A  thrpt   10   2042.712 ±  21.909    ops/s
[info] DeferredBenchmark.getAfter                               1000          N/A      N/A  thrpt   10    246.581 ±   1.930    ops/s
[info] DeferredBenchmark.getBefore                                10          N/A      N/A  thrpt   10   3941.678 ±  24.476    ops/s
[info] DeferredBenchmark.getBefore                               100          N/A      N/A  thrpt   10    744.944 ±   5.293    ops/s
[info] DeferredBenchmark.getBefore                              1000          N/A      N/A  thrpt   10     80.747 ±   0.431    ops/s
[info] ParallelBenchmark.parTraverse                             N/A        10000     1000  thrpt   10    887.059 ±  23.448    ops/s
[info] ParallelBenchmark.traverse                                N/A        10000     1000  thrpt   10     69.816 ±   0.752    ops/s
[info] ShallowBindBenchmark.async                                N/A          N/A    10000  thrpt   10   2169.835 ±  22.053    ops/s
[info] ShallowBindBenchmark.delay                                N/A          N/A    10000  thrpt   10   9872.628 ± 116.515    ops/s
[info] ShallowBindBenchmark.pure                                 N/A          N/A    10000  thrpt   10  10410.052 ±  75.004    ops/s
[info] WorkStealingBenchmark.alloc                               N/A          N/A  1000000  thrpt   10     14.018 ±   0.133  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark      N/A          N/A  1000000  thrpt   10     48.573 ±   2.040  ops/min
[info] WorkStealingBenchmark.runnableScheduling                  N/A          N/A  1000000  thrpt   10   2997.308 ±  10.291  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal       N/A          N/A  1000000  thrpt   10   2239.119 ±  12.325  ops/min
[info] WorkStealingBenchmark.scheduling                          N/A          N/A  1000000  thrpt   10     52.694 ±   1.716  ops/min

djspiewak
djspiewak previously approved these changes Jan 15, 2024
Member

@djspiewak djspiewak left a comment

I'm still leaning toward rolling the dice on not fencing in apply. Let's see how that treats us though, and if we're seeing problems there, we'll add the fence.

@armanbilge armanbilge dismissed djspiewak’s stale review January 15, 2024 18:27

The merge-base changed after approval.

@djspiewak djspiewak merged commit 6013bd6 into typelevel:series/3.5.x Jan 15, 2024
31 of 32 checks passed
Linked issue: CallbackStack#pack can double-count removals when called concurrently (#3940)