Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(peer_db)!: more accurate peer stats per address #5142

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 175 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions applications/tari_app_grpc/proto/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ message Peer{
/// Features supported by the peer
uint64 features = 9;
/// Connection statics for the peer
google.protobuf.Timestamp last_connected_at = 10; /// Protocols supported by the peer. This should not be considered a definitive list of supported protocols and is
// google.protobuf.Timestamp last_connected_at = 10; /// Protocols supported by the peer. This should not be considered a definitive list of supported protocols and is
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

/// used as information for more efficient protocol negotiation.
repeated bytes supported_protocols = 11;
/// User agent advertised by the peer
Expand Down Expand Up @@ -77,7 +77,6 @@ message Address{
bytes address =1;
string last_seen = 2;
uint32 connection_attempts = 3;
uint32 rejected_message_count = 4;
uint64 avg_latency = 5;
}

Expand Down
23 changes: 8 additions & 15 deletions applications/tari_app_grpc/src/conversions/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_comms::{connectivity::ConnectivityStatus, net_address::MutliaddrWithStats, peer_manager::Peer};
use tari_comms::{connectivity::ConnectivityStatus, net_address::MultiaddrWithStats, peer_manager::Peer};
use tari_utilities::ByteArray;

use crate::{conversions::naive_datetime_to_timestamp, tari_rpc as grpc};
Expand All @@ -31,21 +31,17 @@ impl From<Peer> for grpc::Peer {
fn from(peer: Peer) -> Self {
let public_key = peer.public_key.to_vec();
let node_id = peer.node_id.to_vec();
let mut addresses = Vec::with_capacity(peer.addresses.addresses.len());
let last_connection = peer
.addresses
.last_seen()
.map(|v| naive_datetime_to_timestamp(v.naive_utc()));
for address in peer.addresses.addresses {
let mut addresses = Vec::with_capacity(peer.addresses.len());
let last_connection = peer.addresses.last_seen().map(naive_datetime_to_timestamp);
for address in peer.addresses.addresses() {
addresses.push(address.clone().into())
}
let flags = u32::from(peer.flags.bits());
let banned_until = peer.banned_until.map(naive_datetime_to_timestamp);
let banned_reason = peer.banned_reason.to_string();
let offline_at = peer.offline_at.map(naive_datetime_to_timestamp);
let offline_at = peer.offline_at().map(naive_datetime_to_timestamp);
let features = peer.features.bits();

let last_connected_at = peer.connection_stats.last_connected_at.map(naive_datetime_to_timestamp);
let supported_protocols = peer.supported_protocols.into_iter().map(|p| p.to_vec()).collect();
let user_agent = peer.user_agent;
Self {
Expand All @@ -58,28 +54,25 @@ impl From<Peer> for grpc::Peer {
banned_reason,
offline_at,
features,
last_connected_at,
supported_protocols,
user_agent,
}
}
}

impl From<MutliaddrWithStats> for grpc::Address {
fn from(address_with_stats: MutliaddrWithStats) -> Self {
let address = address_with_stats.address.to_vec();
impl From<MultiaddrWithStats> for grpc::Address {
fn from(address_with_stats: MultiaddrWithStats) -> Self {
let address = address_with_stats.address().to_vec();
let last_seen = match address_with_stats.last_seen {
Some(v) => v.to_string(),
None => String::new(),
};
let connection_attempts = address_with_stats.connection_attempts;
let rejected_message_count = address_with_stats.rejected_message_count;
let avg_latency = address_with_stats.avg_latency.as_secs();
Self {
address,
last_seen,
connection_attempts,
rejected_message_count,
avg_latency,
}
}
Expand Down
1 change: 1 addition & 0 deletions applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bincode = "1.3.1"
borsh = "0.9.3"
chrono = { version = "0.4.19", default-features = false }
clap = { version = "3.1.1", features = ["derive", "env"] }
console-subscriber = "0.1.8"
config = { version = "0.13.0" }
crossterm = { version = "0.23.1", features = ["event-stream"] }
derive_more = "0.99.17"
Expand Down
66 changes: 39 additions & 27 deletions applications/tari_base_node/src/commands/command/get_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl CommandContext {
pub async fn get_peer(&self, partial: Vec<u8>, original_str: String) -> Result<(), Error> {
let peer_manager = self.comms.peer_manager();
let peers = peer_manager.find_all_starts_with(&partial).await?;
let peer = {
if let Some(peer) = peers.into_iter().next() {
peer
let peers = {
if !peers.is_empty() {
peers
} else {
let pk = parse_emoji_id_or_public_key(&original_str).ok_or_else(|| ArgsError::NoPeerMatching {
original_str: original_str.clone(),
Expand All @@ -76,33 +76,45 @@ impl CommandContext {
.find_by_public_key(&pk)
.await?
.ok_or(ArgsError::NoPeerMatching { original_str })?;
peer
vec![peer]
}
};

let eid = EmojiId::from_public_key(&peer.public_key).to_emoji_string();
println!("Emoji ID: {}", eid);
println!("Public Key: {}", peer.public_key);
println!("NodeId: {}", peer.node_id);
println!("Addresses:");
peer.addresses.iter().for_each(|a| {
println!("- {}", a);
});
println!("User agent: {}", peer.user_agent);
println!("Features: {:?}", peer.features);
println!("Flags: {:?}", peer.flags);
println!("Supported protocols:");
peer.supported_protocols.iter().for_each(|p| {
println!("- {}", String::from_utf8_lossy(p));
});
if let Some(dt) = peer.banned_until() {
println!("Banned until {}, reason: {}", dt, peer.banned_reason);
}
if let Some(dt) = peer.last_seen() {
println!("Last seen: {}", dt);
}
if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) {
println!("Last updated: {} (UTC)", updated_at);
for peer in peers {
let eid = EmojiId::from_public_key(&peer.public_key).to_emoji_string();
println!("Emoji ID: {}", eid);
println!("Public Key: {}", peer.public_key);
println!("NodeId: {}", peer.node_id);
println!("Addresses:");
peer.addresses.addresses().iter().for_each(|a| {
println!(
"- {} Score: {} - Latency: {:?} - Last Seen: {} - Last Failure:{}",
a.address(),
a.quality_score,
a.avg_latency,
a.last_seen
.as_ref()
.map(|t| t.to_string())
.unwrap_or_else(|| "Never".to_string()),
a.last_failed_reason.as_ref().unwrap_or(&"None".to_string())
);
});
println!("User agent: {}", peer.user_agent);
println!("Features: {:?}", peer.features);
println!("Flags: {:?}", peer.flags);
println!("Supported protocols:");
peer.supported_protocols.iter().for_each(|p| {
println!("- {}", String::from_utf8_lossy(p));
});
if let Some(dt) = peer.banned_until() {
println!("Banned until {}, reason: {}", dt, peer.banned_reason);
}
if let Some(dt) = peer.last_seen() {
println!("Last seen: {}", dt);
}
if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) {
println!("Last updated: {} (UTC)", updated_at);
}
}
Ok(())
}
Expand Down
24 changes: 10 additions & 14 deletions applications/tari_base_node/src/commands/command/list_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ impl CommandContext {
_ => false,
})
}
let peers = self.comms.peer_manager().perform_query(query).await?;
let mut peers = self.comms.peer_manager().perform_query(query).await?;
let num_peers = peers.len();
println!();
let mut table = Table::new();
table.set_titles(vec!["NodeId", "Public Key", "Role", "User Agent", "Info"]);

peers.sort_by(|a, b| a.node_id.cmp(&b.node_id));
for peer in peers {
let info_str = {
let mut s = vec![];
Expand All @@ -70,17 +71,6 @@ impl CommandContext {
if !peer.is_banned() {
s.push("OFFLINE".to_string());
}
} else if let Some(dt) = peer.last_seen() {
s.push(format!(
"LAST_SEEN: {}",
Utc::now()
.naive_utc()
.signed_duration_since(dt)
.to_std()
.map(format_duration_basic)
.unwrap_or_else(|_| "?".into())
));
} else {
}

if let Some(dt) = peer.banned_until() {
Expand All @@ -101,8 +91,14 @@ impl CommandContext {
s.push(format!("chain height: {}", metadata.metadata.height_of_longest_chain()));
}

if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) {
s.push(format!("updated_at: {} (UTC)", updated_at));
if let Some(last_seen) = peer.addresses.last_seen() {
let duration = Utc::now()
.naive_utc()
.signed_duration_since(last_seen)
.to_std()
.map(format_duration_basic)
.unwrap_or_else(|_| "?".into());
s.push(format!("last seen: {}", duration));
}

if s.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ impl CommandContext {
.comms
.peer_manager()
.update_each(|mut peer| {
if peer.is_offline() {
peer.set_offline(false);
Some(peer)
} else {
None
}
peer.addresses.reset_connection_attempts();
Some(peer)
})
.await?;

Expand Down
13 changes: 12 additions & 1 deletion applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
/// `whoami` - Displays identity information about this Base Node and it's wallet
/// `quit` - Exits the Base Node
/// `exit` - Same as quit
use std::{process, sync::Arc};
use std::{panic, process, sync::Arc};

use clap::Parser;
use log::*;
Expand All @@ -85,6 +85,14 @@ const LOG_TARGET: &str = "tari::base_node::app";

/// Application entry point
fn main() {
// Setup a panic hook which prints the default rust panic message but also exits the process. This makes a panic in
// any thread "crash" the system instead of silently continuing.
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |info| {
default_hook(info);
process::exit(1);
}));

if let Err(err) = main_inner() {
eprintln!("{:?}", err);
let exit_code = err.exit_code;
Expand All @@ -107,6 +115,9 @@ fn main_inner() -> Result<(), ExitError> {
let config_path = cli.common.config_path();
let cfg = load_configuration(config_path, true, &cli)?;

// Tokio console init
console_subscriber::init();

initialize_logging(
&cli.common.log_config_path("base_node"),
include_str!("../log4rs_sample.yml"),
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,11 @@ pub async fn start_wallet(

let net_address = base_node
.addresses
.first()
.best()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?;

wallet
.set_base_node_peer(base_node.public_key.clone(), net_address.address.clone())
.set_base_node_peer(base_node.public_key.clone(), net_address.address().clone())
.await
.map_err(|e| {
ExitError::new(
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui/components/menu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_app_utilities::consts;
use tari_comms::runtime::Handle;
use tokio::runtime::Handle;
use tui::{
backend::Backend,
layout::{Constraint, Direction, Layout, Rect},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl<B: Backend> Component<B> for NetworkTab {
let public_key = base_node.public_key.to_hex();
let address = base_node
.addresses
.first()
.best()
.map(|a| a.to_string())
.unwrap_or_else(|| "".to_string());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// Currently notifications are only added from the wallet_event_monitor which has
// add_notification method.

use tari_comms::runtime::Handle;
use tokio::runtime::Handle;
use tui::{
backend::Backend,
layout::{Constraint, Layout, Rect},
Expand Down
12 changes: 6 additions & 6 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ impl AppStateInner {
self.wallet
.set_base_node_peer(
peer.public_key.clone(),
peer.addresses.first().ok_or(UiError::NoAddress)?.address.clone(),
peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone(),
)
.await?;

Expand All @@ -869,7 +869,7 @@ impl AppStateInner {
target: LOG_TARGET,
"Setting new base node peer for wallet: {}::{}",
peer.public_key,
peer.addresses.first().ok_or(UiError::NoAddress)?.to_string(),
peer.addresses.best().ok_or(UiError::NoAddress)?.to_string(),
);

Ok(())
Expand All @@ -879,7 +879,7 @@ impl AppStateInner {
self.wallet
.set_base_node_peer(
peer.public_key.clone(),
peer.addresses.first().ok_or(UiError::NoAddress)?.address.clone(),
peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone(),
)
.await?;

Expand All @@ -900,13 +900,13 @@ impl AppStateInner {
.set_client_key_value(CUSTOM_BASE_NODE_PUBLIC_KEY_KEY.to_string(), peer.public_key.to_string())?;
self.wallet.db.set_client_key_value(
CUSTOM_BASE_NODE_ADDRESS_KEY.to_string(),
peer.addresses.first().ok_or(UiError::NoAddress)?.to_string(),
peer.addresses.best().ok_or(UiError::NoAddress)?.to_string(),
)?;
info!(
target: LOG_TARGET,
"Setting custom base node peer for wallet: {}::{}",
peer.public_key,
peer.addresses.first().ok_or(UiError::NoAddress)?.to_string(),
peer.addresses.best().ok_or(UiError::NoAddress)?.to_string(),
);

Ok(())
Expand All @@ -917,7 +917,7 @@ impl AppStateInner {
self.wallet
.set_base_node_peer(
previous.public_key.clone(),
previous.addresses.first().ok_or(UiError::NoAddress)?.address.clone(),
previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone(),
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/utils/formatting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn display_compressed_string(string: String, len_first: usize, len_last: usi

/// Utility function to display the first net address of a &Peer as a String
pub fn display_address(peer: &Peer) -> String {
match peer.addresses.first() {
match peer.addresses.best() {
Some(address) => address.to_string(),
None => "".to_string(),
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ async fn test_reorg() {
}

static EMISSION: [u64; 2] = [10, 10];
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[allow(clippy::too_many_lines)]
#[allow(clippy::identity_op)]
async fn receive_and_propagate_transaction() {
Expand Down
Loading