diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index 04637c6e1..eb3f44064 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -369,7 +369,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send( mod tests { use super::*; use crate::error::ddog_trace_exporter_error_free; - use datadog_trace_utils::span_v04::Span; + use datadog_trace_utils::span::v04::Span; use httpmock::prelude::*; use httpmock::MockServer; use std::{borrow::Borrow, mem::MaybeUninit}; diff --git a/data-pipeline/benches/span_concentrator_bench.rs b/data-pipeline/benches/span_concentrator_bench.rs index de8ee780c..9c5853e2c 100644 --- a/data-pipeline/benches/span_concentrator_bench.rs +++ b/data-pipeline/benches/span_concentrator_bench.rs @@ -7,7 +7,7 @@ use std::{ use criterion::{criterion_group, Criterion}; use data_pipeline::span_concentrator::SpanConcentrator; -use datadog_trace_utils::span_v04::Span; +use datadog_trace_utils::span::v04::Span; fn get_bucket_start(now: SystemTime, n: u64) -> i64 { let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n); diff --git a/data-pipeline/src/span_concentrator/aggregation.rs b/data-pipeline/src/span_concentrator/aggregation.rs index 04f693290..fd597147f 100644 --- a/data-pipeline/src/span_concentrator/aggregation.rs +++ b/data-pipeline/src/span_concentrator/aggregation.rs @@ -4,7 +4,7 @@ //! This includes the aggregation key to group spans together and the computation of stats from a //! span. use datadog_trace_protobuf::pb; -use datadog_trace_utils::span_v04::{trace_utils, Span}; +use datadog_trace_utils::span::v04::{trace_utils, Span}; use std::borrow::Borrow; use std::borrow::Cow; use std::collections::HashMap; diff --git a/data-pipeline/src/span_concentrator/mod.rs b/data-pipeline/src/span_concentrator/mod.rs index 866b8f8a5..b162bd544 100644 --- a/data-pipeline/src/span_concentrator/mod.rs +++ b/data-pipeline/src/span_concentrator/mod.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::time::{self, Duration, SystemTime}; use datadog_trace_protobuf::pb; -use datadog_trace_utils::span_v04::{trace_utils, Span}; +use datadog_trace_utils::span::v04::{trace_utils, Span}; use aggregation::{AggregationKey, StatsBucket}; diff --git a/data-pipeline/src/span_concentrator/tests.rs b/data-pipeline/src/span_concentrator/tests.rs index 342edfcd3..d7be2d791 100644 --- a/data-pipeline/src/span_concentrator/tests.rs +++ b/data-pipeline/src/span_concentrator/tests.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; -use datadog_trace_utils::span_v04::trace_utils::compute_top_level_span; +use datadog_trace_utils::span::v04::trace_utils::compute_top_level_span; use rand::{thread_rng, Rng}; const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64; diff --git a/data-pipeline/src/stats_exporter.rs b/data-pipeline/src/stats_exporter.rs index 87ab77823..2ce5b49fc 100644 --- a/data-pipeline/src/stats_exporter.rs +++ b/data-pipeline/src/stats_exporter.rs @@ -187,7 +187,7 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result { #[cfg(test)] mod tests { use super::*; - use datadog_trace_utils::span_v04::{trace_utils, Span}; + use datadog_trace_utils::span::v04::{trace_utils, Span}; use httpmock::prelude::*; use httpmock::MockServer; use time::Duration; diff --git a/data-pipeline/src/trace_exporter/error.rs b/data-pipeline/src/trace_exporter/error.rs index b488b0a2e..4bc30f0e3 100644 --- a/data-pipeline/src/trace_exporter/error.rs +++ b/data-pipeline/src/trace_exporter/error.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::telemetry::error::TelemetryError; -use crate::trace_exporter::msgpack_decoder::v04::error::DecodeError; +use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError; use hyper::http::StatusCode; use hyper::Error as HyperError; use serde_json::error::Error as SerdeError; diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index 78ed78456..a378d4e27 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -11,7 +11,7 @@ use crate::{ }; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; -use datadog_trace_utils::span_v04::{ +use datadog_trace_utils::span::v04::{ trace_utils::{compute_top_level_span, has_top_level}, Span, }; @@ -588,7 +588,7 @@ impl TraceExporter { fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result { // TODO base on input format - let (mut traces, size) = match msgpack_decoder::v04::decoder::from_slice(data) { + let (mut traces, size) = match msgpack_decoder::v04::from_slice(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); @@ -976,7 +976,7 @@ mod tests { use self::error::AgentErrorKind; use self::error::BuilderErrorKind; use super::*; - use datadog_trace_utils::span_v04::Span; + use datadog_trace_utils::span::v04::Span; use httpmock::prelude::*; use httpmock::MockServer; use std::collections::HashMap; diff --git a/trace-utils/src/lib.rs b/trace-utils/src/lib.rs index 730bfde98..7f51bda47 100644 --- a/trace-utils/src/lib.rs +++ b/trace-utils/src/lib.rs @@ -12,4 +12,4 @@ pub mod trace_utils; pub mod tracer_header_tags; pub mod tracer_payload; -pub mod span_v04; +pub mod span; diff --git a/trace-utils/src/msgpack_decoder/v04/error.rs b/trace-utils/src/msgpack_decoder/decode/error.rs similarity index 100% rename from trace-utils/src/msgpack_decoder/v04/error.rs rename to trace-utils/src/msgpack_decoder/decode/error.rs diff --git a/trace-utils/src/msgpack_decoder/decode/map.rs b/trace-utils/src/msgpack_decoder/decode/map.rs new file mode 100644 index 000000000..ed6874ca2 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/decode/map.rs @@ -0,0 +1,88 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use rmp::{decode, decode::RmpRead, Marker}; +use std::collections::HashMap; +use tinybytes::Bytes; + +/// Reads a map from the buffer and returns it as a `HashMap`. +/// +/// This function is generic over the key and value types of the map, and it uses a provided +/// function to read key-value pairs from the buffer. +/// +/// # Arguments +/// +/// * `len` - The number of key-value pairs to read from the buffer. +/// * `buf` - A reference to the Bytes containing the encoded map data. +/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a +/// `Result<(K, V), DecodeError>`. +/// +/// # Returns +/// +/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The `read_pair` function returns an error while reading a key-value pair. +/// +/// # Type Parameters +/// +/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. +/// * `V` - The type of the values in the map. +/// * `F` - The type of the function used to read key-value pairs from the buffer. +#[inline] +pub fn read_map( + len: usize, + buf: &mut Bytes, + read_pair: F, +) -> Result, DecodeError> +where + K: std::hash::Hash + Eq, + F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, +{ + let mut map = HashMap::with_capacity(len); + for _ in 0..len { + let (k, v) = read_pair(buf)?; + map.insert(k, v); + } + Ok(map) +} + +/// Reads map length from the buffer +/// +/// # Arguments +/// +/// * `buf` - A reference to the Bytes containing the encoded map data. +/// +/// # Returns +/// +/// * `Ok(usize)` - Map length. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The buffer does not contain a map. +/// - There is an error reading from the buffer. +#[inline] +pub fn read_map_len(buf: &mut &[u8]) -> Result { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? + { + Marker::FixMap(len) => Ok(len as usize), + Marker::Map16 => buf + .read_data_u16() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + Marker::Map32 => buf + .read_data_u32() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + _ => Err(DecodeError::InvalidType( + "Unable to read map from buffer".to_owned(), + )), + } +} diff --git a/trace-utils/src/msgpack_decoder/decode/meta_struct.rs b/trace-utils/src/msgpack_decoder/decode/meta_struct.rs new file mode 100644 index 000000000..57d1f6975 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/decode/meta_struct.rs @@ -0,0 +1,35 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; +use crate::msgpack_decoder::decode::number::read_number_bytes; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes}; +use rmp::decode; +use std::collections::HashMap; +use tinybytes::{Bytes, BytesString}; + +#[inline] +pub fn read_meta_struct(buf: &mut Bytes) -> Result>, DecodeError> { + if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { + return Ok(empty_map); + } + + fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec), DecodeError> { + let key = read_string_bytes(buf)?; + let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) + })?; + + let mut v = Vec::with_capacity(array_len as usize); + + for _ in 0..array_len { + let value = read_number_bytes(buf)?; + v.push(value); + } + Ok((key, v)) + } + + let len = read_map_len(unsafe { buf.as_mut_slice() })?; + read_map(len, buf, read_meta_struct_pair) +} diff --git a/trace-utils/src/msgpack_decoder/decode/metrics.rs b/trace-utils/src/msgpack_decoder/decode/metrics.rs new file mode 100644 index 000000000..477d1bd39 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/decode/metrics.rs @@ -0,0 +1,27 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; +use crate::msgpack_decoder::decode::number::read_number_bytes; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes}; +use std::collections::HashMap; +use tinybytes::{Bytes, BytesString}; + +#[inline] +pub fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { + let key = read_string_bytes(buf)?; + let v = read_number_bytes(buf)?; + + Ok((key, v)) +} +#[inline] +pub fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { + if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { + return Ok(empty_map); + } + + let len = read_map_len(unsafe { buf.as_mut_slice() })?; + + read_map(len, buf, read_metric_pair) +} diff --git a/trace-utils/src/msgpack_decoder/decode/mod.rs b/trace-utils/src/msgpack_decoder/decode/mod.rs new file mode 100644 index 000000000..dd07848ec --- /dev/null +++ b/trace-utils/src/msgpack_decoder/decode/mod.rs @@ -0,0 +1,10 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod error; +pub mod map; +pub mod meta_struct; +pub mod metrics; +pub mod number; +pub mod span_link; +pub mod string; diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/decode/number.rs similarity index 99% rename from trace-utils/src/msgpack_decoder/v04/number.rs rename to trace-utils/src/msgpack_decoder/decode/number.rs index 219591481..44891383a 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/decode/number.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::error::DecodeError; +use crate::msgpack_decoder::decode::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; use tinybytes::Bytes; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/decode/span_link.rs similarity index 94% rename from trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs rename to trace-utils/src/msgpack_decoder/decode/span_link.rs index 2e9c4ec5f..7d4ad48dc 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/decode/span_link.rs @@ -1,12 +1,12 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::{ +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::number::read_number_bytes; +use crate::msgpack_decoder::decode::string::{ handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, }; -use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number_bytes; -use crate::span_v04::SpanLink; +use crate::span::v04::SpanLink; use rmp::Marker; use std::str::FromStr; use tinybytes::Bytes; @@ -98,7 +98,7 @@ fn decode_span_link(buf: &mut Bytes) -> Result { #[cfg(test)] mod tests { use super::SpanLinkKey; - use crate::msgpack_decoder::v04::error::DecodeError; + use crate::msgpack_decoder::decode::error::DecodeError; use std::str::FromStr; #[test] diff --git a/trace-utils/src/msgpack_decoder/decode/string.rs b/trace-utils/src/msgpack_decoder/decode/string.rs new file mode 100644 index 000000000..3137fbd73 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/decode/string.rs @@ -0,0 +1,97 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use rmp::decode; +use rmp::decode::DecodeStringError; +use std::collections::HashMap; +use tinybytes::{Bytes, BytesString}; + +// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) +const NULL_MARKER: &u8 = &0xc0; + +#[inline] +pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) +} + +#[inline] +pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_ref_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str + }) +} + +#[inline] +pub fn read_string_bytes(buf: &mut Bytes) -> Result { + // Note: we need to pass a &'static lifetime here, otherwise it'll complain + read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { + let string = BytesString::from_bytes_slice(buf, str); + *unsafe { buf.as_mut_slice() } = newbuf; + string + }) +} + +#[inline] +pub fn read_nullable_string_bytes(buf: &mut Bytes) -> Result { + if let Some(empty_string) = handle_null_marker(buf, BytesString::default) { + Ok(empty_string) + } else { + read_string_bytes(buf) + } +} + +#[inline] +// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the +// BytesStrings. +pub fn read_str_map_to_bytes_strings( + buf: &mut Bytes, +) -> Result, DecodeError> { + let len = decode::read_map_len(unsafe { buf.as_mut_slice() }) + .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; + + let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); + for _ in 0..len { + let key = read_string_bytes(buf)?; + let value = read_string_bytes(buf)?; + map.insert(key, value); + } + Ok(map) +} + +#[inline] +pub fn read_nullable_str_map_to_bytes_strings( + buf: &mut Bytes, +) -> Result, DecodeError> { + if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { + return Ok(empty_map); + } + + read_str_map_to_bytes_strings(buf) +} + +/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is +/// null and return the default value. If it is not null, you can continue to decode as expected. +#[inline] +pub fn handle_null_marker(buf: &mut Bytes, default: F) -> Option +where + F: FnOnce() -> T, +{ + let slice = unsafe { buf.as_mut_slice() }; + + if slice.first() == Some(NULL_MARKER) { + *slice = &slice[1..]; + Some(default()) + } else { + None + } +} diff --git a/trace-utils/src/msgpack_decoder/mod.rs b/trace-utils/src/msgpack_decoder/mod.rs index f4e980a0a..8ff33a59e 100644 --- a/trace-utils/src/msgpack_decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/mod.rs @@ -1,4 +1,5 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod decode; pub mod v04; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs deleted file mode 100644 index d5e01f6c9..000000000 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ /dev/null @@ -1,855 +0,0 @@ -// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -mod span; -mod span_link; - -use self::span::decode_span; -use super::error::DecodeError; -use super::number::read_number_bytes; -use crate::span_v04::Span; -use rmp::decode::DecodeStringError; -use rmp::{decode, decode::RmpRead, Marker}; -use std::{collections::HashMap, f64}; -use tinybytes::{Bytes, BytesString}; - -// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) -const NULL_MARKER: &u8 = &0xc0; - -/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. -/// -/// -/// -/// # Arguments -/// -/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be -/// encoded msgpack data containing a list of a list of v04 spans. -/// -/// # Returns -/// -/// * `Ok(Vec)` - A vector of decoded `TracerPayloadV04` objects if successful. -/// * `Err(DecodeError)` - An error if the decoding process fails. -/// -/// # Errors -/// -/// This function will return an error if: -/// - The array length for trace count or span count cannot be read. -/// - Any span cannot be decoded. -/// -/// # Examples -/// -/// ``` -/// use datadog_trace_protobuf::pb::Span; -/// use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; -/// use rmp_serde::to_vec_named; -/// use tinybytes; -/// -/// let span = Span { -/// name: "test-span".to_owned(), -/// ..Default::default() -/// }; -/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); -/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); -/// let (decoded_traces, _payload_size) = -/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); -/// -/// assert_eq!(1, decoded_traces.len()); -/// assert_eq!(1, decoded_traces[0].len()); -/// let decoded_span = &decoded_traces[0][0]; -/// assert_eq!("test-span", decoded_span.name.as_str()); -/// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { - let trace_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) - })?; - - let start_len = data.len(); - - Ok(( - (0..trace_count).try_fold( - Vec::with_capacity( - trace_count - .try_into() - .expect("Unable to cast trace_count to usize"), - ), - |mut traces, _| { - let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) - .map_err(|_| { - DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned(), - ) - })?; - - let trace = (0..span_count).try_fold( - Vec::with_capacity( - span_count - .try_into() - .expect("Unable to cast span_count to usize"), - ), - |mut trace, _| { - let span = decode_span(&mut data)?; - trace.push(span); - Ok(trace) - }, - )?; - - traces.push(trace); - - Ok(traces) - }, - )?, - start_len - data.len(), - )) -} - -#[inline] -fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - decode::read_str_from_slice(buf).map_err(|e| match e { - DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), - DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), - DecodeStringError::TypeMismatch(marker) => { - DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) - } - DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), - _ => DecodeError::IOError, - }) -} - -#[inline] -fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_ref_nomut(buf).map(|(str, newbuf)| { - *buf = newbuf; - str - }) -} - -#[inline] -fn read_string_bytes(buf: &mut Bytes) -> Result { - // Note: we need to pass a &'static lifetime here, otherwise it'll complain - read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { - let string = BytesString::from_bytes_slice(buf, str); - *unsafe { buf.as_mut_slice() } = newbuf; - string - }) -} - -#[inline] -fn read_nullable_string_bytes(buf: &mut Bytes) -> Result { - if let Some(empty_string) = handle_null_marker(buf, BytesString::default) { - Ok(empty_string) - } else { - read_string_bytes(buf) - } -} - -#[inline] -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings. -fn read_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - let len = decode::read_map_len(unsafe { buf.as_mut_slice() }) - .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; - - let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); - for _ in 0..len { - let key = read_string_bytes(buf)?; - let value = read_string_bytes(buf)?; - map.insert(key, value); - } - Ok(map) -} - -#[inline] -fn read_nullable_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - read_str_map_to_bytes_strings(buf) -} - -#[inline] -fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { - let key = read_string_bytes(buf)?; - let v = read_number_bytes(buf)?; - - Ok((key, v)) -} -#[inline] -fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - - read_map(len, buf, read_metric_pair) -} - -#[inline] -fn read_meta_struct(buf: &mut Bytes) -> Result>, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec), DecodeError> { - let key = read_string_bytes(buf)?; - let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) - })?; - - let mut v = Vec::with_capacity(array_len as usize); - - for _ in 0..array_len { - let value = read_number_bytes(buf)?; - v.push(value); - } - Ok((key, v)) - } - - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - read_map(len, buf, read_meta_struct_pair) -} - -/// Reads a map from the buffer and returns it as a `HashMap`. -/// -/// This function is generic over the key and value types of the map, and it uses a provided -/// function to read key-value pairs from the buffer. -/// -/// # Arguments -/// -/// * `len` - The number of key-value pairs to read from the buffer. -/// * `buf` - A reference to the Bytes containing the encoded map data. -/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a -/// `Result<(K, V), DecodeError>`. -/// -/// # Returns -/// -/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. -/// * `Err(DecodeError)` - An error if the decoding process fails. -/// -/// # Errors -/// -/// This function will return an error if: -/// - The `read_pair` function returns an error while reading a key-value pair. -/// -/// # Type Parameters -/// -/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. -/// * `V` - The type of the values in the map. -/// * `F` - The type of the function used to read key-value pairs from the buffer. -#[inline] -fn read_map( - len: usize, - buf: &mut Bytes, - read_pair: F, -) -> Result, DecodeError> -where - K: std::hash::Hash + Eq, - F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, -{ - let mut map = HashMap::with_capacity(len); - for _ in 0..len { - let (k, v) = read_pair(buf)?; - map.insert(k, v); - } - Ok(map) -} - -#[inline] -fn read_map_len(buf: &mut &[u8]) -> Result { - match decode::read_marker(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? - { - Marker::FixMap(len) => Ok(len as usize), - Marker::Map16 => buf - .read_data_u16() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - Marker::Map32 => buf - .read_data_u32() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - _ => Err(DecodeError::InvalidType( - "Unable to read map from buffer".to_owned(), - )), - } -} - -/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null and return the default value. If it is not null, you can continue to decode as expected. -#[inline] -fn handle_null_marker(buf: &mut Bytes, default: F) -> Option -where - F: FnOnce() -> T, -{ - let slice = unsafe { buf.as_mut_slice() }; - - if slice.first() == Some(NULL_MARKER) { - *slice = &slice[1..]; - Some(default()) - } else { - None - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_utils::create_test_json_span; - use bolero::check; - use rmp_serde; - use rmp_serde::to_vec_named; - use serde_json::json; - use tinybytes::BytesString; - - fn generate_meta_struct_element(i: u8) -> (String, Vec) { - let map = HashMap::from([ - ( - format!("meta_struct_map_key {}", i + 1), - format!("meta_struct_map_val {}", i + 1), - ), - ( - format!("meta_struct_map_key {}", i + 2), - format!("meta_struct_map_val {}", i + 2), - ), - ]); - let key = format!("key {}", i).to_owned(); - - (key, rmp_serde::to_vec_named(&map).unwrap()) - } - #[test] - fn test_empty_array() { - let encoded_data = vec![0x90]; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - let (_decoded_traces, decoded_size) = from_slice(bytes).expect("Decoding failed"); - - assert_eq!(0, decoded_size); - } - - #[test] - fn test_decoder_size() { - let span = Span { - name: BytesString::from_slice("span_name".as_ref()).unwrap(), - ..Default::default() - }; - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte - encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored - let (_decoded_traces, decoded_size) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(expected_size, decoded_size); - } - - #[test] - fn test_decoder_read_string_success() { - let expected_string = "test-service-name"; - let span = Span { - name: BytesString::from_slice(expected_string.as_ref()).unwrap(), - ..Default::default() - }; - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - assert_eq!(expected_string, decoded_span.name.as_str()); - } - - #[test] - fn test_decoder_read_null_string_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["name"] = json!(null); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - assert_eq!("", decoded_span.name.as_str()); - } - - #[test] - fn test_decoder_read_number_success() { - let span = create_test_json_span(1, 2, 0, 0); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - assert_eq!(1, decoded_span.trace_id); - } - - #[test] - fn test_decoder_read_null_number_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["trace_id"] = json!(null); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - assert_eq!(0, decoded_span.trace_id); - } - - #[test] - fn test_decoder_meta_struct_null_map_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta_struct"] = json!(null); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - assert!(decoded_span.meta_struct.is_empty()); - } - - #[test] - fn test_decoder_meta_struct_fixed_map_success() { - let expected_meta_struct = HashMap::from([ - generate_meta_struct_element(0), - generate_meta_struct_element(1), - ]); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta_struct"] = json!(expected_meta_struct.clone()); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_meta_struct.iter() { - assert_eq!( - value, - &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] - ); - } - } - - #[test] - fn test_decoder_meta_struct_map_16_success() { - let expected_meta_struct: HashMap> = - (0..20).map(generate_meta_struct_element).collect(); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta_struct"] = json!(expected_meta_struct.clone()); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_meta_struct.iter() { - assert_eq!( - value, - &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] - ); - } - } - - #[test] - fn test_decoder_meta_fixed_map_success() { - let expected_meta = HashMap::from([ - ("key1".to_string(), "value1".to_string()), - ("key2".to_string(), "value2".to_string()), - ]); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta"] = json!(expected_meta.clone()); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_meta.iter() { - assert_eq!( - value, - &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() - ); - } - } - - #[test] - fn test_decoder_meta_null_map_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta"] = json!(null); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - assert!(decoded_span.meta.is_empty()); - } - - #[test] - fn test_decoder_meta_map_16_success() { - let expected_meta: HashMap = (0..20) - .map(|i| { - ( - format!("key {}", i).to_owned(), - format!("value {}", i).to_owned(), - ) - }) - .collect(); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["meta"] = json!(expected_meta.clone()); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_meta.iter() { - assert_eq!( - value, - &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() - ); - } - } - - #[test] - fn test_decoder_metrics_fixed_map_success() { - let expected_metrics = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["metrics"] = json!(expected_metrics.clone()); - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_metrics.iter() { - assert_eq!( - value, - &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] - ); - } - } - - #[test] - fn test_decoder_metrics_map16_success() { - let expected_metrics: HashMap = (0..20) - .map(|i| (format!("metric{}", i), i as f64)) - .collect(); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["metrics"] = json!(expected_metrics.clone()); - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - for (key, value) in expected_metrics.iter() { - assert_eq!( - value, - &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] - ); - } - } - - #[test] - fn test_decoder_metrics_null_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["metrics"] = json!(null); - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - assert!(decoded_span.metrics.is_empty()); - } - - #[test] - fn test_decoder_span_link_success() { - let expected_span_link = json!({ - "trace_id": 1, - "trace_id_high": 0, - "span_id": 1, - "attributes": { - "attr1": "test_value", - "attr2": "test_value2" - }, - "tracestate": "state_test", - "flags": 0b101 - }); - - let mut span = create_test_json_span(1, 2, 0, 0); - span["span_links"] = json!([expected_span_link]); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - assert_eq!( - expected_span_link["trace_id"], - decoded_span.span_links[0].trace_id - ); - assert_eq!( - expected_span_link["trace_id_high"], - decoded_span.span_links[0].trace_id_high - ); - assert_eq!( - expected_span_link["span_id"], - decoded_span.span_links[0].span_id - ); - assert_eq!( - expected_span_link["tracestate"], - decoded_span.span_links[0].tracestate.as_str() - ); - assert_eq!( - expected_span_link["flags"], - decoded_span.span_links[0].flags - ); - assert_eq!( - expected_span_link["attributes"]["attr1"], - decoded_span.span_links[0].attributes - [&BytesString::from_slice("attr1".as_ref()).unwrap()] - .as_str() - ); - assert_eq!( - expected_span_link["attributes"]["attr2"], - decoded_span.span_links[0].attributes - [&BytesString::from_slice("attr2".as_ref()).unwrap()] - .as_str() - ); - } - - #[test] - fn test_decoder_null_span_link_success() { - let mut span = create_test_json_span(1, 2, 0, 0); - span["span_links"] = json!(null); - - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); - - assert_eq!(1, decoded_traces.len()); - assert_eq!(1, decoded_traces[0].len()); - let decoded_span = &decoded_traces[0][0]; - - assert!(decoded_span.span_links.is_empty()); - } - - #[test] - fn test_decoder_read_string_wrong_format() { - let span = Span { - service: BytesString::from_slice("my_service".as_ref()).unwrap(), - ..Default::default() - }; - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error. - encoded_data[2] = 0x8c; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - assert_eq!( - Err(DecodeError::InvalidFormat( - "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() - )), - result - ); - } - - #[test] - fn test_decoder_read_string_utf8_error() { - let invalid_seq = vec![0, 159, 146, 150]; - let invalid_str = unsafe { String::from_utf8_unchecked(invalid_seq) }; - let invalid_str_as_bytes = tinybytes::Bytes::from(invalid_str); - let span = Span { - name: unsafe { BytesString::from_bytes_unchecked(invalid_str_as_bytes) }, - ..Default::default() - }; - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - assert_eq!( - Err(DecodeError::Utf8Error( - "invalid utf-8 sequence of 1 bytes from index 1".to_owned() - )), - result - ); - } - - #[test] - fn test_decoder_invalid_marker_for_trace_count_read() { - let span = Span::default(); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - // This changes the entire payload to a map with 12 keys in order to trigger an error when - // reading the array len of traces - encoded_data[0] = 0x8c; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - - assert_eq!( - Err(DecodeError::InvalidFormat( - "Unable to read array len for trace count".to_string() - )), - result - ); - } - - #[test] - fn test_decoder_invalid_marker_for_span_count_read() { - let span = Span::default(); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - // This changes the entire payload to a map with 12 keys in order to trigger an error when - // reading the array len of spans - encoded_data[1] = 0x8c; - - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - - assert_eq!( - Err(DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned() - )), - result - ); - } - - #[test] - fn test_decoder_read_string_type_mismatch() { - let span = Span::default(); - let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - // Modify the encoded data to cause a type mismatch by changing the marker for the `name` - // field to an integer marker - encoded_data[3] = 0x01; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - - assert_eq!( - Err(DecodeError::InvalidType( - "Type mismatch at marker FixPos(1)".to_owned() - )), - result - ); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn fuzz_from_slice() { - check!() - .with_type::<( - String, - String, - String, - String, - String, - String, - String, - String, - u64, - u64, - u64, - i64, - )>() - .cloned() - .for_each( - |( - name, - service, - resource, - span_type, - meta_key, - meta_value, - metric_key, - metric_value, - trace_id, - span_id, - parent_id, - start, - )| { - let span = Span { - name: BytesString::from_slice(name.as_ref()).unwrap(), - service: BytesString::from_slice(service.as_ref()).unwrap(), - resource: BytesString::from_slice(resource.as_ref()).unwrap(), - r#type: BytesString::from_slice(span_type.as_ref()).unwrap(), - meta: HashMap::from([( - BytesString::from_slice(meta_key.as_ref()).unwrap(), - BytesString::from_slice(meta_value.as_ref()).unwrap(), - )]), - metrics: HashMap::from([( - BytesString::from_slice(metric_key.as_ref()).unwrap(), - metric_value.parse::().unwrap_or_default(), - )]), - trace_id, - span_id, - parent_id, - start, - ..Default::default() - }; - let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(tinybytes::Bytes::from(encoded_data)); - - assert!(result.is_ok()); - }, - ); - } -} diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 5b789e7c1..6c954d779 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -1,6 +1,653 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -pub mod decoder; -pub mod error; -pub mod number; +mod span; + +use self::span::decode_span; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v04::Span; + +/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. +/// +/// +/// +/// # Arguments +/// +/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be +/// encoded msgpack data containing a list of a list of v04 spans. +/// +/// # Returns +/// +/// * `Ok(Vec)` - A vector of decoded `TracerPayloadV04` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// use datadog_trace_protobuf::pb::Span; +/// use datadog_trace_utils::msgpack_decoder::v04::from_slice; +/// use rmp_serde::to_vec_named; +/// use tinybytes; +/// +/// let span = Span { +/// name: "test-span".to_owned(), +/// ..Default::default() +/// }; +/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); +/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); +/// let (decoded_traces, _payload_size) = +/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("test-span", decoded_span.name.as_str()); +/// ``` +pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { + let trace_count = + rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; + + let start_len = data.len(); + + Ok(( + (0..trace_count).try_fold( + Vec::with_capacity( + trace_count + .try_into() + .expect("Unable to cast trace_count to usize"), + ), + |mut traces, _| { + let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) + .map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + ) + })?; + + let trace = (0..span_count).try_fold( + Vec::with_capacity( + span_count + .try_into() + .expect("Unable to cast span_count to usize"), + ), + |mut trace, _| { + let span = decode_span(&mut data)?; + trace.push(span); + Ok(trace) + }, + )?; + + traces.push(trace); + + Ok(traces) + }, + )?, + start_len - data.len(), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::create_test_json_span; + use bolero::check; + use rmp_serde; + use rmp_serde::to_vec_named; + use serde_json::json; + use std::collections::HashMap; + use tinybytes::BytesString; + + fn generate_meta_struct_element(i: u8) -> (String, Vec) { + let map = HashMap::from([ + ( + format!("meta_struct_map_key {}", i + 1), + format!("meta_struct_map_val {}", i + 1), + ), + ( + format!("meta_struct_map_key {}", i + 2), + format!("meta_struct_map_val {}", i + 2), + ), + ]); + let key = format!("key {}", i).to_owned(); + + (key, rmp_serde::to_vec_named(&map).unwrap()) + } + #[test] + fn test_empty_array() { + let encoded_data = vec![0x90]; + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + let (_decoded_traces, decoded_size) = from_slice(bytes).expect("Decoding failed"); + + assert_eq!(0, decoded_size); + } + + #[test] + fn test_decoder_size() { + let span = Span { + name: BytesString::from_slice("span_name".as_ref()).unwrap(), + ..Default::default() + }; + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (_decoded_traces, decoded_size) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(expected_size, decoded_size); + } + + #[test] + fn test_decoder_read_string_success() { + let expected_string = "test-service-name"; + let span = Span { + name: BytesString::from_slice(expected_string.as_ref()).unwrap(), + ..Default::default() + }; + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(expected_string, decoded_span.name.as_str()); + } + + #[test] + fn test_decoder_read_null_string_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["name"] = json!(null); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!("", decoded_span.name.as_str()); + } + + #[test] + fn test_decoder_read_number_success() { + let span = create_test_json_span(1, 2, 0, 0); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(1, decoded_span.trace_id); + } + + #[test] + fn test_decoder_read_null_number_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["trace_id"] = json!(null); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert_eq!(0, decoded_span.trace_id); + } + + #[test] + fn test_decoder_meta_struct_null_map_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta_struct"] = json!(null); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + assert!(decoded_span.meta_struct.is_empty()); + } + + #[test] + fn test_decoder_meta_struct_fixed_map_success() { + let expected_meta_struct = HashMap::from([ + generate_meta_struct_element(0), + generate_meta_struct_element(1), + ]); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta_struct"] = json!(expected_meta_struct.clone()); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_meta_struct.iter() { + assert_eq!( + value, + &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } + } + + #[test] + fn test_decoder_meta_struct_map_16_success() { + let expected_meta_struct: HashMap> = + (0..20).map(generate_meta_struct_element).collect(); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta_struct"] = json!(expected_meta_struct.clone()); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_meta_struct.iter() { + assert_eq!( + value, + &decoded_span.meta_struct[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } + } + + #[test] + fn test_decoder_meta_fixed_map_success() { + let expected_meta = HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta"] = json!(expected_meta.clone()); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_meta.iter() { + assert_eq!( + value, + &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() + ); + } + } + + #[test] + fn test_decoder_meta_null_map_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta"] = json!(null); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + assert!(decoded_span.meta.is_empty()); + } + + #[test] + fn test_decoder_meta_map_16_success() { + let expected_meta: HashMap = (0..20) + .map(|i| { + ( + format!("key {}", i).to_owned(), + format!("value {}", i).to_owned(), + ) + }) + .collect(); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["meta"] = json!(expected_meta.clone()); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_meta.iter() { + assert_eq!( + value, + &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str() + ); + } + } + + #[test] + fn test_decoder_metrics_fixed_map_success() { + let expected_metrics = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["metrics"] = json!(expected_metrics.clone()); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_metrics.iter() { + assert_eq!( + value, + &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } + } + + #[test] + fn test_decoder_metrics_map16_success() { + let expected_metrics: HashMap = (0..20) + .map(|i| (format!("metric{}", i), i as f64)) + .collect(); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["metrics"] = json!(expected_metrics.clone()); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + for (key, value) in expected_metrics.iter() { + assert_eq!( + value, + &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()] + ); + } + } + + #[test] + fn test_decoder_metrics_null_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["metrics"] = json!(null); + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + assert!(decoded_span.metrics.is_empty()); + } + + #[test] + fn test_decoder_span_link_success() { + let expected_span_link = json!({ + "trace_id": 1, + "trace_id_high": 0, + "span_id": 1, + "attributes": { + "attr1": "test_value", + "attr2": "test_value2" + }, + "tracestate": "state_test", + "flags": 0b101 + }); + + let mut span = create_test_json_span(1, 2, 0, 0); + span["span_links"] = json!([expected_span_link]); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + assert_eq!( + expected_span_link["trace_id"], + decoded_span.span_links[0].trace_id + ); + assert_eq!( + expected_span_link["trace_id_high"], + decoded_span.span_links[0].trace_id_high + ); + assert_eq!( + expected_span_link["span_id"], + decoded_span.span_links[0].span_id + ); + assert_eq!( + expected_span_link["tracestate"], + decoded_span.span_links[0].tracestate.as_str() + ); + assert_eq!( + expected_span_link["flags"], + decoded_span.span_links[0].flags + ); + assert_eq!( + expected_span_link["attributes"]["attr1"], + decoded_span.span_links[0].attributes + [&BytesString::from_slice("attr1".as_ref()).unwrap()] + .as_str() + ); + assert_eq!( + expected_span_link["attributes"]["attr2"], + decoded_span.span_links[0].attributes + [&BytesString::from_slice("attr2".as_ref()).unwrap()] + .as_str() + ); + } + + #[test] + fn test_decoder_null_span_link_success() { + let mut span = create_test_json_span(1, 2, 0, 0); + span["span_links"] = json!(null); + + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let (decoded_traces, _) = + from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + + assert_eq!(1, decoded_traces.len()); + assert_eq!(1, decoded_traces[0].len()); + let decoded_span = &decoded_traces[0][0]; + + assert!(decoded_span.span_links.is_empty()); + } + + #[test] + fn test_decoder_read_string_wrong_format() { + let span = Span { + service: BytesString::from_slice("my_service".as_ref()).unwrap(), + ..Default::default() + }; + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error. + encoded_data[2] = 0x8c; + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + + let result = from_slice(bytes); + assert_eq!( + Err(DecodeError::InvalidFormat( + "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() + )), + result + ); + } + + #[test] + fn test_decoder_read_string_utf8_error() { + let invalid_seq = vec![0, 159, 146, 150]; + let invalid_str = unsafe { String::from_utf8_unchecked(invalid_seq) }; + let invalid_str_as_bytes = tinybytes::Bytes::from(invalid_str); + let span = Span { + name: unsafe { BytesString::from_bytes_unchecked(invalid_str_as_bytes) }, + ..Default::default() + }; + let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + + let result = from_slice(bytes); + assert_eq!( + Err(DecodeError::Utf8Error( + "invalid utf-8 sequence of 1 bytes from index 1".to_owned() + )), + result + ); + } + + #[test] + fn test_decoder_invalid_marker_for_trace_count_read() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the entire payload to a map with 12 keys in order to trigger an error when + // reading the array len of traces + encoded_data[0] = 0x8c; + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + + let result = from_slice(bytes); + + assert_eq!( + Err(DecodeError::InvalidFormat( + "Unable to read array len for trace count".to_string() + )), + result + ); + } + + #[test] + fn test_decoder_invalid_marker_for_span_count_read() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // This changes the entire payload to a map with 12 keys in order to trigger an error when + // reading the array len of spans + encoded_data[1] = 0x8c; + + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + + let result = from_slice(bytes); + + assert_eq!( + Err(DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned() + )), + result + ); + } + + #[test] + fn test_decoder_read_string_type_mismatch() { + let span = Span::default(); + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // Modify the encoded data to cause a type mismatch by changing the marker for the `name` + // field to an integer marker + encoded_data[3] = 0x01; + let encoded_data = + unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; + let bytes = tinybytes::Bytes::from_static(encoded_data); + + let result = from_slice(bytes); + + assert_eq!( + Err(DecodeError::InvalidType( + "Type mismatch at marker FixPos(1)".to_owned() + )), + result + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn fuzz_from_slice() { + check!() + .with_type::<( + String, + String, + String, + String, + String, + String, + String, + String, + u64, + u64, + u64, + i64, + )>() + .cloned() + .for_each( + |( + name, + service, + resource, + span_type, + meta_key, + meta_value, + metric_key, + metric_value, + trace_id, + span_id, + parent_id, + start, + )| { + let span = Span { + name: BytesString::from_slice(name.as_ref()).unwrap(), + service: BytesString::from_slice(service.as_ref()).unwrap(), + resource: BytesString::from_slice(resource.as_ref()).unwrap(), + r#type: BytesString::from_slice(span_type.as_ref()).unwrap(), + meta: HashMap::from([( + BytesString::from_slice(meta_key.as_ref()).unwrap(), + BytesString::from_slice(meta_value.as_ref()).unwrap(), + )]), + metrics: HashMap::from([( + BytesString::from_slice(metric_key.as_ref()).unwrap(), + metric_value.parse::().unwrap_or_default(), + )]), + trace_id, + span_id, + parent_id, + start, + ..Default::default() + }; + let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); + let result = from_slice(tinybytes::Bytes::from(encoded_data)); + + assert!(result.is_ok()); + }, + ); + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/span.rs similarity index 88% rename from trace-utils/src/msgpack_decoder/v04/decoder/span.rs rename to trace-utils/src/msgpack_decoder/v04/span.rs index ccbb7d1e9..d4dfff697 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/span.rs @@ -1,13 +1,14 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::{ - read_meta_struct, read_metrics, read_nullable_str_map_to_bytes_strings, - read_nullable_string_bytes, read_string_ref, span_link::read_span_links, +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::number::read_nullable_number_bytes; +use crate::msgpack_decoder::decode::span_link::read_span_links; +use crate::msgpack_decoder::decode::string::{ + read_nullable_str_map_to_bytes_strings, read_nullable_string_bytes, read_string_ref, }; -use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_nullable_number_bytes; -use crate::span_v04::{Span, SpanKey}; +use crate::msgpack_decoder::decode::{meta_struct::read_meta_struct, metrics::read_metrics}; +use crate::span::v04::{Span, SpanKey}; use tinybytes::Bytes; /// Decodes a slice of bytes into a `Span` object. @@ -69,7 +70,7 @@ fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> { #[cfg(test)] mod tests { use super::SpanKey; - use crate::span_v04::SpanKeyParseError; + use crate::span::v04::SpanKeyParseError; use std::str::FromStr; #[test] diff --git a/trace-utils/src/span/mod.rs b/trace-utils/src/span/mod.rs new file mode 100644 index 000000000..2897c3aad --- /dev/null +++ b/trace-utils/src/span/mod.rs @@ -0,0 +1,4 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod v04; diff --git a/trace-utils/src/span_v04/mod.rs b/trace-utils/src/span/v04/mod.rs similarity index 100% rename from trace-utils/src/span_v04/mod.rs rename to trace-utils/src/span/v04/mod.rs diff --git a/trace-utils/src/span_v04/span.rs b/trace-utils/src/span/v04/span.rs similarity index 100% rename from trace-utils/src/span_v04/span.rs rename to trace-utils/src/span/v04/span.rs diff --git a/trace-utils/src/span_v04/trace_utils.rs b/trace-utils/src/span/v04/trace_utils.rs similarity index 100% rename from trace-utils/src/span_v04/trace_utils.rs rename to trace-utils/src/span/v04/trace_utils.rs diff --git a/trace-utils/src/test_utils/mod.rs b/trace-utils/src/test_utils/mod.rs index 7ec33aab5..d8a876b98 100644 --- a/trace-utils/src/test_utils/mod.rs +++ b/trace-utils/src/test_utils/mod.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; -use crate::span_v04::Span; +use crate::span::v04::Span; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use datadog_trace_protobuf::pb; diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 2623a5755..8789afd25 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span_v04::Span; +use crate::span::v04::Span; use crate::{ msgpack_decoder, trace_utils::{cmp_send_data_payloads, collect_trace_chunks, TracerHeaderTags}, @@ -260,7 +260,7 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let (traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) { + let (traces, size) = match msgpack_decoder::v04::from_slice(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}")