diff --git a/README.md b/README.md index bf1ba503c..fd02e23da 100644 --- a/README.md +++ b/README.md @@ -221,24 +221,66 @@ Calling an operation that performs an effect (such as `yield`) can switch to a d ## Tracing -The library can write traces in CTF format, showing when threads (fibers) are created, when they run, and how they interact. -We can run the previous code with tracing enabled (writing to a new `trace.ctf` file) like this: +When OCaml's tracing is turned on, Eio writes events about many actions, +such as creating fibers or resolving promises. -```ocaml -# let () = - Eio_unix.Trace.with_tracing "trace.ctf" @@ fun () -> - Eio_main.run main;; -+x = 1 -+y = 1 -+x = 2 -+y = 2 -+x = 3 -+y = 3 +[examples/trace](./examples/trace/) shows how to consume the events manually: + + +```sh +$ dune exec -- ./examples/trace/main.exe ++tracer: starting +30926487700447:ring 0: create fiber 0 +30926487702032:ring 0: running fiber 0 +30926487705057:ring 0: create switch 1 +30926487707264:ring 0: create fiber 2 +30926487707512:ring 0: running fiber 2 +30926487720213:ring 0: log "tracer: starting" ++server: starting ++client: connecting socket... ++server: got connection from client ++server: read "Hello" from socket +30926487769298:ring 0: running fiber 0 +30926487769877:ring 0: create fiber 3 +30926487770083:ring 0: running fiber 3 +30926487771198:ring 0: create switch 4 +30926487807888:ring 0: create switch 5 +30926487808329:ring 0: create fiber 6 +30926487808555:ring 0: running fiber 6 +30926487812219:ring 0: log "server: starting" +30926487818883:ring 0: running fiber 3 +30926487819091:ring 0: create fiber 7 +30926487819155:ring 0: running fiber 7 +30926487822428:ring 0: log "client: connecting socket..." +30926487901604:ring 0: running fiber 3 +30926487904947:ring 0: running fiber 0 +30926487907318:ring 0: running fiber 6 +30926487917202:ring 0: log "server: got connection from client" +30926487929993:ring 0: running fiber 6 +30926487941403:ring 0: running fiber 7 +30926487948000:ring 0: running fiber 7 +30926487971898:ring 0: resolve 7 +30926487974810:ring 0: running fiber 6 +30926487975215:ring 0: running fiber 6 +30926487977869:ring 0: running fiber 6 +30926487984514:ring 0: log "server: read \"Hello\" from socket" +30926487990785:ring 0: resolve 6 +30926487991752:ring 0: running fiber 3 +30926488022310:ring 0: resolve 3 +30926497839725:ring 0: running fiber 2 ++tracer: stopping ``` -The trace can be viewed using [mirage-trace-viewer][]. -This should work even while the program is still running. -The file is a ring buffer, so when it gets full, old events will start to be overwritten with new ones. +Note that the output from `traceln` appears in the trace as well as on the console. + +There are various third-party tools that can consume this data +(but may currently require patches to support the new system): + +- [Meio][] (Monitoring for Eio) provides an interactive console-based UI for exploring running fibers. +- [Olly][] can save Perfetto traces and report statistics. +- [mirage-trace-viewer][] renders the trace visually. + +For example, this is how mirage-trace-viewer renders the counting example above:

