Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial sketch of PollingSystem #3278

Closed

Conversation

djspiewak
Copy link
Member

@djspiewak djspiewak commented Nov 24, 2022

Builds on #3219. Alternative to #3179.

This adds a PollingSystem API, exposed in the following ways:

abstract class PollingSystem {

  type Poller <: AbstractPoller

  def apply(): Poller

  final def local()(implicit tag: ClassTag[Poller]): Option[Poller] =
    Thread.currentThread() match {
      case t: WorkerThread => tag.unapply(t.poller())
      case _ => None
    }

  protected abstract class AbstractPoller {
    def poll(nanos: Long): Unit
    def interrupt(target: Thread): Unit
  }
}
trait IOApp {
  protected def pollingSystem: unsafe.PollingSystem = ...

The concept here is that you would define a singleton polling system which you're likely to use throughout the application. For example:

object SleepSystem extends PollingSystem {

  def apply(): Poller = new Poller()

  final class Poller extends AbstractPoller {

    def poll(nanos: Long): Unit = {
      if (nanos < 0)
        LockSupport.park()
      else if (nanos > 0)
        LockSupport.parkNanos(nanos)
      else
        ()
    }

    def interrupt(target: Thread): Unit =
      LockSupport.unpark(target)
  }
}

(note the above can actually be simplified a bit since it's entirely stateless)

Any thread-local state associated with polling would be included in the specific subtype implementation of Poller. The idea is that anyone downstream implementing something like Network would know its exact expected polling system type (e.g. SelectorSystem, or perhaps UringSystem). Any asynchronous events being registered-for would be done in a fashion which is specific to the subtype. For example, the method of registering for io_uring is likely to be different from the method for Selector. Calling SelectorSystem.local() would give you that specific subtype, which can then be used to register a callback linked to an async.

Every time the worker thread polls the external queue, it also calls Poller#poll with 0L. Whenever the worker suspends indefinitely, it passes -1L, and when it suspends for a specific timer window, it passes a positive nanoseconds value. Interruption is inherently polling system specific, which is why this is also something that is included on the interface. For example, an io_uring implementation might raise an event back to itself, awakening the suspension within kernelspace.

There are a few outstanding problems here. For starters, this is very low-level and unsafe. I'd prefer to at least encapsulate it nicely within IO. Additionally, there isn't a good way to detect and fallback as a downstream implementor when you don't have access to a polling system. This can happen in a lot of very normal circumstances, such as evalOn. One solution here might be to build an additional Poller which functions as the "global" poller, but we have to figure that out.

Comment on lines +24 to +28
type Poller <: AbstractPoller

def apply(): Poller

final def local()(implicit tag: ClassTag[Poller]): Option[Poller] =
Copy link
Member

@armanbilge armanbilge Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Poller <: AbstractPoller
def apply(): Poller
final def local()(implicit tag: ClassTag[Poller]): Option[Poller] =
type Poller <: AbstractPoller
def tag: ClassTag[Poller]
protected def apply(): Poller
final def local(): Option[Poller] =

Copy link
Member

@armanbilge armanbilge Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, on second thought: I think we need to expose local() as a static method, independent of the PollingSystem instance.

def local[Poller]()(implicit tag: ClassTag[Poller]): Option[Poller]

The reason is, suppose I have an abstract FileDescriptorPoller, which is implemented by an EpollSystem, KqueueSystem, UringSystem, CurlSystem, and UVSystem. How can I get an instance of FileDescriptorPoller without knowing about any of those systems?

Copy link
Member Author

@djspiewak djspiewak Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you mean. Yes, I think that's a limitation of the current system. Technically, FileDescriptorPollingSystem would probably be the way you'd get around this, but I have a better idea now…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, FileDescriptorPollingSystem would probably be the way you'd get around this

Except, such a thing cannot exist, at least not as a object :) there are at least 5 possible implementations. You can have it as an abstract class sure, but how do you actually get one, without knowing the specific implementation.

but I have a better idea now…

😃

Comment on lines +34 to +37
protected abstract class AbstractPoller {
def poll(nanos: Long): Unit
def interrupt(target: Thread): Unit
}
Copy link
Member

@armanbilge armanbilge Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So one sub-optimal thing about this design is that it leaks poll and interrupt to users of Poller.

I think these would be better defined as abstract methods on the PollingSystem.

protected def poll(poller: Poller, nanos: Long): Unit
protected def interrupt(poller: Poller, target: Thread): Unit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users of Poller are, by definition, Poller authors. The whole point of Poller is that it's a mechanism for poller authors to talk "to themselves", in a sense, through the WSTP. It's very much not meant to be a thing that people just arbitrarily get at.

Copy link
Member

@armanbilge armanbilge Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, not really. Firstly, a user has to install the PollingSystem in their IOApp or IORuntime, which means the system itself has to be public. If the system is public, and these are public methods, that means that anyone in userspace can randomly call these poll and interrupt methods. Sure, whatever, but also, it doesn't have to be that way.

Second, I disagree that poller authors are only talking to themselves. The entire idea of file descriptor polling abstraction is that poller authors are not talking to themselves. On one side, are polling system implementors, and on the other are poller consumers.

I remember raising this point before: I think we should be careful not to conflate the API that the WSTP needs (i.e. poll and interrupt) with the API that the consumer writing I/O stuff with the poller needs (which is polling system specific).

@armanbilge
Copy link
Member

armanbilge commented Nov 24, 2022

Additionally, there isn't a good way to detect and fallback as a downstream implementor when you don't have access to a polling system. This can happen in a lot of very normal circumstances, such as evalOn. One solution here might be to build an additional Poller which functions as the "global" poller, but we have to figure that out.

It's sort of a more complex version of the timers. If you are not on the WSTP, but you need to schedule a timer, you can either submit it to the WSTP, or we could keep a global scheduler thread. In theory, that global thread could be running a polling system.

@armanbilge
Copy link
Member

Another "feature" of a global polling system would be that you can have multiple polling systems in an application. E.g. if for some reason you need an SelectorSystem for one thing and a UringSystem for something else.

@djspiewak
Copy link
Member Author

djspiewak commented Nov 24, 2022

It's sort of a more complex version of the timers. If you are not on the WSTP, but you need to schedule a timer, you can either submit it to the WSTP, or we could keep a global scheduler thread. In theory, that global thread could be running a polling system.

I mean, in that view, the correct answer if you don't have a Poller but you know you're running on IO is to grab one from a random worker thread. This is effectively the trick that timers use. I think in practice though this wouldn't quite work, since timer submission is much more tightly controlled.

@armanbilge
Copy link
Member

I think in practice though this wouldn't quite work, since timer submission is much more tightly controlled.

Ah yes I think you are right. It's not really safe to grab a Poller from a random worker thread. However, you can safely submit a task that needs a Poller to a random worker thread. This is sort of what I was imagining in my API sketch:

/**
* Attempts to retrieve the runtime's polling system, if present and an instance of `S`.
*
* If successful, it invokes the side-effect `f` with it and returns the result.
*
* If no polling system of type `S` is available, then `f` is not run and `None` is returned.
*/
@nowarn
def delayWithPollingSystem[S: ClassTag, A](f: S => A): F[Option[A]] = pure(None)

@TimWSpence
Copy link
Member

What would a SelectorSystem implementation look like? I haven't quite visualized how this integrates other events like non-blocking I/O. Would poll invoking a bunch of async callbacks to reschedule fibers as it observes the events that they are blocked on?

@djspiewak djspiewak closed this Dec 25, 2022
@armanbilge armanbilge mentioned this pull request Dec 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants