Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Eio.Process to FCMs #605

Merged
merged 1 commit into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module Stdenv = struct
let stdout (t : <stdout : _ Flow.sink; ..>) = t#stdout
let stderr (t : <stderr : _ Flow.sink; ..>) = t#stderr
let net (t : <net : _ Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : #Process.mgr; ..>) = t#process_mgr
let process_mgr (t : <process_mgr : _ Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : _ Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : _ Time.Mono.t; ..>) = t#mono_clock
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ module Stdenv : sig
To use this, see {!Process}.
*)

val process_mgr : <process_mgr : #Process.mgr as 'a; ..> -> 'a
val process_mgr : <process_mgr : _ Process.mgr as 'a; ..> -> 'a
(** [process_mgr t] allows you to manage child processes. *)

(** {1 Domains (using multiple CPU cores)}
Expand Down
120 changes: 84 additions & 36 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,62 @@ let () =
| _ -> false
)

class virtual t = object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

let pid proc = proc#pid
let await proc = proc#await

let await_exn ?(is_success = Int.equal 0) proc =
match proc#await with
| `Exited code when is_success code -> ()
| status -> raise (err (Child_error status))

let signal proc = proc#signal

class virtual mgr = object
method virtual pipe :
sw:Switch.t ->
[Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir_ty Path.t ->
?stdin:Flow.source_ty r ->
?stdout:Flow.sink_ty r ->
?stderr:Flow.sink_ty r ->
?env:string array ->
?executable:string ->
string list ->
t
type 'tag ty = [ `Process | `Platform of 'tag ]

type 'a t = ([> [> `Generic] ty] as 'a) r

type 'tag mgr_ty = [ `Process_mgr | `Platform of 'tag ]

type 'a mgr = 'a r
constraint 'a = [> [> `Generic] mgr_ty]

module Pi = struct
module type PROCESS = sig
type t
type tag

val pid : t -> int
val await : t -> exit_status
val signal : t -> int -> unit
end

type (_, _, _) Resource.pi +=
| Process : ('t, (module PROCESS with type t = 't and type tag = 'tag), [> 'tag ty]) Resource.pi

let process (type t tag) (module X : PROCESS with type t = t and type tag = tag) =
Resource.handler [
H (Process, (module X));
]

module type MGR = sig
type tag
type t

val pipe :
t ->
sw:Switch.t ->
[Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r

val spawn :
t ->
sw:Switch.t ->
?cwd:Fs.dir_ty Path.t ->
?stdin:Flow.source_ty r ->
?stdout:Flow.sink_ty r ->
?stderr:Flow.sink_ty r ->
?env:string array ->
?executable:string ->
string list ->
tag ty r
end

type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't and type tag = 'tag), [> 'tag mgr_ty]) Resource.pi

let mgr (type t tag) (module X : MGR with type t = t and type tag = tag) =
Resource.handler [
H (Mgr, (module X));
]
end

let bad_char = function
Expand All @@ -77,16 +102,37 @@ let pp_arg f x =

let pp_args = Fmt.hbox (Fmt.list ~sep:Fmt.sp pp_arg)

let spawn ~sw (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?env ?executable args =
t#spawn ~sw
let await (type tag) ((Resource.T (v, ops)) : [> tag ty] r) =
let module X = (val (Resource.get ops Pi.Process)) in
X.await v

let await_exn ?(is_success = Int.equal 0) proc =
match await proc with
| `Exited code when is_success code -> ()
| status -> raise (err (Child_error status))

let pid (type tag) (t : [> tag ty] r) =
let (Resource.T (v, ops)) = t in
let module X = (val (Resource.get ops Pi.Process)) in
X.pid v

let signal (type tag) (t : [> tag ty] r) s =
let (Resource.T (v, ops)) = t in
let module X = (val (Resource.get ops Pi.Process)) in
X.signal v s

let spawn (type tag) ~sw (t : [> tag mgr_ty] r) ?cwd ?stdin ?stdout ?stderr ?env ?executable args : tag ty r =
let (Resource.T (v, ops)) = t in
let module X = (val (Resource.get ops Pi.Mgr)) in
X.spawn v ~sw
?cwd:(cwd :> Fs.dir_ty Path.t option)
?env
?executable args
?stdin:(stdin :> Flow.source_ty r option)
?stdout:(stdout :> Flow.sink_ty r option)
?stderr:(stderr :> Flow.sink_ty r option)

let run (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?(is_success = Int.equal 0) ?env ?executable args =
let run t ?cwd ?stdin ?stdout ?stderr ?(is_success = Int.equal 0) ?env ?executable args =
Switch.run @@ fun sw ->
let child = spawn ~sw t ?cwd ?stdin ?stdout ?stderr ?env ?executable args in
match await child with
Expand All @@ -95,9 +141,11 @@ let run (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?(is_success = Int.equal 0) ?env ?e
let ex = err (Child_error status) in
raise (Exn.add_context ex "running command: %a" pp_args args)

let pipe ~sw (t:#mgr) = t#pipe ~sw
let pipe (type tag) ~sw ((Resource.T (v, ops)) : [> tag mgr_ty] r) =
let module X = (val (Resource.get ops Pi.Mgr)) in
X.pipe v ~sw

let parse_out (t:#mgr) parse ?cwd ?stdin ?stderr ?is_success ?env ?executable args =
let parse_out (type tag) (t : [> tag mgr_ty] r) parse ?cwd ?stdin ?stderr ?is_success ?env ?executable args =
Switch.run @@ fun sw ->
let r, w = pipe t ~sw in
try
Expand Down
104 changes: 69 additions & 35 deletions lib_eio/process.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,66 +36,52 @@ val pp_args : string list Fmt.t

This is intended for use in error messages and logging.*)

(** {2 Processes} *)
(** {2 Types} *)

type 'tag ty = [ `Process | `Platform of 'tag ]

type 'a t = ([> [> `Generic] ty] as 'a) r
(** A process. *)
class virtual t : object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

