Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ETH): balance event streaming for ETH #2041

Merged
merged 17 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ spl-token = { version = "3", optional = true }
spl-associated-token-account = { version = "1", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
instant = "0.1.12"
js-sys = { version = "0.3.27" }
mm2_db = { path = "../mm2_db" }
mm2_metamask = { path = "../mm2_metamask" }
Expand All @@ -137,6 +138,7 @@ hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] }
# using webpki-tokio to avoid rejecting valid certificates
# got "invalid certificate: UnknownIssuer" for https://ropsten.infura.io on iOS using default-features
hyper-rustls = { version = "0.23", default-features = false, features = ["http1", "http2", "webpki-tokio"] }
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
lightning = "0.0.113"
lightning-background-processor = "0.0.113"
lightning-invoice = { version = "0.21.0", features = ["serde"] }
Expand Down
25 changes: 21 additions & 4 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use futures01::Future;
use http::StatusCode;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_net::transport::{slurp_url, GuiAuthValidation, GuiAuthValidationGenerator, SlurpError};
use mm2_number::bigdecimal_custom::CheckedDivision;
use mm2_number::{BigDecimal, MmNumber};
Expand Down Expand Up @@ -102,6 +103,7 @@ use super::{coin_conf, lp_coinfind_or_err, AsyncMutex, BalanceError, BalanceFut,
INVALID_RECEIVER_ERR_LOG, INVALID_SENDER_ERR_LOG, INVALID_SWAP_ID_ERR_LOG};
pub use rlp;

mod eth_balance_events;
#[cfg(test)] mod eth_tests;
#[cfg(target_arch = "wasm32")] mod eth_wasm_tests;
mod web3_transport;
Expand Down Expand Up @@ -4772,6 +4774,16 @@ impl EthCoin {
};
Box::new(fut.boxed().compat())
}

async fn spawn_balance_stream_if_enabled(&self, ctx: &MmArc) -> Result<(), String> {
if let Some(stream_config) = &ctx.event_stream_configuration {
if let EventInitStatus::Failed(err) = EventBehaviour::spawn_if_active(self.clone(), stream_config).await {
return ERR!("Failed spawning balance events. Error: {}", err);
}
}

Ok(())
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -5561,9 +5573,10 @@ pub async fn eth_coin_from_conf_and_request(
EthCoinType::Erc20 { ref platform, .. } => String::from(platform),
};

let mut map = NONCE_LOCK.lock().unwrap();

let nonce_lock = map.entry(key_lock).or_insert_with(new_nonce_lock).clone();
let nonce_lock = {
let mut map = NONCE_LOCK.lock().unwrap();
map.entry(key_lock).or_insert_with(new_nonce_lock).clone()
};

// Create an abortable system linked to the `MmCtx` so if the context is stopped via `MmArc::stop`,
// all spawned futures related to `ETH` coin will be aborted as well.
Expand Down Expand Up @@ -5593,7 +5606,11 @@ pub async fn eth_coin_from_conf_and_request(
erc20_tokens_infos: Default::default(),
abortable_system,
};
Ok(EthCoin(Arc::new(coin)))

let coin = EthCoin(Arc::new(coin));
coin.spawn_balance_stream_if_enabled(ctx).await?;

borngraced marked this conversation as resolved.
Show resolved Hide resolved
Ok(coin)
}

/// Displays the address in mixed-case checksum form
Expand Down
170 changes: 170 additions & 0 deletions mm2src/coins/eth/eth_balance_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use async_trait::async_trait;
use common::{executor::{AbortSettings, SpawnAbortable, Timer},
log, Future01CompatExt};
use futures::{channel::oneshot::{self, Receiver, Sender},
stream::FuturesUnordered,
StreamExt};
use instant::Instant;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_event_stream::{behaviour::{EventBehaviour, EventInitStatus},
Event, EventStreamConfiguration};
use mm2_number::BigDecimal;
use std::collections::HashMap;

use super::EthCoin;
use crate::{eth::{u256_to_big_decimal, Erc20TokenInfo},
BalanceError, MmCoin};

/// This implementation differs from others, as they immediately return
/// an error if any of the requests fails. This one completes all futures
/// and returns their results individually.
async fn get_all_balance_results_concurrently(
coin: &EthCoin,
) -> Vec<Result<(String, BigDecimal), (String, MmError<BalanceError>)>> {
let mut tokens = coin.get_erc_tokens_infos();

// Workaround for performance purposes.
//
// Unlike tokens, the platform coin length is constant (=1). Instead of creating a generic
// type and mapping the platform coin and the entire token list (which can grow at any time), we map
// the platform coin to Erc20TokenInfo so that we can use the token list right away without
// additional mapping.
tokens.insert(coin.ticker.clone(), Erc20TokenInfo {
token_address: coin.my_address,
decimals: coin.decimals,
});

let jobs = tokens
.into_iter()
.map(|(token_ticker, info)| async move { fetch_balance(coin, token_ticker, &info).await })
.collect::<FuturesUnordered<_>>();

jobs.collect().await
}