@@ -246,10 +288,6 @@ The file is a ring buffer, so when it gets full, old events will start to be ove This shows the two counting threads as two horizonal lines. The white regions indicate when each thread was running. -Note that the output from `traceln` appears in the trace as well as on the console. - -The [Meio][] (Monitoring for Eio) project provides an interactive console-based UI for exploring running fibers, -using the new runtime events support in OCaml 5.1. ## Cancellation @@ -1849,3 +1887,4 @@ Some background about the effects system can be found in: [Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/ [Eio.Process]: https://ocaml-multicore.github.io/eio/eio/Eio/Process/index.html [Dev meetings]: https://docs.google.com/document/d/1ZBfbjAkvEkv9ldumpZV5VXrEc_HpPeYjHPW_TiwJe4Q +[Olly]: https://github.com/tarides/runtime_events_tools diff --git a/examples/trace/dune b/examples/trace/dune new file mode 100644 index 000000000..da5d8293d --- /dev/null +++ b/examples/trace/dune @@ -0,0 +1,3 @@ +(executable + (name main) + (libraries eio.runtime_events eio_main)) diff --git a/examples/trace/main.ml b/examples/trace/main.ml new file mode 100644 index 000000000..7aee9579c --- /dev/null +++ b/examples/trace/main.ml @@ -0,0 +1,79 @@ +(* This example shows how to trace an Eio program. + + The [main] function creates a listening socket and has a client connect and send a message, + which is handled by a server fiber. + + At the same time, another fiber is displaying trace events. + For simplicity, this example runs the tracer in the same process as the program being traced, + but typically they would be separate processes. *) + +open Eio.Std + +(* [handle pp] handles an event by writing the timestamp, ring ID and user data with [traceln]. + [pp] is used to format the user data. *) +let handle pp : _ Eio_runtime_events.handler = + fun ring ts v -> + (* Note: don't use traceln here, as it will just generate more log events! *) + Fmt.epr "%9Ld:ring %d: %a@." (Runtime_events.Timestamp.to_int64 ts) ring pp v + +let callbacks = + let x = + Runtime_events.Callbacks.create () +(* + ~runtime_begin:(handle (fun f phase -> Fmt.pf f "begin %s" (Runtime_events.runtime_phase_name phase))) + ~runtime_end:(handle (fun f phase -> Fmt.pf f "end %s" (Runtime_events.runtime_phase_name phase))) +*) + ~lost_events:(fun ring n -> traceln "ring %d lost %d events" ring n) + |> Eio_runtime_events.add_callbacks + ~fiber:(handle (Fmt.fmt "running fiber %d")) + ~create:(handle (fun f (id, ty) -> Fmt.pf f "create %s %d" (Eio_runtime_events.ty_to_string ty) id)) + ~resolve:(handle (Fmt.fmt "resolve %d")) + ~log:(handle (Fmt.fmt "log %S")) + ~name:(handle (fun f (id, name) -> Fmt.pf f "%d is named %S" id name)) + (* (see lib_eio/runtime_events/eio_runtime_events.mli for more event types) *) + in + x + +(* Read and display trace events from [cursor] until [finished]. *) +let trace ~finished (clock, delay) cursor = + traceln "tracer: starting"; + let rec aux () = + let _ : int = Runtime_events.read_poll cursor callbacks None in + if !finished then ( + traceln "tracer: stopping" + ) else ( + Eio.Time.Mono.sleep clock delay; + aux () + ) + in + aux () + +(* The program to be traced. *) +let main net = + Switch.run @@ fun sw -> + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8123) in + let s = Eio.Net.listen ~sw ~backlog:1 ~reuse_addr:true net addr in + Fiber.both + (fun () -> + traceln "server: starting"; + let c, _addr = Eio.Net.accept ~sw s in + traceln "server: got connection from client"; + let msg = Eio.Flow.read_all c in + traceln "server: read %S from socket" msg + ) + (fun () -> + traceln "client: connecting socket..."; + let c = Eio.Net.connect ~sw net addr in + Eio.Flow.copy_string "Hello" c; + Eio.Flow.close c + ) + +(* Enable tracing then run the [main] and [trace] fibers. *) +let () = + Runtime_events.start (); + let cursor = Runtime_events.create_cursor None in (* Create a in-process cursor *) + Eio_main.run @@ fun env -> + let finished = ref false in + Fiber.both + (fun () -> trace ~finished (env#mono_clock, 0.01) cursor) + (fun () -> main env#net; finished := true) diff --git a/lib_eio/core/debug.ml b/lib_eio/core/debug.ml index 6665479ef..7943c30b8 100644 --- a/lib_eio/core/debug.ml +++ b/lib_eio/core/debug.ml @@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt = Format.pp_close_box f (); Format.pp_print_flush f (); let msg = Buffer.contents b in - Trace.label msg; + Trace.log msg; let lines = String.split_on_char '\n' msg in Mutex.lock traceln_mutex; Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () -> diff --git a/lib_eio/core/dune b/lib_eio/core/dune index 17cda9c66..184869bf7 100644 --- a/lib_eio/core/dune +++ b/lib_eio/core/dune @@ -1,4 +1,4 @@ (library (name eio__core) (public_name eio.core) - (libraries cstruct hmap lwt-dllist fmt optint domain-local-await)) + (libraries cstruct hmap lwt-dllist fmt optint domain-local-await eio.runtime_events)) diff --git a/lib_eio/core/fiber.ml b/lib_eio/core/fiber.ml index 0785e8a12..b4f5e153b 100644 --- a/lib_eio/core/fiber.ml +++ b/lib_eio/core/fiber.ml @@ -19,10 +19,10 @@ let fork ~sw f = Switch.with_op sw @@ fun () -> match f () with | () -> - Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) | exception ex -> Switch.fail sw ex; (* The [with_op] ensures this will succeed *) - Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) + Trace.resolve_error (Cancel.Fiber_context.tid new_fiber) ex ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_daemon ~sw f = @@ -35,13 +35,13 @@ let fork_daemon ~sw f = match f () with | `Stop_daemon -> (* The daemon asked to stop. *) - Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) | exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) -> (* The daemon was cancelled because all non-daemon fibers are finished. *) - Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None + Trace.resolve (Cancel.Fiber_context.tid new_fiber) | exception ex -> Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *) - Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) + Trace.resolve_error (Cancel.Fiber_context.tid new_fiber) ex ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_promise ~sw f = diff --git a/lib_eio/core/promise.ml b/lib_eio/core/promise.ml index ca15c74b0..49a032ee1 100644 --- a/lib_eio/core/promise.ml +++ b/lib_eio/core/promise.ml @@ -76,7 +76,7 @@ let resolve t v = | Resolved _ -> invalid_arg "Can't resolve already-resolved promise" | Unresolved b as prev -> if Atomic.compare_and_set t.state prev (Resolved v) then ( - Trace.resolve t.id ~ex:None; + Trace.resolve t.id; Broadcast.resume_all b ) else ( (* Otherwise, the promise was already resolved. Retry (to get the error). *) diff --git a/lib_eio/core/single_waiter.ml b/lib_eio/core/single_waiter.ml index aa27d7b37..31e3c8495 100644 --- a/lib_eio/core/single_waiter.ml +++ b/lib_eio/core/single_waiter.ml @@ -11,14 +11,17 @@ let create () = { wake = ignore } let wake t v = t.wake v let await t id = - Suspend.enter @@ fun ctx enqueue -> - Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> - t.wake <- ignore; - enqueue (Error ex) - ); - t.wake <- (fun x -> - Cancel.Fiber_context.clear_cancel_fn ctx; - t.wake <- ignore; - Trace.read ~reader:id ctx.tid; - enqueue x - ) + let x = + Suspend.enter @@ fun ctx enqueue -> + Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> + t.wake <- ignore; + enqueue (Error ex) + ); + t.wake <- (fun x -> + Cancel.Fiber_context.clear_cancel_fn ctx; + t.wake <- ignore; + enqueue x + ) + in + Trace.read id; + x diff --git a/lib_eio/core/switch.ml b/lib_eio/core/switch.ml index 891f7db7f..aec1a384b 100644 --- a/lib_eio/core/switch.ml +++ b/lib_eio/core/switch.ml @@ -51,7 +51,7 @@ let combine_exn ex = function let fail ?(bt=Printexc.get_raw_backtrace ()) t ex = check_our_domain t; if t.exs = None then - Trace.resolve t.id ~ex:(Some ex); + Trace.resolve_error t.id ex; t.exs <- Some (combine_exn (ex, bt) t.exs); try Cancel.cancel t.cancel ex diff --git a/lib_eio/core/trace.ml b/lib_eio/core/trace.ml index d4987beb4..17bec65ac 100644 --- a/lib_eio/core/trace.ml +++ b/lib_eio/core/trace.ml @@ -1,31 +1,5 @@ (* Copyright (C) 2014, Thomas Leonard *) -(* Note: we expect some kind of logger to process the trace buffer to collect - events, but currently we don't have any barriers to ensure that the buffer - is in a consistent state (although it usually is). So for now, you should - pause tracing before trying to parse the buffer. In particular, GC events - complicate things because we may need to add a GC event while in the middle - of adding some other event. *) - -open Bigarray - -module BS = struct - (* Replacement for endianBigstring that avoids pulling in a Unix dependency *) - - external set_64 : Cstruct.buffer -> int -> int64 -> unit = "%caml_bigstring_set64" - external swap64 : int64 -> int64 = "%bswap_int64" - external unsafe_chr : int -> char = "%identity" - - let set_int8 s off v = Array1.set s off (unsafe_chr v) - [@@ocaml.inline] - - let set_int64_le s off v = - if Sys.big_endian - then set_64 s off (swap64 v) - else set_64 s off v - [@@ocaml.inline] -end - type id = int let id_chunk_size = 1024 @@ -47,359 +21,19 @@ let mint_id () = Domain.DLS.set next_id_key next_id_local_succ; next_id_local -type ty = - | Fiber - | Promise - | Semaphore - | Switch - | Stream - | Mutex - -type log_buffer = (char, int8_unsigned_elt, c_layout) Array1.t - -let current_thread = ref (-1) - -let int_of_thread_type t = - match t with - | Fiber -> 1 - | Promise -> 15 - | Semaphore -> 16 - | Switch -> 17 - | Stream -> 18 - | Mutex -> 19 - -module Packet = struct - let magic = 0xc1fc1fc1l - let uuid = "\x05\x88\x3b\x8d\x52\x1a\x48\x7b\xb3\x97\x45\x6a\xb1\x50\x68\x0c" - - (* - [%%cstruct - type packet_header = { - (* Stream header, repeated for each packet *) - magic: uint32_t; - uuid: uint8_t [@len 16]; - - (* Packet header *) - size: uint32_t; - stream_packet_count: uint16_t; - content_size_low: uint16_t; (* 2x16 bit to avoid allocating an Int32 *) - content_size_high: uint16_t; - } [@@little_endian] - ] - *) - - (* Auto-generated code from the above (to avoid a dependency on ppxlib) *) - let sizeof_packet_header = 30 - let set_packet_header_magic v x = Cstruct.LE.set_uint32 v 0 x - let set_packet_header_uuid src srcoff dst = Cstruct.blit_from_string src srcoff dst 4 16 - let set_packet_header_size v x = Cstruct.LE.set_uint32 v 20 x - let set_packet_header_stream_packet_count v x = Cstruct.LE.set_uint16 v 24 x - let set_packet_header_content_size_low v x = Cstruct.LE.set_uint16 v 26 x - let set_packet_header_content_size_high v x = Cstruct.LE.set_uint16 v 28 x - (* End auto-generated code *) - - type t = { - packet_start : int; - header : Cstruct.t; - packet_end : int; - } - - let first_event packet = - packet.packet_start + sizeof_packet_header - - let packet_end packet = - packet.packet_end - - let set_content_end packet content_end = - let header = packet.header in - let bits = (content_end - packet.packet_start) * 8 in - set_packet_header_content_size_low header (bits land 0xffff); - set_packet_header_content_size_high header (bits lsr 16) - - let clear ~count packet = - let bits = sizeof_packet_header * 8 in - let header = packet.header in - set_packet_header_stream_packet_count header (count land 0xffff); - set_packet_header_content_size_low header (bits land 0xffff); - set_packet_header_content_size_high header (bits lsr 16) - - let make ~count ~off ~len buffer = - let header = Cstruct.of_bigarray ~off ~len:sizeof_packet_header buffer in - set_packet_header_magic header magic; - set_packet_header_uuid uuid 0 header; - set_packet_header_size header (Int32.of_int (len * 8)); - let packet = { - packet_start = off; - header; - packet_end = off + len; - } in - clear ~count packet; - packet - -end - -module Control = struct - (* Following LTT, our trace buffer is divided into a small number of - * fixed-sized "packets", each of which contains many events. When there - * isn't room in the current packet for the next event, we move to the next - * packet. This wastes a few bytes at the end of each packet, but it allows - * us to discard whole packets at a time when we need to overwrite something. - *) - type t = { - log : log_buffer; - - timestamper : log_buffer -> int -> unit; (* Write a timestamp at the given offset. *) - - mutable next_event : int; (* Index to write next event (always < packet_end) *) - mutable packet_end: int; - packets : Packet.t array; - mutable active_packet : int; - - (* Each packet is numbered, making it easy to get the order when reading the - * ring buffer and allowing for detection of missed packets. *) - mutable next_stream_packet_count : int; - } - - let event_log = ref None - - let stop log = - match !event_log with - | Some active when log == active -> - event_log := None - | _ -> failwith "Log is not currently tracing!" - - let op_creates = 0 - (* let op_read = 1 *) - let op_fulfills = 2 - let op_fails = 3 - (* let op_becomes = 4 *) - let op_label = 5 - (* let op_increase = 6 *) - let op_switch = 7 - (* let op_gc = 8 *) - (* let op_old_signal = 9 *) - let op_try_read = 10 - (* let op_counter_value = 11 *) - let op_read_later = 12 - let op_signal = 13 +module RE = Eio_runtime_events - let write64 log v i = - BS.set_int64_le log i v; - i + 8 - - let write8 log v i = - BS.set_int8 log i v; - i + 1 - - let write_string log v i = - let l = String.length v in - for idx = 0 to l - 1 do - Array1.set log (i + idx) v.[idx] - done; - Array1.set log (i + l) '\x00'; - i + l + 1 - - (* The current packet is full. Move to the next one. *) - let next_packet log = - log.active_packet <- (log.active_packet + 1) mod Array.length log.packets; - let packet = log.packets.(log.active_packet) in - log.packet_end <- Packet.packet_end packet; - log.next_event <- Packet.first_event packet; - let count = log.next_stream_packet_count in - Packet.clear packet ~count; - log.next_stream_packet_count <- count + 1 - - let rec add_event log op len = - (* Note: be careful about allocation here, as doing GC will add another event... *) - let i = log.next_event in - let new_i = i + 9 + len in - (* >= rather than > is slightly wasteful, but avoids next_event overlapping the next packet *) - if new_i >= log.packet_end then ( - (* Printf.printf "can't write %d at %d\n%!" (9 + len) i; *) - let old_packet = log.packets.(log.active_packet) in - assert (i > Packet.first_event old_packet); - next_packet log; - add_event log op len - ) else ( - (* Printf.printf "writing at %d\n%!" i; *) - log.next_event <- new_i; - Packet.set_content_end log.packets.(log.active_packet) new_i; - log.timestamper log.log i; - i + 8 |> write8 log.log op - ) - - (* This is faster than [let end_event = ignore]! *) - external end_event : int -> unit = "%ignore" -(* - let end_event i = - match !event_log with - | None -> assert false - | Some log -> assert (i = log.next_event || log.next_event = 0) -*) - - let write_tid log tid = - write64 log (Int64.of_int tid) - - let note_created log child thread_type = - add_event log op_creates 17 - |> write_tid log.log !current_thread - |> write_tid log.log child - |> write8 log.log (int_of_thread_type thread_type) - |> end_event - - let note_read log ~reader input = - add_event log op_read_later 16 - |> write_tid log.log reader - |> write_tid log.log input - |> end_event - - let note_try_read log thread input = - add_event log op_try_read 16 - |> write_tid log.log thread - |> write_tid log.log input - |> end_event - - let note_signal ~src log dst = - add_event log op_signal 16 - |> write_tid log.log dst - |> write_tid log.log src - |> end_event - - let note_resolved log p ~ex = - match ex with - | Some ex -> - let msg = Printexc.to_string ex in - add_event log op_fails (17 + String.length msg) - |> write_tid log.log !current_thread - |> write_tid log.log p - |> write_string log.log msg - |> end_event - | None -> - add_event log op_fulfills 16 - |> write_tid log.log !current_thread - |> write_tid log.log p - |> end_event - -(* - let note_becomes log input main = - if main <> input then ( - add_event log op_becomes 16 - |> write64 log.log input - |> write64 log.log main - |> end_event - ) -*) - - let note_label log thread msg = - add_event log op_label (9 + String.length msg) - |> write_tid log.log thread - |> write_string log.log msg - |> end_event - - let note_switch log new_current = - if new_current <> !current_thread then ( - current_thread := new_current; - add_event log op_switch 8 - |> write_tid log.log new_current - |> end_event - ) - - let note_suspend log () = - current_thread := (-1); - add_event log op_switch 8 - |> write_tid log.log (-1) - |> end_event - -(* - let note_gc duration = - match !event_log with - | None -> () - | Some log -> - add_event log op_gc 8 - |> write64 log.log (duration *. 1000000000. |> Int64.of_float) - |> end_event -*) - - let make ~timestamper log = - let size = Array1.dim log in - let n_packets = 4 in - let packet_size = size / n_packets in - let packets = Array.init n_packets (fun i -> - let off = i * packet_size in - let len = if i = n_packets - 1 then size - off else packet_size in - Packet.make ~count:i ~off ~len log - ) in - let active_packet = 0 in - { - log; - timestamper; - packets; - active_packet; - packet_end = Packet.packet_end packets.(active_packet); - next_event = Packet.first_event packets.(active_packet); - next_stream_packet_count = 1; - } - - let start (log:t) = - event_log := Some log; - current_thread := -1 -end - -let label name = - match !Control.event_log with - | None -> () - | Some log -> Control.note_label log !current_thread name +let add_event = Runtime_events.User.write let create ?label id ty = - match !Control.event_log with - | None -> () - | Some log -> - Control.note_created log id ty; - Option.iter (Control.note_label log id) label - -let fiber new_current = - match !Control.event_log with - | None -> () - | Some log -> Control.note_switch log new_current - -let hiatus () = - match !Control.event_log with - | None -> () - | Some log -> Control.note_suspend log () - -let resume new_current = - match !Control.event_log with - | None -> () - | Some log -> Control.note_switch log new_current - -let try_read input = - match !Control.event_log with - | None -> () - | Some log -> Control.note_try_read log !current_thread input - -let read ?reader input = - match !Control.event_log with - | None -> () - | Some log -> - let reader = - match reader with - | None -> !current_thread - | Some r -> r - in - Control.note_read log ~reader input - -let resolve id ~ex = - match !Control.event_log with - | None -> () - | Some log -> Control.note_resolved log id ~ex - -let signal ?src dst = - match !Control.event_log with - | None -> () - | Some log -> - let src = - match src with - | None -> !current_thread - | Some x -> x - in - Control.note_signal ~src log dst + add_event RE.create (id, ty); + Option.iter (fun l -> add_event RE.name (id, l)) label + +let log = add_event RE.log +let fiber = add_event RE.fiber +let suspend = add_event RE.suspend +let try_read = add_event RE.try_read +let read = add_event RE.read +let signal = add_event RE.signal +let resolve = add_event RE.resolve +let resolve_error id ex = add_event RE.resolve_error (id, ex) diff --git a/lib_eio/core/trace.mli b/lib_eio/core/trace.mli index 6e50b0cf1..64dff7885 100644 --- a/lib_eio/core/trace.mli +++ b/lib_eio/core/trace.mli @@ -1,78 +1,40 @@ -(** This library is used to write event traces in mirage-profile's CTF format. *) +(** Trace Eio events using OCaml's runtime events system. *) type id = private int (** Each thread/fiber/promise is identified by a unique ID. *) +val mint_id : unit -> id +(** [mint_id ()] is a fresh unique [id]. *) + (** {2 Recording events} Libraries and applications can use these functions to make the traces more useful. *) -val label : string -> unit -(** [label msg] attaches text [msg] to the current thread. *) +val log : string -> unit +(** [log msg] attaches text [msg] to the current fiber. *) (** {2 Recording system events} These are normally only called by the scheduler. *) -type ty = - | Fiber - | Promise - | Semaphore - | Switch - | Stream - | Mutex -(** Types of recorded objects. *) - -val mint_id : unit -> id -(** [mint_id ()] is a fresh unique [id]. *) +val create : ?label:string -> id -> Eio_runtime_events.ty -> unit +(** [create id ty] records the creation of [id]. *) -val create : ?label:string -> id -> ty -> unit -(** [create t id ty] records the creation of [id]. *) - -val read : ?reader:id -> id -> unit -(** [read src] records that promise [src]'s value was read. - @param reader The thread doing the read (default is the current thread). *) +val read : id -> unit +(** [read src] records that promise [src]'s value was read. *) val try_read : id -> unit -(** [try_read src] records that the current thread wants to read from [src] (which is not currently ready). *) +(** [try_read src] records that the current fiber wants to read from [src] (which is not currently ready). *) val fiber : id -> unit -(** [fiber id] records that fiber [id] is now running. *) - -val hiatus : unit -> unit -(** [hiatus ()] records that the system will sleep for reason [r]. *) - -val resume : id -> unit -(** [resume id] records that the system has resumed (used after {!hiatus}), - and is now running [id]. *) - -val resolve : id -> ex:exn option -> unit -(** [resolve id ~ex] records that [id] is now resolved. - If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *) - -val signal : ?src:id -> id -> unit -(** [signal ~src dst] records that [dst] was signalled. - @param src The thread sending the signal (default is the current thread). *) - -(** {2 Controlling tracing} *) - -type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t - -module Control : sig - type t - - val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t - (** [make ~timestamper b] is a trace buffer that record events in [b]. - In most cases, the {!Eio_unix.Trace} module provides a simpler interface. *) +(** [fiber id] records that [id] is now the current fiber for this domain. *) - val start : t -> unit - (** [start t] begins recording events in [t]. *) +val suspend : Runtime_events.Type.span -> unit +(** [suspend] records when the event loop is stopped waiting for events from the OS. *) - val stop : t -> unit - (** [stop t] stops recording to [t] (which must be the current trace buffer). *) -end +val resolve : id -> unit +(** [resolve id] records that [id] is now resolved. *) -(**/**) +val resolve_error : id -> exn -> unit +(** [resolve_error id exn] records that [id] is now failed. *) -module BS : sig - val set_int8 : Cstruct.buffer -> int -> int -> unit - val set_int64_le : Cstruct.buffer -> int -> int64 -> unit -end +val signal : id -> unit +(** [signal x] records that [x] was signalled. *) diff --git a/lib_eio/eio_mutex.ml b/lib_eio/eio_mutex.ml index 18f7a5de5..7ec326784 100644 --- a/lib_eio/eio_mutex.ml +++ b/lib_eio/eio_mutex.ml @@ -56,12 +56,14 @@ let lock t = match t.state with | Locked -> Trace.try_read t.id; - begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with - | `Error ex -> raise ex (* Poisoned; stop waiting *) + begin match Waiters.await ~mutex:(Some t.mutex) t.waiters with + | `Error ex -> + Trace.read t.id; + raise ex (* Poisoned; stop waiting *) | `Take -> (* The unlocker didn't change the state, so it's still Locked, as required. {locked t * R} *) - () + Trace.read t.id end | Unlocked -> Trace.read t.id; diff --git a/lib_eio/runtime_events/dune b/lib_eio/runtime_events/dune new file mode 100644 index 000000000..6ba2acc86 --- /dev/null +++ b/lib_eio/runtime_events/dune @@ -0,0 +1,4 @@ +(library + (name eio_runtime_events) + (public_name eio.runtime_events) + (libraries runtime_events)) diff --git a/lib_eio/runtime_events/eio_runtime_events.ml b/lib_eio/runtime_events/eio_runtime_events.ml new file mode 100644 index 000000000..51450c847 --- /dev/null +++ b/lib_eio/runtime_events/eio_runtime_events.ml @@ -0,0 +1,180 @@ +type id = int + +type ty = + | Fiber + | Promise + | Semaphore + | Switch + | Stream + | Mutex + +let ty_to_uint8 = function + | Fiber -> 1 + | Promise -> 15 + | Semaphore -> 16 + | Switch -> 17 + | Stream -> 18 + | Mutex -> 19 + +let ty_of_uint8 = function + | 1 -> Fiber + | 15 -> Promise + | 16 -> Semaphore + | 17 -> Switch + | 18 -> Stream + | 19 -> Mutex + | _ -> assert false + +let ty_to_string (t : ty) = + match t with + | Fiber -> "fiber" + | Promise -> "promise" + | Semaphore -> "semaphore" + | Switch -> "switch" + | Stream -> "stream" + | Mutex -> "mutex" + +let string = + let encode buf s = + let len = min (Bytes.length buf) (String.length s) in + Bytes.blit_string s 0 buf 0 len; + len + in + let decode buf len = Bytes.sub_string buf 0 len in + Runtime_events.Type.register ~encode ~decode + +let id_ty_type = + let encode buf (id, ty) = + Bytes.set_int64_le buf 0 (Int64.of_int id); + Bytes.set_int8 buf 8 (ty_to_uint8 ty); + 9 + in + let decode buf _size = + let id = Bytes.get_int64_le buf 0 |> Int64.to_int in + let ty = ty_of_uint8 (Bytes.get_int8 buf 8) in + (id, ty) + in + Runtime_events.Type.register ~encode ~decode + +let id_string_type = + let encode buf (id, msg) = + (* Check size of buf and use smallest size which means we may + have to truncate the label. *) + let available_buf_len = Bytes.length buf - 8 in + let msg_len = String.length msg in + let data_len = min available_buf_len msg_len in + Bytes.set_int64_le buf 0 (Int64.of_int id); + Bytes.blit_string msg 0 buf 8 data_len; + data_len + 8 + in + let decode buf size = + let id = Bytes.get_int64_le buf 0 |> Int64.to_int in + (id, Bytes.sub_string buf 8 (size - 8)) + in + Runtime_events.Type.register ~encode ~decode + +let exn_type = + let encode buf (id, exn) = + (* Check size of buf and use smallest size which means we may + have to truncate the label. *) + let available_buf_len = Bytes.length buf - 8 in + let msg = Printexc.to_string exn in + let msg_len = String.length msg in + let data_len = min available_buf_len msg_len in + Bytes.set_int64_le buf 0 (Int64.of_int id); + Bytes.blit_string msg 0 buf 8 data_len; + data_len + 8 + in + let decode buf size = + let id = Bytes.get_int64_le buf 0 |> Int64.to_int in + (id, Failure (Bytes.sub_string buf 8 (size - 8))) + in + Runtime_events.Type.register ~encode ~decode + +(* Runtime events registration *) + +type Runtime_events.User.tag += Create +let create = Runtime_events.User.register "eio.create" Create id_ty_type + +type Runtime_events.User.tag += Read +let read = Runtime_events.User.register "eio.read" Read Runtime_events.Type.int + +type Runtime_events.User.tag += Try_read +let try_read = Runtime_events.User.register "eio.try_read" Try_read Runtime_events.Type.int + +type Runtime_events.User.tag += Resolve | Resolve_error +let resolve = Runtime_events.User.register "eio.resolve" Resolve Runtime_events.Type.int +let resolve_error = Runtime_events.User.register "eio.resolve_error" Resolve_error exn_type + +type Runtime_events.User.tag += Name +let name = Runtime_events.User.register "eio.name" Name id_string_type + +type Runtime_events.User.tag += Log +let log = Runtime_events.User.register "eio.log" Log string + +type Runtime_events.User.tag += Fiber +let fiber = Runtime_events.User.register "eio.fiber" Fiber Runtime_events.Type.int + +type Runtime_events.User.tag += Signal +let signal = Runtime_events.User.register "eio.signal" Signal Runtime_events.Type.int + +type Runtime_events.User.tag += Suspend +let suspend = Runtime_events.User.register "eio.suspend" Suspend Runtime_events.Type.span + +type 'a handler = int -> Runtime_events.Timestamp.t -> 'a -> unit + +let ignore_event : _ handler = fun _ring_id _ts _data -> () + +let add_callbacks + ?(create=ignore_event) + ?(read=ignore_event) + ?(try_read=ignore_event) + ?(resolve=ignore_event) + ?(resolve_error=ignore_event) + ?(name=ignore_event) + ?(log=ignore_event) + ?(fiber=ignore_event) + ?(signal=ignore_event) + ?(suspend=ignore_event) + x = + let create_event ring_id ts ev v = + match Runtime_events.User.tag ev with + | Create -> create ring_id ts v + | _ -> assert false + in + let int_event ring_id ts ev v = + match Runtime_events.User.tag ev with + | Read -> read ring_id ts v + | Try_read -> try_read ring_id ts v + | Resolve -> resolve ring_id ts v + | Fiber -> fiber ring_id ts v + | Signal -> signal ring_id ts v + | _ -> () + in + let span_event ring_id ts ev v = + match Runtime_events.User.tag ev with + | Suspend -> suspend ring_id ts v + | _ -> () + in + let int_exn_event ring_id ts ev (id, ex) = + match Runtime_events.User.tag ev, ex with + | Resolve_error, Failure msg -> resolve_error ring_id ts (id, msg) + | _ -> () + in + let id_string_event ring_id ts ev v = + match Runtime_events.User.tag ev with + | Name -> name ring_id ts v + | _ -> () + in + let string_event ring_id ts ev v = + match Runtime_events.User.tag ev with + | Log -> log ring_id ts v + | _ -> () + in + x + |> Runtime_events.Callbacks.add_user_event id_ty_type create_event + |> Runtime_events.Callbacks.add_user_event Runtime_events.Type.int int_event + |> Runtime_events.Callbacks.add_user_event exn_type int_exn_event + |> Runtime_events.Callbacks.add_user_event string string_event + |> Runtime_events.Callbacks.add_user_event id_string_type id_string_event + |> Runtime_events.Callbacks.add_user_event Runtime_events.Type.span span_event diff --git a/lib_eio/runtime_events/eio_runtime_events.mli b/lib_eio/runtime_events/eio_runtime_events.mli new file mode 100644 index 000000000..7c57168ec --- /dev/null +++ b/lib_eio/runtime_events/eio_runtime_events.mli @@ -0,0 +1,46 @@ +(** This library is used to write event traces using OCaml's runtime events infrastructure. *) + +type id = int + +type ty = + | Fiber + | Promise + | Semaphore + | Switch + | Stream + | Mutex +(** Types of recorded objects. *) + +val ty_to_string : ty -> string + +(** {2 Writing events} *) + +val create : (id * ty) Runtime_events.User.t +val log : string Runtime_events.User.t +val name : (id * string) Runtime_events.User.t +val resolve : id Runtime_events.User.t +val resolve_error : (id * exn) Runtime_events.User.t +val fiber : id Runtime_events.User.t +val read : id Runtime_events.User.t +val try_read : id Runtime_events.User.t +val signal : id Runtime_events.User.t +val suspend : Runtime_events.Type.span Runtime_events.User.t + +(** {2 Consuming events} *) + +type 'a handler = int -> Runtime_events.Timestamp.t -> 'a -> unit +(** A ['a handler] is a function for handling events of type ['a]. + It is called as [handler ring_id ts value]. *) + +val add_callbacks: + ?create:(id * ty) handler -> + ?read:id handler -> + ?try_read:id handler -> + ?resolve:id handler -> + ?resolve_error:(id * string) handler -> + ?name:(id * string) handler -> + ?log:string handler -> + ?fiber:id handler -> + ?signal:id handler -> + ?suspend:Runtime_events.Type.span handler -> + Runtime_events.Callbacks.t -> Runtime_events.Callbacks.t diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index eba7514b4..fe179476e 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -52,7 +52,7 @@ module Locking = struct ) else ( (* The queue is full. Wait for our turn first. *) Suspend.enter_unchecked @@ fun ctx enqueue -> - Waiters.await_internal ~mutex:(Some t.mutex) t.writers t.id ctx (fun r -> + Waiters.await_internal ~mutex:(Some t.mutex) t.writers ctx (fun r -> (* This is called directly from [wake_one] and so we have the lock. We're still running in [wake_one]'s domain here. *) if Result.is_ok r then ( @@ -69,7 +69,9 @@ module Locking = struct match Queue.take_opt t.items with | None -> (* There aren't any items, so we need to wait for one. *) - Waiters.await ~mutex:(Some t.mutex) t.readers t.id + let x = Waiters.await ~mutex:(Some t.mutex) t.readers in + Trace.read t.id; + x | Some v -> (* If anyone was waiting for space, let the next one go. [is_empty writers || length items = t.capacity - 1] *) diff --git a/lib_eio/unix/ctf_unix.ml b/lib_eio/unix/ctf_unix.ml deleted file mode 100644 index 214f9bc80..000000000 --- a/lib_eio/unix/ctf_unix.ml +++ /dev/null @@ -1,21 +0,0 @@ -open Bigarray - -module Trace = Eio.Private.Trace - -let timestamper log_buffer ofs = - let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in - Trace.BS.set_int64_le log_buffer ofs ns - -let mmap_buffer ~size path = - let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in - Unix.set_close_on_exec fd; - Unix.ftruncate fd size; - let ba = array1_of_genarray (Unix.map_file fd char c_layout true [| size |]) in - Unix.close fd; - ba - -let with_tracing ?(size=0x100000) path fn = - let buffer = mmap_buffer ~size path in - let trace_config = Trace.Control.make ~timestamper buffer in - Trace.Control.start trace_config; - Fun.protect fn ~finally:(fun () -> Trace.Control.stop trace_config) diff --git a/lib_eio/unix/ctf_unix.mli b/lib_eio/unix/ctf_unix.mli deleted file mode 100644 index 7b0a7e2a1..000000000 --- a/lib_eio/unix/ctf_unix.mli +++ /dev/null @@ -1,9 +0,0 @@ -val timestamper : Eio.Private.Trace.log_buffer -> int -> unit -(** Uses [Mtime_clock] to write timestamps. *) - -val mmap_buffer : size:int -> string -> Eio.Private.Trace.log_buffer -(** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *) - -val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a -(** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer, - calls {!Trace.Control.start} to start recording, runs [fn], and then stops recording. *) diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 41d2c59fa..45325817e 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -26,8 +26,6 @@ let run_in_systhread = Private.run_in_systhread module Ipaddr = Net.Ipaddr -module Trace = Ctf_unix - module Process = Process module Net = Net module Pi = Pi diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index 5da4c3272..c968f2de6 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -97,6 +97,4 @@ module Private : sig module Fork_action = Fork_action end -module Trace = Ctf_unix - module Pi = Pi diff --git a/lib_eio/waiters.ml b/lib_eio/waiters.ml index 9bd4e0208..293dcd6d5 100644 --- a/lib_eio/waiters.ml +++ b/lib_eio/waiters.ml @@ -38,7 +38,7 @@ let rec wake_one t v = let is_empty = Lwt_dllist.is_empty -let await_internal ~mutex (t:'a t) id ctx enqueue = +let await_internal ~mutex (t:'a t) ctx enqueue = match Fiber_context.get_error ctx with | Some ex -> Option.iter Mutex.unlock mutex; @@ -46,10 +46,6 @@ let await_internal ~mutex (t:'a t) id ctx enqueue = | None -> let resolved_waiter = ref Hook.null in let finished = Atomic.make false in - let enqueue x = - Trace.read ~reader:id (Fiber_context.tid ctx); - enqueue x - in let cancel ex = if Atomic.compare_and_set finished false true then ( Hook.remove !resolved_waiter; @@ -66,5 +62,5 @@ let await_internal ~mutex (t:'a t) id ctx enqueue = Mutex.unlock mutex (* Returns a result if the wait succeeds, or raises if cancelled. *) -let await ~mutex waiters id = - Suspend.enter_unchecked (await_internal ~mutex waiters id) +let await ~mutex waiters = + Suspend.enter_unchecked (await_internal ~mutex waiters) diff --git a/lib_eio/waiters.mli b/lib_eio/waiters.mli index f3c39fe0c..e08bd4b92 100644 --- a/lib_eio/waiters.mli +++ b/lib_eio/waiters.mli @@ -21,8 +21,8 @@ val is_empty : 'a t -> bool val await : mutex:Mutex.t option -> - 'a t -> Trace.id -> 'a -(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t]. + 'a t -> 'a +(** [await ~mutex t] suspends the current fiber and adds its continuation to [t]. When the waiter is woken, the fiber is resumed and returns the result. If [t] can be used from multiple domains: - [mutex] must be set to the mutex to use to unlock it. @@ -32,9 +32,9 @@ val await : val await_internal : mutex:Mutex.t option -> - 'a t -> Trace.id -> Fiber_context.t -> + 'a t -> Fiber_context.t -> (('a, exn) result -> unit) -> unit -(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber. +(** [await_internal ~mutex t ctx enqueue] is like [await], but the caller has to suspend the fiber. This also allows wrapping the [enqueue] function. Calls [enqueue (Error (Cancelled _))] if cancelled. Note: [enqueue] is called from the triggering domain, diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index d3a194d24..1efc8877f 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -20,12 +20,12 @@ let file_offset t = function let enqueue_read st action (file_offset,fd,buf,len) = let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in - Trace.label "read"; + Trace.log "read"; Sched.submit_rw_req st req let rec enqueue_writev args st action = let (file_offset,fd,bufs) = args in - Trace.label "writev"; + Trace.log "writev"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.writev st.uring ~file_offset fd bufs (Job action) ) @@ -35,11 +35,11 @@ let rec enqueue_writev args st action = let enqueue_write st action (file_offset,fd,buf,len) = let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in - Trace.label "write"; + Trace.log "write"; Sched.submit_rw_req st req let rec enqueue_splice ~src ~dst ~len st action = - Trace.label "splice"; + Trace.log "splice"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.splice st.uring (Job action) ~src ~dst ~len ) @@ -48,7 +48,7 @@ let rec enqueue_splice ~src ~dst ~len st action = Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action = - Trace.label "openat2"; + Trace.log "openat2"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action) ) @@ -57,7 +57,7 @@ let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st ac Queue.push (fun st -> enqueue_openat2 args st action) st.io_q let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action = - Trace.label "statx"; + Trace.log "statx"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.statx st.uring ?fd ~mask path buf flags (Job action) ) @@ -66,7 +66,7 @@ let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action = Queue.push (fun st -> enqueue_statx args st action) st.io_q let rec enqueue_unlink ((dir, fd, path) as args) st action = - Trace.label "unlinkat"; + Trace.log "unlinkat"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.unlink st.uring ~dir ~fd path (Job action) ) @@ -75,7 +75,7 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action = Queue.push (fun st -> enqueue_unlink args st action) st.io_q let rec enqueue_connect fd addr st action = - Trace.label "connect"; + Trace.log "connect"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.connect st.uring fd addr (Job action) ) @@ -84,7 +84,7 @@ let rec enqueue_connect fd addr st action = Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q let rec enqueue_send_msg fd ~fds ~dst buf st action = - Trace.label "send_msg"; + Trace.log "send_msg"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.send_msg st.uring fd ~fds ?dst buf (Job action) ) @@ -93,7 +93,7 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action = Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q let rec enqueue_recv_msg fd msghdr st action = - Trace.label "recv_msg"; + Trace.log "recv_msg"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.recv_msg st.uring fd msghdr (Job action); ) @@ -102,7 +102,7 @@ let rec enqueue_recv_msg fd msghdr st action = Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q let rec enqueue_accept fd client_addr st action = - Trace.label "accept"; + Trace.log "accept"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.accept st.uring fd client_addr (Job action) ) in @@ -112,7 +112,7 @@ let rec enqueue_accept fd client_addr st action = ) let rec enqueue_noop t action = - Trace.label "noop"; + Trace.log "noop"; let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in if job = None then ( (* wait until an sqe is available *) @@ -147,7 +147,7 @@ let read_upto ?file_offset fd buf len = let rec enqueue_readv args st action = let (file_offset,fd,bufs) = args in - Trace.label "readv"; + Trace.log "readv"; let retry = Sched.with_cancel_hook ~action st (fun () -> Uring.readv st.uring ~file_offset fd bufs (Job action)) in @@ -465,7 +465,6 @@ let shutdown socket command = | Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg let accept ~sw fd = - Trace.label "accept"; Fd.use_exn "accept" fd @@ fun fd -> let client_addr = Uring.Sockaddr.create () in let res = Sched.enter (enqueue_accept fd client_addr) in diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 27b90ba26..9060a4994 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -9,8 +9,6 @@ module Suspended = Eio_utils.Suspended module Zzz = Eio_utils.Zzz module Lf_queue = Eio_utils.Lf_queue -let system_thread = Trace.mint_id () - let statx_works = ref false (* Before Linux 5.18, statx is unreliable *) type exit = [`Exit_scheduler] @@ -119,7 +117,7 @@ let rec enqueue_job t fn = (* Cancellations always come from the same domain, so no need to send wake events here. *) let rec enqueue_cancel job t = - Trace.label "cancel"; + Trace.log "cancel"; match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q | Some _ -> () @@ -162,7 +160,7 @@ let submit_pending_io st = match Queue.take_opt st.io_q with | None -> () | Some fn -> - Trace.label "submit_pending_io"; + Trace.log "submit_pending_io"; fn st let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) = @@ -183,7 +181,7 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as re ) in if retry then ( - Trace.label "await-sqe"; + Trace.log "await-sqe"; (* wait until an sqe is available *) Queue.push (fun st -> submit_rw_req st req) io_q ) @@ -244,9 +242,9 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Trace.hiatus (); + Trace.suspend Begin; let result = Uring.wait ?timeout uring in - Trace.resume system_thread; + Trace.suspend End; Atomic.set st.need_wakeup false; Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) match result with @@ -338,7 +336,7 @@ let free_buf st buf = | Some k -> enqueue_thread st k buf let rec enqueue_poll_add fd poll_mask st action = - Trace.label "poll_add"; + Trace.log "poll_add"; let retry = with_cancel_hook ~action st (fun () -> Uring.poll_add st.uring fd poll_mask (Job action) ) @@ -347,7 +345,7 @@ let rec enqueue_poll_add fd poll_mask st action = Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q let rec enqueue_poll_add_unix fd poll_mask st action cb = - Trace.label "poll_add"; + Trace.log "poll_add"; let retry = with_cancel_hook ~action st (fun () -> Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb)) ) @@ -357,7 +355,7 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb = let rec enqueue_readv args st action = let (file_offset,fd,bufs) = args in - Trace.label "readv"; + Trace.log "readv"; let retry = with_cancel_hook ~action st (fun () -> Uring.readv st.uring ~file_offset fd bufs (Job action)) in diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index d0b22cca4..bc7fb6bd3 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -78,8 +78,8 @@ let test_direct_copy () = let buffer = Buffer.create 20 in let to_output = Eio.Flow.buffer_sink buffer in Switch.run (fun sw -> - Fiber.fork ~sw (fun () -> Trace.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2); - Fiber.fork ~sw (fun () -> Trace.label "copy2"; Eio.Flow.copy from_pipe2 to_output); + Fiber.fork ~sw (fun () -> Trace.log "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2); + Fiber.fork ~sw (fun () -> Trace.log "copy2"; Eio.Flow.copy from_pipe2 to_output); Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1; Eio.Flow.close to_pipe1; ); diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 0fadc8308..fe9e42c00 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -24,8 +24,6 @@ module Poll = Iomux.Poll type exit = [`Exit_scheduler] -let system_thread = Trace.mint_id () - (* The type of items in the run queue. *) type runnable = | IO : runnable (* Reminder to check for IO *) @@ -209,12 +207,12 @@ let rec next t : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Trace.hiatus (); + Trace.suspend Begin; let nready = try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout with Unix.Unix_error (Unix.EINTR, _, "") -> 0 in - Trace.fiber system_thread; + Trace.suspend End; Atomic.set t.need_wakeup false; Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) Poll.iter_ready t.poll nready (ready t); diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml index eb28a0884..5f8f9fbda 100755 --- a/lib_eio_windows/sched.ml +++ b/lib_eio_windows/sched.ml @@ -23,8 +23,6 @@ module Rcfd = Eio_unix.Private.Rcfd type exit = [`Exit_scheduler] -let system_thread = Trace.mint_id () - (* The type of items in the run queue. *) type runnable = | IO : runnable (* Reminder to check for IO *) @@ -204,14 +202,16 @@ let rec next t : [`Exit_scheduler] = (* At this point we're not going to check [run_q] again before sleeping. If [need_wakeup] is still [true], this is fine because we don't promise to do that. If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Trace.hiatus (); + Trace.suspend Begin; let cons fd acc = fd :: acc in let read = FdSet.fold cons t.poll.to_read [] in let write = FdSet.fold cons t.poll.to_write [] in match Unix.select read write [] timeout with - | exception Unix.(Unix_error (EINTR, _, _)) -> next t + | exception Unix.(Unix_error (EINTR, _, _)) -> + Trace.suspend End; + next t | readable, writeable, _ -> - Trace.resume system_thread; + Trace.suspend End; Atomic.set t.need_wakeup false; Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) List.iter (ready t [ `W ]) writeable; diff --git a/tests/nounix/nounix.ml b/tests/nounix/nounix.ml index 3f855d236..f0af7e6cf 100644 --- a/tests/nounix/nounix.ml +++ b/tests/nounix/nounix.ml @@ -1,9 +1,5 @@ -(* This module also checks that Eio doesn't pull in a dependency on Unix. +(* This module checks that Eio doesn't pull in a dependency on Unix. See the [dune] file. *) -module Trace = Eio.Private.Trace - let () = - let bs = Cstruct.create 8 in - Trace.BS.set_int64_le bs.buffer 0 1234L; - assert (Cstruct.LE.get_uint64 bs 0 = 1234L) + assert (Eio.Buf_read.(parse_string_exn take_all) "hi" = "hi") diff --git a/tests/sync.md b/tests/sync.md index f39d5af31..d37b90f71 100644 --- a/tests/sync.md +++ b/tests/sync.md @@ -96,10 +96,10 @@ Basic semaphore tests: let running = ref 0 in let sem = Semaphore.make 2 in let fork = Fiber.fork_promise ~sw in - let a = fork (fun () -> Trace.label "a"; Semaphore.acquire sem; incr running) in - let b = fork (fun () -> Trace.label "b"; Semaphore.acquire sem; incr running) in - let c = fork (fun () -> Trace.label "c"; Semaphore.acquire sem; incr running) in - let d = fork (fun () -> Trace.label "d"; Semaphore.acquire sem; incr running) in + let a = fork (fun () -> Trace.log "a"; Semaphore.acquire sem; incr running) in + let b = fork (fun () -> Trace.log "b"; Semaphore.acquire sem; incr running) in + let c = fork (fun () -> Trace.log "c"; Semaphore.acquire sem; incr running) in + let d = fork (fun () -> Trace.log "d"; Semaphore.acquire sem; incr running) in traceln "Semaphore means that only %d threads are running" !running; Promise.await_exn a; Promise.await_exn b; @@ -132,8 +132,8 @@ Releasing a semaphore when no-one is waiting for it: let sem = Semaphore.make 0 in Semaphore.release sem; (* Release with free-counter *) traceln "Initial config: %d" (Semaphore.get_value sem); - Fiber.fork ~sw (fun () -> Trace.label "a"; Semaphore.acquire sem); - Fiber.fork ~sw (fun () -> Trace.label "b"; Semaphore.acquire sem); + Fiber.fork ~sw (fun () -> Trace.log "a"; Semaphore.acquire sem); + Fiber.fork ~sw (fun () -> Trace.log "b"; Semaphore.acquire sem); traceln "A running: %d" (Semaphore.get_value sem); Semaphore.release sem; (* Release with a non-empty wait-queue *) traceln "Now b running: %d" (Semaphore.get_value sem);