Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yet another polling system sketch #3296

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a840035
Implement SleepCallback
vasilmkd Aug 20, 2021
91f7dec
Implement SleepersQueue
vasilmkd Aug 20, 2021
bf7e95d
Add a cancelation mechanism to SleepCallback
vasilmkd Aug 20, 2021
9d061d5
Complicate the parking mechanism a bit
vasilmkd Aug 20, 2021
9d5c01a
Pretend to handle sleeping fibers
vasilmkd Aug 20, 2021
09b1073
Add a sleep method on WorkerThread
vasilmkd Aug 20, 2021
787173a
Implement the sleep method on WorkStealingThreadPool
vasilmkd Aug 20, 2021
7bbaf96
Simplify canceled sleepers handling
vasilmkd Aug 21, 2021
96e3baa
Worker threads need to fight for their awakening
vasilmkd Aug 21, 2021
a0db9bc
Wire up the new sleeping mechanism in the run loop
vasilmkd Aug 21, 2021
97ac586
Restore the correct thread pool state after sleeping
vasilmkd Aug 21, 2021
a46b1bc
Rename the internal sleep method
vasilmkd Aug 21, 2021
fb644ec
Add a sleep cancelation unit test
vasilmkd Aug 21, 2021
3935273
Add a link to the openjdk source code
vasilmkd Aug 21, 2021
f00c2b4
Add SleepBenchmark
vasilmkd Aug 21, 2021
c13dacc
Check ownership of `WorkerThread` before sleeping
vasilmkd Aug 24, 2021
2d679c7
`WorkStealingThreadPool` extends `Scheduler`
vasilmkd Aug 24, 2021
b2808e7
Switch the default scheduler to the compute pool
vasilmkd Aug 24, 2021
6431187
Add a unit test for foreign execution contexts
vasilmkd Aug 24, 2021
862f024
Directly complete the sleep async callback
vasilmkd Aug 26, 2021
5503844
Re-use the `RightUnit` instance
vasilmkd Aug 26, 2021
73811e3
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Oct 30, 2022
4bbfdfb
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Nov 20, 2022
6097579
Removed references to external `IORuntime` from `sleep` unhappy path
djspiewak Nov 20, 2022
55e5bfa
Fixed memory leak in sleep cancelation
djspiewak Nov 20, 2022
65d5561
Fixed reference to `global` executor
djspiewak Nov 20, 2022
5801a8d
`sbt prePR`
djspiewak Nov 20, 2022
639ac01
This definitely bumps the base version
djspiewak Nov 20, 2022
16715d3
Disable starvation checker in `SleepDrift` test
djspiewak Nov 20, 2022
564ffba
Removed errant println
djspiewak Nov 21, 2022
fce4572
Fixed `blockOn` mechanism for integrated timers
djspiewak Nov 21, 2022
3ab4a7c
Moved WSTP tests into platform spec
djspiewak Nov 21, 2022
01f5b3a
Removed unused import
djspiewak Nov 22, 2022
6a21e49
Initial sketch of `PollingSystem`
djspiewak Nov 24, 2022
cca9caf
Iterate polling system sketch
armanbilge Dec 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.benchmarks

import cats.effect.IO
import cats.effect.unsafe._
import cats.syntax.all._

import org.openjdk.jmh.annotations._

import scala.concurrent.duration._

import java.util.concurrent.TimeUnit

/**
* To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark SleepBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.SleepBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
* more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MINUTES)
class SleepBenchmark {

@Param(Array("10000"))
var size: Int = _

def sleepBenchmark(implicit runtime: IORuntime): Int = {
def fiber(i: Int): IO[Int] =
IO.sleep(1.nanosecond).flatMap { _ =>
IO(i).flatMap { j =>
IO.sleep(1.nanosecond).flatMap { _ =>
if (j > 1000)
IO.sleep(1.nanosecond).flatMap(_ => IO.pure(j))
else
IO.sleep(1.nanosecond).flatMap(_ => fiber(j + 1))
}
}
}

List
.range(0, size)
.traverse(fiber(_).start)
.flatMap(_.traverse(_.joinWithNever))
.map(_.sum)
.unsafeRunSync()
}

@Benchmark
def sleep(): Int = {
import cats.effect.unsafe.implicits.global
sleepBenchmark
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import scala.concurrent.duration._

object SleepDrift extends IOApp.Simple {

override val runtimeConfig =
super.runtimeConfig.copy(cpuStarvationCheckInitialDelay = Duration.Inf)

val delayTwoMinutes = {
def loop(n: Int): IO[Unit] = {
if (n <= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,36 +165,24 @@ class WorkStealingBenchmark {
(ExecutionContext.fromExecutor(executor), () => executor.shutdown())
}

val (scheduler, schedDown) = {
val executor = Executors.newSingleThreadScheduledExecutor { r =>
val t = new Thread(r)
t.setName("io-scheduler")
t.setDaemon(true)
t.setPriority(Thread.MAX_PRIORITY)
t
}
(Scheduler.fromScheduledExecutor(executor), () => executor.shutdown())
}

val compute =
new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
_.printStackTrace())
val compute = new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
SleepSystem,
_.printStackTrace())

val cancelationCheckThreshold =
System.getProperty("cats.effect.cancelation.check.threshold", "512").toInt

IORuntime(
compute,
blocking,
scheduler,
compute,
() => {
compute.shutdown()
blockDown()
schedDown()
},
IORuntimeConfig(
cancelationCheckThreshold,
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ThisBuild / git.gitUncommittedChanges := {
}
}

ThisBuild / tlBaseVersion := "3.4"
ThisBuild / tlBaseVersion := "3.5"
ThisBuild / tlUntaggedAreSnapshots := false

ThisBuild / organization := "org.typelevel"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cats.effect
package unsafe

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

// Can you imagine a thread pool on JS? Have fun trying to extend or instantiate
// this class. Unfortunately, due to the explicit branching, this type leaks
Expand All @@ -27,6 +28,13 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
def execute(runnable: Runnable): Unit
def reportFailure(cause: Throwable): Unit
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
fallback: Scheduler): Runnable
private[effect] def canExecuteBlockingCode(): Boolean
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Expand Down
11 changes: 5 additions & 6 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ trait IOApp {
*/
protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

