Skip to content

Commit

Permalink
chore(metrics): Reorganize Metric data type fields (#6138)
Browse files Browse the repository at this point in the history
* chore(metrics): Reorganize Metric data type fields

The `Metric` type is composed of two types of fields:

1. "key" fields that identify which data series the metric belongs to, and
2. "data" fields which contain a single data point.

This change splits `struct Metric` into two parts so that these kinds
of fields can be used separately in maps or arrays. Additionally, the
series data is split into name and tags to allow a finer breakdown.

* Use auto-derived implementations of Eq and Hash for MetricSeries

Signed-off-by: Bruce Guenter <bruce@timber.io>
  • Loading branch information
Bruce Guenter authored Jan 22, 2021
1 parent f50f03c commit 82f7fef
Show file tree
Hide file tree
Showing 52 changed files with 2,917 additions and 3,292 deletions.
4 changes: 2 additions & 2 deletions src/api/schema/metrics/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ impl ErrorsTotal {
impl ErrorsTotal {
/// Metric timestamp
pub async fn timestamp(&self) -> Option<DateTime<Utc>> {
self.0.timestamp
self.0.data.timestamp
}

/// Total error count
pub async fn errors_total(&self) -> f64 {
match self.0.value {
match self.0.data.value {
MetricValue::Counter { value } => value,
_ => 0.00,
}
Expand Down
26 changes: 10 additions & 16 deletions src/api/schema/metrics/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn sum_metrics<'a, I: IntoIterator<Item = &'a Metric>>(metrics: I) -> Option<Met
let m = iter.next()?;

Some(iter.fold(m.clone(), |mut m1, m2| {
m1.update_value(m2);
m1.data.update(&m2.data);
m1
}))
}
Expand All @@ -35,19 +35,13 @@ pub trait MetricsFilter<'a> {

impl<'a> MetricsFilter<'a> for Vec<Metric> {
fn processed_events_total(&self) -> Option<ProcessedEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name.as_str().eq("processed_events_total")),
)?;
let sum = sum_metrics(self.iter().filter(|m| m.name() == "processed_events_total"))?;

Some(ProcessedEventsTotal::new(sum))
}

fn processed_bytes_total(&self) -> Option<ProcessedBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name.as_str().eq("processed_bytes_total")),
)?;
let sum = sum_metrics(self.iter().filter(|m| m.name() == "processed_bytes_total"))?;

Some(ProcessedBytesTotal::new(sum))
}
Expand All @@ -57,7 +51,7 @@ impl<'a> MetricsFilter<'a> for Vec<&'a Metric> {
fn processed_events_total(&self) -> Option<ProcessedEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name.as_str().eq("processed_events_total"))
.filter(|m| m.name() == "processed_events_total")
.copied(),
)?;

Expand All @@ -67,7 +61,7 @@ impl<'a> MetricsFilter<'a> for Vec<&'a Metric> {
fn processed_bytes_total(&self) -> Option<ProcessedBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name.as_str().eq("processed_bytes_total"))
.filter(|m| m.name() == "processed_bytes_total")
.copied(),
)?;

