Native Polling System #3314
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2020-2022 Typelevel
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cats.effect
+
+trait FileDescriptorPoller {
+
+  /**
+   * Registers a file descriptor with the poller and monitors read- and/or write-ready events.
+   */
+  def registerFileDescriptor(
+      fileDescriptor: Int,
+      monitorReadReady: Boolean,
+      monitorWriteReady: Boolean
+  ): Resource[IO, FileDescriptorPollHandle]
+
+}
+
+trait FileDescriptorPollHandle {
+
+  /**
+   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or
+   * `recv` on the file descriptor.
+   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
+   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
+   *     for reading.
+   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
+   *     method will complete with `B`.
+   */
+  def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
+
+  /**
+   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or
+   * `send` on the file descriptor.
+   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
+   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
+   *     for writing.
+   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
+   *     method will complete with `B`.
+   */
+  def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
+
+}
Comment on lines +32 to +56

I sort of tore everything up and started again from scratch. Sorry 😅

I am excited about this new API, which I think is a significant improvement. Instead of exposing a raw, multi-shot callback to the user, the user passes an I/O operation (suspended in `IO`) to the polling system.

The new API is also a much better fit for working with C libraries, where typically you are invoking an I/O operation repeatedly until it is unblocked and has fully consumed/provided the desired data.

I'll get a new snapshot up soon.

Daniel and I discussed this new API on Discord. Various doubts:

I am also concerned about these issues, since getting this API right is important for building a composable ecosystem on Native that can interop with external I/O libraries.

On the JVM the important primitive is

In the Native world, C libraries that do I/O don't start their own threadpools. Instead they typically do two things:

Some concrete examples of this:

These "driver" methods are designed and intended to be invoked repeatedly, in a loop, until the operation completes successfully (i.e. is not blocked). In some cases this requires threading some state through.

On the other side of things is how native polling systems work. There are essentially two categories:

tl;dr

These constraints are what pushed me to the API proposed here: a way to hand the C libraries' "driver" functions directly to the polling system itself, so that it can schedule and invoke these functions using the optimal strategy for that particular system.

Finally, why put this abstraction in Cats Effect? So that we can have two independent, composable ecosystems:

Just came here to say that this is a great write up that should be kept in some design document (or at least a gist)
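To make the shape of `pollReadRec` concrete, here is a minimal sketch of a caller, assuming Cats Effect 3 is on the classpath. The trait is copied locally from the diff (read side only) so the snippet compiles on its own. `PollReadRecSketch`, `BusyPollHandle`, `readWithAttempts`, and `tryRead` are hypothetical names, and the stand-in handle simply yields and retries; it does not wait on epoll or kqueue the way a real polling system would.

```scala
import cats.effect.IO

object PollReadRecSketch {

  // Local copy of the trait from the diff (read side only), so this compiles standalone.
  trait FileDescriptorPollHandle {
    def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
  }

  // Hypothetical stand-in for a real poller: instead of waiting for a readiness
  // event, it yields to other fibers and then calls `f` again.
  object BusyPollHandle extends FileDescriptorPollHandle {
    def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
      f(a).flatMap {
        case Left(next) => IO.cede >> pollReadRec(next)(f)
        case Right(b) => IO.pure(b)
      }
  }

  // `tryRead` models a non-blocking read: None means "would block" (EAGAIN).
  // The state threaded through the loop is just a counter of blocked attempts.
  def readWithAttempts(
      handle: FileDescriptorPollHandle,
      tryRead: IO[Option[String]]
  ): IO[(String, Int)] =
    handle.pollReadRec[Int, (String, Int)](0) { attempts =>
      tryRead.map {
        case None => Left(attempts + 1) // blocked: ask to be invoked again later
        case Some(data) => Right((data, attempts)) // done: complete the outer IO
      }
    }
}
```

The caller never sees a callback or a readiness event; it only describes one attempt and what state to carry into the next one.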
@@ -20,6 +20,7 @@ import cats.effect.metrics.NativeCpuStarvationMetrics

 import scala.concurrent.CancellationException
 import scala.concurrent.duration._
+import scala.scalanative.meta.LinktimeInfo

 /**
  * The primary entry point to a Cats Effect application. Extend this trait rather than defining
@@ -165,6 +166,21 @@ trait IOApp {
    */
   protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

+  /**
+   * The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]]
+   * produced by `run`. It is very unlikely that users will need to override this method.
+   *
+   * [[unsafe.PollingSystem]] implementors may provide their own flavors of [[IOApp]] that
+   * override this method.
+   */
+  protected def pollingSystem: unsafe.PollingSystem =
+    if (LinktimeInfo.isLinux)
+      unsafe.EpollSystem
+    else if (LinktimeInfo.isMac)
+      unsafe.KqueueSystem
+    else
+      unsafe.SleepSystem

We should… probably support Windows too someday :-(

No. More seriously, I think an fs2-uv with a libuv-based polling system would be the least masochistic way to deal with this.

+
   /**
    * The entry point for your application. Will be called by the runtime when the process is
    * started. If the underlying runtime supports it, any arguments passed to the process will be
@@ -186,12 +202,8 @@ trait IOApp {
     import unsafe.IORuntime

     val installed = IORuntime installGlobal {
-      IORuntime(
-        IORuntime.defaultComputeExecutionContext,
-        IORuntime.defaultComputeExecutionContext,
-        IORuntime.defaultScheduler,
-        () => (),
-        runtimeConfig)
+      val loop = IORuntime.createEventLoop(pollingSystem)
+      IORuntime(loop, loop, loop, () => (), runtimeConfig)
     }

     _runtime = IORuntime.global

@@ -17,6 +17,9 @@
 package cats.effect

 import cats.effect.std.Console
+import cats.effect.unsafe.EventLoopExecutorScheduler
+
+import scala.reflect.ClassTag

 import java.time.Instant

@@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
    */
   def readLine: IO[String] =
     Console[IO].readLine
+
+  def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] =

This API isn't going to be rich enough for the JVM. The tricky thing isn't getting the polling system out of the

This is probably a decent example of trying to do native first and following up with the JVM. IMO we really need to do both simultaneously, which complicates the implementation but forces us to come to terms with both platform constraints in a single API.

+    IO.executionContext.map {
+      case loop: EventLoopExecutorScheduler if ct.runtimeClass.isInstance(loop.poller) =>
+        Some(loop.poller.asInstanceOf[Poller])
+      case _ => None
+    }
 }
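The `poller` accessor in the diff relies on a `ClassTag` to check at runtime whether the installed poller implements the interface the caller asked for. Below is a small standard-library-only sketch of that lookup pattern in isolation. `ClassTagLookupSketch`, `pollerAs`, `FakeEpollPoller`, and `FakeSleepPoller` are hypothetical names used only for illustration, and the marker trait here is a local stand-in, not the one from the PR.

```scala
import scala.reflect.ClassTag

object ClassTagLookupSketch {

  // Local stand-in for the interface a caller might request.
  trait FileDescriptorPoller

  // Hypothetical pollers: one implements the interface, one does not.
  object FakeEpollPoller extends FileDescriptorPoller
  object FakeSleepPoller

  // Returns the installed poller only if it is an instance of the requested type.
  // The ClassTag carries the runtime class of P, which erasure would otherwise hide.
  def pollerAs[P](installed: AnyRef)(implicit ct: ClassTag[P]): Option[P] =
    if (ct.runtimeClass.isInstance(installed)) Some(installed.asInstanceOf[P])
    else None
}
```

With this shape, `pollerAs[FileDescriptorPoller](FakeEpollPoller)` yields `Some(...)`, while asking the same of `FakeSleepPoller` yields `None`, so downstream code can fall back when the runtime was built with a polling system that lacks the capability.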
Glancing at the implementations, it really looks like nothing about the looping structure here is platform-specific. In other words, this signature could be simplified and callers could use `tailRecM` to achieve the same effect. If we want to provide a convenience function which packages this up, we can, but as it stands every polling system is just doing essentially the same thing here. That suggests that the true primitive is what happens in the `Right` branch.

Edit: Actually is this true? Looking again, it seems that the Epoll implementation explicitly recurses, while the KQueue one doesn't?

Both implementations recurse. The recursion is used in the case that the operation fails because it's blocked, in which case it needs to be called again (ideally when the file descriptor is unblocked).

You may be right about this, but I need to think about it some more. If we were only dealing with kqueue/io_uring style polling systems, I would definitely agree.

However, for epoll, letting the polling system handle the recursion enables it to keep track of some internal state (related to edge-triggered mode I described above). This enables it to fast-path. Edit: actually, I think this is necessary for correctness, so it does not deadlock.

cats-effect/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala, lines 112 to 131 in e5dd04f
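For reference, the looping structure under discussion can be written with `tailRecM` alone. The sketch below assumes Cats and Cats Effect 3 on the classpath; `TailRecMSketch` and `retryImmediately` are hypothetical names. It has the same `Left`/`Right` contract as `pollReadRec`, but nothing waits between attempts, which is the part a polling system would have to add.

```scala
import cats.Monad
import cats.effect.IO

object TailRecMSketch {

  // Same Left/Right contract as pollReadRec, but a Left result calls `f` again
  // immediately; there is no readiness check between iterations.
  def retryImmediately[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] =
    Monad[IO].tailRecM(a)(f)

  // Example: succeeds on the third call, threading the call count as state.
  val example: IO[String] =
    retryImmediately[Int, String](1) { n =>
      IO.pure(if (n < 3) Left(n + 1) else Right(s"done after $n calls"))
    }
}
```

Used against a real non-blocking file descriptor, a loop like this would spin while the descriptor is blocked, so the open question in this thread is where the waiting (and any per-descriptor state) should live.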
Some further justification for this API, specifically as it relates to epoll:

As I mentioned in #3314 (comment), libuv's general polling facility uses "level-triggered" mode. So does the JDK NIO `Selector`. This is unfortunate, because level-triggering is very inefficient. For example, Netty "uses linux EPOLL Edge-Triggered Mode for maximal performance."
https://github.com/netty/netty/blob/bc49c4b1464c8345bf87b3053b3adefa64f91530/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java#L39-L42

So, why do libuv and `Selector` use level-triggering? This is explained in an nio-dev thread.
https://mail.openjdk.org/pipermail/nio-dev/2008-December/000329.html

Essentially, this tight coupling forces the actual I/O implementations to be designed specifically for edge-triggered notifications, instead of against a generic I/O polling facility. This is consistent with Netty and also this warning in the libuv docs.
http://docs.libuv.org/en/v1.x/poll.html

Suffice to say, if I've done my job right, this API is encoding the tight coupling needed for edge-triggered notifications to work correctly: this is why the user must pass the desired I/O operation into the polling system.

Assuming this all works correctly, it lets downstreams build against a single, generic file descriptor polling interface, instead of maintaining multiple implementations tightly coupled to the underlying polling system. Furthermore, because this API is high-level, it lets the polling system optimize how and when notifications are scheduled.
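The difference between the two notification modes can be illustrated with a toy model that uses only the Scala standard library. This is a simplified simulation, not a binding to real epoll: `EdgeTriggerSketch`, `FakeSocket`, `readOnePerEvent`, and `drainPerEvent` are hypothetical names, and "edge" here just means "new data arrived since the last notification".

```scala
import scala.collection.mutable

object EdgeTriggerSketch {

  // Toy model of a readable file descriptor.
  final class FakeSocket {
    private val buffer = mutable.Queue.empty[Byte]
    private var edgePending = false // set on arrival of data, cleared once reported

    def deliver(bytes: Array[Byte]): Unit = {
      buffer ++= bytes
      edgePending = true
    }

    // Non-blocking read of at most `max` bytes; None models EAGAIN/EWOULDBLOCK.
    def tryRead(max: Int): Option[Array[Byte]] =
      if (buffer.isEmpty) None
      else Some(Array.fill(math.min(max, buffer.size))(buffer.dequeue()))

    // Level-triggered: reported ready for as long as unread data remains.
    def levelReady(): Boolean = buffer.nonEmpty

    // Edge-triggered: reported ready once per arrival of new data.
    def edgeReady(): Boolean = {
      val fired = edgePending
      edgePending = false
      fired
    }
  }

  // Performs exactly one read per readiness notification.
  def readOnePerEvent(sock: FakeSocket, ready: () => Boolean, chunk: Int): Int = {
    var total = 0
    while (ready()) total += sock.tryRead(chunk).fold(0)(_.length)
    total
  }

  // On each edge notification, keeps reading until the read would block.
  def drainPerEvent(sock: FakeSocket, chunk: Int): Int = {
    var total = 0
    while (sock.edgeReady()) {
      var blocked = false
      while (!blocked) {
        sock.tryRead(chunk) match {
          case Some(bytes) => total += bytes.length
          case None => blocked = true
        }
      }
    }
    total
  }
}
```

In this model, one read per notification is enough under level-triggering but strands unread bytes under edge-triggering, whereas draining until the read would block consumes everything. That is the coupling between the I/O operation and the notification mode described above.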
tl;dr
isshould be designed to manage that race condition.