Native Polling System #3314

Closed

Conversation

armanbilge
Member

@armanbilge armanbilge commented Dec 11, 2022

Yes, I know, another polling system PR 😝 This one ports the ideas from #3296 to the Native platform.

The need on Native is greater, since non-blocking I/O is currently impossible without cooperation from the runtime. It's also a chance to test the API design since we have more leeway for breakage (particularly when we do the breaking update to Scala Native 0.5 in the near future).

This PR

  1. Makes the PollingSystem configurable in the event-loop runtime
  2. Introduces a FileDescriptorPoller abstraction
    • sufficient to implement non-blocking socket I/O, signals etc.
    • implementable by epoll, kqueue, io_uring, libuv, etc.
  3. Cribs the EpollSystem and KqueueSystem implementations from epollcat

Follow-up work

  • install signal handlers for cancelation and fiber dumps
  • a non-blocking, cancelable Console implementation

Downstream integrations

/cc @LeeTibbert @lolgab

@armanbilge armanbilge added this to the v3.5.0 milestone Dec 11, 2022
Comment on lines 69 to 74
def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] =
  IO.executionContext.map {
    case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller()) =>
      Some(loop.asInstanceOf[EventLoop[Poller]])
    case _ => None
  }
Member Author

The poller is accessed like this.
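For illustration, a minimal usage sketch (it assumes the accessor above is exposed on the IO companion, and that FileDescriptorPoller is the Poller type installed by the default Native polling systems):

def fileDescriptorPoller: IO[Option[FileDescriptorPoller]] =
  IO.eventLoop[FileDescriptorPoller].map(_.map(_.poller()))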

* @return
* whether poll should be called again (i.e., there are more events to be polled)
*/
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean
Member Author

While working on this PR I realized we definitely need reportFailure available in the polling system. Although in theory the callbacks should not throw, one realistic scenario in which they may is if they attempt to use a closed Dispatcher to run an effect in the callback.
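For illustration, a hedged sketch of the idea (fireCallbacks and its shape are hypothetical, not code from this PR): when poll dispatches completed events, a throwing callback is reported rather than allowed to take down the event loop thread.

import scala.util.control.NonFatal

def fireCallbacks(
    callbacks: List[Either[Throwable, Unit] => Unit],
    reportFailure: Throwable => Unit
): Unit =
  callbacks.foreach { cb =>
    // a callback may throw, e.g. if it uses a closed Dispatcher to run an effect
    try cb(Right(()))
    catch { case NonFatal(ex) => reportFailure(ex) }
  }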

@armanbilge
Member Author

Published 3.5-4f9e57b for downstream experimentation.

Comment on lines +32 to +56
trait FileDescriptorPollHandle {

  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or
   * `recv` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for reading.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]

  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or
   * `send` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for writing.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]

}
Member Author

I sort of tore everything up and started again from scratch. Sorry 😅

I am excited about this new API, which I think is a significant improvement.

Instead of exposing a raw, multi-shot callback to the user, the user passes an I/O operation (suspended in IO) to the polling system. This gives the polling system full control over scheduling of the operation, so the specific strategy can be optimized for every backend (epoll, kqueue, io_uring).

The new API is also a much better fit for working with C libraries, where typically you are invoking an I/O operation repeatedly until it is unblocked and has fully consumed/provided the desired data.
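For example, a minimal sketch of a client of this API (readNonBlocking is a hypothetical stand-in for a non-blocking read(2)/recv(2) binding, not part of this PR):

// hypothetical: reads into buf, returning -1 when the call would block (EAGAIN/EWOULDBLOCK)
def readNonBlocking(buf: Array[Byte]): Int = ???

def readChunk(handle: FileDescriptorPollHandle, bufSize: Int): IO[Array[Byte]] =
  handle.pollReadRec[Unit, Array[Byte]](()) { _ =>
    IO {
      val buf = new Array[Byte](bufSize)
      val n = readNonBlocking(buf)
      if (n < 0) Left(())     // blocked: f is invoked again when the fd is ready for reading
      else Right(buf.take(n)) // success: the returned IO completes with the bytes read
    }
  }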

I'll get a new snapshot up soon.

Member Author

Daniel and I discussed this new API on Discord. Various doubts:

  • the fact that these methods are not the "primitives" of the underlying polling systems
  • whether this abstraction should even be in Cats Effect
  • whether this API is too tailored to the FS2 I/O usecase

I am also concerned about these issues, since getting this API right is important for building a composable ecosystem on Native that can interop with external I/O libraries.


On the JVM the important primitive is IO.async and friends. This is because external libraries that do I/O start their own threads and threadpools. We give them a callback that they execute on their own threadpool, to shift back into our own threadpool upon completion of some async task.

In the Native world, C libraries that do I/O don't start their own threadpools. Instead they typically do two things:

  1. provide a set of file descriptors that they are interested in reading/writing on
  2. provide one or more functions that "drive" the library. Such a function may either complete successfully (possibly invoking some callbacks) or return a special error code indicating that it is blocked and should be tried again (ideally when I/O is known to be unblocked on the relevant file descriptors).

Some concrete examples of this:

These "driver" methods are designed and intended to be invoked repeatedly, in a loop, until the operation completes successfully (i.e. is not blocked). In some cases this requires threading some state through.


On the other side of things is how native polling systems work. There are essentially two categories:

  1. kqueue and io_uring: these are really nice APIs, in that they let you setup notifications on multiple file descriptors in a single syscall (indeed, the syscall used for polling). These notifications can be one-shot, and specific to read or write.

    This is an ideal scenario for load-balancing: you can always setup the notification on your current thread. As fibers get stolen, this should balance out dynamically.

  2. epoll and libuv: are ... not quite so nice :) Setting up each notification requires its own syscall. Furthermore, you must register for both reads and writes at the same time (i.e. they cannot be set up independently). Their notifications are multi-shot, so they will be repeated until the file descriptor is explicitly unregistered.

    • epoll by default and libuv use "level-triggered" mode. This means it will keep nagging you that a file descriptor is ready until you drain/block it again. This is annoying if an fd is ready for reading/writing, but some downstream/upstream is not yet ready to take advantage of that.
    • epoll has an "edge-triggered" mode, such that it only notifies you once, when the status of the file descriptor changes. This is much more efficient, but requires some bookkeeping to make sure a notification is not missed.
    • libuv also warns that you should be prepared to handle "spurious" notifications, where you are notified even though the file descriptor is actually still blocked.

    Sadly this is not great for load balancing. To avoid syscalls and book-keeping it's preferable to register for notifications once. But then that file descriptor is tied to a particular thread indefinitely.


tl;dr

  • C I/O libraries provide a list of "interesting" file descriptors and "driver" functions that you should invoke repeatedly while they are blocked, until they indicate success. This is not unique to the APIs that FS2 happens to use.
  • Native polling systems either offer cheap-setup one-shot notifications or expensive-setup multi-shot notifications.

These constraints are what pushed me to the API proposed here: a way to hand the C libraries' "driver" functions directly to the polling system itself, so that it can schedule and invoke these functions using the optimal strategy for that particular system.

Finally, why put this abstraction in Cats Effect? So that we can have two independent, composable ecosystems:

  1. Pure wrappers around C libraries, which are based on file descriptor polling
  2. Polling systems, which implement file descriptor polling in addition to offering their own implementation-specific primitives.

Contributor

@Baccata Baccata Dec 24, 2022

Just came here to say that this is a great write-up that should be kept in some design document (or at least a gist)

@armanbilge
Member Author

Published 3.5-9ba870f.

@armanbilge
Member Author

armanbilge commented Dec 21, 2022

fs2-io_uring now implements the PollingSystem API in armanbilge/fs2-io_uring#35. In addition to io_uring specific features, it also supports the generic FileDescriptorPoller interface so it can be used with higher-level libraries such as NGINX Unit, libcurl, and c-ares.

@armanbilge
Member Author

The FS2 integration is now up in typelevel/fs2#3087.

Lastly, I'd like to make sure that the FileDescriptorPoller API works well with C libraries such as NGINX Unit and c-ares.

Member

@djspiewak djspiewak left a comment

I think we really need to consider the JVM simultaneously. There's a layer to this which is definitely platform-specific, but I think that layer is the polling system itself. I think that the IORuntime and IO APIs should likely be shared.

*/
type PollData

def makePoller(ec: ExecutionContext, data: () => PollData): Poller
Member

I think the ExecutionContext is only necessary here because you're wrapping registration methods in subtypes in IO. IMO, that juggle should be managed by callers, which already need to play tricks with evalOn in order to keep things on the polling system pool.

/**
* The thread-local data structure used for polling.
*/
type PollData
Member

Why are PollData and Poller separate?

import scala.concurrent.ExecutionContext

abstract class PollingSystem {
Member

If we do this correctly, this should be in jvm-native shared sources.

Member Author

Yes, definitely eventually. Until Native catches up with multi-threading it may not be possible, but I'll take a look.

Comment on lines +80 to +81
readSemaphore: Semaphore[IO],
writeSemaphore: Semaphore[IO]
Member

If it's thread-local, why are the semaphores necessary?

Member Author

The semaphores ensure that at most one read and at most one write operation run on a file descriptor at a time.

* - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
* method will complete with `B`.
*/
def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
Member

Glancing at the implementations, it really looks like nothing about the looping structure here is platform-specific. In other words, this signature could be simplified and callers could use tailRecM to achieve the same effect. If we want to provide a convenience function which packages this up, we can, but as it stands every polling system is just doing essentially the same thing here. That suggests that the true primitive is what happens in the Right branch.

Edit: Actually, is this true? Looking again, it seems that the Epoll implementation explicitly recurses, while the KQueue one doesn't?

Member Author

@armanbilge armanbilge Dec 24, 2022

Both implementations recurse. The recursion is used in the case that the operation fails because it's blocked, in which case it needs to be called again (ideally when the file descriptor is unblocked).

In other words, this signature could be simplified and callers could use tailRecM to achieve the same effect.

You may be right about this, but I need to think about it some more. If we were only dealing with kqueue/io_uring style polling systems, I would definitely agree.

However, for epoll, letting the polling system handle the recursion enables it to keep track of some internal state (related to the edge-triggered mode I described above). This enables it to fast-path. Edit: actually, I think this is necessary for correctness, so that it does not deadlock.

def go(a: A, before: Int): IO[B] =
  f(a).flatMap {
    case Left(a) =>
      IO(readReadyCounter).flatMap { after =>
        if (before != after)
          // there was a read-ready notification since we started, try again immediately
          go(a, after)
        else
          IO.asyncCheckAttempt[Int] { cb =>
            IO {
              readCallback = cb
              // check again before we suspend
              val now = readReadyCounter
              if (now != before) {
                readCallback = null
                Right(now)
              } else Left(Some(IO(this.readCallback = null)))
            }
          }.flatMap(go(a, _))
      }

Member Author

Some further justification for this API, specifically as it relates to epoll:

As I mentioned in #3314 (comment), libuv's general polling facility uses "level-triggered" mode. So does the JDK NIO Selector. This is unfortunate, because level-triggering is very inefficient.

For example, Netty "uses linux EPOLL Edge-Triggered Mode for maximal performance."
https://github.com/netty/netty/blob/bc49c4b1464c8345bf87b3053b3adefa64f91530/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java#L39-L42

So, why do libuv and Selector use level-triggering? This is explained in an nio-dev thread.

The Selector API requires a level-triggered polling mechanism. Edge-triggered interfaces require a much tighter coupling with the I/O methods.

https://mail.openjdk.org/pipermail/nio-dev/2008-December/000329.html

Essentially, this tight coupling forces the actual I/O implementations to be designed specifically for edge-triggered notifications, instead of against a generic I/O polling facility. This is consistent with Netty and also this warning in the libuv docs.

Using uv_poll_t for any other purpose is not recommended; uv_tcp_t, uv_udp_t, etc. provide an implementation that is faster and more scalable than what can be achieved with uv_poll_t

http://docs.libuv.org/en/v1.x/poll.html

Suffice to say, if I've done my job right, this API is encoding the tight coupling needed for edge-triggered notifications to work correctly: this is why the user must pass the desired I/O operation into the polling system.

Assuming this all works correctly, it lets downstreams build against a single, generic file descriptor polling interface, instead of maintaining multiple implementations tightly coupled to the underlying polling system. Furthermore, because this API is high-level, it lets the polling system optimize how and when notifications are scheduled.

Member Author

@armanbilge armanbilge Dec 24, 2022

tl;dr

  1. epoll is stupidly inefficient unless used in edge-triggered mode.
  2. In edge-triggered mode, epoll will not notify you if a file descriptor is already ready. It will only tell you when it transitions to ready.
  3. This leaves us with a race condition between performing an I/O operation and receiving a notification of I/O readiness. This API is designed to manage that race condition.

else if (LinktimeInfo.isMac)
  unsafe.KqueueSystem
else
  unsafe.SleepSystem
Member

We should… probably support Windows too someday :-(

Member Author

No.

More seriously, I think an fs2-uv with a libuv-based polling system would be the least masochistic way to deal with this.

@@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
    */
   def readLine: IO[String] =
     Console[IO].readLine
 
+  def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] =
Member

This API isn't going to be rich enough for the JVM. The tricky thing isn't getting the polling system out of the ExecutionContext, but rather getting the thread-local out of the worker. This function as-written could actually exist in downstream userspace and doesn't need to be added to IO, whereas the thread-local extraction cannot.

This is probably a decent example of trying to do native first and following up with the JVM. IMO we really need to do both simultaneously, which complicates the implementation but forces us to come to terms with both platform constraints in a single API.

    fd: Int,
    reads: Boolean,
    writes: Boolean
): Resource[IO, FileDescriptorPollHandle] =
Member

Isn't the use of IO (and other high level constructs) in these APIs actively preventing us from taking advantage of thread-locals? It feels like this API is meant to be exceptionally low-level, except it's also presenting this high-level API. Are we sure this is safe?

- def defaultScheduler: Scheduler = QueueExecutorScheduler
+ def defaultScheduler: Scheduler = EventLoopExecutorScheduler.global

  def createEventLoop(system: PollingSystem): ExecutionContext with Scheduler =
Member

👍

@armanbilge
Member Author

All of these are really the same question, which sadly means the main ideas of my encoding didn't really get across 😕

Why are PollData and Poller separate?

I think the ExecutionContext is only necessary here because you're wrapping registration methods in subtypes in IO. IMO, that juggle should be managed by callers, which already need to play tricks with evalOn in order to keep things on the polling system pool.

This API isn't going to be rich enough for the JVM. The tricky thing isn't getting the polling system out of the ExecutionContext, but rather getting the thread-local out of the worker. This function as-written could actually exist in downstream userspace and doesn't need to be added to IO, whereas the thread-local extraction cannot.

This is probably a decent example of trying to do native first and following up with the JVM. IMO we really need to do both simultaneously, which complicates the implementation but forces us to come to terms with both platform constraints in a single API.

Firstly, the encoding I chose here can definitely be shared with the JVM (the only exception was interruption-related stuff, which I omitted here because Native has no multi-threading). I had the JVM very much in mind when I worked on this. I'm happy to update the other PR to demonstrate exactly that.

The important ideas are captured here:

/**
 * The user-facing Poller interface.
 */
type Poller

/**
 * The thread-local data structure used for polling.
 */
type PollData

def makePoller(ec: ExecutionContext, data: () => PollData): Poller

More specifically:

Poller is the user-facing API for interfacing with the polling system. PollData is the thread-local data structure.

The Poller also holds the specific polling system implementation, which relies on knowing (a) the execution context and (b) a way to access the thread-local data structure.

This means the Poller assumes full responsibility for getting onto the right ExecutionContext when access to the thread-local is needed, and for performing any non-thread-safe interactions on that same thread.

This also means that once a user has obtained a Poller, they can safely use it from anywhere, without having to worry about evalOn-ing to the right place or about access to thread-locals. IMO this is a good API.
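A hedged sketch of that separation (ExamplePoller and withPollData are illustrative names only, not code from this PR):

import scala.concurrent.ExecutionContext
import cats.effect.IO

final class ExamplePoller[PollData](ec: ExecutionContext, data: () => PollData) {

  // run f against the thread-local poll data, on the event-loop thread
  private def withPollData[A](f: PollData => A): IO[A] =
    IO(f(data())).evalOn(ec)

  // user-facing operations (e.g. registering a file descriptor) would be built on
  // top of withPollData, so callers never deal with ec or the thread-local directly
}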

@djspiewak
Member

Ah I see. Okay I need to think more about this.

@armanbilge
Member Author

This branch lives on in:

@armanbilge armanbilge closed this May 23, 2023