Skip to content

Commit

Permalink
Don't validate that endpoint is not empty because it doesn't need to …
Browse files Browse the repository at this point in the history
…be present (#426)

* Don't validate that endpoint is not empty because it doesn't need to be present

* Fix parsing of windows pipe urls

* implement correct agent host detection from env for telemetry
  • Loading branch information
paullegranddc authored May 24, 2024
1 parent 0d5e0d1 commit f63cff5
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 160 deletions.
19 changes: 13 additions & 6 deletions bin_tests/src/bin/crashtracker_bin_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ mod unix {

pub fn main() -> anyhow::Result<()> {
let mut args = env::args().skip(1);
let output_filename = args.next().context("Unexpected number of arguments")?;
let output_url = args.next().context("Unexpected number of arguments")?;
let receiver_binary = args.next().context("Unexpected number of arguments")?;
let stderr_filename = args.next().context("Unexpected number of arguments")?;
let stdout_filename = args.next().context("Unexpected number of arguments")?;
let timeout = Duration::from_secs(30);

let endpoint = if output_url.is_empty() {
None
} else {
Some(ddcommon::Endpoint {
url: ddcommon::parse_uri(&output_url)?,
api_key: None,
})
};

crashtracker::init(
CrashtrackerConfiguration {
additional_files: vec![],
create_alt_stack: true,
endpoint: Some(ddcommon::Endpoint {
url: ddcommon::parse_uri(&format!("file://{}", output_filename))?,
api_key: None,
}),
resolve_frames: crashtracker::StacktraceCollection::WithoutSymbols,
endpoint,
timeout,
},
Some(CrashtrackerReceiverConfig::new(
vec![],
vec![],
env::vars().collect(),
receiver_binary,
Some(stderr_filename),
Some(stdout_filename),
Expand Down
135 changes: 103 additions & 32 deletions bin_tests/tests/crashtracker_bin_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#![cfg(unix)]

use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::Path;
use std::process;
use std::{fs, path::PathBuf};
Expand All @@ -23,32 +25,15 @@ fn test_crash_tracking_bin_release() {
}

fn test_crash_tracking_bin(crash_tracking_receiver_profile: BuildProfile) {
let crashtracker_bin = ArtifactsBuild {
name: "crashtracker_bin_test".to_owned(),
build_profile: crash_tracking_receiver_profile,
artifact_type: ArtifactType::Bin,
triple_target: None,
};
let crashtracker_receiver = ArtifactsBuild {
name: "crashtracker_receiver".to_owned(),
build_profile: crash_tracking_receiver_profile,
artifact_type: ArtifactType::Bin,
triple_target: None,
};
let artifacts = build_artifacts(&[&crashtracker_receiver, &crashtracker_bin]).unwrap();

let tmpdir = tempfile::TempDir::new().unwrap();

let crash_profile_path = extend_path(tmpdir.path(), "crash");
let crash_telemetry_path = extend_path(tmpdir.path(), "crash.telemetry");
let stdout_path = extend_path(tmpdir.path(), "out.stdout");
let stderr_path = extend_path(tmpdir.path(), "out.stderr");
let (crashtracker_bin, crashtracker_receiver) =
setup_crashtracking_crates(crash_tracking_receiver_profile);
let fixtures = setup_test_fixtures(&[&crashtracker_receiver, &crashtracker_bin]);

let mut p = process::Command::new(&artifacts[&crashtracker_bin])
.arg(&crash_profile_path)
.arg(artifacts[&crashtracker_receiver].as_os_str())
.arg(&stderr_path)
.arg(&stdout_path)
let mut p = process::Command::new(&fixtures.artifacts[&crashtracker_bin])
.arg(format!("file://{}", fixtures.crash_profile_path.display()))
.arg(fixtures.artifacts[&crashtracker_receiver].as_os_str())
.arg(&fixtures.stderr_path)
.arg(&fixtures.stdout_path)
.spawn()
.unwrap();
let exit_status = bin_tests::timeit!("exit after signal", {
Expand All @@ -61,10 +46,10 @@ fn test_crash_tracking_bin(crash_tracking_receiver_profile: BuildProfile) {
// running before the receiver has a chance to send the report.
std::thread::sleep(std::time::Duration::from_millis(100));

let stderr = fs::read(stderr_path)
let stderr = fs::read(fixtures.stderr_path)
.context("reading crashtracker stderr")
.unwrap();
let stdout = fs::read(stdout_path)
let stdout = fs::read(fixtures.stdout_path)
.context("reading crashtracker stdout")
.unwrap();
assert!(matches!(
Expand All @@ -74,7 +59,7 @@ fn test_crash_tracking_bin(crash_tracking_receiver_profile: BuildProfile) {
assert_eq!(Ok(""), String::from_utf8(stdout).as_deref());

// Check the crash data
let crash_profile = fs::read(crash_profile_path)
let crash_profile = fs::read(fixtures.crash_profile_path)
.context("reading crashtracker profiling payload")
.unwrap();
let crash_payload = serde_json::from_slice::<serde_json::Value>(&crash_profile)
Expand All @@ -97,12 +82,17 @@ fn test_crash_tracking_bin(crash_tracking_receiver_profile: BuildProfile) {
crash_payload["siginfo"]
);

let crash_telemetry = fs::read(crash_telemetry_path)
let crash_telemetry = fs::read(fixtures.crash_telemetry_path)
.context("reading crashtracker telemetry payload")
.unwrap();
let telemetry_payload = serde_json::from_slice::<serde_json::Value>(&crash_telemetry)
.context("deserializing crashtracker telemetry payload to json")
.unwrap();
assert_telemetry_message(&crash_telemetry);
}

fn assert_telemetry_message(crash_telemetry: &[u8]) {
let telemetry_payload: serde_json::Value =
serde_json::from_slice::<serde_json::Value>(crash_telemetry)
.context("deserializing crashtracker telemetry payload to json")
.unwrap();
assert_eq!(telemetry_payload["request_type"], "logs");
assert_eq!(
serde_json::json!({
Expand Down Expand Up @@ -136,6 +126,87 @@ fn test_crash_tracking_bin(crash_tracking_receiver_profile: BuildProfile) {
assert_eq!(telemetry_payload["payload"][0]["is_sensitive"], true);
}

#[test]
#[cfg_attr(miri, ignore)]
#[cfg(unix)]
fn crash_tracking_empty_endpoint() {
use std::os::unix::net::UnixListener;

let (crashtracker_bin, crashtracker_receiver) = setup_crashtracking_crates(BuildProfile::Debug);
let fixtures = setup_test_fixtures(&[&crashtracker_receiver, &crashtracker_bin]);

let socket_path = extend_path(fixtures.tmpdir.path(), "trace_agent.socket");
let listener = UnixListener::bind(&socket_path).unwrap();

process::Command::new(&fixtures.artifacts[&crashtracker_bin])
// empty url, endpoint will be set to none
.arg("")
.arg(fixtures.artifacts[&crashtracker_receiver].as_os_str())
.arg(&fixtures.stderr_path)
.arg(&fixtures.stdout_path)
.env(
"DD_TRACE_AGENT_URL",
format!("unix://{}", socket_path.display()),
)
.spawn()
.unwrap();

let (mut stream, _) = listener.accept().unwrap();
let mut out = vec![0; 65536];
let read = stream.read(&mut out).unwrap();

stream
.write_all(b"HTTP/1.1 404\r\nContent-Length: 0\r\n\r\n")
.unwrap();
let resp = String::from_utf8_lossy(&out[..read]);
let pos = resp.find("\r\n\r\n").unwrap();
let body = &resp[pos + 4..];
assert_telemetry_message(body.as_bytes());
}

struct TestFixtures<'a> {
tmpdir: tempfile::TempDir,
crash_profile_path: PathBuf,
crash_telemetry_path: PathBuf,
stdout_path: PathBuf,
stderr_path: PathBuf,

artifacts: HashMap<&'a ArtifactsBuild, PathBuf>,
}

fn setup_test_fixtures<'a>(crates: &[&'a ArtifactsBuild]) -> TestFixtures<'a> {
let artifacts = build_artifacts(crates).unwrap();

let tmpdir = tempfile::TempDir::new().unwrap();
TestFixtures {
crash_profile_path: extend_path(tmpdir.path(), "crash"),
crash_telemetry_path: extend_path(tmpdir.path(), "crash.telemetry"),
stdout_path: extend_path(tmpdir.path(), "out.stdout"),
stderr_path: extend_path(tmpdir.path(), "out.stderr"),

artifacts,
tmpdir,
}
}

fn setup_crashtracking_crates(
crash_tracking_receiver_profile: BuildProfile,
) -> (ArtifactsBuild, ArtifactsBuild) {
let crashtracker_bin = ArtifactsBuild {
name: "crashtracker_bin_test".to_owned(),
build_profile: crash_tracking_receiver_profile,
artifact_type: ArtifactType::Bin,
triple_target: None,
};
let crashtracker_receiver = ArtifactsBuild {
name: "crashtracker_receiver".to_owned(),
build_profile: crash_tracking_receiver_profile,
artifact_type: ArtifactType::Bin,
triple_target: None,
};
(crashtracker_bin, crashtracker_receiver)
}

fn extend_path<T: AsRef<Path>>(parent: &Path, path: T) -> PathBuf {
let mut parent = parent.to_path_buf();
parent.push(path);
Expand Down
4 changes: 2 additions & 2 deletions crashtracker/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TelemetryCrashUploader {

// ignore result because what are we going to do?
let _ = if endpoint.url.scheme_str() == Some("file") {
cfg.set_url(&format!("file://{}.telemetry", endpoint.url.path()))
cfg.set_host_from_url(&format!("file://{}.telemetry", endpoint.url.path()))
} else {
cfg.set_endpoint(endpoint.clone())
};
Expand Down Expand Up @@ -261,7 +261,7 @@ mod tests {
let mut t = new_test_uploader();

t.cfg
.set_url(&format!("file://{}", output_filename.to_str().unwrap()))
.set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap()))
.unwrap();

let mut counters = HashMap::new();
Expand Down
1 change: 0 additions & 1 deletion ddcommon/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::task::{Context, Poll};
#[cfg(unix)]
pub mod uds;

#[cfg(windows)]
pub mod named_pipe;

pub mod errors;
Expand Down
71 changes: 34 additions & 37 deletions ddcommon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,48 +79,45 @@ where
builder.build().map_err(Error::custom)
}

// TODO: we should properly handle malformed urls
// * For windows and unix schemes:
// * For compatibility reasons with existing implementation this parser stores the encoded path
// in authority section as there is no existing standard
// [see](https://github.com/whatwg/url/issues/577) that covers this. We need to pick one hack
// or another
// * For file scheme implementation will simply backfill missing authority section
/// TODO: we should properly handle malformed urls
/// * For windows and unix schemes:
/// * For compatibility reasons with existing implementation this parser stores the encoded path
/// in authority section as there is no existing standard [see](https://github.com/whatwg/url/issues/577)
/// that covers this. We need to pick one hack or another
/// * For windows, interprets everything after windows: as path
/// * For unix, interprets everything after unix:// as path
/// * For file scheme implementation will simply backfill missing authority section
pub fn parse_uri(uri: &str) -> anyhow::Result<hyper::Uri> {
let scheme_pos = if let Some(scheme_pos) = uri.find("://") {
scheme_pos
if let Some(path) = uri.strip_prefix("unix://") {
encode_uri_path_in_authority("unix", path)
} else if let Some(path) = uri.strip_prefix("windows:") {
encode_uri_path_in_authority("windows", path)
} else if let Some(path) = uri.strip_prefix("file://") {
let mut parts = uri::Parts::default();
parts.scheme = uri::Scheme::from_str("file").ok();
parts.authority = Some(uri::Authority::from_static("localhost"));

// TODO: handle edge cases like improperly escaped url strings
//
// this is eventually user configurable field
// anything we can do to ensure invalid input becomes valid - will improve usability
parts.path_and_query = uri::PathAndQuery::from_str(path).ok();

Ok(hyper::Uri::from_parts(parts)?)
} else {
return Ok(hyper::Uri::from_str(uri)?);
};
Ok(hyper::Uri::from_str(uri)?)
}
}

let scheme = &uri[0..scheme_pos];
let rest = &uri[scheme_pos + 3..];
match scheme {
"windows" | "unix" => {
let mut parts = uri::Parts::default();
parts.scheme = uri::Scheme::from_str(scheme).ok();
fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<hyper::Uri> {
let mut parts = uri::Parts::default();
parts.scheme = uri::Scheme::from_str(scheme).ok();

let path = hex::encode(rest);
let path = hex::encode(path);

parts.authority = uri::Authority::from_str(path.as_str()).ok();
parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
Ok(hyper::Uri::from_parts(parts)?)
}
"file" => {
let mut parts = uri::Parts::default();
parts.scheme = uri::Scheme::from_str(scheme).ok();
parts.authority = Some(uri::Authority::from_static("localhost"));

// TODO: handle edge cases like improperly escaped url strings
//
// this is eventually user configurable field
// anything we can do to ensure invalid input becomes valid - will improve usability
parts.path_and_query = uri::PathAndQuery::from_str(rest).ok();

Ok(hyper::Uri::from_parts(parts)?)
}
_ => Ok(hyper::Uri::from_str(uri)?),
}
parts.authority = uri::Authority::from_str(path.as_str()).ok();
parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
Ok(hyper::Uri::from_parts(parts)?)
}

impl Endpoint {
Expand Down
Loading

0 comments on commit f63cff5

Please sign in to comment.