Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1649 Reuse query counters #1671

31 changes: 24 additions & 7 deletions shotover/src/transforms/query_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::Result;
use async_trait::async_trait;
use metrics::counter;
use metrics::Counter;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;

use super::DownChainProtocol;
use super::TransformContextConfig;
Expand All @@ -16,6 +18,7 @@ use super::UpChainProtocol;
#[derive(Clone)]
pub struct QueryCounter {
counter_name: &'static str,
counters: HashMap<String, Counter>,
rukai marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -26,11 +29,25 @@ pub struct QueryCounterConfig {

impl QueryCounter {
pub fn new(counter_name: String) -> Self {
counter!("shotover_query_count", "name" => counter_name.clone());
let counter_name_ref: &'static str = counter_name.leak();
let mut counters = HashMap::new();
let query_counter = counter!("shotover_query_count", "name" => counter_name_ref);
counters.insert("shotover_query_count".to_string(), query_counter);
rukai marked this conversation as resolved.
Show resolved Hide resolved

QueryCounter {
// Leaking here is fine since the builder is created only once during shotover startup.
counter_name: counter_name.leak(),
counter_name: counter_name_ref,
counters,
}
}

fn increment_counter(&mut self, query: String, query_type: &'static str) {
rukai marked this conversation as resolved.
Show resolved Hide resolved
if self.counters.contains_key(query.as_str()) {
rukai marked this conversation as resolved.
Show resolved Hide resolved
self.counters.get(query.as_str()).unwrap().increment(1);
} else {
let query_counter = counter!("shotover_query_count", "name" => self.counter_name, "query" => query.clone(), "type" => query_type);
query_counter.increment(1);
self.counters.insert(query, query_counter);
}
}
}
Expand All @@ -57,20 +74,20 @@ impl Transform for QueryCounter {
#[cfg(feature = "cassandra")]
Some(Frame::Cassandra(frame)) => {
for statement in frame.operation.queries() {
counter!("shotover_query_count", "name" => self.counter_name, "query" => statement.short_name(), "type" => "cassandra").increment(1);
self.increment_counter(statement.short_name().to_string(), "cassandra");
}
}
#[cfg(feature = "redis")]
Some(Frame::Redis(frame)) => {
if let Some(query_type) = crate::frame::redis::redis_query_name(frame) {
counter!("shotover_query_count", "name" => self.counter_name, "query" => query_type, "type" => "redis").increment(1);
self.increment_counter(query_type, "redis");
} else {
counter!("shotover_query_count", "name" => self.counter_name, "query" => "unknown", "type" => "redis").increment(1);
self.increment_counter("unknown".to_string(), "redis");
}
}
#[cfg(feature = "kafka")]
Some(Frame::Kafka(_)) => {
counter!("shotover_query_count", "name" => self.counter_name, "query" => "unknown", "type" => "kafka").increment(1);
self.increment_counter("unknown".to_string(), "kafka");
}
Some(Frame::Dummy) => {
// Dummy does not count as a message
Expand All @@ -80,7 +97,7 @@ impl Transform for QueryCounter {
todo!();
}
None => {
counter!("shotover_query_count", "name" => self.counter_name, "query" => "unknown", "type" => "none").increment(1)
self.increment_counter("unknown".to_string(), "none");
}
}
}
Expand Down