Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Throttle unstaked quic streams for a given connection (#34562)
Browse files Browse the repository at this point in the history
* Throttle unstaked quic streams for a given connection

* Fix interval duration check

* move wait to handle_chunk

* set max unistreams to 0

* drop new streams

* cleanup

* some more cleanup

* fix tests

* update test and stop code

* fix bench-tps
  • Loading branch information
pgarg66 authored Dec 22, 2023
1 parent 88af74d commit 6bbd366
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 31 deletions.
8 changes: 6 additions & 2 deletions bench-tps/src/send_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,13 @@ where
fn send<C: BenchTpsClient + ?Sized>(&self, client: &Arc<C>) {
let mut send_txs = Measure::start("send_and_clone_txs");
let batch: Vec<_> = self.iter().map(|(_keypair, tx)| tx.clone()).collect();
client.send_batch(batch).expect("transfer");
let result = client.send_batch(batch);
send_txs.stop();
debug!("send {} {}", self.len(), send_txs);
if result.is_err() {
debug!("Failed to send batch {result:?}");
} else {
debug!("send {} {}", self.len(), send_txs);
}
}

fn verify<C: 'static + BenchTpsClient + Send + Sync + ?Sized>(
Expand Down
34 changes: 18 additions & 16 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2926,24 +2926,26 @@ fn setup_transfer_scan_threads(
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
for i in 0..starting_keypairs_.len() {
client
.async_transfer(
1,
&starting_keypairs_[i],
&target_keypairs_[i].pubkey(),
blockhash,
)
.unwrap();
let result = client.async_transfer(
1,
&starting_keypairs_[i],
&target_keypairs_[i].pubkey(),
blockhash,
);
if result.is_err() {
debug!("Failed in transfer for starting keypair: {:?}", result);
}
}
for i in 0..starting_keypairs_.len() {
client
.async_transfer(
1,
&target_keypairs_[i],
&starting_keypairs_[i].pubkey(),
blockhash,
)
.unwrap();
let result = client.async_transfer(
1,
&target_keypairs_[i],
&starting_keypairs_[i].pubkey(),
blockhash,
);
if result.is_err() {
debug!("Failed in transfer for starting keypair: {:?}", result);
}
}
}
})
Expand Down
8 changes: 5 additions & 3 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod tests {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
assert!(total_packets > 0);
}

fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair, IpAddr) {
Expand Down Expand Up @@ -139,7 +139,7 @@ mod tests {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
assert!(total_packets > 0);
}

#[tokio::test]
Expand Down Expand Up @@ -182,7 +182,9 @@ mod tests {
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(client.send_data_batch(&packets).await.is_ok());
for packet in packets {
let _ = client.send_data(&packet).await;
}

nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await;
exit.store(true, Ordering::Relaxed);
Expand Down
94 changes: 84 additions & 10 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
quic::{configure_server, QuicServerError, StreamStats},
quic::{configure_server, QuicServerError, StreamStats, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
Expand Down Expand Up @@ -39,6 +39,10 @@ use {
tokio::{task::JoinHandle, time::timeout},
};

/// Limit to 500K PPS
const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);

Expand All @@ -55,6 +59,7 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const STREAM_STOP_CODE_THROTTLING: u32 = 15;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
Expand Down Expand Up @@ -264,6 +269,7 @@ enum ConnectionHandlerError {
MaxStreamError,
}

#[derive(Clone)]
struct NewConnectionHandlerParams {
// In principle, the code can be made to work with a crossbeam channel
// as long as we're careful never to use a blocking recv or send call
Expand Down Expand Up @@ -348,13 +354,11 @@ fn handle_and_cache_new_connection(
drop(connection_table_l);
tokio::spawn(handle_connection(
connection,
params.packet_sender.clone(),
remote_addr,
params.remote_pubkey,
last_update,
connection_table,
stream_exit,
params.stats.clone(),
params.clone(),
peer_type,
wait_for_chunk_timeout,
));
Expand Down Expand Up @@ -681,19 +685,42 @@ async fn packet_batch_sender(
}
}

#[allow(clippy::too_many_arguments)]
fn max_streams_for_connection_in_100ms(
connection_type: ConnectionPeerType,
stake: u64,
total_stake: u64,
) -> u64 {
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_100MS)
.saturating_div(MAX_UNSTAKED_CONNECTIONS as u64)
} else {
let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS);
((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64
}
}

fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool {
if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL {
*last_instant = tokio::time::Instant::now();
true
} else {
false
}
}

async fn handle_connection(
connection: Connection,
packet_sender: AsyncSender<PacketAccumulator>,
remote_addr: SocketAddr,
remote_pubkey: Option<Pubkey>,
last_update: Arc<AtomicU64>,
connection_table: Arc<Mutex<ConnectionTable>>,
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
params: NewConnectionHandlerParams,
peer_type: ConnectionPeerType,
wait_for_chunk_timeout: Duration,
) {
let stats = params.stats;
debug!(
"quic new connection {} streams: {} connections: {}",
remote_addr,
Expand All @@ -702,17 +729,28 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let max_streams_per_100ms =
max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake);
let mut last_throttling_instant = tokio::time::Instant::now();
let mut streams_in_current_interval = 0;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) =
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
{
match stream {
Ok(mut stream) => {
if reset_throttling_params_if_needed(&mut last_throttling_instant) {
streams_in_current_interval = 0;
} else if streams_in_current_interval >= max_streams_per_100ms {
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
continue;
}
streams_in_current_interval = streams_in_current_interval.saturating_add(1);
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let packet_sender = params.packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
Expand Down Expand Up @@ -765,7 +803,7 @@ async fn handle_connection(
}

let removed_connection_count = connection_table.lock().unwrap().remove_connection(
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
stable_id,
);
Expand Down Expand Up @@ -1989,4 +2027,40 @@ pub mod test {
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
assert_eq!(ratio, max_ratio);
}

#[test]
fn test_max_streams_for_connection_in_100ms() {
// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000),
20
);

// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000),
20
);

// If stake is 0, same limits as unstaked connections will apply.
// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000),
20
);

// max staked streams = 50K packets per ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000),
60
);

// max staked streams = 50K packets per ms * 80% = 40K
// function = 40K * stake / total_stake
assert_eq!(
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000),
4000
);
}
}

0 comments on commit 6bbd366

Please sign in to comment.