From 75f0a5c751835aba15a3cd42ae3b30900f6b1428 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 3 Nov 2020 10:21:33 +0100 Subject: [PATCH] feat: add bootstrap message backlog --- src/consensus/dkg.rs | 26 ++++----------------- src/routing/approved.rs | 11 +++------ src/routing/bootstrap.rs | 49 +++++++++++++++++++++------------------- src/routing/mod.rs | 19 ++++++++++++---- src/routing/stage.rs | 16 +++++++++---- 5 files changed, 60 insertions(+), 61 deletions(-) diff --git a/src/consensus/dkg.rs b/src/consensus/dkg.rs index 99a02f6926..3495039f67 100644 --- a/src/consensus/dkg.rs +++ b/src/consensus/dkg.rs @@ -109,7 +109,7 @@ impl DkgVoter { ) -> Vec { if let Some(session) = &self.participant { if session.dkg_key == dkg_key && session.elders_info.is_some() { - trace!("{} DKG for {} already in progress", our_name, elders_info); + trace!("DKG for {} already in progress", elders_info); return vec![]; } } @@ -124,12 +124,7 @@ impl DkgVoter { match KeyGen::initialize(our_name, threshold, participants) { Ok((key_gen, message)) => { - trace!( - "{} DKG for {:?} {} starting", - our_name, - dkg_key, - elders_info - ); + trace!("DKG for {} starting", elders_info); let mut session = Participant { dkg_key, @@ -194,7 +189,7 @@ impl DkgVoter { let dkg_key = session.dkg_key; - trace!("{} DKG for {} progressing", our_name, elders_info); + trace!("DKG for {} progressing", elders_info); match session .key_gen @@ -361,12 +356,7 @@ impl Participant { return vec![]; } - trace!( - "{} process DKG message {:?} {:?}", - our_name, - dkg_key, - message - ); + trace!("process DKG message {:?}", message); let responses = self .key_gen .handle_message(&mut rand::thread_rng(), message) @@ -394,13 +384,7 @@ impl Participant { .copied() .collect(); - trace!( - "{} broadcasting DKG message {:?} {:?} to {:?}", - our_name, - dkg_key, - message, - recipients - ); + trace!("broadcasting DKG message {:?} to {:?}", message, recipients); let mut commands = vec![]; commands.push(DkgCommand::SendMessage { diff --git a/src/routing/approved.rs b/src/routing/approved.rs index d7a17dd200..e40df2701f 100644 --- a/src/routing/approved.rs +++ b/src/routing/approved.rs @@ -984,7 +984,7 @@ impl Approved { dkg_key: DkgKey, new_elders_info: EldersInfo, ) -> Result> { - trace!("{} Received DKGStart for {}", self.node, new_elders_info); + trace!("Received DKGStart for {}", new_elders_info); self.dkg_voter .start_participating(self.node.name(), dkg_key, new_elders_info) .into_commands(&self.node) @@ -1577,14 +1577,9 @@ impl Approved { } fn send_dkg_start(&mut self, new_elders_info: EldersInfo) -> Result> { - let dkg_key = DkgKey::new(&new_elders_info); + trace!("Send DKGStart for {}", new_elders_info); - trace!( - "{} Send DKGStart for {:?} {}", - self.node, - dkg_key, - new_elders_info - ); + let dkg_key = DkgKey::new(&new_elders_info); // Send to all participants. let recipients: Vec<_> = new_elders_info.elders.values().copied().collect(); diff --git a/src/routing/bootstrap.rs b/src/routing/bootstrap.rs index b6ab585ed4..055aca7c36 100644 --- a/src/routing/bootstrap.rs +++ b/src/routing/bootstrap.rs @@ -21,10 +21,12 @@ use crate::{ }; use bytes::Bytes; use futures::future::{self, Either}; -use std::{future::Future, mem, net::SocketAddr}; +use std::{collections::VecDeque, future::Future, mem, net::SocketAddr}; use tokio::sync::mpsc; use xor_name::Prefix; +const BACKLOG_CAPACITY: usize = 100; + /// Bootstrap into the network as an infant node. /// /// NOTE: It's not guaranteed this function ever returns. This can happen due to messages being @@ -35,7 +37,7 @@ pub(crate) async fn infant( comm: &Comm, incoming_messages: &mut IncomingMessages, bootstrap_addr: SocketAddr, -) -> Result<(Node, Section)> { +) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> { let (send_tx, send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); @@ -62,7 +64,7 @@ pub(crate) async fn relocate( recv_rx: mpsc::Receiver<(Message, SocketAddr)>, bootstrap_addrs: Vec, relocate_details: SignedRelocateDetails, -) -> Result<(Node, Section)> { +) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> { let (send_tx, send_rx) = mpsc::channel(1); let state = State::new(node, send_tx, recv_rx)?; @@ -79,6 +81,8 @@ struct State { // Receiver for incoming messages. recv_rx: mpsc::Receiver<(Message, SocketAddr)>, node: Node, + // Backlog for unknown messages + backlog: VecDeque<(Message, SocketAddr)>, } impl State { @@ -91,6 +95,7 @@ impl State { send_tx, recv_rx, node, + backlog: VecDeque::with_capacity(BACKLOG_CAPACITY), }) } @@ -98,7 +103,7 @@ impl State { mut self, bootstrap_addrs: Vec, relocate_details: Option, - ) -> Result<(Node, Section)> { + ) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> { let (elders_info, section_key) = self .bootstrap(bootstrap_addrs, relocate_details.as_ref()) .await?; @@ -187,14 +192,7 @@ impl State { return Ok((response.clone(), sender)); } - _ => { - trace!( - "{} Useless message {:?} from {}", - self.node, - message, - sender, - ); - } + _ => self.backlog_message(message, sender), } } @@ -236,7 +234,7 @@ impl State { mut elders_info: EldersInfo, mut section_key: bls::PublicKey, relocate_payload: Option, - ) -> Result<(Node, Section)> { + ) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> { loop { self.send_join_requests(&elders_info, section_key, relocate_payload.as_ref()) .await?; @@ -250,7 +248,11 @@ impl State { elders_info, section_chain, } => { - return Ok((self.node, Section::new(section_chain, elders_info)?)); + return Ok(( + self.node, + Section::new(section_chain, elders_info)?, + self.backlog.into_iter().collect(), + )); } JoinResponse::Rejoin { elders_info: new_elders_info, @@ -359,14 +361,7 @@ impl State { )); } - _ => { - trace!( - "{} Useless message {:?} from {}", - self.node, - message, - sender, - ); - } + _ => self.backlog_message(message, sender), } } @@ -397,6 +392,14 @@ impl State { } } } + + fn backlog_message(&mut self, message: Message, sender: SocketAddr) { + while self.backlog.len() >= BACKLOG_CAPACITY { + let _ = self.backlog.pop_front(); + } + + self.backlog.push_back((message, sender)) + } } enum JoinResponse { @@ -543,7 +546,7 @@ mod tests { }; // Drive both tasks to completion concurrently (but on the same thread). - let ((_node, section), _) = future::try_join(bootstrap, others).await?; + let ((_node, section, _backlog), _) = future::try_join(bootstrap, others).await?; assert_eq!(*section.elders_info(), elders_info); assert_eq!(*section.chain().last_key(), pk); diff --git a/src/routing/mod.rs b/src/routing/mod.rs index 4a775247c0..21f9dde87f 100644 --- a/src/routing/mod.rs +++ b/src/routing/mod.rs @@ -88,7 +88,7 @@ impl Routing { let (event_tx, event_rx) = mpsc::unbounded_channel(); - let (state, comm, incoming_msgs) = if config.first { + let (state, comm, incoming_msgs, backlog) = if config.first { info!("{} Starting a new network as the seed node.", node_name); let comm = Comm::new(config.transport_config)?; let incoming_msgs = comm.listen()?; @@ -99,26 +99,37 @@ impl Routing { state.send_event(Event::Connected(Connected::First)); state.send_event(Event::PromotedToElder); - (state, comm, incoming_msgs) + (state, comm, incoming_msgs, vec![]) } else { info!("{} Bootstrapping a new node.", node_name); let (comm, bootstrap_addr) = Comm::from_bootstrapping(config.transport_config).await?; let mut incoming_msgs = comm.listen()?; let node = Node::new(keypair, comm.our_connection_info()?); - let (node, section) = + let (node, section, backlog) = bootstrap::infant(node, &comm, &mut incoming_msgs, bootstrap_addr).await?; let state = Approved::new(node, section, None, event_tx); state.send_event(Event::Connected(Connected::First)); - (state, comm, incoming_msgs) + (state, comm, incoming_msgs, backlog) }; let stage = Arc::new(Stage::new(state, comm)); let executor = Executor::new(stage.clone(), incoming_msgs).await; let event_stream = EventStream::new(event_rx); + // Process message backlog + for (message, sender) in backlog { + stage + .clone() + .handle_commands(Command::HandleMessage { + message, + sender: Some(sender), + }) + .await?; + } + let routing = Self { stage, _executor: executor, diff --git a/src/routing/stage.rs b/src/routing/stage.rs index d951620d15..aa6576df45 100644 --- a/src/routing/stage.rs +++ b/src/routing/stage.rs @@ -105,8 +105,7 @@ impl Stage { message_rx, } => { self.handle_relocate(bootstrap_addrs, details, message_rx) - .await?; - Ok(vec![]) + .await } } } @@ -155,11 +154,11 @@ impl Stage { bootstrap_addrs: Vec, details: SignedRelocateDetails, message_rx: mpsc::Receiver<(Message, SocketAddr)>, - ) -> Result<()> { + ) -> Result> { let node = self.state.lock().await.node().clone(); let previous_name = node.name(); - let (node, section) = + let (node, section, backlog) = bootstrap::relocate(node, &self.comm, message_rx, bootstrap_addrs, details).await?; let mut state = self.state.lock().await; @@ -168,6 +167,13 @@ impl Stage { state.send_event(Event::Connected(Connected::Relocate { previous_name })); - Ok(()) + let commands = backlog + .into_iter() + .map(|(message, sender)| Command::HandleMessage { + message, + sender: Some(sender), + }) + .collect(); + Ok(commands) } }