From 37f8c1059c6fdc25b878ec465bda396996af811b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 11:59:20 -0400 Subject: [PATCH 1/8] perf: also use the main domain, along with n-1 other ones we always keep a thread alive on the main domain as a worker for new tasks, but other domains can still come and go to manage resources properly in case a pool is started and used only for a short while. --- src/d_pool_.ml | 12 +++++++++--- src/domain_.ml | 2 ++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/d_pool_.ml b/src/d_pool_.ml index fb78535b..d12a4f6a 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -18,9 +18,7 @@ type worker_state = { including a work queue and a thread refcount; and the domain itself, if any, in a separate option because it might outlive its own state. *) let domains_ : (worker_state option * Domain_.t option) Lock.t array = - (* number of domains we spawn. Note that we spawn n-1 domains - because there already is the main domain running. *) - let n = max 1 (Domain_.recommended_number () - 1) in + let n = max 1 (Domain_.recommended_number ()) in Array.init n (fun _ -> Lock.create (None, None)) (** main work loop for a domain worker. @@ -84,6 +82,14 @@ let work_ idx (st : worker_state) : unit = done; () +(* special case for main domain: we start a worker immediately *) +let () = + assert (Domain_.is_main_domain ()); + let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + (* thread that stays alive *) + ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t); + domains_.(0) <- Lock.create (Some w, None) + let[@inline] n_domains () : int = Array.length domains_ let run_on (i : int) (f : unit -> unit) : unit = diff --git a/src/domain_.ml b/src/domain_.ml index 60d1e669..3050282f 100644 --- a/src/domain_.ml +++ b/src/domain_.ml @@ -9,6 +9,7 @@ let get_id (self : t) : int = (Domain.get_id self :> int) let spawn : _ -> t = Domain.spawn let relax = Domain.cpu_relax let join = Domain.join +let is_main_domain = Domain.is_main_domain [@@@ocaml.alert "+unstable"] [@@@else_] @@ -21,5 +22,6 @@ let get_id (self : t) : int = Thread.id self let spawn f : t = Thread.create f () let relax () = Thread.yield () let join = Thread.join +let is_main_domain () = true [@@@endif] From f6a2e9a0ad66b85c409a6e72c4c8735b6c803107 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 12:29:25 -0400 Subject: [PATCH 2/8] feat: add `Moonpool.recommended_thread_count` --- src/moonpool.ml | 2 ++ src/moonpool.mli | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/src/moonpool.ml b/src/moonpool.ml index b4118536..b46bc123 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -2,6 +2,8 @@ let start_thread_on_some_domain f x = let did = Random.int (D_pool_.n_domains ()) in D_pool_.run_on_and_wait did (fun () -> Thread.create f x) +let recommended_thread_count () = Domain_.recommended_number () + module Atomic = Atomic_ module Blocking_queue = Bb_queue module Bounded_queue = Bounded_queue diff --git a/src/moonpool.mli b/src/moonpool.mli index b744dc51..4360c6ee 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -22,6 +22,12 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.) *) +val recommended_thread_count : unit -> int +(** Number of threads recommended to saturate the CPU. + For IO pools this makes little sense (you might want more threads than + this because many of them will be blocked most of the time). + @since NEXT_RELEASE *) + module Lock = Lock module Fut = Fut module Chan = Chan From 703ffde303121590dc45bd43781bee0c5e6b2a6f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 12:33:16 -0400 Subject: [PATCH 3/8] add `No_runner`: a runner that doesn't do anything in the background The idea is that you might have APIs that want a runner, but the work is too trivial to require a full actual thread pool. In this case use `No_runner.runner` and calls to `run_async runner f` will turn into `f()`. --- src/moonpool.ml | 6 ++++-- src/moonpool.mli | 8 +++++++- src/no_runner.ml | 9 +++++++++ src/no_runner.mli | 6 ++++++ 4 files changed, 26 insertions(+), 3 deletions(-) create mode 100644 src/no_runner.ml create mode 100644 src/no_runner.mli diff --git a/src/moonpool.ml b/src/moonpool.ml index b46bc123..498a9fb3 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -3,19 +3,21 @@ let start_thread_on_some_domain f x = D_pool_.run_on_and_wait did (fun () -> Thread.create f x) let recommended_thread_count () = Domain_.recommended_number () +let spawn = Fut.spawn module Atomic = Atomic_ module Blocking_queue = Bb_queue module Bounded_queue = Bounded_queue module Chan = Chan +module Fifo_pool = Fifo_pool module Fork_join = Fork_join module Fut = Fut module Lock = Lock +module No_runner = No_runner module Pool = Fifo_pool -module Ws_pool = Ws_pool module Runner = Runner -module Fifo_pool = Fifo_pool module Thread_local_storage = Thread_local_storage +module Ws_pool = Ws_pool module Private = struct module Ws_deque_ = Ws_deque_ diff --git a/src/moonpool.mli b/src/moonpool.mli index 4360c6ee..05b8649c 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,9 +12,10 @@ module Ws_pool = Ws_pool module Fifo_pool = Fifo_pool module Runner = Runner +module No_runner = No_runner module Pool = Fifo_pool -[@@deprecated "use Fifo_pool or Ws_pool"] +[@@deprecated "use Fifo_pool or Ws_pool to be more explicit"] (** Default pool. Please explicitly pick an implementation instead. *) val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t @@ -28,6 +29,11 @@ val recommended_thread_count : unit -> int this because many of them will be blocked most of the time). @since NEXT_RELEASE *) +val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t +(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) + and returns a future result for it. See {!Fut.spawn}. + @since NEXT_RELEASE *) + module Lock = Lock module Fut = Fut module Chan = Chan diff --git a/src/no_runner.ml b/src/no_runner.ml new file mode 100644 index 00000000..d5e11284 --- /dev/null +++ b/src/no_runner.ml @@ -0,0 +1,9 @@ +include Runner + +let runner : t = + Runner.For_runner_implementors.create + ~size:(fun () -> 0) + ~num_tasks:(fun () -> 0) + ~shutdown:(fun ~wait:_ () -> ()) + ~run_async:(fun f -> f ()) + () diff --git a/src/no_runner.mli b/src/no_runner.mli new file mode 100644 index 00000000..2c295adc --- /dev/null +++ b/src/no_runner.mli @@ -0,0 +1,6 @@ +(** Runner that runs in the caller, not in the background. *) + +include module type of Runner + +val runner : t +(** The trivial runner that actually runs tasks at the calling point. *) From 22624441fd554c3c59b45674ee7496f82904bd53 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 12:48:41 -0400 Subject: [PATCH 4/8] rename no_runner to immediate_runner --- src/{no_runner.ml => immediate_runner.ml} | 0 src/immediate_runner.mli | 20 ++++++++++++++++++++ src/moonpool.ml | 2 +- src/moonpool.mli | 2 +- src/no_runner.mli | 6 ------ 5 files changed, 22 insertions(+), 8 deletions(-) rename src/{no_runner.ml => immediate_runner.ml} (100%) create mode 100644 src/immediate_runner.mli delete mode 100644 src/no_runner.mli diff --git a/src/no_runner.ml b/src/immediate_runner.ml similarity index 100% rename from src/no_runner.ml rename to src/immediate_runner.ml diff --git a/src/immediate_runner.mli b/src/immediate_runner.mli new file mode 100644 index 00000000..ed017eba --- /dev/null +++ b/src/immediate_runner.mli @@ -0,0 +1,20 @@ +(** Runner that runs tasks immediately in the caller thread. + + Whenever a task is submitted to this runner via [Runner.run_async r task], + the task is run immediately in the caller thread as [task()]. + There are no background threads, no resource, this is just a trivial + implementation of the interface. + + This can be useful when an implementation needs a runner, but there isn't + enough work to justify starting an actual full thread pool. + + Another situation is when threads cannot be used at all (e.g. because you + plan to call [Unix.fork] later). + + @since NEXT_RELEASE +*) + +include module type of Runner + +val runner : t +(** The trivial runner that actually runs tasks at the calling point. *) diff --git a/src/moonpool.ml b/src/moonpool.ml index 498a9fb3..cb82f668 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -13,7 +13,7 @@ module Fifo_pool = Fifo_pool module Fork_join = Fork_join module Fut = Fut module Lock = Lock -module No_runner = No_runner +module Immediate_runner = Immediate_runner module Pool = Fifo_pool module Runner = Runner module Thread_local_storage = Thread_local_storage diff --git a/src/moonpool.mli b/src/moonpool.mli index 05b8649c..40b78891 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,7 +12,7 @@ module Ws_pool = Ws_pool module Fifo_pool = Fifo_pool module Runner = Runner -module No_runner = No_runner +module Immediate_runner = Immediate_runner module Pool = Fifo_pool [@@deprecated "use Fifo_pool or Ws_pool to be more explicit"] diff --git a/src/no_runner.mli b/src/no_runner.mli deleted file mode 100644 index 2c295adc..00000000 --- a/src/no_runner.mli +++ /dev/null @@ -1,6 +0,0 @@ -(** Runner that runs in the caller, not in the background. *) - -include module type of Runner - -val runner : t -(** The trivial runner that actually runs tasks at the calling point. *) From d09da9c0923da24e3a2122636a9d1cdcff9212c6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 13:00:15 -0400 Subject: [PATCH 5/8] breaking: change interface for number of threads now the user can specify `num_threads`; if not provided a sensible default is picked. --- src/fifo_pool.ml | 19 ++++++++----------- src/fifo_pool.mli | 8 ++++---- src/util_pool_.ml | 11 +++++++++++ src/util_pool_.mli | 5 +++++ src/ws_pool.ml | 17 ++++++----------- src/ws_pool.mli | 16 ++++++---------- 6 files changed, 40 insertions(+), 36 deletions(-) create mode 100644 src/util_pool_.ml create mode 100644 src/util_pool_.mli diff --git a/src/fifo_pool.ml b/src/fifo_pool.ml index 044e0013..1a95d715 100644 --- a/src/fifo_pool.ml +++ b/src/fifo_pool.ml @@ -56,13 +56,12 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t = + ?around_task ?num_threads () : t = (* wrapper *) let around_task = match around_task with @@ -70,11 +69,10 @@ let create ?(on_init_thread = default_thread_init_exit_) | None -> AT_pair (ignore, fun _ _ -> ()) in - (* number of threads to run *) - let min_threads = max 1 min_threads in let num_domains = D_pool_.n_domains () in - assert (num_domains >= 1); - let num_threads = max min_threads (num_domains * per_domain) in + + (* number of threads to run *) + let num_threads = Util_pool_.num_threads ?num_threads () in (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) let offset = Random.int num_domains in @@ -141,11 +139,10 @@ let create ?(on_init_thread = default_thread_init_exit_) runner -let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () f = +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f + = let pool = - create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool diff --git a/src/fifo_pool.mli b/src/fifo_pool.mli index 252083c5..4371db58 100644 --- a/src/fifo_pool.mli +++ b/src/fifo_pool.mli @@ -21,8 +21,7 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) @@ -30,8 +29,9 @@ val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. @param on_init_thread called at the beginning of each new thread in the pool. @param min minimum size of the pool. See {!Pool.create_args}. - @param per_domain is the number of threads allocated per domain in the fixed - domain pool. See {!Pool.create_args}. + The default is [Domain.recommended_domain_count()], ie one worker per + CPU core. + On OCaml 4 the default is [4] (since there is only one domain). @param on_exit_thread called at the end of each worker thread in the pool. @param around_task a pair of [before, after] functions ran around each task. See {!Pool.create_args}. diff --git a/src/util_pool_.ml b/src/util_pool_.ml new file mode 100644 index 00000000..8207062a --- /dev/null +++ b/src/util_pool_.ml @@ -0,0 +1,11 @@ +let num_threads ?num_threads () : int = + let n_domains = D_pool_.n_domains () in + + (* number of threads to run *) + let num_threads = + match num_threads with + | Some j -> max 1 j + | None -> n_domains + in + + num_threads diff --git a/src/util_pool_.mli b/src/util_pool_.mli new file mode 100644 index 00000000..68fdde22 --- /dev/null +++ b/src/util_pool_.mli @@ -0,0 +1,5 @@ +(** Utils for pools *) + +val num_threads : ?num_threads:int -> unit -> int +(** Number of threads a pool should have. + @param num_threads user-specified number of threads *) diff --git a/src/ws_pool.ml b/src/ws_pool.ml index ca5d2500..179d555a 100644 --- a/src/ws_pool.ml +++ b/src/ws_pool.ml @@ -198,14 +198,13 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t = + ?around_task ?num_threads () : t = (* wrapper *) let around_task = match around_task with @@ -213,11 +212,8 @@ let create ?(on_init_thread = default_thread_init_exit_) | None -> AT_pair (ignore, fun _ _ -> ()) in - (* number of threads to run *) - let min_threads = max 1 min_threads in let num_domains = D_pool_.n_domains () in - assert (num_domains >= 1); - let num_threads = max min_threads (num_domains * per_domain) in + let num_threads = Util_pool_.num_threads ?num_threads () in (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) let offset = Random.int num_domains in @@ -301,11 +297,10 @@ let create ?(on_init_thread = default_thread_init_exit_) runner -let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () f = +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f + = let pool = - create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool diff --git a/src/ws_pool.mli b/src/ws_pool.mli index 4775024c..c13e4c75 100644 --- a/src/ws_pool.mli +++ b/src/ws_pool.mli @@ -26,8 +26,7 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) @@ -35,14 +34,11 @@ val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. @param on_init_thread called at the beginning of each new thread in the pool. - @param min minimum size of the pool. It will be at least [1] internally, - so [0] or negative values make no sense. - @param per_domain is the number of threads allocated per domain in the fixed - domain pool. The default value is [0], but setting, say, [~per_domain:2] - means that if there are [8] domains (which might be the case on an 8-core machine) - then the minimum size of the pool is [16]. - If both [min] and [per_domain] are specified, the maximum of both - [min] and [per_domain * num_of_domains] is used. + @param num_threads size of the pool, ie. number of worker threads. + It will be at least [1] internally, so [0] or negative values make no sense. + The default is [Domain.recommended_domain_count()], ie one worker + thread per CPU core. + On OCaml 4 the default is [4] (since there is only one domain). @param on_exit_thread called at the end of each thread in the pool @param around_task a pair of [before, after], where [before pool] is called before a task is processed, From 8aaed6d95139a2f84360a94ec0ab920eb60abfaf Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 13:19:44 -0400 Subject: [PATCH 6/8] fix tests to use new API --- README.md | 2 +- benchs/fib_rec.ml | 22 ++++++++++++++-------- benchs/pi.ml | 8 ++++---- test/effect-based/t_fib1.ml | 4 ++-- test/effect-based/t_fib_fork_join.ml | 4 ++-- test/effect-based/t_fib_fork_join_all.ml | 4 ++-- test/effect-based/t_fork_join.ml | 8 ++++---- test/effect-based/t_fork_join_heavy.ml | 2 +- test/effect-based/t_futs1.ml | 2 +- test/effect-based/t_many.ml | 8 ++++---- test/effect-based/t_sort.ml | 2 +- test/t_bench1.ml | 2 +- test/t_chan_train.ml | 2 +- test/t_fib.ml | 4 ++-- test/t_fib_rec.ml | 6 +++--- test/t_futs1.ml | 4 ++-- test/t_props.ml | 4 ++-- test/t_resource.ml | 4 ++-- test/t_tree_futs.ml | 4 ++-- test/t_unfair.ml | 4 ++-- 20 files changed, 53 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index ab451e08..b06a1975 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ the workers of `pool`, as soon as one is available. No result is returned by `ru ```ocaml # #require "threads";; -# let pool = Moonpool.Fifo_pool.create ~min:4 ();; +# let pool = Moonpool.Fifo_pool.create ~num_threads:4 ();; val pool : Moonpool.Runner.t = # begin diff --git a/benchs/fib_rec.ml b/benchs/fib_rec.ml index d3df44df..571b8495 100644 --- a/benchs/fib_rec.ml +++ b/benchs/fib_rec.ml @@ -57,10 +57,14 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let create_pool ~psize ~kind () = match kind with - | "fifo" -> Fifo_pool.create ~min:psize () - | "pool" -> Ws_pool.create ~min:psize () + | "fifo" -> Fifo_pool.create ?num_threads:psize () + | "pool" -> Ws_pool.create ?num_threads:psize () | _ -> assert false +let str_of_int_opt = function + | None -> "None" + | Some i -> Printf.sprintf "Some %d" i + let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = let pool = lazy (create_pool ~kind ~psize ()) in let dl_pool = @@ -80,14 +84,16 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = Domainslib.Task.run pool (fun () -> Domainslib.Task.await pool @@ fib_dl ~pool n) ) else if fj then ( - Printf.printf "compute fib %d using fork-join with pool size=%d\n%!" n - psize; + Printf.printf "compute fib %d using fork-join with pool size=%s\n%!" n + (str_of_int_opt psize); fib_fj ~on:(Lazy.force pool) n |> Fut.wait_block_exn ) else if await then ( - Printf.printf "compute fib %d using await with pool size=%d\n%!" n psize; + Printf.printf "compute fib %d using await with pool size=%s\n%!" n + (str_of_int_opt psize); fib_await ~on:(Lazy.force pool) n |> Fut.wait_block_exn ) else ( - Printf.printf "compute fib %d with pool size=%d\n%!" n psize; + Printf.printf "compute fib %d with pool size=%s\n%!" n + (str_of_int_opt psize); fib ~on:(Lazy.force pool) n |> Fut.wait_block_exn ) in @@ -103,7 +109,7 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = let () = let n = ref 40 in - let psize = ref 16 in + let psize = ref None in let seq = ref false in let niter = ref 3 in let kind = ref "pool" in @@ -112,7 +118,7 @@ let () = let fj = ref false in let opts = [ - "-psize", Arg.Set_int psize, " pool size"; + "-psize", Arg.Int (fun i -> psize := Some i), " pool size"; "-n", Arg.Set_int n, " fib "; "-seq", Arg.Set seq, " sequential"; "-dl", Arg.Set dl, " domainslib"; diff --git a/benchs/pi.ml b/benchs/pi.ml index 65304a80..c8ef57b5 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -21,14 +21,14 @@ let with_pool ~kind f = match kind with | "pool" -> if !j = 0 then - Ws_pool.with_ ~per_domain:1 f + Ws_pool.with_ f else - Ws_pool.with_ ~min:!j f + Ws_pool.with_ ~num_threads:!j f | "fifo" -> if !j = 0 then - Fifo_pool.with_ ~per_domain:1 f + Fifo_pool.with_ f else - Fifo_pool.with_ ~min:!j f + Fifo_pool.with_ ~num_threads:!j f | _ -> assert false (** Run in parallel using {!Fut.for_} *) diff --git a/test/effect-based/t_fib1.ml b/test/effect-based/t_fib1.ml index ca3f2861..a7c8ebee 100644 --- a/test/effect-based/t_fib1.ml +++ b/test/effect-based/t_fib1.ml @@ -26,13 +26,13 @@ let fib ~on x : int Fut.t = let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int = - let pool = Ws_pool.create ~min:8 () in + let pool = Ws_pool.create ~num_threads:8 () in fib ~on:pool 40 |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let pool = Ws_pool.create ~min:8 () in + let pool = Ws_pool.create ~num_threads:8 () in assert ( List.init 10 (fib ~on:pool) diff --git a/test/effect-based/t_fib_fork_join.ml b/test/effect-based/t_fib_fork_join.ml index bdf60337..4e6639b2 100644 --- a/test/effect-based/t_fib_fork_join.ml +++ b/test/effect-based/t_fib_fork_join.ml @@ -27,13 +27,13 @@ let fib ~on x : int Fut.t = let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int = - let pool = Ws_pool.create ~min:8 () in + let pool = Ws_pool.create ~num_threads:8 () in fib ~on:pool 40 |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let pool = Ws_pool.create ~min:8 () in + let pool = Ws_pool.create ~num_threads:8 () in assert ( List.init 10 (fib ~on:pool) diff --git a/test/effect-based/t_fib_fork_join_all.ml b/test/effect-based/t_fib_fork_join_all.ml index ed82902e..3caee9b9 100644 --- a/test/effect-based/t_fib_fork_join_all.ml +++ b/test/effect-based/t_fib_fork_join_all.ml @@ -22,13 +22,13 @@ let rec fib x : int = ) let fib_40 : int = - let@ pool = Ws_pool.with_ ~min:8 () in + let@ pool = Ws_pool.with_ ~num_threads:8 () in Fut.spawn ~on:pool (fun () -> fib 40) |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let@ pool = Ws_pool.with_ ~min:8 () in + let@ pool = Ws_pool.with_ ~num_threads:8 () in let fut = Fut.spawn ~on:pool (fun () -> diff --git a/test/effect-based/t_fork_join.ml b/test/effect-based/t_fork_join.ml index 5b467187..5c7134ca 100644 --- a/test/effect-based/t_fork_join.ml +++ b/test/effect-based/t_fork_join.ml @@ -5,7 +5,7 @@ let ( let@ ) = ( @@ ) open! Moonpool -let pool = Ws_pool.create ~min:4 () +let pool = Ws_pool.create ~num_threads:4 () let () = let x = @@ -270,7 +270,7 @@ end let t_eval = let arb = Q.set_stats [ "size", Evaluator.size ] Evaluator.arb in Q.Test.make ~name:"same eval" arb (fun e -> - let@ pool = Ws_pool.with_ ~min:4 () in + let@ pool = Ws_pool.with_ ~num_threads:4 () in (* Printf.eprintf "eval %s\n%!" (Evaluator.show e); *) let x = Evaluator.eval_seq e in let y = Evaluator.eval_fork_join ~pool e in @@ -288,7 +288,7 @@ let t_for_nested ~min ~chunk_size () = let ref_l2 = List.map (List.map neg) ref_l1 in let l1, l2 = - let@ pool = Ws_pool.with_ ~min () in + let@ pool = Ws_pool.with_ ~num_threads:min () in let@ () = Ws_pool.run_wait_block pool in let l1 = Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) l @@ -310,7 +310,7 @@ let t_map ~chunk_size () = Q.Test.make ~name:"map1" Q.(small_list small_int |> Q.set_stats [ "len", List.length ]) (fun l -> - let@ pool = Ws_pool.with_ ~min:4 () in + let@ pool = Ws_pool.with_ ~num_threads:4 () in let@ () = Ws_pool.run_wait_block pool in let a1 = diff --git a/test/effect-based/t_fork_join_heavy.ml b/test/effect-based/t_fork_join_heavy.ml index ad9f7044..a981bee1 100644 --- a/test/effect-based/t_fork_join_heavy.ml +++ b/test/effect-based/t_fork_join_heavy.ml @@ -27,7 +27,7 @@ let run ~min () = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "step" in let l1, l2 = - let@ pool = Ws_pool.with_ ~min () in + let@ pool = Ws_pool.with_ ~num_threads:min () in let@ () = Ws_pool.run_wait_block pool in let l1, l2 = diff --git a/test/effect-based/t_futs1.ml b/test/effect-based/t_futs1.ml index 182ca9d5..4df18226 100644 --- a/test/effect-based/t_futs1.ml +++ b/test/effect-based/t_futs1.ml @@ -2,7 +2,7 @@ open! Moonpool -let pool = Ws_pool.create ~min:4 () +let pool = Ws_pool.create ~num_threads:4 () let () = let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in diff --git a/test/effect-based/t_many.ml b/test/effect-based/t_many.ml index 4362932c..6a2b5918 100644 --- a/test/effect-based/t_many.ml +++ b/test/effect-based/t_many.ml @@ -30,19 +30,19 @@ let run ~pool () = let () = (print_endline "with fifo"; - let@ pool = Fifo_pool.with_ ~min:4 () in + let@ pool = Fifo_pool.with_ ~num_threads:4 () in run ~pool ()); (print_endline "with WS(1)"; - let@ pool = Ws_pool.with_ ~min:1 () in + let@ pool = Ws_pool.with_ ~num_threads:1 () in run ~pool ()); (print_endline "with WS(2)"; - let@ pool = Ws_pool.with_ ~min:2 () in + let@ pool = Ws_pool.with_ ~num_threads:2 () in run ~pool ()); (print_endline "with WS(4)"; - let@ pool = Ws_pool.with_ ~min:4 () in + let@ pool = Ws_pool.with_ ~num_threads:4 () in run ~pool ()); () diff --git a/test/effect-based/t_sort.ml b/test/effect-based/t_sort.ml index 8d3fe17c..8ccc372f 100644 --- a/test/effect-based/t_sort.ml +++ b/test/effect-based/t_sort.ml @@ -59,7 +59,7 @@ let rec quicksort arr i len : unit = (fun () -> quicksort arr !low (len - (!low - i))) ) -let pool = Moonpool.Ws_pool.create ~min:8 () +let pool = Moonpool.Ws_pool.create ~num_threads:8 () let () = let arr = Array.init 400_000 (fun _ -> Random.int 300_000) in diff --git a/test/t_bench1.ml b/test/t_bench1.ml index 95cd87a5..cd1a8bfd 100644 --- a/test/t_bench1.ml +++ b/test/t_bench1.ml @@ -8,7 +8,7 @@ let rec fib x = let run ~psize ~n ~j () : _ Fut.t = Printf.printf "pool size=%d, n=%d, j=%d\n%!" psize n j; - let pool = Ws_pool.create ~min:psize ~per_domain:0 () in + let pool = Ws_pool.create ~num_threads:psize () in (* TODO: a ppx for tracy so we can use instrumentation *) let loop () = diff --git a/test/t_chan_train.ml b/test/t_chan_train.ml index bb3e24f7..132d5540 100644 --- a/test/t_chan_train.ml +++ b/test/t_chan_train.ml @@ -1,7 +1,7 @@ open Moonpool (* large pool, some of our tasks below are long lived *) -let pool = Ws_pool.create ~min:30 () +let pool = Ws_pool.create ~num_threads:30 () open (val Fut.infix pool) diff --git a/test/t_fib.ml b/test/t_fib.ml index 3a98e395..3fc53bf9 100644 --- a/test/t_fib.ml +++ b/test/t_fib.ml @@ -4,8 +4,8 @@ let ( let@ ) = ( @@ ) let with_pool ~kind () f = match kind with - | `Fifo_pool -> Fifo_pool.with_ ~min:4 () f - | `Ws_pool -> Ws_pool.with_ ~min:4 () f + | `Fifo_pool -> Fifo_pool.with_ ~num_threads:4 () f + | `Ws_pool -> Ws_pool.with_ ~num_threads:4 () f let rec fib x = if x <= 1 then diff --git a/test/t_fib_rec.ml b/test/t_fib_rec.ml index 286e6aac..94e206b7 100644 --- a/test/t_fib_rec.ml +++ b/test/t_fib_rec.ml @@ -25,7 +25,7 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int lazy_t = lazy (let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib40" in - let pool = Fifo_pool.create ~min:8 () in + let pool = Fifo_pool.create ~num_threads:8 () in let r = fib ~on:pool 40 |> Fut.wait_block_exn in Ws_pool.shutdown pool; r) @@ -49,12 +49,12 @@ let run_test ~pool () = let run_test_size ~size () = Printf.printf "test pool(%d)\n%!" size; - let@ pool = Ws_pool.with_ ~min:size () in + let@ pool = Ws_pool.with_ ~num_threads:size () in run_test ~pool () let run_test_fifo ~size () = Printf.printf "test fifo(%d)\n%!" size; - let@ pool = Fifo_pool.with_ ~min:size () in + let@ pool = Fifo_pool.with_ ~num_threads:size () in run_test ~pool () let setup_counter () = diff --git a/test/t_futs1.ml b/test/t_futs1.ml index ee2d96a6..03a1ac13 100644 --- a/test/t_futs1.ml +++ b/test/t_futs1.ml @@ -1,7 +1,7 @@ open! Moonpool -let pool = Ws_pool.create ~min:4 () -let pool2 = Ws_pool.create ~min:2 () +let pool = Ws_pool.create ~num_threads:4 () +let pool2 = Ws_pool.create ~num_threads:2 () let () = let fut = Fut.return 1 in diff --git a/test/t_props.ml b/test/t_props.ml index 9fa64fbe..698650fd 100644 --- a/test/t_props.ml +++ b/test/t_props.ml @@ -7,8 +7,8 @@ let add_test t = tests := t :: !tests let with_pool ~kind () f = match kind with - | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f - | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f + | `Fifo_pool -> Fifo_pool.with_ () f + | `Ws_pool -> Ws_pool.with_ () f let () = add_test @@ fun ~kind -> diff --git a/test/t_resource.ml b/test/t_resource.ml index c990f708..4c20e9fb 100644 --- a/test/t_resource.ml +++ b/test/t_resource.ml @@ -4,8 +4,8 @@ let ( let@ ) = ( @@ ) let with_pool ~kind () f = match kind with - | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f - | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f + | `Fifo_pool -> Fifo_pool.with_ () f + | `Ws_pool -> Ws_pool.with_ () f (* test proper resource handling *) let run ~kind () = diff --git a/test/t_tree_futs.ml b/test/t_tree_futs.ml index 3507be0a..0bc96a03 100644 --- a/test/t_tree_futs.ml +++ b/test/t_tree_futs.ml @@ -4,8 +4,8 @@ let ( let@ ) = ( @@ ) let with_pool ~kind ~j () f = match kind with - | `Fifo_pool -> Fifo_pool.with_ ~min:j () f - | `Ws_pool -> Ws_pool.with_ ~min:j () f + | `Fifo_pool -> Fifo_pool.with_ ~num_threads:j () f + | `Ws_pool -> Ws_pool.with_ ~num_threads:j () f type 'a tree = | Leaf of 'a diff --git a/test/t_unfair.ml b/test/t_unfair.ml index f535a450..cee4373e 100644 --- a/test/t_unfair.ml +++ b/test/t_unfair.ml @@ -20,8 +20,8 @@ let run ~kind () = in match kind with - | `Simple -> Fifo_pool.create ~min:3 ~on_init_thread ~around_task () - | `Ws_pool -> Ws_pool.create ~min:3 ~on_init_thread ~around_task () + | `Simple -> Fifo_pool.create ~num_threads:3 ~on_init_thread ~around_task () + | `Ws_pool -> Ws_pool.create ~num_threads:3 ~on_init_thread ~around_task () in (* make all threads busy *) From 64f49b39070d6faaeee8d895f4fffe42e307981e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 29 Oct 2023 18:28:13 -0400 Subject: [PATCH 7/8] fix: rename Thread_local_storage_ to not collide with the library --- src/dune | 6 +++--- src/moonpool.ml | 2 +- src/moonpool.mli | 2 +- src/{thread_local_storage.mli => thread_local_storage_.mli} | 0 ..._local_storage.real.ml => thread_local_storage_.real.ml} | 0 ..._local_storage.stub.ml => thread_local_storage_.stub.ml} | 0 6 files changed, 5 insertions(+), 5 deletions(-) rename src/{thread_local_storage.mli => thread_local_storage_.mli} (100%) rename src/{thread_local_storage.real.ml => thread_local_storage_.real.ml} (100%) rename src/{thread_local_storage.stub.ml => thread_local_storage_.stub.ml} (100%) diff --git a/src/dune b/src/dune index 5275ab40..59005b54 100644 --- a/src/dune +++ b/src/dune @@ -6,9 +6,9 @@ (action (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (libraries threads either - (select thread_local_storage.ml from - (thread-local-storage -> thread_local_storage.stub.ml) - (-> thread_local_storage.real.ml)) + (select thread_local_storage_.ml from + (thread-local-storage -> thread_local_storage_.stub.ml) + (-> thread_local_storage_.real.ml)) (select dla_.ml from (domain-local-await -> dla_.real.ml) ( -> dla_.dummy.ml)))) diff --git a/src/moonpool.ml b/src/moonpool.ml index cb82f668..21b4ccec 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -16,7 +16,7 @@ module Lock = Lock module Immediate_runner = Immediate_runner module Pool = Fifo_pool module Runner = Runner -module Thread_local_storage = Thread_local_storage +module Thread_local_storage = Thread_local_storage_ module Ws_pool = Ws_pool module Private = struct diff --git a/src/moonpool.mli b/src/moonpool.mli index 40b78891..0e46bd02 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -38,7 +38,7 @@ module Lock = Lock module Fut = Fut module Chan = Chan module Fork_join = Fork_join -module Thread_local_storage = Thread_local_storage +module Thread_local_storage = Thread_local_storage_ (** A simple blocking queue. diff --git a/src/thread_local_storage.mli b/src/thread_local_storage_.mli similarity index 100% rename from src/thread_local_storage.mli rename to src/thread_local_storage_.mli diff --git a/src/thread_local_storage.real.ml b/src/thread_local_storage_.real.ml similarity index 100% rename from src/thread_local_storage.real.ml rename to src/thread_local_storage_.real.ml diff --git a/src/thread_local_storage.stub.ml b/src/thread_local_storage_.stub.ml similarity index 100% rename from src/thread_local_storage.stub.ml rename to src/thread_local_storage_.stub.ml From 8f9c33ba2c686f9fa86415a138cb9fd55d828064 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 29 Oct 2023 18:41:02 -0400 Subject: [PATCH 8/8] fix compilation error --- src/ws_pool.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ws_pool.ml b/src/ws_pool.ml index 179d555a..44432112 100644 --- a/src/ws_pool.ml +++ b/src/ws_pool.ml @@ -1,6 +1,6 @@ module WSQ = Ws_deque_ module A = Atomic_ -module TLS = Thread_local_storage +module TLS = Thread_local_storage_ include Runner let ( let@ ) = ( @@ )