Skip to content

Commit

Permalink
Add Rwlock
Browse files Browse the repository at this point in the history
Co-authored-by: Jan Midtgaard <mail@janmidtgaard.dk>
  • Loading branch information
polytypic and jmid committed Jan 15, 2025
1 parent fc7ca3e commit a00046c
Show file tree
Hide file tree
Showing 27 changed files with 934 additions and 69 deletions.
3 changes: 2 additions & 1 deletion bench/bench_htbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ let run_one ~budgetf ~n_domains ?(n_ops = 400 * Util.iter_factor)
(if n_domains = 1 then "" else "s")
percent_mem
in
Times.record ~budgetf ~n_domains ~before ~init ~work ()
Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~before ~init
~work ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config

let run_suite ~budgetf =
Expand Down
112 changes: 112 additions & 0 deletions bench/bench_rwlock_htbl.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
open Multicore_bench

module Htbl = struct
open Picos_std_sync
module Htbl = Picos_aux_htbl

type ('k, 'v) t = { htbl : ('k, 'v) Htbl.t; rwlock : Rwlock.t }

let create ?hashed_type () =
{
htbl = Htbl.create ?hashed_type ();
rwlock = Rwlock.create ~padded:true ();
}

let remove_all t = Htbl.remove_all t.htbl

let find_opt t key =
Rwlock.lock_ro t.rwlock;
let result =
match Htbl.find_exn t.htbl key with
| value -> Some value
| exception Not_found -> None
in
Rwlock.unlock t.rwlock;
result

let try_add t key value =
Rwlock.lock t.rwlock;
let result = Htbl.try_add t.htbl key value in
Rwlock.unlock t.rwlock;
result

let try_remove t key =
Rwlock.lock t.rwlock;
let result = Htbl.try_remove t.htbl key in
Rwlock.unlock t.rwlock;
result
end

module Key = struct
type t = int

let equal = Int.equal
let hash = Fun.id
end

let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
?(n_keys = 1000) ~percent_mem ?(percent_add = (100 - percent_mem + 1) / 2)
?(prepopulate = true) () =
let limit_mem = percent_mem in
let limit_add = percent_mem + percent_add in

assert (0 <= limit_mem && limit_mem <= 100);
assert (limit_mem <= limit_add && limit_add <= 100);

let t = Htbl.create ~hashed_type:(module Key) () in

let n_ops = (100 + percent_mem) * n_ops / 100 in
let n_ops = n_ops * n_domains in

let n_ops_todo = Countdown.create ~n_domains () in

let before () =
let _ : _ Seq.t = Htbl.remove_all t in
Countdown.non_atomic_set n_ops_todo n_ops
in
let init i =
Scheduler.run @@ fun () ->
let state = Random.State.make_self_init () in
if prepopulate then begin
let n = ((i + 1) * n_keys / n_domains) - (i * n_keys / n_domains) in
for _ = 1 to n do
let value = Random.State.bits state in
let key = value mod n_keys in
Htbl.try_add t key value |> ignore
done
end;
state
in
let wrap _ _ = Scheduler.run in
let work domain_index state =
let rec work () =
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
if n <> 0 then begin
for _ = 1 to n do
let value = Random.State.bits state in
let op = (value asr 20) mod 100 in
let key = value mod n_keys in
if op < percent_mem then Htbl.find_opt t key |> ignore
else if op < limit_add then Htbl.try_add t key value |> ignore
else Htbl.try_remove t key |> ignore
done;
work ()
end
in
work ()
in

let config =
Printf.sprintf "%d worker%s, %d%% reads" n_domains
(if n_domains = 1 then "" else "s")
percent_mem
in
Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~before ~init
~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ] [ 10; 50; 90; 95; 100 ]
|> List.concat_map @@ fun (n_domains, percent_mem) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one ~budgetf ~n_domains ~percent_mem ()
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
(run %{test} -brief "Picos_mpmcq")
(run %{test} -brief "Picos_mpscq")
(run %{test} -brief "Picos_htbl")
(run %{test} -brief "Picos_htbl with Rwlock")
(run %{test} -brief "Picos_stdio")
(run %{test} -brief "Picos_sync Stream")
(run %{test} -brief "Fib")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ let benchmarks =
("Picos_mpmcq", Bench_mpmcq.run_suite);
("Picos_mpscq", Bench_mpscq.run_suite);
("Picos_htbl", Bench_htbl.run_suite);
("Picos_htbl with Rwlock", Bench_rwlock_htbl.run_suite);
("Picos_stdio", Bench_stdio.run_suite);
("Picos_sync Stream", Bench_stream.run_suite);
("Fib", Bench_fib.run_suite);
Expand Down
43 changes: 29 additions & 14 deletions lib/picos_std.awaitable/picos_std_awaitable.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ module Awaitable = struct
end

