Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polling system #3332

Merged
merged 104 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
6a21e49
Initial sketch of `PollingSystem`
djspiewak Nov 24, 2022
cca9caf
Iterate polling system sketch
armanbilge Dec 1, 2022
6d23310
Extract `PollingSystem` abstraction on Native
armanbilge Dec 11, 2022
727bfe8
Add `FileDescriptorPoller` abstraction
armanbilge Dec 11, 2022
9a8f7ed
Add `EpollSystem` and `KqueueSystem`
armanbilge Dec 11, 2022
0889426
Add test for `Scheduler#sleep`
armanbilge Dec 11, 2022
4250049
Add `FileDescriptorPollerSpec`
armanbilge Dec 11, 2022
0b9ac02
Consistent error-handling in `KqueueSystem`
armanbilge Dec 11, 2022
956734a
Make `pollingSystem` configurable in `IOApp`
armanbilge Dec 11, 2022
dda54b7
Nowarn unuseds
armanbilge Dec 12, 2022
1206b27
Revise the fd poller spec
armanbilge Dec 12, 2022
e0c4ec3
Remove `maxEvents` config from `EpollSystem`
armanbilge Dec 12, 2022
eeeb3e6
Remove `maxEvents` config from `KqueueSystem`
armanbilge Dec 12, 2022
c4a0a16
Add test for many simultaneous events
armanbilge Dec 12, 2022
aae6e97
Remove redundant `final`
armanbilge Dec 12, 2022
457f89c
Update comment
armanbilge Dec 12, 2022
721c2fc
Add test for pre-existing readiness
armanbilge Dec 12, 2022
4f9e57b
Add test for no readiness
armanbilge Dec 12, 2022
a520aee
Reimagine `FileDescriptorPoller`
armanbilge Dec 19, 2022
5f8146b
Fix parameter names
armanbilge Dec 19, 2022
3640605
Refactor/redesign `PollingSystem` ... again ... (:
armanbilge Dec 19, 2022
42491da
Dump `EventLoop` abstraction
armanbilge Dec 19, 2022
786127c
Update the `FileDescriptorPollerSpec`
armanbilge Dec 19, 2022
de3eea0
Rework `EpollSystem`
armanbilge Dec 20, 2022
eb8ba84
Set pipes to non-blocking mode
armanbilge Dec 20, 2022
0124567
Add fcntl import
armanbilge Dec 20, 2022
72b05a7
Fix bugs in spec
armanbilge Dec 20, 2022
d18fa76
Add some uncancelables
armanbilge Dec 20, 2022
4d3a916
Revert "Add some uncancelables"
armanbilge Dec 20, 2022
9ba870f
Rework `KqueueSystem`
armanbilge Dec 20, 2022
9673883
Post-refactor typos
armanbilge Dec 20, 2022
43b0b0a
Scope `.evalOn` even more tightly
armanbilge Dec 24, 2022
e5dd04f
Use `asyncCheckAttempt`
armanbilge Dec 24, 2022
aae00a6
Merge remote-tracking branch 'origin/experiment/integrated-polling' i…
armanbilge Dec 25, 2022
a41a46e
Attempt to reconcile jvm/native polling systems
armanbilge Dec 25, 2022
1da8c70
Use polling system interruption; cleanup poll data
armanbilge Dec 25, 2022
e1cc016
First draft `SelectorSystem`
armanbilge Dec 26, 2022
1c263ad
Install `SelectorSystem` by default
armanbilge Dec 26, 2022
e44a802
Fix calculation of sleep duration
armanbilge Dec 26, 2022
720208c
Add `SelectorPollerSpec`
armanbilge Dec 26, 2022
411eadc
Only iterate ready keys if selector is open
armanbilge Dec 26, 2022
029f5a2
Fixup `SelectorSystem`
armanbilge Dec 26, 2022
c80fabf
Simplification
armanbilge Dec 26, 2022
9687a5c
Add scaladocs to `SelectorPoller`, bikeshed method
armanbilge Dec 26, 2022
861d902
scalafmt
armanbilge Dec 26, 2022
3d265d7
Organize imports
armanbilge Dec 26, 2022
01c4a03
Fix bincompat
armanbilge Dec 26, 2022
ec6a29c
Add test for using poller after blocking
armanbilge Dec 28, 2022
6c4a9d1
Use `delayWithData` abstraction, retire `evalOn`
armanbilge Dec 28, 2022
6581dc4
Guard againstmultiple wstps
armanbilge Dec 28, 2022
7d7055a
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge Jan 28, 2023
23f20ac
Replace `delayWithData` with `register`
armanbilge Jan 28, 2023
e848c2b
`Poller->GlobalPollingState`, `PollData->Poller`
armanbilge Jan 28, 2023
d68c165
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge Apr 19, 2023
144c049
Update headers
armanbilge Apr 19, 2023
33d8ff5
Try to fix polling/parking interaction
armanbilge Apr 20, 2023
1f95fd7
Fixes
armanbilge Apr 21, 2023
8bc1940
Clear `_active` when removing refs to old data
armanbilge Apr 23, 2023
3da03b9
doneSleeping
durban Apr 23, 2023
875166e
Add hanging test for parked threads and polled events
armanbilge Apr 27, 2023
e4cda08
Merge remote-tracking branch 'durban/feature/jvm-polling-system' into…
armanbilge Apr 27, 2023
bbb5dc5
Notify `doneSleeping` only if `polled` events
armanbilge Apr 27, 2023
91ef606
Add test for selecting illegal ops
armanbilge Apr 27, 2023
675838e
Try to replicate `CancelledKeyException`
armanbilge Apr 27, 2023
0b3be29
Add hanging test for concurrent close
armanbilge Apr 27, 2023
3589570
Handle `CancelledKeyException` in `SelectorSystem#poll`
armanbilge Apr 28, 2023
e1b1d37
Organize imports
armanbilge Apr 28, 2023
3d1cd96
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge May 2, 2023
a769179
Merge branch 'series/3.x' into feature/jvm-polling-system
armanbilge May 11, 2023
5de6087
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge May 23, 2023
4661ba3
Fixup native polling
armanbilge May 23, 2023
e6403d2
Add `Poller` type parameter to WSTP
armanbilge May 24, 2023
9b77aac
Cleanup casts
armanbilge May 24, 2023
c7a57a8
Fix 2.12 compile
armanbilge May 24, 2023
35eae3e
Expose pollers via `IORuntime`
armanbilge May 27, 2023
4710769
Fix test
armanbilge May 27, 2023
e5586dc
Bincompat fixes
armanbilge May 27, 2023
0142603
`GlobalPollingState`->`Api`, `SelectorPoller`->`Selector`
armanbilge May 28, 2023
45d16e7
Restore scala 3 + native + macos intel job
armanbilge May 29, 2023
72fbd79
Try to fix matrix exclusions
armanbilge May 29, 2023
78ce2c0
Use `Mutex` instead of `Semaphore(1)`
armanbilge May 29, 2023
63dc15c
Fix kqueue ready-queue draining
armanbilge May 30, 2023
5f6a7b3
Workaround SN bu in kqueue
armanbilge May 31, 2023
704b8ec
Tweak the workaround
armanbilge May 31, 2023
74e9031
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge Jun 1, 2023
2096fad
Try to fix epoll binding on ARM
armanbilge Jun 6, 2023
8d0157a
Formatting
armanbilge Jun 6, 2023
cd7e0f4
Merge remote-tracking branch 'upstream/series/3.x' into feature/jvm-p…
armanbilge Jun 8, 2023
dfbe7c7
`Poller` -> `P`
armanbilge Jun 8, 2023
8d01908
Expose poller type in `liveTraces()`
armanbilge Jun 8, 2023
6442d7a
Better error reporting for `clock_gettime`
armanbilge Jun 12, 2023
b64bcfd
Add `PollingSystem#close`
armanbilge Jun 12, 2023
2cf8eee
Fix `SleepSystem` public api
armanbilge Jun 12, 2023
0a578ab
Workaround warning
armanbilge Jun 12, 2023
5460d48
Use `LongMap` for `KqueueSystem`
armanbilge Jun 12, 2023
8ad5971
Fix 2.12 compile
armanbilge Jun 12, 2023
7c8c36f
Poke ci
armanbilge Jun 12, 2023
5760cad
Fix filename
armanbilge Jun 12, 2023
f766126
Fix JVM global runtime shutdown
armanbilge Jun 12, 2023
e0e361b
Update names in `SelectorSpec`
armanbilge Jun 12, 2023
126dea3
Fix exception handling in `SelectorSystem`
armanbilge Jun 12, 2023
f1a2864
`poller`->`selector`
armanbilge Jun 13, 2023
09f0109
Optimize `SelectorSystem#poll` loop
armanbilge Jun 13, 2023
58f695f
Refactor exception handling in `EpollSystem`
armanbilge Jun 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ jobs:
java: graalvm@11
- os: windows-latest
scala: 3.3.0
ci: ciJVM
- os: macos-latest
scala: 3.3.0
ci: ciJVM
- os: windows-latest
scala: 2.12.18
ci: ciJVM
- os: macos-latest
scala: 2.12.18
ci: ciJVM
- ci: ciFirefox
scala: 3.3.0
- ci: ciChrome
Expand Down Expand Up @@ -97,9 +101,6 @@ jobs:
- os: macos-latest
ci: ciNative
scala: 2.12.18
- os: macos-latest
ci: ciNative
scala: 3.3.0
- os: windows-latest
java: graalvm@11
runs-on: ${{ matrix.os }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,13 @@ class WorkStealingBenchmark {
(ExecutionContext.fromExecutor(executor), () => executor.shutdown())
}

val compute = new WorkStealingThreadPool(
val compute = new WorkStealingThreadPool[AnyRef](
256,
"io-compute",
"io-blocker",
60.seconds,
false,
SleepSystem,
_.printStackTrace())

val cancelationCheckThreshold =
Expand Down
23 changes: 16 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ThisBuild / git.gitUncommittedChanges := {
}
}

ThisBuild / tlBaseVersion := "3.5"
ThisBuild / tlBaseVersion := "3.6"
ThisBuild / tlUntaggedAreSnapshots := false

ThisBuild / organization := "org.typelevel"
Expand Down Expand Up @@ -224,8 +224,8 @@ ThisBuild / githubWorkflowBuildMatrixExclusions := {
val windowsAndMacScalaFilters =
(ThisBuild / githubWorkflowScalaVersions).value.filterNot(Set(Scala213)).flatMap { scala =>
Seq(
MatrixExclude(Map("os" -> Windows, "scala" -> scala)),
MatrixExclude(Map("os" -> MacOS, "scala" -> scala)))
MatrixExclude(Map("os" -> Windows, "scala" -> scala, "ci" -> CI.JVM.command)),
MatrixExclude(Map("os" -> MacOS, "scala" -> scala, "ci" -> CI.JVM.command)))
}

val jsScalaFilters = for {
Expand Down Expand Up @@ -254,9 +254,7 @@ ThisBuild / githubWorkflowBuildMatrixExclusions := {

javaFilters ++ Seq(
MatrixExclude(Map("os" -> Windows, "ci" -> ci)),
MatrixExclude(Map("os" -> MacOS, "ci" -> ci, "scala" -> Scala212)),
// keep a native+2.13+macos job
MatrixExclude(Map("os" -> MacOS, "ci" -> ci, "scala" -> Scala3))
MatrixExclude(Map("os" -> MacOS, "ci" -> ci, "scala" -> Scala212))
)
}

Expand Down Expand Up @@ -640,7 +638,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.IOFiberConstants.ExecuteRunnableR"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.scope"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.IOFiberConstants.ContStateResult")
"cats.effect.IOFiberConstants.ContStateResult"),
// introduced by #3332, polling system
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.IORuntimeBuilder.this")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down Expand Up @@ -824,6 +825,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
} else Seq()
}
)
.nativeSettings(
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.PollingExecutorScheduler$SleepTask"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$")
)
)
.disablePlugins(JCStressPlugin)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration
// Can you imagine a thread pool on JS? Have fun trying to extend or instantiate
// this class. Unfortunately, due to the explicit branching, this type leaks
// into the shared source code of IOFiber.scala.
private[effect] sealed abstract class WorkStealingThreadPool private ()
private[effect] sealed abstract class WorkStealingThreadPool[P] private ()
extends ExecutionContext {
def execute(runnable: Runnable): Unit
def reportFailure(cause: Throwable): Unit
Expand All @@ -38,12 +38,12 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
private[effect] def canExecuteBlockingCode(): Boolean
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])],
Map[WorkerThread[P], (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])],
Map[Runnable, Trace])
}

