Skip to content

Commit

Permalink
Rearrange msgpack decoder folder structure. (#877)
Browse files Browse the repository at this point in the history
* Rearrange folder structure.

* Solve PR comments.

* Move meta to meta_struct.
* Add doc to read_map_len.
  • Loading branch information
hoolioh authored Feb 17, 2025
1 parent a8fb144 commit 4942a3a
Show file tree
Hide file tree
Showing 27 changed files with 940 additions and 885 deletions.
2 changes: 1 addition & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
mod tests {
use super::*;
use crate::error::ddog_trace_exporter_error_free;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::{borrow::Borrow, mem::MaybeUninit};
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;

fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! This includes the aggregation key to group spans together and the computation of stats from a
//! span.
use datadog_trace_protobuf::pb;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};
use std::borrow::Borrow;
use std::borrow::Cow;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::time::{self, Duration, SystemTime};

use datadog_trace_protobuf::pb;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};

use aggregation::{AggregationKey, StatsBucket};

Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use datadog_trace_utils::span_v04::trace_utils::compute_top_level_span;
use datadog_trace_utils::span::v04::trace_utils::compute_top_level_span;
use rand::{thread_rng, Rng};

const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
#[cfg(test)]
mod tests {
use super::*;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};
use httpmock::prelude::*;
use httpmock::MockServer;
use time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/trace_exporter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::telemetry::error::TelemetryError;
use crate::trace_exporter::msgpack_decoder::v04::error::DecodeError;
use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError;
use hyper::http::StatusCode;
use hyper::Error as HyperError;
use serde_json::error::Error as SerdeError;
Expand Down
6 changes: 3 additions & 3 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::span_v04::{
use datadog_trace_utils::span::v04::{
trace_utils::{compute_top_level_span, has_top_level},
Span,
};
Expand Down Expand Up @@ -588,7 +588,7 @@ impl TraceExporter {

fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result<String, TraceExporterError> {
// TODO base on input format
let (mut traces, size) = match msgpack_decoder::v04::decoder::from_slice(data) {
let (mut traces, size) = match msgpack_decoder::v04::from_slice(data) {
Ok(res) => res,
Err(err) => {
error!("Error deserializing trace from request body: {err}");
Expand Down Expand Up @@ -976,7 +976,7 @@ mod tests {
use self::error::AgentErrorKind;
use self::error::BuilderErrorKind;
use super::*;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion trace-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod trace_utils;
pub mod tracer_header_tags;
pub mod tracer_payload;

pub mod span_v04;
pub mod span;
88 changes: 88 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;
use tinybytes::Bytes;

/// Reads `len` key-value pairs from the buffer and collects them into a `HashMap`.
///
/// The function is generic over the key and value types; the caller supplies the routine that
/// decodes a single pair.
///
/// # Arguments
///
/// * `len` - The number of key-value pairs to read from the buffer.
/// * `buf` - The `Bytes` buffer holding the encoded map entries.
/// * `read_pair` - A function that decodes one key-value pair from the buffer and returns it as
///   a `Result<(K, V), DecodeError>`.
///
/// # Returns
///
/// * `Ok(HashMap<K, V>)` - The decoded pairs. If the same key is decoded more than once, the
///   last value wins.
/// * `Err(DecodeError)` - The first error returned by `read_pair`.
///
/// # Errors
///
/// This function will return an error if:
/// - The `read_pair` function returns an error while reading a key-value pair.
///
/// # Type Parameters
///
/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`.
/// * `V` - The type of the values in the map.
/// * `F` - The type of the function used to read key-value pairs from the buffer.
#[inline]
pub fn read_map<K, V, F>(
    len: usize,
    buf: &mut Bytes,
    read_pair: F,
) -> Result<HashMap<K, V>, DecodeError>
where
    K: std::hash::Hash + Eq,
    F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>,
{
    // Callers in this crate pass the length decoded from the payload header, i.e. a value the
    // sender controls (up to `u32::MAX`). Pre-allocating that many buckets up front would let a
    // few bytes of input request an enormous allocation, so the capacity is only a bounded
    // hint; the map still grows on demand when a payload really is that large.
    const MAX_PREALLOCATED_ENTRIES: usize = 1024;

    let mut map = HashMap::with_capacity(len.min(MAX_PREALLOCATED_ENTRIES));
    for _ in 0..len {
        let (k, v) = read_pair(buf)?;
        map.insert(k, v);
    }
    Ok(map)
}

/// Decodes a MessagePack map header and returns the number of entries it announces.
///
/// # Arguments
///
/// * `buf` - A mutable reference to the byte slice positioned at the map header. The marker
///   (and, for `Map16`/`Map32`, the length field) is read through this reference.
///
/// # Returns
///
/// * `Ok(usize)` - Map length.
/// * `Err(DecodeError)` - An error if the decoding process fails.
///
/// # Errors
///
/// This function will return an error if:
/// - The marker cannot be read (`DecodeError::InvalidFormat`).
/// - The marker is not one of `FixMap`, `Map16` or `Map32` (`DecodeError::InvalidType`).
/// - The 16/32-bit length that follows the marker cannot be read (`DecodeError::IOError`).
#[inline]
pub fn read_map_len(buf: &mut &[u8]) -> Result<usize, DecodeError> {
    let marker = decode::read_marker(buf)
        .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))?;

    let entries = match marker {
        // The length is embedded in the marker itself.
        Marker::FixMap(count) => usize::from(count),
        // The length follows the marker as a separate 16-bit field.
        Marker::Map16 => {
            let count = buf.read_data_u16().map_err(|_| DecodeError::IOError)?;
            usize::from(count)
        }
        // The length follows the marker as a separate 32-bit field.
        Marker::Map32 => {
            let count = buf.read_data_u32().map_err(|_| DecodeError::IOError)?;
            count as usize
        }
        _ => {
            return Err(DecodeError::InvalidType(
                "Unable to read map from buffer".to_owned(),
            ))
        }
    };

    Ok(entries)
}
35 changes: 35 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/meta_struct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use rmp::decode;
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

/// Decodes the `meta_struct` field: a map from string keys to arrays of byte-sized numbers.
///
/// A null marker in place of the map is accepted and yields an empty map.
///
/// # Errors
///
/// Returns a `DecodeError` if the map header, a key, an array header
/// (`DecodeError::InvalidFormat`) or any array element cannot be decoded.
#[inline]
pub fn read_meta_struct(buf: &mut Bytes) -> Result<HashMap<BytesString, Vec<u8>>, DecodeError> {
    if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
        return Ok(empty_map);
    }

    /// Decodes one `(key, bytes)` entry of the map.
    fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec<u8>), DecodeError> {
        let key = read_string_bytes(buf)?;
        let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| {
            DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned())
        })?;

        // `array_len` comes from the payload and can be as large as `u32::MAX`. Every element
        // occupies at least one byte of input, so the bytes still left in the buffer are an
        // upper bound on how many elements can really be decoded; clamping the capacity hint
        // to that bound stops a tiny payload from requesting a multi-gigabyte allocation.
        // NOTE(review): only the length of the slice is read here — confirm this is within
        // the safety contract of `Bytes::as_mut_slice`.
        let remaining = unsafe { buf.as_mut_slice() }.len();
        let mut v = Vec::with_capacity((array_len as usize).min(remaining));

        for _ in 0..array_len {
            let value = read_number_bytes(buf)?;
            v.push(value);
        }
        Ok((key, v))
    }

    let len = read_map_len(unsafe { buf.as_mut_slice() })?;
    read_map(len, buf, read_meta_struct_pair)
}
27 changes: 27 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

/// Decodes a single metrics entry: a string key followed by a numeric value returned as `f64`.
///
/// # Errors
///
/// Propagates the `DecodeError` from decoding either the key or the value.
#[inline]
pub fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> {
    // Tuple fields are evaluated left to right, so the key is consumed before the value.
    Ok((read_string_bytes(buf)?, read_number_bytes(buf)?))
}
/// Decodes the `metrics` field: a map from string keys to `f64` values.
///
/// A null marker in place of the map is accepted and yields an empty map.
///
/// # Errors
///
/// Returns a `DecodeError` if the map header or any entry cannot be decoded.
#[inline]
pub fn read_metrics(buf: &mut Bytes) -> Result<HashMap<BytesString, f64>, DecodeError> {
    match handle_null_marker(buf, HashMap::default) {
        Some(empty) => Ok(empty),
        None => {
            let entries = read_map_len(unsafe { buf.as_mut_slice() })?;
            read_map(entries, buf, read_metric_pair)
        }
    }
}
10 changes: 10 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod error;
pub mod map;
pub mod meta_struct;
pub mod metrics;
pub mod number;
pub mod span_link;
pub mod string;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use super::error::DecodeError;
use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode::RmpRead, Marker};
use std::fmt;
use tinybytes::Bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::v04::decoder::{
use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{
handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref,
};
use crate::msgpack_decoder::v04::error::DecodeError;
use crate::msgpack_decoder::v04::number::read_number_bytes;
use crate::span_v04::SpanLink;
use crate::span::v04::SpanLink;
use rmp::Marker;
use std::str::FromStr;
use tinybytes::Bytes;
Expand Down Expand Up @@ -98,7 +98,7 @@ fn decode_span_link(buf: &mut Bytes) -> Result<SpanLink, DecodeError> {
#[cfg(test)]
mod tests {
use super::SpanLinkKey;
use crate::msgpack_decoder::v04::error::DecodeError;
use crate::msgpack_decoder::decode::error::DecodeError;
use std::str::FromStr;

#[test]
Expand Down
97 changes: 97 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::decode;
use rmp::decode::DecodeStringError;
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

// Byte value of the MessagePack null marker: 0xc0 (decimal 192). See
// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null
// Declared as `&u8` so it can be compared directly with the `Option<&u8>` returned by
// `slice.first()` in `handle_null_marker`.
const NULL_MARKER: &u8 = &0xc0;

/// Decodes a MessagePack string from the front of `buf` without mutating the caller's slice.
///
/// # Returns
///
/// On success, the decoded `&str` together with the second slice returned by
/// `rmp::decode::read_str_from_slice` (the input that follows the string).
///
/// # Errors
///
/// The `rmp` string error is translated into a `DecodeError`:
/// - marker read failure -> `InvalidFormat`
/// - data read failure -> `InvalidConversion`
/// - unexpected marker -> `InvalidType`
/// - invalid UTF-8 -> `Utf8Error`
/// - anything else -> `IOError`
#[inline]
pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> {
    let failure = match decode::read_str_from_slice(buf) {
        Ok(parsed) => return Ok(parsed),
        Err(failure) => failure,
    };

    Err(match failure {
        DecodeStringError::InvalidMarkerRead(cause) => {
            DecodeError::InvalidFormat(cause.to_string())
        }
        DecodeStringError::InvalidDataRead(cause) => {
            DecodeError::InvalidConversion(cause.to_string())
        }
        DecodeStringError::TypeMismatch(marker) => {
            DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker))
        }
        DecodeStringError::InvalidUtf8(_, cause) => DecodeError::Utf8Error(cause.to_string()),
        _ => DecodeError::IOError,
    })
}

/// Decodes a MessagePack string from the front of `buf` and advances `buf` past it.
///
/// On failure `buf` is left untouched.
///
/// # Errors
///
/// Returns the `DecodeError` produced by `read_string_ref_nomut`.
#[inline]
pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> {
    let (text, rest) = read_string_ref_nomut(buf)?;
    // Only move the cursor once the string has been decoded successfully.
    *buf = rest;
    Ok(text)
}

/// Decodes a MessagePack string from the front of `buf`, returns it as a `BytesString` and
/// advances `buf` past it.
///
/// On failure `buf` is left untouched.
///
/// # Errors
///
/// Returns the `DecodeError` produced by `read_string_ref_nomut`.
#[inline]
pub fn read_string_bytes(buf: &mut Bytes) -> Result<BytesString, DecodeError> {
    // Note: the slice handed to the reader must carry a `'static` lifetime, otherwise the
    // borrow checker rejects the later uses of `buf`.
    // NOTE(review): relies on the safety contract of `Bytes::as_mut_slice` — confirm.
    let (text, rest) = read_string_ref_nomut(unsafe { buf.as_mut_slice() })?;

    // Build the string while `buf` still covers `text`; the cursor is advanced afterwards.
    let string = BytesString::from_bytes_slice(buf, text);
    *unsafe { buf.as_mut_slice() } = rest;

    Ok(string)
}

/// Like `read_string_bytes`, but a null marker is accepted and decoded as the default
/// `BytesString`.
///
/// # Errors
///
/// Returns the `DecodeError` from `read_string_bytes` when the value is not null and cannot be
/// decoded as a string.
#[inline]
pub fn read_nullable_string_bytes(buf: &mut Bytes) -> Result<BytesString, DecodeError> {
    match handle_null_marker(buf, BytesString::default) {
        Some(empty) => Ok(empty),
        None => read_string_bytes(buf),
    }
}

/// Decodes a MessagePack map whose keys and values are both strings.
///
/// UTF-8 validity is checked while each string is decoded, so it is not checked again when
/// the `BytesString`s are created.
///
/// # Errors
///
/// Returns `DecodeError::InvalidFormat` if the map header cannot be read, or the error from
/// `read_string_bytes` if a key or value cannot be decoded.
#[inline]
pub fn read_str_map_to_bytes_strings(
    buf: &mut Bytes,
) -> Result<HashMap<BytesString, BytesString>, DecodeError> {
    let len = decode::read_map_len(unsafe { buf.as_mut_slice() })
        .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;

    // `len` is taken from the payload and may be as large as `u32::MAX`. Each entry needs at
    // least two bytes of input (one for the key, one for the value), so half of the remaining
    // buffer bounds the number of entries that can really be decoded. Using that bound as the
    // capacity hint avoids both a huge up-front allocation and the former `expect` panic on
    // the length conversion: the capacity is only a hint, so it may simply saturate.
    // NOTE(review): only the length of the slice is read here — confirm this is within the
    // safety contract of `Bytes::as_mut_slice`.
    let max_entries = unsafe { buf.as_mut_slice() }.len() / 2;
    let capacity = usize::try_from(len).map_or(max_entries, |n| n.min(max_entries));

    let mut map = HashMap::with_capacity(capacity);
    for _ in 0..len {
        let key = read_string_bytes(buf)?;
        let value = read_string_bytes(buf)?;
        map.insert(key, value);
    }
    Ok(map)
}

/// Like `read_str_map_to_bytes_strings`, but a null marker is accepted and decoded as an empty
/// map.
///
/// # Errors
///
/// Returns the `DecodeError` from `read_str_map_to_bytes_strings` when the value is not null
/// and cannot be decoded.
#[inline]
pub fn read_nullable_str_map_to_bytes_strings(
    buf: &mut Bytes,
) -> Result<HashMap<BytesString, BytesString>, DecodeError> {
    match handle_null_marker(buf, HashMap::default) {
        Some(empty) => Ok(empty),
        None => read_str_map_to_bytes_strings(buf),
    }
}

/// Peeks at the next byte of the buffer to see whether it is the null marker.
///
/// If it is, the buffer is advanced by one byte and `Some(default())` is returned. Otherwise
/// (including when the buffer is empty) the buffer is left untouched and `None` is returned,
/// so the caller can continue decoding the value as usual.
#[inline]
pub fn handle_null_marker<T, F>(buf: &mut Bytes, default: F) -> Option<T>
where
    F: FnOnce() -> T,
{
    // NOTE(review): relies on the safety contract of `Bytes::as_mut_slice` — confirm.
    let slice = unsafe { buf.as_mut_slice() };

    if slice.first() != Some(NULL_MARKER) {
        return None;
    }

    // Consume the marker byte before producing the default value.
    *slice = &slice[1..];
    Some(default())
}
1 change: 1 addition & 0 deletions trace-utils/src/msgpack_decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod decode;
pub mod v04;
Loading

0 comments on commit 4942a3a

Please sign in to comment.