Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use main domain #14

Merged
merged 8 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ the workers of `pool`, as soon as one is available. No result is returned by `ru

```ocaml
# #require "threads";;
# let pool = Moonpool.Fifo_pool.create ~min:4 ();;
# let pool = Moonpool.Fifo_pool.create ~num_threads:4 ();;
val pool : Moonpool.Runner.t = <abstr>

# begin
Expand Down
22 changes: 14 additions & 8 deletions benchs/fib_rec.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])

let create_pool ~psize ~kind () =
match kind with
| "fifo" -> Fifo_pool.create ~min:psize ()
| "pool" -> Ws_pool.create ~min:psize ()
| "fifo" -> Fifo_pool.create ?num_threads:psize ()
| "pool" -> Ws_pool.create ?num_threads:psize ()
| _ -> assert false

let str_of_int_opt = function
| None -> "None"
| Some i -> Printf.sprintf "Some %d" i

let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit =
let pool = lazy (create_pool ~kind ~psize ()) in
let dl_pool =
Expand All @@ -80,14 +84,16 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit =
Domainslib.Task.run pool (fun () ->
Domainslib.Task.await pool @@ fib_dl ~pool n)
) else if fj then (
Printf.printf "compute fib %d using fork-join with pool size=%d\n%!" n
psize;
Printf.printf "compute fib %d using fork-join with pool size=%s\n%!" n
(str_of_int_opt psize);
fib_fj ~on:(Lazy.force pool) n |> Fut.wait_block_exn
) else if await then (
Printf.printf "compute fib %d using await with pool size=%d\n%!" n psize;
Printf.printf "compute fib %d using await with pool size=%s\n%!" n
(str_of_int_opt psize);
fib_await ~on:(Lazy.force pool) n |> Fut.wait_block_exn
) else (
Printf.printf "compute fib %d with pool size=%d\n%!" n psize;
Printf.printf "compute fib %d with pool size=%s\n%!" n
(str_of_int_opt psize);
fib ~on:(Lazy.force pool) n |> Fut.wait_block_exn
)
in
Expand All @@ -103,7 +109,7 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit =

