From 97acd4b611dbbbeed35e816eb317072ff2c65095 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 30 Dec 2022 11:58:31 +0000 Subject: [PATCH] Make Eio.Semaphore lock-free This uses the new Cells module to replace the use of a mutex. Co-authored-by: Vesa Karvonen --- Makefile | 1 + lib_eio/sem_state.ml | 176 ++++++++++++++++++++ lib_eio/semaphore.ml | 65 +++----- lib_eio/tests/dscheck/dune | 10 +- lib_eio/tests/dscheck/test_semaphore.ml | 49 ++++++ lib_eio/tests/semaphore.md | 210 ++++++++++++++++++++++++ tests/semaphore.md | 96 +++++++++++ 7 files changed, 566 insertions(+), 41 deletions(-) create mode 100644 lib_eio/sem_state.ml create mode 100644 lib_eio/tests/dscheck/test_semaphore.ml create mode 100644 lib_eio/tests/semaphore.md create mode 100644 tests/semaphore.md diff --git a/Makefile b/Makefile index 8b21ff30b..8934f79a0 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ test_luv: EIO_BACKEND=luv dune runtest dscheck: + dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe dune exec -- ./lib_eio/tests/dscheck/test_cells.exe docker: diff --git a/lib_eio/sem_state.ml b/lib_eio/sem_state.ml new file mode 100644 index 000000000..59df3f18a --- /dev/null +++ b/lib_eio/sem_state.ml @@ -0,0 +1,176 @@ +(* A lock-free semaphore, using Cells. + + We have a number of resources (1 in the case of a mutex). Each time a user + wants a resource, the user decrements the counter. When finished with the + resource, the user increments the counter again. + + If there are more users than resources then the counter will be negative. If + a user decrements the counter to a non-negative value then it gets ownership + of one of the free resources. If it decrements the counter to a negative + value then it must wait (by allocating a cell). When a user with a resource + increments the counter *from* a negative value, that user is responsible for + resuming one waiting cell (transferring ownership of the resource). 
This + ensures that every waiter will get woken exactly once. + + Cancellation + + We could consider cancelling a request to be simply replacing the callback + with a dummy one that immediately releases the resource. However, if callers + keep cancelling then the list of cancelled requests would keep growing. + + Instead, we'd like cancellation simply to undo the effects of suspending, by + incrementing the counter and marking the cell as Finished (so that the + resumer will ignore it and move on to the next waiter, and the Finished + cell can be freed). + + If the cancelling user increments from a negative value then it is responsible + for waking one user, which is fine as it is waking itself. However, it may find + itself incrementing from a non-negative one if it is racing with a resumer + (if the count is non-negative then once all current operations finish there + would be no suspended users, so the process of waking this user must have + already begun). + + To handle this, a cancelling user first transitions the cell to In_transition, + then increments the counter, then transitions to the final Finished state, + in the usual case where it incremented from a negative value. + + If a resumer runs at the same time then it may also increment the counter + from a non-negative value and try to wake this cell. It transitions the cell + from In_transition to Finished. The cancelling user will notice this when it + fails to CAS to Finished and can handle it. + + If the cancelling user sees the Finished state after In_transition then it + knows that the resuming user has transferred to it the responsibility of + waking one user. If the cancelling user is also responsible for waking one + user then it performs an extra resume on behalf of the resuming user. + + Finally, if the cancelling user is not responsible for waking anyone (even + itself) then it leaves the cell in In_transition (the CQS paper uses a + separate Refused state, but we don't actually need that). 
This can only + happen when a resume is happening at the same time. The resumer will + transition to Finished, creating an obligation to resume, but we've just + done that anyway. We know this In_transition state can't last long because + at the moment when the canceller incremented the counter all current + waiters, including itself, were in the process of being resumed. *) + +module Cell = struct + type _ t = + | In_transition (* The suspender will try to CAS this soon. *) + | Request of (unit -> unit) (* Waiting for a resource. *) + | Finished (* Ownership of the resource has been transferred, + or the suspender cancelled. *) + + let init = In_transition + (* We only resume when we know another thread is suspended or in the process of suspending. *) + + let segment_order = 2 + + let dump f = function + | Request _ -> Fmt.string f "Request" + | Finished -> Fmt.string f "Finished" + | In_transition -> Fmt.string f "In_transition" +end + +module Cells = Cells.Make(Cell) + +type cell = unit Cell.t + +type t = { + state : int Atomic.t; (* Free resources. Negative if there are waiters waiting. *) + cells : unit Cells.t; +} + +type request = t * unit Cells.segment * unit Cell.t Atomic.t + +(* Wake one waiter (and give it the resource being released). *) +let rec resume t = + let cell = Cells.next_resume t.cells in + match (Atomic.exchange cell Finished : cell) with + | Request r -> + (* The common case: there was a waiter for the value. + We pass ownership of the resource to it. *) + r () + | Finished -> + (* The waiter has finished cancelling. Ignore it and resume the next one. *) + resume t + | In_transition -> + (* The consumer is in the middle of doing something and will soon try to + CAS to a new state. It will see that we got there first and handle the + resume when it's done. *) + () + +(* [true] on success, or [false] if we need to suspend. + You MUST call [suspend] iff this returns [false]. + The reason for splitting this is because e.g. 
[Semaphore] needs to get + the continuation for the fiber between [acquire] and [suspend]. *) +let acquire t = + let s = Atomic.fetch_and_add t.state (-1) in + (* We got a resource if we decremented *to* a non-negative number, + which happens if we decremented *from* a positive one. *) + s > 0 + +let suspend t k : request option = + let (segment, cell) = Cells.next_suspend t.cells in + if Atomic.compare_and_set cell In_transition (Request k) then Some (t, segment, cell) + else match Atomic.get cell with + | Finished -> + (* We got resumed before we could add the waiter. *) + k (); + None + | Request _ | In_transition -> + (* These are unreachable from the previously-observed non-In_transition state + without us taking some action first *) + assert false + +let release t = + let s = Atomic.fetch_and_add t.state (+1) in + if s < 0 then ( + (* We incremented from a negative value. + We are therefore responsible for waking one waiter. *) + resume t + ) + +let cancel (t, segment, cell) = + match (Atomic.get cell : cell) with + | Request _ as old -> + if Atomic.compare_and_set cell old In_transition then ( + (* Undo the effect of [acquire] by incrementing the counter. + As always, if we increment from a negative value then we need to resume one waiter. *) + let need_resume = Atomic.fetch_and_add t.state (+1) < 0 in + if need_resume then ( + if Atomic.compare_and_set cell In_transition Finished then ( + (* The normal case. We resumed ourself by cancelling. + This is the only case we need to tell the segment because in all + other cases the resumer has already reached this segment so + freeing it is pointless. *) + Cells.cancel_cell segment + ) else ( + (* [release] got called at the same time and it also needed to resume one waiter. + So we call [resume] to handle the extra one, in addition to resuming ourself. *) + resume t + ) + ) else ( + (* This can only happen if [release] ran at the same time and incremented the counter + before we did. 
Since we were suspended, and later we saw the counter + show that no one was, it must have decided to wake us. Either it has placed Finished + in the cell, or it's about to do so. Either way, we discharge the obligation to + wake someone by resuming ourself with a cancellation. + The resource returns to the free pool. We know the resumer has already finished with it + even if it hasn't updated the cell state yet. *) + ); + true + ) else false (* We got resumed first *) + | Finished -> false (* We got resumed first *) + | In_transition -> invalid_arg "Already cancelling!" + +let dump f t = + Fmt.pf f "Semaphore (state=%d)@,%a" + (Atomic.get t.state) + Cells.dump t.cells + +let create n = + if n < 0 then raise (Invalid_argument "n < 0"); + { + cells = Cells.make (); + state = Atomic.make n; + } diff --git a/lib_eio/semaphore.ml b/lib_eio/semaphore.ml index f44cf6572..2733be594 100644 --- a/lib_eio/semaphore.ml +++ b/lib_eio/semaphore.ml @@ -1,55 +1,42 @@ -type state = - | Free of int - | Waiting of unit Waiters.t - type t = { id : Ctf.id; - mutex : Mutex.t; - mutable state : state; + state : Sem_state.t; } let make n = - if n < 0 then raise (Invalid_argument "n < 0"); let id = Ctf.mint_id () in Ctf.note_created id Ctf.Semaphore; { id; - mutex = Mutex.create (); - state = Free n; + state = Sem_state.create n; } let release t = - Mutex.lock t.mutex; Ctf.note_signal t.id; - match t.state with - | Free x when x = max_int -> Mutex.unlock t.mutex; raise (Sys_error "semaphore would overflow max_int!") - | Free x -> t.state <- Free (succ x); Mutex.unlock t.mutex - | Waiting q -> - begin match Waiters.wake_one q () with - | `Ok -> () - | `Queue_empty -> t.state <- Free 1 - end; - Mutex.unlock t.mutex + Sem_state.release t.state -let rec acquire t = - Mutex.lock t.mutex; - match t.state with - | Waiting q -> - Ctf.note_try_read t.id; - Waiters.await ~mutex:(Some t.mutex) q t.id - | Free 0 -> - t.state <- Waiting (Waiters.create ()); - Mutex.unlock t.mutex; - acquire t - | Free 
n -> - Ctf.note_read t.id; - t.state <- Free (pred n); - Mutex.unlock t.mutex +let acquire t = + if not (Sem_state.acquire t.state) then ( + (* No free resources. + We must wait until one of the existing users increments the counter and resumes us. + It's OK if they resume before we suspend; we'll just pick up the token they left. *) + Suspend.enter_unchecked (fun ctx enqueue -> + match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with + | None -> () (* Already resumed *) + | Some request -> + Ctf.note_try_read t.id; + match Fiber_context.get_error ctx with + | Some ex -> + if Sem_state.cancel request then enqueue (Error ex); + (* else already resumed *) + | None -> + Fiber_context.set_cancel_fn ctx (fun ex -> + if Sem_state.cancel request then enqueue (Error ex) + (* else already resumed *) + ) + ) + ); + Ctf.note_read t.id let get_value t = - Mutex.lock t.mutex; - let s = t.state in - Mutex.unlock t.mutex; - match s with - | Free n -> n - | Waiting _ -> 0 + max 0 (Atomic.get t.state.state) diff --git a/lib_eio/tests/dscheck/dune b/lib_eio/tests/dscheck/dune index 731e0a119..ab707d76a 100644 --- a/lib_eio/tests/dscheck/dune +++ b/lib_eio/tests/dscheck/dune @@ -1,11 +1,17 @@ ; We copy cells.ml here so we can build it using TracedAtomic instead of the default one. 
(copy_files# (files ../../core/cells.ml)) +(copy_files# (files ../../sem_state.ml)) -(executable - (name test_cells) +(executables + (names test_cells test_semaphore) (libraries dscheck optint fmt)) (rule (alias dscheck) (package eio) (action (run %{exe:test_cells.exe}))) + +(rule + (alias dscheck) + (package eio) + (action (run %{exe:test_semaphore.exe}))) diff --git a/lib_eio/tests/dscheck/test_semaphore.ml b/lib_eio/tests/dscheck/test_semaphore.ml new file mode 100644 index 000000000..7ea049f4d --- /dev/null +++ b/lib_eio/tests/dscheck/test_semaphore.ml @@ -0,0 +1,49 @@ +let debug = false + +module T = Sem_state + +let test ~capacity ~users () = + let messages = ref [] in + let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in + if debug then log "== start =="; + let t = T.create capacity in + let running = Atomic.make 0 in + let acquire fn = + if T.acquire t then (fn (); None) + else T.suspend t fn + in + for i = 1 to users do + Atomic.spawn (fun () -> + match + acquire (fun () -> + if debug then log "%d: got resource" i; + Atomic.incr running; + Atomic.decr running; + if debug then log "%d: released resource" i; + T.release t + ) + with + | None -> () + | Some request -> + if T.cancel request then ( + if debug then log "%d: cancelled request" i; + ) + ) + done; + Atomic.every (fun () -> assert (Atomic.get running <= capacity)); + Atomic.final (fun () -> + if debug then ( + List.iter print_string (List.rev !messages); + Fmt.pr "%a@." 
T.dump t; + ); + assert (Atomic.get t.state = capacity); + (* Do a dummy non-cancelled operation to ensure the pointers end up together: *) + T.resume t; + assert (T.suspend t ignore = None); + assert (T.Cells.Position.index t.cells.suspend = + T.Cells.Position.index t.cells.resume); + ) + +let () = + Atomic.trace (test ~capacity:1 ~users:3); + Atomic.trace (test ~capacity:2 ~users:3) diff --git a/lib_eio/tests/semaphore.md b/lib_eio/tests/semaphore.md new file mode 100644 index 000000000..19d708fe5 --- /dev/null +++ b/lib_eio/tests/semaphore.md @@ -0,0 +1,210 @@ +```ocaml +# #require "eio";; +``` +```ocaml +module T = Eio__Sem_state +let show t = Fmt.pr "%a@." T.dump t + +let acquire t label = + if T.acquire t then ( + Fmt.pr "%s: Acquired@." label; + None + ) else ( + T.suspend t (fun () -> Fmt.pr "%s: Acquired@." label) + ) +``` + +Initially we have a single segment full of empty cells. +Both resume and suspend pointers point at the first cell: + +```ocaml +# let t : T.t = T.create 1;; +val t : T.t = {T.state = <abstr>; cells = <abstr>} +# show t;; +Semaphore (state=1) +Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition +End +- : unit = () +``` + +The first user can take the lock just by decrementing the state counter: + +```ocaml +# acquire t "a";; +a: Acquired +- : T.request option = None + +# show t;; +Semaphore (state=0) +Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition +End +- : unit = () +``` + +However, a second user must wait: + +```ocaml +# acquire t "b";;; +- : T.request option = +Some ({T.state = <abstr>; cells = <abstr>}, <abstr>, <abstr>) + +# show t;; +Semaphore (state=-1) +Segment 0 (prev=None, pointers=2, cancelled=0): + Request (resume) + In_transition (suspend) + In_transition + In_transition +End +- : unit = () +``` + +Same for a third user: + +```ocaml +# acquire t "c";;; +- : T.request option = +Some ({T.state = <abstr>; cells = <abstr>
}, <abstr>, <abstr>) + +# show t;; +Semaphore (state=-2) +Segment 0 (prev=None, pointers=2, cancelled=0): + Request (resume) + Request + In_transition (suspend) + In_transition +End +- : unit = () +``` + +When the first user releases it, the second one runs: + +```ocaml +# T.release t;; +b: Acquired +- : unit = () + +# show t;; +Semaphore (state=-1) +Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + Request (resume) + In_transition (suspend) + In_transition +End +- : unit = () +``` + +When that finishes, the third one runs: + +```ocaml +# T.release t;; +c: Acquired +- : unit = () +``` + +The final release, with no waiters, just increments the state counter: + +```ocaml +# T.release t;; +- : unit = () + +# show t;; +Semaphore (state=1) +Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + Finished + In_transition (suspend) (resume) + In_transition +End +- : unit = () +``` + +## Cancellation + +"b" and "c" have to wait, as "a" has the resource: + +```ocaml +# let t : T.t = T.create 1;; +val t : T.t = {T.state = <abstr>; cells = <abstr>} + +# acquire t "a";; +a: Acquired +- : T.request option = None + +# let b = acquire t "b" |> Option.get;; +val b : T.request = ({T.state = <abstr>; cells = <abstr>}, <abstr>, <abstr>) + +# let c = acquire t "c" |> Option.get;; +val c : T.request = ({T.state = <abstr>; cells = <abstr>}, <abstr>, <abstr>) + +# show t;; +Semaphore (state=-2) +Segment 0 (prev=None, pointers=2, cancelled=0): + Request (resume) + Request + In_transition (suspend) + In_transition +End +- : unit = () +``` + +Cancelling "b" increments the state counter and its request simply becomes Finished: + +```ocaml +# T.cancel b;; +- : bool = true + +# show t;; +Semaphore (state=-1) +Segment 0 (prev=None, pointers=2, cancelled=1): + Finished (resume) + Request + In_transition (suspend) + In_transition +End +- : unit = () +``` + +When "a" releases it, "c" is resumed: + +```ocaml +# T.release t;; +c: Acquired +- : unit = () + +# show t;; +Semaphore (state=0) +Segment 0 (prev=None, pointers=2, cancelled=1): + Finished + Finished +
In_transition (suspend) (resume) + In_transition +End +- : unit = () +``` + +Finally, finishing "c" restores the state to 1: + +```ocaml +# T.release t;; +- : unit = () + +# show t;; +Semaphore (state=1) +Segment 0 (prev=None, pointers=2, cancelled=1): + Finished + Finished + In_transition (suspend) (resume) + In_transition +End +- : unit = () +``` diff --git a/tests/semaphore.md b/tests/semaphore.md new file mode 100644 index 000000000..53066f84e --- /dev/null +++ b/tests/semaphore.md @@ -0,0 +1,96 @@ +# Setting up the environment + +```ocaml +# #require "eio.mock";; +``` + +```ocaml +open Eio.Std + +module T = Eio.Semaphore + +let run fn = + Eio_mock.Backend.run @@ fun _ -> + fn () + +let acquire t = + traceln "Acquiring"; + T.acquire t; + traceln "Acquired" + +let release t = + traceln "Releasing"; + T.release t; + traceln "Released" +``` + +# Test cases + +Simple case: + +```ocaml +# run @@ fun () -> + let t = T.make 1 in + acquire t; + release t; + acquire t; + release t;; ++Acquiring ++Acquired ++Releasing ++Released ++Acquiring ++Acquired ++Releasing ++Released +- : unit = () +``` + +Concurrent access to the semaphore: + +```ocaml +# run @@ fun () -> + let t = T.make 2 in + let fn () = + acquire t; + Eio.Fiber.yield (); + release t + in + List.init 4 (fun _ -> fn) + |> Fiber.all;; ++Acquiring ++Acquired ++Acquiring ++Acquired ++Acquiring ++Acquiring ++Releasing ++Released ++Releasing ++Released ++Acquired ++Acquired ++Releasing ++Released ++Releasing ++Released +- : unit = () +``` + +Cancellation: + +```ocaml +# run @@ fun () -> + let t = T.make 0 in + Fiber.first + (fun () -> acquire t) + (fun () -> ()); + release t; + acquire t;; ++Acquiring ++Releasing ++Released ++Acquiring ++Acquired +- : unit = () +```