diff --git a/src/io/ev_loop.ml b/src/io/ev_loop.ml index 7cff644f..fa5616fb 100644 --- a/src/io/ev_loop.ml +++ b/src/io/ev_loop.ml @@ -60,8 +60,6 @@ module Per_fd = struct mutable writes: IO_wait.t list; } - let[@inline] is_empty self = self.reads = [] && self.writes = [] - let update_event (self : t) ~(poll : Poll.t) : unit = let ev = match self.reads, self.writes with @@ -71,6 +69,9 @@ module Per_fd = struct | [], _ :: _ -> Poll.Event.write | _ :: _, _ :: _ -> Poll.Event.read_write in + Printf.eprintf "poll.set %d {rd=%b, wr=%b}\n%!" + (Obj.magic self.fd.fd : int) + ev.readable ev.writable; Poll.set poll self.fd.fd ev end @@ -100,11 +101,11 @@ module IO_tbl = struct | Read -> self.n_read <- 1 + self.n_read; per_fd.reads <- ev :: per_fd.reads; - if self.n_read = 0 then Per_fd.update_event per_fd ~poll:self.poll + if self.n_read = 1 then Per_fd.update_event per_fd ~poll:self.poll | Write -> self.n_write <- 1 + self.n_write; per_fd.writes <- ev :: per_fd.writes; - if self.n_write = 0 then Per_fd.update_event per_fd ~poll:self.poll + if self.n_write = 1 then Per_fd.update_event per_fd ~poll:self.poll let[@inline] trigger_waiter (io : IO_wait.t) = if io.active then io.f io.as_cancel_handle @@ -112,6 +113,7 @@ module IO_tbl = struct (** Wake up waiters on FDs who received events *) let handle_ready ~ignore_fd (self : t) : unit = let update_per_fd (per_fd : Per_fd.t) (event : Poll.Event.t) = + Printf.eprintf "handle ready %d\n%!" (Obj.magic per_fd.fd.fd : int); if Fd.closed per_fd.fd then (* cleanup *) Hashtbl.remove self.tbl per_fd.fd.fd @@ -136,6 +138,8 @@ module IO_tbl = struct let per_fd = Hashtbl.find self.tbl fd in update_per_fd per_fd event )); + Poll.clear self.poll; + () (** Remove closed FDs *) @@ -165,7 +169,8 @@ module Ev_loop = struct type t = { timer: Timer.t; actions: Action_queue.t; - io_tbl: IO_tbl.t; (** Used for select *) + io_tbl: IO_tbl.t; + (** Used to remember which events are tracked on which FD *) in_blocking_section: bool A.t; (** Is the ev loop thread currently waiting? *) pipe_read: Unix.file_descr; (** Main thread only *) @@ -181,7 +186,6 @@ module Ev_loop = struct Unix.set_nonblock pipe_read; let poll = Poll.create () in - Poll.set poll pipe_read Poll.Event.read; { timer = Timer.create (); @@ -233,14 +237,18 @@ module Ev_loop = struct let delay_s = Option.value delay_s ~default:10. in let timeout = Poll.Timeout.after Int64.(of_float (delay_s *. 1e9)) in - (* run [select] *) + (* poll *) + Poll.set self.poll self.pipe_read Poll.Event.read; A.set self.in_blocking_section true; + let has_events = - let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in + let@ _sp = Tracing_.with_span "moonpool-unix.evloop.poll" in + Printf.eprintf "polling…\n%!"; match Poll.wait self.poll timeout with | `Timeout -> false | `Ok -> true in + Printf.eprintf "poll returned with has_events=%b\n%!" has_events; A.set self.in_blocking_section false; @@ -269,6 +277,10 @@ let rec get_or_set_as_current_ (ev : Ev_loop.t) : Ev_loop.t * bool = let bg_loop_ (ev_loop : Ev_loop.t) = let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in + Printf.eprintf "bg loop is thread %d\n%!" (Thread.id @@ Thread.self ()); + ignore + (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe; Sys.sigterm; Sys.sigint ] + : _ list); while true do Ev_loop.run_step_ ev_loop done diff --git a/test/io/echo_server.ml b/test/io/echo_server.ml index da2c30c2..b964c214 100644 --- a/test/io/echo_server.ml +++ b/test/io/echo_server.ml @@ -11,10 +11,14 @@ let main ~port ~j () : unit = let@ _main_runner = MU.main in Trace.set_thread_name "main"; + Printf.eprintf "main is thread %d\n%!" (Thread.id @@ Thread.self ()); + let@ server = MU.TCP_server.with_server ~runner (MU.Sockaddr.any port) ~handle:(fun ~client_addr:addr ic oc -> Trace.message "got new client"; + Printf.eprintf "handle client on thread %d\n%!" + (Thread.id @@ Thread.self ()); let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> [ "addr", `String (MU.Sockaddr.show addr) ]) @@ -59,4 +63,7 @@ let () = in Arg.parse opts ignore "echo server"; - main ~port:!port ~j:!j () + try main ~port:!port ~j:!j () + with Sys.Break -> + Printf.eprintf "got ctrl-c, exiting\n%!"; + exit 0