From 04feed2cf52359c6f8be9156327e7bb02cc866b6 Mon Sep 17 00:00:00 2001 From: Greg Cusack Date: Fri, 29 Mar 2024 12:12:12 -0700 Subject: [PATCH] add metric for duplicate push messages (#321) * add metric for duplicate push messages * add in num_total_push * address comments. don't lock stats each time * address comments. remove num_total_push * change dup push message name in code to reflect metric name --- gossip/src/cluster_info_metrics.rs | 5 +++++ gossip/src/crds.rs | 12 ++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 74dc0c43e9606e..eed11f8313c195 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -434,6 +434,11 @@ pub(crate) fn submit_gossip_stats( i64 ), ("push_message_count", stats.push_message_count.clear(), i64), + ( + "num_duplicate_push_messages", + crds_stats.num_duplicate_push_messages, + i64 + ), ( "push_fanout_num_entries", stats.push_fanout_num_entries.clear(), diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index dbb6c43c0356c0..210c7a05aaf5f0 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -118,6 +118,7 @@ pub(crate) struct CrdsStats { /// number of times a message was first received via a PullResponse /// and that message was later received via a PushMessage pub(crate) num_redundant_pull_responses: u64, + pub(crate) num_duplicate_push_messages: u64, } /// This structure stores some local metadata associated with the CrdsValue @@ -235,9 +236,10 @@ impl Crds { let label = value.label(); let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, now, route); + let mut stats = self.stats.lock().unwrap(); match self.table.entry(label) { Entry::Vacant(entry) => { - self.stats.lock().unwrap().record_insert(&value, route); + stats.record_insert(&value, route); let entry_index = entry.index(); self.shards.insert(entry_index, &value); match &value.value.data { @@ -263,7 +265,7 @@ impl Crds { Ok(()) } Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { - self.stats.lock().unwrap().record_insert(&value, route); + stats.record_insert(&value, route); let entry_index = entry.index(); self.shards.remove(entry_index, entry.get()); self.shards.insert(entry_index, &value); @@ -302,7 +304,7 @@ impl Crds { Ok(()) } Entry::Occupied(mut entry) => { - self.stats.lock().unwrap().record_fail(&value, route); + stats.record_fail(&value, route); trace!( "INSERT FAILED data: {} new.wallclock: {}", value.value.label(), @@ -316,7 +318,9 @@ impl Crds { } else if matches!(route, GossipRoute::PushMessage(_)) { let entry = entry.get_mut(); if entry.num_push_recv == Some(0) { - self.stats.lock().unwrap().num_redundant_pull_responses += 1; + stats.num_redundant_pull_responses += 1; + } else { + stats.num_duplicate_push_messages += 1; } let num_push_dups = entry.num_push_recv.unwrap_or_default(); entry.num_push_recv = Some(num_push_dups.saturating_add(1));