module Awaiters = struct
let equal_bit = 0b01
let num_bits = 1
let one = 1 lsl num_bits

type _ tdt =
| Zero : [> `Zero ] tdt
| One : {
awaitable : 'a awaitable; (* Might also want to clear this *)
mutable value : 'a; (* This is mutable to avoid space leaks *)
trigger : Trigger.t;
mutable counter : int;
mutable counter_and_bits : int;
mutable next : min0;
}
-> [> `One ] tdt
Expand All @@ -53,22 +57,22 @@ module Awaitable = struct
let[@inline] snoc t (One tail_r as tail) =
match t with
| Min1 (One head_r) ->
tail_r.counter <- head_r.counter + 1;
tail_r.counter_and_bits <- head_r.counter_and_bits + one;
Many { head = One head_r; prev = One head_r; tail }
| Min1 (Many many_r as many) ->
exec many;
let (One prev_r as prev) = many_r.tail in
tail_r.counter <- prev_r.counter + 1;
tail_r.counter_and_bits <- prev_r.counter_and_bits + one;
Many { head = many_r.head; prev; tail }

external as1 : min0 -> is1 = "%identity"

let[@inline] awaitable_of (One r : is1) = Packed.Packed r.awaitable
let[@inline] counter_of (One r : is1) = r.counter
let[@inline] counter_of (One r : is1) = r.counter_and_bits lsr num_bits

let[@inline] next_of (One r : is1) ~tail =
let[@inline] next_of (One r as one : is1) ~tail =
let next = as1 r.next in
let counter = r.counter in
let counter = counter_of one in
if counter_of tail - counter < counter_of next - counter then tail
else next

Expand All @@ -77,7 +81,12 @@ module Awaitable = struct

let[@inline] generalize (One r : is1) = One r
let[@inline] is_signaled (One r : is1) = Trigger.is_signaled r.trigger
let[@inline] is_signalable (One r : is1) = get r.awaitable != r.value

let[@inline] is_signalable (One r : is1) =
Bool.to_int (get r.awaitable != r.value)
lxor (r.counter_and_bits land equal_bit)
<> 0

let[@inline] await (One r : is1) = Trigger.await r.trigger
let[@inline] clear (One r : is1) = r.value <- Obj.magic ()

Expand Down Expand Up @@ -156,7 +165,8 @@ module Awaitable = struct
let trigger = Trigger.create () in
if bits land (1 lsl i) = 0 then Trigger.signal trigger;
let awaitable = make 0 and next = Min0 Zero in
One { awaitable; value = 1; trigger; counter = 0; next }
One
{ awaitable; value = 1; trigger; counter_and_bits = 0; next }
in
let queue = ref (Min1 (make 0)) in
for i = 1 to n - 1 do
Expand Down Expand Up @@ -284,7 +294,7 @@ module Awaitable = struct
let awaitable = make 0 and next = Min0 Zero in
if signaled_bits land (1 lsl i) = 0 then
Trigger.signal trigger;
One { awaitable; value; trigger; counter = 0; next }
One { awaitable; value; trigger; counter_and_bits = 0; next }
in
let queue = ref (Min1 (make 0)) in
for i = 1 to n - 1 do
Expand Down Expand Up @@ -387,12 +397,16 @@ module Awaitable = struct
done
with Not_found -> ()

type op = Isnt | Is

module Awaiter = struct
type t = Awaiters.is1

let add_as (type a) (t : a awaitable) trigger value =
let add_as (type a) (t : a awaitable) trigger op value =
let one : Awaiters.is1 =
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
let counter_and_bits = match op with Isnt -> 0 | Is -> 1 in
One
{ awaitable = t; value; trigger; counter_and_bits; next = Min0 Zero }
in
let backoff = ref Backoff.default in
while
Expand All @@ -409,16 +423,16 @@ module Awaitable = struct

let add (type a) (t : a awaitable) trigger =
let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
add_as t trigger unique_value
add_as t trigger Isnt unique_value

let remove one =
Awaiters.signal_and_clear one;
update (Awaiters.awaitable_of one) ~signal:false ~count:1
end