Expand Down Expand Up @@ -148,11 +142,11 @@ pub fn component_counter_metrics(
let mut iter = metrics.into_iter();
let mut m = iter.next()?;
m = iter.fold(m, |mut m1, m2| {
m1.update_value(&m2);
m1.data.update(&m2.data);
m1
});

match m.value {
match m.data.value {
MetricValue::Counter { value }
if cache.insert(name, value).unwrap_or(0.00) < value =>
{
Expand All @@ -175,7 +169,7 @@ pub fn counter_throughput(

get_metrics(interval)
.filter(filter_fn)
.filter_map(move |m| match m.value {
.filter_map(move |m| match m.data.value {
MetricValue::Counter { value } if value > last => {
let throughput = value - last;
last = value;
Expand Down Expand Up @@ -212,11 +206,11 @@ pub fn component_counter_throughputs(
let mut iter = metrics.into_iter();
let mut m = iter.next()?;
m = iter.fold(m, |mut m1, m2| {
m1.update_value(&m2);
m1.data.update(&m2.data);
m1
});

match m.value {
match m.data.value {
MetricValue::Counter { value } => {
let last = cache.insert(name, value).unwrap_or(0.00);
let throughput = value - last;
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/metrics/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ impl HostMetrics {
fn filter_host_metric(metrics: &[Metric], name: &str) -> f64 {
metrics
.iter()
.find(|m| matches!(&m.namespace, Some(n) if n == "host") && m.name == name)
.map(|m| match m.value {
.find(|m| matches!(m.namespace(), Some(n) if n == "host") && m.name() == name)
.map(|m| match m.data.value {
MetricValue::Gauge { value } => value,
MetricValue::Counter { value } => value,
_ => 0.00,
Expand Down
44 changes: 23 additions & 21 deletions src/api/schema/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Uptime> {
get_metrics(interval).filter_map(|m| match m.name.as_str() {
get_metrics(interval).filter_map(|m| match m.name() {
"uptime_seconds" => Some(Uptime::new(m)),
_ => None,
})
Expand All @@ -66,7 +66,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = ProcessedEventsTotal> {
get_metrics(interval).filter_map(|m| match m.name.as_str() {
get_metrics(interval).filter_map(|m| match m.name() {
"processed_events_total" => Some(ProcessedEventsTotal::new(m)),
_ => None,
})
Expand All @@ -77,7 +77,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = i64> {
counter_throughput(interval, &|m| m.name == "processed_events_total")
counter_throughput(interval, &|m| m.name() == "processed_events_total")
.map(|(_, throughput)| throughput as i64)
}

Expand All @@ -86,24 +86,26 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Vec<ComponentProcessedEventsThroughput>> {
component_counter_throughputs(interval, &|m| m.name == "processed_events_total").map(|m| {
m.into_iter()
.map(|(m, throughput)| {
ComponentProcessedEventsThroughput::new(
m.tag_value("component_name").unwrap(),
throughput as i64,
)
})
.collect()
})
component_counter_throughputs(interval, &|m| m.name() == "processed_events_total").map(
|m| {
m.into_iter()
.map(|(m, throughput)| {
ComponentProcessedEventsThroughput::new(
m.tag_value("component_name").unwrap(),
throughput as i64,
)
})
.collect()
},
)
}

/// Component event processing metrics over `interval`.
async fn component_processed_events_totals(
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Vec<ComponentProcessedEventsTotal>> {
component_counter_metrics(interval, &|m| m.name == "processed_events_total").map(|m| {
component_counter_metrics(interval, &|m| m.name() == "processed_events_total").map(|m| {
m.into_iter()
.map(ComponentProcessedEventsTotal::new)
.collect()
Expand All @@ -115,7 +117,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = ProcessedBytesTotal> {
get_metrics(interval).filter_map(|m| match m.name.as_str() {
get_metrics(interval).filter_map(|m| match m.name() {
"processed_bytes_total" => Some(ProcessedBytesTotal::new(m)),
_ => None,
})
Expand All @@ -126,7 +128,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = i64> {
counter_throughput(interval, &|m| m.name == "processed_bytes_total")
counter_throughput(interval, &|m| m.name() == "processed_bytes_total")
.map(|(_, throughput)| throughput as i64)
}

Expand All @@ -135,7 +137,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Vec<ComponentProcessedBytesTotal>> {
component_counter_metrics(interval, &|m| m.name == "processed_bytes_total").map(|m| {
component_counter_metrics(interval, &|m| m.name() == "processed_bytes_total").map(|m| {
m.into_iter()
.map(ComponentProcessedBytesTotal::new)
.collect()
Expand All @@ -147,7 +149,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Vec<ComponentProcessedBytesThroughput>> {
component_counter_throughputs(interval, &|m| m.name == "processed_bytes_total").map(|m| {
component_counter_throughputs(interval, &|m| m.name() == "processed_bytes_total").map(|m| {
m.into_iter()
.map(|(m, throughput)| {
ComponentProcessedBytesThroughput::new(
Expand All @@ -165,7 +167,7 @@ impl MetricsSubscription {
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = ErrorsTotal> {
get_metrics(interval)
.filter(|m| m.name.ends_with("_errors_total"))
.filter(|m| m.name().ends_with("_errors_total"))
.map(ErrorsTotal::new)
}

Expand All @@ -174,7 +176,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = Vec<ComponentErrorsTotal>> {
component_counter_metrics(interval, &|m| m.name.ends_with("_errors_total"))
component_counter_metrics(interval, &|m| m.name().ends_with("_errors_total"))
.map(|m| m.into_iter().map(ComponentErrorsTotal::new).collect())
}

Expand All @@ -183,7 +185,7 @@ impl MetricsSubscription {
&self,
#[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32,
) -> impl Stream<Item = MetricType> {
get_metrics(interval).filter_map(|m| match m.name.as_str() {
get_metrics(interval).filter_map(|m| match m.name() {
"uptime_seconds" => Some(MetricType::Uptime(m.into())),
"processed_events_total" => Some(MetricType::ProcessedEventsTotal(m.into())),
"processed_bytes_total" => Some(MetricType::ProcessedBytesTotal(m.into())),
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/metrics/processed_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ impl ProcessedBytesTotal {
impl ProcessedBytesTotal {
/// Metric timestamp
pub async fn timestamp(&self) -> Option<DateTime<Utc>> {
self.0.timestamp
self.0.data.timestamp
}

/// Total number of bytes processed
pub async fn processed_bytes_total(&self) -> f64 {
match self.0.value {
match self.0.data.value {
MetricValue::Counter { value } => value,
_ => 0.00,
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/metrics/processed_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ impl ProcessedEventsTotal {
impl ProcessedEventsTotal {
/// Metric timestamp
pub async fn timestamp(&self) -> Option<DateTime<Utc>> {
self.0.timestamp
self.0.data.timestamp
}

/// Total number of events processed
pub async fn processed_events_total(&self) -> f64 {
match self.0.value {
match self.0.data.value {
MetricValue::Counter { value } => value,
_ => 0.00,
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/metrics/uptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ impl Uptime {
impl Uptime {
/// Metric timestamp
pub async fn timestamp(&self) -> Option<DateTime<Utc>> {
self.0.timestamp
self.0.data.timestamp
}

/// Number of seconds the Vector instance has been alive
pub async fn seconds(&self) -> f64 {
match self.0.value {
match self.0.data.value {
MetricValue::Gauge { value } => value,
_ => 0.00,
}
Expand Down
27 changes: 10 additions & 17 deletions src/conditions/check_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ impl CheckFieldsPredicate for EqualsPredicate {
_ => false,
},
}),
Event::Metric(m) => {
m.tags
.as_ref()
.and_then(|t| t.get(&self.target))
.map_or(false, |v| match &self.arg {
CheckFieldsPredicateArg::String(s) => s.as_bytes() == v.as_bytes(),
_ => false,
})
}
Event::Metric(m) => m
.tags()
.and_then(|t| t.get(&self.target))
.map_or(false, |v| match &self.arg {
CheckFieldsPredicateArg::String(s) => s.as_bytes() == v.as_bytes(),
_ => false,
}),
}
}
}
Expand Down Expand Up @@ -244,8 +242,7 @@ impl CheckFieldsPredicate for NotEqualsPredicate {
!self.arg.iter().any(|s| b == s.as_bytes())
}),
Event::Metric(m) => m
.tags
.as_ref()
.tags()
.and_then(|t| t.get(&self.target))
.map_or(false, |v| {
!self.arg.iter().any(|s| v.as_bytes() == s.as_bytes())
Expand Down Expand Up @@ -285,8 +282,7 @@ impl CheckFieldsPredicate for RegexPredicate {
.map(|field| field.to_string_lossy())
.map_or(false, |field| self.regex.is_match(&field)),
Event::Metric(metric) => metric
.tags
.as_ref()
.tags()
.and_then(|tags| tags.get(&self.target))
.map_or(false, |field| self.regex.is_match(field)),
}
Expand Down Expand Up @@ -317,10 +313,7 @@ impl CheckFieldsPredicate for ExistsPredicate {
fn check(&self, event: &Event) -> bool {
(match event {
Event::Log(l) => l.get(&self.target).is_some(),
Event::Metric(m) => m
.tags
.as_ref()
.map_or(false, |t| t.contains_key(&self.target)),
Event::Metric(m) => m.tags().map_or(false, |t| t.contains_key(&self.target)),
}) == self.arg
}
}
Expand Down
13 changes: 5 additions & 8 deletions src/conditions/is_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ mod test {

assert_eq!(cond.check(&Event::from("just a log")), true);
assert_eq!(
cond.check(&Event::from(Metric {
name: "test metric".to_string(),
namespace: None,
timestamp: None,
tags: None,
kind: MetricKind::Incremental,
value: MetricValue::Counter { value: 1.0 },
})),
cond.check(&Event::from(Metric::new(
"test metric".to_string(),
MetricKind::Incremental,
MetricValue::Counter { value: 1.0 },
))),
false
);
}
Expand Down
13 changes: 5 additions & 8 deletions src/conditions/is_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ mod test {

assert_eq!(cond.check(&Event::from("just a log")), false);
assert_eq!(
cond.check(&Event::from(Metric {
name: "test metric".to_string(),
namespace: None,
timestamp: None,
tags: None,
kind: MetricKind::Incremental,
value: MetricValue::Counter { value: 1.0 },
})),
cond.check(&Event::from(Metric::new(
"test metric".to_string(),
MetricKind::Incremental,
MetricValue::Counter { value: 1.0 },
))),
true
);
}
Expand Down
Loading

0 comments on commit 82f7fef

Please sign in to comment.