private[unsafe] sealed abstract class WorkerThread private () extends Thread {
private[unsafe] def isOwnedBy(threadPool: WorkStealingThreadPool): Boolean
private[unsafe] sealed abstract class WorkerThread[P] private () extends Thread {
private[unsafe] def isOwnedBy(threadPool: WorkStealingThreadPool[_]): Boolean
private[unsafe] def monitor(fiber: Runnable): WeakBag.Handle
private[unsafe] def index: Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
private[effect] sealed class FiberMonitor(
// A reference to the compute pool of the `IORuntime` in which this suspended fiber bag
// operates. `null` if the compute pool of the `IORuntime` is not a `WorkStealingThreadPool`.
private[this] val compute: WorkStealingThreadPool
private[this] val compute: WorkStealingThreadPool[_]
) extends FiberMonitorShared {

private[this] final val BagReferences =
Expand All @@ -69,8 +69,8 @@ private[effect] sealed class FiberMonitor(
*/
def monitorSuspended(fiber: IOFiber[_]): WeakBag.Handle = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
if (thread.isInstanceOf[WorkerThread[_]]) {
val worker = thread.asInstanceOf[WorkerThread[_]]
// Guard against tracking errors when multiple work stealing thread pools exist.
if (worker.isOwnedBy(compute)) {
worker.monitor(fiber)
Expand Down Expand Up @@ -116,14 +116,14 @@ private[effect] sealed class FiberMonitor(
val externalFibers = external.collect(justFibers)
val suspendedFibers = suspended.collect(justFibers)
val workersMapping: Map[
WorkerThread,
WorkerThread[_],
(Thread.State, Option[(IOFiber[_], Trace)], Map[IOFiber[_], Trace])] =
workers.map {
case (thread, (state, opt, set)) =>
val filteredOpt = opt.collect(justFibers)
val filteredSet = set.collect(justFibers)
(thread, (state, filteredOpt, filteredSet))
}
}.toMap

(externalFibers, workersMapping, suspendedFibers)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

abstract class PollingSystem {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As promised: this now lives in jvm-native/ shared sources.


/**
* The user-facing interface.
*/
type Api <: AnyRef

/**
* The thread-local data structure used for polling.
*/
type Poller <: AnyRef

def close(): Unit

def makeApi(register: (Poller => Unit) => Unit): Api

def makePoller(): Poller

def closePoller(poller: Poller): Unit

/**
* @param nanos
* the maximum duration for which to block, where `nanos == -1` indicates to block
* indefinitely.
*
* @return
* whether any events were polled
*/
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean

/**
* @return
* whether poll should be called again (i.e., there are more events to be polled)
*/
def needsPoll(poller: Poller): Boolean

def interrupt(targetThread: Thread, targetPoller: Poller): Unit
Comment on lines +38 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd really like to pull this out into a Poller trait of some sort, similar to how I encoded it on my branch, rather than putting it all into a single trait like this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but i swore off oop 😭 fine I guess, that's how I ended up implementing them anyway ... but I do prefer this encoding, for whatever reason

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I tried this change and I'm having trouble making it work with the WithPoller[P] aux type, which I think is necessary (see #3332 (comment)). Let me have a look at your branch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm 🤔

protected def pollingSystem: unsafe.PollingSystem[unsafe.Poller] = unsafe.SelectorSystem()


}

private object PollingSystem {
type WithPoller[P] = PollingSystem {
type Poller = P
}
}
9 changes: 7 additions & 2 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ trait IOApp {
*/
protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

protected def pollingSystem: unsafe.PollingSystem =
unsafe.IORuntime.createDefaultPollingSystem()

/**
* Controls the number of worker threads which will be allocated to the compute pool in the
* underlying runtime. In general, this should be no ''greater'' than the number of physical
Expand Down Expand Up @@ -338,11 +341,12 @@ trait IOApp {
import unsafe.IORuntime

val installed = IORuntime installGlobal {
val (compute, compDown) =
val (compute, poller, compDown) =
IORuntime.createWorkStealingComputeThreadPool(
threads = computeWorkerThreadCount,
reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime),
blockedThreadDetectionEnabled = blockedThreadDetectionEnabled
blockedThreadDetectionEnabled = blockedThreadDetectionEnabled,
pollingSystem = pollingSystem
)

val (blocking, blockDown) =
Expand All @@ -352,6 +356,7 @@ trait IOApp {
compute,
blocking,
compute,
List(poller),
{ () =>
compDown()
blockDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
*/
def readLine: IO[String] =
Console[IO].readLine

}
36 changes: 36 additions & 0 deletions core/jvm/src/main/scala/cats/effect/Selector.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import java.nio.channels.SelectableChannel
import java.nio.channels.spi.SelectorProvider

trait Selector {

/**
* The [[java.nio.channels.spi.SelectorProvider]] that should be used to create
* [[java.nio.channels.SelectableChannel]]s that are compatible with this polling system.
*/
def provider: SelectorProvider

/**
* Fiber-block until a [[java.nio.channels.SelectableChannel]] is ready on at least one of the
* designated operations. The returned value will indicate which operations are ready.
*/
def select(ch: SelectableChannel, ops: Int): IO[Int]

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import scala.concurrent.ExecutionContext

private[unsafe] trait FiberMonitorCompanionPlatform {
def apply(compute: ExecutionContext): FiberMonitor = {
if (TracingConstants.isStackTracing && compute.isInstanceOf[WorkStealingThreadPool]) {
val wstp = compute.asInstanceOf[WorkStealingThreadPool]
if (TracingConstants.isStackTracing && compute.isInstanceOf[WorkStealingThreadPool[_]]) {
val wstp = compute.asInstanceOf[WorkStealingThreadPool[_]]
new FiberMonitor(wstp)
} else {
new FiberMonitor(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,36 @@ package cats.effect.unsafe

private[unsafe] abstract class IORuntimeBuilderPlatform { self: IORuntimeBuilder =>

protected var customPollingSystem: Option[PollingSystem] = None

/**
* Override the default [[PollingSystem]]
*/
def setPollingSystem(system: PollingSystem): IORuntimeBuilder = {
if (customPollingSystem.isDefined) {
throw new RuntimeException("Polling system can only be set once")
}
customPollingSystem = Some(system)
this
}

// TODO unify this with the defaults in IORuntime.global and IOApp
protected def platformSpecificBuild: IORuntime = {
val (compute, computeShutdown) =
customCompute.getOrElse(
IORuntime.createWorkStealingComputeThreadPool(reportFailure = failureReporter))
val (compute, poller, computeShutdown) =
customCompute
.map {
case (c, s) =>
(c, Nil, s)
}
.getOrElse {
val (c, p, s) =
IORuntime.createWorkStealingComputeThreadPool(
pollingSystem =
customPollingSystem.getOrElse(IORuntime.createDefaultPollingSystem()),
reportFailure = failureReporter
)
(c, List(p), s)
}
val xformedCompute = computeTransform(compute)

val (scheduler, schedulerShutdown) = xformedCompute match {
Expand All @@ -36,6 +61,7 @@ private[unsafe] abstract class IORuntimeBuilderPlatform { self: IORuntimeBuilder
computeShutdown()
blockingShutdown()
schedulerShutdown()
extraPollers.foreach(_._2())
extraShutdownHooks.reverse.foreach(_())
}
val runtimeConfig = customConfig.getOrElse(IORuntimeConfig())
Expand All @@ -44,6 +70,7 @@ private[unsafe] abstract class IORuntimeBuilderPlatform { self: IORuntimeBuilder
computeTransform(compute),
blockingTransform(blocking),
scheduler,
poller ::: extraPollers.map(_._1),
shutdown,
runtimeConfig
)
Expand Down
Loading