From 80d69c13a8829dd3e45ed1f39d527fe25909b1b6 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 3 Oct 2024 17:16:07 +0200 Subject: [PATCH 1/2] Fix the trace payload size hint The incoming buffer may be much larger than the actual contained data. The decoder knows where the payload starts and ends, but the sidecar send_trace_v04 method does not. This ensures proper trace coalescing, leading to less individual http requests. Signed-off-by: Bob Weinand --- sidecar/src/service/sidecar_server.rs | 14 +-- .../src/msgpack_decoder/v04/decoder/mod.rs | 85 ++++++++++--------- trace-utils/src/tracer_payload.rs | 24 ++++-- 3 files changed, 70 insertions(+), 53 deletions(-) diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 051bdcba5..205564921 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -265,17 +265,17 @@ impl SidecarServer { } }; - let size = data.len(); - - match tracer_payload::TracerPayloadParams::new( + let mut size = 0; + let mut processor = tracer_payload::DefaultTraceChunkProcessor; + let mut payload_params = tracer_payload::TracerPayloadParams::new( data, &headers, - &mut tracer_payload::DefaultTraceChunkProcessor, + &mut processor, target.api_key.is_some(), TraceEncoding::V04, - ) - .try_into() - { + ); + payload_params.measure_size(&mut size); + match payload_params.try_into() { Ok(payload) => { let data = SendData::new(size, payload, headers, target, None); self.trace_flusher.enqueue(data); diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 49d87b059..e603fa6a0 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -47,49 +47,56 @@ use tinybytes::{Bytes, BytesString}; /// }; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); -/// let decoded_traces = from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// let (decoded_traces, _) = from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result>, DecodeError> { +pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { let trace_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; - (0..trace_count).try_fold( - Vec::with_capacity( - trace_count - .try_into() - .expect("Unable to cast trace_count to usize"), - ), - |mut traces, _| { - let span_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) - })?; - - let trace = (0..span_count).try_fold( - Vec::with_capacity( - span_count - .try_into() - .expect("Unable to cast span_count to usize"), - ), - |mut trace, _| { - let span = decode_span(&mut data)?; - trace.push(span); - Ok(trace) - }, - )?; - - traces.push(trace); + let start_len = data.len(); - Ok(traces) - }, - ) + Ok(( + (0..trace_count).try_fold( + Vec::with_capacity( + trace_count + .try_into() + .expect("Unable to cast trace_count to usize"), + ), + |mut traces, _| { + let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) + .map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + ) + })?; + + let trace = (0..span_count).try_fold( + Vec::with_capacity( + span_count + .try_into() + .expect("Unable to cast span_count to usize"), + ), + |mut trace, _| { + let span = decode_span(&mut data)?; + trace.push(span); + Ok(trace) + }, + )?; + + traces.push(trace); + + Ok(traces) + }, + )?, + start_len - data.len(), + )) } #[inline] @@ -270,7 +277,7 @@ mod tests { ..Default::default() }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -290,7 +297,7 @@ mod tests { span["meta_struct"] = json!(expected_meta_struct.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -314,7 +321,7 @@ mod tests { span["meta_struct"] = json!(expected_meta_struct.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -340,7 +347,7 @@ mod tests { span["meta"] = json!(expected_meta.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -370,7 +377,7 @@ mod tests { span["meta"] = json!(expected_meta.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -392,7 +399,7 @@ mod tests { let mut span = create_test_json_span(1, 2, 0, 0); span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -416,7 +423,7 @@ mod tests { let mut span = create_test_json_span(1, 2, 0, 0); span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); @@ -449,7 +456,7 @@ mod tests { span["span_links"] = json!([expected_span_link]); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let decoded_traces = + let (decoded_traces, _) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 4be7510da..af803bd42 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -171,6 +171,8 @@ pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> { data: tinybytes::Bytes, /// Reference to `TracerHeaderTags` containing metadata for the trace. tracer_header_tags: &'a TracerHeaderTags<'a>, + /// Amount of data consumed from buffer + size: Option<&'a mut usize>, /// A mutable reference to an implementation of `TraceChunkProcessor` that processes each /// `TraceChunk` after it is constructed but before it is added to the TracerPayloadCollection. /// TraceChunks are only available for v07 traces. @@ -194,11 +196,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> { TracerPayloadParams { data, tracer_header_tags, + size: None, chunk_processor, is_agentless, encoding_type, } } + + pub fn measure_size(&mut self, size: &'a mut usize) { + self.size = Some(size); + } } // TODO: APMSP-1282 - Implement TryInto for other encoding types. Supporting TraceChunkProcessor but // not supporting v07 is a bit pointless for now. @@ -253,13 +260,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let traces: Vec> = - match msgpack_decoder::v04::decoder::from_slice(self.data) { - Ok(res) => res, - Err(e) => { - anyhow::bail!("Error deserializing trace from request body: {e}") - } - }; + let (traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) { + Ok(res) => res, + Err(e) => { + anyhow::bail!("Error deserializing trace from request body: {e}") + } + }; + + if let Some(size_ref) = self.size { + *size_ref = size; + } if traces.is_empty() { anyhow::bail!("No traces deserialized from the request body."); From ce20078d5bb50643b24744487cc7b504919322e9 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 3 Oct 2024 19:19:21 +0200 Subject: [PATCH 2/2] Add test Signed-off-by: Bob Weinand --- trace-utils/src/msgpack_decoder/v04/decoder/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index e603fa6a0..3426c157b 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -47,7 +47,8 @@ use tinybytes::{Bytes, BytesString}; /// }; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); -/// let (decoded_traces, _) = from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// let (decoded_traces, _payload_size) = +/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); @@ -276,10 +277,13 @@ mod tests { name: BytesString::from_slice(expected_string.as_ref()).unwrap(), ..Default::default() }; - let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let (decoded_traces, _) = + let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte + encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored + let (decoded_traces, decoded_size) = from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + assert_eq!(expected_size, decoded_size); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); let decoded_span = &decoded_traces[0][0];