From fd016f32b0165884455bb5aef6c49b9c15e3048f Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Fri, 23 Sep 2022 16:38:44 +0200 Subject: [PATCH] trigger mempool sync on lag Co-authored-by: Stan Bondi --- base_layer/core/src/mempool/config.rs | 3 + .../src/mempool/sync_protocol/initializer.rs | 10 +-- .../core/src/mempool/sync_protocol/mod.rs | 72 +++++++++++++++++-- .../core/src/mempool/sync_protocol/test.rs | 12 +++- common/config/presets/c_base_node.toml | 2 + 5 files changed, 87 insertions(+), 12 deletions(-) diff --git a/base_layer/core/src/mempool/config.rs b/base_layer/core/src/mempool/config.rs index 25174096509..edf1fdef486 100644 --- a/base_layer/core/src/mempool/config.rs +++ b/base_layer/core/src/mempool/config.rs @@ -50,6 +50,8 @@ pub struct MempoolServiceConfig { pub initial_sync_num_peers: usize, /// The maximum number of transactions to sync in a single sync session Default: 10_000 pub initial_sync_max_transactions: usize, + /// The maximum number of blocks added via sync or re-org to triggering a sync + pub block_sync_trigger: usize, } impl Default for MempoolServiceConfig { @@ -57,6 +59,7 @@ impl Default for MempoolServiceConfig { Self { initial_sync_num_peers: 2, initial_sync_max_transactions: 10_000, + block_sync_trigger: 5, } } } diff --git a/base_layer/core/src/mempool/sync_protocol/initializer.rs b/base_layer/core/src/mempool/sync_protocol/initializer.rs index 74ec61af00c..e864a76a973 100644 --- a/base_layer/core/src/mempool/sync_protocol/initializer.rs +++ b/base_layer/core/src/mempool/sync_protocol/initializer.rs @@ -32,9 +32,9 @@ use tari_service_framework::{async_trait, ServiceInitializationError, ServiceIni use tokio::{sync::mpsc, time::sleep}; use crate::{ - base_node::StateMachineHandle, + base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle}, mempool::{ - sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL}, + sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL}, Mempool, MempoolServiceConfig, }, @@ -83,8 +83,7 @@ impl ServiceInitializer for MempoolSyncInitializer { log_mdc::extend(mdc.clone()); let state_machine = handles.expect_handle::(); let connectivity = handles.expect_handle::(); - // Ensure that we get an subscription ASAP so that we don't miss any connectivity events - let connectivity_event_subscription = connectivity.get_event_subscription(); + let base_node = handles.expect_handle::(); let mut status_watch = state_machine.get_status_info_watch(); if !status_watch.borrow().bootstrapped { @@ -103,8 +102,9 @@ impl ServiceInitializer for MempoolSyncInitializer { } log_mdc::extend(mdc.clone()); } + let base_node_events = base_node.get_block_event_stream(); - MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool) + MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events) .run() .await; }); diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index 952f5622493..57d80f75d32 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer; use log::*; use prost::Message; use tari_comms::{ - connectivity::{ConnectivityEvent, ConnectivityEventRx}, + connectivity::{ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester, ConnectivitySelection}, framing, framing::CanonicalFraming, message::MessageExt, @@ -97,6 +97,8 @@ use tokio::{ }; use crate::{ + base_node::comms_interface::{BlockEvent, BlockEventReceiver}, + chain_storage::BlockAddResult, mempool::{metrics, proto, Mempool, MempoolServiceConfig}, proto as shared_proto, transactions::transaction_components::Transaction, @@ -113,13 +115,19 @@ const LOG_TARGET: &str = "c::mempool::sync_protocol"; pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1"); +pub struct MempoolSyncStreams { + pub block_event_stream: BlockEventReceiver, + pub connectivity_events: ConnectivityEventRx, +} + pub struct MempoolSyncProtocol { config: MempoolServiceConfig, protocol_notifier: ProtocolNotificationRx, - connectivity_events: ConnectivityEventRx, mempool: Mempool, num_synched: Arc, permits: Arc, + connectivity: ConnectivityRequester, + block_event_stream: BlockEventReceiver } impl MempoolSyncProtocol @@ -128,25 +136,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static pub fn new( config: MempoolServiceConfig, protocol_notifier: ProtocolNotificationRx, - connectivity_events: ConnectivityEventRx, mempool: Mempool, + connectivity: ConnectivityRequester, + block_event_stream: BlockEventReceiver ) -> Self { Self { config, protocol_notifier, - connectivity_events, mempool, num_synched: Arc::new(AtomicUsize::new(0)), permits: Arc::new(Semaphore::new(1)), + connectivity,block_event_stream } } pub async fn run(mut self) { info!(target: LOG_TARGET, "Mempool protocol handler has started"); + let mut connectivity_events = self.connectivity.get_event_subscription(); loop { tokio::select! { - Ok(event) = self.connectivity_events.recv() => { + Ok(block_event) = self.block_event_stream.recv() => { + self.handle_block_event(&block_event).await; + }, + Ok(event) = connectivity_events.recv() => { self.handle_connectivity_event(event).await; }, @@ -174,6 +187,55 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static } } + async fn handle_block_event(&mut self, block_event: &BlockEvent) { + use BlockEvent::{BlockSyncComplete, ValidBlockAdded}; + if self.permits.available_permits() < 1 { + // Sync is already in progress, so we should not bother trying to sync. + return; + } + match block_event { + ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => { + if added.len() < self.config.block_sync_trigger { + return; + } + }, + BlockSyncComplete(tip, starting_sync_height) => { + let added = tip.height() - starting_sync_height; + if added < self.config.block_sync_trigger as u64 { + return; + } + }, + _ => { + return; + }, + } + // we need to make sure the service can start a sync + if self.num_synched.load(Ordering::Acquire) >= self.config.initial_sync_num_peers { + self.num_synched.fetch_sub(1, Ordering::Release); + } + let connection = match self + .connectivity + .select_connections(ConnectivitySelection::random_nodes(1, vec![])) + .await + { + Ok(mut v) => { + if v.is_empty() { + error!(target: LOG_TARGET, "Mempool sync could not get a peer to sync to"); + return; + }; + v.pop().unwrap() + }, + Err(e) => { + error!( + target: LOG_TARGET, + "Mempool sync could not get a peer to sync to: {}", e + ); + return; + }, + }; + self.spawn_initiator_protocol(connection).await; + } + fn is_synched(&self) -> bool { self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers } diff --git a/base_layer/core/src/mempool/sync_protocol/test.rs b/base_layer/core/src/mempool/sync_protocol/test.rs index 1189772ba86..29431f33c74 100644 --- a/base_layer/core/src/mempool/sync_protocol/test.rs +++ b/base_layer/core/src/mempool/sync_protocol/test.rs @@ -31,7 +31,10 @@ use tari_comms::{ message::MessageExt, peer_manager::PeerFeatures, protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx}, - test_utils::{mocks::create_peer_connection_mock_pair, node_identity::build_node_identity}, + test_utils::{ + mocks::{create_connectivity_mock, create_peer_connection_mock_pair}, + node_identity::build_node_identity, + }, Bytes, BytesMut, }; @@ -87,14 +90,19 @@ async fn setup( let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1); let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10); let (mempool, transactions) = new_mempool_with_transactions(num_txns).await; + let (connectivity, _) = create_connectivity_mock(); + let (block_event_sender, _) = broadcast::channel(1); + let block_receiver = block_event_sender.subscribe(); + let protocol = MempoolSyncProtocol::new( Default::default(), protocol_notif_rx, connectivity_events_rx, mempool.clone(), + connectivity, ); - task::spawn(protocol.run()); + task::spawn(protocol.run(block_receiver)); (protocol_notif_tx, connectivity_events_tx, mempool, transactions) } diff --git a/common/config/presets/c_base_node.toml b/common/config/presets/c_base_node.toml index 8b72c4a9896..fc98a0d3c3e 100644 --- a/common/config/presets/c_base_node.toml +++ b/common/config/presets/c_base_node.toml @@ -110,6 +110,8 @@ track_reorgs = true #service.initial_sync_num_peers = 2 # The maximum number of transactions to sync in a single sync session Default: 10_000 #service.initial_sync_max_transactions = 10_000 +# The maximum number of blocks added via sync or re-org to triggering a sync +#block_sync_trigger = 5 [base_node.state_machine] # The initial max sync latency. If a peer fails to stream a header/block within this deadline another sync peer will be