Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mitigate panic in profiler shutdown, bound timeout #1932

Merged
merged 1 commit into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,47 +1194,28 @@ mod tests {
#[test]
fn detect_uri_from_config_works() {
// expected
let endpoint = detect_uri_from_config(
None,
None,
None
);
let endpoint = detect_uri_from_config(None, None, None);
let expected = AgentEndpoint::default();
assert_eq!(endpoint, expected);

// ipv4 host
let endpoint = detect_uri_from_config(
None,
Some(Cow::Owned("127.0.0.1".to_owned())),
None
);
let endpoint = detect_uri_from_config(None, Some(Cow::Owned("127.0.0.1".to_owned())), None);
let expected = AgentEndpoint::Uri(Uri::from_static("http://127.0.0.1:8126"));
assert_eq!(endpoint, expected);

// ipv6 host
let endpoint = detect_uri_from_config(
None,
Some(Cow::Owned("::1".to_owned())),
None
);
let endpoint = detect_uri_from_config(None, Some(Cow::Owned("::1".to_owned())), None);
let expected = AgentEndpoint::Uri(Uri::from_static("http://[::1]:8126"));
assert_eq!(endpoint, expected);

// ipv6 host, custom port
let endpoint = detect_uri_from_config(
None,
Some(Cow::Owned("::1".to_owned())),
Some(9000),
);
let endpoint = detect_uri_from_config(None, Some(Cow::Owned("::1".to_owned())), Some(9000));
let expected = AgentEndpoint::Uri(Uri::from_static("http://[::1]:9000"));
assert_eq!(endpoint, expected);

// agent_url
let endpoint = detect_uri_from_config(
Some(Cow::Owned("http://[::1]:8126".to_owned())),
None,
None,
);
let endpoint =
detect_uri_from_config(Some(Cow::Owned("http://[::1]:8126".to_owned())), None, None);
let expected = AgentEndpoint::Uri(Uri::from_static("http://[::1]:8126"));
assert_eq!(endpoint, expected);

Expand Down
56 changes: 46 additions & 10 deletions profiling/src/profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crossbeam_channel::{select, Receiver, Sender, TrySendError};
use datadog_profiling::exporter::{Endpoint, File, Tag};
use datadog_profiling::profile;
use datadog_profiling::profile::api::{Function, Line, Location, Period, Sample};
use libc::sched_yield;
use log::{debug, info, trace, warn};
use std::borrow::{Borrow, Cow};
use std::collections::HashMap;
Expand Down Expand Up @@ -739,22 +740,57 @@ impl Profiler {
.try_send(ProfilerMessage::LocalRootSpanResource(message))
}

pub fn stop(self) {
// todo: what should be done when a thread panics?
debug!("Stopping profiler.");
let _ = self.message_sender.send(ProfilerMessage::Cancel);
if let Err(err) = self.time_collector_handle.join() {
std::panic::resume_unwind(err)
/// Waits up to `timeout` for the thread behind `handle` to finish, then
/// joins it. If the thread has not finished by then, a warning is logged
/// and the function returns without joining; the handle is dropped at that
/// point, so the thread is left running on its own rather than being waited
/// for.
///
/// * `handle` - join handle of the thread to wait for.
/// * `timeout` - maximum time to poll `handle.is_finished()` before giving up.
/// * `impact` - text appended to the timeout warning, describing the
///   consequence of not joining this thread.
///
/// # Panics
/// Panics if the thread being joined has panicked: its panic payload is
/// re-raised on the calling thread via `std::panic::resume_unwind`.
fn join_timeout(handle: JoinHandle<()>, timeout: Duration, impact: &str) {
// After notifying the other threads, it's likely they'll need some
// time to respond adequately. Joining on the JoinHandle is supposed
// to be the correct way to do this, but we've observed this can
// panic:
// https://github.com/DataDog/dd-trace-php/issues/1919
// Thus far, we have not been able to reproduce it and address the
// root cause. So, for now, mitigate it instead with a busy loop.
let start = Instant::now();
while !handle.is_finished() {
// SAFETY: `sched_yield` takes no arguments and touches no memory owned
// by this program; it only asks the scheduler to run another thread.
unsafe { sched_yield() };
// The deadline is checked after yielding, so the loop may overshoot
// `timeout` by however long the yield took.
if start.elapsed() >= timeout {
let name = handle.thread().name().unwrap_or("{unknown}");
warn!("Timeout of {timeout:?} reached when joining thread '{name}'. {impact}");
return;
}
}

// Wait for the time_collector to join, since that will drop the sender
// half of the channel that the uploader is holding, allowing it to
// finish.
if let Err(err) = self.uploader_handle.join() {
// `is_finished()` returned true above, so this join is only collecting
// the thread's result; an `Err` carries the thread's panic payload.
if let Err(err) = handle.join() {
std::panic::resume_unwind(err)
}
}

/// Shuts the profiler down, consuming it.
///
/// Sends `ProfilerMessage::Cancel` on the message channel (logging whether
/// that succeeded) and then gives each background thread -- first the time
/// collector, then the uploader -- up to two seconds to finish via
/// `Self::join_timeout`.
///
/// # Panics
/// Panics if either joined thread has panicked (see `join_timeout`).
pub fn stop(self) {
    debug!("Stopping profiler.");

    if let Err(err) = self.message_sender.send(ProfilerMessage::Cancel) {
        warn!("Failed to notify other threads of cancellation: {err}.");
    } else {
        debug!("Notified other threads of cancellation.");
    }

    // The same budget is applied to each thread separately.
    let timeout = Duration::from_secs(2);

    // The time collector goes first: once it has joined, the sender half
    // of the channel that the uploader is holding gets dropped, which
    // allows the uploader to finish.
    Self::join_timeout(self.time_collector_handle, timeout, "Recent samples may be lost.");
    Self::join_timeout(self.uploader_handle, timeout, "Recent samples are most likely lost.");
}

fn cpu_sub(now: cpu_time::ThreadTime, prev: cpu_time::ThreadTime) -> i64 {
let now = now.as_duration();
let prev = prev.as_duration();
Expand Down