Skip to content

Commit

Permalink
Merge pull request #499 from talex5/proc-redux
Browse files Browse the repository at this point in the history
Cross-platform subprocess support
  • Loading branch information
talex5 authored May 4, 2023
2 parents 0c28312 + 86501ab commit 7ea93f2
Show file tree
Hide file tree
Showing 18 changed files with 783 additions and 39 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Buffered Writing](#buffered-writing)
* [Error Handling](#error-handling)
* [Filesystem Access](#filesystem-access)
* [Running processes](#running-processes)
* [Time](#time)
* [Multicore Support](#multicore-support)
* [Synchronisation Tools](#synchronisation-tools)
Expand Down Expand Up @@ -876,6 +877,55 @@ A program that operates on the current directory will probably want to use `cwd`
whereas a program that accepts a path from the user will probably want to use `fs`,
perhaps with `open_dir` to constrain all access to be within that directory.

## Running processes

Spawning a child process can be done using the [Eio.Process][] module:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.run proc_mgr ["echo"; "hello"];;
hello
- : unit = ()
```

There are various optional arguments for setting the process's current directory or connecting up the standard streams.
For example, we can use `tr` to convert some text to upper-case:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.run proc_mgr ["tr"; "a-z"; "A-Z"]
~stdin:(Eio.Flow.string_source "One two three\n");;
ONE TWO THREE
- : unit = ()
```

If you want to capture the output of a process, you can provide a suitable `Eio.Flow.sink` as the `stdout` argument,
or use the `parse_out` convenience wrapper:

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.line ["echo"; "hello"];;
- : string = "hello"
```

All process functions either return the exit status or check that it was zero (success):

```ocaml
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.take_all ["sh"; "-c"; "exit 3"];;
Exception:
Eio.Io Process Child_error Exited 3,
running command: sh -c "exit 3"
```

`Process.spawn` and `Process.await` give more control over the process's lifetime
and exit status, and `Eio_unix.Process` gives more control over passing file
descriptors (on systems that support them).

## Time

The standard environment provides a [clock][Eio.Time] with the usual POSIX time:
Expand Down Expand Up @@ -1825,3 +1875,4 @@ Some background about the effects system can be found in:
[kcas]: https://github.com/ocaml-multicore/kcas
[Meio]: https://github.com/tarides/meio
[Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/
[Eio.Process]: https://github.com/ocaml-multicore/eio/blob/main/lib_eio/process.ml
14 changes: 8 additions & 6 deletions doc/prelude.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ module Eio_main = struct
let run fn =
Eio_main.run @@ fun env ->
fn @@ object
method net = env#net
method stdin = env#stdin
method stdout = env#stdout
method cwd = env#cwd
method domain_mgr = fake_domain_mgr
method clock = fake_clock env#clock
method net = env#net
method stdin = env#stdin
method stdout = env#stdout
method stderr = env#stderr
method cwd = env#cwd
method process_mgr = env#process_mgr
method domain_mgr = fake_domain_mgr
method clock = fake_clock env#clock
end
end

Expand Down
2 changes: 2 additions & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Flow = Flow
module Buf_read = Buf_read
module Buf_write = Buf_write
module Net = Net
module Process = Process
module Domain_manager = Domain_manager
module Time = Time
module File = File
Expand All @@ -34,6 +35,7 @@ module Stdenv = struct
let stdout (t : <stdout : #Flow.sink; ..>) = t#stdout
let stderr (t : <stderr : #Flow.sink; ..>) = t#stderr
let net (t : <net : #Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : #Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : #Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : #Time.Mono.t; ..>) = t#mono_clock
Expand Down
11 changes: 11 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ module Buf_write = Buf_write
(** Networking. *)
module Net = Net

(** Managing child processes. *)
module Process = Process

(** Parallel computation across multiple CPU cores. *)
module Domain_manager : sig
class virtual t : object
Expand Down Expand Up @@ -208,6 +211,14 @@ module Stdenv : sig
val net : <net : #Net.t as 'a; ..> -> 'a
(** [net t] gives access to the process's network namespace. *)

(** {1 Processes }
To use this, see {!Process}.
*)

val process_mgr : <process_mgr : #Process.mgr as 'a; ..> -> 'a
(** [process_mgr t] allows you to manage child processes. *)

(** {1 Domains (using multiple CPU cores)}
To use this, see {!Domain_manager}.
Expand Down
110 changes: 110 additions & 0 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
type exit_status = [
| `Exited of int
| `Signaled of int
]

type status = [ exit_status | `Stopped of int ]

let pp_status ppf = function
| `Exited i -> Format.fprintf ppf "Exited %i" i
| `Signaled i -> Format.fprintf ppf "Signalled %i" i
| `Stopped i -> Format.fprintf ppf "Stopped %i" i

type error =
| Executable_not_found of string
| Child_error of exit_status

type Exn.err += E of error

let err e = Exn.create (E e)

let () =
Exn.register_pp (fun f -> function
| E e ->
Fmt.string f "Process ";
begin match e with
| Executable_not_found e -> Fmt.pf f "Executable %S not found" e;
| Child_error e -> Fmt.pf f "Child_error %a" pp_status e;
end;
true
| _ -> false
)

class virtual t = object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

let pid proc = proc#pid
let await proc = proc#await

let await_exn proc =
match proc#await with
| `Exited 0 -> ()
| status -> raise (err (Child_error status))

let signal proc = proc#signal

class virtual mgr = object
method virtual pipe :
sw:Switch.t ->
<Flow.source; Flow.close> * <Flow.sink; Flow.close>

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
?stdin:Flow.source ->
?stdout:Flow.sink ->
?stderr:Flow.sink ->
?env:string array ->
?executable:string ->
string list ->
t
end

let bad_char = function
| ' ' | '"' | '\'' | '\\' -> true
| c ->
let c = Char.code c in
c <= 32 || c >= 127

let pp_arg f x =
if x = "" || String.exists bad_char x then Fmt.(quote string) f x
else Fmt.string f x

let pp_args = Fmt.hbox (Fmt.list ~sep:Fmt.sp pp_arg)

let spawn ~sw (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?env ?executable args =
t#spawn ~sw
?cwd:(cwd :> Fs.dir Path.t option)
?env
?executable args
?stdin:(stdin :> Flow.source option)
?stdout:(stdout :> Flow.sink option)
?stderr:(stderr :> Flow.sink option)

let run (t:#mgr) ?cwd ?stdin ?stdout ?stderr ?env ?executable args =
Switch.run @@ fun sw ->
let child = spawn ~sw t ?cwd ?stdin ?stdout ?stderr ?env ?executable args in
match await child with
| `Exited 0 -> ()
| status ->
let ex = err (Child_error status) in
raise (Exn.add_context ex "running command: %a" pp_args args)

let pipe ~sw (t:#mgr) = t#pipe ~sw

let parse_out (t:#mgr) parse ?cwd ?stdin ?stderr ?env ?executable args =
Switch.run @@ fun sw ->
let r, w = pipe t ~sw in
try
let child = spawn ~sw t ?cwd ?stdin ~stdout:w ?stderr ?env ?executable args in
Flow.close w;
let output = Buf_read.parse_exn parse r ~max_size:max_int in
Flow.close r;
await_exn child;
output
with Exn.Io _ as ex ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "running command: %a" pp_args args
150 changes: 150 additions & 0 deletions lib_eio/process.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
(** Example:
{[
# Eio_main.run @@ fun env ->
let proc_mgr = Eio.Stdenv.process_mgr env in
Eio.Process.parse_out proc_mgr Eio.Buf_read.line ["echo"; "hello"]
]}
*)

(** {2 Status and error types} *)

type exit_status = [
| `Exited of int (** Process exited with the given return code. *)
| `Signaled of int (** Process was killed by the given signal. *)
]

type status = [
| exit_status
| `Stopped of int (** Process was stopped (paused) by the given signal. *)
]

val pp_status : status Fmt.t

type error =
| Executable_not_found of string (** The requested executable does not exist. *)
| Child_error of exit_status (** The process exited with an error status. *)

type Exn.err += E of error

val err : error -> exn
(** [err e] is [Eio.Exn.create (E e)] *)

val pp_args : string list Fmt.t
(** Formats a list of arguments, quoting any that might cause confusion to the reader.
This is intended for use in error messages and logging.*)

(** {2 Processes} *)

(** A process. *)
class virtual t : object
method virtual pid : int
method virtual await : exit_status
method virtual signal : int -> unit
end

val pid : #t -> int
(** [pid t] is the process ID of [t]. *)

val await : #t -> exit_status
(** [await t] waits for process [t] to exit and then reports the status. *)

val await_exn : #t -> unit
(** Like {! await} except an exception is raised if the status is not [`Exited 0]. *)

val signal : #t -> int -> unit
(** [signal t i] sends the signal [i] to process [t].
If the process has already exited then this does nothing
(it will not signal a different process, even if the PID has been reused).
See {!Sys} for the signal numbers. *)

(** {2 Spawning processes} *)

class virtual mgr : object
method virtual pipe :
sw:Switch.t ->
<Flow.source; Flow.close> * <Flow.sink; Flow.close>

method virtual spawn :
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
?stdin:Flow.source ->
?stdout:Flow.sink ->
?stderr:Flow.sink ->
?env:string array ->
?executable:string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn :
sw:Switch.t ->
#mgr ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stdout:#Flow.sink ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> t
(** [spawn ~sw mgr args] creates a new child process that is connected to the switch [sw].
The child process will be sent {! Sys.sigkill} when the switch is released.
If the flows [stdin], [stdout] and [stderr] are not backed by file descriptors then
this also creates pipes and spawns fibers to copy the data as necessary.
If you need more control over file descriptors, see {!Eio_unix.Process}.
@param cwd The current working directory of the process (default: same as parent process).
@param stdin The flow to attach to the process's standard input (default: same as parent process).
@param stdout A flow that the process's standard output goes to (default: same as parent process).
@param stderr A flow that the process's standard error goes to (default: same as parent process).
@param env The environment for the process (default: same as parent process).
@param executable The path of the executable to run.
If not given then the first item in [args] is used,
searching $PATH for it if necessary. *)

val run :
#mgr ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stdout:#Flow.sink ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> unit
(** [run] does {!spawn} followed by {!await_exn}, with the advantage that if the process fails then
the error message includes the command that failed.
Note: If [spawn] needed to create extra fibers to copy [stdin], etc, then it also waits for those to finish. *)

val parse_out :
#mgr ->
'a Buf_read.parser ->
?cwd:#Fs.dir Path.t ->
?stdin:#Flow.source ->
?stderr:#Flow.sink ->
?env:string array ->
?executable:string ->
string list -> 'a
(** [parse_out mgr parser args] runs [args] and parses the child's stdout with [parser].
It also waits for the process to finish and checks its exit status is zero.
Note that [parser] must consume the entire output of the process (like {!Buf_read.parse}).
To return all the output as a string, use {!Buf_read.take_all} as the parser.
This is a convenience wrapper around {!run},
and the optional arguments have the same meanings. *)

(** {2 Pipes} *)

val pipe : sw:Switch.t -> #mgr -> <Flow.source; Flow.close> * <Flow.sink; Flow.close>
(** [pipe ~sw mgr] creates a pipe backed by the OS.
The flows can be used by {!spawn} without the need for extra fibers to copy the data.
This can be used to connect multiple processes together. *)
Loading

0 comments on commit 7ea93f2

Please sign in to comment.