Native Polling System #3314
Conversation
Co-authored-by: Lee Tibbert <lee.tibbert@gmail.com>
Co-authored-by: Lorenzo Gabriele <lorenzolespaul@gmail.com>
```scala
def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] =
  IO.executionContext.map {
    case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller()) =>
      Some(loop.asInstanceOf[EventLoop[Poller]])
    case _ => None
  }
```
The poller is accessed like this.
```scala
   * @return
   *   whether poll should be called again (i.e., there are more events to be polled)
   */
  def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean
```
While working on this PR I realized we definitely need `reportFailure` available in the polling system. Although in theory the callbacks should not throw, one realistic scenario in which they may is if they attempt to use a closed `Dispatcher` to run an effect in the callback.
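To illustrate the point, here is a minimal stdlib-only sketch (all names hypothetical, not the actual PR code): the polling loop is effectively the last caller on the stack, so anything a user callback throws must be caught and routed to `reportFailure` rather than unwinding the event loop.

```scala
// Sketch only: shows why the polling system needs a reportFailure sink.
object ReportFailureSketch {
  def invokeCallback(
      cb: Either[Throwable, Unit] => Unit,
      result: Either[Throwable, Unit],
      reportFailure: Throwable => Unit
  ): Unit =
    try cb(result)
    catch {
      // e.g. the callback tried to run an effect on a closed Dispatcher
      case t: Throwable => reportFailure(t)
    }
}
```

Without the `reportFailure` parameter there would be nowhere safe to send such an exception from inside `poll`.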
Published
This reverts commit d18fa76.
```scala
trait FileDescriptorPollHandle {

  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or
   * `recv` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for reading.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]

  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or
   * `send` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for writing.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]

}
```
I sort of tore everything up and started again from scratch. Sorry 😅
I am excited about this new API, which I think is a significant improvement.
Instead of exposing a raw, multi-shot callback to the user, the user passes an I/O operation (suspended in `IO`) to the polling system. This gives the polling system full control over scheduling of the operation, so the specific strategy can be optimized for every backend (epoll, kqueue, io_uring).
The new API is also a much better fit for working with C libraries, where typically you are invoking an I/O operation repeatedly until it is unblocked and has fully consumed/provided the desired data.
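The shape of that contract can be sketched in plain Scala (names here are illustrative, not part of the proposed API): a driver function returns `Left(state)` while the operation is blocked and `Right(result)` once it completes, and the runner keeps re-invoking it, threading the state through.

```scala
object DriverLoopSketch {
  // Simulates a C-style non-blocking operation: "blocked" for the first
  // `blockedCalls` invocations, then yields the data. The Int state counts
  // attempts so far (standing in for e.g. a partially-filled buffer).
  def makeDriver(blockedCalls: Int, data: String): Int => Either[Int, String] =
    attempts => if (attempts < blockedCalls) Left(attempts + 1) else Right(data)

  // Re-invokes the driver until it reports success. A real polling system
  // would suspend between attempts until the fd is known to be ready.
  @annotation.tailrec
  def runUntilUnblocked[A, B](a: A)(f: A => Either[A, B]): B =
    f(a) match {
      case Left(next) => runUntilUnblocked(next)(f)
      case Right(b)   => b
    }
}
```

In the actual API the driver returns `IO[Either[A, B]]` and the recursion lives inside `pollReadRec`/`pollWriteRec`, which is what lets each backend decide when to retry.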
I'll get a new snapshot up soon.
Daniel and I discussed this new API on Discord. Various doubts:
- the fact that these methods are not the "primitives" of the underlying polling systems
- whether this abstraction should even be in Cats Effect
- whether this API is too tailored to the FS2 I/O usecase
I am also concerned about these issues, since getting this API right is important for building a composable ecosystem on Native that can interop with external I/O libraries.
On the JVM the important primitive is `IO.async` and friends. This is because external libraries that do I/O start their own threads and threadpools. We give them a callback that they execute on their own threadpool, to shift back into our own threadpool upon completion of some async task.
In Native world, C libraries that do I/O don't start their own threadpools. Instead they typically do two things:
- provide a set of file descriptors, that they are interested in reading/writing on
- provide a function(s) that "drive" the library. This function may either complete successfully (possibly invoking some callbacks) or return a special error code indicating that the method is blocked and should be tried again (ideally when I/O is known to be unblocked on the relevant file descriptors).
Some concrete examples of this:
- POSIX: `read`, `write`, `recv`, `send`, `connect`, `accept`, which indicate blockage with `EAGAIN`, `EWOULDBLOCK`, and `EINPROGRESS`
- NGINX Unit: `nxt_unit_process_port_msg`, which indicates blockage with `NXT_UNIT_AGAIN`
- s2n-tls: `s2n_send`, `s2n_recv`, which indicate blockage with `S2N_ERR_T_BLOCKED`
- c-ares async DNS resolver: `ares_process_fd`, and then checking if it's still interested in the socket via `ares_getsock`
These "driver" methods are designed and intended to be invoked repeatedly, in a loop, until the operation completes successfully (i.e. is not blocked). In some cases this requires threading some state through.
On the other side of things is how native polling systems work. There are essentially two categories:
- kqueue and io_uring: these are really nice APIs, in that they let you set up notifications on multiple file descriptors in a single syscall (indeed, the syscall used for polling). These notifications can be one-shot, and specific to read or write.

  This is an ideal scenario for load-balancing: you can always set up the notification on your current thread. As fibers get stolen, this should balance out dynamically.

- epoll and libuv: are ... not quite so nice :) Setting up each notification requires its own syscall. Furthermore, you must register for both reads and writes at the same time (i.e. they cannot be set up independently). Their notifications are multi-shot, so they will be repeated until the file descriptor is explicitly unregistered.

  - epoll by default and libuv use "level-triggered" mode. This means it will keep nagging that a file descriptor is ready until you drain/block it again. This is annoying if an fd is ready for reading/writing, but some downstream/upstream is not yet ready to take advantage of that.
  - epoll has an "edge-triggered" mode, such that it only notifies you once, when the status of the file descriptor changes. This is much more efficient, but requires some bookkeeping to make sure a notification is not missed.
  - libuv also warns that you should be prepared to handle "spurious" notifications, where you are notified even though the file descriptor is actually still blocked.

  Sadly this is not great for load balancing. To avoid syscalls and bookkeeping it's preferable to register for notifications once. But then that file descriptor is tied to a particular thread indefinitely.
tl;dr
- C I/O libraries provide a list of "interesting" file descriptors and "driver" functions that you should invoke repeatedly while they are blocked, until they indicate success. This is not unique to the APIs that FS2 happens to use.
- Native polling systems either offer cheap-setup one-shot notifications or expensive-setup multi-shot notifications.
These constraints are what pushed me to the API proposed here: a way to hand the C libraries' "driver" functions directly to the polling system itself, so that it can schedule and invoke these functions using the optimal strategy for that particular system.
Finally, why put this abstraction in Cats Effect? So that we can have two independent, composable ecosystems:
- Pure wrappers around C libraries, that are based on file descriptor polling
- Polling systems, that implement file descriptor polling in addition to offering their own implementation-specific primitives.
Just came here to say that this is a great write up that should be kept in some design document (or at least a gist)
Published
fs2-io_uring now implements the
The FS2 integration is now up in typelevel/fs2#3087. Lastly, I'd like to make sure that the
I think we really need to consider the JVM simultaneously. There's a layer to this which is definitely platform-specific, but I think that layer is the polling system itself. I think that the `IORuntime` and `IO` APIs should likely be shared.
```scala
  type PollData

  def makePoller(ec: ExecutionContext, data: () => PollData): Poller
```
I think the `ExecutionContext` is only necessary here because you're wrapping registration methods in subtypes in `IO`. IMO, that juggle should be managed by callers, which already need to play tricks with `evalOn` in order to keep things on the polling system pool.
```scala
  /**
   * The thread-local data structure used for polling.
   */
  type PollData
```
Why are `PollData` and `Poller` separate?
```scala
import scala.concurrent.ExecutionContext

abstract class PollingSystem {
```
If we do this correctly, this should be in jvm-native shared sources.
Yes, definitely eventually. Until Native catches up with multi-threading it may not be possible, but I'll take a look.
```scala
    readSemaphore: Semaphore[IO],
    writeSemaphore: Semaphore[IO]
```
If it's thread-local, why are the semaphores necessary?
The semaphores are to prevent running more than one read and one write operation on a file descriptor at a time.
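The invariant can be sketched with plain JVM semaphores (the actual code uses cats-effect's `Semaphore[IO]`; `FdGuards` is a hypothetical name): at most one read and one write may be in flight on a file descriptor at a time, but a read and a write may proceed concurrently.

```scala
import java.util.concurrent.Semaphore

// Sketch only: mirrors the one-read/one-write-at-a-time invariant.
final class FdGuards {
  private val readGuard  = new Semaphore(1)
  private val writeGuard = new Semaphore(1)

  def withRead[A](body: => A): A = {
    readGuard.acquire() // at most one read operation at a time
    try body
    finally readGuard.release()
  }

  def withWrite[A](body: => A): A = {
    writeGuard.acquire() // at most one write operation at a time
    try body
    finally writeGuard.release()
  }
}
```

Note the guards are independent, so a write inside a read (or on another fiber) does not block.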
```scala
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
```
Glancing at the implementations, it really looks like nothing about the looping structure here is platform-specific. In other words, this signature could be simplified and callers could use `tailRecM` to achieve the same effect. If we want to provide a convenience function which packages this up, we can, but as it stands every polling system is just doing essentially the same thing here. That suggests that the true primitive is what happens in the `Right` branch.

Edit: Actually is this true? Looking again, it seems that the Epoll implementation explicitly recurses, while the KQueue one doesn't?
Both implementations recurse. The recursion is used in the case that the operation fails because it's blocked, in which case it needs to be called again (ideally when the file descriptor is unblocked).
> In other words, this signature could be simplified and callers could use `tailRecM` to achieve the same effect.
You may be right about this, but I need to think about it some more. If we were only dealing with kqueue/io_uring style polling systems, I would definitely agree.
However, for epoll, letting the polling system handle the recursion enables it to keep track of some internal state (related to edge-triggered mode I described above). This enables it to fast-path. Edit: actually, I think this is necessary for correctness, so it does not deadlock.
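For contrast, the caller-side alternative would look roughly like this sketch (all names hypothetical): the polling system exposes only a "wait until read-ready" primitive and the caller drives the retry loop itself, `tailRecM`-style. The trade-off discussed above is that the polling system can then no longer consult its internal state (e.g. a readiness counter) between attempts.

```scala
// Sketch of the caller-driven loop that a simplified signature would imply.
object CallerSideLoop {
  @annotation.tailrec
  def pollReadRecViaLoop[A, B](a: A)(
      f: A => Either[A, B],
      waitUntilReadReady: () => Unit): B =
    f(a) match {
      case Left(next) =>
        waitUntilReadReady() // suspend until the next readiness notification
        pollReadRecViaLoop(next)(f, waitUntilReadReady)
      case Right(b) => b
    }
}
```

With edge-triggered epoll, a notification that fires between `f(a)` returning `Left` and `waitUntilReadReady()` suspending could be missed here, which is the deadlock concern.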
cats-effect/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Lines 112 to 131 in e5dd04f
```scala
def go(a: A, before: Int): IO[B] =
  f(a).flatMap {
    case Left(a) =>
      IO(readReadyCounter).flatMap { after =>
        if (before != after)
          // there was a read-ready notification since we started, try again immediately
          go(a, after)
        else
          IO.asyncCheckAttempt[Int] { cb =>
            IO {
              readCallback = cb
              // check again before we suspend
              val now = readReadyCounter
              if (now != before) {
                readCallback = null
                Right(now)
              } else Left(Some(IO(this.readCallback = null)))
            }
          }.flatMap(go(a, _))
      }
```
Some further justification for this API, specifically as it relates to epoll:
As I mentioned in #3314 (comment), libuv's general polling facility uses "level-triggered" mode. So does the JDK NIO `Selector`. This is unfortunate, because level-triggering is very inefficient.
For example, Netty "uses linux EPOLL Edge-Triggered Mode for maximal performance."
https://github.com/netty/netty/blob/bc49c4b1464c8345bf87b3053b3adefa64f91530/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java#L39-L42
So, why do libuv and `Selector` use level-triggering? This is explained in an nio-dev thread.
> The Selector API requires a level-triggered polling mechanism. Edge-triggered interfaces require a much tighter coupling with the I/O methods.
https://mail.openjdk.org/pipermail/nio-dev/2008-December/000329.html
Essentially, this tight coupling forces the actual I/O implementations to be designed specifically for edge-triggered notifications, instead of against a generic I/O polling facility. This is consistent with Netty and also this warning in the libuv docs.
> Using `uv_poll_t` for any other purpose is not recommended; `uv_tcp_t`, `uv_udp_t`, etc. provide an implementation that is faster and more scalable than what can be achieved with `uv_poll_t`

http://docs.libuv.org/en/v1.x/poll.html
Suffice to say, if I've done my job right, this API is encoding the tight coupling needed for edge-triggered notifications to work correctly: this is why the user must pass the desired I/O operation into the polling system.
Assuming this all works correctly, it lets downstreams build against a single, generic file descriptor polling interface, instead of maintaining multiple implementations tightly coupled to the underlying polling system. Furthermore, because this API is high-level, it lets the polling system optimize how and when notifications are scheduled.
tl;dr
- epoll is stupidly inefficient unless used in edge-triggered mode.
- In edge-triggered mode, epoll will not notify you if a file descriptor is already ready. It will only tell you when it transitions to ready.
- This leaves us with a race condition between performing an I/O operation and receiving a notification of I/O readiness. This API should be designed to manage that race condition.
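The counter trick from the `EpollSystem` excerpt quoted earlier can be isolated in a stdlib-only sketch (illustrative names): snapshot a ready-counter before attempting I/O, and only suspend if no readiness notification arrived in between; otherwise retry immediately.

```scala
import java.util.concurrent.atomic.AtomicInteger

object EdgeTriggeredSketch {
  // Counts read-ready notifications delivered so far. In edge-triggered mode
  // each transition to ready bumps the counter exactly once.
  final class ReadyCounter {
    private val n = new AtomicInteger(0)
    def notifyReady(): Unit = n.incrementAndGet()
    def current: Int = n.get()
  }

  // True if it is safe to suspend and wait for the next notification;
  // false if a notification raced in and the I/O should be retried now.
  def safeToSuspend(counter: ReadyCounter, before: Int): Boolean =
    counter.current == before
}
```

The real implementation must additionally re-check after installing the callback (as `IO.asyncCheckAttempt` does above), since the notification can also race with the suspension itself.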
```scala
    else if (LinktimeInfo.isMac)
      unsafe.KqueueSystem
    else
      unsafe.SleepSystem
```
We should… probably support Windows too someday :-(
No.
More seriously, I think an fs2-uv with a libuv-based polling system would be the least masochistic way to deal with this.
```diff
@@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
    */
   def readLine: IO[String] =
     Console[IO].readLine
+
+  def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] =
```
This API isn't going to be rich enough for the JVM. The tricky thing isn't getting the polling system out of the `ExecutionContext`, but rather getting the thread-local out of the worker. This function as-written could actually exist in downstream userspace and doesn't need to be added to `IO`, whereas the thread-local extraction cannot.
This is probably a decent example of trying to do native first and following up with the JVM. IMO we really need to do both simultaneously, which complicates the implementation but forces us to come to terms with both platform constraints in a single API.
```scala
    fd: Int,
    reads: Boolean,
    writes: Boolean
): Resource[IO, FileDescriptorPollHandle] =
```
Isn't the use of `IO` (and other high-level constructs) in these APIs actively preventing us from taking advantage of thread-locals? It feels like this API is meant to be exceptionally low-level, except it's also presenting this high-level API. Are we sure this is safe?
```diff
-  def defaultScheduler: Scheduler = QueueExecutorScheduler
+  def defaultScheduler: Scheduler = EventLoopExecutorScheduler.global
+
+  def createEventLoop(system: PollingSystem): ExecutionContext with Scheduler =
```
👍
All of these are really the same question, which sadly means the main ideas of my encoding didn't really get across 😕
Firstly, the encoding I chose here can definitely be shared with the JVM (the only exception was interruption-related stuff, which I omitted here because there is no multi-threading). I had the JVM very much in mind when I worked on this. I'm happy to update the other PR to demonstrate exactly that. The important ideas are captured here:

cats-effect/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala, Lines 24 to 34 in e5dd04f
More specifically:
The This means the This also means that once a user has obtained a
Ah I see. Okay, I need to think more about this.
This branch lives on in:
Yes, I know, another polling system PR 😝 this one ports the ideas from #3296 to the Native platform.
The need on Native is greater, since non-blocking I/O is currently impossible without cooperation from the runtime. It's also a chance to test the API design since we have more leeway for breakage (particularly when we do the breaking update to Scala Native 0.5 in the near future).
This PR:

- `PollingSystem`, configurable in the event-loop runtime
- `FileDescriptorPoller` abstraction
- `EpollSystem` and `KqueueSystem` implementations from epollcat

Follow-up work:

- `Console` implementation

Downstream integrations:

- `PollingSystem` API: armanbilge/fs2-io_uring#35

/cc @LeeTibbert @lolgab