Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Fix fannout gossip bench (bp #10509) (#10556)
Browse files Browse the repository at this point in the history
automerge
  • Loading branch information
mergify[bot] authored Jun 14, 2020
1 parent b567138 commit f13498b
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 75 deletions.
2 changes: 1 addition & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;

/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 4;
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;

// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 4;
Expand Down
12 changes: 11 additions & 1 deletion core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub struct ClusterInfo {
my_contact_info: RwLock<ContactInfo>,
id: Pubkey,
stats: GossipStats,
socket: UdpSocket,
}

impl Default for ClusterInfo {
Expand Down Expand Up @@ -407,6 +408,7 @@ impl ClusterInfo {
my_contact_info: RwLock::new(contact_info),
id,
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
};
{
let mut gossip = me.gossip.write().unwrap();
Expand All @@ -432,6 +434,7 @@ impl ClusterInfo {
my_contact_info: RwLock::new(my_contact_info),
id: *new_id,
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
}
}

Expand Down Expand Up @@ -737,6 +740,13 @@ impl ClusterInfo {
.process_push_message(&self.id(), vec![entry], now);
}

pub fn send_vote(&self, vote: &Transaction) -> Result<()> {
let tpu = self.my_contact_info().tpu;
let buf = serialize(vote)?;
self.socket.send_to(&buf, &tpu)?;
Ok(())
}

/// Get votes in the crds
/// * since - The timestamp of when the vote inserted must be greater than
/// since. This allows the bank to query for new votes only.
Expand Down Expand Up @@ -2216,7 +2226,7 @@ impl ClusterInfo {
.name("solana-listen".to_string())
.spawn(move || {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.num_threads(std::cmp::min(get_thread_count(), 8))
.thread_name(|i| format!("sol-gossip-work-{}", i))
.build()
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::collections::HashMap;
pub struct Crds {
/// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -84,6 +85,7 @@ impl Default for Crds {
fn default() -> Self {
Crds {
table: IndexMap::new(),
num_inserts: 0,
}
}
}
Expand Down Expand Up @@ -125,6 +127,7 @@ impl Crds {
.unwrap_or(true);
if do_insert {
let old = self.table.insert(label, new_value);
self.num_inserts += 1;
Ok(old)
} else {
trace!("INSERT FAILED data: {} new.wallclock: {}", label, wallclock,);
Expand Down
18 changes: 6 additions & 12 deletions core/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,10 @@ impl CrdsGossip {
stakes: &HashMap<Pubkey, u64>,
) -> HashMap<Pubkey, HashSet<Pubkey>> {
let id = &self.id;
let crds = &self.crds;
let push = &mut self.push;
let versioned = labels
.into_iter()
.filter_map(|label| crds.lookup_versioned(&label));

let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
for val in versioned {
let origin = val.value.pubkey();
let hash = val.value_hash;
let peers = push.prune_received_cache(id, &origin, hash, stakes);
for origin in labels.iter().map(|k| k.pubkey()) {
let peers = push.prune_received_cache(id, &origin, stakes);
for from in peers {
prune_map.entry(from).or_default().insert(origin);
}
Expand All @@ -113,7 +106,7 @@ impl CrdsGossip {
return Err(CrdsGossipError::PruneMessageTimeout);
}
if self.id == *destination {
self.push.process_prune_msg(peer, origin);
self.push.process_prune_msg(&self.id, peer, origin);
Ok(())
} else {
Err(CrdsGossipError::BadPruneDestination)
Expand Down Expand Up @@ -190,14 +183,15 @@ impl CrdsGossip {
now: u64,
process_pull_stats: &mut ProcessPullStats,
) {
self.pull.process_pull_responses(
let success = self.pull.process_pull_responses(
&mut self.crds,
from,
responses,
responses_expired_timeout,
now,
process_pull_stats,
)
);
self.push.push_pull_responses(success, now);
}

pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
Expand Down
1 change: 0 additions & 1 deletion core/src/crds_gossip_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
pub enum CrdsGossipError {
NoPeers,
PushMessageTimeout,
PushMessageAlreadyReceived,
PushMessageOldVersion,
BadPruneDestination,
PruneMessageTimeout,
Expand Down
11 changes: 10 additions & 1 deletion core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub struct CrdsGossipPull {
purged_values: VecDeque<(Hash, u64)>,
pub crds_timeout: u64,
pub msg_timeout: u64,
pub num_pulls: usize,
}

impl Default for CrdsGossipPull {
Expand All @@ -143,6 +144,7 @@ impl Default for CrdsGossipPull {
pull_request_time: HashMap::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
num_pulls: 0,
}
}
}
Expand Down Expand Up @@ -313,18 +315,24 @@ impl CrdsGossipPull {
responses_expired_timeout: Vec<VersionedCrdsValue>,
now: u64,
stats: &mut ProcessPullStats,
) {
) -> Vec<(CrdsValueLabel, Hash, u64)> {
let mut success = vec![];
let mut owners = HashSet::new();
for r in responses_expired_timeout {
stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
}
for r in responses {
let owner = r.value.label().pubkey();
let label = r.value.label();
let wc = r.value.wallclock();
let hash = r.value_hash;
let old = crds.insert_versioned(r);
if old.is_err() {
stats.failed_insert += 1;
} else {
stats.success += 1;
self.num_pulls += 1;
success.push((label, hash, wc));
}
old.ok().map(|opt| {
owners.insert(owner);
Expand All @@ -338,6 +346,7 @@ impl CrdsGossipPull {
for owner in owners {
crds.update_record_timestamp(&owner, now);
}
success
}
// build a set of filters of the current crds table
// num_filters - used to increase the likelyhood of a value in crds being added to some filter
Expand Down
Loading

0 comments on commit f13498b

Please sign in to comment.