Skip to content

Commit

Permalink
Use Iomux.Poll.ppoll instead of epoll via ocaml-poll
Browse files Browse the repository at this point in the history
An Iomux.Poll.t has an entry for each available file descriptor.
The allocation strategy is straightforward:
 - Each index in the poll array maps to the fd of same number.
 - We toggle the poll slot activation when we move from waiters->empty and
empty->waiters. This is a bit different than before as we actually call `update`
after `iter_ready` unregistering the possible interest.
 - We have to bump maxi every time we go past the currently allocated maximum,
and we never go back down; we could decrement maxi when we close the highest
fd, but we can't really avoid holes, so we don't bother.

I'm surprised this worked the first time, I still have to review it.
  • Loading branch information
haesbaert authored and talex5 committed Feb 24, 2023
1 parent 9e47358 commit 9bbc258
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 42 deletions.
2 changes: 1 addition & 1 deletion lib_eio_posix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
(language c)
(flags :standard -D_LARGEFILE64_SOURCE)
(names eio_posix_stubs))
(libraries eio eio.utils eio.unix fmt poll))
(libraries eio eio.utils eio.unix fmt iomux))
89 changes: 48 additions & 41 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Lf_queue = Eio_utils.Lf_queue
module Fiber_context = Eio.Private.Fiber_context
module Ctf = Eio.Private.Ctf
module Rcfd = Eio_unix.Private.Rcfd
module Poll = Iomux.Poll

type exit = [`Exit_scheduler]

Expand All @@ -40,6 +41,7 @@ type t = {
run_q : runnable Lf_queue.t;

poll : Poll.t;
mutable poll_maxi : int;

(* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event.
In that case, [need_wakeup = true] and you must signal using [eventfd]. *)
Expand Down Expand Up @@ -106,41 +108,44 @@ let clear_event_fd t =
let buf = Bytes.create 8 in
let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in
assert (got > 0);
Poll.set t.poll t.eventfd_r Poll.Event.read
Poll.(set_index t.poll (Iomux.Util.fd_of_unix t.eventfd_r) t.eventfd_r Flags.pollin)

let ready t fd (event : Poll.Event.t) =
let update t waiters fd =
let fdi = Iomux.Util.fd_of_unix fd in
let flags =
match not (Lwt_dllist.is_empty waiters.read),
not (Lwt_dllist.is_empty waiters.write) with
| false, false -> Poll.Flags.empty
| true, false -> Poll.Flags.pollin
| false, true -> Poll.Flags.pollout
| true, true -> Poll.Flags.(pollin + pollout)
in
if flags = Poll.Flags.empty then (
Poll.(invalidate_index t.poll fdi);
Hashtbl.remove t.fd_map fd
) else (
Poll.(set_index t.poll fdi fd flags);
if fdi > t.poll_maxi then
t.poll_maxi <- fdi
)

(* [ready t _index fd revents] handles one fd reported ready by ppoll.
   If [fd] is the wake-up eventfd we just drain it (the scheduler will
   re-examine the run queue and notice any new items).  Otherwise we move
   every waiter whose direction is signalled in [revents] onto a local
   [pending] list, re-arm (or release) the poll slot to reflect the
   remaining interest, and only then resume the pending fibers.
   POLLHUP/POLLERR wake both directions, matching ppoll(2) semantics.
   NOTE(review): [==] on Unix.file_descr relies on fds being immediate
   ints on POSIX; [=] would be the safer spelling — confirm. *)
let ready t _index fd revents =
  if fd == t.eventfd_r then (
    clear_event_fd t
  ) else (
    let waiters = Hashtbl.find t.fd_map fd in
    let pending = Lwt_dllist.create () in
    if Poll.Flags.(mem revents (pollout + pollhup + pollerr)) then
      Lwt_dllist.transfer_l waiters.write pending;
    if Poll.Flags.(mem revents (pollin + pollhup + pollerr)) then
      Lwt_dllist.transfer_l waiters.read pending;
    (* If [pending] is non-empty we emptied some waiter lists above, so
       refresh the poll slot before resuming anyone (a resumed fiber may
       register new interest, which must not be clobbered afterwards). *)
    if not (Lwt_dllist.is_empty pending) then
      update t waiters fd;
    Lwt_dllist.iter_node_r (remove_and_resume t) pending)

(* todo: remove event when empty. Or when closed? But hard to know when that happens. *)
let update t waiters fd =
let event : Poll.Event.t = {
readable = not (Lwt_dllist.is_empty waiters.read);
writable = not (Lwt_dllist.is_empty waiters.write);
} in
try
Poll.set t.poll fd event
with Unix.Unix_error(Unix.EPERM, _, "") ->
(* [fd] doesn't support polling (e.g. it's a regular file). Assume it won't block
and wake the caller immediately. *)
ready t fd event

let add_read t fd k =
let waiters = get_waiters t fd in
Expand Down Expand Up @@ -183,13 +188,13 @@ let rec next t : [`Exit_scheduler] =
let time = Mtime.to_uint64_ns time in
let now = Mtime.to_uint64_ns now in
let diff_ns = Int64.sub time now in
Poll.Timeout.after diff_ns
| `Nothing -> Poll.Timeout.never
Poll.Nanoseconds diff_ns
| `Nothing -> Poll.Infinite
in
if not (Lf_queue.is_empty t.run_q) then (
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
next t
) else if timeout = Never && t.active_ops = 0 then (
) else if timeout = Infinite && t.active_ops = 0 then (
(* Nothing further can happen at this point. *)
Lf_queue.close t.run_q; (* Just to catch bugs if something tries to enqueue later *)
`Exit_scheduler
Expand All @@ -200,18 +205,14 @@ let rec next t : [`Exit_scheduler] =
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Ctf.(note_hiatus Wait_for_work);
let r =
match Poll.wait t.poll timeout with
| `Ok | `Timeout as x -> x
| exception Unix.Unix_error(Unix.EINTR, _, "") -> `EINTR
let nready =
try Poll.ppoll t.poll (t.poll_maxi + 1) timeout []
with Unix.Unix_error(Unix.EINTR, _, "") -> 0
in
Ctf.note_resume system_thread;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
begin match r with
| `EINTR | `Timeout -> ()
| `Ok -> Poll.iter_ready t.poll ~f:(ready t); Poll.clear t.poll
end;
Poll.iter_ready t.poll nready (ready t);
next t
) else (
(* Someone added a new job while we were setting [need_wakeup] to [true].
Expand All @@ -236,10 +237,16 @@ let with_sched fn =
assert was_open
in
let poll = Poll.create () in
let cleanup () = Poll.close poll; cleanup () in
for i = 0 to Poll.maxfds poll - 1 do
Poll.invalidate_index poll i
done;
let fd_map = Hashtbl.create 10 in
let t = { run_q; poll; fd_map; eventfd; eventfd_r; active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in
Poll.set poll eventfd_r Poll.Event.read;
let t = { run_q; poll; poll_maxi = 0; fd_map; eventfd; eventfd_r;
active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in
let eventfd_ri = Iomux.Util.fd_of_unix eventfd_r in
Poll.(set_index t.poll eventfd_ri eventfd_r Flags.pollin);
if eventfd_ri > t.poll_maxi then
t.poll_maxi <- eventfd_ri;
match fn t with
| x -> cleanup (); x
| exception ex ->
Expand Down

0 comments on commit 9bbc258

Please sign in to comment.