
Integrated timers #3219

Merged (34 commits) Jan 28, 2023

Conversation

djspiewak (Member)

Dusted off @vasilmkd's old branch (first PRed in #2252) and merged it with the latest head. Still needs some work:

  • The fallback sleep implementation in WorkStealingThreadPool references IORuntime.global just to get things to compile. This doesn't strike me as strictly necessary given the way that sleepInternal works, but trivially removing it got me into trouble.
  • The sleep cancelation action appears to just be a lazySet(false) on SleepCallback, which is concerning because it leaks memory. I believe this is why the fiber dump specs are failing.

The runtime has changed meaningfully since Vasil's original implementation, so this definitely needs a bit of careful thought to ensure that we aren't doing anything weird or conflicty. Notably, this implementation simply doesn't implement timer stealing at all, and instead all timers are held local to their scheduling thread. We're going to need to work to validate the hypothesis that we can get away with this. If we need to implement theft, things become a lot more complex.
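A rough sketch of what "timers held local to their scheduling thread" means in practice. Everything below is a hypothetical simplification for illustration (the `LocalTimers` class and its method names are made up; only the `triggerTime` idea echoes the PR's `SleepCallback`): each worker owns a private min-heap of timers and fires the expired ones between batches of work, so no cross-thread coordination is needed.

```scala
import scala.collection.mutable

// Hypothetical sketch: one of these per worker thread, touched only by
// its owner, so scheduling and firing timers needs no synchronization.
final class LocalTimers {
  private case class Timer(triggerTime: Long, action: Runnable)

  // mutable.PriorityQueue dequeues the maximum, so a reversed ordering
  // (like the PR's sleepCallbackReverseOrdering) yields the earliest timer.
  // The subtraction form keeps the comparison correct if nanoTime wraps.
  private val heap = mutable.PriorityQueue.empty[Timer](
    Ordering.fromLessThan[Timer]((a, b) => a.triggerTime - b.triggerTime > 0))

  def sleep(delayNanos: Long, action: Runnable): Unit =
    heap.enqueue(Timer(System.nanoTime() + delayNanos, action))

  // Called by the owning worker between units of work.
  def fireExpired(): Unit = {
    val now = System.nanoTime()
    while (heap.nonEmpty && heap.head.triggerTime - now <= 0)
      heap.dequeue().action.run()
  }
}
```

The downside of this shape is exactly the one debated later in the thread: if the owning worker never gets back around to draining its queue, nobody else will fire its timers.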

This is the first step in moving us toward a fully integrated runtime.

@djspiewak djspiewak added this to the v3.5.0 milestone Oct 30, 2022
@djspiewak djspiewak marked this pull request as draft October 30, 2022 14:36
armanbilge (Member)

  • The sleep cancelation action appears to just be a lazySet(false) on SleepCallback, which is concerning because it leaks memory. I believe this is why the fiber dump specs are failing.

Btw, I have an alternative SleepCallback implementation in the native implementation. Might be interesting if we can unify those.

private[this] final class SleepTask(
    val at: Long,
    val runnable: Runnable
) extends Runnable
    with Comparable[SleepTask] {

  def run(): Unit = {
    sleepQueue.remove(this)
    ()
  }

  def compareTo(that: SleepTask): Int =
    java.lang.Long.compare(this.at, that.at)
}
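Read on its own, the snippet assumes an enclosing `sleepQueue`; a self-contained sketch of how the pieces could fit (the `ConcurrentSkipListSet` stand-in and the `SleepQueueDemo` wrapper are assumptions, not the actual Scala Native code). The point of interest is the cancellation semantics: running the task removes it from the queue outright, so there is no `lazySet(false)`-style tombstone left behind to leak.

```scala
import java.util.concurrent.ConcurrentSkipListSet

// Hypothetical stand-in for the native scheduler's sleepQueue: a sorted
// set of pending sleeps, ordered by wake-up time.
object SleepQueueDemo {
  val sleepQueue = new ConcurrentSkipListSet[SleepTask]()

  final class SleepTask(val at: Long, val runnable: Runnable)
      extends Runnable
      with Comparable[SleepTask] {
    // Cancellation: remove the task from the queue, leaving nothing behind.
    def run(): Unit = {
      sleepQueue.remove(this)
      ()
    }
    def compareTo(that: SleepTask): Int =
      java.lang.Long.compare(this.at, that.at)
  }
}
```

One caveat with this sketch: `ConcurrentSkipListSet` treats `compareTo == 0` as equality, so two tasks with the same `at` would collide; the real implementation would need to break such ties.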

djspiewak (Member, Author)

Published as 3.5-639ac01

djspiewak (Member, Author)

Ran a quick test on Ember. With this change, peak RPS on a basic GET request improved by a little over 25%.

armanbilge (Member)

Um ... holy smokes!

djspiewak (Member, Author)

Did a little more testing. A slightly less trivial test involving a POST body and some JSON parsing (using Circe) shows peak RPS improvements around 14%, which makes intuitive sense since that test is going to be a little more bounded by the body processing than by the pure connection overhead. Still, 25% improvements in trivial GET peak RPS is pretty damn great.

P99 latencies in both cases were improved by 13.5%. I think this number is probably a bit more trustworthy, and also still very impressive IMO.

djspiewak (Member, Author)

Did a quick run through the WorkStealingBenchmark. It hasn't regressed even a small amount. I suspect this is because the JIT is simply inlining away the sleepers.nonEmpty check (since it's always false in those benchmarks). In a real case with real timers, I would expect some regression in straight-line performance, but obviously any such case would also benefit commensurately from this very change. Either way, I think it's safe to say this is ready for serious review.

We can do a lot better than the implementation as it stands, but that can be incrementally layered onto this once it lands.

@djspiewak djspiewak marked this pull request as ready for review November 21, 2022 02:16
djspiewak (Member, Author) commented Nov 21, 2022

TODO

  • Figure out why the CI keeps hanging (was able to reproduce locally but not minimize)
  • Make this work with blocking (and add tests)

@@ -514,6 +522,52 @@ private[effect] final class WorkStealingThreadPool(
*/
override def reportFailure(cause: Throwable): Unit = reportFailure0(cause)

override def monotonicNanos(): Long = System.nanoTime()

override def nowMillis(): Long = System.currentTimeMillis()
He-Pin:

Can this be pluggable? What if I want to make use of https://github.com/OpenHFT/Chronicle-Ticker?

Member:

@He-Pin yes, you can always replace the Scheduler in the IORuntime with your own implementation. See also the Cats Effect test runtime, which supports time mocking.
https://typelevel.org/cats-effect/docs/core/test-runtime#mocking-time
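For a concrete flavor of what "bring your own Scheduler" could look like, here is a sketch with a pluggable clock. The `SchedulerLike` trait below only mirrors the rough shape of `cats.effect.unsafe.Scheduler` (`sleep`/`nowMillis`/`monotonicNanos`); treat the exact signatures as an assumption, and `CustomClockScheduler` is entirely hypothetical:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.duration.FiniteDuration

// Assumed shape, loosely mirroring cats.effect.unsafe.Scheduler.
trait SchedulerLike {
  def sleep(delay: FiniteDuration, task: Runnable): Runnable // returns a cancel action
  def nowMillis(): Long
  def monotonicNanos(): Long
}

// Hypothetical scheduler whose clocks are injected, e.g. backed by
// something like Chronicle-Ticker instead of the System clocks.
final class CustomClockScheduler(
    wallClockMillis: () => Long,
    monotonicClockNanos: () => Long
) extends SchedulerLike {
  private val executor = Executors.newSingleThreadScheduledExecutor { (r: Runnable) =>
    val t = new Thread(r, "custom-scheduler")
    t.setDaemon(true)
    t
  }

  def sleep(delay: FiniteDuration, task: Runnable): Runnable = {
    val fut = executor.schedule(task, delay.toNanos, TimeUnit.NANOSECONDS)
    () => { fut.cancel(false); () }
  }

  def nowMillis(): Long = wallClockMillis()
  def monotonicNanos(): Long = monotonicClockNanos()
}
```

An instance of this would then be handed to the `IORuntime` in place of the default scheduler.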

djspiewak (Member, Author)

Released as snapshot 3.5-01f5b3a

djspiewak (Member, Author)

A few simple canaries were promising. At least, nothing exploded. We should look more closely. In the meantime, I think this is ready for more serious review. Note that there are definitely follow-ups we can explore which will absolutely micro-optimize this further. We can do that once this lands in series/3.x, since this is, by itself, already a significant improvement.

@djspiewak djspiewak requested a review from vasilmkd November 26, 2022 23:29
@armanbilge armanbilge mentioned this pull request Dec 26, 2022
durban (Contributor) commented Dec 29, 2022

Notably, this implementation simply doesn't implement timer stealing at all, and instead all timers are held local to their scheduling thread.

I think one scenario affected by this is when (1) a compute thread is blocked/spinwaiting on a condition (without using IO.blocking), and (2) for that condition to become true, a timer must fire. In this case, if the timer is coincidentally on the same thread as (1), the result is a deadlock. Of course, the solution is "don't do that". True, (1) should "never" happen, but I'm pretty sure it does. Together with (2)... I'm not sure how common it is...

In any case, this is a possible (if contrived) scenario, from which the current system can recover eventually (the timers are independent), but with this PR it can deadlock. (With timer stealing, I think it would also recover eventually, because the timer in question would get stolen.) I'm not sure how important supporting badly behaved code like this is, I just wanted to mention it.

djspiewak (Member, Author)

@durban So you're thinking about something like this?

val flag = new AtomicBoolean(false)
IO(flag.set(true)).delayBy(2.seconds) &> IO(while (!flag.get()) {})

I agree that, without timer stealing, the above can hang forever if the timer happens to be on the same thread as the while loop. However, just to be clear, if you replicateA_(100) the above, you will create a livelock even on the current version of Cats Effect. Locking up worker threads in this fashion is just… very bad. :-)

durban (Contributor) commented Dec 30, 2022

@djspiewak Yeah, something like that. The replicateA_ example is good, because it shows that currently it just seems to work. So yeah, "don't do that" is the solution.

}

implicit val sleepCallbackReverseOrdering: Ordering[SleepCallback] =
  Ordering.fromLessThan(_.triggerTime > _.triggerTime)

Review comment (Contributor), suggested change:
-  Ordering.fromLessThan(_.triggerTime > _.triggerTime)
+  Ordering.fromLessThan(_.triggerTime - _.triggerTime > 0)
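The motivation for the subtraction form: `System.nanoTime` values are only meaningful relative to each other and may wrap past `Long.MaxValue`, so a direct `>` can misorder two timestamps that are actually nanoseconds apart, while their difference still fits in a `Long`. A tiny demonstration:

```scala
// Two monotonic timestamps 10ns apart, straddling Long overflow.
val before = Long.MaxValue - 1L // just before the wrap
val after  = before + 10L       // 10ns later; overflows to a negative Long

// Direct comparison gets the order wrong across the wrap:
assert(!(after > before))

// The subtraction form stays correct, because the (small) difference
// is representable even though the operands wrapped:
assert(after - before > 0L)
```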

while (cont) {
  val head = sleepers.head()

  if (head.triggerTime <= now) {

Review comment (Contributor), suggested change:
-  if (head.triggerTime <= now) {
+  if (head.triggerTime - now <= 0) {

armanbilge (Member)

Btw, I had to make a small patch on my JVM polling branch in e44a802.

if (!isInterrupted()) {
  val now = System.nanoTime()
  val head = sleepersQueue.head()
  val nanos = head.triggerTime - now

Review comment (Member), cherry-pick e44a802, suggested change:
-  val nanos = head.triggerTime - now
+  val nanos = Math.max(head.triggerTime - now, 0)

if (scheduler.isInstanceOf[WorkStealingThreadPool])
  scheduler.asInstanceOf[WorkStealingThreadPool].sleepInternal(delay, cb)
else
  scheduler.sleep(delay, () => cb(RightUnit))

Review comment:

Use match?

djspiewak (Member, Author):

Actually match is often slower because of how it gets compiled. The JIT fast-paths the sequential combination of a conditional jump branching on isInstanceOf, followed immediately by a dynamic cast, and it turns that into a single operation on most architectures. match is more declarative but can easily generate bytecode which messes up this JIT optimization, so in very hot-path code we tend to be more explicit about it.
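For concreteness, the two shapes being compared, on a toy hierarchy (`Shape` and `Circle` are stand-ins for `Scheduler` and `WorkStealingThreadPool`, not Cats Effect code):

```scala
sealed trait Shape { def area: Double }
final class Circle(val r: Double) extends Shape {
  def area: Double = math.Pi * r * r
  def fastArea: Double = area // stand-in for a specialized internal method
}
final class Square(val side: Double) extends Shape {
  def area: Double = side * side
}

// Hot-path style: an explicit type test followed immediately by a cast,
// a sequence the JIT recognizes and fuses into a single check.
def areaExplicit(s: Shape): Double =
  if (s.isInstanceOf[Circle]) s.asInstanceOf[Circle].fastArea
  else s.area

// Declarative style: semantically identical, but the emitted bytecode
// may not have the exact test-then-cast shape the JIT fast-paths.
def areaMatch(s: Shape): Double = s match {
  case c: Circle => c.fastArea
  case other     => other.area
}
```

Both behave identically; the difference described above is purely about the generated bytecode on hot paths.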


import java.util.concurrent.atomic.AtomicBoolean

private final class SleepCallback private (
Review comment:

ScheduledCallback?

djspiewak (Member, Author):

ScheduledCallback might be a better name here.

5 participants