Add Eio.Pool #602

Merged · 1 commit · Aug 23, 2023
1 change: 1 addition & 0 deletions lib_eio/eio.ml
@@ -9,6 +9,7 @@ module Mutex = Eio_mutex
module Condition = Condition
module Stream = Stream
module Lazy = Lazy
module Pool = Pool
module Exn = Exn
module Resource = Resource
module Flow = Flow
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
@@ -39,6 +39,9 @@ module Stream = Stream
(** Delayed evaluation. *)
module Lazy = Lazy

(** A pool of resources. *)
module Pool = Pool

(** Cancelling fibers. *)
module Cancel = Eio__core.Cancel

158 changes: 158 additions & 0 deletions lib_eio/pool.ml
@@ -0,0 +1,158 @@
(* A pool is a sequence of cells containing either available slots or consumers waiting for them.
A slot may or may not contain an actual resource.

To use a resource:

1. Get the next "suspend" cell. If it contains a resource slot, use it.
2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell).
3. Either way, wait for the cell to be resumed with a slot.
4. Once you have a slot, ensure it contains a resource, creating one if not.
5. When done, add the slot back (in the next resume cell).
*)

(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend

type 'a slot = 'a option ref

module Cell = struct
(* The possible behaviours are:

1. Suspender : In_transition -> Request Suspender waits for a resource
     1.1. Resumer : Request -> Finished          Resumer then provides a resource
1.2. Suspender : Request -> Finished Suspender cancels
2. Resumer : In_transition -> Resource Resumer provides a spare resource
2.1. Suspender : Resource -> Finished Suspender doesn't need to wait
*)
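  (* For example, one possible interleaving: a fiber calling [use] finds its
     suspend cell In_transition, installs [Request r] and suspends (1).
     A later [add] swaps that cell from Request to Finished and calls
     [r slot], waking the fiber with the slot (1.1). Had the fiber been
     cancelled first, it would itself have swapped Request to Finished (1.2)
     and [add] would retry with another cell. *)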

type 'a t =
| In_transition
| Request of ('a slot -> unit)
| Resource of 'a slot
| Finished

let init = In_transition

let segment_order = 2

let dump f = function
| In_transition -> Fmt.string f "In_transition"
| Request _ -> Fmt.string f "Request"
| Resource _ -> Fmt.string f "Resource"
| Finished -> Fmt.string f "Finished"
end

module Q = Cells.Make(Cell)

type 'a t = {
slots : int Atomic.t; (* Total resources, available and in use *)
max_slots : int;
alloc : unit -> 'a;
validate : 'a -> bool;
dispose : 'a -> unit;
q : 'a Q.t;
}

let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc =
if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0";
{
slots = Atomic.make 0;
max_slots = max_size;
alloc;
validate;
dispose;
q = Q.make ();
}

(* [add t x] adds [x] to the queue of available slots. *)
let rec add t x =
let cell = Q.next_resume t.q in
let rec aux () =
match Atomic.get cell with
| In_transition -> if not (Atomic.compare_and_set cell In_transition (Resource x)) then aux ()
| Finished -> add t x (* The consumer cancelled. Get another cell and retry. *)
| Request r as prev ->
if Atomic.compare_and_set cell prev Finished then (
r x (* We had a consumer waiting. Give it to them. *)
) else add t x (* Consumer cancelled; retry with another cell. *)
| Resource _ -> assert false (* Can't happen; only a resumer can set this, and we're the resumer. *)
in
aux ()

(* Try to cancel by transitioning from [Request] to [Finished].
This can only be called after previously transitioning to [Request]. *)
let cancel segment cell =
match Atomic.exchange cell Cell.Finished with
| Request _ -> Q.cancel_cell segment; true
| Finished -> false (* Already resumed; reject cancellation *)
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)

(* If [t] is under capacity, add another (empty) slot. *)
let rec maybe_add_slot t =
let current = Atomic.get t.slots in
if current < t.max_slots then (
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
else maybe_add_slot t (* Concurrent update; try again *)
)

(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
Afterwards, the slot is returned to [t]. *)
let run_with t f slot =
match
begin match !slot with
| Some x when t.validate x -> f x
| Some x ->
slot := None;
t.dispose x;
let x = t.alloc () in
slot := Some x;
f x
| None ->
let x = t.alloc () in
slot := Some x;
f x
end
with
| r ->
add t slot;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
add t slot;
Printexc.raise_with_backtrace ex bt

let use t f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
| Finished | Request _ -> assert false
| Resource slot ->
Atomic.set cell Finished; (* Allow value to be GC'd *)
run_with t f slot
| In_transition ->
(* It would have been better if more resources were available.
If we still have capacity, add a new slot now. *)
maybe_add_slot t;
(* No item is available right now. Start waiting *)
let slot =
Suspend.enter_unchecked (fun ctx enqueue ->
let r x = enqueue (Ok x) in
if Atomic.compare_and_set cell In_transition (Request r) then (
match Fiber_context.get_error ctx with
| Some ex ->
if cancel segment cell then enqueue (Error ex);
(* else being resumed *)
| None ->
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel segment cell then enqueue (Error ex)
(* else being resumed *)
)
) else (
match Atomic.exchange cell Finished with
| Resource x -> enqueue (Ok x)
| _ -> assert false
);
)
in
(* assert (Atomic.get cell = Finished); *)
run_with t f slot
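
To make the header comment's five steps concrete, here is a deliberately naive sketch of the same slot discipline written against plain mutable state (names such as naive_use are invented for illustration). It skips the interesting parts: there is no suspension when the pool is exhausted, no validate/dispose handling, no cancellation, and it is not safe across domains, which is exactly what the Cells-based implementation above provides.

type 'a naive = {
  mutable free : 'a option list;  (* slots not currently in use; a slot may be empty *)
  mutable created : int;          (* total slots created so far *)
  capacity : int;
  alloc : unit -> 'a;
}

let naive_create capacity alloc = { free = []; created = 0; capacity; alloc }

let naive_use t f =
  (* Steps 1-3, minus the suspension: take a free slot, or mint a new
     empty one if we are still under capacity. *)
  let slot =
    match t.free with
    | s :: rest -> t.free <- rest; s
    | [] ->
      if t.created < t.capacity then (t.created <- t.created + 1; None)
      else failwith "exhausted (the real pool suspends the fiber here)"
  in
  (* Step 4: make sure the slot actually holds a resource. *)
  let x = match slot with Some x -> x | None -> t.alloc () in
  (* Step 5: run [f] and hand the slot back afterwards, even on error. *)
  Fun.protect ~finally:(fun () -> t.free <- Some x :: t.free)
    (fun () -> f x)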
43 changes: 43 additions & 0 deletions lib_eio/pool.mli
@@ -0,0 +1,43 @@
(** This is useful to manage a collection of resources where creating new ones is expensive
and so you want to reuse them where possible.

Example:

{[
let buffer_pool = Eio.Pool.create 10 (fun () -> Bytes.create 1024) in
Eio.Pool.use buffer_pool (fun buf -> ...)
]}

Note: If you just need to limit how many resources are in use, it is simpler to use {!Eio.Semaphore} instead.
*)

type 'a t

val create :
?validate:('a -> bool) ->
?dispose:('a -> unit) ->
int ->
(unit -> 'a) ->
'a t
(** [create n alloc] is a fresh pool which allows up to [n] resources to be live at a time.
It uses [alloc] to create new resources as needed.
If [alloc] raises an exception then that use fails, but future calls to {!use} will retry.

The [alloc] function is called in the context of the fiber trying to use the pool.
If the pool is shared between domains and the resources are attached to a switch, this
might cause trouble (since switches can't be shared between domains).
You might therefore want to make [alloc] request a resource from the main domain rather than creating one itself.

You should also take care about handling cancellation in [alloc], since resources are typically
attached to a switch with the lifetime of the pool, meaning that if [alloc] fails then they won't
be freed automatically until the pool itself is finished.

@param validate If given, this is used to check each resource before using it.
If it returns [false], the pool removes it with [dispose] and then allocates a fresh resource.
@param dispose Used to free resources rejected by [validate].
If it raises, the exception is passed on to the user,
    but the resource is still considered to have been disposed. *)

val use : 'a t -> ('a -> 'b) -> 'b
(** [use t f] waits for some resource [x] to be available and then runs [f x].
Afterwards (on success or error), [x] is returned to the pool. *)
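
A slightly fuller usage sketch than the buffer example in the doc comment, exercising both optional arguments (the names scratch and shout are invented; only the create and use signatures above are assumed): a pool of scratch buffers where validate retires buffers that have grown too large and dispose has nothing to release.

let scratch =
  Eio.Pool.create 4 (fun () -> Buffer.create 4096)
    ~validate:(fun b -> Buffer.length b <= 1_048_576)  (* retire oversized buffers *)
    ~dispose:ignore                                    (* nothing to free for a Buffer.t *)

let shout msg =
  Eio.Pool.use scratch (fun buf ->
    Buffer.clear buf;  (* a reused buffer still holds the previous contents *)
    Buffer.add_string buf (String.uppercase_ascii msg);
    Buffer.contents buf)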
8 changes: 7 additions & 1 deletion lib_eio/tests/dscheck/dune
@@ -4,10 +4,11 @@
(copy_files# (files ../../sync.ml))
(copy_files# (files ../../unix/rcfd.ml))
(copy_files# (files ../../condition.ml))
(copy_files# (files ../../pool.ml))
(copy_files# (files ../../core/broadcast.ml))

(executables
(names test_cells test_semaphore test_sync test_rcfd test_condition)
(names test_cells test_semaphore test_sync test_rcfd test_condition test_pool)
(libraries dscheck optint fmt eio))

(rule
@@ -34,3 +35,8 @@
(alias dscheck)
(package eio)
(action (run %{exe:test_condition.exe})))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_pool.exe})))
34 changes: 34 additions & 0 deletions lib_eio/tests/dscheck/test_pool.ml
@@ -0,0 +1,34 @@
module T = Pool
module Cancel = Eio__core.Cancel

exception Abort

(* [clients] threads try to use a pool of size [n].
If [cancel] is set, they also try to cancel, and accept
that as success too. *)
let test ~n ~clients ~cancel () =
let t = T.create n (fun () -> ()) in
let used = Atomic.make 0 in
let finished = ref 0 in
for _ = 1 to clients do
Atomic.spawn (fun () ->
let ctx =
Fake_sched.run @@ fun () ->
try
T.use t (fun () -> Atomic.incr used);
        incr finished
with Cancel.Cancelled Abort ->
        incr finished
in
if cancel then
Option.iter (fun c -> Cancel.cancel c Abort) ctx
)
done;
Atomic.final (fun () ->
if not cancel then Atomic.check (fun () -> Atomic.get used = clients);
Atomic.check (fun () -> !finished = clients);
)

let () =
Atomic.trace (test ~n:1 ~clients:2 ~cancel:false);
Atomic.trace (test ~n:1 ~clients:2 ~cancel:true)
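
If extra coverage is wanted, the same harness takes other parameter combinations; keeping them small matters because dscheck explores every interleaving. A possible extra scenario, not part of the diff above, that puts more waiters than slots through the hand-off path in [add]:

let () =
  (* More clients than slots, no cancellation: a resumer must find a
     Request already installed and wake the waiting fiber directly. *)
  Atomic.trace (test ~n:1 ~clients:3 ~cancel:false)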