From f13498b42890ccc8dae1f731f4b83a445e238f79 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Sun, 14 Jun 2020 08:52:00 -0700
Subject: [PATCH] Fix fanout gossip bench (bp #10509) (#10556)

automerge
---
 core/src/banking_stage.rs     |   2 +-
 core/src/cluster_info.rs      |  12 ++-
 core/src/crds.rs              |   3 +
 core/src/crds_gossip.rs       |  18 ++--
 core/src/crds_gossip_error.rs |   1 -
 core/src/crds_gossip_pull.rs  |  11 ++-
 core/src/crds_gossip_push.rs  | 155 +++++++++++++++++++++------------
 core/src/replay_stage.rs      |   2 +-
 core/tests/gossip.rs          | 142 ++++++++++++++++++++++++++++++-
 9 files changed, 271 insertions(+), 75 deletions(-)

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index a5a992a025dba2..1f6670c9a37c4d 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -51,7 +51,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
 pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
 
 /// Transaction forwarding
-pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4;
+pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;
 
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 4;
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index ee903a862e23f6..cc3e25718bf7c0 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -252,6 +252,7 @@ pub struct ClusterInfo {
     my_contact_info: RwLock<ContactInfo>,
     id: Pubkey,
     stats: GossipStats,
+    socket: UdpSocket,
 }
 
 impl Default for ClusterInfo {
@@ -407,6 +408,7 @@ impl ClusterInfo {
             my_contact_info: RwLock::new(contact_info),
             id,
             stats: GossipStats::default(),
+            socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
         };
         {
             let mut gossip = me.gossip.write().unwrap();
@@ -432,6 +434,7 @@ impl ClusterInfo {
             my_contact_info: RwLock::new(my_contact_info),
             id: *new_id,
             stats: GossipStats::default(),
+            socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
         }
     }
 
@@ -737,6 +740,13 @@ impl ClusterInfo {
             .process_push_message(&self.id(), vec![entry], now);
     }
 
+    pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
+        let tpu = self.my_contact_info().tpu;
+        let buf = serialize(vote)?;
+        self.socket.send_to(&buf, &tpu)?;
+        Ok(())
+    }
+
     /// Get votes in the crds
     /// * since - The timestamp of when the vote inserted must be greater than
     /// since. This allows the bank to query for new votes only.
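The new send_vote path above is the heart of this change: instead of waiting for the next gossip push round, the vote is bincode-serialized and sent directly to this node's own TPU port over a dedicated ephemeral UDP socket. A minimal std-only sketch of the pattern; the payload bytes and the address are stand-ins for the serialized Transaction and my_contact_info().tpu:

use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    // One ephemeral socket, bound once at construction time, mirroring the
    // new `socket: UdpSocket` field on ClusterInfo.
    let socket = UdpSocket::bind("0.0.0.0:0")?;
    // Stand-in for the bincode-serialized vote transaction.
    let payload = b"serialized-vote-bytes";
    // Stand-in for my_contact_info().tpu; UDP send_to is connectionless and
    // fire-and-forget, which is why callers can afford to ignore the Result.
    let tpu_addr = "127.0.0.1:8003";
    socket.send_to(payload, tpu_addr)?;
    Ok(())
}

Binding once at construction and reusing the socket keeps the per-vote cost to a single send_to call.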
@@ -2216,7 +2226,7 @@ impl ClusterInfo {
             .name("solana-listen".to_string())
             .spawn(move || {
                 let thread_pool = rayon::ThreadPoolBuilder::new()
-                    .num_threads(get_thread_count())
+                    .num_threads(std::cmp::min(get_thread_count(), 8))
                     .thread_name(|i| format!("sol-gossip-work-{}", i))
                     .build()
                     .unwrap();
diff --git a/core/src/crds.rs b/core/src/crds.rs
index ede88881dcaaeb..1f6cfe3c6f494d 100644
--- a/core/src/crds.rs
+++ b/core/src/crds.rs
@@ -36,6 +36,7 @@ use std::collections::HashMap;
 pub struct Crds {
     /// Stores the map of labels and values
     pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
+    pub num_inserts: usize,
 }
 
 #[derive(PartialEq, Debug)]
@@ -84,6 +85,7 @@ impl Default for Crds {
     fn default() -> Self {
         Crds {
             table: IndexMap::new(),
+            num_inserts: 0,
         }
     }
 }
@@ -125,6 +127,7 @@ impl Crds {
             .unwrap_or(true);
         if do_insert {
             let old = self.table.insert(label, new_value);
+            self.num_inserts += 1;
             Ok(old)
         } else {
             trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,);
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 0d578d89b61f80..817701b61ac5b8 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -76,17 +76,10 @@ impl CrdsGossip {
         stakes: &HashMap<Pubkey, u64>,
     ) -> HashMap<Pubkey, HashSet<Pubkey>> {
         let id = &self.id;
-        let crds = &self.crds;
         let push = &mut self.push;
-        let versioned = labels
-            .into_iter()
-            .filter_map(|label| crds.lookup_versioned(&label));
         let mut prune_map: HashMap<Pubkey, HashSet<Pubkey>> = HashMap::new();
-        for val in versioned {
-            let origin = val.value.pubkey();
-            let hash = val.value_hash;
-            let peers = push.prune_received_cache(id, &origin, hash, stakes);
+        for origin in labels.iter().map(|k| k.pubkey()) {
+            let peers = push.prune_received_cache(id, &origin, stakes);
             for from in peers {
                 prune_map.entry(from).or_default().insert(origin);
             }
@@ -113,7 +106,7 @@ impl CrdsGossip {
             return Err(CrdsGossipError::PruneMessageTimeout);
         }
         if self.id == *destination {
-            self.push.process_prune_msg(peer, origin);
+            self.push.process_prune_msg(&self.id, peer, origin);
             Ok(())
         } else {
             Err(CrdsGossipError::BadPruneDestination)
@@ -190,14 +183,15 @@ impl CrdsGossip {
         now: u64,
         process_pull_stats: &mut ProcessPullStats,
     ) {
-        self.pull.process_pull_responses(
+        let success = self.pull.process_pull_responses(
             &mut self.crds,
             from,
             responses,
             responses_expired_timeout,
             now,
             process_pull_stats,
-        )
+        );
+        self.push.push_pull_responses(success, now);
     }
 
     pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
diff --git a/core/src/crds_gossip_error.rs b/core/src/crds_gossip_error.rs
index e99ae611e7b126..add95f5d44e715 100644
--- a/core/src/crds_gossip_error.rs
+++ b/core/src/crds_gossip_error.rs
@@ -2,7 +2,6 @@ pub enum CrdsGossipError {
     NoPeers,
     PushMessageTimeout,
-    PushMessageAlreadyReceived,
     PushMessageOldVersion,
     BadPruneDestination,
     PruneMessageTimeout,
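Taken together, the crds_gossip.rs changes above simplify prune generation: the received cache is now keyed by the origin pubkey, so building the prune map no longer needs a CRDS lookup per label. A self-contained sketch of the accumulation step, with String standing in for Pubkey and a stubbed prune_received_cache:

use std::collections::{HashMap, HashSet};

// Stub: the real code consults per-origin stake weights (see
// crds_gossip_push.rs below) to decide which ingress peers are redundant.
fn prune_received_cache(origin: &str) -> Vec<String> {
    vec![format!("redundant-peer-for-{}", origin)]
}

fn main() {
    let origins = vec!["origin-a".to_string(), "origin-b".to_string()];
    // from-peer -> set of origins we no longer want relayed by that peer.
    let mut prune_map: HashMap<String, HashSet<String>> = HashMap::new();
    for origin in &origins {
        for from in prune_received_cache(origin) {
            prune_map.entry(from).or_default().insert(origin.clone());
        }
    }
    // Each entry becomes one prune message sent back to `from`.
    for (from, origins) in &prune_map {
        println!("prune -> {}: {:?}", from, origins);
    }
}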
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index aafcb3d151aaa1..75b471a1a5db7e 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -134,6 +134,7 @@ pub struct CrdsGossipPull {
     purged_values: VecDeque<(Hash, u64)>,
     pub crds_timeout: u64,
     pub msg_timeout: u64,
+    pub num_pulls: usize,
 }
 
 impl Default for CrdsGossipPull {
@@ -143,6 +144,7 @@ impl Default for CrdsGossipPull {
             pull_request_time: HashMap::new(),
             crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
             msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
+            num_pulls: 0,
         }
     }
 }
@@ -313,18 +315,24 @@ impl CrdsGossipPull {
         responses_expired_timeout: Vec<VersionedCrdsValue>,
         now: u64,
         stats: &mut ProcessPullStats,
-    ) {
+    ) -> Vec<(CrdsValueLabel, Hash, u64)> {
+        let mut success = vec![];
         let mut owners = HashSet::new();
         for r in responses_expired_timeout {
             stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
         }
         for r in responses {
             let owner = r.value.label().pubkey();
+            let label = r.value.label();
+            let wc = r.value.wallclock();
+            let hash = r.value_hash;
             let old = crds.insert_versioned(r);
             if old.is_err() {
                 stats.failed_insert += 1;
             } else {
                 stats.success += 1;
+                self.num_pulls += 1;
+                success.push((label, hash, wc));
             }
             old.ok().map(|opt| {
                 owners.insert(owner);
@@ -338,6 +346,7 @@ impl CrdsGossipPull {
         for owner in owners {
             crds.update_record_timestamp(&owner, now);
         }
+        success
     }
     // build a set of filters of the current crds table
     // num_filters - used to increase the likelyhood of a value in crds being added to some filter
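The new return value closes a gap between the two gossip planes: values learned via pull were previously never re-pushed. CrdsGossip::process_pull_responses now forwards the successful inserts to push_pull_responses, which re-queues any that are still inside the push timeout window. A reduced model of that handoff using std types only (labels as String, hashes as u64):

use std::collections::HashMap;

const MSG_TIMEOUT: u64 = 30_000;

fn main() {
    // (label, value_hash, wallclock) tuples for successful inserts,
    // mirroring Vec<(CrdsValueLabel, Hash, u64)>.
    let successes = vec![
        ("vote-from-a".to_string(), 1u64, 9_000u64),
        ("stale-value".to_string(), 2u64, 0u64),
    ];
    let now = 35_000u64;
    let mut push_messages: HashMap<String, u64> = HashMap::new();
    for (label, value_hash, wallclock) in successes {
        // Same guard as push_pull_responses: drop values whose wallclock is
        // already outside the push timeout window.
        if now > wallclock.checked_add(MSG_TIMEOUT).unwrap_or(0) {
            continue;
        }
        push_messages.insert(label, value_hash);
    }
    assert!(push_messages.contains_key("vote-from-a"));
    assert!(!push_messages.contains_key("stale-value"));
}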
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index 453c9f32cab288..3efc3afd08106f 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -5,7 +5,7 @@
 //!
 //! Main differences are:
 //! 1. There is no `max hop`.  Messages are signed with a local wallclock. If they are outside of
-//!    the local nodes wallclock window they are drooped silently.
+//!    the local nodes wallclock window they are dropped silently.
 //! 2. The prune set is stored in a Bloom filter.
 
 use crate::{
@@ -35,6 +35,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
 pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
 pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
 pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
+pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
 
 #[derive(Clone)]
 pub struct CrdsGossipPush {
@@ -44,12 +45,18 @@ pub struct CrdsGossipPush {
     active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
     /// push message queue
     push_messages: HashMap<CrdsValueLabel, Hash>,
-    /// cache that tracks which validators a message was received from
-    received_cache: HashMap<Hash, (u64, HashSet<Pubkey>)>,
+    /// Cache that tracks which validators a message was received from
+    /// bool indicates it has been pruned.
+    /// This cache represents a lagging view of which validators
+    /// currently have this node in their `active_set`
+    received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
     pub num_active: usize,
     pub push_fanout: usize,
     pub msg_timeout: u64,
     pub prune_timeout: u64,
+    pub num_total: usize,
+    pub num_old: usize,
+    pub num_pushes: usize,
 }
 
 impl Default for CrdsGossipPush {
@@ -64,6 +71,9 @@ impl Default for CrdsGossipPush {
             push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
             msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
             prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
+            num_total: 0,
+            num_old: 0,
+            num_pushes: 0,
         }
     }
 }
@@ -81,18 +91,21 @@ impl CrdsGossipPush {
         &mut self,
         self_pubkey: &Pubkey,
         origin: &Pubkey,
-        hash: Hash,
         stakes: &HashMap<Pubkey, u64>,
     ) -> Vec<Pubkey> {
         let origin_stake = stakes.get(origin).unwrap_or(&0);
         let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
-        let cache = self.received_cache.get(&hash);
+        let cache = self.received_cache.get(origin);
         if cache.is_none() {
             return Vec::new();
         }
+        let peers = cache.unwrap();
 
-        let peers = &cache.unwrap().1;
-        let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum();
+        let peer_stake_total: u64 = peers
+            .iter()
+            .filter(|v| !(v.1).0)
+            .map(|v| stakes.get(v.0).unwrap_or(&0))
+            .sum();
         let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
         if peer_stake_total < prune_stake_threshold {
             return Vec::new();
@@ -100,7 +113,8 @@ impl CrdsGossipPush {
 
         let staked_peers: Vec<(Pubkey, u64)> = peers
             .iter()
-            .filter_map(|p| stakes.get(p).map(|s| (*p, *s)))
+            .filter(|v| !(v.1).0)
+            .filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s)))
             .filter(|(_, s)| *s > 0)
             .collect();
 
@@ -117,16 +131,27 @@ impl CrdsGossipPush {
             let (next_peer, next_stake) = staked_peers[next];
             keep.insert(next_peer);
             peer_stake_sum += next_stake;
-            if peer_stake_sum >= prune_stake_threshold {
+            if peer_stake_sum >= prune_stake_threshold
+                && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
+            {
                 break;
             }
         }
 
-        peers
-            .iter()
+        let pruned_peers: Vec<Pubkey> = peers
+            .keys()
             .filter(|p| !keep.contains(p))
             .cloned()
-            .collect()
+            .collect();
+        pruned_peers.iter().for_each(|p| {
+            self.received_cache
+                .get_mut(origin)
+                .unwrap()
+                .get_mut(p)
+                .unwrap()
+                .0 = true;
+        });
+        pruned_peers
     }
 
     /// process a push message to the network
@@ -137,6 +162,7 @@ impl CrdsGossipPush {
         value: CrdsValue,
         now: u64,
     ) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
+        self.num_total += 1;
         if now
             > value
                 .wallclock()
                 .checked_add(self.msg_timeout)
                 .unwrap_or_else(|| 0)
@@ -149,21 +175,32 @@ impl CrdsGossipPush {
             return Err(CrdsGossipError::PushMessageTimeout);
         }
         let label = value.label();
+        let origin = label.pubkey();
         let new_value = crds.new_versioned(now, value);
         let value_hash = new_value.value_hash;
-        if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) {
-            received_set.insert(from.clone());
-            return Err(CrdsGossipError::PushMessageAlreadyReceived);
-        }
+        let received_set = self
+            .received_cache
+            .entry(origin)
+            .or_insert_with(HashMap::new);
+        received_set.entry(*from).or_insert((false, 0)).1 = now;
+
         let old = crds.insert_versioned(new_value);
         if old.is_err() {
+            self.num_old += 1;
             return Err(CrdsGossipError::PushMessageOldVersion);
         }
-        let mut received_set = HashSet::new();
-        received_set.insert(from.clone());
         self.push_messages.insert(label, value_hash);
-        self.received_cache.insert(value_hash, (now, received_set));
-        Ok(old.ok().and_then(|opt| opt))
+        Ok(old.unwrap())
+    }
+
+    /// push pull responses
+    pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) {
+        for (label, value_hash, wc) in values {
+            if now > wc.checked_add(self.msg_timeout).unwrap_or_else(|| 0) {
+                continue;
+            }
+            self.push_messages.insert(label, value_hash);
+        }
     }
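Two properties of the reworked prune_received_cache are worth spelling out: at least CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES ingress peers survive per origin even once the stake threshold is met, and pruned peers are marked with a bool rather than evicted, preserving the lagging view described in the struct comment. A simplified, unstaked model of the keep-then-mark step (the real code keeps a stake-weighted random selection until the threshold is reached):

use std::collections::{HashMap, HashSet};

const MIN_INGRESS_NODES: usize = 2;

fn main() {
    // peer -> (pruned, last_seen), mirroring HashMap<Pubkey, (bool, u64)>.
    let mut received: HashMap<&str, (bool, u64)> = HashMap::new();
    for peer in ["peer-1", "peer-2", "peer-3", "peer-4"] {
        received.insert(peer, (false, 100));
    }
    // Keep the first MIN_INGRESS_NODES peers; the real code keeps a
    // stake-weighted selection plus this minimum.
    let keep: HashSet<&str> = received.keys().take(MIN_INGRESS_NODES).cloned().collect();
    let pruned: Vec<&str> = received
        .keys()
        .filter(|p| !keep.contains(*p))
        .cloned()
        .collect();
    // Mark rather than remove, so pruned peers stay visible as a lagging
    // view of who still has this node in their active set.
    for p in &pruned {
        received.get_mut(p).unwrap().0 = true;
    }
    assert_eq!(pruned.len(), received.len() - MIN_INGRESS_NODES);
}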
 
     /// New push message to broadcast to peers.
@@ -172,18 +209,10 @@ impl CrdsGossipPush {
     /// Returns a list of Pubkeys for the selected peers and a list of values to send to all the
     /// peers.
     /// The list of push messages is created such that all the randomly selected peers have not
     /// pruned the source addresses.
     pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
-        let max = self.active_set.len();
-        let mut nodes: Vec<_> = (0..max).collect();
-        nodes.shuffle(&mut rand::thread_rng());
-        let peers: Vec<Pubkey> = nodes
-            .into_iter()
-            .filter_map(|n| self.active_set.get_index(n))
-            .take(self.push_fanout)
-            .map(|n| *n.0)
-            .collect();
         let mut total_bytes: usize = 0;
         let mut values = vec![];
         let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
+        trace!("new_push_messages {}", self.push_messages.len());
         for (label, hash) in &self.push_messages {
             let res = crds.lookup_versioned(label);
             if res.is_none() {
@@ -203,21 +232,37 @@ impl CrdsGossipPush {
             }
             values.push(value.clone());
         }
+        trace!(
+            "new_push_messages {} {}",
+            values.len(),
+            self.active_set.len()
+        );
         for v in values {
-            for p in peers.iter() {
-                let filter = self.active_set.get_mut(p);
-                if filter.is_some() && !filter.unwrap().contains(&v.label().pubkey()) {
-                    push_messages.entry(*p).or_default().push(v.clone());
+            //use a consistent index for the same origin so
+            //the active set learns the MST for that origin
+            let start = v.label().pubkey().as_ref()[0] as usize;
+            let max = self.push_fanout.min(self.active_set.len());
+            for i in start..(start + max) {
+                let ix = i % self.active_set.len();
+                if let Some((p, filter)) = self.active_set.get_index(ix) {
+                    if !filter.contains(&v.label().pubkey()) {
+                        trace!("new_push_messages insert {} {:?}", *p, v);
+                        push_messages.entry(*p).or_default().push(v.clone());
+                        self.num_pushes += 1;
+                    }
                 }
+                self.push_messages.remove(&v.label());
             }
-            self.push_messages.remove(&v.label());
         }
         push_messages
     }
 
     /// add the `from` to the peer's filter of nodes
-    pub fn process_prune_msg(&mut self, peer: &Pubkey, origins: &[Pubkey]) {
+    pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
         for origin in origins {
+            if origin == self_pubkey {
+                continue;
+            }
             if let Some(p) = self.active_set.get_mut(peer) {
                 p.add(origin)
             }
@@ -339,15 +384,11 @@ impl CrdsGossipPush {
 
     /// purge received push message cache
    pub fn purge_old_received_cache(&mut self, min_time: u64) {
-        let old_msgs: Vec<Hash> = self
-            .received_cache
-            .iter()
-            .filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None })
-            .cloned()
-            .collect();
-        for k in old_msgs {
-            self.received_cache.remove(&k);
-        }
+        self.received_cache
+            .iter_mut()
+            .for_each(|v| v.1.retain(|_, v| v.1 > min_time));
+
+        self.received_cache.retain(|_, v| !v.is_empty());
     }
 }
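The key change in new_push_messages is the peer selection: instead of reshuffling the active set on every call, the fanout window is derived from the first byte of the origin's pubkey, so a given node always forwards a given origin's values to the same slice of its active set, and the overlay can converge toward a spanning tree per origin (the MST the comment refers to). A toy version of the ring indexing:

fn fanout_targets(origin_first_byte: u8, fanout: usize, active_set: &[&str]) -> Vec<String> {
    // Consistent start index per origin: the same origin always maps to the
    // same window of this node's active set.
    let start = origin_first_byte as usize;
    let max = fanout.min(active_set.len());
    (start..start + max)
        .map(|i| active_set[i % active_set.len()].to_string())
        .collect()
}

fn main() {
    let active_set = ["n0", "n1", "n2", "n3", "n4"];
    // Fanout of 3 starting at byte 250 wraps around the ring: 250 % 5 == 0.
    assert_eq!(fanout_targets(250, 3, &active_set), ["n0", "n1", "n2"]);
    // A different origin byte selects a different, but stable, window.
    assert_eq!(fanout_targets(1, 3, &active_set), ["n1", "n2", "n3"]);
}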
@@ -371,7 +412,6 @@ mod test {
         let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
             &origin, 0,
         )));
-        let label = value.label();
         let low_staked_peers = (0..10).map(|_| Pubkey::new_rand());
         let mut low_staked_set = HashSet::new();
         low_staked_peers.for_each(|p| {
@@ -380,11 +420,7 @@ mod test {
             stakes.insert(p, 1);
         });
 
-        let versioned = crds
-            .lookup_versioned(&label)
-            .expect("versioned value should exist");
-        let hash = versioned.value_hash;
-        let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
+        let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
         assert!(
             pruned.is_empty(),
             "should not prune if min threshold has not been reached"
@@ -395,7 +431,7 @@ mod test {
         stakes.insert(high_staked_peer, high_stake);
         let _ = push.process_push_message(&mut crds, &high_staked_peer, value, 0);
 
-        let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
+        let pruned = push.prune_received_cache(&self_id, &origin, &stakes);
         assert!(
             pruned.len() < low_staked_set.len() + 1,
             "should not prune all peers"
@@ -409,7 +445,7 @@ mod test {
     }
 
     #[test]
-    fn test_process_push() {
+    fn test_process_push_one() {
         let mut crds = Crds::default();
         let mut push = CrdsGossipPush::default();
         let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@@ -425,9 +461,9 @@ mod test {
         assert_eq!(crds.lookup(&label), Some(&value));
 
         // push it again
-        assert_eq!(
+        assert_matches!(
             push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
-            Err(CrdsGossipError::PushMessageAlreadyReceived)
+            Err(CrdsGossipError::PushMessageOldVersion)
        );
     }
     #[test]
@@ -690,6 +726,7 @@ mod test {
     #[test]
     fn test_process_prune() {
         let mut crds = Crds::default();
+        let self_id = Pubkey::new_rand();
         let mut push = CrdsGossipPush::default();
         let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
             &Pubkey::new_rand(),
@@ -707,7 +744,11 @@ mod test {
             push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
             Ok(None)
         );
-        push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]);
+        push.process_prune_msg(
+            &self_id,
+            &peer.label().pubkey(),
+            &[new_msg.label().pubkey()],
+        );
         assert_eq!(push.new_push_messages(&crds, 0), expected);
     }
     #[test]
@@ -749,9 +790,9 @@ mod test {
         assert_eq!(crds.lookup(&label), Some(&value));
 
         // push it again
-        assert_eq!(
+        assert_matches!(
            push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
-            Err(CrdsGossipError::PushMessageAlreadyReceived)
+            Err(CrdsGossipError::PushMessageOldVersion)
         );
 
         // purge the old pushed
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 12a0a190c49d72..b14eaddcaeb536 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -952,7 +952,6 @@ impl ReplayStage {
             progress.get_fork_stats(bank.slot()).unwrap().total_staked,
             lockouts_sender,
         );
-
         Self::push_vote(
             cluster_info,
             bank,
@@ -1044,6 +1043,7 @@ impl ReplayStage {
         let blockhash = bank.last_blockhash();
         vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
         vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
+        let _ = cluster_info.send_vote(&vote_tx);
         cluster_info.push_vote(tower_index, vote_tx);
     }
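The replay_stage change submits each vote along two paths: best-effort UDP straight to the TPU via the new send_vote, its Result deliberately discarded, and durably via gossip push_vote. A compact sketch of that redundancy, with a stub standing in for the gossip path:

use std::net::UdpSocket;

// Fast path: fire-and-forget UDP; the datagram may silently drop.
fn send_vote(socket: &UdpSocket, vote: &[u8]) -> std::io::Result<usize> {
    socket.send_to(vote, "127.0.0.1:8003") // stand-in TPU address
}

// Durable path: gossip push retransmits until peers have it (stubbed here).
fn push_vote(vote: &[u8]) {
    let _ = vote;
}

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:0")?;
    let vote = b"signed-vote-tx";
    // Ignore the fast path's Result: losing this datagram only costs
    // latency, because the gossip path below still delivers the vote.
    let _ = send_vote(&socket, vote);
    push_vote(vote);
    Ok(())
}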
diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs
index fd0d05049c109b..0985d4dbad0d4f 100644
--- a/core/tests/gossip.rs
+++ b/core/tests/gossip.rs
@@ -4,13 +4,14 @@ extern crate log;
 use rayon::iter::*;
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::gossip_service::GossipService;
+use solana_ledger::bank_forks::BankForks;
 
 use solana_perf::packet::Packet;
 use solana_sdk::signature::{Keypair, Signer};
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::thread::sleep;
 use std::time::Duration;
 
@@ -27,6 +28,28 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
     )
 }
 
+fn test_node_with_bank(
+    node_keypair: Keypair,
+    exit: &Arc<AtomicBool>,
+    bank_forks: Arc<RwLock<BankForks>>,
+) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
+    let keypair = Arc::new(node_keypair);
+    let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
+    let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair));
+    let gossip_service = GossipService::new(
+        &cluster_info,
+        Some(bank_forks),
+        test_node.sockets.gossip,
+        exit,
+    );
+    let _ = cluster_info.my_contact_info();
+    (
+        cluster_info,
+        gossip_service,
+        test_node.sockets.tvu.pop().unwrap(),
+    )
+}
+
 /// Test that the network converges.
 /// Run until every node in the network has a full ContactInfo set.
 /// Check that nodes stop sending updates after all the ContactInfo has been shared.
@@ -181,3 +204,120 @@ pub fn cluster_info_retransmit() {
     dr2.join().unwrap();
     dr3.join().unwrap();
 }
+
+#[test]
+#[ignore]
+pub fn cluster_info_scale() {
+    use solana_measure::measure::Measure;
+    use solana_perf::test_tx::test_tx;
+    use solana_runtime::bank::Bank;
+    use solana_runtime::genesis_utils::{
+        create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
+    };
+    solana_logger::setup();
+    let exit = Arc::new(AtomicBool::new(false));
+    let num_nodes: usize = std::env::var("NUM_NODES")
+        .unwrap_or_else(|_| "10".to_string())
+        .parse()
+        .expect("could not parse NUM_NODES as a number");
+
+    let vote_keypairs: Vec<_> = (0..num_nodes)
+        .map(|_| ValidatorVoteKeypairs::new_rand())
+        .collect();
+    let genesis_config_info = create_genesis_config_with_vote_accounts(10_000, &vote_keypairs, 100);
+    let bank0 = Bank::new(&genesis_config_info.genesis_config);
+    let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
+
+    let nodes: Vec<_> = vote_keypairs
+        .into_iter()
+        .map(|keypairs| test_node_with_bank(keypairs.node_keypair, &exit, bank_forks.clone()))
+        .collect();
+    let ci0 = nodes[0].0.my_contact_info();
+    for node in &nodes[1..] {
+        node.0.insert_info(ci0.clone());
+    }
+
+    let mut time = Measure::start("time");
+    let mut done;
+    let mut success = false;
+    for _ in 0..30 {
+        done = true;
+        for (i, node) in nodes.iter().enumerate() {
+            warn!("node {} peers: {}", i, node.0.gossip_peers().len());
+            if node.0.gossip_peers().len() != num_nodes - 1 {
+                done = false;
+                break;
+            }
+        }
+        if done {
+            success = true;
+            break;
+        }
+        sleep(Duration::from_secs(1));
+    }
+    time.stop();
+    warn!("found {} nodes in {} success: {}", num_nodes, time, success);
+
+    for num_votes in 1..1000 {
+        let mut time = Measure::start("votes");
+        let tx = test_tx();
+        warn!("tx.message.account_keys: {:?}", tx.message.account_keys);
+        nodes[0].0.push_vote(0, tx.clone());
+        let mut success = false;
+        for _ in 0..(30 * 5) {
+            let mut not_done = 0;
+            let mut num_old = 0;
+            let mut num_push_total = 0;
+            let mut num_pushes = 0;
+            let mut num_pulls = 0;
+            let mut num_inserts = 0;
+            for node in nodes.iter() {
+                //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
+                let has_tx = node
+                    .0
+                    .get_votes(0)
+                    .1
+                    .iter()
+                    .filter(|v| v.message.account_keys == tx.message.account_keys)
+                    .count();
+                num_old += node.0.gossip.read().unwrap().push.num_old;
+                num_push_total += node.0.gossip.read().unwrap().push.num_total;
+                num_pushes += node.0.gossip.read().unwrap().push.num_pushes;
+                num_pulls += node.0.gossip.read().unwrap().pull.num_pulls;
+                num_inserts += node.0.gossip.read().unwrap().crds.num_inserts;
+                if has_tx == 0 {
+                    not_done += 1;
+                }
+            }
+            warn!("not_done: {}/{}", not_done, nodes.len());
+            warn!("num_old: {}", num_old);
+            warn!("num_push_total: {}", num_push_total);
+            warn!("num_pushes: {}", num_pushes);
+            warn!("num_pulls: {}", num_pulls);
+            warn!("num_inserts: {}", num_inserts);
+            success = not_done < (nodes.len() / 20);
+            if success {
+                break;
+            }
+            sleep(Duration::from_millis(200));
+        }
+        time.stop();
+        warn!(
+            "propagated vote {} in {} success: {}",
+            num_votes, time, success
+        );
+        sleep(Duration::from_millis(200));
+        for node in nodes.iter() {
+            node.0.gossip.write().unwrap().push.num_old = 0;
+            node.0.gossip.write().unwrap().push.num_total = 0;
+            node.0.gossip.write().unwrap().push.num_pushes = 0;
+            node.0.gossip.write().unwrap().pull.num_pulls = 0;
+            node.0.gossip.write().unwrap().crds.num_inserts = 0;
+        }
+    }
+
+    exit.store(true, Ordering::Relaxed);
+    for node in nodes {
+        node.1.join().unwrap();
+    }
+}
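For reading the benchmark output: the test counts a vote as propagated when fewer than one node in twenty is still missing it, and it only logs, never asserts. Note that with integer division the bar is unattainable for NUM_NODES below 20, so the default 10-node run will always report success: false after the full 150 polls. A tiny check of the criterion; the invocation is an assumption, not part of the patch, but something like NUM_NODES=50 cargo test -p solana-core --test gossip cluster_info_scale -- --ignored --nocapture would exercise it:

/// Mirrors `success = not_done < (nodes.len() / 20)` from cluster_info_scale.
fn propagated(not_done: usize, num_nodes: usize) -> bool {
    not_done < num_nodes / 20
}

fn main() {
    assert!(propagated(4, 100)); // 96 of 100 nodes saw the vote
    assert!(!propagated(5, 100)); // exactly 5% missing does not count
    assert!(!propagated(0, 10)); // 10 / 20 == 0: unattainable below 20 nodes
}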