Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrated timers #3219

Merged
merged 34 commits into from
Jan 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a840035
Implement SleepCallback
vasilmkd Aug 20, 2021
91f7dec
Implement SleepersQueue
vasilmkd Aug 20, 2021
bf7e95d
Add a cancelation mechanism to SleepCallback
vasilmkd Aug 20, 2021
9d061d5
Complicate the parking mechanism a bit
vasilmkd Aug 20, 2021
9d5c01a
Pretend to handle sleeping fibers
vasilmkd Aug 20, 2021
09b1073
Add a sleep method on WorkerThread
vasilmkd Aug 20, 2021
787173a
Implement the sleep method on WorkStealingThreadPool
vasilmkd Aug 20, 2021
7bbaf96
Simplify canceled sleepers handling
vasilmkd Aug 21, 2021
96e3baa
Worker threads need to fight for their awakening
vasilmkd Aug 21, 2021
a0db9bc
Wire up the new sleeping mechanism in the run loop
vasilmkd Aug 21, 2021
97ac586
Restore the correct thread pool state after sleeping
vasilmkd Aug 21, 2021
a46b1bc
Rename the internal sleep method
vasilmkd Aug 21, 2021
fb644ec
Add a sleep cancelation unit test
vasilmkd Aug 21, 2021
3935273
Add a link to the openjdk source code
vasilmkd Aug 21, 2021
f00c2b4
Add SleepBenchmark
vasilmkd Aug 21, 2021
c13dacc
Check ownership of `WorkerThread` before sleeping
vasilmkd Aug 24, 2021
2d679c7
`WorkStealingThreadPool` extends `Scheduler`
vasilmkd Aug 24, 2021
b2808e7
Switch the default scheduler to the compute pool
vasilmkd Aug 24, 2021
6431187
Add a unit test for foreign execution contexts
vasilmkd Aug 24, 2021
862f024
Directly complete the sleep async callback
vasilmkd Aug 26, 2021
5503844
Re-use the `RightUnit` instance
vasilmkd Aug 26, 2021
73811e3
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Oct 30, 2022
4bbfdfb
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Nov 20, 2022
6097579
Removed references to external `IORuntime` from `sleep` unhappy path
djspiewak Nov 20, 2022
55e5bfa
Fixed memory leak in sleep cancelation
djspiewak Nov 20, 2022
65d5561
Fixed reference to `global` executor
djspiewak Nov 20, 2022
5801a8d
`sbt prePR`
djspiewak Nov 20, 2022
639ac01
This definitely bumps the base version
djspiewak Nov 20, 2022
16715d3
Disable starvation checker in `SleepDrift` test
djspiewak Nov 20, 2022
564ffba
Removed errant println
djspiewak Nov 21, 2022
fce4572
Fixed `blockOn` mechanism for integrated timers
djspiewak Nov 21, 2022
3ab4a7c
Moved WSTP tests into platform spec
djspiewak Nov 21, 2022
01f5b3a
Removed unused import
djspiewak Nov 22, 2022
5951e31
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Nov 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.benchmarks

import cats.effect.IO
import cats.effect.unsafe._
import cats.syntax.all._

import org.openjdk.jmh.annotations._

import scala.concurrent.duration._

import java.util.concurrent.TimeUnit

