Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Time out async sends to avoid slackers stuck in the queue for too long (backport #28545) #28732

Merged
merged 3 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ impl ConnectionCacheStats {
client_stats.make_connection_ms.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.total_client_stats.send_timeout.fetch_add(
client_stats.send_timeout.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.sent_packets
.fetch_add(num_packets as u64, Ordering::Relaxed);
self.total_batches.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -224,6 +228,13 @@ impl ConnectionCacheStats {
self.batch_failure.swap(0, Ordering::Relaxed),
i64
),
(
"send_timeout",
self.total_client_stats
.send_timeout
.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ impl QuicTpuConnection {
self.client.stats()
}

/// Returns a new shared handle to the stats object held by this connection.
///
/// Only the reference count is bumped; the returned `Arc` points at the same
/// `ConnectionCacheStats` instance as `self.connection_stats`.
pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
    Arc::clone(&self.connection_stats)
}

pub fn new(
endpoint: Arc<QuicLazyInitializedEndpoint>,
addr: SocketAddr,
Expand Down
49 changes: 41 additions & 8 deletions client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ use {
},
tpu_connection::TpuConnection as NonblockingTpuConnection,
},
tpu_connection::TpuConnection,
tpu_connection::{ClientStats, TpuConnection},
},
lazy_static::lazy_static,
solana_sdk::transport::Result as TransportResult,
log::*,
solana_sdk::transport::{Result as TransportResult, TransportError},
std::{
net::SocketAddr,
sync::{Arc, Condvar, Mutex, MutexGuard},
sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
time::Duration,
},
tokio::runtime::Runtime,
tokio::{runtime::Runtime, time::timeout},
};

// NOTE(review): presumably the upper bound on concurrently outstanding async send
// tasks enforced by the semaphore declared below; its use is not visible here — confirm.
const MAX_OUTSTANDING_TASK: u64 = 2000;
// Timeout, in milliseconds, applied to a single transaction send. The batch send path
// multiplies this value by the number of buffers in the batch.
const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;

/// A semaphore used for limiting the number of asynchronous tasks spawned to the
/// runtime. Before spawning a task, use acquire. After the task is done (be it
Expand Down Expand Up @@ -107,18 +110,48 @@ async fn send_wire_transaction_async(
connection: Arc<NonblockingQuicTpuConnection>,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction(wire_transaction).await;
let result = timeout(
Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
connection.send_wire_transaction(wire_transaction),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
result
handle_send_result(result, connection)
}

/// Sends a batch of wire transactions over `connection`, bounding the whole batch with
/// a timeout of `SEND_TRANSACTION_TIMEOUT_MS` milliseconds per buffer.
///
/// `ASYNC_TASK_SEMAPHORE` is released after the send has either finished or timed out,
/// and the outcome is then passed to `handle_send_result` to produce the return value.
// NOTE(review): this listing appears to come from a diff view without +/- markers, so
// the superseded statement and its replacement both show up in the body — confirm
// against the real file before relying on the exact line sequence.
async fn send_wire_transaction_batch_async(
connection: Arc<NonblockingQuicTpuConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let result = connection.send_wire_transaction_batch(&buffers).await;
// The allowed time scales linearly with the number of buffers in the batch.
let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;

let result = timeout(
Duration::from_millis(time_out),
connection.send_wire_transaction_batch(&buffers),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
result
handle_send_result(result, connection)
}

/// Unwraps the outcome of a send that was wrapped in a timeout.
///
/// If the send completed before the deadline, its own result is returned unchanged.
/// If the timeout elapsed, a single `send_timeout` count is added to the connection's
/// stats, the connection's TPU address is logged at info level, and a
/// `TransportError::Custom` is returned.
fn handle_send_result(
    result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
    connection: Arc<NonblockingQuicTpuConnection>,
) -> Result<(), TransportError> {
    result.unwrap_or_else(|_elapsed| {
        // A fresh `ClientStats` carrying only this one timeout is handed to the
        // connection's stats object.
        let timeout_delta = ClientStats::default();
        timeout_delta.send_timeout.fetch_add(1, Ordering::Relaxed);
        connection
            .connection_stats()
            .add_client_stats(&timeout_delta, 0, false);
        info!("Timedout sending transaction {:?}", connection.tpu_addr());
        Err(TransportError::Custom(
            "Timedout sending transaction".to_string(),
        ))
    })
}

impl TpuConnection for QuicTpuConnection {
Expand Down
1 change: 1 addition & 0 deletions client/src/tpu_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct ClientStats {
pub tx_data_blocked: MovingStat,
pub tx_acks: MovingStat,
pub make_connection_ms: AtomicU64,
pub send_timeout: AtomicU64,
}

#[enum_dispatch]
Expand Down