Skip to content

Commit

Permalink
feat: add ping() to all comms RPC clients (#3227)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
Adds a `ping()` function to all comms RPC clients. `ping()` sends an RPC
request with the `ACK` flag set. The server will immediately reply with
an `ACK` response. This accurately measures RPC latency without a
potentially slow backend. `ping()` is now used in the wallet monitor.

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->

Previously, `get_last_request_latency` would refer to the latency of `get_tip_info` 
which will increase whenever the blockchain db is busy, for e.g:
- the base node is syncing
- another node(s) syncing from the base node
- one or many wallets scanning UTXOs from the base node
- one or many wallets running a recovery from the base node
- lots of lmdb writes e.g large reorg

A client wallet would, of course, have no idea that this is occurring
and simply display poor latency. This could be perceived to be a poor
RPC performance, when in fact, there are a number or non-network/RPC
related reasons why a ping > 1/2 seconds  is displayed.

`get_last_request_latency` is a better measure when determining
current base node performance (caveats: depending on the RPC method impl, 
current performance does not predict future performance). However,
it is misleading to use this as a user-facing value presented as network latency.

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
Unit test, console wallet test

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
sdbondi authored Aug 23, 2021
1 parent 9135eaa commit b5b6223
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 41 deletions.
6 changes: 3 additions & 3 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;

let tip_info = client.get_tip_info().await?;

Expand All @@ -134,14 +133,15 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;

let latency = client.ping().await?;
let is_synced = tip_info.is_synced;
debug!(
target: LOG_TARGET,
"Base node {} Tip: {} ({}) Latency: {} ms",
peer_node_id,
chain_metadata.height_of_longest_chain(),
if is_synced { "Synced" } else { "Syncing..." },
latency.unwrap_or_default().as_millis()
latency.as_millis()
);

self.db.set_chain_metadata(chain_metadata.clone()).await?;
Expand All @@ -150,7 +150,7 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency,
latency: Some(latency),
online: OnlineStatus::Online,
})
.await;
Expand Down
4 changes: 4 additions & 0 deletions comms/rpc_macros/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ impl RpcCodeGenerator {
self.inner.get_last_request_latency().await
}

pub async fn ping(&mut self) -> Result<std::time::Duration, #dep_mod::RpcError> {
self.inner.ping().await
}

