From 5afc7777e9a2dd0ab3fbfa24274e4f4b382d36cd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Jan 2020 12:58:36 +0100 Subject: [PATCH] client/finality-grandpa: Make round_communication use bounded channel (#4691) * clinet/finality-grandpa: Make round_communication use bounded channel `round_communication` returns a `Sink` and a `Stream` for outgoing and incoming messages. The messages send into the `Sink` are forwarded down to the network as well as send back into the `Stream` to ensure the node processes its own messages. So far, to send messages into the `Sink` back into the `Stream`, an unbounded channel was used. This patch updates `round_communication` and `OutgoingMessages` to use a bounded channel. This is part of a greater effort to reduce the number of owners of components within `finality-grandpa` and `network` as well as to reduce the amount of unbounded channels. For details see d4fbb897c and f0c18520a. * client/finality-grandpa: Import futures03::future::ready at the top * client/finality-grandpa: Make tests use compat of future 03 * client/finality-grandpa: Do not import ready into scope Instead of importing futures03::future::ready into the scope, only import futures::future03 into scope and call ready as furure03::ready. --- .../finality-grandpa/src/communication/mod.rs | 89 ++++++++++--------- client/finality-grandpa/src/environment.rs | 9 +- client/finality-grandpa/src/tests.rs | 25 ++++-- 3 files changed, 76 insertions(+), 47 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 18cb14c739..64637c0ed8 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -27,12 +27,13 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use futures::{prelude::*, sync::mpsc}; +use futures::prelude::*; use futures03::{ channel::mpsc as mpsc03, compat::Compat, - future::{Future as Future03}, - stream::StreamExt, + future::{self as future03, Future as Future03}, + sink::Sink as Sink03, + stream::{Stream as Stream03, StreamExt}, }; use log::{debug, trace}; use parking_lot::Mutex; @@ -276,8 +277,8 @@ impl> NetworkBridge { local_key: Option, has_voted: HasVoted, ) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error>, + impl Stream03> + Unpin, + OutgoingMessages, ) { self.note_round( round, @@ -295,22 +296,20 @@ impl> NetworkBridge { }); let topic = round_topic::(round.0, set_id.0); - let incoming = Compat::new(self.gossip_engine.messages_for(topic) - .map(|item| Ok::<_, ()>(item))) - .filter_map(|notification| { + let incoming = self.gossip_engine.messages_for(topic) + .filter_map(move |notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); - if let Err(ref e) = decoded { - debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); - } - decoded.ok() - }) - .and_then(move |msg| { - match msg { - GossipMessage::Vote(msg) => { + + match decoded { + Err(ref e) => { + debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); + return future03::ready(None); + } + Ok(GossipMessage::Vote(msg)) => { // check signature. if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return Ok(None); + return future03::ready(None); } if voters.len() <= TELEMETRY_VOTERS_LIMIT { @@ -339,18 +338,16 @@ impl> NetworkBridge { }; } - Ok(Some(msg.message)) + future03::ready(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); - return Ok(None); + return future03::ready(None); } } - }) - .filter_map(|x| x) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); + }); - let (tx, out_rx) = mpsc::unbounded(); + let (tx, out_rx) = mpsc03::channel(0); let outgoing = OutgoingMessages:: { round: round.0, set_id: set_id.0, @@ -360,14 +357,10 @@ impl> NetworkBridge { has_voted, }; - let out_rx = out_rx.map_err(move |()| Error::Network( - format!("Failed to receive on unbounded receiver for round {}", round.0) - )); - // Combine incoming votes from external GRANDPA nodes with outgoing // votes from our own GRANDPA voter to have a single // vote-import-pipeline. - let incoming = incoming.select(out_rx); + let incoming = futures03::stream::select(incoming, out_rx); (incoming, outgoing) } @@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer( /// use the same raw message and key to sign. This is currently true for /// `ed25519` and `BLS` signatures (which we might use in the future), care must /// be taken when switching to different key types. -struct OutgoingMessages { +pub(crate) struct OutgoingMessages { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, - sender: mpsc::UnboundedSender>, + sender: mpsc03::Sender>, network: GossipEngine, has_voted: HasVoted, } -impl Sink for OutgoingMessages +impl Unpin for OutgoingMessages {} + +impl Sink03> for OutgoingMessages { - type SinkItem = Message; - type SinkError = Error; + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03> { + Sink03::poll_ready(Pin::new(&mut self.sender), cx) + .map(|elem| { elem.map_err(|e| { + Error::Network(format!("Failed to poll_ready channel sender: {:?}", e)) + })}) + } - fn start_send(&mut self, mut msg: Message) -> StartSend, Error> { + fn start_send(mut self: Pin<&mut Self>, mut msg: Message) -> Result<(), Self::Error> { // if we've voted on this round previously under the same key, send that vote instead match &mut msg { finality_grandpa::Message::PrimaryPropose(ref mut vote) => @@ -760,17 +761,23 @@ impl Sink for OutgoingMessages self.network.gossip_message(topic, message.encode(), false); // forward the message to the inner sender. - let _ = self.sender.unbounded_send(signed); - } + return self.sender.start_send(signed).map_err(|e| { + Error::Network(format!("Failed to start_send on channel sender: {:?}", e)) + }); + }; - Ok(AsyncSink::Ready) + Ok(()) } - fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03> { + Poll03::Ready(Ok(())) + } - fn close(&mut self) -> Poll<(), Error> { - // ignore errors since we allow this inner sender to be closed already. - self.sender.close().or_else(|_| Ok(Async::Ready(()))) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03> { + Sink03::poll_close(Pin::new(&mut self.sender), cx) + .map(|elem| { elem.map_err(|e| { + Error::Network(format!("Failed to poll_close channel sender: {:?}", e)) + })}) } } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 372229001d..c5c6291dc0 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -22,7 +22,11 @@ use std::time::Duration; use log::{debug, warn, info}; use parity_scale_codec::{Decode, Encode}; use futures::prelude::*; -use futures03::future::{FutureExt as _, TryFutureExt as _}; +use futures03::{ + compat::{Compat, CompatSink}, + future::{FutureExt as _, TryFutureExt as _}, + stream::StreamExt as _, +}; use futures_timer::Delay; use parking_lot::RwLock; use sp_blockchain::{HeaderBackend, Error as ClientError}; @@ -608,6 +612,9 @@ where has_voted, ); + let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item))); + let outgoing = CompatSink::new(outgoing); + // schedule incoming messages from the network to be held until // corresponding blocks are imported. let incoming = Box::new(UntilVoteTargetImported::new( diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index 889250c54d..fdfb2fd808 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -37,15 +37,17 @@ use sp_consensus::{ BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport, import_queue::{BoxJustificationImport, BoxFinalityProofImport}, }; -use std::collections::{HashMap, HashSet}; -use std::result; +use std::{ + collections::{HashMap, HashSet}, + result, + pin::Pin, task, +}; use parity_scale_codec::Decode; -use sp_runtime::traits::{Header as HeaderT, HasherFor}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor}; use sp_runtime::generic::{BlockId, DigestItem}; use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public}; use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi}; use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check}; -use std::{pin::Pin, task}; use authorities::AuthoritySet; use finality_proof::{ @@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() { HasVoted::No, ); + let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item))); + let round_tx = futures03::compat::CompatSink::new(round_tx); + let round_tx = Arc::new(Mutex::new(round_tx)); let exit_tx = Arc::new(Mutex::new(Some(exit_tx))); @@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() { target_hash: block_30_hash, }; - round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap(); + // One should either be calling `Sink::send` or `Sink::start_send` followed + // by `Sink::poll_complete` to make sure items are being flushed. Given that + // we send in a loop including a delay until items are received, this can be + // ignored for the sake of reduced complexity. + if !round_tx.lock() + .start_send(finality_grandpa::Message::Prevote(prevote)) + .unwrap() + .is_ready() { + panic!("expected sink to be ready to write to."); + } + Ok(()) }).map_err(|_| panic!()))