let () =
let n = ref 40 in
let psize = ref 16 in
let psize = ref None in
let seq = ref false in
let niter = ref 3 in
let kind = ref "pool" in
Expand All @@ -112,7 +118,7 @@ let () =
let fj = ref false in
let opts =
[
"-psize", Arg.Set_int psize, " pool size";
"-psize", Arg.Int (fun i -> psize := Some i), " pool size";
"-n", Arg.Set_int n, " fib <n>";
"-seq", Arg.Set seq, " sequential";
"-dl", Arg.Set dl, " domainslib";
Expand Down
8 changes: 4 additions & 4 deletions benchs/pi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ let with_pool ~kind f =
match kind with
| "pool" ->
if !j = 0 then
Ws_pool.with_ ~per_domain:1 f
Ws_pool.with_ f
else
Ws_pool.with_ ~min:!j f
Ws_pool.with_ ~num_threads:!j f
| "fifo" ->
if !j = 0 then
Fifo_pool.with_ ~per_domain:1 f
Fifo_pool.with_ f
else
Fifo_pool.with_ ~min:!j f
Fifo_pool.with_ ~num_threads:!j f
| _ -> assert false

(** Run in parallel using {!Fut.for_} *)
Expand Down
12 changes: 9 additions & 3 deletions src/d_pool_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ type worker_state = {
including a work queue and a thread refcount; and the domain itself,
if any, in a separate option because it might outlive its own state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array =
(* number of domains we spawn. Note that we spawn n-1 domains
because there already is the main domain running. *)
let n = max 1 (Domain_.recommended_number () - 1) in
let n = max 1 (Domain_.recommended_number ()) in
Array.init n (fun _ -> Lock.create (None, None))

(** main work loop for a domain worker.
Expand Down Expand Up @@ -84,6 +82,14 @@ let work_ idx (st : worker_state) : unit =
done;
()

(* special case for main domain: we start a worker immediately *)
let () =
assert (Domain_.is_main_domain ());
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
(* thread that stays alive *)
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
domains_.(0) <- Lock.create (Some w, None)

let[@inline] n_domains () : int = Array.length domains_

let run_on (i : int) (f : unit -> unit) : unit =
Expand Down
2 changes: 2 additions & 0 deletions src/domain_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ let get_id (self : t) : int = (Domain.get_id self :> int)
let spawn : _ -> t = Domain.spawn
let relax = Domain.cpu_relax
let join = Domain.join
let is_main_domain = Domain.is_main_domain

[@@@ocaml.alert "+unstable"]
[@@@else_]
Expand All @@ -21,5 +22,6 @@ let get_id (self : t) : int = Thread.id self
let spawn f : t = Thread.create f ()
let relax () = Thread.yield ()
let join = Thread.join
let is_main_domain () = true

[@@@endif]
6 changes: 3 additions & 3 deletions src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries threads either
(select thread_local_storage.ml from
(thread-local-storage -> thread_local_storage.stub.ml)
(-> thread_local_storage.real.ml))
(select thread_local_storage_.ml from
(thread-local-storage -> thread_local_storage_.stub.ml)
(-> thread_local_storage_.real.ml))
(select dla_.ml from
(domain-local-await -> dla_.real.ml)
( -> dla_.dummy.ml))))
19 changes: 8 additions & 11 deletions src/fifo_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,23 @@ type ('a, 'b) create_args =
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
?per_domain:int ->
?num_threads:int ->
'a

let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t =
?around_task ?num_threads () : t =
(* wrapper *)
let around_task =
match around_task with
| Some (f, g) -> AT_pair (f, g)
| None -> AT_pair (ignore, fun _ _ -> ())
in

(* number of threads to run *)
let min_threads = max 1 min_threads in
let num_domains = D_pool_.n_domains () in
assert (num_domains >= 1);
let num_threads = max min_threads (num_domains * per_domain) in

(* number of threads to run *)
let num_threads = Util_pool_.num_threads ?num_threads () in

(* make sure we don't bias towards the first domain(s) in {!D_pool_} *)
let offset = Random.int num_domains in
Expand Down Expand Up @@ -141,11 +139,10 @@ let create ?(on_init_thread = default_thread_init_exit_)

runner

let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
() f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f
=
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
()
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool
8 changes: 4 additions & 4 deletions src/fifo_pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ type ('a, 'b) create_args =
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
?per_domain:int ->
?num_threads:int ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)

val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread in the pool.
@param min minimum size of the pool. See {!Pool.create_args}.
@param per_domain is the number of threads allocated per domain in the fixed
domain pool. See {!Pool.create_args}.
The default is [Domain.recommended_domain_count()], ie one worker per
CPU core.
On OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each worker thread in the pool.
@param around_task a pair of [before, after] functions
ran around each task. See {!Pool.create_args}.
Expand Down
9 changes: 9 additions & 0 deletions src/immediate_runner.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include Runner

let runner : t =
Runner.For_runner_implementors.create
~size:(fun () -> 0)
~num_tasks:(fun () -> 0)
~shutdown:(fun ~wait:_ () -> ())
~run_async:(fun f -> f ())
()
20 changes: 20 additions & 0 deletions src/immediate_runner.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(** Runner that runs tasks immediately in the caller thread.

Whenever a task is submitted to this runner via [Runner.run_async r task],
the task is run immediately in the caller thread as [task()].
There are no background threads, no resource, this is just a trivial
implementation of the interface.

This can be useful when an implementation needs a runner, but there isn't
enough work to justify starting an actual full thread pool.

Another situation is when threads cannot be used at all (e.g. because you
plan to call [Unix.fork] later).

@since NEXT_RELEASE
*)

include module type of Runner

val runner : t
(** The trivial runner that actually runs tasks at the calling point. *)
10 changes: 7 additions & 3 deletions src/moonpool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ let start_thread_on_some_domain f x =
let did = Random.int (D_pool_.n_domains ()) in
D_pool_.run_on_and_wait did (fun () -> Thread.create f x)

let recommended_thread_count () = Domain_.recommended_number ()
let spawn = Fut.spawn

module Atomic = Atomic_
module Blocking_queue = Bb_queue
module Bounded_queue = Bounded_queue
module Chan = Chan
module Fifo_pool = Fifo_pool
module Fork_join = Fork_join
module Fut = Fut
module Lock = Lock
module Immediate_runner = Immediate_runner
module Pool = Fifo_pool
module Ws_pool = Ws_pool
module Runner = Runner
module Fifo_pool = Fifo_pool
module Thread_local_storage = Thread_local_storage
module Thread_local_storage = Thread_local_storage_
module Ws_pool = Ws_pool

module Private = struct
module Ws_deque_ = Ws_deque_
Expand Down
16 changes: 14 additions & 2 deletions src/moonpool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,33 @@
module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool
module Runner = Runner
module Immediate_runner = Immediate_runner

module Pool = Fifo_pool
[@@deprecated "use Fifo_pool or Ws_pool"]
[@@deprecated "use Fifo_pool or Ws_pool to be more explicit"]
(** Default pool. Please explicitly pick an implementation instead. *)

val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random
to run the thread. This ensures that we don't always pick the same domain
to run all the various threads needed in an application (timers, event loops, etc.) *)

