-
Notifications
You must be signed in to change notification settings - Fork 531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Yet another polling system sketch #3296
Closed
armanbilge
wants to merge
35
commits into
typelevel:series/3.x
from
armanbilge:experiment/integrated-polling
Closed
Changes from all commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
a840035
Implement SleepCallback
vasilmkd 91f7dec
Implement SleepersQueue
vasilmkd bf7e95d
Add a cancelation mechanism to SleepCallback
vasilmkd 9d061d5
Complicate the parking mechanism a bit
vasilmkd 9d5c01a
Pretend to handle sleeping fibers
vasilmkd 09b1073
Add a sleep method on WorkerThread
vasilmkd 787173a
Implement the sleep method on WorkStealingThreadPool
vasilmkd 7bbaf96
Simplify canceled sleepers handling
vasilmkd 96e3baa
Worker threads need to fight for their awakening
vasilmkd a0db9bc
Wire up the new sleeping mechanism in the run loop
vasilmkd 97ac586
Restore the correct thread pool state after sleeping
vasilmkd a46b1bc
Rename the internal sleep method
vasilmkd fb644ec
Add a sleep cancelation unit test
vasilmkd 3935273
Add a link to the openjdk source code
vasilmkd f00c2b4
Add SleepBenchmark
vasilmkd c13dacc
Check ownership of `WorkerThread` before sleeping
vasilmkd 2d679c7
`WorkStealingThreadPool` extends `Scheduler`
vasilmkd b2808e7
Switch the default scheduler to the compute pool
vasilmkd 6431187
Add a unit test for foreign execution contexts
vasilmkd 862f024
Directly complete the sleep async callback
vasilmkd 5503844
Re-use the `RightUnit` instance
vasilmkd 73811e3
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak 4bbfdfb
Merge branch 'series/3.x' into feature/integrated-timers
djspiewak 6097579
Removed references to external `IORuntime` from `sleep` unhappy path
djspiewak 55e5bfa
Fixed memory leak in sleep cancelation
djspiewak 65d5561
Fixed reference to `global` executor
djspiewak 5801a8d
`sbt prePR`
djspiewak 639ac01
This definitely bumps the base version
djspiewak 16715d3
Disable starvation checker in `SleepDrift` test
djspiewak 564ffba
Removed errant println
djspiewak fce4572
Fixed `blockOn` mechanism for integrated timers
djspiewak 3ab4a7c
Moved WSTP tests into platform spec
djspiewak 01f5b3a
Removed unused import
djspiewak 6a21e49
Initial sketch of `PollingSystem`
djspiewak cca9caf
Iterate polling system sketch
armanbilge File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
78 changes: 78 additions & 0 deletions
78
benchmarks/src/main/scala/cats/effect/benchmarks/SleepBenchmark.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Copyright 2020-2022 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect.benchmarks | ||
|
||
import cats.effect.IO | ||
import cats.effect.unsafe._ | ||
import cats.syntax.all._ | ||
|
||
import org.openjdk.jmh.annotations._ | ||
|
||
import scala.concurrent.duration._ | ||
|
||
import java.util.concurrent.TimeUnit | ||
|
||
/** | ||
* To do comparative benchmarks between versions: | ||
* | ||
* benchmarks/run-benchmark SleepBenchmark | ||
* | ||
* This will generate results in `benchmarks/results`. | ||
* | ||
* Or to run the benchmark from within sbt: | ||
* | ||
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.SleepBenchmark | ||
* | ||
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that | ||
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but | ||
* more is better. | ||
*/ | ||
@State(Scope.Thread) | ||
@BenchmarkMode(Array(Mode.Throughput)) | ||
@OutputTimeUnit(TimeUnit.MINUTES) | ||
class SleepBenchmark { | ||
|
||
@Param(Array("10000")) | ||
var size: Int = _ | ||
|
||
def sleepBenchmark(implicit runtime: IORuntime): Int = { | ||
def fiber(i: Int): IO[Int] = | ||
IO.sleep(1.nanosecond).flatMap { _ => | ||
IO(i).flatMap { j => | ||
IO.sleep(1.nanosecond).flatMap { _ => | ||
if (j > 1000) | ||
IO.sleep(1.nanosecond).flatMap(_ => IO.pure(j)) | ||
else | ||
IO.sleep(1.nanosecond).flatMap(_ => fiber(j + 1)) | ||
} | ||
} | ||
} | ||
|
||
List | ||
.range(0, size) | ||
.traverse(fiber(_).start) | ||
.flatMap(_.traverse(_.joinWithNever)) | ||
.map(_.sum) | ||
.unsafeRunSync() | ||
} | ||
|
||
@Benchmark | ||
def sleep(): Int = { | ||
import cats.effect.unsafe.implicits.global | ||
sleepBenchmark | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
78 changes: 78 additions & 0 deletions
78
core/jvm/src/main/scala/cats/effect/unsafe/EventLoop.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Copyright 2020-2022 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect | ||
package unsafe | ||
|
||
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} | ||
import scala.reflect.ClassTag | ||
|
||
import java.util.concurrent.LinkedBlockingQueue | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
|
||
trait EventLoop[+Registrar] extends ExecutionContext { | ||
|
||
protected def registrarTag: ClassTag[_ <: Registrar] | ||
|
||
def registrar(): Registrar | ||
|
||
} | ||
|
||
object EventLoop { | ||
def unapply[R](loop: EventLoop[Any])(ct: ClassTag[R]): Option[EventLoop[R]] = | ||
if (ct.runtimeClass.isAssignableFrom(loop.registrarTag.runtimeClass)) | ||
Some(loop.asInstanceOf[EventLoop[R]]) | ||
else | ||
None | ||
|
||
def fromPollingSystem( | ||
name: String, | ||
system: PollingSystem): (EventLoop[system.Poller], () => Unit) = { | ||
Comment on lines
+41
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A single-thread dispatcher from a polling system. I just tossed this together, I doubt it's any good. I suspect that there will be objections to including this in CE. My argument for why we should have this is:
|
||
|
||
val done = new AtomicBoolean(false) | ||
val poller = system.makePoller() | ||
|
||
val loop = new Thread(name) with EventLoop[system.Poller] with ExecutionContextExecutor { | ||
|
||
val queue = new LinkedBlockingQueue[Runnable] | ||
|
||
def registrarTag: ClassTag[_ <: system.Poller] = system.pollerTag | ||
|
||
def registrar(): system.Poller = poller | ||
|
||
def execute(command: Runnable): Unit = { | ||
queue.put(command) | ||
poller.interrupt(this) | ||
} | ||
|
||
def reportFailure(cause: Throwable): Unit = cause.printStackTrace() | ||
|
||
override def run(): Unit = { | ||
while (!done.get()) { | ||
while (!queue.isEmpty()) queue.poll().run() | ||
poller.poll(-1) | ||
} | ||
} | ||
} | ||
|
||
val cleanup = () => { | ||
done.set(true) | ||
poller.interrupt(loop) | ||
} | ||
|
||
(loop, cleanup) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
core/jvm/src/main/scala/cats/effect/unsafe/PollingSystem.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright 2020-2022 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect | ||
package unsafe | ||
|
||
import scala.reflect.ClassTag | ||
|
||
abstract class PollingSystem { | ||
|
||
type Poller <: AbstractPoller | ||
def pollerTag: ClassTag[Poller] | ||
|
||
def makePoller(): Poller | ||
|
||
protected abstract class AbstractPoller { | ||
def poll(nanos: Long): Unit | ||
def interrupt(target: Thread): Unit | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So instead of this pattern match maybe we should expose this as
IO.eventLoop
or something.