/**
* To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark SleepBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.SleepBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
* more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MINUTES)
class SleepBenchmark {

@Param(Array("10000"))
var size: Int = _

def sleepBenchmark(implicit runtime: IORuntime): Int = {
def fiber(i: Int): IO[Int] =
IO.sleep(1.nanosecond).flatMap { _ =>
IO(i).flatMap { j =>
IO.sleep(1.nanosecond).flatMap { _ =>
if (j > 1000)
IO.sleep(1.nanosecond).flatMap(_ => IO.pure(j))
else
IO.sleep(1.nanosecond).flatMap(_ => fiber(j + 1))
}
}
}

List
.range(0, size)
.traverse(fiber(_).start)
.flatMap(_.traverse(_.joinWithNever))
.map(_.sum)
.unsafeRunSync()
}

@Benchmark
def sleep(): Int = {
import cats.effect.unsafe.implicits.global
sleepBenchmark
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import scala.concurrent.duration._

object SleepDrift extends IOApp.Simple {

override val runtimeConfig =
super.runtimeConfig.copy(cpuStarvationCheckInitialDelay = Duration.Inf)

val delayTwoMinutes = {
def loop(n: Int): IO[Unit] = {
if (n <= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,36 +165,23 @@ class WorkStealingBenchmark {
(ExecutionContext.fromExecutor(executor), () => executor.shutdown())
}

val (scheduler, schedDown) = {
val executor = Executors.newSingleThreadScheduledExecutor { r =>
val t = new Thread(r)
t.setName("io-scheduler")
t.setDaemon(true)
t.setPriority(Thread.MAX_PRIORITY)
t
}
(Scheduler.fromScheduledExecutor(executor), () => executor.shutdown())
}

val compute =
new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
_.printStackTrace())
val compute = new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
_.printStackTrace())

val cancelationCheckThreshold =
System.getProperty("cats.effect.cancelation.check.threshold", "512").toInt

IORuntime(
compute,
blocking,
scheduler,
compute,
() => {
compute.shutdown()
blockDown()
schedDown()
},
IORuntimeConfig(
cancelationCheckThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cats.effect
package unsafe

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

// Can you imagine a thread pool on JS? Have fun trying to extend or instantiate
// this class. Unfortunately, due to the explicit branching, this type leaks
Expand All @@ -27,6 +28,13 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
def execute(runnable: Runnable): Unit
def reportFailure(cause: Throwable): Unit
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
fallback: Scheduler): Runnable
private[effect] def canExecuteBlockingCode(): Boolean
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Expand Down
6 changes: 1 addition & 5 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,13 @@ trait IOApp {
val (blocking, blockDown) =
IORuntime.createDefaultBlockingExecutionContext()

val (scheduler, schedDown) =
IORuntime.createDefaultScheduler()

IORuntime(
compute,
blocking,
scheduler,
compute,
{ () =>
compDown()
blockDown()
schedDown()
},
runtimeConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
installGlobal {
val (compute, _) = createWorkStealingComputeThreadPool()
val (blocking, _) = createDefaultBlockingExecutionContext()
val (scheduler, _) = createDefaultScheduler()
IORuntime(compute, blocking, scheduler, () => (), IORuntimeConfig())

IORuntime(compute, blocking, compute, () => (), IORuntimeConfig())
}
}

Expand Down
79 changes: 79 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepCallback.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

import scala.concurrent.duration.FiniteDuration

import java.util.concurrent.atomic.AtomicBoolean

private final class SleepCallback private (
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ScheduledCallback?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ScheduledCallback might be a better name here.

val triggerTime: Long,
private[this] var _callback: Right[Nothing, Unit] => Unit)
extends AtomicBoolean(true)
with Runnable {

def callback(r: Right[Nothing, Unit]): Unit = {
val cb = _callback
if (cb != null) {
cb(r)
}
}

override def run(): Unit = {
lazySet(false)
_callback = null // avoid memory leaks
}
}

private object SleepCallback {

/**
* Translated to Scala from:
* https://github.com/openjdk/jdk/blob/04a806ec86a388b8de31d42f904c4321beb69e14/src/java.base/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java#L527-L547
*/
def create(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit,
now: Long,
sleepers: SleepersQueue): SleepCallback = {

def overflowFree(delay: Long, now: Long): Long =
if (sleepers.isEmpty) delay
else {
val head = sleepers.head()
val headDelay = head.triggerTime - now
if (headDelay < 0 && (delay - headDelay < 0))
Long.MaxValue + headDelay
else
delay
}

val triggerTime = {
val delayNanos = delay.toNanos

if (delayNanos < (Long.MaxValue >> 1))
now + delayNanos
else
now + overflowFree(delayNanos, now)
}

new SleepCallback(triggerTime, callback)
}

implicit val sleepCallbackReverseOrdering: Ordering[SleepCallback] =
Ordering.fromLessThan(_.triggerTime > _.triggerTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Ordering.fromLessThan(_.triggerTime > _.triggerTime)
Ordering.fromLessThan(_.triggerTime - _.triggerTime > 0)

}
50 changes: 50 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepersQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

import scala.collection.mutable.PriorityQueue

private final class SleepersQueue private () {
private[this] val queue: PriorityQueue[SleepCallback] = PriorityQueue.empty
private[this] var count: Int = 0

def isEmpty: Boolean =
count == 0

def nonEmpty: Boolean =
!isEmpty

def head(): SleepCallback =
queue.head

def +=(scb: SleepCallback): Unit = {
queue += scb
count += 1
}

def popHead(): Unit = {
queue.dequeue()
count -= 1
()
}

override def toString = s"SleepersQueue($queue, $count)"
}

private object SleepersQueue {
def empty: SleepersQueue = new SleepersQueue()
}
Loading