Skip to content

Commit

Permalink
store the swap aborthanle in running_swaps
Browse files Browse the repository at this point in the history
  • Loading branch information
mariocynicys committed Dec 6, 2024
1 parent 5c1bb6f commit 19b0de2
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 22 deletions.
12 changes: 6 additions & 6 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ struct LockedAmountInfo {
}

struct SwapsContext {
running_swaps: Mutex<Vec<Weak<dyn AtomicSwap>>>,
running_swaps: Mutex<Vec<(Weak<dyn AtomicSwap>, AbortOnDropHandle)>>,
active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand Down Expand Up @@ -633,7 +633,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {

let mut locked = swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.filter_map(|(swap, _)| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
Expand Down Expand Up @@ -670,7 +670,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, swap| match swap.upgrade() {
swaps.iter().fold(0, |total, (swap, _)| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
Expand All @@ -683,7 +683,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)

swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.filter_map(|(swap, _)| swap.upgrade())
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
Expand All @@ -703,7 +703,7 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for swap in swaps.iter() {
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
Expand All @@ -725,7 +725,7 @@ pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for swap in swaps.iter() {
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
Expand Down
26 changes: 19 additions & 7 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ use coins::{CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPaymentInput, FeeApp
MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr, RefundPaymentArgs,
SearchForSwapTxSpendInput, SendPaymentArgs, SpendPaymentArgs, SwapTxTypeWithSecretHash, TradeFee,
TradePreimageValue, TransactionEnum, ValidateFeeArgs, ValidatePaymentInput};
use common::log::{debug, error, info, warn};
use common::{bits256, executor::Timer, now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY};
use common::log::{debug, error, info, warn, LogOnError};
use common::{bits256,
executor::{spawn_abortable, Timer},
now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::privkey::SerializableSecp256k1Keypair;
use crypto::CryptoCtx;
use futures::channel::oneshot;
use futures::{compat::Future01CompatExt, select, FutureExt};
use keys::KeyPair;
use mm2_core::mm_ctx::MmArc;
Expand Down Expand Up @@ -2090,7 +2093,6 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand Down Expand Up @@ -2150,10 +2152,20 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
}
.fuse(),
);
select! {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Run the swap in an abortable task and wait for it to finish.
let (notifier, notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
select! {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
}
if notifier.send(()).is_err() {
error!("Swap listener stopped listening!");
}
});
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
notification.await.error_log_with_msg("Swap interrupted!");
}

pub struct MakerSwapPreparedParams {
Expand Down
30 changes: 21 additions & 9 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ use coins::{lp_coinfind, CanRefundHtlc, CheckIfMyPaymentSentArgs, ConfirmPayment
FoundSwapTxSpend, MmCoin, MmCoinEnum, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr,
RefundPaymentArgs, SearchForSwapTxSpendInput, SendPaymentArgs, SpendPaymentArgs, SwapTxTypeWithSecretHash,
TradeFee, TradePreimageValue, TransactionEnum, ValidatePaymentInput, WaitForHTLCTxSpendArgs};
use common::executor::Timer;
use common::log::{debug, error, info, warn};
use common::executor::{spawn_abortable, Timer};
use common::log::{debug, error, info, warn, LogOnError};
use common::{bits256, now_ms, now_sec, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::{privkey::SerializableSecp256k1Keypair, CryptoCtx};
use futures::channel::oneshot;
use futures::{compat::Future01CompatExt, future::try_join, select, FutureExt};
use http::Response;
use keys::KeyPair;
Expand Down Expand Up @@ -465,8 +466,6 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);

let mut swap_fut = Box::pin(
async move {
let mut events;
Expand Down Expand Up @@ -519,10 +518,20 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}
.fuse(),
);
select! {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Run the swap in an abortable task and wait for it to finish.
let (notifier, notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
select! {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
}
if notifier.send(()).is_err() {
error!("Swap listener stopped listening!");
}
});
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
notification.await.error_log_with_msg("Swap interrupted!");
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -2680,6 +2689,7 @@ mod taker_swap_tests {
use coins::eth::{addr_from_str, signed_eth_tx_from_bytes, SignedEthTx};
use coins::utxo::UtxoTx;
use coins::{FoundSwapTxSpend, MarketCoinOps, MmCoin, SwapOps, TestCoin};
use common::executor::spawn_abortable;
use common::{block_on, new_uuid};
use mm2_test_helpers::for_tests::{mm_ctx_with_iguana, ETH_SEPOLIA_SWAP_CONTRACT};
use mocktopus::mocking::*;
Expand Down Expand Up @@ -3208,7 +3218,9 @@ mod taker_swap_tests {
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
swaps_ctx.running_swaps.lock().unwrap().push(weak_ref);
// Create a dummy abort handle as if it was a running swap.
let abortable_swap = spawn_abortable(async move {});
swaps_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down

0 comments on commit 19b0de2

Please sign in to comment.