diff --git a/Cargo.lock b/Cargo.lock index 5981ce776e..13e2329a07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1027,6 +1027,7 @@ dependencies = [ "http 0.2.7", "hyper", "hyper-rustls", + "instant", "itertools", "js-sys", "jsonrpc-core", diff --git a/mm2src/coins/Cargo.toml b/mm2src/coins/Cargo.toml index b2ccc8c227..3b361e269b 100644 --- a/mm2src/coins/Cargo.toml +++ b/mm2src/coins/Cargo.toml @@ -121,6 +121,7 @@ spl-token = { version = "3", optional = true } spl-associated-token-account = { version = "1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] +instant = "0.1.12" js-sys = { version = "0.3.27" } mm2_db = { path = "../mm2_db" } mm2_metamask = { path = "../mm2_metamask" } @@ -137,6 +138,7 @@ hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] } # using webpki-tokio to avoid rejecting valid certificates # got "invalid certificate: UnknownIssuer" for https://ropsten.infura.io on iOS using default-features hyper-rustls = { version = "0.23", default-features = false, features = ["http1", "http2", "webpki-tokio"] } +instant = { version = "0.1.12", features = ["wasm-bindgen"] } lightning = "0.0.113" lightning-background-processor = "0.0.113" lightning-invoice = { version = "0.21.0", features = ["serde"] } diff --git a/mm2src/coins/eth.rs b/mm2src/coins/eth.rs index 1c068a26f8..641765e78b 100644 --- a/mm2src/coins/eth.rs +++ b/mm2src/coins/eth.rs @@ -50,6 +50,7 @@ use futures01::Future; use http::StatusCode; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; +use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; use mm2_net::transport::{slurp_url, GuiAuthValidation, GuiAuthValidationGenerator, SlurpError}; use mm2_number::bigdecimal_custom::CheckedDivision; use mm2_number::{BigDecimal, MmNumber}; @@ -102,6 +103,7 @@ use super::{coin_conf, lp_coinfind_or_err, AsyncMutex, BalanceError, BalanceFut, INVALID_RECEIVER_ERR_LOG, INVALID_SENDER_ERR_LOG, INVALID_SWAP_ID_ERR_LOG}; pub use rlp; +mod eth_balance_events; #[cfg(test)] mod eth_tests; #[cfg(target_arch = "wasm32")] mod eth_wasm_tests; mod web3_transport; @@ -4772,6 +4774,16 @@ impl EthCoin { }; Box::new(fut.boxed().compat()) } + + async fn spawn_balance_stream_if_enabled(&self, ctx: &MmArc) -> Result<(), String> { + if let Some(stream_config) = &ctx.event_stream_configuration { + if let EventInitStatus::Failed(err) = EventBehaviour::spawn_if_active(self.clone(), stream_config).await { + return ERR!("Failed spawning balance events. Error: {}", err); + } + } + + Ok(()) + } } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] @@ -5561,9 +5573,10 @@ pub async fn eth_coin_from_conf_and_request( EthCoinType::Erc20 { ref platform, .. } => String::from(platform), }; - let mut map = NONCE_LOCK.lock().unwrap(); - - let nonce_lock = map.entry(key_lock).or_insert_with(new_nonce_lock).clone(); + let nonce_lock = { + let mut map = NONCE_LOCK.lock().unwrap(); + map.entry(key_lock).or_insert_with(new_nonce_lock).clone() + }; // Create an abortable system linked to the `MmCtx` so if the context is stopped via `MmArc::stop`, // all spawned futures related to `ETH` coin will be aborted as well. @@ -5593,7 +5606,11 @@ pub async fn eth_coin_from_conf_and_request( erc20_tokens_infos: Default::default(), abortable_system, }; - Ok(EthCoin(Arc::new(coin))) + + let coin = EthCoin(Arc::new(coin)); + coin.spawn_balance_stream_if_enabled(ctx).await?; + + Ok(coin) } /// Displays the address in mixed-case checksum form diff --git a/mm2src/coins/eth/eth_balance_events.rs b/mm2src/coins/eth/eth_balance_events.rs new file mode 100644 index 0000000000..7f31f9db87 --- /dev/null +++ b/mm2src/coins/eth/eth_balance_events.rs @@ -0,0 +1,170 @@ +use async_trait::async_trait; +use common::{executor::{AbortSettings, SpawnAbortable, Timer}, + log, Future01CompatExt}; +use futures::{channel::oneshot::{self, Receiver, Sender}, + stream::FuturesUnordered, + StreamExt}; +use instant::Instant; +use mm2_core::mm_ctx::MmArc; +use mm2_err_handle::prelude::MmError; +use mm2_event_stream::{behaviour::{EventBehaviour, EventInitStatus}, + Event, EventStreamConfiguration}; +use mm2_number::BigDecimal; +use std::collections::HashMap; + +use super::EthCoin; +use crate::{eth::{u256_to_big_decimal, Erc20TokenInfo}, + BalanceError, MmCoin}; + +/// This implementation differs from others, as they immediately return +/// an error if any of the requests fails. This one completes all futures +/// and returns their results individually. +async fn get_all_balance_results_concurrently( + coin: &EthCoin, +) -> Vec)>> { + let mut tokens = coin.get_erc_tokens_infos(); + + // Workaround for performance purposes. + // + // Unlike tokens, the platform coin length is constant (=1). Instead of creating a generic + // type and mapping the platform coin and the entire token list (which can grow at any time), we map + // the platform coin to Erc20TokenInfo so that we can use the token list right away without + // additional mapping. + tokens.insert(coin.ticker.clone(), Erc20TokenInfo { + token_address: coin.my_address, + decimals: coin.decimals, + }); + + let jobs = tokens + .into_iter() + .map(|(token_ticker, info)| async move { fetch_balance(coin, token_ticker, &info).await }) + .collect::>(); + + jobs.collect().await +} + +async fn fetch_balance( + coin: &EthCoin, + token_ticker: String, + info: &Erc20TokenInfo, +) -> Result<(String, BigDecimal), (String, MmError)> { + let (balance_as_u256, decimals) = if token_ticker == coin.ticker { + ( + coin.address_balance(coin.my_address) + .compat() + .await + .map_err(|e| (token_ticker.clone(), e))?, + coin.decimals, + ) + } else { + ( + coin.get_token_balance_by_address(info.token_address) + .await + .map_err(|e| (token_ticker.clone(), e))?, + info.decimals, + ) + }; + + let balance_as_big_decimal = + u256_to_big_decimal(balance_as_u256, decimals).map_err(|e| (token_ticker.clone(), e.into()))?; + + Ok((token_ticker, balance_as_big_decimal)) +} + +#[async_trait] +impl EventBehaviour for EthCoin { + const EVENT_NAME: &'static str = "COIN_BALANCE"; + const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR"; + + async fn handle(self, interval: f64, tx: oneshot::Sender) { + const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; + + async fn with_socket(_coin: EthCoin, _ctx: MmArc) { todo!() } + + async fn with_polling(coin: EthCoin, ctx: MmArc, interval: f64) { + let mut cache: HashMap = HashMap::new(); + + loop { + let now = Instant::now(); + + let mut balance_updates = vec![]; + for result in get_all_balance_results_concurrently(&coin).await { + match result { + Ok((ticker, balance)) => { + if Some(&balance) == cache.get(&ticker) { + continue; + } + + balance_updates.push(json!({ + "ticker": ticker, + "balance": { "spendable": balance, "unspendable": BigDecimal::default() } + })); + cache.insert(ticker.to_owned(), balance); + }, + Err((ticker, e)) => { + log::error!("Failed getting balance for '{ticker}' with {interval} interval. Error: {e}"); + let e = serde_json::to_value(e).expect("Serialization should't fail."); + ctx.stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", EthCoin::ERROR_EVENT_NAME, ticker), + e.to_string(), + )) + .await; + }, + }; + } + + if !balance_updates.is_empty() { + ctx.stream_channel_controller + .broadcast(Event::new( + EthCoin::EVENT_NAME.to_string(), + json!(balance_updates).to_string(), + )) + .await; + } + + // If the interval is x seconds, our goal is to broadcast changed balances every x seconds. + // To achieve this, we need to subtract the time complexity of each iteration. + // Given that an iteration already takes 80% of the interval, this will lead to inconsistency + // in the events. + let remaining_time = interval - now.elapsed().as_secs_f64(); + // Not worth to make a call for less than `0.1` durations + if remaining_time >= 0.1 { + Timer::sleep(remaining_time).await; + } + } + } + + let ctx = match MmArc::from_weak(&self.ctx) { + Some(ctx) => ctx, + None => { + let msg = "MM context must have been initialized already."; + tx.send(EventInitStatus::Failed(msg.to_owned())) + .expect(RECEIVER_DROPPED_MSG); + panic!("{}", msg); + }, + }; + + tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG); + + with_polling(self, ctx, interval).await + } + + async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + log::info!("{} event is activated for {}", Self::EVENT_NAME, self.ticker,); + + let (tx, rx): (Sender, Receiver) = oneshot::channel(); + let fut = self.clone().handle(event.stream_interval_seconds, tx); + let settings = + AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker)); + self.spawner().spawn_with_settings(fut, settings); + + rx.await.unwrap_or_else(|e| { + EventInitStatus::Failed(format!("Event initialization status must be received: {}", e)) + }) + } else { + EventInitStatus::Inactive + } + } +} diff --git a/mm2src/coins/eth/v2_activation.rs b/mm2src/coins/eth/v2_activation.rs index fddf8da03f..eec453ae54 100644 --- a/mm2src/coins/eth/v2_activation.rs +++ b/mm2src/coins/eth/v2_activation.rs @@ -28,6 +28,8 @@ pub enum EthActivationV2Error { #[display(fmt = "Error deserializing 'derivation_path': {}", _0)] ErrorDeserializingDerivationPath(String), PrivKeyPolicyNotAllowed(PrivKeyPolicyNotAllowed), + #[display(fmt = "Failed spawning balance events. Error: {_0}")] + FailedSpawningBalanceEvents(String), #[cfg(target_arch = "wasm32")] #[from_trait(WithMetamaskRpcError::metamask_rpc_error)] #[display(fmt = "{}", _0)] @@ -293,8 +295,10 @@ pub async fn eth_coin_from_conf_and_request_v2( let sign_message_prefix: Option = json::from_value(conf["sign_message_prefix"].clone()).ok(); - let mut map = NONCE_LOCK.lock().unwrap(); - let nonce_lock = map.entry(ticker.clone()).or_insert_with(new_nonce_lock).clone(); + let nonce_lock = { + let mut map = NONCE_LOCK.lock().unwrap(); + map.entry(ticker.clone()).or_insert_with(new_nonce_lock).clone() + }; // Create an abortable system linked to the `MmCtx` so if the app is stopped on `MmArc::stop`, // all spawned futures related to `ETH` coin will be aborted as well. @@ -325,7 +329,12 @@ pub async fn eth_coin_from_conf_and_request_v2( abortable_system, }; - Ok(EthCoin(Arc::new(coin))) + let coin = EthCoin(Arc::new(coin)); + coin.spawn_balance_stream_if_enabled(ctx) + .await + .map_err(EthActivationV2Error::FailedSpawningBalanceEvents)?; + + Ok(coin) } /// Processes the given `priv_key_policy` and generates corresponding `KeyPair`. diff --git a/mm2src/coins/lp_coins.rs b/mm2src/coins/lp_coins.rs index 1ca02100a7..a937f9a474 100644 --- a/mm2src/coins/lp_coins.rs +++ b/mm2src/coins/lp_coins.rs @@ -2068,7 +2068,8 @@ impl NumConversError { pub fn description(&self) -> &str { &self.0 } } -#[derive(Clone, Debug, Display, PartialEq, Serialize)] +#[derive(Clone, Debug, Display, PartialEq, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] pub enum BalanceError { #[display(fmt = "Transport: {}", _0)] Transport(String), diff --git a/mm2src/coins/tendermint/tendermint_balance_events.rs b/mm2src/coins/tendermint/tendermint_balance_events.rs index 2bbdd59824..3b876b550b 100644 --- a/mm2src/coins/tendermint/tendermint_balance_events.rs +++ b/mm2src/coins/tendermint/tendermint_balance_events.rs @@ -17,6 +17,7 @@ use crate::{tendermint::TendermintCommons, utxo::utxo_common::big_decimal_from_s #[async_trait] impl EventBehaviour for TendermintCoin { const EVENT_NAME: &'static str = "COIN_BALANCE"; + const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR"; async fn handle(self, _interval: f64, tx: oneshot::Sender) { fn generate_subscription_query(query_filter: String) -> String { @@ -120,7 +121,15 @@ impl EventBehaviour for TendermintCoin { let balance_denom = match self.account_balance_for_denom(&self.account_id, denom).await { Ok(balance_denom) => balance_denom, Err(e) => { - log::error!("{e}"); + log::error!("Failed getting balance for '{ticker}'. Error: {e}"); + let e = serde_json::to_value(e).expect("Serialization should't fail."); + ctx.stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", Self::ERROR_EVENT_NAME, ticker), + e.to_string(), + )) + .await; + continue; }, }; diff --git a/mm2src/coins/tendermint/tendermint_coin.rs b/mm2src/coins/tendermint/tendermint_coin.rs index 18d825977e..54281c5ad7 100644 --- a/mm2src/coins/tendermint/tendermint_coin.rs +++ b/mm2src/coins/tendermint/tendermint_coin.rs @@ -283,9 +283,10 @@ pub enum TendermintInitErrorKind { BalanceStreamInitError(String), } -#[derive(Display, Debug)] +#[derive(Display, Debug, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] pub enum TendermintCoinRpcError { - Prost(DecodeError), + Prost(String), InvalidResponse(String), PerformError(String), RpcClientError(String), @@ -293,7 +294,7 @@ pub enum TendermintCoinRpcError { } impl From for TendermintCoinRpcError { - fn from(err: DecodeError) -> Self { TendermintCoinRpcError::Prost(err) } + fn from(err: DecodeError) -> Self { TendermintCoinRpcError::Prost(err.to_string()) } } impl From for TendermintCoinRpcError { @@ -308,7 +309,7 @@ impl From for BalanceError { fn from(err: TendermintCoinRpcError) -> Self { match err { TendermintCoinRpcError::InvalidResponse(e) => BalanceError::InvalidResponse(e), - TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e.to_string()), + TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e), TendermintCoinRpcError::PerformError(e) => BalanceError::Transport(e), TendermintCoinRpcError::RpcClientError(e) => BalanceError::Transport(e), TendermintCoinRpcError::InternalError(e) => BalanceError::Internal(e), @@ -320,7 +321,7 @@ impl From for ValidatePaymentError { fn from(err: TendermintCoinRpcError) -> Self { match err { TendermintCoinRpcError::InvalidResponse(e) => ValidatePaymentError::InvalidRpcResponse(e), - TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e.to_string()), + TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e), TendermintCoinRpcError::PerformError(e) => ValidatePaymentError::Transport(e), TendermintCoinRpcError::RpcClientError(e) => ValidatePaymentError::Transport(e), TendermintCoinRpcError::InternalError(e) => ValidatePaymentError::InternalError(e), @@ -1078,10 +1079,10 @@ impl TendermintCoin { ethermint_account .base_account - .or_mm_err(|| TendermintCoinRpcError::Prost(err))? + .or_mm_err(|| TendermintCoinRpcError::Prost(err.to_string()))? }, Err(err) => { - return MmError::err(TendermintCoinRpcError::Prost(err)); + return MmError::err(TendermintCoinRpcError::Prost(err.to_string())); }, }; diff --git a/mm2src/coins/utxo/utxo_balance_events.rs b/mm2src/coins/utxo/utxo_balance_events.rs index 7ff6957c3b..d54895b3ab 100644 --- a/mm2src/coins/utxo/utxo_balance_events.rs +++ b/mm2src/coins/utxo/utxo_balance_events.rs @@ -32,9 +32,8 @@ macro_rules! try_or_continue { #[async_trait] impl EventBehaviour for UtxoStandardCoin { const EVENT_NAME: &'static str = "COIN_BALANCE"; + const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR"; - // TODO: On certain errors, send an error event to clients (e.g., when not being able to read the - // balance or not being able to subscribe to scripthash/address.). async fn handle(self, _interval: f64, tx: oneshot::Sender) { const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; @@ -106,6 +105,13 @@ impl EventBehaviour for UtxoStandardCoin { Ok(map) => scripthash_to_address_map.extend(map), Err(e) => { log::error!("{e}"); + + ctx.stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", Self::ERROR_EVENT_NAME, self.ticker()), + json!({ "error": e }).to_string(), + )) + .await; }, }; @@ -117,6 +123,13 @@ impl EventBehaviour for UtxoStandardCoin { Ok(map) => scripthash_to_address_map = map, Err(e) => { log::error!("{e}"); + + ctx.stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", Self::ERROR_EVENT_NAME, self.ticker()), + json!({ "error": e }).to_string(), + )) + .await; }, }; @@ -153,7 +166,23 @@ impl EventBehaviour for UtxoStandardCoin { }, }; - let balance = try_or_continue!(address_balance(&self, &address).await); + let balance = match address_balance(&self, &address).await { + Ok(t) => t, + Err(e) => { + let ticker = self.ticker(); + log::error!("Failed getting balance for '{ticker}'. Error: {e}"); + let e = serde_json::to_value(e).expect("Serialization should't fail."); + + ctx.stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", Self::ERROR_EVENT_NAME, ticker), + e.to_string(), + )) + .await; + + continue; + }, + }; let payload = json!({ "ticker": self.ticker(), diff --git a/mm2src/coins_activation/src/eth_with_token_activation.rs b/mm2src/coins_activation/src/eth_with_token_activation.rs index 3a93f3ad07..7d1b695f55 100644 --- a/mm2src/coins_activation/src/eth_with_token_activation.rs +++ b/mm2src/coins_activation/src/eth_with_token_activation.rs @@ -28,7 +28,8 @@ impl From for EnablePlatformCoinWithTokensError { match err { EthActivationV2Error::InvalidPayload(e) | EthActivationV2Error::InvalidSwapContractAddr(e) - | EthActivationV2Error::InvalidFallbackSwapContract(e) => { + | EthActivationV2Error::InvalidFallbackSwapContract(e) + | EthActivationV2Error::ErrorDeserializingDerivationPath(e) => { EnablePlatformCoinWithTokensError::InvalidPayload(e) }, #[cfg(target_arch = "wasm32")] @@ -44,12 +45,12 @@ impl From for EnablePlatformCoinWithTokensError { EthActivationV2Error::CouldNotFetchBalance(e) | EthActivationV2Error::UnreachableNodes(e) => { EnablePlatformCoinWithTokensError::Transport(e) }, - EthActivationV2Error::ErrorDeserializingDerivationPath(e) => { - EnablePlatformCoinWithTokensError::InvalidPayload(e) - }, EthActivationV2Error::PrivKeyPolicyNotAllowed(e) => { EnablePlatformCoinWithTokensError::PrivKeyPolicyNotAllowed(e) }, + EthActivationV2Error::FailedSpawningBalanceEvents(e) => { + EnablePlatformCoinWithTokensError::FailedSpawningBalanceEvents(e) + }, #[cfg(target_arch = "wasm32")] EthActivationV2Error::MetamaskError(metamask) => { EnablePlatformCoinWithTokensError::Transport(metamask.to_string()) diff --git a/mm2src/coins_activation/src/platform_coin_with_tokens.rs b/mm2src/coins_activation/src/platform_coin_with_tokens.rs index bd12c99a22..2d2f914446 100644 --- a/mm2src/coins_activation/src/platform_coin_with_tokens.rs +++ b/mm2src/coins_activation/src/platform_coin_with_tokens.rs @@ -220,6 +220,8 @@ pub enum EnablePlatformCoinWithTokensError { Transport(String), AtLeastOneNodeRequired(String), InvalidPayload(String), + #[display(fmt = "Failed spawning balance events. Error: {_0}")] + FailedSpawningBalanceEvents(String), Internal(String), } @@ -288,6 +290,7 @@ impl HttpStatusCode for EnablePlatformCoinWithTokensError { | EnablePlatformCoinWithTokensError::UnexpectedPlatformProtocol { .. } | EnablePlatformCoinWithTokensError::InvalidPayload { .. } | EnablePlatformCoinWithTokensError::AtLeastOneNodeRequired(_) + | EnablePlatformCoinWithTokensError::FailedSpawningBalanceEvents(_) | EnablePlatformCoinWithTokensError::UnexpectedTokenProtocol { .. } => StatusCode::BAD_REQUEST, } } diff --git a/mm2src/mm2_event_stream/src/behaviour.rs b/mm2src/mm2_event_stream/src/behaviour.rs index 8539754061..d09424dcdc 100644 --- a/mm2src/mm2_event_stream/src/behaviour.rs +++ b/mm2src/mm2_event_stream/src/behaviour.rs @@ -14,6 +14,9 @@ pub trait EventBehaviour { /// Unique name of the event. const EVENT_NAME: &'static str; + /// Name of the error event with default value "ERROR". + const ERROR_EVENT_NAME: &'static str = "ERROR"; + /// Event handler that is responsible for broadcasting event data to the streaming channels. async fn handle(self, interval: f64, tx: oneshot::Sender);