Native Polling System #3314

Closed

Changes from all commits (31 commits)
6d23310  Extract `PollingSystem` abstraction on Native (armanbilge, Dec 11, 2022)
727bfe8  Add `FileDescriptorPoller` abstraction (armanbilge, Dec 11, 2022)
9a8f7ed  Add `EpollSystem` and `KqueueSystem` (armanbilge, Dec 11, 2022)
0889426  Add test for `Scheduler#sleep` (armanbilge, Dec 11, 2022)
4250049  Add `FileDescriptorPollerSpec` (armanbilge, Dec 11, 2022)
0b9ac02  Consistent error-handling in `KqueueSystem` (armanbilge, Dec 11, 2022)
956734a  Make `pollingSystem` configurable in `IOApp` (armanbilge, Dec 11, 2022)
dda54b7  Nowarn unuseds (armanbilge, Dec 12, 2022)
1206b27  Revise the fd poller spec (armanbilge, Dec 12, 2022)
e0c4ec3  Remove `maxEvents` config from `EpollSystem` (armanbilge, Dec 12, 2022)
eeeb3e6  Remove `maxEvents` config from `KqueueSystem` (armanbilge, Dec 12, 2022)
c4a0a16  Add test for many simultaneous events (armanbilge, Dec 12, 2022)
aae6e97  Remove redundant `final` (armanbilge, Dec 12, 2022)
457f89c  Update comment (armanbilge, Dec 12, 2022)
721c2fc  Add test for pre-existing readiness (armanbilge, Dec 12, 2022)
4f9e57b  Add test for no readiness (armanbilge, Dec 12, 2022)
a520aee  Reimagine `FileDescriptorPoller` (armanbilge, Dec 19, 2022)
5f8146b  Fix parameter names (armanbilge, Dec 19, 2022)
3640605  Refactor/redesign `PollingSystem` ... again ... (: (armanbilge, Dec 19, 2022)
42491da  Dump `EventLoop` abstraction (armanbilge, Dec 19, 2022)
786127c  Update the `FileDescriptorPollerSpec` (armanbilge, Dec 19, 2022)
de3eea0  Rework `EpollSystem` (armanbilge, Dec 20, 2022)
eb8ba84  Set pipes to non-blocking mode (armanbilge, Dec 20, 2022)
0124567  Add fcntl import (armanbilge, Dec 20, 2022)
72b05a7  Fix bugs in spec (armanbilge, Dec 20, 2022)
d18fa76  Add some uncancelables (armanbilge, Dec 20, 2022)
4d3a916  Revert "Add some uncancelables" (armanbilge, Dec 20, 2022)
9ba870f  Rework `KqueueSystem` (armanbilge, Dec 20, 2022)
9673883  Post-refactor typos (armanbilge, Dec 20, 2022)
43b0b0a  Scope `.evalOn` even more tightly (armanbilge, Dec 24, 2022)
e5dd04f  Use `asyncCheckAttempt` (armanbilge, Dec 24, 2022)
8 changes: 8 additions & 0 deletions build.sbt
@@ -772,6 +772,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
      } else Seq()
    }
  )
  .nativeSettings(
    mimaBinaryIssueFilters ++= Seq(
      ProblemFilters.exclude[MissingClassProblem](
        "cats.effect.unsafe.PollingExecutorScheduler$SleepTask"),
      ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"),
      ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$")
    )
  )

/**
 * Test support for the core project, providing various helpful instances like ScalaCheck
56 changes: 56 additions & 0 deletions core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala
@@ -0,0 +1,56 @@
/*
 * Copyright 2020-2022 Typelevel
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cats.effect

trait FileDescriptorPoller {

  /**
   * Registers a file descriptor with the poller and monitors read- and/or write-ready events.
   */
  def registerFileDescriptor(
      fileDescriptor: Int,
      monitorReadReady: Boolean,
      monitorWriteReady: Boolean
  ): Resource[IO, FileDescriptorPollHandle]

}

trait FileDescriptorPollHandle {

  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or
   * `recv` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for reading.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]
Member commented:

Glancing at the implementations, it really looks like nothing about the looping structure here is platform-specific. In other words, this signature could be simplified and callers could use tailRecM to achieve the same effect. If we want to provide a convenience function which packages this up, we can, but as it stands every polling system is just doing essentially the same thing here. That suggests that the true primitive is what happens in the Right branch.

Edit: Actually, is this true? Looking again, it seems that the Epoll implementation explicitly recurses, while the KQueue one doesn't?
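
For concreteness, a hedged sketch of the suggested simplification (hypothetical: it assumes a one-shot readiness primitive, here called waitReadReady, which this PR does not expose):

import cats.Monad
import cats.effect.IO

// Hypothetical sketch: if the polling system exposed a one-shot readiness
// primitive, callers could drive the retry loop themselves via tailRecM.
def pollReadRec[A, B](waitReadReady: IO[Unit])(a: A)(f: A => IO[Either[A, B]]): IO[B] =
  Monad[IO].tailRecM(a) { a =>
    f(a).flatMap {
      case Left(a)  => waitReadReady.as(Left(a)) // blocked: wait for readiness, then retry
      case Right(b) => IO.pure(Right(b))         // done: tailRecM stops on Right
    }
  }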

armanbilge (Member, Author) commented on Dec 24, 2022:

Both implementations recurse. The recursion is used in the case that the operation fails because it's blocked, in which case it needs to be called again (ideally when the file descriptor is unblocked).

> In other words, this signature could be simplified and callers could use tailRecM to achieve the same effect.

You may be right about this, but I need to think about it some more. If we were only dealing with kqueue/io_uring style polling systems, I would definitely agree.

However, for epoll, letting the polling system handle the recursion enables it to keep track of some internal state (related to the edge-triggered mode I described above), which enables it to fast-path. Edit: actually, I think this is necessary for correctness, so that it does not deadlock.

def go(a: A, before: Int): IO[B] =
  f(a).flatMap {
    case Left(a) =>
      IO(readReadyCounter).flatMap { after =>
        if (before != after)
          // there was a read-ready notification since we started, try again immediately
          go(a, after)
        else
          IO.asyncCheckAttempt[Int] { cb =>
            IO {
              readCallback = cb
              // check again before we suspend
              val now = readReadyCounter
              if (now != before) {
                readCallback = null
                Right(now)
              } else Left(Some(IO(this.readCallback = null)))
            }
          }.flatMap(go(a, _))
      }
    case Right(b) =>
      IO.pure(b) // the operation completed: surface the result
  }

armanbilge (Member, Author) commented:

Some further justification for this API, specifically as it relates to epoll:

As I mentioned in #3314 (comment), libuv's general polling facility uses "level-triggered" mode. So does the JDK NIO Selector. This is unfortunate, because level-triggering is very inefficient.

For example, Netty "uses linux EPOLL Edge-Triggered Mode for maximal performance."
https://github.com/netty/netty/blob/bc49c4b1464c8345bf87b3053b3adefa64f91530/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java#L39-L42

So, why do libuv and Selector use level-triggering? This is explained in an nio-dev thread.

> The Selector API requires a level-triggered polling mechanism. Edge-triggered interfaces require a much tighter coupling with the I/O methods.

https://mail.openjdk.org/pipermail/nio-dev/2008-December/000329.html

Essentially, this tight coupling forces the actual I/O implementations to be designed specifically for edge-triggered notifications, instead of against a generic I/O polling facility. This is consistent with Netty and also this warning in the libuv docs.

> Using uv_poll_t for any other purpose is not recommended; uv_tcp_t, uv_udp_t, etc. provide an implementation that is faster and more scalable than what can be achieved with uv_poll_t.

http://docs.libuv.org/en/v1.x/poll.html

Suffice it to say, if I've done my job right, this API encodes the tight coupling needed for edge-triggered notifications to work correctly: this is why the user must pass the desired I/O operation into the polling system.

Assuming this all works correctly, it lets downstreams build against a single, generic file descriptor polling interface, instead of maintaining multiple implementations tightly coupled to the underlying polling system. Furthermore, because this API is high-level, it lets the polling system optimize how and when notifications are scheduled.

armanbilge (Member, Author) commented on Dec 24, 2022:

tl;dr

  1. epoll is stupidly inefficient unless used in edge-triggered mode.
  2. In edge-triggered mode, epoll will not notify you if a file descriptor is already ready. It will only tell you when it transitions to ready.
  3. This leaves us with a race condition between performing an I/O operation and receiving a notification of I/O readiness. This API is designed to manage that race condition.


  /**
   * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or
   * `send` on the file descriptor.
   *   - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`.
   *     Then `f` will be invoked again with `A` at a later point, when the file handle is ready
   *     for writing.
   *   - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this
   *     method will complete with `B`.
   */
  def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B]

}
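
To illustrate the intended call pattern (a hedged sketch, not part of the diff): tryRead below stands in for an assumed effect wrapping a non-blocking read(2), yielding None when the call would block:

import cats.effect.IO

// Hedged usage sketch: drive a non-blocking read through the handle.
// `tryRead` is assumed to yield Some(bytesRead) on success and None on
// EAGAIN/EWOULDBLOCK.
def readOnce(handle: FileDescriptorPollHandle, tryRead: IO[Option[Int]]): IO[Int] =
  handle.pollReadRec(()) { _ =>
    tryRead.map {
      case Some(n) => Right(n) // done: complete with the byte count
      case None    => Left(()) // blocked: `f` is invoked again when the fd is read-ready
    }
  }

pollWriteRec follows the same shape with a non-blocking write.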
Comment on lines +32 to +56
armanbilge (Member, Author) commented:

I sort of tore everything up and started again from scratch. Sorry 😅

I am excited about this new API, which I think is a significant improvement.

Instead of exposing a raw, multi-shot callback to the user, the user passes an I/O operation (suspended in IO) to the polling system. This gives the polling system full control over scheduling of the operation, so the specific strategy can be optimized for every backend (epoll, kqueue, io_uring).

The new API is also a much better fit for working with C libraries, where typically you are invoking an I/O operation repeatedly until it is unblocked and has fully consumed/provided the desired data.

I'll get a new snapshot up soon.

armanbilge (Member, Author) commented:

Daniel and I discussed this new API on Discord. Various doubts:

  • the fact that these methods are not the "primitives" of the underlying polling systems
  • whether this abstraction should even be in Cats Effect
  • whether this API is too tailored to the FS2 I/O usecase

I am also concerned about these issues, since getting this API right is important for building a composable ecosystem on Native that can interop with external I/O libraries.


On the JVM the important primitive is IO.async and friends. This is because external libraries that do I/O start their own threads and threadpools. We give them a callback that they execute on their own threadpool, to shift back into our own threadpool upon completion of some async task.

In the Native world, C libraries that do I/O don't start their own threadpools. Instead they typically do two things:

  1. provide a set of file descriptors that they are interested in reading/writing on
  2. provide one or more functions that "drive" the library. Such a function may either complete successfully (possibly invoking some callbacks) or return a special error code indicating that it is blocked and should be tried again (ideally when I/O is known to be unblocked on the relevant file descriptors).

Some concrete examples of this:

These "driver" methods are designed and intended to be invoked repeatedly, in a loop, until the operation completes successfully (i.e. is not blocked). In some cases this requires threading some state through.
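
To make that concrete, a hedged sketch (all names hypothetical; no real C library is assumed) of how such a driver function, with its threaded state, maps onto the pollReadRec API proposed in this PR:

import cats.effect.IO

// Hypothetical sketch: `S` is the C library's opaque session state, threaded
// through each retry; `step` stands in for the FFI "driver" call.
sealed trait DriverStep[S, R]
final case class Blocked[S, R](state: S) extends DriverStep[S, R] // would block: retry later
final case class Done[S, R](result: R) extends DriverStep[S, R]   // completed successfully

def runDriver[S, R](handle: FileDescriptorPollHandle, initial: S)(
    step: S => IO[DriverStep[S, R]]): IO[R] =
  handle.pollReadRec(initial) { s =>
    step(s).map {
      case Blocked(next) => Left(next) // invoked again when the fd is read-ready
      case Done(r)       => Right(r)   // completes the IO returned by pollReadRec
    }
  }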


On the other side of things is how native polling systems work. There are essentially two categories:

  1. kqueue and io_uring: these are really nice APIs, in that they let you set up notifications on multiple file descriptors in a single syscall (indeed, the same syscall used for polling). These notifications can be one-shot, and specific to read or write.

    This is an ideal scenario for load-balancing: you can always set up the notification on your current thread. As fibers get stolen, this should balance out dynamically.

  2. epoll and libuv are ... not quite so nice :) Setting up each notification requires its own syscall. Furthermore, you must register for both reads and writes at the same time (i.e. they cannot be set up independently). Their notifications are multi-shot, so they will be repeated until the file descriptor is explicitly unregistered.

    • epoll by default and libuv use "level-triggered" mode. This means they will keep nagging you that a file descriptor is ready until you drain/block it again. This is annoying if an fd is ready for reading/writing, but some downstream/upstream is not yet ready to take advantage of that.
    • epoll has an "edge-triggered" mode, such that it only notifies you once, when the status of the file descriptor changes. This is much more efficient, but requires some bookkeeping to make sure a notification is not missed.
    • libuv also warns that you should be prepared to handle "spurious" notifications, where you are notified even though the file descriptor is actually still blocked.

    Sadly this is not great for load-balancing. To avoid syscalls and bookkeeping it's preferable to register for notifications once. But then that file descriptor is tied to a particular thread indefinitely.


tl;dr

  • C I/O libraries provide a list of "interesting" file descriptors and "driver" functions that you should invoke repeatedly while they are blocked, until they indicate success. This is not unique to the APIs that FS2 happens to use.
  • Native polling systems either offer cheap-setup one-shot notifications or expensive-setup multi-shot notifications.

These constraints are what pushed me to the API proposed here: a way to hand the C libraries' "driver" functions directly to the polling system itself, so that it can schedule and invoke these functions using the optimal strategy for that particular system.

Finally, why put this abstraction in Cats Effect? So that we can have two independent, composable ecosystems:

  1. Pure wrappers around C libraries that are based on file descriptor polling
  2. Polling systems, which implement file descriptor polling in addition to offering their own implementation-specific primitives.

Baccata (Contributor) commented on Dec 24, 2022:

Just came here to say that this is a great write-up that should be kept in some design document (or at least a gist).

24 changes: 18 additions & 6 deletions core/native/src/main/scala/cats/effect/IOApp.scala
@@ -20,6 +20,7 @@ import cats.effect.metrics.NativeCpuStarvationMetrics

import scala.concurrent.CancellationException
import scala.concurrent.duration._
import scala.scalanative.meta.LinktimeInfo

/**
 * The primary entry point to a Cats Effect application. Extend this trait rather than defining
@@ -165,6 +166,21 @@ trait IOApp {
   */
  protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

  /**
   * The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]]
   * produced by `run`. It is very unlikely that users will need to override this method.
   *
   * [[unsafe.PollingSystem]] implementors may provide their own flavors of [[IOApp]] that
   * override this method.
   */
  protected def pollingSystem: unsafe.PollingSystem =
    if (LinktimeInfo.isLinux)
      unsafe.EpollSystem
    else if (LinktimeInfo.isMac)
      unsafe.KqueueSystem
    else
      unsafe.SleepSystem
Member commented:

We should… probably support Windows too someday :-(

armanbilge (Member, Author) commented:

No.

More seriously, I think an fs2-uv with a libuv-based polling system would be the least masochistic way to deal with this.
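
For illustration, a hedged sketch of how an application (or a third-party polling-system library) might override this hook; MyEpollApp and the choice of EpollSystem are purely illustrative:

import cats.effect.{IO, IOApp}
import cats.effect.unsafe

// Hypothetical sketch: pin the runtime to a specific polling system rather
// than relying on the platform default selected above.
object MyEpollApp extends IOApp.Simple {

  override protected def pollingSystem: unsafe.PollingSystem =
    unsafe.EpollSystem

  def run: IO[Unit] = IO.println("event loop backed by epoll")
}

A libuv-based project could ship its own IOApp flavor overriding the same hook, per the doc comment above.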


  /**
   * The entry point for your application. Will be called by the runtime when the process is
   * started. If the underlying runtime supports it, any arguments passed to the process will be
@@ -186,12 +202,8 @@ trait IOApp {
     import unsafe.IORuntime

     val installed = IORuntime installGlobal {
-      IORuntime(
-        IORuntime.defaultComputeExecutionContext,
-        IORuntime.defaultComputeExecutionContext,
-        IORuntime.defaultScheduler,
-        () => (),
-        runtimeConfig)
+      val loop = IORuntime.createEventLoop(pollingSystem)
+      IORuntime(loop, loop, loop, () => (), runtimeConfig)
     }

_runtime = IORuntime.global
10 changes: 10 additions & 0 deletions core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala
@@ -17,6 +17,9 @@
package cats.effect

import cats.effect.std.Console
import cats.effect.unsafe.EventLoopExecutorScheduler

import scala.reflect.ClassTag

import java.time.Instant

@@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
   */
  def readLine: IO[String] =
    Console[IO].readLine

  def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] =
Member commented:

This API isn't going to be rich enough for the JVM. The tricky thing isn't getting the polling system out of the ExecutionContext, but rather getting the thread-local out of the worker. This function as-written could actually exist in downstream userspace and doesn't need to be added to IO, whereas the thread-local extraction cannot.

This is probably a decent example of trying to do native first and following up with the JVM. IMO we really need to do both simultaneously, which complicates the implementation but forces us to come to terms with both platform constraints in a single API.


    IO.executionContext.map {
      case loop: EventLoopExecutorScheduler if ct.runtimeClass.isInstance(loop.poller) =>
        Some(loop.poller.asInstanceOf[Poller])
      case _ => None
    }
}
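
A hedged usage sketch: assuming the active polling system's poller implements FileDescriptorPoller (which is not guaranteed, e.g. under SleepSystem), downstream code could recover it like so:

import cats.effect.{FileDescriptorPoller, IO}

// Hedged sketch: recover the file-descriptor poller from the runtime, failing
// if the current polling system does not provide one.
val fdPoller: IO[FileDescriptorPoller] =
  IO.poller[FileDescriptorPoller].flatMap {
    case Some(p) => IO.pure(p)
    case None    => IO.raiseError(new RuntimeException("no FileDescriptorPoller available"))
  }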