Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doodle of polling system high-level API #3179

Closed

Conversation

armanbilge
Copy link
Member

@armanbilge armanbilge commented Sep 29, 2022

Here's a doodle of what the high-level API for the polling system might look like.

  1. It bakes the concept straight into Async in kernel. This goes to the point that you need an IO-like ecosystem to really take advantage of this, but enables the ecosystem to build without knowing about IO.

  2. It enforces the idea that the PollingSystem you want might not be available. This may happen for various reasons:

    • not running on the WSTP, or evalOn-ed off of it
    • no system installed, or not the one you want

    This gives implementations the chance to recover, which in practice would mean starting and using their own event-loop thread. It also lets an easy default implementation fall out.

  3. It encourages you to do everything you need with the PollingSystem in a single, atomic delay block. Because a subsequent effect can always execute on another thread (since we do not have an uncedable construct) this is the only safe way to interact with a PollingSystem without requiring it to be threadsafe.

Example use

These are rough sketches, but whatever.

Selector

F.delayWithPollingSystem[Selector, Socket[F]] { selector =>
  val ch = selector.provider.openSocketChannel()
  val callback = ??? // too fugly for this example
  ch.register(selector, OP_READ | OP_WRITE, callback)
  new Socket[F](ch, ...)
} flatMap {
  case Some(socket) => F.pure(socket) // cool
  case None => Socket.makeTheOldFashionedWay[F]
}

io_uring

F.async[Int] { cb =>
  F.delayWithPollingSystem[Uring, Unit] { ring =>
    val sqe = ring.getSqe(cb) // get submission-queue-event and register callback
    io_uring_prep_nop(sqe) // prep an operation
    ()
  } flatMap {
    case Some(()) => F.unit // successful, do nothing
    case None => F.delay(/* submit elsewhere */)
  } as None // ignore cancellation for this example
}

@djspiewak
Copy link
Member

I like the idea of making this an effect, but I don't see the benefit to putting it into Async. This is literally what LiftIO is for. So whatever we do, the effect should be on IO itself, and then if someone wants to have it within a generic F context, they will need a LiftIO[F]

@armanbilge
Copy link
Member Author

This is literally what LiftIO is for.

What is "this"? There's no reason we have to force IO here.

@djspiewak
Copy link
Member

The functionality overall is IO specific though. It's kind of like saying "this works with any Async… so long as it is based on IO". That's the same as just saying LiftIO

@armanbilge
Copy link
Member Author

armanbilge commented Sep 29, 2022

The functionality overall is IO specific though.

Hm, either we have different things in mind or we just disagree :) if pulling this off requires modifications to the fiber runloop, ok I agree. But it's really a property of the ExecutionContext which is not specific to IO.

For what it's worth, I think just running a separate selector thread as part of your runtime is a valid implementation of this. Exactly how we currently run a separate scheduler thread today.

Soon we will be doing scheduling on the WSTP, thus subsuming the scheduler thread. Does this suddenly make everything in Temporal to be IO-specific? If not, then how is it any different?

@armanbilge
Copy link
Member Author

I like #3278 better :)

@armanbilge armanbilge closed this Nov 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants