From 092ad5f2ce7983f2e0ca0827d5d87c7321c448db Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:10:11 -0500 Subject: [PATCH 1/7] feat: support for `trace` as a depopt one can now give tasks an optional string "name". If `Trace` is present (installed) and enabled, this results in a span around the task's execution. This also plays ok with `await` and other effect-based primitives. --- dune-project | 1 + moonpool.opam | 1 + src/dune | 5 +++- src/fifo_pool.ml | 31 +++++++++++++++++++---- src/fork_join.ml | 14 +++++------ src/fut.ml | 34 ++++++++++++++----------- src/fut.mli | 4 +-- src/immediate_runner.ml | 14 +++++++++-- src/moonpool.mli | 12 ++++++--- src/runner.ml | 8 +++--- src/runner.mli | 18 ++++++++++--- src/suspend_.ml | 18 ++++++++----- src/suspend_.mli | 17 ++++++++++--- src/tracing_.dummy.ml | 4 +++ src/tracing_.mli | 4 +++ src/tracing_.real.ml | 13 ++++++++++ src/ws_pool.ml | 56 ++++++++++++++++++++++++++++------------- src/ws_pool.mli | 4 --- 18 files changed, 184 insertions(+), 74 deletions(-) create mode 100644 src/tracing_.dummy.ml create mode 100644 src/tracing_.mli create mode 100644 src/tracing_.real.ml diff --git a/dune-project b/dune-project index 5c54caea..b4e69e84 100644 --- a/dune-project +++ b/dune-project @@ -28,6 +28,7 @@ (>= 1.9.0) :with-test))) (depopts + (trace (>= 0.6)) thread-local-storage (domain-local-await (>= 0.2))) (tags diff --git a/moonpool.opam b/moonpool.opam index 18ef18ee..6ff7c8b0 100644 --- a/moonpool.opam +++ b/moonpool.opam @@ -19,6 +19,7 @@ depends: [ "mdx" {>= "1.9.0" & with-test} ] depopts: [ + "trace" {>= "0.6"} "thread-local-storage" "domain-local-await" {>= "0.2"} ] diff --git a/src/dune b/src/dune index 59005b54..db4763df 100644 --- a/src/dune +++ b/src/dune @@ -1,7 +1,7 @@ (library (public_name moonpool) (name moonpool) - (private_modules d_pool_ dla_) + (private_modules d_pool_ dla_ tracing_) (preprocess (action (run %{project_root}/src/cpp/cpp.exe %{input-file}))) @@ -9,6 +9,9 @@ (select thread_local_storage_.ml from (thread-local-storage -> thread_local_storage_.stub.ml) (-> thread_local_storage_.real.ml)) + (select tracing_.ml from + (trace.core -> tracing_.real.ml) + (-> tracing_.dummy.ml)) (select dla_.ml from (domain-local-await -> dla_.real.ml) ( -> dla_.dummy.ml)))) diff --git a/src/fifo_pool.ml b/src/fifo_pool.ml index c4cc59ac..c3a09e02 100644 --- a/src/fifo_pool.ml +++ b/src/fifo_pool.ml @@ -3,9 +3,14 @@ include Runner let ( let@ ) = ( @@ ) +type task_with_name = { + f: unit -> unit; + name: string; +} + type state = { threads: Thread.t array; - q: task Bb_queue.t; (** Queue for tasks. *) + q: task_with_name Bb_queue.t; (** Queue for tasks. *) } (** internal state *) @@ -13,7 +18,7 @@ let[@inline] size_ (self : state) = Array.length self.threads let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q (** Run [task] as is, on the pool. *) -let schedule_ (self : state) (task : task) : unit = +let schedule_ (self : state) (task : task_with_name) : unit = try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task @@ -22,19 +27,33 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; let (AT_pair (before_task, after_task)) = around_task in - let run_task task : unit = + let cur_span = ref Tracing_.dummy_span in + + let[@inline] exit_span_ () = + Tracing_.exit_span !cur_span; + cur_span := Tracing_.dummy_span + in + + let run_another_task ~name task' = schedule_ self { f = task'; name } in + + let run_task (task : task_with_name) : unit = let _ctx = before_task runner in + cur_span := Tracing_.enter_span task.name; (* run the task now, catching errors *) - (try Suspend_.with_suspend task ~run:(fun task' -> schedule_ self task') + (try + Suspend_.with_suspend task.f ~name:task.name ~run:run_another_task + ~on_suspend:exit_span_ with e -> let bt = Printexc.get_raw_backtrace () in on_exn e bt); + exit_span_ (); after_task runner _ctx in let main_loop () = let continue = ref true in while !continue do + assert (!cur_span = Tracing_.dummy_span); match Bb_queue.pop self.q with | task -> run_task task | exception Bb_queue.Closed -> continue := false @@ -84,10 +103,12 @@ let create ?(on_init_thread = default_thread_init_exit_) { threads = Array.make num_threads dummy; q = Bb_queue.create () } in + let run_async ~name f = schedule_ pool { f; name } in + let runner = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async:(fun f -> schedule_ pool f) + ~run_async ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) () diff --git a/src/fork_join.ml b/src/fork_join.ml index 8ad61cec..8a4b1fc3 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -48,7 +48,7 @@ module State_ = struct Suspend_.suspend { Suspend_.handle = - (fun ~run:_ suspension -> + (fun ~name:_ ~run:_ suspension -> while let old_st = A.get self in match old_st with @@ -113,19 +113,19 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = max 1 (1 + (n / D_pool_.n_domains ())) in - let start_tasks ~run (suspension : Suspend_.suspension) = + let start_tasks ~name ~run (suspension : Suspend_.suspension) = let task_for ~offset ~len_range = match f offset (offset + len_range - 1) with | () -> if A.fetch_and_add missing (-len_range) = len_range then (* all tasks done successfully *) - suspension (Ok ()) + run ~name (fun () -> suspension (Ok ())) | exception exn -> let bt = Printexc.get_raw_backtrace () in if not (A.exchange has_failed true) then (* first one to fail, and [missing] must be >= 2 because we're not decreasing it. *) - suspension (Error (exn, bt)) + run ~name (fun () -> suspension (Error (exn, bt))) in let i = ref 0 in @@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = let len_range = min chunk_size (n - offset) in assert (offset + len_range <= n); - run (fun () -> task_for ~offset ~len_range); + run ~name (fun () -> task_for ~offset ~len_range); i := !i + len_range done in @@ -143,9 +143,9 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = Suspend_.suspend { Suspend_.handle = - (fun ~run suspension -> + (fun ~name ~run suspension -> (* run tasks, then we'll resume [suspension] *) - start_tasks ~run suspension); + start_tasks ~run ~name suspension); } ) diff --git a/src/fut.ml b/src/fut.ml index e1dd3f43..8ca3a41f 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -5,16 +5,16 @@ type 'a waiter = 'a or_error -> unit type 'a state = | Done of 'a or_error - | Waiting of 'a waiter list + | Waiting of { waiters: 'a waiter list } type 'a t = { st: 'a state A.t } [@@unboxed] type 'a promise = 'a t let make () = - let fut = { st = A.make (Waiting []) } in + let fut = { st = A.make (Waiting { waiters = [] }) } in fut, fut -let of_result x : _ t = { st = A.make (Done x) } +let[@inline] of_result x : _ t = { st = A.make (Done x) } let[@inline] return x : _ t = of_result (Ok x) let[@inline] fail e bt : _ t = of_result (Error (e, bt)) @@ -53,9 +53,8 @@ let on_result (self : _ t) (f : _ waiter) : unit = | Done x -> f x; false - | Waiting l -> - let must_retry = not (A.compare_and_set self.st st (Waiting (f :: l))) in - must_retry + | Waiting { waiters = l } -> + not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) do Domain_.relax () done @@ -63,28 +62,31 @@ let on_result (self : _ t) (f : _ waiter) : unit = exception Already_fulfilled let fulfill (self : _ t) (r : _ result) : unit = + let fs = ref [] in while let st = A.get self.st in match st with | Done _ -> raise Already_fulfilled - | Waiting l -> + | Waiting { waiters = l } -> let did_swap = A.compare_and_set self.st st (Done r) in if did_swap then ( (* success, now call all the waiters *) - List.iter (fun f -> try f r with _ -> ()) l; + fs := l; false ) else true do Domain_.relax () - done + done; + List.iter (fun f -> try f r with _ -> ()) !fs; + () let[@inline] fulfill_idempotent self r = try fulfill self r with Already_fulfilled -> () (* ### combinators ### *) -let spawn ~on f : _ t = +let spawn ?name ~on f : _ t = let fut, promise = make () in let task () = @@ -97,13 +99,13 @@ let spawn ~on f : _ t = fulfill promise res in - Runner.run_async on task; + Runner.run_async ?name on task; fut -let spawn_on_current_runner f : _ t = +let spawn_on_current_runner ?name f : _ t = match Runner.get_current_runner () with | None -> failwith "Fut.spawn_on_current_runner: not running on a runner" - | Some on -> spawn ~on f + | Some on -> spawn ?name ~on f let reify_error (f : 'a t) : 'a or_error t = match peek f with @@ -413,9 +415,11 @@ let await (fut : 'a t) : 'a = Suspend_.suspend { Suspend_.handle = - (fun ~run k -> + (fun ~name ~run k -> on_result fut (function - | Ok _ -> run (fun () -> k (Ok ())) + | Ok _ -> + (* schedule continuation with the same name *) + run ~name (fun () -> k (Ok ())) | Error (exn, bt) -> (* fail continuation immediately *) k (Error (exn, bt)))); diff --git a/src/fut.mli b/src/fut.mli index 2dc31f2a..c8884ce7 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -81,11 +81,11 @@ val is_done : _ t -> bool (** {2 Combinators} *) -val spawn : on:Runner.t -> (unit -> 'a) -> 'a t +val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will hold its result. *) -val spawn_on_current_runner : (unit -> 'a) -> 'a t +val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a t (** This must be run from inside a runner, and schedules the new task on it as well. diff --git a/src/immediate_runner.ml b/src/immediate_runner.ml index d5e11284..db9725f5 100644 --- a/src/immediate_runner.ml +++ b/src/immediate_runner.ml @@ -1,9 +1,19 @@ include Runner +let run_async_ ~name f = + let sp = Tracing_.enter_span name in + try + let x = f () in + Tracing_.exit_span sp; + x + with e -> + let bt = Printexc.get_raw_backtrace () in + Tracing_.exit_span sp; + Printexc.raise_with_backtrace e bt + let runner : t = Runner.For_runner_implementors.create ~size:(fun () -> 0) ~num_tasks:(fun () -> 0) ~shutdown:(fun ~wait:_ () -> ()) - ~run_async:(fun f -> f ()) - () + ~run_async:run_async_ () diff --git a/src/moonpool.mli b/src/moonpool.mli index b12b739a..18c5abfa 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -23,10 +23,13 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.) *) -val run_async : Runner.t -> (unit -> unit) -> unit +val run_async : ?name:string -> Runner.t -> (unit -> unit) -> unit (** [run_async runner task] schedules the task to run on the given runner. This means [task()] will be executed at some point in the future, possibly in another thread. + @param name if provided and [Trace] is present in dependencies, a span + will be created when the task starts, and will stop when the task is over. + (since NEXT_RELEASE) @since 0.5 *) val recommended_thread_count : unit -> int @@ -35,13 +38,16 @@ val recommended_thread_count : unit -> int this because many of them will be blocked most of the time). @since 0.5 *) -val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t +val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t (** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns a future result for it. See {!Fut.spawn}. + @param name if provided and [Trace] is present in dependencies, + a span will be created for the future. (since NEXT_RELEASE) @since 0.5 *) -val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t +val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a Fut.t (** See {!Fut.spawn_on_current_runner}. + @param name see {!spawn}. since NEXT_RELEASE. @since 0.5 *) [@@@ifge 5.0] diff --git a/src/runner.ml b/src/runner.ml index 0fcf2392..437e24c4 100644 --- a/src/runner.ml +++ b/src/runner.ml @@ -3,7 +3,7 @@ module TLS = Thread_local_storage_ type task = unit -> unit type t = { - run_async: (unit -> unit) -> unit; + run_async: name:string -> task -> unit; shutdown: wait:bool -> unit -> unit; size: unit -> int; num_tasks: unit -> int; @@ -11,7 +11,7 @@ type t = { exception Shutdown -let[@inline] run_async (self : t) f : unit = self.run_async f +let[@inline] run_async ?(name = "") (self : t) f : unit = self.run_async ~name f let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () let[@inline] shutdown_without_waiting (self : t) : unit = @@ -20,9 +20,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit = let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] size (self : t) : int = self.size () -let run_wait_block self (f : unit -> 'a) : 'a = +let run_wait_block ?name self (f : unit -> 'a) : 'a = let q = Bb_queue.create () in - run_async self (fun () -> + run_async ?name self (fun () -> try let x = f () in Bb_queue.push q (Ok x) diff --git a/src/runner.mli b/src/runner.mli index b0af57ed..3b959496 100644 --- a/src/runner.mli +++ b/src/runner.mli @@ -33,13 +33,16 @@ val shutdown_without_waiting : t -> unit exception Shutdown -val run_async : t -> task -> unit +val run_async : ?name:string -> t -> task -> unit (** [run_async pool f] schedules [f] for later execution on the runner in one of the threads. [f()] will run on one of the runner's worker threads/domains. + @param name if provided and [Trace] is present in dependencies, a span + will be created when the task starts, and will stop when the task is over. + (since NEXT_RELEASE) @raise Shutdown if the runner was shut down before [run_async] was called. *) -val run_wait_block : t -> (unit -> 'a) -> 'a +val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a (** [run_wait_block pool f] schedules [f] for later execution on the pool, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -47,7 +50,10 @@ val run_wait_block : t -> (unit -> 'a) -> 'a will raise it as well. {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} - about the required discipline to avoid deadlocks). *) + about the required discipline to avoid deadlocks). + @raise Shutdown if the runner was already shut down *) + +(** {2 Implementing runners} *) (** This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it. *) @@ -56,7 +62,7 @@ module For_runner_implementors : sig size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(task -> unit) -> + run_async:(name:string -> task -> unit) -> unit -> t (** Create a new runner. @@ -65,6 +71,10 @@ module For_runner_implementors : sig so that {!Fork_join} and other 5.x features work properly. *) val k_cur_runner : t option ref Thread_local_storage_.key + (** Key that should be used by each runner to store itself in TLS + on every thread it controls, so that tasks running on these threads + can access the runner. This is necessary for {!get_current_runner} + to work. *) end val get_current_runner : unit -> t option diff --git a/src/suspend_.ml b/src/suspend_.ml index 6555b6bc..575cf158 100644 --- a/src/suspend_.ml +++ b/src/suspend_.ml @@ -1,7 +1,9 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type task = unit -> unit -type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } +type suspension_handler = { + handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit; +} [@@unboxed] [@@@ifge 5.0] @@ -13,7 +15,8 @@ type _ Effect.t += Suspend : suspension_handler -> unit Effect.t let[@inline] suspend h = Effect.perform (Suspend h) -let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = +let with_suspend ~name ~on_suspend ~(run : name:string -> task -> unit) + (f : unit -> unit) : unit = let module E = Effect.Deep in (* effect handler *) let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option = @@ -21,11 +24,12 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = | Suspend h -> Some (fun k -> + on_suspend (); let k' : suspension = function | Ok () -> E.continue k () | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt in - h.handle ~run k') + h.handle ~name ~run k') | _ -> None in @@ -34,14 +38,16 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = (* DLA interop *) let prepare_for_await () : Dla_.t = (* current state *) - let st : ((task -> unit) * suspension) option A.t = A.make None in + let st : (string * (name:string -> task -> unit) * suspension) option A.t = + A.make None + in let release () : unit = match A.exchange st None with | None -> () - | Some (run, k) -> run (fun () -> k (Ok ())) + | Some (name, run, k) -> run ~name (fun () -> k (Ok ())) and await () : unit = - suspend { handle = (fun ~run k -> A.set st (Some (run, k))) } + suspend { handle = (fun ~name ~run k -> A.set st (Some (name, run, k))) } in let t = { Dla_.release; await } in diff --git a/src/suspend_.mli b/src/suspend_.mli index 77cc06af..0334225f 100644 --- a/src/suspend_.mli +++ b/src/suspend_.mli @@ -8,12 +8,15 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type task = unit -> unit -type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } +type suspension_handler = { + handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit; +} [@@unboxed] (** The handler that knows what to do with the suspended computation. - The handler is given two things: + The handler is given a few things: + - the name (if any) of the current computation - the suspended computation (which can be resumed with a result eventually); - a [run] function that can be used to start tasks to perform some @@ -51,10 +54,18 @@ val suspend : suspension_handler -> unit val prepare_for_await : unit -> Dla_.t (** Our stub for DLA. Unstable. *) -val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit +val with_suspend : + name:string -> + on_suspend:(unit -> unit) -> + run:(name:string -> task -> unit) -> + (unit -> unit) -> + unit (** [with_suspend ~run f] runs [f()] in an environment where [suspend] will work. If [f()] suspends with suspension handler [h], this calls [h ~run k] where [k] is the suspension. + The suspension should always run in a new task, via [run]. + + @param on_suspend called when [f()] suspends itself. This will not do anything on OCaml 4.x. *) diff --git a/src/tracing_.dummy.ml b/src/tracing_.dummy.ml new file mode 100644 index 00000000..9977c6a8 --- /dev/null +++ b/src/tracing_.dummy.ml @@ -0,0 +1,4 @@ +let enabled () = false +let dummy_span = 0L +let enter_span _name = dummy_span +let exit_span = ignore diff --git a/src/tracing_.mli b/src/tracing_.mli new file mode 100644 index 00000000..beadc19f --- /dev/null +++ b/src/tracing_.mli @@ -0,0 +1,4 @@ +val dummy_span : int64 +val enter_span : string -> int64 +val exit_span : int64 -> unit +val enabled : unit -> bool diff --git a/src/tracing_.real.ml b/src/tracing_.real.ml new file mode 100644 index 00000000..a71d9b3b --- /dev/null +++ b/src/tracing_.real.ml @@ -0,0 +1,13 @@ +module Trace = Trace_core + +let enabled = Trace.enabled +let dummy_span = Int64.min_int +let dummy_file_ = "" + +let[@inline] enter_span name : int64 = + if name = "" then + dummy_span + else + Trace.enter_span ~__FILE__:dummy_file_ ~__LINE__:0 name + +let[@inline] exit_span sp = if sp <> dummy_span then Trace.exit_span sp diff --git a/src/ws_pool.ml b/src/ws_pool.ml index 8683040d..7fa930d1 100644 --- a/src/ws_pool.ml +++ b/src/ws_pool.ml @@ -13,10 +13,16 @@ module Id = struct let equal : t -> t -> bool = ( == ) end +type task_with_name = { + f: task; + name: string; +} + type worker_state = { pool_id_: Id.t; (** Unique per pool *) mutable thread: Thread.t; - q: task WSQ.t; (** Work stealing queue *) + q: task_with_name WSQ.t; (** Work stealing queue *) + mutable cur_span: int64; rng: Random.State.t; } (** State for a given worker. Only this worker is @@ -29,7 +35,8 @@ type state = { id_: Id.t; active: bool A.t; (** Becomes [false] when the pool is shutdown. *) workers: worker_state array; (** Fixed set of workers. *) - main_q: task Queue.t; (** Main queue for tasks coming from the outside *) + main_q: task_with_name Queue.t; + (** Main queue for tasks coming from the outside *) mutable n_waiting: int; (* protected by mutex *) mutable n_waiting_nonzero: bool; (** [n_waiting > 0] *) mutex: Mutex.t; @@ -65,9 +72,10 @@ let[@inline] try_wake_someone_ (self : state) : unit = ) (** Run [task] as is, on the pool. *) -let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit - = +let schedule_task_ (self : state) ~name (w : worker_state option) (f : task) : + unit = (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) + let task = { f; name } in match w with | Some w when Id.equal self.id_ w.pool_id_ -> (* we're on this same pool, schedule in the worker's state. Otherwise @@ -96,24 +104,36 @@ let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit raise Shutdown (** Run this task, now. Must be called from a worker. *) -let run_task_now_ (self : state) ~runner task : unit = +let run_task_now_ (self : state) ~runner (w : worker_state) ~name task : unit = (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) let (AT_pair (before_task, after_task)) = self.around_task in let _ctx = before_task runner in + + w.cur_span <- Tracing_.enter_span name; + let[@inline] exit_span_ () = + Tracing_.exit_span w.cur_span; + w.cur_span <- Tracing_.dummy_span + in + + let run_another_task ~name task' = + let w = find_current_worker_ () in + schedule_task_ self w ~name task' + in + (* run the task now, catching errors *) (try (* run [task()] and handle [suspend] in it *) - Suspend_.with_suspend task ~run:(fun task' -> - let w = find_current_worker_ () in - schedule_task_ self w task') + Suspend_.with_suspend task ~name ~run:run_another_task + ~on_suspend:exit_span_ with e -> let bt = Printexc.get_raw_backtrace () in self.on_exn e bt); + exit_span_ (); after_task runner _ctx -let[@inline] run_async_ (self : state) (task : task) : unit = +let[@inline] run_async_ (self : state) ~name (f : task) : unit = let w = find_current_worker_ () in - schedule_task_ self w task + schedule_task_ self w ~name f (* TODO: function to schedule many tasks from the outside. - build a queue @@ -122,8 +142,6 @@ let[@inline] run_async_ (self : state) (task : task) : unit = - wakeup all (broadcast) - unlock *) -let run = run_async - (** Wait on condition. Precondition: we hold the mutex. *) let[@inline] wait_ (self : state) : unit = self.n_waiting <- self.n_waiting + 1; @@ -132,10 +150,11 @@ let[@inline] wait_ (self : state) : unit = self.n_waiting <- self.n_waiting - 1; if self.n_waiting = 0 then self.n_waiting_nonzero <- false -exception Got_task of task +exception Got_task of task_with_name (** Try to steal a task *) -let try_to_steal_work_once_ (self : state) (w : worker_state) : task option = +let try_to_steal_work_once_ (self : state) (w : worker_state) : + task_with_name option = let init = Random.State.int w.rng (Array.length self.workers) in try @@ -160,7 +179,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit = match WSQ.pop w.q with | Some task -> try_wake_someone_ self; - run_task_now_ self ~runner task + run_task_now_ self ~runner w ~name:task.name task.f | None -> continue := false done @@ -173,7 +192,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = worker_run_self_tasks_ self ~runner w; try_steal () and run_task task : unit = - run_task_now_ self ~runner task; + run_task_now_ self ~runner w ~name:task.name task.f; main () and try_steal () = match try_to_steal_work_once_ self w with @@ -231,7 +250,7 @@ type ('a, 'b) create_args = 'a (** Arguments used in {!create}. See {!create} for explanations. *) -let dummy_task_ () = assert false +let dummy_task_ = { f = ignore; name = "DUMMY_TASK" } let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) @@ -256,6 +275,7 @@ let create ?(on_init_thread = default_thread_init_exit_) { pool_id_; thread = dummy; + cur_span = Tracing_.dummy_span; q = WSQ.create ~dummy:dummy_task_ (); rng = Random.State.make [| i |]; }) @@ -279,7 +299,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let runner = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async:(fun f -> run_async_ pool f) + ~run_async:(fun ~name f -> run_async_ pool ~name f) ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) () diff --git a/src/ws_pool.mli b/src/ws_pool.mli index c13e4c75..578966a3 100644 --- a/src/ws_pool.mli +++ b/src/ws_pool.mli @@ -53,7 +53,3 @@ val with_ : (unit -> (t -> 'a) -> 'a, _) create_args Most parameters are the same as in {!create}. @since 0.3 *) - -val run : t -> (unit -> unit) -> unit - [@@deprecated "use run_async"] -(** deprecated alias to {!run_async} *) From dd9206b5b8658e6eef61fbfd250c713c0311146a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:12:03 -0500 Subject: [PATCH 2/7] use new tracing support in tests and benchs --- benchs/fib_rec.ml | 13 +++++++++---- benchs/pi.ml | 11 ++++++++++- test/effect-based/t_fork_join_heavy.ml | 2 ++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/benchs/fib_rec.ml b/benchs/fib_rec.ml index b0f1623d..25291e8c 100644 --- a/benchs/fib_rec.ml +++ b/benchs/fib_rec.ml @@ -1,4 +1,7 @@ open Moonpool +module Trace = Trace_core + +let ( let@ ) = ( @@ ) let rec fib_direct x = if x <= 1 then @@ -10,7 +13,7 @@ let cutoff = ref 20 let rec fib ~on x : int Fut.t = if x <= !cutoff then - Fut.spawn ~on (fun () -> fib_direct x) + Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) else let open Fut.Infix in let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in @@ -27,14 +30,14 @@ let fib_fj ~on x : int Fut.t = n1 + n2 ) in - Fut.spawn ~on (fun () -> fib_rec x) + Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) let fib_await ~on x : int Fut.t = let rec fib_rec x : int Fut.t = if x <= !cutoff then - Fut.spawn ~on (fun () -> fib_direct x) + Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) else - Fut.spawn ~on (fun () -> + Fut.spawn ~name:"fib" ~on (fun () -> let n1 = fib_rec (x - 1) in let n2 = fib_rec (x - 2) in let n1 = Fut.await n1 in @@ -66,6 +69,7 @@ let str_of_int_opt = function | Some i -> Printf.sprintf "Some %d" i let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib.run" in let pool = lazy (create_pool ~kind ~psize ()) in let dl_pool = lazy @@ -108,6 +112,7 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = Ws_pool.shutdown (Lazy.force pool) let () = + let@ () = Trace_tef.with_setup () in let n = ref 40 in let psize = ref None in let seq = ref false in diff --git a/benchs/pi.ml b/benchs/pi.ml index c8ef57b5..7e0dfd91 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -7,6 +7,7 @@ let j = ref 0 let spf = Printf.sprintf let run_sequential (num_steps : int) : float = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "pi.seq" in let step = 1. /. float num_steps in let sum = ref 0. in for i = 0 to num_steps - 1 do @@ -42,6 +43,11 @@ let run_par1 ~kind (num_steps : int) : float = (* one chunk of the work *) let run_task _idx_task : unit = + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "pi.slice" ~data:(fun () -> + [ "i", `Int _idx_task ]) + in + let sum = ref 0. in let i = ref 0 in while !i < num_steps do @@ -69,7 +75,7 @@ let run_fork_join ~kind num_steps : float = let step = 1. /. float num_steps in let global_sum = Lock.create 0. in - Ws_pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () -> Fork_join.for_ ~chunk_size:(3 + (num_steps / num_tasks)) num_steps @@ -99,6 +105,9 @@ type mode = let () = let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + let mode = ref Sequential in let n = ref 1000 in let time = ref false in diff --git a/test/effect-based/t_fork_join_heavy.ml b/test/effect-based/t_fork_join_heavy.ml index a981bee1..bacb1d18 100644 --- a/test/effect-based/t_fork_join_heavy.ml +++ b/test/effect-based/t_fork_join_heavy.ml @@ -33,10 +33,12 @@ let run ~min () = let l1, l2 = Fork_join.both (fun () -> + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fj.left" in Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) l) (fun () -> + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fj.right" in Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) ref_l1) From 192f866ea1bdee773cf6cba25544d8572ab5cce8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:12:14 -0500 Subject: [PATCH 3/7] chore: install depopts in CI --- .github/workflows/main.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 65325c61..91d18141 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -39,4 +39,6 @@ jobs: if: matrix.ocaml-compiler == '5.0' - run: opam exec -- dune build @install @runtest if: matrix.ocaml-compiler == '5.0' + - run: opam install trace thread-local-storage + - run: opam exec -- dune build @install From ef7d3700601826f5f260ddce28f9dd4c741e7d15 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:25:31 -0500 Subject: [PATCH 4/7] more tracing for `Fut` --- src/fut.ml | 45 ++++++++++++++++++++++++++------------------- src/fut.mli | 5 +++-- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/fut.ml b/src/fut.ml index 8ca3a41f..be473753 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -7,14 +7,18 @@ type 'a state = | Done of 'a or_error | Waiting of { waiters: 'a waiter list } -type 'a t = { st: 'a state A.t } [@@unboxed] +type 'a t = { + st: 'a state A.t; + name: string; +} + type 'a promise = 'a t -let make () = - let fut = { st = A.make (Waiting { waiters = [] }) } in +let make ?(name = "") () = + let fut = { st = A.make (Waiting { waiters = [] }); name } in fut, fut -let[@inline] of_result x : _ t = { st = A.make (Done x) } +let[@inline] of_result x : _ t = { st = A.make (Done x); name = "" } let[@inline] return x : _ t = of_result (Ok x) let[@inline] fail e bt : _ t = of_result (Error (e, bt)) @@ -115,7 +119,7 @@ let reify_error (f : 'a t) : 'a or_error t = on_result f (fun r -> fulfill promise (Ok r)); fut -let get_runner_ ?on () : Runner.t option = +let[@inline] get_runner_ ?on () : Runner.t option = match on with | Some _ as r -> r | None -> Runner.get_current_runner () @@ -131,20 +135,22 @@ let map ?on ~f fut : _ t = | Error e_bt -> Error e_bt in + let name = fut.name in match peek fut, get_runner_ ?on () with | Some res, None -> of_result @@ map_immediate_ res | Some res, Some runner -> - let fut2, promise = make () in - Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res); + let fut2, promise = make ~name () in + Runner.run_async ~name runner (fun () -> + fulfill promise @@ map_immediate_ res); fut2 | None, None -> - let fut2, promise = make () in + let fut2, promise = make ~name () in on_result fut (fun res -> fulfill promise @@ map_immediate_ res); fut2 | None, Some runner -> - let fut2, promise = make () in + let fut2, promise = make ~name () in on_result fut (fun res -> - Runner.run_async runner (fun () -> + Runner.run_async ~name runner (fun () -> fulfill promise @@ map_immediate_ res)); fut2 @@ -153,7 +159,7 @@ let join (fut : 'a t t) : 'a t = | Some (Ok f) -> f | Some (Error (e, bt)) -> fail e bt | None -> - let fut2, promise = make () in + let fut2, promise = make ~name:fut.name () in on_result fut (function | Ok sub_fut -> on_result sub_fut (fulfill promise) | Error _ as e -> fulfill promise e); @@ -176,19 +182,20 @@ let bind ?on ~f fut : _ t = on_result f_res_fut (fun r -> fulfill promise r) in + let name = fut.name in match peek fut, get_runner_ ?on () with | Some res, Some runner -> - let fut2, promise = make () in - Runner.run_async runner (bind_and_fulfill res promise); + let fut2, promise = make ~name () in + Runner.run_async ~name runner (bind_and_fulfill res promise); fut2 | Some res, None -> apply_f_to_res res | None, Some runner -> - let fut2, promise = make () in + let fut2, promise = make ~name () in on_result fut (fun r -> - Runner.run_async runner (bind_and_fulfill r promise)); + Runner.run_async ~name runner (bind_and_fulfill r promise)); fut2 | None, None -> - let fut2, promise = make () in + let fut2, promise = make ~name () in on_result fut (fun res -> bind_and_fulfill res promise ()); fut2 @@ -212,7 +219,7 @@ let both a b : _ t = | Some (Ok x), Some (Ok y) -> return (x, y) | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | _ -> - let fut, promise = make () in + let fut, promise = make ~name:a.name () in let st = A.make `Neither in on_result a (function @@ -245,7 +252,7 @@ let choose a b : _ t = | _, Some (Ok y) -> return (Either.Right y) | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make () in + let fut, promise = make ~name:a.name () in let one_failure = A.make false in on_result a (function @@ -268,7 +275,7 @@ let choose_same a b : _ t = | _, Some (Ok y) -> return y | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make () in + let fut, promise = make ~name:a.name () in let one_failure = A.make false in on_result a (function diff --git a/src/fut.mli b/src/fut.mli index c8884ce7..77a1215a 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -26,8 +26,9 @@ type 'a promise (** A promise, which can be fulfilled exactly once to set the corresponding future *) -val make : unit -> 'a t * 'a promise -(** Make a new future with the associated promise *) +val make : ?name:string -> unit -> 'a t * 'a promise +(** Make a new future with the associated promise. + @param name name for the future, used for tracing. since NEXT_RELEASE. *) val on_result : 'a t -> ('a or_error -> unit) -> unit (** [on_result fut f] registers [f] to be called in the future From 8e9564a6f72fb3ec203473a962f1fb08e5a961ac Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:26:29 -0500 Subject: [PATCH 5/7] compat --- src/suspend_.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/suspend_.ml b/src/suspend_.ml index 575cf158..fceb6cc4 100644 --- a/src/suspend_.ml +++ b/src/suspend_.ml @@ -56,7 +56,7 @@ let prepare_for_await () : Dla_.t = [@@@ocaml.alert "+unstable"] [@@@else_] -let[@inline] with_suspend ~run:_ f = f () +let[@inline] with_suspend ~name:_ ~run:_ f = f () let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore } [@@@endif] From 4abc334ab33731e1aee79bbf668a7df67d0bc8b2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 16:38:31 -0500 Subject: [PATCH 6/7] fix --- src/suspend_.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/suspend_.ml b/src/suspend_.ml index fceb6cc4..7824d917 100644 --- a/src/suspend_.ml +++ b/src/suspend_.ml @@ -56,7 +56,7 @@ let prepare_for_await () : Dla_.t = [@@@ocaml.alert "+unstable"] [@@@else_] -let[@inline] with_suspend ~name:_ ~run:_ f = f () +let[@inline] with_suspend ~name:_ ~on_suspend:_ ~run:_ f = f () let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore } [@@@endif] From 8d83d5b6912f4f2d1dad0f39338180d0db130b8d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 30 Jan 2024 17:06:20 -0500 Subject: [PATCH 7/7] perf: reduce size of `Fut` again --- src/fut.ml | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/fut.ml b/src/fut.ml index be473753..7fed5894 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -5,20 +5,24 @@ type 'a waiter = 'a or_error -> unit type 'a state = | Done of 'a or_error - | Waiting of { waiters: 'a waiter list } - -type 'a t = { - st: 'a state A.t; - name: string; -} + | Waiting of { + waiters: 'a waiter list; + name: string; + } +type 'a t = { st: 'a state A.t } [@@unboxed] type 'a promise = 'a t +let[@inline] get_name_ (self : _ t) = + match A.get self.st with + | Done _ -> "" + | Waiting { name; _ } -> name + let make ?(name = "") () = - let fut = { st = A.make (Waiting { waiters = [] }); name } in + let fut = { st = A.make (Waiting { waiters = []; name }) } in fut, fut -let[@inline] of_result x : _ t = { st = A.make (Done x); name = "" } +let[@inline] of_result x : _ t = { st = A.make (Done x) } let[@inline] return x : _ t = of_result (Ok x) let[@inline] fail e bt : _ t = of_result (Error (e, bt)) @@ -57,8 +61,8 @@ let on_result (self : _ t) (f : _ waiter) : unit = | Done x -> f x; false - | Waiting { waiters = l } -> - not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) + | Waiting { waiters = l; name } -> + not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name })) do Domain_.relax () done @@ -71,7 +75,7 @@ let fulfill (self : _ t) (r : _ result) : unit = let st = A.get self.st in match st with | Done _ -> raise Already_fulfilled - | Waiting { waiters = l } -> + | Waiting { waiters = l; name = _ } -> let did_swap = A.compare_and_set self.st st (Done r) in if did_swap then ( (* success, now call all the waiters *) @@ -135,7 +139,7 @@ let map ?on ~f fut : _ t = | Error e_bt -> Error e_bt in - let name = fut.name in + let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, None -> of_result @@ map_immediate_ res | Some res, Some runner -> @@ -159,7 +163,7 @@ let join (fut : 'a t t) : 'a t = | Some (Ok f) -> f | Some (Error (e, bt)) -> fail e bt | None -> - let fut2, promise = make ~name:fut.name () in + let fut2, promise = make ~name:(get_name_ fut) () in on_result fut (function | Ok sub_fut -> on_result sub_fut (fulfill promise) | Error _ as e -> fulfill promise e); @@ -182,7 +186,7 @@ let bind ?on ~f fut : _ t = on_result f_res_fut (fun r -> fulfill promise r) in - let name = fut.name in + let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, Some runner -> let fut2, promise = make ~name () in @@ -219,7 +223,7 @@ let both a b : _ t = | Some (Ok x), Some (Ok y) -> return (x, y) | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | _ -> - let fut, promise = make ~name:a.name () in + let fut, promise = make ~name:(get_name_ a) () in let st = A.make `Neither in on_result a (function @@ -252,7 +256,7 @@ let choose a b : _ t = | _, Some (Ok y) -> return (Either.Right y) | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:a.name () in + let fut, promise = make ~name:(get_name_ a) () in let one_failure = A.make false in on_result a (function @@ -275,7 +279,7 @@ let choose_same a b : _ t = | _, Some (Ok y) -> return y | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:a.name () in + let fut, promise = make ~name:(get_name_ a) () in let one_failure = A.make false in on_result a (function