async fn fetch_balance(
coin: &EthCoin,
token_ticker: String,
info: &Erc20TokenInfo,
) -> Result<(String, BigDecimal), (String, MmError<BalanceError>)> {
let (balance_as_u256, decimals) = if token_ticker == coin.ticker {
(
coin.address_balance(coin.my_address)
.compat()
.await
.map_err(|e| (token_ticker.clone(), e))?,
coin.decimals,
)
} else {
(
coin.get_token_balance_by_address(info.token_address)
.await
.map_err(|e| (token_ticker.clone(), e))?,
info.decimals,
)
Comment on lines +52 to +65
Copy link
Collaborator

@shamardy shamardy Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this PR will be merged before #1979
This is just a reminder for me to implement balance streaming for all addresses in the HD wallet similar to how it's done for utxos.

};

let balance_as_big_decimal =
u256_to_big_decimal(balance_as_u256, decimals).map_err(|e| (token_ticker.clone(), e.into()))?;

Ok((token_ticker, balance_as_big_decimal))
}

#[async_trait]
impl EventBehaviour for EthCoin {
const EVENT_NAME: &'static str = "COIN_BALANCE";
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";

async fn handle(self, interval: f64, tx: oneshot::Sender<EventInitStatus>) {
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";

async fn with_socket(_coin: EthCoin, _ctx: MmArc) { todo!() }
Copy link
Member Author

@onur-ozkan onur-ozkan Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Socket support is blocked by KomodoPlatform/komodo-defi-proxy#18 implementation which shouldn't be a blocker for review/merge in my opinion (if we review/merge this PR at this stage, GUI devs can start implementing balance streaming for eth,cosmos,utxo protocols).


async fn with_polling(coin: EthCoin, ctx: MmArc, interval: f64) {
let mut cache: HashMap<String, BigDecimal> = HashMap::new();

loop {
let now = Instant::now();

let mut balance_updates = vec![];
for result in get_all_balance_results_concurrently(&coin).await {
match result {
Ok((ticker, balance)) => {
if Some(&balance) == cache.get(&ticker) {
continue;
}

balance_updates.push(json!({
"ticker": ticker,
"balance": { "spendable": balance, "unspendable": BigDecimal::default() }
laruh marked this conversation as resolved.
Show resolved Hide resolved
}));
cache.insert(ticker.to_owned(), balance);
},
Err((ticker, e)) => {
log::error!("Failed getting balance for '{ticker}' with {interval} interval. Error: {e}");
let e = serde_json::to_value(e).expect("Serialization should't fail.");
ctx.stream_channel_controller
.broadcast(Event::new(
format!("{}:{}", EthCoin::ERROR_EVENT_NAME, ticker),
e.to_string(),
))
.await;
},
};
}

if !balance_updates.is_empty() {
ctx.stream_channel_controller
.broadcast(Event::new(
EthCoin::EVENT_NAME.to_string(),
json!(balance_updates).to_string(),
))
.await;
}

// If the interval is x seconds, our goal is to broadcast changed balances every x seconds.
// To achieve this, we need to subtract the time complexity of each iteration.
// Given that an iteration already takes 80% of the interval, this will lead to inconsistency
// in the events.
let remaining_time = interval - now.elapsed().as_secs_f64();
// Not worth to make a call for less than `0.1` durations
if remaining_time >= 0.1 {
Timer::sleep(remaining_time).await;
}
}
}

let ctx = match MmArc::from_weak(&self.ctx) {
Some(ctx) => ctx,
None => {
let msg = "MM context must have been initialized already.";
tx.send(EventInitStatus::Failed(msg.to_owned()))
.expect(RECEIVER_DROPPED_MSG);
panic!("{}", msg);
},
};

tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG);

with_polling(self, ctx, interval).await
}

async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus {
if let Some(event) = config.get_event(Self::EVENT_NAME) {
log::info!("{} event is activated for {}", Self::EVENT_NAME, self.ticker,);

let (tx, rx): (Sender<EventInitStatus>, Receiver<EventInitStatus>) = oneshot::channel();
let fut = self.clone().handle(event.stream_interval_seconds, tx);
let settings =
AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker));
self.spawner().spawn_with_settings(fut, settings);

