diff --git a/applications/tari_app_grpc/proto/wallet.proto b/applications/tari_app_grpc/proto/wallet.proto index 6c051b5cad..3ba81bbf0f 100644 --- a/applications/tari_app_grpc/proto/wallet.proto +++ b/applications/tari_app_grpc/proto/wallet.proto @@ -44,6 +44,8 @@ service Wallet { rpc GetCompletedTransactions (GetCompletedTransactionsRequest) returns (stream GetCompletedTransactionsResponse); // Returns the balance rpc GetBalance (GetBalanceRequest) returns (GetBalanceResponse); + // Returns unspent amounts + rpc GetUnspentAmounts (Empty) returns (GetUnspentAmountsResponse); // Request the wallet perform a coinsplit rpc CoinSplit (CoinSplitRequest) returns (CoinSplitResponse); // Import Utxo to wallet @@ -206,6 +208,10 @@ message GetBalanceResponse { uint64 pending_outgoing_balance = 3; } +message GetUnspentAmountsResponse { + repeated uint64 amount = 1; +} + message GetCoinbaseRequest { uint64 reward = 1; uint64 fee = 2; diff --git a/applications/tari_collectibles/src-tauri/src/clients/wallet_client.rs b/applications/tari_collectibles/src-tauri/src/clients/wallet_client.rs index 4b416c67ed..6e50e615b6 100644 --- a/applications/tari_collectibles/src-tauri/src/clients/wallet_client.rs +++ b/applications/tari_collectibles/src-tauri/src/clients/wallet_client.rs @@ -123,4 +123,19 @@ impl WalletClient { debug!(target: LOG_TARGET, "result {:?}", result); Ok(result.into_inner()) } + + pub async fn get_unspent_amounts( + &mut self, + ) -> Result { + let inner = self.inner.as_mut().unwrap(); + let request = grpc::Empty {}; + let result = inner.get_unspent_amounts(request).await.map_err(|source| { + CollectiblesError::ClientRequestError { + request: "get_unspent_amounts".to_string(), + source, + } + })?; + debug!(target: LOG_TARGET, "result {:?}", result); + Ok(result.into_inner()) + } } diff --git a/applications/tari_collectibles/src-tauri/src/commands/asset_wallets/mod.rs b/applications/tari_collectibles/src-tauri/src/commands/asset_wallets/mod.rs index a9537bb58a..0963975423 100644 --- a/applications/tari_collectibles/src-tauri/src/commands/asset_wallets/mod.rs +++ b/applications/tari_collectibles/src-tauri/src/commands/asset_wallets/mod.rs @@ -175,6 +175,16 @@ pub(crate) async fn asset_wallets_get_balance( Ok(total) } +#[tauri::command] +pub(crate) async fn asset_wallets_get_unspent_amounts( + state: tauri::State<'_, ConcurrentAppState>, +) -> Result, Status> { + let mut client = state.create_wallet_client().await; + client.connect().await?; + let result = client.get_unspent_amounts().await?; + Ok(result.amount) +} + #[tauri::command] pub(crate) async fn asset_wallets_list( state: tauri::State<'_, ConcurrentAppState>, diff --git a/applications/tari_collectibles/src-tauri/src/main.rs b/applications/tari_collectibles/src-tauri/src/main.rs index 1e080a08d5..0254de8b01 100644 --- a/applications/tari_collectibles/src-tauri/src/main.rs +++ b/applications/tari_collectibles/src-tauri/src/main.rs @@ -45,6 +45,7 @@ fn main() -> Result<(), Box> { commands::asset_wallets::asset_wallets_create, commands::asset_wallets::asset_wallets_list, commands::asset_wallets::asset_wallets_get_balance, + commands::asset_wallets::asset_wallets_get_unspent_amounts, commands::asset_wallets::asset_wallets_get_latest_address, commands::asset_wallets::asset_wallets_create_address, commands::asset_wallets::asset_wallets_send_to, diff --git a/applications/tari_collectibles/web-app/src/Create.js b/applications/tari_collectibles/web-app/src/Create.js index de95adf2da..93c5a496d3 100644 --- a/applications/tari_collectibles/web-app/src/Create.js +++ b/applications/tari_collectibles/web-app/src/Create.js @@ -156,6 +156,11 @@ class Create extends React.Component { templateIds.push(721); } + let outputs = await binding.command_asset_wallets_get_unspent_amounts(); + + if (outputs.length <= 1) { + throw { message: "You need at least two unspent outputs" }; + } let publicKey = await binding.command_assets_create( name, description, diff --git a/applications/tari_collectibles/web-app/src/binding.js b/applications/tari_collectibles/web-app/src/binding.js index e45b3fc584..b774099e8a 100644 --- a/applications/tari_collectibles/web-app/src/binding.js +++ b/applications/tari_collectibles/web-app/src/binding.js @@ -137,6 +137,10 @@ async function command_asset_wallets_get_balance(assetPublicKey) { return await invoke("asset_wallets_get_balance", { assetPublicKey }); } +async function command_asset_wallets_get_unspent_amounts() { + return await invoke("asset_wallets_get_unspent_amounts", {}); +} + const commands = { command_create_db, command_assets_create, @@ -147,6 +151,7 @@ const commands = { command_next_asset_public_key, command_asset_wallets_create, command_asset_wallets_get_balance, + command_asset_wallets_get_unspent_amounts, command_asset_wallets_list, command_asset_wallets_get_latest_address, command_asset_wallets_create_address, diff --git a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs index 4cc1bf39f3..d394bcb338 100644 --- a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -51,6 +51,7 @@ use tari_app_grpc::{ GetOwnedAssetsResponse, GetTransactionInfoRequest, GetTransactionInfoResponse, + GetUnspentAmountsResponse, GetVersionRequest, GetVersionResponse, ImportUtxosRequest, @@ -163,6 +164,25 @@ impl wallet_server::Wallet for WalletGrpcServer { })) } + async fn get_unspent_amounts( + &self, + _: Request, + ) -> Result, Status> { + let mut output_service = self.get_output_manager_service(); + let unspent_amounts; + match output_service.get_unspent_outputs().await { + Ok(uo) => unspent_amounts = uo, + Err(e) => return Err(Status::not_found(format!("GetUnspentAmounts error! {}", e))), + } + Ok(Response::new(GetUnspentAmountsResponse { + amount: unspent_amounts + .into_iter() + .map(|o| o.value.as_u64()) + .filter(|&a| a > 0) + .collect(), + })) + } + async fn revalidate_all_transactions( &self, _request: Request, diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index bc737d6883..1ab749cefc 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -20,10 +20,9 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; -use futures::future::try_join_all; -use log::*; +use log::info; use tari_common::{configuration::ValidatorNodeConfig, GlobalConfig}; use tari_comms::{types::CommsPublicKey, NodeIdentity}; use tari_comms_dht::Dht; @@ -31,6 +30,7 @@ use tari_crypto::tari_utilities::hex::Hex; use tari_dan_core::{ models::{AssetDefinition, Committee}, services::{ + BaseNodeClient, ConcreteAssetProcessor, ConcreteCheckpointManager, ConcreteCommitteeManager, @@ -46,7 +46,7 @@ use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService}; use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType}; use tari_service_framework::ServiceHandles; use tari_shutdown::ShutdownSignal; -use tokio::task; +use tokio::{task, time}; use crate::{ default_service_specification::DefaultServiceSpecification, @@ -58,7 +58,7 @@ use crate::{ ExitCodes, }; -const LOG_TARGET: &str = "tari::dan::dan_node"; +const LOG_TARGET: &str = "tari::validator_node::app"; pub struct DanNode { config: GlobalConfig, @@ -84,84 +84,63 @@ impl DanNode { .as_ref() .ok_or_else(|| ExitCodes::ConfigError("Missing dan section".to_string()))?; - let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?; - if asset_definitions.is_empty() { - warn!( - target: LOG_TARGET, - "No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \ - extension." - ); - } - - let mut tasks = vec![]; - for asset in asset_definitions { - let node_identitiy = node_identity.as_ref().clone(); - let mempool = mempool_service.clone(); - let handles = handles.clone(); - let subscription_factory = subscription_factory.clone(); - let shutdown = shutdown.clone(); - let dan_config = dan_config.clone(); - let db_factory = db_factory.clone(); - - tasks.push(task::spawn(async move { - DanNode::start_asset_worker( - asset, - node_identitiy, - mempool, - handles, - subscription_factory, - shutdown, - dan_config, - db_factory, - ) - .await - })); - } - - if tasks.is_empty() { - // If there are no assets to process, work in proxy mode - tasks.push(task::spawn(DanNode::wait_for_exit())); - } - try_join_all(tasks) - .await - .map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))? - .into_iter() - .collect::>()?; - - Ok(()) - } - - fn read_asset_definitions(&self, path: &Path) -> Result, ExitCodes> { - if !path.exists() { - fs::create_dir_all(path).expect("Could not create dir"); - } - let paths = fs::read_dir(path).expect("Could not read asset definitions"); - - let mut result = vec![]; - for path in paths { - let path = path.expect("Not a valid file").path(); - - if !path.is_dir() && path.extension().unwrap_or_default() == "asset" { - let file = File::open(path).expect("could not open file"); - let reader = BufReader::new(file); - - let def: AssetDefinition = serde_json::from_reader(reader).expect("lol not a valid json"); - result.push(def); - } - } - Ok(result) - } - - async fn wait_for_exit() -> Result<(), ExitCodes> { - println!("Type `exit` to exit"); + let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address); + let mut tasks = HashMap::new(); + let mut next_scanned_height = 0u64; loop { - let mut line = String::new(); - let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line"); - if line.to_lowercase().trim() == "exit" { - return Err(ExitCodes::UnknownError("User cancelled".to_string())); - } else { - println!("Type `exit` to exit"); + let tip = base_node_client.get_tip_info().await.unwrap(); + if tip.height_of_longest_chain >= next_scanned_height { + info!( + target: LOG_TARGET, + "Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain + ); + if dan_config.scan_for_assets { + next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval; + info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height); + } else { + next_scanned_height = u64::MAX; // Never run again. + } + + let assets = base_node_client + .get_assets_for_dan_node(node_identity.public_key().clone()) + .await + .unwrap(); + for asset in assets { + if tasks.contains_key(&asset.public_key) { + continue; + } + if let Some(allow_list) = &dan_config.assets_allow_list { + if !allow_list.contains(&asset.public_key.to_hex()) { + continue; + } + } + info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key); + let node_identitiy = node_identity.as_ref().clone(); + let mempool = mempool_service.clone(); + let handles = handles.clone(); + let subscription_factory = subscription_factory.clone(); + let shutdown = shutdown.clone(); + let dan_config = dan_config.clone(); + let db_factory = db_factory.clone(); + tasks.insert( + asset.public_key.clone(), + task::spawn(async move { + DanNode::start_asset_worker( + asset.clone(), + node_identitiy, + mempool, + handles, + subscription_factory, + shutdown, + dan_config, + db_factory, + ) + .await + }), + ); + } } + time::sleep(Duration::from_secs(120)).await; } } diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs index 8654288a24..236627f353 100644 --- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs +++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs @@ -27,7 +27,7 @@ use tari_app_grpc::tari_rpc as grpc; use tari_common_types::types::PublicKey; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::{BaseLayerMetadata, BaseLayerOutput}, + models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput}, services::BaseNodeClient, DigitalAssetError, }; @@ -100,4 +100,48 @@ impl BaseNodeClient for GrpcBaseNodeClient { .transpose()?; Ok(output) } + + async fn get_assets_for_dan_node( + &mut self, + dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError> { + let inner = match self.inner.as_mut() { + Some(i) => i, + None => { + self.connect().await?; + self.inner.as_mut().unwrap() + }, + }; + let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 }; + let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner(); + let mut assets: Vec = vec![]; + let tip = self.get_tip_info().await?; + while let Some(r) = result.message().await.unwrap() { + if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) { + if let Some(checkpoint) = self + .get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32]) + .await? + { + if let Some(committee) = checkpoint.get_side_chain_committee() { + if committee.contains(&dan_node_public_key) { + assets.push(AssetDefinition { + public_key: asset_public_key, + template_parameters: r + .features + .unwrap() + .asset + .unwrap() + .template_parameters + .into_iter() + .map(|tp| tp.into()) + .collect(), + ..Default::default() + }); + } + } + } + } + } + Ok(assets) + } } diff --git a/common/config/presets/validator_node.toml b/common/config/presets/validator_node.toml index f6af4517a9..735cd0def8 100644 --- a/common/config/presets/validator_node.toml +++ b/common/config/presets/validator_node.toml @@ -9,3 +9,10 @@ committee = ["2ea0df3059caf4411624d6bf5b9c02238d607d2798c586b3e6c2a054da3f205a"] # cannot be of zero size phase_timeout = 30 template_id = "EditableMetadata" + +# If set to false, there will be no scanning at all. +scan_for_assets = true +# How often do we want to scan the base layer for changes. +new_asset_scanning_interval = 10 +# If set then only the specific assets will be checked. +# assets_allow_list = [""] diff --git a/common/src/configuration/validator_node_config.rs b/common/src/configuration/validator_node_config.rs index 243b42aa16..79c9eba215 100644 --- a/common/src/configuration/validator_node_config.rs +++ b/common/src/configuration/validator_node_config.rs @@ -41,6 +41,9 @@ pub struct ValidatorNodeConfig { pub base_node_grpc_address: SocketAddr, #[serde(default = "default_wallet_grpc_address")] pub wallet_grpc_address: SocketAddr, + pub scan_for_assets: bool, + pub new_asset_scanning_interval: u64, + pub assets_allow_list: Option>, } fn default_asset_config_directory() -> PathBuf { diff --git a/dan_layer/core/src/services/base_node_client.rs b/dan_layer/core/src/services/base_node_client.rs index 99cacf2a4b..ae77144a8d 100644 --- a/dan_layer/core/src/services/base_node_client.rs +++ b/dan_layer/core/src/services/base_node_client.rs @@ -25,7 +25,7 @@ use tari_common_types::types::PublicKey; use crate::{ digital_assets_error::DigitalAssetError, - models::{BaseLayerMetadata, BaseLayerOutput}, + models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput}, }; #[async_trait] @@ -38,4 +38,9 @@ pub trait BaseNodeClient { asset_public_key: PublicKey, checkpoint_unique_id: Vec, ) -> Result, DigitalAssetError>; + + async fn get_assets_for_dan_node( + &mut self, + dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError>; } diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index 67e520f52e..b5989823bd 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -197,6 +197,13 @@ impl BaseNodeClient for MockBaseNodeClient { ) -> Result, DigitalAssetError> { todo!(); } + + async fn get_assets_for_dan_node( + &mut self, + _dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError> { + todo!(); + } } pub fn mock_base_node_client() -> MockBaseNodeClient {