let await t value =
let await_until t op value =
let trigger = Trigger.create () in
let one = Awaiter.add_as t trigger value in
let one = Awaiter.add_as t trigger op value in
if Awaiters.is_signalable one then Awaiter.remove one
else
match Awaiters.await one with
Expand All @@ -428,6 +442,7 @@ module Awaitable = struct
update (Awaiters.awaitable_of one) ~signal:true ~count:1;
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)

let[@inline] await t value = await_until t Isnt value
let[@inline] broadcast t = update (Packed t) ~signal:true ~count:Int.max_int
let[@inline] signal t = update (Packed t) ~signal:true ~count:1

Expand Down
25 changes: 17 additions & 8 deletions lib/picos_std.awaitable/picos_std_awaitable.mli
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,29 @@ module Awaitable : sig
{{:https://en.wikipedia.org/wiki/Thundering_herd_problem} thundering herd}
phenomena. *)

val await : 'a t -> 'a -> unit
(** [await awaitable before] suspends the current fiber until the awaitable is
explicitly {!signal}ed and has a value other than [before].
(** *)
type op = Isnt | Is

val await_until : 'a t -> op -> 'a -> unit
(** [await_until awaitable op comparand] suspends the current fiber until the
awaitable is explicitly {!signal}ed and has a value that either is or
isn't equal to [comparand] as specified by the {!op}.
⚠️ This operation is subject to the
{{:https://en.wikipedia.org/wiki/ABA_problem} ABA} problem. An [await] for
value other than [A] may not return after the awaitable is signaled while
having the value [B], because at a later point the awaitable has again the
value [A]. Furthermore, by the time an [await] for value other than [A]
returns, the awaitable might already again have the value [A].
{{:https://en.wikipedia.org/wiki/ABA_problem} ABA} problem. For example,
an await for value other than [A] may not return after the awaitable is
signaled while having the value [B], because at a later point the
awaitable has again the value [A]. Furthermore, by the time an await for
value other than [A] returns, the awaitable might already again have the
value [A].
⚠️ Atomic operations that change the value of an awaitable do not
implicitly wake up awaiters. *)

val await : 'a t -> 'a -> unit
(** [await awaitable before] is equivalent to
[await_until awaitable Isnt before]. *)

module Awaiter : sig
(** Low level interface for more flexible waiting. *)

Expand Down
26 changes: 26 additions & 0 deletions lib/picos_std.sync/cond.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
open Picos
open Picos_std_awaitable

type t = unit Awaitable.t

let create ?padded () = Awaitable.make ?padded ()

let[@inline] wait t mutex ~lock ~unlock =
let trigger = Trigger.create () in
let awaiter = Awaitable.Awaiter.add t trigger in
unlock mutex;
let lock_forbidden mutex =
let fiber = Fiber.current () in
let forbid = Fiber.exchange fiber ~forbid:true in
lock mutex;
Fiber.set fiber ~forbid
in
match Trigger.await trigger with
| None -> lock_forbidden mutex
| Some exn_bt ->
Awaitable.Awaiter.remove awaiter;
lock_forbidden mutex;
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)

let signal = Awaitable.signal
let broadcast = Awaitable.broadcast
8 changes: 8 additions & 0 deletions lib/picos_std.sync/dune
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
(rule
(package picos_std)
(action
(progn
(copy rwlock.slim.ml rwlock.ml)
(copy lock.counting.ml lock.ml))))

(library
(name picos_std_sync)
(public_name picos_std.sync)
(libraries
(re_export picos_std.event)
picos_std.awaitable
picos
backoff
multicore-magic))
Expand Down
26 changes: 26 additions & 0 deletions lib/picos_std.sync/intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module type Condition = sig
type mutex
(** Represents a mutual exclusion lock. *)

type t
(** Represents a condition variable *)

val create : ?padded:bool -> unit -> t
(** [create ()] return a new condition variable. *)

val wait : t -> mutex -> unit
(** [wait condition mutex] unlocks the [mutex], waits for the [condition], and
locks the [mutex] before returning or raising due to the operation being
canceled.
ℹ️ If the fiber has been canceled and propagation of cancelation is
allowed, this may raise the cancelation exception. *)

val signal : t -> unit
(** [signal condition] wakes up one fiber waiting on the [condition] variable
unless there are no such fibers. *)

val broadcast : t -> unit
(** [broadcast condition] wakes up all the fibers waiting on the [condition]
variable. *)
end
Loading

0 comments on commit a00046c

Please sign in to comment.