Skip to content

Commit

Permalink
Add Eio.Pool
Browse files Browse the repository at this point in the history
This is similar to `Lwt_pool`.
  • Loading branch information
talex5 committed Aug 22, 2023
1 parent 5b3578a commit d07f63d
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Mutex = Eio_mutex
module Condition = Condition
module Stream = Stream
module Lazy = Lazy
module Pool = Pool
module Exn = Exn
module Resource = Resource
module Flow = Flow
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ module Stream = Stream
(** Delayed evaluation. *)
module Lazy = Lazy

(** A pool of resources. *)
module Pool = Pool

(** Cancelling fibers. *)
module Cancel = Eio__core.Cancel

Expand Down
160 changes: 160 additions & 0 deletions lib_eio/pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
(* A pool is a sequence of cells containing either available slots or consumers waiting for them.
A slot may or may not contain an actual resource.
To use a resource:
1. Get the next "suspend" cell. If it contains a resource slot, use it.
2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell).
3. Either way, wait for the cell to be resumed with a slot.
4. Once you have a slot, ensure it contains a resource, creating one if not.
5. When done, add the slot back (in the next resume cell).
*)

(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend

type 'a slot = 'a option ref

module Cell = struct
(* The possible behaviours are:
1. Suspender : In_transition -> Request Suspender waits for a resource
1.1. Resumer : Request -> Finished Resumer then providers a resource
1.2. Suspender : Request -> Finished Suspender cancels
2. Resumer : In_transition -> Resource Resumer provides a spare resource
2.1. Suspender : Resource -> Finished Suspender doesn't need to wait
*)

type 'a t =
| In_transition
| Request of ('a slot -> unit)
| Resource of 'a slot
| Finished

let init = In_transition

let segment_order = 2

let dump f = function
| In_transition -> Fmt.string f "In_transition"
| Request _ -> Fmt.string f "Request"
| Resource _ -> Fmt.string f "Resource"
| Finished -> Fmt.string f "Finished"
end

module Q = Cells.Make(Cell)

type 'a t = {
slots : int Atomic.t; (* Total resources, available and in use *)
max_slots : int;
alloc : unit -> 'a;
validate : 'a -> bool;
dispose : 'a -> unit;
q : 'a Q.t;
}

let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc =
if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0";
{
slots = Atomic.make 0;
max_slots = max_size;
alloc;
validate;
dispose;
q = Q.make ();
}

(* [add t x] adds [x] to the queue of available slots. *)
let rec add t x =
let cell = Q.next_resume t.q in
let rec aux () =
match Atomic.get cell with
| In_transition -> if not (Atomic.compare_and_set cell In_transition (Resource x)) then aux ()
| Finished -> add t x (* The consumer cancelled. Get another cell and retry. *)
| Request r as prev ->
if Atomic.compare_and_set cell prev Finished then (
r x (* We had a consumer waiting. Give it to them. *)
) else add t x (* Consumer cancelled; retry with another cell. *)
| Resource _ -> assert false (* Can't happen; only a resumer can set this, and we're the resumer. *)
in
aux ()

(* Try to cancel by transitioning from [Request] to [Finished].
This can only be called after previously transitioning to [Request]. *)
let cancel segment cell =
match Atomic.exchange cell Cell.Finished with
| Request _ -> Q.cancel_cell segment; true
| Finished -> false (* Already resumed; reject cancellation *)
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)

(* If [t] is under capacity, add another (empty) slot. *)
let rec maybe_add_slot t =
let current = Atomic.get t.slots in
if current < t.max_slots then (
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
else maybe_add_slot t (* Concurrent update; try again *)
)

(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
Afterwards, the slot is returned to [t]. *)
let run_with t f slot =
match
begin match !slot with
| Some x when t.validate x -> f x
| Some x ->
slot := None;
t.dispose x;
let x = t.alloc () in
slot := Some x;
f x
| None ->
let x = t.alloc () in
slot := Some x;
f x
end
with
| r ->
add t slot;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
add t slot;
Printexc.raise_with_backtrace ex bt

