Skip to content

Commit

Permalink
tighten the minimal streams per 100ms for staked node (solana-labs#738)
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Apr 11, 2024
1 parent 998fbef commit c38894e
Showing 1 changed file with 66 additions and 14 deletions.
80 changes: 66 additions & 14 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ fn handle_and_cache_new_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout: Duration,
max_unstaked_connections: usize,
) -> Result<(), ConnectionHandlerError> {
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
connection_table_l.peer_type,
Expand Down Expand Up @@ -374,6 +375,7 @@ fn handle_and_cache_new_connection(
params.clone(),
peer_type,
wait_for_chunk_timeout,
max_unstaked_connections,
));
Ok(())
} else {
Expand Down Expand Up @@ -414,6 +416,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
connection_table_clone,
params,
wait_for_chunk_timeout,
max_connections,
)
} else {
connection.close(
Expand Down Expand Up @@ -534,6 +537,7 @@ async fn setup_connection(
staked_connection_table.clone(),
&params,
wait_for_chunk_timeout,
max_unstaked_connections,
) {
stats
.connection_added_from_staked_peer
Expand Down Expand Up @@ -717,17 +721,30 @@ fn max_streams_for_connection_in_100ms(
connection_type: ConnectionPeerType,
stake: u64,
total_stake: u64,
max_unstaked_connections: usize,
) -> u64 {
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
let max_unstaked_streams_per_100ms = if max_unstaked_connections == 0 {
0
} else {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_100MS)
.saturating_div(MAX_UNSTAKED_CONNECTIONS as u64)
};

let min_staked_streams_per_100ms = if max_unstaked_connections == 0 {
const MIN_STAKED_STREAMS: u64 = 1;
MIN_STAKED_STREAMS
} else {
max_unstaked_streams_per_100ms.saturating_add(1)
};

if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
max_unstaked_streams_per_100ms
} else {
const MIN_STAKED_STREAMS: u64 = 8;
let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS);
std::cmp::max(
MIN_STAKED_STREAMS,
min_staked_streams_per_100ms,
((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64,
)
}
Expand All @@ -751,6 +768,7 @@ async fn handle_connection(
params: NewConnectionHandlerParams,
peer_type: ConnectionPeerType,
wait_for_chunk_timeout: Duration,
max_unstaked_connections: usize,
) {
let stats = params.stats;
debug!(
Expand All @@ -761,8 +779,12 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let max_streams_per_100ms =
max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake);
let max_streams_per_100ms = max_streams_for_connection_in_100ms(
peer_type,
params.stake,
params.total_stake,
max_unstaked_connections,
);
let mut last_throttling_instant = tokio::time::Instant::now();
let mut streams_in_current_interval = 0;
while !stream_exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -2065,42 +2087,72 @@ pub mod test {
fn test_max_streams_for_connection_in_100ms() {
// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000),
max_streams_for_connection_in_100ms(
ConnectionPeerType::Unstaked,
0,
10000,
MAX_UNSTAKED_CONNECTIONS
),
20
);

// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000),
max_streams_for_connection_in_100ms(
ConnectionPeerType::Unstaked,
10,
10000,
MAX_UNSTAKED_CONNECTIONS
),
20
);

// If stake is 0, same limits as unstaked connections will apply.
// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000),
max_streams_for_connection_in_100ms(
ConnectionPeerType::Staked,
0,
10000,
MAX_UNSTAKED_CONNECTIONS
),
20
);

// max staked streams = 50K packets per ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000),
max_streams_for_connection_in_100ms(
ConnectionPeerType::Staked,
15,
10000,
MAX_UNSTAKED_CONNECTIONS
),
60
);

// max staked streams = 50K packets per ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000),
max_streams_for_connection_in_100ms(
ConnectionPeerType::Staked,
1000,
10000,
MAX_UNSTAKED_CONNECTIONS
),
4000
);

// max staked streams = 50K packets per ms * 80% = 40K
// minimum staked streams.
// max staked streams minimum unstkaed streams + 1.
// (50K packets per ms * 20%) / 500 + 1 =
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1, 50000),
8
max_streams_for_connection_in_100ms(
ConnectionPeerType::Staked,
1,
50000,
MAX_UNSTAKED_CONNECTIONS
),
21
);
}
}

0 comments on commit c38894e

Please sign in to comment.