From 6ff92a40def837f69c46759dad1a995f3b3d91d7 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 3 Nov 2022 02:35:47 +0000 Subject: [PATCH] Time out async sends to avoid slackers stuck in the queue for too long (backport #28545) (#28732) * Time out async sends to avoid slackers stuck in the queue for too long (#28545) * Time out async sends to avoid slackers stuck in the queue for too long * Fixed a clippy error * Added stats for timeout counts * Link with stats correctly (cherry picked from commit f10ef763dc60c94c5065bf509b3524da44158bcf) # Conflicts: # tpu-client/src/connection_cache_stats.rs * Fixed merge issue * fmt code Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> (cherry picked from commit 527e2d4f59c6429a4a959d279738c872b97e56b5) --- client/src/connection_cache.rs | 11 ++++++ client/src/nonblocking/quic_client.rs | 4 +++ client/src/quic_client.rs | 49 ++++++++++++++++++++++----- client/src/tpu_connection.rs | 1 + 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 502d0ede8719ba..4a716badfabf67 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -92,6 +92,10 @@ impl ConnectionCacheStats { client_stats.make_connection_ms.load(Ordering::Relaxed), Ordering::Relaxed, ); + self.total_client_stats.send_timeout.fetch_add( + client_stats.send_timeout.load(Ordering::Relaxed), + Ordering::Relaxed, + ); self.sent_packets .fetch_add(num_packets as u64, Ordering::Relaxed); self.total_batches.fetch_add(1, Ordering::Relaxed); @@ -224,6 +228,13 @@ impl ConnectionCacheStats { self.batch_failure.swap(0, Ordering::Relaxed), i64 ), + ( + "send_timeout", + self.total_client_stats + .send_timeout + .swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 97d4cfdb40844c..d2f1e5dd3d4f08 100644 --- 
a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -512,6 +512,10 @@ impl QuicTpuConnection { self.client.stats() } + pub fn connection_stats(&self) -> Arc { + self.connection_stats.clone() + } + pub fn new( endpoint: Arc, addr: SocketAddr, diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index f1236b56b16bfc..bcd7f03090ac07 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -11,18 +11,21 @@ use { }, tpu_connection::TpuConnection as NonblockingTpuConnection, }, - tpu_connection::TpuConnection, + tpu_connection::{ClientStats, TpuConnection}, }, lazy_static::lazy_static, - solana_sdk::transport::Result as TransportResult, + log::*, + solana_sdk::transport::{Result as TransportResult, TransportError}, std::{ net::SocketAddr, - sync::{Arc, Condvar, Mutex, MutexGuard}, + sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard}, + time::Duration, }, - tokio::runtime::Runtime, + tokio::{runtime::Runtime, time::timeout}, }; const MAX_OUTSTANDING_TASK: u64 = 2000; +const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000; /// A semaphore used for limiting the number of asynchronous tasks spawn to the /// runtime. Before spawnning a task, use acquire. 
After the task is done (be it @@ -107,18 +110,48 @@ async fn send_wire_transaction_async( connection: Arc, wire_transaction: Vec, ) -> TransportResult<()> { - let result = connection.send_wire_transaction(wire_transaction).await; + let result = timeout( + Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS), + connection.send_wire_transaction(wire_transaction), + ) + .await; ASYNC_TASK_SEMAPHORE.release(); - result + handle_send_result(result, connection) } async fn send_wire_transaction_batch_async( connection: Arc, buffers: Vec>, ) -> TransportResult<()> { - let result = connection.send_wire_transaction_batch(&buffers).await; + let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64; + + let result = timeout( + Duration::from_millis(time_out), + connection.send_wire_transaction_batch(&buffers), + ) + .await; ASYNC_TASK_SEMAPHORE.release(); - result + handle_send_result(result, connection) +} + +/// Check the send result and update stats if timedout. Returns the checked result. +fn handle_send_result( + result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>, + connection: Arc, +) -> Result<(), TransportError> { + match result { + Ok(result) => result, + Err(_err) => { + let client_stats = ClientStats::default(); + client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); + let stats = connection.connection_stats(); + stats.add_client_stats(&client_stats, 0, false); + info!("Timedout sending transaction {:?}", connection.tpu_addr()); + Err(TransportError::Custom( + "Timedout sending transaction".to_string(), + )) + } + } } impl TpuConnection for QuicTpuConnection { diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 9f02319379c942..0825c06987dd58 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -21,6 +21,7 @@ pub struct ClientStats { pub tx_data_blocked: MovingStat, pub tx_acks: MovingStat, pub make_connection_ms: AtomicU64, + pub send_timeout: AtomicU64, } #[enum_dispatch]