protected def pollingSystem: unsafe.PollingSystem = unsafe.SleepSystem

/**
* Controls the number of worker threads which will be allocated to the compute pool in the
* underlying runtime. In general, this should be no ''greater'' than the number of physical
Expand Down Expand Up @@ -317,22 +319,19 @@ trait IOApp {
val (compute, compDown) =
IORuntime.createWorkStealingComputeThreadPool(
threads = computeWorkerThreadCount,
reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime))
reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime),
pollingSystem = pollingSystem)

val (blocking, blockDown) =
IORuntime.createDefaultBlockingExecutionContext()

val (scheduler, schedDown) =
IORuntime.createDefaultScheduler()

IORuntime(
compute,
blocking,
scheduler,
compute,
{ () =>
compDown()
blockDown()
schedDown()
},
runtimeConfig)
}
Expand Down
78 changes: 78 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/EventLoop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.reflect.ClassTag

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

trait EventLoop[+Registrar] extends ExecutionContext {

protected def registrarTag: ClassTag[_ <: Registrar]

def registrar(): Registrar

}

object EventLoop {
def unapply[R](loop: EventLoop[Any])(ct: ClassTag[R]): Option[EventLoop[R]] =
if (ct.runtimeClass.isAssignableFrom(loop.registrarTag.runtimeClass))
Some(loop.asInstanceOf[EventLoop[R]])
else
None
Comment on lines +34 to +39
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead of this pattern match maybe we should expose this as IO.eventLoop or something.


def fromPollingSystem(
name: String,
system: PollingSystem): (EventLoop[system.Poller], () => Unit) = {
Comment on lines +41 to +43
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A single-thread dispatcher from a polling system. I just tossed this together, I doubt it's any good.

I suspect that there will be objections to including this in CE. My argument for why we should have this is:

  1. implementing a good event dispatcher thread is non-trivial
  2. in the case when the WSTP does not have the desired polling system installed, to provide a fallback implementation a dispatcher thread will be necessary.


val done = new AtomicBoolean(false)
val poller = system.makePoller()

val loop = new Thread(name) with EventLoop[system.Poller] with ExecutionContextExecutor {

val queue = new LinkedBlockingQueue[Runnable]

def registrarTag: ClassTag[_ <: system.Poller] = system.pollerTag

def registrar(): system.Poller = poller

def execute(command: Runnable): Unit = {
queue.put(command)
poller.interrupt(this)
}

def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

override def run(): Unit = {
while (!done.get()) {
while (!queue.isEmpty()) queue.poll().run()
poller.poll(-1)
}
}
}

val cleanup = () => {
done.set(true)
poller.interrupt(loop)
}

(loop, cleanup)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type

private[this] final val DefaultBlockerPrefix = "io-compute-blocker"

// The default compute thread pool on the JVM is now a work stealing thread pool.
def createWorkStealingComputeThreadPool(
threads: Int = Math.max(2, Runtime.getRuntime().availableProcessors()),
threadPrefix: String = "io-compute",
blockerThreadPrefix: String = DefaultBlockerPrefix,
runtimeBlockingExpiration: Duration = 60.seconds,
pollingSystem: PollingSystem = SleepSystem,
reportFailure: Throwable => Unit = _.printStackTrace())
: (WorkStealingThreadPool, () => Unit) = {
val threadPool =
Expand All @@ -47,6 +47,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
threadPrefix,
blockerThreadPrefix,
runtimeBlockingExpiration,
pollingSystem,
reportFailure)

val unregisterMBeans =
Expand Down Expand Up @@ -180,8 +181,8 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
installGlobal {
val (compute, _) = createWorkStealingComputeThreadPool()
val (blocking, _) = createDefaultBlockingExecutionContext()
val (scheduler, _) = createDefaultScheduler()
IORuntime(compute, blocking, scheduler, () => (), IORuntimeConfig())

IORuntime(compute, blocking, compute, () => (), IORuntimeConfig())
}
}

Expand Down
33 changes: 33 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/PollingSystem.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import scala.reflect.ClassTag

abstract class PollingSystem {

type Poller <: AbstractPoller
def pollerTag: ClassTag[Poller]

def makePoller(): Poller

protected abstract class AbstractPoller {
def poll(nanos: Long): Unit
def interrupt(target: Thread): Unit
}
}
Loading