Skip to content

Commit

Permalink
Allow closing synchronous streams
Browse files Browse the repository at this point in the history
This isn't currently exposed in the public interface.
  • Loading branch information
talex5 committed Nov 13, 2023
1 parent 9537ca1 commit 4b627b1
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 73 deletions.
7 changes: 5 additions & 2 deletions lib_eio/stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,15 @@ let add t v =
| Locking x -> Locking.add x v

let take = function
| Sync x -> Sync.take x
| Sync x -> Sync.take x |> Result.get_ok (* todo: allow closing streams *)
| Locking x -> Locking.take x

let take_nonblocking = function
| Sync x -> Sync.take_nonblocking x
| Locking x -> Locking.take_nonblocking x
| Sync x ->
match Sync.take_nonblocking x with
| Ok x -> Some x
| Error `Closed | Error `Would_block -> None

let length = function
| Sync _ -> 0
Expand Down
220 changes: 162 additions & 58 deletions lib_eio/sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
the consumer sets its cell to a request with a dummy callback that rejects
all values and continues immediately.
Close
The LSB of the balance atomic is used to indicate that the stream has been closed.
When closed, the balance is always zero and no new consumers or producers can be added.
The closing thread is responsible for cancelling all pre-existing users.
The exchange
Once a producer and consumer have been paired off (and so their cell is now Finished),
Expand All @@ -137,9 +143,14 @@ module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
module Cancel = Eio__core.Cancel

type producer_result =
| Sent (* Consumer accepted item. *)
| Rejected (* Consumer rejected the item. Retry. *)
| Failed of exn (* Cancelled or closed. *)