rx.await.unwrap_or_else(|e| {
EventInitStatus::Failed(format!("Event initialization status must be received: {}", e))
})
} else {
EventInitStatus::Inactive
}
}
}
15 changes: 12 additions & 3 deletions mm2src/coins/eth/v2_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum EthActivationV2Error {
#[display(fmt = "Error deserializing 'derivation_path': {}", _0)]
ErrorDeserializingDerivationPath(String),
PrivKeyPolicyNotAllowed(PrivKeyPolicyNotAllowed),
#[display(fmt = "Failed spawning balance events. Error: {_0}")]
FailedSpawningBalanceEvents(String),
#[cfg(target_arch = "wasm32")]
#[from_trait(WithMetamaskRpcError::metamask_rpc_error)]
#[display(fmt = "{}", _0)]
Expand Down Expand Up @@ -293,8 +295,10 @@ pub async fn eth_coin_from_conf_and_request_v2(

let sign_message_prefix: Option<String> = json::from_value(conf["sign_message_prefix"].clone()).ok();

let mut map = NONCE_LOCK.lock().unwrap();
let nonce_lock = map.entry(ticker.clone()).or_insert_with(new_nonce_lock).clone();
let nonce_lock = {
let mut map = NONCE_LOCK.lock().unwrap();
map.entry(ticker.clone()).or_insert_with(new_nonce_lock).clone()
};

// Create an abortable system linked to the `MmCtx` so if the app is stopped on `MmArc::stop`,
// all spawned futures related to `ETH` coin will be aborted as well.
Expand Down Expand Up @@ -325,7 +329,12 @@ pub async fn eth_coin_from_conf_and_request_v2(
abortable_system,
};

Ok(EthCoin(Arc::new(coin)))
let coin = EthCoin(Arc::new(coin));
coin.spawn_balance_stream_if_enabled(ctx)
.await
.map_err(EthActivationV2Error::FailedSpawningBalanceEvents)?;

Ok(coin)
}

/// Processes the given `priv_key_policy` and generates corresponding `KeyPair`.
Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,8 @@ impl NumConversError {
pub fn description(&self) -> &str { &self.0 }
}

#[derive(Clone, Debug, Display, PartialEq, Serialize)]
#[derive(Clone, Debug, Display, PartialEq, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum BalanceError {
#[display(fmt = "Transport: {}", _0)]
Transport(String),
Expand Down
11 changes: 10 additions & 1 deletion mm2src/coins/tendermint/tendermint_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{tendermint::TendermintCommons, utxo::utxo_common::big_decimal_from_s
#[async_trait]
impl EventBehaviour for TendermintCoin {
const EVENT_NAME: &'static str = "COIN_BALANCE";
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";

async fn handle(self, _interval: f64, tx: oneshot::Sender<EventInitStatus>) {
fn generate_subscription_query(query_filter: String) -> String {
Expand Down Expand Up @@ -120,7 +121,15 @@ impl EventBehaviour for TendermintCoin {
let balance_denom = match self.account_balance_for_denom(&self.account_id, denom).await {
Ok(balance_denom) => balance_denom,
Err(e) => {
log::error!("{e}");
log::error!("Failed getting balance for '{ticker}'. Error: {e}");
onur-ozkan marked this conversation as resolved.
Show resolved Hide resolved
let e = serde_json::to_value(e).expect("Serialization should't fail.");
ctx.stream_channel_controller
.broadcast(Event::new(
format!("{}:{}", Self::ERROR_EVENT_NAME, ticker),
e.to_string(),
))
.await;

continue;
},
};
Expand Down
15 changes: 8 additions & 7 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,18 @@ pub enum TendermintInitErrorKind {
BalanceStreamInitError(String),
}

#[derive(Display, Debug)]
#[derive(Display, Debug, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum TendermintCoinRpcError {
Prost(DecodeError),
Prost(String),
InvalidResponse(String),
PerformError(String),
RpcClientError(String),
InternalError(String),
}

impl From<DecodeError> for TendermintCoinRpcError {
fn from(err: DecodeError) -> Self { TendermintCoinRpcError::Prost(err) }
fn from(err: DecodeError) -> Self { TendermintCoinRpcError::Prost(err.to_string()) }
}

impl From<PrivKeyPolicyNotAllowed> for TendermintCoinRpcError {
Expand All @@ -308,7 +309,7 @@ impl From<TendermintCoinRpcError> for BalanceError {
fn from(err: TendermintCoinRpcError) -> Self {
match err {
TendermintCoinRpcError::InvalidResponse(e) => BalanceError::InvalidResponse(e),
TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e.to_string()),
TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e),
TendermintCoinRpcError::PerformError(e) => BalanceError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => BalanceError::Transport(e),
TendermintCoinRpcError::InternalError(e) => BalanceError::Internal(e),
Expand All @@ -320,7 +321,7 @@ impl From<TendermintCoinRpcError> for ValidatePaymentError {
fn from(err: TendermintCoinRpcError) -> Self {
match err {
TendermintCoinRpcError::InvalidResponse(e) => ValidatePaymentError::InvalidRpcResponse(e),
TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e.to_string()),
TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e),
TendermintCoinRpcError::PerformError(e) => ValidatePaymentError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => ValidatePaymentError::Transport(e),
TendermintCoinRpcError::InternalError(e) => ValidatePaymentError::InternalError(e),
Expand Down Expand Up @@ -1078,10 +1079,10 @@ impl TendermintCoin {

ethermint_account
.base_account
.or_mm_err(|| TendermintCoinRpcError::Prost(err))?
.or_mm_err(|| TendermintCoinRpcError::Prost(err.to_string()))?
},
Err(err) => {
return MmError::err(TendermintCoinRpcError::Prost(err));
return MmError::err(TendermintCoinRpcError::Prost(err.to_string()));
},
};

Expand Down
Loading
Loading