diff --git a/Makefile b/Makefile index 3c2df6abe..8b21ff30b 100644 --- a/Makefile +++ b/Makefile @@ -18,5 +18,8 @@ test_luv: rm -rf _build EIO_BACKEND=luv dune runtest +dscheck: + dune exec -- ./lib_eio/tests/dscheck/test_cells.exe + docker: docker build -t eio . diff --git a/dune-project b/dune-project index a339c2a84..f1e25a392 100644 --- a/dune-project +++ b/dune-project @@ -27,7 +27,8 @@ (astring (and (>= 0.8.5) :with-test)) (crowbar (and (>= 0.2) :with-test)) (mtime (>= 2.0.0)) - (alcotest (and (>= 1.4.0) :with-test)))) + (alcotest (and (>= 1.4.0) :with-test)) + (dscheck (and (>= 0.1.0) :with-test)))) (package (name eio_linux) (synopsis "Eio implementation for Linux using io-uring") diff --git a/eio.opam b/eio.opam index 6d1951f04..988aea473 100644 --- a/eio.opam +++ b/eio.opam @@ -22,6 +22,7 @@ depends: [ "crowbar" {>= "0.2" & with-test} "mtime" {>= "2.0.0"} "alcotest" {>= "1.4.0" & with-test} + "dscheck" {>= "0.1.0" & with-test} "odoc" {with-doc} ] conflicts: [ diff --git a/lib_eio/condition.ml b/lib_eio/condition.ml index 934f4c983..ab133898e 100644 --- a/lib_eio/condition.ml +++ b/lib_eio/condition.ml @@ -1,27 +1,33 @@ -type t = { - waiters: unit Waiters.t; - mutex: Mutex.t; - id: Ctf.id -} +type t = Broadcast.t -let create () = { - waiters = Waiters.create (); - id = Ctf.mint_id (); - mutex = Mutex.create (); -} +let create () = Broadcast.create () -let await t mutex = - Mutex.lock t.mutex; - Eio_mutex.unlock mutex; - match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with - | () -> Eio_mutex.lock mutex - | exception ex -> Eio_mutex.lock mutex; raise ex +let await_generic ?mutex t = + match + Suspend.enter_unchecked (fun ctx enqueue -> + match Fiber_context.get_error ctx with + | Some ex -> + Option.iter Eio_mutex.unlock mutex; + enqueue (Error ex) + | None -> + match Broadcast.suspend t (fun () -> enqueue (Ok ())) with + | None -> + Option.iter Eio_mutex.unlock mutex + | Some request -> + Option.iter Eio_mutex.unlock mutex; + Fiber_context.set_cancel_fn ctx (fun ex -> + if Broadcast.cancel request then enqueue (Error ex) + (* else already succeeded *) + ) + ) + with + | () -> Option.iter Eio_mutex.lock mutex + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + Option.iter Eio_mutex.lock mutex; + Printexc.raise_with_backtrace ex bt -let await_no_mutex t = - Mutex.lock t.mutex; - Waiters.await ~mutex:(Some t.mutex) t.waiters t.id +let await t mutex = await_generic ~mutex t +let await_no_mutex t = await_generic t -let broadcast t = - Mutex.lock t.mutex; - Waiters.wake_all t.waiters (); - Mutex.unlock t.mutex +let broadcast = Broadcast.resume_all diff --git a/lib_eio/core/broadcast.ml b/lib_eio/core/broadcast.ml new file mode 100644 index 000000000..82539e36e --- /dev/null +++ b/lib_eio/core/broadcast.ml @@ -0,0 +1,108 @@ +(* See the Cells module for an overview of this system. + + Each new waiter atomically increments the "suspend" pointer and writes + a callback there. The waking fiber removes all the callbacks and calls them. + In this version, "resume" never gets ahead of "suspend" (broadcasting just + brings it up-to-date with the "suspend" pointer). + + When the resume fiber runs, some of the cells reserved for callbacks might + not yet have been filled. In this case, the resuming fiber just marks them + as needing to be resumed. When the suspending fiber continues, it will + notice this and continue immediately. *) + +module Cell = struct + (* For any given cell, there are two actors running in parallel: the + suspender and the resumer. + + The resumer only performs a single operation (resume). + + The consumer waits to be resumed and then, optionally, cancels. + + This means we only have three cases to think about: + + 1. Consumer adds request (Empty -> Request). + 1a. Provider fulfills it (Request -> Resumed). + 1b. Consumer cancels it (Request -> Cancelled). + 2. Provider gets to cell first (Empty -> Resumed). + When the consumer tries to wait, it resumes immediately. + + The Resumed state should never been seen. It exists only to allow the + request to be GC'd promptly. We could replace it with Empty, but having + separate states is clearer for debugging. *) + + type _ t = + | Request of (unit -> unit) + | Cancelled + | Resumed + | Empty + + let init = Empty + + let segment_order = 2 + + let dump f = function + | Request _ -> Fmt.string f "Request" + | Empty -> Fmt.string f "Empty" + | Resumed -> Fmt.string f "Resumed" + | Cancelled -> Fmt.string f "Cancelled" +end + +module Cells = Cells.Make(Cell) + +type cell = unit Cell.t +type t = unit Cells.t + +type request = unit Cells.segment * cell Atomic.t + +let rec resume cell = + match (Atomic.get cell : cell) with + | Request r as cur -> + (* The common case: we have a waiter for the value *) + if Atomic.compare_and_set cell cur Resumed then r (); + (* else it was cancelled at the same time; ignore *) + | Empty -> + (* The consumer has reserved this cell but not yet stored the request. + We place Resumed there and it will handle it soon. *) + if Atomic.compare_and_set cell Empty Resumed then + () (* The consumer will deal with it *) + else + resume cell (* The Request was added concurrently; use it *) + | Cancelled -> () + | Resumed -> + (* This state is unreachable because we (the provider) haven't set this yet *) + assert false + +let cancel (segment, cell) = + match (Atomic.get cell : cell) with + | Request _ as old -> + if Atomic.compare_and_set cell old Cancelled then ( + Cells.cancel_cell segment; + true + ) else false (* We got resumed first *) + | Resumed -> false (* We got resumed first *) + | Cancelled -> invalid_arg "Already cancelled!" + | Empty -> + (* To call [cancel] the user needs a [request] value, + which they only get once we've reached the [Request] state. + [Empty] is unreachable from [Request]. *) + assert false + +let suspend t k = + let (_, cell) as request = Cells.next_suspend t in + if Atomic.compare_and_set cell Empty (Request k) then Some request + else match Atomic.get cell with + | Resumed -> + (* Resumed before we could add the waiter *) + k (); + None + | Cancelled | Request _ | Empty -> + (* These are unreachable from the previously-observed non-Empty state + without us taking some action first *) + assert false + +let resume_all t = + Cells.resume_all t resume + +let create = Cells.make + +let dump f t = Cells.dump f t diff --git a/lib_eio/core/broadcast.mli b/lib_eio/core/broadcast.mli new file mode 100644 index 000000000..2602caf9a --- /dev/null +++ b/lib_eio/core/broadcast.mli @@ -0,0 +1,37 @@ +(** A lock-free queue of waiters that should all be resumed at once. + + This uses {!Cells} internally. *) + +type t + +type request +(** A handle to a pending request that can be used to cancel it. *) + +val create : unit -> t +(** [create ()] is a fresh broadcast queue. *) + +val suspend : t -> (unit -> unit) -> request option +(** [suspend t fn] arranges for [fn ()] to be called on {!resume_all}. + + [fn ()] may be called from the caller's context, or by [resume_all], + so it needs to be able to cope with running in any context where that + can run. For example, [fn] must be safe to call from a signal handler + if [resume_all] can be called from one. [fn] must not raise. + + The returned request can be used to cancel. It can be [None] in the + (unlikely) event that [t] got resumed before the function returned. *) + +val resume_all : t -> unit +(** [resume_all t] calls all non-cancelled callbacks attached to [t], + in the order in which they were suspended. + + This function is lock-free and can be used safely even from a signal handler or GC finalizer. *) + +val cancel : request -> bool +(** [cancel request] attempts to remove a pending request. + + It returns [true] if the request was cancelled, or [false] if it got + resumed before that could happen. *) + +val dump : Format.formatter -> t -> unit +(** Display the internal state of a queue, for debugging. *) diff --git a/lib_eio/core/cells.ml b/lib_eio/core/cells.ml new file mode 100644 index 000000000..32f9fdb87 --- /dev/null +++ b/lib_eio/core/cells.ml @@ -0,0 +1,483 @@ +module type CELL = sig + type 'a t + val init : 'a t + val segment_order : int + val dump : _ t Fmt.t +end + +(* To avoid worrying about wrapping on 32-bit platforms, + we use 63-bit integers for indexes in all cases. + On 64-bit platforms, this is just [int]. *) +module Int63 = struct + include Optint.Int63 + + (* Fallback for 32-bit platforms. *) + let rec fetch_and_add_fallback t delta = + let old = Atomic.get t in + if Atomic.compare_and_set t old (add old (of_int delta)) then old + else fetch_and_add_fallback t delta + + let fetch_and_add : t Atomic.t -> int -> t = + match is_immediate with + | True -> Atomic.fetch_and_add + | False -> fetch_and_add_fallback +end + +module Make(Cell : CELL) = struct + let cells_per_segment = 1 lsl Cell.segment_order + let segment_mask = cells_per_segment - 1 + + (* An index identifies a cell. It is a pair of the segment ID and the offset + within the segment, packed into a single integer so we can increment it + atomically. *) + module Index : sig + type t + type segment_id = Int63.t + + val of_segment : segment_id -> t + (* [of_segment x] is the index of the first cell in segment [x]. *) + + val segment : t -> segment_id + val offset : t -> int + + val zero : t + val succ : t -> t + val pred : t -> t + + val next : t Atomic.t -> t + + (* val pp : t Fmt.t *) + end = struct + type t = Int63.t + type segment_id = Int63.t + + let segment t = Int63.shift_right_logical t Cell.segment_order + let of_segment id = Int63.shift_left id Cell.segment_order + + let offset t = Int63.to_int t land segment_mask + + let zero = Int63.zero + let succ = Int63.succ + let pred = Int63.pred + + let next t_atomic = + Int63.fetch_and_add t_atomic (+1) + + (* let pp f t = Fmt.pf f "%d:%d" (segment t) (offset t) *) + end + + (* A pair with counts for the number of cancelled cells in a segment and the + number of pointers to it, packed as an integer so it can be adjusted atomically. *) + module Count : sig + type t + + val create : pointers:int -> t + (* [create ~pointers] is a new counter for a segment. + Initially there are no cancelled cells. *) + + val removed : t -> bool + (* [removed t] is true if a segment with this count should be removed + (i.e. all cells are cancelled and it has no pointers). + Once this returns [true], it will always return [true] in future. *) + + val incr_cancelled : t -> bool + (* Increment the count of cancelled cells, then return [removed t] for the new state. *) + + val try_inc_pointers : t -> bool + (* Atomically increment the pointers count, unless [removed t]. + Returns [true] on success. *) + + val dec_pointers : t -> bool + (* Decrement the pointers count, then return [removed t] for the new state. *) + + val validate : expected_pointers:int -> t -> unit + (* [validate ~expected_pointers t] check that [t] is a valid count for a non-removed segment. *) + + val dump : t Fmt.t + end = struct + type t = int Atomic.t + + (* We use 16 bits for the cancelled count, which should be plenty. + The remaining bits (at least 15) are used for the pointer count, + which normally doesn't go above 2 (except temporarily, and limited + by the number of domains). *) + let () = assert (cells_per_segment < 0x10000) + + let v ~pointers ~cancelled = (pointers lsl 16) lor cancelled + let v_removed = v ~pointers:0 ~cancelled:cells_per_segment + let pointers v = v lsr 16 + let cancelled v = v land 0xffff + + let create ~pointers = Atomic.make (v ~pointers ~cancelled:0) + + let dump f t = + let v = Atomic.get t in + Fmt.pf f "pointers=%d, cancelled=%d" (pointers v) (cancelled v) + + let incr_cancelled t = + Atomic.fetch_and_add t 1 = v_removed - 1 + + let rec try_inc_pointers t = + let v = Atomic.get t in + if v = v_removed then false + else ( + if Atomic.compare_and_set t v (v + (1 lsl 16)) then true + else try_inc_pointers t + ) + + let dec_pointers t = + Atomic.fetch_and_add t (-1 lsl 16) = v_removed + (1 lsl 16) + + let removed t = + Atomic.get t = v_removed + + let validate ~expected_pointers t = + let v = Atomic.get t in + assert (cancelled v >= 0 && cancelled v <= cells_per_segment); + if cancelled v = cells_per_segment then assert (pointers v > 0); + if pointers v <> expected_pointers then + Fmt.failwith "Bad pointer count!" + end + + (* A segment is a node in a linked list containing an array of [cells_per_segment] cells. *) + module Segment : sig + type 'a t + + val make_init : unit -> 'a t + (* [make_init ()] is a new initial segment. *) + + val id : _ t -> Index.segment_id + + val get : 'a t -> int -> 'a Cell.t Atomic.t + (* [get t offset] is the cell at [offset] within [t]. *) + + val try_inc_pointers : _ t -> bool + (* Atomically increment the pointers count if the segment isn't removed. + Returns [true] on success, or [false] if the segment was removed first. *) + + val dec_pointers : _ t -> unit + (* Decrement the pointers count, removing the segment if it is no longer + needed. *) + + val find : 'a t -> Index.segment_id -> 'a t + (* [find t id] finds the segment [id] searching forwards from [t]. + + If the target segment has not yet been created, this creates it (atomically). + If the target segment has been removed, this returns the next non-removed segment. *) + + val clear_prev : _ t -> unit + (* Called when the resumer has reached this segment, + so it will never need to skip over any previous segments. + Therefore, the previous pointer is no longer required and + previous segments can be GC'd. *) + + val cancel_cell : _ t -> unit + (* Increment the cancelled-cells counter, and remove the segment if it is no longer useful. *) + + val validate : 'a t -> suspend:'a t -> resume:'a t -> unit + (* [validate t ~suspend ~resume] checks that [t] is in a valid state, + assuming there are no operations currently in progress. + [suspend] and [resume] are the segments of the suspend and resume pointers. + It checks that both are reachable from [t]. *) + + val dump_list : label:Index.t Fmt.t -> 'a t Fmt.t + (* [dump_list] formats this segment and all following ones for debugging. + @param label Used to annotate indexes. *) + end = struct + type 'a t = { + id : Index.segment_id; + count : Count.t; + cells : 'a Cell.t Atomic.t array; + prev : 'a t option Atomic.t; (* None if first, or [prev] is no longer needed *) + next : 'a t option Atomic.t; (* None if not yet created *) + } + + let id t = t.id + + let get t i = Array.get t.cells i + + let pp_id f t = Int63.pp f t.id + + let dump_cells ~label f t = + let idx = ref (Index.of_segment t.id) in + for i = 0 to Array.length t.cells - 1 do + Fmt.pf f "@,%a" Cell.dump (Atomic.get t.cells.(i)); + label f !idx; + idx := Index.succ !idx + done + + let rec dump_list ~label f t = + Fmt.pf f "@[Segment %a (prev=%a, %a):%a@]" + pp_id t + (Fmt.Dump.option pp_id) (Atomic.get t.prev) + Count.dump t.count + (dump_cells ~label) t; + let next = Atomic.get t.next in + begin match next with + | Some next when next.id = Int63.succ t.id -> + () (* We'll show the labels at the start of the next segment *) + | _ -> + Fmt.pf f "@,End%a" + label (Index.of_segment (Int63.succ t.id)) + end; + Option.iter (fun next -> Fmt.cut f (); dump_list ~label f next) next + + let next t = + match Atomic.get t.next with + | Some s -> s + | None -> + let next = { + id = Int63.succ t.id; + count = Count.create ~pointers:0; + cells = Array.init cells_per_segment (fun (_ : int) -> Atomic.make Cell.init); + next = Atomic.make None; + prev = Atomic.make (Some t); + } in + if Atomic.compare_and_set t.next None (Some next) then next + else Atomic.get t.next |> Option.get + + let removed t = + Count.removed t.count + + (* Get the previous non-removed segment, if any. *) + let rec alive_prev t = + match Atomic.get t.prev with + | Some prev when removed prev -> alive_prev prev + | x -> x + + (* Get the next non-removed segment. *) + let alive_next t = + let next = Atomic.get t.next |> Option.get in + let rec live x = + if removed x then ( + match Atomic.get x.next with + | Some next -> live next + | None -> x (* The paper says to return "tail if all are removed", but can that ever happen? *) + ) else x + in + live next + + (* Remove [t] from the linked-list by splicing together + the previous live segment before us to the next live one afterwards. + The tricky case is when two adjacent segments get removed at the same time. + If that happens, the next and prev lists will still always be valid + (i.e. will include all live segments, in the correct order), but may not be optimal. + However, we will detect that case when it happens and fix it up immediately. *) + let rec remove t = + if Atomic.get t.next = None then () (* Can't remove tail. This shouldn't happen anyway. *) + else ( + let prev = alive_prev t + and next = alive_next t in + (* [prev] might have been removed by the time we do this, but it doesn't matter, + we're still only skipping removed segments (just not as many as desired). + We'll fix it up afterwards in that case. *) + Atomic.set next.prev prev; + (* Likewise [next] might have been removed too by now, but we'll correct later. *) + Option.iter (fun prev -> Atomic.set prev.next (Some next)) prev; + (* If either got removed by now, start again. *) + if removed next && Atomic.get next.next <> None then remove t + else match prev with + | Some prev when removed prev -> remove t + | _ -> () + ) + + let try_inc_pointers t = + Count.try_inc_pointers t.count + + let dec_pointers t = + if Count.dec_pointers t.count then remove t + + let cancel_cell t = + if Count.incr_cancelled t.count then remove t + + let rec find start id = + if start.id >= id && not (removed start) then start + else find (next start) id + + let make_init () = + { + id = Int63.zero; + count = Count.create ~pointers:2; + cells = Array.init cells_per_segment (fun (_ : int) -> Atomic.make Cell.init); + next = Atomic.make None; + prev = Atomic.make None; + } + + (* Note: this assumes the system is at rest (no operations in progress). *) + let rec validate t ~suspend ~resume ~seen_pointers = + let expected_pointers = + (if t == suspend then 1 else 0) + + (if t == resume then 1 else 0) + in + Count.validate ~expected_pointers t.count; + let seen_pointers = seen_pointers + expected_pointers in + match Atomic.get t.next with + | None -> assert (seen_pointers = 2) + | Some next -> + begin match Atomic.get next.prev with + | None -> assert (resume.id >= next.id) + | Some t2 -> assert (resume.id < next.id && t == t2) + end; + validate next ~suspend ~resume ~seen_pointers + + let validate = validate ~seen_pointers:0 + + let clear_prev t = + Atomic.set t.prev None + end + + (* A mutable pointer into the list of cells. *) + module Position : sig + type 'a t + + val of_segment : 'a Segment.t -> 'a t + (* [of_segment x] is a pointer to the first cell in [x]. *) + + val next : clear_prev:bool -> 'a t -> 'a Segment.t * 'a Cell.t Atomic.t + (* [next t ~clear_prev] returns the segment and cell of [t], and atomically increments it. + If [t]'s segment is all cancelled and no longer exists it will skip it and retry. + If [clear_prev] then the previous pointer is no longer required. *) + + val resume_all : 'a t -> stop:Index.t -> ('a Cell.t Atomic.t -> unit) -> unit + (* [resume_all t ~stop f] advances [t] to [stop], then calls [f cell] on each cell advanced over. *) + + val index : _ t -> Index.t + (* [index t] is the index of the cell currently pointed-to by [t]. *) + + val segment : 'a t -> 'a Segment.t + (* For debugging only. The segment containing the previously-returned cell (or the initial segment), + when the system is at rest. *) + end = struct + type 'a t = { + segment : 'a Segment.t Atomic.t; (* Note: can lag [idx] *) + idx : Index.t Atomic.t; + } + + let segment t = Atomic.get t.segment + let index t = Atomic.get t.idx + + let of_segment segment = + { + segment = Atomic.make segment; + idx = Atomic.make Index.zero; + } + + (* Set [t.segment] to [target] if [target] is ahead of us. + Returns [false] if [target] gets removed first. *) + let rec move_forward t (target : _ Segment.t) = + let cur = Atomic.get t.segment in + if Segment.id cur >= Segment.id target then true + else ( + if not (Segment.try_inc_pointers target) then false (* target already removed *) + else ( + if Atomic.compare_and_set t.segment cur target then ( + Segment.dec_pointers cur; + true + ) else ( + (* Concurrent update of [t]. Undo ref-count changes and retry. *) + Segment.dec_pointers target; + move_forward t target + ) + ) + ) + + (* Update [t] to the segment [id] (or the next non-removed segment after it). *) + let rec find_and_move_forward t start id = + let target = Segment.find start id in + if move_forward t target then target + else find_and_move_forward t start id (* Removed before we could increase the ref-count; rety *) + + let rec next ~clear_prev t = + (* Get the segment first before the index. Even if [idx] moves forwards after this, + we'll still be able to reach it from [r]. *) + let r = Atomic.get t.segment in + let i = Index.next t.idx in + let id = Index.segment i in + let s = find_and_move_forward t r id in + if clear_prev then Segment.clear_prev s; + if Segment.id s = id then ( + (s, Segment.get s (Index.offset i)) + ) else ( + (* The segment we wanted contains only cancelled cells. + Try to update the index to jump over those cells, then retry. *) + let s_index = Index.of_segment (Segment.id s) in + ignore (Atomic.compare_and_set t.idx (Index.succ i) s_index : bool); + next ~clear_prev t + ) + + let rec resume_all t ~stop f = + (* Get the segment first before the index. Even if [idx] moves forwards after this, + we'll still be able to reach it from [start_seg]. *) + let start_seg = Atomic.get t.segment in + let start = Atomic.get t.idx in + if start >= stop then () + else if not (Atomic.compare_and_set t.idx start stop) then ( + resume_all t ~stop f + ) else ( + (* We are now responsible for resuming all cells from [start] to [stop]. *) + (* Move [t.segment] forward so we can free older segments now. *) + ignore (find_and_move_forward t start_seg (Index.segment (Index.pred stop)) : _ Segment.t); + (* Resume all cells from [i] to [stop] (reachable via [seg]): *) + let rec aux seg i = + if i < stop then ( + let seg = Segment.find seg (Index.segment i) in + Segment.clear_prev seg; + let seg_start = Index.of_segment (Segment.id seg) in + if seg_start < stop then ( + let i = max i seg_start in + f (Segment.get seg (Index.offset i)); + aux seg (Index.succ i) + ) + ) + in + aux start_seg start + ) + end + + type 'a t = { + resume : 'a Position.t; + suspend : 'a Position.t; + } + + type 'a segment = 'a Segment.t + + let next_suspend t = + Position.next t.suspend ~clear_prev:false + + let next_resume t = + snd @@ Position.next t.resume ~clear_prev:true + + let resume_all t f = + Position.resume_all t.resume ~stop:(Position.index t.suspend) f + + let cancel_cell = Segment.cancel_cell + + let make () = + let init = Segment.make_init () in + { + resume = Position.of_segment init; + suspend = Position.of_segment init; + } + + let validate t = + let suspend = Position.segment t.suspend in + let resume = Position.segment t.resume in + let start = + if Segment.id suspend < Segment.id resume then suspend + else resume + in + Segment.validate start ~suspend ~resume + + let dump f t = + let suspend = Position.index t.suspend in + let resume = Position.index t.resume in + let start = + if suspend < resume then t.suspend + else t.resume + in + let label f i = + if i = suspend then Format.pp_print_string f " (suspend)"; + if i = resume then Format.pp_print_string f " (resume)"; + in + Format.fprintf f "@[%a@]" (Segment.dump_list ~label) (Position.segment start) +end diff --git a/lib_eio/core/cells.mli b/lib_eio/core/cells.mli new file mode 100644 index 000000000..89aa630b1 --- /dev/null +++ b/lib_eio/core/cells.mli @@ -0,0 +1,111 @@ +(** A lock-free queue-like structure with suspension and cancellation. + + This module provides an infinite sequence of atomic cells, which can be used for whatever you like. + There are two pointers into this sequence: a suspend (consumer) pointer and a resume (producer) pointer. + These are similar to the head and tail pointers in a traditional queue, + except that the consumer is also permitted to get ahead of the producer. + + To use this as a plain queue, each producer calls {!Make.next_resume} to get the + cell at the resume (tail) pointer (and advance it atomically), then stores + its value in the cell. Each consumer calls {!Make.next_suspend} to get the next + cell at the head of the queue (and advance the suspend pointer). + + The consumer/suspender is permitted to get ahead of the producer. In this + case, the consumer will CAS the cell from its initial state to a Request + state containing a callback to receive the value when it arrives. When a + producer later tries to CAS the cell from the initial state to holding a + value, it will fail and find the Request with the callback function + instead. It can then provide the value directly to the callback. + + A suspender can be cancelled by CASing the Request to a Cancelled state. + It should also call {!Make.cancel_cell} (if the CAS succeeds), to allow the cell to be freed. + If a resumer's CAS fails because the cell is cancelled, it can retry with a fresh cell. + + For efficiency, cells are grouped into segments, which are stored in a linked list. + Once all the cells in a segment are cancelled, the whole segment may be freed. + + This is based on {{:https://arxiv.org/pdf/2111.12682.pdf}A formally-verified + framework for fair synchronization in kotlin coroutines, Appendix C}, + which contains more details and examples of use. + + This module also adds the {!Make.resume_all} function, which is useful for broadcasting. +*) + +(** The signature for user-defined cell contents. *) +module type CELL = sig + type 'a t + + val init : 'a t + (** The value to give newly-allocated cells. *) + + val segment_order : int + (** The number of bits to use for the offset into the segment. + + The number of cells per segment is [2 ** segment_order]. *) + + val dump : _ t Fmt.t + (** Display the cell state for debugging. *) +end + +module Make(Cell : CELL) : sig + type 'a t + + type 'a segment + + val make : unit -> 'a t + (** [make ()] is a fresh sequence of cells. *) + + val next_suspend : 'a t -> 'a segment * 'a Cell.t Atomic.t + (** [next_suspend t] atomically returns the next suspend cell and its segment. + + If multiple domains call this at the same time, they will each get a different location. + + The cell might or might not have already been filled in by a resumer. + You need to handle both cases (typically by using {!Atomic.compare_and_set}). + + The segment can be used with {!cancel_cell}. + + This function is lock-free and is safe to call even from a signal handler or GC finalizer. *) + + val next_resume : 'a t -> 'a Cell.t Atomic.t + (** [next_resume t] atomically returns the next resume cell. + + If multiple domains call this at the same time, they will each get a different cell. + + The cell might or might not contain a request from a suspender that got there first. + You need to handle both cases (typically by using {!Atomic.compare_and_set}). + + Note: cancelled cells may or may not be skipped (you need to handle the case of the + cell you get being cancelled before you can write to it, but you also + can't rely on seeing every cancelled cell, as cancelled segments may be deleted). + + This function is lock-free and is safe to call even from a signal handler or GC finalizer. *) + + val resume_all : 'a t -> ('a Cell.t Atomic.t -> unit) -> unit + (** [resume_all t f] advances the resume position to the current suspend position, + then calls [f cell] on each cell advanced over. + + Note: as with {!next_resume}, [f] may be called for some cancelled cells but not others. + + [f] must not raise an exception (if it does, it will not be called on the remaining cells). + + If the resume position is ahead of the suspend position, then calling this function does nothing. + + This function is lock-free and is safe to call even from a signal handler or GC finalizer. *) + + val cancel_cell : 'a segment -> unit + (** [cancel_cell segment] increments the segment's count of the number of cancelled cells. + + Once all cells are cancelled it may be possible to discard the whole segment. + This avoids leaking memory if a user keeps suspending and then cancelling. + + You must not call this more than once per cell. + + This function is lock-free and is safe to call even from a signal handler or GC finalizer. *) + + val validate : _ t -> unit + (** [validate t] checks that [t] is in a valid state, assuming there are no operations currently in progress. *) + + val dump : _ t Fmt.t + (** [dump] outputs the internal state of a [_ t], for debugging. *) +end diff --git a/lib_eio/core/dune b/lib_eio/core/dune index 1627dfeeb..4063be78f 100644 --- a/lib_eio/core/dune +++ b/lib_eio/core/dune @@ -1,4 +1,4 @@ (library (name eio__core) (public_name eio.core) - (libraries cstruct hmap lwt-dllist fmt)) + (libraries cstruct hmap lwt-dllist fmt optint)) diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index cdca63500..a5d4e4b2c 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -5,6 +5,8 @@ module Cancel = Cancel module Exn = Exn module Private = struct module Suspend = Suspend + module Cells = Cells + module Broadcast = Broadcast module Waiters = Waiters module Ctf = Ctf module Fiber_context = Cancel.Fiber_context diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index 3df61e565..91d3f7705 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -559,6 +559,9 @@ end module Private : sig module Ctf = Ctf + module Cells = Cells + module Broadcast = Broadcast + (** Every fiber has an associated context. *) module Fiber_context : sig type t diff --git a/lib_eio/tests/broadcast.md b/lib_eio/tests/broadcast.md new file mode 100644 index 000000000..441382f0a --- /dev/null +++ b/lib_eio/tests/broadcast.md @@ -0,0 +1,163 @@ +```ocaml +# #require "eio";; +``` +```ocaml +module T = Eio__core__Broadcast +let show t = Fmt.pr "%a@." T.dump t +let fiber name () = Fmt.pr "%s: woken@." name +``` + +Initially we have a single segment full of empty cells. +Both resume and suspend pointers point at the first cell: + +```ocaml +# let t : T.t = T.create ();; +val t : T.t = +# show t;; +Segment 0 (prev=None, pointers=2, cancelled=0): + Empty (suspend) (resume) + Empty + Empty + Empty +End +- : unit = () +``` + +## Waking an empty queue + +Broadcasting with no waiters does nothing: + +```ocaml +# T.resume_all t; show t;; +Segment 0 (prev=None, pointers=2, cancelled=0): + Empty (suspend) (resume) + Empty + Empty + Empty +End +- : unit = () +``` + +## Adding waiters + +Requesting a wake-up adds requests to the queue: +```ocaml +# T.suspend t (fiber "0");; +- : T.request option = Some + +# show t;; +Segment 0 (prev=None, pointers=2, cancelled=0): + Request (resume) + Empty (suspend) + Empty + Empty +End +- : unit = () +``` +The returned request is to allow us to cancel if desired. + +Filling the segment: +```ocaml +# let suspend name = T.suspend t (fiber name);; +val suspend : string -> T.request option = + +# for i = 1 to 3 do suspend (string_of_int i) |> Option.get |> ignore done;; +- : unit = () + +# show t;; +Segment 0 (prev=None, pointers=2, cancelled=0): + Request (resume) + Request + Request + Request +End (suspend) +- : unit = () +``` + +Allocating new segments: +```ocaml +# let reqs = List.init 5 (fun i -> suspend (string_of_int i) |> Option.get);; +val reqs : T.request list = [; ; ; ; ] + +# show t;; +Segment 0 (prev=None, pointers=1, cancelled=0): + Request (resume) + Request + Request + Request +Segment 1 (prev=Some 0, pointers=0, cancelled=0): + Request + Request + Request + Request +Segment 2 (prev=Some 1, pointers=1, cancelled=0): + Request + Empty (suspend) + Empty + Empty +End +- : unit = () +``` + +Cancelling all the cells in a segment removes the segment: +```ocaml +# List.iter (fun r -> assert (T.cancel r)) reqs; show t;; +Segment 0 (prev=None, pointers=1, cancelled=0): + Request (resume) + Request + Request + Request +End +Segment 2 (prev=Some 0, pointers=1, cancelled=1): + Cancelled + Empty (suspend) + Empty + Empty +End +- : unit = () +``` + +```ocaml +# suspend "last";; +- : T.request option = Some + +# T.resume_all t;; +0: woken +1: woken +2: woken +3: woken +last: woken +- : unit = () + +# show t;; +Segment 2 (prev=None, pointers=2, cancelled=1): + Cancelled + Resumed + Empty (suspend) (resume) + Empty +End +- : unit = () +``` + +Resume all, filling segment: + +```ocaml +# suspend "a";; +- : T.request option = Some +# suspend "b";; +- : T.request option = Some + +# T.resume_all t;; +a: woken +b: woken +- : unit = () + +# show t;; +Segment 2 (prev=None, pointers=2, cancelled=1): + Cancelled + Resumed + Resumed + Resumed +End (suspend) (resume) +- : unit = () +``` diff --git a/lib_eio/tests/dscheck/atomic.ml b/lib_eio/tests/dscheck/atomic.ml new file mode 100644 index 000000000..4563b4a4f --- /dev/null +++ b/lib_eio/tests/dscheck/atomic.ml @@ -0,0 +1 @@ +include Dscheck.TracedAtomic diff --git a/lib_eio/tests/dscheck/dune b/lib_eio/tests/dscheck/dune new file mode 100644 index 000000000..731e0a119 --- /dev/null +++ b/lib_eio/tests/dscheck/dune @@ -0,0 +1,11 @@ +; We copy cells.ml here so we can build it using TracedAtomic instead of the default one. +(copy_files# (files ../../core/cells.ml)) + +(executable + (name test_cells) + (libraries dscheck optint fmt)) + +(rule + (alias dscheck) + (package eio) + (action (run %{exe:test_cells.exe}))) diff --git a/lib_eio/tests/dscheck/simple_cqs.ml b/lib_eio/tests/dscheck/simple_cqs.ml new file mode 100644 index 000000000..bd2d99aa0 --- /dev/null +++ b/lib_eio/tests/dscheck/simple_cqs.ml @@ -0,0 +1,62 @@ +(* A queue built on cells.ml using the "simple" cancellation mode, + where resuming a cancelled request does nothing instead of retrying. *) + +module Make(Config : sig val segment_order : int end) = struct + module Cell = struct + type _ t = + | Empty + | Value of int + | Waiting of (int -> unit) + | Cancelled + | Finished + + let init = Empty + + let segment_order = Config.segment_order + + let dump f = function + | Empty -> Fmt.string f "Empty" + | Value v -> Fmt.pf f "Value %d" v + | Waiting _ -> Fmt.string f "Waiting" + | Cancelled -> Fmt.string f "Cancelled" + | Finished -> Fmt.string f "Finished" + end + + module Cells = Cells.Make(Cell) + + let cancel (segment, cell) = + match Atomic.get cell with + | Cell.Waiting _ as prev -> + if Atomic.compare_and_set cell prev Cancelled then ( + Cells.cancel_cell segment; + true + ) else ( + false + ) + | Finished -> false + | _ -> assert false + + let resume t v = + let cell = Cells.next_resume t in + if not (Atomic.compare_and_set cell Cell.Empty (Value v)) then ( + match Atomic.get cell with + | Waiting w as prev -> + if Atomic.compare_and_set cell prev Finished then w v + (* else cancelled *) + | Cancelled -> () + | Empty | Value _ | Finished -> assert false + ) + + let suspend t k = + let segment, cell = Cells.next_suspend t in + if Atomic.compare_and_set cell Cell.Empty (Waiting k) then Some (segment, cell) + else ( + match Atomic.get cell with + | Value v -> Atomic.set cell Finished; k v; None + | Cancelled | Empty | Waiting _ | Finished -> assert false + ) + + let make = Cells.make + + let dump = Cells.dump +end diff --git a/lib_eio/tests/dscheck/test_cells.ml b/lib_eio/tests/dscheck/test_cells.ml new file mode 100644 index 000000000..a19b8618e --- /dev/null +++ b/lib_eio/tests/dscheck/test_cells.ml @@ -0,0 +1,207 @@ +let debug = false + +(* For each of [n_values], spawn one producer and one consumer. + If the consumer has to wait, it will also try to cancel. + The consumer increases the total if it gets a value; + the producer increments it if the cancellation succeeds instead. *) +let test_cells ~segment_order ~n_values () = + let module Cqs = Simple_cqs.Make(struct let segment_order = segment_order end) in + let expected_total = n_values in + let t = Cqs.make () in + let total = ref 0 in + for i = 1 to n_values do + Atomic.spawn (fun () -> + if debug then Fmt.epr "%d: wrote value@." i; + Cqs.resume t 1; + ); + Atomic.spawn (fun () -> + match + Cqs.suspend t (fun v -> + if debug then Fmt.epr "%d: resumed@." i; + total := !total + v + ) + with + | None -> () (* Already resumed *) + | Some request -> + if Cqs.cancel request then ( + if debug then Fmt.epr "%d: cancelled@." i; + total := !total + 1 + ) + ); + done; + Atomic.final + (fun () -> + if debug then ( + Format.eprintf "%a@." Cqs.dump t; + Format.eprintf "total=%d, expected_total=%d\n%!" !total expected_total; + ); + Cqs.Cells.validate t; + assert (!total = expected_total); + (* Printf.printf "total = %d\n%!" !total *) + ) + +(* An even simpler cell type with no payload. Just for testing removing whole cancelled segments. *) +module Unit_cells(Config : sig val segment_order : int end) = struct + module Cell = struct + type _ t = + | Empty (* A consumer is intending to collect a value in the future *) + | Value (* A value is waiting for its consumer *) + | Cancelled (* The consumer cancelled *) + + let init = Empty + + let segment_order = Config.segment_order + + let dump f = function + | Empty -> Fmt.string f "Empty" + | Value -> Fmt.string f "Value" + | Cancelled -> Fmt.string f "Cancelled" + end + module Cells = Cells.Make(Cell) + + type request = unit Cells.segment * unit Cell.t Atomic.t + + let cancel (segment, cell) = + if Atomic.compare_and_set cell Cell.Empty Cancelled then ( + Cells.cancel_cell segment; + true + ) else false (* Already at [Value]; cancellation fails. *) + + (* Provide a value. Returns [false] if already [Cancelled]. *) + let resume t = + let cell = Cells.next_resume t in + Atomic.compare_and_set cell Empty Value + + (* We reuse the [Empty] state to mean [Waiting]. *) + let suspend t : request = Cells.next_suspend t + + let resume_all t = + Cells.resume_all t + + let make = Cells.make + let dump f t = Atomic.check (fun () -> Cells.dump f t; true) + let validate t = Atomic.check (fun () -> Cells.validate t; true) +end + +(* A producer writes [n_items] to the queue (retrying if the cell gets cancelled first). + A consumer reads [n_items] from the queue (cancelling and retrying once if it can). + At the end, the consumer and resumer are at the same position. + This tests what happens if a whole segment gets cancelled and the producer therefore skips it. + [test_cells] is too slow to test this. *) +let test_skip_segments ~segment_order ~n_items () = + let module Cells = Unit_cells(struct let segment_order = segment_order end) in + if debug then print_endline "== start =="; + let t = Cells.make () in + Atomic.spawn (fun () -> + for _ = 1 to n_items do + let rec loop ~may_cancel = + if debug then print_endline "suspend"; + let request = Cells.suspend t in + if may_cancel && Cells.cancel request then ( + if debug then print_endline "cancelled"; + loop ~may_cancel:false + ) + in + loop ~may_cancel:true + done + ); + Atomic.spawn (fun () -> + for _ = 1 to n_items do + if debug then print_endline "resume"; + while not (Cells.resume t) do () done + done + ); + Atomic.final + (fun () -> + if debug then Fmt.pr "%a@." Cells.dump t; + Cells.Cells.validate t; + assert (Cells.Cells.Position.index t.suspend = + Cells.Cells.Position.index t.resume); + ) + +(* Create a list of [n_internal + 2] segments and cancel all the internal ones. + Ensure the list is valid afterwards. + This is simpler than [test_skip_segments], so we can test longer sequences + of cancellations. *) +let test_cancel_only ~n_internal () = + let module Cells = Unit_cells(struct let segment_order = 0 end) in + let t = Cells.make () in + ignore (Cells.suspend t : Cells.request); + let internals = Array.init n_internal (fun _ -> Cells.suspend t) in + ignore (Cells.suspend t : Cells.request); + let in_progress = ref 0 in + for i = 0 to n_internal - 1 do + Atomic.spawn (fun () -> + incr in_progress; + assert (Cells.cancel internals.(i)); + decr in_progress; + if !in_progress = 0 then Cells.validate t + ) + done; + Atomic.final + (fun () -> + assert (Cells.resume t); + assert (Cells.resume t); + if debug then Fmt.pr "%a@." Cells.dump t; + Cells.validate t; + assert (Cells.Cells.Position.index t.suspend = + Cells.Cells.Position.index t.resume); + ) + +(* Create [n] requests. Then try to cancel them in parallel with doing a resume_all. + Check the number of resumed requests is plausible (at least as many as there + were requests that hadn't started cancelling, and no more than those that hadn't + finished cancelling. *) +let test_broadcast ~segment_order ~n () = + let messages = ref [] in + let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in + if debug then log "== start =="; + let module Cells = Unit_cells(struct let segment_order = segment_order end) in + let t = Cells.make () in + let requests = Array.init n (fun _ -> Cells.suspend t) in + let min_requests = Atomic.make n in + let max_requests = Atomic.make n in + for i = 0 to n - 1 do + Atomic.spawn (fun () -> + Atomic.decr min_requests; + if debug then log "Cancelling request"; + if Cells.cancel requests.(i) then ( + Atomic.decr max_requests; + if debug then log "Cancelled request"; + ) + ) + done; + Atomic.spawn (fun () -> + if debug then log "Broadcasting"; + let max_expected = Atomic.get max_requests in + let wakes = ref 0 in + Cells.resume_all t (fun cell -> + match Atomic.get cell with + | Empty -> incr wakes + | Cancelled -> () + | Value -> assert false + ); + let min_expected = Atomic.get min_requests in + let wakes = !wakes in + if debug then log "Broadcast done: wakes=%d (expected=%d-%d)" wakes min_expected max_expected; + assert (min_expected <= wakes && wakes <= max_expected) + ); + Atomic.final (fun () -> + if debug then ( + List.iter print_string (List.rev !messages) + ) + ) + +(* These tests take about 10s on my machine, with https://github.com/ocaml-multicore/dscheck/pull/3 + However, that PR is not reliable at finding all interleavings. *) +let () = + print_endline "Test broadcast:"; + Atomic.trace (test_broadcast ~segment_order:1 ~n:3); + print_endline "Test cancelling segments:"; + Atomic.trace (test_cancel_only ~n_internal:3); + print_endline "Test cancelling segments while suspending and resuming:"; + Atomic.trace (test_skip_segments ~segment_order:1 ~n_items:3); + print_endline "Test with 1 cell per segment:"; + Atomic.trace (test_cells ~segment_order:0 ~n_values:2); + print_endline "Test with 2 cells per segment:"; + Atomic.trace (test_cells ~segment_order:1 ~n_values:2) diff --git a/lib_eio/tests/dune b/lib_eio/tests/dune new file mode 100644 index 000000000..090c3e939 --- /dev/null +++ b/lib_eio/tests/dune @@ -0,0 +1,3 @@ +(mdx + (package eio) + (packages eio)) diff --git a/tests/condition.md b/tests/condition.md index a759894b0..9d1459e57 100644 --- a/tests/condition.md +++ b/tests/condition.md @@ -113,6 +113,32 @@ let await p = - : unit = () ``` +Cancellation while waiting: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + Fiber.first + (fun () -> + await ((=) 0); + assert false; + ) + (fun () -> ()); + Fiber.both + (fun () -> + traceln "x = %d" !x; + await ((=) 0); + traceln "x = %d" !x + ) + (fun () -> + set 5; + Fiber.yield (); + set 0; + );; ++x = 42 ++x = 0 +- : unit = () +``` + ## Use with mutex ```ocaml