type 'a item = {
v : 'a;
kp : (bool, exn) result -> unit; (* [Ok false] means consumer refused the item; retry. *)
v : ('a, [`Closed]) result;
kp : producer_result -> unit;
cancel : [
| `Resuming (* In the process of resuming, so can't cancel. *)
| `Suspended of (unit -> bool) (* Call this function to attempt to leave the queue. *)
Expand All @@ -149,7 +160,7 @@ type 'a item = {

type 'a cell =
| In_transition
| Slot of ('a -> bool)
| Slot of (('a, [`Closed]) result -> bool)
| Item of 'a item
| Finished

Expand All @@ -169,8 +180,74 @@ end

module Q = Cells.Make(Cell)

type update_result =
| Updated
| Update_refused
| Balance_closed

module Balance : sig
type t

val make : unit -> t

val close : t -> (int, [> `Closed]) result
(* Mark as closed and return the previous state. *)

val get : t -> (int, [> `Closed]) result
(** [get t] is the number of items available (if non-negative) or the
number of consumers waiting for an item. *)

val fetch_and_add : t -> int -> (int, [> `Closed]) result
(** [fetch_and_add t diff] increases the value by [diff] and returns the old value. *)

val incr_if_negative : t -> update_result
val decr_if_positive : t -> update_result

val pp : t Fmt.t
end = struct
type t = int Atomic.t

let closed = 1
let counter x = x asr 1
let is_closed x = (x land 1) <> 0

let value x =
if is_closed x then Error `Closed else Ok (x asr 1)

let fetch_and_add x diff =
value (Atomic.fetch_and_add x (diff lsl 1))

let rec decr_if_positive t =
let x = Atomic.get t in
if is_closed x then Balance_closed
else if counter x > 0 then (
if Atomic.compare_and_set t x (x - 2) then Updated
else decr_if_positive t
) else Update_refused

let rec incr_if_negative t =
let x = Atomic.get t in
if is_closed x then Balance_closed
else if counter x < 0 then (
if Atomic.compare_and_set t x (x + 2) then Updated
else incr_if_negative t
) else Update_refused

let make () = Atomic.make 0

let close t =
value (Atomic.exchange t closed)

let get t = value (Atomic.get t)

let pp f t =
match get t with
| Ok x -> Fmt.int f x
| Error `Closed -> Fmt.string f "(closed)"
end

type 'a t = {
balance : int Atomic.t;
balance : Balance.t;
consumers : 'a Q.t;
producers : 'a Q.t;
}
Expand All @@ -180,13 +257,14 @@ type 'a loc =
| Long of ('a Q.segment * 'a Cell.t Atomic.t) (* Acting as suspender of cell; can cancel *)

let dump f t =
Fmt.pf f "@[<v2>Sync (balance=%d)@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
(Atomic.get t.balance)
Fmt.pf f "@[<v2>Sync (balance=%a)@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
Balance.pp t.balance
Q.dump t.consumers
Q.dump t.producers

(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *)
let exchange item kc = item.kp (Ok (kc item.v))
let exchange item kc =
item.kp (if kc item.v then Sent else Rejected)

(* Add [value] to [cell].
If the cell is in transition, place [value] there and let the other party handle it later.
Expand All @@ -209,20 +287,6 @@ let rec add_to_cell queue value cell =

(* Cancelling *)

let rec decr_balance_if_positive t =
let cur = Atomic.get t.balance in
if cur > 0 then (
if Atomic.compare_and_set t.balance cur (cur - 1) then true
else decr_balance_if_positive t
) else false

let rec incr_balance_if_negative t =
let cur = Atomic.get t.balance in
if cur < 0 then (
if Atomic.compare_and_set t.balance cur (cur + 1) then true
else incr_balance_if_negative t
) else false

(* Cancel [cell] on our suspend queue.
This function works for both consumers and producers, as we can tell from
the value what our role is (and if there isn't a value, we're finished anyway).
Expand All @@ -232,7 +296,8 @@ let rec incr_balance_if_negative t =
let cancel t (segment, cell) =
let cancel2 update_balance ~old =
if Atomic.compare_and_set cell old In_transition then (
if update_balance t then (
match update_balance t.balance with
| Updated ->
(* At this point, we are committed to cancelling. *)
begin match Atomic.exchange cell Finished with
| Finished -> assert false
Expand All @@ -241,7 +306,7 @@ let cancel t (segment, cell) =
| Slot kc -> add_to_cell t.producers (Slot kc) (Q.next_resume t.producers)
end;
true
) else (
| Update_refused | Balance_closed ->
(* We decided not to cancel. We know a resume is coming. *)
if Atomic.compare_and_set cell In_transition old then false
else (
Expand All @@ -253,13 +318,12 @@ let cancel t (segment, cell) =
false
| _ -> assert false
)
)
) else false (* The peer resumed us first *)
in
match Atomic.get cell with
| Finished -> false (* The peer resumed us first *)
| Slot _ as old -> cancel2 incr_balance_if_negative ~old (* We are a consumer *)
| Item _ as old -> cancel2 decr_balance_if_positive ~old (* We are a producer *)
| Slot _ as old -> cancel2 Balance.incr_if_negative ~old (* We are a consumer *)
| Item _ as old -> cancel2 Balance.decr_if_positive ~old (* We are a producer *)
| In_transition ->
(* Either we're initialising the cell, in which case we haven't told the
application how to cancel this location yet, or we're already
Expand Down Expand Up @@ -292,16 +356,20 @@ let rec producer_resume_cell t ~success ~in_transition cell =

(* This is essentially the main [put] function, but parameterised so it can be shared with
the rejoin-after-rejection case. *)
let producer_join (t : _ t) ~success ~suspend =
let old = Atomic.fetch_and_add t.balance (+1) in
if old < 0 then (
let cell = Q.next_resume t.consumers in
producer_resume_cell t cell
~success
~in_transition:(fun cell -> suspend (Short cell))
) else (
suspend (Long (Q.next_suspend t.producers))
)
let producer_join (t : _ t) ~success ~suspend ~closed =
match Balance.fetch_and_add t.balance (+1) with
| Error `Closed -> closed ()
| Ok old ->
if old < 0 then (
let cell = Q.next_resume t.consumers in
producer_resume_cell t cell
~success
~in_transition:(fun cell -> suspend (Short cell))
) else (
suspend (Long (Q.next_suspend t.producers))
)

let put_closed_err = Invalid_argument "Stream closed"

(* Called when a consumer took our value but then rejected it.
We start the put operation again, except that our fiber is already suspended
Expand All @@ -310,6 +378,7 @@ let producer_join (t : _ t) ~success ~suspend =
let put_already_suspended t request =
producer_join t
~success:(exchange request)
~closed:(fun () -> request.kp (Failed put_closed_err))
~suspend:(fun loc ->
let Short cell | Long (_, cell) = loc in
add_to_cell t.consumers (Item request) cell;
Expand All @@ -323,7 +392,7 @@ let put_already_suspended t request =
(* We got cancelled after the peer removed our cell and before we updated the
cancel function with the new location, or we were cancelled while doing a
(non-cancellable) resume. Deal with it now. *)
if cancel t loc then request.kp (Error ex);
if cancel t loc then request.kp (Failed ex);
(* else we got resumed first *)
| _, Short _ ->
(* We can't cancel while in the process of resuming a cell on the [consumers] queue.
Expand All @@ -346,12 +415,12 @@ let put_suspend t v loc =
| Long loc -> `Suspended (fun () -> cancel t loc)
in
let rec item = {
v;
v = Ok v;
cancel = Atomic.make cancel;
kp = function
| Error _ as e -> enqueue e (* Cancelled by [put_already_suspended]. *)
| Ok true -> enqueue (Ok ()) (* Success! *)
| Ok false -> put_already_suspended t item (* Consumer rejected value. Restart. *)
| Failed e -> enqueue (Error e)
| Sent -> enqueue (Ok ()) (* Success! *)
| Rejected -> put_already_suspended t item (* Consumer rejected value. Restart. *)
} in
let Short cell | Long (_, cell) = loc in
add_to_cell t.consumers (Item item) cell;
Expand All @@ -368,8 +437,9 @@ let put_suspend t v loc =

let rec put (t : _ t) v =
producer_join t
~success:(fun kc -> if kc v then () else put t v)
~success:(fun kc -> if kc (Ok v) then () else put t v)
~suspend:(put_suspend t v)
~closed:(fun () -> raise put_closed_err)

(* Taking. *)

Expand Down Expand Up @@ -402,25 +472,35 @@ let take_suspend t loc =
)

let take (t : _ t) =
let old = Atomic.fetch_and_add t.balance (-1) in
if old > 0 then (
let cell = Q.next_resume t.producers in
consumer_resume_cell t cell
~success:(fun item -> item.kp (Ok true); item.v)
~in_transition:(fun cell -> take_suspend t (Short cell))
) else (
take_suspend t (Long (Q.next_suspend t.consumers))
)
match Balance.fetch_and_add t.balance (-1) with
| Error _ as e -> e
| Ok old ->
if old > 0 then (
let cell = Q.next_resume t.producers in
consumer_resume_cell t cell
~success:(fun item -> item.kp Sent; item.v)
~in_transition:(fun cell -> take_suspend t (Short cell))
) else (
take_suspend t (Long (Q.next_suspend t.consumers))
)

let take t =
(take t
: (_, [ `Closed ]) result
:> (_, [> `Closed ]) result)

let reject = Slot (fun _ -> false)

let take_nonblocking (t : _ t) =
if decr_balance_if_positive t then (
match Balance.decr_if_positive t.balance with
| Balance_closed -> Error `Closed
| Update_refused -> Error `Would_block (* No waiting producers for us *)
| Updated ->
let rec aux cell =
consumer_resume_cell t cell
~success:(fun item ->
item.kp (Ok true); (* Always accept the item *)
Some item.v
item.kp Sent; (* Always accept the item *)
(item.v :> (_, [`Closed | `Would_block]) result)
)
~in_transition:(fun cell ->
(* Our producer is still in the process of writing its [Item], but
Expand All @@ -430,19 +510,43 @@ let take_nonblocking (t : _ t) =
todo: could spin for a bit here first - the Item will probably arrive soon,
and that would avoid making the producer start again. *)
Domain.cpu_relax (); (* Brief wait to encourage producer to finish *)
if Atomic.compare_and_set cell In_transition reject then None
if Atomic.compare_and_set cell In_transition reject then Error `Would_block
else aux cell
)
in aux (Q.next_resume t.producers)
) else None (* No waiting producers for us *)

let take_nonblocking t =
(take_nonblocking t
: (_, [ `Would_block | `Closed ]) result
:> (_, [> `Would_block | `Closed ]) result)

(* Creation and status. *)

let create () =
{
consumers = Q.make ();
producers = Q.make ();
balance = Atomic.make 0;
balance = Balance.make ();
}

let balance t = Atomic.get t.balance
let close t =
match Balance.close t.balance with
| Error `Closed -> ()
| Ok old ->
if old > 0 then (
(* Reject each waiting producer. They will try to restart and then discover the stream is closed. *)
for _ = 1 to old do
let cell = Q.next_resume t.producers in
add_to_cell t.consumers reject cell;
done
) else (
let reject_consumer = Item { v = Error `Closed; kp = ignore; cancel = Atomic.make `Resuming } in
(* Reject each waiting consumer. *)
for _ = 1 to -old do
let cell = Q.next_resume t.consumers in
add_to_cell t.consumers reject_consumer cell
done
)

let balance t =
Balance.get t.balance
Loading

0 comments on commit 4b627b1

Please sign in to comment.