diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 051bdcba5..205564921 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -265,17 +265,17 @@ impl SidecarServer { } }; - let size = data.len(); - - match tracer_payload::TracerPayloadParams::new( + let mut size = 0; + let mut processor = tracer_payload::DefaultTraceChunkProcessor; + let mut payload_params = tracer_payload::TracerPayloadParams::new( data, &headers, - &mut tracer_payload::DefaultTraceChunkProcessor, + &mut processor, target.api_key.is_some(), TraceEncoding::V04, - ) - .try_into() - { + ); + payload_params.measure_size(&mut size); + match payload_params.try_into() { Ok(payload) => { let data = SendData::new(size, payload, headers, target, None); self.trace_flusher.enqueue(data); diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 49d87b059..3426c157b 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -47,49 +47,57 @@ use tinybytes::{Bytes, BytesString}; /// }; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); -/// let decoded_traces = from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// let (decoded_traces, _payload_size) = +/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result>, DecodeError> { +pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { let trace_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; - (0..trace_count).try_fold( - Vec::with_capacity( - trace_count - .try_into() - .expect("Unable to cast trace_count to usize"), - ), - |mut traces, _| { - let span_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) - })?; - - let trace = (0..span_count).try_fold( - Vec::with_capacity( - span_count - .try_into() - .expect("Unable to cast span_count to usize"), - ), - |mut trace, _| { - let span = decode_span(&mut data)?; - trace.push(span); - Ok(trace) - }, - )?; - - traces.push(trace); + let start_len = data.len(); - Ok(traces) - }, - ) + Ok(( + (0..trace_count).try_fold( + Vec::with_capacity( + trace_count + .try_into() + .expect("Unable to cast trace_count to usize"), + ), + |mut traces, _| { + let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) + .map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + ) + })?; + + let trace = (0..span_count).try_fold( + Vec::with_capacity( + span_count + .try_into() + .expect("Unable to cast span_count to usize"), + ), + |mut trace, _| { + let span = decode_span(&mut data)?; + trace.push(span); + Ok(trace) + }, + )?; + + traces.push(trace); + + Ok(traces) + }, + )?, + start_len - data.len(), + )) } #[inline] @@ -269,10 +277,13 @@ mod tests { name: BytesString::from_slice(expected_string.as_ref()).unwrap(), ..Default::default() }; - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, decoded_size) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + assert_eq!(expected_size, decoded_size); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0]; @@ -290,7 +301,7 @@ mod tests { span["meta_struct"] = json!(expected_meta_struct.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -314,7 +325,7 @@ mod tests { span["meta_struct"] = json!(expected_meta_struct.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -340,7 +351,7 @@ mod tests { span["meta"] = json!(expected_meta.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -370,7 +381,7 @@ mod tests { span["meta"] = json!(expected_meta.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -392,7 +403,7 @@ mod tests { let mut span = create_test_json_span(1, 2, 0, 0); span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -416,7 +427,7 @@ mod tests { let mut span = create_test_json_span(1, 2, 0, 0); span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -449,7 +460,7 @@ mod tests { span["span_links"] = json!([expected_span_link]); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 4be7510da..af803bd42 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -171,6 +171,8 @@ pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { data: tinybytes::Bytes, /// Reference to `TracerHeaderTags` containing metadata for the trace. tracer_header_tags: &'a TracerHeaderTags<'a>, + /// Amount of data consumed from buffer + size: Option<&'a mut usize>, /// A mutable reference to an implementation of `TraceChunkProcessor` that processes each /// `TraceChunk` after it is constructed but before it is added to the TracerPayloadCollection. /// TraceChunks are only available for v07 traces. @@ -194,11 +196,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> { TracerPayloadParams { data, tracer_header_tags, + size: None, chunk_processor, is_agentless, encoding_type, } } + + pub fn measure_size(&mut self, size: &'a mut usize) { + self.size = Some(size); + } } // TODO: APMSP-1282 - Implement TryInto for other encoding types. Supporting TraceChunkProcessor but // not supporting v07 is a bit pointless for now. @@ -253,13 +260,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let traces: Vec> = - match msgpack_decoder::v04::decoder::from_slice(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; + let (traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) { + Ok(res) => res, + Err(e) => { + anyhow::bail!("Error deserializing trace from request body: {e}") + } + }; + + if let Some(size_ref) = self.size { + *size_ref = size; + } if traces.is_empty() { anyhow::bail!("No traces deserialized from the request body.");