Skip to content

Commit

Permalink
fix(watchers): align taker fee validation retries with makers (#2263)
Browse files Browse the repository at this point in the history
This also fixes propagation of health check messages.
  • Loading branch information
shamardy authored Nov 19, 2024
1 parent 22719c0 commit 0a94102
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 42 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/utxo_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2085,7 +2085,7 @@ pub fn watcher_validate_taker_fee<T: UtxoCommonOps>(
if tx_confirmed_before_block {
return MmError::err(ValidatePaymentError::WrongPaymentTx(format!(
"{}: Fee tx {:?} confirmed before min_block {}",
EARLY_CONFIRMATION_ERR_LOG, taker_fee_tx, min_block_number
EARLY_CONFIRMATION_ERR_LOG, tx_from_rpc, min_block_number
)));
}

Expand Down
27 changes: 13 additions & 14 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_err_handle::prelude::*;
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::sync::Mutex;

use crate::lp_network::broadcast_p2p_msg;
use crate::lp_network::{broadcast_p2p_msg, P2PRequestError, P2PRequestResult};

pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck";

Expand Down Expand Up @@ -279,7 +280,10 @@ pub async fn peer_connection_healthcheck_rpc(
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) {
pub(crate) async fn process_p2p_healthcheck_message(
ctx: &MmArc,
message: mm2_libp2p::GossipsubMessage,
) -> P2PRequestResult<()> {
macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
Expand All @@ -292,24 +296,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}

let data = try_or_return!(
HealthcheckMessage::decode(&message.data),
"Couldn't decode healthcheck message"
);
let data = HealthcheckMessage::decode(&message.data)
.map_to_mm(|e| P2PRequestError::DecodeError(format!("Couldn't decode healthcheck message: {}", e)))?;
let sender_peer = data.is_received_message_valid().map_to_mm(|e| {
P2PRequestError::ValidationFailed(format!("Received an invalid healthcheck message. Error: {}", e))
})?;

let ctx = ctx.clone();

// Pass the remaining work to another thread to free up this one as soon as possible,
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
let sender_peer = match data.is_received_message_valid() {
Ok(t) => t,
Err(e) => {
log::error!("Received an invalid healthcheck message. Error: {e}");
return;
},
};

if data.should_reply() {
// Reply the message so they know we are healthy.

Expand Down Expand Up @@ -337,6 +334,8 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}
});

Ok(())
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand Down
20 changes: 15 additions & 5 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub enum P2PRequestError {
ResponseError(String),
#[display(fmt = "Expected 1 response, found {}", _0)]
ExpectedSingleResponseError(usize),
ValidationFailed(String),
}

/// Enum covering error cases that can happen during P2P message processing.
Expand Down Expand Up @@ -190,15 +191,16 @@ async fn process_p2p_message(
to_propagate = true;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Some(ticker) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, ticker).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
ctx.spawner().spawn(async {
if coin.is_utxo_in_native_mode() {
let fut = coin.send_raw_tx_bytes(&message.data);
ctx.spawner().spawn(async {
match fut.compat().await {
Ok(id) => log::debug!("Transaction broadcasted successfully: {:?} ", id),
// TODO (After https://github.com/KomodoPlatform/atomicDEX-API/pull/1433)
Expand All @@ -207,11 +209,19 @@ async fn process_p2p_message(
Err(e) => log::error!("Broadcast transaction failed (ignore this error if the transaction already sent by another seednode). {}", e),
};
})
}
}

to_propagate = true;
}
},
Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => {
lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await
if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await {
log::error!("{}", e);
return;
}

to_propagate = true;
},
None | Some(_) => (),
}
Expand Down
5 changes: 4 additions & 1 deletion mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp";
pub(crate) const LEGACY_SWAP_TYPE: u8 = 0;
pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1;
pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2;
const MAX_STARTED_AT_DIFF: u64 = 60;

pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6;
pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.;

const MAX_STARTED_AT_DIFF: u64 = 60;
const NEGOTIATE_SEND_INTERVAL: f64 = 30.;

/// If a certain P2P message is not received, swap will be aborted after this time expires.
Expand Down
7 changes: 4 additions & 3 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_e
wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg,
NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo,
SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg,
SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, WAIT_CONFIRM_INTERVAL_SEC};
SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, TAKER_FEE_VALIDATION_ATTEMPTS,
TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, WAIT_CONFIRM_INTERVAL_SEC};
use crate::lp_dispatcher::{DispatcherContext, LpEvents};
use crate::lp_network::subscribe_to_topic;
use crate::lp_ordermatch::MakerOrderBuilder;
Expand Down Expand Up @@ -771,13 +772,13 @@ impl MakerSwap {
{
Ok(_) => break,
Err(err) => {
if attempts >= 6 {
if attempts >= TAKER_FEE_VALIDATION_ATTEMPTS {
return Ok((Some(MakerSwapCommand::Finish), vec![
MakerSwapEvent::TakerFeeValidateFailed(ERRL!("{}", err).into()),
]));
} else {
attempts += 1;
Timer::sleep(10.).await;
Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS).await;
}
},
};
Expand Down
44 changes: 26 additions & 18 deletions mm2src/mm2_main/src/lp_swap/swap_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic,
H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL_SEC};
H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY_SECS,
WAIT_CONFIRM_INTERVAL_SEC};
use crate::lp_network::{P2PRequestError, P2PRequestResult};

use crate::MmError;
Expand Down Expand Up @@ -181,24 +182,31 @@ impl State for ValidateTakerFee {

async fn on_changed(self: Box<Self>, watcher_ctx: &mut WatcherStateMachine) -> StateResult<WatcherStateMachine> {
debug!("Watcher validate taker fee");
let validated_f = watcher_ctx
.taker_coin
.watcher_validate_taker_fee(WatcherValidateTakerFeeInput {
taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(),
sender_pubkey: watcher_ctx.verified_pub.clone(),
min_block_number: watcher_ctx.data.taker_coin_start_block,
fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(),
lock_duration: watcher_ctx.data.lock_duration,
})
.compat();

if let Err(err) = validated_f.await {
return Self::change_state(Stopped::from_reason(StopReason::Error(
WatcherError::InvalidTakerFee(format!("{:?}", err)).into(),
)));
};

Self::change_state(ValidateTakerPayment {})
let validation_result = retry_on_err!(async {
watcher_ctx
.taker_coin
.watcher_validate_taker_fee(WatcherValidateTakerFeeInput {
taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(),
sender_pubkey: watcher_ctx.verified_pub.clone(),
min_block_number: watcher_ctx.data.taker_coin_start_block,
fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(),
lock_duration: watcher_ctx.data.lock_duration,
})
.compat()
.await
})
.repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS)
.attempts(TAKER_FEE_VALIDATION_ATTEMPTS)
.inspect_err(|e| error!("Error validating taker fee: {}", e))
.await;

match validation_result {
Ok(_) => Self::change_state(ValidateTakerPayment {}),
Err(repeat_err) => Self::change_state(Stopped::from_reason(StopReason::Error(
WatcherError::InvalidTakerFee(repeat_err.to_string()).into(),
))),
}
}
}

Expand Down

0 comments on commit 0a94102

Please sign in to comment.