diff --git a/Cargo.lock b/Cargo.lock index 2052ed83d..6019d1ae8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1453,6 +1453,7 @@ dependencies = [ "spawn_worker", "tarpc", "tempfile", + "tinybytes", "tokio", "tokio-serde", "tokio-util 0.6.10", @@ -1642,6 +1643,7 @@ dependencies = [ "spawn_worker", "sys-info", "tempfile", + "tinybytes", "tokio", "tokio-util 0.7.11", "tracing", @@ -1756,6 +1758,7 @@ dependencies = [ "criterion", "datadog-trace-normalization", "datadog-trace-protobuf", + "datadog-trace-utils", "ddcommon", "flate2", "futures", @@ -1771,6 +1774,7 @@ dependencies = [ "serde", "serde_json", "testcontainers", + "tinybytes", "tokio", ] @@ -4678,9 +4682,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.197" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09" dependencies = [ "serde_derive", ] @@ -4696,9 +4700,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.197" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", @@ -4707,11 +4711,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -5371,7 +5376,10 @@ dependencies = [ "once_cell", "pretty_assertions", "proptest", + "serde", + "serde_json", "test-case", + "tinybytes", ] [[package]] diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 57b32ae6b..c6afa00e2 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-utils, ddcommon, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent, bin_tests +root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-utils, ddcommon, tinybytes, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent third_party_libraries: - package_name: addr2line package_version: 0.21.0 @@ -24432,7 +24432,7 @@ third_party_libraries: - license: BSD-3-Clause text: NOT FOUND - package_name: serde - package_version: 1.0.197 + package_version: 1.0.209 repository: https://github.com/serde-rs/serde license: MIT OR Apache-2.0 licenses: @@ -24848,7 +24848,7 @@ third_party_libraries: END OF TERMS AND CONDITIONS - package_name: serde_derive - package_version: 1.0.197 + package_version: 1.0.209 repository: https://github.com/serde-rs/serde license: MIT OR Apache-2.0 licenses: @@ -25056,7 +25056,7 @@ third_party_libraries: END OF TERMS AND CONDITIONS - package_name: serde_json - package_version: 1.0.120 + package_version: 1.0.127 repository: https://github.com/serde-rs/json license: MIT OR Apache-2.0 licenses: diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index baf38b8df..4b2f7eb1f 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self, SendData, TracerHeaderTags}; use datadog_trace_utils::tracer_payload; -use datadog_trace_utils::tracer_payload::TraceEncoding; +use datadog_trace_utils::tracer_payload::TraceCollection; use ddcommon::{connector, Endpoint}; use hyper::http::uri::PathAndQuery; use hyper::{Body, Client, Method, Uri}; @@ -231,11 +231,10 @@ impl TraceExporter { ), TraceExporterOutputFormat::V07 => { let tracer_payload = trace_utils::collect_trace_chunks( - traces, + TraceCollection::V07(traces), &header_tags, &mut tracer_payload::DefaultTraceChunkProcessor, self.endpoint.api_key.is_some(), - TraceEncoding::V07, ); let endpoint = Endpoint { diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 321e59615..b869ceb9c 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } tokio-serde = { version = "0.8", features = ["bincode"] } tokio-util = { version = "0.6.9", features = ["codec"] } libc = { version = "0.2" } +tinybytes = { path = "../tinybytes", optional = true } # tarpc needed extensions to allow 1 way communication and to export some internal structs tarpc = { path = "tarpc/tarpc", default-features = false, features = ["serde-transport"], package = "tarpc" } @@ -63,3 +64,6 @@ path = "benches/ipc.rs" [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(polyfill_glibc_memfd)'] } + +[features] +tiny-bytes = ["tinybytes"] diff --git a/ipc/src/platform/mem_handle.rs b/ipc/src/platform/mem_handle.rs index 87714924b..62490609b 100644 --- a/ipc/src/platform/mem_handle.rs +++ b/ipc/src/platform/mem_handle.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; #[cfg(all(unix, not(target_os = "macos")))] use std::os::unix::prelude::AsRawFd; use std::{ffi::CString, io}; +#[cfg(feature = "tiny-bytes")] +use tinybytes::UnderlyingBytes; #[derive(Clone, Serialize, Deserialize, Debug)] pub struct ShmHandle { @@ -203,6 +205,15 @@ impl From for PlatformHandle { unsafe impl Sync for MappedMem where T: FileBackedHandle {} unsafe impl Send for MappedMem where T: FileBackedHandle {} +impl AsRef<[u8]> for MappedMem { + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} + +#[cfg(feature = "tiny-bytes")] +impl UnderlyingBytes for MappedMem {} + #[cfg(test)] mod tests { use crate::platform::{FileBackedHandle, NamedShmHandle, ShmHandle}; diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index eb39ca826..05eded757 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -26,6 +26,7 @@ datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } datadog-remote-config = { path = "../remote-config" } datadog-crashtracker = { path = "../crashtracker" } +tinybytes = { path = "../tinybytes" } futures = { version = "0.3", default-features = false } manual_future = "0.1.1" @@ -34,7 +35,7 @@ hyper = { version = "0.14", features = ["client"], default-features = false } lazy_static = "1.4" pin-project = "1" -datadog-ipc = { path = "../ipc" } +datadog-ipc = { path = "../ipc", features = ["tiny-bytes"] } datadog-ipc-macros = { path = "../ipc/macros" } rand = "0.8.3" diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index e87f0a4ab..e7ed49f13 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -47,6 +47,7 @@ use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; use datadog_trace_utils::tracer_header_tags::TracerHeaderTags; +use tinybytes; type NoResponse = Ready<()>; @@ -248,7 +249,12 @@ impl SidecarServer { .expect("Unable to acquire lock on sessions") } - fn send_trace_v04(&self, headers: &SerializedTracerHeaderTags, data: &[u8], target: &Endpoint) { + fn send_trace_v04( + &self, + headers: &SerializedTracerHeaderTags, + data: tinybytes::Bytes, + target: &Endpoint, + ) { let headers: TracerHeaderTags = match headers.try_into() { Ok(headers) => headers, Err(e) => { @@ -741,7 +747,7 @@ impl SidecarInterface for SidecarServer { _: Context, instance_id: InstanceId, handle: ShmHandle, - len: usize, + _len: usize, headers: SerializedTracerHeaderTags, ) -> Self::SendTraceV04ShmFut { if let Some(endpoint) = self @@ -753,7 +759,8 @@ impl SidecarInterface for SidecarServer { tokio::spawn(async move { match handle.map() { Ok(mapped) => { - self.send_trace_v04(&headers, &mapped.as_slice()[..len], &endpoint); + let bytes = tinybytes::Bytes::from(mapped); + self.send_trace_v04(&headers, bytes, &endpoint); } Err(e) => error!("Failed mapping shared trace data memory: {}", e), } @@ -779,7 +786,8 @@ impl SidecarInterface for SidecarServer { .clone() { tokio::spawn(async move { - self.send_trace_v04(&headers, data.as_slice(), &endpoint); + let bytes = tinybytes::Bytes::from(data); + self.send_trace_v04(&headers, bytes, &endpoint); }); } diff --git a/tinybytes/Cargo.toml b/tinybytes/Cargo.toml index 647ef7e33..4e6474329 100644 --- a/tinybytes/Cargo.toml +++ b/tinybytes/Cargo.toml @@ -14,3 +14,12 @@ once_cell = "1.8" pretty_assertions = "1.3" proptest = {version = "1.5", features = ["std"], default-features = false} test-case = "2.2" +serde_json = "1.0.127" +tinybytes = { path = ".", features = ["bytes_string", "serialization"] } + +[dependencies] +serde = { version = "1.0.209", optional = true } + +[features] +bytes_string = [] +serialization = ["serde"] diff --git a/tinybytes/src/bytes_string.rs b/tinybytes/src/bytes_string.rs new file mode 100644 index 000000000..95148ffa2 --- /dev/null +++ b/tinybytes/src/bytes_string.rs @@ -0,0 +1,193 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::Bytes; +#[cfg(feature = "serde")] +use serde::ser::{Serialize, Serializer}; +use std::borrow::Borrow; +use std::str::Utf8Error; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct BytesString { + bytes: Bytes, +} + +#[cfg(feature = "serde")] +impl Serialize for BytesString { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(self.as_str()) + } +} + +impl BytesString { + /// Creates a `BytesString` from a slice of bytes. + /// + /// This function validates that the provided slice is valid UTF-8. If the slice is not valid + /// UTF-8, an error is returned. + /// + /// # Arguments + /// + /// * `slice` - A byte slice that will be converted into a `BytesString`. + /// + /// # Returns + /// + /// A `Result` containing the `BytesString` if the slice is valid UTF-8, or a `Utf8Error` if + /// the slice is not valid UTF-8. + /// + /// # Errors + /// + /// Returns a `Utf8Error` if the bytes are not valid UTF-8. + pub fn from_slice(slice: &[u8]) -> Result { + std::str::from_utf8(slice)?; + Ok(BytesString { + bytes: Bytes::copy_from_slice(slice), + }) + } + + /// Creates a `BytesString` from a `tinybytes::Bytes` instance. + /// + /// This function validates that the provided `Bytes` instance contains valid UTF-8 data. If the + /// data is not valid UTF-8, an error is returned. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// + /// # Returns + /// + /// A `Result` containing the `BytesString` if the bytes are valid UTF-8, or a `Utf8Error` if + /// the bytes are not valid UTF-8. + /// + /// # Errors + /// + /// Returns a `Utf8Error` if the bytes are not valid UTF-8. + pub fn from_bytes(bytes: Bytes) -> Result { + std::str::from_utf8(&bytes)?; + Ok(BytesString { bytes }) + } + + /// Creates a `BytesString` from a string slice within the given buffer. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`. + pub fn from_bytes_slice(bytes: &Bytes, slice: &str) -> BytesString { + // SAFETY: This is safe as a str slice is definitely a valid UTF-8 slice. + unsafe { + BytesString::from_bytes_unchecked( + bytes.slice_ref(slice.as_bytes()).expect("Invalid slice"), + ) + } + } + + /// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes. + /// + /// This function does not perform any validation on the provided bytes, and assumes that the + /// bytes are valid UTF-8. If the bytes are not valid UTF-8, the behavior is undefined. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// + /// # Safety + /// + /// This function is unsafe because it assumes the bytes are valid UTF-8. If the bytes are not + /// valid UTF-8, the behavior is undefined. + pub unsafe fn from_bytes_unchecked(bytes: Bytes) -> BytesString { + BytesString { bytes } + } + + /// Returns the string slice representation of the `BytesString` (without validating the bytes). + /// Typically, you should use `from_slice` or `from_bytes` when creating a BytesString to + /// ensure the bytes are valid UTF-8 (if the bytes haven't already been validated by other + /// means) so further validation may be unnecessary. + pub fn as_str(&self) -> &str { + // SAFETY: We assume all BytesStrings are valid UTF-8. + unsafe { std::str::from_utf8_unchecked(&self.bytes) } + } +} + +impl Default for BytesString { + fn default() -> Self { + BytesString { + bytes: Bytes::empty(), + } + } +} + +impl Borrow for BytesString { + fn borrow(&self) -> &str { + self.as_str() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_from_slice() { + let slice = b"hello"; + let bytes_string = BytesString::from_slice(slice).unwrap(); + assert_eq!(bytes_string.as_str(), "hello"); + } + + #[test] + fn test_from_slice_invalid_utf8() { + let invalid_utf8_slice = &[0, 159, 146, 150]; + let result = BytesString::from_slice(invalid_utf8_slice); + assert!(result.is_err()); + } + + #[test] + fn test_from_bytes() { + let bytes = Bytes::copy_from_slice(b"world"); + let bytes_string = BytesString::from_bytes(bytes).unwrap(); + assert_eq!(bytes_string.as_str(), "world"); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_from_bytes_invalid_utf8() { + let invalid_utf8_bytes = Bytes::copy_from_slice(&[0, 159, 146, 150]); + let result = BytesString::from_bytes(invalid_utf8_bytes); + assert!(result.is_err()); + } + + #[test] + fn test_from_bytes_unchecked() { + let bytes = Bytes::copy_from_slice(b"unchecked"); + let bytes_string = unsafe { BytesString::from_bytes_unchecked(bytes) }; + assert_eq!(bytes_string.as_str(), "unchecked"); + } + + #[test] + fn test_as_str() { + let bytes_string = BytesString::from_slice(b"test").unwrap(); + assert_eq!(bytes_string.as_str(), "test"); + } + + #[test] + fn test_serialize() { + let bytes_string = BytesString::from_slice(b"serialize"); + let serialized = serde_json::to_string(&bytes_string.unwrap()).unwrap(); + assert_eq!(serialized, "\"serialize\""); + } + + #[test] + fn test_default() { + let bytes_string: BytesString = Default::default(); + assert_eq!(bytes_string.as_str(), ""); + } + + #[test] + fn test_borrow() { + let bytes_string = BytesString::from_slice(b"borrow").unwrap(); + let borrowed: &str = bytes_string.borrow(); + assert_eq!(borrowed, "borrow"); + } +} diff --git a/tinybytes/src/lib.rs b/tinybytes/src/lib.rs index 036c62788..3ba95a4ee 100644 --- a/tinybytes/src/lib.rs +++ b/tinybytes/src/lib.rs @@ -10,8 +10,7 @@ use std::{ /// Immutable bytes type with zero copy cloning and slicing. #[derive(Clone)] pub struct Bytes { - ptr: *const u8, - len: usize, + slice: &'static [u8], // The `bytes`` field is used to ensure that the underlying bytes are freed when there are no // more references to the `Bytes` object. For static buffers the field is `None`. bytes: Option>, @@ -36,11 +35,7 @@ impl Bytes { #[inline] pub fn from_static(value: &'static [u8]) -> Self { let slice: &[u8] = value; - Self { - ptr: slice.as_ptr(), - len: slice.len(), - bytes: None, - } + Self { slice, bytes: None } } /// Creates `Bytes` from a slice, by copying. @@ -51,13 +46,13 @@ impl Bytes { /// Returns the length of the `Bytes`. #[inline] pub const fn len(&self) -> usize { - self.len + self.slice.len() } /// Returns `true` if the `Bytes` is empty. #[inline] pub const fn is_empty(&self) -> bool { - self.len == 0 + self.slice.is_empty() } /// Returns a slice of self for the provided range. @@ -154,8 +149,8 @@ impl Bytes { let subset_start = subset.as_ptr() as usize; let subset_end = subset_start + subset.len(); - let self_start = self.ptr as usize; - let self_end = self_start + self.len; + let self_start = self.slice.as_ptr() as usize; + let self_end = self_start + self.slice.len(); if subset_start >= self_start && subset_end <= self_end { Some(self.safe_slice_ref(subset_start - self_start, subset_end - self_start)) } else { @@ -163,13 +158,25 @@ impl Bytes { } } + /// Returns a mutable reference to the slice of self. + /// Allows for fast unchecked shrinking of the slice. + /// + /// # Safety + /// + /// Callers of that function must make sure that they only put subslices of the slice into the + /// returned reference. + /// They also need to make sure to not persist the slice reference for longer than the struct + /// lives. + #[inline] + pub unsafe fn as_mut_slice(&mut self) -> &mut &'static [u8] { + &mut self.slice + } + // private fn from_underlying(value: impl UnderlyingBytes) -> Self { - let slice: &[u8] = value.as_ref(); Self { - ptr: slice.as_ptr(), - len: slice.len(), + slice: unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(value.as_ref()) }, bytes: Some(Arc::new(value)), } } @@ -177,15 +184,14 @@ impl Bytes { #[inline] fn safe_slice_ref(&self, start: usize, end: usize) -> Self { Self { - ptr: unsafe { self.ptr.add(start) }, - len: end - start, + slice: &self.slice[start..end], bytes: self.bytes.clone(), } } #[inline] fn as_slice(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + self.slice } } @@ -264,5 +270,10 @@ impl fmt::Debug for Bytes { } } +#[cfg(feature = "bytes_string")] +mod bytes_string; +#[cfg(feature = "bytes_string")] +pub use bytes_string::BytesString; + #[cfg(test)] mod test; diff --git a/tinybytes/src/test.rs b/tinybytes/src/test.rs index c974ccf9d..1c86ebe27 100644 --- a/tinybytes/src/test.rs +++ b/tinybytes/src/test.rs @@ -128,6 +128,7 @@ fn test_bytes_slice_ref_is_shallow() { } #[test] +#[cfg_attr(miri, ignore)] fn test_bytes_drop_frees_underlying() { let underlying = CountingU8::new(vec![1, 2, 3, 4, 5].into()); let counter = underlying.counter(); diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 59baca694..dbf7c8f67 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -12,8 +12,7 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::trace_utils::{self}; -use datadog_trace_utils::tracer_payload::TraceChunkProcessor; -use datadog_trace_utils::tracer_payload::TraceEncoding; +use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceCollection}; use crate::{ config::Config, @@ -90,14 +89,13 @@ impl TraceProcessor for ServerlessTraceProcessor { }; let payload = trace_utils::collect_trace_chunks( - traces, + TraceCollection::V07(traces), &tracer_header_tags, &mut ChunkProcessor { config: config.clone(), mini_agent_metadata: mini_agent_metadata.clone(), }, true, // In mini agent, we always send agentless - TraceEncoding::V07, ); let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index b6ffeb828..86bd5729a 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -39,6 +39,7 @@ testcontainers = { version = "0.17.0", optional = true } cargo_metadata = { version = "0.18.1", optional = true } # Dependency of cargo metadata, but 0.1.8 requires too new of a rust version. cargo-platform = { version = "=0.1.7", optional = true } +tinybytes = { path = "../tinybytes", features = ["bytes_string", "serialization"] } [dev-dependencies] bolero = "0.10.1" @@ -47,6 +48,7 @@ criterion = "0.5.1" httpmock = { version = "0.7.0"} serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +datadog-trace-utils = { path = ".", features = ["test-utils"] } [features] test-utils = ["httpmock", "testcontainers", "cargo_metadata", "cargo-platform"] diff --git a/trace-utils/benches/deserialization.rs b/trace-utils/benches/deserialization.rs index c4573c3d9..b16afbca9 100644 --- a/trace-utils/benches/deserialization.rs +++ b/trace-utils/benches/deserialization.rs @@ -39,17 +39,18 @@ pub fn deserialize_msgpack_to_internal(c: &mut Criterion) { let data = rmp_serde::to_vec(&vec![span_data1, span_data2]).expect("Failed to serialize test spans."); + let data_as_bytes = tinybytes::Bytes::copy_from_slice(&data); let tracer_header_tags = &TracerHeaderTags::default(); c.bench_function( "benching deserializing traces from msgpack to their internal representation ", |b| { b.iter_batched( - || &data, - |data| { + || data_as_bytes.clone(), + |data_as_bytes| { let result: anyhow::Result = black_box( TracerPayloadParams::new( - data, + data_as_bytes, tracer_header_tags, &mut DefaultTraceChunkProcessor, false, diff --git a/trace-utils/src/lib.rs b/trace-utils/src/lib.rs index fffe4f8c1..d3090e777 100644 --- a/trace-utils/src/lib.rs +++ b/trace-utils/src/lib.rs @@ -10,3 +10,5 @@ pub mod test_utils; pub mod trace_utils; pub mod tracer_header_tags; pub mod tracer_payload; + +pub mod span_v04; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 76a7c7a41..49d87b059 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -6,16 +6,12 @@ mod span_link; use self::span::decode_span; use super::error::DecodeError; -use super::number::read_number; -use crate::tracer_payload::TracerPayloadV04; -use datadog_trace_protobuf::pb::Span; +use super::number::read_number_bytes; +use crate::span_v04::Span; use rmp::decode::DecodeStringError; -use rmp::{ - decode, - decode::{read_array_len, RmpRead}, - Marker, -}; +use rmp::{decode, decode::RmpRead, Marker}; use std::{collections::HashMap, f64}; +use tinybytes::{Bytes, BytesString}; /// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. /// @@ -23,8 +19,8 @@ use std::{collections::HashMap, f64}; /// /// # Arguments /// -/// * `data` - A mutable reference to a slice of bytes containing the encoded data.Bytes are -/// expected to be encoded msgpack data containing a list of a list of v04 spans. +/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be +/// encoded msgpack data containing a list of a list of v04 spans. /// /// # Returns /// @@ -43,50 +39,61 @@ use std::{collections::HashMap, f64}; /// use datadog_trace_protobuf::pb::Span; /// use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; /// use rmp_serde::to_vec_named; +/// use tinybytes; /// /// let span = Span { /// name: "test-span".to_owned(), /// ..Default::default() /// }; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); -/// let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); +/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); +/// let decoded_traces = from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; -/// assert_eq!("test-span", decoded_span.name); +/// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(data: &mut &[u8]) -> Result, DecodeError> { - let trace_count = read_array_len(data).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) - })?; - - let mut traces: Vec = Default::default(); - - for _ in 0..trace_count { - let span_count = match read_array_len(data) { - Ok(count) => count, - Err(_) => { - return Err(DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned(), - )) - } - }; +pub fn from_slice(mut data: tinybytes::Bytes) -> Result>, DecodeError> { + let trace_count = + rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; - let mut trace: Vec = Default::default(); + (0..trace_count).try_fold( + Vec::with_capacity( + trace_count + .try_into() + .expect("Unable to cast trace_count to usize"), + ), + |mut traces, _| { + let span_count = + rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) + })?; + + let trace = (0..span_count).try_fold( + Vec::with_capacity( + span_count + .try_into() + .expect("Unable to cast span_count to usize"), + ), + |mut trace, _| { + let span = decode_span(&mut data)?; + trace.push(span); + Ok(trace) + }, + )?; - for _ in 0..span_count { - let span = decode_span(data)?; - trace.push(span); - } - traces.push(trace); - } + traces.push(trace); - Ok(traces) + Ok(traces) + }, + ) } #[inline] -fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { +fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { decode::read_str_from_slice(buf).map_err(|e| match e { DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), @@ -99,53 +106,70 @@ fn read_string_ref(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { } #[inline] -fn read_string(buf: &mut &[u8]) -> Result { - let (str_ref, remaining_buf) = read_string_ref(buf)?; - *buf = remaining_buf; - Ok(str_ref.to_string()) +fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_ref_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str + }) } #[inline] -fn read_str_pair(buf: &mut &[u8]) -> Result<(String, String), DecodeError> { - let k = read_string(buf)?; - let v = read_string(buf)?; - - Ok((k, v)) +fn read_string_bytes(buf: &mut Bytes) -> Result { + // Note: we need to pass a &'static lifetime here, otherwise it'll complain + read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { + let string = BytesString::from_bytes_slice(buf, str); + *unsafe { buf.as_mut_slice() } = newbuf; + string + }) } #[inline] -fn read_metric_pair(buf: &mut &[u8]) -> Result<(String, f64), DecodeError> { - let k = read_string(buf)?; - let v = read_number(buf)?.try_into()?; - - Ok((k, v)) +// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the +// BytesStrings. +fn read_str_map_to_bytes_strings( + buf_wrapper: &mut Bytes, +) -> Result, DecodeError> { + let len = decode::read_map_len(unsafe { buf_wrapper.as_mut_slice() }) + .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; + + let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); + for _ in 0..len { + let key = read_string_bytes(buf_wrapper)?; + let value = read_string_bytes(buf_wrapper)?; + map.insert(key, value); + } + Ok(map) } -fn read_map_strs(buf: &mut &[u8]) -> Result, DecodeError> { - let len = read_map_len(buf)?; - read_map(len, buf, read_str_pair) -} +#[inline] +fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { + let key = read_string_bytes(buf)?; + let v = read_number_bytes(buf)?; -fn read_metrics(buf: &mut &[u8]) -> Result, DecodeError> { - let len = read_map_len(buf)?; + Ok((key, v)) +} +fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { + let len = read_map_len(unsafe { buf.as_mut_slice() })?; read_map(len, buf, read_metric_pair) } -fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeError> { - fn read_meta_struct_pair(buf: &mut &[u8]) -> Result<(String, Vec), DecodeError> { - let k = read_string(buf)?; - let mut v = vec![]; - let array_len = decode::read_array_len(buf).map_err(|_| { +fn read_meta_struct(buf: &mut Bytes) -> Result>, DecodeError> { + fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec), DecodeError> { + let key = read_string_bytes(buf)?; + let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) })?; + + let mut v = Vec::with_capacity(array_len as usize); + for _ in 0..array_len { - let value = read_number(buf)?.try_into()?; + let value = read_number_bytes(buf)?; v.push(value); } - Ok((k, v)) + Ok((key, v)) } - let len = read_map_len(buf)?; + let len = read_map_len(unsafe { buf.as_mut_slice() })?; read_map(len, buf, read_meta_struct_pair) } @@ -157,7 +181,7 @@ fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeE /// # Arguments /// /// * `len` - The number of key-value pairs to read from the buffer. -/// * `buf` - A mutable reference to the buffer containing the encoded map data. +/// * `buf` - A reference to the Bytes containing the encoded map data. /// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a /// `Result<(K, V), DecodeError>`. /// @@ -178,14 +202,14 @@ fn read_meta_struct(buf: &mut &[u8]) -> Result>, DecodeE /// * `F` - The type of the function used to read key-value pairs from the buffer. fn read_map( len: usize, - buf: &mut &[u8], + buf: &mut Bytes, read_pair: F, ) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut &[u8]) -> Result<(K, V), DecodeError>, + F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, { - let mut map = HashMap::new(); + let mut map = HashMap::with_capacity(len); for _ in 0..len { let (k, v) = read_pair(buf)?; map.insert(k, v); @@ -215,60 +239,94 @@ fn read_map_len(buf: &mut &[u8]) -> Result { #[cfg(test)] mod tests { use super::*; - use datadog_trace_protobuf::pb::SpanLink; + use crate::test_utils::create_test_json_span; + use bolero::check; + use rmp_serde; + use rmp_serde::to_vec_named; + use serde_json::json; + use tinybytes::BytesString; + + fn generate_meta_struct_element(i: u8) -> (String, Vec) { + let map = HashMap::from([ + ( + format!("meta_struct_map_key {}", i + 1), + format!("meta_struct_map_val {}", i + 1), + ), + ( + format!("meta_struct_map_key {}", i + 2), + format!("meta_struct_map_val {}", i + 2), + ), + ]); + let key = format!("key {}", i).to_owned(); + + (key, rmp_serde::to_vec_named(&map).unwrap()) + } #[test] fn decoder_read_string_success() { let expected_string = "test-service-name"; let span = Span { - name: expected_string.to_owned(), + name: BytesString::from_slice(expected_string.as_ref()).unwrap(), ..Default::default() }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_string, decoded_span.name); + assert_eq!(expected_string, decoded_span.name.as_str()); } #[test] fn test_decoder_meta_struct_fixed_map_success() { let expected_meta_struct = HashMap::from([ - ("key1".to_string(), vec![1, 2, 3]), - ("key2".to_string(), vec![4, 5, 6]), + generate_meta_struct_element(0), + generate_meta_struct_element(1), ]); - let span = Span { - meta_struct: expected_meta_struct.clone(), - ..Default::default() - }; + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta_struct"] = json!(expected_meta_struct.clone()); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_meta_struct, decoded_span.meta_struct); + + for (key, value) in expected_meta_struct.iter() { + assert_eq!( + value, + &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } } #[test] fn test_decoder_meta_struct_map_16_success() { - let expected_meta_struct: HashMap> = (0..20) - .map(|i| (format!("key {}", i), vec![1 + i, 2 + i, 3 + i])) - .collect(); + let expected_meta_struct: HashMap> = + (0..20).map(generate_meta_struct_element).collect(); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta_struct"] = json!(expected_meta_struct.clone()); - let span = Span { - meta_struct: expected_meta_struct.clone(), - ..Default::default() - }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_meta_struct, decoded_span.meta_struct); + + for (key, value) in expected_meta_struct.iter() { + assert_eq!( + value, + &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } } #[test] @@ -277,109 +335,173 @@ mod tests { ("key1".to_string(), "value1".to_string()), ("key2".to_string(), "value2".to_string()), ]); - let span = Span { - meta: expected_meta.clone(), - ..Default::default() - }; + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta"] = json!(expected_meta.clone()); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_meta, decoded_span.meta); + + for (key, value) in expected_meta.iter() { + assert_eq!( + value, + &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() + ); + } } #[test] fn test_decoder_meta_map_16_success() { let expected_meta: HashMap = (0..20) - .map(|i| (format!("key {}", i), format!("value {}", i))) + .map(|i| { + ( + format!("key {}", i).to_owned(), + format!("value {}", i).to_owned(), + ) + }) .collect(); - let span = Span { - meta: expected_meta.clone(), - ..Default::default() - }; + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta"] = json!(expected_meta.clone()); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_meta, decoded_span.meta); + + for (key, value) in expected_meta.iter() { + assert_eq!( + value, + &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() + ); + } } #[test] fn test_decoder_metrics_fixed_map_success() { - let mut span = Span::default(); - let expected_metrics = - HashMap::from([("metric1".to_string(), 1.23), ("metric2".to_string(), 4.56)]); - span.metrics = expected_metrics.clone(); + let expected_metrics = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_metrics, decoded_span.metrics); + + for (key, value) in expected_metrics.iter() { + assert_eq!( + value, + &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } } #[test] fn test_decoder_metrics_map16_success() { - let mut span = Span::default(); let expected_metrics: HashMap = (0..20) .map(|i| (format!("metric{}", i), i as f64)) .collect(); - span.metrics = expected_metrics.clone(); + let mut span = create_test_json_span(1, 2, 0, 0); + span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = from_slice(&mut encoded_data.as_slice()).expect("Decoding failed"); + let decoded_traces = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_metrics, decoded_span.metrics); + + for (key, value) in expected_metrics.iter() { + assert_eq!( + value, + &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } } #[test] fn test_decoder_span_link_success() { - let expected_span_links = vec![SpanLink { - trace_id: 1, - trace_id_high: 0, - span_id: 1, - attributes: HashMap::from([ - ("attr1".to_string(), "test_value".to_string()), - ("attr2".to_string(), "test_value".to_string()), - ]), - tracestate: "state_test".to_string(), - flags: 0b101, - }]; + let expected_span_link = json!({ + "trace_id": 1, + "trace_id_high": 0, + "span_id": 1, + "attributes": { + "attr1": "test_value", + "attr2": "test_value2" + }, + "tracestate": "state_test", + "flags": 0b101 + }); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["span_links"] = json!([expected_span_link]); - let span = Span { - span_links: expected_span_links.clone(), - ..Default::default() - }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let decoded_traces = - from_slice(&mut encoded_data.as_slice()).expect("unable to decode span"); + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_span_links, decoded_span.span_links); + + assert_eq!( + expected_span_link["trace_id"], + decoded_span.span_links[0].trace_id + ); + assert_eq!( + expected_span_link["trace_id_high"], + decoded_span.span_links[0].trace_id_high + ); + assert_eq!( + expected_span_link["span_id"], + decoded_span.span_links[0].span_id + ); + assert_eq!( + expected_span_link["tracestate"], + decoded_span.span_links[0].tracestate.as_str() + ); + assert_eq!( + expected_span_link["flags"], + decoded_span.span_links[0].flags + ); + assert_eq!( + expected_span_link["attributes"]["attr1"], + decoded_span.span_links[0].attributes + [&BytesString::from_slice("attr1".as_ref()).unwrap()] + .as_str() + ); + assert_eq!( + expected_span_link["attributes"]["attr2"], + decoded_span.span_links[0].attributes + [&BytesString::from_slice("attr2".as_ref()).unwrap()] + .as_str() + ); } #[test] + #[cfg_attr(miri, ignore)] fn test_decoder_read_string_wrong_format() { let span = Span { - service: "my_service".to_owned(), + service: BytesString::from_slice("my_service".as_ref()).unwrap(), ..Default::default() }; let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error. encoded_data[2] = 0x8c; - let result = from_slice(&mut encoded_data.as_slice()); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert_eq!( Err(DecodeError::InvalidFormat( "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() @@ -389,16 +511,18 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn test_decoder_read_string_utf8_error() { let invalid_seq = vec![0, 159, 146, 150]; let invalid_str = unsafe { String::from_utf8_unchecked(invalid_seq) }; + let invalid_str_as_bytes = tinybytes::Bytes::from(invalid_str); let span = Span { - name: invalid_str.to_owned(), + name: unsafe { BytesString::from_bytes_unchecked(invalid_str_as_bytes) }, ..Default::default() }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(&mut encoded_data.as_slice()); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert_eq!( Err(DecodeError::Utf8Error( "invalid utf-8 sequence of 1 bytes from index 1".to_owned() @@ -408,6 +532,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn test_decoder_invalid_marker_for_trace_count_read() { let span = Span::default(); let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); @@ -415,7 +540,7 @@ mod tests { // reading the array len of traces encoded_data[0] = 0x8c; - let result = from_slice(&mut encoded_data.as_ref()); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert_eq!( Err(DecodeError::InvalidFormat( "Unable to read array len for trace count".to_string() @@ -425,6 +550,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn test_decoder_invalid_marker_for_span_count_read() { let span = Span::default(); let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); @@ -432,7 +558,7 @@ mod tests { // reading the array len of spans encoded_data[1] = 0x8c; - let result = from_slice(&mut encoded_data.as_ref()); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert_eq!( Err(DecodeError::InvalidFormat( "Unable to read array len for span count".to_owned() @@ -442,25 +568,7 @@ mod tests { } #[test] - fn test_decoder_read_string_invalid_data_read() { - let span = Span { - name: "test-span".to_owned(), - ..Default::default() - }; - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - // This changes the marker for the empty metrics map to a str8 marker - encoded_data[104] = 0xD9; - - let result = from_slice(&mut encoded_data.as_slice()); - assert_eq!( - Err(DecodeError::InvalidConversion( - "Expected at least bytes 1, but only got 0 (pos 1)".to_owned() - )), - result - ); - } - - #[test] + #[cfg_attr(miri, ignore)] fn test_decoder_read_string_type_mismatch() { let span = Span::default(); let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); @@ -468,7 +576,7 @@ mod tests { // field to an integer marker encoded_data[3] = 0x01; - let result = from_slice(&mut encoded_data.as_slice()); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert_eq!( Err(DecodeError::InvalidType( "Type mismatch at marker FixPos(1)".to_owned() @@ -477,11 +585,8 @@ mod tests { ); } - use bolero::check; - use datadog_trace_protobuf::pb::Span; - use rmp_serde::to_vec_named; - #[test] + #[cfg_attr(miri, ignore)] fn fuzz_from_slice() { check!() .with_type::<( @@ -515,13 +620,16 @@ mod tests { start, )| { let span = Span { - name, - service, - resource, - r#type: span_type, - meta: HashMap::from([(meta_key, meta_value)]), + name: BytesString::from_slice(name.as_ref()).unwrap(), + service: BytesString::from_slice(service.as_ref()).unwrap(), + resource: BytesString::from_slice(resource.as_ref()).unwrap(), + r#type: BytesString::from_slice(span_type.as_ref()).unwrap(), + meta: HashMap::from([( + BytesString::from_slice(meta_key.as_ref()).unwrap(), + BytesString::from_slice(meta_value.as_ref()).unwrap(), + )]), metrics: HashMap::from([( - metric_key, + BytesString::from_slice(metric_key.as_ref()).unwrap(), metric_value.parse::().unwrap_or_default(), )]), trace_id, @@ -531,8 +639,7 @@ mod tests { ..Default::default() }; let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(&mut encoded_data.as_slice()); - println!("result: {:?}", result); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); assert!(result.is_ok()); }, diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index 4bd9c3d61..445337198 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -2,13 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - read_map_strs, read_meta_struct, read_metrics, read_string, read_string_ref, - span_link::read_span_links, + read_meta_struct, read_metrics, read_str_map_to_bytes_strings, read_string_bytes, + read_string_ref, span_link::read_span_links, }; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number; -use datadog_trace_protobuf::pb::Span; -use std::str::FromStr; +use crate::msgpack_decoder::v04::number::read_number_bytes; +use crate::span_v04::{Span, SpanKey}; +use tinybytes::Bytes; /// Decodes a slice of bytes into a `Span` object. /// @@ -26,103 +26,50 @@ use std::str::FromStr; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. -#[inline] -pub(crate) fn decode_span(buf: &mut &[u8]) -> Result { +pub fn decode_span(buffer: &mut Bytes) -> Result { let mut span = Span::default(); - let span_size = rmp::decode::read_map_len(buf).map_err(|_| { + + let span_size = rmp::decode::read_map_len(unsafe { buffer.as_mut_slice() }).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; for _ in 0..span_size { - fill_span(&mut span, buf)?; + fill_span(&mut span, buffer)?; } - Ok(span) -} - -#[derive(Debug, PartialEq)] -enum SpanKey { - Service, - Name, - Resource, - TraceId, - SpanId, - ParentId, - Start, - Duration, - Error, - Meta, - Metrics, - Type, - MetaStruct, - SpanLinks, -} -impl FromStr for SpanKey { - type Err = DecodeError; - - fn from_str(s: &str) -> Result { - match s { - "service" => Ok(SpanKey::Service), - "name" => Ok(SpanKey::Name), - "resource" => Ok(SpanKey::Resource), - "trace_id" => Ok(SpanKey::TraceId), - "span_id" => Ok(SpanKey::SpanId), - "parent_id" => Ok(SpanKey::ParentId), - "start" => Ok(SpanKey::Start), - "duration" => Ok(SpanKey::Duration), - "error" => Ok(SpanKey::Error), - "meta" => Ok(SpanKey::Meta), - "metrics" => Ok(SpanKey::Metrics), - "type" => Ok(SpanKey::Type), - "meta_struct" => Ok(SpanKey::MetaStruct), - "span_links" => Ok(SpanKey::SpanLinks), - _ => Err(DecodeError::InvalidFormat( - format!("Invalid span key: {}", s).to_owned(), - )), - } - } + Ok(span) } -fn fill_span(span: &mut Span, buf: &mut &[u8]) -> Result<(), DecodeError> { - let (key, value) = read_string_ref(buf)?; - let key = key.parse::()?; - - *buf = value; +// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the +// BytesStrings +fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> { + let key = read_string_ref(unsafe { buf.as_mut_slice() })? + .parse::() + .map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?; match key { - SpanKey::Service => { - let value = read_string(buf)?; - span.service = value; - } - SpanKey::Name => { - let value = read_string(buf)?; - span.name = value; - } - SpanKey::Resource => { - let value = read_string(buf)?; - span.resource = value; - } - SpanKey::TraceId => span.trace_id = read_number(buf)?.try_into()?, - SpanKey::SpanId => span.span_id = read_number(buf)?.try_into()?, - SpanKey::ParentId => span.parent_id = read_number(buf)?.try_into()?, - SpanKey::Start => span.start = read_number(buf)?.try_into()?, - SpanKey::Duration => span.duration = read_number(buf)?.try_into()?, - SpanKey::Error => span.error = read_number(buf)?.try_into()?, - SpanKey::Meta => span.meta = read_map_strs(buf)?, + SpanKey::Service => span.service = read_string_bytes(buf)?, + SpanKey::Name => span.name = read_string_bytes(buf)?, + SpanKey::Resource => span.resource = read_string_bytes(buf)?, + SpanKey::TraceId => span.trace_id = read_number_bytes(buf)?, + SpanKey::SpanId => span.span_id = read_number_bytes(buf)?, + SpanKey::ParentId => span.parent_id = read_number_bytes(buf)?, + SpanKey::Start => span.start = read_number_bytes(buf)?, + SpanKey::Duration => span.duration = read_number_bytes(buf)?, + SpanKey::Error => span.error = read_number_bytes(buf)?, + SpanKey::Type => span.r#type = read_string_bytes(buf)?, + SpanKey::Meta => span.meta = read_str_map_to_bytes_strings(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, - SpanKey::Type => { - let value = read_string(buf)?; - span.r#type = value; - } SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, } Ok(()) } + #[cfg(test)] mod tests { use super::SpanKey; - use crate::msgpack_decoder::v04::error::DecodeError; + use crate::span_v04::SpanKeyParseError; use std::str::FromStr; #[test] @@ -145,9 +92,9 @@ mod tests { ); assert_eq!(SpanKey::from_str("span_links").unwrap(), SpanKey::SpanLinks); - assert!(matches!( - SpanKey::from_str("invalid_key"), - Err(DecodeError::InvalidFormat(_)) - )); + let invalid_result = SpanKey::from_str("invalid_key"); + let msg = format!("SpanKeyParseError: Invalid span key: {}", "invalid_key"); + assert!(matches!(invalid_result, Err(SpanKeyParseError { .. }))); + assert_eq!(invalid_result.unwrap_err().to_string(), msg); } } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs index 07a48d723..360e4522a 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -1,12 +1,15 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::{read_map_strs, read_string, read_string_ref}; +use crate::msgpack_decoder::v04::decoder::{ + read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, +}; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number; -use datadog_trace_protobuf::pb::SpanLink; +use crate::msgpack_decoder::v04::number::read_number_bytes; +use crate::span_v04::SpanLink; use rmp::Marker; use std::str::FromStr; +use tinybytes::Bytes; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// @@ -25,12 +28,12 @@ use std::str::FromStr; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. /// ``` -pub(crate) fn read_span_links(buf: &mut &[u8]) -> Result, DecodeError> { - match rmp::decode::read_marker(buf).map_err(|_| { +pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeError> { + match rmp::decode::read_marker(unsafe { buf.as_mut_slice() }).map_err(|_| { DecodeError::InvalidFormat("Unable to read marker for span links".to_owned()) })? { Marker::FixArray(len) => { - let mut vec: Vec = Vec::new(); + let mut vec: Vec = Vec::with_capacity(len.into()); for _ in 0..len { vec.push(decode_span_link(buf)?); } @@ -50,6 +53,7 @@ enum SpanLinkKey { Tracestate, Flags, } + impl FromStr for SpanLinkKey { type Err = DecodeError; @@ -68,26 +72,19 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link(buf: &mut &[u8]) -> Result { +fn decode_span_link(buf: &mut Bytes) -> Result { let mut span = SpanLink::default(); - let span_size = rmp::decode::read_map_len(buf) + let span_size = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - let (key, value) = read_string_ref(buf)?; - *buf = value; - let key = key.parse::()?; - - match key { - SpanLinkKey::TraceId => span.trace_id = read_number(buf)?.try_into()?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?.try_into()?, - SpanLinkKey::SpanId => span.span_id = read_number(buf)?.try_into()?, - SpanLinkKey::Attributes => span.attributes = read_map_strs(buf)?, - SpanLinkKey::Tracestate => { - let value = read_string(buf)?; - span.tracestate = value; - } - SpanLinkKey::Flags => span.flags = read_number(buf)?.try_into()?, + match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { + SpanLinkKey::TraceId => span.trace_id = read_number_bytes(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_bytes(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number_bytes(buf)?, + SpanLinkKey::Attributes => span.attributes = read_str_map_to_bytes_strings(buf)?, + SpanLinkKey::Tracestate => span.tracestate = read_string_bytes(buf)?, + SpanLinkKey::Flags => span.flags = read_number_bytes(buf)?, } } diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs index 9a4f39cfc..4e0fce7f4 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -4,6 +4,7 @@ use super::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; +use tinybytes::Bytes; #[derive(Debug, PartialEq)] pub enum Number { @@ -189,6 +190,12 @@ pub fn read_number(buf: &mut &[u8]) -> Result { } } +pub fn read_number_bytes>( + buf: &mut Bytes, +) -> Result { + read_number(unsafe { buf.as_mut_slice() })?.try_into() +} + #[cfg(test)] mod tests { use super::*; diff --git a/trace-utils/src/send_data/mod.rs b/trace-utils/src/send_data/mod.rs index 46d9a942b..11f5544d7 100644 --- a/trace-utils/src/send_data/mod.rs +++ b/trace-utils/src/send_data/mod.rs @@ -465,7 +465,7 @@ mod tests { use super::*; use crate::send_data::retry_strategy::RetryBackoffType; use crate::send_data::retry_strategy::RetryStrategy; - use crate::test_utils::{create_send_data, create_test_span, poll_for_mock_hit}; + use crate::test_utils::{create_send_data, create_test_no_alloc_span, poll_for_mock_hit}; use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags}; use crate::tracer_header_tags::TracerHeaderTags; use datadog_trace_protobuf::pb::Span; @@ -775,7 +775,7 @@ mod tests { let header_tags = HEADER_TAGS; - let trace = vec![create_test_span(1234, 12342, 12341, 1, false)]; + let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)]; let data = SendData::new( 100, TracerPayloadCollection::V04(vec![trace.clone()]), @@ -956,7 +956,7 @@ mod tests { let header_tags = HEADER_TAGS; - let trace = vec![create_test_span(1234, 12342, 12341, 1, false)]; + let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)]; let data = SendData::new( 100, TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]), diff --git a/trace-utils/src/span_v04.rs b/trace-utils/src/span_v04.rs new file mode 100644 index 000000000..83c220915 --- /dev/null +++ b/trace-utils/src/span_v04.rs @@ -0,0 +1,106 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use serde::Serialize; +use std::collections::HashMap; +use std::fmt; +use std::str::FromStr; +use tinybytes::BytesString; + +#[derive(Debug, PartialEq)] +pub enum SpanKey { + Service, + Name, + Resource, + TraceId, + SpanId, + ParentId, + Start, + Duration, + Error, + Meta, + Metrics, + Type, + MetaStruct, + SpanLinks, +} + +impl FromStr for SpanKey { + type Err = SpanKeyParseError; + + fn from_str(s: &str) -> Result { + match s { + "service" => Ok(SpanKey::Service), + "name" => Ok(SpanKey::Name), + "resource" => Ok(SpanKey::Resource), + "trace_id" => Ok(SpanKey::TraceId), + "span_id" => Ok(SpanKey::SpanId), + "parent_id" => Ok(SpanKey::ParentId), + "start" => Ok(SpanKey::Start), + "duration" => Ok(SpanKey::Duration), + "error" => Ok(SpanKey::Error), + "meta" => Ok(SpanKey::Meta), + "metrics" => Ok(SpanKey::Metrics), + "type" => Ok(SpanKey::Type), + "meta_struct" => Ok(SpanKey::MetaStruct), + "span_links" => Ok(SpanKey::SpanLinks), + _ => Err(SpanKeyParseError::new(format!("Invalid span key: {}", s))), + } + } +} + +#[derive(Clone, Debug, Default, PartialEq, Serialize)] +pub struct Span { + pub service: BytesString, + pub name: BytesString, + pub resource: BytesString, + pub r#type: BytesString, + pub trace_id: u64, + pub span_id: u64, + pub parent_id: u64, + pub start: i64, + pub duration: i64, + #[serde(skip_serializing_if = "is_default")] + pub error: i32, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub metrics: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta_struct: HashMap>, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_links: Vec, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)] +pub struct SpanLink { + pub trace_id: u64, + pub trace_id_high: u64, + pub span_id: u64, + pub attributes: HashMap, + pub tracestate: BytesString, + pub flags: u64, +} + +#[derive(Debug)] +pub struct SpanKeyParseError { + pub message: String, +} + +impl SpanKeyParseError { + pub fn new(message: impl Into) -> Self { + SpanKeyParseError { + message: message.into(), + } + } +} +impl fmt::Display for SpanKeyParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SpanKeyParseError: {}", self.message) + } +} +impl std::error::Error for SpanKeyParseError {} + +fn is_default(t: &T) -> bool { + t == &T::default() +} diff --git a/trace-utils/src/test_utils/mod.rs b/trace-utils/src/test_utils/mod.rs index f225ac774..357260e77 100644 --- a/trace-utils/src/test_utils/mod.rs +++ b/trace-utils/src/test_utils/mod.rs @@ -7,14 +7,72 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; +use crate::span_v04::Span; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use datadog_trace_protobuf::pb; use ddcommon::Endpoint; use httpmock::Mock; use serde_json::json; +use tinybytes::BytesString; use tokio::time::sleep; +pub fn create_test_no_alloc_span( + trace_id: u64, + span_id: u64, + parent_id: u64, + start: i64, + is_top_level: bool, +) -> Span { + let mut span = Span { + trace_id, + span_id, + service: BytesString::from_slice("test-service".as_ref()).unwrap(), + name: BytesString::from_slice("test_name".as_ref()).unwrap(), + resource: BytesString::from_slice("test-resource".as_ref()).unwrap(), + parent_id, + start, + duration: 5, + error: 0, + meta: HashMap::from([ + ( + BytesString::from_slice("service".as_ref()).unwrap(), + BytesString::from_slice("test-service".as_ref()).unwrap(), + ), + ( + BytesString::from_slice("env".as_ref()).unwrap(), + BytesString::from_slice("test-env".as_ref()).unwrap(), + ), + ( + BytesString::from_slice("runtime-id".as_ref()).unwrap(), + BytesString::from_slice("test-runtime-id-value".as_ref()).unwrap(), + ), + ]), + metrics: HashMap::new(), + r#type: BytesString::default(), + meta_struct: HashMap::new(), + span_links: vec![], + }; + if is_top_level { + span.metrics + .insert(BytesString::from_slice("_top_level".as_ref()).unwrap(), 1.0); + span.meta.insert( + BytesString::from_slice("_dd.origin".as_ref()).unwrap(), + BytesString::from_slice("cloudfunction".as_ref()).unwrap(), + ); + span.meta.insert( + BytesString::from_slice("origin".as_ref()).unwrap(), + BytesString::from_slice("cloudfunction".as_ref()).unwrap(), + ); + span.meta.insert( + BytesString::from_slice("functionname".as_ref()).unwrap(), + BytesString::from_slice("dummy_function_name".as_ref()).unwrap(), + ); + span.r#type = BytesString::from_slice("serverless".as_ref()).unwrap(); + } + span +} + pub fn create_test_span( trace_id: u64, span_id: u64, diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index a1a40aa20..69a647e72 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -16,7 +16,7 @@ pub use crate::send_data::send_data_result::SendDataResult; pub use crate::send_data::SendData; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload; -use crate::tracer_payload::{TraceEncoding, TracerPayloadCollection}; +use crate::tracer_payload::{TraceCollection, TracerPayloadCollection}; use datadog_trace_normalization::normalizer; use datadog_trace_protobuf::pb; use ddcommon::azure_app_services; @@ -555,15 +555,14 @@ macro_rules! parse_root_span_tags { } pub fn collect_trace_chunks( - mut traces: Vec>, + traces: TraceCollection, tracer_header_tags: &TracerHeaderTags, process_chunk: &mut T, is_agentless: bool, - encoding_type: TraceEncoding, ) -> TracerPayloadCollection { - match encoding_type { - TraceEncoding::V04 => TracerPayloadCollection::V04(traces), - TraceEncoding::V07 => { + match traces { + TraceCollection::V04(traces) => TracerPayloadCollection::V04(traces), + TraceCollection::V07(mut traces) => { let mut trace_chunks: Vec = Vec::new(); // We'll skip setting the global metadata and rely on the agent to unpack these diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 8ae0a963f..4be7510da 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,14 +1,16 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::span_v04::Span; use crate::{ msgpack_decoder, trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, }; use datadog_trace_protobuf::pb; use std::cmp::Ordering; +use tinybytes; -pub type TracerPayloadV04 = Vec; +pub type TracerPayloadV04 = Vec; #[derive(Debug, Clone)] /// Enumerates the different encoding types. @@ -19,6 +21,12 @@ pub enum TraceEncoding { V07, } +/// A collection of traces before they are turned into TraceChunks. +pub enum TraceCollection { + V07(Vec>), + V04(Vec>), +} + #[derive(Debug, Clone)] /// Enum representing a general abstraction for a collection of tracer payloads. pub enum TracerPayloadCollection { @@ -159,8 +167,8 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { /// the conversion process, handling different encoding types and ensuring that all required /// data is available for the conversion. pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { - /// A byte slice containing the serialized msgpack data. - data: &'a [u8], + /// A tinybytes::Bytes slice containing the serialized msgpack data. + data: tinybytes::Bytes, /// Reference to `TracerHeaderTags` containing metadata for the trace. tracer_header_tags: &'a TracerHeaderTags<'a>, /// A mutable reference to an implementation of `TraceChunkProcessor` that processes each @@ -177,7 +185,7 @@ pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> { pub fn new( - data: &'a [u8], + data: tinybytes::Bytes, tracer_header_tags: &'a TracerHeaderTags, chunk_processor: &'a mut T, is_agentless: bool, @@ -223,12 +231,13 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto /// DefaultTraceChunkProcessor, TraceEncoding, TracerPayloadCollection, TracerPayloadParams, /// }; /// use std::convert::TryInto; - /// - /// let data = &[/* msgpack data */]; - /// + /// use tinybytes; + /// // This will likely be a &[u8] slice in practice. + /// let data: Vec = Vec::new(); + /// let data_as_bytes = tinybytes::Bytes::from(data); /// let tracer_header_tags = &TracerHeaderTags::default(); /// let result: Result = TracerPayloadParams::new( - /// data, + /// data_as_bytes, /// tracer_header_tags, /// &mut DefaultTraceChunkProcessor, /// false, @@ -244,13 +253,8 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - // msgpack::decoder::from_slice requires a mutable ref to self.data, so we need to - // create a local copy of the ref to make the ref mutable - let mut data_slice: &[u8] = self.data; - let data: &mut &[u8] = &mut data_slice; - - let traces: Vec> = - match msgpack_decoder::v04::decoder::from_slice(data) { + let traces: Vec> = + match msgpack_decoder::v04::decoder::from_slice(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}") @@ -262,11 +266,10 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto } Ok(collect_trace_chunks( - traces, + TraceCollection::V04(traces), self.tracer_header_tags, self.chunk_processor, self.is_agentless, - TraceEncoding::V04, )) } _ => todo!("Encodings other than TraceEncoding::V04 not implemented yet."), @@ -277,10 +280,11 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto #[cfg(test)] mod tests { use super::*; - use crate::test_utils::create_test_span; + use crate::test_utils::create_test_no_alloc_span; use datadog_trace_protobuf::pb; use serde_json::json; use std::collections::HashMap; + use tinybytes::BytesString; fn create_dummy_collection_v07() -> TracerPayloadCollection { TracerPayloadCollection::V07(vec![pb::TracerPayload { @@ -303,12 +307,12 @@ mod tests { }]) } - fn create_trace() -> Vec { + fn create_trace() -> Vec { vec![ // create a root span with metrics - create_test_span(1234, 12341, 0, 1, true), - create_test_span(1234, 12342, 12341, 1, false), - create_test_span(1234, 12343, 12342, 1, false), + create_test_no_alloc_span(1234, 12341, 0, 1, true), + create_test_no_alloc_span(1234, 12342, 12341, 1, false), + create_test_no_alloc_span(1234, 12343, 12342, 1, false), ] } @@ -331,7 +335,7 @@ mod tests { #[test] fn test_append_traces_v04() { let mut trace = - TracerPayloadCollection::V04(vec![vec![create_test_span(0, 1, 0, 2, true)]]); + TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]); let empty = TracerPayloadCollection::V04(vec![]); @@ -378,10 +382,10 @@ mod tests { "type": "serverless", }]); - let expected_serialized_span_data1 = vec![pb::Span { - service: "test-service".to_string(), - name: "test-service-name".to_string(), - resource: "test-service-resource".to_string(), + let expected_serialized_span_data1 = vec![Span { + service: BytesString::from_slice("test-service".as_ref()).unwrap(), + name: BytesString::from_slice("test-service-name".as_ref()).unwrap(), + resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(), trace_id: 111, span_id: 222, parent_id: 100, @@ -391,7 +395,7 @@ mod tests { meta: HashMap::new(), metrics: HashMap::new(), meta_struct: HashMap::new(), - r#type: "serverless".to_string(), + r#type: BytesString::from_slice("serverless".as_ref()).unwrap(), span_links: vec![], }]; @@ -410,10 +414,10 @@ mod tests { "type": "", }]); - let expected_serialized_span_data2 = vec![pb::Span { - service: "test-service".to_string(), - name: "test-service-name".to_string(), - resource: "test-service-resource".to_string(), + let expected_serialized_span_data2 = vec![Span { + service: BytesString::from_slice("test-service".as_ref()).unwrap(), + name: BytesString::from_slice("test-service-name".as_ref()).unwrap(), + resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(), trace_id: 111, span_id: 333, parent_id: 100, @@ -423,17 +427,18 @@ mod tests { meta: HashMap::new(), metrics: HashMap::new(), meta_struct: HashMap::new(), - r#type: "".to_string(), + r#type: BytesString::default(), span_links: vec![], }]; let data = rmp_serde::to_vec(&vec![span_data1, span_data2]) .expect("Failed to serialize test span."); + let data = tinybytes::Bytes::from(data); let tracer_header_tags = &TracerHeaderTags::default(); let result: anyhow::Result = TracerPayloadParams::new( - &data, + data, tracer_header_tags, &mut DefaultTraceChunkProcessor, false, @@ -459,10 +464,11 @@ mod tests { let dummy_trace = create_trace(); let expected = vec![dummy_trace.clone()]; let payload = rmp_serde::to_vec_named(&expected).unwrap(); + let payload = tinybytes::Bytes::from(payload); let tracer_header_tags = &TracerHeaderTags::default(); let result: anyhow::Result = TracerPayloadParams::new( - &payload, + payload, tracer_header_tags, &mut DefaultTraceChunkProcessor, false, diff --git a/trace-utils/tests/test_send_data.rs b/trace-utils/tests/test_send_data.rs index 61bf964b5..ad2b97b7d 100644 --- a/trace-utils/tests/test_send_data.rs +++ b/trace-utils/tests/test_send_data.rs @@ -2,15 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 #[cfg(test)] - mod tracing_integration_tests { - use datadog_trace_utils::send_data::SendData; - use datadog_trace_utils::test_utils::create_test_span; + use datadog_trace_utils::test_utils::create_test_no_alloc_span; use datadog_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; use datadog_trace_utils::trace_utils::TracerHeaderTags; use datadog_trace_utils::tracer_payload::TracerPayloadCollection; use ddcommon::Endpoint; + use tinybytes::BytesString; #[cfg_attr(miri, ignore)] #[tokio::test] @@ -35,14 +34,20 @@ mod tracing_integration_tests { .await, ); - let mut span_1 = create_test_span(1234, 12342, 12341, 1, false); - span_1.metrics.insert("_dd_metric1".to_string(), 1.0); - span_1.metrics.insert("_dd_metric2".to_string(), 2.0); + let mut span_1 = create_test_no_alloc_span(1234, 12342, 12341, 1, false); + span_1.metrics.insert( + BytesString::from_slice("_dd_metric1".as_ref()).unwrap(), + 1.0, + ); + span_1.metrics.insert( + BytesString::from_slice("_dd_metric2".as_ref()).unwrap(), + 2.0, + ); - let span_2 = create_test_span(1234, 12343, 12341, 1, false); + let span_2 = create_test_no_alloc_span(1234, 12343, 12341, 1, false); - let mut root_span = create_test_span(1234, 12341, 0, 0, true); - root_span.r#type = "web".to_string(); + let mut root_span = create_test_no_alloc_span(1234, 12341, 0, 0, true); + root_span.r#type = BytesString::from_slice("web".as_ref()).unwrap(); let trace = vec![span_1, span_2, root_span];