From 19b0de25999606902d15610073d75051391fb939 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Fri, 6 Dec 2024 07:52:30 +0100 Subject: [PATCH] store the swap aborthanle in `running_swaps` --- mm2src/mm2_main/src/lp_swap.rs | 12 ++++----- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 26 ++++++++++++++------ mm2src/mm2_main/src/lp_swap/taker_swap.rs | 30 ++++++++++++++++------- 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 3b3f5d6b19a..5b073b8dac2 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -530,7 +530,7 @@ struct LockedAmountInfo { } struct SwapsContext { - running_swaps: Mutex>>, + running_swaps: Mutex, AbortOnDropHandle)>>, active_swaps_v2_infos: Mutex>, banned_pubkeys: Mutex>, swap_msgs: Mutex>, @@ -633,7 +633,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { let mut locked = swap_lock .iter() - .filter_map(|swap| swap.upgrade()) + .filter_map(|(swap, _)| swap.upgrade()) .flat_map(|swap| swap.locked_amount()) .fold(MmNumber::from(0), |mut total_amount, locked| { if locked.coin == coin { @@ -670,7 +670,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { pub fn running_swaps_num(ctx: &MmArc) -> u64 { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); let swaps = swap_ctx.running_swaps.lock().unwrap(); - swaps.iter().fold(0, |total, swap| match swap.upgrade() { + swaps.iter().fold(0, |total, (swap, _)| match swap.upgrade() { Some(_) => total + 1, None => total, }) @@ -683,7 +683,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str) swap_lock .iter() - .filter_map(|swap| swap.upgrade()) + .filter_map(|(swap, _)| swap.upgrade()) .filter(|swap| swap.uuid() != except_uuid) .flat_map(|swap| swap.locked_amount()) .fold(MmNumber::from(0), |mut total_amount, locked| { @@ -703,7 +703,7 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet) -> Result< let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); let swaps = try_s!(swap_ctx.running_swaps.lock()); let mut uuids = vec![]; - for swap in swaps.iter() { + for (swap, _) in swaps.iter() { if let Some(swap) = swap.upgrade() { if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) { uuids.push(*swap.uuid()) @@ -725,7 +725,7 @@ pub fn active_swaps(ctx: &MmArc) -> Result, String> { let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); let swaps = swap_ctx.running_swaps.lock().unwrap(); let mut uuids = vec![]; - for swap in swaps.iter() { + for (swap, _) in swaps.iter() { if let Some(swap) = swap.upgrade() { uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE)) } diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 276e4d6e672..5b4786f2d68 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -21,10 +21,13 @@ use coins::{CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPaymentInput, FeeApp MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr, RefundPaymentArgs, SearchForSwapTxSpendInput, SendPaymentArgs, SpendPaymentArgs, SwapTxTypeWithSecretHash, TradeFee, TradePreimageValue, TransactionEnum, ValidateFeeArgs, ValidatePaymentInput}; -use common::log::{debug, error, info, warn}; -use common::{bits256, executor::Timer, now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY}; +use common::log::{debug, error, info, warn, LogOnError}; +use common::{bits256, + executor::{spawn_abortable, Timer}, + now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY}; use crypto::privkey::SerializableSecp256k1Keypair; use crypto::CryptoCtx; +use futures::channel::oneshot; use futures::{compat::Future01CompatExt, select, FutureExt}; use keys::KeyPair; use mm2_core::mm_ctx::MmArc; @@ -2090,7 +2093,6 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); let mut swap_fut = Box::pin( async move { let mut events; @@ -2150,10 +2152,20 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { } .fuse(), ); - select! { - _swap = swap_fut => (), // swap finished normally - _touch = touch_loop => unreachable!("Touch loop can not stop!"), - }; + // Run the swap in an abortable task and wait for it to finish. + let (notifier, notification) = oneshot::channel(); + let abortable_swap = spawn_abortable(async move { + select! { + _swap = swap_fut => (), // swap finished normally + _touch = touch_loop => unreachable!("Touch loop can not stop!"), + } + if notifier.send(()).is_err() { + error!("Swap listener stopped listening!"); + } + }); + swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap)); + // Halt this function until the swap has finished (or interrupted, i.e. aborted/panic). + notification.await.error_log_with_msg("Swap interrupted!"); } pub struct MakerSwapPreparedParams { diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index d7f50f5a2e9..8c8ee450195 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -22,10 +22,11 @@ use coins::{lp_coinfind, CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPayment FoundSwapTxSpend, MmCoin, MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr, RefundPaymentArgs, SearchForSwapTxSpendInput, SendPaymentArgs, SpendPaymentArgs, SwapTxTypeWithSecretHash, TradeFee, TradePreimageValue, TransactionEnum, ValidatePaymentInput, WaitForHTLCTxSpendArgs}; -use common::executor::Timer; -use common::log::{debug, error, info, warn}; +use common::executor::{spawn_abortable, Timer}; +use common::log::{debug, error, info, warn, LogOnError}; use common::{bits256, now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY}; use crypto::{privkey::SerializableSecp256k1Keypair, CryptoCtx}; +use futures::channel::oneshot; use futures::{compat::Future01CompatExt, future::try_join, select, FutureExt}; use http::Response; use keys::KeyPair; @@ -465,8 +466,6 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); - let mut swap_fut = Box::pin( async move { let mut events; @@ -519,10 +518,20 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { } .fuse(), ); - select! { - _swap = swap_fut => (), // swap finished normally - _touch = touch_loop => unreachable!("Touch loop can not stop!"), - }; + // Run the swap in an abortable task and wait for it to finish. + let (notifier, notification) = oneshot::channel(); + let abortable_swap = spawn_abortable(async move { + select! { + _swap = swap_fut => (), // swap finished normally + _touch = touch_loop => unreachable!("Touch loop can not stop!"), + } + if notifier.send(()).is_err() { + error!("Swap listener stopped listening!"); + } + }); + swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap)); + // Halt this function until the swap has finished (or interrupted, i.e. aborted/panic). + notification.await.error_log_with_msg("Swap interrupted!"); } #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] @@ -2680,6 +2689,7 @@ mod taker_swap_tests { use coins::eth::{addr_from_str, signed_eth_tx_from_bytes, SignedEthTx}; use coins::utxo::UtxoTx; use coins::{FoundSwapTxSpend, MarketCoinOps, MmCoin, SwapOps, TestCoin}; + use common::executor::spawn_abortable; use common::{block_on, new_uuid}; use mm2_test_helpers::for_tests::{mm_ctx_with_iguana, ETH_SEPOLIA_SWAP_CONTRACT}; use mocktopus::mocking::*; @@ -3208,7 +3218,9 @@ mod taker_swap_tests { let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap(); let arc = Arc::new(swap); let weak_ref = Arc::downgrade(&arc); - swaps_ctx.running_swaps.lock().unwrap().push(weak_ref); + // Create a dummy abort handle as if it was a running swap. + let abortable_swap = spawn_abortable(async move {}); + swaps_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap)); let actual = get_locked_amount(&ctx, "RICK"); assert_eq!(actual, MmNumber::from(0));