From 9cf69670757d14aa2889a1326c12f558ffa96938 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Wed, 13 Nov 2024 04:30:55 +0000 Subject: [PATCH 1/3] Initial --- crashtracker/src/collector/alt_fork.rs | 72 ++++++++++++++ crashtracker/src/collector/crash_handler.rs | 103 ++++++++++++-------- crashtracker/src/collector/emitters.rs | 14 +-- crashtracker/src/collector/mod.rs | 1 + 4 files changed, 141 insertions(+), 49 deletions(-) create mode 100644 crashtracker/src/collector/alt_fork.rs diff --git a/crashtracker/src/collector/alt_fork.rs b/crashtracker/src/collector/alt_fork.rs new file mode 100644 index 000000000..e97d0f1be --- /dev/null +++ b/crashtracker/src/collector/alt_fork.rs @@ -0,0 +1,72 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +#![cfg(unix)] + +use anyhow; +use libc::pid_t; +use std::fs::File; +use std::io::{self, BufRead, BufReader}; + +#[cfg(target_os = "macos")] +pub(crate) fn alt_fork() -> i32 { + // There is a lower-level `__fork()` function in macOS, and we can call it from Rust, but the + // runtime is much stricter about which operations (e.g., no malloc) are allowed in the child. + // This somewhat defeats the purpose, so macOS for now will just have to live with atfork + // handlers. + unsafe { libc::fork() } +} + +#[cfg(target_os = "linux")] +fn is_being_traced() -> io::Result { + // Check to see whether we are being traced. This will fail on systems where procfs is + // unavailable, but presumably in those systems `ptrace()` is also unavailable. + // The caller is free to treat a failure as a false. 
+ let file = File::open("/proc/self/status")?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + if line.starts_with("TracerPid:") { + let tracer_pid = line.split_whitespace().nth(1).unwrap_or("0"); + return Ok(tracer_pid != "0"); + } + } + + Ok(false) +} + +#[cfg(target_os = "linux")] +pub(crate) fn alt_fork() -> pid_t { + use libc::{ + c_ulong, c_void, syscall, SYS_clone, CLONE_CHILD_CLEARTID, CLONE_CHILD_SETTID, + CLONE_PTRACE, SIGCHLD, + }; + + let mut ptid: pid_t = 0; + let mut ctid: pid_t = 0; + + // Check whether we're traced before we fork. + let being_traced = is_being_traced().unwrap_or(false); + let extra_flags = if being_traced { CLONE_PTRACE } else { 0 }; + + // Use the direct syscall interface into `clone()`. This should replicate the parameters used + // for glibc `fork()`, except of course without calling the atfork handlers. + // One question is whether we're using the right set of flags. For instance, does suppressing + // `SIGCHLD` here make it easier for us to handle some conditions in the parent process? 
+ let res = unsafe { + syscall( + SYS_clone, + (CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | SIGCHLD | extra_flags) as c_ulong, + std::ptr::null_mut::(), + &mut ptid as *mut pid_t, + &mut ctid as *mut pid_t, + 0 as c_ulong, + ) + }; + + if res == -1 { + return -1; + } + + ctid +} diff --git a/crashtracker/src/collector/crash_handler.rs b/crashtracker/src/collector/crash_handler.rs index 617bc38d1..170fd5b24 100644 --- a/crashtracker/src/collector/crash_handler.rs +++ b/crashtracker/src/collector/crash_handler.rs @@ -6,6 +6,7 @@ use super::emitters::emit_crashreport; use super::saguard::SaGuard; +use super::alt_fork::alt_fork; use crate::crash_info::CrashtrackerMetadata; use crate::shared::configuration::{CrashtrackerConfiguration, CrashtrackerReceiverConfig}; use anyhow::Context; @@ -43,16 +44,6 @@ use std::time::{Duration, Instant}; // - sigaction // - write -// The use of fork or vfork is influenced by the availability of the function in the host libc. -// Macos seems to have deprecated vfork. The reason to prefer vfork is to suppress atfork -// handlers. This is OK because macos is primarily a test platform, and we have system-level -// testing on Linux in various CI environments. 
-#[cfg(target_os = "macos")] -use libc::fork as vfork; - -#[cfg(target_os = "linux")] -use libc::vfork; - #[derive(Debug)] struct OldHandlers { pub sigbus: SigAction, @@ -257,30 +248,26 @@ static CONFIG: AtomicPtr<(CrashtrackerConfiguration, String)> = AtomicPtr::new(p static RECEIVER_CONFIG: AtomicPtr = AtomicPtr::new(ptr::null_mut()); static RECEIVER_ARGS: AtomicPtr = AtomicPtr::new(ptr::null_mut()); +fn make_unix_socket_pair() -> anyhow::Result<(RawFd, RawFd)> { + let (sock1, sock2) = socket::socketpair( + socket::AddressFamily::Unix, + socket::SockType::Stream, + None, + socket::SockFlag::empty(), + ).context("Failed to create Unix domain socket pair")?; + Ok((sock1.into_raw_fd(), sock2.into_raw_fd())) +} + fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result { let stderr = open_file_or_quiet(config.stderr_filename.as_deref())?; let stdout = open_file_or_quiet(config.stdout_filename.as_deref())?; // Create anonymous Unix domain socket pair for communication - let (uds_parent, uds_child) = socket::socketpair( - socket::AddressFamily::Unix, - socket::SockType::Stream, - None, - socket::SockFlag::empty(), - ) - .context("Failed to create Unix domain socket pair") - .map(|(a, b)| (a.into_raw_fd(), b.into_raw_fd()))?; - - // We need to spawn a process without calling atfork handlers, since this is happening inside - // of a signal handler. Moreover, preference is given to multiplatform-uniform solutions. - // Although `vfork()` is deprecated, the alternatives have limitations - // * `fork()` calls atfork handlers - // * There is no guarantee that `posix_spawn()` will not call `fork()` internally - // * `clone()`/`clone3()` are Linux-specific - // Accordingly, use `vfork()` for now - // NB -- on macos the underlying implementation here is actually `fork()`! See the top of this - // file for details. 
- match unsafe { vfork() } { + let (uds_parent, uds_child) = make_unix_socket_pair()?; + + // This implementation calls `clone()` directly on linux and the libc `fork()` on macos. + // This avoids calling the `atfork()` handlers on Linux, but not on mac. + match fork_process() { 0 => { // Child (noreturn) run_receiver_child(uds_parent, uds_child, stderr, stdout) @@ -300,6 +287,42 @@ fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result, + receiver: Receiver, +) -> anyhow::Result<()> { + let ppid = unsafe { libc::getppid() }; + let (uds_parent, uds_child) = make_unix_socket_pair()?; + + match fork_process() { + 0 => { + // Child + let crash_report = emit_crashreport( + config_str, + metadata_str, + signum, + faulting_address, + ppid, + )?; + Ok(()) + } + pid if pid > 0 => { + // Parent + Ok(()) + } + _ => { + // Error + Err(anyhow::anyhow!("Failed to fork collector process")) + } + } + Ok(()) +} + /// Updates the crashtracker metadata for this process /// Metadata is stored in a global variable and sent to the crashtracking /// receiver when a crash occurs. @@ -484,6 +507,7 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re None }; + // During the execution of this signal handler, block ALL other signals, especially because we // cannot control whether or not we run with SA_NODEFER (crashtracker might have been chained). // The especially problematic signals are SIGCHLD and SIGPIPE, which are possibly delivered due @@ -492,26 +516,21 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re // disrupted. let _guard = SaGuard::<2>::new(&[signal::SIGCHLD, signal::SIGPIPE])?; - // Even though we just set a guard, we'll have to undo part of it in the receiver process in - // order to let it reap its own children properly. We have to do this anyway, so do it here in - // order to ensure that _this_ process has the right flags (especially for SIGCHLD). 
+ // Spawn the collector and receiver processes let receiver = make_receiver(receiver_config)?; - - // Creating this stream means the underlying RawFD is now owned by the stream, so - // we shouldn't close it manually. - let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.receiver_uds) }; - - // Currently the emission of the crash report doesn't have a firm time guarantee - // In a future patch, the timeout parameter should be passed into the IPC loop here and - // checked periodically. - let res = emit_crashreport( + let collector = spawn_collector( &mut unix_stream, config, config_str, metadata_string, signum, faulting_address, - ); + receiver, + )?; + + // Creating this stream means the underlying RawFD is now owned by the stream, so + // we shouldn't close it manually. + let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.receiver_uds) }; let _ = unix_stream.flush(); unix_stream diff --git a/crashtracker/src/collector/emitters.rs b/crashtracker/src/collector/emitters.rs index 1c4a3dfb9..f3cb31105 100644 --- a/crashtracker/src/collector/emitters.rs +++ b/crashtracker/src/collector/emitters.rs @@ -102,11 +102,12 @@ pub(crate) fn emit_crashreport( metadata_string: &str, signum: i32, faulting_address: Option, + ppid: i32, ) -> anyhow::Result<()> { emit_metadata(pipe, metadata_string)?; emit_config(pipe, config_str)?; emit_siginfo(pipe, signum, faulting_address)?; - emit_procinfo(pipe)?; + emit_procinfo(pipe, ppid)?; pipe.flush()?; emit_counters(pipe)?; pipe.flush()?; @@ -147,18 +148,17 @@ fn emit_metadata(w: &mut impl Write, metadata_str: &str) -> anyhow::Result<()> { Ok(()) } -fn emit_procinfo(w: &mut impl Write) -> anyhow::Result<()> { +fn emit_procinfo(w: &mut impl Write, ppid: i32) -> anyhow::Result<()> { writeln!(w, "{DD_CRASHTRACK_BEGIN_PROCINFO}")?; - let pid = nix::unistd::getpid(); - writeln!(w, "{{\"pid\": {pid} }}")?; + writeln!(w, "{{\"pid\": {ppid} }}")?; writeln!(w, "{DD_CRASHTRACK_END_PROCINFO}")?; Ok(()) } #[cfg(target_os = 
"linux")] -/// `/proc/self/maps` is very useful for debugging, and difficult to get from -/// the child process (permissions issues on Linux). Emit it directly onto the -/// pipe to get around this. +/// `/proc/self/maps` is very useful for debugging, but it isn't guaranteed to be +/// readable even by same-UID child processes. However, a fresh fork of a given process has a +/// memory layout identical to the parent, so we just read it here. fn emit_proc_self_maps(w: &mut impl Write) -> anyhow::Result<()> { emit_text_file(w, "/proc/self/maps")?; Ok(()) diff --git a/crashtracker/src/collector/mod.rs b/crashtracker/src/collector/mod.rs index 470f70d91..38b678994 100644 --- a/crashtracker/src/collector/mod.rs +++ b/crashtracker/src/collector/mod.rs @@ -7,6 +7,7 @@ mod crash_handler; mod emitters; mod saguard; mod spans; +mod alt_fork; pub use api::*; pub use counters::{begin_op, end_op, reset_counters, OpTypes}; From 225d4cd4baff80043ccecd5a58da5b7b91416794 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:29:42 +0000 Subject: [PATCH 2/3] Initial --- bin_tests/src/bin/crashtracker_bin_test.rs | 1 + crashtracker-ffi/src/collector/datatypes.rs | 4 + crashtracker-ffi/src/collector/mod.rs | 45 +++ crashtracker/src/collector/alt_fork.rs | 1 - crashtracker/src/collector/api.rs | 8 +- crashtracker/src/collector/crash_handler.rs | 330 +++++++++++++------- crashtracker/src/collector/mod.rs | 1 - crashtracker/src/collector/saguard.rs | 86 ----- crashtracker/src/receiver.rs | 1 + crashtracker/src/shared/configuration.rs | 12 +- crashtracker/src/shared/constants.rs | 2 + 11 files changed, 276 insertions(+), 215 deletions(-) delete mode 100644 crashtracker/src/collector/saguard.rs diff --git a/bin_tests/src/bin/crashtracker_bin_test.rs b/bin_tests/src/bin/crashtracker_bin_test.rs index df141ce63..425240b8e 100644 --- a/bin_tests/src/bin/crashtracker_bin_test.rs +++ b/bin_tests/src/bin/crashtracker_bin_test.rs @@ -57,6 +57,7 
@@ mod unix { resolve_frames: crashtracker::StacktraceCollection::WithoutSymbols, endpoint, timeout_ms: TEST_COLLECTOR_TIMEOUT_MS, + unix_socket_path: Some("".to_string()), }; let metadata = CrashtrackerMetadata { diff --git a/crashtracker-ffi/src/collector/datatypes.rs b/crashtracker-ffi/src/collector/datatypes.rs index ca5d39b07..c73fa354b 100644 --- a/crashtracker-ffi/src/collector/datatypes.rs +++ b/crashtracker-ffi/src/collector/datatypes.rs @@ -70,6 +70,8 @@ pub struct Config<'a> { /// This is given as a uint32_t, but the actual timeout needs to fit inside of an i32 (max /// 2^31-1). This is a limitation of the various interfaces used to guarantee the timeout. pub timeout_ms: u32, + /// Optional filename for a unix domain socket if the receiver is used asynchronously + pub optional_unix_socket_filename: CharSlice<'a>, } impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration { @@ -87,6 +89,7 @@ impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration let endpoint = value.endpoint.cloned(); let resolve_frames = value.resolve_frames; let timeout_ms = value.timeout_ms; + let unix_socket_path = option_from_char_slice(value.optional_unix_socket_filename)?; Self::new( additional_files, create_alt_stack, @@ -94,6 +97,7 @@ impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration endpoint, resolve_frames, timeout_ms, + unix_socket_path, ) } } diff --git a/crashtracker-ffi/src/collector/mod.rs b/crashtracker-ffi/src/collector/mod.rs index 7a8343b6e..830279694 100644 --- a/crashtracker-ffi/src/collector/mod.rs +++ b/crashtracker-ffi/src/collector/mod.rs @@ -8,6 +8,7 @@ use super::crash_info::Metadata; use crate::Result; use anyhow::Context; pub use counters::*; +use datadog_crashtracker::CrashtrackerReceiverConfig; pub use datatypes::*; pub use spans::*; @@ -94,3 +95,47 @@ pub unsafe extern "C" fn ddog_crasht_init( .context("ddog_crasht_init failed") .into() } + +#[no_mangle] +#[must_use] +/// Initialize the crash-tracking 
infrastructure without launching the receiver. +/// +/// # Preconditions +/// Requires `config` to be given with a `unix_socket_path`, which is normally optional. +/// # Safety +/// Crash-tracking functions are not reentrant. +/// No other crash-handler functions should be called concurrently. +/// # Atomicity +/// This function is not atomic. A crash during its execution may lead to +/// unexpected crash-handling behaviour. +pub unsafe extern "C" fn ddog_crasht_init_without_receiver( + config: Config, + metadata: Metadata, +) -> Result { + (|| { + let config: datadog_crashtracker::CrashtrackerConfiguration = config.try_into()?; + let metadata = metadata.try_into()?; + + // If the unix domain socket path is not set, then we throw an error--there's currently no + // other way to specify communication between an async receiver and a collector, so this + // isn't a valid configuration. + if config.unix_socket_path.is_none() { + return Err(anyhow::anyhow!("config.unix_socket_path must be set")); + } + if config.unix_socket_path.as_ref().unwrap().is_empty() { + return Err(anyhow::anyhow!("config.unix_socket_path can't be empty")); + } + + // Populate an empty receiver config + let receiver_config = CrashtrackerReceiverConfig { + args: vec![], + env: vec![], + path_to_receiver_binary: "".to_string(), + stderr_filename: None, + stdout_filename: None, + }; + datadog_crashtracker::init(config, receiver_config, metadata) + })() + .context("ddog_crasht_init failed") + .into() +} diff --git a/crashtracker/src/collector/alt_fork.rs b/crashtracker/src/collector/alt_fork.rs index e97d0f1be..b6cff5527 100644 --- a/crashtracker/src/collector/alt_fork.rs +++ b/crashtracker/src/collector/alt_fork.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 #![cfg(unix)] -use anyhow; use libc::pid_t; use std::fs::File; use std::io::{self, BufRead, BufReader}; diff --git a/crashtracker/src/collector/api.rs b/crashtracker/src/collector/api.rs index 984369bb2..94f05050e 100644 --- 
a/crashtracker/src/collector/api.rs +++ b/crashtracker/src/collector/api.rs @@ -79,8 +79,6 @@ pub fn init( receiver_config: CrashtrackerReceiverConfig, metadata: CrashtrackerMetadata, ) -> anyhow::Result<()> { - // Setup the receiver first, so that if there is a crash detected it has - // somewhere to go. update_metadata(metadata)?; update_config(config)?; configure_receiver(receiver_config); @@ -131,6 +129,7 @@ fn test_crash() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -187,6 +186,7 @@ fn test_altstack_paradox() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, ); // This is slightly over-tuned to the language of the error message, but it'd require some @@ -253,6 +253,7 @@ fn test_altstack_use_create() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -379,6 +380,7 @@ fn test_altstack_use_nocreate() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -505,6 +507,7 @@ fn test_altstack_nouse() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -666,6 +669,7 @@ fn test_waitall_nohang() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( diff --git a/crashtracker/src/collector/crash_handler.rs b/crashtracker/src/collector/crash_handler.rs index 170fd5b24..48f1c1c03 100644 --- a/crashtracker/src/collector/crash_handler.rs +++ b/crashtracker/src/collector/crash_handler.rs @@ -5,25 +5,24 @@ #![allow(deprecated)] use super::emitters::emit_crashreport; -use super::saguard::SaGuard; use super::alt_fork::alt_fork; use crate::crash_info::CrashtrackerMetadata; use crate::shared::configuration::{CrashtrackerConfiguration, 
CrashtrackerReceiverConfig}; +use crate::shared::constants::*; use anyhow::Context; use libc::{ c_void, execve, mmap, sigaltstack, siginfo_t, MAP_ANON, MAP_FAILED, MAP_PRIVATE, PROT_NONE, PROT_READ, PROT_WRITE, SIGSTKSZ, }; -use nix::poll::{poll, PollFd, PollFlags}; +use libc::{poll, pollfd, POLLHUP}; use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet}; use nix::sys::socket; use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; -use nix::unistd::{close, Pid}; +use nix::unistd::Pid; use std::ffi::CString; use std::fs::{File, OpenOptions}; -use std::io::Write; use std::os::unix::{ - io::{BorrowedFd, FromRawFd, IntoRawFd, RawFd}, + io::{FromRawFd, IntoRawFd, RawFd}, net::UnixStream, }; use std::ptr; @@ -34,7 +33,7 @@ use std::time::{Duration, Instant}; // Note that this file makes use the following async-signal safe functions in a signal handler. // // - clock_gettime -// - close (although Rust may call `free` because we call the higher-level nix interface) +// - close // - dup2 // - fork (but specifically only because it does so without calling atfork handlers) // - kill @@ -50,9 +49,11 @@ struct OldHandlers { pub sigsegv: SigAction, } -struct Receiver { - receiver_uds: RawFd, - receiver_pid: i32, +#[derive(Clone, Debug)] +struct WatchedProcess { + fd: RawFd, + pid: libc::pid_t, + oneshot: bool, } // The args_cstrings and env_vars_strings fields are just storage. Even though they're @@ -166,7 +167,7 @@ fn reap_child_non_blocking(pid: Pid, timeout_ms: u32) -> anyhow::Result { } /// Wrapper around the child process that will run the crash receiver -fn run_receiver_child(uds_parent: RawFd, uds_child: RawFd, stderr: RawFd, stdout: RawFd) -> ! { +fn run_receiver_child(uds_child: RawFd, stderr: RawFd, stdout: RawFd) -> ! 
{ // File descriptor management unsafe { let _ = libc::dup2(uds_child, 0); @@ -175,10 +176,9 @@ fn run_receiver_child(uds_parent: RawFd, uds_child: RawFd, stderr: RawFd, stdout } // Close unused file descriptors - let _ = close(uds_parent); - let _ = close(uds_child); - let _ = close(stderr); - let _ = close(stdout); + let _ = unsafe { libc::close(uds_child) }; + let _ = unsafe { libc::close(stderr) }; + let _ = unsafe { libc::close(stdout) }; // We've already prepared the arguments and environment variable // If we've reached this point, it means we've prepared the arguments and environment variables @@ -217,21 +217,89 @@ fn run_receiver_child(uds_parent: RawFd, uds_child: RawFd, stderr: RawFd, stdout } } +fn run_collector_child( + config: &CrashtrackerConfiguration, + config_str: &str, + metadata_str: &str, + signum: i32, + faulting_address: Option, + receiver: &WatchedProcess, + ppid: libc::pid_t, +) -> ! { + // The collector process currently runs without access to stdio. There are two ways to resolve + // this: + // - Reuse the stdio files used for the receiver + // + con: we don't control flushes to stdio, so the writes from the two processes may + // interleave parts of a single message + // - Create two new stdio files for the collector + // + con: that's _another_ two options to fill out in the config + let _ = unsafe { libc::close(0) }; + let _ = unsafe { libc::close(1) }; + let _ = unsafe { libc::close(2) }; + + // Before we start reading or writing to the socket, we need to disable SIGPIPE because we + // don't control the emission implementation enough to send MSG_NOSIGNAL. 
+ // NB - collector is running in its own process, so this doesn't affect the watchdog process + let _ = unsafe { + signal::sigaction( + signal::SIGPIPE, + &SigAction::new( + SigHandler::SigIgn, + SaFlags::empty(), + SigSet::empty()), + ) + }; + + // We're ready to emit the crashreport + let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.fd) }; + + let report = emit_crashreport( + &mut unix_stream, + config, + config_str, + metadata_str, + signum, + faulting_address, + ppid, + ); + if let Err(e) = report { + eprintln!("Failed to flush crash report: {e}"); + unsafe { libc::_exit(-1) }; + } + + // If we reach this point, then we exit. We're done. + // Note that since we're a forked process, we call `_exit` instead of: + // - `exit()`, because calling the atexit handlers might mutate shared state + // - `abort()`, because raising SIGABRT might cause other problems + unsafe { libc::_exit(0) }; +} + fn wait_for_pollhup(target_fd: RawFd, timeout_ms: i32) -> anyhow::Result { - // Need to convert the RawFd into a BorrowedFd to satisfy the PollFd prototype - let target_fd = unsafe { BorrowedFd::borrow_raw(target_fd) }; - let poll_fd = PollFd::new(&target_fd, PollFlags::POLLHUP); - - match poll(&mut [poll_fd], timeout_ms)? { - -1 => Err(anyhow::anyhow!("poll failed")), - 0 => Ok(false), - _ => match poll_fd - .revents() - .ok_or_else(|| anyhow::anyhow!("No revents found"))? - { - revents if revents.contains(PollFlags::POLLHUP) => Ok(true), - _ => Err(anyhow::anyhow!("poll returned unexpected result")), - }, + // NB - This implementation uses the libc interface to `poll`, since the `nix` interface has + // some ownership semantics which can apparently panic in certain cases. 
+ let mut poll_fds = [pollfd { + fd: target_fd, + events: POLLHUP, + revents: 0, + }]; + + match unsafe { poll(poll_fds.as_mut_ptr(), 1, timeout_ms) } { + -1 => Err(anyhow::anyhow!( + "poll failed with errno: {}", + std::io::Error::last_os_error() + )), + 0 => Ok(false), // Timeout occurred + _ => { + let revents = poll_fds[0].revents; + if revents & POLLHUP != 0 { + Ok(true) // POLLHUP detected + } else { + Err(anyhow::anyhow!( + "poll returned unexpected result: revents = {}", + revents + )) + } + } } } @@ -258,26 +326,57 @@ fn make_unix_socket_pair() -> anyhow::Result<(RawFd, RawFd)> { Ok((sock1.into_raw_fd(), sock2.into_raw_fd())) } -fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result { - let stderr = open_file_or_quiet(config.stderr_filename.as_deref())?; - let stdout = open_file_or_quiet(config.stdout_filename.as_deref())?; +fn receiver_from_socket(unix_socket_path: Option) -> anyhow::Result { + let unix_socket_path = unix_socket_path.unwrap_or_default(); + if unix_socket_path.is_empty() { + return Err(anyhow::anyhow!("No receiver path provided")); + } + #[cfg(target_os = "linux")] + let unix_stream = if unix_socket_path.starts_with(['.', '/']) { + UnixStream::connect(unix_socket_path)? + } else { + use std::os::linux::net::SocketAddrExt; + let addr = std::os::unix::net::SocketAddr::from_abstract_name(unix_socket_path)?; + UnixStream::connect_addr(&addr)? 
+ }; + + #[cfg(not(target_os = "linux"))] + let unix_stream = UnixStream::connect(unix_socket_path)?; + + let fd = unix_stream.into_raw_fd(); + Ok(WatchedProcess { + fd, + pid: 0, + oneshot: false, + }) +} + +fn receiver_from_config( + config: &CrashtrackerReceiverConfig, +) -> anyhow::Result { // Create anonymous Unix domain socket pair for communication let (uds_parent, uds_child) = make_unix_socket_pair()?; + // Create stdio file descriptors + let stderr = open_file_or_quiet(config.stderr_filename.as_deref())?; + let stdout = open_file_or_quiet(config.stdout_filename.as_deref())?; + // This implementation calls `clone()` directly on linux and the libc `fork()` on macos. // This avoids calling the `atfork()` handlers on Linux, but not on mac. - match fork_process() { + match alt_fork() { 0 => { // Child (noreturn) - run_receiver_child(uds_parent, uds_child, stderr, stdout) + let _ = unsafe { libc::close(uds_parent) }; // close parent end in child + run_receiver_child(uds_child, stderr, stdout); } pid if pid > 0 => { // Parent - let _ = close(uds_child); - Ok(Receiver { - receiver_uds: uds_parent, - receiver_pid: pid, + let _ = unsafe { libc::close(uds_child) }; // close child end in parent + Ok(WatchedProcess { + fd: uds_parent, + pid, + oneshot: true, }) } _ => { @@ -287,40 +386,50 @@ fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result, +) -> anyhow::Result { + // Try to create a receiver from the socket path, fallback to config on failure + receiver_from_socket(unix_socket_path) + .or_else(|_| {receiver_from_config(config)}) +} + fn make_collector( - unix_stream: &mut UnixStream, config: &CrashtrackerConfiguration, config_str: &str, metadata_str: &str, signum: i32, faulting_address: Option, - receiver: Receiver, -) -> anyhow::Result<()> { + receiver: &WatchedProcess, +) -> anyhow::Result { let ppid = unsafe { libc::getppid() }; - let (uds_parent, uds_child) = make_unix_socket_pair()?; - match fork_process() { + match alt_fork() { 0 => { - // 
Child - let crash_report = emit_crashreport( + // Child (does not exit from this function) + run_collector_child( + config, config_str, metadata_str, signum, faulting_address, + receiver, ppid, - )?; - Ok(()) + ); } pid if pid > 0 => { - // Parent - Ok(()) + Ok(WatchedProcess { + fd: receiver.fd, // TODO - Is this what we want here? + pid, + oneshot: true, + }) } _ => { // Error Err(anyhow::anyhow!("Failed to fork collector process")) } } - Ok(()) } /// Updates the crashtracker metadata for this process @@ -457,13 +566,35 @@ extern "C" fn handle_posix_sigaction(signum: i32, sig_info: *mut siginfo_t, ucon }; } +fn finish_watched_process(receiver: WatchedProcess, start_time: Instant, timeout_ms: u32) { + let pollhup_allowed_ms = timeout_ms + .saturating_sub(start_time.elapsed().as_millis() as u32) + .min(i32::MAX as u32) as i32; + let _ = wait_for_pollhup(receiver.fd, pollhup_allowed_ms); + + // We cleanup oneshot receivers now. + if receiver.oneshot { + // + unsafe { + libc::kill(receiver.pid, libc::SIGKILL); + } + + // If we have less than the minimum amount of time, give ourselves a few scheduler slices + // worth of headroom to help guarantee that we don't leak a zombie process. + let reaping_allowed_ms = std::cmp::min( + timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32), + DD_CRASHTRACK_MINIMUM_REAP_TIME_MS, + ); + let receiver_pid_as_pid = Pid::from_raw(receiver.pid); + + let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms); + } +} + fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Result<()> { - // If this is a SIGSEGV signal, it could be called due to a stack overflow. In that case, since - // this signal allocates to the stack and cannot guarantee it is running without SA_NODEFER, it - // is possible that we will re-emit the signal. Contemporary unices handle this just fine (no - // deadlock), but it does mean we will fail. Currently this situation is not detected. 
- // In general, handlers do not know their own stack usage requirements in advance and are - // incapable of guaranteeing that they will not overflow the stack. + // NB - stack overflow may result in a segfault, which will cause the signal to be re-emitted. + // The guard below combined with our signal disarming logic in the caller should prevent + // that situation from causing a deadlock. // One-time guard to guarantee at most one crash per process static NUM_TIMES_CALLED: AtomicU64 = AtomicU64::new(0); @@ -473,31 +604,19 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re return Ok(()); } - // Leak config and metadata to avoid calling `drop` during a crash - // Note that these operations also replace the global states. When the one-time guard is - // passed, all global configuration and metadata becomes invalid. - let config = CONFIG.swap(ptr::null_mut(), SeqCst); - anyhow::ensure!(!config.is_null(), "No crashtracking config"); - let (config, config_str) = unsafe { config.as_ref().context("No crashtracking receiver")? }; + // Get references to the configs. Rather than copying the structs, keep them as references. + // NB: this clears the global variables, so they can't be used again. + let config_ptr = CONFIG.swap(ptr::null_mut(), SeqCst); + anyhow::ensure!(!config_ptr.is_null(), "No crashtracking config"); + let (config, config_str) = unsafe { &*config_ptr }; let metadata_ptr = METADATA.swap(ptr::null_mut(), SeqCst); anyhow::ensure!(!metadata_ptr.is_null(), "No crashtracking metadata"); - let (_metadata, metadata_string) = unsafe { metadata_ptr.as_ref().context("metadata ptr")? }; + let (_metadata, metadata_string) = unsafe { &*metadata_ptr }; - let receiver_config = RECEIVER_CONFIG.swap(ptr::null_mut(), SeqCst); - anyhow::ensure!( - !receiver_config.is_null(), - "No crashtracking receiver config" - ); - let receiver_config = unsafe { receiver_config.as_ref().context("receiver config")? 
}; - - // Since we've gotten this far, we're going to start working on the crash report. This - // operation needs to be mindful of the total walltime elapsed during handling. This isn't only - // to prevent hanging, but also because services capable of restarting after a crash experience - // crashes as probabalistic queue-holding events, and so crash handling represents dead time - // which makes the overall service increasingly incompetent at handling load. - let timeout_ms = config.timeout_ms; - let start_time = Instant::now(); // This is the time at which the signal was received + let receiver_config = RECEIVER_CONFIG.load(SeqCst); + anyhow::ensure!(!receiver_config.is_null(), "No receiver config"); + let receiver_config = unsafe { &*receiver_config }; // Derive the faulting address from `sig_info` let faulting_address: Option = @@ -507,58 +626,31 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re None }; + // Keep to a strict deadline, so set timeouts early. + let timeout_ms = config.timeout_ms; + let start_time = Instant::now(); // This is the time at which the signal was received + + // Creating the receiver will either spawn a new process or merely attach to an existing async + // receiver + let receiver = make_receiver(receiver_config, config.unix_socket_path.clone())?; - // During the execution of this signal handler, block ALL other signals, especially because we - // cannot control whether or not we run with SA_NODEFER (crashtracker might have been chained). - // The especially problematic signals are SIGCHLD and SIGPIPE, which are possibly delivered due - // to the execution of this handler. - // SaGuard ensures that signals are restored to their original state even if control flow is - // disrupted. 
- let _guard = SaGuard::<2>::new(&[signal::SIGCHLD, signal::SIGPIPE])?; - - // Spawn the collector and receiver processes - let receiver = make_receiver(receiver_config)?; - let collector = spawn_collector( - &mut unix_stream, + // Create the collector. This will fork. + let collector = make_collector( config, config_str, metadata_string, signum, faulting_address, - receiver, + &receiver, )?; - // Creating this stream means the underlying RawFD is now owned by the stream, so - // we shouldn't close it manually. - let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.receiver_uds) }; - - let _ = unix_stream.flush(); - unix_stream - .shutdown(std::net::Shutdown::Write) - .context("Could not shutdown writing on the stream")?; + // Now the watchdog (this process) can wait for the collector and receiver to finish. + // We wait on the collector first, since that gives the receiver a little bit of time + // to finish up if the collector had gotten stuck. + finish_watched_process(collector, start_time, timeout_ms); + finish_watched_process(receiver, start_time, timeout_ms); - // We have to wait for the receiver process and reap its exit status. - let pollhup_allowed_ms = timeout_ms - .saturating_sub(start_time.elapsed().as_millis() as u32) - .min(i32::MAX as u32) as i32; - let _ = wait_for_pollhup(receiver.receiver_uds, pollhup_allowed_ms) - .context("Failed to wait for pollhup")?; - - // Either the receiver is done, it timed out, or something failed. - // In any case, can't guarantee that the receiver will exit. - // SIGKILL will ensure that the process ends eventually, but there's - // no bound on that time. - // We emit SIGKILL and try to reap its exit status for the remaining time, then just give - // up. 
- unsafe { - libc::kill(receiver.receiver_pid, libc::SIGKILL); - } - let receiver_pid_as_pid = Pid::from_raw(receiver.receiver_pid); - let reaping_allowed_ms = timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32); - let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms) - .context("Failed to reap receiver process")?; - - res + Ok(()) } /// Registers UNIX signal handlers to detect program crashes. diff --git a/crashtracker/src/collector/mod.rs b/crashtracker/src/collector/mod.rs index 38b678994..4a9ed0499 100644 --- a/crashtracker/src/collector/mod.rs +++ b/crashtracker/src/collector/mod.rs @@ -5,7 +5,6 @@ mod api; mod counters; mod crash_handler; mod emitters; -mod saguard; mod spans; mod alt_fork; diff --git a/crashtracker/src/collector/saguard.rs b/crashtracker/src/collector/saguard.rs deleted file mode 100644 index 38f6ec72a..000000000 --- a/crashtracker/src/collector/saguard.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 -use nix::sys::signal::{self, SaFlags, SigAction, SigHandler}; - -// Provides a lexically-scoped guard for signals -// During execution of the signal handler, it cannot be guaranteed that the signal is handled -// without SA_NODEFER, thus it also cannot be guaranteed that signals like SIGCHLD and SIGPIPE will -// _not_ be emitted during this handler as a result of the handler itself. At the same time, it -// isn't known whether it is safe to merely block all signals, as the user's own handler will be -// given the chance to execute after ours. Thus, we need to prevent the emission of signals we -// might create (and cannot be created during a signal handler except by our own execution) and -// defer any other signals. -// To put it another way, it is conceivable that the crash handling code will emit SIGCHLD or -// SIGPIPE, and instead of risking responding to those signals, it needs to suppress them. 
On the -// other hand, it can't just "block" (`sigprocmask()`) those signals because this will only defer -// them to the next handler. -pub struct SaGuard { - old_sigactions: [(signal::Signal, signal::SigAction); N], - old_sigmask: signal::SigSet, -} - -impl SaGuard { - pub fn new(signals: &[signal::Signal; N]) -> anyhow::Result { - // Create an empty signal set for suppressing signals - let mut suppressed_signals = signal::SigSet::empty(); - for signal in signals { - suppressed_signals.add(*signal); - } - - // Save the current signal mask and block all signals except the suppressed ones - let mut old_sigmask = signal::SigSet::empty(); - signal::sigprocmask( - signal::SigmaskHow::SIG_BLOCK, - Some(&suppressed_signals), - Some(&mut old_sigmask), - )?; - - // Initialize array for saving old signal actions - let mut old_sigactions = [( - signal::Signal::SIGINT, - SigAction::new( - SigHandler::SigDfl, - SaFlags::empty(), - signal::SigSet::empty(), - ), - ); N]; - - // Set SIG_IGN for the specified signals and save old handlers - for (i, &signal) in signals.iter().enumerate() { - let old_sigaction = unsafe { - signal::sigaction( - signal, - &SigAction::new( - SigHandler::SigIgn, - SaFlags::empty(), - signal::SigSet::empty(), - ), - )? 
- }; - old_sigactions[i] = (signal, old_sigaction); - } - - Ok(Self { - old_sigactions, - old_sigmask, - }) - } -} - -impl Drop for SaGuard { - fn drop(&mut self) { - // Restore the original signal actions - for &(signal, old_sigaction) in &self.old_sigactions { - unsafe { - let _ = signal::sigaction(signal, &old_sigaction); - } - } - - // Restore the original signal mask - let _ = signal::sigprocmask( - signal::SigmaskHow::SIG_SETMASK, - Some(&self.old_sigmask), - None, - ); - } -} diff --git a/crashtracker/src/receiver.rs b/crashtracker/src/receiver.rs index 5b0ab902c..aaef56de7 100644 --- a/crashtracker/src/receiver.rs +++ b/crashtracker/src/receiver.rs @@ -414,6 +414,7 @@ mod tests { None, StacktraceCollection::Disabled, 3000, + None, )?)?, ) .await?; diff --git a/crashtracker/src/shared/configuration.rs b/crashtracker/src/shared/configuration.rs index ccd4f8e1c..6f92932de 100644 --- a/crashtracker/src/shared/configuration.rs +++ b/crashtracker/src/shared/configuration.rs @@ -28,6 +28,7 @@ pub struct CrashtrackerConfiguration { pub endpoint: Option, pub resolve_frames: StacktraceCollection, pub timeout_ms: u32, + pub unix_socket_path: Option, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -48,12 +49,7 @@ impl CrashtrackerReceiverConfig { stdout_filename: Option, ) -> anyhow::Result { anyhow::ensure!( - !path_to_receiver_binary.is_empty(), - "Expected a receiver binary" - ); - anyhow::ensure!( - stderr_filename.is_none() && stdout_filename.is_none() - || stderr_filename != stdout_filename, + stderr_filename.is_some() && stderr_filename != stdout_filename, "Can't give the same filename for stderr and stdout, they will conflict with each other" ); @@ -76,6 +72,7 @@ impl CrashtrackerConfiguration { endpoint: Option, resolve_frames: StacktraceCollection, timeout_ms: u32, + unix_socket_path: Option, ) -> anyhow::Result { // Requesting to create, but not use, the altstack is considered paradoxical. 
anyhow::ensure!( @@ -89,6 +86,8 @@ impl CrashtrackerConfiguration { } else { timeout_ms }; + // Note: don't check the receiver socket upfront, since a configuration can be interned + // before the receiver is started when using an async-receiver. Ok(Self { additional_files, create_alt_stack, @@ -96,6 +95,7 @@ impl CrashtrackerConfiguration { endpoint, resolve_frames, timeout_ms, + unix_socket_path, }) } } diff --git a/crashtracker/src/shared/constants.rs b/crashtracker/src/shared/constants.rs index 60b977f0c..55206033c 100644 --- a/crashtracker/src/shared/constants.rs +++ b/crashtracker/src/shared/constants.rs @@ -21,3 +21,5 @@ pub const DD_CRASHTRACK_END_SPAN_IDS: &str = "DD_CRASHTRACK_END_SPAN_IDS"; pub const DD_CRASHTRACK_END_STACKTRACE: &str = "DD_CRASHTRACK_END_STACKTRACE"; pub const DD_CRASHTRACK_END_TRACE_IDS: &str = "DD_CRASHTRACK_END_TRACE_IDS"; pub const DD_CRASHTRACK_DEFAULT_TIMEOUT_MS: u32 = 5_000; +pub const DD_CRASHTRACK_MINIMUM_REAP_TIME_MS: u32 = 160; // 4ms per sched slice, give ~4x10 slices + // for safety From fe8a73eebb6ffcaa06a4f74bb5a2160f2cdf51e9 Mon Sep 17 00:00:00 2001 From: sanchda <838104+sanchda@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:49:15 +0000 Subject: [PATCH 3/3] Clarify comment --- crashtracker/src/collector/crash_handler.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crashtracker/src/collector/crash_handler.rs b/crashtracker/src/collector/crash_handler.rs index 8b625cdda..a25b82742 100644 --- a/crashtracker/src/collector/crash_handler.rs +++ b/crashtracker/src/collector/crash_handler.rs @@ -33,9 +33,10 @@ use std::time::{Duration, Instant}; // Note that this file makes use the following async-signal safe functions in a signal handler. 
// // - clock_gettime +// - clone (only on Linux, guaranteed not to call `atfork()` handlers) // - close // - dup2 -// - fork (but specifically only because it does so without calling atfork handlers) +// - fork (only on macOS--no guarantee that `atfork()` handlers are suppressed) // - kill // - poll // - raise