From b5b62238cf7512abb38803c426369ebbcc8fe540 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 23 Aug 2021 15:20:05 +0400 Subject: [PATCH] feat: add `ping()` to all comms RPC clients (#3227) ## Description Adds a `ping()` function to all comms RPC clients. `ping()` sends an RPC request with the `ACK` flag set. The server will immediately reply with an `ACK` response. This accurately measures RPC latency without a potentially slow backend. `ping()` is now used in the wallet monitor. ## Motivation and Context Previously, `get_last_request_latency` would refer to the latency of `get_tip_info`, which increases whenever the blockchain db is busy, e.g.: - the base node is syncing - other node(s) are syncing from the base node - one or many wallets scanning UTXOs from the base node - one or many wallets running a recovery from the base node - lots of LMDB writes, e.g. a large reorg A client wallet would, of course, have no idea that this is occurring and would simply display poor latency. This could be perceived as poor RPC performance when, in fact, there are a number of non-network/RPC-related reasons why a ping > 1/2 seconds is displayed. `get_last_request_latency` is a better measure when determining current base node performance (caveats: depending on the RPC method impl, current performance does not predict future performance). However, it is misleading to use this as a user-facing value presented as network latency. ## How Has This Been Tested? Unit test, console wallet test ## Checklist: * [x] I'm merging against the `development` branch. * [x] I have squashed my commits into a single commit. 
--- .../wallet/src/base_node_service/monitor.rs | 6 +- comms/rpc_macros/src/generator.rs | 4 + comms/src/noise/socket.rs | 20 +-- comms/src/protocol/rpc/client.rs | 140 ++++++++++++++---- comms/src/protocol/rpc/error.rs | 6 + .../src/protocol/rpc/test/greeting_service.rs | 4 + comms/src/protocol/rpc/test/smoke.rs | 13 ++ 7 files changed, 152 insertions(+), 41 deletions(-) diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 86f17eb227..5a2c3a7e76 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -123,7 +123,6 @@ impl BaseNodeMonitor { .obtain_base_node_wallet_rpc_client() .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; - let latency = client.get_last_request_latency().await?; let tip_info = client.get_tip_info().await?; @@ -134,6 +133,7 @@ impl BaseNodeMonitor { ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse) })?; + let latency = client.ping().await?; let is_synced = tip_info.is_synced; debug!( target: LOG_TARGET, @@ -141,7 +141,7 @@ impl BaseNodeMonitor { peer_node_id, chain_metadata.height_of_longest_chain(), if is_synced { "Synced" } else { "Syncing..." 
}, - latency.unwrap_or_default().as_millis() + latency.as_millis() ); self.db.set_chain_metadata(chain_metadata.clone()).await?; @@ -150,7 +150,7 @@ impl BaseNodeMonitor { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), - latency, + latency: Some(latency), online: OnlineStatus::Online, }) .await; diff --git a/comms/rpc_macros/src/generator.rs b/comms/rpc_macros/src/generator.rs index bb8c05a53b..f3e8cbffd1 100644 --- a/comms/rpc_macros/src/generator.rs +++ b/comms/rpc_macros/src/generator.rs @@ -211,6 +211,10 @@ impl RpcCodeGenerator { self.inner.get_last_request_latency().await } + pub async fn ping(&mut self) -> Result { + self.inner.ping().await + } + pub fn close(&mut self) { self.inner.close(); } diff --git a/comms/src/noise/socket.rs b/comms/src/noise/socket.rs index 149328cf60..eaf02a60b0 100644 --- a/comms/src/noise/socket.rs +++ b/comms/src/noise/socket.rs @@ -513,7 +513,7 @@ pub struct Handshake { impl Handshake { pub fn new(socket: TSocket, state: HandshakeState) -> Self { Self { - socket: NoiseSocket::new(socket, Box::new(state).into()), + socket: NoiseSocket::new(socket, state.into()), } } } @@ -585,8 +585,8 @@ where TSocket: AsyncRead + AsyncWrite + Unpin #[derive(Debug)] enum NoiseState { - HandshakeState(Box), - TransportState(Box), + HandshakeState(HandshakeState), + TransportState(TransportState), } macro_rules! 
proxy_state_method { @@ -619,20 +619,20 @@ impl NoiseState { pub fn into_transport_mode(self) -> Result { match self { - NoiseState::HandshakeState(state) => Ok(NoiseState::TransportState(Box::new(state.into_transport_mode()?))), + NoiseState::HandshakeState(state) => Ok(NoiseState::TransportState(state.into_transport_mode()?)), _ => Err(snow::Error::State(StateProblem::HandshakeAlreadyFinished)), } } } -impl From> for NoiseState { - fn from(state: Box) -> Self { +impl From for NoiseState { + fn from(state: HandshakeState) -> Self { NoiseState::HandshakeState(state) } } -impl From> for NoiseState { - fn from(state: Box) -> Self { +impl From for NoiseState { + fn from(state: TransportState) -> Self { NoiseState::TransportState(state) } } @@ -662,8 +662,8 @@ mod test { let (dialer_socket, listener_socket) = MemorySocket::new_pair(); let (dialer, listener) = ( - NoiseSocket::new(dialer_socket, Box::new(dialer_session).into()), - NoiseSocket::new(listener_socket, Box::new(listener_session).into()), + NoiseSocket::new(dialer_socket, dialer_session.into()), + NoiseSocket::new(listener_socket, listener_session.into()), ); Ok(( diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 737255bbd5..9317d6727b 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -28,7 +28,7 @@ use crate::{ protocol::{ rpc::{ body::ClientStreaming, - message::BaseRequest, + message::{BaseRequest, RpcMessageFlags}, Handshake, NamedProtocolService, Response, @@ -136,6 +136,11 @@ impl RpcClient { self.connector.get_last_request_latency() } + /// Sends a ping and returns the latency + pub fn ping(&mut self) -> impl Future> + '_ { + self.connector.send_ping() + } + async fn call_inner( &mut self, request: BaseRequest, @@ -276,6 +281,17 @@ impl ClientConnector { reply_rx.await.map_err(|_| RpcError::RequestCancelled) } + pub async fn send_ping(&mut self) -> Result { + let (reply, reply_rx) = oneshot::channel(); + self.inner + 
.send(ClientRequest::SendPing(reply)) + .await + .map_err(|_| RpcError::ClientClosed)?; + + let latency = reply_rx.await.map_err(|_| RpcError::RequestCancelled)??; + Ok(latency) + } + pub fn is_connected(&self) -> bool { !self.inner.is_closed() } @@ -319,7 +335,7 @@ pub struct RpcClientWorker { // sent determines the byte size. A u16 will be more than enough for the purpose next_request_id: u16, ready_tx: Option>>, - latency: Option, + last_request_latency: Option, protocol_id: ProtocolId, } @@ -339,7 +355,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send framed, next_request_id: 0, ready_tx: Some(ready_tx), - latency: None, + last_request_latency: None, protocol_id, } } @@ -365,7 +381,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send self.protocol_name(), latency ); - self.latency = Some(latency); + self.last_request_latency = Some(latency); if let Some(r) = self.ready_tx.take() { let _ = r.send(Ok(())); } @@ -389,7 +405,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send } }, GetLastRequestLatency(reply) => { - let _ = reply.send(self.latency); + let _ = reply.send(self.last_request_latency); + }, + SendPing(reply) => { + if let Err(err) = self.do_ping_pong(reply).await { + error!(target: LOG_TARGET, "Unexpected error: {}. 
Worker is terminating.", err); + break; + } }, } } @@ -404,6 +426,52 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send ); } + async fn do_ping_pong(&mut self, reply: oneshot::Sender>) -> Result<(), RpcError> { + let ack = proto::rpc::RpcRequest { + flags: RpcMessageFlags::ACK.bits() as u32, + deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0), + ..Default::default() + }; + + let start = Instant::now(); + self.framed.send(ack.to_encoded_bytes().into()).await?; + + debug!( + target: LOG_TARGET, + "Ping (protocol {}) sent in {:.2?}", + self.protocol_name(), + start.elapsed() + ); + let resp = match self.read_reply().await { + Ok(resp) => resp, + Err(RpcError::ReplyTimeout) => { + debug!(target: LOG_TARGET, "Ping timed out after {:.0?}", start.elapsed()); + let _ = reply.send(Err(RpcStatus::timed_out("Response timed out"))); + return Ok(()); + }, + Err(err) => return Err(err), + }; + + let status = RpcStatus::from(&resp); + if !status.is_ok() { + let _ = reply.send(Err(status.clone())); + return Err(status.into()); + } + + let resp_flags = RpcMessageFlags::from_bits_truncate(resp.flags as u8); + if !resp_flags.contains(RpcMessageFlags::ACK) { + warn!(target: LOG_TARGET, "Invalid ping response {:?}", resp); + let _ = reply.send(Err(RpcStatus::protocol_error(format!( + "Received invalid ping response on protocol '{}'", + self.protocol_name() + )))); + return Err(RpcError::InvalidPingResponse); + } + + let _ = reply.send(Ok(start.elapsed())); + Ok(()) + } + async fn do_request_response( &mut self, request: BaseRequest, @@ -424,43 +492,29 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let start = Instant::now(); self.framed.send(req.to_encoded_bytes().into()).await?; - let (mut response_tx, response_rx) = mpsc::channel(1); + let (mut response_tx, response_rx) = mpsc::channel(10); if reply.send(response_rx).is_err() { debug!(target: LOG_TARGET, "Client request was cancelled."); response_tx.close_channel(); } loop { - // Wait until the 
timeout, allowing an extra grace period to account for latency - let next_msg_fut = match self.config.timeout_with_grace_period() { - Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())), - None => Either::Right(self.framed.next().map(Ok)), - }; - - let resp = match next_msg_fut.await { - Ok(Some(Ok(resp))) => { + let resp = match self.read_reply().await { + Ok(resp) => { let latency = start.elapsed(); trace!( target: LOG_TARGET, "Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}", - resp.len(), + resp.message.len(), request_id, self.protocol_name(), method, latency ); - self.latency = Some(latency); - proto::rpc::RpcResponse::decode(resp)? - }, - Ok(Some(Err(err))) => { - return Err(err.into()); + self.last_request_latency = Some(latency); + resp }, - Ok(None) => { - return Err(RpcError::ServerClosedRequest); - }, - - // Timeout - Err(_) => { + Err(RpcError::ReplyTimeout) => { debug!( target: LOG_TARGET, "Request {} (method={}) timed out after {:.0?}", @@ -472,6 +526,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send response_tx.close_channel(); break; }, + Err(err) => return Err(err), }; match Self::convert_to_result(resp, request_id) { @@ -480,7 +535,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send // We just ignore that as we still want obey the protocol and receive messages until the FIN flag or // the connection is dropped let is_finished = resp.is_finished(); - if !response_tx.is_closed() { + if response_tx.is_closed() { + warn!( + target: LOG_TARGET, + "Response receiver was dropped before the response/stream could complete for protocol {}, \ + the stream will continue until completed", + self.protocol_name() + ); + } else { let _ = response_tx.send(Ok(resp)).await; } if is_finished { @@ -496,7 +558,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send response_tx.close_channel(); break; }, - Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. 
}) => { + Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) | + Err(err @ RpcError::UnexpectedAckResponse) => { warn!(target: LOG_TARGET, "{}", err); // Ignore the response, this can happen when there is excessive latency. The server sends back a // reply before the deadline but it is only received after the client has timed @@ -510,6 +573,21 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send Ok(()) } + async fn read_reply(&mut self) -> Result { + // Wait until the timeout, allowing an extra grace period to account for latency + let next_msg_fut = match self.config.timeout_with_grace_period() { + Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())), + None => Either::Right(self.framed.next().map(Ok)), + }; + + match next_msg_fut.await { + Ok(Some(Ok(resp))) => Ok(proto::rpc::RpcResponse::decode(resp)?), + Ok(Some(Err(err))) => Err(err.into()), + Ok(None) => Err(RpcError::ServerClosedRequest), + Err(_) => Err(RpcError::ReplyTimeout), + } + } + fn next_request_id(&mut self) -> u16 { let next_id = self.next_request_id; // request_id is allowed to wrap around back to 0 @@ -524,6 +602,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let resp_id = u16::try_from(resp.request_id) .map_err(|_| RpcStatus::protocol_error(format!("invalid request_id: must be less than {}", u16::MAX)))?; + let flags = RpcMessageFlags::from_bits_truncate(resp.flags as u8); + if flags.contains(RpcMessageFlags::ACK) { + return Err(RpcError::UnexpectedAckResponse); + } + if resp_id != request_id { return Err(RpcError::ResponseIdDidNotMatchRequest { expected: request_id, @@ -551,4 +634,5 @@ pub enum ClientRequest { reply: oneshot::Sender, RpcStatus>>>, }, GetLastRequestLatency(oneshot::Sender>), + SendPing(oneshot::Sender>), } diff --git a/comms/src/protocol/rpc/error.rs b/comms/src/protocol/rpc/error.rs index 959e781cf1..64f811f9d8 100644 --- a/comms/src/protocol/rpc/error.rs +++ b/comms/src/protocol/rpc/error.rs @@ -59,6 +59,12 @@ pub enum RpcError 
{ PeerManagerError(#[from] PeerManagerError), #[error("Connectivity error: {0}")] ConnectivityError(#[from] ConnectivityError), + #[error("Reply Timeout")] + ReplyTimeout, + #[error("Received an invalid ping response")] + InvalidPingResponse, + #[error("Unexpected ACK response. This is likely because of a previous ACK timeout")] + UnexpectedAckResponse, #[error(transparent)] UnknownError(#[from] anyhow::Error), } diff --git a/comms/src/protocol/rpc/test/greeting_service.rs b/comms/src/protocol/rpc/test/greeting_service.rs index 516ef230a1..0e190473dd 100644 --- a/comms/src/protocol/rpc/test/greeting_service.rs +++ b/comms/src/protocol/rpc/test/greeting_service.rs @@ -372,6 +372,10 @@ impl GreetingClient { self.inner.get_last_request_latency().await } + pub async fn ping(&mut self) -> Result { + self.inner.ping().await + } + pub fn close(&mut self) { self.inner.close(); } diff --git a/comms/src/protocol/rpc/test/smoke.rs b/comms/src/protocol/rpc/test/smoke.rs index 14570b31ce..a762ac4c9c 100644 --- a/comms/src/protocol/rpc/test/smoke.rs +++ b/comms/src/protocol/rpc/test/smoke.rs @@ -248,6 +248,19 @@ async fn response_too_big() { let _ = client.reply_with_msg_of_size(max_size as u64).await.unwrap(); } +#[runtime::test_basic] +async fn ping_latency() { + let (socket, _, _, _shutdown) = setup(GreetingService::new(&[]), 1).await; + + let framed = framing::canonical(socket, RPC_MAX_FRAME_SIZE); + let mut client = GreetingClient::builder().connect(framed).await.unwrap(); + + let latency = client.ping().await.unwrap(); + // This is plenty (typically would be < 1ms over MemorySocket), however CI can be very slow, so to prevent flakiness + // we leave a wide berth + assert!(latency.as_secs() < 5); +} + #[runtime::test_basic] async fn server_shutdown_before_connect() { let (socket, _, _, mut shutdown) = setup(GreetingService::new(&[]), 1).await;