Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yet another polling system sketch #3296

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a840035
Implement SleepCallback
vasilmkd Aug 20, 2021
91f7dec
Implement SleepersQueue
vasilmkd Aug 20, 2021
bf7e95d
Add a cancelation mechanism to SleepCallback
vasilmkd Aug 20, 2021
9d061d5
Complicate the parking mechanism a bit
vasilmkd Aug 20, 2021
9d5c01a
Pretend to handle sleeping fibers
vasilmkd Aug 20, 2021
09b1073
Add a sleep method on WorkerThread
vasilmkd Aug 20, 2021
787173a
Implement the sleep method on WorkStealingThreadPool
vasilmkd Aug 20, 2021
7bbaf96
Simplify canceled sleepers handling
vasilmkd Aug 21, 2021
96e3baa
Worker threads need to fight for their awakening
vasilmkd Aug 21, 2021
a0db9bc
Wire up the new sleeping mechanism in the run loop
vasilmkd Aug 21, 2021
97ac586
Restore the correct thread pool state after sleeping
vasilmkd Aug 21, 2021
a46b1bc
Rename the internal sleep method
vasilmkd Aug 21, 2021
fb644ec
Add a sleep cancelation unit test
vasilmkd Aug 21, 2021
3935273
Add a link to the openjdk source code
vasilmkd Aug 21, 2021
f00c2b4
Add SleepBenchmark
vasilmkd Aug 21, 2021
c13dacc
Check ownership of `WorkerThread` before sleeping
vasilmkd Aug 24, 2021
2d679c7
`WorkStealingThreadPool` extends `Scheduler`
vasilmkd Aug 24, 2021
b2808e7
Switch the default scheduler to the compute pool
vasilmkd Aug 24, 2021
6431187
Add a unit test for foreign execution contexts
vasilmkd Aug 24, 2021
862f024
Directly complete the sleep async callback
vasilmkd Aug 26, 2021
5503844
Re-use the `RightUnit` instance
vasilmkd Aug 26, 2021
73811e3
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Oct 30, 2022
4bbfdfb
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak Nov 20, 2022
6097579
Removed references to external `IORuntime` from `sleep` unhappy path
djspiewak Nov 20, 2022
55e5bfa
Fixed memory leak in sleep cancelation
djspiewak Nov 20, 2022
65d5561
Fixed reference to `global` executor
djspiewak Nov 20, 2022
5801a8d
`sbt prePR`
djspiewak Nov 20, 2022
639ac01
This definitely bumps the base version
djspiewak Nov 20, 2022
16715d3
Disable starvation checker in `SleepDrift` test
djspiewak Nov 20, 2022
564ffba
Removed errant println
djspiewak Nov 21, 2022
fce4572
Fixed `blockOn` mechanism for integrated timers
djspiewak Nov 21, 2022
3ab4a7c
Moved WSTP tests into platform spec
djspiewak Nov 21, 2022
01f5b3a
Removed unused import
djspiewak Nov 22, 2022
6a21e49
Initial sketch of `PollingSystem`
djspiewak Nov 24, 2022
cca9caf
Iterate polling system sketch
armanbilge Dec 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/EventLoop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.reflect.ClassTag

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

trait EventLoop[+Registrar] extends ExecutionContext {

protected def registrarTag: ClassTag[_ <: Registrar]

def registrar(): Registrar

}

object EventLoop {
def unapply[R](loop: EventLoop[Any])(ct: ClassTag[R]): Option[EventLoop[R]] =
if (ct.runtimeClass.isAssignableFrom(loop.registrarTag.runtimeClass))
Some(loop.asInstanceOf[EventLoop[R]])
else
None
Comment on lines +34 to +39
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead of this pattern match maybe we should expose this as IO.eventLoop or something.


def fromPollingSystem(
name: String,
system: PollingSystem): (EventLoop[system.Poller], () => Unit) = {
Comment on lines +41 to +43
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A single-thread dispatcher from a polling system. I just tossed this together, I doubt it's any good.

I suspect that there will be objections to including this in CE. My argument for why we should have this is:

  1. implementing a good event dispatcher thread is non-trivial
  2. in the case when the WSTP does not have the desired polling system installed, to provide a fallback implementation a dispatcher thread will be necessary.


val done = new AtomicBoolean(false)
val poller = system.makePoller()

val loop = new Thread(name) with EventLoop[system.Poller] with ExecutionContextExecutor {

val queue = new LinkedBlockingQueue[Runnable]

def registrarTag: ClassTag[_ <: system.Poller] = system.pollerTag

def registrar(): system.Poller = poller

def execute(command: Runnable): Unit = {
queue.put(command)
poller.interrupt(this)
}

def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

override def run(): Unit = {
while (!done.get()) {
while (!queue.isEmpty()) queue.poll().run()
poller.poll(-1)
}
}
}

val cleanup = () => {
done.set(true)
poller.interrupt(loop)
}

(loop, cleanup)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ import scala.reflect.ClassTag
abstract class PollingSystem {

type Poller <: AbstractPoller
def pollerTag: ClassTag[Poller]

def apply(): Poller

final def local()(implicit tag: ClassTag[Poller]): Option[Poller] =
Thread.currentThread() match {
case t: WorkerThread => tag.unapply(t.poller())
case _ => None
}
def makePoller(): Poller

protected abstract class AbstractPoller {
def poll(nanos: Long): Unit
Expand Down
6 changes: 5 additions & 1 deletion core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package cats.effect
package unsafe

import scala.reflect.ClassTag

import java.util.concurrent.locks.LockSupport

object SleepSystem extends PollingSystem {

def apply(): Poller = new Poller()
def pollerTag: ClassTag[Poller] = ClassTag(classOf[Poller])

def makePoller(): Poller = new Poller()

final class Poller extends AbstractPoller {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import java.util.Comparator
import java.util.concurrent.{ConcurrentSkipListSet, ThreadLocalRandom}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.concurrent.locks.LockSupport
import scala.reflect.ClassTag

/**
* Work-stealing thread pool which manages a pool of [[WorkerThread]] s for the specific purpose
Expand All @@ -67,7 +68,8 @@ private[effect] final class WorkStealingThreadPool(
system: PollingSystem,
reportFailure0: Throwable => Unit
) extends ExecutionContextExecutor
with Scheduler {
with Scheduler
with EventLoop[Any] {

import TracingConstants._
import WorkStealingThreadPoolConstants._
Expand Down Expand Up @@ -126,7 +128,7 @@ private[effect] final class WorkStealingThreadPool(
fiberBags(i) = fiberBag
val sleepersQueue = SleepersQueue.empty
sleepersQueues(i) = sleepersQueue
val poller = system()
val poller = system.makePoller()
pollers(i) = poller

val thread =
Expand Down Expand Up @@ -585,6 +587,20 @@ private[effect] final class WorkStealingThreadPool(
}
}

def registrar(): Any = {
val pool = this
val thread = Thread.currentThread()

if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
if (worker.isOwnedBy(pool)) return worker.poller()
}

throw new RuntimeException("Invoked from outside the WSTP")
}

protected def registrarTag: ClassTag[?] = system.pollerTag

/**
* Shut down the thread pool and clean up the pool state. Calling this method after the pool
* has been shut down has no effect.
Expand Down