From 30bf86bd89976b8411c64fcbf2993f32fc419272 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Fri, 25 Feb 2022 15:49:54 +0200 Subject: [PATCH] feat: add contacts status to tui (#3868) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description --- Added the contacts liveness status to the console wallet's TUI using the recently merged contacts liveness service feature. ``` Alias Public Key Emoji ID Last Seen Status node01 bn cca97dacf0667b8a94fa0e17d7449cad23b7ad59940733dca488c0eb2943ed1e 📡👾🐢..🚁🍍🌂 NeverSeen node02 has long alias nam fa9a9b87a5d575737d8559524c62c5cccb1cc8732299560a16d4f060d7095930 🚪👖👗..🎸🍬🌞 02-24T15:49 Online node02 bn fc8488da9a83d7c851166fc0b18986155958856658d20a9819061df12ef83400 🚲🐮🐳..🍳🌀📚 NeverSeen node03 d258580f20f2d2b712905b8867529b02a376dc066f2c7028d67926f8bbdc1b7a 🔎🎷🎷..🍊🐜🍈 02-24T15:49 Online nodetest d60429291ef2d6a8698a1789794cbf7ce31a138d0a7bae00d857df616407cd26 🔦🌋🍟..📣🍗🏦 02-22T22:54 Offline │``` Motivation and Context --- Experimental. How Has This Been Tested? --- System-level tests. --- .../tari_console_wallet/src/init/mod.rs | 1 + .../src/ui/components/contacts_tab.rs | 26 +++- .../src/ui/components/send_tab.rs | 36 +----- .../src/ui/state/app_state.rs | 31 +++-- .../src/ui/state/wallet_event_monitor.rs | 33 ++++- .../tari_console_wallet/src/ui/ui_contact.rs | 17 +++ .../src/utils/formatting.rs | 6 +- base_layer/wallet/src/config.rs | 3 + .../wallet/src/contacts_service/handle.rs | 63 +++++++-- base_layer/wallet/src/contacts_service/mod.rs | 19 ++- .../wallet/src/contacts_service/service.rs | 121 ++++++++++++++---- base_layer/wallet/src/wallet.rs | 6 +- base_layer/wallet/tests/contacts_service.rs | 2 +- base_layer/wallet/tests/wallet.rs | 34 +++-- base_layer/wallet_ffi/src/lib.rs | 1 + common/src/configuration/global.rs | 10 ++ common/src/configuration/utils.rs | 3 + 17 files changed, 298 insertions(+), 114 deletions(-) diff --git a/applications/tari_console_wallet/src/init/mod.rs b/applications/tari_console_wallet/src/init/mod.rs index 9e32843b1e..28aed7b95a 100644 --- a/applications/tari_console_wallet/src/init/mod.rs +++ b/applications/tari_console_wallet/src/init/mod.rs @@ -454,6 +454,7 @@ pub async fn init_wallet( Some(updater_config), config.autoupdate_check_interval, Some(Duration::from_secs(config.contacts_auto_ping_interval)), + Some(config.contacts_online_ping_window), ); let mut wallet = Wallet::start( diff --git a/applications/tari_console_wallet/src/ui/components/contacts_tab.rs b/applications/tari_console_wallet/src/ui/components/contacts_tab.rs index 169388104c..298f966fda 100644 --- a/applications/tari_console_wallet/src/ui/components/contacts_tab.rs +++ b/applications/tari_console_wallet/src/ui/components/contacts_tab.rs @@ -14,6 +14,7 @@ use crate::{ components::{Component, KeyHandled}, state::AppState, widgets::{centered_rect_absolute, draw_dialog, MultiColumnList, WindowedListState}, + UiContact, MAX_WIDTH, }, utils::formatting::display_compressed_string, @@ -76,28 +77,43 @@ impl ContactsTab { let window = self.contacts_list_state.get_start_end(); let windowed_view = app_state.get_contacts_slice(window.0, window.1); + let column_list = ContactsTab::create_column_view(windowed_view); + column_list.render(f, list_areas[1], &mut list_state); + } + + // Helper function to create the column list to be rendered + pub fn create_column_view(windowed_view: &[UiContact]) -> MultiColumnList> { let mut column0_items = Vec::new(); let mut column1_items = Vec::new(); let mut column2_items = Vec::new(); + let mut column3_items = Vec::new(); + let mut column4_items = Vec::new(); for c in windowed_view.iter() { column0_items.push(ListItem::new(Span::raw(c.alias.clone()))); - column1_items.push(ListItem::new(Span::raw(c.public_key.to_string()))); + column1_items.push(ListItem::new(Span::raw(c.public_key.clone()))); column2_items.push(ListItem::new(Span::raw(display_compressed_string( c.emoji_id.clone(), 3, 3, )))); + column3_items.push(ListItem::new(Span::raw(c.last_seen.clone()))); + column4_items.push(ListItem::new(Span::raw(c.online_status.clone()))); } let column_list = MultiColumnList::new() .highlight_style(Style::default().add_modifier(Modifier::BOLD).fg(Color::Magenta)) .heading_style(Style::default().fg(Color::Magenta)) .max_width(MAX_WIDTH) .add_column(Some("Alias"), Some(25), column0_items) - .add_column(None, Some(2), Vec::new()) + .add_column(None, Some(1), Vec::new()) .add_column(Some("Public Key"), Some(64), column1_items) - .add_column(None, Some(2), Vec::new()) - .add_column(Some("Emoji ID"), None, column2_items); - column_list.render(f, list_areas[1], &mut list_state); + .add_column(None, Some(1), Vec::new()) + .add_column(Some("Emoji ID"), Some(14), column2_items) + .add_column(None, Some(1), Vec::new()) + .add_column(Some("Last Seen"), Some(11), column3_items) + .add_column(None, Some(1), Vec::new()) + .add_column(Some("Status"), Some(10), column4_items); + + column_list } fn draw_edit_contact(&mut self, f: &mut Frame, area: Rect, _app_state: &AppState) diff --git a/applications/tari_console_wallet/src/ui/components/send_tab.rs b/applications/tari_console_wallet/src/ui/components/send_tab.rs index 614c0fb619..358141d0f2 100644 --- a/applications/tari_console_wallet/src/ui/components/send_tab.rs +++ b/applications/tari_console_wallet/src/ui/components/send_tab.rs @@ -7,19 +7,15 @@ use tui::{ layout::{Constraint, Direction, Layout, Rect}, style::{Color, Modifier, Style}, text::{Span, Spans}, - widgets::{Block, Borders, ListItem, Paragraph, Row, Table, TableState, Wrap}, + widgets::{Block, Borders, Paragraph, Row, Table, TableState, Wrap}, Frame, }; use unicode_width::UnicodeWidthStr; -use crate::{ - ui::{ - components::{balance::Balance, styles, Component, KeyHandled}, - state::{AppState, UiTransactionSendStatus}, - widgets::{draw_dialog, MultiColumnList, WindowedListState}, - MAX_WIDTH, - }, - utils::formatting::display_compressed_string, +use crate::ui::{ + components::{balance::Balance, contacts_tab::ContactsTab, styles, Component, KeyHandled}, + state::{AppState, UiTransactionSendStatus}, + widgets::{draw_dialog, WindowedListState}, }; pub struct SendTab { @@ -217,27 +213,7 @@ impl SendTab { let window = self.contacts_list_state.get_start_end(); let windowed_view = app_state.get_contacts_slice(window.0, window.1); - let mut column0_items = Vec::new(); - let mut column1_items = Vec::new(); - let mut column2_items = Vec::new(); - for c in windowed_view.iter() { - column0_items.push(ListItem::new(Span::raw(c.alias.clone()))); - column1_items.push(ListItem::new(Span::raw(c.public_key.to_string()))); - column2_items.push(ListItem::new(Span::raw(display_compressed_string( - c.emoji_id.clone(), - 3, - 3, - )))); - } - let column_list = MultiColumnList::new() - .highlight_style(Style::default().add_modifier(Modifier::BOLD).fg(Color::Magenta)) - .heading_style(Style::default().fg(Color::Magenta)) - .max_width(MAX_WIDTH) - .add_column(Some("Alias"), Some(25), column0_items) - .add_column(None, Some(2), Vec::new()) - .add_column(Some("Public Key"), Some(64), column1_items) - .add_column(None, Some(2), Vec::new()) - .add_column(Some("Emoji ID"), None, column2_items); + let column_list = ContactsTab::create_column_view(windowed_view); column_list.render(f, list_areas[1], &mut list_state); } diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index 67714012fb..58e517cfc0 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -54,7 +54,7 @@ use tari_wallet::{ assets::Asset, base_node_service::{handle::BaseNodeEventReceiver, service::BaseNodeState}, connectivity_service::{OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, - contacts_service::storage::database::Contact, + contacts_service::{handle::ContactsLivenessEvent, storage::database::Contact}, output_manager_service::{handle::OutputManagerEventReceiver, service::Balance}, tokens::Token, transaction_service::{ @@ -64,7 +64,7 @@ use tari_wallet::{ WalletSqlite, }; use tokio::{ - sync::{watch, RwLock}, + sync::{broadcast, watch, RwLock}, task, }; @@ -735,22 +735,25 @@ impl AppStateInner { } pub async fn refresh_contacts_state(&mut self) -> Result<(), UiError> { - let mut contacts: Vec = self - .wallet - .contacts_service - .get_contacts() - .await? - .iter() - .map(|c| UiContact::from(c.clone())) - .collect(); + let db_contacts = self.wallet.contacts_service.get_contacts().await?; + let mut ui_contacts: Vec = vec![]; + for contact in db_contacts { + // A contact's online status is a function of current time and can therefore not be stored in a database + let online_status = self + .wallet + .contacts_service + .get_contact_online_status(contact.last_seen) + .await?; + ui_contacts.push(UiContact::from(contact.clone()).with_online_status(format!("{}", online_status))); + } - contacts.sort_by(|a, b| { + ui_contacts.sort_by(|a, b| { a.alias .partial_cmp(&b.alias) .expect("Should be able to compare contact aliases") }); - self.data.contacts = contacts; + self.data.contacts = ui_contacts; self.updated = true; Ok(()) } @@ -821,6 +824,10 @@ impl AppStateInner { self.wallet.transaction_service.get_event_stream() } + pub fn get_contacts_liveness_event_stream(&self) -> broadcast::Receiver> { + self.wallet.contacts_service.get_contacts_liveness_event_stream() + } + pub fn get_output_manager_service_event_stream(&self) -> OutputManagerEventReceiver { self.wallet.output_manager_service.get_event_stream() } diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index d01aadd520..c66cccc079 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::sync::Arc; +use std::{ops::Deref, sync::Arc}; use log::*; use tari_common_types::transaction::TxId; @@ -28,6 +28,7 @@ use tari_comms::{connectivity::ConnectivityEvent, peer_manager::Peer}; use tari_wallet::{ base_node_service::{handle::BaseNodeEvent, service::BaseNodeState}, connectivity_service::WalletConnectivityInterface, + contacts_service::handle::ContactsLivenessEvent, output_manager_service::handle::OutputManagerEvent, transaction_service::handle::TransactionEvent, }; @@ -80,6 +81,8 @@ impl WalletEventMonitor { .new_update_notifier() .clone(); + let mut contacts_liveness_events = self.app_state_inner.read().await.get_contacts_liveness_event_stream(); + info!(target: LOG_TARGET, "Wallet Event Monitor starting"); loop { tokio::select! { @@ -213,6 +216,26 @@ impl WalletEventMonitor { Err(broadcast::error::RecvError::Closed) => {} } }, + event = contacts_liveness_events.recv() => { + match event { + Ok(liveness_event) => { + match liveness_event.deref() { + ContactsLivenessEvent::StatusUpdated(data) => { + trace!(target: LOG_TARGET, + "Contacts Liveness Service Callback Handler event 'StatusUpdated': {}", + data.clone(), + ); + self.trigger_contacts_refresh().await; + } + ContactsLivenessEvent::NetworkSilence => {} + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!(target: LOG_TARGET, "Missed {} from Output Manager Service events", n); + } + Err(broadcast::error::RecvError::Closed) => {} + } + } _ = shutdown_signal.wait() => { info!(target: LOG_TARGET, "Wallet Event Monitor shutting down because the shutdown signal was received"); break; @@ -293,4 +316,12 @@ impl WalletEventMonitor { let mut inner = self.app_state_inner.write().await; inner.add_notification(notification); } + + async fn trigger_contacts_refresh(&mut self) { + let mut inner = self.app_state_inner.write().await; + + if let Err(e) = inner.refresh_contacts_state().await { + warn!(target: LOG_TARGET, "Error refresh contacts state: {}", e); + } + } } diff --git a/applications/tari_console_wallet/src/ui/ui_contact.rs b/applications/tari_console_wallet/src/ui/ui_contact.rs index 49f83e8284..17ff1afeab 100644 --- a/applications/tari_console_wallet/src/ui/ui_contact.rs +++ b/applications/tari_console_wallet/src/ui/ui_contact.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Local}; use tari_common_types::emoji::EmojiId; use tari_wallet::contacts_service::storage::database::Contact; @@ -6,6 +7,15 @@ pub struct UiContact { pub alias: String, pub public_key: String, pub emoji_id: String, + pub last_seen: String, + pub online_status: String, +} + +impl UiContact { + pub fn with_online_status(mut self, online_status: String) -> Self { + self.online_status = online_status; + self + } } impl From for UiContact { @@ -14,6 +24,13 @@ impl From for UiContact { alias: c.alias, public_key: c.public_key.to_string(), emoji_id: EmojiId::from_pubkey(&c.public_key).as_str().to_string(), + last_seen: match c.last_seen { + Some(val) => DateTime::::from_utc(val, Local::now().offset().to_owned()) + .format("%m-%dT%H:%M") + .to_string(), + None => "".to_string(), + }, + online_status: "".to_string(), } } } diff --git a/applications/tari_console_wallet/src/utils/formatting.rs b/applications/tari_console_wallet/src/utils/formatting.rs index 4504804096..c06266c103 100644 --- a/applications/tari_console_wallet/src/utils/formatting.rs +++ b/applications/tari_console_wallet/src/utils/formatting.rs @@ -35,7 +35,7 @@ pub fn display_compressed_string(string: String, len_first: usize, len_last: usi for i in graphemes.iter().take(len_first) { result.push_str(i); } - result.push_str("..."); + result.push_str(".."); for i in graphemes.iter().skip(graphemes.len() - len_last) { result.push_str(i); } @@ -59,11 +59,11 @@ mod test { let short_str = "testing".to_string(); assert_eq!(display_compressed_string(short_str.clone(), 5, 5), short_str); let long_str = "abcdefghijklmnopqrstuvwxyz".to_string(); - assert_eq!(display_compressed_string(long_str, 3, 3), "abc...xyz".to_string()); + assert_eq!(display_compressed_string(long_str, 3, 3), "abc..xyz".to_string()); let emoji_str = "🐾💎🎤🎨📌🍄🎰🍉🚧💉💡👟🚒📌🔌🐶🐾🐢🔭🐨😻💨🐎🐊🚢👟🚧🐞🚜🌂🎩🎱📈".to_string(); assert_eq!( display_compressed_string(emoji_str, 3, 6), - "🐾💎🎤...🐞🚜🌂🎩🎱📈".to_string() + "🐾💎🎤..🐞🚜🌂🎩🎱📈".to_string() ); } } diff --git a/base_layer/wallet/src/config.rs b/base_layer/wallet/src/config.rs index 37974e073f..60cb293c4a 100644 --- a/base_layer/wallet/src/config.rs +++ b/base_layer/wallet/src/config.rs @@ -46,6 +46,7 @@ pub struct WalletConfig { pub updater_config: Option, pub autoupdate_check_interval: Option, pub contacts_auto_ping_interval: Duration, + pub contacts_online_ping_window: usize, } impl WalletConfig { @@ -61,6 +62,7 @@ impl WalletConfig { updater_config: Option, autoupdate_check_interval: Option, contacts_auto_ping_interval: Option, + contacts_online_ping_window: Option, ) -> Self { Self { comms_config, @@ -74,6 +76,7 @@ impl WalletConfig { updater_config, autoupdate_check_interval, contacts_auto_ping_interval: contacts_auto_ping_interval.unwrap_or_else(|| Duration::from_secs(20)), + contacts_online_ping_window: contacts_online_ping_window.unwrap_or(2), } } } diff --git a/base_layer/wallet/src/contacts_service/handle.rs b/base_layer/wallet/src/contacts_service/handle.rs index 1f34c54c5e..060dc7acf4 100644 --- a/base_layer/wallet/src/contacts_service/handle.rs +++ b/base_layer/wallet/src/contacts_service/handle.rs @@ -25,21 +25,26 @@ use std::{ sync::Arc, }; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Local, NaiveDateTime}; use tari_comms::{peer_manager::NodeId, types::CommsPublicKey}; use tari_service_framework::reply_channel::SenderService; use tokio::sync::broadcast; use tower::Service; -use crate::contacts_service::{error::ContactsServiceError, service::ContactMessageType, storage::database::Contact}; +use crate::contacts_service::{ + error::ContactsServiceError, + service::{ContactMessageType, ContactOnlineStatus}, + storage::database::Contact, +}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct ContactsLivenessData { public_key: CommsPublicKey, node_id: NodeId, latency: Option, - last_seen: DateTime, + last_seen: Option, message_type: ContactMessageType, + online_status: ContactOnlineStatus, } impl ContactsLivenessData { @@ -47,8 +52,9 @@ impl ContactsLivenessData { public_key: CommsPublicKey, node_id: NodeId, latency: Option, - last_seen: DateTime, + last_seen: Option, message_type: ContactMessageType, + online_status: ContactOnlineStatus, ) -> Self { Self { public_key, @@ -56,6 +62,7 @@ impl ContactsLivenessData { latency, last_seen, message_type, + online_status, } } @@ -71,34 +78,47 @@ impl ContactsLivenessData { self.latency } - pub fn last_ping_pong_received(&self) -> DateTime { + pub fn last_ping_pong_received(&self) -> Option { self.last_seen } - pub fn time_since_last_status_update(&self) -> Duration { - Utc::now() - self.last_seen - } - pub fn message_type(&self) -> ContactMessageType { self.message_type.clone() } + + pub fn online_status(&self) -> ContactOnlineStatus { + self.online_status.clone() + } + + pub fn set_offline(&mut self) { + self.online_status = ContactOnlineStatus::Offline + } } impl Display for ContactsLivenessData { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { writeln!( f, - "Node ID {} with latency {:?} last seen {}s ago", + "Liveness event '{}' for contact {} ({}) {}", + self.message_type, + self.public_key, self.node_id, - self.latency, - self.time_since_last_status_update().num_seconds() + if let Some(time) = self.last_seen { + let local_time = DateTime::::from_utc(time, Local::now().offset().to_owned()) + .format("%F %T") + .to_string(); + format!("last seen {} is '{}'", local_time, self.online_status) + } else { + " - contact was never seen".to_string() + } ) } } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum ContactsLivenessEvent { - StatusUpdated(Vec), + StatusUpdated(Box), NetworkSilence, } @@ -108,6 +128,7 @@ pub enum ContactsServiceRequest { UpsertContact(Contact), RemoveContact(CommsPublicKey), GetContacts, + GetContactOnlineStatus(Option), } #[derive(Debug)] @@ -116,6 +137,7 @@ pub enum ContactsServiceResponse { ContactRemoved(Contact), Contact(Contact), Contacts(Vec), + OnlineStatus(ContactOnlineStatus), } #[derive(Clone)] @@ -186,4 +208,19 @@ impl ContactsServiceHandle { pub fn get_contacts_liveness_event_stream(&self) -> broadcast::Receiver> { self.liveness_events.subscribe() } + + /// Determines the contact's online status based on their last seen time + pub async fn get_contact_online_status( + &mut self, + last_seen: Option, + ) -> Result { + match self + .request_response_service + .call(ContactsServiceRequest::GetContactOnlineStatus(last_seen)) + .await?? + { + ContactsServiceResponse::OnlineStatus(status) => Ok(status), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/wallet/src/contacts_service/mod.rs b/base_layer/wallet/src/contacts_service/mod.rs index 0c6b4d2a0e..821684ad69 100644 --- a/base_layer/wallet/src/contacts_service/mod.rs +++ b/base_layer/wallet/src/contacts_service/mod.rs @@ -25,6 +25,8 @@ pub mod handle; pub mod service; pub mod storage; +use std::time::Duration; + use futures::future; use log::*; use tari_comms::connectivity::ConnectivityRequester; @@ -50,13 +52,19 @@ pub struct ContactsServiceInitializer where T: ContactsBackend { backend: Option, + contacts_auto_ping_interval: Duration, + contacts_online_ping_window: usize, } impl ContactsServiceInitializer where T: ContactsBackend { - pub fn new(backend: T) -> Self { - Self { backend: Some(backend) } + pub fn new(backend: T, contacts_auto_ping_interval: Duration, online_ping_window: usize) -> Self { + Self { + backend: Some(backend), + contacts_auto_ping_interval, + contacts_online_ping_window: online_ping_window, + } } } @@ -66,8 +74,7 @@ where T: ContactsBackend + 'static { async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> { let (sender, receiver) = reply_channel::unbounded(); - // Buffer size set to 1 because only the most recent metadata is applicable - let (publisher, _) = broadcast::channel(1); + let (publisher, _) = broadcast::channel(250); let contacts_handle = ContactsServiceHandle::new(sender, publisher.clone()); @@ -81,6 +88,8 @@ where T: ContactsBackend + 'static let shutdown_signal = context.get_shutdown_signal(); + let contacts_auto_ping_interval = self.contacts_auto_ping_interval; + let contacts_online_ping_window = self.contacts_online_ping_window; context.spawn_when_ready(move |handles| async move { let liveness = handles.expect_handle::(); let connectivity = handles.expect_handle::(); @@ -92,6 +101,8 @@ where T: ContactsBackend + 'static liveness, connectivity, publisher, + contacts_auto_ping_interval, + contacts_online_ping_window, ) .start(); futures::pin_mut!(service); diff --git a/base_layer/wallet/src/contacts_service/service.rs b/base_layer/wallet/src/contacts_service/service.rs index 4b32223974..c19130935e 100644 --- a/base_layer/wallet/src/contacts_service/service.rs +++ b/base_layer/wallet/src/contacts_service/service.rs @@ -20,9 +20,14 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::sync::Arc; +use std::{ + fmt::{Display, Error, Formatter}, + ops::Sub, + sync::Arc, + time::Duration, +}; -use chrono::Utc; +use chrono::{NaiveDateTime, Utc}; use futures::{pin_mut, StreamExt}; use log::*; use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; @@ -44,6 +49,34 @@ const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3; pub enum ContactMessageType { Ping, Pong, + NoMessage, +} + +impl Display for ContactMessageType { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + ContactMessageType::Ping => write!(f, "Ping"), + ContactMessageType::Pong => write!(f, "Pong"), + ContactMessageType::NoMessage => write!(f, "NoMessage"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ContactOnlineStatus { + Online, + Offline, + NeverSeen, +} + +impl Display for ContactOnlineStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + ContactOnlineStatus::Online => write!(f, "Online"), + ContactOnlineStatus::Offline => write!(f, "Offline"), + ContactOnlineStatus::NeverSeen => write!(f, "NeverSeen"), + } + } } pub struct ContactsService @@ -58,6 +91,8 @@ where T: ContactsBackend + 'static connectivity: ConnectivityRequester, event_publisher: broadcast::Sender>, number_of_rounds_no_pings: u16, + contacts_auto_ping_interval: Duration, + contacts_online_ping_window: usize, } impl ContactsService @@ -73,6 +108,8 @@ where T: ContactsBackend + 'static liveness: LivenessHandle, connectivity: ConnectivityRequester, event_publisher: broadcast::Sender>, + contacts_auto_ping_interval: Duration, + contacts_online_ping_window: usize, ) -> Self { Self { db, @@ -83,6 +120,8 @@ where T: ContactsBackend + 'static connectivity, event_publisher, number_of_rounds_no_pings: 0, + contacts_auto_ping_interval, + contacts_online_ping_window, } } @@ -187,6 +226,10 @@ where T: ContactsBackend + 'static } Ok(result.map(ContactsServiceResponse::Contacts)?) }, + ContactsServiceRequest::GetContactOnlineStatus(last_seen) => { + let result = self.get_online_status(last_seen); + Ok(result.map(ContactsServiceResponse::OnlineStatus)?) + }, } } @@ -209,20 +252,10 @@ where T: ContactsBackend + 'static match event { // Received a ping, check if it contains ContactsLiveness LivenessEvent::ReceivedPing(event) => { - trace!( - target: LOG_TARGET, - "Received contact liveness ping from neighbouring node '{}'.", - event.node_id - ); self.update_with_ping_pong(event, ContactMessageType::Ping).await?; }, // Received a pong, check if our neighbour sent it and it contains ContactsLiveness LivenessEvent::ReceivedPong(event) => { - trace!( - target: LOG_TARGET, - "Received contact liveness pong from neighbouring node '{}'.", - event.node_id - ); self.update_with_ping_pong(event, ContactMessageType::Pong).await?; }, // New ping round has begun @@ -240,12 +273,54 @@ where T: ContactsBackend + 'static } } self.resize_contacts_liveness_data_buffer(*num_peers); + + // Update offline status + if let Ok(contacts) = self.db.get_contacts().await { + for contact in contacts { + let online_status = self.get_online_status(contact.last_seen)?; + if online_status == ContactOnlineStatus::Online { + continue; + } + let data = ContactsLivenessData::new( + contact.public_key.clone(), + contact.node_id.clone(), + contact.latency, + contact.last_seen, + ContactMessageType::NoMessage, + online_status, + ); + // Send only fails if there are no subscribers. + let _ = self + .event_publisher + .send(Arc::new(ContactsLivenessEvent::StatusUpdated(Box::new(data.clone())))); + trace!(target: LOG_TARGET, "{}", data); + } + }; }, } Ok(()) } + fn get_online_status(&self, last_seen: Option) -> Result { + let mut online_status = ContactOnlineStatus::NeverSeen; + if let Some(time) = last_seen { + if self.is_online(time) { + online_status = ContactOnlineStatus::Online; + } else { + online_status = ContactOnlineStatus::Offline; + } + } + Ok(online_status) + } + + fn is_online(&self, last_seen: NaiveDateTime) -> bool { + Utc::now().naive_utc().sub(last_seen) <= + chrono::Duration::seconds( + (self.contacts_online_ping_window as u64 * self.contacts_auto_ping_interval.as_secs()) as i64, + ) + } + async fn update_with_ping_pong( &mut self, event: &PingPongEvent, @@ -277,23 +352,17 @@ where T: ContactsBackend + 'static this_public_key, event.node_id.clone(), latency, - last_seen, + Some(last_seen.naive_utc()), message_type.clone(), + ContactOnlineStatus::Online, ); - self.liveness_data.push(data); + self.liveness_data.push(data.clone()); - // send only fails if there are no subscribers. - let _ = self.event_publisher.send(Arc::new(ContactsLivenessEvent::StatusUpdated( - self.liveness_data.clone(), - ))); - trace!( - target: LOG_TARGET, - "{:?} from {} last seen at {} with latency {:?} ms", - message_type, - event.node_id, - last_seen, - latency - ); + // Send only fails if there are no subscribers. + let _ = self + .event_publisher + .send(Arc::new(ContactsLivenessEvent::StatusUpdated(Box::new(data.clone())))); + trace!(target: LOG_TARGET, "{}", data); } else { trace!( target: LOG_TARGET, diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2a90c12b4e..4ef1b99e7e 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -184,7 +184,11 @@ where }, peer_message_subscription_factory, )) - .add_initializer(ContactsServiceInitializer::new(contacts_backend)) + .add_initializer(ContactsServiceInitializer::new( + contacts_backend, + config.contacts_auto_ping_interval, + config.contacts_online_ping_window, + )) .add_initializer(BaseNodeServiceInitializer::new( config.base_node_service_config.clone(), wallet_database.clone(), diff --git a/base_layer/wallet/tests/contacts_service.rs b/base_layer/wallet/tests/contacts_service.rs index 56fe42baa2..9f4813cbd5 100644 --- a/base_layer/wallet/tests/contacts_service.rs +++ b/base_layer/wallet/tests/contacts_service.rs @@ -127,7 +127,7 @@ pub fn setup_contacts_service( }, peer_message_subscription_factory, )) - .add_initializer(ContactsServiceInitializer::new(backend)) + .add_initializer(ContactsServiceInitializer::new(backend, Duration::from_secs(5), 2)) .build(); let handles = runtime.block_on(fut).expect("Service initialization failed"); diff --git a/base_layer/wallet/tests/wallet.rs b/base_layer/wallet/tests/wallet.rs index 5b7b752262..fc920e3404 100644 --- a/base_layer/wallet/tests/wallet.rs +++ b/base_layer/wallet/tests/wallet.rs @@ -189,6 +189,7 @@ async fn create_wallet( None, None, Some(Duration::from_secs(1)), + None, ); let metadata = ChainMetadata::new(std::i64::MAX as u64, Vec::new(), 0, 0, 0); @@ -740,6 +741,7 @@ async fn test_import_utxo() { None, None, None, + None, ); let mut alice_wallet = Wallet::start( @@ -921,15 +923,13 @@ fn test_contacts_service_liveness() { loop { tokio::select! { event = liveness_event_stream_alice.recv() => { - if let ContactsLivenessEvent::StatusUpdated(data_vec) = &*event.unwrap() { - if let Some(data) = data_vec.first() { - if data.public_key() == &bob_identity.public_key().clone(){ - assert_eq!(data.node_id(), &bob_identity.node_id().clone()); - if data.message_type() == ContactMessageType::Ping { - ping_count += 1; - } else if data.message_type() == ContactMessageType::Pong { - pong_count += 1; - } + if let ContactsLivenessEvent::StatusUpdated(data) = &*event.unwrap() { + if data.public_key() == &bob_identity.public_key().clone(){ + assert_eq!(data.node_id(), &bob_identity.node_id().clone()); + if data.message_type() == ContactMessageType::Ping { + ping_count += 1; + } else if data.message_type() == ContactMessageType::Pong { + pong_count += 1; } } if ping_count > 1 && pong_count > 1 { @@ -955,15 +955,13 @@ fn test_contacts_service_liveness() { loop { tokio::select! { event = liveness_event_stream_bob.recv() => { - if let ContactsLivenessEvent::StatusUpdated(data_vec) = &*event.unwrap() { - if let Some(data) = data_vec.first() { - if data.public_key() == &alice_identity.public_key().clone(){ - assert_eq!(data.node_id(), &alice_identity.node_id().clone()); - if data.message_type() == ContactMessageType::Ping { - ping_count += 1; - } else if data.message_type() == ContactMessageType::Pong { - pong_count += 1; - } + if let ContactsLivenessEvent::StatusUpdated(data) = &*event.unwrap() { + if data.public_key() == &alice_identity.public_key().clone(){ + assert_eq!(data.node_id(), &alice_identity.node_id().clone()); + if data.message_type() == ContactMessageType::Ping { + ping_count += 1; + } else if data.message_type() == ContactMessageType::Pong { + pong_count += 1; } } if ping_count > 1 && pong_count > 1 { diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 64e50127de..f0d37b1588 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -3473,6 +3473,7 @@ pub unsafe extern "C" fn wallet_create( None, None, None, + None, ); let mut recovery_lookup = match runtime.block_on(wallet_database.get_client_key_value(RECOVERY_KEY.to_owned())) { diff --git a/common/src/configuration/global.rs b/common/src/configuration/global.rs index c298429e4b..9da8fdc7ea 100644 --- a/common/src/configuration/global.rs +++ b/common/src/configuration/global.rs @@ -97,6 +97,7 @@ pub struct GlobalConfig { pub console_wallet_peer_db_path: PathBuf, pub console_wallet_use_libtor: bool, pub contacts_auto_ping_interval: u64, + pub contacts_online_ping_window: usize, pub core_threads: Option, pub data_dir: PathBuf, pub db_config: LMDBConfig, @@ -470,6 +471,14 @@ fn convert_node_config( Err(e) => return Err(ConfigurationError::new(&key, None, &e.to_string())), }; + // Liveness last seen within multiple of ping interval to be considered 'online' + let key = config_string("wallet", net_str, "contacts_online_ping_window"); + let contacts_online_ping_window = match cfg.get_int(&key) { + Ok(window) => window as usize, + Err(ConfigError::NotFound(_)) => 2, + Err(e) => return Err(ConfigurationError::new(&key, None, &e.to_string())), + }; + // blocks_behind_before_considered_lagging when a node should switch over from listening to lagging let key = config_string("base_node", net_str, "blocks_behind_before_considered_lagging"); let blocks_behind_before_considered_lagging = optional(cfg.get_int(&key))?.unwrap_or(2) as u64; @@ -849,6 +858,7 @@ fn convert_node_config( console_wallet_peer_db_path, console_wallet_use_libtor, contacts_auto_ping_interval, + contacts_online_ping_window, core_threads, data_dir, db_config, diff --git a/common/src/configuration/utils.rs b/common/src/configuration/utils.rs index 5dd6758504..3435590243 100644 --- a/common/src/configuration/utils.rs +++ b/common/src/configuration/utils.rs @@ -249,6 +249,9 @@ fn set_common_network_defaults(cfg: &mut Config) { let key = format!("wallet.{}.contacts_auto_ping_interval", network); cfg.set_default(&key, 20).unwrap(); + let key = format!("wallet.{}.contacts_online_ping_window", network); + cfg.set_default(&key, 2).unwrap(); + let key = format!("common.{}.peer_seeds", network); cfg.set_default(&key, Vec::::new()).unwrap();