let use t f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
| Finished | Request _ -> assert false
| Resource slot ->
Atomic.set cell Finished; (* Allow value to be GC'd *)
run_with t f slot
| In_transition ->
(* It would have been better if more resources were available.
If we still have capacity, add a new slot now. *)
maybe_add_slot t;
(* No item is available right now. Start waiting *)
let slot =
Suspend.enter_unchecked (fun ctx enqueue ->
let r x = enqueue (Ok x) in
if Atomic.compare_and_set cell In_transition (Request r) then (
match Fiber_context.get_error ctx with
| Some ex ->
if cancel segment cell then enqueue (Error ex);
(* else being resumed *)
| None ->
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel segment cell then enqueue (Error ex)
(* else being resumed *)
)
) else (
match Atomic.get cell with
| Resource x ->
Atomic.set cell Finished; (* Allow value to be GC'd *)
enqueue (Ok x)
| _ -> assert false
);
)
in
assert (Atomic.get cell = Finished);
run_with t f slot
43 changes: 43 additions & 0 deletions lib_eio/pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
(** This is useful to manage a collection of resources where creating new ones is expensive
and so you want to reuse them where possible.
Example:
{[
let buffer_pool = Eio.Pool.create 10 (fun () -> Bytes.create 1024) in
Eio.Pool.use buffer_pool (fun buf -> ...)
]}
Note: If you just need to limit how many resources are in use, it is simpler to use {!Eio.Semaphore} instead.
*)

type 'a t

val create :
?validate:('a -> bool) ->
?dispose:('a -> unit) ->
int ->
(unit -> 'a) ->
'a t
(** [create n alloc] is a fresh pool which allows up to [n] resources to be live at a time.
It uses [alloc] to create new resources as needed.
If [alloc] raises an exception then that use fails, but future calls to {!use} will retry.
The [alloc] function is called in the context of the fiber trying to use the pool.
If the pool is shared between domains and the resources are attached to a switch, this
might cause trouble (since switches can't be shared between domains).
You might therefore want to make [alloc] request a resource from the main domain rather than creating one itself.
You should also take care about handling cancellation in [alloc], since resources are typically
attached to a switch with the lifetime of the pool, meaning that if [alloc] fails then they won't
be freed automatically until the pool itself is finished.
@param validate If given, this is used to check each resource before using it.
If it returns [false], the pool removes it with [dispose] and then allocates a fresh resource.
@param dispose Used to free resources rejected by [validate].
If it raises, the exception is passed on to the user,
but resource is still considered to have been disposed. *)

val use : 'a t -> ('a -> 'b) -> 'b
(** [use t fn] waits for some resource [x] to be available and then runs [f x].
Afterwards (on success or error), [x] is returned to the pool. *)
8 changes: 7 additions & 1 deletion lib_eio/tests/dscheck/dune
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
(copy_files# (files ../../sync.ml))
(copy_files# (files ../../unix/rcfd.ml))
(copy_files# (files ../../condition.ml))
(copy_files# (files ../../pool.ml))
(copy_files# (files ../../core/broadcast.ml))

(executables
(names test_cells test_semaphore test_sync test_rcfd test_condition)
(names test_cells test_semaphore test_sync test_rcfd test_condition test_pool)
(libraries dscheck optint fmt eio))

(rule
Expand All @@ -34,3 +35,8 @@
(alias dscheck)
(package eio)
(action (run %{exe:test_condition.exe})))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_pool.exe})))
34 changes: 34 additions & 0 deletions lib_eio/tests/dscheck/test_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module T = Pool
module Cancel = Eio__core.Cancel

exception Abort

(* [clients] threads try to use a pool of size [n].
If [cancel] is set, they also try to cancel, and accept
that as success too. *)
let test ~n ~clients ~cancel () =
let t = T.create n (fun () -> ()) in
let used = Atomic.make 0 in
let finished = ref 0 in
for _ = 1 to clients do
Atomic.spawn (fun () ->
let ctx =
Fake_sched.run @@ fun () ->
try
T.use t (fun () -> Atomic.incr used);
incr finished;
with Cancel.Cancelled Abort ->
incr finished;
in
if cancel then
Option.iter (fun c -> Cancel.cancel c Abort) ctx
)
done;
Atomic.final (fun () ->
if not cancel then Atomic.check (fun () -> Atomic.get used = clients);
Atomic.check (fun () -> !finished = clients);
)

let () =
Atomic.trace (test ~n:1 ~clients:2 ~cancel:false);
Atomic.trace (test ~n:1 ~clients:2 ~cancel:true)
Loading

0 comments on commit d07f63d

Please sign in to comment.