val pid : #t -> int
type 'tag mgr_ty = [ `Process_mgr | `Platform of 'tag ]

type 'a mgr = 'a r
constraint 'a = [> [> `Generic] mgr_ty]
(** A process manager capable of spawning new processes. *)

(** {2 Processes} *)

val pid : _ t -> int
(** [pid t] is the process ID of [t]. *)

val await : #t -> exit_status
val await : _ t -> exit_status
(** [await t] waits for process [t] to exit and then reports the status. *)

val await_exn : ?is_success:(int -> bool) -> #t -> unit
val await_exn : ?is_success:(int -> bool) -> _ t -> unit
(** Like {! await} except an exception is raised if does not return a successful
exit status.

@param is_success Used to determine if an exit code is successful.
Default is [Int.equal 0]. *)

val signal : #t -> int -> unit
val signal : _ t -> int -> unit
(** [signal t i] sends the signal [i] to process [t].

If the process has already exited then this does nothing
(it will not signal a different process, even if the PID has been reused).

See {!Sys} for the signal numbers. *)

(** {2 Spawning processes} *)

class virtual mgr : object
method virtual pipe :
sw:Switch.t ->
[Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir_ty Path.t ->
?stdin:Flow.source_ty r ->
?stdout:Flow.sink_ty r ->
?stderr:Flow.sink_ty r ->
?env:string array ->
?executable:string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn :
sw:Switch.t ->
#mgr ->
[> 'tag mgr_ty] r ->
?cwd:Fs.dir_ty Path.t ->
?stdin:_ Flow.source ->
?stdout:_ Flow.sink ->
?stderr:_ Flow.sink ->
?env:string array ->
?executable:string ->
string list -> t
string list -> 'tag ty r
(** [spawn ~sw mgr args] creates a new child process that is connected to the switch [sw].

The child process will be sent {! Sys.sigkill} when the switch is released.
Expand All @@ -114,7 +100,7 @@ val spawn :
searching $PATH for it if necessary. *)

val run :
#mgr ->
_ mgr ->
?cwd:_ Path.t ->
?stdin:_ Flow.source ->
?stdout:_ Flow.sink ->
Expand All @@ -132,7 +118,7 @@ val run :
Note: If [spawn] needed to create extra fibers to copy [stdin], etc, then it also waits for those to finish. *)

val parse_out :
#mgr ->
_ mgr ->
'a Buf_read.parser ->
?cwd:_ Path.t ->
?stdin:_ Flow.source ->
Expand All @@ -154,8 +140,56 @@ val parse_out :

(** {2 Pipes} *)

val pipe : sw:Switch.t -> #mgr -> [Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r
val pipe : sw:Switch.t -> _ mgr -> [Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r
(** [pipe ~sw mgr] creates a pipe backed by the OS.

The flows can be used by {!spawn} without the need for extra fibers to copy the data.
This can be used to connect multiple processes together. *)

(** {2 Provider Interface} *)
module Pi : sig
module type PROCESS = sig
type t
type tag

val pid : t -> int
val await : t -> exit_status
val signal : t -> int -> unit
end

type (_, _, _) Resource.pi +=
| Process : ('t, (module PROCESS with type t = 't and type tag = 'tag), [> 'tag ty]) Resource.pi

val process :
(module PROCESS with type t = 't and type tag = 'tag) ->
('t, 'tag ty) Resource.handler

module type MGR = sig
type tag
type t

val pipe :
t ->
sw:Switch.t ->
[Flow.source_ty | Resource.close_ty] r * [Flow.sink_ty | Resource.close_ty] r

val spawn :
t ->
sw:Switch.t ->
?cwd:Fs.dir_ty Path.t ->
?stdin:Flow.source_ty r ->
?stdout:Flow.sink_ty r ->
?stderr:Flow.sink_ty r ->
?env:string array ->
?executable:string ->
string list ->
tag ty r
end

type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't and type tag = 'tag), [> 'tag mgr_ty]) Resource.pi

val mgr :
(module MGR with type t = 't and type tag = 'tag) ->
('t, 'tag mgr_ty) Resource.handler
end
2 changes: 1 addition & 1 deletion lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module Stdenv = struct
stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t;
process_mgr : Process.mgr;
process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r;
fs : Eio.Fs.dir_ty Eio.Path.t;
Expand Down
4 changes: 2 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module Stdenv : sig
stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t;
process_mgr : Process.mgr;
process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r;
fs : Eio.Fs.dir_ty Eio.Path.t;
Expand All @@ -86,7 +86,7 @@ end

(** API for Eio backends only. *)
module Private : sig
type _ Effect.t +=
type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *)
| Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *)
| Get_monotonic_clock : Eio.Time.Mono.ty r Effect.t
Expand Down
Loading
Loading