pub fn close(&mut self) {
self.inner.close();
}
Expand Down
20 changes: 10 additions & 10 deletions comms/src/noise/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ pub struct Handshake<TSocket> {
impl<TSocket> Handshake<TSocket> {
pub fn new(socket: TSocket, state: HandshakeState) -> Self {
Self {
socket: NoiseSocket::new(socket, Box::new(state).into()),
socket: NoiseSocket::new(socket, state.into()),
}
}
}
Expand Down Expand Up @@ -585,8 +585,8 @@ where TSocket: AsyncRead + AsyncWrite + Unpin

#[derive(Debug)]
enum NoiseState {
HandshakeState(Box<HandshakeState>),
TransportState(Box<TransportState>),
HandshakeState(HandshakeState),
TransportState(TransportState),
}

macro_rules! proxy_state_method {
Expand Down Expand Up @@ -619,20 +619,20 @@ impl NoiseState {

pub fn into_transport_mode(self) -> Result<Self, snow::Error> {
match self {
NoiseState::HandshakeState(state) => Ok(NoiseState::TransportState(Box::new(state.into_transport_mode()?))),
NoiseState::HandshakeState(state) => Ok(NoiseState::TransportState(state.into_transport_mode()?)),
_ => Err(snow::Error::State(StateProblem::HandshakeAlreadyFinished)),
}
}
}

impl From<Box<HandshakeState>> for NoiseState {
fn from(state: Box<HandshakeState>) -> Self {
impl From<HandshakeState> for NoiseState {
fn from(state: HandshakeState) -> Self {
NoiseState::HandshakeState(state)
}
}

impl From<Box<TransportState>> for NoiseState {
fn from(state: Box<TransportState>) -> Self {
impl From<TransportState> for NoiseState {
fn from(state: TransportState) -> Self {
NoiseState::TransportState(state)
}
}
Expand Down Expand Up @@ -662,8 +662,8 @@ mod test {

let (dialer_socket, listener_socket) = MemorySocket::new_pair();
let (dialer, listener) = (
NoiseSocket::new(dialer_socket, Box::new(dialer_session).into()),
NoiseSocket::new(listener_socket, Box::new(listener_session).into()),
NoiseSocket::new(dialer_socket, dialer_session.into()),
NoiseSocket::new(listener_socket, listener_session.into()),
);

Ok((
Expand Down
140 changes: 112 additions & 28 deletions comms/src/protocol/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
protocol::{
rpc::{
body::ClientStreaming,
message::BaseRequest,
message::{BaseRequest, RpcMessageFlags},
Handshake,
NamedProtocolService,
Response,
Expand Down Expand Up @@ -136,6 +136,11 @@ impl RpcClient {
self.connector.get_last_request_latency()
}

/// Sends a ping and returns the latency
pub fn ping(&mut self) -> impl Future<Output = Result<Duration, RpcError>> + '_ {
self.connector.send_ping()
}

async fn call_inner(
&mut self,
request: BaseRequest<Bytes>,
Expand Down Expand Up @@ -276,6 +281,17 @@ impl ClientConnector {
reply_rx.await.map_err(|_| RpcError::RequestCancelled)
}

pub async fn send_ping(&mut self) -> Result<Duration, RpcError> {
let (reply, reply_rx) = oneshot::channel();
self.inner
.send(ClientRequest::SendPing(reply))
.await
.map_err(|_| RpcError::ClientClosed)?;

let latency = reply_rx.await.map_err(|_| RpcError::RequestCancelled)??;
Ok(latency)
}

pub fn is_connected(&self) -> bool {
!self.inner.is_closed()
}
Expand Down Expand Up @@ -319,7 +335,7 @@ pub struct RpcClientWorker<TSubstream> {
// sent determines the byte size. A u16 will be more than enough for the purpose
next_request_id: u16,
ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
latency: Option<Duration>,
last_request_latency: Option<Duration>,
protocol_id: ProtocolId,
}

Expand All @@ -339,7 +355,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
framed,
next_request_id: 0,
ready_tx: Some(ready_tx),
latency: None,
last_request_latency: None,
protocol_id,
}
}
Expand All @@ -365,7 +381,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
self.protocol_name(),
latency
);
self.latency = Some(latency);
self.last_request_latency = Some(latency);
if let Some(r) = self.ready_tx.take() {
let _ = r.send(Ok(()));
}
Expand All @@ -389,7 +405,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
}
},
GetLastRequestLatency(reply) => {
let _ = reply.send(self.latency);
let _ = reply.send(self.last_request_latency);
},
SendPing(reply) => {
if let Err(err) = self.do_ping_pong(reply).await {
error!(target: LOG_TARGET, "Unexpected error: {}. Worker is terminating.", err);
break;
}
},
}
}
Expand All @@ -404,6 +426,52 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
);
}

async fn do_ping_pong(&mut self, reply: oneshot::Sender<Result<Duration, RpcStatus>>) -> Result<(), RpcError> {
let ack = proto::rpc::RpcRequest {
flags: RpcMessageFlags::ACK.bits() as u32,
deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
..Default::default()
};

let start = Instant::now();
self.framed.send(ack.to_encoded_bytes().into()).await?;

debug!(
target: LOG_TARGET,
"Ping (protocol {}) sent in {:.2?}",
self.protocol_name(),
start.elapsed()
);
let resp = match self.read_reply().await {
Ok(resp) => resp,
Err(RpcError::ReplyTimeout) => {
debug!(target: LOG_TARGET, "Ping timed out after {:.0?}", start.elapsed());
let _ = reply.send(Err(RpcStatus::timed_out("Response timed out")));
return Ok(());
},
Err(err) => return Err(err),
};

let status = RpcStatus::from(&resp);
if !status.is_ok() {
let _ = reply.send(Err(status.clone()));
return Err(status.into());
}

let resp_flags = RpcMessageFlags::from_bits_truncate(resp.flags as u8);
if !resp_flags.contains(RpcMessageFlags::ACK) {
warn!(target: LOG_TARGET, "Invalid ping response {:?}", resp);
let _ = reply.send(Err(RpcStatus::protocol_error(format!(
"Received invalid ping response on protocol '{}'",
self.protocol_name()
))));
return Err(RpcError::InvalidPingResponse);
}

let _ = reply.send(Ok(start.elapsed()));
Ok(())
}

async fn do_request_response(
&mut self,
request: BaseRequest<Bytes>,
Expand All @@ -424,43 +492,29 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
let start = Instant::now();
self.framed.send(req.to_encoded_bytes().into()).await?;

let (mut response_tx, response_rx) = mpsc::channel(1);
let (mut response_tx, response_rx) = mpsc::channel(10);
if reply.send(response_rx).is_err() {
debug!(target: LOG_TARGET, "Client request was cancelled.");
response_tx.close_channel();
}

loop {
// Wait until the timeout, allowing an extra grace period to account for latency
let next_msg_fut = match self.config.timeout_with_grace_period() {
Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())),
None => Either::Right(self.framed.next().map(Ok)),
};

let resp = match next_msg_fut.await {
Ok(Some(Ok(resp))) => {
let resp = match self.read_reply().await {
Ok(resp) => {
let latency = start.elapsed();
trace!(
target: LOG_TARGET,
"Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}",
resp.len(),
resp.message.len(),
request_id,
self.protocol_name(),
method,
latency
);
self.latency = Some(latency);
proto::rpc::RpcResponse::decode(resp)?
},
Ok(Some(Err(err))) => {
return Err(err.into());
self.last_request_latency = Some(latency);
resp
},
Ok(None) => {
return Err(RpcError::ServerClosedRequest);
},

// Timeout
Err(_) => {
Err(RpcError::ReplyTimeout) => {
debug!(
target: LOG_TARGET,
"Request {} (method={}) timed out after {:.0?}",
Expand All @@ -472,6 +526,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
response_tx.close_channel();
break;
},
Err(err) => return Err(err),
};

match Self::convert_to_result(resp, request_id) {
Expand All @@ -480,7 +535,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
// We just ignore that as we still want obey the protocol and receive messages until the FIN flag or
// the connection is dropped
let is_finished = resp.is_finished();
if !response_tx.is_closed() {
if response_tx.is_closed() {
warn!(
target: LOG_TARGET,
"Response receiver was dropped before the response/stream could complete for protocol {}, \
the stream will continue until completed",
self.protocol_name()
);
} else {
let _ = response_tx.send(Ok(resp)).await;
}
if is_finished {
Expand All @@ -496,7 +558,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
response_tx.close_channel();
break;
},
Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) => {
Err(err @ RpcError::ResponseIdDidNotMatchRequest { .. }) |
Err(err @ RpcError::UnexpectedAckResponse) => {
warn!(target: LOG_TARGET, "{}", err);
// Ignore the response, this can happen when there is excessive latency. The server sends back a
// reply before the deadline but it is only received after the client has timed
Expand All @@ -510,6 +573,21 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
Ok(())
}

async fn read_reply(&mut self) -> Result<proto::rpc::RpcResponse, RpcError> {
// Wait until the timeout, allowing an extra grace period to account for latency
let next_msg_fut = match self.config.timeout_with_grace_period() {
Some(timeout) => Either::Left(time::timeout(timeout, self.framed.next())),
None => Either::Right(self.framed.next().map(Ok)),
};

match next_msg_fut.await {
Ok(Some(Ok(resp))) => Ok(proto::rpc::RpcResponse::decode(resp)?),
Ok(Some(Err(err))) => Err(err.into()),
Ok(None) => Err(RpcError::ServerClosedRequest),
Err(_) => Err(RpcError::ReplyTimeout),
}
}

fn next_request_id(&mut self) -> u16 {
let next_id = self.next_request_id;
// request_id is allowed to wrap around back to 0
Expand All @@ -524,6 +602,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
let resp_id = u16::try_from(resp.request_id)
.map_err(|_| RpcStatus::protocol_error(format!("invalid request_id: must be less than {}", u16::MAX)))?;

let flags = RpcMessageFlags::from_bits_truncate(resp.flags as u8);
if flags.contains(RpcMessageFlags::ACK) {
return Err(RpcError::UnexpectedAckResponse);
}

if resp_id != request_id {
return Err(RpcError::ResponseIdDidNotMatchRequest {
expected: request_id,
Expand Down Expand Up @@ -551,4 +634,5 @@ pub enum ClientRequest {
reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
},
GetLastRequestLatency(oneshot::Sender<Option<Duration>>),
SendPing(oneshot::Sender<Result<Duration, RpcStatus>>),
}
6 changes: 6 additions & 0 deletions comms/src/protocol/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ pub enum RpcError {
PeerManagerError(#[from] PeerManagerError),
#[error("Connectivity error: {0}")]
ConnectivityError(#[from] ConnectivityError),
#[error("Reply Timeout")]
ReplyTimeout,
#[error("Received an invalid ping response")]
InvalidPingResponse,
#[error("Unexpected ACK response. This is likely because of a previous ACK timeout")]
UnexpectedAckResponse,
#[error(transparent)]
UnknownError(#[from] anyhow::Error),
}
Expand Down
4 changes: 4 additions & 0 deletions comms/src/protocol/rpc/test/greeting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ impl GreetingClient {
self.inner.get_last_request_latency().await
}

pub async fn ping(&mut self) -> Result<Duration, RpcError> {
self.inner.ping().await
}

pub fn close(&mut self) {
self.inner.close();
}
Expand Down
Loading

0 comments on commit b5b6223

Please sign in to comment.