Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signal #374

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib_eio/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint mtime))

(rule
(targets config.ml)
(action (run ./include/discover.exe)))
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module Time = Time
module File = File
module Fs = Fs
module Path = Path
module Signal = Signal

module Stdenv = struct
type t = <
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ module Fs = Fs
(** Accessing paths on a file-system. *)
module Path = Path

(** Setting up signal handlers. *)
module Signal = Signal

(** Control over debugging. *)
module Debug : sig
(** Example:
Expand Down
102 changes: 102 additions & 0 deletions lib_eio/include/discover.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
module C = Configurator.V1

let mangle (s:string) =
let is_alpha = function 'a' .. 'z' | 'A' .. 'Z' -> true | _ -> false in
let rec first_alpha i =
if is_alpha (String.get s i) then
i
else
first_alpha (succ i)
in
let skip = first_alpha 0 in
String.sub s skip ((String.length s) - skip) |>
String.lowercase_ascii

let () =
let open C.C_define in
C.main ~name:"discover" @@
fun c ->
let sig_build =
C.C_define.import c
~c_flags:["-I"; Filename.concat (Sys.getcwd ()) "include"]
~includes:["signal.h"]
in
let process =
List.map
(function
| name, Value.Int v ->
Printf.sprintf " let %s = %d" (mangle name) v
| name, Value.Switch v ->
if not v then
Printf.sprintf " let %s_opt = None" (mangle name)
else
(match sig_build Type.[ name, Int; ] with
| (name, Value.Int v) :: [] ->
Printf.sprintf " let %s_opt = Some %d" (mangle name) v
| _ -> assert false)
| _ -> assert false)
in
let signums =
sig_build
Type.[
"SIGHUP", Int;
"SIGINT", Int;
"SIGQUIT", Int;
"SIGILL", Int;
"SIGTRAP", Int;
"SIGABRT", Int;
"SIGEMT", Switch;
"SIGFPE", Int;
"SIGKILL", Int;
"SIGBUS", Int;
"SIGSEGV", Int;
"SIGSYS", Int;
"SIGPIPE", Int;
"SIGALRM", Int;
"SIGTERM", Int;
"SIGURG", Int;
"SIGSTOP", Int;
"SIGTSTP", Int;
"SIGCONT", Int;
"SIGCHLD", Int;
"SIGTTIN", Int;
"SIGTTOU", Int;
"SIGIO", Int;
"SIGXCPU", Int;
"SIGXFSZ", Int;
"SIGVTALRM", Int;
"SIGPROF", Int;
"SIGWINCH", Int;
"SIGINFO", Switch;
"SIGUSR1", Int;
"SIGUSR2", Int;
"SIGTHR", Switch;
] |> process
in
let nsig =
let maxfound =
sig_build
Type.[
"NSIG", Switch;
"_NSIG", Switch;
"_SIG_MAXSIG", Switch;
] |>
List.map
(function
| name, Value.Switch v ->
if not v then
0
else
(match sig_build Type.[ name, Int; ] with
| (_, Value.Int v) :: [] -> v
| _ -> assert false)
| _ -> assert false)
in
let maxfound = List.fold_left max 64 maxfound in
[ Printf.sprintf " let nsig = %d" maxfound ]
in
C.Flags.write_lines "config.ml" @@
[ "module Signum = struct" ] @
nsig @
signums @
[ "end" ]
4 changes: 4 additions & 0 deletions lib_eio/include/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(executable
(name discover)
(modules discover)
(libraries dune-configurator))
70 changes: 70 additions & 0 deletions lib_eio/signal.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
(*
* Copyright (C) 2022 Christiano Haesbaert
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

include Config.Signum

type sigbox = {
box : unit Stream.t;
signum : int;
hook : Switch.hook ref;
subscribed : bool ref;
}

type signum = int

let signum_to_int signum = (signum :> int)

module Private = struct
type _ Effect.t +=
| Subscribe : (int * unit Stream.t) -> unit Effect.t
| Unsubscribe : (int * unit Stream.t) -> unit Effect.t
| Publish : int -> unit Effect.t

let subscribe sigbox = Effect.perform @@ Subscribe (sigbox.signum, sigbox.box)
let unsubscribe sigbox = Effect.perform @@ Unsubscribe (sigbox.signum, sigbox.box)
let publish signum = Effect.perform @@ Publish signum
end

let check_subscription sigbox =
if not !(sigbox.subscribed) then
invalid_arg "sigbox is not subscribed"

let unsubscribe sigbox =
check_subscription sigbox;
Private.unsubscribe sigbox;
sigbox.subscribed := false;
Switch.remove_hook !(sigbox.hook)

let subscribe ~sw signum =
assert (signum > 0 && signum < nsig);
let sigbox =
{ box = Stream.create 1; signum;
hook = ref Switch.null_hook; subscribed = ref false }
in
sigbox.hook := Switch.on_release_cancellable sw (fun () -> unsubscribe sigbox);
Private.subscribe sigbox;
sigbox.subscribed := true;
sigbox

let publish signum = Private.publish signum

let wait sigbox =
check_subscription sigbox;
Stream.take sigbox.box

let wait_one signum = Switch.run @@ fun sw -> subscribe ~sw signum |> wait

let is_pending sigbox = not (Stream.is_empty sigbox.box)
107 changes: 107 additions & 0 deletions lib_eio/signal.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
(** Signals

Signals in EIO work on a subscription basis. A {!sigbox} is
subscribed to a specific {!signum} via [subscribe], which can in
turn be waited on with [wait].

When a signal arrives, all {!sigbox}es subscribed to the
corresponding signal are populated and [wait] returns.

Signals are buffered but not indefinately, the size of the buffer
and the number of signals queued depend on the backend implementation.

There can be multiple subscriptions for the same {!signum}, even
across different domains, but each individual {!sigbox} is local
to its domain and tied to a {!Switch.t}
*)

(** Example:
{[
Switch.run @@ fun sw ->
let sighupbox = Signal.(subscribe ~sw sighup) in
while !run do
Signal.wait sighupbox;
traceln "Got sighup";
daemon_reconfigure ();
done;
Signal.unsubscribe sighupbox
]}

{[
Fiber.first
(fun () -> Signal.(wait_one sigint))
(fun () -> Time.sleep clock 1.0)
]}
*)

type sigbox
(** Stores signals that can be [wait]ed on, each {!sigbox} listens to
one {!signum}, you can have multiple {!sigbox} for the same
{!signum}, and in multiple domains.

*)

module Private : sig
type _ Effect.t +=
| Subscribe : (int * unit Stream.t) -> unit Effect.t
| Unsubscribe : (int * unit Stream.t) -> unit Effect.t
| Publish : int -> unit Effect.t
end

type signum = private int

val subscribe : sw:Switch.t -> signum -> sigbox
(** Create a subscription for waiting on signal {!signum}. *)

val unsubscribe : sigbox -> unit
(** Remove a subscription, possibly restoring the orignal {!signum} behaviour. *)

val publish : signum -> unit
(** Raise a signal, like kill. **)

val wait : sigbox -> unit
(** Wait for signals, it is an error to [wait] on an unsubscribed {!sigbox}. *)

val wait_one : signum -> unit
(** Like [wait] but only for a single occurrence of {!signum}. *)

val is_pending : sigbox -> bool
(** false if [wait] would block. *)

val signum_to_int : signum -> int
(** The system's idea of {!signum}, always a positive number, unlike signals from {!Sys}. *)

(** Signals, some are optional since not every system supports those. *)
val sighup : signum
val sigint : signum
val sigquit : signum
val sigill : signum
val sigtrap : signum
val sigabrt : signum
val sigemt_opt : signum option
val sigfpe : signum
val sigkill : signum
val sigbus : signum
val sigsegv : signum
val sigsys : signum
val sigpipe : signum
val sigalrm : signum
val sigterm : signum
val sigurg : signum
val sigstop : signum
val sigtstp : signum
val sigcont : signum
val sigchld : signum
val sigttin : signum
val sigttou : signum
val sigio : signum
val sigxcpu : signum
val sigxfsz : signum
val sigvtalrm : signum
val sigprof : signum
val sigwinch : signum
val siginfo_opt : signum option
val sigusr1 : signum
val sigusr2 : signum
val sigthr_opt : signum option
val nsig : int
16 changes: 16 additions & 0 deletions lib_eio/stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ let add t item =
)
)

let add_nonblocking t item =
Mutex.lock t.mutex;
match Waiters.wake_one t.readers item with
| `Ok -> Mutex.unlock t.mutex; true
| `Queue_empty ->
(* No-one is waiting for an item. Queue it. *)
if Queue.length t.items < t.capacity then (
Queue.add item t.items;
Mutex.unlock t.mutex;
true
) else (
Mutex.unlock t.mutex;
false)

let add_canfail t item = ignore @@ add_nonblocking t item

let take t =
Mutex.lock t.mutex;
match Queue.take_opt t.items with
Expand Down
10 changes: 10 additions & 0 deletions lib_eio/stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ val add : 'a t -> 'a -> unit

If this would take [t] over capacity, it blocks until there is space. *)

val add_nonblocking : 'a t -> 'a -> bool
(** [add_nonblocking t item] adds [item] to [t].

Returns true if added, false if it would block. *)

val add_canfail : 'a t -> 'a -> unit
(** [add_canfail t item] tries to add [item] to [t].

Adding can fail if [t] is full, only use this when you want unreliable streams. *)

val take : 'a t -> 'a
(** [take t] takes the next item from the head of [t].

Expand Down
Loading