Skip to content

Commit

Permalink
Remove hanging MmArc pointers
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyboyko0791 committed Jun 16, 2021
1 parent 8bf6ed3 commit ed40dc1
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 24 deletions.
13 changes: 9 additions & 4 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
use async_trait::async_trait;
use bigdecimal::{BigDecimal, ParseBigDecimalError};
use common::executor::{spawn, Timer};
use common::mm_ctx::{from_ctx, MmArc};
use common::mm_ctx::{from_ctx, MmArc, MmWeak};
use common::mm_error::prelude::*;
use common::mm_metrics::MetricsWeak;
use common::mm_number::MmNumber;
Expand Down Expand Up @@ -1183,9 +1183,9 @@ pub async fn lp_coininit(ctx: &MmArc, ticker: &str, req: &Json) -> Result<MmCoin
if history {
try_s!(lp_spawn_tx_history(ctx.clone(), coin.clone()));
}
let ctxʹ = ctx.clone();
let ticker = ticker.to_owned();
spawn(async move { check_balance_update_loop(ctxʹ, ticker).await });
let ctx_weak = ctx.weak();
spawn(async move { check_balance_update_loop(ctx_weak, ticker).await });
Ok(coin)
}

Expand Down Expand Up @@ -1513,10 +1513,15 @@ pub async fn show_priv_key(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>>, S
}

// TODO: Refactor this, it's actually not required to check balance and trade fee when there no orders using the coin
pub async fn check_balance_update_loop(ctx: MmArc, ticker: String) {
pub async fn check_balance_update_loop(ctx: MmWeak, ticker: String) {
let mut current_balance = None;
loop {
Timer::sleep(10.).await;
let ctx = match MmArc::from_weak(&ctx) {
Some(ctx) => ctx,
None => return,
};

match lp_coinfind(&ctx, &ticker).await {
Ok(Some(coin)) => {
let balance = match coin.my_spendable_balance().compat().await {
Expand Down
4 changes: 2 additions & 2 deletions mm2src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,14 @@ async fn init_p2p(mypubport: u16, ctx: MmArc) -> Result<(), String> {
let mut p2p_abort = Some(p2p_abort);
ctx.on_stop(Box::new(move || {
if let Some(handle) = p2p_abort.take() {
drop(handle);
handle.abort();
}
Ok(())
}));
try_s!(ctx.peer_id.pin(peer_id.to_string()));
let p2p_context = P2PContext::new(cmd_tx);
p2p_context.store_to_mm_arc(&ctx);
spawn(p2p_event_process_loop(ctx.clone(), event_rx, i_am_seed));
spawn(p2p_event_process_loop(ctx.weak(), event_rx, i_am_seed));

Ok(())
}
23 changes: 11 additions & 12 deletions mm2src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//
use common::executor::spawn;
use common::log;
use common::mm_ctx::MmArc;
use common::mm_ctx::{MmArc, MmWeak};
use common::mm_metrics::{ClockOps, MetricsOps};
use futures::{channel::oneshot, lock::Mutex as AsyncMutex, StreamExt};
use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse,
Expand Down Expand Up @@ -62,24 +62,23 @@ impl P2PContext {
}
}

pub async fn p2p_event_process_loop(ctx: MmArc, mut rx: AdexEventRx, i_am_relay: bool) {
while !ctx.is_stopping() {
match rx.next().await {
pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay: bool) {
loop {
let adex_event = rx.next().await;
let ctx = match MmArc::from_weak(&ctx) {
Some(ctx) => ctx,
None => return,
};
match adex_event {
Some(AdexBehaviourEvent::Message(peer_id, message_id, message)) => {
spawn(process_p2p_message(
ctx.clone(),
peer_id,
message_id,
message,
i_am_relay,
));
spawn(process_p2p_message(ctx, peer_id, message_id, message, i_am_relay));
},
Some(AdexBehaviourEvent::PeerRequest {
peer_id,
request,
response_channel,
}) => {
if let Err(e) = process_p2p_request(ctx.clone(), peer_id, request, response_channel).await {
if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel).await {
log::error!("Error on process P2P request: {:?}", e);
}
},
Expand Down
16 changes: 10 additions & 6 deletions mm2src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,34 +780,38 @@ async fn maker_order_cancelled_p2p_notify(ctx: MmArc, order: &MakerOrder) {
}

pub struct BalanceUpdateOrdermatchHandler {
ctx: MmArc,
ctx: MmWeak,
}

impl BalanceUpdateOrdermatchHandler {
pub fn new(ctx: MmArc) -> Self { BalanceUpdateOrdermatchHandler { ctx } }
pub fn new(ctx: MmArc) -> Self { BalanceUpdateOrdermatchHandler { ctx: ctx.weak() } }
}

#[async_trait]
impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler {
async fn balance_updated(&self, coin: &MmCoinEnum, new_balance: &BigDecimal) {
let ctx = match MmArc::from_weak(&self.ctx) {
Some(ctx) => ctx,
None => return,
};
// Get the max maker available volume to check if the wallet balances are sufficient for the issued maker orders.
// Note although the maker orders are issued already, but they are not matched yet, so pass the `OrderIssue` stage.
let new_volume = match calc_max_maker_vol(&self.ctx, coin, new_balance, FeeApproxStage::OrderIssue).await {
let new_volume = match calc_max_maker_vol(&ctx, coin, new_balance, FeeApproxStage::OrderIssue).await {
Ok(v) => v,
Err(e) if e.get_inner().not_sufficient_balance() => MmNumber::from(0),
Err(e) => {
log::warn!("Couldn't handle the 'balance_updated' event: {}", e);
return;
},
};
let ordermatch_ctx = OrdermatchContext::from_ctx(&self.ctx).unwrap();
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();
let mut maker_orders = ordermatch_ctx.my_maker_orders.lock().await;
*maker_orders = maker_orders
.drain()
.filter_map(|(uuid, order)| {
if order.base == coin.ticker() {
if new_volume < order.min_base_vol {
let ctx = self.ctx.clone();
let ctx = ctx.clone();
delete_my_maker_order(&ctx, &order, MakerOrderCancellationReason::InsufficientBalance);
spawn(async move { maker_order_cancelled_p2p_notify(ctx, &order).await });
None
Expand All @@ -816,7 +820,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler {
update_msg.with_new_max_volume(new_volume.to_ratio());
let base = order.base.to_owned();
let rel = order.rel.to_owned();
let ctx = self.ctx.clone();
let ctx = ctx.clone();
spawn(async move { maker_order_updated_p2p_notify(ctx, &base, &rel, update_msg).await });
Some((uuid, order))
} else {
Expand Down

0 comments on commit ed40dc1

Please sign in to comment.