From e48f8675755b8f5a614a4fb8cea94d7cf179617a Mon Sep 17 00:00:00 2001
From: Denis Kolodin
Date: Mon, 7 Mar 2022 08:21:26 +0000
Subject: [PATCH] refactor: use clap as a commands parser (#3867)

Description
---
This PR replaces our own command parser with one derived using the `clap` crate. It also adds the `watch` command and avoids unexpected status printing.

Motivation and Context
---
Improve the user experience of the node's shell and simplify maintenance and the addition of new commands. Also, periodic status printing breaks rustyline, so the prompt characters are not printed properly.

How Has This Been Tested?
---
Manually
---
 Cargo.lock | 41 +- .../tari_app_utilities/src/utilities.rs | 2 + applications/tari_base_node/Cargo.toml | 5 +- .../tari_base_node/src/commands/args.rs | 96 -- .../src/commands/command/ban_peer.rs | 83 ++ .../src/commands/command/block_timing.rs | 48 + .../src/commands/command/check_db.rs | 63 + .../src/commands/command/check_for_updates.rs | 39 + .../src/commands/command/dial_peer.rs | 36 + .../src/commands/command/discover_peer.rs | 40 + .../src/commands/command/get_block.rs | 62 + .../commands/command/get_chain_metadata.rs | 24 + .../src/commands/command/get_db_stats.rs | 93 ++ .../src/commands/command/get_mempool_state.rs | 69 + .../src/commands/command/get_mempool_stats.rs | 25 + .../src/commands/command/get_network_stats.rs | 72 + .../src/commands/command/get_peer.rs | 86 ++ .../src/commands/command/get_state_info.rs | 24 + .../src/commands/command/header_stats.rs | 147 ++ .../src/commands/command/list_banned_peers.rs | 31 + .../src/commands/command/list_connections.rs | 89 ++ .../src/commands/command/list_headers.rs | 47 + .../src/commands/command/list_peers.rs | 117 ++ .../src/commands/command/list_reorgs.rs | 49 + .../src/commands/command/mod.rs | 284 ++++ .../src/commands/command/period_stats.rs | 110 ++ .../src/commands/command/ping_peer.rs | 53 + .../src/commands/command/quit.rs | 22 + .../commands/command/reset_offline_peers.rs | 35 + .../src/commands/command/rewind_blockchain.rs | 31 + .../src/commands/command/search_kernel.rs | 44 + .../src/commands/command/search_utxo.rs | 38 + .../src/commands/command/status.rs | 119 ++ .../src/commands/command/unban_all_peers.rs | 32 + .../src/commands/command/version.rs | 40 + .../src/commands/command/watch_command.rs | 38 + .../src/commands/command/whoami.rs | 25 + .../src/commands/command_handler.rs | 1212 ----------------- .../tari_base_node/src/commands/mod.rs | 6 +- .../tari_base_node/src/commands/nom_parser.rs | 81 ++ .../tari_base_node/src/commands/parser.rs | 79 +- .../tari_base_node/src/commands/performer.rs | 413 ------ .../src/commands/status_line.rs | 8 + applications/tari_base_node/src/main.rs | 98 +- 44 files changed, 2346 insertions(+), 1810 deletions(-) create mode 100644 applications/tari_base_node/src/commands/command/ban_peer.rs create mode 100644 applications/tari_base_node/src/commands/command/block_timing.rs create mode 100644 applications/tari_base_node/src/commands/command/check_db.rs create mode 100644 applications/tari_base_node/src/commands/command/check_for_updates.rs create mode 100644 applications/tari_base_node/src/commands/command/dial_peer.rs create mode 100644 applications/tari_base_node/src/commands/command/discover_peer.rs create mode 100644 applications/tari_base_node/src/commands/command/get_block.rs create mode 100644 applications/tari_base_node/src/commands/command/get_chain_metadata.rs create mode 100644 applications/tari_base_node/src/commands/command/get_db_stats.rs create mode 100644 
applications/tari_base_node/src/commands/command/get_mempool_state.rs create mode 100644 applications/tari_base_node/src/commands/command/get_mempool_stats.rs create mode 100644 applications/tari_base_node/src/commands/command/get_network_stats.rs create mode 100644 applications/tari_base_node/src/commands/command/get_peer.rs create mode 100644 applications/tari_base_node/src/commands/command/get_state_info.rs create mode 100644 applications/tari_base_node/src/commands/command/header_stats.rs create mode 100644 applications/tari_base_node/src/commands/command/list_banned_peers.rs create mode 100644 applications/tari_base_node/src/commands/command/list_connections.rs create mode 100644 applications/tari_base_node/src/commands/command/list_headers.rs create mode 100644 applications/tari_base_node/src/commands/command/list_peers.rs create mode 100644 applications/tari_base_node/src/commands/command/list_reorgs.rs create mode 100644 applications/tari_base_node/src/commands/command/mod.rs create mode 100644 applications/tari_base_node/src/commands/command/period_stats.rs create mode 100644 applications/tari_base_node/src/commands/command/ping_peer.rs create mode 100644 applications/tari_base_node/src/commands/command/quit.rs create mode 100644 applications/tari_base_node/src/commands/command/reset_offline_peers.rs create mode 100644 applications/tari_base_node/src/commands/command/rewind_blockchain.rs create mode 100644 applications/tari_base_node/src/commands/command/search_kernel.rs create mode 100644 applications/tari_base_node/src/commands/command/search_utxo.rs create mode 100644 applications/tari_base_node/src/commands/command/status.rs create mode 100644 applications/tari_base_node/src/commands/command/unban_all_peers.rs create mode 100644 applications/tari_base_node/src/commands/command/version.rs create mode 100644 applications/tari_base_node/src/commands/command/watch_command.rs create mode 100644 applications/tari_base_node/src/commands/command/whoami.rs delete mode 100644 applications/tari_base_node/src/commands/command_handler.rs create mode 100644 applications/tari_base_node/src/commands/nom_parser.rs delete mode 100644 applications/tari_base_node/src/commands/performer.rs diff --git a/Cargo.lock b/Cargo.lock index 9ae766e3dc3..c3a6c12b406 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1024,19 +1024,34 @@ dependencies = [ [[package]] name = "clap" -version = "3.0.14" +version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62" +checksum = "6d76c22c9b9b215eeb8d016ad3a90417bd13cb24cf8142756e6472445876cab7" dependencies = [ "atty", "bitflags 1.3.2", + "clap_derive", "indexmap", + "lazy_static 1.4.0", "os_str_bytes", "strsim 0.10.0", "termcolor", "textwrap 0.14.2", ] +[[package]] +name = "clap_derive" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd1122e63869df2cb309f449da1ad54a7c6dfeb7c7e6ccd8e0825d9eb93bb72" +dependencies = [ + "heck 0.4.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clear_on_drop" version = "0.2.4" @@ -3720,6 +3735,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.3.7" @@ -4000,6 +4021,17 @@ dependencies = [ "version_check 0.9.4", ] 
+[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check 0.9.4", +] + [[package]] name = "notify-rust" version = "4.5.6" @@ -6504,8 +6536,10 @@ name = "tari_base_node" version = "0.28.1" dependencies = [ "anyhow", + "async-trait", "bincode", "chrono", + "clap 3.1.1", "config 0.9.3", "crossterm 0.22.1", "derive_more", @@ -6513,6 +6547,7 @@ dependencies = [ "futures 0.3.21", "log", "log-mdc", + "nom 7.1.0", "num_cpus", "opentelemetry", "opentelemetry-jaeger", @@ -7413,7 +7448,7 @@ dependencies = [ "attohttpc", "bincode", "cfg_aliases", - "clap 3.0.14", + "clap 3.1.1", "dirs-next 2.0.0", "either", "embed_plist", diff --git a/applications/tari_app_utilities/src/utilities.rs b/applications/tari_app_utilities/src/utilities.rs index cdaa6365bc8..0470f1f1609 100644 --- a/applications/tari_app_utilities/src/utilities.rs +++ b/applications/tari_app_utilities/src/utilities.rs @@ -205,6 +205,7 @@ pub fn either_to_node_id(either: Either) -> NodeId { } } +#[derive(Debug)] pub struct UniPublicKey(PublicKey); impl FromStr for UniPublicKey { @@ -227,6 +228,7 @@ impl From for PublicKey { } } +#[derive(Debug)] pub enum UniNodeId { PublicKey(PublicKey), NodeId(NodeId), diff --git a/applications/tari_base_node/Cargo.toml b/applications/tari_base_node/Cargo.toml index f1c9658186f..32769158ea4 100644 --- a/applications/tari_base_node/Cargo.toml +++ b/applications/tari_base_node/Cargo.toml @@ -24,8 +24,10 @@ tari_shutdown = { path = "../../infrastructure/shutdown" } tari_utilities = "0.3.0" anyhow = "1.0.53" +async-trait = "0.1.52" bincode = "1.3.1" chrono = { version = "0.4.19", default-features = false } +clap = { version = "3.1.1", features = ["derive"] } config = { version = "0.9.3" } crossterm = "0.22" derive_more = "0.99.17" @@ -34,10 +36,11 @@ futures = { version = "^0.3.16", default-features = false, features = ["alloc"] log = { version = "0.4.8", features = ["std"] } log-mdc = "0.1.0" num_cpus = "1" +nom = "7.1.0" regex = "1" rustyline = "9.0" rustyline-derive = "0.5" -strum = "0.22" +strum = { version = "0.22", features = ["derive"] } strum_macros = "0.22" thiserror = "^1.0.26" tokio = { version = "1.11", features = ["signal"] } diff --git a/applications/tari_base_node/src/commands/args.rs b/applications/tari_base_node/src/commands/args.rs index 32c7cc16783..8b137891791 100644 --- a/applications/tari_base_node/src/commands/args.rs +++ b/applications/tari_base_node/src/commands/args.rs @@ -1,97 +1 @@ -use std::{ - iter::Peekable, - str::{FromStr, SplitWhitespace}, -}; -use tari_utilities::hex::{Hex, HexError}; -use thiserror::Error; - -#[derive(Debug, Error)] -#[error("{name} {reason}")] -pub struct ArgsError { - name: &'static str, - reason: ArgsReason, -} - -impl ArgsError { - pub fn new(name: &'static str, reason: impl Into) -> Self { - Self { - name, - reason: reason.into(), - } - } -} - -#[derive(Debug, Error)] -pub enum ArgsReason { - #[error("argument required")] - Required, - #[error("argument can't be parsed: {details}")] - NotParsed { details: String }, - #[error("argument is not valid: {description}")] - Inconsistent { description: String }, -} - -impl> From for ArgsReason { - fn from(value: T) -> Self { - Self::Inconsistent { - description: value.as_ref().to_owned(), - } - } -} - -pub struct Args<'a> { - splitted: Peekable>, -} - -impl<'a> Args<'a> { - pub fn split(s: &'a str) -> Self { - Self 
{ - splitted: s.split_whitespace().peekable(), - } - } - - // TODO: Remove - pub fn shift_one(&mut self) { - self.splitted.next(); - } - - // TODO: Use `next` always - pub fn try_take_next(&mut self, name: &'static str) -> Result, ArgsError> - where - T: FromStr, - T::Err: ToString, - { - match self.splitted.peek().map(|s| s.parse()) { - Some(Ok(value)) => Ok(Some(value)), - Some(Err(err)) => Err(ArgsError::new(name, ArgsReason::NotParsed { - details: err.to_string(), - })), - None => Ok(None), - } - } - - pub fn take_next(&mut self, name: &'static str) -> Result - where - T: FromStr, - T::Err: ToString, - { - match self.try_take_next(name)? { - Some(value) => { - self.shift_one(); - Ok(value) - }, - None => Err(ArgsError::new(name, ArgsReason::Required)), - } - } -} - -pub struct FromHex(pub T); - -impl FromStr for FromHex { - type Err = HexError; - - fn from_str(s: &str) -> Result { - T::from_hex(s).map(Self) - } -} diff --git a/applications/tari_base_node/src/commands/command/ban_peer.rs b/applications/tari_base_node/src/commands/command/ban_peer.rs new file mode 100644 index 00000000000..dfa1ce28bf5 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/ban_peer.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::utilities::UniNodeId; +use tari_comms::peer_manager::NodeId; + +use super::{CommandContext, HandleCommand}; +use crate::LOG_TARGET; + +/// Bans a peer +#[derive(Debug, Parser)] +pub struct ArgsBan { + /// hex public key or emoji id + node_id: UniNodeId, + /// length of time to ban the peer for in seconds + #[clap(default_value_t = std::u64::MAX)] + length: u64, +} + +/// Removes a peer ban +#[derive(Debug, Parser)] +pub struct ArgsUnban { + /// hex public key or emoji id + node_id: UniNodeId, + /// length of time to ban the peer for in seconds + #[clap(default_value_t = std::u64::MAX)] + length: u64, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: ArgsBan) -> Result<(), Error> { + let node_id = args.node_id.into(); + let duration = Duration::from_secs(args.length); + self.ban_peer(node_id, duration, true).await + } +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: ArgsUnban) -> Result<(), Error> { + let node_id = args.node_id.into(); + let duration = Duration::from_secs(args.length); + self.ban_peer(node_id, duration, false).await + } +} + +impl CommandContext { + pub async fn ban_peer(&mut self, node_id: NodeId, duration: Duration, must_ban: bool) -> Result<(), Error> { + if self.base_node_identity.node_id() == &node_id { + println!("Cannot ban our own node"); + } else if must_ban { + // TODO: Use errors + match self + .connectivity + .ban_peer_until(node_id.clone(), duration, "UI manual ban".to_string()) + .await + { + Ok(_) => println!("Peer was banned in base node."), + Err(err) => { + println!("Failed to ban peer: {:?}", err); + log::error!(target: LOG_TARGET, "Could not ban peer: {:?}", err); + }, + } + } else { + match self.peer_manager.unban_peer(&node_id).await { + Ok(_) => { + println!("Peer ban was removed from base node."); + }, + Err(err) if err.is_peer_not_found() => { + println!("Peer not found in base node"); + }, + Err(err) => { + println!("Failed to ban peer: {:?}", err); + log::error!(target: LOG_TARGET, "Could not ban peer: {:?}", err); + }, + } + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/block_timing.rs 
b/applications/tari_base_node/src/commands/command/block_timing.rs new file mode 100644 index 00000000000..ccedc4d217b --- /dev/null +++ b/applications/tari_base_node/src/commands/command/block_timing.rs @@ -0,0 +1,48 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_core::blocks::BlockHeader; + +use super::{CommandContext, HandleCommand}; + +/// Calculates the maximum, minimum, and average time taken to mine a given range of blocks +#[derive(Debug, Parser)] +pub struct Args { + /// number of blocks from chain tip or start height + start: u64, + /// end height + end: Option, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + // TODO: is that possible to validate it with clap? + if args.end.is_none() && args.start < 2 { + Err(Error::msg("Number of headers must be at least 2.")) + } else { + self.block_timing(args.start, args.end).await + } + } +} + +impl CommandContext { + pub async fn block_timing(&self, start: u64, end: Option) -> Result<(), Error> { + let headers = self.get_chain_headers(start, end).await?; + if !headers.is_empty() { + let headers = headers.into_iter().map(|ch| ch.into_header()).rev().collect::>(); + let (max, min, avg) = BlockHeader::timing_stats(&headers); + println!( + "Timing for blocks #{} - #{}", + headers.first().unwrap().height, + headers.last().unwrap().height + ); + println!("Max block time: {}", max); + println!("Min block time: {}", min); + println!("Avg block time: {}", avg); + } else { + println!("No headers found"); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/check_db.rs b/applications/tari_base_node/src/commands/command/check_db.rs new file mode 100644 index 00000000000..3f793e3cdf7 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/check_db.rs @@ -0,0 +1,63 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tokio::io::{self, AsyncWriteExt}; + +use super::{CommandContext, HandleCommand}; +use crate::LOG_TARGET; + +/// Checks the blockchain database for missing blocks and headers +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.check_db().await + } +} + +impl CommandContext { + /// Function to process the check-db command + pub async fn check_db(&mut self) -> Result<(), Error> { + let meta = self.node_service.get_metadata().await?; + let mut height = meta.height_of_longest_chain(); + let mut missing_blocks = Vec::new(); + let mut missing_headers = Vec::new(); + print!("Searching for height: "); + // We need to check every header, but not every block. + let horizon_height = meta.horizon_block(height); + while height > 0 { + print!("{}", height); + io::stdout().flush().await?; + // we can only check till the pruning horizon, 0 is archive node so it needs to check every block. + if height > horizon_height { + match self.node_service.get_block(height).await { + Err(err) => { + // We need to check the data itself, as FetchMatchingBlocks will suppress any error, only + // logging it. 
+ log::error!(target: LOG_TARGET, "{}", err); + missing_blocks.push(height); + }, + Ok(Some(_)) => {}, + Ok(None) => missing_blocks.push(height), + }; + } + height -= 1; + let next_header = self.node_service.get_header(height).await.ok().flatten(); + if next_header.is_none() { + // this header is missing, so we stop here and need to ask for this header + missing_headers.push(height); + }; + print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); + } + println!("Complete"); + for missing_block in missing_blocks { + println!("Missing block at height: {}", missing_block); + } + for missing_header_height in missing_headers { + println!("Missing header at height: {}", missing_header_height) + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/check_for_updates.rs b/applications/tari_base_node/src/commands/command/check_for_updates.rs new file mode 100644 index 00000000000..7b25e212dbc --- /dev/null +++ b/applications/tari_base_node/src/commands/command/check_for_updates.rs @@ -0,0 +1,39 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::consts; + +use super::{CommandContext, HandleCommand}; + +/// Checks for software updates if auto update is enabled +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.check_for_updates().await + } +} + +impl CommandContext { + /// Check for updates + pub async fn check_for_updates(&mut self) -> Result<(), Error> { + println!("Checking for updates (current version: {})...", consts::APP_VERSION); + match self.software_updater.check_for_updates().await { + Some(update) => { + println!( + "Version {} of the {} is available: {} (sha: {})", + update.version(), + update.app(), + update.download_url(), + update.to_hash_hex() + ); + }, + None => { + println!("No updates found.",); + }, + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/dial_peer.rs b/applications/tari_base_node/src/commands/command/dial_peer.rs new file mode 100644 index 00000000000..6d998425104 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/dial_peer.rs @@ -0,0 +1,36 @@ +use std::time::Instant; + +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::utilities::UniNodeId; +use tari_comms::peer_manager::NodeId; + +use super::{CommandContext, HandleCommand}; + +/// Attempt to connect to a known peer +#[derive(Debug, Parser)] +pub struct Args { + /// hex public key or emoji id + node_id: UniNodeId, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.dial_peer(args.node_id.into()).await + } +} + +impl CommandContext { + /// Function to process the dial-peer command + pub async fn dial_peer(&self, dest_node_id: NodeId) -> Result<(), Error> { + let start = Instant::now(); + println!("☎️ Dialing peer..."); + + let connection = self.connectivity.dial_peer(dest_node_id).await?; + println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis()); + println!("Connection: {}", connection); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/discover_peer.rs b/applications/tari_base_node/src/commands/command/discover_peer.rs new file mode 100644 index 00000000000..8e700db9371 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/discover_peer.rs @@ -0,0 +1,40 @@ +use 
std::{ops::Deref, time::Instant}; + +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::utilities::UniPublicKey; +use tari_comms_dht::envelope::NodeDestination; +use tari_crypto::ristretto::RistrettoPublicKey; + +use super::{CommandContext, HandleCommand}; + +/// Attempt to discover a peer on the Tari network +#[derive(Debug, Parser)] +pub struct Args { + /// hex public key or emoji id + id: UniPublicKey, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.discover_peer(Box::new(args.id.into())).await + } +} + +impl CommandContext { + /// Function to process the discover-peer command + pub async fn discover_peer(&mut self, dest_pubkey: Box) -> Result<(), Error> { + let start = Instant::now(); + println!("🌎 Peer discovery started."); + let peer = self + .discovery_service + .discover_peer(dest_pubkey.deref().clone(), NodeDestination::PublicKey(dest_pubkey)) + .await?; + println!("⚡️ Discovery succeeded in {}ms!", start.elapsed().as_millis()); + println!("This peer was found:"); + println!("{}", peer); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_block.rs b/applications/tari_base_node/src/commands/command/get_block.rs new file mode 100644 index 00000000000..2e9f5a56851 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_block.rs @@ -0,0 +1,62 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_common_types::types::HashOutput; +use tari_utilities::message_format::MessageFormat; + +use super::{CommandContext, HandleCommand, TypeOrHex}; +use crate::commands::parser::Format; + +/// Display a block by height or hash +#[derive(Debug, Parser)] +pub struct Args { + /// The height or hash of the block to fetch + /// from the main chain. The genesis block + /// has height zero. + value: TypeOrHex, + /// Supported options are 'json' and 'text'. 'text' is the default if omitted. 
+ #[clap(default_value_t)] + format: Format, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + let format = args.format; + match args.value { + TypeOrHex::Type(value) => self.get_block(value, format).await, + TypeOrHex::Hex(hex) => self.get_block_by_hash(hex.0, format).await, + } + } +} + +impl CommandContext { + pub async fn get_block(&self, height: u64, format: Format) -> Result<(), Error> { + let mut data = self.blockchain_db.fetch_blocks(height..=height).await?; + match (data.pop(), format) { + (Some(block), Format::Text) => { + let block_data = self + .blockchain_db + .fetch_block_accumulated_data(block.hash().clone()) + .await?; + + println!("{}", block); + println!("-- Accumulated data --"); + println!("{}", block_data); + }, + (Some(block), Format::Json) => println!("{}", block.to_json()?), + (None, _) => println!("Block not found at height {}", height), + } + Ok(()) + } + + pub async fn get_block_by_hash(&self, hash: HashOutput, format: Format) -> Result<(), Error> { + let data = self.blockchain_db.fetch_block_by_hash(hash).await?; + match (data, format) { + (Some(block), Format::Text) => println!("{}", block), + (Some(block), Format::Json) => println!("{}", block.to_json()?), + (None, _) => println!("Block not found"), + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_chain_metadata.rs b/applications/tari_base_node/src/commands/command/get_chain_metadata.rs new file mode 100644 index 00000000000..faa8ad698bb --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_chain_metadata.rs @@ -0,0 +1,24 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Gets your base node chain meta data +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.get_chain_meta().await + } +} + +impl CommandContext { + pub async fn get_chain_meta(&mut self) -> Result<(), Error> { + let data = self.node_service.get_metadata().await?; + println!("{}", data); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_db_stats.rs b/applications/tari_base_node/src/commands/command/get_db_stats.rs new file mode 100644 index 00000000000..6e8716a130b --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_db_stats.rs @@ -0,0 +1,93 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; +use crate::table::Table; + +/// Gets your base node database stats +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.get_blockchain_db_stats().await + } +} + +impl CommandContext { + pub async fn get_blockchain_db_stats(&self) -> Result<(), Error> { + const BYTES_PER_MB: usize = 1024 * 1024; + + let stats = self.blockchain_db.get_stats().await?; + let mut table = Table::new(); + table.set_titles(vec![ + "Name", + "Entries", + "Depth", + "Branch Pages", + "Leaf Pages", + "Overflow Pages", + "Est. 
Size (MiB)", + "% of total", + ]); + let total_db_size = stats.db_stats().iter().map(|s| s.total_page_size()).sum::(); + stats.db_stats().iter().for_each(|stat| { + table.add_row(row![ + stat.name, + stat.entries, + stat.depth, + stat.branch_pages, + stat.leaf_pages, + stat.overflow_pages, + format!("{:.2}", stat.total_page_size() as f32 / BYTES_PER_MB as f32), + format!("{:.2}%", (stat.total_page_size() as f32 / total_db_size as f32) * 100.0) + ]); + }); + + table.print_stdout(); + println!(); + println!( + "{} databases, {:.2} MiB used ({:.2}%), page size: {} bytes, env_info = ({})", + stats.root().entries, + total_db_size as f32 / BYTES_PER_MB as f32, + (total_db_size as f32 / stats.env_info().mapsize as f32) * 100.0, + stats.root().psize as usize, + stats.env_info() + ); + + println!(); + println!("Totalling DB entry sizes. This may take a few seconds..."); + println!(); + let stats = self.blockchain_db.fetch_total_size_stats().await?; + println!(); + let mut table = Table::new(); + table.set_titles(vec![ + "Name", + "Entries", + "Total Size (MiB)", + "Avg. Size/Entry (bytes)", + "% of total", + ]); + let total_data_size = stats.sizes().iter().map(|s| s.total()).sum::(); + stats.sizes().iter().for_each(|size| { + let total = size.total() as f32 / BYTES_PER_MB as f32; + table.add_row(row![ + size.name, + size.num_entries, + format!("{:.2}", total), + format!("{}", size.avg_bytes_per_entry()), + format!("{:.2}%", (size.total() as f32 / total_data_size as f32) * 100.0) + ]) + }); + table.print_stdout(); + println!(); + println!( + "Total blockchain data size: {:.2} MiB ({:.2} % of LMDB map size)", + total_data_size as f32 / BYTES_PER_MB as f32, + (total_data_size as f32 / total_db_size as f32) * 100.0 + ); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_mempool_state.rs b/applications/tari_base_node/src/commands/command/get_mempool_state.rs new file mode 100644 index 00000000000..ea3b1ba297a --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_mempool_state.rs @@ -0,0 +1,69 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_utilities::hex::Hex; + +use super::{CommandContext, HandleCommand}; + +/// Retrieves your mempools state +#[derive(Debug, Parser)] +pub struct Args {} + +/// Filters and retrieves details about transactions from the mempool's state +#[derive(Debug, Parser)] +pub struct ArgsTx { + filter: String, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.get_mempool_state(None).await + } +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: ArgsTx) -> Result<(), Error> { + self.get_mempool_state(Some(args.filter)).await + } +} + +impl CommandContext { + /// Function to process the get-mempool-state command + pub async fn get_mempool_state(&mut self, filter: Option) -> Result<(), Error> { + let state = self.mempool_service.get_mempool_state().await?; + println!("----------------- Mempool -----------------"); + println!("--- Unconfirmed Pool ---"); + for tx in &state.unconfirmed_pool { + let tx_sig = tx + .first_kernel_excess_sig() + .map(|sig| sig.get_signature().to_hex()) + .unwrap_or_else(|| "N/A".to_string()); + if let Some(ref filter) = filter { + if !tx_sig.contains(filter) { + println!("--- TX: {} ---", tx_sig); + println!("{}", tx.body); + continue; + } + } else { + println!( + " {} Fee: {}, Outputs: {}, Kernels: {}, Inputs: {}, metadata: {} bytes", + 
tx_sig, + tx.body.get_total_fee(), + tx.body.outputs().len(), + tx.body.kernels().len(), + tx.body.inputs().len(), + tx.body.sum_metadata_size(), + ); + } + } + if filter.is_none() { + println!("--- Reorg Pool ---"); + for excess_sig in &state.reorg_pool { + println!(" {}", excess_sig.get_signature().to_hex()); + } + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_mempool_stats.rs b/applications/tari_base_node/src/commands/command/get_mempool_stats.rs new file mode 100644 index 00000000000..9df00c38e4d --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_mempool_stats.rs @@ -0,0 +1,25 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Retrieves your mempools stats +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.get_mempool_stats().await + } +} + +impl CommandContext { + /// Function to process the get-mempool-stats command + pub async fn get_mempool_stats(&mut self) -> Result<(), Error> { + let stats = self.mempool_service.get_mempool_stats().await?; + println!("{}", stats); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_network_stats.rs b/applications/tari_base_node/src/commands/command/get_network_stats.rs new file mode 100644 index 00000000000..76a86ccdcc4 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_network_stats.rs @@ -0,0 +1,72 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; +use crate::table::Table; + +/// Displays network stats +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.get_network_stats() + } +} + +impl CommandContext { + #[cfg(not(feature = "metrics"))] + pub fn get_network_stats(&self) -> Result<(), Error> { + println!( + "Metrics are not enabled in this binary. Recompile Tari base node with `--features metrics` to enable \ + them." 
+ ); + Ok(()) + } + + #[cfg(feature = "metrics")] + pub fn get_network_stats(&self) -> Result<(), Error> { + use tari_metrics::proto::MetricType; + let metric_families = tari_metrics::get_default_registry().gather(); + let metric_family_iter = metric_families + .into_iter() + .filter(|family| family.get_name().starts_with("tari_comms")); + + // TODO: Make this useful + let mut table = Table::new(); + table.set_titles(vec!["name", "type", "value"]); + for family in metric_family_iter { + let field_type = family.get_field_type(); + let name = family.get_name(); + for metric in family.get_metric() { + let value = match field_type { + MetricType::COUNTER => metric.get_counter().get_value(), + MetricType::GAUGE => metric.get_gauge().get_value(), + MetricType::SUMMARY => { + let summary = metric.get_summary(); + summary.get_sample_sum() / summary.get_sample_count() as f64 + }, + MetricType::UNTYPED => metric.get_untyped().get_value(), + MetricType::HISTOGRAM => { + let histogram = metric.get_histogram(); + histogram.get_sample_sum() / histogram.get_sample_count() as f64 + }, + }; + + let field_type = match field_type { + MetricType::COUNTER => "COUNTER", + MetricType::GAUGE => "GAUGE", + MetricType::SUMMARY => "SUMMARY", + MetricType::UNTYPED => "UNTYPED", + MetricType::HISTOGRAM => "HISTOGRAM", + }; + + table.add_row(row![name, field_type, value]); + } + } + table.print_stdout(); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_peer.rs b/applications/tari_base_node/src/commands/command/get_peer.rs new file mode 100644 index 00000000000..ff278def5f6 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_peer.rs @@ -0,0 +1,86 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::utilities::{parse_emoji_id_or_public_key, UniNodeId}; +use tari_common_types::emoji::EmojiId; +use tari_comms::peer_manager::NodeId; +use tari_utilities::ByteArray; + +use super::{CommandContext, HandleCommand, TypeOrHex}; + +/// Get all available info about peer +#[derive(Debug, Parser)] +pub struct Args { + /// Partial NodeId | PublicKey | EmojiId + value: String, +} + +impl From> for Vec { + fn from(value: TypeOrHex) -> Self { + match value { + TypeOrHex::Type(value) => NodeId::from(value).to_vec(), + TypeOrHex::Hex(vec) => vec.0, + } + } +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + let value: TypeOrHex = args.value.parse()?; + self.get_peer(value.into(), args.value).await + } +} + +impl CommandContext { + pub async fn get_peer(&self, partial: Vec, original_str: String) -> Result<(), Error> { + let peer = match self.peer_manager.find_all_starts_with(&partial).await { + Ok(peers) if peers.is_empty() => { + if let Some(pk) = parse_emoji_id_or_public_key(&original_str) { + if let Ok(Some(peer)) = self.peer_manager.find_by_public_key(&pk).await { + peer + } else { + println!("No peer matching '{}'", original_str); + // TODO: Return error + return Ok(()); + } + } else { + println!("No peer matching '{}'", original_str); + // TODO: Return error + return Ok(()); + } + }, + Ok(mut peers) => peers.remove(0), + Err(err) => { + println!("{}", err); + // TODO: Return error + return Ok(()); + }, + }; + + let eid = EmojiId::from_pubkey(&peer.public_key); + println!("Emoji ID: {}", eid); + println!("Public Key: {}", peer.public_key); + println!("NodeId: {}", peer.node_id); + println!("Addresses:"); + peer.addresses.iter().for_each(|a| { + println!("- 
{}", a); + }); + println!("User agent: {}", peer.user_agent); + println!("Features: {:?}", peer.features); + println!("Supported protocols:"); + peer.supported_protocols.iter().for_each(|p| { + println!("- {}", String::from_utf8_lossy(p)); + }); + if let Some(dt) = peer.banned_until() { + println!("Banned until {}, reason: {}", dt, peer.banned_reason); + } + if let Some(dt) = peer.last_seen() { + println!("Last seen: {}", dt); + } + if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) { + println!("Last updated: {} (UTC)", updated_at); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/get_state_info.rs b/applications/tari_base_node/src/commands/command/get_state_info.rs new file mode 100644 index 00000000000..6bf513023e5 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/get_state_info.rs @@ -0,0 +1,24 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Prints out the status of the base node state machine +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.state_info() + } +} + +impl CommandContext { + /// Function to process the get-state-info command + pub fn state_info(&self) -> Result<(), Error> { + println!("Current state machine state:\n{}", *self.state_machine_info.borrow()); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/header_stats.rs b/applications/tari_base_node/src/commands/command/header_stats.rs new file mode 100644 index 00000000000..9e6712f747b --- /dev/null +++ b/applications/tari_base_node/src/commands/command/header_stats.rs @@ -0,0 +1,147 @@ +use std::{cmp, io::Write}; + +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_core::proof_of_work::PowAlgorithm; +use tari_utilities::{hex::Hex, Hashable}; +use tokio::{ + fs::File, + io::{self, AsyncWriteExt}, +}; + +use super::{CommandContext, HandleCommand}; + +/// Prints out certain stats to of the block chain in csv format for easy copy, use as follows: +/// header-stats 0 1000 +/// header-stats 0 1000 sample2.csv +/// header-stats 0 1000 monero-sample.csv monero +#[derive(Debug, Parser)] +pub struct Args { + /// start height + start_height: u64, + /// end height + end_height: u64, + /// dump file + #[clap(default_value = "header-data.csv")] + filename: String, + /// filter:monero|sha3 + pow_algo: Option, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.save_header_stats(args.start_height, args.end_height, args.filename, args.pow_algo) + .await + } +} + +impl CommandContext { + pub async fn save_header_stats( + &self, + start_height: u64, + end_height: u64, + filename: String, + pow_algo: Option, + ) -> Result<(), Error> { + let mut output = File::create(&filename).await?; + + println!( + "Loading header from height {} to {} and dumping to file [working-dir]/{}.{}", + start_height, + end_height, + filename, + pow_algo.map(|a| format!(" PoW algo = {}", a)).unwrap_or_default() + ); + + let start_height = cmp::max(start_height, 1); + let mut prev_header = self.blockchain_db.fetch_chain_header(start_height - 1).await?; + + let mut buff = Vec::new(); + writeln!( + buff, + "Height,Achieved,TargetDifficulty,CalculatedDifficulty,SolveTime,NormalizedSolveTime,Algo,Timestamp,\ + Window,Acc.Monero,Acc.Sha3" + )?; + 
output.write_all(&buff).await?; + + for height in start_height..=end_height { + let header = self.blockchain_db.fetch_chain_header(height).await?; + + // Optionally, filter out pow algos + if pow_algo.map(|algo| header.header().pow_algo() != algo).unwrap_or(false) { + continue; + } + + let target_diff = self + .blockchain_db + .fetch_target_difficulties_for_next_block(prev_header.hash().clone()) + .await?; + let pow_algo = header.header().pow_algo(); + + let min = self + .consensus_rules + .consensus_constants(height) + .min_pow_difficulty(pow_algo); + let max = self + .consensus_rules + .consensus_constants(height) + .max_pow_difficulty(pow_algo); + + let calculated_target_difficulty = target_diff.get(pow_algo).calculate(min, max); + let existing_target_difficulty = header.accumulated_data().target_difficulty; + let achieved = header.accumulated_data().achieved_difficulty; + let solve_time = header.header().timestamp.as_u64() as i64 - prev_header.header().timestamp.as_u64() as i64; + let normalized_solve_time = cmp::min( + cmp::max(solve_time, 1) as u64, + self.consensus_rules + .consensus_constants(height) + .get_difficulty_max_block_interval(pow_algo), + ); + let acc_sha3 = header.accumulated_data().accumulated_sha_difficulty; + let acc_monero = header.accumulated_data().accumulated_monero_difficulty; + + buff.clear(); + writeln!( + buff, + "{},{},{},{},{},{},{},{},{},{},{}", + height, + achieved.as_u64(), + existing_target_difficulty.as_u64(), + calculated_target_difficulty.as_u64(), + solve_time, + normalized_solve_time, + pow_algo, + chrono::DateTime::from(header.header().timestamp), + target_diff.get(pow_algo).len(), + acc_monero.as_u64(), + acc_sha3.as_u64(), + )?; + output.write_all(&buff).await?; + + if header.header().hash() != header.accumulated_data().hash { + eprintln!( + "Difference in hash at {}! header = {} and accum hash = {}", + height, + header.header().hash().to_hex(), + header.accumulated_data().hash.to_hex() + ); + } + + if existing_target_difficulty != calculated_target_difficulty { + eprintln!( + "Difference at {}! 
existing = {} and calculated = {}", + height, existing_target_difficulty, calculated_target_difficulty + ); + } + + print!("{}", height); + io::stdout().flush().await?; + print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); + prev_header = header; + } + println!("Complete"); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/list_banned_peers.rs b/applications/tari_base_node/src/commands/command/list_banned_peers.rs new file mode 100644 index 00000000000..94ca9cd62bd --- /dev/null +++ b/applications/tari_base_node/src/commands/command/list_banned_peers.rs @@ -0,0 +1,31 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Lists peers that have been banned by the node or wallet +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.list_banned_peers().await + } +} + +impl CommandContext { + pub async fn list_banned_peers(&self) -> Result<(), Error> { + let banned = self.fetch_banned_peers().await?; + if banned.is_empty() { + println!("No peers banned from node.") + } else { + println!("Peers banned from node ({}):", banned.len()); + for peer in banned { + println!("{}", peer); + } + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/list_connections.rs b/applications/tari_base_node/src/commands/command/list_connections.rs new file mode 100644 index 00000000000..d3551fa17bb --- /dev/null +++ b/applications/tari_base_node/src/commands/command/list_connections.rs @@ -0,0 +1,89 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_comms::peer_manager::PeerFeatures; +use tari_core::base_node::state_machine_service::states::PeerMetadata; + +use super::{CommandContext, HandleCommand}; +use crate::{table::Table, utils::format_duration_basic}; + +/// Lists the peer connections currently held by this node +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.list_connections().await + } +} + +impl CommandContext { + /// Function to process the list-connections command + pub async fn list_connections(&mut self) -> Result<(), Error> { + let conns = self.connectivity.get_active_connections().await?; + if conns.is_empty() { + println!("No active peer connections."); + } else { + println!(); + let num_connections = conns.len(); + let mut table = Table::new(); + table.set_titles(vec![ + "NodeId", + "Public Key", + "Address", + "Direction", + "Age", + "Role", + "User Agent", + "Info", + ]); + for conn in conns { + let peer = self + .peer_manager + .find_by_node_id(conn.peer_node_id()) + .await + .expect("Unexpected peer database error") + .expect("Peer not found"); + + let chain_height = peer + .get_metadata(1) + .and_then(|v| bincode::deserialize::(v).ok()) + .map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain())); + + let ua = peer.user_agent; + table.add_row(row![ + peer.node_id, + peer.public_key, + conn.address(), + conn.direction(), + format_duration_basic(conn.age()), + { + if peer.features == PeerFeatures::COMMUNICATION_CLIENT { + "Wallet" + } else { + "Base node" + } + }, + { + if ua.is_empty() { + "" + } else { + ua.as_ref() + } + }, + format!( + "substreams: {}{}", + conn.substream_count(), + chain_height.map(|s| format!(", {}", 
s)).unwrap_or_default() + ), + ]); + } + + table.print_stdout(); + + println!("{} active connection(s)", num_connections); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/list_headers.rs b/applications/tari_base_node/src/commands/command/list_headers.rs new file mode 100644 index 00000000000..77e62c1228f --- /dev/null +++ b/applications/tari_base_node/src/commands/command/list_headers.rs @@ -0,0 +1,47 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_utilities::hex::Hex; + +use super::{CommandContext, HandleCommand}; +use crate::LOG_TARGET; + +/// List the amount of headers, can be called in the following two ways: +#[derive(Debug, Parser)] +pub struct Args { + /// number of headers starting from the chain tip back or the first header height (if the last set too) + start: u64, + /// last header height + end: Option, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.list_headers(args.start, args.end).await + } +} + +impl CommandContext { + /// Function to process the list-headers command + pub async fn list_headers(&self, start: u64, end: Option) -> Result<(), Error> { + let res = self.get_chain_headers(start, end).await; + match res { + Ok(h) if h.is_empty() => { + println!("No headers found"); + }, + Ok(headers) => { + for header in headers { + println!("\n\nHeader hash: {}", header.hash().to_hex()); + println!("{}", header); + } + }, + // TODO: Handle results properly + Err(err) => { + println!("Failed to retrieve headers: {:?}", err); + log::warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,); + }, + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/list_peers.rs b/applications/tari_base_node/src/commands/command/list_peers.rs new file mode 100644 index 00000000000..7a132c9939f --- /dev/null +++ b/applications/tari_base_node/src/commands/command/list_peers.rs @@ -0,0 +1,117 @@ +use anyhow::Error; +use async_trait::async_trait; +use chrono::Utc; +use clap::Parser; +use tari_comms::peer_manager::{PeerFeatures, PeerQuery}; +use tari_core::base_node::state_machine_service::states::PeerMetadata; + +use super::{CommandContext, HandleCommand}; +use crate::{table::Table, utils::format_duration_basic}; + +/// Lists the peers that this node knows about +#[derive(Debug, Parser)] +pub struct Args { + filter: Option, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.list_peers(args.filter).await + } +} + +impl CommandContext { + pub async fn list_peers(&self, filter: Option) -> Result<(), Error> { + let mut query = PeerQuery::new(); + if let Some(f) = filter { + let filter = f.to_lowercase(); + query = query.select_where(move |p| match filter.as_str() { + "basenode" | "basenodes" | "base_node" | "base-node" | "bn" => { + p.features == PeerFeatures::COMMUNICATION_NODE + }, + "wallet" | "wallets" | "w" => p.features == PeerFeatures::COMMUNICATION_CLIENT, + _ => false, + }) + } + let peers = self.peer_manager.perform_query(query).await?; + let num_peers = peers.len(); + println!(); + let mut table = Table::new(); + table.set_titles(vec!["NodeId", "Public Key", "Role", "User Agent", "Info"]); + + for peer in peers { + let info_str = { + let mut s = vec![]; + + if peer.is_offline() { + if !peer.is_banned() { + s.push("OFFLINE".to_string()); + } + } else if let Some(dt) = peer.last_seen() { + s.push(format!( + 
"LAST_SEEN: {}", + Utc::now() + .naive_utc() + .signed_duration_since(dt) + .to_std() + .map(format_duration_basic) + .unwrap_or_else(|_| "?".into()) + )); + } + + if let Some(dt) = peer.banned_until() { + s.push(format!( + "BANNED({}, {})", + dt.signed_duration_since(Utc::now().naive_utc()) + .to_std() + .map(format_duration_basic) + .unwrap_or_else(|_| "∞".to_string()), + peer.banned_reason + )); + } + + if let Some(metadata) = peer + .get_metadata(1) + .and_then(|v| bincode::deserialize::(v).ok()) + { + s.push(format!("chain height: {}", metadata.metadata.height_of_longest_chain())); + } + + if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) { + s.push(format!("updated_at: {} (UTC)", updated_at)); + } + + if s.is_empty() { + "--".to_string() + } else { + s.join(", ") + } + }; + let ua = peer.user_agent; + table.add_row(row![ + peer.node_id, + peer.public_key, + { + if peer.features == PeerFeatures::COMMUNICATION_CLIENT { + "Wallet" + } else { + "Base node" + } + }, + { + if ua.is_empty() { + "" + } else { + ua.as_ref() + } + }, + info_str, + ]); + } + table.print_stdout(); + + println!("{} peer(s) known by this node", num_peers); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/list_reorgs.rs b/applications/tari_base_node/src/commands/command/list_reorgs.rs new file mode 100644 index 00000000000..ed53783ed9d --- /dev/null +++ b/applications/tari_base_node/src/commands/command/list_reorgs.rs @@ -0,0 +1,49 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_utilities::hex::Hex; + +use super::{CommandContext, HandleCommand}; +use crate::table::Table; + +/// List tracked reorgs +/// This feature must be enabled by +/// setting `track_reorgs = true` in +/// the [base_node] section of your config." +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.list_reorgs() + } +} + +impl CommandContext { + pub fn list_reorgs(&self) -> Result<(), Error> { + if !self.config.blockchain_track_reorgs { + // TODO: Return error/report + println!( + "Reorg tracking is turned off. Add `track_reorgs = true` to the [base_node] section of your config to \ + turn it on." 
+ ); + } else { + let reorgs = self.blockchain_db.inner().fetch_all_reorgs()?; + let mut table = Table::new(); + table.set_titles(vec!["#", "New Tip", "Prev Tip", "Depth", "Timestamp"]); + + for (i, reorg) in reorgs.iter().enumerate() { + table.add_row(row![ + i + 1, + format!("#{} ({})", reorg.new_height, reorg.new_hash.to_hex()), + format!("#{} ({})", reorg.prev_height, reorg.prev_hash.to_hex()), + format!("{} added, {} removed", reorg.num_blocks_added, reorg.num_blocks_removed), + reorg.local_time + ]); + } + table.enable_row_count().print_stdout(); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/mod.rs b/applications/tari_base_node/src/commands/command/mod.rs new file mode 100644 index 00000000000..6ab9a0b493a --- /dev/null +++ b/applications/tari_base_node/src/commands/command/mod.rs @@ -0,0 +1,284 @@ +mod ban_peer; +mod block_timing; +mod check_db; +mod check_for_updates; +mod dial_peer; +mod discover_peer; +mod get_block; +mod get_chain_metadata; +mod get_db_stats; +mod get_mempool_state; +mod get_mempool_stats; +mod get_network_stats; +mod get_peer; +mod get_state_info; +mod header_stats; +mod list_banned_peers; +mod list_connections; +mod list_headers; +mod list_peers; +mod list_reorgs; +mod period_stats; +mod ping_peer; +mod quit; +mod reset_offline_peers; +mod rewind_blockchain; +mod search_kernel; +mod search_utxo; +mod status; +mod unban_all_peers; +mod version; +mod watch_command; +mod whoami; + +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::Error; +use async_trait::async_trait; +use clap::{CommandFactory, FromArgMatches, Parser, Subcommand}; +use strum::{EnumVariantNames, VariantNames}; +use tari_common::GlobalConfig; +use tari_comms::{ + connectivity::ConnectivityRequester, + peer_manager::{Peer, PeerManager, PeerManagerError, PeerQuery}, + protocol::rpc::RpcServerHandle, + NodeIdentity, +}; +use tari_comms_dht::{DhtDiscoveryRequester, MetricsCollectorHandle}; +use tari_core::{ + base_node::{state_machine_service::states::StatusInfo, LocalNodeCommsInterface}, + blocks::ChainHeader, + chain_storage::{async_db::AsyncBlockchainDb, LMDBDatabase}, + consensus::ConsensusManager, + mempool::service::LocalMempoolService, +}; +use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle}; +use tari_shutdown::Shutdown; +use tokio::{sync::watch, time}; +use watch_command::WatchCommand; + +use crate::{ + builder::BaseNodeContext, + commands::{nom_parser::ParsedCommand, parser::FromHex}, +}; + +#[derive(Debug, Parser)] +pub struct Args { + #[clap(subcommand)] + pub command: Command, +} + +#[derive(Debug, Subcommand, EnumVariantNames)] +#[strum(serialize_all = "kebab-case")] +pub enum Command { + Version(version::Args), + CheckForUpdates(check_for_updates::Args), + Status(status::Args), + GetChainMetadata(get_chain_metadata::Args), + GetDbStats(get_db_stats::Args), + GetPeer(get_peer::Args), + ListPeers(list_peers::Args), + DialPeer(dial_peer::Args), + PingPeer(ping_peer::Args), + ResetOfflinePeers(reset_offline_peers::Args), + RewindBlockchain(rewind_blockchain::Args), + BanPeer(ban_peer::ArgsBan), + UnbanPeer(ban_peer::ArgsUnban), + UnbanAllPeers(unban_all_peers::Args), + ListBannedPeers(list_banned_peers::Args), + ListConnections(list_connections::Args), + ListHeaders(list_headers::Args), + CheckDb(check_db::Args), + PeriodStats(period_stats::Args), + HeaderStats(header_stats::Args), + BlockTiming(block_timing::Args), + CalcTiming(block_timing::Args), + ListReorgs(list_reorgs::Args), + 
DiscoverPeer(discover_peer::Args), + GetBlock(get_block::Args), + SearchUtxo(search_utxo::Args), + SearchKernel(search_kernel::Args), + GetMempoolStats(get_mempool_stats::Args), + GetMempoolState(get_mempool_state::Args), + GetMempoolTx(get_mempool_state::ArgsTx), + Whoami(whoami::Args), + GetStateInfo(get_state_info::Args), + GetNetworkStats(get_network_stats::Args), + Quit(quit::Args), + Exit(quit::Args), + Watch(watch_command::Args), +} + +impl Command { + pub fn variants() -> Vec { + Command::VARIANTS.iter().map(|s| s.to_string()).collect() + } +} + +#[async_trait] +pub trait HandleCommand { + async fn handle_command(&mut self, args: T) -> Result<(), Error>; +} + +pub struct CommandContext { + pub config: Arc, + consensus_rules: ConsensusManager, + blockchain_db: AsyncBlockchainDb, + discovery_service: DhtDiscoveryRequester, + dht_metrics_collector: MetricsCollectorHandle, + rpc_server: RpcServerHandle, + base_node_identity: Arc, + peer_manager: Arc, + connectivity: ConnectivityRequester, + liveness: LivenessHandle, + node_service: LocalNodeCommsInterface, + mempool_service: LocalMempoolService, + state_machine_info: watch::Receiver, + pub software_updater: SoftwareUpdaterHandle, + last_time_full: Instant, + pub shutdown: Shutdown, +} + +impl CommandContext { + pub fn new(ctx: &BaseNodeContext, shutdown: Shutdown) -> Self { + Self { + config: ctx.config(), + consensus_rules: ctx.consensus_rules().clone(), + blockchain_db: ctx.blockchain_db().into(), + discovery_service: ctx.base_node_dht().discovery_service_requester(), + dht_metrics_collector: ctx.base_node_dht().metrics_collector(), + rpc_server: ctx.rpc_server(), + base_node_identity: ctx.base_node_identity(), + peer_manager: ctx.base_node_comms().peer_manager(), + connectivity: ctx.base_node_comms().connectivity(), + liveness: ctx.liveness(), + node_service: ctx.local_node(), + mempool_service: ctx.local_mempool(), + state_machine_info: ctx.get_state_machine_info_channel(), + software_updater: ctx.software_updater(), + last_time_full: Instant::now(), + shutdown, + } + } + + pub async fn handle_command_str(&mut self, line: &str) -> Result, Error> { + let args: Args = line.parse()?; + if let Command::Watch(command) = args.command { + Ok(Some(command)) + } else { + let fut = self.handle_command(args.command); + time::timeout(Duration::from_secs(70), fut).await??; + Ok(None) + } + } +} + +impl FromStr for Args { + type Err = Error; + + fn from_str(line: &str) -> Result { + let args = ParsedCommand::parse(line)?; + let matches = Args::command().no_binary_name(true).try_get_matches_from(args)?; + let command = Args::from_arg_matches(&matches)?; + Ok(command) + } +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, command: Command) -> Result<(), Error> { + match command { + Command::Version(args) => self.handle_command(args).await, + Command::CheckForUpdates(args) => self.handle_command(args).await, + Command::Status(args) => self.handle_command(args).await, + Command::GetChainMetadata(args) => self.handle_command(args).await, + Command::GetDbStats(args) => self.handle_command(args).await, + Command::GetPeer(args) => self.handle_command(args).await, + Command::GetStateInfo(args) => self.handle_command(args).await, + Command::GetNetworkStats(args) => self.handle_command(args).await, + Command::ListPeers(args) => self.handle_command(args).await, + Command::DialPeer(args) => self.handle_command(args).await, + Command::PingPeer(args) => self.handle_command(args).await, + Command::BanPeer(args) => 
+#[async_trait]
+impl HandleCommand<Command> for CommandContext {
+    async fn handle_command(&mut self, command: Command) -> Result<(), Error> {
+        match command {
+            Command::Version(args) => self.handle_command(args).await,
+            Command::CheckForUpdates(args) => self.handle_command(args).await,
+            Command::Status(args) => self.handle_command(args).await,
+            Command::GetChainMetadata(args) => self.handle_command(args).await,
+            Command::GetDbStats(args) => self.handle_command(args).await,
+            Command::GetPeer(args) => self.handle_command(args).await,
+            Command::GetStateInfo(args) => self.handle_command(args).await,
+            Command::GetNetworkStats(args) => self.handle_command(args).await,
+            Command::ListPeers(args) => self.handle_command(args).await,
+            Command::DialPeer(args) => self.handle_command(args).await,
+            Command::PingPeer(args) => self.handle_command(args).await,
+            Command::BanPeer(args) => self.handle_command(args).await,
+            Command::UnbanPeer(args) => self.handle_command(args).await,
+            Command::ResetOfflinePeers(args) => self.handle_command(args).await,
+            Command::RewindBlockchain(args) => self.handle_command(args).await,
+            Command::UnbanAllPeers(args) => self.handle_command(args).await,
+            Command::ListHeaders(args) => self.handle_command(args).await,
+            Command::CheckDb(args) => self.handle_command(args).await,
+            Command::PeriodStats(args) => self.handle_command(args).await,
+            Command::HeaderStats(args) => self.handle_command(args).await,
+            Command::BlockTiming(args) | Command::CalcTiming(args) => self.handle_command(args).await,
+            Command::ListReorgs(args) => self.handle_command(args).await,
+            Command::DiscoverPeer(args) => self.handle_command(args).await,
+            Command::GetBlock(args) => self.handle_command(args).await,
+            Command::SearchUtxo(args) => self.handle_command(args).await,
+            Command::SearchKernel(args) => self.handle_command(args).await,
+            Command::ListConnections(args) => self.handle_command(args).await,
+            Command::GetMempoolStats(args) => self.handle_command(args).await,
+            Command::GetMempoolState(args) => self.handle_command(args).await,
+            Command::GetMempoolTx(args) => self.handle_command(args).await,
+            Command::Whoami(args) => self.handle_command(args).await,
+            Command::ListBannedPeers(args) => self.handle_command(args).await,
+            Command::Quit(args) | Command::Exit(args) => self.handle_command(args).await,
+            Command::Watch(args) => self.handle_command(args).await,
+        }
+    }
+}
+
+impl CommandContext {
+    pub fn global_config(&self) -> Arc<GlobalConfig> {
+        self.config.clone()
+    }
+
+    async fn fetch_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
+        let pm = &self.peer_manager;
+        let query = PeerQuery::new().select_where(|p| p.is_banned());
+        pm.perform_query(query).await
+    }
+
+    /// Function to process the get-headers command
+    async fn get_chain_headers(&self, start: u64, end: Option<u64>) -> Result<Vec<ChainHeader>, Error> {
+        let blockchain_db = &self.blockchain_db;
+        match end {
+            Some(end) => blockchain_db.fetch_chain_headers(start..=end).await.map_err(Into::into),
+            None => {
+                let from_tip = start;
+                if from_tip == 0 {
+                    return Ok(Vec::new());
+                }
+                let tip = blockchain_db.fetch_tip_header().await?.height();
+                blockchain_db
+                    .fetch_chain_headers(tip.saturating_sub(from_tip - 1)..=tip)
+                    .await
+                    .map_err(Into::into)
+            },
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum TypeOrHex<T> {
+    Type(T),
+    Hex(FromHex<Vec<u8>>),
+}
+
+impl<T> FromStr for TypeOrHex<T>
+where
+    T: FromStr,
+    Error: From<T::Err>,
+{
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if let Ok(hex) = FromHex::from_str(s) {
+            Ok(Self::Hex(hex))
+        } else {
+            T::from_str(s).map(Self::Type).map_err(Error::from)
+        }
+    }
+}
diff --git a/applications/tari_base_node/src/commands/command/period_stats.rs b/applications/tari_base_node/src/commands/command/period_stats.rs
new file mode 100644
index 00000000000..d300f84dc16
--- /dev/null
+++ b/applications/tari_base_node/src/commands/command/period_stats.rs
@@ -0,0 +1,110 @@
+use anyhow::{anyhow, Error};
+use async_trait::async_trait;
+use clap::Parser;
+use tokio::io::{self, AsyncWriteExt};
+
+use super::{CommandContext, HandleCommand};
+
+/// Prints out certain aggregated stats
+/// of the block chain in csv format for
+/// easy copy.
+#[derive(Debug, Parser)] +pub struct Args { + /// start time in unix timestamp + period_end: u64, + /// end time in unix timestamp + period_ticker_end: u64, + /// interval period time in unix timestamp + period: u64, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.period_stats(args.period_end, args.period_ticker_end, args.period) + .await + } +} + +impl CommandContext { + #[allow(deprecated)] + pub async fn period_stats( + &mut self, + period_end: u64, + mut period_ticker_end: u64, + period: u64, + ) -> Result<(), Error> { + let meta = self.node_service.get_metadata().await?; + + let mut height = meta.height_of_longest_chain(); + // Currently gets the stats for: tx count, hash rate estimation, target difficulty, solvetime. + let mut results: Vec<(usize, f64, u64, u64, usize)> = Vec::new(); + + let mut period_ticker_start = period_ticker_end - period; + let mut period_tx_count = 0; + let mut period_block_count = 0; + let mut period_hash = 0.0; + let mut period_difficulty = 0; + let mut period_solvetime = 0; + print!("Searching for height: "); + while height > 0 { + print!("{}", height); + io::stdout().flush().await?; + + let block = self + .node_service + .get_block(height) + .await? + .ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?; + + let prev_block = self + .node_service + .get_block(height - 1) + .await? + .ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?; + + height -= 1; + if block.header().timestamp.as_u64() > period_ticker_end { + print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); + continue; + }; + while block.header().timestamp.as_u64() < period_ticker_start { + results.push(( + period_tx_count, + period_hash, + period_difficulty, + period_solvetime, + period_block_count, + )); + period_tx_count = 0; + period_block_count = 0; + period_hash = 0.0; + period_difficulty = 0; + period_solvetime = 0; + period_ticker_end -= period; + period_ticker_start -= period; + } + period_tx_count += block.block().body.kernels().len() - 1; + period_block_count += 1; + let st = if prev_block.header().timestamp.as_u64() >= block.header().timestamp.as_u64() { + 1.0 + } else { + (block.header().timestamp.as_u64() - prev_block.header().timestamp.as_u64()) as f64 + }; + let diff = block.accumulated_data.target_difficulty.as_u64(); + period_difficulty += diff; + period_solvetime += st as u64; + period_hash += diff as f64 / st / 1_000_000.0; + if period_ticker_end <= period_end { + break; + } + print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); + } + println!("Complete"); + println!("Results of tx count, hash rate estimation, target difficulty, solvetime, block count"); + for data in results { + println!("{},{},{},{},{}", data.0, data.1, data.2, data.3, data.4); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/ping_peer.rs b/applications/tari_base_node/src/commands/command/ping_peer.rs new file mode 100644 index 00000000000..4a43f8798b5 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/ping_peer.rs @@ -0,0 +1,53 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::utilities::UniNodeId; +use tari_comms::peer_manager::NodeId; +use tari_p2p::services::liveness::LivenessEvent; +use tokio::sync::broadcast; + +use super::{CommandContext, HandleCommand}; + +/// Send a ping to a known peer and wait for a pong reply +#[derive(Debug, 
Parser)] +pub struct Args { + /// hex public key or emoji id + node_id: UniNodeId, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.ping_peer(args.node_id.into()).await + } +} + +impl CommandContext { + /// Function to process the dial-peer command + pub async fn ping_peer(&mut self, dest_node_id: NodeId) -> Result<(), Error> { + println!("🏓 Pinging peer..."); + let mut liveness_events = self.liveness.get_event_stream(); + + self.liveness.send_ping(dest_node_id.clone()).await?; + loop { + match liveness_events.recv().await { + Ok(event) => { + if let LivenessEvent::ReceivedPong(pong) = &*event { + if pong.node_id == dest_node_id { + println!( + "🏓️ Pong received, latency in is {:.2?}!", + pong.latency.unwrap_or_default() + ); + break; + } + } + }, + Err(broadcast::error::RecvError::Closed) => { + break; + }, + _ => {}, + } + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/quit.rs b/applications/tari_base_node/src/commands/command/quit.rs new file mode 100644 index 00000000000..ce238832b13 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/quit.rs @@ -0,0 +1,22 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; +use crate::LOG_TARGET; + +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _args: Args) -> Result<(), Error> { + println!("Shutting down..."); + log::info!( + target: LOG_TARGET, + "Termination signal received from user. Shutting node down." + ); + self.shutdown.trigger(); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/reset_offline_peers.rs b/applications/tari_base_node/src/commands/command/reset_offline_peers.rs new file mode 100644 index 00000000000..707fda20cc7 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/reset_offline_peers.rs @@ -0,0 +1,35 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Clear offline flag from all peers +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.reset_offline_peers().await + } +} + +impl CommandContext { + pub async fn reset_offline_peers(&self) -> Result<(), Error> { + let num_updated = self + .peer_manager + .update_each(|mut peer| { + if peer.is_offline() { + peer.set_offline(false); + Some(peer) + } else { + None + } + }) + .await?; + + println!("{} peer(s) were unmarked as offline.", num_updated); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/rewind_blockchain.rs b/applications/tari_base_node/src/commands/command/rewind_blockchain.rs new file mode 100644 index 00000000000..c695586c351 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/rewind_blockchain.rs @@ -0,0 +1,31 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_core::base_node::comms_interface::BlockEvent; + +use super::{CommandContext, HandleCommand}; + +/// Rewinds the blockchain to the given height +#[derive(Debug, Parser)] +pub struct Args { + /// new_height must be less than the current height + new_height: u64, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + 
self.rewind_blockchain(args.new_height).await
+    }
+}
+
+impl CommandContext {
+    pub async fn rewind_blockchain(&self, new_height: u64) -> Result<(), Error> {
+        let blocks = self.blockchain_db.rewind_to_height(new_height).await?;
+        if !blocks.is_empty() {
+            self.node_service
+                .publish_block_event(BlockEvent::BlockSyncRewind(blocks));
+        }
+        Ok(())
+    }
+}
diff --git a/applications/tari_base_node/src/commands/command/search_kernel.rs b/applications/tari_base_node/src/commands/command/search_kernel.rs
new file mode 100644
index 00000000000..2a55a2ee295
--- /dev/null
+++ b/applications/tari_base_node/src/commands/command/search_kernel.rs
@@ -0,0 +1,44 @@
+use anyhow::{anyhow, Error};
+use async_trait::async_trait;
+use clap::Parser;
+use tari_common_types::types::{PrivateKey, PublicKey, Signature};
+use tari_utilities::hex::Hex;
+
+use super::{CommandContext, HandleCommand};
+use crate::commands::parser::FromHex;
+
+/// This will search the main chain for the kernel.
+/// If the kernel is found, it will print out the
+/// block it was found in.
+/// This searches for the kernel via the
+/// excess signature
+#[derive(Debug, Parser)]
+pub struct Args {
+    /// hex of nonce
+    public_nonce: FromHex<PublicKey>,
+    /// hex of signature
+    signature: FromHex<PrivateKey>,
+}
+
+#[async_trait]
+impl HandleCommand<Args> for CommandContext {
+    async fn handle_command(&mut self, args: Args) -> Result<(), Error> {
+        let kernel_sig = Signature::new(args.public_nonce.0, args.signature.0);
+        self.search_kernel(kernel_sig).await
+    }
+}
+
+impl CommandContext {
+    /// Function to process the search kernel command
+    pub async fn search_kernel(&mut self, excess_sig: Signature) -> Result<(), Error> {
+        let hex_sig = excess_sig.get_signature().to_hex();
+        let v = self
+            .node_service
+            .get_blocks_with_kernels(vec![excess_sig])
+            .await?
+            .pop()
+            .ok_or_else(|| anyhow!("No kernel with signature {} found", hex_sig))?;
+        println!("{}", v);
+        Ok(())
+    }
+}
diff --git a/applications/tari_base_node/src/commands/command/search_utxo.rs b/applications/tari_base_node/src/commands/command/search_utxo.rs
new file mode 100644
index 00000000000..f4f3df7162a
--- /dev/null
+++ b/applications/tari_base_node/src/commands/command/search_utxo.rs
@@ -0,0 +1,38 @@
+use anyhow::{anyhow, Error};
+use async_trait::async_trait;
+use clap::Parser;
+use tari_common_types::types::Commitment;
+use tari_utilities::hex::Hex;
+
+use super::{CommandContext, HandleCommand};
+use crate::commands::parser::FromHex;
+
+/// This will search the main chain for the utxo.
+/// If the utxo is found, it will print out
+/// the block it was found in.
+#[derive(Debug, Parser)]
+pub struct Args {
+    /// hex of commitment of the utxo
+    commitment: FromHex<Commitment>,
+}
+
+#[async_trait]
+impl HandleCommand<Args> for CommandContext {
+    async fn handle_command(&mut self, args: Args) -> Result<(), Error> {
+        self.search_utxo(args.commitment.0).await
+    }
+}
+
+impl CommandContext {
+    /// Function to process the search utxo command
+    pub async fn search_utxo(&mut self, commitment: Commitment) -> Result<(), Error> {
+        let v = self
+            .node_service
+            .fetch_blocks_with_utxos(vec![commitment.clone()])
+            .await?
+            .pop()
+            .ok_or_else(|| anyhow!("Block not found for utxo commitment {}", commitment.to_hex()))?;
+        println!("{}", v.block());
+        Ok(())
+    }
+}
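The hex-typed fields above (`FromHex<PublicKey>`, `FromHex<PrivateKey>`, `FromHex<Commitment>`) work because clap parses positional arguments through `FromStr`: the `FromHex` newtype added to `commands/parser.rs` later in this patch supplies that impl for any type implementing the `Hex` trait, and the handlers then unwrap the inner value with `.0`. A rough, self-contained sketch of the pattern, with a toy `Bytes` type and a hand-rolled `Hex` trait standing in for the Tari types:

    use std::str::FromStr;

    /// Stand-in for tari_utilities::hex::Hex, for illustration only.
    trait Hex: Sized {
        fn from_hex(s: &str) -> Result<Self, String>;
    }

    /// Toy domain type: just a byte vector.
    #[derive(Debug)]
    struct Bytes(Vec<u8>);

    impl Hex for Bytes {
        // ASCII hex input assumed.
        fn from_hex(s: &str) -> Result<Self, String> {
            if s.len() % 2 != 0 {
                return Err("odd number of hex digits".into());
            }
            (0..s.len())
                .step_by(2)
                .map(|i| u8::from_str_radix(&s[i..i + 2], 16).map_err(|e| e.to_string()))
                .collect::<Result<Vec<u8>, _>>()
                .map(Bytes)
        }
    }

    /// Newtype wrapper: a single FromStr impl lets clap accept
    /// hex-encoded values for every Hex-capable type.
    #[derive(Debug)]
    struct FromHex<T>(pub T);

    impl<T: Hex> FromStr for FromHex<T> {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            T::from_hex(s).map(FromHex)
        }
    }

    fn main() {
        let parsed: FromHex<Bytes> = "deadbeef".parse().expect("valid hex");
        println!("{:?}", parsed.0);
    }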
diff --git a/applications/tari_base_node/src/commands/command/status.rs b/applications/tari_base_node/src/commands/command/status.rs
new file mode 100644
index 00000000000..a8d659c0cca
--- /dev/null
+++ b/applications/tari_base_node/src/commands/command/status.rs
@@ -0,0 +1,119 @@
+use std::time::{Duration, Instant};
+
+use anyhow::{anyhow, Error};
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use clap::Parser;
+use tari_app_utilities::consts;
+
+use super::{CommandContext, HandleCommand};
+use crate::commands::status_line::{StatusLine, StatusLineOutput};
+
+/// Prints out the status of this node
+#[derive(Debug, Parser)]
+pub struct Args {
+    #[clap(default_value_t = StatusLineOutput::StdOutAndLog)]
+    output: StatusLineOutput,
+}
+
+#[async_trait]
+impl HandleCommand<Args> for CommandContext {
+    async fn handle_command(&mut self, args: Args) -> Result<(), Error> {
+        self.status(args.output).await
+    }
+}
+
+impl CommandContext {
+    pub async fn status(&mut self, output: StatusLineOutput) -> Result<(), Error> {
+        let mut full_log = false;
+        if self.last_time_full.elapsed() > Duration::from_secs(120) {
+            self.last_time_full = Instant::now();
+            full_log = true;
+        }
+
+        let mut status_line = StatusLine::new();
+        status_line.add_field("", format!("v{}", consts::APP_VERSION_NUMBER));
+        status_line.add_field("", self.config.network);
+        status_line.add_field("State", self.state_machine_info.borrow().state_info.short_desc());
+
+        let metadata = self.node_service.get_metadata().await?;
+        let height = metadata.height_of_longest_chain();
+        let last_header = self
+            .node_service
+            .get_header(height)
+            .await?
+            .ok_or_else(|| anyhow!("No last header"))?;
+        let last_block_time = DateTime::<Utc>::from(last_header.header().timestamp);
+        status_line.add_field(
+            "Tip",
+            format!(
+                "{} ({})",
+                metadata.height_of_longest_chain(),
+                last_block_time.to_rfc2822()
+            ),
+        );
+
+        let constants = self
+            .consensus_rules
+            .consensus_constants(metadata.height_of_longest_chain());
+        let mempool_stats = self.mempool_service.get_mempool_stats().await?;
+        status_line.add_field(
+            "Mempool",
+            format!(
+                "{}tx ({}g, +/- {}blks)",
+                mempool_stats.unconfirmed_txs,
+                mempool_stats.total_weight,
+                if mempool_stats.total_weight == 0 {
+                    0
+                } else {
+                    1 + mempool_stats.total_weight / constants.get_max_block_transaction_weight()
+                },
+            ),
+        );
+
+        let conns = self.connectivity.get_active_connections().await?;
+        status_line.add_field("Connections", conns.len());
+        let banned_peers = self.fetch_banned_peers().await?;
+        status_line.add_field("Banned", banned_peers.len());
+
+        let num_messages = self
+            .dht_metrics_collector
+            .get_total_message_count_in_timespan(Duration::from_secs(60))
+            .await?;
+        status_line.add_field("Messages (last 60s)", num_messages);
+
+        let num_active_rpc_sessions = self.rpc_server.get_num_active_sessions().await?;
+        status_line.add_field(
+            "Rpc",
+            format!(
+                "{}/{}",
+                num_active_rpc_sessions,
+                self.config
+                    .comms_rpc_max_simultaneous_sessions
+                    .as_ref()
+                    .map(ToString::to_string)
+                    .unwrap_or_else(|| "∞".to_string()),
+            ),
+        );
+        if full_log {
+            status_line.add_field(
+                "RandomX",
+                format!(
+                    "#{} with flags {:?}",
+                    self.state_machine_info.borrow().randomx_vm_cnt,
+                    self.state_machine_info.borrow().randomx_vm_flags
+                ),
+            );
+        }
+
+        let target = "base_node::app::status";
+        match output {
+            StatusLineOutput::StdOutAndLog => {
+                println!("{}",
status_line); + log::info!(target: target, "{}", status_line); + }, + StatusLineOutput::Log => log::info!(target: target, "{}", status_line), + }; + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/unban_all_peers.rs b/applications/tari_base_node/src/commands/command/unban_all_peers.rs new file mode 100644 index 00000000000..52694abe91a --- /dev/null +++ b/applications/tari_base_node/src/commands/command/unban_all_peers.rs @@ -0,0 +1,32 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_comms::peer_manager::PeerQuery; + +use super::{CommandContext, HandleCommand}; + +/// Unbans all peers +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.unban_all_peers().await + } +} + +impl CommandContext { + pub async fn unban_all_peers(&self) -> Result<(), Error> { + let query = PeerQuery::new().select_where(|p| p.is_banned()); + let peers = self.peer_manager.perform_query(query).await?; + let num_peers = peers.len(); + for peer in peers { + if let Err(err) = self.peer_manager.unban_peer(&peer.node_id).await { + println!("Failed to unban peer: {}", err); + } + } + println!("Unbanned {} peer(s) from node", num_peers); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/version.rs b/applications/tari_base_node/src/commands/command/version.rs new file mode 100644 index 00000000000..edad56260e1 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/version.rs @@ -0,0 +1,40 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; +use tari_app_utilities::consts; + +use super::{CommandContext, HandleCommand}; + +/// Gets the current application version +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.print_version() + } +} + +impl CommandContext { + /// Function process the version command + pub fn print_version(&self) -> Result<(), Error> { + println!("Version: {}", consts::APP_VERSION); + println!("Author: {}", consts::APP_AUTHOR); + println!("Avx2: {}", match cfg!(feature = "avx2") { + true => "enabled", + false => "disabled", + }); + + if let Some(ref update) = *self.software_updater.new_update_notifier().borrow() { + println!( + "Version {} of the {} is available: {} (sha: {})", + update.version(), + update.app(), + update.download_url(), + update.to_hash_hex() + ); + } + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/watch_command.rs b/applications/tari_base_node/src/commands/command/watch_command.rs new file mode 100644 index 00000000000..6435cccdc27 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/watch_command.rs @@ -0,0 +1,38 @@ +use std::fmt; + +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +pub type WatchCommand = Args; + +impl fmt::Display for WatchCommand { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + self.line().fmt(f) + } +} + +/// Repeat a command within an interval. +#[derive(Debug, Parser)] +pub struct Args { + /// Interval in seconds + #[clap(short, long)] + pub interval: Option, + /// The command to perform. `status` if empty. 
+ pub command: Option, +} + +impl Args { + pub fn line(&self) -> &str { + self.command.as_ref().map(String::as_ref).unwrap_or("status") + } +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command/whoami.rs b/applications/tari_base_node/src/commands/command/whoami.rs new file mode 100644 index 00000000000..468141a98c6 --- /dev/null +++ b/applications/tari_base_node/src/commands/command/whoami.rs @@ -0,0 +1,25 @@ +use anyhow::Error; +use async_trait::async_trait; +use clap::Parser; + +use super::{CommandContext, HandleCommand}; + +/// Display identity information about this node, +/// including: public key, node ID and the public address +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.whoami() + } +} + +impl CommandContext { + /// Function to process the whoami command + pub fn whoami(&self) -> Result<(), Error> { + println!("{}", self.base_node_identity); + Ok(()) + } +} diff --git a/applications/tari_base_node/src/commands/command_handler.rs b/applications/tari_base_node/src/commands/command_handler.rs deleted file mode 100644 index c216d367f86..00000000000 --- a/applications/tari_base_node/src/commands/command_handler.rs +++ /dev/null @@ -1,1212 +0,0 @@ -// Copyright 2020. The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- -use std::{ - cmp, - io::{self, Write}, - ops::Deref, - str::FromStr, - string::ToString, - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::{anyhow, Error}; -use chrono::{DateTime, Utc}; -use log::*; -use tari_app_utilities::{consts, utilities::parse_emoji_id_or_public_key}; -use tari_common::GlobalConfig; -use tari_common_types::{ - emoji::EmojiId, - types::{Commitment, HashOutput, Signature}, -}; -use tari_comms::{ - connectivity::ConnectivityRequester, - peer_manager::{NodeId, Peer, PeerFeatures, PeerManager, PeerManagerError, PeerQuery}, - protocol::rpc::RpcServerHandle, - NodeIdentity, -}; -use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester, MetricsCollectorHandle}; -use tari_core::{ - base_node::{ - comms_interface::BlockEvent, - state_machine_service::states::{PeerMetadata, StatusInfo}, - LocalNodeCommsInterface, - }, - blocks::{BlockHeader, ChainHeader}, - chain_storage::{async_db::AsyncBlockchainDb, LMDBDatabase}, - consensus::ConsensusManager, - mempool::service::LocalMempoolService, - proof_of_work::PowAlgorithm, -}; -use tari_crypto::ristretto::RistrettoPublicKey; -use tari_p2p::{ - auto_update::SoftwareUpdaterHandle, - services::liveness::{LivenessEvent, LivenessHandle}, -}; -use tari_utilities::{hex::Hex, message_format::MessageFormat, Hashable}; -use thiserror::Error; -use tokio::{ - fs::File, - io::AsyncWriteExt, - sync::{broadcast, watch}, -}; - -use super::status_line::StatusLine; -use crate::{builder::BaseNodeContext, table::Table, utils::format_duration_basic, LOG_TARGET}; - -pub enum StatusLineOutput { - Log, - StdOutAndLog, -} - -pub struct CommandHandler { - config: Arc, - consensus_rules: ConsensusManager, - blockchain_db: AsyncBlockchainDb, - discovery_service: DhtDiscoveryRequester, - dht_metrics_collector: MetricsCollectorHandle, - rpc_server: RpcServerHandle, - base_node_identity: Arc, - peer_manager: Arc, - connectivity: ConnectivityRequester, - liveness: LivenessHandle, - node_service: LocalNodeCommsInterface, - mempool_service: LocalMempoolService, - state_machine_info: watch::Receiver, - software_updater: SoftwareUpdaterHandle, - last_time_full: Instant, -} - -impl CommandHandler { - pub fn new(ctx: &BaseNodeContext) -> Self { - Self { - config: ctx.config(), - consensus_rules: ctx.consensus_rules().clone(), - blockchain_db: ctx.blockchain_db().into(), - discovery_service: ctx.base_node_dht().discovery_service_requester(), - dht_metrics_collector: ctx.base_node_dht().metrics_collector(), - rpc_server: ctx.rpc_server(), - base_node_identity: ctx.base_node_identity(), - peer_manager: ctx.base_node_comms().peer_manager(), - connectivity: ctx.base_node_comms().connectivity(), - liveness: ctx.liveness(), - node_service: ctx.local_node(), - mempool_service: ctx.local_mempool(), - state_machine_info: ctx.get_state_machine_info_channel(), - software_updater: ctx.software_updater(), - last_time_full: Instant::now(), - } - } - - pub fn global_config(&self) -> Arc { - self.config.clone() - } - - pub async fn status(&mut self, output: StatusLineOutput) -> Result<(), Error> { - let mut full_log = false; - if self.last_time_full.elapsed() > Duration::from_secs(120) { - self.last_time_full = Instant::now(); - full_log = true; - } - - let mut status_line = StatusLine::new(); - status_line.add_field("", format!("v{}", consts::APP_VERSION_NUMBER)); - status_line.add_field("", self.config.network); - status_line.add_field("State", self.state_machine_info.borrow().state_info.short_desc()); - - let metadata = 
self.node_service.get_metadata().await?; - let height = metadata.height_of_longest_chain(); - let last_header = self - .node_service - .get_header(height) - .await? - .ok_or_else(|| anyhow!("No last header"))?; - let last_block_time = DateTime::::from(last_header.header().timestamp); - status_line.add_field( - "Tip", - format!( - "{} ({})", - metadata.height_of_longest_chain(), - last_block_time.to_rfc2822() - ), - ); - - let constants = self - .consensus_rules - .consensus_constants(metadata.height_of_longest_chain()); - let mempool_stats = self.mempool_service.get_mempool_stats().await?; - status_line.add_field( - "Mempool", - format!( - "{}tx ({}g, +/- {}blks)", - mempool_stats.unconfirmed_txs, - mempool_stats.total_weight, - if mempool_stats.total_weight == 0 { - 0 - } else { - 1 + mempool_stats.total_weight / constants.get_max_block_transaction_weight() - }, - ), - ); - - let conns = self.connectivity.get_active_connections().await?; - status_line.add_field("Connections", conns.len()); - let banned_peers = fetch_banned_peers(&self.peer_manager).await?; - status_line.add_field("Banned", banned_peers.len()); - - let num_messages = self - .dht_metrics_collector - .get_total_message_count_in_timespan(Duration::from_secs(60)) - .await?; - status_line.add_field("Messages (last 60s)", num_messages); - - let num_active_rpc_sessions = self.rpc_server.get_num_active_sessions().await?; - status_line.add_field( - "Rpc", - format!( - "{}/{}", - num_active_rpc_sessions, - self.config - .comms_rpc_max_simultaneous_sessions - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "∞".to_string()), - ), - ); - if full_log { - status_line.add_field( - "RandomX", - format!( - "#{} with flags {:?}", - self.state_machine_info.borrow().randomx_vm_cnt, - self.state_machine_info.borrow().randomx_vm_flags - ), - ); - } - - let target = "base_node::app::status"; - match output { - StatusLineOutput::StdOutAndLog => { - println!("{}", status_line); - info!(target: target, "{}", status_line); - }, - StatusLineOutput::Log => info!(target: target, "{}", status_line), - }; - Ok(()) - } - - /// Function to process the get-state-info command - pub fn state_info(&self) -> Result<(), Error> { - println!("Current state machine state:\n{}", *self.state_machine_info.borrow()); - Ok(()) - } - - /// Check for updates - pub async fn check_for_updates(&mut self) -> Result<(), Error> { - println!("Checking for updates (current version: {})...", consts::APP_VERSION); - match self.software_updater.check_for_updates().await { - Some(update) => { - println!( - "Version {} of the {} is available: {} (sha: {})", - update.version(), - update.app(), - update.download_url(), - update.to_hash_hex() - ); - }, - None => { - println!("No updates found.",); - }, - } - Ok(()) - } - - /// Function process the version command - pub fn print_version(&self) -> Result<(), Error> { - println!("Version: {}", consts::APP_VERSION); - println!("Author: {}", consts::APP_AUTHOR); - println!("Avx2: {}", match cfg!(feature = "avx2") { - true => "enabled", - false => "disabled", - }); - - if let Some(ref update) = *self.software_updater.new_update_notifier().borrow() { - println!( - "Version {} of the {} is available: {} (sha: {})", - update.version(), - update.app(), - update.download_url(), - update.to_hash_hex() - ); - } - Ok(()) - } - - pub async fn get_chain_meta(&mut self) -> Result<(), Error> { - let data = self.node_service.get_metadata().await?; - println!("{}", data); - Ok(()) - } - - pub async fn get_block(&self, height: u64, format: Format) 
-> Result<(), Error> { - let mut data = self.blockchain_db.fetch_blocks(height..=height).await?; - match (data.pop(), format) { - (Some(block), Format::Text) => { - let block_data = self - .blockchain_db - .fetch_block_accumulated_data(block.hash().clone()) - .await?; - - println!("{}", block); - println!("-- Accumulated data --"); - println!("{}", block_data); - }, - (Some(block), Format::Json) => println!("{}", block.to_json()?), - (None, _) => println!("Block not found at height {}", height), - } - Ok(()) - } - - pub async fn get_block_by_hash(&self, hash: HashOutput, format: Format) -> Result<(), Error> { - let data = self.blockchain_db.fetch_block_by_hash(hash).await?; - match (data, format) { - (Some(block), Format::Text) => println!("{}", block), - (Some(block), Format::Json) => println!("{}", block.to_json()?), - (None, _) => println!("Block not found"), - } - Ok(()) - } - - pub async fn search_utxo(&mut self, commitment: Commitment) -> Result<(), Error> { - let v = self - .node_service - .fetch_blocks_with_utxos(vec![commitment.clone()]) - .await? - .pop() - .ok_or_else(|| anyhow!("Block not found for utxo commitment {}", commitment.to_hex()))?; - println!("{}", v.block()); - Ok(()) - } - - pub async fn search_kernel(&mut self, excess_sig: Signature) -> Result<(), Error> { - let hex_sig = excess_sig.get_signature().to_hex(); - let v = self - .node_service - .get_blocks_with_kernels(vec![excess_sig]) - .await? - .pop() - .ok_or_else(|| anyhow!("No kernel with signature {} found", hex_sig))?; - println!("{}", v); - Ok(()) - } - - /// Function to process the get-mempool-stats command - pub async fn get_mempool_stats(&mut self) -> Result<(), Error> { - let stats = self.mempool_service.get_mempool_stats().await?; - println!("{}", stats); - Ok(()) - } - - /// Function to process the get-mempool-state command - pub async fn get_mempool_state(&mut self, filter: Option) -> Result<(), Error> { - let state = self.mempool_service.get_mempool_state().await?; - println!("----------------- Mempool -----------------"); - println!("--- Unconfirmed Pool ---"); - for tx in &state.unconfirmed_pool { - let tx_sig = tx - .first_kernel_excess_sig() - .map(|sig| sig.get_signature().to_hex()) - .unwrap_or_else(|| "N/A".to_string()); - if let Some(ref filter) = filter { - if !tx_sig.contains(filter) { - println!("--- TX: {} ---", tx_sig); - println!("{}", tx.body); - continue; - } - } else { - println!( - " {} Fee: {}, Outputs: {}, Kernels: {}, Inputs: {}, metadata: {} bytes", - tx_sig, - tx.body.get_total_fee(), - tx.body.outputs().len(), - tx.body.kernels().len(), - tx.body.inputs().len(), - tx.body.sum_metadata_size(), - ); - } - } - if filter.is_none() { - println!("--- Reorg Pool ---"); - for excess_sig in &state.reorg_pool { - println!(" {}", excess_sig.get_signature().to_hex()); - } - } - Ok(()) - } - - pub async fn discover_peer(&mut self, dest_pubkey: Box) -> Result<(), Error> { - let start = Instant::now(); - println!("🌎 Peer discovery started."); - let peer = self - .discovery_service - .discover_peer(dest_pubkey.deref().clone(), NodeDestination::PublicKey(dest_pubkey)) - .await?; - println!("⚡️ Discovery succeeded in {}ms!", start.elapsed().as_millis()); - println!("This peer was found:"); - println!("{}", peer); - Ok(()) - } - - pub async fn get_peer(&self, partial: Vec, original_str: String) { - let peer = match self.peer_manager.find_all_starts_with(&partial).await { - Ok(peers) if peers.is_empty() => { - if let Some(pk) = parse_emoji_id_or_public_key(&original_str) { - if let Ok(Some(peer)) 
= self.peer_manager.find_by_public_key(&pk).await { - peer - } else { - println!("No peer matching '{}'", original_str); - return; - } - } else { - println!("No peer matching '{}'", original_str); - return; - } - }, - Ok(mut peers) => peers.remove(0), - Err(err) => { - println!("{}", err); - return; - }, - }; - - let eid = EmojiId::from_pubkey(&peer.public_key); - println!("Emoji ID: {}", eid); - println!("Public Key: {}", peer.public_key); - println!("NodeId: {}", peer.node_id); - println!("Addresses:"); - peer.addresses.iter().for_each(|a| { - println!("- {}", a); - }); - println!("User agent: {}", peer.user_agent); - println!("Features: {:?}", peer.features); - println!("Supported protocols:"); - peer.supported_protocols.iter().for_each(|p| { - println!("- {}", String::from_utf8_lossy(p)); - }); - if let Some(dt) = peer.banned_until() { - println!("Banned until {}, reason: {}", dt, peer.banned_reason); - } - if let Some(dt) = peer.last_seen() { - println!("Last seen: {}", dt); - } - if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) { - println!("Last updated: {} (UTC)", updated_at); - } - } - - pub async fn list_peers(&self, filter: Option) -> Result<(), Error> { - let mut query = PeerQuery::new(); - if let Some(f) = filter { - let filter = f.to_lowercase(); - query = query.select_where(move |p| match filter.as_str() { - "basenode" | "basenodes" | "base_node" | "base-node" | "bn" => { - p.features == PeerFeatures::COMMUNICATION_NODE - }, - "wallet" | "wallets" | "w" => p.features == PeerFeatures::COMMUNICATION_CLIENT, - _ => false, - }) - } - let peers = self.peer_manager.perform_query(query).await?; - let num_peers = peers.len(); - println!(); - let mut table = Table::new(); - table.set_titles(vec!["NodeId", "Public Key", "Role", "User Agent", "Info"]); - - for peer in peers { - let info_str = { - let mut s = vec![]; - - if peer.is_offline() { - if !peer.is_banned() { - s.push("OFFLINE".to_string()); - } - } else if let Some(dt) = peer.last_seen() { - s.push(format!( - "LAST_SEEN: {}", - Utc::now() - .naive_utc() - .signed_duration_since(dt) - .to_std() - .map(format_duration_basic) - .unwrap_or_else(|_| "?".into()) - )); - } - - if let Some(dt) = peer.banned_until() { - s.push(format!( - "BANNED({}, {})", - dt.signed_duration_since(Utc::now().naive_utc()) - .to_std() - .map(format_duration_basic) - .unwrap_or_else(|_| "∞".to_string()), - peer.banned_reason - )); - } - - if let Some(metadata) = peer - .get_metadata(1) - .and_then(|v| bincode::deserialize::(v).ok()) - { - s.push(format!("chain height: {}", metadata.metadata.height_of_longest_chain())); - } - - if let Some(updated_at) = peer.identity_signature.map(|i| i.updated_at()) { - s.push(format!("updated_at: {} (UTC)", updated_at)); - } - - if s.is_empty() { - "--".to_string() - } else { - s.join(", ") - } - }; - let ua = peer.user_agent; - table.add_row(row![ - peer.node_id, - peer.public_key, - { - if peer.features == PeerFeatures::COMMUNICATION_CLIENT { - "Wallet" - } else { - "Base node" - } - }, - { - if ua.is_empty() { - "" - } else { - ua.as_ref() - } - }, - info_str, - ]); - } - table.print_stdout(); - - println!("{} peer(s) known by this node", num_peers); - Ok(()) - } - - pub async fn dial_peer(&self, dest_node_id: NodeId) -> Result<(), Error> { - let start = Instant::now(); - println!("☎️ Dialing peer..."); - - let connection = self.connectivity.dial_peer(dest_node_id).await?; - println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis()); - println!("Connection: {}", connection); - Ok(()) 
- } - - pub async fn ping_peer(&mut self, dest_node_id: NodeId) -> Result<(), Error> { - println!("🏓 Pinging peer..."); - let mut liveness_events = self.liveness.get_event_stream(); - - self.liveness.send_ping(dest_node_id.clone()).await?; - loop { - match liveness_events.recv().await { - Ok(event) => { - if let LivenessEvent::ReceivedPong(pong) = &*event { - if pong.node_id == dest_node_id { - println!( - "🏓️ Pong received, latency in is {:.2?}!", - pong.latency.unwrap_or_default() - ); - break; - } - } - }, - Err(broadcast::error::RecvError::Closed) => { - break; - }, - _ => {}, - } - } - Ok(()) - } - - pub async fn ban_peer(&mut self, node_id: NodeId, duration: Duration, must_ban: bool) { - if self.base_node_identity.node_id() == &node_id { - println!("Cannot ban our own node"); - return; - } - - if must_ban { - match self - .connectivity - .ban_peer_until(node_id.clone(), duration, "UI manual ban".to_string()) - .await - { - Ok(_) => println!("Peer was banned in base node."), - Err(err) => { - println!("Failed to ban peer: {:?}", err); - error!(target: LOG_TARGET, "Could not ban peer: {:?}", err); - }, - } - } else { - match self.peer_manager.unban_peer(&node_id).await { - Ok(_) => { - println!("Peer ban was removed from base node."); - }, - Err(err) if err.is_peer_not_found() => { - println!("Peer not found in base node"); - }, - Err(err) => { - println!("Failed to ban peer: {:?}", err); - error!(target: LOG_TARGET, "Could not ban peer: {:?}", err); - }, - } - } - } - - pub async fn unban_all_peers(&self) -> Result<(), Error> { - let query = PeerQuery::new().select_where(|p| p.is_banned()); - let peers = self.peer_manager.perform_query(query).await?; - let num_peers = peers.len(); - for peer in peers { - if let Err(err) = self.peer_manager.unban_peer(&peer.node_id).await { - println!("Failed to unban peer: {}", err); - } - } - println!("Unbanned {} peer(s) from node", num_peers); - Ok(()) - } - - pub async fn list_banned_peers(&self) -> Result<(), Error> { - let banned = fetch_banned_peers(&self.peer_manager).await?; - if banned.is_empty() { - println!("No peers banned from node.") - } else { - println!("Peers banned from node ({}):", banned.len()); - for peer in banned { - println!("{}", peer); - } - } - Ok(()) - } - - /// Function to process the list-connections command - pub async fn list_connections(&mut self) -> Result<(), Error> { - let conns = self.connectivity.get_active_connections().await?; - if conns.is_empty() { - println!("No active peer connections."); - } else { - println!(); - let num_connections = conns.len(); - let mut table = Table::new(); - table.set_titles(vec![ - "NodeId", - "Public Key", - "Address", - "Direction", - "Age", - "Role", - "User Agent", - "Info", - ]); - for conn in conns { - let peer = self - .peer_manager - .find_by_node_id(conn.peer_node_id()) - .await - .expect("Unexpected peer database error") - .expect("Peer not found"); - - let chain_height = peer - .get_metadata(1) - .and_then(|v| bincode::deserialize::(v).ok()) - .map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain())); - - let ua = peer.user_agent; - table.add_row(row![ - peer.node_id, - peer.public_key, - conn.address(), - conn.direction(), - format_duration_basic(conn.age()), - { - if peer.features == PeerFeatures::COMMUNICATION_CLIENT { - "Wallet" - } else { - "Base node" - } - }, - { - if ua.is_empty() { - "" - } else { - ua.as_ref() - } - }, - format!( - "substreams: {}{}", - conn.substream_count(), - chain_height.map(|s| format!(", {}", 
s)).unwrap_or_default() - ), - ]); - } - - table.print_stdout(); - - println!("{} active connection(s)", num_connections); - } - Ok(()) - } - - pub async fn reset_offline_peers(&self) -> Result<(), Error> { - let num_updated = self - .peer_manager - .update_each(|mut peer| { - if peer.is_offline() { - peer.set_offline(false); - Some(peer) - } else { - None - } - }) - .await?; - - println!("{} peer(s) were unmarked as offline.", num_updated); - Ok(()) - } - - pub async fn list_headers(&self, start: u64, end: Option) { - let headers = match Self::get_chain_headers(&self.blockchain_db, start, end).await { - Ok(h) if h.is_empty() => { - println!("No headers found"); - return; - }, - Ok(h) => h, - Err(err) => { - println!("Failed to retrieve headers: {:?}", err); - warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,); - return; - }, - }; - - for header in headers { - println!("\n\nHeader hash: {}", header.hash().to_hex()); - println!("{}", header); - } - } - - /// Function to process the get-headers command - async fn get_chain_headers( - blockchain_db: &AsyncBlockchainDb, - start: u64, - end: Option, - ) -> Result, anyhow::Error> { - match end { - Some(end) => blockchain_db.fetch_chain_headers(start..=end).await.map_err(Into::into), - None => { - let from_tip = start; - if from_tip == 0 { - return Ok(Vec::new()); - } - let tip = blockchain_db.fetch_tip_header().await?.height(); - blockchain_db - .fetch_chain_headers(tip.saturating_sub(from_tip - 1)..=tip) - .await - .map_err(Into::into) - }, - } - } - - pub async fn block_timing(&self, start: u64, end: Option) -> Result<(), Error> { - let headers = Self::get_chain_headers(&self.blockchain_db, start, end).await?; - if !headers.is_empty() { - let headers = headers.into_iter().map(|ch| ch.into_header()).rev().collect::>(); - let (max, min, avg) = BlockHeader::timing_stats(&headers); - println!( - "Timing for blocks #{} - #{}", - headers.first().unwrap().height, - headers.last().unwrap().height - ); - println!("Max block time: {}", max); - println!("Min block time: {}", min); - println!("Avg block time: {}", avg); - } else { - println!("No headers found"); - } - Ok(()) - } - - /// Function to process the check-db command - pub async fn check_db(&mut self) -> Result<(), Error> { - let meta = self.node_service.get_metadata().await?; - let mut height = meta.height_of_longest_chain(); - let mut missing_blocks = Vec::new(); - let mut missing_headers = Vec::new(); - print!("Searching for height: "); - // We need to check every header, but not every block. - let horizon_height = meta.horizon_block(height); - while height > 0 { - print!("{}", height); - io::stdout().flush()?; - // we can only check till the pruning horizon, 0 is archive node so it needs to check every block. - if height > horizon_height { - match self.node_service.get_block(height).await { - Err(err) => { - // We need to check the data itself, as FetchMatchingBlocks will suppress any error, only - // logging it. 
- error!(target: LOG_TARGET, "{}", err); - missing_blocks.push(height); - }, - Ok(Some(_)) => {}, - Ok(None) => missing_blocks.push(height), - }; - } - height -= 1; - let next_header = self.node_service.get_header(height).await.ok().flatten(); - if next_header.is_none() { - // this header is missing, so we stop here and need to ask for this header - missing_headers.push(height); - }; - print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); - } - println!("Complete"); - for missing_block in missing_blocks { - println!("Missing block at height: {}", missing_block); - } - for missing_header_height in missing_headers { - println!("Missing header at height: {}", missing_header_height) - } - Ok(()) - } - - #[allow(deprecated)] - pub async fn period_stats( - &mut self, - period_end: u64, - mut period_ticker_end: u64, - period: u64, - ) -> Result<(), Error> { - let meta = self.node_service.get_metadata().await?; - - let mut height = meta.height_of_longest_chain(); - // Currently gets the stats for: tx count, hash rate estimation, target difficulty, solvetime. - let mut results: Vec<(usize, f64, u64, u64, usize)> = Vec::new(); - - let mut period_ticker_start = period_ticker_end - period; - let mut period_tx_count = 0; - let mut period_block_count = 0; - let mut period_hash = 0.0; - let mut period_difficulty = 0; - let mut period_solvetime = 0; - print!("Searching for height: "); - while height > 0 { - print!("{}", height); - io::stdout().flush()?; - - let block = self - .node_service - .get_block(height) - .await? - .ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?; - - let prev_block = self - .node_service - .get_block(height - 1) - .await? - .ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?; - - height -= 1; - if block.header().timestamp.as_u64() > period_ticker_end { - print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); - continue; - }; - while block.header().timestamp.as_u64() < period_ticker_start { - results.push(( - period_tx_count, - period_hash, - period_difficulty, - period_solvetime, - period_block_count, - )); - period_tx_count = 0; - period_block_count = 0; - period_hash = 0.0; - period_difficulty = 0; - period_solvetime = 0; - period_ticker_end -= period; - period_ticker_start -= period; - } - period_tx_count += block.block().body.kernels().len() - 1; - period_block_count += 1; - let st = if prev_block.header().timestamp.as_u64() >= block.header().timestamp.as_u64() { - 1.0 - } else { - (block.header().timestamp.as_u64() - prev_block.header().timestamp.as_u64()) as f64 - }; - let diff = block.accumulated_data.target_difficulty.as_u64(); - period_difficulty += diff; - period_solvetime += st as u64; - period_hash += diff as f64 / st / 1_000_000.0; - if period_ticker_end <= period_end { - break; - } - print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); - } - println!("Complete"); - println!("Results of tx count, hash rate estimation, target difficulty, solvetime, block count"); - for data in results { - println!("{},{},{},{},{}", data.0, data.1, data.2, data.3, data.4); - } - Ok(()) - } - - pub async fn save_header_stats( - &self, - start_height: u64, - end_height: u64, - filename: String, - pow_algo: Option, - ) -> Result<(), Error> { - let mut output = File::create(&filename).await?; - - println!( - "Loading header from height {} to {} and dumping to file [working-dir]/{}.{}", - start_height, - end_height, - filename, - pow_algo.map(|a| format!(" PoW algo = {}", a)).unwrap_or_default() 
- ); - - let start_height = cmp::max(start_height, 1); - let mut prev_header = self.blockchain_db.fetch_chain_header(start_height - 1).await?; - - let mut buff = Vec::new(); - writeln!( - buff, - "Height,Achieved,TargetDifficulty,CalculatedDifficulty,SolveTime,NormalizedSolveTime,Algo,Timestamp,\ - Window,Acc.Monero,Acc.Sha3" - )?; - output.write_all(&buff).await?; - - for height in start_height..=end_height { - let header = self.blockchain_db.fetch_chain_header(height).await?; - - // Optionally, filter out pow algos - if pow_algo.map(|algo| header.header().pow_algo() != algo).unwrap_or(false) { - continue; - } - - let target_diff = self - .blockchain_db - .fetch_target_difficulties_for_next_block(prev_header.hash().clone()) - .await?; - let pow_algo = header.header().pow_algo(); - - let min = self - .consensus_rules - .consensus_constants(height) - .min_pow_difficulty(pow_algo); - let max = self - .consensus_rules - .consensus_constants(height) - .max_pow_difficulty(pow_algo); - - let calculated_target_difficulty = target_diff.get(pow_algo).calculate(min, max); - let existing_target_difficulty = header.accumulated_data().target_difficulty; - let achieved = header.accumulated_data().achieved_difficulty; - let solve_time = header.header().timestamp.as_u64() as i64 - prev_header.header().timestamp.as_u64() as i64; - let normalized_solve_time = cmp::min( - cmp::max(solve_time, 1) as u64, - self.consensus_rules - .consensus_constants(height) - .get_difficulty_max_block_interval(pow_algo), - ); - let acc_sha3 = header.accumulated_data().accumulated_sha_difficulty; - let acc_monero = header.accumulated_data().accumulated_monero_difficulty; - - buff.clear(); - writeln!( - buff, - "{},{},{},{},{},{},{},{},{},{},{}", - height, - achieved.as_u64(), - existing_target_difficulty.as_u64(), - calculated_target_difficulty.as_u64(), - solve_time, - normalized_solve_time, - pow_algo, - chrono::DateTime::from(header.header().timestamp), - target_diff.get(pow_algo).len(), - acc_monero.as_u64(), - acc_sha3.as_u64(), - )?; - output.write_all(&buff).await?; - - if header.header().hash() != header.accumulated_data().hash { - eprintln!( - "Difference in hash at {}! header = {} and accum hash = {}", - height, - header.header().hash().to_hex(), - header.accumulated_data().hash.to_hex() - ); - } - - if existing_target_difficulty != calculated_target_difficulty { - eprintln!( - "Difference at {}! 
existing = {} and calculated = {}", - height, existing_target_difficulty, calculated_target_difficulty - ); - } - - print!("{}", height); - io::stdout().flush()?; - print!("\x1B[{}D\x1B[K", (height + 1).to_string().chars().count()); - prev_header = header; - } - println!("Complete"); - Ok(()) - } - - pub async fn rewind_blockchain(&self, new_height: u64) -> Result<(), Error> { - let blocks = self.blockchain_db.rewind_to_height(new_height).await?; - if !blocks.is_empty() { - self.node_service - .publish_block_event(BlockEvent::BlockSyncRewind(blocks)); - } - Ok(()) - } - - /// Function to process the whoami command - pub fn whoami(&self) -> Result<(), Error> { - println!("{}", self.base_node_identity); - Ok(()) - } - - pub(crate) fn get_software_updater(&self) -> SoftwareUpdaterHandle { - self.software_updater.clone() - } - - pub async fn get_blockchain_db_stats(&self) -> Result<(), Error> { - const BYTES_PER_MB: usize = 1024 * 1024; - - let stats = self.blockchain_db.get_stats().await?; - let mut table = Table::new(); - table.set_titles(vec![ - "Name", - "Entries", - "Depth", - "Branch Pages", - "Leaf Pages", - "Overflow Pages", - "Est. Size (MiB)", - "% of total", - ]); - let total_db_size = stats.db_stats().iter().map(|s| s.total_page_size()).sum::(); - stats.db_stats().iter().for_each(|stat| { - table.add_row(row![ - stat.name, - stat.entries, - stat.depth, - stat.branch_pages, - stat.leaf_pages, - stat.overflow_pages, - format!("{:.2}", stat.total_page_size() as f32 / BYTES_PER_MB as f32), - format!("{:.2}%", (stat.total_page_size() as f32 / total_db_size as f32) * 100.0) - ]); - }); - - table.print_stdout(); - println!(); - println!( - "{} databases, {:.2} MiB used ({:.2}%), page size: {} bytes, env_info = ({})", - stats.root().entries, - total_db_size as f32 / BYTES_PER_MB as f32, - (total_db_size as f32 / stats.env_info().mapsize as f32) * 100.0, - stats.root().psize as usize, - stats.env_info() - ); - - println!(); - println!("Totalling DB entry sizes. This may take a few seconds..."); - println!(); - let stats = self.blockchain_db.fetch_total_size_stats().await?; - println!(); - let mut table = Table::new(); - table.set_titles(vec![ - "Name", - "Entries", - "Total Size (MiB)", - "Avg. Size/Entry (bytes)", - "% of total", - ]); - let total_data_size = stats.sizes().iter().map(|s| s.total()).sum::(); - stats.sizes().iter().for_each(|size| { - let total = size.total() as f32 / BYTES_PER_MB as f32; - table.add_row(row![ - size.name, - size.num_entries, - format!("{:.2}", total), - format!("{}", size.avg_bytes_per_entry()), - format!("{:.2}%", (size.total() as f32 / total_data_size as f32) * 100.0) - ]) - }); - table.print_stdout(); - println!(); - println!( - "Total blockchain data size: {:.2} MiB ({:.2} % of LMDB map size)", - total_data_size as f32 / BYTES_PER_MB as f32, - (total_data_size as f32 / total_db_size as f32) * 100.0 - ); - Ok(()) - } - - #[cfg(not(feature = "metrics"))] - pub fn get_network_stats(&self) -> Result<(), Error> { - println!( - "Metrics are not enabled in this binary. Recompile Tari base node with `--features metrics` to enable \ - them." 
- ); - Ok(()) - } - - #[cfg(feature = "metrics")] - pub fn get_network_stats(&self) -> Result<(), Error> { - use tari_metrics::proto::MetricType; - let metric_families = tari_metrics::get_default_registry().gather(); - let metric_family_iter = metric_families - .into_iter() - .filter(|family| family.get_name().starts_with("tari_comms")); - - // TODO: Make this useful - let mut table = Table::new(); - table.set_titles(vec!["name", "type", "value"]); - for family in metric_family_iter { - let field_type = family.get_field_type(); - let name = family.get_name(); - for metric in family.get_metric() { - let value = match field_type { - MetricType::COUNTER => metric.get_counter().get_value(), - MetricType::GAUGE => metric.get_gauge().get_value(), - MetricType::SUMMARY => { - let summary = metric.get_summary(); - summary.get_sample_sum() / summary.get_sample_count() as f64 - }, - MetricType::UNTYPED => metric.get_untyped().get_value(), - MetricType::HISTOGRAM => { - let histogram = metric.get_histogram(); - histogram.get_sample_sum() / histogram.get_sample_count() as f64 - }, - }; - - let field_type = match field_type { - MetricType::COUNTER => "COUNTER", - MetricType::GAUGE => "GAUGE", - MetricType::SUMMARY => "SUMMARY", - MetricType::UNTYPED => "UNTYPED", - MetricType::HISTOGRAM => "HISTOGRAM", - }; - - table.add_row(row![name, field_type, value]); - } - } - table.print_stdout(); - Ok(()) - } - - pub fn list_reorgs(&self) -> Result<(), Error> { - if !self.config.blockchain_track_reorgs { - // TODO: Return error/report - println!( - "Reorg tracking is turned off. Add `track_reorgs = true` to the [base_node] section of your config to \ - turn it on." - ); - } else { - let reorgs = self.blockchain_db.inner().fetch_all_reorgs()?; - let mut table = Table::new(); - table.set_titles(vec!["#", "New Tip", "Prev Tip", "Depth", "Timestamp"]); - - for (i, reorg) in reorgs.iter().enumerate() { - table.add_row(row![ - i + 1, - format!("#{} ({})", reorg.new_height, reorg.new_hash.to_hex()), - format!("#{} ({})", reorg.prev_height, reorg.prev_hash.to_hex()), - format!("{} added, {} removed", reorg.num_blocks_added, reorg.num_blocks_removed), - reorg.local_time - ]); - } - table.enable_row_count().print_stdout(); - } - Ok(()) - } -} - -async fn fetch_banned_peers(pm: &PeerManager) -> Result, PeerManagerError> { - let query = PeerQuery::new().select_where(|p| p.is_banned()); - pm.perform_query(query).await -} - -#[derive(Debug, Error)] -#[error("invalid format '{0}'")] -pub struct FormatParseError(String); - -pub enum Format { - Json, - Text, -} - -impl Default for Format { - fn default() -> Self { - Self::Text - } -} - -impl FromStr for Format { - type Err = FormatParseError; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_ref() { - "json" => Ok(Self::Json), - "text" => Ok(Self::Text), - _ => Err(FormatParseError(s.into())), - } - } -} - -// TODO: This is not currently used, but could be pretty useful (maybe as an iterator) -// Function to delimit arguments using spaces and pairs of quotation marks, which may include spaces -// pub fn delimit_command_string(command_str: &str) -> Vec { -// // Delimit arguments using spaces and pairs of quotation marks, which may include spaces -// let arg_temp = command_str.trim().to_string(); -// let re = Regex::new(r#"[^\s"]+|"(?:\\"|[^"])+""#).unwrap(); -// let arg_temp_vec: Vec<&str> = re.find_iter(&arg_temp).map(|mat| mat.as_str()).collect(); -// // Remove quotation marks left behind by `Regex` - it does not support look ahead and look behind -// let 
mut del_arg_vec = Vec::new();
-//     for arg in arg_temp_vec.iter().skip(1) {
-//         del_arg_vec.push(str::replace(arg, "\"", ""));
-//     }
-//     del_arg_vec
-// }
diff --git a/applications/tari_base_node/src/commands/mod.rs b/applications/tari_base_node/src/commands/mod.rs
index e456732564b..4c6e87b58ce 100644
--- a/applications/tari_base_node/src/commands/mod.rs
+++ b/applications/tari_base_node/src/commands/mod.rs
@@ -1,7 +1,7 @@
 mod args;
 pub mod cli;
-pub mod command_handler;
+pub mod command;
+pub mod nom_parser;
 pub mod parser;
-pub mod performer;
 pub mod reader;
-mod status_line;
+pub mod status_line;
diff --git a/applications/tari_base_node/src/commands/nom_parser.rs b/applications/tari_base_node/src/commands/nom_parser.rs
new file mode 100644
index 00000000000..8d40d411d73
--- /dev/null
+++ b/applications/tari_base_node/src/commands/nom_parser.rs
@@ -0,0 +1,81 @@
+use derive_more::IntoIterator;
+use nom::{
+    branch::alt,
+    bytes::complete::{tag, take_until, take_while1},
+    character::complete::multispace0,
+    error::Error,
+    multi::many0,
+    sequence::{delimited, preceded},
+    Err,
+    IResult,
+};
+
+#[derive(IntoIterator)]
+pub struct ParsedCommand<'a> {
+    items: Vec<&'a str>,
+}
+
+impl<'a> ParsedCommand<'a> {
+    pub fn parse(line: &'a str) -> Result<Self, anyhow::Error> {
+        parse(line).map_err(|err| anyhow::Error::msg(err.to_string()))
+    }
+}
+
+fn parse(input: &str) -> Result<ParsedCommand<'_>, Err<Error<&str>>> {
+    parse_command(input).map(|pair| pair.1)
+}
+
+fn parse_command(input: &str) -> IResult<&str, ParsedCommand<'_>> {
+    let input = input.trim();
+    let (input, pairs) = parse_parameters(input)?;
+    let command = ParsedCommand { items: pairs };
+    Ok((input, command))
+}
+
+fn parse_parameters(input: &str) -> IResult<&str, Vec<&str>> {
+    many0(preceded(multispace0, parse_item))(input)
+}
+
+const PQ: &str = "\"";
+const SQ: &str = "'";
+
+fn is_valid_char(c: char) -> bool {
+    c.is_alphanumeric() || c == '_' || c == '-'
+}
+
+fn valid_item(input: &str) -> IResult<&str, &str> {
+    take_while1(is_valid_char)(input)
+}
+
+fn parse_item(input: &str) -> IResult<&str, &str> {
+    alt((
+        delimited(tag(PQ), take_until(PQ), tag(PQ)),
+        delimited(tag(SQ), take_until(SQ), tag(SQ)),
+        valid_item,
+    ))(input)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parser() {
+        let items = parse("command").unwrap().items;
+        assert_eq!(items, vec!["command"]);
+        let items = parse("command with parameters").unwrap().items;
+        assert_eq!(items, vec!["command", "with", "parameters"]);
+        let items = parse("command with 'quoted long' \"parameters in\" \"a different \" format")
+            .unwrap()
+            .items;
+        assert_eq!(items, vec![
+            "command",
+            "with",
+            "quoted long",
+            "parameters in",
+            "a different ",
+            "format"
+        ]);
+        // TODO: Support emojis
+    }
+}
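For readers unfamiliar with nom, the tokenizer above is built from three combinators: `delimited(tag(PQ), take_until(PQ), tag(PQ))` strips a pair of quotes around a parameter, `preceded(multispace0, ...)` skips the whitespace in front of each item, and `many0` repeats the item parser until the input is exhausted. A stripped-down, standalone sketch of the same composition (assuming nom 7, with hypothetical `token`/`tokens` helpers and only double quotes supported):

    use nom::{
        branch::alt,
        bytes::complete::{tag, take_until, take_while1},
        character::complete::multispace0,
        multi::many0,
        sequence::{delimited, preceded},
        IResult,
    };

    /// One token: either a double-quoted string or a bare word.
    fn token(input: &str) -> IResult<&str, &str> {
        alt((
            delimited(tag("\""), take_until("\""), tag("\"")),
            take_while1(|c: char| !c.is_whitespace()),
        ))(input)
    }

    /// Zero or more whitespace-separated tokens.
    fn tokens(input: &str) -> IResult<&str, Vec<&str>> {
        many0(preceded(multispace0, token))(input)
    }

    fn main() {
        let (_rest, items) = tokens(r#"ban-peer "some node id" 3600"#).unwrap();
        assert_eq!(items, vec!["ban-peer", "some node id", "3600"]);
    }

Because `ParsedCommand` derives `IntoIterator`, the resulting token list can be passed straight to clap's `try_get_matches_from`, which is exactly what `FromStr for Args` does in `commands/command/mod.rs`.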
-use std::string::ToString;
+use std::str::FromStr;
 
 use rustyline::{
     completion::Completer,
@@ -30,49 +30,38 @@ use rustyline::{
     Context,
 };
 use rustyline_derive::{Helper, Highlighter, Validator};
-use strum::IntoEnumIterator;
-use strum_macros::{Display, EnumIter, EnumString};
-
-/// Enum representing commands used by the basenode
-#[derive(Clone, Copy, PartialEq, Debug, Display, EnumIter, EnumString)]
-#[strum(serialize_all = "kebab_case")]
-pub enum BaseNodeCommand {
-    Help,
-    Version,
-    CheckForUpdates,
-    Status,
-    GetChainMetadata,
-    GetDbStats,
-    GetPeer,
-    ListPeers,
-    DialPeer,
-    PingPeer,
-    ResetOfflinePeers,
-    RewindBlockchain,
-    BanPeer,
-    UnbanPeer,
-    UnbanAllPeers,
-    ListBannedPeers,
-    ListConnections,
-    ListHeaders,
-    CheckDb,
-    PeriodStats,
-    HeaderStats,
-    BlockTiming,
-    CalcTiming,
-    ListReorgs,
-    DiscoverPeer,
-    GetBlock,
-    SearchUtxo,
-    SearchKernel,
-    GetMempoolStats,
-    GetMempoolState,
-    GetMempoolTx,
-    Whoami,
-    GetStateInfo,
-    GetNetworkStats,
-    Quit,
-    Exit,
+use strum::{Display, EnumString};
+use tari_utilities::hex::{Hex, HexError};
+use thiserror::Error;
+
+use super::command::Command;
+
+#[derive(Debug, Error)]
+#[error("invalid format '{0}'")]
+pub struct FormatParseError(String);
+
+#[derive(Debug, Display, EnumString)]
+#[strum(serialize_all = "kebab-case")]
+pub enum Format {
+    Json,
+    Text,
+}
+
+impl Default for Format {
+    fn default() -> Self {
+        Self::Text
+    }
+}
+
+#[derive(Debug)]
+pub struct FromHex<T>(pub T);
+
+impl<T: Hex> FromStr for FromHex<T> {
+    type Err = HexError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        T::from_hex(s).map(Self)
+    }
 }
 
 /// This is used to parse commands from the user and execute them
@@ -115,7 +104,7 @@ impl Parser {
     /// creates a new parser struct
     pub fn new() -> Self {
         Parser {
-            commands: BaseNodeCommand::iter().map(|x| x.to_string()).collect(),
+            commands: Command::variants(),
             hinter: HistoryHinter {},
         }
     }
diff --git a/applications/tari_base_node/src/commands/performer.rs b/applications/tari_base_node/src/commands/performer.rs
deleted file mode 100644
index b25cd49fa6c..00000000000
--- a/applications/tari_base_node/src/commands/performer.rs
+++ /dev/null
@@ -1,413 +0,0 @@
-use std::time::Duration;
-
-use anyhow::Error;
-use derive_more::{Deref, DerefMut};
-use log::*;
-use strum::IntoEnumIterator;
-use tari_app_utilities::utilities::{UniNodeId, UniPublicKey};
-use tari_common_types::types::{Commitment, PrivateKey, PublicKey, Signature};
-use tari_comms::peer_manager::NodeId;
-use tari_core::proof_of_work::PowAlgorithm;
-use tari_shutdown::Shutdown;
-use tari_utilities::ByteArray;
-
-use super::{
-    args::{Args, ArgsError, ArgsReason, FromHex},
-    command_handler::{CommandHandler, StatusLineOutput},
-    parser::BaseNodeCommand,
-};
-use crate::LOG_TARGET;
-
-#[derive(Deref, DerefMut)]
-pub struct Performer {
-    command_handler: CommandHandler,
-}
-
-impl Performer {
-    pub fn new(command_handler: CommandHandler) -> Self {
-        Self { command_handler }
-    }
-
-    /// This will parse the provided command and execute the task
-    pub async fn handle_command(&mut self, command_str: &str, shutdown: &mut Shutdown) {
-        if command_str.trim().is_empty() {
-            return;
-        }
-
-        let mut typed_args = Args::split(command_str);
-        let command = typed_args.take_next("command");
-        match command {
-            Ok(command) => {
-                let res = self.process_command(command, typed_args, shutdown).await;
-                if let Err(err) = res {
-                    println!("Command Error: {}", err);
-                    self.print_help(command);
-                }
-            },
-            Err(_) => {
-                println!("{} is not a valid command, please enter a valid command",
command_str); - println!("Enter help or press tab for available commands"); - }, - } - } - - /// Function to process commands - async fn process_command<'a>( - &mut self, - command: BaseNodeCommand, - mut typed_args: Args<'a>, - shutdown: &mut Shutdown, - ) -> Result<(), Error> { - use BaseNodeCommand::*; - match command { - Help => { - let command = typed_args.take_next("help-command")?; - self.print_help(command); - Ok(()) - }, - Status => self.command_handler.status(StatusLineOutput::StdOutAndLog).await, - GetStateInfo => self.command_handler.state_info(), - Version => self.command_handler.print_version(), - CheckForUpdates => self.command_handler.check_for_updates().await, - GetChainMetadata => self.command_handler.get_chain_meta().await, - GetDbStats => self.command_handler.get_blockchain_db_stats().await, - DialPeer => self.process_dial_peer(typed_args).await, - PingPeer => self.process_ping_peer(typed_args).await, - DiscoverPeer => self.process_discover_peer(typed_args).await, - GetPeer => self.process_get_peer(typed_args).await, - ListPeers => self.process_list_peers(typed_args).await, - ResetOfflinePeers => self.command_handler.reset_offline_peers().await, - RewindBlockchain => self.process_rewind_blockchain(typed_args).await, - CheckDb => self.command_handler.check_db().await, - PeriodStats => self.process_period_stats(typed_args).await, - HeaderStats => self.process_header_stats(typed_args).await, - BanPeer => self.process_ban_peer(typed_args, true).await, - UnbanPeer => self.process_ban_peer(typed_args, false).await, - UnbanAllPeers => self.command_handler.unban_all_peers().await, - ListBannedPeers => self.command_handler.list_banned_peers().await, - ListConnections => self.command_handler.list_connections().await, - ListHeaders => self.process_list_headers(typed_args).await, - BlockTiming | CalcTiming => self.process_block_timing(typed_args).await, - ListReorgs => self.process_list_reorgs().await, - GetBlock => self.process_get_block(typed_args).await, - SearchUtxo => self.process_search_utxo(typed_args).await, - SearchKernel => self.process_search_kernel(typed_args).await, - GetMempoolStats => self.command_handler.get_mempool_stats().await, - GetMempoolState => self.command_handler.get_mempool_state(None).await, - GetMempoolTx => self.get_mempool_state_tx(typed_args).await, - Whoami => self.command_handler.whoami(), - GetNetworkStats => self.command_handler.get_network_stats(), - Exit | Quit => { - println!("Shutting down..."); - info!( - target: LOG_TARGET, - "Termination signal received from user. Shutting node down." 
- ); - let _ = shutdown.trigger(); - Ok(()) - }, - } - } - - /// Displays the commands or context specific help for a given command - fn print_help(&self, command: BaseNodeCommand) { - use BaseNodeCommand::*; - match command { - Help => { - println!("Available commands are: "); - // TODO: Improve that - let joined = BaseNodeCommand::iter() - .map(|item| item.to_string()) - .collect::>() - .join(", "); - println!("{}", joined); - }, - Status => { - println!("Prints out the status of this node"); - }, - GetStateInfo => { - println!("Prints out the status of the base node state machine"); - }, - Version => { - println!("Gets the current application version"); - }, - CheckForUpdates => { - println!("Checks for software updates if auto update is enabled"); - }, - GetChainMetadata => { - println!("Gets your base node chain meta data"); - }, - GetDbStats => { - println!("Gets your base node database stats"); - }, - DialPeer => { - println!("Attempt to connect to a known peer"); - println!("dial-peer [hex public key or emoji id]"); - }, - PingPeer => { - println!("Send a ping to a known peer and wait for a pong reply"); - println!("ping-peer [hex public key or emoji id]"); - }, - DiscoverPeer => { - println!("Attempt to discover a peer on the Tari network"); - println!("discover-peer [hex public key or emoji id]"); - }, - GetPeer => { - println!("Get all available info about peer"); - println!("Usage: get-peer [Partial NodeId | PublicKey | EmojiId]"); - }, - ListPeers => { - println!("Lists the peers that this node knows about"); - }, - ResetOfflinePeers => { - println!("Clear offline flag from all peers"); - }, - RewindBlockchain => { - println!("Rewinds the blockchain to the given height."); - println!("Usage: {} [new_height]", command); - println!("new_height must be less than the current height."); - }, - BanPeer => { - println!("Bans a peer"); - println!( - "ban-peer/unban-peer [hex public key or emoji id] (length of time to ban the peer for in seconds)" - ); - }, - UnbanPeer => { - println!("Removes a peer ban"); - }, - UnbanAllPeers => { - println!("Unbans all peers"); - }, - ListBannedPeers => { - println!("Lists peers that have been banned by the node or wallet"); - }, - CheckDb => { - println!("Checks the blockchain database for missing blocks and headers"); - }, - HeaderStats => { - println!( - "Prints out certain stats to of the block chain in csv format for easy copy, use as follows: " - ); - println!("header-stats [start height] [end height] (dump_file) (filter:monero|sha3)"); - println!("e.g."); - println!("header-stats 0 1000"); - println!("header-stats 0 1000 sample2.csv"); - println!("header-stats 0 1000 monero-sample.csv monero"); - }, - PeriodStats => { - println!( - "Prints out certain aggregated stats to of the block chain in csv format for easy copy, use as \ - follows: " - ); - println!( - "Period-stats [start time in unix timestamp] [end time in unix timestamp] [interval period time \ - in unix timestamp]" - ); - }, - ListConnections => { - println!("Lists the peer connections currently held by this node"); - }, - ListHeaders => { - println!("List the amount of headers, can be called in the following two ways: "); - println!("list-headers [first header height] [last header height]"); - println!("list-headers [number of headers starting from the chain tip back]"); - }, - BlockTiming | CalcTiming => { - println!("Calculates the maximum, minimum, and average time taken to mine a given range of blocks."); - println!("block-timing [start height] [end height]"); - 
println!("block-timing [number of blocks from chain tip]"); - }, - ListReorgs => { - println!("List tracked reorgs."); - println!( - "This feature must be enabled by setting `track_reorgs = true` in the [base_node] section of your \ - config." - ); - }, - GetBlock => { - println!("Display a block by height or hash:"); - println!("get-block [height or hash of the block] [format]"); - println!( - "[height or hash of the block] The height or hash of the block to fetch from the main chain. The \ - genesis block has height zero." - ); - println!( - "[format] Optional. Supported options are 'json' and 'text'. 'text' is the default if omitted." - ); - }, - SearchUtxo => { - println!( - "This will search the main chain for the utxo. If the utxo is found, it will print out the block \ - it was found in." - ); - println!("search-utxo [hex of commitment of the utxo]"); - }, - SearchKernel => { - println!( - "This will search the main chain for the kernel. If the kernel is found, it will print out the \ - block it was found in." - ); - println!("This searches for the kernel via the excess signature"); - println!("search-kernel [hex of nonce] [Hex of signature]"); - }, - GetMempoolStats => { - println!("Retrieves your mempools stats"); - }, - GetMempoolState => { - println!("Retrieves your mempools state"); - }, - GetMempoolTx => { - println!("Filters and retrieves details about transactions from the mempool's state"); - }, - Whoami => { - println!( - "Display identity information about this node, including: public key, node ID and the public \ - address" - ); - }, - GetNetworkStats => { - println!("Displays network stats"); - }, - Exit | Quit => { - println!("Exits the base node"); - }, - } - } - - /// Function to process the get-block command - async fn process_get_block<'a>(&self, mut args: Args<'a>) -> Result<(), Error> { - let height = args.try_take_next("height")?; - let hash: Option>> = args.try_take_next("hash")?; - args.shift_one(); - let format = args.try_take_next("format")?.unwrap_or_default(); - - match (height, hash) { - (Some(height), _) => self.command_handler.get_block(height, format).await, - (_, Some(hash)) => self.command_handler.get_block_by_hash(hash.0, format).await, - _ => Err(ArgsError::new( - "height", - "Invalid block height or hash provided. 
Height must be an integer.", - ) - .into()), - } - } - - /// Function to process the search utxo command - async fn process_search_utxo<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let commitment: FromHex = args.take_next("hex")?; - self.command_handler.search_utxo(commitment.0).await - } - - /// Function to process the search kernel command - async fn process_search_kernel<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let public_nonce: FromHex = args.take_next("public-key")?; - let signature: FromHex = args.take_next("private-key")?; - let kernel_sig = Signature::new(public_nonce.0, signature.0); - self.command_handler.search_kernel(kernel_sig).await - } - - async fn get_mempool_state_tx<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let filter = args.take_next("filter").ok(); - self.command_handler.get_mempool_state(filter).await - } - - /// Function to process the discover-peer command - async fn process_discover_peer<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let key: UniPublicKey = args.take_next("id")?; - self.command_handler.discover_peer(Box::new(key.into())).await - } - - async fn process_get_peer<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let original_str = args - .try_take_next("node_id")? - .ok_or_else(|| ArgsError::new("node_id", ArgsReason::Required))?; - let node_id: Option = args.try_take_next("node_id")?; - let partial; - if let Some(node_id) = node_id { - partial = NodeId::from(node_id).to_vec(); - } else { - let data: FromHex<_> = args.take_next("node_id")?; - partial = data.0; - } - self.command_handler.get_peer(partial, original_str).await; - Ok(()) - } - - /// Function to process the list-peers command - async fn process_list_peers<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let filter = args.take_next("filter").ok(); - self.command_handler.list_peers(filter).await - } - - /// Function to process the dial-peer command - async fn process_dial_peer<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let dest_node_id: UniNodeId = args.take_next("node-id")?; - self.command_handler.dial_peer(dest_node_id.into()).await - } - - /// Function to process the dial-peer command - async fn process_ping_peer<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let dest_node_id: UniNodeId = args.take_next("node-id")?; - self.command_handler.ping_peer(dest_node_id.into()).await - } - - /// Function to process the ban-peer command - async fn process_ban_peer<'a>(&mut self, mut args: Args<'a>, must_ban: bool) -> Result<(), Error> { - let node_id: UniNodeId = args.take_next("node-id")?; - let secs = args.try_take_next("length")?.unwrap_or(std::u64::MAX); - let duration = Duration::from_secs(secs); - self.command_handler.ban_peer(node_id.into(), duration, must_ban).await; - Ok(()) - } - - /// Function to process the list-headers command - async fn process_list_headers<'a>(&self, mut args: Args<'a>) -> Result<(), Error> { - let start = args.take_next("start")?; - let end = args.try_take_next("end")?; - self.command_handler.list_headers(start, end).await; - Ok(()) - } - - /// Function to process the calc-timing command - async fn process_block_timing<'a>(&self, mut args: Args<'a>) -> Result<(), Error> { - let start = args.take_next("start")?; - let end = args.try_take_next("end")?; - if end.is_none() && start < 2 { - Err(ArgsError::new("start", "Number of headers must be at least 2.").into()) - } else { - self.command_handler.block_timing(start, end).await - } - } - - async fn 
process_period_stats<'a>(&mut self, mut args: Args<'a>) -> Result<(), Error> { - let period_end = args.take_next("period_end")?; - let period_ticker_end = args.take_next("period_ticker_end")?; - let period = args.take_next("period")?; - self.command_handler - .period_stats(period_end, period_ticker_end, period) - .await - } - - async fn process_header_stats<'a>(&self, mut args: Args<'a>) -> Result<(), Error> { - let start_height = args.take_next("start_height")?; - let end_height = args.take_next("end_height")?; - let filename = args - .try_take_next("filename")? - .unwrap_or_else(|| "header-data.csv".into()); - let algo: Option = args.try_take_next("algo")?; - - self.command_handler - .save_header_stats(start_height, end_height, filename, algo) - .await - } - - async fn process_rewind_blockchain<'a>(&self, mut args: Args<'a>) -> Result<(), Error> { - let new_height = args.take_next("new_height")?; - self.command_handler.rewind_blockchain(new_height).await - } - - async fn process_list_reorgs(&self) -> Result<(), Error> { - self.command_handler.list_reorgs() - } -} diff --git a/applications/tari_base_node/src/commands/status_line.rs b/applications/tari_base_node/src/commands/status_line.rs index dfc663443e9..1cc025b78de 100644 --- a/applications/tari_base_node/src/commands/status_line.rs +++ b/applications/tari_base_node/src/commands/status_line.rs @@ -23,6 +23,14 @@ use std::{fmt, fmt::Display}; use chrono::Local; +use strum::{Display, EnumString}; + +#[derive(Debug, Display, EnumString)] +#[strum(serialize_all = "lowercase")] +pub enum StatusLineOutput { + Log, + StdOutAndLog, +} #[derive(Debug, Clone, Default)] pub struct StatusLine { diff --git a/applications/tari_base_node/src/main.rs b/applications/tari_base_node/src/main.rs index 82599707078..a251d097888 100644 --- a/applications/tari_base_node/src/main.rs +++ b/applications/tari_base_node/src/main.rs @@ -102,10 +102,10 @@ use std::{ }; use commands::{ - command_handler::{CommandHandler, StatusLineOutput}, + command::CommandContext, parser::Parser, - performer::Performer, reader::{CommandEvent, CommandReader}, + status_line::StatusLineOutput, }; use futures::FutureExt; use log::*; @@ -135,7 +135,7 @@ use tari_core::chain_storage::ChainStorageError; #[cfg(all(unix, feature = "libtor"))] use tari_libtor::tor::Tor; use tari_shutdown::{Shutdown, ShutdownSignal}; -use tokio::{task, time}; +use tokio::{signal, task, time}; use tonic::transport::Server; use tracing_subscriber::{layer::SubscriberExt, Registry}; @@ -284,16 +284,16 @@ async fn run_node( } // Run, node, run! - let command_handler = CommandHandler::new(&ctx); + let context = CommandContext::new(&ctx, shutdown); if bootstrap.non_interactive_mode { - task::spawn(status_loop(command_handler, shutdown)); + task::spawn(status_loop(context)); println!("Node started in non-interactive mode (pid = {})", process::id()); } else { info!( target: LOG_TARGET, "Node has been successfully configured and initialized. Starting CLI loop." 
);
-        task::spawn(cli_loop(command_handler, config.clone(), shutdown));
+        task::spawn(cli_loop(context));
     }
     if !config.force_sync_peers.is_empty() {
         warn!(
@@ -361,10 +361,10 @@ fn get_status_interval(start_time: Instant, long_interval: Duration) -> time::Sl
     time::sleep(duration)
 }
 
-async fn status_loop(mut command_handler: CommandHandler, shutdown: Shutdown) {
+async fn status_loop(mut context: CommandContext) {
     let start_time = Instant::now();
-    let mut shutdown_signal = shutdown.to_signal();
-    let status_interval = command_handler.global_config().base_node_status_line_interval;
+    let mut shutdown_signal = context.shutdown.to_signal();
+    let status_interval = context.global_config().base_node_status_line_interval;
     loop {
         let interval = get_status_interval(start_time, status_interval);
         tokio::select! {
@@ -374,7 +374,7 @@ async fn status_loop(mut command_handler: CommandHandler, shutdown: Shutdown) {
             }
 
             _ = interval => {
-                command_handler.status(StatusLineOutput::Log).await.ok();
+                context.status(StatusLineOutput::Log).await.ok();
            },
        }
    }
@@ -387,11 +387,11 @@ async fn status_loop(mut command_handler: CommandHandler, shutdown: Shutdown) {
 ///
 /// ## Returns
 /// Doesn't return anything
-async fn cli_loop(command_handler: CommandHandler, config: Arc<GlobalConfig>, mut shutdown: Shutdown) {
+async fn cli_loop(mut context: CommandContext) {
     let parser = Parser::new();
     commands::cli::print_banner(parser.get_commands(), 3);
-    let mut performer = Performer::new(command_handler);
+    // TODO: Check for a new version here
     let cli_config = Config::builder()
         .history_ignore_space(true)
         .completion_type(CompletionType::List)
@@ -403,25 +403,28 @@ async fn cli_loop(command_handler: CommandHandler, config: Arc<GlobalConfig>, mu
     rustyline.set_helper(Some(parser));
     let mut reader = CommandReader::new(rustyline);
-    let mut shutdown_signal = shutdown.to_signal();
+    let mut shutdown_signal = context.shutdown.to_signal();
     let start_time = Instant::now();
-    let mut software_update_notif = performer.get_software_updater().new_update_notifier().clone();
+    let mut software_update_notif = context.software_updater.new_update_notifier().clone();
     let mut first_signal = false;
-    // TODO: Add heartbeat here
-    // Show status immediately on startup
-    let _ = performer.status(StatusLineOutput::StdOutAndLog).await;
+    let config = context.config.clone();
     loop {
-        let interval = get_status_interval(start_time, config.base_node_status_line_interval);
+        let mut watch_task = None;
        tokio::select!
{ res = reader.next_command() => { if let Some(event) = res { match event { CommandEvent::Command(line) => { first_signal = false; - let fut = performer.handle_command(line.as_str(), &mut shutdown); - let res = time::timeout(Duration::from_secs(70), fut).await; - if let Err(_err) = res { - println!("Time for command execution elapsed: `{}`", line); + if !line.is_empty() { + match context.handle_command_str(&line).await { + Err(err) => { + println!("Command `{}` failed: {}", line, err); + } + Ok(command) => { + watch_task = command; + } + } } } CommandEvent::Interrupt => { @@ -443,24 +446,45 @@ async fn cli_loop(command_handler: CommandHandler, config: Arc, mu break; } }, - Ok(_) = software_update_notif.changed() => { - if let Some(ref update) = *software_update_notif.borrow() { - println!( - "Version {} of the {} is available: {} (sha: {})", - update.version(), - update.app(), - update.download_url(), - update.to_hash_hex() - ); - } - } - _ = interval => { - // TODO: Execute `watch` command here + use the result - let _ = performer.status(StatusLineOutput::StdOutAndLog).await; - }, _ = shutdown_signal.wait() => { break; } } + if let Some(command) = watch_task.as_ref() { + let line = command.line(); + let interval = command + .interval + .map(Duration::from_secs) + .unwrap_or(config.base_node_status_line_interval); + if let Err(err) = context.handle_command_str(line).await { + println!("Wrong command to watch `{}`. Failed with: {}", line, err); + } else { + loop { + let interval = get_status_interval(start_time, interval); + tokio::select! { + _ = interval => { + if let Err(err) = context.handle_command_str(line).await { + println!("Watched command `{}` failed: {}", line, err); + } + }, + _ = signal::ctrl_c() => { + break; + } + // TODO: Is that good idea? Or add a separate command? + Ok(_) = software_update_notif.changed() => { + if let Some(ref update) = *software_update_notif.borrow() { + println!( + "Version {} of the {} is available: {} (sha: {})", + update.version(), + update.app(), + update.download_url(), + update.to_hash_hex() + ); + } + } + } + } + } + } } }
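
Editor's note on the new interactive flow, as a sketch rather than part of the patch: with this change the CLI loop no longer prints the status line on a timer. Instead, `handle_command_str` may hand back a watch task, and `cli_loop` re-runs that command line on an interval until Ctrl-C. The struct below is an assumed shape for that value, inferred only from the calls `command.line()` and `command.interval.map(Duration::from_secs)` in the hunk above; the real definition lives in `commands/command/watch_command.rs`, which is not included in this excerpt.

    // Assumed shape; field and method names mirror how cli_loop uses the value above.
    pub struct WatchCommand {
        /// Seconds between re-runs; when `None`, cli_loop falls back to
        /// `base_node_status_line_interval` from the node config.
        pub interval: Option<u64>,
        /// The command line to re-execute, for example "status".
        line: String,
    }

    impl WatchCommand {
        pub fn line(&self) -> &str {
            &self.line
        }
    }

Command parsing itself is now split between the small nom tokenizer added in `commands/nom_parser.rs` and a clap-derived enum in `commands/command/mod.rs` (also not shown in this excerpt). A minimal sketch of how those two pieces compose, with illustrative names and attributes only (the actual enum, variants, and clap settings are defined in the new `command` module):

    use clap::{AppSettings, Parser, Subcommand};

    use crate::commands::nom_parser::ParsedCommand;

    // Hypothetical top-level parser; NoBinaryName lets clap accept shell tokens
    // that do not start with an argv[0]-style program name.
    #[derive(Parser)]
    #[clap(setting = AppSettings::NoBinaryName)]
    struct Cli {
        #[clap(subcommand)]
        command: Command,
    }

    // Hypothetical subset of subcommands; clap derives kebab-case names such as
    // "get-block" from the variant names.
    #[derive(Subcommand)]
    enum Command {
        /// Prints out the status of this node
        Status,
        /// Display a block by height
        GetBlock { height: u64 },
    }

    fn parse_line(line: &str) -> Result<Command, anyhow::Error> {
        // Tokenize with the nom-based splitter (which handles quoted arguments),
        // then let clap match the tokens against the derived subcommands.
        let tokens = ParsedCommand::parse(line)?;
        let cli = Cli::try_parse_from(tokens)?;
        Ok(cli.command)
    }

Under these assumptions, the input line `get-block 123` tokenizes to ["get-block", "123"] and parses to `Command::GetBlock { height: 123 }`, while an unknown command surfaces as a clap error that the loop prints instead of the old hand-written help text.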