Skip to content

Commit

Permalink
pack pullresponses up to packet limit and outbound data budget (solan…
Browse files Browse the repository at this point in the history
…a-labs#4712)

* pack pullresponses up to packet limit and outbound data budget

* don't clone until we send CrdsValue

* add pull response bytes metric. batch and score crdsvalues at once. index into pull_responses

* test_split_gossip_messages_pull_response

* score batches after pull responses are chunked

* address comments

* revert to original scoring
  • Loading branch information
gregcusack authored Feb 5, 2025
1 parent 0379ea6 commit 072013a
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 45 deletions.
114 changes: 69 additions & 45 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use {
protocol::{
split_gossip_messages, Ping, PingCache, Protocol, PruneData,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, MAX_INCREMENTAL_SNAPSHOT_HASHES,
MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MAX_PAYLOAD_SIZE,
PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
},
restart_crds_values::{
RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
Expand All @@ -48,7 +49,6 @@ use {
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
solana_net_utils::{
bind_common_in_range_with_config, bind_common_with_config, bind_in_range,
bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified,
Expand Down Expand Up @@ -1736,7 +1736,6 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
) -> PacketBatch {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests");
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packet_batch =
Expand Down Expand Up @@ -1768,67 +1767,92 @@ impl ClusterInfo {
&self.stats,
)
};
let (responses, scores): (Vec<_>, Vec<_>) = addrs
.iter()
.zip(pull_responses)
.flat_map(|(addr, responses)| repeat(addr).zip(responses))
.map(|(addr, response)| {
let age = now.saturating_sub(response.wallclock());
let score = DEFAULT_EPOCH_DURATION_MS
.saturating_sub(age)
.div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
.max(1);
let score = if stakes.contains_key(&response.pubkey()) {
2 * score
} else {
score
};
let score = match response.data() {
CrdsData::ContactInfo(_) => 2 * score,
_ => score,
};
((addr, response), score)
})
.unzip();
if responses.is_empty() {
struct Chunk<'a> {
addr: &'a SocketAddr,
values: Vec<&'a CrdsValue>,
score: u64,
}
let mut num_crds_values = 0;
let mut chunks = Vec::with_capacity(pull_responses.len());
for (addr, crds_values) in addrs.iter().zip(&pull_responses) {
if crds_values.is_empty() {
continue;
}
num_crds_values += crds_values.len();

for chunk_values in split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, crds_values) {
let chunk_score = chunk_values
.iter()
.map(|value| {
let age = now.saturating_sub(value.wallclock());
// score CrdsValue: 2x score if staked; 2x score if ContactInfo
let score = DEFAULT_EPOCH_DURATION_MS
.saturating_sub(age)
.div(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)
.max(1);
let score = if stakes.contains_key(&value.pubkey()) {
2 * score
} else {
score
};
let score = match value.data() {
CrdsData::ContactInfo(_) => 2 * score,
_ => score,
};
score
})
.max()
.unwrap_or_default();

chunks.push(Chunk {
addr,
values: chunk_values,
score: chunk_score,
});
}
}
if chunks.is_empty() {
return packet_batch;
}
let scores: Vec<u64> = chunks.iter().map(|c| c.score).collect();
let mut rng = rand::thread_rng();
let shuffle = WeightedShuffle::new("handle-pull-requests", scores).shuffle(&mut rng);
let shuffle =
WeightedShuffle::<u64>::new("handle-pull-requests", &scores).shuffle(&mut rng);
let mut total_bytes = 0;
let mut sent = 0;
for (addr, response) in shuffle.map(|i| &responses[i]) {
let response = vec![response.clone()];
let response = Protocol::PullResponse(self_id, response);
let mut sent_pull_responses = 0;
let mut sent_crds_values = 0;
for chunk in shuffle.map(|i: usize| &chunks[i]) {
let Chunk { addr, values, .. } = chunk;
let chunk_values: Vec<CrdsValue> = values.iter().map(|&v| v.clone()).collect();
let response = Protocol::PullResponse(self_id, chunk_values);
match Packet::from_data(Some(addr), response) {
Err(err) => error!("failed to write pull-response packet: {:?}", err),
Err(err) => {
error!("failed to write pull-response packet: {:?}", err);
}
Ok(packet) => {
if self.outbound_budget.take(packet.meta().size) {
total_bytes += packet.meta().size;
let packet_size = packet.meta().size;
if self.outbound_budget.take(packet_size) {
total_bytes += packet_size;
packet_batch.push(packet);
sent += 1;
sent_pull_responses += 1;
sent_crds_values += values.len();
} else {
self.stats.gossip_pull_request_no_budget.add_relaxed(1);
break;
}
}
}
}
time.stop();
let dropped_responses = responses.len() - sent;
let dropped_responses = num_crds_values.saturating_sub(sent_crds_values);
self.stats
.gossip_pull_request_sent_requests
.add_relaxed(sent as u64);
.add_relaxed(sent_pull_responses as u64);
self.stats
.gossip_pull_request_dropped_requests
.add_relaxed(dropped_responses as u64);
debug!(
"handle_pull_requests: {} sent: {} total: {} total_bytes: {}",
time,
sent,
responses.len(),
total_bytes
);
self.stats
.gossip_pull_request_sent_bytes
.add_relaxed(total_bytes as u64);
packet_batch
}

Expand Down
6 changes: 6 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub struct GossipStats {
pub(crate) gossip_packets_dropped_count: Counter,
pub(crate) gossip_pull_request_dropped_requests: Counter,
pub(crate) gossip_pull_request_no_budget: Counter,
pub(crate) gossip_pull_request_sent_bytes: Counter,
pub(crate) gossip_pull_request_sent_requests: Counter,
pub(crate) gossip_transmit_loop_iterations_since_last_report: Counter,
pub(crate) gossip_transmit_loop_time: Counter,
Expand Down Expand Up @@ -423,6 +424,11 @@ pub(crate) fn submit_gossip_stats(
stats.gossip_pull_request_dropped_requests.clear(),
i64
),
(
"gossip_pull_request_sent_bytes",
stats.gossip_pull_request_sent_bytes.clear(),
i64
),
(
"gossip_transmit_loop_time",
stats.gossip_transmit_loop_time.clear(),
Expand Down
51 changes: 51 additions & 0 deletions gossip/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub(crate) const MAX_CRDS_OBJECT_SIZE: usize = 928;
/// is equal to PACKET_DATA_SIZE minus serialized size of an empty push
/// message: Protocol::PushMessage(Pubkey::default(), Vec::default())
pub(crate) const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;
/// Max size of serialized crds-values in a Protocol::PullResponse packet. This
/// is equal to PACKET_DATA_SIZE minus serialized size of an empty pull
/// message: Protocol::PullResponse(Pubkey::default(), Vec::default())
pub(crate) const PULL_RESPONSE_MAX_PAYLOAD_SIZE: usize = PUSH_MESSAGE_MAX_PAYLOAD_SIZE;
pub(crate) const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115;
/// Maximum number of incremental hashes in SnapshotHashes a node publishes
/// such that the serialized size of the push/pull message stays below
Expand Down Expand Up @@ -414,6 +418,15 @@ pub(crate) mod tests {
);
}

#[test]
fn test_pull_response_max_payload_size() {
let header = Protocol::PullResponse(Pubkey::default(), Vec::default());
assert_eq!(
PULL_RESPONSE_MAX_PAYLOAD_SIZE,
PACKET_DATA_SIZE - header.bincode_serialized_size()
);
}

#[test]
fn test_duplicate_shred_max_payload_size() {
let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -522,6 +535,44 @@ pub(crate) mod tests {
}
}

#[test]
fn test_split_gossip_messages_pull_response() {
const NUM_CRDS_VALUES: usize = 2048;
let mut rng = rand::thread_rng();
let values: Vec<_> = repeat_with(|| CrdsValue::new_rand(&mut rng, None))
.take(NUM_CRDS_VALUES)
.collect();
let splits: Vec<_> =
split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, values.clone()).collect();
let self_pubkey = solana_pubkey::new_rand();
assert!(splits.len() * 2 < NUM_CRDS_VALUES);
// Assert that all messages are included in the splits.
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
splits
.iter()
.flat_map(|s| s.iter())
.zip(values)
.for_each(|(a, b)| assert_eq!(*a, b));
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
// check message fits into PullResponse
let header_size = PACKET_DATA_SIZE - PULL_RESPONSE_MAX_PAYLOAD_SIZE;
for values in splits {
// Assert that sum of parts equals the whole.
let size = header_size
+ values
.iter()
.map(CrdsValue::bincode_serialized_size)
.sum::<usize>();
let message = Protocol::PullResponse(self_pubkey, values);
assert_eq!(message.bincode_serialized_size(), size);
// Assert that the message fits into a packet.
assert!(Packet::from_data(Some(&socket), message).is_ok());
}
}

#[test]
fn test_split_messages_packet_size() {
// Test that if a value is smaller than payload size but too large to be wrapped in a vec
Expand Down

0 comments on commit 072013a

Please sign in to comment.