val recommended_thread_count : unit -> int
(** Number of threads recommended to saturate the CPU.
For IO pools this makes little sense (you might want more threads than
this because many of them will be blocked most of the time).
@since NEXT_RELEASE *)

val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}.
@since NEXT_RELEASE *)

module Lock = Lock
module Fut = Fut
module Chan = Chan
module Fork_join = Fork_join
module Thread_local_storage = Thread_local_storage
module Thread_local_storage = Thread_local_storage_

(** A simple blocking queue.

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
11 changes: 11 additions & 0 deletions src/util_pool_.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
let num_threads ?num_threads () : int =
let n_domains = D_pool_.n_domains () in

(* number of threads to run *)
let num_threads =
match num_threads with
| Some j -> max 1 j
| None -> n_domains
in

num_threads
5 changes: 5 additions & 0 deletions src/util_pool_.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
(** Utils for pools *)

val num_threads : ?num_threads:int -> unit -> int
(** Number of threads a pool should have.
@param num_threads user-specified number of threads *)
19 changes: 7 additions & 12 deletions src/ws_pool.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module WSQ = Ws_deque_
module A = Atomic_
module TLS = Thread_local_storage
module TLS = Thread_local_storage_
include Runner

let ( let@ ) = ( @@ )
Expand Down Expand Up @@ -198,26 +198,22 @@ type ('a, 'b) create_args =
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
?per_domain:int ->
?num_threads:int ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)

let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t =
?around_task ?num_threads () : t =
(* wrapper *)
let around_task =
match around_task with
| Some (f, g) -> AT_pair (f, g)
| None -> AT_pair (ignore, fun _ _ -> ())
in

(* number of threads to run *)
let min_threads = max 1 min_threads in
let num_domains = D_pool_.n_domains () in
assert (num_domains >= 1);
let num_threads = max min_threads (num_domains * per_domain) in
let num_threads = Util_pool_.num_threads ?num_threads () in

(* make sure we don't bias towards the first domain(s) in {!D_pool_} *)
let offset = Random.int num_domains in
Expand Down Expand Up @@ -301,11 +297,10 @@ let create ?(on_init_thread = default_thread_init_exit_)

runner

let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
() f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f
=
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
()
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool
16 changes: 6 additions & 10 deletions src/ws_pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,19 @@ type ('a, 'b) create_args =
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
?per_domain:int ->
?num_threads:int ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)

val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread
in the pool.
@param min minimum size of the pool. It will be at least [1] internally,
so [0] or negative values make no sense.
@param per_domain is the number of threads allocated per domain in the fixed
domain pool. The default value is [0], but setting, say, [~per_domain:2]
means that if there are [8] domains (which might be the case on an 8-core machine)
then the minimum size of the pool is [16].
If both [min] and [per_domain] are specified, the maximum of both
[min] and [per_domain * num_of_domains] is used.
@param num_threads size of the pool, ie. number of worker threads.
It will be at least [1] internally, so [0] or negative values make no sense.
The default is [Domain.recommended_domain_count()], ie one worker
thread per CPU core.
On OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each thread in the pool
@param around_task a pair of [before, after], where [before pool] is called
before a task is processed,
Expand Down
Loading