Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Backport timeout async send tasks to v1.13 (#28750)
Browse files Browse the repository at this point in the history
Time out async sends to avoid slackers stuck in the queue for too long (backport #28545) (#28732)

* Time out async sends to avoid slackers stuck in the queue for too long (#28545)

* Time out async sends to avoid slackers stuck in the queue for too long

* Fixed a clippy error

* Added stats for timeout counts

* Link with stats correctly

(cherry picked from commit f10ef76)

# Conflicts:
#	tpu-client/src/connection_cache_stats.rs

* Fixed merge issue

* fmt code

Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
lijunwangs and mergify[bot] authored Nov 10, 2022
1 parent 959b760 commit f3471e5
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 8 deletions.
11 changes: 11 additions & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ impl ConnectionCacheStats {
client_stats.make_connection_ms.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.total_client_stats.send_timeout.fetch_add(
client_stats.send_timeout.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.sent_packets
.fetch_add(num_packets as u64, Ordering::Relaxed);
self.total_batches.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -224,6 +228,13 @@ impl ConnectionCacheStats {
self.batch_failure.swap(0, Ordering::Relaxed),
i64
),
(
"send_timeout",
self.total_client_stats
.send_timeout
.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ impl QuicTpuConnection {
self.client.stats()
}

/// Returns a new shared handle to the `ConnectionCacheStats` held by this
/// connection. Only the `Arc` reference count is bumped; the stats
/// themselves are not copied.
pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
    Arc::clone(&self.connection_stats)
}

pub fn new(
endpoint: Arc<QuicLazyInitializedEndpoint>,
addr: SocketAddr,
Expand Down
49 changes: 41 additions & 8 deletions client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ use {
},
tpu_connection::TpuConnection as NonblockingTpuConnection,
},
tpu_connection::TpuConnection,
tpu_connection::{ClientStats, TpuConnection},
},
lazy_static::lazy_static,
solana_sdk::transport::Result as TransportResult,
log::*,
solana_sdk::transport::{Result as TransportResult, TransportError},
std::{
net::SocketAddr,
sync::{Arc, Condvar, Mutex, MutexGuard},
sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
time::Duration,
},
tokio::runtime::Runtime,
tokio::{runtime::Runtime, time::timeout},
};

// Upper bound on outstanding tasks. NOTE(review): presumably the capacity of
// the async task semaphore — its initialization is not visible here; confirm.
const MAX_OUTSTANDING_TASK: u64 = 2000;
// Timeout, in milliseconds, applied to a single asynchronous transaction send.
// The batch send path multiplies this by the number of buffers in the batch.
const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;

/// A semaphore used for limiting the number of asynchronous tasks spawned to the
/// runtime. Before spawning a task, use acquire. After the task is done (be it
Expand Down Expand Up @@ -107,18 +110,48 @@ async fn send_wire_transaction_async(
connection: Arc<NonblockingQuicTpuConnection>,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction(wire_transaction).await;
let result = timeout(
Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
connection.send_wire_transaction(wire_transaction),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
result
handle_send_result(result, connection)
}

async fn send_wire_transaction_batch_async(
connection: Arc<NonblockingQuicTpuConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction_batch(&buffers).await;
let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;

let result = timeout(
Duration::from_millis(time_out),
connection.send_wire_transaction_batch(&buffers),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
result
handle_send_result(result, connection)
}

/// Flattens the outcome of a send that was wrapped in a timeout.
///
/// * `result` - the outer `Result` is `Err` when the timeout elapsed before
///   the send finished; the inner `Result` is the send's own outcome.
/// * `connection` - the connection the send was issued on; used to reach the
///   shared connection stats and to log the target address on timeout.
///
/// If the send completed in time, its own result is returned untouched. If it
/// timed out, one `send_timeout` is recorded in the connection's stats, the
/// event is logged at info level, and a `TransportError::Custom` is returned.
fn handle_send_result(
    result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
    connection: Arc<NonblockingQuicTpuConnection>,
) -> Result<(), TransportError> {
    result.unwrap_or_else(|_elapsed| {
        // Build a throwaway stats record carrying a single timeout and merge
        // it into the connection-wide stats.
        let timeout_stats = ClientStats::default();
        timeout_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
        connection
            .connection_stats()
            .add_client_stats(&timeout_stats, 0, false);

        info!("Timedout sending transaction {:?}", connection.tpu_addr());
        Err(TransportError::Custom(
            "Timedout sending transaction".to_string(),
        ))
    })
}

impl TpuConnection for QuicTpuConnection {
Expand Down
1 change: 1 addition & 0 deletions client/src/tpu_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct ClientStats {
pub tx_data_blocked: MovingStat,
pub tx_acks: MovingStat,
pub make_connection_ms: AtomicU64,
pub send_timeout: AtomicU64,
}

#[enum_dispatch]
Expand Down

0 comments on commit f3471e5

Please sign in to comment.